Skip to content

Commit

Permalink
Merge pull request #2691 from cesanta/mqtt12
Browse files Browse the repository at this point in the history
add simple API support for app MQTT pub retries
  • Loading branch information
scaprile authored Apr 16, 2024
2 parents 9fa840c + 2bd188f commit 9cccb98
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 15 deletions.
14 changes: 10 additions & 4 deletions mongoose.c
Original file line number Diff line number Diff line change
Expand Up @@ -4376,7 +4376,8 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}

void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t id = opts->retransmit_id;
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
size_t len = 2 + opts->topic.len + opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
Expand All @@ -4385,17 +4386,22 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->qos > 0) len += 2;
if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props);

if (opts->qos > 0 && id != 0) flags |= 1 << 3;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.buf, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (opts->qos > 0) { // need to send 'id' field
if (id == 0) { // generate new one if not resending
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
id = c->mgr->mqtt_id;
}
mg_send_u16(c, mg_htons(id));
}

if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);

if (opts->message.len > 0) mg_send(c, opts->message.buf, opts->message.len);
return id;
}

void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
Expand Down
3 changes: 2 additions & 1 deletion mongoose.h
Original file line number Diff line number Diff line change
Expand Up @@ -2359,6 +2359,7 @@ struct mg_mqtt_opts {
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4
uint16_t keepalive; // Keep-alive timer in seconds
uint16_t retransmit_id; // For PUBLISH, init to 0
bool retain; // Retain flag
bool clean; // Clean session flag
struct mg_mqtt_prop *props; // MQTT5 props array
Expand All @@ -2385,7 +2386,7 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
Expand Down
14 changes: 10 additions & 4 deletions src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}

void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t id = opts->retransmit_id;
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
size_t len = 2 + opts->topic.len + opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
Expand All @@ -328,17 +329,22 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->qos > 0) len += 2;
if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props);

if (opts->qos > 0 && id != 0) flags |= 1 << 3;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.buf, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (opts->qos > 0) { // need to send 'id' field
if (id == 0) { // generate new one if not resending
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
id = c->mgr->mqtt_id;
}
mg_send_u16(c, mg_htons(id));
}

if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);

if (opts->message.len > 0) mg_send(c, opts->message.buf, opts->message.len);
return id;
}

void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
Expand Down
3 changes: 2 additions & 1 deletion src/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct mg_mqtt_opts {
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4
uint16_t keepalive; // Keep-alive timer in seconds
uint16_t retransmit_id; // For PUBLISH, init to 0
bool retain; // Retain flag
bool clean; // Clean session flag
struct mg_mqtt_prop *props; // MQTT5 props array
Expand All @@ -101,7 +102,7 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
Expand Down
13 changes: 8 additions & 5 deletions test/unit_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,17 +504,18 @@ static void test_mqtt_basic(void) {

// Publish with QoS1 to subscribed topic and check reception
// keep former opts.topic
opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false;
opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false, opts.retransmit_id = 0;
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
opts.retransmit_id = 0;

// Clean Disconnect !
mg_mqtt_disconnect(c, NULL);
Expand Down Expand Up @@ -569,14 +570,15 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
}
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
opts.retransmit_id = 0;

// Publish with QoS2 to subscribed topic and check (simultaneous) reception
// keep former opts.topic
Expand All @@ -588,13 +590,14 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
}
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend
for (i = 0; i < 500 && !(test_data.flags & flags_received); i++)
mg_mgr_poll(&mgr, 10);
} while (!(test_data.flags & flags_received) && retries--);
ASSERT(test_data.flags & flags_received);
test_data.flags &= ~flags_received;
// Mongoose sent PUBREL, wait for PUBCOMP
opts.retransmit_id = 0;
// Mongoose sent PUBREL, wait for PUBCOMP
for (i = 0; i < 500 && !(test_data.flags & flags_completed); i++)
mg_mgr_poll(&mgr, 10);
// TODO(): retry sending PUBREL on failure after an expected timeout
Expand Down

0 comments on commit 9cccb98

Please sign in to comment.