Skip to content

Commit

Permalink
Merge pull request #42 from MikeAmputer/kravtsov/retryable-executor
Browse files Browse the repository at this point in the history
Retryable executor
  • Loading branch information
MikeAmputer authored Jun 6, 2024
2 parents 0066dca + 40929ea commit b61d299
Show file tree
Hide file tree
Showing 19 changed files with 205 additions and 151 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Raw SQL migrations and contexts for [ClickHouse](https://github.com/ClickHouse/C
- Parametrized queries (anonymous type parameters supported)
- Query cancellation by termination on ClickHouse side
- Transactions support (keeper is required)
- Retryable contexts via ClickHouseRetryHelpers class
- Retryable contexts
- Provides all the features of the ClickHouse.Client package
- Fully async contract
- **Testing toolkit:** seamlessly integrate unit testing into your ClickHouse.Facades components using the dedicated [ClickHouse.Facades.Testing](https://github.com/MikeAmputer/ClickHouse.Facades/tree/master/src/ClickHouse.Facades.Testing) package. Test ClickHouse contexts and facades effectively, mock facades or specific database requests, and monitor interactions with the ClickHouse database using the provided testing tools.
Expand Down Expand Up @@ -72,4 +72,4 @@ You can create as many contexts as you need with any number of facades. Facades
> ***Note:*** You can perform migrations on your ClickHouse database without the necessity of implementing contexts.
## Documentation
Documentation will be presented in [repository Wiki](https://github.com/MikeAmputer/ClickHouse.Facades/wiki) (WIP)
Documentation will be presented in [repository Wiki](https://github.com/MikeAmputer/ClickHouse.Facades/wiki) (WIP)
18 changes: 16 additions & 2 deletions examples/Example.Keeper/Context/ExampleContextFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Options;
using ClickHouse.Facades.Extensions;
using Microsoft.Extensions.Options;

namespace ClickHouse.Facades.Example;

Expand All @@ -13,13 +14,26 @@ public ExampleContextFactory(IOptions<ClickHouseConfig> config)
_connectionString = config.Value.ConnectionString;
}

protected override ClickHouseRetryPolicy DefaultRetryPolicy => new()
{
RetryCount = 2,
RetryDelayProvider = retryAttempt => TimeSpan.FromSeconds(1 << retryAttempt),
TransientExceptionPredicate = ex =>
{
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
HashSet<int> transientErrorCodes = [159, 173, 201, 202, 203, 204, 209, 210, 216, 236, 290, 364, 425, 473];
return transientErrorCodes.Contains(ex.TryGetErrorCode());
}
};

protected override void SetupContextOptions(ClickHouseContextOptionsBuilder<ExampleContext> optionsBuilder)
{
optionsBuilder
.WithConnectionString(_connectionString)
.ForceSessions()
.SetupTransactions(options => options
.AllowMultipleTransactions()
.AutoRollbackTransaction());
.AutoRollback());
}
}
53 changes: 26 additions & 27 deletions examples/Example.Keeper/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,41 @@
var contextFactory = serviceProvider.GetRequiredService<IClickHouseContextFactory<ExampleContext>>();

// retryable and transactional execution
await ClickHouseRetryHelpers.ExecuteAsync(
contextFactory,
async context =>
{
var facade = context.ExampleFacade;
await contextFactory.CreateRetryableExecutor()
.UpdateRetryPolicy(policy => policy.ExceptionHandler = ex => Console.WriteLine(ex.Message))
.ExecuteAsync(
async context =>
{
var facade = context.ExampleFacade;
await facade.Truncate();
var values = await facade.GetValues();
Console.WriteLine(
$"Values count before transaction: {values.Length}");
await facade.Truncate();
var values = await facade.GetValues();
Console.WriteLine(
$"Values count before transaction: {values.Length}");
await context.BeginTransactionAsync();
await context.BeginTransactionAsync();
await facade.InsertValue(42);
values = await facade.GetValues();
Console.WriteLine(
$"Values count inside transaction: {values.Length}");
await facade.InsertValue(42);
values = await facade.GetValues();
Console.WriteLine(
$"Values count inside transaction: {values.Length}");
await context.RollbackTransactionAsync();
await context.RollbackTransactionAsync();
values = await facade.GetValues();
Console.WriteLine(
$"Values count after rollback: {values.Length}");
values = await facade.GetValues();
Console.WriteLine(
$"Values count after rollback: {values.Length}");
await context.BeginTransactionAsync();
await context.BeginTransactionAsync();
await facade.InsertValue(42);
await facade.InsertValue(42);
await context.CommitTransactionAsync();
await context.CommitTransactionAsync();
values = await facade.GetValues();
Console.WriteLine(
$"Values count after commit: {values.Length}");
},
retryAttempt => TimeSpan.FromSeconds(1 << retryAttempt),
exceptionHandler: ex => Console.WriteLine(ex.Message));
values = await facade.GetValues();
Console.WriteLine(
$"Values count after commit: {values.Length}");
});


static IHostBuilder CreateHostBuilder(string[] args) =>
Expand Down
4 changes: 4 additions & 0 deletions src/ClickHouse.Facades/ClickHouse.Facades.csproj.DotSettings
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=context/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=context_005Ccommand/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=context_005Cconnection/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=context_005Ccontext/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=context_005Cfactory/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=context_005Cretryable/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=migrations_005Cinstructions/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=setup/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=sqlbuilders_005Ccreatedatabase/@EntryIndexedValue">True</s:Boolean>
Expand Down
117 changes: 0 additions & 117 deletions src/ClickHouse.Facades/Context/ClickHouseRetryHelpers.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,12 @@ public async Task<TContext> CreateContextAsync()
return context;
}

