diff --git a/examples/Actor/ActorClient/Program.cs b/examples/Actor/ActorClient/Program.cs index 5d7f06fb2..bae5d2ec2 100644 --- a/examples/Actor/ActorClient/Program.cs +++ b/examples/Actor/ActorClient/Program.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -46,7 +46,7 @@ public static async Task Main(string[] args) var proxy = ActorProxy.Create(actorId, "DemoActor"); Console.WriteLine("Making call using actor proxy to save data."); - await proxy.SaveData(data); + await proxy.SaveData(data, TimeSpan.FromMinutes(10)); Console.WriteLine("Making call using actor proxy to get data."); var receivedData = await proxy.GetData(); Console.WriteLine($"Received data is {receivedData}."); diff --git a/examples/Actor/DemoActor/DemoActor.cs b/examples/Actor/DemoActor/DemoActor.cs index 62c100f79..da780d517 100644 --- a/examples/Actor/DemoActor/DemoActor.cs +++ b/examples/Actor/DemoActor/DemoActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -41,12 +41,12 @@ public DemoActor(ActorHost host, BankService bank) this.bank = bank; } - public async Task SaveData(MyData data) + public async Task SaveData(MyData data, TimeSpan ttl) { Console.WriteLine($"This is Actor id {this.Id} with data {data}."); // Set State using StateManager, state is saved after the method execution. - await this.StateManager.SetStateAsync(StateName, data); + await this.StateManager.SetStateAsync(StateName, data, ttl); } public Task GetData() @@ -109,7 +109,7 @@ public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSp // This method is invoked when an actor reminder is fired. var actorState = await this.StateManager.GetStateAsync(StateName); actorState.PropertyB = $"Reminder triggered at '{DateTime.Now:yyyy-MM-ddTHH:mm:ss}'"; - await this.StateManager.SetStateAsync(StateName, actorState); + await this.StateManager.SetStateAsync(StateName, actorState, ttl: TimeSpan.FromMinutes(5)); } class TimerParams @@ -173,7 +173,7 @@ public async Task TimerCallback(byte[] data) { var state = await this.StateManager.GetStateAsync(StateName); state.PropertyA = $"Timer triggered at '{DateTime.Now:yyyyy-MM-ddTHH:mm:s}'"; - await this.StateManager.SetStateAsync(StateName, state); + await this.StateManager.SetStateAsync(StateName, state, ttl: TimeSpan.FromMinutes(5)); var timerParams = JsonSerializer.Deserialize(data); Console.WriteLine("Timer parameter1: " + timerParams.IntParam); Console.WriteLine("Timer parameter2: " + timerParams.StringParam); diff --git a/examples/Actor/IDemoActor/IDemoActor.cs b/examples/Actor/IDemoActor/IDemoActor.cs index c2926a048..25ce09370 100644 --- a/examples/Actor/IDemoActor/IDemoActor.cs +++ b/examples/Actor/IDemoActor/IDemoActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,8 +27,9 @@ public interface IDemoActor : IActor /// Method to save data. /// /// DAta to save. + /// TTL of state key. /// A task that represents the asynchronous save operation. - Task SaveData(MyData data); + Task SaveData(MyData data, TimeSpan ttl); /// /// Method to get data. diff --git a/src/Dapr.Actors/Communication/ActorStateResponse.cs b/src/Dapr.Actors/Communication/ActorStateResponse.cs new file mode 100644 index 000000000..22b3bf20e --- /dev/null +++ b/src/Dapr.Actors/Communication/ActorStateResponse.cs @@ -0,0 +1,50 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors.Communication +{ + using System; + + /// + /// Represents a response from fetching an actor state key. + /// + public class ActorStateResponse + { + /// + /// Initializes a new instance of the class. + /// + /// The response value. + /// The time to live expiration time. + public ActorStateResponse(T value, DateTimeOffset? ttlExpireTime) + { + this.Value = value; + this.TTLExpireTime = ttlExpireTime; + } + + /// + /// Gets the response value as a string. + /// + /// + /// The response value as a string. + /// + public T Value { get; } + + /// + /// Gets the time to live expiration time. + /// + /// + /// The time to live expiration time. + /// + public DateTimeOffset? TTLExpireTime { get; } + } +} diff --git a/src/Dapr.Actors/Constants.cs b/src/Dapr.Actors/Constants.cs index be2d8f49f..038caf101 100644 --- a/src/Dapr.Actors/Constants.cs +++ b/src/Dapr.Actors/Constants.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ internal static class Constants public const string RequestHeaderName = "X-DaprRequestHeader"; public const string ErrorResponseHeaderName = "X-DaprErrorResponseHeader"; public const string ReentrancyRequestHeaderName = "Dapr-Reentrancy-Id"; + public const string TTLResponseHeaderName = "Metadata.ttlExpireTime"; public const string Dapr = "dapr"; public const string Config = "config"; public const string State = "state"; diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index 4695375fb..2565bab62 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -57,7 +57,7 @@ public DaprHttpInteractor( this.httpClient.Timeout = requestTimeout ?? this.httpClient.Timeout; } - public async Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) + public async Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) { var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorStateKeyRelativeUrlFormat, actorType, actorId, keyName); @@ -72,7 +72,18 @@ HttpRequestMessage RequestFunc() using var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken); var stringResponse = await response.Content.ReadAsStringAsync(); - return stringResponse; + + DateTimeOffset? ttlExpireTime = null; + if (response.Headers.TryGetValues(Constants.TTLResponseHeaderName, out IEnumerable headerValues)) + { + var ttlExpireTimeString = headerValues.First(); + if (!string.IsNullOrEmpty(ttlExpireTimeString)) + { + ttlExpireTime = DateTime.Parse(ttlExpireTimeString, CultureInfo.InvariantCulture); + } + } + + return new ActorStateResponse(stringResponse, ttlExpireTime); } public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs index 8f30aa18f..5849328a8 100644 --- a/src/Dapr.Actors/IDaprInteractor.cs +++ b/src/Dapr.Actors/IDaprInteractor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ internal interface IDaprInteractor /// Name of key to get value for. /// Cancels the operation. /// A task that represents the asynchronous operation. - Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default); + Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default); /// /// Invokes Actor method. diff --git a/src/Dapr.Actors/Runtime/ActorStateChange.cs b/src/Dapr.Actors/Runtime/ActorStateChange.cs index c09e48df6..34fa68fdf 100644 --- a/src/Dapr.Actors/Runtime/ActorStateChange.cs +++ b/src/Dapr.Actors/Runtime/ActorStateChange.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,7 +27,8 @@ public sealed class ActorStateChange /// The type of value associated with given actor state name. /// The value associated with given actor state name. /// The kind of state change for given actor state name. - public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind) + /// The time to live for the state. + public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -35,6 +36,7 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi this.Type = type; this.Value = value; this.ChangeKind = changeKind; + this.TTLExpireTime = ttlExpireTime; } /// @@ -68,5 +70,16 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi /// The kind of state change for given actor state name. /// public StateChangeKind ChangeKind { get; } + + /// + /// Gets the time to live for the state. + /// + /// + /// The time to live for the state. + /// + /// + /// If null, the state will not expire. + /// + public DateTimeOffset? TTLExpireTime { get; } } } diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index 9c752f56b..111bb80f4 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ using System.Threading; using System.Threading.Tasks; using Dapr.Actors.Resources; +using Dapr.Actors.Communication; namespace Dapr.Actors.Runtime { @@ -38,14 +39,24 @@ internal ActorStateManager(Actor actor) public async Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); - + if (!(await this.TryAddStateAsync(stateName, value, cancellationToken))) { throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName)); } } - public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task AddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken) + { + EnsureStateProviderInitialized(); + + if (!(await this.TryAddStateAsync(stateName, value, ttl, cancellationToken))) + { + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName)); + } + } + + public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -57,8 +68,8 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat { var stateMetadata = stateChangeTracker[stateName]; - // Check if the property was marked as remove in the cache - if (stateMetadata.ChangeKind == StateChangeKind.Remove) + // Check if the property was marked as remove or is expired in the cache + if (stateMetadata.ChangeKind == StateChangeKind.Remove || (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow)) { stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update); return true; @@ -76,6 +87,37 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat return true; } + public async Task TryAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); + + EnsureStateProviderInitialized(); + + var stateChangeTracker = GetContextualStateTracker(); + + if (stateChangeTracker.ContainsKey(stateName)) + { + var stateMetadata = stateChangeTracker[stateName]; + + // Check if the property was marked as remove in the cache or has been expired. + if (stateMetadata.ChangeKind == StateChangeKind.Remove || (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow)) + { + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update, ttl: ttl); + return true; + } + + return false; + } + + if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken)) + { + return false; + } + + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttl: ttl); + return true; + } + public async Task GetStateAsync(string stateName, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); @@ -102,8 +144,8 @@ public async Task> TryGetStateAsync(string stateName, Can { var stateMetadata = stateChangeTracker[stateName]; - // Check if the property was marked as remove in the cache - if (stateMetadata.ChangeKind == StateChangeKind.Remove) + // Check if the property was marked as remove in the cache or is expired + if (stateMetadata.ChangeKind == StateChangeKind.Remove || (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow)) { return new ConditionalValue(false, default); } @@ -114,10 +156,11 @@ public async Task> TryGetStateAsync(string stateName, Can var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); if (conditionalResult.HasValue) { - stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value, StateChangeKind.None)); + stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value.Value, StateChangeKind.None, ttlExpireTime: conditionalResult.Value.TTLExpireTime)); + return new ConditionalValue(true, conditionalResult.Value.Value); } - return conditionalResult; + return new ConditionalValue(false, default); } public async Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken) @@ -132,6 +175,7 @@ public async Task SetStateAsync(string stateName, T value, CancellationToken { var stateMetadata = stateChangeTracker[stateName]; stateMetadata.Value = value; + stateMetadata.TTLExpireTime = null; if (stateMetadata.ChangeKind == StateChangeKind.None || stateMetadata.ChangeKind == StateChangeKind.Remove) @@ -149,6 +193,36 @@ public async Task SetStateAsync(string stateName, T value, CancellationToken } } + public async Task SetStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken) + { + ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); + + EnsureStateProviderInitialized(); + + var stateChangeTracker = GetContextualStateTracker(); + + if (stateChangeTracker.ContainsKey(stateName)) + { + var stateMetadata = stateChangeTracker[stateName]; + stateMetadata.Value = value; + stateMetadata.TTLExpireTime = DateTimeOffset.UtcNow.Add(ttl); + + if (stateMetadata.ChangeKind == StateChangeKind.None || + stateMetadata.ChangeKind == StateChangeKind.Remove) + { + stateMetadata.ChangeKind = StateChangeKind.Update; + } + } + else if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken)) + { + stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update, ttl: ttl)); + } + else + { + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttl: ttl); + } + } + public async Task RemoveStateAsync(string stateName, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); @@ -171,6 +245,12 @@ public async Task TryRemoveStateAsync(string stateName, CancellationToken { var stateMetadata = stateChangeTracker[stateName]; + if (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow) + { + stateChangeTracker.Remove(stateName); + return false; + } + switch (stateMetadata.ChangeKind) { case StateChangeKind.Remove: @@ -235,6 +315,24 @@ public async Task GetOrAddStateAsync(string stateName, T value, Cancellati return value; } + public async Task GetOrAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken) + { + EnsureStateProviderInitialized(); + + var condRes = await this.TryGetStateAsync(stateName, cancellationToken); + + if (condRes.HasValue) + { + return condRes.Value; + } + + var changeKind = this.IsStateMarkedForRemove(stateName) ? StateChangeKind.Update : StateChangeKind.Add; + + var stateChangeTracker = GetContextualStateTracker(); + stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind, ttl: ttl); + return value; + } + public async Task AddOrUpdateStateAsync( string stateName, T addValue, @@ -272,7 +370,7 @@ public async Task AddOrUpdateStateAsync( var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); if (conditionalResult.HasValue) { - var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value); + var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value.Value); stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update)); return newValue; @@ -282,6 +380,54 @@ public async Task AddOrUpdateStateAsync( return addValue; } + public async Task AddOrUpdateStateAsync( + string stateName, + T addValue, + Func updateValueFactory, + TimeSpan ttl, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); + + EnsureStateProviderInitialized(); + + var stateChangeTracker = GetContextualStateTracker(); + + if (stateChangeTracker.ContainsKey(stateName)) + { + var stateMetadata = stateChangeTracker[stateName]; + + // Check if the property was marked as remove in the cache + if (stateMetadata.ChangeKind == StateChangeKind.Remove) + { + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update, ttl: ttl); + return addValue; + } + + var newValue = updateValueFactory.Invoke(stateName, (T)stateMetadata.Value); + stateMetadata.Value = newValue; + + if (stateMetadata.ChangeKind == StateChangeKind.None) + { + stateMetadata.ChangeKind = StateChangeKind.Update; + } + + return newValue; + } + + var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); + if (conditionalResult.HasValue) + { + var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value.Value); + stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update, ttl: ttl)); + + return newValue; + } + + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add, ttl: ttl); + return addValue; + } + public Task ClearCacheAsync(CancellationToken cancellationToken) { EnsureStateProviderInitialized(); @@ -310,7 +456,7 @@ public async Task SaveStateAsync(CancellationToken cancellationToken = default) if (stateMetadata.ChangeKind != StateChangeKind.None) { stateChangeList.Add( - new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind)); + new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind, stateMetadata.TTLExpireTime)); if (stateMetadata.ChangeKind == StateChangeKind.Remove) { @@ -362,7 +508,7 @@ private bool IsStateMarkedForRemove(string stateName) return false; } - private Task> TryGetStateFromStateProviderAsync(string stateName, CancellationToken cancellationToken) + private Task>> TryGetStateFromStateProviderAsync(string stateName, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); return this.actor.Host.StateProvider.TryLoadStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken); @@ -392,11 +538,20 @@ private Dictionary GetContextualStateTracker() private sealed class StateMetadata { - private StateMetadata(object value, Type type, StateChangeKind changeKind) + private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, TimeSpan? ttl = null) { this.Value = value; this.Type = type; this.ChangeKind = changeKind; + + if (ttlExpireTime.HasValue && ttl.HasValue) { + throw new ArgumentException("Cannot specify both TTLExpireTime and TTL"); + } + if (ttl.HasValue) { + this.TTLExpireTime = DateTimeOffset.UtcNow.Add(ttl.Value); + } else { + this.TTLExpireTime = ttlExpireTime; + } } public object Value { get; set; } @@ -405,11 +560,23 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind) public Type Type { get; } + public DateTimeOffset? TTLExpireTime { get; set; } + public static StateMetadata Create(T value, StateChangeKind changeKind) { return new StateMetadata(value, typeof(T), changeKind); } + public static StateMetadata Create(T value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime) + { + return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime: ttlExpireTime); + } + + public static StateMetadata Create(T value, StateChangeKind changeKind, TimeSpan? ttl) + { + return new StateMetadata(value, typeof(T), changeKind, ttl: ttl); + } + public static StateMetadata CreateForRemove() { return new StateMetadata(null, typeof(object), StateChangeKind.Remove); diff --git a/src/Dapr.Actors/Runtime/ConditionalValue.cs b/src/Dapr.Actors/Runtime/ConditionalValue.cs index 1d2a197eb..ec4f3a5a6 100644 --- a/src/Dapr.Actors/Runtime/ConditionalValue.cs +++ b/src/Dapr.Actors/Runtime/ConditionalValue.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -42,4 +42,4 @@ public ConditionalValue(bool hasValue, TValue value) /// The value of the object. If HasValue is false, returns the default value for type of the TValue parameter. public TValue Value { get; } } -} \ No newline at end of file +} diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index ae86fb28b..e81308dbd 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ namespace Dapr.Actors.Runtime using System.Text.Json; using System.Threading; using System.Threading.Tasks; + using Dapr.Actors.Communication; /// /// State Provider to interact with Dapr runtime. @@ -43,27 +44,27 @@ public DaprStateProvider(IDaprInteractor daprInteractor, JsonSerializerOptions j this.daprInteractor = daprInteractor; } - public async Task> TryLoadStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) + public async Task>> TryLoadStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) { - var result = new ConditionalValue(false, default); - var stringResult = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); + var result = new ConditionalValue>(false, default); + var response = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - if (stringResult.Length != 0) + if (response.Value.Length != 0 && (!response.TTLExpireTime.HasValue || response.TTLExpireTime.Value > DateTimeOffset.UtcNow)) { T typedResult; // perform default json de-serialization if custom serializer was not provided. if (this.actorStateSerializer != null) { - var byteResult = Convert.FromBase64String(stringResult.Trim('"')); + var byteResult = Convert.FromBase64String(response.Value.Trim('"')); typedResult = this.actorStateSerializer.Deserialize(byteResult); } else { - typedResult = JsonSerializer.Deserialize(stringResult, jsonSerializerOptions); + typedResult = JsonSerializer.Deserialize(response.Value, jsonSerializerOptions); } - result = new ConditionalValue(true, typedResult); + result = new ConditionalValue>(true, new ActorStateResponse(typedResult, response.TTLExpireTime)); } return result; @@ -71,8 +72,8 @@ public async Task> TryLoadStateAsync(string actorType, st public async Task ContainsStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) { - var byteResult = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - return byteResult.Length != 0; + var result = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); + return (result.Value.Length != 0 && (!result.TTLExpireTime.HasValue || result.TTLExpireTime.Value > DateTimeOffset.UtcNow)); } public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) @@ -132,6 +133,15 @@ private async Task DoStateChangesTransactionallyAsync(string actorType, string a writer.WritePropertyName("value"); JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions); } + + if (stateChange.TTLExpireTime.HasValue) { + var ttl = (int)Math.Ceiling((stateChange.TTLExpireTime.Value - DateTimeOffset.UtcNow).TotalSeconds); + writer.WritePropertyName("metadata"); + writer.WriteStartObject(); + writer.WriteString("ttlInSeconds", ttl.ToString()); + writer.WriteEndObject(); + } + break; default: break; diff --git a/src/Dapr.Actors/Runtime/IActorStateManager.cs b/src/Dapr.Actors/Runtime/IActorStateManager.cs index df1eb3356..b85fa2a06 100644 --- a/src/Dapr.Actors/Runtime/IActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/IActorStateManager.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ namespace Dapr.Actors.Runtime using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; + using Dapr.Actors.Communication; /// /// Represents an interface that exposes methods to manage state of an . @@ -45,6 +46,28 @@ public interface IActorStateManager /// Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + /// + /// Adds an actor state with given state name. + /// + /// Type of value associated with given state name. + /// Name of the actor state to add. + /// Value of the actor state to add. + /// The token to monitor for cancellation requests. + /// The time to live for the state. + /// + /// A task that represents the asynchronous add operation. + /// + /// + /// An actor state with given state name already exists. + /// + /// The specified state name is null. + /// The operation was canceled. + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task AddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); + /// /// Gets an actor state with specified state name. /// @@ -85,6 +108,26 @@ public interface IActorStateManager /// Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + /// + /// Sets an actor state with given state name to specified value. + /// If an actor state with specified name does not exist, it is added. + /// + /// Type of value associated with given state name. + /// Name of the actor state to set. + /// Value of the actor state. + /// The time to live for the state. + /// The token to monitor for cancellation requests. + /// + /// A task that represents the asynchronous set operation. + /// + /// The specified state name is null. + /// The operation was canceled. + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task SetStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); + /// /// Removes an actor state with specified state name. /// @@ -121,6 +164,30 @@ public interface IActorStateManager /// Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + /// + /// Attempts to add an actor state with given state name and value. Returns false if an actor state with + /// the same name already exists. + /// + /// Type of value associated with given state name. + /// Name of the actor state to add. + /// Value of the actor state to add. + /// The time to live for the state. + /// The token to monitor for cancellation requests. + /// This is optional and defaults to . + /// + /// A boolean task that represents the asynchronous add operation. Returns true if the + /// value was successfully added and false if an actor state with the same name already exists. + /// + /// The specified state name is null. + /// Provide a valid state name string. + /// The request was canceled using the specified + /// . + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task TryAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); + /// /// Attempts to get an actor state with specified state name. /// @@ -188,6 +255,29 @@ public interface IActorStateManager /// Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + /// + /// Gets an actor state with the given state name if it exists. If it does not + /// exist, creates and new state with the specified name and value. + /// + /// Type of value associated with given state name. + /// Name of the actor state to get or add. + /// Value of the actor state to add if it does not exist. + /// The time to live for the state. + /// The token to monitor for cancellation requests. + /// + /// A task that represents the asynchronous get or add operation. The value of TResult + /// parameter contains value of actor state with given state name. + /// + /// The specified state name is null. + /// Provide a valid state name string. + /// The request was canceled using the specified + /// . + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task GetOrAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); + /// /// Adds an actor state with given state name, if it does not already exist or updates /// the state with specified state name, if it exists. @@ -209,6 +299,28 @@ public interface IActorStateManager /// Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, CancellationToken cancellationToken = default); + /// + /// Adds an actor state with given state name, if it does not already exist or updates + /// the state with specified state name, if it exists. + /// + /// Type of value associated with given state name. + /// Name of the actor state to add or update. + /// Value of the actor state to add if it does not exist. + /// Factory function to generate value of actor state to update if it exists. + /// The time to live for the state. + /// The token to monitor for cancellation requests. + /// + /// A task that represents the asynchronous add/update operation. The value of TResult + /// parameter contains value of actor state that was added/updated. + /// + /// The specified state name is null. + /// The operation was canceled. + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, TimeSpan ttl, CancellationToken cancellationToken = default); + /// /// Clears all the cached actor states and any operation(s) performed on /// since last state save operation. diff --git a/src/Dapr.Actors/Runtime/IActorStateSerializer.cs b/src/Dapr.Actors/Runtime/IActorStateSerializer.cs index c6136c057..cff3b7c26 100644 --- a/src/Dapr.Actors/Runtime/IActorStateSerializer.cs +++ b/src/Dapr.Actors/Runtime/IActorStateSerializer.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto b/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto index 3faea3016..1e63b885d 100644 --- a/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto +++ b/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto @@ -77,7 +77,7 @@ message InvokeRequest { HTTPExtension http_extension = 4; } -// InvokeResponse is the response message inclduing data and its content type +// InvokeResponse is the response message including data and its content type // from app callback. // This message is used in InvokeService of Dapr gRPC Service and OnInvoke // of AppCallback gRPC service. diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto index 883527adb..eafb5452e 100644 --- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto +++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto @@ -169,6 +169,26 @@ service Dapr { // Raise an event to a running workflow instance rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {} + // Starts a new instance of a workflow + rpc StartWorkflowBeta1 (StartWorkflowRequest) returns (StartWorkflowResponse) {} + + // Gets details about a started workflow instance + rpc GetWorkflowBeta1 (GetWorkflowRequest) returns (GetWorkflowResponse) {} + + // Purge Workflow + rpc PurgeWorkflowBeta1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {} + + // Terminates a running workflow instance + rpc TerminateWorkflowBeta1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {} + + // Pauses a running workflow instance + rpc PauseWorkflowBeta1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {} + + // Resumes a paused workflow instance + rpc ResumeWorkflowBeta1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {} + + // Raise an event to a running workflow instance + rpc RaiseEventWorkflowBeta1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {} // Shutdown the sidecar rpc Shutdown (google.protobuf.Empty) returns (google.protobuf.Empty) {} } @@ -542,6 +562,9 @@ message GetActorStateRequest { // GetActorStateResponse is the response conveying the actor's state value. message GetActorStateResponse { bytes data = 1; + + // The metadata which will be sent to app. + map metadata = 2; } // ExecuteActorStateTransactionRequest is the message to execute multiple operations on a specified actor. @@ -580,10 +603,14 @@ message InvokeActorResponse { // GetMetadataResponse is a message that is returned on GetMetadata rpc call message GetMetadataResponse { string id = 1; - repeated ActiveActorsCount active_actors_count = 2; - repeated RegisteredComponents registered_components = 3; - map extended_metadata = 4; - repeated PubsubSubscription subscriptions = 5; + repeated ActiveActorsCount active_actors_count = 2 [json_name = "actors"]; + repeated RegisteredComponents registered_components = 3 [json_name = "components"]; + map extended_metadata = 4 [json_name = "extended"]; + repeated PubsubSubscription subscriptions = 5 [json_name = "subscriptions"]; + repeated MetadataHTTPEndpoint http_endpoints = 6 [json_name = "httpEndpoints"]; + AppConnectionProperties app_connection_properties = 7 [json_name = "appConnectionProperties"]; + string runtime_version = 8 [json_name = "runtimeVersion"]; + repeated string enabled_features = 9 [json_name = "enabledFeatures"]; } message ActiveActorsCount { @@ -598,12 +625,31 @@ message RegisteredComponents { repeated string capabilities = 4; } +message MetadataHTTPEndpoint { + string name = 1 [json_name = "name"]; +} + +message AppConnectionProperties { + int32 port = 1; + string protocol = 2; + string channel_address = 3 [json_name = "channelAddress"]; + int32 max_concurrency = 4 [json_name = "maxConcurrency"]; + AppConnectionHealthProperties health = 5; +} + +message AppConnectionHealthProperties { + string health_check_path = 1 [json_name = "healthCheckPath"]; + string health_probe_interval = 2 [json_name = "healthProbeInterval"]; + string health_probe_timeout = 3 [json_name = "healthProbeTimeout"]; + int32 health_threshold = 4 [json_name = "healthThreshold"]; +} + message PubsubSubscription { - string pubsub_name = 1; - string topic = 2; - map metadata = 3; - PubsubSubscriptionRules rules = 4; - string dead_letter_topic = 5; + string pubsub_name = 1 [json_name = "pubsubname"]; + string topic = 2 [json_name = "topic"]; + map metadata = 3 [json_name = "metadata"]; + PubsubSubscriptionRules rules = 4 [json_name = "rules"]; + string dead_letter_topic = 5 [json_name = "deadLetterTopic"]; } message PubsubSubscriptionRules { @@ -900,7 +946,7 @@ message EncryptRequest { // Request details. Must be present in the first message only. EncryptRequestOptions options = 1; // Chunk of data of arbitrary size. - // common.v1.StreamPayload payload = 2; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 2; } // EncryptRequestOptions contains options for the first message in the EncryptAlpha1 request. @@ -928,7 +974,7 @@ message EncryptRequestOptions { // EncryptResponse is the response for EncryptAlpha1. message EncryptResponse { // Chunk of data. - // common.v1.StreamPayload payload = 1; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 1; } // DecryptRequest is the request for DecryptAlpha1. @@ -936,7 +982,7 @@ message DecryptRequest { // Request details. Must be present in the first message only. DecryptRequestOptions options = 1; // Chunk of data of arbitrary size. - // common.v1.StreamPayload payload = 2; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 2; } // DecryptRequestOptions contains options for the first message in the DecryptAlpha1 request. @@ -952,10 +998,10 @@ message DecryptRequestOptions { // DecryptResponse is the response for DecryptAlpha1. message DecryptResponse { // Chunk of data. - // common.v1.StreamPayload payload = 1; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 1; } -// GetWorkflowRequest is the request for GetWorkflowAlpha1. +// GetWorkflowRequest is the request for GetWorkflowBeta1. message GetWorkflowRequest { // ID of the workflow instance to query. string instance_id = 1 [json_name = "instanceID"]; @@ -963,7 +1009,7 @@ message GetWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// GetWorkflowResponse is the response for GetWorkflowAlpha1. +// GetWorkflowResponse is the response for GetWorkflowBeta1. message GetWorkflowResponse { // ID of the workflow instance. string instance_id = 1 [json_name = "instanceID"]; @@ -979,7 +1025,7 @@ message GetWorkflowResponse { map properties = 6; } -// StartWorkflowRequest is the request for StartWorkflowAlpha1. +// StartWorkflowRequest is the request for StartWorkflowBeta1. message StartWorkflowRequest { // The ID to assign to the started workflow instance. If empty, a random ID is generated. string instance_id = 1 [json_name = "instanceID"]; @@ -993,13 +1039,13 @@ message StartWorkflowRequest { bytes input = 5; } -// StartWorkflowResponse is the response for StartWorkflowAlpha1. +// StartWorkflowResponse is the response for StartWorkflowBeta1. message StartWorkflowResponse { // ID of the started workflow instance. string instance_id = 1 [json_name = "instanceID"]; } -// TerminateWorkflowRequest is the request for TerminateWorkflowAlpha1. +// TerminateWorkflowRequest is the request for TerminateWorkflowBeta1. message TerminateWorkflowRequest { // ID of the workflow instance to terminate. string instance_id = 1 [json_name = "instanceID"]; @@ -1007,7 +1053,7 @@ message TerminateWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// PauseWorkflowRequest is the request for PauseWorkflowAlpha1. +// PauseWorkflowRequest is the request for PauseWorkflowBeta1. message PauseWorkflowRequest { // ID of the workflow instance to pause. string instance_id = 1 [json_name = "instanceID"]; @@ -1015,7 +1061,7 @@ message PauseWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// ResumeWorkflowRequest is the request for ResumeWorkflowAlpha1. +// ResumeWorkflowRequest is the request for ResumeWorkflowBeta1. message ResumeWorkflowRequest { // ID of the workflow instance to resume. string instance_id = 1 [json_name = "instanceID"]; @@ -1023,7 +1069,7 @@ message ResumeWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// RaiseEventWorkflowRequest is the request for RaiseEventWorkflowAlpha1. +// RaiseEventWorkflowRequest is the request for RaiseEventWorkflowBeta1. message RaiseEventWorkflowRequest { // ID of the workflow instance to raise an event for. string instance_id = 1 [json_name = "instanceID"]; @@ -1035,10 +1081,10 @@ message RaiseEventWorkflowRequest { bytes event_data = 4; } -// PurgeWorkflowRequest is the request for PurgeWorkflowAlpha1. +// PurgeWorkflowRequest is the request for PurgeWorkflowBeta1. message PurgeWorkflowRequest { // ID of the workflow instance to purge. string instance_id = 1 [json_name = "instanceID"]; // Name of the workflow component. string workflow_component = 2 [json_name = "workflowComponent"]; -} \ No newline at end of file +} diff --git a/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs b/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs index 93f6ba92f..6bb3c827d 100644 --- a/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs +++ b/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.Actors.Test/ActorStateManagerTest.cs b/test/Dapr.Actors.Test/ActorStateManagerTest.cs new file mode 100644 index 000000000..a6517a6b4 --- /dev/null +++ b/test/Dapr.Actors.Test/ActorStateManagerTest.cs @@ -0,0 +1,199 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors.Test +{ + using System; + using System.Globalization; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Security; + using System.Security.Authentication; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using System.Collections.Generic; + using FluentAssertions; + using Xunit; + using Dapr.Actors.Communication; + using Dapr.Actors.Runtime; + using Moq; + + /// + /// Contains tests for ActorStateManager. + /// + public class ActorStateManagerTest + { + [Fact] + public async Task SetGet() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await mngr.AddStateAsync("key1", "value1", token); + await mngr.AddStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await Assert.ThrowsAsync(() => mngr.AddStateAsync("key1", "value3", token)); + await Assert.ThrowsAsync(() => mngr.AddStateAsync("key2", "value4", token)); + + await mngr.SetStateAsync("key1", "value5", token); + await mngr.SetStateAsync("key2", "value6", token); + Assert.Equal("value5", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value6", await mngr.GetStateAsync("key2", token)); + } + + [Fact] + public async Task StateWithTTL() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await mngr.AddStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.AddStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await Task.Delay(TimeSpan.FromSeconds(1.5)); + + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key2", token)); + + // Should be able to add state again after expiry and should not expire. + await mngr.AddStateAsync("key1", "value1", token); + await mngr.AddStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + await Task.Delay(TimeSpan.FromSeconds(1.5)); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + } + + [Fact] + public async Task StateRemoveAddTTL() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await mngr.AddStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.AddStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await mngr.SetStateAsync("key1", "value1", token); + await mngr.SetStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + // TTL is removed so state should not expire. + await Task.Delay(TimeSpan.FromSeconds(1.5)); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + // Adding TTL back should expire state. + await mngr.SetStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.SetStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + await Task.Delay(TimeSpan.FromSeconds(1.5)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key2", token)); + } + + [Fact] + public async Task StateDaprdExpireTime() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + // Existing key which has an expiry time. + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value1\"", DateTime.UtcNow.AddSeconds(1)))); + + await Assert.ThrowsAsync(() => mngr.AddStateAsync("key1", "value3", token)); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + + // No longer return the value from the state provider. + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + // Key should be expired after 1 seconds. + await Task.Delay(TimeSpan.FromSeconds(1.5)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.RemoveStateAsync("key1", token)); + await mngr.AddStateAsync("key1", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value2", await mngr.GetStateAsync("key1", token)); + } + + [Fact] + public async Task RemoveState() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await Assert.ThrowsAsync(() => mngr.RemoveStateAsync("key1", token)); + + await mngr.AddStateAsync("key1", "value1", token); + await mngr.AddStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await mngr.RemoveStateAsync("key1", token); + await mngr.RemoveStateAsync("key2", token); + + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key2", token)); + + // Should be able to add state again after removal. + await mngr.AddStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.AddStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + } + } +} diff --git a/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs b/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs index 80dae342f..21c142267 100644 --- a/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs +++ b/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ namespace Dapr.Actors.Test { + using System; using System.Globalization; using System.Linq; using System.Net; @@ -23,6 +24,7 @@ namespace Dapr.Actors.Test using System.Threading.Tasks; using FluentAssertions; using Xunit; + using Dapr.Actors.Communication; /// /// Contains tests for DaprHttpInteractor. @@ -350,5 +352,58 @@ public async Task InvokeActorMethodOmitsReentrancyIdIfNotSet_ValidateHeaders() request.Dismiss(); Assert.False(request.Request.Headers.Contains(Constants.ReentrancyRequestHeaderName)); } + + [Fact] + public async Task GetState_TTLExpireTimeExists() + { + await using var client = TestClient.CreateForDaprHttpInterator(); + + var actorType = "ActorType_Test"; + var actorId = "ActorId_Test"; + var keyName = "StateKey_Test"; + + var request = await client.CaptureHttpRequestAsync(async httpInteractor => + { + return await httpInteractor.GetStateAsync(actorType, actorId, keyName); + }); + + var message = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent("test"), + Headers = + { + { "Metadata.ttlExpireTime", "2023-04-05T23:22:21Z" }, + }, + }; + + var actual = await request.CompleteAsync(message); + Assert.Equal("test", actual.Value); + var expTTL = new DateTimeOffset(2023, 04, 05, 23, 22, 21, 0, new GregorianCalendar(), new TimeSpan(0, 0, 0)); + Assert.Equal(expTTL, actual.TTLExpireTime); + } + + [Fact] + public async Task GetState_TTLExpireTimeNotExists() + { + await using var client = TestClient.CreateForDaprHttpInterator(); + + var actorType = "ActorType_Test"; + var actorId = "ActorId_Test"; + var keyName = "StateKey_Test"; + + var request = await client.CaptureHttpRequestAsync(async httpInteractor => + { + return await httpInteractor.GetStateAsync(actorType, actorId, keyName); + }); + + var message = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent("test"), + }; + + var actual = await request.CompleteAsync(message); + Assert.Equal("test", actual.Value); + Assert.False(actual.TTLExpireTime.HasValue); + } } } diff --git a/test/Dapr.Actors.Test/DaprStateProviderTest.cs b/test/Dapr.Actors.Test/DaprStateProviderTest.cs new file mode 100644 index 000000000..63be89e95 --- /dev/null +++ b/test/Dapr.Actors.Test/DaprStateProviderTest.cs @@ -0,0 +1,137 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors.Test +{ + using System; + using System.Globalization; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Security; + using System.Security.Authentication; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using System.Collections.Generic; + using FluentAssertions; + using Xunit; + using Dapr.Actors.Communication; + using Dapr.Actors.Runtime; + using Moq; + + /// + /// Contains tests for DaprStateProvider. + /// + public class DaprStateProviderTest + { + [Fact] + public async Task SaveStateAsync() + { + var interactor = new Mock(); + var provider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var token = new CancellationToken(); + + var stateChangeList = new List(); + stateChangeList.Add( + new ActorStateChange("key1", typeof(string), "value1", StateChangeKind.Add, DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(2)))); + stateChangeList.Add( + new ActorStateChange("key2", typeof(string), "value2", StateChangeKind.Add, null)); + + string content = null; + interactor + .Setup(d => d.SaveStateTransactionallyAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Callback((actorType, actorId, data, token) => content = data) + .Returns(Task.FromResult(true)); + + await provider.SaveStateAsync("actorType", "actorId", stateChangeList, token); + Assert.Equal( + "[{\"operation\":\"upsert\",\"request\":{\"key\":\"key1\",\"value\":\"value1\",\"metadata\":{\"ttlInSeconds\":\"2\"}}},{\"operation\":\"upsert\",\"request\":{\"key\":\"key2\",\"value\":\"value2\"}}]", + content + ); + } + + [Fact] + public async Task ContainsStateAsync() + { + var interactor = new Mock(); + var provider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + Assert.False(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", null))); + Assert.True(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + + var ttl = DateTime.UtcNow.AddSeconds(1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + Assert.True(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + + ttl = DateTime.UtcNow.AddSeconds(-1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + Assert.False(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + } + + [Fact] + public async Task TryLoadStateAsync() + { + var interactor = new Mock(); + var provider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + var resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.False(resp.HasValue); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", null))); + resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.True(resp.HasValue); + Assert.Equal("value", resp.Value.Value); + Assert.False(resp.Value.TTLExpireTime.HasValue); + + var ttl = DateTime.UtcNow.AddSeconds(1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.True(resp.HasValue); + Assert.Equal("value", resp.Value.Value); + Assert.True(resp.Value.TTLExpireTime.HasValue); + Assert.Equal(ttl, resp.Value.TTLExpireTime.Value); + + ttl = DateTime.UtcNow.AddSeconds(-1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.False(resp.HasValue); + } + } +} diff --git a/test/Dapr.Actors.Test/Runtime/ActorTests.cs b/test/Dapr.Actors.Test/Runtime/ActorTests.cs index b18800f0e..f88b4e03f 100644 --- a/test/Dapr.Actors.Test/Runtime/ActorTests.cs +++ b/test/Dapr.Actors.Test/Runtime/ActorTests.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.Actors.Test/TestDaprInteractor.cs b/test/Dapr.Actors.Test/TestDaprInteractor.cs index 92cfa7096..11f88e684 100644 --- a/test/Dapr.Actors.Test/TestDaprInteractor.cs +++ b/test/Dapr.Actors.Test/TestDaprInteractor.cs @@ -67,10 +67,10 @@ public Task InvokeActorMethodWithoutRemotingAsync(string actorType, stri /// JSON data with state changes as per the Dapr spec for transaction state update. /// Cancels the operation. /// A task that represents the asynchronous operation. - public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, + public virtual async Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) { - throw new System.NotImplementedException(); + await _testDaprInteractor.SaveStateTransactionallyAsync(actorType, actorId, data); } /// @@ -81,9 +81,9 @@ public Task SaveStateTransactionallyAsync(string actorType, string actorId, stri /// Name of key to get value for. /// Cancels the operation. /// A task that represents the asynchronous operation. - public Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) + public virtual async Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) { - throw new System.NotImplementedException(); + return await _testDaprInteractor.GetStateAsync(actorType, actorId, keyName); } /// diff --git a/test/Dapr.Client.Test/StateApiTest.cs b/test/Dapr.Client.Test/StateApiTest.cs index 90c06e6b1..cfa664663 100644 --- a/test/Dapr.Client.Test/StateApiTest.cs +++ b/test/Dapr.Client.Test/StateApiTest.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs b/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs index 0bf57f64c..c0e3f86a2 100644 --- a/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs +++ b/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.Actors/State/IStateActor.cs b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs new file mode 100644 index 000000000..f19122102 --- /dev/null +++ b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs @@ -0,0 +1,26 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Threading.Tasks; +using Dapr.Actors; + +namespace Dapr.E2E.Test.Actors.State +{ + public interface IStateActor : IPingActor, IActor + { + Task GetState(string key); + + Task SetState(string key, string value, TimeSpan? ttl); + } +} diff --git a/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs b/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs index 5f2d5db86..58776fe28 100644 --- a/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs +++ b/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -67,4 +67,4 @@ private async Task UpdateState(bool isEnter, int callNumber) } } } -} \ No newline at end of file +} diff --git a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs index b08e483c2..57536377d 100644 --- a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.App/Actors/StateActor.cs b/test/Dapr.E2E.Test.App/Actors/StateActor.cs new file mode 100644 index 000000000..71a952e0f --- /dev/null +++ b/test/Dapr.E2E.Test.App/Actors/StateActor.cs @@ -0,0 +1,47 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Text.Json; +using System.Threading.Tasks; +using Dapr.Actors.Runtime; + +namespace Dapr.E2E.Test.Actors.State +{ + public class StateActor : Actor, IStateActor + { + public StateActor(ActorHost host) + : base(host) + { + } + + public Task Ping() + { + return Task.CompletedTask; + } + + public Task GetState(string key) + { + return this.StateManager.GetStateAsync(key); + } + + public Task SetState(string key, string value, TimeSpan? ttl) + { + if (ttl.HasValue) + { + return this.StateManager.SetStateAsync(key, value, ttl: ttl.Value); + } + return this.StateManager.SetStateAsync(key, value); + } + } +} diff --git a/test/Dapr.E2E.Test.App/Actors/TimerActor.cs b/test/Dapr.E2E.Test.App/Actors/TimerActor.cs index bbe6cf7ae..4c6589965 100644 --- a/test/Dapr.E2E.Test.App/Actors/TimerActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/TimerActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.App/Startup.cs b/test/Dapr.E2E.Test.App/Startup.cs index 8207c5883..bd0de7b91 100644 --- a/test/Dapr.E2E.Test.App/Startup.cs +++ b/test/Dapr.E2E.Test.App/Startup.cs @@ -16,6 +16,7 @@ namespace Dapr.E2E.Test using Dapr.E2E.Test.Actors.Reentrancy; using Dapr.E2E.Test.Actors.Reminders; using Dapr.E2E.Test.Actors.Timers; + using Dapr.E2E.Test.Actors.State; using Dapr.E2E.Test.Actors.ExceptionTesting; using Dapr.E2E.Test.Actors.Serialization; using Dapr.E2E.Test.App.ErrorTesting; @@ -104,6 +105,7 @@ public void ConfigureServices(IServiceCollection services) options.Actors.RegisterActor(); options.Actors.RegisterActor(); options.Actors.RegisterActor(); + options.Actors.RegisterActor(); }); } diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs new file mode 100644 index 000000000..184a40448 --- /dev/null +++ b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs @@ -0,0 +1,123 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ +namespace Dapr.E2E.Test +{ + using System; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using Dapr.Actors; + using Dapr.E2E.Test.Actors.State; + using Xunit; + + public partial class E2ETests : IAsyncLifetime + { + [Fact] + public async Task ActorCanSaveStateWithTTL() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "StateActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + await proxy.SetState("key", "value", TimeSpan.FromSeconds(2)); + + var resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + + await Task.Delay(TimeSpan.FromSeconds(2.5)); + + // Assert key no longer exists. + await Assert.ThrowsAsync(() => proxy.GetState("key")); + + // Can create key again + await proxy.SetState("key", "new-value", null); + resp = await proxy.GetState("key"); + Assert.Equal("new-value", resp); + } + + [Fact] + public async Task ActorStateTTLOverridesExisting() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "StateActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // TLL 4 seconds + await proxy.SetState("key", "value", TimeSpan.FromSeconds(4)); + + var resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + + // TLL 2 seconds + await Task.Delay(TimeSpan.FromSeconds(2)); + resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + + // TLL 4 seconds + await proxy.SetState("key", "value", TimeSpan.FromSeconds(4)); + + // TLL 2 seconds + await Task.Delay(TimeSpan.FromSeconds(2)); + resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + + // TLL 0 seconds + await Task.Delay(TimeSpan.FromSeconds(2.5)); + + // Assert key no longer exists. + await Assert.ThrowsAsync(() => proxy.GetState("key")); + } + + [Fact] + public async Task ActorStateTTLRemoveTTL() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "StateActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // Can remove TTL and then add again + await proxy.SetState("key", "value", TimeSpan.FromSeconds(2)); + await proxy.SetState("key", "value", null); + await Task.Delay(TimeSpan.FromSeconds(2)); + var resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + await proxy.SetState("key", "value", TimeSpan.FromSeconds(2)); + await Task.Delay(TimeSpan.FromSeconds(2.5)); + await Assert.ThrowsAsync(() => proxy.GetState("key")); + } + + [Fact] + public async Task ActorStateBetweenProxies() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var actorId = ActorId.CreateRandom(); + var proxy1 = this.ProxyFactory.CreateActorProxy(actorId, "StateActor"); + var proxy2 = this.ProxyFactory.CreateActorProxy(actorId, "StateActor"); + + await WaitForActorRuntimeAsync(proxy1, cts.Token); + + await proxy1.SetState("key", "value", TimeSpan.FromSeconds(2)); + var resp = await proxy1.GetState("key"); + Assert.Equal("value", resp); + resp = await proxy2.GetState("key"); + Assert.Equal("value", resp); + + await Task.Delay(TimeSpan.FromSeconds(2.5)); + await Assert.ThrowsAsync(() => proxy1.GetState("key")); + await Assert.ThrowsAsync(() => proxy2.GetState("key")); + } + } +} diff --git a/test/Dapr.E2E.Test/configuration/featureconfig.yaml b/test/Dapr.E2E.Test/configuration/featureconfig.yaml index 81ef1ecb1..4806c630f 100644 --- a/test/Dapr.E2E.Test/configuration/featureconfig.yaml +++ b/test/Dapr.E2E.Test/configuration/featureconfig.yaml @@ -12,3 +12,5 @@ spec: enabled: true - name: "proxy.grpc" enabled: true + - name: "ActorStateTTL" + enabled: true