Skip to content

Commit

Permalink
Update frameReceiver initial config handling
Browse files Browse the repository at this point in the history
This commit changes the frameReceiver initial configuration to avoid
binding the frame ready & receive channels to default endpoints before
configuration is fully applied. This makes the FR behave more like the
FP and therefore requires that the endpoints are explicitly defined in
configuration.
  • Loading branch information
timcnicholls committed Feb 29, 2024
1 parent f11b8bf commit cb77df9
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 113 deletions.
3 changes: 2 additions & 1 deletion cpp/frameProcessor/include/FrameProcessorApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class FrameProcessorApp

int parse_arguments(int argc, char** argv);
void configure_controller(OdinData::IpcMessage& config_msg);
void run();
int run(void);
static void stop(void);

private:

Expand Down
20 changes: 14 additions & 6 deletions cpp/frameProcessor/src/FrameProcessorApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,11 @@ int FrameProcessorApp::parse_arguments(int argc, char** argv)
return rc;
}

void FrameProcessorApp::run(void)
int FrameProcessorApp::run(void)
{

int rc = 0;

LOG4CXX_INFO(logger_, "frameProcessor version " << ODIN_DATA_VERSION_STR << " starting up");

// Instantiate a controller
Expand Down Expand Up @@ -238,13 +240,20 @@ void FrameProcessorApp::run(void)

controller_->run();

LOG4CXX_DEBUG_LEVEL(1, logger_, "FrameProcessorController run finished. Stopping app.");
LOG4CXX_DEBUG_LEVEL(1, logger_, "frameProcessor stopped");

}
catch (OdinData::OdinDataException& e)
{
LOG4CXX_ERROR(logger_, "frameProcessor run failed: " << e.what());
rc = 1;
}
catch (const std::exception& e) {
LOG4CXX_ERROR(logger_, "Caught unhandled exception in FrameProcessor, application will terminate: " << e.what());
throw;
LOG4CXX_ERROR(logger_, "Caught unhandled exception in frameProcessor, application will terminate: " << e.what());
rc = 1;
}

return rc;
}

