Recommended way to save/load messages on producer client #4120
-
Are there any recommendations for how, upon producer client termination, to get all the messages still in the outq and write the relevant information to disk, and then, upon client restart, to load that information and produce/enqueue the messages again? The current client implementation defines a dr_msg_cb and has a separate thread that calls rd_kafka_poll() at frequent intervals. Upon termination, the sequence is as follows:
Is there a recommended way, before termination, to get the relevant information about the outq messages that haven't been reported as RD_KAFKA_RESP_ERR_NO_ERROR by the dr_msg_cb, so that it can be saved to disk?
Which rd_kafka_message_t members need to be saved to disk so that, upon application restart, the client can loop over the loaded messages and call rd_kafka_producev() on each of them again? I guess the payload and opaque (_private) members would be enough (the application uses the opaque as an int identifying the message).
In the termination sequence described above, where would it be most appropriate to retrieve the outq message information? I'm asking especially about the case where the application gets the information between points 2. and 3. but, before the Kafka handle is destroyed, the retrieval races with messages that are successfully sent (it would not be desirable to save a message that has actually already been delivered).
In general terms, the end goals would be:
Thanks.
Replies: 2 comments 1 reply
-
Call rd_kafka_purge(), followed by rd_kafka_flush() - the messages in the queue and in flight will fail with a purge error code (RD_KAFKA_RESP_ERR__PURGE_QUEUE or RD_KAFKA_RESP_ERR__PURGE_INFLIGHT). From the delivery report callback, write the messages to disk or wherever you want to keep them. You would write the same fields as you originally passed to produce() or producev().
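To make the "write the messages to disk" step concrete, here is a minimal sketch of a spill file that stores only the two fields mentioned in the question (the int identifier carried in the opaque, and the payload). The record layout, the names spill_write()/spill_read(), SPILL_MAX_LEN, and the spill_fp handle in the comment are all hypothetical and not part of librdkafka. The librdkafka hookup is shown only as a comment, so the block itself compiles with the C standard library alone.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical on-disk record: [int32 id][uint64 len][payload bytes].
 * Values are written in host byte order, so the file is only meant to be
 * read back on the same machine/architecture that wrote it. */

/* Assumed sanity cap so a corrupt length cannot trigger a huge malloc(). */
#define SPILL_MAX_LEN (64u * 1024u * 1024u)

/* Append one undelivered message to an already-open spill file.
 * Returns 0 on success, -1 on a short write. */
int spill_write(FILE *fp, int32_t id, const void *payload, size_t len) {
    uint64_t len64 = (uint64_t)len;
    assert(fp != NULL);
    if (fwrite(&id, sizeof(id), 1, fp) != 1) return -1;
    if (fwrite(&len64, sizeof(len64), 1, fp) != 1) return -1;
    if (len > 0 && fwrite(payload, 1, len, fp) != len) return -1;
    return 0;
}

/* Read the next record. Returns 1 and sets *payload to a malloc()ed buffer
 * (caller frees) on success, 0 at a clean end of file, and -1 on a
 * truncated, oversized, or unreadable record. */
int spill_read(FILE *fp, int32_t *id, void **payload, size_t *len) {
    uint64_t len64;
    size_t n;
    void *buf;
    assert(fp != NULL);
    n = fread(id, 1, sizeof(*id), fp);
    if (n == 0) return feof(fp) ? 0 : -1;   /* nothing left to read */
    if (n != sizeof(*id)) return -1;         /* header cut short */
    if (fread(&len64, sizeof(len64), 1, fp) != 1) return -1;
    if (len64 > SPILL_MAX_LEN) return -1;
    buf = malloc(len64 ? (size_t)len64 : 1); /* never request 0 bytes */
    if (buf == NULL) return -1;
    if (len64 > 0 && fread(buf, 1, (size_t)len64, fp) != (size_t)len64) {
        free(buf);
        return -1;
    }
    *payload = buf;
    *len = (size_t)len64;
    return 1;
}

/* Intended hookup (not compiled here; needs librdkafka, spill_fp is a
 * hypothetical FILE* opened by the application before shutdown):
 *
 *   static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *m,
 *                         void *opaque) {
 *       if (m->err != RD_KAFKA_RESP_ERR_NO_ERROR)
 *           spill_write(spill_fp, (int32_t)(intptr_t)m->_private,
 *                       m->payload, m->len);
 *   }
 *
 *   // At shutdown:
 *   rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
 *   rd_kafka_flush(rk, 10 * 1000);  // serves the delivery reports
 */
```

On restart, the application would loop on spill_read() until it returns 0 and pass each payload and id back to rd_kafka_producev(). If messages differ in topic, key, partition, or headers, those would need to be added to the record as well, since the advice above is to save whatever was originally passed to the produce call. One caveat regarding the race raised in the question: a message purged while in flight may already have reached the broker, so replaying it can produce a duplicate.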