diff --git a/include/BufferedMsg.hpp b/include/BufferedMsg.hpp index 59353331..e2baea1a 100644 --- a/include/BufferedMsg.hpp +++ b/include/BufferedMsg.hpp @@ -16,8 +16,9 @@ namespace fabomatic { std::string mqtt_message; std::string mqtt_topic; + bool wait_for_answer; BufferedMsg() = default; - BufferedMsg(const std::string &message, const std::string &topic) : mqtt_message(message), mqtt_topic(topic){}; + BufferedMsg(const std::string &message, const std::string &topic, bool wait) : mqtt_message(message), mqtt_topic(topic), wait_for_answer{wait} {}; BufferedMsg(const BufferedMsg &source) = default; BufferedMsg(BufferedMsg &source) = default; }; @@ -28,10 +29,11 @@ namespace fabomatic std::deque msg_queue; bool has_changed{true}; static constexpr auto MAGIC_NUMBER = 1; + static constexpr auto MAX_MESSAGES = 40; public: - auto push_back(const std::string &message, const std::string &topic) -> void; - auto push_front(const std::string &message, const std::string &topic) -> void; + auto push_back(const std::string &message, const std::string &topic, bool wait) -> void; + auto push_front(const std::string &message, const std::string &topic, bool wait) -> void; auto getMessage() -> const BufferedMsg; auto count() const -> size_t; auto toJson(JsonDocument &doc, const std::string &element_name) const -> void; @@ -46,13 +48,14 @@ namespace fabomatic private: std::string_view mqtt_value; std::string_view mqtt_topic; + bool wait_for_answer; public: BufferedQuery() = delete; - constexpr BufferedQuery(const std::string_view &value, const std::string_view &topic) : mqtt_value(value), mqtt_topic(topic){}; + constexpr BufferedQuery(const std::string_view &value, const std::string_view &topic, bool wait) : mqtt_value(value), mqtt_topic(topic), wait_for_answer{wait} {}; [[nodiscard]] auto payload() const -> const std::string override { return std::string(mqtt_value); }; - [[nodiscard]] auto waitForReply() const -> bool override { return false; }; + [[nodiscard]] auto waitForReply() const -> bool override { return wait_for_answer; }; [[nodiscard]] auto buffered() const -> bool override { return false; }; [[nodiscard]] auto topic() const -> std::string { return std::string(mqtt_topic); }; }; diff --git a/src/BufferedMsg.cpp b/src/BufferedMsg.cpp index d875ccee..4dbc2895 100644 --- a/src/BufferedMsg.cpp +++ b/src/BufferedMsg.cpp @@ -6,24 +6,34 @@ namespace fabomatic { - auto Buffer::push_back(const std::string &message, const std::string &topic) -> void + auto Buffer::push_back(const std::string &message, const std::string &topic, bool wait) -> void { if constexpr (conf::debug::ENABLE_BUFFERING) { - BufferedMsg msg{message, topic}; + BufferedMsg msg{message, topic, wait}; msg_queue.push_back(msg); + if (msg_queue.size() > MAX_MESSAGES) + { + msg_queue.pop_front(); + ESP_LOGW(TAG, "Removing first message from the buffer"); + } has_changed = true; ESP_LOGI(TAG, "Buffered %s on %s, %lu messages queued", message.c_str(), topic.c_str(), msg_queue.size()); } } - auto Buffer::push_front(const std::string &message, const std::string &topic) -> void + auto Buffer::push_front(const std::string &message, const std::string &topic, bool wait) -> void { if constexpr (conf::debug::ENABLE_BUFFERING) { - BufferedMsg msg{message, topic}; + BufferedMsg msg{message, topic, wait}; msg_queue.push_front(msg); + if (msg_queue.size() > MAX_MESSAGES) + { + msg_queue.pop_back(); + ESP_LOGW(TAG, "Removing last message from the buffer"); + } has_changed = true; ESP_LOGI(TAG, "Buffered %s on %s, %lu messages queued", message.c_str(), topic.c_str(), msg_queue.size()); @@ -35,7 +45,7 @@ namespace fabomatic if (msg_queue.size() == 0) { ESP_LOGE(TAG, "Calling getMessage() on empty queue!"); - return {"", ""}; + return {"", "", false}; } auto elem = msg_queue.front(); @@ -59,8 +69,9 @@ namespace fabomatic for (auto elem = msg_queue.begin(); elem != msg_queue.end(); elem++) { auto obj_msg = json_elem.createNestedObject(); - obj_msg["topic"] = elem->mqtt_topic; - obj_msg["message"] = elem->mqtt_message; + obj_msg["tp"] = elem->mqtt_topic; + obj_msg["msg"] = elem->mqtt_message; + obj_msg["wait"] = elem->wait_for_answer; } } @@ -78,8 +89,9 @@ namespace fabomatic for (const auto &elem : json_obj["messages"].as()) { - buff.push_back(elem["message"].as(), - elem["topic"].as()); + buff.push_back(elem["msg"].as(), + elem["tp"].as(), + elem["wait"].as()); } ESP_LOGD(TAG, "Buffer::fromJsonElement() : data loaded successfully"); diff --git a/src/FabBackend.cpp b/src/FabBackend.cpp index dd8e11cf..b092b0ca 100644 --- a/src/FabBackend.cpp +++ b/src/FabBackend.cpp @@ -93,7 +93,7 @@ namespace fabomatic if (query.buffered()) { - buffer.push_back(query.payload(), topic); + buffer.push_back(query.payload(), topic, query.waitForReply()); } return false; } @@ -391,7 +391,22 @@ namespace fabomatic static_assert(std::is_base_of::value, "RespT must inherit from Response"); QueryT query{args...}; - if (isOnline()) + auto nb_tries = 0; + while (isOnline() && hasBufferedMsg() && !transmitBuffer() && nb_tries < 3) + { + // To preserve chronological order, we cannot send new messages until the old ones have been sent. + ESP_LOGW(TAG, "Online with pending messages that could not be transmitted, retrying..."); + + Tasks::delay(250ms); + + if (!isOnline()) + { + connect(); + } + nb_tries++; + } + + if (isOnline() && !hasBufferedMsg()) { if (publishWithReply(query)) { @@ -413,7 +428,7 @@ namespace fabomatic if (query.buffered()) { - buffer.push_back(query.payload(), topic); + buffer.push_back(query.payload(), topic, query.waitForReply()); } return std::make_unique(false); @@ -432,7 +447,22 @@ namespace fabomatic static_assert(std::is_base_of::value, "QueryT must inherit from Query"); QueryT query{args...}; - if (isOnline()) + auto nb_tries = 0; + while (isOnline() && hasBufferedMsg() && !transmitBuffer() && nb_tries < 3) + { + // To preserve chronological order, we cannot send new messages until the old ones have been sent. + ESP_LOGW(TAG, "Online with pending messages that could not be transmitted, retrying..."); + + Tasks::delay(250ms); + + if (!isOnline()) + { + connect(); + } + nb_tries++; + } + + if (isOnline() && !hasBufferedMsg()) { if (publish(query)) { @@ -447,7 +477,7 @@ namespace fabomatic if (query.buffered()) { - buffer.push_back(query.payload(), topic); + buffer.push_back(query.payload(), topic, query.waitForReply()); } return false; } @@ -548,15 +578,31 @@ namespace fabomatic { while (hasBufferedMsg() && isOnline()) { + ESP_LOGD(TAG, "Retransmitting buffered messages..."); const auto &msg = buffer.getMessage(); - const BufferedQuery bq{msg.mqtt_message, msg.mqtt_topic}; - if (!publish(bq)) + const BufferedQuery bq{msg.mqtt_message, msg.mqtt_topic, msg.wait_for_answer}; + if (bq.waitForReply()) { - // Will try again - buffer.push_front(msg.mqtt_message, msg.mqtt_topic); - break; + if (!publishWithReply(bq)) + { + ESP_LOGW(TAG, "Retransmitting buffered message failed!"); + // Will try again + buffer.push_front(msg.mqtt_message, msg.mqtt_topic, msg.wait_for_answer); + break; + } + } + else + { + if (!publish(bq)) + { + // Will try again + ESP_LOGW(TAG, "Retransmitting buffered message failed!"); + buffer.push_front(msg.mqtt_message, msg.mqtt_topic, msg.wait_for_answer); + break; + } } } + last_reply = ""; return !hasBufferedMsg(); }