Skip to content

Commit

Permalink
Fix consumer protocol parse error when using aborted transactions
Browse files Browse the repository at this point in the history
When control messages were skipped the remaining buffer was not consumed
which lead to the messageset parser reading the remaining bytes as
messages, which in best case resulted in parse errors.
  • Loading branch information
edenhill committed Aug 31, 2019
1 parent 35d0913 commit bd71c96
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions src/rdkafka_msgset_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -1070,31 +1070,42 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
rd_kafka_buf_check_len(rkbuf, payload_size);

if (msetr->msetr_aborted_txns == NULL &&
msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL) {
/* Since there are no aborted transactions, the MessageSet
* must correspond to a commit marker. These are ignored. */
msetr->msetr_v2_hdr->Attributes &
RD_KAFKA_MSGSET_V2_ATTR_CONTROL) {
/* Since there are no aborted transactions,
* the MessageSet must correspond to a commit marker.
* These are ignored. */
rd_kafka_buf_skip(rkbuf, payload_size);
rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice);
goto done;
}

if (msetr->msetr_aborted_txns != NULL &&
msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL &&
!(msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) {

int64_t txn_start_offset = rd_kafka_aborted_txns_get_offset(
msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID);
(msetr->msetr_v2_hdr->Attributes &
(RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL|
RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) ==
RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) {
/* Transactional non-control MessageSet:
* check if it is part of an aborted transaction. */
int64_t txn_start_offset =
rd_kafka_aborted_txns_get_offset(
msetr->msetr_aborted_txns,
msetr->msetr_v2_hdr->PID);

if (txn_start_offset != -1 &&
msetr->msetr_v2_hdr->BaseOffset >= txn_start_offset) {
/* MessageSet is part of an aborted transaction */
msetr->msetr_v2_hdr->BaseOffset >=
txn_start_offset) {
/* MessageSet is part of aborted transaction */
rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
"%s [%"PRId32"]: "
"Skipping %"PRId32" message(s) "
"in aborted transaction",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
msetr->msetr_v2_hdr->RecordCount);
rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice);
"%s [%"PRId32"]: "
"Skipping %"PRId32" message(s) "
"in aborted transaction",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
msetr->msetr_v2_hdr->RecordCount);
rd_kafka_buf_skip(rkbuf, payload_size);
rd_slice_widen(&rkbuf->rkbuf_reader,
&save_slice);
goto done;
}
}
Expand Down

0 comments on commit bd71c96

Please sign in to comment.