diff --git a/src/v2i-hub/CARMACloudPlugin/manifest.json b/src/v2i-hub/CARMACloudPlugin/manifest.json index d6098af7c..84d9cbae8 100644 --- a/src/v2i-hub/CARMACloudPlugin/manifest.json +++ b/src/v2i-hub/CARMACloudPlugin/manifest.json @@ -48,6 +48,16 @@ "default": "1000", "description": "After it receives TCM from carma cloud, it repeatedly broadcasts TCM until TCMRepeatTimeOut milliseconds." }, + { + "key": "TCMRepeatedlyBroadcastSleep", + "default": "100", + "description": "The repeatedly broadcast thread should sleep for number of milliseconds." + }, + { + "key": "TCMRepeatedlyBroadCastTotalTimes", + "default": "1", + "description": "The number of times TCMs with the same request id should be repeatedly broadcast within the time out period." + }, { "key": "TCMNOAcknowledgementDescription", "default": "No response received from CMV after repeatedly broadcast TCMs.", diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 67eb6678d..b89697fc4 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -32,7 +32,9 @@ CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) { base_ack = "/carmacloud/tcmack"; method = "POST"; _not_ACK_TCMs = std::make_shared>(); - std::thread Broadcast_t(&CARMACloudPlugin::Broadcast_TCMs, this); + _tcm_broadcast_times = std::make_shared>(); + _tcm_broadcast_starting_time = std::make_shared>(); + std::thread Broadcast_t(&CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM, this); Broadcast_t.detach(); } @@ -94,6 +96,14 @@ void CARMACloudPlugin::HandleCARMARequest(tsm4Message &msg, routeable_message &r PLOG(logINFO) << "Sent TCR to cloud: "<< xml_str< lock(_not_ACK_TCMs_mutex); + if(_not_ACK_TCMs->erase(reqid) <= 0) + { + PLOG(logDEBUG) << "TCR request id =" << reqid << " Not Found in TCM map." << std::endl; + }else{ + PLOG(logDEBUG) << "TCR request id =" << reqid << " Found in TCM map. Remove the existing TCMs with the same TCR request id." << std::endl; + } } void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg){ @@ -129,24 +139,45 @@ void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeabl string CMV_id = ss.str(); std::lock_guard lock(_not_ACK_TCMs_mutex); - //The traffic control id should match with the TCM id per CMV (CARMA vehicle). - if(_not_ACK_TCMs->erase(traffic_control_id) <= 0) + auto matching_TCMS = _not_ACK_TCMs->equal_range(traffic_control_id); + bool is_tcm_removed = false; + for(auto itr = matching_TCMS.first; itr != matching_TCMS.second; itr++) + { + //The traffic control id should match with the TCM id per CMV (CARMA vehicle) and combines with msgnum to uniquely identify each TCM. + tsm5EncodedMessage msg = itr->second; + tsm5Message decoded_tsm5_msg = msg.decode_j2735_message(); + std::shared_ptr msg_j2735_data = decoded_tsm5_msg.get_j2735_data(); + if (msg_j2735_data == NULL) { + PLOG(logERROR) << "get_j2735_data() on decoded j2735 returned NULL." << std::endl; + break; + } + + if(msg_j2735_data->body.choice.tcmV01.msgnum == stol(msgnum)) + { + //Remove a single TCM identified by reqid (traffic control id) and msgnum. + _not_ACK_TCMs->erase(itr); + PLOG(logINFO) << "Acknowledgement received, traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " removed from TCM map." << std::endl; + is_tcm_removed = true; + break; + } + } + if(!is_tcm_removed) { - PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << " Not Found in TCM map." << std::endl; + PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " NOT found in TCM map." << std::endl; } //Create an event log object for both positive and negative ACK (ackownledgement), and broadcast the event log tmx::messages::TmxEventLogMessage event_log_msg; //acknnowledgement: Flag to indicate whether the received geofence was processed successfully by the CMV. 1 mapping to acknowledged by CMV - std::transform(acknnowledgement_str.begin(), acknnowledgement_str.end(), acknnowledgement_str.begin(), ::tolower ); - acknnowledgement_str.find("1") != std::string::npos ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); + int ack = std::stoi(acknnowledgement_str); + ack == acknowledgement_status::acknowledgement_status__acknowledged ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); event_log_msg.set_description(mo_strategy + ": Traffic control id = " + traffic_control_id + ( CMV_id.length() <= 0 ? "":", CMV Id = " + CMV_id )+ ", reason = " + even_log_description); PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; this->BroadcastMessage(event_log_msg); - //send negative ack to carma-cloud if not receiving any ack from CMV. acknowledgement_status__acknowledged = 1 - if(acknnowledgement_str.find("1") == std::string::npos ) + //Only send negative ack to carma-cloud if receiving any acks from CMV. + if(ack != acknowledgement_status::acknowledgement_status__acknowledged ) { stringstream sss; sss << " " << traffic_control_id @@ -219,6 +250,8 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket) container.load(ss); tsm5message.set_contents(container.get_storage().get_tree()); tsm5ENC.encode_j2735_message(tsm5message); + BroadcastTCM(tsm5ENC); + PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); //Get TCM id Id64b_t tcmv01_req_id = tsm5message.get_j2735_data()->body.choice.tcmV01.reqid; @@ -238,13 +271,11 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket) } } -void CARMACloudPlugin::Broadcast_TCMs() +void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM() { - std::time_t start_time = 0, cur_time = 0; - bool is_started_broadcasting = false; while(true) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(_TCMRepeatedlyBroadcastSleep)); if(_plugin->state == IvpPluginState_error) { break; @@ -252,66 +283,130 @@ void CARMACloudPlugin::Broadcast_TCMs() if(_not_ACK_TCMs->size() > 0) { std::lock_guard lock(_not_ACK_TCMs_mutex); - for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr ) + + std::set expired_req_ids; + + for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); itr++ ) { - string tcmv01_req_id_hex = itr->first; - if(!is_started_broadcasting) + string tcmv01_req_id_hex = itr->first; + auto cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); + if (_tcm_broadcast_starting_time->count(tcmv01_req_id_hex) == 0) { - start_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); - }else{ - cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); + _tcm_broadcast_starting_time->insert({tcmv01_req_id_hex, cur_time}); } - is_started_broadcasting = true; - if((cur_time - start_time) > _TCMRepeatedlyBroadcastTimeOut) + else if ( (cur_time - _tcm_broadcast_starting_time->at(tcmv01_req_id_hex)) > _TCMRepeatedlyBroadcastTimeOut ) { - _not_ACK_TCMs->erase(tcmv01_req_id_hex); - start_time = 0; - cur_time = 0; - is_started_broadcasting = false; - - //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log - tmx::messages::TmxEventLogMessage event_log_msg; - event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); - event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); - PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; - this->BroadcastMessage(event_log_msg); - - //send negative ack to carma-cloud - stringstream sss; - sss << " " << tcmv01_req_id_hex - << "" << acknowledgement_status::acknowledgement_status__not_acknowledged - << "" << _TCMNOAcknowledgementDescription - << ""; - PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() < msg; - msg.reset(); - msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); + + tsm5EncodedMessage tsm5ENC = itr->second; - string enc = tsm5ENC.get_encoding(); - msg->refresh_timestamp(); - msg->set_payload(tsm5ENC.get_payload_str()); - msg->set_encoding(enc); - msg->set_flags(IvpMsgFlags_RouteDSRC); - msg->addDsrcMetadata(172, 0x8003); - msg->refresh_timestamp(); - - routeable_message *rMsg = dynamic_cast(msg.get()); - BroadcastMessage(*rMsg); - PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); + string tcm_hex_payload = tsm5ENC.get_payload_str(); + if(IsSkipBroadcastCurTCM(tcmv01_req_id_hex, tcm_hex_payload)) + { + continue; + } + BroadcastTCM(tsm5ENC); + PLOG(logINFO) << " CARMACloud Plugin :: Repeatedly Broadcast tsm5:: " << tsm5ENC.get_payload_str(); + } //END TCMs LOOP + + // For any ids which have expired clean up the maps + for (auto tcmv01_req_id_hex : expired_req_ids) { + //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log + tmx::messages::TmxEventLogMessage event_log_msg; + event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); + event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); + PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; + this->BroadcastMessage(event_log_msg); + + //send negative ack to carma-cloud + stringstream sss; + sss << " " << tcmv01_req_id_hex + << "" << acknowledgement_status::acknowledgement_status__not_acknowledged + << "" << _TCMNOAcknowledgementDescription + << ""; + PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <erase(tcmv01_req_id_hex); + //If time out, stop tracking the starting time of the TCMs being broadcast so far + _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); + //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast + _tcm_broadcast_times->erase(tcmv01_req_id_hex); } } else { - start_time = 0; - cur_time = 0; - is_started_broadcasting = false; + PLOG(logDEBUG) << "NO TCMs to broadcast." << std::endl; + _tcm_broadcast_times->clear(); + _tcm_broadcast_starting_time->clear(); } } } +void CARMACloudPlugin::BroadcastTCM(tsm5EncodedMessage& tsm5ENC) { + //Broadcast TCM + string enc = tsm5ENC.get_encoding(); + std::unique_ptr msg; + msg.reset(); + msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); + msg->refresh_timestamp(); + msg->set_payload(tsm5ENC.get_payload_str()); + msg->set_encoding(enc); + msg->set_flags(IvpMsgFlags_RouteDSRC); + msg->addDsrcMetadata(172, 0x8003); + msg->refresh_timestamp(); + routeable_message *rMsg = dynamic_cast(msg.get()); + BroadcastMessage(*rMsg); +} + +bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const +{ + //Skip repeatedly broadcasting + if(_TCMRepeatedlyBroadCastTotalTimes == 0) + { + return true; + } + + bool is_skip_cur_tcm = false; + bool is_tcm_hex_found = false; + auto tcms_metadatas = _tcm_broadcast_times->equal_range(tcmv01_req_id_hex); + for(auto itr = tcms_metadatas.first; itr !=tcms_metadatas.second; itr ++) + { + string tcm_hex = itr->second.tcm_hex; + int times = itr->second.num_of_times; + if(tcm_hex == tcm_hex_payload) + { + is_tcm_hex_found = true; + if (times >= _TCMRepeatedlyBroadCastTotalTimes) + { + PLOG(logDEBUG) << "SKIP broadcasting as TCMs reqid = " << tcmv01_req_id_hex<< " has been repeatedly broadcast " << times << " times." << std::endl; + //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes + is_skip_cur_tcm = true; + } + else + { + //update the number of times a TCM being broadcast within time out period + times += 1; //Increase by 1 for every iteration + itr->second.num_of_times = times; + PLOG(logDEBUG) << " TCMs reqid = " << tcmv01_req_id_hex<< " repeatedly broadcast " << times << " times." << std::endl; + } + } + } + + if(!is_tcm_hex_found) + { + //Initialize the number of times a TCM being broadcast + TCMBroadcastMetadata t; + t.num_of_times = 1; + t.tcm_hex = tcm_hex_payload; + _tcm_broadcast_times->insert( {tcmv01_req_id_hex, t}); + PLOG(logDEBUG) << " TCMs reqid = "<< tcmv01_req_id_hex << " has been broadcast once."<< std::endl; + } + return is_skip_cur_tcm; +} + int CARMACloudPlugin::StartWebService() { //Web services @@ -359,6 +454,8 @@ void CARMACloudPlugin::UpdateConfigSettings() { GetConfigValue("MobilityOperationStrategies", _strategies); GetConfigValue("TCMRepeatedlyBroadcastTimeOut",_TCMRepeatedlyBroadcastTimeOut); GetConfigValue("TCMNOAcknowledgementDescription", _TCMNOAcknowledgementDescription); + GetConfigValue("TCMRepeatedlyBroadCastTotalTimes", _TCMRepeatedlyBroadCastTotalTimes); + GetConfigValue("TCMRepeatedlyBroadcastSleep", _TCMRepeatedlyBroadcastSleep); } diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index 9cece33bf..493dc32e8 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -148,7 +148,18 @@ class CARMACloudPlugin: public PluginClient { * @brief: Loop through the received TCMs and broadcast them for the configured duration. * If it timed out, it would remove the TCMs from the list, and stop broadcasting them. * ***/ - void Broadcast_TCMs(); + void TCMAckCheckAndRebroadcastTCM(); + /*** + * @biref: Add DSRC metadata for TCM and broadcast TCM + * @param: Encoded TCM to broadcast + ***/ + void BroadcastTCM(tsm5EncodedMessage& tsm5ENC); + /*** + * @brief: Determin if stop broadcasting the current TCM + * @param: std::string of decoded TCM request id + * @param: key string TCM hex payload + * **/ + bool IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const; private: @@ -165,15 +176,31 @@ class CARMACloudPlugin: public PluginClient { //Comma separated string for list of strategies from MobilityOperation messages std::string _strategies; + struct TCMBroadcastMetadata + { + string tcm_hex; + int num_of_times; + }; //Used to lock the shared TCMs resource std::mutex _not_ACK_TCMs_mutex; //An associated array to keep track of TCMs that are not acknowledged std::shared_ptr> _not_ACK_TCMs; + //TCM repeatedly broadcast time out in unit of second uint16_t _TCMRepeatedlyBroadcastTimeOut = 0; + //Keep track of the starting time (unit of milliseconds) TCMs with same TCR request ids being broadcast Key: tcm request id, value: the start broadcasting timestamp + std::shared_ptr> _tcm_broadcast_starting_time; + std::string _TCMNOAcknowledgementDescription = ""; - const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement"; + //Total number of times repeatedly broadcast TCMs with the same request id + int _TCMRepeatedlyBroadCastTotalTimes = 0; + //Keep track of the number of times repeatedly broadcast TCMS. Key: tcm hex string, value: the number of times + std::shared_ptr< std::multimap> _tcm_broadcast_times; + + const string _TCMAcknowledgementStrategy = "carma3/geofence_acknowledgement"; + int _TCMRepeatedlyBroadcastSleep = 100; + }; std::mutex _cfgLock; }