diff --git a/src/v2i-hub/CARMAStreetsPlugin/manifest.json b/src/v2i-hub/CARMAStreetsPlugin/manifest.json index e8e53cf0a..634421f62 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/manifest.json +++ b/src/v2i-hub/CARMAStreetsPlugin/manifest.json @@ -54,7 +54,7 @@ }, { "key": "MobilityOperationStrategies", - "default": "Carma/stop_controlled_intersection", + "default": "Carma/stop_controlled_intersection,Carma/signalized_intersection", "description": "A comma separated list of strategies of MobilityOperation messages to send to CARMA Streets" }, { @@ -68,7 +68,7 @@ "description": "Apache Kafka topic plugin will transmit message to." }, { - "key": "BSMTopic", + "key": "BsmTopic", "default": "v2xhub_bsm_in", "description": "Apache Kafka topic plugin will transmit message to." }, @@ -83,7 +83,7 @@ "description": "Apache Kafka topic plugin will transmit message to." }, { - "key": "SchedulingPlanGroupId", + "key": "SchedulingPlanConsumerGroupId", "default": "v2xhub_scheduling_plan", "description": "Apache Kafka consumer group ID for scheduling plan consumer." }, diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 32c125603..30545e6a8 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -195,6 +195,8 @@ void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routea } catch (TmxException &ex) { PLOG(logERROR) << "Failed to decode message : " << ex.what(); + SetStatus(Key_MobilityOperationMessageSkipped, ++_mobilityOperationMessageSkipped); + } @@ -279,6 +281,7 @@ void CARMAStreetsPlugin::HandleMobilityPathMessage(tsm2Message &msg, routeable_m catch (TmxException &ex) { PLOG(logERROR) << "Failed to decode message : " << ex.what(); + SetStatus(Key_MobilityPathMessageSkipped, ++_mobilityPathMessageSkipped); } } @@ -430,6 +433,8 @@ void CARMAStreetsPlugin::HandleBasicSafetyMessage(BsmMessage &msg, routeable_mes } catch (TmxException &ex) { PLOG(logERROR) << "Failed to decode message : " << ex.what(); + SetStatus(Key_BSMMessageSkipped, ++_bsmMessageSkipped); + } } @@ -516,6 +521,7 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic() if( !parse_sucessful ) { PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl; + SetStatus(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); continue; } @@ -586,6 +592,7 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ if( !parse_sucessful ) { PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl; + SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); continue; } //Convert the SPAT JSON string into J2735 SPAT message and encode it. @@ -602,6 +609,8 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: " << ex.what() << std::endl; ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); + SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); + continue; } @@ -731,9 +740,10 @@ bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Js free(mobilityOperation); return true; } - catch(...) + catch(const std::runtime_error &e ) { - PLOG(logERROR) << "Failed to encoded MobilityOperation message" <(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); return false; } } diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 7b9613b9e..01856cbf4 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -91,11 +91,69 @@ class CARMAStreetsPlugin: public PluginClient { std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; + + /** + * @brief Status label for SPAT messages skipped due to errors. + */ + const char* Key_SPATMessageSkipped = "SPAT messages skipped due to errors."; + /** + * @brief Count for SPAT messages skipped due to errors. + */ + uint _spatMessageSkipped = 0; + + /** + * @brief Status label for Intersection Schedule messages skipped due to errors. + */ + const char* Key_ScheduleMessageSkipped = "Intersection Schedule messages skipped due to errors."; + + /** + * @brief Count for Intersection Schedule messages skipped due to errors. + */ + uint _scheduleMessageSkipped = 0; + + /** + * @brief Status label for MAP messages skipped due to errors. + */ + const char* Key_MAPMessageSkipped = "MAP messages skipped due to errors."; + + /** + * @brief Count for MAP messages skipped due to errors. + */ + uint _mapMessageSkipped = 0; + + /** + * @brief Status label for Mobility Operation messages skipped due to errors. + */ + const char* Key_MobilityOperationMessageSkipped = "Mobility Operation messages skipped due to errors."; + + /** + * @brief Count for Mobility Operation messages skipped due to errors. + */ + uint _mobilityOperationMessageSkipped = 0; + + /** + * @brief Status label for Mobility Path messages skipped due to errors. + */ + const char* Key_MobilityPathMessageSkipped = "Mobility Path messages skipped due to errors."; + + /** + * @brief Count for Mobility Path messages skipped due to errors. + */ + uint _mobilityPathMessageSkipped = 0; + + /** + * @brief Status label for BSM messages skipped due to errors. + */ + const char* Key_BSMMessageSkipped = "BSM messages skipped due to errors."; + + /** + * @brief Count for BSM messages skipped due to errors. + */ + uint _bsmMessageSkipped = 0; - /*** - * Configurable indicator to run consumer and consume messages from kafka topics - * run the consumer if it equals = 1; otherwise = 0 - **/ + /** + * @brief Intersection Id for intersection + */ std::string _intersectionId = "UNSET"; }; std::mutex _cfgLock;