Skip to content

Commit

Permalink
add simple API support for app MQTT pub retries
Browse files Browse the repository at this point in the history
  • Loading branch information
scaprile committed Apr 5, 2024
1 parent b3312f3 commit 2813d9d
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 22 deletions.
24 changes: 18 additions & 6 deletions mongoose.c
Original file line number Diff line number Diff line change
Expand Up @@ -4375,9 +4375,10 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}

void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
static void mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts,
uint16_t id, uint8_t flags) {
size_t len = 2 + opts->topic.len + opts->message.len;
flags |= (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
(char *) opts->topic.ptr, (int) opts->message.len,
(char *) opts->message.ptr));
Expand All @@ -4387,16 +4388,27 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
}
if (opts->qos > 0) mg_send_u16(c, id);

if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);

if (opts->message.len > 0) mg_send(c, opts->message.ptr, opts->message.len);
}

uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t id;
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
id = mg_htons(c->mgr->mqtt_id);
mqtt_pub(c, opts, id, 0);
return id;
}

void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts,
uint16_t id) {
uint8_t flags = (1 << 3);
mqtt_pub(c, opts, id, flags);
}

void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t qos_ = opts->qos & 3;
size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0;
Expand Down
3 changes: 2 additions & 1 deletion mongoose.h
Original file line number Diff line number Diff line change
Expand Up @@ -2387,7 +2387,8 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts, uint16_t id);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
Expand Down
24 changes: 18 additions & 6 deletions src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,10 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}

void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
static void mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts,
uint16_t id, uint8_t flags) {
size_t len = 2 + opts->topic.len + opts->message.len;
flags |= (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
(char *) opts->topic.ptr, (int) opts->message.len,
(char *) opts->message.ptr));
Expand All @@ -331,16 +332,27 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
}
if (opts->qos > 0) mg_send_u16(c, id);

if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);

if (opts->message.len > 0) mg_send(c, opts->message.ptr, opts->message.len);
}

uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint16_t id;
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
id = mg_htons(c->mgr->mqtt_id);
mqtt_pub(c, opts, id, 0);
return id;
}

void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts,
uint16_t id) {
uint8_t flags = (1 << 3);
mqtt_pub(c, opts, id, flags);
}

void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t qos_ = opts->qos & 3;
size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0;
Expand Down
3 changes: 2 additions & 1 deletion src/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_repub(struct mg_connection *c, const struct mg_mqtt_opts *opts, uint16_t id);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
Expand Down
24 changes: 16 additions & 8 deletions test/unit_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ static void test_mqtt_basic(void) {
struct mg_mqtt_opts opts;
const char *url = "mqtt://broker.hivemq.com:1883";
int i, retries;

uint16_t id;

// Connect with empty client ID, no options, ergo MQTT = 3.1.1
mg_mgr_init(&mgr);
c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, &test_data);
Expand Down Expand Up @@ -506,10 +507,12 @@ static void test_mqtt_basic(void) {
// keep former opts.topic
opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false;
retries = 0; // don't do retries for test speed
id = mg_mqtt_pub(c, &opts);
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
if (test_data.flags != 0) break;
mg_mqtt_repub(c, &opts, id);
} while (retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
Expand All @@ -532,7 +535,8 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
struct mg_mqtt_prop properties[5];
const char *url = "mqtt://broker.hivemq.com:1883";
int i, retries;

uint16_t id;

// Connect with options: version, clean session, last will, keepalive
// time. Don't set retain, some runners are not random
test_data.flags = 0;
Expand Down Expand Up @@ -568,10 +572,12 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
construct_props(properties);
}
retries = 0; // don't do retries for test speed
id = mg_mqtt_pub(c, &opts);
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
if (test_data.flags != 0) break;
mg_mqtt_repub(c, &opts, id);
} while (retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
Expand All @@ -587,11 +593,13 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
construct_props(properties);
}
retries = 0; // don't do retries for test speed
id = mg_mqtt_pub(c, &opts);
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && !(test_data.flags & flags_received); i++)
mg_mgr_poll(&mgr, 10);
} while (!(test_data.flags & flags_received) && retries--);
if (test_data.flags & flags_received) break;
mg_mqtt_repub(c, &opts, id);
} while (retries--);
ASSERT(test_data.flags & flags_received);
test_data.flags &= ~flags_received;
// Mongoose sent PUBREL, wait for PUBCOMP
Expand Down

0 comments on commit 2813d9d

Please sign in to comment.