From cb77df983d6f684e30d89961d39ae058dde92cae Mon Sep 17 00:00:00 2001 From: Tim Nicholls Date: Fri, 23 Feb 2024 08:56:11 +0000 Subject: [PATCH] Update frameReceiver initial config handling This commit changes the frameReceiver initial configuration to avoid binding the frame ready & receive channels to default endpoints before configuration is fully applied. This makes the FR behave more like the FP and therefore requires that the endpoints are explicitly defined in configuration. --- .../include/FrameProcessorApp.h | 3 +- cpp/frameProcessor/src/FrameProcessorApp.cpp | 20 ++- cpp/frameReceiver/include/FrameReceiverApp.h | 23 +--- .../include/FrameReceiverConfig.h | 9 +- .../include/FrameReceiverController.h | 8 +- cpp/frameReceiver/src/FrameReceiverApp.cpp | 62 ++++++---- .../src/FrameReceiverController.cpp | 114 +++++++++--------- .../test/FrameReceiverUDPRxThreadUnitTest.cpp | 4 + .../integrationTest/config/dummyUDP-fr.json | 2 + 9 files changed, 132 insertions(+), 113 deletions(-) diff --git a/cpp/frameProcessor/include/FrameProcessorApp.h b/cpp/frameProcessor/include/FrameProcessorApp.h index 0d9161bc..aecb099d 100644 --- a/cpp/frameProcessor/include/FrameProcessorApp.h +++ b/cpp/frameProcessor/include/FrameProcessorApp.h @@ -23,7 +23,8 @@ class FrameProcessorApp int parse_arguments(int argc, char** argv); void configure_controller(OdinData::IpcMessage& config_msg); - void run(); + int run(void); + static void stop(void); private: diff --git a/cpp/frameProcessor/src/FrameProcessorApp.cpp b/cpp/frameProcessor/src/FrameProcessorApp.cpp index 77da547a..2f96fabc 100644 --- a/cpp/frameProcessor/src/FrameProcessorApp.cpp +++ b/cpp/frameProcessor/src/FrameProcessorApp.cpp @@ -177,9 +177,11 @@ int FrameProcessorApp::parse_arguments(int argc, char** argv) return rc; } -void FrameProcessorApp::run(void) +int FrameProcessorApp::run(void) { + int rc = 0; + LOG4CXX_INFO(logger_, "frameProcessor version " << ODIN_DATA_VERSION_STR << " starting up"); // Instantiate a controller @@ -238,13 +240,20 @@ void FrameProcessorApp::run(void) controller_->run(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "FrameProcessorController run finished. Stopping app."); + LOG4CXX_DEBUG_LEVEL(1, logger_, "frameProcessor stopped"); } + catch (OdinData::OdinDataException& e) + { + LOG4CXX_ERROR(logger_, "frameProcessor run failed: " << e.what()); + rc = 1; + } catch (const std::exception& e) { - LOG4CXX_ERROR(logger_, "Caught unhandled exception in FrameProcessor, application will terminate: " << e.what()); - throw; + LOG4CXX_ERROR(logger_, "Caught unhandled exception in frameProcessor, application will terminate: " << e.what()); + rc = 1; } + + return rc; } /** @@ -283,8 +292,7 @@ int main (int argc, char** argv) if (rc == -1) { // Run the application - app.run(); - rc = 0; + rc = app.run(); } return rc; diff --git a/cpp/frameReceiver/include/FrameReceiverApp.h b/cpp/frameReceiver/include/FrameReceiverApp.h index 478ae30a..1aeb2926 100644 --- a/cpp/frameReceiver/include/FrameReceiverApp.h +++ b/cpp/frameReceiver/include/FrameReceiverApp.h @@ -8,23 +8,7 @@ #ifndef FRAMERECEIVERAPP_H_ #define FRAMERECEIVERAPP_H_ -#include -using namespace std; - -#include - -#include -#include -#include -#include -#include -using namespace log4cxx; -using namespace log4cxx::helpers; - -#include - #include "DebugLevelLogger.h" -#include "FrameReceiverConfig.h" #include "FrameReceiverController.h" using namespace OdinData; @@ -32,7 +16,7 @@ using namespace OdinData; namespace FrameReceiver { -//! Frame receiver application class/ +//! Frame receiver application class //! //! This class implements the main functionality of the FrameReceiver application, parsing command line //! and configuraiton file options before creating, configuring and running the controller. @@ -47,7 +31,7 @@ class FrameReceiverApp int parse_arguments(int argc, char** argv); - void run(void); + int run(void); static void stop(void); private: @@ -56,7 +40,8 @@ class FrameReceiverApp static boost::shared_ptr controller_; //!< FrameReceiver controller object // Command line options - FrameReceiverConfig config_; //!< Configuration storage object + unsigned int io_threads_; //!< Number of IO threads for IPC channels + std::string ctrl_channel_endpoint_; //!< IPC channel endpoint for control communication with other processes std::string config_file_; //!< Full path to JSON configuration file }; diff --git a/cpp/frameReceiver/include/FrameReceiverConfig.h b/cpp/frameReceiver/include/FrameReceiverConfig.h index f3e4959e..cacb12a9 100644 --- a/cpp/frameReceiver/include/FrameReceiverConfig.h +++ b/cpp/frameReceiver/include/FrameReceiverConfig.h @@ -60,11 +60,10 @@ class FrameReceiverConfig rx_type_(Defaults::default_rx_type), rx_address_(Defaults::default_rx_address), rx_recv_buffer_size_(Defaults::default_rx_recv_buffer_size), - io_threads_(OdinData::Defaults::default_io_threads), - rx_channel_endpoint_(Defaults::default_rx_chan_endpoint), - ctrl_channel_endpoint_(Defaults::default_ctrl_chan_endpoint), - frame_ready_endpoint_(OdinData::Defaults::default_frame_ready_endpoint), - frame_release_endpoint_(OdinData::Defaults::default_frame_release_endpoint), + rx_channel_endpoint_(""), + ctrl_channel_endpoint_(""), + frame_ready_endpoint_(""), + frame_release_endpoint_(""), shared_buffer_name_(OdinData::Defaults::default_shared_buffer_name), frame_timeout_ms_(Defaults::default_frame_timeout_ms), enable_packet_logging_(Defaults::default_enable_packet_logging), diff --git a/cpp/frameReceiver/include/FrameReceiverController.h b/cpp/frameReceiver/include/FrameReceiverController.h index c7bc0756..2406ea99 100644 --- a/cpp/frameReceiver/include/FrameReceiverController.h +++ b/cpp/frameReceiver/include/FrameReceiverController.h @@ -39,7 +39,7 @@ using namespace OdinData; namespace FrameReceiver { - //! Frame receiver controller class/ + //! Frame receiver controller class //! //! This class implements the main controller of the FrameReceiver application, providing //! the overall framework for running the frame receiver, capturing frames of incoming data and @@ -51,7 +51,7 @@ namespace FrameReceiver public: - FrameReceiverController (FrameReceiverConfig& config); + FrameReceiverController(unsigned int num_io_threads); virtual ~FrameReceiverController (); void configure(OdinData::IpcMessage& config_msg, OdinData::IpcMessage& config_reply); void run(void); @@ -95,7 +95,7 @@ namespace FrameReceiver FrameDecoderPtr frame_decoder_; //!< Frame decoder object SharedBufferManagerPtr buffer_manager_; //!< Buffer manager object - FrameReceiverConfig& config_; //!< Configuration storage object + FrameReceiverConfig config_; //!< Configuration storage object bool terminate_controller_; //!< Flag to signal termination of the controller bool need_ipc_reconfig_; //!< Flag to signal reconfiguration of IPC channels @@ -108,7 +108,7 @@ namespace FrameReceiver bool buffer_manager_configured_; //!< Indicates that the buffer manager is configured bool rx_thread_configured_; //!< Indicates that the RX thread is configured bool configuration_complete_; //!< Indicates that all components are configured - + IpcContext& ipc_context_; //!< ZMQ context for IPC channels IpcChannel rx_channel_; //!< Channel for communication with receiver thread IpcChannel ctrl_channel_; //!< Channel for communication with control clients diff --git a/cpp/frameReceiver/src/FrameReceiverApp.cpp b/cpp/frameReceiver/src/FrameReceiverApp.cpp index 7874224d..bdf23382 100644 --- a/cpp/frameReceiver/src/FrameReceiverApp.cpp +++ b/cpp/frameReceiver/src/FrameReceiverApp.cpp @@ -12,6 +12,14 @@ #include using namespace std; +#include +#include +#include +#include +#include +using namespace log4cxx; +using namespace log4cxx::helpers; + #include #include namespace po = boost::program_options; @@ -88,15 +96,15 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv) // and in the configuration file po::options_description config("Configuration options"); config.add_options() - ("debug-level,d", po::value()->default_value(debug_level), + ("debug-level,d", po::value()->default_value(debug_level), "Set the debug level") - ("log-config,l", po::value(), + ("log-config,l", po::value(), "Set the log4cxx logging configuration file") - ("io-threads", po::value()->default_value(OdinData::Defaults::default_io_threads), + ("io-threads", po::value()->default_value(OdinData::Defaults::default_io_threads), "Set number of IPC channel IO threads") - ("ctrl", po::value()->default_value(FrameReceiver::Defaults::default_ctrl_chan_endpoint), + ("ctrl", po::value()->default_value(FrameReceiver::Defaults::default_ctrl_chan_endpoint), "Set the control channel endpoint") - ("config,c", po::value()->default_value(OdinData::Defaults::default_json_config_file), + ("config,c", po::value(), "Path to a JSON configuration file to submit to the application") ; @@ -143,14 +151,14 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv) if (vm.count("io-threads")) { - config_.io_threads_ = vm["io-threads"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of IO threads to " << config_.io_threads_); + io_threads_ = vm["io-threads"].as(); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of IO threads to " << io_threads_); } if (vm.count("ctrl")) { - config_.ctrl_channel_endpoint_ = vm["ctrl"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting control channel endpoint to " << config_.ctrl_channel_endpoint_); + ctrl_channel_endpoint_ = vm["ctrl"].as(); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting control channel endpoint to " << ctrl_channel_endpoint_); } if (vm.count("config")) @@ -191,22 +199,24 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv) //! the input configuration parameters and then run, blocking until completion //! of execution. -void FrameReceiverApp::run(void) +int FrameReceiverApp::run(void) { + int rc = 0; + LOG4CXX_INFO(logger_, "frameReceiver version " << ODIN_DATA_VERSION_STR << " starting up"); // Instantiate a controller - controller_ = boost::shared_ptr( - new FrameReceiverController(config_) - ); + controller_ = boost::shared_ptr(new FrameReceiverController(io_threads_)); try { - OdinData::IpcMessage config_msg, config_reply; - config_.as_ipc_message(config_msg); - config_msg.set_param(CONFIG_FORCE_RECONFIG, true); - + // Send an initial configuration message to the controller to configure control and RX thread + // channel endpoints + OdinData::IpcMessage config_msg; + OdinData::IpcMessage config_reply; + config_msg.set_param(CONFIG_CTRL_ENDPOINT, ctrl_channel_endpoint_); + config_msg.set_param(CONFIG_RX_ENDPOINT, Defaults::default_rx_chan_endpoint); controller_->configure(config_msg, config_reply); // Check for a JSON configuration file option @@ -256,24 +266,29 @@ void FrameReceiverApp::run(void) controller_->run(); + LOG4CXX_INFO(logger_, "frameReceiver stopped"); + } catch (OdinData::OdinDataException& e) { - LOG4CXX_ERROR(logger_, "Frame receiver run failed: " << e.what()); + LOG4CXX_ERROR(logger_, "frameReceiver run failed: " << e.what()); + rc = 1; } catch (exception& e) { - LOG4CXX_ERROR(logger_, "Generic exception during frame receiver run:\n" << e.what()); + LOG4CXX_ERROR(logger_, "Generic exception during frameReceiver run:\n" << e.what()); + rc = 1; } catch (...) { - LOG4CXX_ERROR(logger_, "Unexpected exception during frame receiver run"); + LOG4CXX_ERROR(logger_, "Unexpected exception during frameReceiver run"); + rc = 1; } - LOG4CXX_INFO(logger_, "Frame receiver stopped"); + return rc; } -//! Stop the FrameRecevierApp. +//! Stop the frameRecevierApp. //! //! This method stops the frame receiver by signalling to the controller to stop. @@ -314,8 +329,7 @@ int main (int argc, char** argv) if (rc == -1) { // Run the application - app.run(); - rc = 0; + rc = app.run(); } return rc; diff --git a/cpp/frameReceiver/src/FrameReceiverController.cpp b/cpp/frameReceiver/src/FrameReceiverController.cpp index f61f637c..f97d9221 100644 --- a/cpp/frameReceiver/src/FrameReceiverController.cpp +++ b/cpp/frameReceiver/src/FrameReceiverController.cpp @@ -22,9 +22,8 @@ using namespace FrameReceiver; //! of the controller. Configuration and running are deferred to the //! configure() and run() methods respectively. //! -FrameReceiverController::FrameReceiverController (FrameReceiverConfig& config) : +FrameReceiverController::FrameReceiverController(unsigned int num_io_threads) : logger_(log4cxx::Logger::getLogger("FR.Controller")), - config_(config), terminate_controller_(false), need_ipc_reconfig_(false), need_decoder_reconfig_(false), @@ -35,7 +34,7 @@ FrameReceiverController::FrameReceiverController (FrameReceiverConfig& config) : buffer_manager_configured_(false), rx_thread_configured_(false), configuration_complete_(false), - ipc_context_(IpcContext::Instance(config.io_threads_)), + ipc_context_(IpcContext::Instance(num_io_threads)), rx_channel_(ZMQ_ROUTER), ctrl_channel_(ZMQ_ROUTER), frame_ready_channel_(ZMQ_PUB), @@ -66,7 +65,7 @@ FrameReceiverController::~FrameReceiverController () //! the frame decoder, frame buffer manager and RX thread are conditionally configured. //! //! \param[in] config_msg - IpcMessage containing configuration parameters -//! `param[out] reply_msg - Reply IpcMessage indicating success or failure of actions. +//! \param[out] reply_msg - Reply IpcMessage indicating success or failure of actions. //! void FrameReceiverController::configure(OdinData::IpcMessage& config_msg, OdinData::IpcMessage& config_reply) @@ -74,17 +73,6 @@ void FrameReceiverController::configure(OdinData::IpcMessage& config_msg, LOG4CXX_DEBUG_LEVEL(2, logger_, "Configuration submitted: " << config_msg.encode()); - // Determine if a forced reconfiguration of all parts of the system is requested - // and set up the individual reconfiguration flags appropriately. These can then - // be modified at each configuration step to handle interdependencies. - - bool force_reconfig = config_msg.get_param(CONFIG_FORCE_RECONFIG, false); - - need_ipc_reconfig_ = force_reconfig; - need_decoder_reconfig_ = force_reconfig; - need_buffer_manager_reconfig_ = force_reconfig; - need_rx_thread_reconfig_ = force_reconfig; - config_reply.set_msg_val(config_msg.get_msg_val()); try { @@ -118,12 +106,12 @@ void FrameReceiverController::configure(OdinData::IpcMessage& config_msg, buffer_manager_configured_ & rx_thread_configured_; // Construct the acknowledgement reply, indicating in the parameters which elements - // have been reconfigured + // have been configured config_reply.set_msg_type(OdinData::IpcMessage::MsgTypeAck); - config_reply.set_param("reconfigured/ipc", need_ipc_reconfig_); - config_reply.set_param("reconfigured/decoder", need_decoder_reconfig_); - config_reply.set_param("reconfigured/buffer_manager", need_buffer_manager_reconfig_); - config_reply.set_param("reconfigured/rx_thread", need_rx_thread_reconfig_); + config_reply.set_param("configured/ipc", ipc_configured_); + config_reply.set_param("configured/decoder", decoder_configured_); + config_reply.set_param("configured/buffer_manager", buffer_manager_configured_); + config_reply.set_param("configured/rx_thread", rx_thread_configured_); } catch (FrameReceiverException& e) { @@ -208,54 +196,72 @@ void FrameReceiverController::stop(const bool deferred) void FrameReceiverController::configure_ipc_channels(OdinData::IpcMessage& config_msg) { + static bool ctrl_channel_configured = false; + static bool rx_channel_configured = false; + static bool ready_channel_configured = false; + static bool release_channel_configured = false; + // Clear the IPC config status until successful completion ipc_configured_ = false; // If a new control endpoint is specified, bind the control channel - std::string ctrl_endpoint = config_msg.get_param( - CONFIG_CTRL_ENDPOINT, config_.ctrl_channel_endpoint_); - if (need_ipc_reconfig_ || (ctrl_endpoint != config_.ctrl_channel_endpoint_)) - { - this->unbind_channel(&ctrl_channel_, config_.ctrl_channel_endpoint_, true); - config_.ctrl_channel_endpoint_ = ctrl_endpoint; - this->setup_control_channel(ctrl_endpoint); + if (config_msg.has_param(CONFIG_CTRL_ENDPOINT)) { + std::string ctrl_endpoint = config_msg.get_param(CONFIG_CTRL_ENDPOINT); + if (ctrl_endpoint != config_.ctrl_channel_endpoint_) + { + this->unbind_channel(&ctrl_channel_, config_.ctrl_channel_endpoint_, true); + this->setup_control_channel(ctrl_endpoint); + config_.ctrl_channel_endpoint_ = ctrl_endpoint; + ctrl_channel_configured = true; + } } // If a new endpoint is specified, bind the RX thread channel - std::string rx_endpoint = config_msg.get_param( - CONFIG_RX_ENDPOINT, config_.rx_channel_endpoint_); - if (need_ipc_reconfig_ || (rx_endpoint != config_.rx_channel_endpoint_)) - { - this->unbind_channel(&rx_channel_, config_.rx_channel_endpoint_, false); - config_.rx_channel_endpoint_ = rx_endpoint; - this->setup_rx_channel(rx_endpoint); + if (config_msg.has_param(CONFIG_RX_ENDPOINT)) { + std::string rx_endpoint = config_msg.get_param(CONFIG_RX_ENDPOINT); + if (rx_endpoint != config_.rx_channel_endpoint_) + { + this->unbind_channel(&rx_channel_, config_.rx_channel_endpoint_, false); + this->setup_rx_channel(rx_endpoint); + config_.rx_channel_endpoint_ = rx_endpoint; + rx_channel_configured = true; - // The RX thread will have to be reconfigured if this endpoint changes - need_rx_thread_reconfig_ = true; + // The RX thread will have to be reconfigured if this endpoint changes + need_rx_thread_reconfig_ = true; + } } - // If the endpoint is specified, bind the frame ready notification channel - std::string frame_ready_endpoint = config_msg.get_param( - CONFIG_FRAME_READY_ENDPOINT, config_.frame_ready_endpoint_); - if (need_ipc_reconfig_ || (frame_ready_endpoint != config_.frame_ready_endpoint_)) - { - this->unbind_channel(&frame_ready_channel_, config_.frame_ready_endpoint_, false); - config_.frame_ready_endpoint_ = frame_ready_endpoint; - this->setup_frame_ready_channel(frame_ready_endpoint); + // If a new endpoint is specified, bind the frame ready notification channel + if (config_msg.has_param(CONFIG_FRAME_READY_ENDPOINT)) { + std::string frame_ready_endpoint = + config_msg.get_param(CONFIG_FRAME_READY_ENDPOINT); + if (frame_ready_endpoint != config_.frame_ready_endpoint_) + { + this->unbind_channel(&frame_ready_channel_, config_.frame_ready_endpoint_, false); + this->setup_frame_ready_channel(frame_ready_endpoint); + config_.frame_ready_endpoint_ = frame_ready_endpoint; + ready_channel_configured = true; + } } - // If the endpoint is specified, bind the frame release notification channel - std::string frame_release_endpoint = config_msg.get_param( - CONFIG_FRAME_RELEASE_ENDPOINT, config_.frame_release_endpoint_); - if (need_ipc_reconfig_ || (frame_release_endpoint != config_.frame_release_endpoint_)) - { - this->unbind_channel(&frame_release_channel_, config_.frame_release_endpoint_, false); - config_.frame_release_endpoint_ = frame_release_endpoint; - this->setup_frame_release_channel(frame_release_endpoint); + // If a new endpoint is specified, bind the frame release notification channel + if (config_msg.has_param(CONFIG_FRAME_RELEASE_ENDPOINT)) { + std::string frame_release_endpoint = + config_msg.get_param(CONFIG_FRAME_RELEASE_ENDPOINT); + if (frame_release_endpoint != config_.frame_release_endpoint_) + { + this->unbind_channel(&frame_release_channel_, config_.frame_release_endpoint_, false); + this->setup_frame_release_channel(frame_release_endpoint); + config_.frame_release_endpoint_ = frame_release_endpoint; + release_channel_configured = true; + } } - // Flag successful completion of IPC channel configuration - ipc_configured_ = true; + // Flag successful completion of IPC channel configuration if all channels configured + ipc_configured_ = ( + ctrl_channel_configured && rx_channel_configured && + ready_channel_configured && ready_channel_configured + ); } //! Set up the control channel. diff --git a/cpp/frameReceiver/test/FrameReceiverUDPRxThreadUnitTest.cpp b/cpp/frameReceiver/test/FrameReceiverUDPRxThreadUnitTest.cpp index 1625e862..89296960 100644 --- a/cpp/frameReceiver/test/FrameReceiverUDPRxThreadUnitTest.cpp +++ b/cpp/frameReceiver/test/FrameReceiverUDPRxThreadUnitTest.cpp @@ -29,8 +29,12 @@ class FrameReceiverRxThreadTestProxy #ifdef __MACH__ config_.rx_recv_buffer_size_ = 1048576; #endif + + config_.rx_channel_endpoint_ = Defaults::default_rx_chan_endpoint; + } + std::string& get_rx_channel_endpoint(void) { return config_.rx_channel_endpoint_; diff --git a/cpp/test/integrationTest/config/dummyUDP-fr.json b/cpp/test/integrationTest/config/dummyUDP-fr.json index 006c8e03..0fd402cb 100644 --- a/cpp/test/integrationTest/config/dummyUDP-fr.json +++ b/cpp/test/integrationTest/config/dummyUDP-fr.json @@ -1,5 +1,7 @@ [ { + "frame_ready_endpoint": "tcp://127.0.0.1:5001", + "frame_release_endpoint": "tcp://127.0.0.1:5002", "decoder_type": "DummyUDP", "decoder_path": "${CMAKE_INSTALL_PREFIX}/lib", "rx_ports": "61649",