Skip to content

Commit

Permalink
Merge pull request #367 from usdot-fhwa-OPS/hotfix_remove_tcm_duplica…
Browse files Browse the repository at this point in the history
…te_tcr_ids2

Hotfix Repeatedly broadcast TCM and timeout bugs
  • Loading branch information
snallamothu authored May 9, 2022
2 parents 7f083d5 + 715ef08 commit 68ed269
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 60 deletions.
10 changes: 10 additions & 0 deletions src/v2i-hub/CARMACloudPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
"default": "1000",
"description": "After it receives TCM from carma cloud, it repeatedly broadcasts TCM until TCMRepeatTimeOut milliseconds."
},
{
"key": "TCMRepeatedlyBroadcastSleep",
"default": "100",
"description": "The repeatedly broadcast thread should sleep for number of milliseconds."
},
{
"key": "TCMRepeatedlyBroadCastTotalTimes",
"default": "1",
"description": "The number of times TCMs with the same request id should be repeatedly broadcast within the time out period."
},
{
"key": "TCMNOAcknowledgementDescription",
"default": "No response received from CMV after repeatedly broadcast TCMs.",
Expand Down
213 changes: 155 additions & 58 deletions src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) {
base_ack = "/carmacloud/tcmack";
method = "POST";
_not_ACK_TCMs = std::make_shared<multimap<string, tsm5EncodedMessage>>();
std::thread Broadcast_t(&CARMACloudPlugin::Broadcast_TCMs, this);
_tcm_broadcast_times = std::make_shared<std::multimap<string, TCMBroadcastMetadata>>();
_tcm_broadcast_starting_time = std::make_shared<std::map<string, std::time_t>>();
std::thread Broadcast_t(&CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM, this);
Broadcast_t.detach();

}
Expand Down Expand Up @@ -94,6 +96,14 @@ void CARMACloudPlugin::HandleCARMARequest(tsm4Message &msg, routeable_message &r
PLOG(logINFO) << "Sent TCR to cloud: "<< xml_str<<endl;
CloudSend(xml_str,url, base_req, method);

//If TCR reqids match with the any existing TCMs, erase the TCMs from the list after sending new TCR request to carma-cloud.
std::lock_guard<mutex> lock(_not_ACK_TCMs_mutex);
if(_not_ACK_TCMs->erase(reqid) <= 0)
{
PLOG(logDEBUG) << "TCR request id =" << reqid << " Not Found in TCM map." << std::endl;
}else{
PLOG(logDEBUG) << "TCR request id =" << reqid << " Found in TCM map. Remove the existing TCMs with the same TCR request id." << std::endl;
}
}

void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg){
Expand Down Expand Up @@ -129,24 +139,45 @@ void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeabl
string CMV_id = ss.str();

std::lock_guard<mutex> lock(_not_ACK_TCMs_mutex);
//The traffic control id should match with the TCM id per CMV (CARMA vehicle).
if(_not_ACK_TCMs->erase(traffic_control_id) <= 0)
auto matching_TCMS = _not_ACK_TCMs->equal_range(traffic_control_id);
bool is_tcm_removed = false;
for(auto itr = matching_TCMS.first; itr != matching_TCMS.second; itr++)
{
//The traffic control id should match with the TCM id per CMV (CARMA vehicle) and combines with msgnum to uniquely identify each TCM.
tsm5EncodedMessage msg = itr->second;
tsm5Message decoded_tsm5_msg = msg.decode_j2735_message();
std::shared_ptr<TestMessage05> msg_j2735_data = decoded_tsm5_msg.get_j2735_data();
if (msg_j2735_data == NULL) {
PLOG(logERROR) << "get_j2735_data() on decoded j2735 returned NULL." << std::endl;
break;
}

if(msg_j2735_data->body.choice.tcmV01.msgnum == stol(msgnum))
{
//Remove a single TCM identified by reqid (traffic control id) and msgnum.
_not_ACK_TCMs->erase(itr);
PLOG(logINFO) << "Acknowledgement received, traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " removed from TCM map." << std::endl;
is_tcm_removed = true;
break;
}
}
if(!is_tcm_removed)
{
PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << " Not Found in TCM map." << std::endl;
PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " NOT found in TCM map." << std::endl;
}

//Create an event log object for both positive and negative ACK (ackownledgement), and broadcast the event log
tmx::messages::TmxEventLogMessage event_log_msg;

//acknnowledgement: Flag to indicate whether the received geofence was processed successfully by the CMV. 1 mapping to acknowledged by CMV
std::transform(acknnowledgement_str.begin(), acknnowledgement_str.end(), acknnowledgement_str.begin(), ::tolower );
acknnowledgement_str.find("1") != std::string::npos ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn);
int ack = std::stoi(acknnowledgement_str);
ack == acknowledgement_status::acknowledgement_status__acknowledged ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn);
event_log_msg.set_description(mo_strategy + ": Traffic control id = " + traffic_control_id + ( CMV_id.length() <= 0 ? "":", CMV Id = " + CMV_id )+ ", reason = " + even_log_description);
PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl;
this->BroadcastMessage<tmx::messages::TmxEventLogMessage>(event_log_msg);

