Skip to content

Commit

Permalink
FabBackend: added retransmission before sending new messages
Browse files Browse the repository at this point in the history
  • Loading branch information
PBrunot committed May 26, 2024
1 parent 582f840 commit 60b210a
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 24 deletions.
13 changes: 8 additions & 5 deletions include/BufferedMsg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ namespace fabomatic
{
std::string mqtt_message;
std::string mqtt_topic;
bool wait_for_answer;
BufferedMsg() = default;
BufferedMsg(const std::string &message, const std::string &topic) : mqtt_message(message), mqtt_topic(topic){};
BufferedMsg(const std::string &message, const std::string &topic, bool wait) : mqtt_message(message), mqtt_topic(topic), wait_for_answer{wait} {};
BufferedMsg(const BufferedMsg &source) = default;
BufferedMsg(BufferedMsg &source) = default;
};
Expand All @@ -28,10 +29,11 @@ namespace fabomatic
std::deque<BufferedMsg> msg_queue;
bool has_changed{true};
static constexpr auto MAGIC_NUMBER = 1;
static constexpr auto MAX_MESSAGES = 40;

public:
auto push_back(const std::string &message, const std::string &topic) -> void;
auto push_front(const std::string &message, const std::string &topic) -> void;
auto push_back(const std::string &message, const std::string &topic, bool wait) -> void;
auto push_front(const std::string &message, const std::string &topic, bool wait) -> void;
auto getMessage() -> const BufferedMsg;
auto count() const -> size_t;
auto toJson(JsonDocument &doc, const std::string &element_name) const -> void;
Expand All @@ -46,13 +48,14 @@ namespace fabomatic
private:
std::string_view mqtt_value;
std::string_view mqtt_topic;
bool wait_for_answer;

public:
BufferedQuery() = delete;
constexpr BufferedQuery(const std::string_view &value, const std::string_view &topic) : mqtt_value(value), mqtt_topic(topic){};
constexpr BufferedQuery(const std::string_view &value, const std::string_view &topic, bool wait) : mqtt_value(value), mqtt_topic(topic), wait_for_answer{wait} {};

