diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c index 6cab9000f9..ed40db8665 100644 --- a/src/rdkafka_msgset_reader.c +++ b/src/rdkafka_msgset_reader.c @@ -1070,31 +1070,42 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) { rd_kafka_buf_check_len(rkbuf, payload_size); if (msetr->msetr_aborted_txns == NULL && - msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL) { - /* Since there are no aborted transactions, the MessageSet - * must correspond to a commit marker. These are ignored. */ + msetr->msetr_v2_hdr->Attributes & + RD_KAFKA_MSGSET_V2_ATTR_CONTROL) { + /* Since there are no aborted transactions, + * the MessageSet must correspond to a commit marker. + * These are ignored. */ + rd_kafka_buf_skip(rkbuf, payload_size); rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice); goto done; } if (msetr->msetr_aborted_txns != NULL && - msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL && - !(msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) { - - int64_t txn_start_offset = rd_kafka_aborted_txns_get_offset( - msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID); + (msetr->msetr_v2_hdr->Attributes & + (RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL| + RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) == + RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) { + /* Transactional non-control MessageSet: + * check if it is part of an aborted transaction. */ + int64_t txn_start_offset = + rd_kafka_aborted_txns_get_offset( + msetr->msetr_aborted_txns, + msetr->msetr_v2_hdr->PID); if (txn_start_offset != -1 && - msetr->msetr_v2_hdr->BaseOffset >= txn_start_offset) { - /* MessageSet is part of an aborted transaction */ + msetr->msetr_v2_hdr->BaseOffset >= + txn_start_offset) { + /* MessageSet is part of aborted transaction */ rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG", - "%s [%"PRId32"]: " - "Skipping %"PRId32" message(s) " - "in aborted transaction", - rktp->rktp_rkt->rkt_topic->str, - rktp->rktp_partition, - msetr->msetr_v2_hdr->RecordCount); - rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice); + "%s [%"PRId32"]: " + "Skipping %"PRId32" message(s) " + "in aborted transaction", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + msetr->msetr_v2_hdr->RecordCount); + rd_kafka_buf_skip(rkbuf, payload_size); + rd_slice_widen(&rkbuf->rkbuf_reader, + &save_slice); goto done; } }