Skip to content

Commit

Permalink
feat: Move Redis dependency management to background tasks (#915)
Browse files Browse the repository at this point in the history
* feat: Process Redis dependency setting operation in backgronud task

tests and refactor

chore: Linted code for plan-technology-for-your-school.sln solution

chore: remove stopwatch

chore: reawait task

chore: add options to program extensions

chore: amend how default options work

* chore: fire and forget

* fix: Update tests
  • Loading branch information
jimwashbrook authored Dec 16, 2024
1 parent b8beefd commit ab1db97
Show file tree
Hide file tree
Showing 17 changed files with 559 additions and 228 deletions.
29 changes: 29 additions & 0 deletions src/Dfe.PlanTech.Application/Background/BackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Threading.Channels;
using Dfe.PlanTech.Domain.Background;
using Microsoft.Extensions.Options;

namespace Dfe.PlanTech.Application.Background;

/// <inheritdoc cref="IBackgroundTaskQueue" />
public class BackgroundTaskQueue(IOptions<BackgroundTaskQueueOptions> options) : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, Task>> _queue = Channel.CreateBounded<Func<CancellationToken, Task>>(CreateChannelOptions(options.Value));

/// <inheritdoc cref="IBackgroundTaskQueue" />
public async Task QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem)
{
ArgumentNullException.ThrowIfNull(workItem);
await _queue.Writer.WriteAsync(workItem);
}

/// <inheritdoc cref="IBackgroundTaskQueue" />
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);

return workItem;
}

private static BoundedChannelOptions CreateChannelOptions(BackgroundTaskQueueOptions options) => new(options.MaxQueueSize) { FullMode = options.FullMode };
}
9 changes: 1 addition & 8 deletions src/Dfe.PlanTech.Application/Caching/Interfaces/ICmsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,7 @@ public interface ICmsCache : IDistributedCache
/// Iterates through all items in a dependency array and removes them from the cache
/// Then removes the dependency array itself
/// </summary>
/// <param name="contentComponentId">id of component to invalidate dependencies of</param>
/// <param name="contentComponentId">Id of component to invalidate dependencies of</param>
/// <returns></returns>
Task InvalidateCacheAsync(string contentComponentId);

/// <summary>
/// Generates a redis key for a content component dependency
/// </summary>
/// <param name="contentComponentId"></param>
/// <returns></returns>
string GetDependencyKey(string contentComponentId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public interface IDistributedCache
/// <param name="key">The key of the set.</param>
/// <param name="databaseId">The optional database identifier.</param>
/// <returns>An array of set members.</returns>
Task<string[]> GetSetMembersAsync(string key, int databaseId = -1);
Task<IEnumerable<string>> GetSetMembersAsync(string key, int databaseId = -1);

/// <summary>
/// Removes an item from a set in the cache asynchronously.
Expand All @@ -96,5 +96,5 @@ public interface IDistributedCache
/// <param name="key">The key of the set.</param>
/// <param name="items">The items to remove.</param>
/// <param name="databaseId">The optional database identifier.</param>
Task SetRemoveItemsAsync(string key, string[] items, int databaseId = -1);
Task SetRemoveItemsAsync(string key, IEnumerable<string> items, int databaseId = -1);
}
13 changes: 13 additions & 0 deletions src/Dfe.PlanTech.Domain/Background/BackgroundTaskQueueOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading.Channels;

namespace Dfe.PlanTech.Domain.Background;

/// <summary>
/// Options for <see cref="IBackgroundTaskQueue"/>
/// </summary>
/// <param name="MaxQueueSize">Maximum number of tasks that can be enqueued before the queue is full. Defaults to 10.</param>
/// <param name="FullMode">What to do when the queue is full. Defaults to wait. See <see cref="BoundedChannelFullMode" /> for more details.</param>
public record BackgroundTaskQueueOptions(int MaxQueueSize, BoundedChannelFullMode FullMode)
{
public BackgroundTaskQueueOptions() : this(10, BoundedChannelFullMode.Wait) { }
}
24 changes: 24 additions & 0 deletions src/Dfe.PlanTech.Domain/Background/IBackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Dfe.PlanTech.Domain.Background;