[[nodiscard]] auto payload() const -> const std::string override { return std::string(mqtt_value); };
[[nodiscard]] auto waitForReply() const -> bool override { return false; };
[[nodiscard]] auto waitForReply() const -> bool override { return wait_for_answer; };
[[nodiscard]] auto buffered() const -> bool override { return false; };
[[nodiscard]] auto topic() const -> std::string { return std::string(mqtt_topic); };
};
Expand Down
30 changes: 21 additions & 9 deletions src/BufferedMsg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,34 @@
namespace fabomatic
{

auto Buffer::push_back(const std::string &message, const std::string &topic) -> void
auto Buffer::push_back(const std::string &message, const std::string &topic, bool wait) -> void
{
if constexpr (conf::debug::ENABLE_BUFFERING)
{
BufferedMsg msg{message, topic};
BufferedMsg msg{message, topic, wait};
msg_queue.push_back(msg);
if (msg_queue.size() > MAX_MESSAGES)
{
msg_queue.pop_front();
ESP_LOGW(TAG, "Removing first message from the buffer");
}
has_changed = true;

ESP_LOGI(TAG, "Buffered %s on %s, %lu messages queued", message.c_str(), topic.c_str(), msg_queue.size());
}
}

auto Buffer::push_front(const std::string &message, const std::string &topic) -> void
auto Buffer::push_front(const std::string &message, const std::string &topic, bool wait) -> void
{
if constexpr (conf::debug::ENABLE_BUFFERING)
{
BufferedMsg msg{message, topic};
BufferedMsg msg{message, topic, wait};
msg_queue.push_front(msg);
if (msg_queue.size() > MAX_MESSAGES)
{
msg_queue.pop_back();
ESP_LOGW(TAG, "Removing last message from the buffer");
}
has_changed = true;

ESP_LOGI(TAG, "Buffered %s on %s, %lu messages queued", message.c_str(), topic.c_str(), msg_queue.size());
Expand All @@ -35,7 +45,7 @@ namespace fabomatic
if (msg_queue.size() == 0)
{
ESP_LOGE(TAG, "Calling getMessage() on empty queue!");
return {"", ""};
return {"", "", false};
}

auto elem = msg_queue.front();
Expand All @@ -59,8 +69,9 @@ namespace fabomatic
for (auto elem = msg_queue.begin(); elem != msg_queue.end(); elem++)
{
auto obj_msg = json_elem.createNestedObject();
obj_msg["topic"] = elem->mqtt_topic;
obj_msg["message"] = elem->mqtt_message;
obj_msg["tp"] = elem->mqtt_topic;
obj_msg["msg"] = elem->mqtt_message;
obj_msg["wait"] = elem->wait_for_answer;
}
}

Expand All @@ -78,8 +89,9 @@ namespace fabomatic

for (const auto &elem : json_obj["messages"].as<JsonArray>())
{
buff.push_back(elem["message"].as<std::string>(),
elem["topic"].as<std::string>());
buff.push_back(elem["msg"].as<std::string>(),
elem["tp"].as<std::string>(),
elem["wait"].as<bool>());
}

ESP_LOGD(TAG, "Buffer::fromJsonElement() : data loaded successfully");
Expand Down
66 changes: 56 additions & 10 deletions src/FabBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace fabomatic

if (query.buffered())
{
buffer.push_back(query.payload(), topic);
buffer.push_back(query.payload(), topic, query.waitForReply());
}
return false;
}
Expand Down Expand Up @@ -391,7 +391,22 @@ namespace fabomatic
static_assert(std::is_base_of<ServerMQTT::Response, RespT>::value, "RespT must inherit from Response");
QueryT query{args...};

if (isOnline())
auto nb_tries = 0;
while (isOnline() && hasBufferedMsg() && !transmitBuffer() && nb_tries < 3)
{
// To preserve chronological order, we cannot send new messages until the old ones have been sent.
ESP_LOGW(TAG, "Online with pending messages that could not be transmitted, retrying...");

Tasks::delay(250ms);

if (!isOnline())
{
connect();
}
nb_tries++;
}

if (isOnline() && !hasBufferedMsg())
{
if (publishWithReply(query))
{
Expand All @@ -413,7 +428,7 @@ namespace fabomatic

if (query.buffered())
{
buffer.push_back(query.payload(), topic);
buffer.push_back(query.payload(), topic, query.waitForReply());
}

return std::make_unique<RespT>(false);
Expand All @@ -432,7 +447,22 @@ namespace fabomatic
static_assert(std::is_base_of<ServerMQTT::Query, QueryT>::value, "QueryT must inherit from Query");
QueryT query{args...};

if (isOnline())
auto nb_tries = 0;
while (isOnline() && hasBufferedMsg() && !transmitBuffer() && nb_tries < 3)
{
// To preserve chronological order, we cannot send new messages until the old ones have been sent.
ESP_LOGW(TAG, "Online with pending messages that could not be transmitted, retrying...");

Tasks::delay(250ms);

if (!isOnline())
{
connect();
}
nb_tries++;
}

if (isOnline() && !hasBufferedMsg())
{
if (publish(query))
{
Expand All @@ -447,7 +477,7 @@ namespace fabomatic

if (query.buffered())
{
buffer.push_back(query.payload(), topic);
buffer.push_back(query.payload(), topic, query.waitForReply());
}
return false;
}
Expand Down Expand Up @@ -548,15 +578,31 @@ namespace fabomatic
{
while (hasBufferedMsg() && isOnline())
{
ESP_LOGD(TAG, "Retransmitting buffered messages...");
const auto &msg = buffer.getMessage();
const BufferedQuery bq{msg.mqtt_message, msg.mqtt_topic};
if (!publish(bq))
const BufferedQuery bq{msg.mqtt_message, msg.mqtt_topic, msg.wait_for_answer};
if (bq.waitForReply())
{
// Will try again
buffer.push_front(msg.mqtt_message, msg.mqtt_topic);
break;
if (!publishWithReply(bq))
{
ESP_LOGW(TAG, "Retransmitting buffered message failed!");
// Will try again
buffer.push_front(msg.mqtt_message, msg.mqtt_topic, msg.wait_for_answer);
break;
}
}
else
{
if (!publish(bq))
{
// Will try again
ESP_LOGW(TAG, "Retransmitting buffered message failed!");
buffer.push_front(msg.mqtt_message, msg.mqtt_topic, msg.wait_for_answer);
break;
}
}
}
last_reply = "";
return !hasBufferedMsg();
}

Expand Down

0 comments on commit 60b210a

Please sign in to comment.