Skip to content

Commit

Permalink
Merge pull request #363 from usdot-fhwa-OPS/hotfix_remove_TCM_duplica…
Browse files Browse the repository at this point in the history
…te_TCR_ids3

Remove a single TCM upon receiving one ACK from vehicle.
  • Loading branch information
dan-du-car authored Apr 27, 2022
2 parents a6a48a9 + cc55c90 commit ca1af83
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 37 deletions.
87 changes: 51 additions & 36 deletions src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) {
_not_ACK_TCMs = std::make_shared<multimap<string, tsm5EncodedMessage>>();
_tcm_broadcast_times = std::make_shared<std::multimap<string, TCMBroadcastMetadata>>();
_tcm_broadcast_starting_time = std::make_shared<std::map<string, std::time_t>>();
std::thread Broadcast_t(&CARMACloudPlugin::Broadcast_TCMs, this);
std::thread Broadcast_t(&CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM, this);
Broadcast_t.detach();

}
Expand Down Expand Up @@ -139,24 +139,45 @@ void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeabl
string CMV_id = ss.str();

std::lock_guard<mutex> lock(_not_ACK_TCMs_mutex);
//The traffic control id should match with the TCM id per CMV (CARMA vehicle).
if(_not_ACK_TCMs->erase(traffic_control_id) <= 0)
auto matching_TCMS = _not_ACK_TCMs->equal_range(traffic_control_id);
bool is_tcm_removed = false;
for(auto itr = matching_TCMS.first; itr != matching_TCMS.second; itr++)
{
//The traffic control id should match with the TCM id per CMV (CARMA vehicle) and combines with msgnum to uniquely identify each TCM.
tsm5EncodedMessage msg = itr->second;
tsm5Message decoded_tsm5_msg = msg.decode_j2735_message();
std::shared_ptr<TestMessage05> msg_j2735_data = decoded_tsm5_msg.get_j2735_data();
if (msg_j2735_data == NULL) {
PLOG(logERROR) << "get_j2735_data() on decoded j2735 returned NULL." << std::endl;
break;
}

if(msg_j2735_data->body.choice.tcmV01.msgnum == stol(msgnum))
{
//Remove a single TCM identified by reqid (traffic control id) and msgnum.
_not_ACK_TCMs->erase(itr);
PLOG(logINFO) << "Acknowledgement received, traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " removed from TCM map." << std::endl;
is_tcm_removed = true;
break;
}
}
if(!is_tcm_removed)
{
PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << " Not Found in TCM map." << std::endl;
PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " NOT found in TCM map." << std::endl;
}

//Create an event log object for both positive and negative ACK (ackownledgement), and broadcast the event log
tmx::messages::TmxEventLogMessage event_log_msg;

//acknnowledgement: Flag to indicate whether the received geofence was processed successfully by the CMV. 1 mapping to acknowledged by CMV
std::transform(acknnowledgement_str.begin(), acknnowledgement_str.end(), acknnowledgement_str.begin(), ::tolower );
acknnowledgement_str.find("1") != std::string::npos ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn);
int ack = std::stoi(acknnowledgement_str);
ack == acknowledgement_status::acknowledgement_status__acknowledged ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn);
event_log_msg.set_description(mo_strategy + ": Traffic control id = " + traffic_control_id + ( CMV_id.length() <= 0 ? "":", CMV Id = " + CMV_id )+ ", reason = " + even_log_description);
PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl;
this->BroadcastMessage<tmx::messages::TmxEventLogMessage>(event_log_msg);

//send negative ack to carma-cloud if not receiving any ack from CMV. acknowledgement_status__acknowledged = 1
if(acknnowledgement_str.find("1") == std::string::npos )
//Only send negative ack to carma-cloud if receiving any acks from CMV.
if(ack != acknowledgement_status::acknowledgement_status__acknowledged )
{
stringstream sss;
sss << "<?xml version=\"1.0\" encoding=\"UTF-8\"?><TrafficControlAcknowledgement><reqid> " << traffic_control_id
Expand Down Expand Up @@ -229,18 +250,7 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket)
container.load<XML>(ss);
tsm5message.set_contents(container.get_storage().get_tree());
tsm5ENC.encode_j2735_message(tsm5message);
string enc = tsm5ENC.get_encoding();
std::unique_ptr<tsm5EncodedMessage> msg;
msg.reset();
msg.reset(dynamic_cast<tsm5EncodedMessage*>(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING)));
msg->refresh_timestamp();
msg->set_payload(tsm5ENC.get_payload_str());
msg->set_encoding(enc);
msg->set_flags(IvpMsgFlags_RouteDSRC);
msg->addDsrcMetadata(172, 0x8003);
msg->refresh_timestamp();
routeable_message *rMsg = dynamic_cast<routeable_message *>(msg.get());
BroadcastMessage(*rMsg);
BroadcastTCM(tsm5ENC);
PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str();