/// <summary>
/// Queue for tasks to be ran in background
/// </summary>
public interface IBackgroundTaskQueue
{
/// <summary>
/// Add an async operation to the queue for background processing.
/// </summary>
/// <param name="workItem"></param>
/// <returns></returns>
Task QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem);

/// <summary>
/// Removes an item from the queue
/// </summary>
/// <remarks>
/// Will wait until an item exists
/// </remarks>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
34 changes: 34 additions & 0 deletions src/Dfe.PlanTech.Infrastructure.Redis/IRedisDependencyManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using StackExchange.Redis;

namespace Dfe.PlanTech.Infrastructure.Redis;

/// <summary>
/// Manages Redis dependencies for <see cref="IContentComponent"> and their contents.
/// </summary>
public interface IRedisDependencyManager
{
/// <summary>
/// Find and set dependencies for a given <see cref="IContentComponent"/>
/// </summary>
/// <typeparam name="T">Type of the value to register as a dependency.</typeparam>
/// <param name="database">The database where dependencies are stored.</param>
/// <param name="key">Key for the parent of the dependencies.</param>
/// <param name="value">The <see cref="IContentComponent"/> parent of the dependencies </param>
/// <param name="cancellationToken"></param>
/// <returns>A task that represents the asynchronous operation.</returns>
Task RegisterDependenciesAsync<T>(IDatabase database, string key, T value, CancellationToken cancellationToken = default);

/// <summary>
/// Generates a key for the given content component ID.
/// </summary>
/// <param name="contentComponentId">The ID of the content component.</param>
/// <returns>The generated key string.</returns>
/// <example>
/// <code>
/// var contentComponentId = "example_id";
/// var dependencyKey = GetDependencyKey(contentComponentId);
/// Console.WriteLine(dependencyKey); // Output: "dependency:example_id"
/// </code>
/// </example>
string GetDependencyKey(string contentComponentId);
}
111 changes: 25 additions & 86 deletions src/Dfe.PlanTech.Infrastructure.Redis/RedisCache.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using Dfe.PlanTech.Application.Caching.Interfaces;
using Dfe.PlanTech.Domain.Background;
using Dfe.PlanTech.Domain.Caching.Models;
using Dfe.PlanTech.Domain.Content.Models;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
Expand All @@ -16,8 +16,10 @@ public class RedisCache : ICmsCache
private readonly IRedisConnectionManager _connectionManager;
private readonly AsyncRetryPolicy _retryPolicyAsync;
private readonly ILogger<RedisCache> _logger;
private readonly IRedisDependencyManager _dependencyManager;
private readonly IBackgroundTaskQueue _backgroundTaskService;

