Skip to content

Commit

Permalink
Updated configuration settings (#441)
Browse files Browse the repository at this point in the history
* Updated configuration settings

* Added status labels for failing messages being skipped

* Fix log statement
  • Loading branch information
paulbourelly999 authored Dec 8, 2022
1 parent e4a1ad0 commit 9f6fe9b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/v2i-hub/CARMAStreetsPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
},
{
"key": "MobilityOperationStrategies",
"default": "Carma/stop_controlled_intersection",
"default": "Carma/stop_controlled_intersection,Carma/signalized_intersection",
"description": "A comma separated list of strategies of MobilityOperation messages to send to CARMA Streets"
},
{
Expand All @@ -68,7 +68,7 @@
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "BSMTopic",
"key": "BsmTopic",
"default": "v2xhub_bsm_in",
"description": "Apache Kafka topic plugin will transmit message to."
},
Expand All @@ -83,7 +83,7 @@
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SchedulingPlanGroupId",
"key": "SchedulingPlanConsumerGroupId",
"default": "v2xhub_scheduling_plan",
"description": "Apache Kafka consumer group ID for scheduling plan consumer."
},
Expand Down
14 changes: 12 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routea
}
catch (TmxException &ex) {
PLOG(logERROR) << "Failed to decode message : " << ex.what();
SetStatus<uint>(Key_MobilityOperationMessageSkipped, ++_mobilityOperationMessageSkipped);

}


Expand Down Expand Up @@ -279,6 +281,7 @@ void CARMAStreetsPlugin::HandleMobilityPathMessage(tsm2Message &msg, routeable_m
catch (TmxException &ex)
{
PLOG(logERROR) << "Failed to decode message : " << ex.what();
SetStatus<uint>(Key_MobilityPathMessageSkipped, ++_mobilityPathMessageSkipped);

}
}
Expand Down Expand Up @@ -430,6 +433,8 @@ void CARMAStreetsPlugin::HandleBasicSafetyMessage(BsmMessage &msg, routeable_mes
}
catch (TmxException &ex) {
PLOG(logERROR) << "Failed to decode message : " << ex.what();
SetStatus<uint>(Key_BSMMessageSkipped, ++_bsmMessageSkipped);

}
}

Expand Down Expand Up @@ -516,6 +521,7 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic()
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped);
continue;
}

Expand Down Expand Up @@ -586,6 +592,7 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_SPATMessageSkipped, ++_spatMessageSkipped);
continue;
}
//Convert the SPAT JSON string into J2735 SPAT message and encode it.
Expand All @@ -602,6 +609,8 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: "
<< ex.what() << std::endl;
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get());
SetStatus<uint>(Key_SPATMessageSkipped, ++_spatMessageSkipped);

continue;
}

Expand Down Expand Up @@ -731,9 +740,10 @@ bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Js
free(mobilityOperation);
return true;
}
catch(...)
catch(const std::runtime_error &e )
{
PLOG(logERROR) << "Failed to encoded MobilityOperation message" <<std::endl;
PLOG(logERROR) << "Failed to encoded Intersection Schedule into MobilityOperation message: " << e.what() <<std::endl;
SetStatus<uint>(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped);
return false;
}
}
Expand Down
66 changes: 62 additions & 4 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,69 @@ class CARMAStreetsPlugin: public PluginClient {
std::vector<std::string> _strategies;
tmx::messages::tsm3Message *_tsm3Message{NULL};
std::mutex data_lock;

/**
* @brief Status label for SPAT messages skipped due to errors.
*/
const char* Key_SPATMessageSkipped = "SPAT messages skipped due to errors.";
/**
* @brief Count for SPAT messages skipped due to errors.
*/
uint _spatMessageSkipped = 0;

/**
* @brief Status label for Intersection Schedule messages skipped due to errors.
*/
const char* Key_ScheduleMessageSkipped = "Intersection Schedule messages skipped due to errors.";

/**
* @brief Count for Intersection Schedule messages skipped due to errors.
*/
uint _scheduleMessageSkipped = 0;

/**
* @brief Status label for MAP messages skipped due to errors.
*/
const char* Key_MAPMessageSkipped = "MAP messages skipped due to errors.";

/**
* @brief Count for MAP messages skipped due to errors.
*/
uint _mapMessageSkipped = 0;

/**
* @brief Status label for Mobility Operation messages skipped due to errors.
*/
const char* Key_MobilityOperationMessageSkipped = "Mobility Operation messages skipped due to errors.";

/**
* @brief Count for Mobility Operation messages skipped due to errors.
*/
uint _mobilityOperationMessageSkipped = 0;

/**
* @brief Status label for Mobility Path messages skipped due to errors.
*/
const char* Key_MobilityPathMessageSkipped = "Mobility Path messages skipped due to errors.";

/**
* @brief Count for Mobility Path messages skipped due to errors.
*/
uint _mobilityPathMessageSkipped = 0;

/**
* @brief Status label for BSM messages skipped due to errors.
*/
const char* Key_BSMMessageSkipped = "BSM messages skipped due to errors.";

/**
* @brief Count for BSM messages skipped due to errors.
*/
uint _bsmMessageSkipped = 0;

/***
* Configurable indicator to run consumer and consume messages from kafka topics
* run the consumer if it equals = 1; otherwise = 0
**/
/**
* @brief Intersection Id for intersection
*/
std::string _intersectionId = "UNSET";
};
std::mutex _cfgLock;
Expand Down

0 comments on commit 9f6fe9b

Please sign in to comment.