From 32268f88be041cd82d094c00dde8a123fd0a8136 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Fri, 24 May 2024 17:03:26 +1000 Subject: [PATCH] Saving work on gateway discovery functionality. --- client/lib/include/cc_mqttsn_client/common.h | 21 +- client/lib/src/ClientImpl.cpp | 196 +++++++++++++------ client/lib/src/ClientImpl.h | 10 +- client/lib/src/ConfigState.h | 3 + client/lib/src/ExtConfig.h | 4 +- gateway/lib/src/session_op/Encapsulate.cpp | 1 + 6 files changed, 162 insertions(+), 73 deletions(-) diff --git a/client/lib/include/cc_mqttsn_client/common.h b/client/lib/include/cc_mqttsn_client/common.h index 74416bc..bf5f6e0 100644 --- a/client/lib/include/cc_mqttsn_client/common.h +++ b/client/lib/include/cc_mqttsn_client/common.h @@ -66,10 +66,11 @@ typedef enum /// @brief Status of the gateway typedef enum { - CC_MqttsnGwStatus_Invalid, ///< Invalid value, should never be used - CC_MqttsnGwStatus_Available, ///< The gateway is available. - CC_MqttsnGwStatus_TimedOut, ///< The gateway hasn't advertised its presence in time, assumed disconnected. - CC_MqttsnGwStatus_Discarded ///< The gateway info was discarded using cc_mqttsn_client_discard_gw() or cc_mqttsn_client_discard_all_gw(). + CC_MqttsnGwStatus_AddedByGateway, ///< Added by the @b ADVERTISE or @b GWINFO sent by the gateway messages + CC_MqttsnGwStatus_AddedByClient, ///< Added by the @b GWINFO message sent by another client. + CC_MqttsnGwStatus_UpdatedByClient, ///< The gateway's address was updated by another client. + CC_MqttsnGwStatus_Alive, ///< The @b ADVERTISE or @b GWINFO message have been received from the gateway indicating it's alive. + CC_MqttsnGwStatus_Removed, ///< The gateway hasn't advertised its presence in time, assumed no longer available. } CC_MqttsnGwStatus; /// @brief Status of the asynchronous operation @@ -124,13 +125,6 @@ typedef struct bool retain; ///< Retain flag of the message. } CC_MqttsnMessageInfo; -/// @brief Tracked gateway status information -typedef struct -{ - unsigned char m_gwId; ///< Gateway ID - CC_MqttsnGwStatus m_status; ///< Gateway status -} CC_MqttsnGatewayStatusInfo; - /// @brief Gateway information typedef struct { @@ -175,8 +169,9 @@ typedef void (*CC_MqttsnSendOutputDataCb)(void* data, const unsigned char* buf, /// cc_mqttsn_client_set_gw_status_report_callback() function. /// @param[in] data Pointer to user data object, passed as last parameter to /// cc_mqttsn_client_set_gw_status_report_callback() function. -/// @param[in] info Gateway status info. -typedef void (*CC_MqttsnGwStatusReportCb)(void* data, const CC_MqttsnGatewayStatusInfo* info); +/// @param[in] status Current status of the gateway. +/// @param[in] info Currently stored gateway information. +typedef void (*CC_MqttsnGwStatusReportCb)(void* data, CC_MqttsnGwStatus status, const CC_MqttsnGatewayInfo* info); /// @brief Callback used to report unsolicited disconnection of the gateway. /// @param[in] data Pointer to user data object, passed as the last parameter to diff --git a/client/lib/src/ClientImpl.cpp b/client/lib/src/ClientImpl.cpp index 4f708c7..160cfc6 100644 --- a/client/lib/src/ClientImpl.cpp +++ b/client/lib/src/ClientImpl.cpp @@ -12,6 +12,7 @@ #include "comms/process.h" #include "comms/units.h" #include "comms/util/ScopeGuard.h" +#include "comms/util/assign.h" #include #include @@ -53,7 +54,8 @@ void updateEc(CC_MqttsnErrorCode* ec, CC_MqttsnErrorCode val) } // namespace ClientImpl::ClientImpl() : - m_gwDiscoveryTimer(m_timerMgr.allocTimer()) + m_gwDiscoveryTimer(m_timerMgr.allocTimer()), + m_sendGwinfoTimer(m_timerMgr.allocTimer()) { // TODO: check validity of timer in during intialization static_cast(m_searchOpAlloc); @@ -461,11 +463,13 @@ void ClientImpl::handle(AdvertiseMsg& msg) if (iter != m_clientState.m_gwInfos.end()) { iter->m_expiryTimestamp = m_clientState.m_timestamp + comms::units::getMilliseconds(msg.field_duration()); + reportGwStatus(CC_MqttsnGwStatus_Alive, *iter); return; } if (m_clientState.m_gwInfos.max_size() <= m_clientState.m_gwInfos.size()) { // Ignore new gateways if they cannot be stored + errorLog("Failed to store the new gateway information, due to insufficient storage"); return; } @@ -474,25 +478,38 @@ void ClientImpl::handle(AdvertiseMsg& msg) info.m_gwId = msg.field_gwId().value(); info.m_expiryTimestamp = m_clientState.m_timestamp + comms::units::getMilliseconds(msg.field_duration()); - if (m_gatewayStatusReportCb != nullptr) { - auto cbInfo = CC_MqttsnGatewayStatusInfo(); - cbInfo.m_gwId = info.m_gwId; - cbInfo.m_status = CC_MqttsnGwStatus_Available; - m_gatewayStatusReportCb(m_gatewayStatusReportData, &cbInfo); - } + reportGwStatus(CC_MqttsnGwStatus_AddedByGateway, info); +} + +void ClientImpl::handle(SearchgwMsg& msg) +{ + static_assert(Config::HasGatewayDiscovery); + if (m_gwinfoDelayReqCb == nullptr) { + // The application didn't provide a callback to inquire about the delay for resonditing to SEARCHGW + return; + } + + // TODO: check active + + auto delay = m_gwinfoDelayReqCb(m_gwinfoDelayReqData); + if (delay == 0U) { + // The application rejected sending GWINFO on behalf of gateway + return; + } + + m_pendingGwinfoBroadcastRadius = msg.field_radius().value(); + m_sendGwinfoTimer.wait(delay, &ClientImpl::sendGwinfoCb, this); } void ClientImpl::handle(GwinfoMsg& msg) { - auto onExit = - comms::util::makeScopeGuard( - [this, &msg]() - { - for (auto& op : m_searchOps) { - COMMS_ASSERT(op); - op->handle(msg); - } - }); + static_assert(Config::HasGatewayDiscovery); + m_sendGwinfoTimer.cancel(); // Do not send GWINFO if pending + + for (auto& op : m_searchOps) { + COMMS_ASSERT(op); + op->handle(msg); + } auto iter = std::find_if( @@ -509,21 +526,30 @@ void ClientImpl::handle(GwinfoMsg& msg) } if (iter->m_addr.max_size() < addr.size()) { - iter->m_addr.clear(); + errorLog("The gateway address reported by the client doesn't fit into the dedicated address storage, ignoring"); + return; + } + + if ((addr.size() == iter->m_addr.size()) && + (std::equal(addr.begin(), addr.end(), iter->m_addr.begin()))) { + // The address is already recorded. return; } iter->m_addr.assign(addr.begin(), addr.end()); + reportGwStatus(CC_MqttsnGwStatus_UpdatedByClient, *iter); return; } auto& addr = msg.field_gwAdd().value(); if (addr.empty()) { + reportGwStatus(CC_MqttsnGwStatus_Alive, *iter); return; } if (m_clientState.m_gwInfos.max_size() <= m_clientState.m_gwInfos.size()) { // Not enough space + errorLog("Failed to store the new gateway information, due to insufficient storage"); return; } @@ -531,7 +557,10 @@ void ClientImpl::handle(GwinfoMsg& msg) auto& info = m_clientState.m_gwInfos.back(); info.m_gwId = msg.field_gwId().value(); info.m_addr.assign(addr.begin(), addr.end()); - // TODO: report gateway discovery + info.m_expiryTimestamp = m_clientState.m_timestamp + m_configState.m_gwAdvTimeoutMs; + monitorGatewayExpiry(); + + reportGwStatus(CC_MqttsnGwStatus_AddedByClient, info); } #endif // #if CC_MQTTSN_HAS_GATEWAY_DISCOVERY @@ -658,36 +687,37 @@ void ClientImpl::handle(GwinfoMsg& msg) // #endif // #if CC_MQTTSN_CLIENT_MAX_QOS >= 2 -// void ClientImpl::handle(ProtMessage& msg) -// { -// static_cast(msg); -// if (m_sessionState.m_disconnecting) { -// return; -// } +void ClientImpl::handle(ProtMessage& msg) +{ + static_cast(msg); + // if (m_sessionState.m_disconnecting) { + // return; + // } -// // During the dispatch to callbacks can be called and new ops issues, -// // the m_ops vector can be resized and iterators invalidated. -// // As the result, the iteration needs to be performed using indices -// // instead of iterators. -// // Also do not dispatch the message to new ops. -// auto count = m_ops.size(); -// for (auto idx = 0U; idx < count; ++idx) { -// auto* op = m_ops[idx]; -// if (op == nullptr) { -// // ops can be deleted, but the pointer will be nullified -// // until last api guard. -// continue; -// } + // During the dispatch to callbacks can be called and new ops issues, + // the m_ops vector can be resized and iterators invalidated. + // As the result, the iteration needs to be performed using indices + // instead of iterators. + // Also do not dispatch the message to new ops. + auto count = m_ops.size(); + for (auto idx = 0U; idx < count; ++idx) { + auto* op = m_ops[idx]; + if (op == nullptr) { + // ops can be deleted, but the pointer will be nullified + // until last api guard. + continue; + } -// msg.dispatch(*op); + msg.dispatch(*op); -// // After message dispatching the whole session may be in terminating state -// // Don't continue iteration -// if (m_sessionState.m_disconnecting) { -// break; -// } -// } -// } + // After message dispatching the whole session may be in terminating state + // Don't continue iteration + + // if (m_sessionState.m_disconnecting) { + // break; + // } + } +} CC_MqttsnErrorCode ClientImpl::sendMessage(const ProtMessage& msg, unsigned broadcastRadius) { @@ -1169,15 +1199,13 @@ void ClientImpl::monitorGatewayExpiry() m_clientState.m_gwInfos.begin(), m_clientState.m_gwInfos.end(), [](const auto& first, const auto& second) { - if (first.m_expiryTimestamp == 0) { - return false; - } + COMMS_ASSERT(first.m_expiryTimestamp != 0); + COMMS_ASSERT(second.m_expiryTimestamp != 0); return first.m_expiryTimestamp < second.m_expiryTimestamp; }); - if ((iter == m_clientState.m_gwInfos.end()) || - (iter->m_expiryTimestamp == 0U)) { + if (iter == m_clientState.m_gwInfos.end()) { return; } @@ -1194,12 +1222,7 @@ void ClientImpl::gwExpiryTimeout() continue; } - if (m_gatewayStatusReportCb != nullptr) { - auto cbInfo = CC_MqttsnGatewayStatusInfo(); - cbInfo.m_gwId = info.m_gwId; - cbInfo.m_status = CC_MqttsnGwStatus_TimedOut; - m_gatewayStatusReportCb(m_gatewayStatusReportData, &cbInfo); - } + reportGwStatus(CC_MqttsnGwStatus_Removed, info); } m_clientState.m_gwInfos.erase( @@ -1215,6 +1238,60 @@ void ClientImpl::gwExpiryTimeout() } } +void ClientImpl::reportGwStatus(CC_MqttsnGwStatus status, const ClientState::GwInfo& info) +{ + if constexpr (Config::HasGatewayDiscovery) { + if (m_gatewayStatusReportCb == nullptr) { + return; + } + + auto gwInfo = CC_MqttsnGatewayInfo(); + gwInfo.m_gwId = info.m_gwId; + gwInfo.m_addr = info.m_addr.data(); + comms::cast_assign(gwInfo.m_addrLen) = info.m_addr.size(); + + m_gatewayStatusReportCb(m_gatewayStatusReportData, status, &gwInfo); + } +} + +void ClientImpl::sendGwinfo() +{ + if constexpr (Config::HasGatewayDiscovery) { + auto iter = + std::max_element( + m_clientState.m_gwInfos.begin(), m_clientState.m_gwInfos.end(), + [](auto& first, auto& second) + { + if (first.m_addr.empty()) { + // Prefer one with the address + return !second.m_addr.empty(); + } + + if (second.m_addr.empty()) { + return false; + } + + return first.m_expiryTimestamp < second.m_expiryTimestamp; + }); + + if ((iter == m_clientState.m_gwInfos.end()) || + (iter->m_addr.empty())) { + // None of the gateways have known address + return; + } + + GwinfoMsg msg; + if (msg.field_gwAdd().value().max_size() < iter->m_addr.size()) { + errorLog("Cannot fit the known gateway address into the GWINFO message address storage"); + return; + } + + msg.field_gwId().setValue(iter->m_gwId); + comms::util::assign(msg.field_gwAdd().value(), iter->m_addr.begin(), iter->m_addr.end()); + sendMessage(msg, std::max(m_pendingGwinfoBroadcastRadius, 1U)); + } +} + void ClientImpl::gwExpiryTimeoutCb(void* data) { if constexpr (Config::HasGatewayDiscovery) { @@ -1222,4 +1299,11 @@ void ClientImpl::gwExpiryTimeoutCb(void* data) } } +void ClientImpl::sendGwinfoCb(void* data) +{ + if constexpr (Config::HasGatewayDiscovery) { + reinterpret_cast(data)->sendGwinfo(); + } +} + } // namespace cc_mqttsn_client diff --git a/client/lib/src/ClientImpl.h b/client/lib/src/ClientImpl.h index 9a6d800..1e31e9c 100644 --- a/client/lib/src/ClientImpl.h +++ b/client/lib/src/ClientImpl.h @@ -145,6 +145,7 @@ class ClientImpl final : public ProtMsgHandler using Base::handle; #if CC_MQTTSN_HAS_GATEWAY_DISCOVERY virtual void handle(AdvertiseMsg& msg) override; + virtual void handle(SearchgwMsg& msg) override; virtual void handle(GwinfoMsg& msg) override; #endif // #if CC_MQTTSN_HAS_GATEWAY_DISCOVERY // virtual void handle(PublishMsg& msg) override; @@ -159,7 +160,7 @@ class ClientImpl final : public ProtMsgHandler // virtual void handle(PubcompMsg& msg) override; // #endif // #if CC_MQTTSN_CLIENT_MAX_QOS >= 2 -// virtual void handle(ProtMessage& msg) override; + virtual void handle(ProtMessage& msg) override; // -------------------- Ops Access API ----------------------------- @@ -286,7 +287,11 @@ class ClientImpl final : public ProtMsgHandler void monitorGatewayExpiry(); void gwExpiryTimeout(); + void reportGwStatus(CC_MqttsnGwStatus status, const ClientState::GwInfo& info); + void sendGwinfo(); + static void gwExpiryTimeoutCb(void* data); + static void sendGwinfoCb(void* data); friend class ApiEnterGuard; @@ -317,10 +322,10 @@ class ClientImpl final : public ProtMsgHandler ConfigState m_configState; ClientState m_clientState; // SessionState m_sessionState; - // ReuseState m_reuseState; TimerMgr m_timerMgr; TimerMgr::Timer m_gwDiscoveryTimer; + TimerMgr::Timer m_sendGwinfoTimer; unsigned m_apiEnterCount = 0U; OutputBuf m_buf; @@ -352,6 +357,7 @@ class ClientImpl final : public ProtMsgHandler // SendOpsList m_sendOps; OpPtrsList m_ops; + unsigned m_pendingGwinfoBroadcastRadius = 0U; bool m_opsDeleted = false; bool m_preparationLocked = false; }; diff --git a/client/lib/src/ConfigState.h b/client/lib/src/ConfigState.h index bc26662..97ef127 100644 --- a/client/lib/src/ConfigState.h +++ b/client/lib/src/ConfigState.h @@ -17,11 +17,14 @@ struct ConfigState static constexpr unsigned DefaultResponseTimeoutMs = 10000; static constexpr unsigned DefaultRetryCount = 3U; static constexpr unsigned DefaultBroadcastRadius = 3U; + static constexpr unsigned DefaultGwAdvTimeoutMs = 15 * 60 * 1000; // static constexpr unsigned MaxBroadcastRadius = 255U; + unsigned m_responseTimeoutMs = DefaultResponseTimeoutMs; unsigned m_retryCount = DefaultRetryCount; unsigned m_broadcastRadius = DefaultBroadcastRadius; + unsigned m_gwAdvTimeoutMs = DefaultGwAdvTimeoutMs; // CC_MqttsnPublishOrdering m_publishOrdering = CC_MqttsnPublishOrdering_SameQos; // bool m_verifyOutgoingTopic = Config::HasTopicFormatVerification; // bool m_verifyIncomingTopic = Config::HasTopicFormatVerification; diff --git a/client/lib/src/ExtConfig.h b/client/lib/src/ExtConfig.h index 3b6ec83..90b75f5 100644 --- a/client/lib/src/ExtConfig.h +++ b/client/lib/src/ExtConfig.h @@ -17,7 +17,7 @@ namespace cc_mqttsn_client struct ExtConfig : public Config { static constexpr unsigned KeepAliveOpsLimit = HasDynMemAlloc ? 0 : 1U; - static constexpr unsigned AdvertiseTimers = 1U; + static constexpr unsigned DiscoveryTimers = 2U; static constexpr unsigned SearchOpsLimit = HasDynMemAlloc ? 0 : 1U; static constexpr unsigned SearchOpTimers = 1U; static constexpr unsigned ConnectOpsLimit = HasDynMemAlloc ? 0 : 1U; @@ -41,7 +41,7 @@ struct ExtConfig : public Config (RecvOpsLimit > 0U) && (SendOpsLimit > 0U); static constexpr unsigned MaxTimersLimit = - (AdvertiseTimers) + + (DiscoveryTimers) + (SearchOpsLimit * SearchOpTimers) + (ConnectOpsLimit * ConnectOpTimers) + (KeepAliveOpsLimit * KeepAliveOpTimers) + diff --git a/gateway/lib/src/session_op/Encapsulate.cpp b/gateway/lib/src/session_op/Encapsulate.cpp index 769b05a..b26f382 100644 --- a/gateway/lib/src/session_op/Encapsulate.cpp +++ b/gateway/lib/src/session_op/Encapsulate.cpp @@ -104,6 +104,7 @@ void Encapsulate::handle(FwdMsg_SN& msg) if ((!sessionPtr->isRunning()) && (!sessionPtr->start())) { // Error failed to start session; + session().reportError("Failed to start forward encapsulated session"); session().reportFwdEncSessionDeleted(sessionPtr); m_sessions.erase(iter); iter = m_sessions.end();