//send negative ack to carma-cloud if not receiving any ack from CMV. acknowledgement_status__acknowledged = 1
if(acknnowledgement_str.find("1") == std::string::npos )
//Only send negative ack to carma-cloud if receiving any acks from CMV.
if(ack != acknowledgement_status::acknowledgement_status__acknowledged )
{
stringstream sss;
sss << "<?xml version=\"1.0\" encoding=\"UTF-8\"?><TrafficControlAcknowledgement><reqid> " << traffic_control_id
Expand Down Expand Up @@ -219,6 +250,8 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket)
container.load<XML>(ss);
tsm5message.set_contents(container.get_storage().get_tree());
tsm5ENC.encode_j2735_message(tsm5message);
BroadcastTCM(tsm5ENC);
PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str();

//Get TCM id
Id64b_t tcmv01_req_id = tsm5message.get_j2735_data()->body.choice.tcmV01.reqid;
Expand All @@ -238,80 +271,142 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket)
}
}

void CARMACloudPlugin::Broadcast_TCMs()
void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM()
{
std::time_t start_time = 0, cur_time = 0;
bool is_started_broadcasting = false;
while(true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(_TCMRepeatedlyBroadcastSleep));
if(_plugin->state == IvpPluginState_error)
{
break;
}
if(_not_ACK_TCMs->size() > 0)
{
std::lock_guard<mutex> lock(_not_ACK_TCMs_mutex);
for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr )

std::set<string> expired_req_ids;

for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); itr++ )
{
string tcmv01_req_id_hex = itr->first;
if(!is_started_broadcasting)
string tcmv01_req_id_hex = itr->first;
auto cur_time = (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())).count();
if (_tcm_broadcast_starting_time->count(tcmv01_req_id_hex) == 0)
{
start_time = (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())).count();
}else{
cur_time = (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())).count();
_tcm_broadcast_starting_time->insert({tcmv01_req_id_hex, cur_time});
}
is_started_broadcasting = true;
if((cur_time - start_time) > _TCMRepeatedlyBroadcastTimeOut)
else if ( (cur_time - _tcm_broadcast_starting_time->at(tcmv01_req_id_hex)) > _TCMRepeatedlyBroadcastTimeOut )
{
_not_ACK_TCMs->erase(tcmv01_req_id_hex);
start_time = 0;
cur_time = 0;
is_started_broadcasting = false;

//Create an event log object for both NO ACK (ackownledgement), and broadcast the event log
tmx::messages::TmxEventLogMessage event_log_msg;
event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn);
event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex);
PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl;
this->BroadcastMessage<tmx::messages::TmxEventLogMessage>(event_log_msg);

//send negative ack to carma-cloud
stringstream sss;
sss << "<?xml version=\"1.0\" encoding=\"UTF-8\"?><TrafficControlAcknowledgement><reqid> " << tcmv01_req_id_hex
<< "</reqid><msgnum></msgnum><cmvid></cmvid><acknowledgement>" << acknowledgement_status::acknowledgement_status__not_acknowledged
<< "</acknowledgement><description>" << _TCMNOAcknowledgementDescription
<< "</description></TrafficControlAcknowledgement>";
PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <<endl;
CloudSend(sss.str(),url, base_ack, method);
break;
expired_req_ids.insert(tcmv01_req_id_hex);
continue;
}
std::unique_ptr<tsm5EncodedMessage> msg;
msg.reset();
msg.reset(dynamic_cast<tsm5EncodedMessage*>(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING)));


