From 2bd188f3d6135c8cc07f903eff44043ac74c6363 Mon Sep 17 00:00:00 2001 From: "Sergio R. Caprile" Date: Fri, 5 Apr 2024 16:31:35 -0300 Subject: [PATCH] add simple API support for app MQTT pub retries --- mongoose.c | 14 ++++++++++---- mongoose.h | 3 ++- src/mqtt.c | 14 ++++++++++---- src/mqtt.h | 3 ++- test/unit_test.c | 13 ++++++++----- 5 files changed, 32 insertions(+), 15 deletions(-) diff --git a/mongoose.c b/mongoose.c index a14dd5d090..00234e7e0e 100644 --- a/mongoose.c +++ b/mongoose.c @@ -4376,7 +4376,8 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { } } -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { + uint16_t id = opts->retransmit_id; uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); size_t len = 2 + opts->topic.len + opts->message.len; MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len, @@ -4385,17 +4386,22 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { if (opts->qos > 0) len += 2; if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props); + if (opts->qos > 0 && id != 0) flags |= 1 << 3; mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len); mg_send_u16(c, mg_htons((uint16_t) opts->topic.len)); mg_send(c, opts->topic.buf, opts->topic.len); - if (opts->qos > 0) { - if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; - mg_send_u16(c, mg_htons(c->mgr->mqtt_id)); + if (opts->qos > 0) { // need to send 'id' field + if (id == 0) { // generate new one if not resending + if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; + id = c->mgr->mqtt_id; + } + mg_send_u16(c, mg_htons(id)); } if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props); if (opts->message.len > 0) mg_send(c, opts->message.buf, opts->message.len); + return id; } void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { diff --git a/mongoose.h b/mongoose.h index 73a1a8291c..971a7c0b5b 100644 --- a/mongoose.h +++ b/mongoose.h @@ -2359,6 +2359,7 @@ struct mg_mqtt_opts { uint8_t qos; // message quality of service uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4 uint16_t keepalive; // Keep-alive timer in seconds + uint16_t retransmit_id; // For PUBLISH, init to 0 bool retain; // Retain flag bool clean; // Clean session flag struct mg_mqtt_prop *props; // MQTT5 props array @@ -2385,7 +2386,7 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url, struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url, mg_event_handler_t fn, void *fn_data); void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts); -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts); int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *); void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, diff --git a/src/mqtt.c b/src/mqtt.c index 9a106657f5..c0e60b4f83 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -319,7 +319,8 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { } } -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { + uint16_t id = opts->retransmit_id; uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); size_t len = 2 + opts->topic.len + opts->message.len; MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len, @@ -328,17 +329,22 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { if (opts->qos > 0) len += 2; if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props); + if (opts->qos > 0 && id != 0) flags |= 1 << 3; mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len); mg_send_u16(c, mg_htons((uint16_t) opts->topic.len)); mg_send(c, opts->topic.buf, opts->topic.len); - if (opts->qos > 0) { - if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; - mg_send_u16(c, mg_htons(c->mgr->mqtt_id)); + if (opts->qos > 0) { // need to send 'id' field + if (id == 0) { // generate new one if not resending + if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; + id = c->mgr->mqtt_id; + } + mg_send_u16(c, mg_htons(id)); } if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props); if (opts->message.len > 0) mg_send(c, opts->message.buf, opts->message.len); + return id; } void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { diff --git a/src/mqtt.h b/src/mqtt.h index 6edb7aa17e..eb4567bcc3 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -75,6 +75,7 @@ struct mg_mqtt_opts { uint8_t qos; // message quality of service uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4 uint16_t keepalive; // Keep-alive timer in seconds + uint16_t retransmit_id; // For PUBLISH, init to 0 bool retain; // Retain flag bool clean; // Clean session flag struct mg_mqtt_prop *props; // MQTT5 props array @@ -101,7 +102,7 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url, struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url, mg_event_handler_t fn, void *fn_data); void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts); -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts); int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *); void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, diff --git a/test/unit_test.c b/test/unit_test.c index a4ec1004ff..3500e8a466 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -504,10 +504,10 @@ static void test_mqtt_basic(void) { // Publish with QoS1 to subscribed topic and check reception // keep former opts.topic - opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false; + opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false, opts.retransmit_id = 0; retries = 0; // don't do retries for test speed do { // retry on failure after an expected timeout - mg_mqtt_pub(c, &opts); + opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10); } while (test_data.flags == 0 && retries--); ASSERT(test_data.flags == flags_published); @@ -515,6 +515,7 @@ static void test_mqtt_basic(void) { check_mqtt_message(&opts, &test_data, true); memset(mbuf + 1, 0, sizeof(mbuf) - 1); test_data.flags = 0; + opts.retransmit_id = 0; // Clean Disconnect ! mg_mqtt_disconnect(c, NULL); @@ -569,7 +570,7 @@ static void test_mqtt_ver(uint8_t mqtt_version) { } retries = 0; // don't do retries for test speed do { // retry on failure after an expected timeout - mg_mqtt_pub(c, &opts); + opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10); } while (test_data.flags == 0 && retries--); ASSERT(test_data.flags == flags_published); @@ -577,6 +578,7 @@ static void test_mqtt_ver(uint8_t mqtt_version) { check_mqtt_message(&opts, &test_data, true); memset(mbuf + 1, 0, sizeof(mbuf) - 1); test_data.flags = 0; + opts.retransmit_id = 0; // Publish with QoS2 to subscribed topic and check (simultaneous) reception // keep former opts.topic @@ -588,13 +590,14 @@ static void test_mqtt_ver(uint8_t mqtt_version) { } retries = 0; // don't do retries for test speed do { // retry on failure after an expected timeout - mg_mqtt_pub(c, &opts); + opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend for (i = 0; i < 500 && !(test_data.flags & flags_received); i++) mg_mgr_poll(&mgr, 10); } while (!(test_data.flags & flags_received) && retries--); ASSERT(test_data.flags & flags_received); test_data.flags &= ~flags_received; - // Mongoose sent PUBREL, wait for PUBCOMP + opts.retransmit_id = 0; +// Mongoose sent PUBREL, wait for PUBCOMP for (i = 0; i < 500 && !(test_data.flags & flags_completed); i++) mg_mgr_poll(&mgr, 10); // TODO(): retry sending PUBREL on failure after an expected timeout