public IClickHouseRetryableExecutor<TContext> CreateRetryableExecutor()
{
return new ClickHouseRetryableExecutor<TContext>(this, DefaultRetryPolicy);
}

protected virtual ClickHouseRetryPolicy DefaultRetryPolicy => new();

protected abstract void SetupContextOptions(ClickHouseContextOptionsBuilder<TContext> optionsBuilder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ public interface IClickHouseContextFactory<TContext>
where TContext : ClickHouseContext<TContext>
{
Task<TContext> CreateContextAsync();

IClickHouseRetryableExecutor<TContext> CreateRetryableExecutor();
}
14 changes: 14 additions & 0 deletions src/ClickHouse.Facades/Context/Retryable/ClickHouseRetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using ClickHouse.Client;

namespace ClickHouse.Facades;

public class ClickHouseRetryPolicy
{
public Predicate<ClickHouseServerException> TransientExceptionPredicate { get; set; } = _ => true;

public Action<ClickHouseServerException>? ExceptionHandler { get; set; } = null;

public int RetryCount { get; set; } = 3;

public Func<int, TimeSpan> RetryDelayProvider { get; set; } = _ => TimeSpan.Zero;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using ClickHouse.Client;

namespace ClickHouse.Facades;

internal class ClickHouseRetryableExecutor<TContext> : IClickHouseRetryableExecutor<TContext>
where TContext : ClickHouseContext<TContext>
{
private readonly IClickHouseContextFactory<TContext> _contextFactory;

private ClickHouseRetryPolicy _retryPolicy;

public ClickHouseRetryableExecutor(
IClickHouseContextFactory<TContext> contextFactory,
ClickHouseRetryPolicy retryPolicy)
{
_contextFactory = contextFactory
?? throw new ArgumentNullException(nameof(contextFactory));

_retryPolicy = retryPolicy
?? throw new ArgumentNullException(nameof(retryPolicy));
}

public Task ExecuteAsync(Func<TContext, Task> action, CancellationToken cancellationToken = default)
{
return ExecuteAsync(async context =>
{
await action(context);
return 0;
},
cancellationToken);
}

public async Task<TResult> ExecuteAsync<TResult>(
Func<TContext, Task<TResult>> action,
CancellationToken cancellationToken = default)
{
var attemptNumber = 1;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();

try
{
await using var context = await _contextFactory.CreateContextAsync();

return await action(context);
}
catch (ClickHouseServerException e)
{
if (attemptNumber >= _retryPolicy.RetryCount + 1)
{
throw;
}

if (_retryPolicy.TransientExceptionPredicate != null && !_retryPolicy.TransientExceptionPredicate(e))
{
throw;
}

_retryPolicy.ExceptionHandler?.Invoke(e);
}

await Task.Delay(_retryPolicy.RetryDelayProvider(attemptNumber), cancellationToken);
attemptNumber++;
}
}

public IClickHouseReadonlyRetryableExecutor<TContext> SetRetryPolicy(ClickHouseRetryPolicy retryPolicy)
{
_retryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy));

return this;
}

public IClickHouseReadonlyRetryableExecutor<TContext> UpdateRetryPolicy(Action<ClickHouseRetryPolicy> updateAction)
{
updateAction(_retryPolicy);

return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace ClickHouse.Facades;

public interface IClickHouseReadonlyRetryableExecutor<out TContext>
where TContext : ClickHouseContext<TContext>
{
Task ExecuteAsync(
Func<TContext, Task> action,
CancellationToken cancellationToken = default);

Task<TResult> ExecuteAsync<TResult>(
Func<TContext, Task<TResult>> action,
CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace ClickHouse.Facades;

public interface IClickHouseRetryableExecutor<out TContext> : IClickHouseReadonlyRetryableExecutor<TContext>
where TContext : ClickHouseContext<TContext>
{
IClickHouseReadonlyRetryableExecutor<TContext> SetRetryPolicy(ClickHouseRetryPolicy retryPolicy);

IClickHouseReadonlyRetryableExecutor<TContext> UpdateRetryPolicy(Action<ClickHouseRetryPolicy> updateAction);
}
Loading

0 comments on commit b61d299

Please sign in to comment.