Skip to content

Commit

Permalink
Rate limit IO-based queue wakeups to linger.ms (#2509)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Sep 12, 2019
1 parent a12b909 commit b4d7f8e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
rd_kafka_toppar_unlock(rktp);

if (wakeup_q) {
rd_kafka_q_yield(wakeup_q);
rd_kafka_q_yield(wakeup_q, rd_true/*rate-limit*/);
rd_kafka_q_destroy(wakeup_q);
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,

if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) {
if (cnt > 0 && dstq->rkq_qlen == 0)
rd_kafka_q_io_event(dstq);
rd_kafka_q_io_event(dstq, rd_false/*no rate-limiting*/);

/* Optimization, if 'cnt' is equal/larger than all
* items of 'srcq' we can move the entire queue. */
Expand Down Expand Up @@ -730,6 +730,8 @@ void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd,
qio->fd = fd;
qio->size = size;
qio->payload = (void *)(qio+1);
qio->ts_rate = rkq->rkq_rk->rk_conf.buffering_max_us;
qio->ts_last = 0;
qio->event_cb = NULL;
qio->event_cb_opaque = NULL;
memcpy(qio->payload, payload, size);
Expand Down
27 changes: 20 additions & 7 deletions src/rdkafka_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ struct rd_kafka_q_io {
int fd;
void *payload;
size_t size;
rd_ts_t ts_rate; /**< How often the IO wakeup may be performed (us) */
rd_ts_t ts_last; /**< Last IO wakeup */
/* For callback-based signalling */
void (*event_cb) (rd_kafka_t *rk, void *opaque);
void *event_cb_opaque;
Expand Down Expand Up @@ -284,10 +286,12 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) {
/**
* @brief Trigger an IO event for this queue.
*
* @param rate_limit if true, rate limit IO-based wakeups.
*
* @remark Queue MUST be locked
*/
static RD_INLINE RD_UNUSED
void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
void rd_kafka_q_io_event (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {

if (likely(!rkq->rkq_qio))
return;
Expand All @@ -297,6 +301,15 @@ void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
return;
}


if (rate_limit) {
rd_ts_t now = rd_clock();
if (likely(rkq->rkq_qio->ts_last + rkq->rkq_qio->ts_rate > now))
return;

rkq->rkq_qio->ts_last = now;
}

/* Ignore errors, not much to do anyway. */
if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
(int)rkq->rkq_qio->size) == -1)
Expand All @@ -320,7 +333,7 @@ int rd_kafka_op_cmp_prio (const void *_a, const void *_b) {
* @brief Wake up waiters without enqueuing an op.
*/
static RD_INLINE RD_UNUSED void
rd_kafka_q_yield (rd_kafka_q_t *rkq) {
rd_kafka_q_yield (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
rd_kafka_q_t *fwdq;

mtx_lock(&rkq->rkq_lock);
Expand All @@ -337,12 +350,12 @@ rd_kafka_q_yield (rd_kafka_q_t *rkq) {
rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD;
cnd_signal(&rkq->rkq_cond);
if (rkq->rkq_qlen == 0)
rd_kafka_q_io_event(rkq);
rd_kafka_q_io_event(rkq, rate_limit);

mtx_unlock(&rkq->rkq_lock);
} else {
mtx_unlock(&rkq->rkq_lock);
rd_kafka_q_yield(fwdq);
rd_kafka_q_yield(fwdq, rate_limit);
rd_kafka_q_destroy(fwdq);
}

Expand Down Expand Up @@ -413,7 +426,7 @@ int rd_kafka_q_enq1 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
rd_kafka_q_enq0(rkq, rko, at_head);
cnd_signal(&rkq->rkq_cond);
if (rkq->rkq_qlen == 1)
rd_kafka_q_io_event(rkq);
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);

if (do_lock)
mtx_unlock(&rkq->rkq_lock);
Expand Down Expand Up @@ -518,7 +531,7 @@ int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {

TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
if (rkq->rkq_qlen == 0)
rd_kafka_q_io_event(rkq);
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
rkq->rkq_qlen += srcq->rkq_qlen;
rkq->rkq_qsize += srcq->rkq_qsize;
cnd_signal(&rkq->rkq_cond);
Expand Down Expand Up @@ -559,7 +572,7 @@ void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq,
/* Move srcq to rkq */
TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link);
if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0)
rd_kafka_q_io_event(rkq);
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
rkq->rkq_qlen += srcq->rkq_qlen;
rkq->rkq_qsize += srcq->rkq_qsize;

Expand Down

0 comments on commit b4d7f8e

Please sign in to comment.