From 68da86e7c75ef0f91d4f8b391d21533396e3d18d Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Tue, 19 Apr 2022 16:11:14 -0400 Subject: [PATCH 01/16] init --- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 67eb6678d..2864cfc01 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -94,6 +94,14 @@ void CARMACloudPlugin::HandleCARMARequest(tsm4Message &msg, routeable_message &r PLOG(logINFO) << "Sent TCR to cloud: "<< xml_str< lock(_not_ACK_TCMs_mutex); + if(_not_ACK_TCMs->erase(reqid) <= 0) + { + PLOG(logDEBUG) << "TCR request id =" << reqid << " Not Found in TCM map." << std::endl; + }else{ + PLOG(logDEBUG) << "TCR request id =" << reqid << " Found in TCM map. Remove the existing TCMs with the same TCR request id." << std::endl; + } } void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg){ From de4bbc1fa943ced490428dfcd21608f84fb7e7bc Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Wed, 20 Apr 2022 15:07:50 -0400 Subject: [PATCH 02/16] update --- src/v2i-hub/CARMACloudPlugin/manifest.json | 5 ++++ .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 27 ++++++++++++++++++- .../CARMACloudPlugin/src/CARMACloudPlugin.h | 5 ++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/v2i-hub/CARMACloudPlugin/manifest.json b/src/v2i-hub/CARMACloudPlugin/manifest.json index d6098af7c..7b4ac96af 100644 --- a/src/v2i-hub/CARMACloudPlugin/manifest.json +++ b/src/v2i-hub/CARMACloudPlugin/manifest.json @@ -48,6 +48,11 @@ "default": "1000", "description": "After it receives TCM from carma cloud, it repeatedly broadcasts TCM until TCMRepeatTimeOut milliseconds." }, + { + "key": "TCMRepeatedlyBroadCastTotalTimes", + "default": "1", + "description": "The number of times TCMs with the same request id should be broadcast within the time out period." + }, { "key": "TCMNOAcknowledgementDescription", "default": "No response received from CMV after repeatedly broadcast TCMs.", diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 2864cfc01..11380a46f 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -262,7 +262,8 @@ void CARMACloudPlugin::Broadcast_TCMs() std::lock_guard lock(_not_ACK_TCMs_mutex); for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr ) { - string tcmv01_req_id_hex = itr->first; + string tcmv01_req_id_hex = itr->first; + //Start recording time out period if(!is_started_broadcasting) { start_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); @@ -273,6 +274,8 @@ void CARMACloudPlugin::Broadcast_TCMs() if((cur_time - start_time) > _TCMRepeatedlyBroadcastTimeOut) { _not_ACK_TCMs->erase(tcmv01_req_id_hex); + //If time out, stop counting the number of times TCMs being broadcast + _tcm_broadcast_times->erase(tcmv01_req_id_hex); start_time = 0; cur_time = 0; is_started_broadcasting = false; @@ -293,7 +296,18 @@ void CARMACloudPlugin::Broadcast_TCMs() PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <count(tcmv01_req_id_hex) == 0) + { + _tcm_broadcast_times->insert({tcmv01_req_id_hex, 0}); } + else if (_tcm_broadcast_times->at(tcmv01_req_id_hex) > _TCMRepeatedlyBroadCastTotalTimes){ + //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes + continue; + } + std::unique_ptr msg; msg.reset(); msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); @@ -317,6 +331,16 @@ void CARMACloudPlugin::Broadcast_TCMs() cur_time = 0; is_started_broadcasting = false; } + + //update the number of times a TCM should be broadcast within time out period + for(auto itr = _tcm_broadcast_times->begin(); itr!=_tcm_broadcast_times->end(); itr++) + { + int times = _tcm_broadcast_times->at(itr->first); + times += 1; //Increase 1 by every iteration + _tcm_broadcast_times->erase(itr->first); + _tcm_broadcast_times->insert({itr->first, times}); + PLOG(logDEBUG) << " TCMs with request id = " << itr->first << " has been broadcast " << times << " times."; + } } } @@ -367,6 +391,7 @@ void CARMACloudPlugin::UpdateConfigSettings() { GetConfigValue("MobilityOperationStrategies", _strategies); GetConfigValue("TCMRepeatedlyBroadcastTimeOut",_TCMRepeatedlyBroadcastTimeOut); GetConfigValue("TCMNOAcknowledgementDescription", _TCMNOAcknowledgementDescription); + GetConfigValue("TCMRepeatedlyBroadCastTotalTimes", _TCMRepeatedlyBroadCastTotalTimes); } diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index 9cece33bf..52a5c37b5 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -172,6 +172,11 @@ class CARMACloudPlugin: public PluginClient { //TCM repeatedly broadcast time out in unit of second uint16_t _TCMRepeatedlyBroadcastTimeOut = 0; std::string _TCMNOAcknowledgementDescription = ""; + + //Total number of times repeatedly broadcast TCMs with the same request id + int _TCMRepeatedlyBroadCastTotalTimes = 0; + //Keep tracking of the number of times repeatedly broadcast TCMS. Key: tcm request id, value: the number of times + std::shared_ptr> _tcm_broadcast_times; const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement"; }; From 85800c8177e4ef765a6ab4c417ea6dad53b85c4e Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Wed, 20 Apr 2022 16:12:43 -0400 Subject: [PATCH 03/16] update --- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 11380a46f..8e3d37688 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -32,6 +32,7 @@ CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) { base_ack = "/carmacloud/tcmack"; method = "POST"; _not_ACK_TCMs = std::make_shared>(); + _tcm_broadcast_times = std::make_shared>(); std::thread Broadcast_t(&CARMACloudPlugin::Broadcast_TCMs, this); Broadcast_t.detach(); From 7eb67b454fb7418aecff07e4788d877b228d199d Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Wed, 20 Apr 2022 16:21:29 -0400 Subject: [PATCH 04/16] update --- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 8e3d37688..a0e5106f7 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -304,7 +304,7 @@ void CARMACloudPlugin::Broadcast_TCMs() { _tcm_broadcast_times->insert({tcmv01_req_id_hex, 0}); } - else if (_tcm_broadcast_times->at(tcmv01_req_id_hex) > _TCMRepeatedlyBroadCastTotalTimes){ + else if (_tcm_broadcast_times->at(tcmv01_req_id_hex) >= _TCMRepeatedlyBroadCastTotalTimes){ //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes continue; } From ef91750c77fbb27c95c93d1e52a4b71c42faca5d Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Wed, 20 Apr 2022 18:13:36 -0400 Subject: [PATCH 05/16] update --- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index a0e5106f7..3a9cb819b 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -303,8 +303,10 @@ void CARMACloudPlugin::Broadcast_TCMs() if (_tcm_broadcast_times->count(tcmv01_req_id_hex) == 0) { _tcm_broadcast_times->insert({tcmv01_req_id_hex, 0}); + PLOG(logDEBUG) << " TCMs with request id = " << tcmv01_req_id_hex << " has been broadcast " << _tcm_broadcast_times->at(tcmv01_req_id_hex) << " times."<< std::endl; } else if (_tcm_broadcast_times->at(tcmv01_req_id_hex) >= _TCMRepeatedlyBroadCastTotalTimes){ + PLOG(logDEBUG) << "SKIP as TCMs with request id = " << tcmv01_req_id_hex << " has been broadcast " << _tcm_broadcast_times->at(tcmv01_req_id_hex) << " times."<< std::endl; //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes continue; } @@ -324,6 +326,17 @@ void CARMACloudPlugin::Broadcast_TCMs() routeable_message *rMsg = dynamic_cast(msg.get()); BroadcastMessage(*rMsg); PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); + } //END TCMs LOOP + + //update the number of times a TCM should be broadcast within time out period + for(auto itr = _tcm_broadcast_times->begin(); itr!=_tcm_broadcast_times->end(); itr++) + { + string reqid = itr->first; + int times = itr->second; + times += 1; //Increase 1 by every iteration + _tcm_broadcast_times->erase(reqid); + _tcm_broadcast_times->insert({reqid, times}); + PLOG(logDEBUG) << " TCMs with request id = " << reqid << " has been broadcast " << times << " times." << std::endl; } } else @@ -331,17 +344,10 @@ void CARMACloudPlugin::Broadcast_TCMs() start_time = 0; cur_time = 0; is_started_broadcasting = false; + _tcm_broadcast_times->clear(); } - //update the number of times a TCM should be broadcast within time out period - for(auto itr = _tcm_broadcast_times->begin(); itr!=_tcm_broadcast_times->end(); itr++) - { - int times = _tcm_broadcast_times->at(itr->first); - times += 1; //Increase 1 by every iteration - _tcm_broadcast_times->erase(itr->first); - _tcm_broadcast_times->insert({itr->first, times}); - PLOG(logDEBUG) << " TCMs with request id = " << itr->first << " has been broadcast " << times << " times."; - } + } } From 1c8d66e14abded70adb5394d07bdd9fcf0b67b1b Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 21 Apr 2022 00:06:22 -0400 Subject: [PATCH 06/16] update --- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 100 ++++++++++-------- .../CARMACloudPlugin/src/CARMACloudPlugin.h | 22 +++- 2 files changed, 76 insertions(+), 46 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 3a9cb819b..9c0d8bb26 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -32,7 +32,8 @@ CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) { base_ack = "/carmacloud/tcmack"; method = "POST"; _not_ACK_TCMs = std::make_shared>(); - _tcm_broadcast_times = std::make_shared>(); + _tcm_broadcast_times = std::make_shared>(); + _tcm_broadcast_starting_time = std::make_shared>(); std::thread Broadcast_t(&CARMACloudPlugin::Broadcast_TCMs, this); Broadcast_t.detach(); @@ -249,8 +250,6 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket) void CARMACloudPlugin::Broadcast_TCMs() { - std::time_t start_time = 0, cur_time = 0; - bool is_started_broadcasting = false; while(true) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -264,24 +263,20 @@ void CARMACloudPlugin::Broadcast_TCMs() for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr ) { string tcmv01_req_id_hex = itr->first; - //Start recording time out period - if(!is_started_broadcasting) + auto cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); + if (_tcm_broadcast_starting_time->count(tcmv01_req_id_hex) == 0) { - start_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); - }else{ - cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); + _tcm_broadcast_starting_time->insert({tcmv01_req_id_hex, cur_time}); } - is_started_broadcasting = true; - if((cur_time - start_time) > _TCMRepeatedlyBroadcastTimeOut) + else if ( (cur_time - _tcm_broadcast_starting_time->at(tcmv01_req_id_hex)) > _TCMRepeatedlyBroadcastTimeOut ) { _not_ACK_TCMs->erase(tcmv01_req_id_hex); - //If time out, stop counting the number of times TCMs being broadcast + //If time out, stop tracking the starting time of the TCMs being broadcast so far + _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); + //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast _tcm_broadcast_times->erase(tcmv01_req_id_hex); - start_time = 0; - cur_time = 0; - is_started_broadcasting = false; - //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log + tmx::messages::TmxEventLogMessage event_log_msg; event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); @@ -297,24 +292,18 @@ void CARMACloudPlugin::Broadcast_TCMs() PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <count(tcmv01_req_id_hex) == 0) - { - _tcm_broadcast_times->insert({tcmv01_req_id_hex, 0}); - PLOG(logDEBUG) << " TCMs with request id = " << tcmv01_req_id_hex << " has been broadcast " << _tcm_broadcast_times->at(tcmv01_req_id_hex) << " times."<< std::endl; - } - else if (_tcm_broadcast_times->at(tcmv01_req_id_hex) >= _TCMRepeatedlyBroadCastTotalTimes){ - PLOG(logDEBUG) << "SKIP as TCMs with request id = " << tcmv01_req_id_hex << " has been broadcast " << _tcm_broadcast_times->at(tcmv01_req_id_hex) << " times."<< std::endl; - //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes - continue; } std::unique_ptr msg; msg.reset(); msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); tsm5EncodedMessage tsm5ENC = itr->second; + string tcm_hex_payload = tsm5ENC.get_payload_str(); + if(IsSkipBroadcastCurTCM(tcmv01_req_id_hex, tcm_hex_payload)) + { + continue; + } + //Broadcast TCM string enc = tsm5ENC.get_encoding(); msg->refresh_timestamp(); msg->set_payload(tsm5ENC.get_payload_str()); @@ -322,33 +311,58 @@ void CARMACloudPlugin::Broadcast_TCMs() msg->set_flags(IvpMsgFlags_RouteDSRC); msg->addDsrcMetadata(172, 0x8003); msg->refresh_timestamp(); - routeable_message *rMsg = dynamic_cast(msg.get()); BroadcastMessage(*rMsg); PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); } //END TCMs LOOP - - //update the number of times a TCM should be broadcast within time out period - for(auto itr = _tcm_broadcast_times->begin(); itr!=_tcm_broadcast_times->end(); itr++) - { - string reqid = itr->first; - int times = itr->second; - times += 1; //Increase 1 by every iteration - _tcm_broadcast_times->erase(reqid); - _tcm_broadcast_times->insert({reqid, times}); - PLOG(logDEBUG) << " TCMs with request id = " << reqid << " has been broadcast " << times << " times." << std::endl; - } } else { - start_time = 0; - cur_time = 0; - is_started_broadcasting = false; + PLOG(logDEBUG) << "NO TCMs to broadcast." << std::endl; _tcm_broadcast_times->clear(); + _tcm_broadcast_starting_time->clear(); } + } +} +bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const +{ + bool is_skip_cur_tcm = false; + bool is_tcm_hex_found = false; + auto tcms_metadatas = _tcm_broadcast_times->equal_range(tcmv01_req_id_hex); + for(auto itr = tcms_metadatas.first; itr !=tcms_metadatas.second; itr ++) + { + string tcm_hex = itr->second.tcm_hex; + int times = itr->second.num_of_times; + if(tcm_hex == tcm_hex_payload) + { + is_tcm_hex_found = true; + if (times >= _TCMRepeatedlyBroadCastTotalTimes) + { + PLOG(logDEBUG) << "SKIP broadcasting as TCMs reqid = " << tcmv01_req_id_hex<< " has been broadcast " << times << " times." << std::endl; + //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes + is_skip_cur_tcm = true; + } + else + { + //update the number of times a TCM being broadcast within time out period + times += 1; //Increase by 1 for every iteration + itr->second.num_of_times = times; + PLOG(logDEBUG) << " TCMs reqid = " << tcmv01_req_id_hex<< " has been broadcast " << times << " times." << std::endl; + } + } + } - } + if(!is_tcm_hex_found) + { + //Initialize the number of times a TCM being broadcast + TCMBroadcastMetadata t; + t.num_of_times = 1; + t.tcm_hex = tcm_hex_payload; + _tcm_broadcast_times->insert( {tcmv01_req_id_hex, t}); + PLOG(logDEBUG) << " TCMs reqid = "<< tcmv01_req_id_hex << " has been broadcast once."<< std::endl; + } + return is_skip_cur_tcm; } int CARMACloudPlugin::StartWebService() diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index 52a5c37b5..bbff75570 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -149,6 +149,12 @@ class CARMACloudPlugin: public PluginClient { * If it timed out, it would remove the TCMs from the list, and stop broadcasting them. * ***/ void Broadcast_TCMs(); + /*** + * @brief: Determin if stop broadcasting the current TCM + * @param: std::string of decoded TCM request id + * @param: key string TCM hex payload + * **/ + bool IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const; private: @@ -165,20 +171,30 @@ class CARMACloudPlugin: public PluginClient { //Comma separated string for list of strategies from MobilityOperation messages std::string _strategies; + struct TCMBroadcastMetadata + { + string tcm_hex; + int num_of_times; + }; //Used to lock the shared TCMs resource std::mutex _not_ACK_TCMs_mutex; //An associated array to keep track of TCMs that are not acknowledged std::shared_ptr> _not_ACK_TCMs; + //TCM repeatedly broadcast time out in unit of second uint16_t _TCMRepeatedlyBroadcastTimeOut = 0; + //Keep track of the starting time (unit of milliseconds) TCMs with same TCR request ids being broadcast Key: tcm request id, value: the start broadcasting timestamp + std::shared_ptr> _tcm_broadcast_starting_time; + std::string _TCMNOAcknowledgementDescription = ""; //Total number of times repeatedly broadcast TCMs with the same request id int _TCMRepeatedlyBroadCastTotalTimes = 0; - //Keep tracking of the number of times repeatedly broadcast TCMS. Key: tcm request id, value: the number of times - std::shared_ptr> _tcm_broadcast_times; - const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement"; + //Keep track of the number of times repeatedly broadcast TCMS. Key: tcm hex string, value: the number of times + std::shared_ptr< std::multimap> _tcm_broadcast_times; + const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement"; + }; std::mutex _cfgLock; } From 4037baddf06c5c838b1c02a19ca4333c51691252 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 21 Apr 2022 10:29:29 -0400 Subject: [PATCH 07/16] update --- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 9c0d8bb26..1c08b25bc 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -260,9 +260,16 @@ void CARMACloudPlugin::Broadcast_TCMs() if(_not_ACK_TCMs->size() > 0) { std::lock_guard lock(_not_ACK_TCMs_mutex); - for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr ) + auto itr = _not_ACK_TCMs->begin(); + while( itr!=_not_ACK_TCMs->end() ) { + if(itr == _not_ACK_TCMs->end()) + { + PLOG(logDEBUG) << "itr end " << std::endl; + break; + } string tcmv01_req_id_hex = itr->first; + PLOG(logDEBUG) << "tcmv01_req_id_hex " << tcmv01_req_id_hex << std::endl; auto cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); if (_tcm_broadcast_starting_time->count(tcmv01_req_id_hex) == 0) { @@ -291,8 +298,9 @@ void CARMACloudPlugin::Broadcast_TCMs() << ""; PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() < msg; msg.reset(); From fa9df65a4d20af086ea313dd43ffc2b57b702a55 Mon Sep 17 00:00:00 2001 From: Michael McConnell Date: Thu, 21 Apr 2022 07:56:30 -0700 Subject: [PATCH 08/16] clear outside loop --- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 63 ++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 1c08b25bc..511152586 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -260,16 +260,12 @@ void CARMACloudPlugin::Broadcast_TCMs() if(_not_ACK_TCMs->size() > 0) { std::lock_guard lock(_not_ACK_TCMs_mutex); - auto itr = _not_ACK_TCMs->begin(); - while( itr!=_not_ACK_TCMs->end() ) + + std::unordered_set expired_req_ids; + + for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr ) { - if(itr == _not_ACK_TCMs->end()) - { - PLOG(logDEBUG) << "itr end " << std::endl; - break; - } string tcmv01_req_id_hex = itr->first; - PLOG(logDEBUG) << "tcmv01_req_id_hex " << tcmv01_req_id_hex << std::endl; auto cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); if (_tcm_broadcast_starting_time->count(tcmv01_req_id_hex) == 0) { @@ -277,30 +273,12 @@ void CARMACloudPlugin::Broadcast_TCMs() } else if ( (cur_time - _tcm_broadcast_starting_time->at(tcmv01_req_id_hex)) > _TCMRepeatedlyBroadcastTimeOut ) { - _not_ACK_TCMs->erase(tcmv01_req_id_hex); - //If time out, stop tracking the starting time of the TCMs being broadcast so far - _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); - //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast - _tcm_broadcast_times->erase(tcmv01_req_id_hex); - //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log - tmx::messages::TmxEventLogMessage event_log_msg; - event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); - event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); - PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; - this->BroadcastMessage(event_log_msg); - //send negative ack to carma-cloud - stringstream sss; - sss << " " << tcmv01_req_id_hex - << "" << acknowledgement_status::acknowledgement_status__not_acknowledged - << "" << _TCMNOAcknowledgementDescription - << ""; - PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() < msg; msg.reset(); @@ -323,6 +301,35 @@ void CARMACloudPlugin::Broadcast_TCMs() BroadcastMessage(*rMsg); PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); } //END TCMs LOOP + + // For any ids which have expired clean up the maps + for (auto expired_id : expired_req_ids) { + + //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log + + + tmx::messages::TmxEventLogMessage event_log_msg; + event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); + event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); + PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; + this->BroadcastMessage(event_log_msg); + + //send negative ack to carma-cloud + stringstream sss; + sss << " " << tcmv01_req_id_hex + << "" << acknowledgement_status::acknowledgement_status__not_acknowledged + << "" << _TCMNOAcknowledgementDescription + << ""; + PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <erase(tcmv01_req_id_hex); + //If time out, stop tracking the starting time of the TCMs being broadcast so far + _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); + //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast + _tcm_broadcast_times->erase(tcmv01_req_id_hex); + + } } else { From 39749a45ee56411c5f8a7a2d04b10f2e7edad6e4 Mon Sep 17 00:00:00 2001 From: Michael McConnell Date: Thu, 21 Apr 2022 08:02:59 -0700 Subject: [PATCH 09/16] clearer name --- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 511152586..40c369646 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -303,7 +303,7 @@ void CARMACloudPlugin::Broadcast_TCMs() } //END TCMs LOOP // For any ids which have expired clean up the maps - for (auto expired_id : expired_req_ids) { + for (auto tcmv01_req_id_hex : expired_req_ids) { //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log From a6a48a9e4ed6fe584b4d8fb9334efe3e1e55100b Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 21 Apr 2022 13:16:40 -0400 Subject: [PATCH 10/16] update --- src/v2i-hub/CARMACloudPlugin/manifest.json | 5 ++ .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 71 ++++++++++--------- .../CARMACloudPlugin/src/CARMACloudPlugin.h | 1 + 3 files changed, 45 insertions(+), 32 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/manifest.json b/src/v2i-hub/CARMACloudPlugin/manifest.json index 7b4ac96af..a71ac020a 100644 --- a/src/v2i-hub/CARMACloudPlugin/manifest.json +++ b/src/v2i-hub/CARMACloudPlugin/manifest.json @@ -48,6 +48,11 @@ "default": "1000", "description": "After it receives TCM from carma cloud, it repeatedly broadcasts TCM until TCMRepeatTimeOut milliseconds." }, + { + "key": "TCMRepeatedlyBroadcastSleep", + "default": "100", + "description": "The broadcast thread should sleep for number of milliseconds." + }, { "key": "TCMRepeatedlyBroadCastTotalTimes", "default": "1", diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 40c369646..fddd0a2dc 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -229,6 +229,19 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket) container.load(ss); tsm5message.set_contents(container.get_storage().get_tree()); tsm5ENC.encode_j2735_message(tsm5message); + string enc = tsm5ENC.get_encoding(); + std::unique_ptr msg; + msg.reset(); + msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); + msg->refresh_timestamp(); + msg->set_payload(tsm5ENC.get_payload_str()); + msg->set_encoding(enc); + msg->set_flags(IvpMsgFlags_RouteDSRC); + msg->addDsrcMetadata(172, 0x8003); + msg->refresh_timestamp(); + routeable_message *rMsg = dynamic_cast(msg.get()); + BroadcastMessage(*rMsg); + PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); //Get TCM id Id64b_t tcmv01_req_id = tsm5message.get_j2735_data()->body.choice.tcmV01.reqid; @@ -252,7 +265,7 @@ void CARMACloudPlugin::Broadcast_TCMs() { while(true) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(_TCMRepeatedlyBroadcastSleep)); if(_plugin->state == IvpPluginState_error) { break; @@ -261,7 +274,7 @@ void CARMACloudPlugin::Broadcast_TCMs() { std::lock_guard lock(_not_ACK_TCMs_mutex); - std::unordered_set expired_req_ids; + std::set expired_req_ids; for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr ) { @@ -273,10 +286,7 @@ void CARMACloudPlugin::Broadcast_TCMs() } else if ( (cur_time - _tcm_broadcast_starting_time->at(tcmv01_req_id_hex)) > _TCMRepeatedlyBroadcastTimeOut ) { - - expired_req_ids.insert(tcmv01_req_id_hex); - continue; } @@ -299,36 +309,32 @@ void CARMACloudPlugin::Broadcast_TCMs() msg->refresh_timestamp(); routeable_message *rMsg = dynamic_cast(msg.get()); BroadcastMessage(*rMsg); - PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); + PLOG(logINFO) << " CARMACloud Plugin :: Repeatedly Broadcast tsm5:: " << tsm5ENC.get_payload_str(); } //END TCMs LOOP // For any ids which have expired clean up the maps for (auto tcmv01_req_id_hex : expired_req_ids) { - //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log - - - tmx::messages::TmxEventLogMessage event_log_msg; - event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); - event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); - PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; - this->BroadcastMessage(event_log_msg); - - //send negative ack to carma-cloud - stringstream sss; - sss << " " << tcmv01_req_id_hex - << "" << acknowledgement_status::acknowledgement_status__not_acknowledged - << "" << _TCMNOAcknowledgementDescription - << ""; - PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <erase(tcmv01_req_id_hex); - //If time out, stop tracking the starting time of the TCMs being broadcast so far - _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); - //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast - _tcm_broadcast_times->erase(tcmv01_req_id_hex); - + tmx::messages::TmxEventLogMessage event_log_msg; + event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); + event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); + PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; + this->BroadcastMessage(event_log_msg); + + //send negative ack to carma-cloud + stringstream sss; + sss << " " << tcmv01_req_id_hex + << "" << acknowledgement_status::acknowledgement_status__not_acknowledged + << "" << _TCMNOAcknowledgementDescription + << ""; + PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <erase(tcmv01_req_id_hex); + //If time out, stop tracking the starting time of the TCMs being broadcast so far + _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); + //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast + _tcm_broadcast_times->erase(tcmv01_req_id_hex); } } else @@ -354,7 +360,7 @@ bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, c is_tcm_hex_found = true; if (times >= _TCMRepeatedlyBroadCastTotalTimes) { - PLOG(logDEBUG) << "SKIP broadcasting as TCMs reqid = " << tcmv01_req_id_hex<< " has been broadcast " << times << " times." << std::endl; + PLOG(logDEBUG) << "SKIP broadcasting as TCMs reqid = " << tcmv01_req_id_hex<< " has been repeatedly broadcast " << times << " times." << std::endl; //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes is_skip_cur_tcm = true; } @@ -363,7 +369,7 @@ bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, c //update the number of times a TCM being broadcast within time out period times += 1; //Increase by 1 for every iteration itr->second.num_of_times = times; - PLOG(logDEBUG) << " TCMs reqid = " << tcmv01_req_id_hex<< " has been broadcast " << times << " times." << std::endl; + PLOG(logDEBUG) << " TCMs reqid = " << tcmv01_req_id_hex<< " repeatedly broadcast " << times << " times." << std::endl; } } } @@ -428,6 +434,7 @@ void CARMACloudPlugin::UpdateConfigSettings() { GetConfigValue("TCMRepeatedlyBroadcastTimeOut",_TCMRepeatedlyBroadcastTimeOut); GetConfigValue("TCMNOAcknowledgementDescription", _TCMNOAcknowledgementDescription); GetConfigValue("TCMRepeatedlyBroadCastTotalTimes", _TCMRepeatedlyBroadCastTotalTimes); + GetConfigValue("TCMRepeatedlyBroadcastSleep", _TCMRepeatedlyBroadcastSleep); } diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index bbff75570..95445f727 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -194,6 +194,7 @@ class CARMACloudPlugin: public PluginClient { std::shared_ptr< std::multimap> _tcm_broadcast_times; const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement"; + int _TCMRepeatedlyBroadcastSleep = 100; }; std::mutex _cfgLock; From 3c62406dcfa398c915caa62664109a6cd5b7b24d Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 22 Apr 2022 23:26:49 -0400 Subject: [PATCH 11/16] update --- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index fddd0a2dc..ee73efbec 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -139,10 +139,23 @@ void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeabl string CMV_id = ss.str(); std::lock_guard lock(_not_ACK_TCMs_mutex); - //The traffic control id should match with the TCM id per CMV (CARMA vehicle). - if(_not_ACK_TCMs->erase(traffic_control_id) <= 0) + auto matching_TCMS = _not_ACK_TCMs->equal_range(traffic_control_id); + bool is_tcm_removed = false; + for(auto itr = matching_TCMS.first; itr != matching_TCMS.second; itr++) + { + //The traffic control id should match with the TCM id per CMV (CARMA vehicle) and combines with msgnum to uniquely identify each TCM. + if(itr->second.decode_j2735_message().get_j2735_data()->body.choice.tcmV01.msgnum == stol(msgnum)) + { + //Remove a single TCM identified by reqid (traffic control id) and msgnum. + _not_ACK_TCMs->erase(itr); + PLOG(logINFO) << "Acknowledgement received, traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " removed from TCM map." << std::endl; + is_tcm_removed = true; + break; + } + } + if(!is_tcm_removed) { - PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << " Not Found in TCM map." << std::endl; + PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " NOT found in TCM map." << std::endl; } //Create an event log object for both positive and negative ACK (ackownledgement), and broadcast the event log From af6e2c85d5449a5b0aa56aece237ddeb4a58309f Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Sat, 23 Apr 2022 00:09:54 -0400 Subject: [PATCH 12/16] update --- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index ee73efbec..72a80033e 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -162,14 +162,14 @@ void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeabl tmx::messages::TmxEventLogMessage event_log_msg; //acknnowledgement: Flag to indicate whether the received geofence was processed successfully by the CMV. 1 mapping to acknowledged by CMV - std::transform(acknnowledgement_str.begin(), acknnowledgement_str.end(), acknnowledgement_str.begin(), ::tolower ); - acknnowledgement_str.find("1") != std::string::npos ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); + int ack = std::stoi(acknnowledgement_str); + ack == acknowledgement_status::acknowledgement_status__acknowledged ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); event_log_msg.set_description(mo_strategy + ": Traffic control id = " + traffic_control_id + ( CMV_id.length() <= 0 ? "":", CMV Id = " + CMV_id )+ ", reason = " + even_log_description); PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; this->BroadcastMessage(event_log_msg); - //send negative ack to carma-cloud if not receiving any ack from CMV. acknowledgement_status__acknowledged = 1 - if(acknnowledgement_str.find("1") == std::string::npos ) + //Only send negative ack to carma-cloud if receiving any acks from CMV. + if(ack != acknowledgement_status::acknowledgement_status__acknowledged ) { stringstream sss; sss << " " << traffic_control_id @@ -337,7 +337,7 @@ void CARMACloudPlugin::Broadcast_TCMs() //send negative ack to carma-cloud stringstream sss; sss << " " << tcmv01_req_id_hex - << "" << acknowledgement_status::acknowledgement_status__not_acknowledged + << "" << acknowledgement_status::acknowledgement_status__not_acknowledged << "" << _TCMNOAcknowledgementDescription << ""; PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() < Date: Mon, 25 Apr 2022 11:04:13 -0400 Subject: [PATCH 13/16] update --- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 44 ++++++++----------- .../CARMACloudPlugin/src/CARMACloudPlugin.h | 5 +++ 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 72a80033e..87e3cd804 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -242,18 +242,7 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket) container.load(ss); tsm5message.set_contents(container.get_storage().get_tree()); tsm5ENC.encode_j2735_message(tsm5message); - string enc = tsm5ENC.get_encoding(); - std::unique_ptr msg; - msg.reset(); - msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); - msg->refresh_timestamp(); - msg->set_payload(tsm5ENC.get_payload_str()); - msg->set_encoding(enc); - msg->set_flags(IvpMsgFlags_RouteDSRC); - msg->addDsrcMetadata(172, 0x8003); - msg->refresh_timestamp(); - routeable_message *rMsg = dynamic_cast(msg.get()); - BroadcastMessage(*rMsg); + BroadcastTCM(tsm5ENC); PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); //Get TCM id @@ -303,25 +292,14 @@ void CARMACloudPlugin::Broadcast_TCMs() continue; } - std::unique_ptr msg; - msg.reset(); - msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); + tsm5EncodedMessage tsm5ENC = itr->second; string tcm_hex_payload = tsm5ENC.get_payload_str(); if(IsSkipBroadcastCurTCM(tcmv01_req_id_hex, tcm_hex_payload)) { continue; } - //Broadcast TCM - string enc = tsm5ENC.get_encoding(); - msg->refresh_timestamp(); - msg->set_payload(tsm5ENC.get_payload_str()); - msg->set_encoding(enc); - msg->set_flags(IvpMsgFlags_RouteDSRC); - msg->addDsrcMetadata(172, 0x8003); - msg->refresh_timestamp(); - routeable_message *rMsg = dynamic_cast(msg.get()); - BroadcastMessage(*rMsg); + BroadcastTCM(tsm5ENC); PLOG(logINFO) << " CARMACloud Plugin :: Repeatedly Broadcast tsm5:: " << tsm5ENC.get_payload_str(); } //END TCMs LOOP @@ -359,6 +337,22 @@ void CARMACloudPlugin::Broadcast_TCMs() } } +void CARMACloudPlugin::BroadcastTCM(tsm5EncodedMessage& tsm5ENC) { + //Broadcast TCM + string enc = tsm5ENC.get_encoding(); + std::unique_ptr msg; + msg.reset(); + msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); + msg->refresh_timestamp(); + msg->set_payload(tsm5ENC.get_payload_str()); + msg->set_encoding(enc); + msg->set_flags(IvpMsgFlags_RouteDSRC); + msg->addDsrcMetadata(172, 0x8003); + msg->refresh_timestamp(); + routeable_message *rMsg = dynamic_cast(msg.get()); + BroadcastMessage(*rMsg); +} + bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const { bool is_skip_cur_tcm = false; diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index 95445f727..68f301c3f 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -149,6 +149,11 @@ class CARMACloudPlugin: public PluginClient { * If it timed out, it would remove the TCMs from the list, and stop broadcasting them. * ***/ void Broadcast_TCMs(); + /*** + * @biref: Add DSRC metadata for TCM and broadcast TCM + * @param: Encoded TCM to broadcast + ***/ + void BroadcastTCM(tsm5EncodedMessage& tsm5ENC); /*** * @brief: Determin if stop broadcasting the current TCM * @param: std::string of decoded TCM request id From cadea7b80b4dcd3c0eca112db1f64ac51f81cc66 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Mon, 25 Apr 2022 11:07:20 -0400 Subject: [PATCH 14/16] update --- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp | 4 ++-- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 87e3cd804..7c8a9c8ab 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -34,7 +34,7 @@ CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) { _not_ACK_TCMs = std::make_shared>(); _tcm_broadcast_times = std::make_shared>(); _tcm_broadcast_starting_time = std::make_shared>(); - std::thread Broadcast_t(&CARMACloudPlugin::Broadcast_TCMs, this); + std::thread Broadcast_t(&CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM, this); Broadcast_t.detach(); } @@ -263,7 +263,7 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket) } } -void CARMACloudPlugin::Broadcast_TCMs() +void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM() { while(true) { diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index 68f301c3f..493dc32e8 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -148,7 +148,7 @@ class CARMACloudPlugin: public PluginClient { * @brief: Loop through the received TCMs and broadcast them for the configured duration. * If it timed out, it would remove the TCMs from the list, and stop broadcasting them. * ***/ - void Broadcast_TCMs(); + void TCMAckCheckAndRebroadcastTCM(); /*** * @biref: Add DSRC metadata for TCM and broadcast TCM * @param: Encoded TCM to broadcast From cc55c90a0eb66a92f1b22c3efdc9fc4484f83727 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Tue, 26 Apr 2022 00:05:34 -0400 Subject: [PATCH 15/16] fix seg fault --- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 7c8a9c8ab..6515ac71f 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -144,7 +144,15 @@ void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeabl for(auto itr = matching_TCMS.first; itr != matching_TCMS.second; itr++) { //The traffic control id should match with the TCM id per CMV (CARMA vehicle) and combines with msgnum to uniquely identify each TCM. - if(itr->second.decode_j2735_message().get_j2735_data()->body.choice.tcmV01.msgnum == stol(msgnum)) + tsm5EncodedMessage msg = itr->second; + tsm5Message decoded_tsm5_msg = msg.decode_j2735_message(); + std::shared_ptr msg_j2735_data = decoded_tsm5_msg.get_j2735_data(); + if (msg_j2735_data == NULL) { + PLOG(logERROR) << "get_j2735_data() on decoded j2735 returned NULL." << std::endl; + break; + } + + if(msg_j2735_data->body.choice.tcmV01.msgnum == stol(msgnum)) { //Remove a single TCM identified by reqid (traffic control id) and msgnum. _not_ACK_TCMs->erase(itr); @@ -278,7 +286,7 @@ void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM() std::set expired_req_ids; - for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr ) + for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); itr++ ) { string tcmv01_req_id_hex = itr->first; auto cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); From 715ef08f3627acf511c46b0ac045c6588f887f3f Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 5 May 2022 16:30:55 -0400 Subject: [PATCH 16/16] add zero time config for TCM repeatedly broadcast --- src/v2i-hub/CARMACloudPlugin/manifest.json | 4 ++-- src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/manifest.json b/src/v2i-hub/CARMACloudPlugin/manifest.json index a71ac020a..84d9cbae8 100644 --- a/src/v2i-hub/CARMACloudPlugin/manifest.json +++ b/src/v2i-hub/CARMACloudPlugin/manifest.json @@ -51,12 +51,12 @@ { "key": "TCMRepeatedlyBroadcastSleep", "default": "100", - "description": "The broadcast thread should sleep for number of milliseconds." + "description": "The repeatedly broadcast thread should sleep for number of milliseconds." }, { "key": "TCMRepeatedlyBroadCastTotalTimes", "default": "1", - "description": "The number of times TCMs with the same request id should be broadcast within the time out period." + "description": "The number of times TCMs with the same request id should be repeatedly broadcast within the time out period." }, { "key": "TCMNOAcknowledgementDescription", diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 6515ac71f..b89697fc4 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -363,6 +363,12 @@ void CARMACloudPlugin::BroadcastTCM(tsm5EncodedMessage& tsm5ENC) { bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const { + //Skip repeatedly broadcasting + if(_TCMRepeatedlyBroadCastTotalTimes == 0) + { + return true; + } + bool is_skip_cur_tcm = false; bool is_tcm_hex_found = false; auto tcms_metadatas = _tcm_broadcast_times->equal_range(tcmv01_req_id_hex);