From 18e5c8cb774043b9871e86b39a2c05fc017c5d18 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Fri, 14 Jun 2024 08:39:47 +1000 Subject: [PATCH] Handling pings in the new client. --- client/app/common/AppClient.cpp | 1 + client/lib/CMakeLists.txt | 1 + client/lib/include/cc_mqttsn_client/common.h | 1 + client/lib/src/ClientImpl.cpp | 262 ++++++------------- client/lib/src/ClientImpl.h | 25 +- client/lib/src/ExtConfig.h | 4 +- client/lib/src/SessionState.h | 4 +- client/lib/src/op/ConnectOp.cpp | 20 ++ client/lib/src/op/KeepAliveOp.cpp | 126 +++++++++ client/lib/src/op/KeepAliveOp.h | 57 ++++ client/lib/src/op/Op.h | 2 +- client/lib/templ/client.cpp.templ | 8 +- client/lib/templ/client.h.templ | 8 +- client/lib/test/UnitTestCommonBase.cpp | 12 + client/lib/test/UnitTestCommonBase.h | 13 +- client/lib/test/UnitTestDefaultBase.cpp | 12 + 16 files changed, 347 insertions(+), 209 deletions(-) create mode 100644 client/lib/src/op/KeepAliveOp.cpp create mode 100644 client/lib/src/op/KeepAliveOp.h diff --git a/client/app/common/AppClient.cpp b/client/app/common/AppClient.cpp index 018d641..9cef20a 100644 --- a/client/app/common/AppClient.cpp +++ b/client/app/common/AppClient.cpp @@ -294,6 +294,7 @@ std::string AppClient::toString(CC_MqttsnAsyncOpStatus val) /* CC_MqttsnAsyncOpStatus_Aborted */ "Aborted", /* CC_MqttsnAsyncOpStatus_OutOfMemory */ "Out of Memory", /* CC_MqttsnAsyncOpStatus_BadParam */ "Bad Param", + /* CC_MqttsnAsyncOpStatus_GatewayDisconnected */ "Gateway Disconnected", }; static constexpr std::size_t MapSize = std::extent::value; diff --git a/client/lib/CMakeLists.txt b/client/lib/CMakeLists.txt index d312e50..88e6653 100644 --- a/client/lib/CMakeLists.txt +++ b/client/lib/CMakeLists.txt @@ -166,6 +166,7 @@ function (gen_lib_mqttsn_client config_file) message (STATUS "Defining library ${lib_name}") set (src src/op/ConnectOp.cpp + src/op/KeepAliveOp.cpp src/op/Op.cpp src/op/SearchOp.cpp src/ClientImpl.cpp diff --git a/client/lib/include/cc_mqttsn_client/common.h b/client/lib/include/cc_mqttsn_client/common.h index 785a5b5..d154ac2 100644 --- a/client/lib/include/cc_mqttsn_client/common.h +++ b/client/lib/include/cc_mqttsn_client/common.h @@ -91,6 +91,7 @@ typedef enum CC_MqttsnAsyncOpStatus_Aborted = 3, ///< The operation has been aborted before completion due to client's side operation. CC_MqttsnAsyncOpStatus_OutOfMemory = 4, ///< The client library wasn't able to allocate necessary memory. CC_MqttsnAsyncOpStatus_BadParam = 5, ///< Bad value has been returned from the relevant callback. + CC_MqttsnAsyncOpStatus_GatewayDisconnected = 6, ///< Gateway disconnection detected during the operation execution. CC_MqttsnAsyncOpStatus_ValuesLimit ///< Limit for the values } CC_MqttsnAsyncOpStatus; diff --git a/client/lib/src/ClientImpl.cpp b/client/lib/src/ClientImpl.cpp index b28316e..cb2bb1e 100644 --- a/client/lib/src/ClientImpl.cpp +++ b/client/lib/src/ClientImpl.cpp @@ -169,6 +169,18 @@ op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec) break; } + if (m_sessionState.m_disconnecting) { + errorLog("Session disconnection is in progress, cannot initiate connection."); + updateEc(ec, CC_MqttsnErrorCode_Disconnecting); + break; + } + + if (m_sessionState.m_connected) { + errorLog("Client is already connected."); + updateEc(ec, CC_MqttsnErrorCode_AlreadyConnected); + break; + } + if (m_ops.max_size() <= m_ops.size()) { errorLog("Cannot start connect operation, retry in next event loop iteration."); updateEc(ec, CC_MqttsnErrorCode_RetryLater); @@ -198,74 +210,6 @@ op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec) return op; } -// op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec) -// { -// op::ConnectOp* connectOp = nullptr; -// do { -// m_clientState.m_networkDisconnected = false; - -// if (!m_clientState.m_initialized) { -// if (m_apiEnterCount > 0U) { -// errorLog("Cannot prepare connect from within callback"); -// updateEc(ec, CC_MqttsnErrorCode_RetryLater); -// break; -// } - -// auto initEc = initInternal(); -// if (initEc != CC_MqttsnErrorCode_Success) { -// updateEc(ec, initEc); -// break; -// } -// } - -// if (!m_connectOps.empty()) { -// // Already allocated -// errorLog("Another connect operation is in progress."); -// updateEc(ec, CC_MqttsnErrorCode_Busy); -// break; -// } - -// if (m_sessionState.m_disconnecting) { -// errorLog("Session disconnection is in progress, cannot initiate connection."); -// updateEc(ec, CC_MqttsnErrorCode_Disconnecting); -// break; -// } - -// if (m_sessionState.m_connected) { -// errorLog("Client is already connected."); -// updateEc(ec, CC_MqttsnErrorCode_AlreadyConnected); -// break; -// } - -// if (m_ops.max_size() <= m_ops.size()) { -// errorLog("Cannot start connect operation, retry in next event loop iteration."); -// updateEc(ec, CC_MqttsnErrorCode_RetryLater); -// break; -// } - -// if (m_preparationLocked) { -// errorLog("Another operation is being prepared, cannot prepare \"connect\" without \"send\" or \"cancel\" of the previous."); -// updateEc(ec, CC_MqttsnErrorCode_PreparationLocked); -// break; -// } - -// auto ptr = m_connectOpAlloc.alloc(*this); -// if (!ptr) { -// errorLog("Cannot allocate new connect operation."); -// updateEc(ec, CC_MqttsnErrorCode_OutOfMemory); -// break; -// } - -// m_preparationLocked = true; -// m_ops.push_back(ptr.get()); -// m_connectOps.push_back(std::move(ptr)); -// connectOp = m_connectOps.back().get(); -// updateEc(ec, CC_MqttsnErrorCode_Success); -// } while (false); - -// return connectOp; -// } - // op::DisconnectOp* ClientImpl::disconnectPrepare(CC_MqttsnErrorCode* ec) // { // op::DisconnectOp* disconnectOp = nullptr; @@ -729,7 +673,7 @@ void ClientImpl::handle(GwinfoMsg& msg) // if (!msg.transportField_flags().field_dup().getBitValue_bit()) { // errorLog("Non duplicate PUBLISH with packet ID in use"); -// brokerDisconnected(CC_MqttsnBrokerDisconnectReason_ProtocolError); +// gatewayDisconnected(CC_MqttsnGatewayDisconnectReason_ProtocolError); // return; // } // else { @@ -800,12 +744,21 @@ void ClientImpl::handle(GwinfoMsg& msg) // #endif // #if CC_MQTTSN_CLIENT_MAX_QOS >= 2 -void ClientImpl::handle(ProtMessage& msg) +void ClientImpl::handle([[maybe_unused]] PingreqMsg& msg) +{ + if ((m_sessionState.m_disconnecting) || (!m_sessionState.m_connected)) { + return; + } + + PingrespMsg respMsg; + sendMessage(respMsg); +} + +void ClientImpl::handle([[maybe_unused]] ProtMessage& msg) { - static_cast(msg); - // if (m_sessionState.m_disconnecting) { - // return; - // } + if (m_sessionState.m_disconnecting) { + return; + } // During the dispatch to callbacks can be called and new ops issues, // the m_ops vector can be resized and iterators invalidated. @@ -826,9 +779,9 @@ void ClientImpl::handle(ProtMessage& msg) // After message dispatching the whole session may be in terminating state // Don't continue iteration - // if (m_sessionState.m_disconnecting) { - // break; - // } + if (m_sessionState.m_disconnecting) { + break; + } } } @@ -853,9 +806,9 @@ CC_MqttsnErrorCode ClientImpl::sendMessage(const ProtMessage& msg, unsigned broa COMMS_ASSERT(m_sendOutputDataCb != nullptr); m_sendOutputDataCb(m_sendOutputDataData, &m_buf[0], static_cast(len), broadcastRadius); - // for (auto& opPtr : m_keepAliveOps) { - // opPtr->messageSent(); - // } + for (auto& opPtr : m_keepAliveOps) { + opPtr->messageSent(); + } return CC_MqttsnErrorCode_Success; } @@ -875,7 +828,7 @@ void ClientImpl::opComplete(const op::Op* op) static const ExtraCompleteFunc Map[] = { /* Type_Search */ &ClientImpl::opComplete_Search, /* Type_Connect */ &ClientImpl::opComplete_Connect, - // /* Type_KeepAlive */ &ClientImpl::opComplete_KeepAlive, + /* Type_KeepAlive */ &ClientImpl::opComplete_KeepAlive, // /* Type_Disconnect */ &ClientImpl::opComplete_Disconnect, // /* Type_Subscribe */ &ClientImpl::opComplete_Subscribe, // /* Type_Unsubscribe */ &ClientImpl::opComplete_Unsubscribe, @@ -895,77 +848,28 @@ void ClientImpl::opComplete(const op::Op* op) (this->*func)(op); } -// void ClientImpl::brokerConnected(bool sessionPresent) -// { -// static_cast(sessionPresent); -// m_clientState.m_firstConnect = false; -// m_sessionState.m_connected = true; - -// do { -// if (sessionPresent) { -// for (auto& sendOpPtr : m_sendOps) { -// sendOpPtr->postReconnectionResend(); -// } - -// for (auto& recvOpPtr : m_recvOps) { -// recvOpPtr->postReconnectionResume(); -// } - -// auto resumeUntilIdx = m_sendOps.size(); -// auto resumeFromIdx = resumeUntilIdx; -// for (auto count = resumeUntilIdx; count > 0U; --count) { -// auto idx = count - 1U; -// auto& sendOpPtr = m_sendOps[idx]; -// if (!sendOpPtr->isPaused()) { -// break; -// } - -// resumeFromIdx = idx; -// } - -// if (resumeFromIdx < resumeUntilIdx) { -// resumeSendOpsSince(static_cast(resumeFromIdx)); -// } - -// break; -// } - -// // Old stored session, terminate pending ops -// for (auto* op : m_ops) { -// auto opType = op->type(); -// if ((opType != op::Op::Type::Type_Send) && -// (opType != op::Op::Type::Type_Recv)) { -// continue; -// } - -// op->terminateOp(CC_MqttsnAsyncOpStatus_Aborted); -// } -// } while (false); - -// createKeepAliveOpIfNeeded(); -// } - -// void ClientImpl::brokerDisconnected( -// CC_MqttsnBrokerDisconnectReason reason, -// CC_MqttsnAsyncOpStatus status) -// { -// m_clientState.m_initialized = false; // Require re-initialization -// m_sessionState.m_connected = false; +void ClientImpl::gatewayConnected() +{ + m_clientState.m_firstConnect = false; + m_sessionState.m_connected = true; + createKeepAliveOpIfNeeded(); +} -// m_sessionState.m_disconnecting = true; -// terminateOps(status, TerminateMode_KeepSendRecvOps); +void ClientImpl::gatewayDisconnected( + CC_MqttsnGatewayDisconnectReason reason, + CC_MqttsnAsyncOpStatus status) +{ + m_clientState.m_initialized = false; // Require re-initialization + m_sessionState.m_connected = false; -// for (auto* op : m_ops) { -// if (op != nullptr) { -// op->connectivityChanged(); -// } -// } + m_sessionState.m_disconnecting = true; + terminateOps(status); -// if (reason < CC_MqttsnBrokerDisconnectReason_ValuesLimit) { -// COMMS_ASSERT(m_brokerDisconnectReportCb != nullptr); -// m_brokerDisconnectReportCb(m_brokerDisconnectReportData, reason); -// } -// } + if (reason < CC_MqttsnGatewayDisconnectReason_ValuesLimit) { + COMMS_ASSERT(m_gatewayDisconnectedReportCb != nullptr); + m_gatewayDisconnectedReportCb(m_gatewayDisconnectedReportData, reason); + } +} // void ClientImpl::reportMsgInfo(const CC_MqttsnMessageInfo& info) // { @@ -1060,40 +964,32 @@ void ClientImpl::doApiExit() m_nextTickProgramCb(m_nextTickProgramData, nextWait); } -// void ClientImpl::createKeepAliveOpIfNeeded() -// { -// if (!m_keepAliveOps.empty()) { -// return; -// } - -// auto ptr = m_keepAliveOpsAlloc.alloc(*this); -// if (!ptr) { -// COMMS_ASSERT(false); // Should not happen -// return; -// } - -// m_ops.push_back(ptr.get()); -// m_keepAliveOps.push_back(std::move(ptr)); -// } +void ClientImpl::createKeepAliveOpIfNeeded() +{ + if (!m_keepAliveOps.empty()) { + return; + } -// void ClientImpl::terminateOps(CC_MqttsnAsyncOpStatus status, TerminateMode mode) -// { -// for (auto* op : m_ops) { -// if (op == nullptr) { -// continue; -// } + auto ptr = m_keepAliveOpsAlloc.alloc(*this); + if (!ptr) { + COMMS_ASSERT(false); // Should not happen + return; + } -// if (mode == TerminateMode_KeepSendRecvOps) { -// auto opType = op->type(); + m_ops.push_back(ptr.get()); + m_keepAliveOps.push_back(std::move(ptr)); +} -// if ((opType == op::Op::Type_Recv) || (opType == op::Op::Type_Send)) { -// continue; -// } -// } +void ClientImpl::terminateOps(CC_MqttsnAsyncOpStatus status) +{ + for (auto* op : m_ops) { + if (op == nullptr) { + continue; + } -// op->terminateOp(status); -// } -// } + op->terminateOp(status); + } +} void ClientImpl::cleanOps() { @@ -1271,10 +1167,10 @@ void ClientImpl::opComplete_Connect(const op::Op* op) eraseFromList(op, m_connectOps); } -// void ClientImpl::opComplete_KeepAlive(const op::Op* op) -// { -// eraseFromList(op, m_keepAliveOps); -// } +void ClientImpl::opComplete_KeepAlive(const op::Op* op) +{ + eraseFromList(op, m_keepAliveOps); +} // void ClientImpl::opComplete_Disconnect(const op::Op* op) // { diff --git a/client/lib/src/ClientImpl.h b/client/lib/src/ClientImpl.h index b1333e7..192e49c 100644 --- a/client/lib/src/ClientImpl.h +++ b/client/lib/src/ClientImpl.h @@ -19,7 +19,7 @@ #include "op/ConnectOp.h" // #include "op/DisconnectOp.h" -// #include "op/KeepAliveOp.h" +#include "op/KeepAliveOp.h" #include "op/Op.h" // #include "op/RecvOp.h" #include "op/SearchOp.h" @@ -160,16 +160,17 @@ class ClientImpl final : public ProtMsgHandler // virtual void handle(PubcompMsg& msg) override; // #endif // #if CC_MQTTSN_CLIENT_MAX_QOS >= 2 + virtual void handle(PingreqMsg& msg) override; virtual void handle(ProtMessage& msg) override; // -------------------- Ops Access API ----------------------------- CC_MqttsnErrorCode sendMessage(const ProtMessage& msg, unsigned broadcastRadius = 0); void opComplete(const op::Op* op); - // void brokerConnected(bool sessionPresent); - // void brokerDisconnected( - // CC_MqttsnBrokerDisconnectReason reason = CC_MqttsnBrokerDisconnectReason_ValuesLimit, - // CC_MqttsnAsyncOpStatus status = CC_MqttsnAsyncOpStatus_BrokerDisconnected); + void gatewayConnected(); + void gatewayDisconnected( + CC_MqttsnGatewayDisconnectReason reason = CC_MqttsnGatewayDisconnectReason_ValuesLimit, + CC_MqttsnAsyncOpStatus status = CC_MqttsnAsyncOpStatus_GatewayDisconnected); // void reportMsgInfo(const CC_MqttsnMessageInfo& info); // bool hasPausedSendsBefore(const op::SendOp* sendOp) const; // bool hasHigherQosSendsBefore(const op::SendOp* sendOp, op::Op::Qos qos) const; @@ -236,8 +237,8 @@ class ClientImpl final : public ProtMsgHandler using ConnectOpAlloc = ObjAllocator; using ConnectOpsList = ObjListType; - // using KeepAliveOpAlloc = ObjAllocator; - // using KeepAliveOpsList = ObjListType; + using KeepAliveOpAlloc = ObjAllocator; + using KeepAliveOpsList = ObjListType; // using DisconnectOpAlloc = ObjAllocator; // using DisconnectOpsList = ObjListType; @@ -267,8 +268,8 @@ class ClientImpl final : public ProtMsgHandler void doApiEnter(); void doApiExit(); - // void createKeepAliveOpIfNeeded(); - // void terminateOps(CC_MqttsnAsyncOpStatus status, TerminateMode mode); + void createKeepAliveOpIfNeeded(); + void terminateOps(CC_MqttsnAsyncOpStatus status); void cleanOps(); void errorLogInternal(const char* msg); CC_MqttsnErrorCode initInternal(); @@ -280,7 +281,7 @@ class ClientImpl final : public ProtMsgHandler void opComplete_Search(const op::Op* op); void opComplete_Connect(const op::Op* op); - // void opComplete_KeepAlive(const op::Op* op); + void opComplete_KeepAlive(const op::Op* op); // void opComplete_Disconnect(const op::Op* op); // void opComplete_Subscribe(const op::Op* op); // void opComplete_Unsubscribe(const op::Op* op); @@ -342,8 +343,8 @@ class ClientImpl final : public ProtMsgHandler ConnectOpAlloc m_connectOpAlloc; ConnectOpsList m_connectOps; - // KeepAliveOpAlloc m_keepAliveOpsAlloc; - // KeepAliveOpsList m_keepAliveOps; + KeepAliveOpAlloc m_keepAliveOpsAlloc; + KeepAliveOpsList m_keepAliveOps; // DisconnectOpAlloc m_disconnectOpsAlloc; // DisconnectOpsList m_disconnectOps; diff --git a/client/lib/src/ExtConfig.h b/client/lib/src/ExtConfig.h index 8cf591d..4a1aa1e 100644 --- a/client/lib/src/ExtConfig.h +++ b/client/lib/src/ExtConfig.h @@ -33,8 +33,8 @@ struct ExtConfig : public Config static constexpr unsigned SendOpTimers = 1U; static constexpr bool HasOpsLimit = (SearchOpsLimit > 0U) && - (ConnectOpsLimit > 0U) /* && - (KeepAliveOpsLimit > 0U) && + (ConnectOpsLimit > 0U) && + (KeepAliveOpsLimit > 0U) /* && (DisconnectOpsLimit > 0U) && (SubscribeOpsLimit > 0U) && (UnsubscribeOpsLimit > 0U) && diff --git a/client/lib/src/SessionState.h b/client/lib/src/SessionState.h index d79892a..8e6e0df 100644 --- a/client/lib/src/SessionState.h +++ b/client/lib/src/SessionState.h @@ -16,8 +16,8 @@ struct SessionState static constexpr unsigned DefaultKeepAlive = 60; unsigned m_keepAliveMs = 0U; - // bool m_connected = false; - // bool m_disconnecting = false; + bool m_connected = false; + bool m_disconnecting = false; }; } // namespace cc_mqttsn_client diff --git a/client/lib/src/op/ConnectOp.cpp b/client/lib/src/op/ConnectOp.cpp index 3cdcae9..e4f4049 100644 --- a/client/lib/src/op/ConnectOp.cpp +++ b/client/lib/src/op/ConnectOp.cpp @@ -45,6 +45,16 @@ CC_MqttsnErrorCode ConnectOp::config(const CC_MqttsnConnectConfig* config) return CC_MqttsnErrorCode_BadParam; } + if (client().clientState().m_firstConnect && (!config->m_cleanSession)) { + errorLog("First connect must force clean session"); + return CC_MqttsnErrorCode_BadParam; + } + + if (config->m_duration == 0U) { + errorLog("The connect duration value must be greater than 0"); + return CC_MqttsnErrorCode_BadParam; + } + if (config->m_clientId != nullptr) { m_connectMsg.field_clientId().value() = config->m_clientId; } @@ -111,6 +121,11 @@ CC_MqttsnErrorCode ConnectOp::send(CC_MqttsnConnectCompleteCb cb, void* cbData) return CC_MqttsnErrorCode_BadParam; } + if (m_connectMsg.field_duration().value() == 0U) { + errorLog("The connect operation hasn't been configured properly"); + return CC_MqttsnErrorCode_InsufficientConfig; + } + if (!m_timer.isValid()) { errorLog("The library cannot allocate required number of timers."); return CC_MqttsnErrorCode_InternalError; @@ -190,6 +205,11 @@ void ConnectOp::handle(ConnackMsg& msg) } #endif // #ifdef CC_MQTTSN_CLIENT_HAS_WILL + if (info.m_returnCode == CC_MqttsnReturnCode_Accepted) { + client().sessionState().m_keepAliveMs = comms::units::getMilliseconds(m_connectMsg.field_duration()); + client().gatewayConnected(); + } + completeOpInternal(CC_MqttsnAsyncOpStatus_Complete, &info); } diff --git a/client/lib/src/op/KeepAliveOp.cpp b/client/lib/src/op/KeepAliveOp.cpp new file mode 100644 index 0000000..e2f67c4 --- /dev/null +++ b/client/lib/src/op/KeepAliveOp.cpp @@ -0,0 +1,126 @@ +// +// Copyright 2024 - 2024 (C). Alex Robenko. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include "op/KeepAliveOp.h" +#include "ClientImpl.h" + +namespace cc_mqttsn_client +{ + +namespace op +{ + +namespace +{ + +inline KeepAliveOp* asKeepAliveOp(void* data) +{ + return reinterpret_cast(data); +} + +} // namespace + +KeepAliveOp::KeepAliveOp(ClientImpl& client) : + Base(client), + m_pingTimer(client.timerMgr().allocTimer()), + m_recvTimer(client.timerMgr().allocTimer()), + m_respTimer(client.timerMgr().allocTimer()) +{ + COMMS_ASSERT(m_pingTimer.isValid()); + COMMS_ASSERT(m_recvTimer.isValid()); + COMMS_ASSERT(m_respTimer.isValid()); + + restartPingTimer(); +} + +void KeepAliveOp::messageSent() +{ + restartPingTimer(); +} + +void KeepAliveOp::handle([[maybe_unused]] PingrespMsg& msg) +{ + m_respTimer.cancel(); + COMMS_ASSERT(!m_respTimer.isActive()); + setRetryCount(client().configState().m_retryCount); + restartRecvTimer(); +} + +void KeepAliveOp::handle([[maybe_unused]] ProtMessage& msg) +{ + restartRecvTimer(); +} + +Op::Type KeepAliveOp::typeImpl() const +{ + return Type_KeepAlive; +} + +void KeepAliveOp::restartPingTimer() +{ + auto& state = client().sessionState(); + if (state.m_keepAliveMs == 0U) { + return; + } + + m_pingTimer.wait(state.m_keepAliveMs, &KeepAliveOp::sendPingCb, this); +} + +void KeepAliveOp::restartRecvTimer() +{ + auto& state = client().sessionState(); + if (state.m_keepAliveMs == 0U) { + return; + } + + m_recvTimer.wait(state.m_keepAliveMs, &KeepAliveOp::recvTimeoutCb, this); +} + +void KeepAliveOp::sendPing() +{ + if (m_respTimer.isActive()) { + return; // Ping has already been sent, waiting for response + } + + PingreqMsg msg; + client().sendMessage(msg); + auto& state = client().configState(); + m_respTimer.wait(state.m_retryPeriod, &KeepAliveOp::pingTimeoutCb, this); +} + +void KeepAliveOp::pingTimeoutInternal() +{ + COMMS_ASSERT(!m_respTimer.isActive()); + auto remRetries = getRetryCount(); + if (remRetries == 0U) { + errorLog("The gateway did not respond to PING(s)"); + client().gatewayDisconnected(CC_MqttsnGatewayDisconnectReason_NoGatewayResponse); + return; + } + + setRetryCount(remRetries - 1U); + sendPing(); +} + +void KeepAliveOp::sendPingCb(void* data) +{ + asKeepAliveOp(data)->sendPing(); +} + +void KeepAliveOp::recvTimeoutCb(void* data) +{ + asKeepAliveOp(data)->sendPing(); +} + +void KeepAliveOp::pingTimeoutCb(void* data) +{ + asKeepAliveOp(data)->pingTimeoutInternal(); +} + +} // namespace op + +} // namespace cc_mqttsn_client diff --git a/client/lib/src/op/KeepAliveOp.h b/client/lib/src/op/KeepAliveOp.h new file mode 100644 index 0000000..d773c52 --- /dev/null +++ b/client/lib/src/op/KeepAliveOp.h @@ -0,0 +1,57 @@ +// +// Copyright 2024 - 2024 (C). Alex Robenko. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#pragma once + +#include "op/Op.h" +#include "ExtConfig.h" +#include "ProtocolDefs.h" + +#include "TimerMgr.h" + +namespace cc_mqttsn_client +{ + +namespace op +{ + +class KeepAliveOp final : public Op +{ + using Base = Op; +public: + explicit KeepAliveOp(ClientImpl& client); + + void messageSent(); + + using Base::handle; + virtual void handle(PingrespMsg& msg) override; + virtual void handle(ProtMessage& msg) override; + +protected: + virtual Type typeImpl() const override; + +private: + void restartPingTimer(); + void restartRecvTimer(); + void sendPing(); + void pingTimeoutInternal(); + + static void sendPingCb(void* data); + static void recvTimeoutCb(void* data); + static void pingTimeoutCb(void* data); + + TimerMgr::Timer m_pingTimer; + TimerMgr::Timer m_recvTimer; + TimerMgr::Timer m_respTimer; + + static_assert(ExtConfig::KeepAliveOpTimers == 3U); +}; + +} // namespace op + + +} // namespace cc_mqttsn_client diff --git a/client/lib/src/op/Op.h b/client/lib/src/op/Op.h index d7f76b2..858a22f 100644 --- a/client/lib/src/op/Op.h +++ b/client/lib/src/op/Op.h @@ -30,7 +30,7 @@ class Op : public ProtMsgHandler { Type_Search, Type_Connect, - // Type_KeepAlive, + Type_KeepAlive, // Type_Disconnect, // Type_Subscribe, // Type_Unsubscribe, diff --git a/client/lib/templ/client.cpp.templ b/client/lib/templ/client.cpp.templ index 7e7e501..a9d7f01 100644 --- a/client/lib/templ/client.cpp.templ +++ b/client/lib/templ/client.cpp.templ @@ -522,14 +522,14 @@ CC_MqttsnErrorCode cc_mqttsn_##NAME##client_connect_cancel(CC_MqttsnConnectHandl } CC_MqttsnErrorCode cc_mqttsn_##NAME##client_connect( - CC_MqttsnClientHandle handle, - CC_MqttsnConnectConfig* config, - CC_MqttsnWillConfig* willConfig, + CC_MqttsnClientHandle client, + const CC_MqttsnConnectConfig* config, + const CC_MqttsnWillConfig* willConfig, CC_MqttsnConnectCompleteCb cb, void* cbData) { auto ec = CC_MqttsnErrorCode_Success; - auto connect = cc_mqttsn_##NAME##client_connect_prepare(handle, &ec); + auto connect = cc_mqttsn_##NAME##client_connect_prepare(client, &ec); if (connect == nullptr) { return ec; } diff --git a/client/lib/templ/client.h.templ b/client/lib/templ/client.h.templ index c7e7f52..d8e7604 100644 --- a/client/lib/templ/client.h.templ +++ b/client/lib/templ/client.h.templ @@ -371,7 +371,7 @@ CC_MqttsnErrorCode cc_mqttsn_##NAME##client_connect_cancel(CC_MqttsnConnectHandl /// @li @ref cc_mqttsn_##NAME##client_connect_config_will() /// @li @ref cc_mqttsn_##NAME##client_connect_send() /// -/// @param[in] handle Handle returned by @ref cc_mqttsn_##NAME##client_alloc() function. +/// @param[in] client Handle returned by @ref cc_mqttsn_##NAME##client_alloc() function. /// @param[in] config Connection configuration. Can be NULL. /// @param[in] willConfig Will configuration. Can be NULL. /// @param[in] cb Callback to be invoked when "connect" operation is complete. @@ -380,9 +380,9 @@ CC_MqttsnErrorCode cc_mqttsn_##NAME##client_connect_cancel(CC_MqttsnConnectHandl /// @return Result code of the call. /// @ingroup connect CC_MqttsnErrorCode cc_mqttsn_##NAME##client_connect( - CC_MqttsnClientHandle handle, - CC_MqttsnConnectConfig* config, - CC_MqttsnWillConfig* willConfig, + CC_MqttsnClientHandle client, + const CC_MqttsnConnectConfig* config, + const CC_MqttsnWillConfig* willConfig, CC_MqttsnConnectCompleteCb cb, void* cbData); diff --git a/client/lib/test/UnitTestCommonBase.cpp b/client/lib/test/UnitTestCommonBase.cpp index 9720194..14ba906 100644 --- a/client/lib/test/UnitTestCommonBase.cpp +++ b/client/lib/test/UnitTestCommonBase.cpp @@ -54,6 +54,18 @@ UnitTestCommonBase::UnitTestCommonBase(const LibFuncs& funcs) : test_assert(m_funcs.m_search_send != nullptr); test_assert(m_funcs.m_search_cancel != nullptr); test_assert(m_funcs.m_search != nullptr); + test_assert(m_funcs.m_connect_prepare != nullptr); + test_assert(m_funcs.m_connect_set_retry_period != nullptr); + test_assert(m_funcs.m_connect_get_retry_period != nullptr); + test_assert(m_funcs.m_connect_set_retry_count != nullptr); + test_assert(m_funcs.m_connect_get_retry_count != nullptr); + test_assert(m_funcs.m_connect_init_config != nullptr); + test_assert(m_funcs.m_connect_config != nullptr); + test_assert(m_funcs.m_connect_init_config_will != nullptr); + test_assert(m_funcs.m_connect_config_will != nullptr); + test_assert(m_funcs.m_connect_send != nullptr); + test_assert(m_funcs.m_connect_cancel != nullptr); + test_assert(m_funcs.m_connect != nullptr); test_assert(m_funcs.m_set_next_tick_program_callback != nullptr); test_assert(m_funcs.m_set_cancel_next_tick_wait_callback != nullptr); diff --git a/client/lib/test/UnitTestCommonBase.h b/client/lib/test/UnitTestCommonBase.h index 5bbbae7..bcef963 100644 --- a/client/lib/test/UnitTestCommonBase.h +++ b/client/lib/test/UnitTestCommonBase.h @@ -44,8 +44,19 @@ class UnitTestCommonBase CC_MqttsnErrorCode (*m_search_send)(CC_MqttsnSearchHandle, CC_MqttsnSearchCompleteCb, void*) = nullptr; CC_MqttsnErrorCode (*m_search_cancel)(CC_MqttsnSearchHandle) = nullptr; CC_MqttsnErrorCode (*m_search)(CC_MqttsnClientHandle, CC_MqttsnSearchCompleteCb, void*) = nullptr; + CC_MqttsnConnectHandle (*m_connect_prepare)(CC_MqttsnClientHandle, CC_MqttsnErrorCode*) = nullptr; + CC_MqttsnErrorCode (*m_connect_set_retry_period)(CC_MqttsnConnectHandle, unsigned ms) = nullptr; + unsigned (*m_connect_get_retry_period)(CC_MqttsnConnectHandle) = nullptr; + CC_MqttsnErrorCode (*m_connect_set_retry_count)(CC_MqttsnConnectHandle, unsigned count) = nullptr; + unsigned (*m_connect_get_retry_count)(CC_MqttsnConnectHandle) = nullptr; + void (*m_connect_init_config)(CC_MqttsnConnectConfig* config) = nullptr; + CC_MqttsnErrorCode (*m_connect_config)(CC_MqttsnConnectHandle, const CC_MqttsnConnectConfig*) = nullptr; + void (*m_connect_init_config_will)(CC_MqttsnWillConfig*) = nullptr; + CC_MqttsnErrorCode (*m_connect_config_will)(CC_MqttsnConnectHandle, const CC_MqttsnWillConfig*) = nullptr; + CC_MqttsnErrorCode (*m_connect_send)(CC_MqttsnConnectHandle, CC_MqttsnConnectCompleteCb, void*) = nullptr; + CC_MqttsnErrorCode (*m_connect_cancel)(CC_MqttsnConnectHandle) = nullptr; + CC_MqttsnErrorCode (*m_connect)(CC_MqttsnClientHandle, const CC_MqttsnConnectConfig*, const CC_MqttsnWillConfig*, CC_MqttsnConnectCompleteCb, void*) = nullptr; - void (*m_set_next_tick_program_callback)(CC_MqttsnClientHandle, CC_MqttsnNextTickProgramCb, void*) = nullptr; void (*m_set_cancel_next_tick_wait_callback)(CC_MqttsnClientHandle, CC_MqttsnCancelNextTickWaitCb, void*) = nullptr; void (*m_set_send_output_data_callback)(CC_MqttsnClientHandle, CC_MqttsnSendOutputDataCb, void*) = nullptr; diff --git a/client/lib/test/UnitTestDefaultBase.cpp b/client/lib/test/UnitTestDefaultBase.cpp index 672e186..bb133a0 100644 --- a/client/lib/test/UnitTestDefaultBase.cpp +++ b/client/lib/test/UnitTestDefaultBase.cpp @@ -35,6 +35,18 @@ const UnitTestDefaultBase::LibFuncs& UnitTestDefaultBase::getFuncs() funcs.m_search_send = &cc_mqttsn_client_search_send; funcs.m_search_cancel = &cc_mqttsn_client_search_cancel; funcs.m_search = &cc_mqttsn_client_search; + funcs.m_connect_prepare = &cc_mqttsn_client_connect_prepare; + funcs.m_connect_set_retry_period = &cc_mqttsn_client_connect_set_retry_period; + funcs.m_connect_get_retry_period = &cc_mqttsn_client_connect_get_retry_period; + funcs.m_connect_set_retry_count = &cc_mqttsn_client_connect_set_retry_count; + funcs.m_connect_get_retry_count = &cc_mqttsn_client_connect_get_retry_count; + funcs.m_connect_init_config = &cc_mqttsn_client_connect_init_config; + funcs.m_connect_config = &cc_mqttsn_client_connect_config; + funcs.m_connect_init_config_will = &cc_mqttsn_client_connect_init_config_will; + funcs.m_connect_config_will = &cc_mqttsn_client_connect_config_will; + funcs.m_connect_send = &cc_mqttsn_client_connect_send; + funcs.m_connect_cancel = &cc_mqttsn_client_connect_cancel; + funcs.m_connect = &cc_mqttsn_client_connect; funcs.m_set_next_tick_program_callback = &cc_mqttsn_client_set_next_tick_program_callback; funcs.m_set_cancel_next_tick_wait_callback = &cc_mqttsn_client_set_cancel_next_tick_wait_callback;