Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add simple API support for app MQTT pub retries #2691

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions mongoose.c
Original file line number Diff line number Diff line change
Expand Up @@ -4376,7 +4376,8 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}

void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t id = opts->retransmit_id;
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
size_t len = 2 + opts->topic.len + opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
Expand All @@ -4385,17 +4386,22 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->qos > 0) len += 2;
if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props);

if (opts->qos > 0 && id != 0) flags |= 1 << 3;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.buf, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (opts->qos > 0) { // need to send 'id' field
if (id == 0) { // generate new one if not resending
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
id = c->mgr->mqtt_id;
}
mg_send_u16(c, mg_htons(id));
}

if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);

if (opts->message.len > 0) mg_send(c, opts->message.buf, opts->message.len);
return id;
}

void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
Expand Down
3 changes: 2 additions & 1 deletion mongoose.h
Original file line number Diff line number Diff line change
Expand Up @@ -2359,6 +2359,7 @@ struct mg_mqtt_opts {
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4
uint16_t keepalive; // Keep-alive timer in seconds
uint16_t retransmit_id; // For PUBLISH, init to 0
bool retain; // Retain flag
bool clean; // Clean session flag
struct mg_mqtt_prop *props; // MQTT5 props array
Expand All @@ -2385,7 +2386,7 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
Expand Down
14 changes: 10 additions & 4 deletions src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}

void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t id = opts->retransmit_id;
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
size_t len = 2 + opts->topic.len + opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
Expand All @@ -328,17 +329,22 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->qos > 0) len += 2;
if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props);

if (opts->qos > 0 && id != 0) flags |= 1 << 3;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.buf, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (opts->qos > 0) { // need to send 'id' field
if (id == 0) { // generate new one if not resending
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
id = c->mgr->mqtt_id;
}
mg_send_u16(c, mg_htons(id));
}

if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);

if (opts->message.len > 0) mg_send(c, opts->message.buf, opts->message.len);
return id;
}

void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
Expand Down
3 changes: 2 additions & 1 deletion src/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct mg_mqtt_opts {
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4
uint16_t keepalive; // Keep-alive timer in seconds
uint16_t retransmit_id; // For PUBLISH, init to 0
bool retain; // Retain flag
bool clean; // Clean session flag
struct mg_mqtt_prop *props; // MQTT5 props array
Expand All @@ -101,7 +102,7 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
Expand Down
13 changes: 8 additions & 5 deletions test/unit_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,17 +504,18 @@ static void test_mqtt_basic(void) {

// Publish with QoS1 to subscribed topic and check reception
// keep former opts.topic
opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false;
opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false, opts.retransmit_id = 0;
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
opts.retransmit_id = 0;

// Clean Disconnect !
mg_mqtt_disconnect(c, NULL);
Expand Down Expand Up @@ -569,14 +570,15 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
}
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
opts.retransmit_id = 0;

// Publish with QoS2 to subscribed topic and check (simultaneous) reception
// keep former opts.topic
Expand All @@ -588,13 +590,14 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
}
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
opts.retransmit_id = mg_mqtt_pub(c, &opts); // save id for possible resend
for (i = 0; i < 500 && !(test_data.flags & flags_received); i++)
mg_mgr_poll(&mgr, 10);
} while (!(test_data.flags & flags_received) && retries--);
ASSERT(test_data.flags & flags_received);
test_data.flags &= ~flags_received;
// Mongoose sent PUBREL, wait for PUBCOMP
opts.retransmit_id = 0;
// Mongoose sent PUBREL, wait for PUBCOMP
for (i = 0; i < 500 && !(test_data.flags & flags_completed); i++)
mg_mgr_poll(&mgr, 10);
// TODO(): retry sending PUBREL on failure after an expected timeout
Expand Down
Loading