/**
Expand Down Expand Up @@ -283,8 +292,7 @@ int main (int argc, char** argv)

if (rc == -1) {
// Run the application
app.run();
rc = 0;
rc = app.run();
}

return rc;
Expand Down
23 changes: 4 additions & 19 deletions cpp/frameReceiver/include/FrameReceiverApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,15 @@
#ifndef FRAMERECEIVERAPP_H_
#define FRAMERECEIVERAPP_H_

#include <string>
using namespace std;

#include <time.h>

#include <log4cxx/logger.h>
#include <log4cxx/basicconfigurator.h>
#include <log4cxx/propertyconfigurator.h>
#include <log4cxx/helpers/exception.h>
#include <log4cxx/xml/domconfigurator.h>
using namespace log4cxx;
using namespace log4cxx::helpers;

#include <boost/shared_ptr.hpp>

#include "DebugLevelLogger.h"
#include "FrameReceiverConfig.h"
#include "FrameReceiverController.h"

using namespace OdinData;

namespace FrameReceiver
{

//! Frame receiver application class/
//! Frame receiver application class
//!
//! This class implements the main functionality of the FrameReceiver application, parsing command line
//! and configuraiton file options before creating, configuring and running the controller.
Expand All @@ -47,7 +31,7 @@ class FrameReceiverApp

int parse_arguments(int argc, char** argv);

void run(void);
int run(void);
static void stop(void);

private:
Expand All @@ -56,7 +40,8 @@ class FrameReceiverApp
static boost::shared_ptr<FrameReceiverController> controller_; //!< FrameReceiver controller object

// Command line options
FrameReceiverConfig config_; //!< Configuration storage object
unsigned int io_threads_; //!< Number of IO threads for IPC channels
std::string ctrl_channel_endpoint_; //!< IPC channel endpoint for control communication with other processes
std::string config_file_; //!< Full path to JSON configuration file

};
Expand Down
9 changes: 4 additions & 5 deletions cpp/frameReceiver/include/FrameReceiverConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ class FrameReceiverConfig
rx_type_(Defaults::default_rx_type),
rx_address_(Defaults::default_rx_address),
rx_recv_buffer_size_(Defaults::default_rx_recv_buffer_size),
io_threads_(OdinData::Defaults::default_io_threads),
rx_channel_endpoint_(Defaults::default_rx_chan_endpoint),
ctrl_channel_endpoint_(Defaults::default_ctrl_chan_endpoint),
frame_ready_endpoint_(OdinData::Defaults::default_frame_ready_endpoint),
frame_release_endpoint_(OdinData::Defaults::default_frame_release_endpoint),
rx_channel_endpoint_(""),
ctrl_channel_endpoint_(""),
frame_ready_endpoint_(""),
frame_release_endpoint_(""),
shared_buffer_name_(OdinData::Defaults::default_shared_buffer_name),
frame_timeout_ms_(Defaults::default_frame_timeout_ms),
enable_packet_logging_(Defaults::default_enable_packet_logging),
Expand Down
8 changes: 4 additions & 4 deletions cpp/frameReceiver/include/FrameReceiverController.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ using namespace OdinData;
namespace FrameReceiver
{

//! Frame receiver controller class/
//! Frame receiver controller class
//!
//! This class implements the main controller of the FrameReceiver application, providing
//! the overall framework for running the frame receiver, capturing frames of incoming data and
Expand All @@ -51,7 +51,7 @@ namespace FrameReceiver

public:

FrameReceiverController (FrameReceiverConfig& config);
FrameReceiverController(unsigned int num_io_threads);
virtual ~FrameReceiverController ();
void configure(OdinData::IpcMessage& config_msg, OdinData::IpcMessage& config_reply);
void run(void);
Expand Down Expand Up @@ -95,7 +95,7 @@ namespace FrameReceiver
FrameDecoderPtr frame_decoder_; //!< Frame decoder object
SharedBufferManagerPtr buffer_manager_; //!< Buffer manager object

FrameReceiverConfig& config_; //!< Configuration storage object
FrameReceiverConfig config_; //!< Configuration storage object
bool terminate_controller_; //!< Flag to signal termination of the controller

bool need_ipc_reconfig_; //!< Flag to signal reconfiguration of IPC channels
Expand All @@ -108,7 +108,7 @@ namespace FrameReceiver
bool buffer_manager_configured_; //!< Indicates that the buffer manager is configured
bool rx_thread_configured_; //!< Indicates that the RX thread is configured
bool configuration_complete_; //!< Indicates that all components are configured

IpcContext& ipc_context_; //!< ZMQ context for IPC channels
IpcChannel rx_channel_; //!< Channel for communication with receiver thread
IpcChannel ctrl_channel_; //!< Channel for communication with control clients
Expand Down
62 changes: 38 additions & 24 deletions cpp/frameReceiver/src/FrameReceiverApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
#include <streambuf>
using namespace std;

#include <log4cxx/logger.h>
#include <log4cxx/basicconfigurator.h>
#include <log4cxx/propertyconfigurator.h>
#include <log4cxx/helpers/exception.h>
#include <log4cxx/xml/domconfigurator.h>
using namespace log4cxx;
using namespace log4cxx::helpers;

#include <boost/foreach.hpp>
#include <boost/program_options.hpp>
namespace po = boost::program_options;
Expand Down Expand Up @@ -88,15 +96,15 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv)
// and in the configuration file
po::options_description config("Configuration options");
config.add_options()
("debug-level,d", po::value<unsigned int>()->default_value(debug_level),
("debug-level,d", po::value<unsigned int>()->default_value(debug_level),
"Set the debug level")
("log-config,l", po::value<string>(),
("log-config,l", po::value<string>(),
"Set the log4cxx logging configuration file")
("io-threads", po::value<unsigned int>()->default_value(OdinData::Defaults::default_io_threads),
("io-threads", po::value<unsigned int>()->default_value(OdinData::Defaults::default_io_threads),
"Set number of IPC channel IO threads")
("ctrl", po::value<std::string>()->default_value(FrameReceiver::Defaults::default_ctrl_chan_endpoint),
("ctrl", po::value<std::string>()->default_value(FrameReceiver::Defaults::default_ctrl_chan_endpoint),
"Set the control channel endpoint")
("config,c", po::value<std::string>()->default_value(OdinData::Defaults::default_json_config_file),
("config,c", po::value<std::string>(),
"Path to a JSON configuration file to submit to the application")
;

Expand Down Expand Up @@ -143,14 +151,14 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv)

if (vm.count("io-threads"))
{
config_.io_threads_ = vm["io-threads"].as<unsigned int>();
LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of IO threads to " << config_.io_threads_);
io_threads_ = vm["io-threads"].as<unsigned int>();
LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of IO threads to " << io_threads_);
}

if (vm.count("ctrl"))
{
config_.ctrl_channel_endpoint_ = vm["ctrl"].as<std::string>();
LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting control channel endpoint to " << config_.ctrl_channel_endpoint_);
ctrl_channel_endpoint_ = vm["ctrl"].as<std::string>();
LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting control channel endpoint to " << ctrl_channel_endpoint_);
}

if (vm.count("config"))
Expand Down Expand Up @@ -191,22 +199,24 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv)
//! the input configuration parameters and then run, blocking until completion
//! of execution.

void FrameReceiverApp::run(void)
int FrameReceiverApp::run(void)
{

int rc = 0;

LOG4CXX_INFO(logger_, "frameReceiver version " << ODIN_DATA_VERSION_STR << " starting up");

// Instantiate a controller
controller_ = boost::shared_ptr<FrameReceiverController>(
new FrameReceiverController(config_)
);
controller_ = boost::shared_ptr<FrameReceiverController>(new FrameReceiverController(io_threads_));

try {

OdinData::IpcMessage config_msg, config_reply;
config_.as_ipc_message(config_msg);
config_msg.set_param<bool>(CONFIG_FORCE_RECONFIG, true);

// Send an initial configuration message to the controller to configure control and RX thread
// channel endpoints
OdinData::IpcMessage config_msg;
OdinData::IpcMessage config_reply;
config_msg.set_param<std::string>(CONFIG_CTRL_ENDPOINT, ctrl_channel_endpoint_);
config_msg.set_param<std::string>(CONFIG_RX_ENDPOINT, Defaults::default_rx_chan_endpoint);
controller_->configure(config_msg, config_reply);

// Check for a JSON configuration file option
Expand Down Expand Up @@ -256,24 +266,29 @@ void FrameReceiverApp::run(void)

controller_->run();

LOG4CXX_INFO(logger_, "frameReceiver stopped");

}
catch (OdinData::OdinDataException& e)
{
LOG4CXX_ERROR(logger_, "Frame receiver run failed: " << e.what());
LOG4CXX_ERROR(logger_, "frameReceiver run failed: " << e.what());
rc = 1;
}
catch (exception& e)
{
LOG4CXX_ERROR(logger_, "Generic exception during frame receiver run:\n" << e.what());
LOG4CXX_ERROR(logger_, "Generic exception during frameReceiver run:\n" << e.what());
rc = 1;
}
catch (...)
{
LOG4CXX_ERROR(logger_, "Unexpected exception during frame receiver run");
LOG4CXX_ERROR(logger_, "Unexpected exception during frameReceiver run");
rc = 1;
}

LOG4CXX_INFO(logger_, "Frame receiver stopped");
return rc;
}

//! Stop the FrameRecevierApp.
//! Stop the frameRecevierApp.
//!
//! This method stops the frame receiver by signalling to the controller to stop.

Expand Down Expand Up @@ -314,8 +329,7 @@ int main (int argc, char** argv)

if (rc == -1) {
// Run the application
app.run();
rc = 0;
rc = app.run();
}

return rc;
Expand Down
Loading

0 comments on commit cb77df9

Please sign in to comment.