diff --git a/.circleci/config.yml b/.circleci/config.yml index ff7ba8583..d4295322b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -273,13 +273,8 @@ workflows: requires: - docker_build_push_develop - automated_release: - requires: - - docker_build_push - - arm_build_push filters: tags: only: /^[\.0-9]*$/ branches: ignore: /.*/ - - diff --git a/README.md b/README.md index 29aa796c8..7d176be81 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![CircleCI](https://circleci.com/gh/usdot-fhwa-OPS/V2X-Hub.svg?style=svg)](https://circleci.com/gh/usdot-fhwa-OPS/V2X-Hub) | [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=usdot-fhwa-ops_V2X-Hub&metric=alert_status)](https://sonarcloud.io/dashboard?id=usdot-fhwa-ops_V2X-Hub) | ## Release Notes -As of Feb 3rd, 2022, the V2X Hub software platform is on version 7.1. See more about release 7.1 here: [V2X Hub Release Notes]() +As of July 29th, 2022, the V2X Hub software platform is on version 7.3.1 See more about release 7.3.1 here: [V2X Hub Release Notes]() # Overview In order to bring infrastructure components into the Connected Vehicle architecture, you need software that will facilitate the exchange of data in a format that can be understood by both vehicles and infrastructure devices The V2X Hub, takes in data from vehicles via Basic Safety Messages (BSM) in a Society of Automotive Engineers (SAE) standard format and translates the data to a National Transportation Communications for ITS Protocol (NTCIP) that infrastructure components can understand. And vice versa. It translates Signal Phase and Timing (SPaT) data from NTCIP to SAE and sends it to the Roadside Unit (RSU) for broadcast to mobile devices, including vehicles. diff --git a/configuration/amd64/docker-compose.yml b/configuration/amd64/docker-compose.yml index 0583520ef..7ed58d51f 100755 --- a/configuration/amd64/docker-compose.yml +++ b/configuration/amd64/docker-compose.yml @@ -19,7 +19,7 @@ services: - ./mysql/port_drayage.sql:/docker-entrypoint-initdb.d/port_drayage.sql php: - image: usdotfhwaops/php:7.3.0 + image: usdotfhwaops/php:7.3.1 container_name: php network_mode: host depends_on: @@ -29,7 +29,7 @@ services: tty: true v2xhub: - image: usdotfhwaops/v2xhubamd:7.3.0 + image: usdotfhwaops/v2xhubamd:7.3.1 container_name: v2xhub network_mode: host restart: always @@ -43,7 +43,7 @@ services: - ./logs:/var/log/tmx - ./MAP:/var/www/plugins/MAP port_drayage_webservice: - image: usdotfhwaops/port-drayage-webservice:7.3.0 + image: usdotfhwaops/port-drayage-webservice:7.3.1 container_name: port_drayage_webservice network_mode: host secrets: diff --git a/configuration/arm64/docker-compose.yml b/configuration/arm64/docker-compose.yml index 8b208bba8..8522f715f 100644 --- a/configuration/arm64/docker-compose.yml +++ b/configuration/arm64/docker-compose.yml @@ -19,7 +19,7 @@ services: - ./mysql/port_drayage.sql:/docker-entrypoint-initdb.d/port_drayage.sql php: - image: usdotfhwaops/php_arm:7.3.0 + image: usdotfhwaops/php_arm:7.3.1 container_name: php network_mode: host depends_on: @@ -29,7 +29,7 @@ services: tty: true v2xhub: - image: usdotfhwaops/v2xhubarm:7.3.0 + image: usdotfhwaops/v2xhubarm:7.3.1 container_name: v2xhub network_mode: host restart: always @@ -43,7 +43,7 @@ services: - ./logs:/var/log/tmx - ./MAP:/var/www/plugins/MAP port_drayage_webservice: - image: usdotfhwaops/port-drayage-webservice_arm:7.3.0 + image: usdotfhwaops/port-drayage-webservice_arm:7.3.1 container_name: port_drayage_webservice network_mode: host secrets: diff --git a/docs/Release_notes.md b/docs/Release_notes.md index 82f5dd340..cde93e7bb 100644 --- a/docs/Release_notes.md +++ b/docs/Release_notes.md @@ -1,5 +1,18 @@ V2X-Hub Release Notes --------------------------------- +Version 7.3.1, released July 29th, 2022 +-------------------------------------------------------- +**Summary:** +V2X Hub release version 7.3.1 is a hotfix release for 7.3.0. The fixes primarily occurred during the Implementation of IHP2 Speed Harmonization algorithm in Carma-cloud application. + +Enhancements in this release: + - Issue 262: Updated CARMA Streets plugin to receive and decode Mobility Path messages into JSON through Kafka. +Bug fixes in this release: + - Issue 392: Fixed Large latencies experienced between V2XHub receiving a Traffic Control Request (TCR) and broadcasting corresponding Traffic Control Messages (TCMs). + - Issue 394: Fixed V2X Hub TCMs are broadcasted more than the configured maximum quantity, and are broadcasted after an acknowledgement is received. + - Issue 404: Fixed V2xhub cannot encode the TCM if the package detail has minplatoonhdwy tag. + + Version 7.3.0, released June 14th, 2022 -------------------------------------------------------- **Summary:** diff --git a/src/tmx/Asn_J2735/src/r63/TrafficControlDetail.c b/src/tmx/Asn_J2735/src/r63/TrafficControlDetail.c index 0fc30ac1a..f3e78883a 100644 --- a/src/tmx/Asn_J2735/src/r63/TrafficControlDetail.c +++ b/src/tmx/Asn_J2735/src/r63/TrafficControlDetail.c @@ -504,11 +504,17 @@ static asn_per_constraints_t asn_PER_memb_minvehocc_constr_42 CC_NOTUSED = { { APC_UNCONSTRAINED, -1, -1, 0, 0 }, 0, 0 /* No PER value map */ }; +static asn_oer_constraints_t asn_OER_memb_maxplatoonsize_constr_43 CC_NOTUSED = { + { 1, 1 } /* (1..63) */, + -1}; static asn_per_constraints_t asn_PER_memb_maxplatoonsize_constr_43 CC_NOTUSED = { { APC_CONSTRAINED, 6, 6, 1, 63 } /* (1..63) */, { APC_UNCONSTRAINED, -1, -1, 0, 0 }, 0, 0 /* No PER value map */ }; +static asn_oer_constraints_t asn_OER_memb_minplatoonhdwy_constr_44 CC_NOTUSED = { + { 2, 1 } /* (0..2047) */, + -1}; static asn_per_constraints_t asn_PER_memb_minplatoonhdwy_constr_44 CC_NOTUSED = { { APC_CONSTRAINED, 11, 11, 0, 2047 } /* (0..2047) */, { APC_UNCONSTRAINED, -1, -1, 0, 0 }, @@ -982,18 +988,18 @@ asn_TYPE_member_t asn_MBR_TrafficControlDetail_1[] = { (ASN_TAG_CLASS_CONTEXT | (20 << 2)), -1, /* IMPLICIT tag at current level */ &asn_DEF_NativeInteger, - memb_maxplatoonsize_constraint_1, - &asn_PER_memb_maxplatoonsize_constr_43, 0, + { &asn_OER_memb_maxplatoonsize_constr_43, &asn_PER_memb_maxplatoonsize_constr_43, memb_maxplatoonsize_constraint_1 }, + 0, 0, /* No default value */ "maxplatoonsize" }, { ATF_NOFLAGS, 0, offsetof(struct TrafficControlDetail, choice.minplatoonhdwy), (ASN_TAG_CLASS_CONTEXT | (21 << 2)), -1, /* IMPLICIT tag at current level */ &asn_DEF_NativeInteger, - memb_minplatoonhdwy_constraint_1, - &asn_PER_memb_minplatoonhdwy_constr_44, 0, + { &asn_OER_memb_minplatoonhdwy_constr_44, &asn_PER_memb_minplatoonhdwy_constr_44, memb_minplatoonhdwy_constraint_1 }, + 0, 0, /* No default value */ "minplatoonhdwy" }, }; diff --git a/src/tmx/TmxUtils/test/J2735MessageTest.cpp b/src/tmx/TmxUtils/test/J2735MessageTest.cpp index a5bb3642e..a8425da69 100644 --- a/src/tmx/TmxUtils/test/J2735MessageTest.cpp +++ b/src/tmx/TmxUtils/test/J2735MessageTest.cpp @@ -555,4 +555,36 @@ TEST_F(J2735MessageTest, EncodePersonalSafetyMessage){ std::cout << psmENC.get_payload_str()<(ss); + tsm4msg.set_contents(container.get_storage().get_tree()); + tsm4Enc.encode_j2735_message(tsm4msg); + std::cout << tsm4Enc.get_payload_str()<(ss); + tsm5msg.set_contents(container.get_storage().get_tree()); + tsm5Enc.encode_j2735_message(tsm5msg); + std::cout << tsm5Enc.get_payload_str()< #include +#include using namespace std; using namespace tmx::messages; using namespace tmx::utils; @@ -95,15 +96,6 @@ void CARMACloudPlugin::HandleCARMARequest(tsm4Message &msg, routeable_message &r PLOG(logINFO) << "Sent TCR to cloud: "<< xml_str< lock(_not_ACK_TCMs_mutex); - if(_not_ACK_TCMs->erase(reqid) <= 0) - { - PLOG(logDEBUG) << "TCR request id =" << reqid << " Not Found in TCM map." << std::endl; - }else{ - PLOG(logDEBUG) << "TCR request id =" << reqid << " Found in TCM map. Remove the existing TCMs with the same TCR request id." << std::endl; - } } void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg){ @@ -187,7 +179,7 @@ void CARMACloudPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeabl << "" << even_log_description << ""; PLOG(logINFO) << "Sent Negative ACK: "<< sss.str() <" << _TCMNOAcknowledgementDescription << ""; PLOG(logINFO) << "Sent No ACK as Time Out: "<< sss.str() <erase(tcmv01_req_id_hex); //If time out, stop tracking the starting time of the TCMs being broadcast so far @@ -472,31 +464,37 @@ void CARMACloudPlugin::OnStateChange(IvpPluginState state) { } } - -int CARMACloudPlugin::CloudSend(string msg,string url, string base, string method) +void CARMACloudPlugin::CloudSendAsync(const string& local_msg,const string& local_url, const string& local_base, const string& local_method) { - CURL *req; - CURLcode res; - string urlfull = url+base; - + std::thread t([this, &local_msg, &local_url, &local_base, &local_method](){ + CloudSend(local_msg, local_url, local_base, local_method); + }); + t.detach(); +} - req = curl_easy_init(); - if(req) { - curl_easy_setopt(req, CURLOPT_URL, urlfull.c_str()); +int CARMACloudPlugin::CloudSend(const string &local_msg, const string& local_url, const string& local_base, const string& local_method) +{ + CURL *req; + CURLcode res; + string urlfull = local_url+local_base; + req = curl_easy_init(); + if(req) { + curl_easy_setopt(req, CURLOPT_URL, urlfull.c_str()); - if(strcmp(method.c_str(),"POST")==0) + if(strcmp(local_method.c_str(),"POST")==0) { - curl_easy_setopt(req, CURLOPT_POSTFIELDS, msg.c_str()); - curl_easy_setopt(req, CURLOPT_TIMEOUT, 2L); // Sets a 2 second timeout + curl_easy_setopt(req, CURLOPT_POSTFIELDS, local_msg.c_str()); + curl_easy_setopt(req, CURLOPT_TIMEOUT_MS, 1000L); // Request operation complete within max millisecond timeout res = curl_easy_perform(req); - if(res != CURLE_OK) - { - fprintf(stderr, "curl send failed: %s\n",curl_easy_strerror(res)); - return 1; - } + if(res != CURLE_OK) + { + fprintf(stderr, "curl send failed: %s\n",curl_easy_strerror(res)); + return 1; + } } - curl_easy_cleanup(req); - } + curl_easy_cleanup(req); + } + return 0; } diff --git a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h index 493dc32e8..8876ade07 100644 --- a/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h +++ b/src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.h @@ -109,7 +109,9 @@ class CARMACloudPlugin: public PluginClient { int StartWebService(); void CARMAResponseHandler(QHttpEngine::Socket *socket); - int CloudSend(string msg,string url, string base, string method); + int CloudSend(const string& msg,const string& url, const string& base, const string& method); + //Send HTTP request async + void CloudSendAsync(const string& msg,const string& url, const string& base, const string& method); string updateTags(string s,string t, string t1); void HandleCARMARequest(tsm4Message &msg, routeable_message &routeableMsg); diff --git a/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt index 998c81afa..51c7f34d2 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt +++ b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt @@ -2,4 +2,23 @@ PROJECT ( CARMAStreetsPlugin VERSION 5.0 LANGUAGES CXX ) BuildTmxPlugin ( ) -TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp) \ No newline at end of file +TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp) + +############# +## Testing ## +############# +enable_testing() +include_directories(${PROJECT_SOURCE_DIR}/src) +add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp) +target_link_libraries(${PROJECT_NAME}_lib PUBLIC ${TMXAPI_LIBRARIES} + ${ASN_J2735_LIBRARIES} + ${MYSQL_LIBRARIES} + ${MYSQLCPPCONN_LIBRARIES} + tmxutils + ${UUID_LIBRARY}) +set(BINARY ${PROJECT_NAME}_test) +file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) +set(SOURCES ${TEST_SOURCES} WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/test) +add_executable(${BINARY} ${TEST_SOURCES}) +add_test(NAME ${BINARY} COMMAND ${BINARY}) +target_link_libraries(${BINARY} PUBLIC ${PROJECT_NAME}_lib gtest jsoncpp) \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/manifest.json b/src/v2i-hub/CARMAStreetsPlugin/manifest.json index f58c7f06a..b870105bd 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/manifest.json +++ b/src/v2i-hub/CARMAStreetsPlugin/manifest.json @@ -31,6 +31,11 @@ "default": "v2xhub_bsm_in", "description": "Apache Kafka topic plugin will transmit message to." }, + { + "key": "transmitMapTopic", + "default": "v2xhub_map_msg_in", + "description": "Apache Kafka topic plugin will transmit message to." + }, { "key": "runKafkaConsumer", "default": "1", diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 8646b8480..8cefdd585 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -24,6 +24,7 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); + AddMessageFilter < MapDataMessage > (this, &CARMAStreetsPlugin::HandleMapMessage); SubscribeToMessages(); @@ -38,6 +39,7 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { GetConfigValue("receiveTopic", _receiveTopic); GetConfigValue("transmitMobilityOperationTopic", _transmitMobilityOperationTopic); GetConfigValue("transmitMobilityPathTopic", _transmitMobilityPathTopic); + GetConfigValue("transmitMapTopic", _transmitMAPTopic); GetConfigValue("KafkaBrokerIp", _kafkaBrokerIp); GetConfigValue("KafkaBrokerPort", _kafkaBrokerPort); GetConfigValue("runKafkaConsumer", _run_kafka_consumer); @@ -119,7 +121,6 @@ void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routea try { auto mobilityOperation = msg.get_j2735_data(); - bool retry = true; PLOG(logINFO) << "Body OperationParams : " << mobilityOperation->body.operationParams.buf << "\n" << "Body Strategy : " << mobilityOperation->body.strategy.buf<< "\n" <<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic << " " @@ -166,34 +167,7 @@ void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routea mobilityOperationJsonRoot["metadata"] = metadata; const std::string message = Json::writeString(builder, mobilityOperationJsonRoot); PLOG(logDEBUG) <<"MobilityOperation message:" << message <produce(_transmitMobilityOperationTopic, - RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, - const_cast(message.c_str()), - message.size(), - NULL, NULL, 0, 0); - - if (produce_error == RdKafka::ERR_NO_ERROR) { - PLOG(logDEBUG) <<"Queued message:" << message; - retry = false; - } - else - { - PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); - if (produce_error == RdKafka::ERR__QUEUE_FULL) { - PLOG(logERROR) <<"Message queue full...retrying..."; - kafka_producer->poll(500); /* ms */ - retry = true; - } - else { - PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); - retry = false; - } - } - } + produce_kafka_msg(message, _transmitMobilityOperationTopic); } } catch (TmxException &ex) { @@ -277,23 +251,7 @@ void CARMAStreetsPlugin::HandleMobilityPathMessage(tsm2Message &msg, routeable_m mobilityPathJsonRoot["trajectory"] = trajectory; const std::string json_message = Json::writeString(builder, mobilityPathJsonRoot); PLOG(logDEBUG) <<"MobilityPath Json message:" << json_message; - RdKafka::ErrorCode produce_error = kafka_producer->produce( _transmitMobilityPathTopic, - RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, const_cast(json_message.c_str()), - json_message.size(), NULL, NULL, 0, 0 ); - - if (produce_error == RdKafka::ERR_NO_ERROR) - { - PLOG(logDEBUG) << "Queued message:" << json_message; - } - else - { - PLOG(logERROR) << "Failed to queue message:" << json_message <<" with error:" << RdKafka::err2str(produce_error); - if (produce_error == RdKafka::ERR__QUEUE_FULL) - { - PLOG(logERROR) << "MobilityPath producer Message queue is full."; - } - } + produce_kafka_msg(json_message, _transmitMobilityPathTopic); } catch (TmxException &ex) { @@ -307,7 +265,6 @@ void CARMAStreetsPlugin::HandleBasicSafetyMessage(BsmMessage &msg, routeable_mes try { auto bsm = msg.get_j2735_data(); - bool retry = true; Json::Value bsmJsonRoot; Json::Value coreData; @@ -445,39 +402,59 @@ void CARMAStreetsPlugin::HandleBasicSafetyMessage(BsmMessage &msg, routeable_mes coreData["size"] = size; bsmJsonRoot["core_data"] = coreData; const std::string message = Json::writeString(builder, bsmJsonRoot); + produce_kafka_msg(message, _transmitBSMTopic); + + } + catch (TmxException &ex) { + PLOG(logERROR) << "Failed to decode message : " << ex.what(); + } +} + +void CARMAStreetsPlugin::HandleMapMessage(MapDataMessage &msg, routeable_message &routeableMsg) +{ + std::shared_ptr mapMsgPtr = msg.get_j2735_data(); + PLOG(logDEBUG) << "Intersection count: " << mapMsgPtr->intersections->list.count <produce(topic_name, + RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast(message.c_str()), + message.size(), + NULL, NULL, 0, 0); - while (retry) + if (produce_error == RdKafka::ERR_NO_ERROR) { + PLOG(logDEBUG) <<"Queued message:" << message; + retry = false; + } + else { - RdKafka::ErrorCode produce_error = kafka_producer->produce(_transmitBSMTopic, - RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, - const_cast(message.c_str()), - message.size(), - NULL, NULL, 0, 0); - - if (produce_error == RdKafka::ERR_NO_ERROR) { - PLOG(logDEBUG) <<"Queued message:" << message; + PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); + if (produce_error == RdKafka::ERR__QUEUE_FULL) { + PLOG(logERROR) <<"Message queue full...retrying..."; + kafka_producer->poll(500); /* ms */ + retry = true; + } + else { + PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); retry = false; } - else - { - PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); - if (produce_error == RdKafka::ERR__QUEUE_FULL) { - PLOG(logERROR) <<"Message queue full...retrying..."; - kafka_producer->poll(500); /* ms */ - retry = true; - } - else { - PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); - retry = false; - } - } - } - } - catch (TmxException &ex) { - PLOG(logERROR) << "Failed to decode message : " << ex.what(); + } } } + void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { PluginClient::OnStateChange(state); diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 2a551415d..392d2cfa2 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -8,11 +8,12 @@ #include #include #include +#include #include "jsoncpp/json/json.h" #include #include #include - +#include "J2735MapToJsonConverter.h" @@ -40,8 +41,21 @@ class CARMAStreetsPlugin: public PluginClient { void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg); void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg); void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg); + /** + * @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message. + * @param msg The J2735 MAP message received from the internal + * @param routeableMsg + */ + void HandleMapMessage(MapDataMessage &msg, routeable_message &routeableMsg); void SubscribeKafkaTopics(); bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json); + /** + * @brief Produce message to a kafka topic + * @param msg Json format message to send to a topic + * @param topic_name The name of the topic + */ + void produce_kafka_msg(const string &msg, const string &topic_name) const; + private: std::string _receiveTopic; @@ -49,6 +63,7 @@ class CARMAStreetsPlugin: public PluginClient { std::string _subscribeToSchedulingPlanTopic = ""; std::string _transmitMobilityPathTopic; std::string _transmitBSMTopic; + std::string _transmitMAPTopic; std::string _kafkaBrokerIp; std::string _kafkaBrokerPort; RdKafka::Conf *kafka_conf; diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/J2735MapToJsonConverter.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/J2735MapToJsonConverter.cpp new file mode 100644 index 000000000..917d30a25 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/J2735MapToJsonConverter.cpp @@ -0,0 +1,278 @@ + + +#include "J2735MapToJsonConverter.h" + +namespace CARMAStreetsPlugin +{ + + void J2735MapToJsonConverter::convertJ2735MAPToMapJSON(const std::shared_ptr mapMsgPtr, Json::Value &mapJson) const + { + // Construct metadata + Json::Value metadata; + auto timestamp_utc = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + metadata["timestamp"] = std::to_string(timestamp_utc); + mapJson["metadata"] = metadata; + + // Construct Map Data + Json::Value mapDataJson; + mapDataJson["layer_id"] = std::to_string(*mapMsgPtr->layerID); + mapDataJson["msg_issue_revision"] = std::to_string(mapMsgPtr->msgIssueRevision); + if(mapMsgPtr->layerType != nullptr) + { + mapDataJson["layer_type"] = std::to_string(*mapMsgPtr->layerType); + } + + // Construct intersections + const IntersectionGeometryList *intersections = mapMsgPtr->intersections; + if(intersections != nullptr) + { + Json::Value intersectionsJson; + // Assume there is only one intersection geometry for each intersection + for (size_t i = 0; i < intersections->list.count; i++) + { + Json::Value intersectionJson; + if (intersections->list.array != nullptr) + { + auto intersection = intersections->list.array[i]; + intersectionJson["id"]["id"] = std::to_string(intersection->id.id); + intersectionJson["lane_width"] = std::to_string(*intersection->laneWidth); + intersectionJson["revision"] = std::to_string(intersection->revision); + intersectionJson["ref_point"]["lat"] = std::to_string(intersection->refPoint.lat); + intersectionJson["ref_point"]["long"] = std::to_string(intersection->refPoint.Long); + if(intersection->refPoint.elevation !=nullptr) + { + intersectionJson["ref_point"]["elevation"] = std::to_string(*intersection->refPoint.elevation); + } + + // Convert Laneset + Json::Value laneSetJson; + convertLanesetToJSON(intersection, laneSetJson); + intersectionJson["lane_set"] = laneSetJson; + } + intersectionsJson["intersection_geometry"] = intersectionJson; + } + mapDataJson["intersections"] = intersectionsJson; + } + mapJson["map_data"] = mapDataJson; + } + + void J2735MapToJsonConverter::convertLanesetToJSON(const IntersectionGeometry *intersection, Json::Value &laneSetJson) const + { + // Construct laneset + const auto& laneSet = intersection->laneSet; + if (laneSet.list.array != nullptr) + { + for (size_t i = 0; i < laneSet.list.count; i++) + { + std::stringstream ss; + Json::Value laneJson; + auto lane = laneSet.list.array[i]; + laneJson["lane_id"] = std::to_string(lane->laneID); + if (lane->ingressApproach != nullptr) + { + ss.str(""); + ss << *lane->ingressApproach; + laneJson["ingress_approach"] = ss.str(); + } + if (lane->egressApproach != nullptr) + { + ss.str(""); + ss << *lane->egressApproach; + laneJson["egressApproach"] = ss.str(); + } + + // Construct LaneAttributes + Json::Value LaneAttributesJson; + convertLaneAttributeToJSON(lane, LaneAttributesJson); + laneJson["lane_attributes"] = LaneAttributesJson; + + // Construct nodelist + Json::Value nodeList; + convertNodeListToJSON(lane, nodeList); + laneJson["node_list"]["nodes"] = nodeList; + + // Construct connects + Json::Value connectsJson; + if (lane->connectsTo != nullptr) + { + convertConnectsToJSON(lane, connectsJson); + laneJson["connects_to"] = connectsJson; + } + + laneSetJson.append(laneJson); + } + } + } + + void J2735MapToJsonConverter::convertLaneAttributeToJSON(const GenericLane *lane, Json::Value &LaneAttributesJson) const + { + std::stringstream ss; + ss.str(""); + auto bit_unused = lane->laneAttributes.directionalUse.bits_unused; + ss << lane->laneAttributes.directionalUse.buf; + if( lane->laneAttributes.directionalUse.size != 0 ) + { + auto binary = lane->laneAttributes.directionalUse.buf[0] >> bit_unused; + std::string binary_str = std::to_string(static_cast(binary / 2)) + std::to_string(static_cast(binary % 2)); + LaneAttributesJson["directional_use"] = binary_str; + } + ss.str(""); + ss << lane->laneAttributes.sharedWith.buf; + LaneAttributesJson["shared_with"] = ss.str(); + if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_vehicle) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.vehicle.buf; + LaneAttributesJson["lane_type"]["vehicle"] = ss.str(); + } + else if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_crosswalk) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.crosswalk.buf; + LaneAttributesJson["lane_type"]["crosswalk"] = ss.str(); + } + else if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_bikeLane) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.bikeLane.buf; + LaneAttributesJson["lane_type"]["bike_lane"] = ss.str(); + } + else if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_median) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.median.buf; + LaneAttributesJson["lane_type"]["median"] = ss.str(); + } + else if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_parking) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.parking.buf; + LaneAttributesJson["lane_type"]["parking"] = ss.str(); + } + else if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_sidewalk) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.sidewalk.buf; + LaneAttributesJson["lane_type"]["sidewalk"] = ss.str(); + } + else if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_striping) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.striping.buf; + LaneAttributesJson["lane_type"]["striping"] = ss.str(); + } + else if (lane->laneAttributes.laneType.present == LaneTypeAttributes_PR_trackedVehicle) + { + ss.str(""); + ss << lane->laneAttributes.laneType.choice.trackedVehicle.buf; + LaneAttributesJson["lane_type"]["tracked_vehicle"] = ss.str(); + } + else + { + ss.str(""); + LaneAttributesJson["lane_type"]["nothing"] = ss.str(); + } + } + + void J2735MapToJsonConverter::convertNodeListToJSON(const GenericLane *lane, Json::Value &nodeListJson) const + { + std::stringstream ss; + auto nodes = lane->nodeList.choice.nodes; + if (NodeListXY_PR_nodes == lane->nodeList.present) + { + for (size_t i = 0; i < nodes.list.count; i++) + { + Json::Value nodeJson; + auto node = nodes.list.array[i]; + if (node->delta.present == NodeOffsetPointXY_PR::NodeOffsetPointXY_PR_node_XY1) + { + ss.str(""); + ss << node->delta.choice.node_XY1.x; + nodeJson["delta"]["node-xy"]["x"] = ss.str(); + + ss.str(""); + ss << node->delta.choice.node_XY1.y; + nodeJson["delta"]["node-xy"]["y"] = ss.str(); + nodeListJson.append(nodeJson); + } + else if (node->delta.present == NodeOffsetPointXY_PR::NodeOffsetPointXY_PR_node_XY2) + { + ss.str(""); + ss << node->delta.choice.node_XY2.x; + nodeJson["delta"]["node-xy"]["x"] = ss.str(); + + ss.str(""); + ss << node->delta.choice.node_XY2.y; + nodeJson["delta"]["node-xy"]["y"] = ss.str(); + nodeListJson.append(nodeJson); + } + else if (node->delta.present == NodeOffsetPointXY_PR::NodeOffsetPointXY_PR_node_XY3) + { + ss.str(""); + ss << node->delta.choice.node_XY3.x; + nodeJson["delta"]["node-xy"]["x"] = ss.str(); + + ss.str(""); + ss << node->delta.choice.node_XY3.y; + nodeJson["delta"]["node-xy"]["y"] = ss.str(); + nodeListJson.append(nodeJson); + } + else if (node->delta.present == NodeOffsetPointXY_PR::NodeOffsetPointXY_PR_node_XY4) + { + ss.str(""); + ss << node->delta.choice.node_XY4.x; + nodeJson["delta"]["node-xy"]["x"] = ss.str(); + + ss.str(""); + ss << node->delta.choice.node_XY4.y; + nodeJson["delta"]["node-xy"]["y"] = ss.str(); + nodeListJson.append(nodeJson); + } + else if (node->delta.present == NodeOffsetPointXY_PR::NodeOffsetPointXY_PR_node_XY5) + { + ss.str(""); + ss << node->delta.choice.node_XY5.x; + nodeJson["delta"]["node-xy"]["x"] = ss.str(); + + ss.str(""); + ss << node->delta.choice.node_XY5.y; + nodeJson["delta"]["node-xy"]["y"] = ss.str(); + nodeListJson.append(nodeJson); + } + else if (node->delta.present == NodeOffsetPointXY_PR::NodeOffsetPointXY_PR_node_XY6) + { + ss.str(""); + ss << node->delta.choice.node_XY6.x; + nodeJson["delta"]["node-xy"]["x"] = ss.str(); + + ss.str(""); + ss << node->delta.choice.node_XY6.y; + nodeJson["delta"]["node-xy"]["y"] = ss.str(); + nodeListJson.append(nodeJson); + } + } + } + } + + void J2735MapToJsonConverter::convertConnectsToJSON(const GenericLane *lane, Json::Value &connectsJson) const + { + std::stringstream ss; + if (lane->connectsTo != nullptr) + { + for (size_t i = 0; i < lane->connectsTo->list.count; i++) + { + Json::Value connJson; + ss.str(""); + auto connect = lane->connectsTo->list.array[i]; + ss << connect->connectingLane.lane; + connJson["connecting_lane"]["lane"] = ss.str(); + + ss.str(""); + connect = lane->connectsTo->list.array[i]; + ss << *connect->signalGroup; + connJson["signal_group"] = ss.str(); + connectsJson.append(connJson); + } + } + } +} \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/J2735MapToJsonConverter.h b/src/v2i-hub/CARMAStreetsPlugin/src/J2735MapToJsonConverter.h new file mode 100644 index 000000000..21581200d --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/J2735MapToJsonConverter.h @@ -0,0 +1,51 @@ + +#include "jsoncpp/json/json.h" +#include +#include +#include +#include +#include +#include + +namespace CARMAStreetsPlugin +{ + class J2735MapToJsonConverter + { + public: + J2735MapToJsonConverter() = default; + ~J2735MapToJsonConverter() = default; + /** + * @brief Convert the J2735 MAPData into JSON format. + * @param mapMsgPtr The input is a constant J2735 message pointer. This prevent any modification to the original MAP message + * @param mapJson Pass by reference to allow the method to populate this object with MAPData. + */ + void convertJ2735MAPToMapJSON(const std::shared_ptr mapMsgPtr, Json::Value &mapJson) const; + + /** + * @brief Convert the J2735 IntersectionGeometry into JSON format. + * @param mapMsgPtr The input is a constant J2735 message pointer. This prevent any modification to the original IntersectionGeometry message + * @param mapJson Pass by reference to allow the method to populate this object with IntersectionGeometry. + */ + void convertLanesetToJSON(const IntersectionGeometry *intersection, Json::Value &laneSetJson) const; + /** + * @brief Convert the J2735 GenericLane into JSON format. + * @param mapMsgPtr The input is a constant J2735 message pointer. This prevent any modification to the original GenericLane message + * @param mapJson Pass by reference to allow the method to populate this object with GenericLane. + */ + void convertLaneAttributeToJSON(const GenericLane *lane, Json::Value &LaneAttributesJson) const; + + /** + * @brief Convert the J2735 GenericLane into JSON format. + * @param mapMsgPtr The input is a constant J2735 message pointer. This prevent any modification to the original GenericLane message + * @param mapJson Pass by reference to allow the method to populate this object with GenericLane. + */ + void convertNodeListToJSON(const GenericLane *lane, Json::Value &nodeList) const; + + /** + * @brief Convert the J2735 GenericLane into JSON format. + * @param mapMsgPtr The input is a constant J2735 message pointer. This prevent any modification to the original GenericLane message + * @param mapJson Pass by reference to allow the method to populate this object with GenericLane. + */ + void convertConnectsToJSON(const GenericLane *lane, Json::Value &nodeListJson) const; + }; +} \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/test/main.cpp b/src/v2i-hub/CARMAStreetsPlugin/test/main.cpp new file mode 100644 index 000000000..ba7cd2667 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/test/main.cpp @@ -0,0 +1,8 @@ + +#include + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/v2i-hub/CARMAStreetsPlugin/test/test_J2735MapToJsonConverter.cpp b/src/v2i-hub/CARMAStreetsPlugin/test/test_J2735MapToJsonConverter.cpp new file mode 100644 index 000000000..304d7f999 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/test/test_J2735MapToJsonConverter.cpp @@ -0,0 +1,52 @@ +#include +#include "jsoncpp/json/json.h" +#include "J2735MapToJsonConverter.h" + +class test_J2735MapToJsonConverter : public testing::Test +{ +public: + test_J2735MapToJsonConverter() = default; + ~test_J2735MapToJsonConverter() = default; +}; + +namespace unit_test +{ + TEST_F(test_J2735MapToJsonConverter, convertJ2735MAPToMapJSON) + { + CARMAStreetsPlugin::J2735MapToJsonConverter converter; + MapData *mapData = (MapData *)calloc(1, sizeof(MapData)); + LayerID_t layer_id = 1; + DSRC_MsgCount_t msgIssueRevision = 2; + mapData->layerID = &layer_id; + mapData->msgIssueRevision = msgIssueRevision; + auto layer_type = std::make_unique(1); + mapData->layerType = layer_type.get(); + std::shared_ptr mapMsgPtr(mapData); + Json::Value mapJson; + converter.convertJ2735MAPToMapJSON(mapMsgPtr, mapJson); + ASSERT_EQ("1", mapJson["map_data"]["layer_id"].asString()); + ASSERT_EQ("2", mapJson["map_data"]["msg_issue_revision"].asString()); + ASSERT_EQ("1", mapJson["map_data"]["layer_id"].asString()); + ASSERT_EQ(true, mapJson["map_data"]["intersections"].empty()); + + GenericLane *lane_ptr = (GenericLane *)calloc(1, sizeof(GenericLane)); + lane_ptr->laneID = 3; + GenericLane *lane_list_ptr = (GenericLane *)(calloc(1, sizeof(GenericLane))); + IntersectionGeometry *intersection = (IntersectionGeometry *)(calloc(1, sizeof(IntersectionGeometry))); + intersection->id.id = 1002; + intersection->refPoint.lat = 111; + intersection->refPoint.Long = 222; + LaneWidth_t lanewith = 12; + intersection->laneWidth = &lanewith; + asn_sequence_add(&intersection->laneSet, lane_ptr); + IntersectionGeometryList *intersection_list = (IntersectionGeometryList *)(calloc(1, sizeof(IntersectionGeometryList))); + asn_sequence_add(&intersection_list->list, intersection); + mapMsgPtr->intersections = intersection_list; + converter.convertJ2735MAPToMapJSON(mapMsgPtr, mapJson); + ASSERT_EQ(false, mapJson["map_data"]["intersections"]["intersection_geometry"]["lane_set"].empty()); + ASSERT_EQ("3", mapJson["map_data"]["intersections"]["intersection_geometry"]["lane_set"][0]["lane_id"].asString()); + ASSERT_EQ("111", mapJson["map_data"]["intersections"]["intersection_geometry"]["ref_point"]["lat"].asString()); + ASSERT_EQ("222", mapJson["map_data"]["intersections"]["intersection_geometry"]["ref_point"]["long"].asString()); + ASSERT_EQ("12", mapJson["map_data"]["intersections"]["intersection_geometry"]["lane_width"].asString()); + } +} \ No newline at end of file diff --git a/src/v2i-hub/PedestrianPlugin/src/FLIRWebSockAsyncClnSession.cpp b/src/v2i-hub/PedestrianPlugin/src/FLIRWebSockAsyncClnSession.cpp index afebbca1f..f86cf9bb3 100644 --- a/src/v2i-hub/PedestrianPlugin/src/FLIRWebSockAsyncClnSession.cpp +++ b/src/v2i-hub/PedestrianPlugin/src/FLIRWebSockAsyncClnSession.cpp @@ -278,6 +278,7 @@ namespace PedestrianPlugin psmxml = psm_xml_str; psmQueue.push(psmxml); + PLOG(logDEBUG) << "Sending PSM xml to BroadcastPsm: " << psmxml.c_str() < psmQueue; + std::mutex _psmLock; int msgCount = 0; public: @@ -139,6 +140,7 @@ namespace PedestrianPlugin */ std::queue getPSMQueue(); + /** * @brief Parses the datetime string that the camera returns into a vector containing each component * @@ -148,5 +150,6 @@ namespace PedestrianPlugin std::vector timeStringParser(std::string dateTimeStr) const; }; + };