From bd71c96b3d8eb7c5f7e85f94fe33f75302167b80 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Fri, 30 Aug 2019 18:08:08 +0200 Subject: [PATCH] Fix consumer protocol parse error when using aborted transactions When control messages were skipped the remaining buffer was not consumed which lead to the messageset parser reading the remaining bytes as messages, which in best case resulted in parse errors. --- src/rdkafka_msgset_reader.c | 45 +++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c index 6cab9000f9..ed40db8665 100644 --- a/src/rdkafka_msgset_reader.c +++ b/src/rdkafka_msgset_reader.c @@ -1070,31 +1070,42 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) { rd_kafka_buf_check_len(rkbuf, payload_size); if (msetr->msetr_aborted_txns == NULL && - msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL) { - /* Since there are no aborted transactions, the MessageSet - * must correspond to a commit marker. These are ignored. */ + msetr->msetr_v2_hdr->Attributes & + RD_KAFKA_MSGSET_V2_ATTR_CONTROL) { + /* Since there are no aborted transactions, + * the MessageSet must correspond to a commit marker. + * These are ignored. */ + rd_kafka_buf_skip(rkbuf, payload_size); rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice); goto done; } if (msetr->msetr_aborted_txns != NULL && - msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL && - !(msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) { - - int64_t txn_start_offset = rd_kafka_aborted_txns_get_offset( - msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID); + (msetr->msetr_v2_hdr->Attributes & + (RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL| + RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) == + RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) { + /* Transactional non-control MessageSet: + * check if it is part of an aborted transaction. */ + int64_t txn_start_offset = + rd_kafka_aborted_txns_get_offset( + msetr->msetr_aborted_txns, + msetr->msetr_v2_hdr->PID); if (txn_start_offset != -1 && - msetr->msetr_v2_hdr->BaseOffset >= txn_start_offset) { - /* MessageSet is part of an aborted transaction */ + msetr->msetr_v2_hdr->BaseOffset >= + txn_start_offset) { + /* MessageSet is part of aborted transaction */ rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG", - "%s [%"PRId32"]: " - "Skipping %"PRId32" message(s) " - "in aborted transaction", - rktp->rktp_rkt->rkt_topic->str, - rktp->rktp_partition, - msetr->msetr_v2_hdr->RecordCount); - rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice); + "%s [%"PRId32"]: " + "Skipping %"PRId32" message(s) " + "in aborted transaction", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + msetr->msetr_v2_hdr->RecordCount); + rd_kafka_buf_skip(rkbuf, payload_size); + rd_slice_widen(&rkbuf->rkbuf_reader, + &save_slice); goto done; } }