Generally speaking, The Modern C++ based Kafka API
is quite similar with Kafka Java's API
We'd recommend users to cross-reference them, --especially the examples.
Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAutoCommitConsumer and KafkaManualCommitConsumer, --depending on whether users should call commit
manually.
-
Friendly for users, --would not care about when to commit the offsets for these received messages.
-
Internally, it would commit the offsets (for received records) within the next
poll
and the finalclose
. Note, each internalcommit
would "try its best", but "not guaranteed to succeed", -- it's supposed to be called periodically, thus occasional failure doesn't matter.
// Create configuration object
kafka::Properties props ({
{"bootstrap.servers", brokers},
});
// Create a consumer instance.
kafka::KafkaAutoCommitConsumer consumer(props);
// Subscribe to topics
consumer.subscribe({topic});
// Read messages from the topic.
std::cout << "% Reading messages from topic: " << topic << std::endl;
while (true) {
auto records = consumer.poll(std::chrono::milliseconds(100));
for (const auto& record: records) {
// In this example, quit on empty message
if (record.value().size() == 0) return 0;
if (!record.error()) {
std::cout << "% Got a new message..." << std::endl;
std::cout << " Topic : " << record.topic() << std::endl;
std::cout << " Partition: " << record.partition() << std::endl;
std::cout << " Offset : " << record.offset() << std::endl;
std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
std::cout << " Key [" << record.key().toString() << "]" << std::endl;
std::cout << " Value [" << record.value().toString() << "]" << std::endl;
} else {
std::cerr << record.toString() << std::endl;
}
}
}
-
ConsumerConfig::BOOTSTRAP_SERVERS
is mandatory forConsumerConfig
. -
subscribe
could take a topic list. And it's a blocking operation, -- would return after the rebalance event triggered callback was executed. -
poll
must be periodically called, and it would trigger kinds of callback handling internally. As in this example, just put it in a "while loop" would be OK. -
At the end, the user could
close
the consumer manually, or just leave it to the destructor (which wouldclose
anyway).
- Users must commit the offsets for received records manually.
// Create configuration object
kafka::Properties props ({
{"bootstrap.servers", brokers},
});
// Create a consumer instance.
kafka::KafkaManualCommitConsumer consumer(props);
// Subscribe to topics
consumer.subscribe({topic});
auto lastTimeCommitted = std::chrono::steady_clock::now();
// Read messages from the topic.
std::cout << "% Reading messages from topic: " << topic << std::endl;
bool allCommitted = true;
bool running = true;
while (running) {
auto records = consumer.poll(std::chrono::milliseconds(100));
for (const auto& record: records) {
// In this example, quit on empty message
if (record.value().size() == 0) {
running = false;
break;
}
if (!record.error()) {
std::cout << "% Got a new message..." << std::endl;
std::cout << " Topic : " << record.topic() << std::endl;
std::cout << " Partition: " << record.partition() << std::endl;
std::cout << " Offset : " << record.offset() << std::endl;
std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
std::cout << " Key [" << record.key().toString() << "]" << std::endl;
std::cout << " Value [" << record.value().toString() << "]" << std::endl;
allCommitted = false;
} else {
std::cerr << record.toString() << std::endl;
}
}
if (!allCommitted) {
auto now = std::chrono::steady_clock::now();
if (now - lastTimeCommitted > std::chrono::seconds(1)) {
// Commit offsets for messages polled
std::cout << "% syncCommit offsets: " << kafka::Utility::getCurrentTime() << std::endl;
consumer.commitSync(); // or commitAsync()
lastTimeCommitted = now;
allCommitted = true;
}
}
}
-
The example is quite similar with the KafkaAutoCommitConsumer, with only 1 more line added for manual-commit.
-
commitSync
andcommitAsync
are both available for a KafkaManualConsumer. Normally, usecommitSync
to guarantee the commitment, or usecommitAsync
(withOffsetCommitCallback
) to get a better performance.
While we construct a KafkaManualCommitConsumer
with option KafkaClient::EventsPollingOption::AUTO
(default), an internal thread would be created for OffsetCommit
callbacks handling.
This might not be what you want, since then you have to use 2 different threads to process the messages and handle the OffsetCommit
responses.
Here we have another choice, -- using KafkaClient::EventsPollingOption::Manual
, thus the OffsetCommit
callbacks would be called within member function pollEvents()
.
KafkaManualCommitConsumer consumer(props, KafkaClient::EventsPollingOption::Manual);
consumer.subscribe({"topic1", "topic2"});
while (true) {
auto records = consumer.poll(std::chrono::milliseconds(100));
for (auto& record: records) {
// Process the message...
process(record);
// Here we commit the offset manually
consumer.commitSync(*record);
}
// Here we call the `OffsetCommit` callbacks
// Note, we can only do this while the consumer was constructed with `EventsPollingOption::Manual`.
consumer.pollEvents();
}
No exception would be thrown by KafkaProducer::poll()
.
Once an error occurs, the ErrorCode
would be embedded in the Consumer::ConsumerRecord
.
There're 2 cases,
-
Success
-
RD_KAFKA_RESP_ERR__NO_ERROR (0), -- got a message successfully
-
RD_KAFKA_RESP_ERR__PARTITION_EOF, -- reached the end of a partition (no message got)
-
-
Failure
-
What're the available configurations?
-
How to enhance the polling performance?
ConsumerConfig::QUEUED_MIN_MESSAGES
determines how frequently the consumer would send the FetchRequest towards brokers. The default configuration (i.e, 100000) might not be good enough for small (less than 1KB) messages, and suggest using a larger value (e.g, 1000000) for it. -
How many threads would be created by a KafkaConsumer?
Excluding the user's main thread,
KafkaAutoCommitConsumer
would start another (N + 2) threads in the background, whileKafkaManualConsumer
would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)-
Each broker (in the list of BOOTSTRAP_SERVERS) would take a seperate thread to transmit messages towards a kafka cluster server.
-
Another 3 threads will handle internal operations, consumer group operations, and kinds of timers, etc.
-
KafkaManualConsumer has one more thread, which keeps polling the offset-commit callback event.
E.g, if a KafkaAutoCommitConsumer was created with property of
BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890
, it would take 6 threads in total (including the main thread). -
-
Which one of these threads will handle the callbacks?
There are 2 kinds of callbacks for a KafkaConsumer,
-
RebalanceCallback
will be triggered internally by the user's thread, -- within thepoll
function. -
OffsetCommitCallback
(only available forKafkaManualCommitConsumer
) will be triggered by a background thread, not by the user's thread.
-