Skip to content

Commit

Permalink
Skip aborted messages in compressed MessageSets (#3169)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhowlett authored and edenhill committed Dec 8, 2020
1 parent 0f5e2c0 commit 50d0527
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 34 deletions.
75 changes: 43 additions & 32 deletions src/rdkafka_msgset_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,43 @@ rd_kafka_msgset_reader_msg_v2 (rd_kafka_msgset_reader_t *msetr) {
*/
static rd_kafka_resp_err_t
rd_kafka_msgset_reader_msgs_v2 (rd_kafka_msgset_reader_t *msetr) {
rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf;
rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
/* Only log decoding errors if protocol debugging enabled. */
int log_decode_errors = (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug &
RD_KAFKA_DBG_PROTOCOL) ? LOG_DEBUG : 0;

if (msetr->msetr_aborted_txns != NULL &&
(msetr->msetr_v2_hdr->Attributes &
(RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL|
RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) ==
RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) {
/* Transactional non-control MessageSet:
* check if it is part of an aborted transaction. */
int64_t txn_start_offset =
rd_kafka_aborted_txns_get_offset(
msetr->msetr_aborted_txns,
msetr->msetr_v2_hdr->PID);

if (txn_start_offset != -1 &&
msetr->msetr_v2_hdr->BaseOffset >=
txn_start_offset) {
/* MessageSet is part of aborted transaction */
rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
"%s [%"PRId32"]: "
"Skipping %"PRId32" message(s) "
"in aborted transaction "
"at offset %"PRId64,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
msetr->msetr_v2_hdr->RecordCount,
txn_start_offset);
rd_kafka_buf_skip(msetr->msetr_rkbuf, rd_slice_remains(
&msetr->msetr_rkbuf->rkbuf_reader));
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
}

while (rd_kafka_buf_read_remain(msetr->msetr_rkbuf)) {
rd_kafka_resp_err_t err;
err = rd_kafka_msgset_reader_msg_v2(msetr);
Expand All @@ -955,6 +992,12 @@ rd_kafka_msgset_reader_msgs_v2 (rd_kafka_msgset_reader_t *msetr) {
}

return RD_KAFKA_RESP_ERR_NO_ERROR;

err_parse:
/* Count all parse errors as partial message errors. */
rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1);
msetr->msetr_v2_hdr = NULL;
return rkbuf->rkbuf_err;
}


Expand Down Expand Up @@ -1088,38 +1131,6 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
&save_slice, payload_size))
rd_kafka_buf_check_len(rkbuf, payload_size);

if (msetr->msetr_aborted_txns != NULL &&
(msetr->msetr_v2_hdr->Attributes &
(RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL|
RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) ==
RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) {
/* Transactional non-control MessageSet:
* check if it is part of an aborted transaction. */
int64_t txn_start_offset =
rd_kafka_aborted_txns_get_offset(
msetr->msetr_aborted_txns,
msetr->msetr_v2_hdr->PID);

if (txn_start_offset != -1 &&
msetr->msetr_v2_hdr->BaseOffset >=
txn_start_offset) {
/* MessageSet is part of aborted transaction */
rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
"%s [%"PRId32"]: "
"Skipping %"PRId32" message(s) "
"in aborted transaction "
"at offset %"PRId64,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
msetr->msetr_v2_hdr->RecordCount,
txn_start_offset);
rd_kafka_buf_skip(rkbuf, payload_size);
rd_slice_widen(&rkbuf->rkbuf_reader,
&save_slice);
goto done;
}
}

/* Read messages */
err = rd_kafka_msgset_reader_msgs_v2(msetr);

Expand Down
7 changes: 5 additions & 2 deletions tests/0103-transactions.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void do_produce_batch (rd_kafka_t *rk, const char *topic, uint64_t testid,
* (only consumed output for verification).
* e.g., no consumer offsets to commit with transaction.
*/
static void do_test_basic_producer_txn (void) {
static void do_test_basic_producer_txn (rd_bool_t enable_compression) {
const char *topic = test_mk_topic_name("0103_transactions", 1);
const int partition_cnt = 4;
#define _TXNCNT 6
Expand Down Expand Up @@ -131,6 +131,8 @@ static void do_test_basic_producer_txn (void) {
p_conf = rd_kafka_conf_dup(conf);
rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb);
test_conf_set(p_conf, "transactional.id", topic);
if (enable_compression)
test_conf_set(p_conf, "compression.type", "lz4");
p = test_create_handle(RD_KAFKA_PRODUCER, p_conf);

// FIXME: add testing were the txn id is reused (and thus fails)
Expand Down Expand Up @@ -771,7 +773,8 @@ static void do_test_fenced_txn (rd_bool_t produce_after_fence) {
int main_0103_transactions (int argc, char **argv) {

do_test_misuse_txn();
do_test_basic_producer_txn();
do_test_basic_producer_txn(rd_false /* without compression */);
do_test_basic_producer_txn(rd_true /* with compression */);
do_test_consumer_producer_txn();
do_test_fenced_txn(rd_false /* no produce after fencing */);
do_test_fenced_txn(rd_true /* produce after fencing */);
Expand Down

0 comments on commit 50d0527

Please sign in to comment.