Skip to content

Commit

Permalink
Update kafkalfow packages to 2.0.2 (#9)
Browse files Browse the repository at this point in the history
* feat: update kafkalfow packages to 2.0.2

* feat: update kafkalfow packages to 2.0.2

* refactor: some variables refactor

Co-authored-by: rodrigobelo <35265964+rodrigobelo@users.noreply.github.com>
  • Loading branch information
martinhonovais and rodrigobelo authored Jul 2, 2021
1 parent 0abbcc9 commit 459705d
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 107 deletions.
6 changes: 3 additions & 3 deletions samples/KafkaFlow.Retry.Sample/Handler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ public Task Handle(IMessageContext context, TestMessage message)

Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2} | Topic: {3}",
context.Partition,
context.Offset,
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Text,
context.Topic);
context.ConsumerContext.Topic);

return Task.CompletedTask;
}
Expand Down
18 changes: 9 additions & 9 deletions samples/KafkaFlow.Retry.Sample/KafkaFlow.Retry.Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="KafkaFlow" Version="1.5.6" />
<PackageReference Include="KafkaFlow.Admin" Version="1.5.6" />
<PackageReference Include="KafkaFlow.Compressor" Version="1.5.6" />
<PackageReference Include="KafkaFlow.Compressor.Gzip" Version="1.5.6" />
<PackageReference Include="KafkaFlow.LogHandler.Console" Version="1.5.6" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="1.5.6" />
<PackageReference Include="KafkaFlow.Serializer" Version="1.5.6" />
<PackageReference Include="KafkaFlow.Serializer.ProtoBuf" Version="1.5.6" />
<PackageReference Include="KafkaFlow.TypedHandler" Version="1.5.6" />
<PackageReference Include="KafkaFlow" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Admin" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Compressor" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Compressor.Gzip" Version="2.0.2" />
<PackageReference Include="KafkaFlow.LogHandler.Console" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Serializer" Version="2.0.2" />
<PackageReference Include="KafkaFlow.Serializer.ProtobufNet" Version="2.0.2" />
<PackageReference Include="KafkaFlow.TypedHandler" Version="2.0.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
</ItemGroup>

