diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 4e3dcc4c4f..fbad14f675 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -666,7 +666,7 @@ void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) { rd_kafka_toppar_unlock(rktp); if (wakeup_q) { - rd_kafka_q_yield(wakeup_q); + rd_kafka_q_yield(wakeup_q, rd_true/*rate-limit*/); rd_kafka_q_destroy(wakeup_q); } } diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index f85f57961c..4af1df8351 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -256,7 +256,7 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq, if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) { if (cnt > 0 && dstq->rkq_qlen == 0) - rd_kafka_q_io_event(dstq); + rd_kafka_q_io_event(dstq, rd_false/*no rate-limiting*/); /* Optimization, if 'cnt' is equal/larger than all * items of 'srcq' we can move the entire queue. */ @@ -730,6 +730,8 @@ void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd, qio->fd = fd; qio->size = size; qio->payload = (void *)(qio+1); + qio->ts_rate = rkq->rkq_rk->rk_conf.buffering_max_us; + qio->ts_last = 0; qio->event_cb = NULL; qio->event_cb_opaque = NULL; memcpy(qio->payload, payload, size); diff --git a/src/rdkafka_queue.h b/src/rdkafka_queue.h index 291c78a13e..f12109568b 100644 --- a/src/rdkafka_queue.h +++ b/src/rdkafka_queue.h @@ -90,6 +90,8 @@ struct rd_kafka_q_io { int fd; void *payload; size_t size; + rd_ts_t ts_rate; /**< How often the IO wakeup may be performed (us) */ + rd_ts_t ts_last; /**< Last IO wakeup */ /* For callback-based signalling */ void (*event_cb) (rd_kafka_t *rk, void *opaque); void *event_cb_opaque; @@ -284,10 +286,12 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) { /** * @brief Trigger an IO event for this queue. * + * @param rate_limit if true, rate limit IO-based wakeups. + * * @remark Queue MUST be locked */ static RD_INLINE RD_UNUSED -void rd_kafka_q_io_event (rd_kafka_q_t *rkq) { +void rd_kafka_q_io_event (rd_kafka_q_t *rkq, rd_bool_t rate_limit) { if (likely(!rkq->rkq_qio)) return; @@ -297,6 +301,15 @@ void rd_kafka_q_io_event (rd_kafka_q_t *rkq) { return; } + + if (rate_limit) { + rd_ts_t now = rd_clock(); + if (likely(rkq->rkq_qio->ts_last + rkq->rkq_qio->ts_rate > now)) + return; + + rkq->rkq_qio->ts_last = now; + } + /* Ignore errors, not much to do anyway. */ if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload, (int)rkq->rkq_qio->size) == -1) @@ -320,7 +333,7 @@ int rd_kafka_op_cmp_prio (const void *_a, const void *_b) { * @brief Wake up waiters without enqueuing an op. */ static RD_INLINE RD_UNUSED void -rd_kafka_q_yield (rd_kafka_q_t *rkq) { +rd_kafka_q_yield (rd_kafka_q_t *rkq, rd_bool_t rate_limit) { rd_kafka_q_t *fwdq; mtx_lock(&rkq->rkq_lock); @@ -337,12 +350,12 @@ rd_kafka_q_yield (rd_kafka_q_t *rkq) { rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD; cnd_signal(&rkq->rkq_cond); if (rkq->rkq_qlen == 0) - rd_kafka_q_io_event(rkq); + rd_kafka_q_io_event(rkq, rate_limit); mtx_unlock(&rkq->rkq_lock); } else { mtx_unlock(&rkq->rkq_lock); - rd_kafka_q_yield(fwdq); + rd_kafka_q_yield(fwdq, rate_limit); rd_kafka_q_destroy(fwdq); } @@ -413,7 +426,7 @@ int rd_kafka_q_enq1 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, rd_kafka_q_enq0(rkq, rko, at_head); cnd_signal(&rkq->rkq_cond); if (rkq->rkq_qlen == 1) - rd_kafka_q_io_event(rkq); + rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/); if (do_lock) mtx_unlock(&rkq->rkq_lock); @@ -518,7 +531,7 @@ int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) { TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link); if (rkq->rkq_qlen == 0) - rd_kafka_q_io_event(rkq); + rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/); rkq->rkq_qlen += srcq->rkq_qlen; rkq->rkq_qsize += srcq->rkq_qsize; cnd_signal(&rkq->rkq_cond); @@ -559,7 +572,7 @@ void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, /* Move srcq to rkq */ TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link); if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0) - rd_kafka_q_io_event(rkq); + rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/); rkq->rkq_qlen += srcq->rkq_qlen; rkq->rkq_qsize += srcq->rkq_qsize;