While using message dispatching systems, we always suffer from message loss, duplication and disordering.
Since the application (using the `KafkaProducer`) might crash/restart, we might consider using certain mechanisms to achieve `At most once`/`At least once`, and `Ordering`, -- such as locally persisting the messages until successful delivery, using embedded sequence numbers to de-duplicate, or responding to the data source to acknowledge the delivery result, etc. These are common topics, which are not quite specific to Kafka.
Here we'd focus on the `KafkaProducer`, together with the `idempotence` feature. Let's see in which cases problems might happen, how to avoid them, and what the best practices are, -- to achieve `No Message Lost`, `Exactly Once` and `Ordering`.
- The producer gets a successful delivery response after sending the message, but the `partition leader` failed to sync it to other `replicas`.
    - First, the `partition leader` doesn't sync-up the latest message to enough `in-sync replicas` before responding with the `ack`
        - The `partition leader` simply doesn't need to wait for the other `replica`s' responses
            - E.g, the producer is configured with `acks=1`
        - No available `in-sync replica` to wait for the response
            - E.g, all the other replicas are not in-sync
    - Then, the `partition leader` crashes, and one `in-sync replica` becomes the new `partition leader`
        - The new `partition leader` is not aware of the latest messages. Later, while new messages arrive, it would assign them record offsets that conflict (the same offsets as those records which only the previous `partition leader` knows about). Then, even if the previous `partition leader` comes up again, these records have no chance to be recovered (they are just internally overwritten to be consistent with the other replicas).
- Make sure the leader would wait for responses from all in-sync replicas before sending back the `ack`
    - Configuration `acks=all` is a MUST for the producer (a producer-side configuration sketch is shown after these lists)
- Ensure enough `in-sync replicas`
    - Configuration `min.insync.replicas >= 2` is a MUST for the brokers
    - Take `min.insync.replicas = 2` for example, it means,
        - At most `replication.factor - min.insync.replicas` replicas could be out-of-sync, -- the producer would still be able to send messages; beyond that, it would fail with a 'not enough replicas' error and keep retrying.
        - At most `min.insync.replicas - 1` in-sync replicas could fail at the same time without losing messages. In this case, if just one in-sync replica crashes after sending back the `ack` to the producer, the message would not be lost; if two failed, it would! The new leader might then be a replica which was not in-sync previously, and which is not aware of these latest messages.
    - Please refer to Kafka Broker Configuration for more details.
- Then, what would happen if replicas fail?
    - Fails to send (a `not enough in-sync replicas` failure), -- while the number of `in-sync replicas` could not meet `min.insync.replicas`
    - Lost messages (after sending messages), -- with no `in-sync replica` surviving the multiple failures
    - No message lost, -- with all `in-sync replicas` acknowledged, and at least one `in-sync replica` still available
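As a producer-side illustration of these settings, here is a minimal sketch (assuming `ProducerConfig::ACKS` is available; the broker addresses are placeholders). Note that `min.insync.replicas` is a broker/topic level configuration and cannot be set through the producer properties:

```cpp
// A minimal sketch of the producer-side part of "no message lost".
// `min.insync.replicas=2` must be configured on the brokers (or per topic) separately.
KafkaSyncProducer producer(
    Properties({
        { ProducerConfig::BOOTSTRAP_SERVERS, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
        { ProducerConfig::ACKS,              "all" }    // wait for all in-sync replicas' acknowledgement
    })
);
```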
- After the brokers successfully persisted a message, they sent the `ack` to the producer. But for some abnormal reason (such as a network failure, etc), the producer might fail to receive the `ack`. The `librdkafka`'s internal queue would then retry, and thus another (duplicated) message would be persisted by the brokers.
- The `enable.idempotence` configuration would resolve such a problem. This configuration is RECOMMENDED for both `KafkaSyncProducer` and `KafkaAsyncProducer`, as long as it's possible.
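Below is a minimal sketch of enabling it at construction time (the broker address is a placeholder):

```cpp
// A minimal sketch: with idempotence enabled, the brokers de-duplicate retried
// messages, so an `ack` lost on the way back would not lead to duplicates.
KafkaAsyncProducer producer(
    Properties({
        { ProducerConfig::BOOTSTRAP_SERVERS,  "192.168.0.1:9092" },
        { ProducerConfig::ENABLE_IDEMPOTENCE, "true" }
    })
);
```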
- Make sure these `ProducerRecord`s go to the same partition
    - Explicitly assign the same `topic-partition` to them
    - Or use the same `key` for these records
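Both options might look like the hedged sketch below (`topic`, `partition`, the keys and the values are placeholders, and the constructor overload taking an explicit partition is assumed):

```cpp
// Option 1: explicitly assign the same topic-partition to all related records
//           (assuming a ProducerRecord constructor overload taking the partition).
auto record1 = ProducerRecord(topic, partition, key1, value1);
auto record2 = ProducerRecord(topic, partition, key2, value2);

// Option 2: rely on the default partitioner, -- records sharing the same key
//           are hashed to the same partition.
auto record3 = ProducerRecord(topic, sameKey, value3);
auto record4 = ProducerRecord(topic, sameKey, value4);
```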
- The `librdkafka` uses internal partition queues, and once a message fails to be sent successfully (e.g, the brokers are down), it would be put back into the queue and retried once `retry.backoff.ms` expires. However, before that (the retry of the failed message), the brokers might recover, and the messages behind it (if with configuration `max.in.flight > 1`) might happen to be sent successfully. In this case (with configuration `max.in.flight > 1` and `retries > 0`), disordering could happen, and the user would not even be aware of it.
- Furthermore, if the last retry still fails, the delivery callback would eventually be triggered. The user has to determine what to do about it (might want to re-send the message, etc). But there might be a case where some later messages have already been saved successfully by the server, thus there is no way to revert the disordering.

Please refer to the document from librdkafka, Idempotent Producer, for more details.
The `librdkafka` maintains the original produce() ordering per-partition for all messages produced, using an internal per-partition 64-bit counter called the `msgid` which starts at 1. This `msgid` allows messages to be re-inserted in the partition message queue in the original order in the case of retries.

The Idempotent Producer functionality in the Kafka protocol also has a per-message `sequence number`, which is a signed 32-bit wrapping counter that is reset each time the Producer's ID (`PID`) or `Epoch` changes.

The `msgid` is used, (along with a base `msgid` value stored at the time the `PID/Epoch` was bumped), to calculate the Kafka protocol's message `sequence number`.
- Note that the following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled. Producer instantiation will fail if the user-supplied configuration is incompatible (a hedged sketch of such a failure follows this list).
    - `acks = all`
    - `max.in.flight` (i.e, `max.in.flight.requests.per.connection`) `= 5`
    - `retries = INT32_MAX`
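For example, a user-supplied `acks=1` conflicts with `enable.idempotence=true`. The sketch below assumes the incompatibility is reported as a `KafkaException` thrown from the constructor, and that `ProducerConfig::ACKS` is available:

```cpp
// A hedged sketch: `acks=1` is incompatible with `enable.idempotence=true`,
// so the producer construction is expected to fail.
try {
    KafkaSyncProducer producer(
        Properties({
            { ProducerConfig::BOOTSTRAP_SERVERS,  "192.168.0.1:9092" },
            { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
            { ProducerConfig::ACKS,               "1" }    // conflicts with idempotence
        })
    );
} catch (const KafkaException& e) {
    std::cerr << "Failed to construct the producer: " << e.what() << std::endl;
}
```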
- Exception thrown during `send`
    - For those errors which could be detected locally (and could not be recovered by retrying), an exception would be thrown. E.g, an invalid message, such as RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE (conflicting with the local configuration `message.max.bytes`).
- Permanent errors (responses from the brokers)
    - Typical errors are:
        - Invalid message: RD_KAFKA_RESP_ERR_CORRUPT_MESSAGE, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE, RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS, RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT, RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE.
        - Topic/Partition does not exist: ERR_UNKNOWN_TOPIC_OR_PART, -- automatic topic creation is disabled on the broker, or the application is specifying a partition that does not exist.
        - Authorization failure: ERR_TOPIC_AUTHORIZATION_FAILED, ERR_CLUSTER_AUTHORIZATION_FAILED
    - Normally, a `permanent error` means a careless design or a wrong configuration, which should be avoided from the very beginning.
    - Unless `enable.gapless.guarantee` (EXPERIMENTAL) is configured, the producer would keep going with the following messages; otherwise, it would purge all messages in flight/in queue (with RD_KAFKA_RESP_ERR__PURGE_INFLIGHT/RD_KAFKA_RESP_ERR__PURGE_QUEUE).
- Temporary errors
    - Apart from those permanent errors, most of the remaining ones are temporary errors, which will be retried (if the retry count permits); once `message.timeout.ms` expires, the message delivery callback would be triggered with RD_KAFKA_RESP_ERR__TIMED_OUT.
- Be careful with the RD_KAFKA_RESP_ERR__TIMED_OUT failure
    - There are some corner cases, such as a message that has been persisted by the brokers while the `KafkaProducer` failed to get the response. If `message.timeout.ms` has not expired, the producer could retry and eventually get the response. Otherwise (i.e, `message.timeout.ms` expired before the producer received the successful `ack`), it would be considered a delivery failure by the producer (while the brokers wouldn't consider it one). Users might then re-transmit the message, thus causing duplication.
    - To avoid this tricky situation, a longer `message.timeout.ms` is RECOMMENDED, to make sure there's enough time for transmission retries and in-flight responses. A hedged sketch of a delivery callback handling this case follows this list.
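The sketch below shows one way a delivery callback might treat the timed-out case differently from other failures; it assumes the callback's `std::error_code` carries the raw librdkafka error value, and that the librdkafka error constants are visible through the included headers:

```cpp
// A hedged sketch of a delivery callback: a timed-out result is ambiguous, --
// the message might or might not have been persisted by the brokers.
auto deliveryCb = [](const Producer::RecordMetadata& /*metadata*/, std::error_code ec) {
    if (!ec) return;    // delivered successfully

    if (ec.value() == RD_KAFKA_RESP_ERR__TIMED_OUT) {
        // Ambiguous case: re-sending might duplicate the message.
        // A longer `message.timeout.ms` makes this case less likely.
        std::cerr << "Delivery timed out (might have been persisted): " << ec.message() << std::endl;
    } else {
        // Most of the other errors reported here are permanent ones, -- usually
        // a design or configuration problem, which should be fixed rather than retried.
        std::cerr << "Delivery failed: " << ec.message() << std::endl;
    }
};
```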
- The main impact comes from the `max.in.flight=5` limitation. Currently, `max.in.flight` means `max.in.flight.per.connection`, -- that is, at most 5 message batches (each with a size of ~1MB at most) could be in flight (having not got the `ack` response yet) towards each broker. Within low-latency networks, it would not be a problem; in other cases, it might be! The good news is, there might be a plan (in `librdkafka`) to improve that `per.connection` limit to `per.partition`, which would boost the performance a lot.
- Enable the `enable.idempotence` configuration
- Use a long `message.timeout.ms`, which would let `librdkafka` keep retrying before triggering the delivery failure callback.
```cpp
    int ret = 0;
    std::atomic<bool> running = true;

    KafkaSyncProducer producer(
        Properties({
            { ProducerConfig::BOOTSTRAP_SERVERS,  "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
            { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
            { ProducerConfig::MESSAGE_TIMEOUT_MS, "60000" }
        })
    );

    while (running) {
        // Only pop the message from the upstream once it has been delivered successfully
        const auto& msg = topMsgOfUpstream();
        try {
            auto record = ProducerRecord(topic, msg.key, msg.value);
            producer.send(record);
            popMsgFromUpstream();
        } catch (const KafkaException& e) {
            std::cerr << "Failed to send message! Reason: " << e.what() << std::endl;
            ret = e.error().value();
            break;
        }
    }

    producer.close();
    return ret;
```
- It's easy to use `KafkaSyncProducer`, since it sends messages one by one.
- The throughput would not be that good, since there's only 1 message (embedded in 1 message batch) in flight at a time.
```cpp
    std::atomic<bool> running = true;

    KafkaAsyncProducer producer(
        Properties({
            { ProducerConfig::BOOTSTRAP_SERVERS,  "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
            { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
            { ProducerConfig::MESSAGE_TIMEOUT_MS, "86400000" }  // as long as 1 day
        })
    );

    while (running) {
        auto msg = fetchMsgFromUpstream();
        auto record = ProducerRecord(topic, msg.key, msg.value, msg.id);
        producer.send(record,
                      // Delivery callback, triggered once the send result (success or final failure) is known
                      [](const Producer::RecordMetadata& metadata, std::error_code ec) {
                          // The message could be identified by `metadata.recordId()`
                          auto recordId = metadata.recordId();
                          if (ec) {
                              std::cerr << "Cannot send out message with recordId: " << recordId << ", error:" << ec.message() << std::endl;
                          } else {
                              // Only commit the sending offset to the upstream after successful delivery
                              commitMsgToUpstream(recordId);
                          }
                      });
    }

    producer.close();
```
- With a long `message.timeout.ms`, we're not likely to catch an error in the delivery callback, -- it would keep retrying on temporary errors anyway. But be aware of permanent errors; they might be caused by a careless design.