public RedisCache(IRedisConnectionManager connectionManager, ILogger<RedisCache> logger)
public RedisCache(IRedisConnectionManager connectionManager, ILogger<RedisCache> logger, IRedisDependencyManager dependencyManager, IBackgroundTaskQueue backgroundTaskQueue)
{
_connectionManager = connectionManager;
_logger = logger;
Expand All @@ -30,6 +32,8 @@ public RedisCache(IRedisConnectionManager connectionManager, ILogger<RedisCache>
.OrInner<RedisException>();

_retryPolicyAsync = retryPolicyBuilder.WaitAndRetryAsync(3, _ => TimeSpan.FromMilliseconds(50));
_dependencyManager = dependencyManager;
_backgroundTaskService = backgroundTaskQueue;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -66,7 +70,7 @@ public async Task<string> SetAsync<T>(string key, T value, TimeSpan? expiry = nu
{
_logger.LogInformation("Setting cache item with key: {Key}", key);
var database = await _connectionManager.GetDatabaseAsync(databaseId);
await RegisterDependenciesAsync(database, key, value);
await _dependencyManager.RegisterDependenciesAsync(database, key, value);
return await SetAsync(database, key, value, expiry);
}

Expand Down Expand Up @@ -121,10 +125,10 @@ public async Task SetAddAsync(string key, string item, int databaseId = -1)
}

/// <inheritdoc/>
public async Task<string[]> GetSetMembersAsync(string key, int databaseId = -1)
public async Task<IEnumerable<string>> GetSetMembersAsync(string key, int databaseId = -1)
{
_logger.LogInformation("Getting set members for key: {Key}", key);
return await _retryPolicyAsync.ExecuteAsync(async () => (await (await _connectionManager.GetDatabaseAsync(databaseId)).SetMembersAsync(key)).Select(x => x.ToString()).ToArray());
return await _retryPolicyAsync.ExecuteAsync(async () => (await (await _connectionManager.GetDatabaseAsync(databaseId)).SetMembersAsync(key)).Select(x => x.ToString()));
}

/// <inheritdoc/>
Expand All @@ -135,11 +139,11 @@ public Task SetRemoveAsync(string key, string item, int databaseId = -1)
}

/// <inheritdoc/>
public async Task SetRemoveItemsAsync(string key, string[] items, int databaseId = -1)
public async Task SetRemoveItemsAsync(string key, IEnumerable<string> items, int databaseId = -1)
{
_logger.LogInformation("Removing multiple items from set with key: {Key}", key);
var database = await _connectionManager.GetDatabaseAsync(databaseId);
await _retryPolicyAsync.ExecuteAsync(() => database.SetRemoveAsync(key, items.Select(x => (RedisValue)x).ToArray()));
await _retryPolicyAsync.ExecuteAsync(() => database.SetRemoveAsync(key, [.. items.Select(x => (RedisValue)x)]));
}

/// <inheritdoc/>
Expand All @@ -148,7 +152,7 @@ private async Task<string> SetAsync<T>(IDatabase database, string key, T value,
var redisValue = value as string ?? value.Serialise();
_logger.LogInformation("Setting cache item with key: {Key} and value: {Value}", key, redisValue);
await _retryPolicyAsync.ExecuteAsync(() => database.StringSetAsync(key, GZipRedisValueCompressor.Compress(redisValue), expiry));
await RegisterDependenciesAsync(database, key, value);
await _dependencyManager.RegisterDependenciesAsync(database, key, value, default);
return key;
}

Expand Down Expand Up @@ -213,80 +217,6 @@ private static CacheResult<T> CreateCacheResult<T>(RedisValue redisResult)
return redisResult is T typed ? new CacheResult<T>(ExistedInCache: true, CacheValue: typed) : new CacheResult<T>(ExistedInCache: true, CacheValue: redisResult.Deserialise<T>());
}

/// <inheritdoc/>
public async Task RegisterDependenciesAsync<T>(IDatabase database, string key, T value)
{
var dependencies = await GetDependenciesAsync(value);
var batch = database.CreateBatch();

var tasks = dependencies.Select(dependency => batch.SetAddAsync(GetDependencyKey(dependency), key)).ToArray();
batch.Execute();

await Task.WhenAll(tasks);
}


/// <inheritdoc/>
public async Task InvalidateCacheAsync(string contentComponentId)
{
var key = GetDependencyKey(contentComponentId);
var dependencies = await GetSetMembersAsync(key);
foreach (var item in dependencies)
{
await RemoveAsync(item);
}

await SetRemoveItemsAsync(key, dependencies);
}

/// <inheritdoc/>
private async Task<IEnumerable<string>> GetDependenciesAsync<T>(T value)
{
var result = new List<string>();

if (value is IEnumerable<ContentComponent> enumerable)
{
foreach (var item in enumerable)
{
var nestedDependencies = await GetDependenciesAsync(item);
result.AddRange(nestedDependencies);
}
}
else if (value is ContentComponent contentComponent)
{
var nestedDependencies = await GetContentDependenciesAsync(contentComponent);
result.AddRange(nestedDependencies);
}

return result;
}

/// <summary>
/// Uses reflection to check for any ContentIds within the component and register the parent as a dependency
/// </summary>
/// <param name="value"></param>
private async Task<IEnumerable<string>> GetContentDependenciesAsync(ContentComponent value)
{
// RichText is a sub-component that doesn't have SystemDetails, exit for such types
if (value.Sys is null)
return [];

// add the item itself as a dependency
var results = new List<string> { value.Sys.Id };

var properties = value.GetType().GetProperties();
foreach (var property in properties)
{
if (typeof(ContentComponent).IsAssignableFrom(property.PropertyType) || typeof(IEnumerable<ContentComponent>).IsAssignableFrom(property.PropertyType))
{
var nestedDependencies = await GetDependenciesAsync(property.GetValue(value));
results.AddRange(nestedDependencies);
}
}

return results;
}

