Skip to content

Commit

Permalink
Change message adapter (#17)
Browse files Browse the repository at this point in the history
* feat: variables name rename and message adapter refactor to use protobuf

* package update and package consolidation
* change description sql type
* added security.md and pull_request_template
  • Loading branch information
martinhonovais authored Jul 16, 2021
1 parent 459705d commit 0dfabc5
Show file tree
Hide file tree
Showing 41 changed files with 266 additions and 214 deletions.
20 changes: 20 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Description

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

Fixes # (issue)

## How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

## Checklist

- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my own code
- [ ] I have added tests to cover my changes
- [ ] I have made corresponding changes to the documentation

### Disclaimer

By sending us your contributions, you are agreeing that your contribution is made subject to the terms of our [Contributor Ownership Statement](https://github.com/Farfetch/.github/blob/master/COS.md)
7 changes: 7 additions & 0 deletions SECURITY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Security Policy

Vulnerabilities are one of our main concerns

## Reporting a Vulnerability

Please, report any security vulnerability found to opensource@farfetch.com
17 changes: 8 additions & 9 deletions samples/KafkaFlow.Retry.Sample/KafkaFlow.Retry.Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="KafkaFlow" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Admin" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Compressor" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Compressor.Gzip" Version="2.0.2" />
<PackageReference Include="KafkaFlow.LogHandler.Console" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Serializer" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Serializer.ProtobufNet" Version="2.0.2" />
<PackageReference Include="KafkaFlow.TypedHandler" Version="2.0.2" />
<PackageReference Include="KafkaFlow" Version="2.1.2" />
<PackageReference Include="KafkaFlow.Admin" Version="2.1.2" />
<PackageReference Include="KafkaFlow.Compressor" Version="2.1.2" />
<PackageReference Include="KafkaFlow.LogHandler.Console" Version="2.1.2" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="2.1.2" />
<PackageReference Include="KafkaFlow.Serializer" Version="2.1.2" />
<PackageReference Include="KafkaFlow.Serializer.ProtobufNet" Version="2.1.2" />
<PackageReference Include="KafkaFlow.TypedHandler" Version="2.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
</ItemGroup>

Expand Down
9 changes: 5 additions & 4 deletions samples/KafkaFlow.Retry.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,14 @@ private static async Task Main()
.RetryDurable(
configure => configure
.Handle<NonBlockingException>()
.WithMessageType(typeof(TestMessage))
.WithEmbeddedRetryCluster(
cluster,
configure => configure
.WithRetryTopicName("test-topic-retry")
.WithRetryConsumerBufferSize(4)
.WithRetryConsumerWorkersCount(2)
.WithRetryConusmerStrategy(RetryConsumerStrategy.LatestConsumption)
.WithRetryConusmerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
.WithRetryTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
Expand Down Expand Up @@ -104,7 +105,7 @@ private static async Task Main()
.ShouldPauseConsumer(false)
)
)
.Retry(
.RetrySimple(
(configure) => configure
.Handle<CustomException>()
.TryTimes(2)
Expand Down Expand Up @@ -219,10 +220,10 @@ await adminProducer.ProduceAsync(
if (int.TryParse(workersInput, out var workers))
{
await adminProducer.ProduceAsync(
new ChangeConsumerWorkerCount
new ChangeConsumerWorkersCount
{
ConsumerName = consumerName,
WorkerCount = workers
WorkersCount = workers
});
}

Expand Down
1 change: 1 addition & 0 deletions src/KafkaFlow.Retry.API/KafkaFlow.Retry.API.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.Retry.MongoDb/KafkaFlow.Retry.MongoDb.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.8.1" />
<PackageReference Include="MongoDB.Driver" Version="2.12.4" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ BEGIN
[CreationDate] [datetime2](7) NOT NULL,
[LastExecution] [datetime2](7),
[ModifiedStatusDate] [datetime2](7),
[Description] [ntext] NULL,
[Description] [nvarchar](4096) NULL,
CONSTRAINT [FK_RetryQueues_RetryQueueItems_IdRetryQueue] FOREIGN KEY ([IdRetryQueue]) REFERENCES [dbo].[RetryQueues]([Id]) ON DELETE CASCADE,
CONSTRAINT [FK_RetryQueues_RetryQueueItems_IdDomainRetryQueue] FOREIGN KEY ([IdDomainRetryQueue]) REFERENCES [dbo].[RetryQueues]([IdDomain]),
CONSTRAINT [FK_QueueItemStatus_RetryQueueItems] FOREIGN KEY ([IdItemStatus]) REFERENCES [dbo].[QueueItemStatus]([Code]),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
Expand All @@ -21,6 +21,10 @@
<None Include="..\..\LICENSE.md" Link="LICENSE.md" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Data.SqlClient" Version="4.8.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\KafkaFlow.Retry\KafkaFlow.Retry.csproj" />
</ItemGroup>
Expand Down
31 changes: 16 additions & 15 deletions src/KafkaFlow.Retry/ConfigurationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,10 @@
using KafkaFlow.Retry.Durable;
using KafkaFlow.Retry.Durable.Repository;
using KafkaFlow.Retry.Forever;
using KafkaFlow.Retry.Simple;

public static class ConfigurationBuilderExtensions
{
public static IConsumerMiddlewareConfigurationBuilder Retry(
this IConsumerMiddlewareConfigurationBuilder middlewareBuilder,
Action<RetryDefinitionBuilder> configure)
{
var retryDefinitionBuilder = new RetryDefinitionBuilder();

configure(retryDefinitionBuilder);

return middlewareBuilder.Add(
resolver => new RetryMiddleware(
resolver.Resolve<ILogHandler>(),
retryDefinitionBuilder.Build()
));
}

public static IConsumerMiddlewareConfigurationBuilder RetryDurable(
this IConsumerMiddlewareConfigurationBuilder middlewareBuilder,
Action<RetryDurableDefinitionBuilder> configure)
Expand Down Expand Up @@ -54,5 +40,20 @@ public static IConsumerMiddlewareConfigurationBuilder RetryForever(
retryForeverDefinitionBuilder.Build()
));
}

public static IConsumerMiddlewareConfigurationBuilder RetrySimple(
this IConsumerMiddlewareConfigurationBuilder middlewareBuilder,
Action<RetrySimpleDefinitionBuilder> configure)
{
var retryDefinitionBuilder = new RetrySimpleDefinitionBuilder();

configure(retryDefinitionBuilder);

return middlewareBuilder.Add(
resolver => new RetrySimpleMiddleware(
resolver.Resolve<ILogHandler>(),
retryDefinitionBuilder.Build()
));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
internal static class ConfigurationBuilderExtensions
{
public static IConsumerMiddlewareConfigurationBuilder RetryConsumerStrategy(
this IConsumerMiddlewareConfigurationBuilder middlewareBuilder,
this IConsumerMiddlewareConfigurationBuilder middlewareBuilder,
RetryConsumerStrategy retryConsumerStrategy)
{
switch (retryConsumerStrategy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ public class RetryDurableDefinitionBuilder
{
private readonly IDependencyConfigurator dependencyConfigurator;
private readonly List<Func<RetryContext, bool>> retryWhenExceptions = new List<Func<RetryContext, bool>>();
private RetryDurablePollingDefinition retryDurablePollingDefinition;
private Type messageType;
private RetryDurableEmbeddedClusterDefinitionBuilder retryDurableEmbeddedClusterDefinitionBuilder;
private IRetryDurablePollingDefinition retryDurablePollingDefinition;
private IRetryDurableQueueRepositoryProvider retryDurableRepositoryProvider;
private RetryDurableRetryPlanBeforeDefinition retryDurableRetryPlanBeforeDefinition;
private IRetryDurableRetryPlanBeforeDefinition retryDurableRetryPlanBeforeDefinition;

public RetryDurableDefinitionBuilder(IDependencyConfigurator dependencyConfigurator)
{
Expand Down Expand Up @@ -49,13 +51,18 @@ public RetryDurableDefinitionBuilder WithEmbeddedRetryCluster(
Action<RetryDurableEmbeddedClusterDefinitionBuilder> configure
)
{
var retryDurableEmbeddedClusterBuilder = new RetryDurableEmbeddedClusterDefinitionBuilder(cluster);
configure(retryDurableEmbeddedClusterBuilder);
retryDurableEmbeddedClusterBuilder.Build();
this.retryDurableEmbeddedClusterDefinitionBuilder = new RetryDurableEmbeddedClusterDefinitionBuilder(cluster);
configure(this.retryDurableEmbeddedClusterDefinitionBuilder);

return this;
}

public RetryDurableDefinitionBuilder WithMessageType(Type messageType)
{
this.messageType = messageType;
return this;
}

public RetryDurableDefinitionBuilder WithQueuePollingJobConfiguration(Action<RetryDurableQueuePollingJobDefinitionBuilder> configure)
{
var retryDurablePollingDefinitionBuilder = new RetryDurableQueuePollingJobDefinitionBuilder();
Expand All @@ -81,7 +88,7 @@ public RetryDurableDefinitionBuilder WithRetryPlanBeforeRetryDurable(Action<Retr
return this;
}

internal RetryDurableDefinition Build()
internal IRetryDurableDefinition Build()
{
var retryDurableDefinition =
new RetryDurableDefinition(
Expand All @@ -90,18 +97,20 @@ internal RetryDurableDefinition Build()
this.retryDurablePollingDefinition
);

this.retryDurableEmbeddedClusterDefinitionBuilder.WithMessageType(this.messageType);
this.retryDurableEmbeddedClusterDefinitionBuilder.Build();

this.dependencyConfigurator.AddSingleton<IRetryDurableDefinition>(retryDurableDefinition);
this.dependencyConfigurator.AddSingleton<IMessageHeadersAdapter>(new MessageHeadersAdapter());
this.dependencyConfigurator.AddSingleton<IGzipCompressor>(new GzipCompressor());
this.dependencyConfigurator.AddSingleton<IUtf8Encoder>(new Utf8Encoder());
this.dependencyConfigurator.AddSingleton<INewtonsoftJsonSerializer>(new NewtonsoftJsonSerializer());
this.dependencyConfigurator.AddSingleton<IProtobufNetSerializer>(new ProtobufNetSerializer());
this.dependencyConfigurator
.AddSingleton<IMessageAdapter>(
resolver =>
new MessageAdapter(
resolver.Resolve<IGzipCompressor>(),
resolver.Resolve<INewtonsoftJsonSerializer>(),
resolver.Resolve<IUtf8Encoder>()));
resolver.Resolve<IProtobufNetSerializer>()));

this.dependencyConfigurator
.AddSingleton<IRetryDurableQueueRepository>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class RetryDurableEmbeddedClusterDefinitionBuilder
private const int DefaultPartitionElection = 0;
private readonly IClusterConfigurationBuilder cluster;
private bool enabled;
private Type messageType;
private int retryConsumerBufferSize;
private int retryConsumerWorkersCount;
private RetryConsumerStrategy retryConusmerStrategy = RetryConsumerStrategy.GuaranteeOrderedConsumption;
Expand Down Expand Up @@ -77,17 +78,14 @@ internal void Build()
Guard.Argument(this.retryConsumerWorkersCount)
.NotZero("A buffer size great than zero should be defined")
.NotNegative(x => "A buffer size great than zero should be defined");
Guard.Argument(this.messageType).NotNull("A message type should be defined");

this.cluster
.AddProducer(
RetryDurableConstants.EmbeddedProducerName,
producer => producer
.DefaultTopic(this.retryTopicName)
.WithCompression(Confluent.Kafka.CompressionType.Gzip)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<NewtonsoftJsonSerializer>()
)
.WithAcks(Acks.All) // TODO: this settings should be reviewed
)
.AddConsumer(
Expand Down Expand Up @@ -118,12 +116,18 @@ internal void Build()
})
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<NewtonsoftJsonSerializer>() // I think we should use a better serializer for binary (key;mesage), pls check but I think protobuff is a better option
.AddSingleTypeSerializer<ProtobufNetSerializer>(this.messageType)
.RetryConsumerStrategy(this.retryConusmerStrategy)
.Add<RetryDurableConsumerValidationMiddleware>()
.AddTypedHandlers(this.retryTypeHandlers)
)
);
}

internal RetryDurableEmbeddedClusterDefinitionBuilder WithMessageType(Type messageType)
{
this.messageType = messageType;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public RetryDurableQueuePollingJobDefinitionBuilder WithId(string id)
return this;
}

internal RetryDurablePollingDefinition Build()
internal IRetryDurablePollingDefinition Build()
{
return new RetryDurablePollingDefinition(
this.enabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
{
internal interface IRetryDurableDefinition
{
RetryDurablePollingDefinition RetryDurablePollingDefinition { get; }
IRetryDurablePollingDefinition RetryDurablePollingDefinition { get; }

RetryDurableRetryPlanBeforeDefinition RetryDurableRetryPlanBeforeDefinition { get; }
IRetryDurableRetryPlanBeforeDefinition RetryDurableRetryPlanBeforeDefinition { get; }

bool ShouldRetry(RetryContext kafkaRetryContext);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace KafkaFlow.Retry.Durable.Definitions
{
internal interface IRetryDurablePollingDefinition
{
string CronExpression { get; }

bool Enabled { get; }

int ExpirationIntervalFactor { get; }

int FetchSize { get; }

string Id { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace KafkaFlow.Retry.Durable.Definitions
{
using System;

internal interface IRetryDurableRetryPlanBeforeDefinition
{
int NumberOfRetries { get; }

bool PauseConsumer { get; }

Func<int, TimeSpan> TimeBetweenTriesPlan { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ internal class RetryDurableDefinition : IRetryDurableDefinition

public RetryDurableDefinition(
IReadOnlyCollection<Func<RetryContext, bool>> retryWhenExceptions,
RetryDurableRetryPlanBeforeDefinition retryDurableRetryPlanBeforeDefinition,
RetryDurablePollingDefinition retryDurablePollingDefinition)
IRetryDurableRetryPlanBeforeDefinition retryDurableRetryPlanBeforeDefinition,
IRetryDurablePollingDefinition retryDurablePollingDefinition)
{
Guard.Argument(retryWhenExceptions).NotNull("At least an exception should be defined");
Guard.Argument(retryWhenExceptions.Count).NotNegative(value => "At least an exception should be defined");
Expand All @@ -24,9 +24,9 @@ public RetryDurableDefinition(
this.RetryDurablePollingDefinition = retryDurablePollingDefinition;
}

public RetryDurablePollingDefinition RetryDurablePollingDefinition { get; }
public IRetryDurablePollingDefinition RetryDurablePollingDefinition { get; }

public RetryDurableRetryPlanBeforeDefinition RetryDurableRetryPlanBeforeDefinition { get; }
public IRetryDurableRetryPlanBeforeDefinition RetryDurableRetryPlanBeforeDefinition { get; }

public bool ShouldRetry(RetryContext kafkaRetryContext) =>
this.retryWhenExceptions.Any(rule => rule(kafkaRetryContext));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
using Dawn;

internal class RetryDurablePollingDefinition
internal class RetryDurablePollingDefinition : IRetryDurablePollingDefinition
{
public RetryDurablePollingDefinition(
bool enabled,
Expand Down
Loading

0 comments on commit 0dfabc5

Please sign in to comment.