Skip to content

Commit

Permalink
More events output for the gateway application.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Aug 25, 2024
1 parent b0fa25d commit b03bfe4
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 12 deletions.
27 changes: 26 additions & 1 deletion gateway/app/gateway/GatewayApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ bool GatewayApp::start(int argc, const char* argv[])
{
auto session = std::make_unique<GatewaySession>(m_io, m_logger, m_config, std::move(clientSocket));

session->setTermpReqCb(
session->setTermReqCb(
[this, sessionPtr = session.get()]()
{
auto iter =
Expand All @@ -87,6 +87,31 @@ bool GatewayApp::start(int argc, const char* argv[])
m_sessions.erase(iter);
});

session->setClientIdReportCb(
[this, sessionPtr = session.get()](const std::string& clientId)
{
auto iter =
std::find_if(
m_sessions.begin(), m_sessions.end(),
[sessionPtr, &clientId](auto& s)
{
assert(s);
return (s.get() != sessionPtr) && (clientId == s->clientId());
});

if (iter == m_sessions.end()) {
return;
}

boost::asio::post(
m_io,
[this, iter]()
{
m_sessions.erase(iter);
});

});

if (!session->start()) {
m_logger.error() << "Failed to start session" << std::endl;
return;
Expand Down
1 change: 1 addition & 0 deletions gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ void GatewayIoClientAcceptor_Udp::doAccept()
}

// Received previous broadcast, ignoring...
logger().info() << "Ignoring received recently broadcast gateway data, not valid input" << std::endl;
m_lastBroadcastData.clear();
doAccept();
return;
Expand Down
9 changes: 9 additions & 0 deletions gateway/app/gateway/GatewayIoClientSocket_Udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@
namespace cc_mqttsn_gateway_app
{

GatewayIoClientSocket_Udp::GatewayIoClientSocket_Udp(boost::asio::io_context& io, GatewayLogger& loggerParam, const Endpoint& endpoint) :
Base(io, loggerParam),
m_endpoint(endpoint)
{
logger().info() << "New UDP client connection from: " << m_endpoint << std::endl;
};

GatewayIoClientSocket_Udp::~GatewayIoClientSocket_Udp()
{
logger().info() << "Terminated UDP client connection from: " << m_endpoint << std::endl;

if (m_socketDeletedCb) {
m_socketDeletedCb(m_endpoint);
}
Expand Down
6 changes: 1 addition & 5 deletions gateway/app/gateway/GatewayIoClientSocket_Udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ class GatewayIoClientSocket_Udp final : public GatewayIoClientSocket
public:
using Endpoint = boost::asio::ip::udp::endpoint;

explicit GatewayIoClientSocket_Udp(boost::asio::io_context& io, GatewayLogger& logger, const Endpoint& endpoint) :
Base(io, logger),
m_endpoint(endpoint)
{
};
explicit GatewayIoClientSocket_Udp(boost::asio::io_context& io, GatewayLogger& loggerParam, const Endpoint& endpoint);

virtual ~GatewayIoClientSocket_Udp();

Expand Down
25 changes: 22 additions & 3 deletions gateway/app/gateway/GatewaySession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ GatewaySession::GatewaySession(
m_logger(logger),
m_config(config),
m_timer(io),
m_reconnectTimer(io),
m_clientSocket(std::move(clientSocket)),
m_sessionPtr(std::make_unique<cc_mqttsn_gateway::Session>()),
m_session(m_sessionPtr.get())
Expand All @@ -42,10 +43,16 @@ GatewaySession::GatewaySession(
m_logger(logger),
m_config(config),
m_timer(io),
m_reconnectTimer(io),
m_session(session)
{
}

GatewaySession::~GatewaySession()
{
m_logger.info() << "Terminating session for client: " << m_clientId << std::endl;
}

bool GatewaySession::start()
{
assert(m_termReqCb);
Expand Down Expand Up @@ -125,6 +132,7 @@ void GatewaySession::doBrokerConnect()
[this]()
{
m_session->setBrokerConnected(true);
m_brokerConnected = true;
}
);

Expand All @@ -135,6 +143,7 @@ void GatewaySession::doBrokerConnect()
m_session->setBrokerConnected(false);
}

m_brokerConnected = false;
doBrokerReconnect();
});

Expand All @@ -147,10 +156,14 @@ void GatewaySession::doBrokerConnect()

void GatewaySession::doBrokerReconnect()
{
boost::asio::post(
m_io,
[this]()
m_reconnectTimer.expires_after(std::chrono::milliseconds(100));
m_reconnectTimer.async_wait(
[this](const boost::system::error_code& ec)
{
if (ec == boost::asio::error::operation_aborted) {
return;
}

doBrokerConnect();
});
}
Expand Down Expand Up @@ -212,6 +225,12 @@ bool GatewaySession::startSession()
m_session->setClientConnectedReportCb(
[this](const std::string& clientId)
{
m_logger.info() << "Connected client: " << clientId << std::endl;

m_clientId = clientId;
assert(m_clientIdReportCb);
m_clientIdReportCb(clientId);

auto& predefinedTopics = m_config.predefinedTopics();

auto applyForClient =
Expand Down
23 changes: 20 additions & 3 deletions gateway/app/gateway/GatewaySession.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,29 @@ class GatewaySession
const cc_mqttsn_gateway::Config& config,
cc_mqttsn_gateway::Session* session);

~GatewaySession();

bool start();

using TermpReqCb = std::function<void ()>;
const std::string& clientId() const
{
return m_clientId;
}

using TermReqCb = std::function<void ()>;
template <typename TFunc>
void setTermpReqCb(TFunc&& func)
void setTermReqCb(TFunc&& func)
{
m_termReqCb = std::forward<TFunc>(func);
}

using ClientIdReportCb = std::function<void (const std::string& clientId)>;
template <typename TFunc>
void setClientIdReportCb(TFunc&& func)
{
m_clientIdReportCb = std::forward<TFunc>(func);
}

private:
using Timer = boost::asio::steady_timer;
using TimestampClock = std::chrono::steady_clock;
Expand All @@ -70,11 +84,14 @@ class GatewaySession
GatewayLogger& m_logger;
const cc_mqttsn_gateway::Config& m_config;
Timer m_timer;
Timer m_reconnectTimer;
GatewayIoClientSocketPtr m_clientSocket;
GatewayIoBrokerSocketPtr m_brokerSocket;
std::unique_ptr<cc_mqttsn_gateway::Session> m_sessionPtr;
cc_mqttsn_gateway::Session* m_session = nullptr;
TermpReqCb m_termReqCb;
TermReqCb m_termReqCb;
ClientIdReportCb m_clientIdReportCb;
std::string m_clientId;
DataBuf m_brokerData;
Timestamp m_tickReqTs;
std::list<Ptr> m_fwdEncSessions;
Expand Down

0 comments on commit b03bfe4

Please sign in to comment.