//Get TCM id
Expand All @@ -261,7 +271,7 @@ void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket)
}
}

void CARMACloudPlugin::Broadcast_TCMs()
void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM()
{
while(true)
{
Expand All @@ -276,7 +286,7 @@ void CARMACloudPlugin::Broadcast_TCMs()

std::set<string> expired_req_ids;

for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); ++itr )
for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); itr++ )
{
string tcmv01_req_id_hex = itr->first;
auto cur_time = (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())).count();
Expand All @@ -290,25 +300,14 @@ void CARMACloudPlugin::Broadcast_TCMs()
continue;
}

std::unique_ptr<tsm5EncodedMessage> msg;
msg.reset();
msg.reset(dynamic_cast<tsm5EncodedMessage*>(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING)));

tsm5EncodedMessage tsm5ENC = itr->second;
string tcm_hex_payload = tsm5ENC.get_payload_str();
if(IsSkipBroadcastCurTCM(tcmv01_req_id_hex, tcm_hex_payload))
{
continue;
}
//Broadcast TCM
string enc = tsm5ENC.get_encoding();
msg->refresh_timestamp();
msg->set_payload(tsm5ENC.get_payload_str());
msg->set_encoding(enc);
msg->set_flags(IvpMsgFlags_RouteDSRC);
msg->addDsrcMetadata(172, 0x8003);
msg->refresh_timestamp();
routeable_message *rMsg = dynamic_cast<routeable_message *>(msg.get());
BroadcastMessage(*rMsg);
BroadcastTCM(tsm5ENC);
PLOG(logINFO) << " CARMACloud Plugin :: Repeatedly Broadcast tsm5:: " << tsm5ENC.get_payload_str();
} //END TCMs LOOP

Expand All @@ -324,7 +323,7 @@ void CARMACloudPlugin::Broadcast_TCMs()
//send negative ack to carma-cloud
stringstream sss;
sss << "<?xml version=\"1.0\" encoding=\"UTF-8\"?><TrafficControlAcknowledgement><reqid> " << tcmv01_req_id_hex
<< "</reqid><msgnum></msgnum><cmvid></cmvid><acknowledgement>" << acknowledgement_status::acknowledgement_status__not_acknowledged
<< "</reqid><msgnum></msgnum><cmvid></cmvid><acknowledgement>" << acknowledgement_status::acknowledgement_status__not_acknowledged
<< "</acknowledgement><description>" << _TCMNOAcknowledgementDescription
<< "</description></TrafficControlAcknowledgement>";
PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <<endl;
Expand All @@ -346,6 +345,22 @@ void CARMACloudPlugin::Broadcast_TCMs()
}
}

void CARMACloudPlugin::BroadcastTCM(tsm5EncodedMessage& tsm5ENC) {
//Broadcast TCM
string enc = tsm5ENC.get_encoding();
std::unique_ptr<tsm5EncodedMessage> msg;
msg.reset();
msg.reset(dynamic_cast<tsm5EncodedMessage*>(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING)));
msg->refresh_timestamp();
msg->set_payload(tsm5ENC.get_payload_str());
msg->set_encoding(enc);
msg->set_flags(IvpMsgFlags_RouteDSRC);
msg->addDsrcMetadata(172, 0x8003);
msg->refresh_timestamp();
routeable_message *rMsg = dynamic_cast<routeable_message *>(msg.get());
BroadcastMessage(*rMsg);
}

bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const
{
bool is_skip_cur_tcm = false;
Expand Down
7 changes: 6 additions & 1 deletion src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ class CARMACloudPlugin: public PluginClient {
* @brief: Loop through the received TCMs and broadcast them for the configured duration.
* If it timed out, it would remove the TCMs from the list, and stop broadcasting them.
* ***/
void Broadcast_TCMs();
void TCMAckCheckAndRebroadcastTCM();
/***
* @biref: Add DSRC metadata for TCM and broadcast TCM
* @param: Encoded TCM to broadcast
***/
void BroadcastTCM(tsm5EncodedMessage& tsm5ENC);
/***
* @brief: Determin if stop broadcasting the current TCM
* @param: std::string of decoded TCM request id
Expand Down

0 comments on commit ca1af83

Please sign in to comment.