diff --git a/.circleci/config.yml b/.circleci/config.yml index ca7e2700a..b679a2982 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -192,8 +192,6 @@ jobs: port_drayage_webservice_build_develop : docker: - image: cimg/openjdk:11.0.12 - # Set working directory - working_directory: "/home/V2X-Hub" steps: - setup_remote_docker - checkout @@ -214,7 +212,6 @@ jobs: port_drayage_webservice_build : docker: - image: cimg/openjdk:11.0.12 - # Set working directory steps: - setup_remote_docker - checkout diff --git a/.sonarqube/sonar-scanner.properties b/.sonarqube/sonar-scanner.properties index 9bbb4a3ab..31c5ab7ad 100644 --- a/.sonarqube/sonar-scanner.properties +++ b/.sonarqube/sonar-scanner.properties @@ -50,7 +50,8 @@ sonar.modules= PedestrianPlugin, \ PortDrayagePlugin, \ ODELoggerPlugin, \ DsrcImmediateForwardPlugin, \ - MessageReceiverPlugin + MessageReceiverPlugin, \ + CARMAStreetsPlugin @@ -76,6 +77,7 @@ RtcmPlugin.sonar.projectBaseDir =/home/V2X-Hub/src/v2i-hub/RtcmP SPaTLoggerPlugin.sonar.projectBaseDir =/home/V2X-Hub/src/v2i-hub/SPaTLoggerPlugin SpatPlugin.sonar.projectBaseDir =/home/V2X-Hub/src/v2i-hub/SpatPlugin TimPlugin.sonar.projectBaseDir =/home/V2X-Hub/src/v2i-hub/TimPlugin +CARMAStreetsPlugin.sonar.projectBaseDir =/home/V2X-Hub/src/v2i-hub/CARMAStreetsPlugin # C++ Package differences # Sources @@ -102,6 +104,7 @@ CARMACloudPlugin.sonar.sources =src CommandPlugin.sonar.sources =src ODELoggerPlugin.sonar.sources =src MobilityOperationPlugin.sonar.sources =src +CARMAStreetsPlugin.sonar.sources =src @@ -130,7 +133,8 @@ PreemptionPlugin.sonar.cfamily.gcov.reportsPath =coverage #CARMACloudPlugin.sonar.cfamily.gcov.reportsPath =coverage #MobilityOperationPlugin.sonar.cfamily.gcov.reportsPath =coverage #ODELoggerPlugin.sonar.cfamily.gcov.reportsPath =coverage -#CommandPlugin.cfamily.gcov.reportsPath =coverage +#CommandPlugin.sonar.cfamily.gcov.reportsPath =coverage +#CARMAStreetsPlugin.sonar.cfamily.gcov.reportsPath =coverage # Tests # Note: For C++ setting this field does not cause test analysis to occur. It only allows the test source code to be evaluated. @@ -157,3 +161,4 @@ PreemptionPlugin.sonar.tests=test #MobilityOperationPlugin.sonar.tests=test #ODELoggerPlugin.sonar.tests=test #CommandPlugin.sonar.tests=test +#CARMAStreetsPlugin.sonar.tests=test diff --git a/Dockerfile b/Dockerfile index e483c6e53..ef13964c2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -110,6 +110,8 @@ RUN ln -s ../bin PortDrayagePlugin/bin RUN zip PortDrayagePlugin.zip PortDrayagePlugin/bin/PortDrayagePlugin PortDrayagePlugin/manifest.json RUN ln -s ../bin ODELoggerPlugin/bin RUN zip ODELoggerPlugin.zip ODELoggerPlugin/bin/ODELoggerPlugin ODELoggerPlugin/manifest.json +RUN ln -s ../bin CARMAStreetsPlugin/bin +RUN zip CARMAStreetsPlugin.zip CARMAStreetsPlugin/bin/CARMAStreetsPlugin CARMAStreetsPlugin/manifest.json WORKDIR /home/V2X-Hub/src/tmx/TmxCore/ diff --git a/configuration/amd64/docker-compose.yml b/configuration/amd64/docker-compose.yml index 97a569b24..663559d36 100755 --- a/configuration/amd64/docker-compose.yml +++ b/configuration/amd64/docker-compose.yml @@ -24,6 +24,7 @@ services: network_mode: host depends_on: - db + - v2xhub stdin_open: true tty: true @@ -33,7 +34,7 @@ services: network_mode: host restart: always depends_on: - - php + - db environment: - MYSQL_PASSWORD=/run/secrets/mysql_password secrets: diff --git a/configuration/arm64/docker-compose.yml b/configuration/arm64/docker-compose.yml index c2132ec3c..5f9a6fbe5 100644 --- a/configuration/arm64/docker-compose.yml +++ b/configuration/arm64/docker-compose.yml @@ -24,6 +24,7 @@ services: network_mode: host depends_on: - db + - v2xhub stdin_open: true tty: true @@ -33,7 +34,7 @@ services: network_mode: host restart: always depends_on: - - php + - db environment: - MYSQL_PASSWORD=/run/secrets/mysql_password secrets: diff --git a/container/service.sh b/container/service.sh index ad8da583b..a9288378c 100755 --- a/container/service.sh +++ b/container/service.sh @@ -19,6 +19,7 @@ tmxctl --plugin-install MessageLoggerPlugin.zip tmxctl --plugin-install PedestrianPlugin.zip tmxctl --plugin-install TimPlugin.zip tmxctl --plugin-install CARMACloudPlugin.zip +tmxctl --plugin-install CARMAStreetsPlugin.zip tmxctl --plugin-install ODELoggerPlugin.zip tmxctl --plugin-install PortDrayagePlugin.zip diff --git a/docker/Dockerfile-depl b/docker/Dockerfile-depl index 6be0a8170..ca4a2e91f 100644 --- a/docker/Dockerfile-depl +++ b/docker/Dockerfile-depl @@ -2,7 +2,7 @@ FROM ubuntu:bionic-20190807 AS install_dependencies RUN apt-get update && apt-get install -y cmake git build-essential libgtest-dev libssl-dev qtbase5-dev \ zip libmysqlcppconn-dev libboost1.65-all-dev libmysqlclient-dev uuid-dev libxerces-c-dev qtbase5-dev \ - libcurl4-openssl-dev libgps-dev libsnmp-dev librdkafka-dev libev-dev libuv-dev libcpprest-dev + libcurl4-openssl-dev libgps-dev libsnmp-dev librdkafka-dev libjsoncpp-dev libev-dev libuv-dev libcpprest-dev # Build and install GTest WORKDIR cd /usr/src/googletest/googletest RUN mkdir ~/build @@ -118,6 +118,8 @@ RUN zip PortDrayagePlugin.zip PortDrayagePlugin/bin/PortDrayagePlugin PortDrayag RUN ln -s ../bin ODELoggerPlugin/bin RUN zip ODELoggerPlugin.zip ODELoggerPlugin/bin/ODELoggerPlugin ODELoggerPlugin/manifest.json +RUN ln -s ../bin CARMAStreetsPlugin/bin +RUN zip CARMAStreetsPlugin.zip CARMAStreetsPlugin/bin/CARMAStreetsPlugin CARMAStreetsPlugin/manifest.json WORKDIR /home/V2X-Hub/src/tmx/TmxCore/ RUN cp tmxcore.service /lib/systemd/system/ && cp tmxcore.service /usr/sbin/ diff --git a/src/tmx/Asn_J2735/include/asn_j2735_r63/TestMessage02.h b/src/tmx/Asn_J2735/include/asn_j2735_r63/TestMessage02.h index b45458ddd..1010456d8 100644 --- a/src/tmx/Asn_J2735/include/asn_j2735_r63/TestMessage02.h +++ b/src/tmx/Asn_J2735/include/asn_j2735_r63/TestMessage02.h @@ -20,14 +20,11 @@ extern "C" { #endif -/* Forward declarations */ -struct Header; -struct Reg_BasicSafetyMessage; /* TestMessage02 */ typedef struct TestMessage02 { - struct Header *header; /* OPTIONAL */ - struct Reg_BasicSafetyMessage *regional; /* OPTIONAL */ + MobilityHeader_t header; /* OPTIONAL */ + MobilityPath_t body; /* OPTIONAL */ /* * This type is extensible, * possible extensions are below. @@ -46,9 +43,6 @@ extern asn_TYPE_member_t asn_MBR_TestMessage02_1[2]; } #endif -/* Referred external types */ -#include "Header.h" -#include "RegionalExtension.h" #endif /* _TestMessage02_H_ */ #include "asn_internal.h" diff --git a/src/tmx/Asn_J2735/src/r63/TestMessage02.c b/src/tmx/Asn_J2735/src/r63/TestMessage02.c index 49eb03155..3912b666c 100644 --- a/src/tmx/Asn_J2735/src/r63/TestMessage02.c +++ b/src/tmx/Asn_J2735/src/r63/TestMessage02.c @@ -8,40 +8,38 @@ #include "TestMessage02.h" asn_TYPE_member_t asn_MBR_TestMessage02_1[] = { - { ATF_POINTER, 2, offsetof(struct TestMessage02, header), + { ATF_NOFLAGS, 0, offsetof(struct TestMessage02, header), (ASN_TAG_CLASS_CONTEXT | (0 << 2)), -1, /* IMPLICIT tag at current level */ - &asn_DEF_Header, + &asn_DEF_MobilityHeader, 0, { 0, 0, 0 }, 0, 0, /* No default value */ "header" }, - { ATF_POINTER, 1, offsetof(struct TestMessage02, regional), + { ATF_NOFLAGS, 0, offsetof(struct TestMessage02, body), (ASN_TAG_CLASS_CONTEXT | (1 << 2)), -1, /* IMPLICIT tag at current level */ - &asn_DEF_Reg_BasicSafetyMessage, + &asn_DEF_MobilityPath, 0, { 0, 0, 0 }, 0, 0, /* No default value */ - "regional" + "body" }, }; -static const int asn_MAP_TestMessage02_oms_1[] = { 0, 1 }; static const ber_tlv_tag_t asn_DEF_TestMessage02_tags_1[] = { (ASN_TAG_CLASS_UNIVERSAL | (16 << 2)) }; static const asn_TYPE_tag2member_t asn_MAP_TestMessage02_tag2el_1[] = { { (ASN_TAG_CLASS_CONTEXT | (0 << 2)), 0, 0, 0 }, /* header */ - { (ASN_TAG_CLASS_CONTEXT | (1 << 2)), 1, 0, 0 } /* regional */ + { (ASN_TAG_CLASS_CONTEXT | (1 << 2)), 1, 0, 0 } /* body */ }; asn_SEQUENCE_specifics_t asn_SPC_TestMessage02_specs_1 = { sizeof(struct TestMessage02), offsetof(struct TestMessage02, _asn_ctx), asn_MAP_TestMessage02_tag2el_1, 2, /* Count of tags in the map */ - asn_MAP_TestMessage02_oms_1, /* Optional members */ - 2, 0, /* Root/Additions */ + 0, 0, 0, /* Optional elements (not needed) */ 2, /* First extension addition */ }; asn_TYPE_descriptor_t asn_DEF_TestMessage02 = { diff --git a/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt new file mode 100644 index 000000000..998c81afa --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt @@ -0,0 +1,5 @@ +PROJECT ( CARMAStreetsPlugin VERSION 5.0 LANGUAGES CXX ) + +BuildTmxPlugin ( ) + +TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp) \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/manifest.json b/src/v2i-hub/CARMAStreetsPlugin/manifest.json new file mode 100644 index 000000000..f58c7f06a --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/manifest.json @@ -0,0 +1,75 @@ +{ + "name": "CARMAStreetsPlugin", + "description": "Plugin to communicate with CARMA Streets via Apache Kafka", + "version": "@PROJECT_VERSION@", + "exeLocation": "/bin/CARMAStreetsPlugin", + "coreIpAddr": "127.0.0.1", + "corePort": 24601, + "messageTypes": [{ + "type": "J2735", + "subtype": "TMSG03-P", + "description": "Mobility Operation Message" + }, + { + "type": "J2735", + "subtype": "TMSG02-P", + "description": "Mobility Path Message" + }, + { + "type": "J2735", + "subtype": "BSM", + "description": "Basic Safety Message" + }], + "configuration": [ + { + "key": "transmitMobilityOperationTopic", + "default": "v2xhub_mobility_operation_in", + "description": "Apache Kafka topic plugin will transmit message to." + }, + { + "key": "transmitBSMTopic", + "default": "v2xhub_bsm_in", + "description": "Apache Kafka topic plugin will transmit message to." + }, + { + "key": "runKafkaConsumer", + "default": "1", + "description": "indicator for consuming kafka messages." + }, + { + "key": "subscribeToSchedulingPlanTopic", + "default": "v2xhub_scheduling_plan_sub", + "description": "Apache Kafka topic plugin will transmit message to." + }, + { + "key": "transmitMobilityPathTopic", + "default": "v2xhub_mobility_path_in", + "description": "Apache Kafka topic plugin will transmit message to." + }, + { + "key": "intersectionType", + "default": "Carma/stop_controlled_intersection", + "description": "The type of intersection" + }, + { + "key": "KafkaBrokerIp", + "default": "127.0.0.1", + "description": "IP of Apache Kafka broker." + }, + { + "key": "KafkaBrokerPort", + "default": "9092", + "description": "Port of Apache Kafka broker." + }, + { + "key": "MobilityOperationStrategies", + "default": "Carma/stop_controlled_intersection", + "description": "A comma separated list of strategies of MobilityOperation messages to send to CARMA Streets" + }, + { + "key": "LogLevel", + "default": "INFO", + "description": "The log level for this plugin" + } + ] +} \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp new file mode 100644 index 000000000..8646b8480 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -0,0 +1,697 @@ +//============================================================================ +// Name : CARMAStreetsPlugin.cpp +// Author : Paul Bourelly +// Version : 5.0 +// Copyright : Your copyright notice +// Description : Hello World in C++, Ansi-style +//============================================================================ + +#include "CARMAStreetsPlugin.h" + + +namespace CARMAStreetsPlugin { + + + +/** + * Construct a new CARMAStreetsPlugin with the given name. + * + * @param name The name to give the plugin for identification purposes + */ +CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : + PluginClient(name) { + + AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); + AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); + AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); + + SubscribeToMessages(); + +} + +CARMAStreetsPlugin::~CARMAStreetsPlugin() { +} + +void CARMAStreetsPlugin::UpdateConfigSettings() { + + lock_guard lock(_cfgLock); + GetConfigValue("receiveTopic", _receiveTopic); + GetConfigValue("transmitMobilityOperationTopic", _transmitMobilityOperationTopic); + GetConfigValue("transmitMobilityPathTopic", _transmitMobilityPathTopic); + GetConfigValue("KafkaBrokerIp", _kafkaBrokerIp); + GetConfigValue("KafkaBrokerPort", _kafkaBrokerPort); + GetConfigValue("runKafkaConsumer", _run_kafka_consumer); + GetConfigValue("subscribeToSchedulingPlanTopic", _subscribeToSchedulingPlanTopic); + GetConfigValue("transmitBSMTopic", _transmitBSMTopic); + GetConfigValue("intersectionType", _intersectionType); + // Populate strategies config + string config; + GetConfigValue("MobilityOperationStrategies", config); + std::stringstream ss(config); + _strategies.clear(); + while( ss.good() ) { + std::string substring; + getline( ss, substring, ','); + _strategies.push_back( substring); + } + std::string kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; + std::string error_string; + kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; + kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + kafka_conf_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + + PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString; + if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) { + PLOG(logERROR) <<"Setting kafka config options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; + exit(1); + } else { + PLOG(logDEBUG) <<"Kafka config options set successfully"; + } + + kafka_producer = RdKafka::Producer::create(kafka_conf, error_string); + if (!kafka_producer) { + PLOG(logERROR) <<"Creating kafka producer failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; + exit(1); + } + PLOG(logDEBUG) <<"Kafka producer created"; + + if (kafka_conf_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK || (kafka_conf_consumer->set("group.id", "streets_group", error_string) != RdKafka::Conf::CONF_OK)) { + PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; + exit(1); + } else { + PLOG(logDEBUG) <<"Kafka config group.id options set successfully"; + } + kafka_conf_consumer->set("enable.partition.eof", "true", error_string); + + kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_consumer, error_string); + if ( !kafka_consumer ) { + PLOG(logERROR) << "Failed to create Kafka consumer: " << error_string << std::endl; + exit(1); + } + PLOG(logDEBUG) << "Created consumer " << kafka_consumer->name() << std::endl; + + //create kafka topic + RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + if(!tconf) + { + PLOG(logERROR) << "RDKafka create topic conf failed "; + return; + } + + _topic = RdKafka::Topic::create(kafka_consumer,_subscribeToSchedulingPlanTopic,tconf,error_string); + if(!_topic) + { + PLOG(logERROR) << "RDKafka create topic failed:" << error_string; + return ; + } + + delete tconf; + + boost::thread thr(&CARMAStreetsPlugin::SubscribeKafkaTopics, this); +} + +void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { + PluginClient::OnConfigChanged(key, value); + UpdateConfigSettings(); +} + +void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg ) { + try + { + auto mobilityOperation = msg.get_j2735_data(); + bool retry = true; + PLOG(logINFO) << "Body OperationParams : " << mobilityOperation->body.operationParams.buf << "\n" + << "Body Strategy : " << mobilityOperation->body.strategy.buf<< "\n" + <<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic << " " + << kafka_producer->outq_len() <<"messages already in queue"; + + std::stringstream strat; + std::stringstream payload; + + ptree pr; + strat << mobilityOperation->body.strategy.buf; + payload << mobilityOperation->body.operationParams.buf; + std::string strategy_params; + std::string strategy = strat.str(); + if ( std::find( _strategies.begin(), _strategies.end(), strategy) != _strategies.end() ) + { + strategy_params = payload.str(); + Json::Value mobilityOperationJsonRoot; + Json::StreamWriterBuilder builder; + + Json::Value metadata; + std::stringstream hostStaticId; + + hostStaticId << mobilityOperation->header.hostStaticId.buf; + metadata["hostStaticId"] = hostStaticId.str(); + + std::stringstream targetStaticId; + targetStaticId << mobilityOperation->header.targetStaticId.buf; + metadata["targetStaticId"] = targetStaticId.str(); + + std::stringstream hostBSMId; + hostBSMId << mobilityOperation->header.hostBSMId.buf; + metadata["hostBSMId"] =hostBSMId.str(); + + std::stringstream planId; + planId << mobilityOperation->header.planId.buf; + metadata["planId"] = planId.str(); + + std::stringstream timestamp; + timestamp << mobilityOperation->header.timestamp.buf; + metadata["timestamp"] = timestamp.str(); + + mobilityOperationJsonRoot["strategy"] = strategy; + mobilityOperationJsonRoot["strategy_params"] = strategy_params; + mobilityOperationJsonRoot["metadata"] = metadata; + const std::string message = Json::writeString(builder, mobilityOperationJsonRoot); + PLOG(logDEBUG) <<"MobilityOperation message:" << message <produce(_transmitMobilityOperationTopic, + RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast(message.c_str()), + message.size(), + NULL, NULL, 0, 0); + + if (produce_error == RdKafka::ERR_NO_ERROR) { + PLOG(logDEBUG) <<"Queued message:" << message; + retry = false; + } + else + { + PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); + if (produce_error == RdKafka::ERR__QUEUE_FULL) { + PLOG(logERROR) <<"Message queue full...retrying..."; + kafka_producer->poll(500); /* ms */ + retry = true; + } + else { + PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); + retry = false; + } + } + } + } + } + catch (TmxException &ex) { + PLOG(logERROR) << "Failed to decode message : " << ex.what(); + } + + +} + +void CARMAStreetsPlugin::HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg ) +{ + try + { + auto mobilityPathMsg = msg.get_j2735_data(); + + Json::Value mobilityPathJsonRoot; + Json::StreamWriterBuilder builder; + + Json::Value metadata; + std::stringstream hostStaticId; + + hostStaticId << mobilityPathMsg->header.hostStaticId.buf; + metadata["hostStaticId"] = hostStaticId.str(); + + std::stringstream targetStaticId; + targetStaticId << mobilityPathMsg->header.targetStaticId.buf; + metadata["targetStaticId"] = targetStaticId.str(); + + std::stringstream hostBSMId; + hostBSMId << mobilityPathMsg->header.hostBSMId.buf; + metadata["hostBSMId"] =hostBSMId.str(); + + std::stringstream planId; + planId << mobilityPathMsg->header.planId.buf; + metadata["planId"] = planId.str(); + + std::stringstream timestamp; + timestamp << mobilityPathMsg->header.timestamp.buf; + metadata["timestamp"] = timestamp.str(); + + Json::Value location; + Json::Value trajectory; + + std::stringstream location_ecefX; + location_ecefX << mobilityPathMsg->body.location.ecefX; + location["ecefX"] = std::stoi(location_ecefX.str()); + + std::stringstream location_ecefY; + location_ecefY << mobilityPathMsg->body.location.ecefY; + location["ecefY"] = std::stoi(location_ecefY.str()); + + std::stringstream location_ecefZ; + location_ecefZ << mobilityPathMsg->body.location.ecefZ; + location["ecefZ"] = std::stoi(location_ecefZ.str()); + + std::stringstream location_timestamp; + location_timestamp << mobilityPathMsg->body.location.timestamp.buf; + location["timestamp"] = location_timestamp.str(); + + for(int i=0; i < mobilityPathMsg->body.trajectory.list.count; i++) + { + Json::Value offset; + std::stringstream trajectory_offsetX; + trajectory_offsetX<body.trajectory.list.array[i]->offsetX; + offset["offsetX"] = std::stoi(trajectory_offsetX.str()); + + + std::stringstream trajectory_offsetY; + trajectory_offsetY<body.trajectory.list.array[i]->offsetY; + offset["offsetY"] = std::stoi(trajectory_offsetY.str()); + + std::stringstream trajectory_offsetZ; + trajectory_offsetZ<body.trajectory.list.array[i]->offsetZ; + offset["offsetZ"] = std::stoi(trajectory_offsetZ.str()); + + trajectory["offsets"].append(offset); + } + + trajectory["location"] = location; + mobilityPathJsonRoot["metadata"] = metadata; + mobilityPathJsonRoot["trajectory"] = trajectory; + const std::string json_message = Json::writeString(builder, mobilityPathJsonRoot); + PLOG(logDEBUG) <<"MobilityPath Json message:" << json_message; + RdKafka::ErrorCode produce_error = kafka_producer->produce( _transmitMobilityPathTopic, + RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, const_cast(json_message.c_str()), + json_message.size(), NULL, NULL, 0, 0 ); + + if (produce_error == RdKafka::ERR_NO_ERROR) + { + PLOG(logDEBUG) << "Queued message:" << json_message; + } + else + { + PLOG(logERROR) << "Failed to queue message:" << json_message <<" with error:" << RdKafka::err2str(produce_error); + if (produce_error == RdKafka::ERR__QUEUE_FULL) + { + PLOG(logERROR) << "MobilityPath producer Message queue is full."; + } + } + } + catch (TmxException &ex) + { + PLOG(logERROR) << "Failed to decode message : " << ex.what(); + + } +} + +void CARMAStreetsPlugin::HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg) +{ + try + { + auto bsm = msg.get_j2735_data(); + bool retry = true; + + Json::Value bsmJsonRoot; + Json::Value coreData; + Json::Value size; + Json::StreamWriterBuilder builder; + + std::stringstream msgCnt; + msgCnt << bsm->coreData.msgCnt; + coreData["msg_count"] = msgCnt.str(); + + std::stringstream length; + length << bsm->coreData.size.length; + size["length"] = length.str(); + + std::stringstream width; + width << bsm->coreData.size.width; + size["width"] = width.str(); + + std::stringstream lat; + lat << bsm->coreData.lat; + coreData["lat"] = lat.str(); + + std::stringstream Long; + Long << bsm->coreData.Long; + coreData["long"] = Long.str(); + + std::stringstream elev; + elev << bsm->coreData.elev; + coreData["elev"] = elev.str(); + + std::stringstream speed; + speed << bsm->coreData.speed; + coreData["speed"] = speed.str(); + + std::stringstream secMark; + secMark << bsm->coreData.secMark; + coreData["sec_mark"] = secMark.str(); + + auto id_len = bsm->coreData.id.size; + std::stringstream id_ss; + for(auto i = 0; i < id_len; i++) + { + id_ss<(bsm->coreData.id.buf[i]); + } + std::stringstream id_fill_ss; + id_fill_ss << std::setfill('0') << std::setw(8) << id_ss.str(); + coreData["id"] = id_fill_ss.str(); + + Json::Value accuracy; + + std::stringstream orientation; + orientation << bsm->coreData.accuracy.orientation; + accuracy["orientation"] = orientation.str(); + + std::stringstream semiMajor; + semiMajor << bsm->coreData.accuracy.semiMajor; + accuracy["semi_major"] = semiMajor.str(); + + std::stringstream semiMinor; + semiMinor << bsm->coreData.accuracy.semiMinor; + accuracy["semi_minor"] = semiMinor.str(); + + std::stringstream angle; + angle << bsm->coreData.angle; + coreData["angle"] = angle.str(); + + std::stringstream heading; + heading << bsm->coreData.heading; + coreData["heading"] = heading.str(); + + Json::Value accel_set; + + std::stringstream accelSet_lat; + accelSet_lat << bsm->coreData.accelSet.lat; + accel_set["lat"] = accelSet_lat.str(); + + std::stringstream accelSet_long; + accelSet_long << bsm->coreData.accelSet.Long; + accel_set["long"] = accelSet_long.str(); + + std::stringstream accelSet_vert; + accelSet_vert << bsm->coreData.accelSet.vert; + accel_set["vert"] = accelSet_vert.str(); + + std::stringstream accelSet_yaw; + accelSet_yaw << bsm->coreData.accelSet.yaw; + accel_set["yaw"] = accelSet_yaw.str(); + + std::stringstream transmission; + transmission << bsm->coreData.transmission; + coreData["transmission"] = transmission.str(); + + Json::Value brakes; + + std::stringstream abs; + abs << bsm->coreData.brakes.abs; + brakes["abs"] = abs.str(); + + std::stringstream auxBrakes; + auxBrakes << bsm->coreData.brakes.auxBrakes; + brakes["aux_brakes"] = auxBrakes.str(); + + std::stringstream brake_boost; + brake_boost << bsm->coreData.brakes.brakeBoost; + brakes["brake_boost"] = brake_boost.str(); + + std::stringstream scs; + scs << bsm->coreData.brakes.scs; + brakes["scs"] = scs.str(); + + std::stringstream traction; + traction << bsm->coreData.brakes.traction; + brakes["traction"] = traction.str(); + + uint8_t binary = bsm->coreData.brakes.wheelBrakes.buf[0] >> 3; + unsigned int brake_applied_status_type = 4; + // e.g. shift the binary right until it equals to 1 (0b00000001) to determine the location of the non-zero bit + for (int i = 0; i < 4; i ++) + { + if ((int)binary == 1) + { + brakes["wheel_brakes"] = brake_applied_status_type; + break; + } + else + { + brake_applied_status_type -= 1; + binary = binary >> 1; + } + } + + coreData["accel_set"] = accel_set; + coreData["brakes"] = brakes; + coreData["accuracy"] = accuracy; + coreData["size"] = size; + bsmJsonRoot["core_data"] = coreData; + const std::string message = Json::writeString(builder, bsmJsonRoot); + + while (retry) + { + RdKafka::ErrorCode produce_error = kafka_producer->produce(_transmitBSMTopic, + RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast(message.c_str()), + message.size(), + NULL, NULL, 0, 0); + + if (produce_error == RdKafka::ERR_NO_ERROR) { + PLOG(logDEBUG) <<"Queued message:" << message; + retry = false; + } + else + { + PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); + if (produce_error == RdKafka::ERR__QUEUE_FULL) { + PLOG(logERROR) <<"Message queue full...retrying..."; + kafka_producer->poll(500); /* ms */ + retry = true; + } + else { + PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); + retry = false; + } + } + } + } + catch (TmxException &ex) { + PLOG(logERROR) << "Failed to decode message : " << ex.what(); + } +} +void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { + PluginClient::OnStateChange(state); + + if (state == IvpPluginState_registered) { + UpdateConfigSettings(); + } +} + +void CARMAStreetsPlugin::SubscribeKafkaTopics() +{ + if(_subscribeToSchedulingPlanTopic.length() > 0) + { + PLOG(logDEBUG) << "SubscribeKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl; + std::vector topics; + topics.push_back(std::string(_subscribeToSchedulingPlanTopic)); + + RdKafka::ErrorCode err = kafka_consumer->subscribe(topics); + if (err) + { + PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; + return; + } + + while (_run_kafka_consumer) + { + RdKafka::Message *msg = kafka_consumer->consume( 500 ); + if( msg->err() == RdKafka::ERR_NO_ERROR ) + { + const char * payload_str = static_cast( msg->payload() ); + if(msg->len() > 0) + { + PLOG(logDEBUG) << "consumed message payload: " << payload_str <( tsm3EncodedMsgs )); + } + } + //Empty payload + if(payload_json_array.empty()) + { + Json::Value payload_json = {}; + tsm3EncodedMessage tsm3EncodedMsgs; + if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) + { + tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); + tsm3EncodedMsgs.addDsrcMetadata( 172, 0xBFEE ); + PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; + BroadcastMessage(static_cast( tsm3EncodedMsgs )); + } + } + } + } + delete msg; + } + + } +} + +bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json ) +{ + try + { + std::lock_guard lock(data_lock); + TestMessage03* mobilityOperation = (TestMessage03 *) calloc(1, sizeof(TestMessage03)); + std::string sender_id = "UNSET"; + std::string recipient_id_str = payload_json != Json::nullValue && payload_json.isMember("v_id") ? payload_json["v_id"].asString(): "UNSET"; + std::string sender_bsm_id_str = "00000000"; + std::string plan_id_str = "00000000-0000-0000-0000-000000000000"; + std::string strategy_str = _intersectionType; + + std::string strategy_params_str = "null"; + if( payload_json != Json::nullValue && !payload_json.empty()) + { + strategy_params_str = "st:" + (payload_json.isMember("st") ? std::to_string(payload_json["st"].asUInt64()) : "0") + + ",et:" + (payload_json.isMember("et") ? std::to_string(payload_json["et"].asUInt64()) : "0") + + ",dt:" + (payload_json.isMember("dt") ? std::to_string(payload_json["dt"].asUInt64()) : "0") + + ",dp:" + (payload_json.isMember("dp") ? std::to_string(payload_json["dp"].asUInt64()) : "0") + + ",access:" + (payload_json.isMember("dp") ? std::to_string(payload_json["access"].asUInt64()): "0"); + } + + + std::string timestamp_str = (metadata.isMember("timestamp") ? std::to_string(metadata["timestamp"].asUInt64()) : "0"); + + //content host id + size_t string_size = sender_id.size(); + uint8_t string_content_hostId[string_size]; + for(size_t i=0; i< string_size; i++) + { + string_content_hostId[i] = sender_id[i]; + } + mobilityOperation->header.hostStaticId.buf = string_content_hostId; + mobilityOperation->header.hostStaticId.size = string_size; + + //recipient id + std::string recipient_id = recipient_id_str; + string_size = recipient_id.size(); + uint8_t string_content_targetId[string_size]; + for(size_t i=0; i< string_size; i++) + { + string_content_targetId[i] = recipient_id[i]; + } + mobilityOperation->header.targetStaticId.buf = string_content_targetId; + mobilityOperation->header.targetStaticId.size = string_size; + + //sender bsm id + std::string sender_bsm_id = sender_bsm_id_str; + string_size = sender_bsm_id.size(); + uint8_t string_content_BSMId[string_size]; + for(size_t i=0; i< string_size; i++) + { + string_content_BSMId[i] = sender_bsm_id[i]; + } + mobilityOperation->header.hostBSMId.buf = string_content_BSMId; + mobilityOperation->header.hostBSMId.size = string_size; + + //plan id + std::string plan_id = plan_id_str; + string_size = plan_id.size(); + uint8_t string_content_planId[string_size]; + for(size_t i=0; i< string_size; i++) + { + string_content_planId[i] = plan_id[i]; + } + mobilityOperation->header.planId.buf = string_content_planId; + mobilityOperation->header.planId.size = string_size; + + //get timestamp and convert to char array; + string_size = timestamp_str.size(); + size_t timestamp_size = 19; + uint8_t string_content_timestamp[timestamp_size]; + size_t offset = timestamp_size-string_size; + if(offset > 0) + { + timestamp_str = std::string(offset,'0').append(timestamp_str); + } + for(size_t i= 0; i< timestamp_size; i++) + { + string_content_timestamp[i] = timestamp_str[i]; + } + mobilityOperation->header.timestamp.buf = string_content_timestamp; + mobilityOperation->header.timestamp.size = timestamp_size; + + //convert strategy string to char array + std::string strategy = strategy_str; + string_size = strategy.size(); + uint8_t string_content_strategy[string_size]; + for(size_t i=0; i< string_size; i++) + { + string_content_strategy[i] = strategy[i]; + } + mobilityOperation->body.strategy.buf = string_content_strategy; + mobilityOperation->body.strategy.size = string_size; + + //convert parameters string to char array + std::string strategy_params = strategy_params_str; + string_size = strategy_params.size(); + uint8_t string_content_params[string_size]; + for(size_t i=0; i < string_size; i++) + { + string_content_params[i] = strategy_params[i]; + } + mobilityOperation->body.operationParams.buf = string_content_params; + mobilityOperation->body.operationParams.size = string_size; + + tmx::messages::tsm3Message* _tsm3Message = new tmx::messages::tsm3Message(mobilityOperation); + PLOG(logDEBUG) << *_tsm3Message; + tsm3EncodedMsg->initialize(*_tsm3Message); + free(mobilityOperation); + return true; + } + catch(...) + { + PLOG(logERROR) << "Failed to encoded MobilityOperation message" <state != IvpPluginState_error) { + + + + usleep(100000); //sleep for microseconds set from config. + } + + return (EXIT_SUCCESS); +} +} /* namespace */ + +int main(int argc, char *argv[]) { + return run_plugin < CARMAStreetsPlugin::CARMAStreetsPlugin > ("CARMAStreetsPlugin", argc, argv); +} + diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h new file mode 100644 index 000000000..2a551415d --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -0,0 +1,73 @@ +#ifndef SRC_CARMASTREETSPLUGIN_H_ +#define SRC_CARMASTREETSPLUGIN_H_ +#include "PluginClient.h" +#include +#include +#include +#include +#include +#include +#include +#include "jsoncpp/json/json.h" +#include +#include +#include + + + + +using namespace std; +using namespace tmx; +using namespace tmx::utils; +using namespace tmx::messages; +using namespace boost::property_tree; + +namespace CARMAStreetsPlugin { + +class CARMAStreetsPlugin: public PluginClient { +public: + CARMAStreetsPlugin(std::string); + virtual ~CARMAStreetsPlugin(); + int Main(); +protected: + + void UpdateConfigSettings(); + + // Virtual method overrides. + void OnConfigChanged(const char *key, const char *value); + + void OnStateChange(IvpPluginState state); + void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg); + void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg); + void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg); + void SubscribeKafkaTopics(); + bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json); + +private: + std::string _receiveTopic; + std::string _transmitMobilityOperationTopic; + std::string _subscribeToSchedulingPlanTopic = ""; + std::string _transmitMobilityPathTopic; + std::string _transmitBSMTopic; + std::string _kafkaBrokerIp; + std::string _kafkaBrokerPort; + RdKafka::Conf *kafka_conf; + RdKafka::Conf *kafka_conf_consumer; + RdKafka::Producer *kafka_producer; + RdKafka::KafkaConsumer *kafka_consumer; + RdKafka::Topic *_topic; + std::vector _strategies; + tmx::messages::tsm3Message *_tsm3Message{NULL}; + std::mutex data_lock; + + /*** + * Configurable indicator to run consumer and consume messages from kafka topics + * run the consumer if it equals = 1; otherwise = 0 + **/ + int _run_kafka_consumer = 0; + std::string _intersectionType = "NA"; +}; +std::mutex _cfgLock; + +} +#endif \ No newline at end of file diff --git a/src/v2i-hub/MessageReceiverPlugin/manifest.json b/src/v2i-hub/MessageReceiverPlugin/manifest.json index 5a24e679d..58669fbc3 100644 --- a/src/v2i-hub/MessageReceiverPlugin/manifest.json +++ b/src/v2i-hub/MessageReceiverPlugin/manifest.json @@ -45,7 +45,7 @@ }, { "key":"EnableVerification", - "default":"1", + "default":"0", "description":"If enabled, security HSM features are enabled and MessageReceiver Plugin will try to verify incoming message." }, {