Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-du-car committed Jun 30, 2023
1 parent 108c67d commit ad23110
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/docker-compose-vscode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ services:
DOCKER_HOST_IP: 127.0.0.1
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1"
KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1,v2xhub_ssm_sub:1:1"
KAFKA_LOG_DIRS: "/kafka/kafka-logs"
KAFKA_OFFSETS_RETENTION_MINUTES: 1
volumes:
Expand Down
7 changes: 7 additions & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ namespace tmx::utils
return false;
}

if (conf->set(ENABLE_AUTO_COMMIT, "true", errstr) != RdKafka::Conf::CONF_OK)
{
FILE_LOG(logWARNING) << "RDKafka conf set enable auto commit failed: " << errstr.c_str() << std::endl;
return false;
}

// set consumer group
if (conf->set(GROUP_ID, _group_id_str, errstr) != RdKafka::Conf::CONF_OK)
{
Expand Down Expand Up @@ -88,6 +94,7 @@ namespace tmx::utils
void kafka_consumer_worker::stop()
{
_run = false;
_consumer->close();
/*Destroy kafka instance*/ // Wait for RdKafka to decommission.
RdKafka::wait_destroyed(5000);
}
Expand Down
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace tmx::utils {
const std::string GROUP_ID="group.id";
const std::string MAX_PARTITION_FETCH_SIZE="max.partition.fetch.bytes";
const std::string ENABLE_PARTITION_END_OF="enable.partition.eof";
const std::string ENABLE_AUTO_COMMIT="enable.auto.commit";

//maximum size for pulling message from a single partition at a time
std::string STR_FETCH_NUM = "10240000";
Expand Down
17 changes: 13 additions & 4 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ namespace CARMAStreetsPlugin {
*/
CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
PluginClient(name) {

PLOG(logERROR) << "CARMAStreetsPlugin Constructor.";
AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage);
AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage);
AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage);
Expand All @@ -31,6 +33,11 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
}

CARMAStreetsPlugin::~CARMAStreetsPlugin() {
PLOG(logERROR) << "CARMAStreetsPlugin Destructor.";
//Todo: It does not seem the desctructor is called.
_spat_kafka_consumer_ptr->stop();
_scheduing_plan_kafka_consumer_ptr->stop();
_ssm_kafka_consumer_ptr->stop();
}

void CARMAStreetsPlugin::UpdateConfigSettings() {
Expand Down Expand Up @@ -95,7 +102,8 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
ss << uuid;
_subscribeToSpatConsumerGroupId += ss.str();
_subscribeToSchedulingPlanConsumerGroupId += ss.str();
PLOG(logERROR) << "Kafka INFO:" << kafkaConnectString<<_subscribeToSpatTopic<<_subscribeToSpatConsumerGroupId;
_subscribeToSSMConsumerGroupId += ss.str();
//Todo further enhancement: Temporary fix for the consumer rebalancing die to multiple consumers join the same group upon restarting plugin.
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId);
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId);
Expand All @@ -110,9 +118,9 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
PLOG(logERROR) <<"Kafka consumers init() failed!";
}

thread_schpl = new std::thread(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
thread_spat = new std::thread(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
thread_ssm = new std::thread(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
}

void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
Expand Down Expand Up @@ -795,6 +803,7 @@ int CARMAStreetsPlugin::Main() {

usleep(100000); //sleep for microseconds set from config.
}

return (EXIT_SUCCESS);
}
} /* namespace */
Expand Down
3 changes: 0 additions & 3 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ class CARMAStreetsPlugin: public PluginClient {
std::vector<std::string> _strategies;
tmx::messages::tsm3Message *_tsm3Message{NULL};
std::mutex data_lock;
std::thread* thread_schpl;
std::thread* thread_spat;
std::thread* thread_ssm;

/**
* @brief Status label for SPAT messages skipped due to errors.
Expand Down

0 comments on commit ad23110

Please sign in to comment.