Skip to content

Commit

Permalink
feat: IProducer, ProducerConfig, ProducerBuilder + default Serializers (
Browse files Browse the repository at this point in the history
#202)

* commit

* fix linter

* fix linter
  • Loading branch information
KirillKurdyukov authored Oct 21, 2024
1 parent 6982a40 commit d0d22de
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Ydb.Sdk.Services.Topic;

public class YdbTopicException : Exception
{
protected YdbTopicException(string message) : base(message)
{
}
}

public class YdbProducerException : YdbTopicException
{
public YdbProducerException(string message) : base(message)
{
}
}
8 changes: 8 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/IProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Ydb.Sdk.Services.Topic;

public interface IProducer<TValue>
{
public Task<SendResult> SendAsync(TValue data);

public Task<SendResult> SendAsync(Message<TValue> message);
}
88 changes: 88 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Producer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// using System.Collections.Concurrent;
// using Microsoft.Extensions.Logging;
// using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic;

// using ProducerStream = Driver.BidirectionalStream<
// StreamWriteMessage.Types.FromClient,
// StreamWriteMessage.Types.FromServer
// >;

internal class Producer<TValue> : IProducer<TValue>
{
// private readonly Driver _driver;
// private readonly ILogger<Producer<TValue>> _logger;
// private readonly long _partitionId;
// private readonly string _sessionId;
// private readonly ISerializer<TValue> _serializer;
//
// private long _seqNum;
//
// private readonly ConcurrentQueue<StreamWriteMessage.Types.FromClient> _inFlightMessages;
// private volatile ProducerStream _stream;
//
// internal Producer(
// ProducerConfig producerConfig,
// StreamWriteMessage.Types.InitResponse initResponse,
// ProducerStream stream,
// ISerializer<TValue> serializer)
// {
// _driver = producerConfig.Driver;
// _stream = stream;
// _serializer = serializer;
// _logger = producerConfig.Driver.LoggerFactory.CreateLogger<Producer<TValue>>();
// _partitionId = initResponse.PartitionId;
// _sessionId = initResponse.SessionId;
// _seqNum = initResponse.LastSeqNo;
// _inFlightMessages = new ConcurrentQueue<StreamWriteMessage.Types.FromClient>();
// }

public Task<SendResult> SendAsync(TValue data)
{
throw new NotImplementedException();
}

public Task<SendResult> SendAsync(Message<TValue> message)
{
throw new NotImplementedException();
}
}

public class Message<TValue>
{
public Message(TValue data)
{
Data = data;
}

public DateTime Timestamp { get; set; }

public TValue Data { get; }

public List<Metadata> Metadata { get; } = new();
}

public record Metadata(string Key, byte[] Value);

public class SendResult
{
public SendResult(State status)
{
State = status;
}

public State State { get; }
}

public enum State
{
Written,
AlreadyWritten
}

internal enum ProducerState
{
Ready
// Broken
}
60 changes: 60 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using Ydb.Topic;
using Ydb.Topic.V1;

namespace Ydb.Sdk.Services.Topic;

public class ProducerBuilder<TValue>
{
private readonly ProducerConfig _config;

public ProducerBuilder(ProducerConfig config)
{
_config = config;
}

public ISerializer<TValue>? Serializer { get; set; }

public async Task<IProducer<TValue>> Build()
{
var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod,
GrpcRequestSettings.DefaultInstance);

var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
if (_config.ProducerId != null)
{
initRequest.ProducerId = _config.ProducerId;
}

if (_config.MessageGroupId != null)
{
initRequest.MessageGroupId = _config.MessageGroupId;
}

await stream.Write(new StreamWriteMessage.Types.FromClient { InitRequest = initRequest });
if (!await stream.MoveNextAsync())
{
throw new YdbProducerException("Write stream is closed by YDB server");
}

var receivedInitMessage = stream.Current;

Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess();

var initResponse = receivedInitMessage.InitResponse;

if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
{
throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}");
}

throw new NotImplementedException();
// return new Producer<TValue>(
// _config, initResponse, stream,
// Serializer ?? (ISerializer<TValue>)(
// Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer)
// ? serializer
// : throw new YdbProducerException("The serializer is not set")
// )
// );
}
}
16 changes: 16 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Ydb.Sdk.Services.Topic;

public class ProducerConfig
{
public ProducerConfig(Driver driver, string topicPath)
{
Driver = driver;
TopicPath = topicPath;
}

public Driver Driver { get; }
public string TopicPath { get; }
public string? ProducerId { get; set; }
public string? MessageGroupId { get; set; }
public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw
}
85 changes: 85 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Serializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System.Text;

namespace Ydb.Sdk.Services.Topic;

public interface ISerializer<in TValue>
{
public byte[] Serialize(TValue data);
}

public static class Serializers
{
/// <summary>String (UTF8) serializer.</summary>
public static readonly ISerializer<string> Utf8 = new Utf8Serializer();

/// <summary>
/// System.Int64 (big endian, network byte order) serializer.
/// </summary>
public static readonly ISerializer<long> Int64 = new Int64Serializer();

/// <summary>
/// System.Int32 (big endian, network byte order) serializer.
/// </summary>
public static readonly ISerializer<int> Int32 = new Int32Serializer();

/// <summary>
/// System.Byte[] (nullable) serializer.</summary>
/// <remarks>Byte order is original order.</remarks>
public static readonly ISerializer<byte[]> ByteArray = new ByteArraySerializer();

internal static readonly Dictionary<System.Type, object> DefaultSerializers = new()
{
{ typeof(int), Int32 },
{ typeof(long), Int64 },
{ typeof(string), Utf8 },
{ typeof(byte[]), ByteArray }
};

private class Utf8Serializer : ISerializer<string>
{
public byte[] Serialize(string data)
{
return Encoding.UTF8.GetBytes(data);
}
}

private class Int64Serializer : ISerializer<long>
{
public byte[] Serialize(long data)
{
return new[]
{
(byte)(data >> 56),
(byte)(data >> 48),
(byte)(data >> 40),
(byte)(data >> 32),
(byte)(data >> 24),
(byte)(data >> 16),
(byte)(data >> 8),
(byte)data
};
}
}

private class Int32Serializer : ISerializer<int>
{
public byte[] Serialize(int data)
{
return new[]
{
(byte)(data >> 24),
(byte)(data >> 16),
(byte)(data >> 8),
(byte)data
};
}
}

private class ByteArraySerializer : ISerializer<byte[]>
{
public byte[] Serialize(byte[] data)
{
return data;
}
}
}

0 comments on commit d0d22de

Please sign in to comment.