diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c
index 13dd56a1cc..4dd287ace1 100644
--- a/src/rdkafka_msgset_reader.c
+++ b/src/rdkafka_msgset_reader.c
@@ -947,6 +947,43 @@ rd_kafka_msgset_reader_msg_v2 (rd_kafka_msgset_reader_t *msetr) {
  */
 static rd_kafka_resp_err_t
 rd_kafka_msgset_reader_msgs_v2 (rd_kafka_msgset_reader_t *msetr) {
+        rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf;
+        rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
+        /* Only log decoding errors if protocol debugging enabled. */
+        int log_decode_errors = (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug &
+                                 RD_KAFKA_DBG_PROTOCOL) ? LOG_DEBUG : 0;
+
+        if (msetr->msetr_aborted_txns != NULL &&
+            (msetr->msetr_v2_hdr->Attributes &
+             (RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL|
+              RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) ==
+            RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) {
+                /* Transactional non-control MessageSet:
+                 * check if it is part of an aborted transaction. */
+                int64_t txn_start_offset =
+                        rd_kafka_aborted_txns_get_offset(
+                                msetr->msetr_aborted_txns,
+                                msetr->msetr_v2_hdr->PID);
+
+                if (txn_start_offset != -1 &&
+                    msetr->msetr_v2_hdr->BaseOffset >=
+                    txn_start_offset) {
+                        /* MessageSet is part of aborted transaction */
+                        rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
+                                   "%s [%"PRId32"]: "
+                                   "Skipping %"PRId32" message(s) "
+                                   "in aborted transaction "
+                                   "at offset %"PRId64,
+                                   rktp->rktp_rkt->rkt_topic->str,
+                                   rktp->rktp_partition,
+                                   msetr->msetr_v2_hdr->RecordCount,
+                                   txn_start_offset);
+                        rd_kafka_buf_skip(msetr->msetr_rkbuf, rd_slice_remains(
+                                &msetr->msetr_rkbuf->rkbuf_reader));
+                        return RD_KAFKA_RESP_ERR_NO_ERROR;
+                }
+        }
+
         while (rd_kafka_buf_read_remain(msetr->msetr_rkbuf)) {
                 rd_kafka_resp_err_t err;
                 err = rd_kafka_msgset_reader_msg_v2(msetr);
@@ -955,6 +992,12 @@ rd_kafka_msgset_reader_msgs_v2 (rd_kafka_msgset_reader_t *msetr) {
         }
 
         return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ err_parse:
+        /* Count all parse errors as partial message errors. */
+        rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1);
+        msetr->msetr_v2_hdr = NULL;
+        return rkbuf->rkbuf_err;
 }
 
 
@@ -1088,38 +1131,6 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
                                      &save_slice, payload_size))
                 rd_kafka_buf_check_len(rkbuf, payload_size);
 
-        if (msetr->msetr_aborted_txns != NULL &&
-            (msetr->msetr_v2_hdr->Attributes &
-             (RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL|
-              RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) ==
-            RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) {
-                /* Transactional non-control MessageSet:
-                 * check if it is part of an aborted transaction. */
-                int64_t txn_start_offset =
-                        rd_kafka_aborted_txns_get_offset(
-                                msetr->msetr_aborted_txns,
-                                msetr->msetr_v2_hdr->PID);
-
-                if (txn_start_offset != -1 &&
-                    msetr->msetr_v2_hdr->BaseOffset >=
-                    txn_start_offset) {
-                        /* MessageSet is part of aborted transaction */
-                        rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
-                                   "%s [%"PRId32"]: "
-                                   "Skipping %"PRId32" message(s) "
-                                   "in aborted transaction "
-                                   "at offset %"PRId64,
-                                   rktp->rktp_rkt->rkt_topic->str,
-                                   rktp->rktp_partition,
-                                   msetr->msetr_v2_hdr->RecordCount,
-                                   txn_start_offset);
-                        rd_kafka_buf_skip(rkbuf, payload_size);
-                        rd_slice_widen(&rkbuf->rkbuf_reader,
-                                       &save_slice);
-                        goto done;
-                }
-        }
-
         /* Read messages */
         err = rd_kafka_msgset_reader_msgs_v2(msetr);
 
diff --git a/tests/0103-transactions.c b/tests/0103-transactions.c
index b85f81baef..3102cef99d 100644
--- a/tests/0103-transactions.c
+++ b/tests/0103-transactions.c
@@ -94,7 +94,7 @@ void do_produce_batch (rd_kafka_t *rk, const char *topic, uint64_t testid,
  * (only consumed output for verification).
  * e.g., no consumer offsets to commit with transaction. */
-static void do_test_basic_producer_txn (void) {
+static void do_test_basic_producer_txn (rd_bool_t enable_compression) {
         const char *topic = test_mk_topic_name("0103_transactions", 1);
         const int partition_cnt = 4;
 #define _TXNCNT 6
@@ -131,6 +131,8 @@ static void do_test_basic_producer_txn (void) {
         p_conf = rd_kafka_conf_dup(conf);
         rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb);
         test_conf_set(p_conf, "transactional.id", topic);
+        if (enable_compression)
+                test_conf_set(p_conf, "compression.type", "lz4");
         p = test_create_handle(RD_KAFKA_PRODUCER, p_conf);
 
         // FIXME: add testing were the txn id is reused (and thus fails)
@@ -771,7 +773,8 @@ static void do_test_fenced_txn (rd_bool_t produce_after_fence) {
 
 int main_0103_transactions (int argc, char **argv) {
         do_test_misuse_txn();
-        do_test_basic_producer_txn();
+        do_test_basic_producer_txn(rd_false /* without compression */);
+        do_test_basic_producer_txn(rd_true /* with compression */);
         do_test_consumer_producer_txn();
         do_test_fenced_txn(rd_false /* no produce after fencing */);
         do_test_fenced_txn(rd_true /* produce after fencing */);
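Note on the change above: the aborted-transaction check moves out of rd_kafka_msgset_reader_v2(), where it only ran against the outer (possibly still compressed) payload, and into rd_kafka_msgset_reader_msgs_v2(), which also runs on decompressed record data; accordingly, the test now exercises the basic producer transaction scenario both with and without lz4 compression. The skip predicate itself is unchanged, and the following standalone sketch summarizes it. skip_aborted_msgset() is a hypothetical helper for illustration only, and the ATTR_* values are assumed to match the MessageSet v2 attribute bits behind RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL and RD_KAFKA_MSGSET_V2_ATTR_CONTROL.

    #include <stdbool.h>
    #include <stdint.h>

    /* Assumed MessageSet v2 (RecordBatch) attribute bits:
     * bit 4 = isTransactional, bit 5 = isControlBatch. */
    #define ATTR_TRANSACTIONAL 0x10
    #define ATTR_CONTROL       0x20

    /* Hypothetical helper mirroring the check in the patch: a MessageSet
     * is skipped iff it is transactional, is NOT a control batch, and its
     * base offset lies at or past the start offset of an aborted
     * transaction for the same PID (txn_start_offset is -1 when no
     * aborted transaction is known for that PID). */
    static bool skip_aborted_msgset (int16_t attributes,
                                     int64_t base_offset,
                                     int64_t txn_start_offset) {
            if ((attributes & (ATTR_TRANSACTIONAL | ATTR_CONTROL)) !=
                ATTR_TRANSACTIONAL)
                    return false; /* non-transactional, or a control batch */
            return txn_start_offset != -1 && base_offset >= txn_start_offset;
    }

For example, skip_aborted_msgset(ATTR_TRANSACTIONAL, 105, 100) is true (the batch starts inside the aborted range), whereas a control batch with the same offsets is never skipped by this check, since control records carry the commit/abort markers and are handled separately by the reader.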