/// <summary>
/// Creates a new cache item and stores it in the cache.
/// </summary>
Expand Down Expand Up @@ -322,8 +252,17 @@ private async Task<IEnumerable<string>> GetContentDependenciesAsync(ContentCompo
return result;
}

public string GetDependencyKey(string contentComponentId)
{
return $"Dependency:{contentComponentId}";
}
/// <inheritdoc/>
public Task InvalidateCacheAsync(string contentComponentId)
=> _backgroundTaskService.QueueBackgroundWorkItemAsync(async (cancellationToken) =>
{
var key = _dependencyManager.GetDependencyKey(contentComponentId);
var dependencies = await GetSetMembersAsync(key);
foreach (var item in dependencies)
{
await RemoveAsync(item);
}

await SetRemoveItemsAsync(key, dependencies);
});
}
72 changes: 72 additions & 0 deletions src/Dfe.PlanTech.Infrastructure.Redis/RedisDependencyManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using Dfe.PlanTech.Domain.Background;
using Dfe.PlanTech.Domain.Content.Interfaces;
using StackExchange.Redis;

namespace Dfe.PlanTech.Infrastructure.Redis;

/// <inheritdoc cref="IRedisDependencyManager"/>
/// <param name="backgroundTaskQueue">To add dependency set operations to a queue for background processing</param>
public class RedisDependencyManager(IBackgroundTaskQueue backgroundTaskQueue) : IRedisDependencyManager
{
/// <inheritdoc cref="IRedisDependencyManager"/>
public Task RegisterDependenciesAsync<T>(IDatabase database, string key, T value, CancellationToken cancellationToken = default)
=> backgroundTaskQueue.QueueBackgroundWorkItemAsync((cancellationToken) => GetAndSetDependencies(database, key, value));

/// <inheritdoc cref="IRedisDependencyManager"/>
public string GetDependencyKey(string contentComponentId) => $"Dependency:{contentComponentId}";

/// <summary>
/// Retrieves dependencies for the given <see cref="IContentComponent"/> and registers them in the Redis cache.
/// </summary>
/// <typeparam name="T">Type of the value whose dependencies are to be retrieved.</typeparam>
/// <param name="database">The database where dependencies are stored.</param>
/// <param name="key">Key for the parent of the dependencies.</param>
/// <param name="value">The <see cref="IContentComponent"/> parent of the dependencies.</param>
private async Task GetAndSetDependencies<T>(IDatabase database, string key, T value)
{
var batch = database.CreateBatch();
var tasks = GetDependencies(value).Select(dependency => batch.SetAddAsync(GetDependencyKey(dependency), key, CommandFlags.FireAndForget)).ToArray();
batch.Execute();
await Task.WhenAll(tasks);
}

/// <summary>
/// Retrieves dependencies, in the form of the Id of the <see cref="IContentComponent"/>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="value"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException">Thrown when the value object is not a <see cref="IContentComponent"/> or <see cref="IEnumerable{IContentComponent}"/> </exception>
private IEnumerable<string> GetDependencies<T>(T? value)
=> value switch
{
null => [],
IEnumerable<IContentComponent> collection => collection.SelectMany(GetDependencies),
IContentComponent item => GetContentDependenciesAsync(item),
_ => throw new InvalidOperationException($"{value!.GetType()} is not a {typeof(IContentComponent)} or a {typeof(IEnumerable<IContentComponent>)}"),
};

/// <summary>
/// Uses reflection to check for any ContentIds within the <see cref="IContentComponent">, and returns the Id value of any found
/// </summary>
/// <param name="value"></param>
private IEnumerable<string> GetContentDependenciesAsync(IContentComponent value)
{
// RichText is a sub-component that doesn't have SystemDetails, exit for such types
if (value.Sys is null)
yield break;

yield return value.Sys.Id;
var properties = value.GetType().GetProperties();
foreach (var property in properties)
{
if (typeof(IContentComponent).IsAssignableFrom(property.PropertyType) || typeof(IEnumerable<IContentComponent>).IsAssignableFrom(property.PropertyType))
{
foreach (var dependency in GetDependencies(property.GetValue(value)))
{
yield return dependency;
}
}
}
}
}
Loading

0 comments on commit ab1db97

Please sign in to comment.