diff --git a/mongoose.c b/mongoose.c index 110b18b019..11bdb48b91 100644 --- a/mongoose.c +++ b/mongoose.c @@ -4375,9 +4375,10 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { } } -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { - uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); +static void mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts, + uint16_t id, uint8_t flags) { size_t len = 2 + opts->topic.len + opts->message.len; + flags |= (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len, (char *) opts->topic.ptr, (int) opts->message.len, (char *) opts->message.ptr)); @@ -4387,16 +4388,27 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len); mg_send_u16(c, mg_htons((uint16_t) opts->topic.len)); mg_send(c, opts->topic.ptr, opts->topic.len); - if (opts->qos > 0) { - if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; - mg_send_u16(c, mg_htons(c->mgr->mqtt_id)); - } + if (opts->qos > 0) mg_send_u16(c, id); if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props); if (opts->message.len > 0) mg_send(c, opts->message.ptr, opts->message.len); } +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { + uint16_t id; + if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; + id = mg_htons(c->mgr->mqtt_id); + mqtt_pub(c, opts, id, 0); + return id; +} + +void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts, + uint16_t id) { + uint8_t flags = (1 << 3); + mqtt_pub(c, opts, id, flags); +} + void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { uint8_t qos_ = opts->qos & 3; size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0; diff --git a/mongoose.h b/mongoose.h index 318180ca64..1e24645278 100644 --- a/mongoose.h +++ b/mongoose.h @@ -2387,7 +2387,8 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url, struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url, mg_event_handler_t fn, void *fn_data); void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts); -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); +void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts, uint16_t id); void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts); int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *); void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, diff --git a/src/mqtt.c b/src/mqtt.c index 8531424bcf..7139c91167 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -319,9 +319,10 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { } } -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { - uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); +static void mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts, + uint16_t id, uint8_t flags) { size_t len = 2 + opts->topic.len + opts->message.len; + flags |= (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len, (char *) opts->topic.ptr, (int) opts->message.len, (char *) opts->message.ptr)); @@ -331,16 +332,27 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len); mg_send_u16(c, mg_htons((uint16_t) opts->topic.len)); mg_send(c, opts->topic.ptr, opts->topic.len); - if (opts->qos > 0) { - if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; - mg_send_u16(c, mg_htons(c->mgr->mqtt_id)); - } + if (opts->qos > 0) mg_send_u16(c, id); if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props); if (opts->message.len > 0) mg_send(c, opts->message.ptr, opts->message.len); } +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { + uint16_t id; + if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; + id = mg_htons(c->mgr->mqtt_id); + mqtt_pub(c, opts, id, 0); + return id; +} + +void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts, + uint16_t id) { + uint8_t flags = (1 << 3); + mqtt_pub(c, opts, id, flags); +} + void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { uint8_t qos_ = opts->qos & 3; size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0; diff --git a/src/mqtt.h b/src/mqtt.h index 6edb7aa17e..3fefd0fa1a 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -101,7 +101,8 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url, struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url, mg_event_handler_t fn, void *fn_data); void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts); -void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); +uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts); +void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts, uint16_t id); void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts); int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *); void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, diff --git a/test/unit_test.c b/test/unit_test.c index 4908d447b7..b3fbc21733 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -475,7 +475,8 @@ static void test_mqtt_basic(void) { struct mg_mqtt_opts opts; const char *url = "mqtt://broker.hivemq.com:1883"; int i, retries; - + uint16_t id; + // Connect with empty client ID, no options, ergo MQTT = 3.1.1 mg_mgr_init(&mgr); c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, &test_data); @@ -506,10 +507,12 @@ static void test_mqtt_basic(void) { // keep former opts.topic opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false; retries = 0; // don't do retries for test speed + id = mg_mqtt_pub(c, &opts); do { // retry on failure after an expected timeout - mg_mqtt_pub(c, &opts); for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10); - } while (test_data.flags == 0 && retries--); + if (test_data.flags != 0) break; + mg_mqtt_repub(c, &opts, id); + } while (retries--); ASSERT(test_data.flags == flags_published); for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10); check_mqtt_message(&opts, &test_data, true); @@ -532,7 +535,8 @@ static void test_mqtt_ver(uint8_t mqtt_version) { struct mg_mqtt_prop properties[5]; const char *url = "mqtt://broker.hivemq.com:1883"; int i, retries; - + uint16_t id; + // Connect with options: version, clean session, last will, keepalive // time. Don't set retain, some runners are not random test_data.flags = 0; @@ -568,10 +572,12 @@ static void test_mqtt_ver(uint8_t mqtt_version) { construct_props(properties); } retries = 0; // don't do retries for test speed + id = mg_mqtt_pub(c, &opts); do { // retry on failure after an expected timeout - mg_mqtt_pub(c, &opts); for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10); - } while (test_data.flags == 0 && retries--); + if (test_data.flags != 0) break; + mg_mqtt_repub(c, &opts, id); + } while (retries--); ASSERT(test_data.flags == flags_published); for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10); check_mqtt_message(&opts, &test_data, true); @@ -587,11 +593,13 @@ static void test_mqtt_ver(uint8_t mqtt_version) { construct_props(properties); } retries = 0; // don't do retries for test speed + id = mg_mqtt_pub(c, &opts); do { // retry on failure after an expected timeout - mg_mqtt_pub(c, &opts); for (i = 0; i < 500 && !(test_data.flags & flags_received); i++) mg_mgr_poll(&mgr, 10); - } while (!(test_data.flags & flags_received) && retries--); + if (test_data.flags & flags_received) break; + mg_mqtt_repub(c, &opts, id); + } while (retries--); ASSERT(test_data.flags & flags_received); test_data.flags &= ~flags_received; // Mongoose sent PUBREL, wait for PUBCOMP