From 9ac6edac4af7253f1293d420be836c6a6f18ab56 Mon Sep 17 00:00:00 2001 From: paulbourelly999 <77466294+paulbourelly999@users.noreply.github.com> Date: Thu, 4 Apr 2024 11:07:35 -0400 Subject: [PATCH] CXC-28: Update CARMA Cloud Plugin to have configurable IP/Port (#593) # PR Details ## Description Update CARMA Cloud plugin to allow for configurable target IP/Port for sending CARMA Cloud REST Requests ![image](https://github.com/usdot-fhwa-OPS/V2X-Hub/assets/77466294/46f850ea-5a4d-4796-8487-101bfb750ab5) ![image](https://github.com/usdot-fhwa-OPS/V2X-Hub/assets/77466294/2d96c045-748c-435b-a1a2-d9febe438905) **NOTE**: Large changeset is due to correcting line ending characters for file. Feel free to double check change set in text diff tool of your choice. Here is what I got ![image](https://github.com/usdot-fhwa-OPS/V2X-Hub/assets/77466294/7ee70ee2-d952-4ec7-93fe-5bc86eb05ca5) ![image](https://github.com/usdot-fhwa-OPS/V2X-Hub/assets/77466294/2d8ea908-765d-410d-8b36-5fec0378b199) ![image](https://github.com/usdot-fhwa-OPS/V2X-Hub/assets/77466294/cdc71fdd-494e-4590-8e5b-dd54ff6f833f) ![image](https://github.com/usdot-fhwa-OPS/V2X-Hub/assets/77466294/89af2000-79b0-4c5e-9a05-a6ffbc8bccdf) ![image](https://github.com/usdot-fhwa-OPS/V2X-Hub/assets/77466294/80b6cfb7-82e2-42ae-b63b-c0e314e31c5d) ## Related Issue [CXC-28](https://usdot-carma.atlassian.net/browse/CXC-28) ## Motivation and Context To support CARMA Cloud CDASim Integration testing ## How Has This Been Tested? Locally integration tested using devcontainer to ensure configurable parameters were available and modified URL correctly ## Types of changes - [x] Defect fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed. --- src/v2i-hub/CARMACloudPlugin/manifest.json | 20 +- .../CARMACloudPlugin/src/CARMACloudPlugin.cpp | 1310 +++++++++-------- .../CARMACloudPlugin/src/CARMACloudPlugin.h | 2 +- 3 files changed, 673 insertions(+), 659 deletions(-) diff --git a/src/v2i-hub/CARMACloudPlugin/manifest.json b/src/v2i-hub/CARMACloudPlugin/manifest.json index e8241c871..56d7ae9df 100644 --- a/src/v2i-hub/CARMACloudPlugin/manifest.json +++ b/src/v2i-hub/CARMACloudPlugin/manifest.json @@ -28,6 +28,21 @@ "default":"127.0.0.1", "description":"Server IP address for V2X hub" }, + { + "key":"WebServicePort", + "default":"22222", + "description":"Server Port for V2X hub to receive TCM messages" + }, + { + "key":"CARMACloudIP", + "default":"http://127.0.0.1", + "description":"Server IP address for V2X hub" + }, + { + "key":"CARMACloudPort", + "default":"33333", + "description":"Server Port for V2X hub to receive TCM messages" + }, { "key":"fetchTime", "default":"15", @@ -63,11 +78,6 @@ "default": "No response received from CMV after repeatedly broadcast TCMs.", "description": "If the plugin does not receives any aknowledgement from CMV within the configured seconds that match the original TCM, the plugin will create an NO ACK message and display it on UI." }, - { - "key":"WebServicePort", - "default":"22222", - "description":"Server Port for V2X hub to receive TCM messages" - }, { "key":"listTCM", "default":"true", diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp index 4f46a190c..4bd2a567a 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp @@ -1,653 +1,657 @@ -#include "CARMACloudPlugin.h" -#include -#include -#include -using namespace std; -using namespace tmx::messages; -using namespace tmx::utils; -using namespace boost::property_tree; - -namespace CARMACloudPlugin { - - - -/** - * Construct a new CARMACloudPlugin with the given name. - * - * @param name The name to give the plugin for identification purposes - */ -CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) { - - UpdateConfigSettings(); - std::lock_guard lock(_cfgLock); - AddMessageFilter < tsm4Message > (this, &CARMACloudPlugin::HandleCARMARequest); - AddMessageFilter < tsm3Message > (this, &CARMACloudPlugin::HandleMobilityOperationMessage); - - // Subscribe to all messages specified by the filters above. - SubscribeToMessages(); - std::thread webthread(&CARMACloudPlugin::StartWebService,this); - webthread.detach(); // wait for the thread to finish - url ="http://127.0.0.1:33333"; // 33333 is the port that will send from v2xhub to carma cloud ## initally was 23665 - base_hb = "/carmacloud/v2xhub"; - base_req = "/carmacloud/tcmreq"; - base_ack = "/carmacloud/tcmack"; - method = "POST"; - _not_ACK_TCMs = std::make_shared>(); - _tcm_broadcast_times = std::make_shared>(); - _tcm_broadcast_starting_time = std::make_shared>(); - std::thread Broadcast_t(&CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM, this); - Broadcast_t.detach(); - -} - - -void CARMACloudPlugin::HandleCARMARequest(tsm4Message &msg, routeable_message &routeableMsg) -{ - auto carmaRequest = msg.get_j2735_data(); - - - - // convert reqid bytes to hex string. - size_t hexlen = 2; //size of each hex representation with a leading 0 - char reqid[carmaRequest->body.choice.tcrV01.reqid.size * hexlen + 1]; - for (int i = 0; i < carmaRequest->body.choice.tcrV01.reqid.size; i++) - { - sprintf(reqid+(i*hexlen), "%.2X", carmaRequest->body.choice.tcrV01.reqid.buf[i]); - } - - printf("%s\n",reqid); - - long int reqseq = carmaRequest->body.choice.tcrV01.reqseq; - long int scale = carmaRequest->body.choice.tcrV01.scale; - - - int totBounds = carmaRequest->body.choice.tcrV01.bounds.list.count; - int cnt=0; - char bounds_str[5000]; - strcpy(bounds_str,""); - - // get current time - std::time_t tm = std::time(0)/60-fetchtime*24*60; // T minus 24 hours in min - - while(cntbody.choice.tcrV01.bounds.list.array[cnt]->reflat; - long longg = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->reflon; - - - long dtx0 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[0]->deltax; - long dty0 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[0]->deltay; - long dtx1 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[1]->deltax; - long dty1 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[1]->deltay; - long dtx2 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[2]->deltax; - long dty2 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[2]->deltay; - - sprintf(bounds_str+strlen(bounds_str),"%u%ld%ld%ld%ld%ld%ld%ld%ld",oldest,longg,lat,dtx0,dty0,dtx1,dty1,dtx2,dty2); - - cnt++; - - - } - - char xml_str[10000]; - sprintf(xml_str,"%s%ld%ld%s",std::to_string(webport).c_str(),list_tcm.c_str(),reqid, reqseq,scale,bounds_str); - - PLOG(logINFO) << "Sent TCR to cloud: "<< xml_str< strategies_v; - ConvertString2Vector(strategies_v, _strategies); - - //Process incoming MobilityOperation message - auto mobilityOperationMsg = msg.get_j2735_data(); - - //process MobilityOperation strategy - stringstream ss; - ss << mobilityOperationMsg->body.strategy.buf; - string mo_strategy = ss.str(); - std::transform(mo_strategy.begin(), mo_strategy.end(), mo_strategy.begin(), ::tolower ); - - if( std::find( strategies_v.begin(), strategies_v.end(), mo_strategy ) != strategies_v.end() && mo_strategy.find(_TCMAcknowledgementStrategy) != std::string::npos) - { - //Process MobilityOperation strategy params - ss.str(""); - ss << mobilityOperationMsg->body.operationParams.buf; - string strategy_params_str = ss.str(); - std::vector strategy_params_v; - ConvertString2Vector(strategy_params_v, strategy_params_str); - - string even_log_description = GetValueFromStrategyParamsByKey(strategy_params_v, "reason"); - string acknnowledgement_str = GetValueFromStrategyParamsByKey(strategy_params_v, "acknowledgement"); - string traffic_control_id = GetValueFromStrategyParamsByKey(strategy_params_v, "traffic_control_id"); - string msgnum = GetValueFromStrategyParamsByKey(strategy_params_v, "msgnum"); - boost::trim(traffic_control_id); - std::transform(traffic_control_id.begin(), traffic_control_id.end(), traffic_control_id.begin(), ::tolower ); - ss.str(""); - ss << mobilityOperationMsg->header.hostStaticId.buf; - string CMV_id = ss.str(); - - std::lock_guard lock(_not_ACK_TCMs_mutex); - auto matching_TCMS = _not_ACK_TCMs->equal_range(traffic_control_id); - bool is_tcm_removed = false; - for(auto itr = matching_TCMS.first; itr != matching_TCMS.second; itr++) - { - //The traffic control id should match with the TCM id per CMV (CARMA vehicle) and combines with msgnum to uniquely identify each TCM. - tsm5EncodedMessage msg = itr->second; - tsm5Message decoded_tsm5_msg = msg.decode_j2735_message(); - std::shared_ptr msg_j2735_data = decoded_tsm5_msg.get_j2735_data(); - if (msg_j2735_data == NULL) { - PLOG(logERROR) << "get_j2735_data() on decoded j2735 returned NULL." << std::endl; - break; - } - - if(msg_j2735_data->body.choice.tcmV01.msgnum == stol(msgnum)) - { - //Remove a single TCM identified by reqid (traffic control id) and msgnum. - _not_ACK_TCMs->erase(itr); - PLOG(logINFO) << "Acknowledgement received, traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " removed from TCM map." << std::endl; - is_tcm_removed = true; - break; - } - } - if(!is_tcm_removed) - { - PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " NOT found in TCM map." << std::endl; - } - - //Create an event log object for both positive and negative ACK (ackownledgement), and broadcast the event log - tmx::messages::TmxEventLogMessage event_log_msg; - - //acknnowledgement: Flag to indicate whether the received geofence was processed successfully by the CMV. 1 mapping to acknowledged by CMV - int ack = std::stoi(acknnowledgement_str); - ack == acknowledgement_status::acknowledgement_status__acknowledged ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); - event_log_msg.set_description(mo_strategy + ": Traffic control id = " + traffic_control_id + ( CMV_id.length() <= 0 ? "":", CMV Id = " + CMV_id )+ ", reason = " + even_log_description); - PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; - this->BroadcastMessage(event_log_msg); - - //Only send negative ack to carma-cloud if receiving any acks from CMV. - if(ack != acknowledgement_status::acknowledgement_status__acknowledged ) - { - stringstream sss; - sss << " " << traffic_control_id - << ""<< msgnum - <<""<< CMV_id - <<"" << acknowledgement_status::acknowledgement_status__rejected - << "" << even_log_description - << ""; - PLOG(logINFO) << "Sent Negative ACK: "<< sss.str() <bytesAvailable()>0) - { - auto readBytes = socket->readAll(); - if (socket->headers().keys().contains(CONTENT_ENCODING_KEY) && std::string(socket->headers().constFind(CONTENT_ENCODING_KEY).value().data()) == CONTENT_ENCODING_VALUE) - { - //readBytes is compressed in gzip format - st.append(UncompressBytes(readBytes)); - }else{ - st.append(readBytes); - } - } - QByteArray array = st.toLocal8Bit(); - - char* _cloudUpdate = array.data(); // would be the cloud update packet, needs parsing - - - string tcm = _cloudUpdate; - - PLOG(logINFO) << "Received TCM from cloud" << tcm << std::endl; - if(tcm.length() == 0) - { - PLOG(logERROR) << "Received TCM length is zero, and skipped." << std::endl; - return; - } - - //Transform carma-cloud TCM XML to J2735 compatible TCM XML by updating tags - tcm=updateTags(tcm,"",""); - tcm=updateTags(tcm,"",""); - tcm=updateTags(tcm,"TrafficControlParams","params"); - tcm=updateTags(tcm,"TrafficControlGeometry","geometry"); - tcm=updateTags(tcm,"TrafficControlPackage","package"); - - //List of tcm in string format - std::list tcm_sl = FilterTCMs(tcm); - - for(const auto tcm_s: tcm_sl) - { - tsm5Message tsm5message; - tsm5EncodedMessage tsm5ENC; - tmx::message_container_type container; - - - std::stringstream ss; - ss << tcm_s; - - container.load(ss); - tsm5message.set_contents(container.get_storage().get_tree()); - tsm5ENC.encode_j2735_message(tsm5message); - BroadcastTCM(tsm5ENC); - PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); - - //Get TCM id - Id64b_t tcmv01_req_id = tsm5message.get_j2735_data()->body.choice.tcmV01.reqid; - //Translate TCM request id to hex string - ss.str(""); - for(size_t i=0; i < tcmv01_req_id.size; i++) - { - ss << std::setfill('0') << std::setw(2) << std::hex << (unsigned) tcmv01_req_id.buf[i]; - } - string tcmv01_req_id_hex = ss.str(); - //Transform all hex to lower case - std::transform(tcmv01_req_id_hex.begin(), tcmv01_req_id_hex.end(), tcmv01_req_id_hex.begin(), ::tolower ); - if(tcmv01_req_id_hex.length() > 0) - { - //Update map of tcm request id hex and tcm hex - std::lock_guard lock(_not_ACK_TCMs_mutex); - _not_ACK_TCMs->insert({tcmv01_req_id_hex, tsm5ENC}); - } - } -} - -std::list CARMACloudPlugin::FilterTCMs(const std::string& tcm_response) const -{ - std::list tcm_sl = {}; - try - { - std::stringstream iss; - iss << tcm_response; - boost::property_tree::ptree parent_node; - boost::property_tree::read_xml(iss, parent_node); - auto child_nodes = parent_node.get_child_optional("TrafficControlMessageList"); - //The tcm response is a list of TCM - if (child_nodes) - { - for (const auto &p : child_nodes.get()) - { - boost::property_tree::ptree tcm_node; - tcm_node.put_child(p.first, p.second); - std::ostringstream oss; - boost::property_tree::write_xml(oss, tcm_node); - tcm_sl.push_back(oss.str()); - } - }else{ - tcm_sl.push_back(iss.str()); - } - } - catch (const boost::property_tree::xml_parser_error &e) - { - PLOG(logERROR) << "Failed to parse the xml string." << e.what(); - } - return tcm_sl; -} - -void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM() -{ - while(true) - { - std::this_thread::sleep_for(std::chrono::milliseconds(_TCMRepeatedlyBroadcastSleep)); - if(_plugin->state == IvpPluginState_error) - { - break; - } - if(_not_ACK_TCMs->size() > 0) - { - std::lock_guard lock(_not_ACK_TCMs_mutex); - - std::set expired_req_ids; - - for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); itr++ ) - { - string tcmv01_req_id_hex = itr->first; - auto cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); - if (_tcm_broadcast_starting_time->count(tcmv01_req_id_hex) == 0) - { - _tcm_broadcast_starting_time->insert({tcmv01_req_id_hex, cur_time}); - } - else if ( (cur_time - _tcm_broadcast_starting_time->at(tcmv01_req_id_hex)) > _TCMRepeatedlyBroadcastTimeOut ) - { - expired_req_ids.insert(tcmv01_req_id_hex); - continue; - } - - - tsm5EncodedMessage tsm5ENC = itr->second; - string tcm_hex_payload = tsm5ENC.get_payload_str(); - if(IsSkipBroadcastCurTCM(tcmv01_req_id_hex, tcm_hex_payload)) - { - continue; - } - BroadcastTCM(tsm5ENC); - PLOG(logINFO) << " CARMACloud Plugin :: Repeatedly Broadcast tsm5:: " << tsm5ENC.get_payload_str(); - } //END TCMs LOOP - - // For any ids which have expired clean up the maps - for (auto tcmv01_req_id_hex : expired_req_ids) { - //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log - tmx::messages::TmxEventLogMessage event_log_msg; - event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); - event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); - PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; - this->BroadcastMessage(event_log_msg); - - //send negative ack to carma-cloud - stringstream sss; - sss << " " << tcmv01_req_id_hex - << "" << acknowledgement_status::acknowledgement_status__not_acknowledged - << "" << _TCMNOAcknowledgementDescription - << ""; - PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <erase(tcmv01_req_id_hex); - //If time out, stop tracking the starting time of the TCMs being broadcast so far - _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); - //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast - _tcm_broadcast_times->erase(tcmv01_req_id_hex); - } - } - else - { - PLOG(logDEBUG) << "NO TCMs to broadcast." << std::endl; - _tcm_broadcast_times->clear(); - _tcm_broadcast_starting_time->clear(); - } - } -} - -void CARMACloudPlugin::BroadcastTCM(tsm5EncodedMessage& tsm5ENC) { - //Broadcast TCM - string enc = tsm5ENC.get_encoding(); - std::unique_ptr msg; - msg.reset(); - msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); - msg->refresh_timestamp(); - msg->set_payload(tsm5ENC.get_payload_str()); - msg->set_encoding(enc); - msg->set_flags(IvpMsgFlags_RouteDSRC); - msg->addDsrcMetadata(0x8003); - msg->refresh_timestamp(); - routeable_message *rMsg = dynamic_cast(msg.get()); - BroadcastMessage(*rMsg); -} - -bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const -{ - //Skip repeatedly broadcasting - if(_TCMRepeatedlyBroadCastTotalTimes == 0) - { - return true; - } - - bool is_skip_cur_tcm = false; - bool is_tcm_hex_found = false; - auto tcms_metadatas = _tcm_broadcast_times->equal_range(tcmv01_req_id_hex); - for(auto itr = tcms_metadatas.first; itr !=tcms_metadatas.second; itr ++) - { - string tcm_hex = itr->second.tcm_hex; - int times = itr->second.num_of_times; - if(tcm_hex == tcm_hex_payload) - { - is_tcm_hex_found = true; - if (times >= _TCMRepeatedlyBroadCastTotalTimes) - { - PLOG(logDEBUG) << "SKIP broadcasting as TCMs reqid = " << tcmv01_req_id_hex<< " has been repeatedly broadcast " << times << " times." << std::endl; - //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes - is_skip_cur_tcm = true; - } - else - { - //update the number of times a TCM being broadcast within time out period - times += 1; //Increase by 1 for every iteration - itr->second.num_of_times = times; - PLOG(logDEBUG) << " TCMs reqid = " << tcmv01_req_id_hex<< " repeatedly broadcast " << times << " times." << std::endl; - } - } - } - - if(!is_tcm_hex_found) - { - //Initialize the number of times a TCM being broadcast - TCMBroadcastMetadata t; - t.num_of_times = 1; - t.tcm_hex = tcm_hex_payload; - _tcm_broadcast_times->insert( {tcmv01_req_id_hex, t}); - PLOG(logDEBUG) << " TCMs reqid = "<< tcmv01_req_id_hex << " has been broadcast once."<< std::endl; - } - return is_skip_cur_tcm; -} - -int CARMACloudPlugin::StartWebService() -{ - //Web services - char *placeholderX[1]={0}; - int placeholderC=1; - QCoreApplication a(placeholderC,placeholderX); - - QHostAddress address = QHostAddress(QString::fromStdString (webip)); - quint16 port = static_cast(webport); - - QSharedPointer handler(new OpenAPI::OAIApiRequestHandler()); - handler = QSharedPointer (new OpenAPI::OAIApiRequestHandler()); - - auto router = QSharedPointer::create(); - router->setUpRoutes(); - - QObject::connect(handler.data(), &OpenAPI::OAIApiRequestHandler::requestReceived, [&](QHttpEngine::Socket *socket) { - - CARMAResponseHandler(socket); - }); - - QObject::connect(handler.data(), &OpenAPI::OAIApiRequestHandler::requestReceived, [&](QHttpEngine::Socket *socket) { - - router->processRequest(socket); - }); - - QHttpEngine::Server server(handler.data()); - - if (!server.listen(address, port)) { - qCritical("Unable to listen on the specified port."); - return 1; - } - PLOG(logERROR)<<"CARMACloudPlugin:: Started web service"; - return a.exec(); - -} - - -void CARMACloudPlugin::UpdateConfigSettings() { - std::lock_guard lock(_cfgLock); - GetConfigValue("Frequency", _frequency); - GetConfigValue("WebServiceIP",webip); - GetConfigValue("WebServicePort",webport); - GetConfigValue("fetchTime",fetchtime); - GetConfigValue("MobilityOperationStrategies", _strategies); - GetConfigValue("TCMRepeatedlyBroadcastTimeOut",_TCMRepeatedlyBroadcastTimeOut); - GetConfigValue("TCMNOAcknowledgementDescription", _TCMNOAcknowledgementDescription); - GetConfigValue("TCMRepeatedlyBroadCastTotalTimes", _TCMRepeatedlyBroadCastTotalTimes); - GetConfigValue("TCMRepeatedlyBroadcastSleep", _TCMRepeatedlyBroadcastSleep); - GetConfigValue("listTCM",list_tcm); - - -} - -void CARMACloudPlugin::OnConfigChanged(const char *key, const char *value) { - PluginClient::OnConfigChanged(key, value); - UpdateConfigSettings(); -} - -void CARMACloudPlugin::OnStateChange(IvpPluginState state) { - PluginClient::OnStateChange(state); - - if (state == IvpPluginState_registered) { - UpdateConfigSettings(); - } -} - -void CARMACloudPlugin::CloudSendAsync(const string& local_msg,const string& local_url, const string& local_base, const string& local_method) -{ - std::thread t([this, &local_msg, &local_url, &local_base, &local_method](){ - CloudSend(local_msg, local_url, local_base, local_method); - }); - t.detach(); -} - -int CARMACloudPlugin::CloudSend(const string &local_msg, const string& local_url, const string& local_base, const string& local_method) -{ - CURL *req; - CURLcode res; - string urlfull = local_url+local_base; - req = curl_easy_init(); - if(req) { - curl_easy_setopt(req, CURLOPT_URL, urlfull.c_str()); - - if(strcmp(local_method.c_str(),"POST")==0) - { - curl_easy_setopt(req, CURLOPT_POSTFIELDS, local_msg.c_str()); - curl_easy_setopt(req, CURLOPT_TIMEOUT_MS, 1000L); // Request operation complete within max millisecond timeout - res = curl_easy_perform(req); - if(res != CURLE_OK) - { - fprintf(stderr, "curl send failed: %s\n",curl_easy_strerror(res)); - return 1; - } - } - curl_easy_cleanup(req); - } - - return 0; -} - -void CARMACloudPlugin::ConvertString2Vector(std::vector &sub_str_v, const string &str) const{ - stringstream ss; - ss << str; - while (ss.good()) - { - std::string substring; - getline(ss, substring, ','); - std::transform(substring.begin(), substring.end(),substring.begin(), ::tolower ); - sub_str_v.push_back( substring ); - } -} - -string CARMACloudPlugin::GetValueFromStrategyParamsByKey(const std::vector & strategy_params_v, const string& key) const -{ - string value = ""; - for(auto itr = strategy_params_v.begin(); itr != strategy_params_v.end(); itr++) - { - std::pair key_value_pair; - ConvertString2Pair(key_value_pair, *itr); - if (key_value_pair.first.find(key) != std::string::npos) - { - value = key_value_pair.second; - return value; - } - } - return value; -} - -void CARMACloudPlugin::ConvertString2Pair(std::pair &str_pair, const string &str) const -{ - stringstream ss; - ss << str; - size_t count = 0; - string key, value; - while (ss.good()) - { - std::string substring; - getline(ss, substring, ':'); - if(count == 0) - { - std::transform(substring.begin(), substring.end(),substring.begin(), ::tolower ); - key = substring; - count += 1; - }else{ - value += substring + " "; - } - } - str_pair = std::make_pair(key, value); -} - -QByteArray CARMACloudPlugin::UncompressBytes(const QByteArray compressedBytes) const -{ - z_stream strm; - strm.zalloc = nullptr;//Refer to zlib docs (https://zlib.net/zlib_how.html) - strm.zfree = nullptr; - strm.opaque = nullptr; - strm.avail_in = compressedBytes.size(); - strm.next_in = (Byte *)compressedBytes.data(); - //checking input z_stream to see if there is any error, eg: invalid data etc. - auto err = inflateInit2(&strm, MAX_WBITS + 16); // gzip input - QByteArray outBuf; - //MAX numbers of bytes stored in a buffer - const int BUFFER_SIZE = 4092; - //There is successful, starting to decompress data - if (err == Z_OK) - { - int isDone = 0; - do - { - char buffer[BUFFER_SIZE] = {0}; - strm.avail_out = BUFFER_SIZE; - strm.next_out = (Byte *)buffer; - //Uncompress finished - isDone = inflate(&strm, Z_FINISH); - outBuf.append(buffer); - } while (Z_STREAM_END != isDone); //Reach the end of stream to be uncompressed - }else{ - PLOG(logWARNING) << "Error initalize stream. Err code = " << err << std::endl; - } - //Finished decompress data stream - inflateEnd(&strm); - return outBuf; -} - -int CARMACloudPlugin::Main() { - - - FILE_LOG(logINFO) << "Starting plugin."; - - while (_plugin->state != IvpPluginState_error) { - - if (IsPluginState(IvpPluginState_registered)) - { - this_thread::sleep_for(chrono::milliseconds(5000)); - } - } - - return (EXIT_SUCCESS); -} -} /* namespace */ - -int main(int argc, char *argv[]) { - return run_plugin < CARMACloudPlugin::CARMACloudPlugin > ("CARMACloudPlugin", argc, argv); -} +#include "CARMACloudPlugin.h" +#include +#include +#include +using namespace std; +using namespace tmx::messages; +using namespace tmx::utils; +using namespace boost::property_tree; + +namespace CARMACloudPlugin { + + + +/** + * Construct a new CARMACloudPlugin with the given name. + * + * @param name The name to give the plugin for identification purposes + */ +CARMACloudPlugin::CARMACloudPlugin(string name) :PluginClient(name) { + + UpdateConfigSettings(); + std::lock_guard lock(_cfgLock); + AddMessageFilter < tsm4Message > (this, &CARMACloudPlugin::HandleCARMARequest); + AddMessageFilter < tsm3Message > (this, &CARMACloudPlugin::HandleMobilityOperationMessage); + + // Subscribe to all messages specified by the filters above. + SubscribeToMessages(); + std::thread webthread(&CARMACloudPlugin::StartWebService,this); + webthread.detach(); // wait for the thread to finish + base_hb = "/carmacloud/v2xhub"; + base_req = "/carmacloud/tcmreq"; + base_ack = "/carmacloud/tcmack"; + method = "POST"; + _not_ACK_TCMs = std::make_shared>(); + _tcm_broadcast_times = std::make_shared>(); + _tcm_broadcast_starting_time = std::make_shared>(); + std::thread Broadcast_t(&CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM, this); + Broadcast_t.detach(); + +} + + +void CARMACloudPlugin::HandleCARMARequest(tsm4Message &msg, routeable_message &routeableMsg) +{ + auto carmaRequest = msg.get_j2735_data(); + + + + // convert reqid bytes to hex string. + size_t hexlen = 2; //size of each hex representation with a leading 0 + char reqid[carmaRequest->body.choice.tcrV01.reqid.size * hexlen + 1]; + for (int i = 0; i < carmaRequest->body.choice.tcrV01.reqid.size; i++) + { + sprintf(reqid+(i*hexlen), "%.2X", carmaRequest->body.choice.tcrV01.reqid.buf[i]); + } + + printf("%s\n",reqid); + + long int reqseq = carmaRequest->body.choice.tcrV01.reqseq; + long int scale = carmaRequest->body.choice.tcrV01.scale; + + + int totBounds = carmaRequest->body.choice.tcrV01.bounds.list.count; + int cnt=0; + char bounds_str[5000]; + strcpy(bounds_str,""); + + // get current time + std::time_t tm = std::time(0)/60-fetchtime*24*60; // T minus 24 hours in min + + while(cntbody.choice.tcrV01.bounds.list.array[cnt]->reflat; + long longg = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->reflon; + + + long dtx0 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[0]->deltax; + long dty0 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[0]->deltay; + long dtx1 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[1]->deltax; + long dty1 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[1]->deltay; + long dtx2 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[2]->deltax; + long dty2 = carmaRequest->body.choice.tcrV01.bounds.list.array[cnt]->offsets.list.array[2]->deltay; + + sprintf(bounds_str+strlen(bounds_str),"%u%ld%ld%ld%ld%ld%ld%ld%ld",oldest,longg,lat,dtx0,dty0,dtx1,dty1,dtx2,dty2); + + cnt++; + + + } + + char xml_str[10000]; + sprintf(xml_str,"%s%ld%ld%s",std::to_string(webport).c_str(),list_tcm.c_str(),reqid, reqseq,scale,bounds_str); + + PLOG(logINFO) << "Sent TCR to cloud: "<< xml_str< strategies_v; + ConvertString2Vector(strategies_v, _strategies); + + //Process incoming MobilityOperation message + auto mobilityOperationMsg = msg.get_j2735_data(); + + //process MobilityOperation strategy + stringstream ss; + ss << mobilityOperationMsg->body.strategy.buf; + string mo_strategy = ss.str(); + std::transform(mo_strategy.begin(), mo_strategy.end(), mo_strategy.begin(), ::tolower ); + + if( std::find( strategies_v.begin(), strategies_v.end(), mo_strategy ) != strategies_v.end() && mo_strategy.find(_TCMAcknowledgementStrategy) != std::string::npos) + { + //Process MobilityOperation strategy params + ss.str(""); + ss << mobilityOperationMsg->body.operationParams.buf; + string strategy_params_str = ss.str(); + std::vector strategy_params_v; + ConvertString2Vector(strategy_params_v, strategy_params_str); + + string even_log_description = GetValueFromStrategyParamsByKey(strategy_params_v, "reason"); + string acknnowledgement_str = GetValueFromStrategyParamsByKey(strategy_params_v, "acknowledgement"); + string traffic_control_id = GetValueFromStrategyParamsByKey(strategy_params_v, "traffic_control_id"); + string msgnum = GetValueFromStrategyParamsByKey(strategy_params_v, "msgnum"); + boost::trim(traffic_control_id); + std::transform(traffic_control_id.begin(), traffic_control_id.end(), traffic_control_id.begin(), ::tolower ); + ss.str(""); + ss << mobilityOperationMsg->header.hostStaticId.buf; + string CMV_id = ss.str(); + + std::lock_guard lock(_not_ACK_TCMs_mutex); + auto matching_TCMS = _not_ACK_TCMs->equal_range(traffic_control_id); + bool is_tcm_removed = false; + for(auto itr = matching_TCMS.first; itr != matching_TCMS.second; itr++) + { + //The traffic control id should match with the TCM id per CMV (CARMA vehicle) and combines with msgnum to uniquely identify each TCM. + tsm5EncodedMessage msg = itr->second; + tsm5Message decoded_tsm5_msg = msg.decode_j2735_message(); + std::shared_ptr msg_j2735_data = decoded_tsm5_msg.get_j2735_data(); + if (msg_j2735_data == NULL) { + PLOG(logERROR) << "get_j2735_data() on decoded j2735 returned NULL." << std::endl; + break; + } + + if(msg_j2735_data->body.choice.tcmV01.msgnum == stol(msgnum)) + { + //Remove a single TCM identified by reqid (traffic control id) and msgnum. + _not_ACK_TCMs->erase(itr); + PLOG(logINFO) << "Acknowledgement received, traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " removed from TCM map." << std::endl; + is_tcm_removed = true; + break; + } + } + if(!is_tcm_removed) + { + PLOG(logERROR) << "Acknowledgement received, but traffic_control_id =" << traffic_control_id << ", msgnum = "<< msgnum << " NOT found in TCM map." << std::endl; + } + + //Create an event log object for both positive and negative ACK (ackownledgement), and broadcast the event log + tmx::messages::TmxEventLogMessage event_log_msg; + + //acknnowledgement: Flag to indicate whether the received geofence was processed successfully by the CMV. 1 mapping to acknowledged by CMV + int ack = std::stoi(acknnowledgement_str); + ack == acknowledgement_status::acknowledgement_status__acknowledged ? event_log_msg.set_level(IvpLogLevel::IvpLogLevel_info) : event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); + event_log_msg.set_description(mo_strategy + ": Traffic control id = " + traffic_control_id + ( CMV_id.length() <= 0 ? "":", CMV Id = " + CMV_id )+ ", reason = " + even_log_description); + PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; + this->BroadcastMessage(event_log_msg); + + //Only send negative ack to carma-cloud if receiving any acks from CMV. + if(ack != acknowledgement_status::acknowledgement_status__acknowledged ) + { + stringstream sss; + sss << " " << traffic_control_id + << ""<< msgnum + <<""<< CMV_id + <<"" << acknowledgement_status::acknowledgement_status__rejected + << "" << even_log_description + << ""; + PLOG(logINFO) << "Sent Negative ACK: "<< sss.str() <bytesAvailable()>0) + { + auto readBytes = socket->readAll(); + if (socket->headers().keys().contains(CONTENT_ENCODING_KEY) && std::string(socket->headers().constFind(CONTENT_ENCODING_KEY).value().data()) == CONTENT_ENCODING_VALUE) + { + //readBytes is compressed in gzip format + st.append(UncompressBytes(readBytes)); + }else{ + st.append(readBytes); + } + } + QByteArray array = st.toLocal8Bit(); + + char* _cloudUpdate = array.data(); // would be the cloud update packet, needs parsing + + + string tcm = _cloudUpdate; + + PLOG(logINFO) << "Received TCM from cloud" << tcm << std::endl; + if(tcm.length() == 0) + { + PLOG(logERROR) << "Received TCM length is zero, and skipped." << std::endl; + return; + } + + //Transform carma-cloud TCM XML to J2735 compatible TCM XML by updating tags + tcm=updateTags(tcm,"",""); + tcm=updateTags(tcm,"",""); + tcm=updateTags(tcm,"TrafficControlParams","params"); + tcm=updateTags(tcm,"TrafficControlGeometry","geometry"); + tcm=updateTags(tcm,"TrafficControlPackage","package"); + + //List of tcm in string format + std::list tcm_sl = FilterTCMs(tcm); + + for(const auto tcm_s: tcm_sl) + { + tsm5Message tsm5message; + tsm5EncodedMessage tsm5ENC; + tmx::message_container_type container; + + + std::stringstream ss; + ss << tcm_s; + + container.load(ss); + tsm5message.set_contents(container.get_storage().get_tree()); + tsm5ENC.encode_j2735_message(tsm5message); + BroadcastTCM(tsm5ENC); + PLOG(logINFO) << " CARMACloud Plugin :: Broadcast tsm5:: " << tsm5ENC.get_payload_str(); + + //Get TCM id + Id64b_t tcmv01_req_id = tsm5message.get_j2735_data()->body.choice.tcmV01.reqid; + //Translate TCM request id to hex string + ss.str(""); + for(size_t i=0; i < tcmv01_req_id.size; i++) + { + ss << std::setfill('0') << std::setw(2) << std::hex << (unsigned) tcmv01_req_id.buf[i]; + } + string tcmv01_req_id_hex = ss.str(); + //Transform all hex to lower case + std::transform(tcmv01_req_id_hex.begin(), tcmv01_req_id_hex.end(), tcmv01_req_id_hex.begin(), ::tolower ); + if(tcmv01_req_id_hex.length() > 0) + { + //Update map of tcm request id hex and tcm hex + std::lock_guard lock(_not_ACK_TCMs_mutex); + _not_ACK_TCMs->insert({tcmv01_req_id_hex, tsm5ENC}); + } + } +} + +std::list CARMACloudPlugin::FilterTCMs(const std::string& tcm_response) const +{ + std::list tcm_sl = {}; + try + { + std::stringstream iss; + iss << tcm_response; + boost::property_tree::ptree parent_node; + boost::property_tree::read_xml(iss, parent_node); + auto child_nodes = parent_node.get_child_optional("TrafficControlMessageList"); + //The tcm response is a list of TCM + if (child_nodes) + { + for (const auto &p : child_nodes.get()) + { + boost::property_tree::ptree tcm_node; + tcm_node.put_child(p.first, p.second); + std::ostringstream oss; + boost::property_tree::write_xml(oss, tcm_node); + tcm_sl.push_back(oss.str()); + } + }else{ + tcm_sl.push_back(iss.str()); + } + } + catch (const boost::property_tree::xml_parser_error &e) + { + PLOG(logERROR) << "Failed to parse the xml string." << e.what(); + } + return tcm_sl; +} + +void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM() +{ + while(true) + { + std::this_thread::sleep_for(std::chrono::milliseconds(_TCMRepeatedlyBroadcastSleep)); + if(_plugin->state == IvpPluginState_error) + { + break; + } + if(_not_ACK_TCMs->size() > 0) + { + std::lock_guard lock(_not_ACK_TCMs_mutex); + + std::set expired_req_ids; + + for( auto itr = _not_ACK_TCMs->begin(); itr!=_not_ACK_TCMs->end(); itr++ ) + { + string tcmv01_req_id_hex = itr->first; + auto cur_time = (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch())).count(); + if (_tcm_broadcast_starting_time->count(tcmv01_req_id_hex) == 0) + { + _tcm_broadcast_starting_time->insert({tcmv01_req_id_hex, cur_time}); + } + else if ( (cur_time - _tcm_broadcast_starting_time->at(tcmv01_req_id_hex)) > _TCMRepeatedlyBroadcastTimeOut ) + { + expired_req_ids.insert(tcmv01_req_id_hex); + continue; + } + + + tsm5EncodedMessage tsm5ENC = itr->second; + string tcm_hex_payload = tsm5ENC.get_payload_str(); + if(IsSkipBroadcastCurTCM(tcmv01_req_id_hex, tcm_hex_payload)) + { + continue; + } + BroadcastTCM(tsm5ENC); + PLOG(logINFO) << " CARMACloud Plugin :: Repeatedly Broadcast tsm5:: " << tsm5ENC.get_payload_str(); + } //END TCMs LOOP + + // For any ids which have expired clean up the maps + for (auto tcmv01_req_id_hex : expired_req_ids) { + //Create an event log object for both NO ACK (ackownledgement), and broadcast the event log + tmx::messages::TmxEventLogMessage event_log_msg; + event_log_msg.set_level(IvpLogLevel::IvpLogLevel_warn); + event_log_msg.set_description(_TCMAcknowledgementStrategy + ": " + _TCMNOAcknowledgementDescription + " Traffic control id = " + tcmv01_req_id_hex); + PLOG(logDEBUG) << "event_log_msg " << event_log_msg << std::endl; + this->BroadcastMessage(event_log_msg); + + //send negative ack to carma-cloud + stringstream sss; + sss << " " << tcmv01_req_id_hex + << "" << acknowledgement_status::acknowledgement_status__not_acknowledged + << "" << _TCMNOAcknowledgementDescription + << ""; + PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <erase(tcmv01_req_id_hex); + //If time out, stop tracking the starting time of the TCMs being broadcast so far + _tcm_broadcast_starting_time->erase(tcmv01_req_id_hex); + //If time out, stop tracking the number of times the TCMs ( that has the same TCR reqid) being broadcast + _tcm_broadcast_times->erase(tcmv01_req_id_hex); + } + } + else + { + PLOG(logDEBUG) << "NO TCMs to broadcast." << std::endl; + _tcm_broadcast_times->clear(); + _tcm_broadcast_starting_time->clear(); + } + } +} + +void CARMACloudPlugin::BroadcastTCM(tsm5EncodedMessage& tsm5ENC) { + //Broadcast TCM + string enc = tsm5ENC.get_encoding(); + std::unique_ptr msg; + msg.reset(); + msg.reset(dynamic_cast(factory.NewMessage(api::MSGSUBTYPE_TESTMESSAGE05_STRING))); + msg->refresh_timestamp(); + msg->set_payload(tsm5ENC.get_payload_str()); + msg->set_encoding(enc); + msg->set_flags(IvpMsgFlags_RouteDSRC); + msg->addDsrcMetadata(0x8003); + msg->refresh_timestamp(); + routeable_message *rMsg = dynamic_cast(msg.get()); + BroadcastMessage(*rMsg); +} + +bool CARMACloudPlugin::IsSkipBroadcastCurTCM(const string & tcmv01_req_id_hex, const string & tcm_hex_payload ) const +{ + //Skip repeatedly broadcasting + if(_TCMRepeatedlyBroadCastTotalTimes == 0) + { + return true; + } + + bool is_skip_cur_tcm = false; + bool is_tcm_hex_found = false; + auto tcms_metadatas = _tcm_broadcast_times->equal_range(tcmv01_req_id_hex); + for(auto itr = tcms_metadatas.first; itr !=tcms_metadatas.second; itr ++) + { + string tcm_hex = itr->second.tcm_hex; + int times = itr->second.num_of_times; + if(tcm_hex == tcm_hex_payload) + { + is_tcm_hex_found = true; + if (times >= _TCMRepeatedlyBroadCastTotalTimes) + { + PLOG(logDEBUG) << "SKIP broadcasting as TCMs reqid = " << tcmv01_req_id_hex<< " has been repeatedly broadcast " << times << " times." << std::endl; + //Skip the broadcasting logic below if the TCMs with this request id has already been broadcast more than _TCMRepeatedlyBroadCastTotalTimes + is_skip_cur_tcm = true; + } + else + { + //update the number of times a TCM being broadcast within time out period + times += 1; //Increase by 1 for every iteration + itr->second.num_of_times = times; + PLOG(logDEBUG) << " TCMs reqid = " << tcmv01_req_id_hex<< " repeatedly broadcast " << times << " times." << std::endl; + } + } + } + + if(!is_tcm_hex_found) + { + //Initialize the number of times a TCM being broadcast + TCMBroadcastMetadata t; + t.num_of_times = 1; + t.tcm_hex = tcm_hex_payload; + _tcm_broadcast_times->insert( {tcmv01_req_id_hex, t}); + PLOG(logDEBUG) << " TCMs reqid = "<< tcmv01_req_id_hex << " has been broadcast once."<< std::endl; + } + return is_skip_cur_tcm; +} + +int CARMACloudPlugin::StartWebService() +{ + //Web services + char *placeholderX[1]={0}; + int placeholderC=1; + QCoreApplication a(placeholderC,placeholderX); + + QHostAddress address = QHostAddress(QString::fromStdString (webip)); + quint16 port = static_cast(webport); + + QSharedPointer handler(new OpenAPI::OAIApiRequestHandler()); + handler = QSharedPointer (new OpenAPI::OAIApiRequestHandler()); + + auto router = QSharedPointer::create(); + router->setUpRoutes(); + + QObject::connect(handler.data(), &OpenAPI::OAIApiRequestHandler::requestReceived, [&](QHttpEngine::Socket *socket) { + + CARMAResponseHandler(socket); + }); + + QObject::connect(handler.data(), &OpenAPI::OAIApiRequestHandler::requestReceived, [&](QHttpEngine::Socket *socket) { + + router->processRequest(socket); + }); + + QHttpEngine::Server server(handler.data()); + + if (!server.listen(address, port)) { + qCritical("Unable to listen on the specified port."); + return 1; + } + PLOG(logINFO)<<"CARMACloudPlugin:: Started web service"; + return a.exec(); + +} + + +void CARMACloudPlugin::UpdateConfigSettings() { + std::lock_guard lock(_cfgLock); + GetConfigValue("Frequency", _frequency); + GetConfigValue("WebServiceIP",webip); + GetConfigValue("WebServicePort",webport); + GetConfigValue("fetchTime",fetchtime); + GetConfigValue("MobilityOperationStrategies", _strategies); + GetConfigValue("TCMRepeatedlyBroadcastTimeOut",_TCMRepeatedlyBroadcastTimeOut); + GetConfigValue("TCMNOAcknowledgementDescription", _TCMNOAcknowledgementDescription); + GetConfigValue("TCMRepeatedlyBroadCastTotalTimes", _TCMRepeatedlyBroadCastTotalTimes); + GetConfigValue("TCMRepeatedlyBroadcastSleep", _TCMRepeatedlyBroadcastSleep); + GetConfigValue("listTCM",list_tcm); + std::string carma_cloud_ip; + uint carma_cloud_port; + GetConfigValue("CARMACloudIP",carma_cloud_ip); + GetConfigValue("CARMACloudPort",carma_cloud_port); + carma_cloud_url = carma_cloud_ip + ":" + std::to_string(carma_cloud_port); + PLOG(logDEBUG) << "Setting CARMA Cloud URL to " << carma_cloud_url << std::endl; + +} + +void CARMACloudPlugin::OnConfigChanged(const char *key, const char *value) { + PluginClient::OnConfigChanged(key, value); + UpdateConfigSettings(); +} + +void CARMACloudPlugin::OnStateChange(IvpPluginState state) { + PluginClient::OnStateChange(state); + + if (state == IvpPluginState_registered) { + UpdateConfigSettings(); + } +} + +void CARMACloudPlugin::CloudSendAsync(const string& local_msg,const string& local_url, const string& local_base, const string& local_method) +{ + std::thread t([this, &local_msg, &local_url, &local_base, &local_method](){ + CloudSend(local_msg, local_url, local_base, local_method); + }); + t.detach(); +} + +int CARMACloudPlugin::CloudSend(const string &local_msg, const string& local_url, const string& local_base, const string& local_method) +{ + CURL *req; + CURLcode res; + string urlfull = local_url+local_base; + req = curl_easy_init(); + if(req) { + curl_easy_setopt(req, CURLOPT_URL, urlfull.c_str()); + + if(strcmp(local_method.c_str(),"POST")==0) + { + curl_easy_setopt(req, CURLOPT_POSTFIELDS, local_msg.c_str()); + curl_easy_setopt(req, CURLOPT_TIMEOUT_MS, 1000L); // Request operation complete within max millisecond timeout + res = curl_easy_perform(req); + if(res != CURLE_OK) + { + fprintf(stderr, "curl send failed: %s\n",curl_easy_strerror(res)); + return 1; + } + } + curl_easy_cleanup(req); + } + + return 0; +} + +void CARMACloudPlugin::ConvertString2Vector(std::vector &sub_str_v, const string &str) const{ + stringstream ss; + ss << str; + while (ss.good()) + { + std::string substring; + getline(ss, substring, ','); + std::transform(substring.begin(), substring.end(),substring.begin(), ::tolower ); + sub_str_v.push_back( substring ); + } +} + +string CARMACloudPlugin::GetValueFromStrategyParamsByKey(const std::vector & strategy_params_v, const string& key) const +{ + string value = ""; + for(auto itr = strategy_params_v.begin(); itr != strategy_params_v.end(); itr++) + { + std::pair key_value_pair; + ConvertString2Pair(key_value_pair, *itr); + if (key_value_pair.first.find(key) != std::string::npos) + { + value = key_value_pair.second; + return value; + } + } + return value; +} + +void CARMACloudPlugin::ConvertString2Pair(std::pair &str_pair, const string &str) const +{ + stringstream ss; + ss << str; + size_t count = 0; + string key, value; + while (ss.good()) + { + std::string substring; + getline(ss, substring, ':'); + if(count == 0) + { + std::transform(substring.begin(), substring.end(),substring.begin(), ::tolower ); + key = substring; + count += 1; + }else{ + value += substring + " "; + } + } + str_pair = std::make_pair(key, value); +} + +QByteArray CARMACloudPlugin::UncompressBytes(const QByteArray compressedBytes) const +{ + z_stream strm; + strm.zalloc = nullptr;//Refer to zlib docs (https://zlib.net/zlib_how.html) + strm.zfree = nullptr; + strm.opaque = nullptr; + strm.avail_in = compressedBytes.size(); + strm.next_in = (Byte *)compressedBytes.data(); + //checking input z_stream to see if there is any error, eg: invalid data etc. + auto err = inflateInit2(&strm, MAX_WBITS + 16); // gzip input + QByteArray outBuf; + //MAX numbers of bytes stored in a buffer + const int BUFFER_SIZE = 4092; + //There is successful, starting to decompress data + if (err == Z_OK) + { + int isDone = 0; + do + { + char buffer[BUFFER_SIZE] = {0}; + strm.avail_out = BUFFER_SIZE; + strm.next_out = (Byte *)buffer; + //Uncompress finished + isDone = inflate(&strm, Z_FINISH); + outBuf.append(buffer); + } while (Z_STREAM_END != isDone); //Reach the end of stream to be uncompressed + }else{ + PLOG(logWARNING) << "Error initalize stream. Err code = " << err << std::endl; + } + //Finished decompress data stream + inflateEnd(&strm); + return outBuf; +} + +int CARMACloudPlugin::Main() { + + + FILE_LOG(logINFO) << "Starting plugin."; + + while (_plugin->state != IvpPluginState_error) { + + if (IsPluginState(IvpPluginState_registered)) + { + this_thread::sleep_for(chrono::milliseconds(5000)); + } + } + + return (EXIT_SUCCESS); +} +} /* namespace */ + +int main(int argc, char *argv[]) { + return run_plugin < CARMACloudPlugin::CARMACloudPlugin > ("CARMACloudPlugin", argc, argv); +} diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index d6b88c2da..066e65756 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -174,7 +174,7 @@ class CARMACloudPlugin: public PluginClient { pthread_mutex_t _timMutex = PTHREAD_MUTEX_INITIALIZER; J2735MessageFactory factory; uint64_t _frequency = 0; - string url; + string carma_cloud_url; string base_hb; string base_req; string method;