tsm5EncodedMessage tsm5ENC = itr->second;
string enc = tsm5ENC.get_encoding();
msg->refresh_timestamp();
msg->set_payload(tsm5ENC.get_payload_str());
msg->set_encoding(enc);
msg->set_flags(IvpMsgFlags_RouteDSRC);
msg->addDsrcMetadata(172, 0x8003);
msg->refresh_timestamp();

routeable_message *rMsg = dynamic_cast<routeable_message *>(msg.get());
BroadcastMessage(*rMsg);
PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str();
string tcm_hex_payload = tsm5ENC.get_payload_str();
if(IsSkipBroadcastCurTCM(tcmv01_req_id_hex, tcm_hex_payload))
{
continue;
}
BroadcastTCM(tsm5ENC);
PLOG(logINFO) << " CARMACloud Plugin :: Repeatedly Broadcast tsm5:: " << tsm5ENC.get_payload_str();
} //END TCMs LOOP

// For any ids which have expired clean up the maps
for (auto tcmv01_req_id_hex : expired_req_ids) {
//Create an event log object for both NO ACK (ackownledgement), and broadcast the event log
tmx::messages::TmxEventLogMessage event_log_msg;
event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn);
event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex);
PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl;
this->BroadcastMessage<tmx::messages::TmxEventLogMessage>(event_log_msg);

//send negative ack to carma-cloud
stringstream sss;
sss << "<?xml version=\"1.0\" encoding=\"UTF-8\"?><TrafficControlAcknowledgement><reqid> " << tcmv01_req_id_hex
<< "</reqid><msgnum></msgnum><cmvid></cmvid><acknowledgement>" << acknowledgement_status::acknowledgement_status__not_acknowledged
<< "</acknowledgement><description>" << _TCMNOAcknowledgementDescription
<< "</description></TrafficControlAcknowledgement>";
PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <<endl;
CloudSend(sss.str(),url, base_ack, method);

_not_ACK_TCMs->erase(tcmv01_req_id_hex);
//If time out, stop tracking the starting time of the TCMs being broadcast so far
_tcm_broadcast_starting_time->erase(tcmv01_req_id_hex);
//If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast
_tcm_broadcast_times->erase(tcmv01_req_id_hex);
}
}
else
{
start_time = 0;
cur_time = 0;
is_started_broadcasting = false;
PLOG(logDEBUG) << "NO TCMs to broadcast." << std::endl;
_tcm_broadcast_times->clear();
_tcm_broadcast_starting_time->clear();
}
}
}

void CARMACloudPlugin::BroadcastTCM(tsm5EncodedMessage& tsm5ENC) {
//Broadcast TCM
string enc = tsm5ENC.get_encoding();
std::unique_ptr<tsm5EncodedMessage> msg;
msg.reset();
msg.reset(dynamic_cast<tsm5EncodedMessage*>(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING)));
msg->refresh_timestamp();
msg->set_payload(tsm5ENC.get_payload_str());
msg->set_encoding(enc);
msg->set_flags(IvpMsgFlags_RouteDSRC);
msg->addDsrcMetadata(172, 0x8003);
msg->refresh_timestamp();
routeable_message *rMsg = dynamic_cast<routeable_message *>(msg.get());
BroadcastMessage(*rMsg);
}

bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const
{
//Skip repeatedly broadcasting
if(_TCMRepeatedlyBroadCastTotalTimes == 0)
{
return true;
}

bool is_skip_cur_tcm = false;
bool is_tcm_hex_found = false;
auto tcms_metadatas = _tcm_broadcast_times->equal_range(tcmv01_req_id_hex);
for(auto itr = tcms_metadatas.first; itr !=tcms_metadatas.second; itr ++)
{
string tcm_hex = itr->second.tcm_hex;
int times = itr->second.num_of_times;
if(tcm_hex == tcm_hex_payload)
{
is_tcm_hex_found = true;
if (times >= _TCMRepeatedlyBroadCastTotalTimes)
{
PLOG(logDEBUG) << "SKIP broadcasting as TCMs reqid = " << tcmv01_req_id_hex<< " has been repeatedly broadcast " << times << " times." << std::endl;
//Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes
is_skip_cur_tcm = true;
}
else
{
//update the number of times a TCM being broadcast within time out period
times += 1; //Increase by 1 for every iteration
itr->second.num_of_times = times;
PLOG(logDEBUG) << " TCMs reqid = " << tcmv01_req_id_hex<< " repeatedly broadcast " << times << " times." << std::endl;
}
}
}

if(!is_tcm_hex_found)
{
//Initialize the number of times a TCM being broadcast
TCMBroadcastMetadata t;
t.num_of_times = 1;
t.tcm_hex = tcm_hex_payload;
_tcm_broadcast_times->insert( {tcmv01_req_id_hex, t});
PLOG(logDEBUG) << " TCMs reqid = "<< tcmv01_req_id_hex << " has been broadcast once."<< std::endl;
}
return is_skip_cur_tcm;
}

int CARMACloudPlugin::StartWebService()
{
//Web services
Expand Down Expand Up @@ -359,6 +454,8 @@ void CARMACloudPlugin::UpdateConfigSettings() {
GetConfigValue<string>("MobilityOperationStrategies", _strategies);
GetConfigValue<uint16_t>("TCMRepeatedlyBroadcastTimeOut",_TCMRepeatedlyBroadcastTimeOut);
GetConfigValue<string>("TCMNOAcknowledgementDescription", _TCMNOAcknowledgementDescription);
GetConfigValue<int>("TCMRepeatedlyBroadCastTotalTimes", _TCMRepeatedlyBroadCastTotalTimes);
GetConfigValue<int>("TCMRepeatedlyBroadcastSleep", _TCMRepeatedlyBroadcastSleep);

}

Expand Down
31 changes: 29 additions & 2 deletions src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,18 @@ class CARMACloudPlugin: public PluginClient {
* @brief: Loop through the received TCMs and broadcast them for the configured duration.
* If it timed out, it would remove the TCMs from the list, and stop broadcasting them.
* ***/
void Broadcast_TCMs();
void TCMAckCheckAndRebroadcastTCM();
/***
* @biref: Add DSRC metadata for TCM and broadcast TCM
* @param: Encoded TCM to broadcast
***/
void BroadcastTCM(tsm5EncodedMessage& tsm5ENC);
/***
* @brief: Determin if stop broadcasting the current TCM
* @param: std::string of decoded TCM request id
* @param: key string TCM hex payload
* **/
bool IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const;

private:

Expand All @@ -165,15 +176,31 @@ class CARMACloudPlugin: public PluginClient {
//Comma separated string for list of strategies from MobilityOperation messages
std::string _strategies;

struct TCMBroadcastMetadata
{
string tcm_hex;
int num_of_times;
};
//Used to lock the shared TCMs resource
std::mutex _not_ACK_TCMs_mutex;
//An associated array to keep track of TCMs that are not acknowledged
std::shared_ptr<std::multimap<string, tsm5EncodedMessage>> _not_ACK_TCMs;

//TCM repeatedly broadcast time out in unit of second
uint16_t _TCMRepeatedlyBroadcastTimeOut = 0;
//Keep track of the starting time (unit of milliseconds) TCMs with same TCR request ids being broadcast Key: tcm request id, value: the start broadcasting timestamp
std::shared_ptr<std::map<string, std::time_t>> _tcm_broadcast_starting_time;

std::string _TCMNOAcknowledgementDescription = "";
const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement";

//Total number of times repeatedly broadcast TCMs with the same request id
int _TCMRepeatedlyBroadCastTotalTimes = 0;
//Keep track of the number of times repeatedly broadcast TCMS. Key: tcm hex string, value: the number of times
std::shared_ptr< std::multimap<string, TCMBroadcastMetadata>> _tcm_broadcast_times;

const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement";
int _TCMRepeatedlyBroadcastSleep = 100;

};
std::mutex _cfgLock;
}
Expand Down

0 comments on commit 68ed269

Please sign in to comment.