Expand Down
18 changes: 7 additions & 11 deletions samples/KafkaFlow.Retry.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
{
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks;
using global::Microsoft.Extensions.DependencyInjection;
using KafkaFlow;
using KafkaFlow.Admin;
using KafkaFlow.Admin.Messages;
using KafkaFlow.Compressor;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Admin.Messages;
using KafkaFlow.Consumers;
using KafkaFlow.Producers;
using KafkaFlow.Retry;
using KafkaFlow.Retry.SqlServer;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.ProtoBuf;
using KafkaFlow.Retry.SqlServer;
using KafkaFlow.Serializer;
using KafkaFlow.TypedHandler;

internal static class Program
Expand Down Expand Up @@ -48,10 +45,10 @@ private static async Task Main()
producerName,
producer => producer
.DefaultTopic("test-topic")
.WithCompression(Confluent.Kafka.CompressionType.Gzip)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufMessageSerializer>()
.AddCompressor<GzipMessageCompressor>()
.AddSerializer<ProtobufNetSerializer>()
)
.WithAcks(Acks.All)
)
Expand All @@ -65,8 +62,7 @@ private static async Task Main()
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()
.AddSerializer<ProtobufMessageSerializer>()
.AddSerializer<ProtobufNetSerializer>()
.RetryDurable(
configure => configure
.Handle<NonBlockingException>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
using System;
using System.Linq;
using Dawn;
using KafkaFlow.Compressor;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Configuration;
using KafkaFlow.Retry.Durable;
using KafkaFlow.Retry.Durable.Polling;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.NewtonsoftJson;
using KafkaFlow.TypedHandler;

public class RetryDurableEmbeddedClusterDefinitionBuilder
Expand Down Expand Up @@ -86,10 +83,10 @@ internal void Build()
RetryDurableConstants.EmbeddedProducerName,
producer => producer
.DefaultTopic(this.retryTopicName)
.WithCompression(Confluent.Kafka.CompressionType.Gzip)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<NewtonsoftJsonMessageSerializer>()
.AddCompressor<GzipMessageCompressor>()
.AddSerializer<NewtonsoftJsonSerializer>()
)
.WithAcks(Acks.All) // TODO: this settings should be reviewed
)
Expand Down Expand Up @@ -121,8 +118,7 @@ internal void Build()
})
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()
.AddSerializer<NewtonsoftJsonMessageSerializer>() // I think we should use a better serializer for binary (key;mesage), pls check but I think protobuff is a better option
.AddSerializer<NewtonsoftJsonSerializer>() // I think we should use a better serializer for binary (key;mesage), pls check but I think protobuff is a better option
.RetryConsumerStrategy(this.retryConusmerStrategy)
.Add<RetryDurableConsumerValidationMiddleware>()
.AddTypedHandlers(this.retryTypeHandlers)
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.Retry/Durable/Polling/QueuePollingJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Linq;
using System.Threading.Tasks;
using Dawn;
using KafkaFlow.Producers;
using KafkaFlow.Retry.Durable.Definitions;
using KafkaFlow.Retry.Durable.Encoders;
using KafkaFlow.Retry.Durable.Repository;
Expand Down Expand Up @@ -174,6 +173,7 @@ private IMessageHeaders GetMessageHeaders(
{
var messageHeaders = messageHeadersAdapter.AdaptToKafkaFlowMessageHeaders(item.Message.Headers);

//TODO: Should we have a naming pattern
messageHeaders.Add(RetryDurableConstants.AttemptsCount, utf8Encoder.Encode(item.AttemptsCount.ToString()));
messageHeaders.Add(RetryDurableConstants.QueueId, utf8Encoder.Encode(queueId.ToString()));
messageHeaders.Add(RetryDurableConstants.ItemId, utf8Encoder.Encode(item.Id.ToString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public MessageAdapter(

public byte[] AdaptFromKafkaFlowMessage(object message)
{
//TODO: Decorator Pattern
var messageSerialized = this.newtonsoftJsonSerializer.SerializeObject(message);
var messageEncoded = this.utf8Encoder.Encode(messageSerialized);
var messageEncoded = this.utf8Encoder.Encode(messageSerialized); //TODO: To be removed
var messageCompressed = this.gzipCompressor.Compress(messageEncoded);

return messageCompressed;
Expand All @@ -33,9 +34,9 @@ public byte[] AdaptFromKafkaFlowMessage(object message)
public object AdaptToKafkaFlowMessage(byte[] message, Type type)
{
var messageDecompressed = this.gzipCompressor.Decompress(message);
var messageDencoded = this.utf8Encoder.Decode(messageDecompressed);
var messageDencoded = this.utf8Encoder.Decode(messageDecompressed); //TODO: To be removed
var messageDeserialized = this.newtonsoftJsonSerializer.DeserializeObject(messageDencoded, type);

return messageDeserialized;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,22 @@ public async Task<AddIfQueueExistsResult> AddIfQueueExistsAsync(IMessageContext
RetryDurableConstants.MessageType,
$"{context.Message.GetType().FullName}, {context.Message.GetType().Assembly.GetName().Name}"
);
//TODO: Should be passed by the client.

return await this.AddIfQueueExistsAsync(
context,
new SaveToQueueInput(
new RetryQueueItemMessage(
context.Topic,
context.PartitionKey,
context.ConsumerContext.Topic,
(byte[])context.Message.Key,
this.messageAdapter.AdaptFromKafkaFlowMessage(context.Message),
context.Partition.Value,
context.Offset.Value,
context.Consumer.MessageTimestamp,
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
context.ConsumerContext.MessageTimestamp,
this.messageHeadersAdapter.AdaptFromKafkaFlowMessageHeaders(context.Headers)
),
this.retryDurablePollingDefinition.Id,
this.utf8Encoder.Decode(context.PartitionKey), // TODO: this worries me because this convertion can cause data loss.
this.utf8Encoder.Decode((byte[])context.Message.Key), // TODO: this worries me because this convertion can cause data loss.
RetryQueueStatus.Active,
RetryQueueItemStatus.Waiting,
SeverityLevel.Unknown,
Expand Down Expand Up @@ -179,16 +180,16 @@ public async Task<SaveToQueueResult> SaveToQueueAsync(IMessageContext context, s
return await this.SaveToQueueAsync(context,
new SaveToQueueInput(
new RetryQueueItemMessage(
context.Topic,
context.PartitionKey,
context.ConsumerContext.Topic,
(byte[])context.Message.Key,
this.messageAdapter.AdaptFromKafkaFlowMessage(context.Message),
context.Partition.Value,
context.Offset.Value,
context.Consumer.MessageTimestamp,
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
context.ConsumerContext.MessageTimestamp,
this.messageHeadersAdapter.AdaptFromKafkaFlowMessageHeaders(context.Headers)
),
this.retryDurablePollingDefinition.Id,
this.utf8Encoder.Decode(context.PartitionKey), // TODO: this worries me because this convertion can cause data loss.
this.utf8Encoder.Decode((byte[])context.Message.Key), // TODO: this worries me because this convertion can cause data loss.
RetryQueueStatus.Active,
RetryQueueItemStatus.Waiting,
SeverityLevel.Unknown,
Expand Down
37 changes: 19 additions & 18 deletions src/KafkaFlow.Retry/Durable/RetryDurableMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public RetryDurableMiddleware(

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{

try
{
var resultAddIfQueueExistsAsync = await this
Expand All @@ -42,7 +43,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception)
{
context.Consumer.ShouldStoreOffset = false;
context.ConsumerContext.ShouldStoreOffset = false;
return;
}

Expand All @@ -60,17 +61,17 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
if (!this.controlWorkerId.HasValue)
{
this.controlWorkerId = context.WorkerId;
this.controlWorkerId = context.ConsumerContext.WorkerId;

context.Consumer.Pause();
context.ConsumerContext.Pause();

this.logHandler.Info(
"Consumer paused by retry process",
new
{
ConsumerGroup = context.GroupId,
ConsumerName = context.Consumer.Name,
Worker = context.WorkerId
ConsumerGroup = context.ConsumerContext.GroupId,
ConsumerName = context.ConsumerContext.ConsumerName,
Worker = context.ConsumerContext.WorkerId
});
}
}
Expand All @@ -83,8 +84,8 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
AttemptNumber = attemptNumber,
WaitMilliseconds = waitTime.TotalMilliseconds,
PartitionNumber = context.Partition,
Worker = context.WorkerId,
PartitionNumber = context.ConsumerContext.Partition,
Worker = context.ConsumerContext.WorkerId,
//Headers = context.HeadersAsJson(),
//Message = context.Message.ToJson(),
ExceptionType = exception.GetType().FullName,
Expand All @@ -98,14 +99,14 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
await policy
.ExecuteAsync(
_ => next(context),
context.Consumer.WorkerStopped
context.ConsumerContext.WorkerStopped
).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
if (context.Consumer.WorkerStopped.IsCancellationRequested)
if (context.ConsumerContext.WorkerStopped.IsCancellationRequested)
{
context.Consumer.ShouldStoreOffset = false;
context.ConsumerContext.ShouldStoreOffset = false;
}
}
catch (Exception exception)
Expand All @@ -120,7 +121,7 @@ await policy
if (resultSaveToQueue.Status != SaveToQueueResultStatus.Created
&& resultSaveToQueue.Status != SaveToQueueResultStatus.Added)
{
context.Consumer.ShouldStoreOffset = false;
context.ConsumerContext.ShouldStoreOffset = false;
}
}
else
Expand All @@ -130,23 +131,23 @@ await policy
}
finally
{
if (this.controlWorkerId == context.WorkerId)
if (this.controlWorkerId == context.ConsumerContext.WorkerId)
{
lock (this.syncPauseAndResume)
{
if (this.controlWorkerId == context.WorkerId)
if (this.controlWorkerId == context.ConsumerContext.WorkerId)
{
this.controlWorkerId = null;

context.Consumer.Resume();
context.ConsumerContext.Resume();

this.logHandler.Info(
"Consumer resumed by retry process",
new
{
ConsumerGroup = context.GroupId,
ConsumerName = context.Consumer.Name,
Worker = context.WorkerId
ConsumerGroup = context.ConsumerContext.GroupId,
ConsumerName = context.ConsumerContext.ConsumerName,
Worker = context.ConsumerContext.WorkerId
});
}
}
Expand Down
Loading

0 comments on commit 459705d

Please sign in to comment.