dotnet add package Superstream -v ${SUPERSTREAM_VERSION}
using Superstream;
To use Superstream
with kafka producer, first define the producer configurations:
var config = new ProducerConfig { BootstrapServers = brokerList };
var options = new ProducerBuildOptions(config);
Then create a new instance of kafka producer and use SuperstreamInitializer.Init
to initialize the producer with Superstream
options:
var kafkaProducer = new ProducerBuilder<string?, byte[]>(config)
.Build();
using var producer = SuperstreamInitializer.Init("<superstream-token>", "<superstream-host>", kafkaProducer, options);
Finally, to produce messages to kafka, use ProduceAsync
or Produce
:
producer.ProduceAsync("<topic>", new() { Value = JsonSerializer.SerializeToUtf8Bytes("{\"test_key\":\"test_value\"}") });
To use Superstream
with kafka consumer, first define the consumer configurations:
var config = new ConsumerConfig
{
GroupId = "groupid",
BootstrapServers = brokerList,
EnableAutoCommit = false
};
var options = new ConsumerBuildOptions(config);
Then create a new instance of kafka consumer and use SuperstreamInitializer.Init
to initialize the consumer with Superstream
options:
var kafkaConsumer = new ConsumerBuilder<Ignore, byte[]>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
using var consumer = SuperstreamInitializer.Init("<superstream-token>", "<superstream-host>",kafkaConsumer, options);
Finally, to consume messages from kafka, use Consume
:
var consumeResult = consumer.Consume();
Console.WriteLine($"Message : ${Encoding.UTF8.GetString(consumeResult.Message.Value)}");