Skip to content

Commit

Permalink
add exclusion list
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-du-car committed Nov 21, 2023
1 parent 7107404 commit f445eeb
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 69 deletions.
26 changes: 8 additions & 18 deletions src/v2i-hub/TelematicBridgePlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,15 @@
"default": "INFO",
"description": "The log level for this plugin"
},
{
"key": "NATSConnectionTimeOut",
"default": "1000",
"description": "The log level for this plugin"

},
{
"key": "NATSConnectionAttempts",
"default": "10",
"description": "The log level for this plugin"

},
{
"key": "NATSUrl",
"default": "nats://127.0.0.1:4222",
"description": "The log level for this plugin"

"description": "The log level for this plugin"
},
{
"key": "UnitId",
"default": "v2xhub_id",
"description": "The log level for this plugin"

"description": "The log level for this plugin"
},
{
"key": "UnitName",
Expand All @@ -45,8 +31,12 @@
{
"key": "UnitType",
"default": "Infrastructure",
"description": "The log level for this plugin"

"description": "The log level for this plugin"
},
{
"key": "TopicExclusionList",
"default": "System_Alive_CommandPlugin,System_Alive_CARMAStreetsPlugin",
"description": "The list of topics are excluded from the available topic list."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ namespace TelematicBridge
void TelematicBridgePlugin::UpdateConfigSettings()
{
lock_guard<mutex> lock(_configMutex);
GetConfigValue<int64_t>("NATSConnectionTimeOut", _natsConnTimeOut);
GetConfigValue<int>("NATSConnectionAttempts", _natsConnAttempts);
GetConfigValue<string>("NATSUrl", _natsURL);
GetConfigValue<string>("UnitId", _unitId);
GetConfigValue<string>("UnitName", _unitName);
GetConfigValue<string>("UnitType", _unitType);
GetConfigValue<string>("TopicExclusionList", _excludedTopics);
unit_st unit;
unit.unitId = _unitId;
unit.unitName = _unitName;
unit.unitType = _unitType;
if (_telematicUnitPtr)
{
_telematicUnitPtr->setUnit(unit);
_telematicUnitPtr->excludedTopics = _excludedTopics;
}
}

Expand All @@ -72,7 +72,6 @@ namespace TelematicBridge

void TelematicBridgePlugin::OnConfigChanged(const char *key, const char *value)
{
PLOG(logDEBUG1) << "OnConfigChanged called";
PluginClient::OnConfigChanged(key, value);
UpdateConfigSettings();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace TelematicBridge
string _unitType;
string _unitName;
string _natsURL;
string _excludedTopics;
mutex _configMutex;
void OnMessageReceived(IvpMessage *msg);

Expand Down
47 changes: 12 additions & 35 deletions src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,15 @@

namespace TelematicBridge
{
TelematicUnit::TelematicUnit()
{
// natsStatus s;
// s = natsOptions_SetTimeout(_opts, natsConnTimeout);
// if (s == NATS_OK)
// {
// natsOptions_SetMaxReconnect(_opts, natsConnAttempts);
// }

// if (s == NATS_OK)
// {
// s = natsOptions_SetRetryOnFailedConnect(_opts, true, nullptr, nullptr);
// }

// if (s == NATS_OK)
// {
// s = natsOptions_SetURL(_opts, natsURL);
// }

// if (s != NATS_OK)
// {
// nats_PrintLastErrorStack(stderr);
// throw TelematicBridgeException(natsStatus_GetText(s));
// }
}

void TelematicUnit::connect(const string &natsURL, uint16_t natsConnAttempts, uint16_t natsConnTimeout)
{
// Reset connection and registration status based on the latest config update
// Reset connection and registration status
_isConnected = false;
_isRegistered = false;
PLOG(logINFO) << "Trying to connect to " << natsURL << " attempts: " << natsConnAttempts << ", nats connect timeout: " << natsConnTimeout;
// auto s = natsConnection_Connect(&_conn, _opts);
auto s = natsConnection_ConnectTo(&_conn, natsURL.c_str());
PLOG(logINFO) << "natsConnection_Connect returned: " << natsStatus_GetText(s);
PLOG(logINFO) << "NATS connection returned: " << natsStatus_GetText(s);
if (s == NATS_OK)
{
_isConnected = true;
Expand All @@ -45,8 +19,6 @@ namespace TelematicBridge
else
{
_isConnected = false;
nats_PrintLastErrorStack(stderr);
printf("NATS Connection Error: %u - %s\n", s, natsStatus_GetText(s));
throw TelematicBridgeException(natsStatus_GetText(s));
}
}
Expand Down Expand Up @@ -109,7 +81,6 @@ namespace TelematicBridge
else
{
_isRegistered = false;
nats_PrintLastErrorStack(stderr);
PLOG(logERROR) << "NATS regsiter Error: " << s << "-" << natsStatus_GetText(s);
}
natsMsg_Destroy(reply);
Expand Down Expand Up @@ -172,7 +143,6 @@ namespace TelematicBridge
}
else
{
nats_PrintLastErrorStack(stderr);
throw TelematicBridgeException(natsStatus_GetText(s));
}
}
Expand Down Expand Up @@ -201,9 +171,12 @@ namespace TelematicBridge
Json::Value topics;
for (const auto &topic : obj->availableTopics)
{
Json::Value topicJson;
topicJson["name"] = topic;
topics.append(topicJson);
if (!boost::icontains(obj->excludedTopics, topic))
{
Json::Value topicJson;
topicJson["name"] = topic;
topics.append(topicJson);
}
}
payload["topics"] = topics;
Json::FastWriter fasterWirter;
Expand Down Expand Up @@ -231,6 +204,10 @@ namespace TelematicBridge
}
if (root["topics"].isArray())
{
// clear old selected topics
obj->selectedTopics.clear();

// update selected topics with selected topics from latest request
for (auto itr = root["topics"].begin(); itr != root["topics"].end(); itr++)
{
obj->selectedTopics.push_back(itr->asString());
Expand Down
28 changes: 15 additions & 13 deletions src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "ThreadTimer.h"
#include "TelematicBridgeException.h"
#include <jsoncpp/json/json.h>
#include <boost/algorithm/string.hpp>

using namespace std;
using namespace tmx::utils;
Expand All @@ -27,17 +28,18 @@ namespace TelematicBridge
public:
mutex _unitMutex;
mutex _availableTopicsMutex;
atomic<bool> _isConnected {false};
atomic<bool> _isConnected{false};
atomic<bool> _isRegistered{false};
unit_st _unit; // Global variable to store the unit information
vector<string> availableTopics; // Global variable to store available topics
vector<string> selectedTopics; // Global variable to store selected topics confirmed by users
static CONSTEXPR const char *AVAILABLE_TOPICS = ".available_topics"; // NATS subject to pub/sub available topics
static CONSTEXPR const char *REGISTER_UNIT_TOPIC = "*.register_unit"; // NATS subject to pub/sub registering unit
static CONSTEXPR const char *PUBLISH_TOPICS = ".publish_topics"; // NATS subject to publish data stream
static CONSTEXPR const char *CHECK_STATUS = ".check_status"; // NATS subject to pub/sub checking unit status
unit_st _unit; // Global variable to store the unit information
vector<string> availableTopics; // Global variable to store available topics
string excludedTopics; // Global variable to store topics that are excluded by the users
vector<string> selectedTopics; // Global variable to store selected topics confirmed by users
static CONSTEXPR const char *AVAILABLE_TOPICS = ".available_topics"; // NATS subject to pub/sub available topics
static CONSTEXPR const char *REGISTER_UNIT_TOPIC = "*.register_unit"; // NATS subject to pub/sub registering unit
static CONSTEXPR const char *PUBLISH_TOPICS = ".publish_topics"; // NATS subject to publish data stream
static CONSTEXPR const char *CHECK_STATUS = ".check_status"; // NATS subject to pub/sub checking unit status
unique_ptr<ThreadTimer> _natsRegisterTh;
natsConnection *_conn = nullptr; // Global NATS connection object
natsConnection *_conn = nullptr; // Global NATS connection object
natsOptions *_opts = nullptr;
natsSubscription *subAvailableTopic = nullptr;
natsSubscription *subSelectedTopic = nullptr;
Expand All @@ -50,9 +52,9 @@ namespace TelematicBridge
/**
*@brief Construct telematic unit
*/
explicit TelematicUnit();
explicit TelematicUnit() = default;
/**
* @brief A function for telematic unit to connect to NATS server. Throw exception is connection failed. *
* @brief A function for telematic unit to connect to NATS server. Throw exception is connection failed. *
* @param const string NATS server URL
* @param uint16_t The numbers of attempts to make connections to NATS server
* @param uint16_t The timeout for between connection attempts
Expand All @@ -79,8 +81,8 @@ namespace TelematicBridge
* @brief Check if the given topic is inside the selectedTopics list
* @param string A topic to check for existence
* @return boolean indicator whether the input topic eixst.
*/
bool inSelectedTopics(const string& topic);
*/
bool inSelectedTopics(const string &topic);

/**
* @brief A NATS requestor for telematic unit to send register request to NATS server.
Expand Down

0 comments on commit f445eeb

Please sign in to comment.