diff --git a/cpp/frameReceiver/src/DummyTCPFrameDecoder.cpp b/cpp/frameReceiver/src/DummyTCPFrameDecoder.cpp index b6f76b87..ad0c81ea 100644 --- a/cpp/frameReceiver/src/DummyTCPFrameDecoder.cpp +++ b/cpp/frameReceiver/src/DummyTCPFrameDecoder.cpp @@ -139,7 +139,7 @@ void DummyTCPFrameDecoder::monitor_buffers(void) {} void DummyTCPFrameDecoder::get_status(const std::string param_prefix, OdinData::IpcMessage &status_msg) { - status_msg.set_param(param_prefix + "name", + status_msg.set_param(param_prefix + "class", std::string("DummyTCPFrameDecoder")); } diff --git a/cpp/frameReceiver/src/DummyUDPFrameDecoder.cpp b/cpp/frameReceiver/src/DummyUDPFrameDecoder.cpp index de7019b0..5ecccf70 100644 --- a/cpp/frameReceiver/src/DummyUDPFrameDecoder.cpp +++ b/cpp/frameReceiver/src/DummyUDPFrameDecoder.cpp @@ -459,7 +459,7 @@ void DummyUDPFrameDecoder::get_status(const std::string param_prefix, OdinData::IpcMessage& status_msg) { status_get_count_++; - status_msg.set_param(param_prefix + "name", std::string("DummyUDPFrameDecoder")); + status_msg.set_param(param_prefix + "class", std::string("DummyUDPFrameDecoder")); status_msg.set_param(param_prefix + "status_get_count", status_get_count_); status_msg.set_param(param_prefix + "packets_received", packets_received_); status_msg.set_param(param_prefix + "packets_lost", packets_lost_); diff --git a/python/src/odin_data/control/frame_processor_adapter.py b/python/src/odin_data/control/frame_processor_adapter.py new file mode 100644 index 00000000..9f2521ba --- /dev/null +++ b/python/src/odin_data/control/frame_processor_adapter.py @@ -0,0 +1,6 @@ +from odin_data.control.odin_data_adapter import OdinDataAdapter +from odin_data.control.frame_processor_controller import FrameProcessorController + + +class FrameProcessorAdapter(OdinDataAdapter): + _controller_cls = FrameProcessorController diff --git a/python/src/odin_data/control/frame_processor_controller.py b/python/src/odin_data/control/frame_processor_controller.py new file mode 100644 index 00000000..3ddea738 --- /dev/null +++ b/python/src/odin_data/control/frame_processor_controller.py @@ -0,0 +1,148 @@ +from odin_data.control.odin_data_controller import OdinDataController +import logging + + +class FrameProcessorController(OdinDataController): + def __init__(self, name, endpoints, update_interval=0.5): + super().__init__(name, endpoints, update_interval) + # If we are a FP controller then we need to track the writing state + self._first_update = False + self._writing = [False]*len(self._clients) + + @property + def first_update(self): + return self._first_update + + def setup_parameter_tree(self): + super(FrameProcessorController, self).setup_parameter_tree() + self._acquisition_id = "" + self._write = False + self._frames = 0 + self._file_path = "" + self._file_prefix = "" + self._file_extension = "h5" + self._tree["config"] = { + "hdf": { + "acquisition_id": ( + self._get("_acquisition_id"), + lambda v: self._set("_acquisition_id", v), + {}, + ), + "frames": ( + self._get("_frames"), + lambda v: self._set("_frames", v), + {}, + ), + "file": { + "path": ( + self._get("_file_path"), + lambda v: self._set("_file_path", v), + {}, + ), + "prefix": ( + self._get("_file_prefix"), + lambda v: self._set("_file_prefix", v), + {}, + ), + "extension": ( + self._get("_file_extension"), + lambda v: self._set("_file_extension", v), + {}, + ), + }, + "write": ( + self._get("_write"), + self.execute_write, + {} + ) + }, + } + + def execute_write(self, value): + # Queue the write command + logging.debug("Executing write command with value: {}".format(value)) + processes = len(self._clients) + + if value: + # Before attempting to write files, make some simple error checks + + # Check if we have a valid buffer status from the FR adapter + + # valid, reason = self.check_controller_status() + # if not valid: + # raise RuntimeError(reason) + + # Check the file prefix is not empty + if str(self._file_prefix) == '': + raise RuntimeError("File prefix must not be empty") + + # First setup the rank + self.setup_rank() + + try: + for rank in range(processes): + # Setup the number of processes and the rank for each client + config = { + 'hdf': { + 'frames': self._frames + } + } + logging.info("Sending config to FP odin adapter %i: %s", rank, config) + self._clients[rank].send_configuration(config) + config = { + 'hdf': { + 'acquisition_id': self._acquisition_id, + 'file': { + 'path': str(self._file_path), + 'prefix': str(self._file_prefix), + 'extension': str(self._file_extension) + } + } + } + logging.info("Sending config to FP odin adapter %i: %s", rank, config) + self._clients[rank].send_configuration(config) + except Exception as err: + logging.error("Failed to send information to FP applications") + logging.error("Error: %s", err) + try: + config = {'hdf': {'write': value}} + for rank in range(processes): + logging.info("Sending config to FP odin adapter %i: %s", rank, config) + #self._odin_adapter_fps._controller.put(f"{rank}/config", config) + self._clients[rank].send_configuration(config) + except Exception as err: + logging.error("Failed to send write command to FP applications") + logging.error("Error: %s", err) + + def handle_client(self, client, index): + if "hdf" in client.parameters["status"]: + self._writing[index] = client.parameters["status"]["hdf"]["writing"] + # self._params.set("{}/config/hdf/write".format(index), writing[index]) + + def setup_rank(self): + # Attempt initialisation of the connected clients + processes = len(self._clients) + logging.info( + "Setting up rank information for {} FP processes".format(processes) + ) + rank = 0 + try: + for rank in range(processes): + # Setup the number of processes and the rank for each client + config = {"hdf": {"process": {"number": processes, "rank": rank}}} + logging.debug("Sending config to FP odin adapter %i: %s", rank, config) + self._clients[rank].send_configuration(config) + + except Exception as err: + logging.debug("Failed to send rank information to FP applications") + logging.error("Error: %s", err) + + def process_updates(self): + if not self._first_update: + self.setup_rank() + self._first_update = True + self._write = all(self._writing) + + # def check_controller_status(self): + # TODO: Need to check FR buffer status + # return True, "" diff --git a/python/src/odin_data/control/frame_receiver_adapter.py b/python/src/odin_data/control/frame_receiver_adapter.py index 257ae58b..1ec0bc7b 100644 --- a/python/src/odin_data/control/frame_receiver_adapter.py +++ b/python/src/odin_data/control/frame_receiver_adapter.py @@ -1,101 +1,6 @@ -""" -Created on 2nd August 2018 - -:author: Alan Greer -""" -import copy -import logging - from odin_data.control.odin_data_adapter import OdinDataAdapter +from odin_data.control.frame_receiver_controller import FrameReceiverController class FrameReceiverAdapter(OdinDataAdapter): - """ - OdinDataAdapter class - - This class provides the adapter interface between the ODIN server and the ODIN-DATA detector system, - transforming the REST-like API HTTP verbs into the appropriate frameProcessor ZeroMQ control messages - """ - VERSION_CHECK_CONFIG_ITEMS = ['decoder_path', 'decoder_type'] - - def __init__(self, **kwargs): - """ - Initialise the OdinDataAdapter object - - :param kwargs: - """ - logging.debug("FrameReceiverAdapter init called") - - super(FrameReceiverAdapter, self).__init__(**kwargs) - - self._decoder_config = [] - for ep in self._endpoints: - self._decoder_config.append(None) - - def require_version_check(self, param): - # If the parameter is in the version check list then request a version update - if param in self.VERSION_CHECK_CONFIG_ITEMS: - return True - return False - - def send_to_clients(self, request_command, parameters, client_index=-1): - """ - Intercept the base class send_to_clients method. - Keep a record of any decoder specific configuration items and then if a single decoder config - item is later changed send the full decoder configuration to the Frame Receiver application. - This is necessary as often a decoder config change will result in the complete tear down and re-init - of the Decoder class and so a full and consistent set of decoder config parameters are required. - - :param request_command: - :param parameters: - :param client_index: - """ - logging.debug("Original index: {} request_command: {} and parameters: {}".format(client_index, request_command, parameters)) - command, parameters = self.uri_params_to_dictionary(request_command, parameters) - - decoder_config = None - if command is None: - if 'decoder_config' in parameters: - logging.debug("Found decoder config: {}".format(parameters['decoder_config'])) - decoder_config = parameters['decoder_config'] - elif command == 'decoder_config': - logging.debug("Found decoder config: {}".format(parameters)) - decoder_config = parameters - - if decoder_config is not None: - if client_index == -1: - for index in range(len(self._decoder_config)): - if self._decoder_config[index] is None: - self._decoder_config[index] = decoder_config - else: - for item in decoder_config: - self._decoder_config[index][item] = decoder_config[item] - else: - if self._decoder_config[client_index] is None: - self._decoder_config[client_index] = decoder_config - else: - for item in decoder_config: - self._decoder_config[client_index][item] = decoder_config[item] - - # Now construct the new full message, inserting the full decoder config back into the parameters - if client_index == -1: - new_param_set = [] - # Loop over each decoder config item and send a list of commands - for dc in self._decoder_config: - if command is None: - if 'decoder_config' in parameters and dc is not None: - parameters['decoder_config'] = dc - elif command == 'decoder_config': - parameters = dc - new_param_set.append(copy.deepcopy(parameters)) - else: - if command is None: - if 'decoder_config' in parameters and self._decoder_config[client_index] is not None: - parameters['decoder_config'] = self._decoder_config[client_index] - elif command == 'decoder_config': - parameters = self._decoder_config[client_index] - new_param_set = parameters - - logging.debug("Updated full command: {} and parameter set: {}".format(command, new_param_set)) - - return super(FrameReceiverAdapter, self).send_to_clients(command, new_param_set, client_index) + _controller_cls = FrameReceiverController diff --git a/python/src/odin_data/control/frame_receiver_controller.py b/python/src/odin_data/control/frame_receiver_controller.py new file mode 100644 index 00000000..bd9ffae6 --- /dev/null +++ b/python/src/odin_data/control/frame_receiver_controller.py @@ -0,0 +1,5 @@ +from odin_data.control.odin_data_controller import OdinDataController + + +class FrameReceiverController(OdinDataController): + ... diff --git a/python/src/odin_data/control/odin_data_adapter.py b/python/src/odin_data/control/odin_data_adapter.py index 22bcc899..5acbde29 100644 --- a/python/src/odin_data/control/odin_data_adapter.py +++ b/python/src/odin_data/control/odin_data_adapter.py @@ -29,6 +29,7 @@ class OdinDataAdapter(ApiAdapter): This class provides the adapter interface between the ODIN server and the ODIN-DATA detector system, transforming the REST-like API HTTP verbs into the appropriate frameProcessor ZeroMQ control messages """ + _controller_cls = OdinDataController def __init__(self, **kwargs): """ @@ -57,12 +58,9 @@ def __init__(self, **kwargs): # Setup the time between client update requests self._update_interval = float(self.options.get("update_interval", 0.5)) - # Check for a Frame Processor flag - self._frame_processor_flag = bool(self.options.get("frame_processor", False)) - # Create the Frame Processor Controller object - self._controller = OdinDataController( - self.name, self._endpoint_arg, self._update_interval, self._frame_processor_flag + self._controller = self._controller_cls( + self.name, self._endpoint_arg, self._update_interval ) @request_types("application/json", "application/vnd.odin-native") @@ -85,7 +83,7 @@ def get(self, path, request): # logging.error("{}".format(response)) except ParameterTreeError as param_error: response = { - "response": "OdinDatatAdapter GET error: {}".format(param_error) + "response": f"{type(self).__name__} GET error: {param_error}" } status_code = 400 @@ -112,7 +110,7 @@ def put(self, path, request): # pylint: disable=W0613 self._controller.put(path, json_decode(request.body)) except ParameterTreeError as param_error: response = { - "response": "OdinDatatAdapter GET error: {}".format(param_error) + "response": f"{type(self).__name__} PUT error: {param_error}" } status_code = 400 diff --git a/python/src/odin_data/control/odin_data_controller.py b/python/src/odin_data/control/odin_data_controller.py index 223eff53..ee2db83d 100644 --- a/python/src/odin_data/control/odin_data_controller.py +++ b/python/src/odin_data/control/odin_data_controller.py @@ -15,7 +15,7 @@ class OdinDataController(object): - def __init__(self, name, endpoints, update_interval=0.5, frame_processor=False): + def __init__(self, name, endpoints, update_interval=0.5): self._clients = [] self._client_connections = [] self._update_interval = update_interval @@ -23,9 +23,7 @@ def __init__(self, name, endpoints, update_interval=0.5, frame_processor=False): self._api = 0.1 self._error = "" self._endpoints = [] - self._frame_processor = frame_processor self._config_cache = None - self._first_update = False for arg in endpoints.split(","): arg = arg.strip() @@ -38,6 +36,19 @@ def __init__(self, name, endpoints, update_interval=0.5, frame_processor=False): self._clients.append(IpcTornadoClient(ep["ip_address"], ep["port"])) self._client_connections.append(False) + # set up controller specific parameters + self.setup_parameter_tree() + + # TODO: Consider renaming this + self._params = ParameterTree(self._tree, mutable=True) + + # Create the status loop handling thread + self._status_running = True + self._status_lock = threading.Lock() + self._status_thread = threading.Thread(target=self.update_loop) + self._status_thread.start() + + def setup_parameter_tree(self): self._tree = { "api": (lambda: self._api, None, {}), "module": (lambda: self._name, None, {}), @@ -56,29 +67,12 @@ def __init__(self, name, endpoints, update_interval=0.5, frame_processor=False): "config": {}, } - if self._frame_processor: - # Set up the specific FP parameters - self.setup_frame_processor() - - # TODO: Consider renaming this - self._params = ParameterTree(self._tree, mutable=True) - - # Create the status loop handling thread - self._status_running = True - self._status_lock = threading.Lock() - self._status_thread = threading.Thread(target=self.update_loop) - self._status_thread.start() - def merge_external_tree(self, path, tree): # First we need to insert the new parameter tree self._tree[path] = tree # Next, we must re-build the complete parameter tree self._params = ParameterTree(self._tree, mutable=True) - @property - def first_update(self): - return self._first_update - def set_error(self, err): # Record the error message into the status self._error @@ -112,9 +106,6 @@ def update_loop(self): while self._status_running: try: - # If we are a FP controller then we need to track the writing state - writing = [False]*len(self._clients) - # Handle background tasks # Loop over all connected clients and obtain the status for index, client in enumerate(self._clients): @@ -146,34 +137,21 @@ def update_loop(self): # Log the error, but do not stop the update loop logging.error("Unhandled exception: %s", e) + self.handle_client(client, index) if "status" in client.parameters: self._params.replace( f"{index}/status", client.parameters["status"] ) - if self._frame_processor: - if "hdf" in client.parameters["status"]: -# client.parameters["config"]["hdf"]["write"] = client.parameters["status"]["hdf"]["writing"] - writing[index] = client.parameters["status"]["hdf"]["writing"] if "config" in client.parameters: self._params.replace( f"{index}/config", client.parameters["config"] ) -# self._params.set("{}/config/hdf/write".format(index), writing[index]) - self._config_cache = [ self._params.get(f"{idx}/config") for idx, _ in enumerate(self._clients) ] - # Flag that we have made an update - if not self._first_update: - # If we a re a FP controller then init the rank - if self._frame_processor: - self.setup_rank() - self._first_update = True - - if self._frame_processor: - self._write = all(writing) + self.process_updates() except Exception as ex: @@ -181,6 +159,13 @@ def update_loop(self): time.sleep(self._update_interval) + def handle_client(self, client, index): + """Called on each client in the update_loop loop before updating the + parameter tree and caching the config, can be overloaded by + subclasses to implement controller specific logic. + """ + pass + def process_config_changes(self): """Search through the application config trees and compare with the latest cached version. Any changes should be built into a new config @@ -273,128 +258,9 @@ def process_updates(self): def shutdown(self): self._status_running = False - def setup_frame_processor(self): - self._acquisition_id = "" - self._write = False - self._frames = 0 - self._file_path = "" - self._file_prefix = "" - self._file_extension = "h5" - self._tree["config"] = { - "hdf": { - "acquisition_id": ( - self._get("_acquisition_id"), - lambda v: self._set("_acquisition_id", v), - {}, - ), - "frames": ( - self._get("_frames"), - lambda v: self._set("_frames", v), - {}, - ), - "file": { - "path": ( - self._get("_file_path"), - lambda v: self._set("_file_path", v), - {}, - ), - "prefix": ( - self._get("_file_prefix"), - lambda v: self._set("_file_prefix", v), - {}, - ), - "extension": ( - self._get("_file_extension"), - lambda v: self._set("_file_extension", v), - {}, - ), - }, - "write": ( - self._get("_write"), - self.execute_write, - {} - ) - }, - } - - def execute_write(self, value): - # Queue the write command - logging.debug("Executing write command with value: {}".format(value)) - processes = len(self._clients) - - if value: - # Before attempting to write files, make some simple error checks - - # Check if we have a valid buffer status from the FR adapter - - # TODO: Need to check FR buffer status -# valid, reason = self.check_fr_status() -# if not valid: -# raise RuntimeError(reason) - - # Check the file prefix is not empty - if str(self._file_prefix) == '': - raise RuntimeError("File prefix must not be empty") - - # First setup the rank for the frameProcessor applications - self.setup_rank() - - try: - for rank in range(processes): - # Setup the number of processes and the rank for each client - config = { - 'hdf': { - 'frames': self._frames - } - } - logging.info("Sending config to FP odin adapter %i: %s", rank, config) - self._clients[rank].send_configuration(config) - config = { - 'hdf': { - 'acquisition_id': self._acquisition_id, - 'file': { - 'path': str(self._file_path), - 'prefix': str(self._file_prefix), - 'extension': str(self._file_extension) - } - } - } - logging.info("Sending config to FP odin adapter %i: %s", rank, config) - self._clients[rank].send_configuration(config) - except Exception as err: - logging.error("Failed to send information to FP applications") - logging.error("Error: %s", err) - try: - config = {'hdf': {'write': value}} - for rank in range(processes): - logging.info("Sending config to FP odin adapter %i: %s", rank, config) - #self._odin_adapter_fps._controller.put(f"{rank}/config", config) - self._clients[rank].send_configuration(config) - except Exception as err: - logging.error("Failed to send write command to FP applications") - logging.error("Error: %s", err) - def _set(self, attr, val): logging.debug("_set called: {} {}".format(attr, val)) setattr(self, attr, val) def _get(self, attr): return lambda: getattr(self, attr) - - def setup_rank(self): - # Attempt initialisation of the connected clients - processes = len(self._clients) - logging.info( - "Setting up rank information for {} FP processes".format(processes) - ) - rank = 0 - try: - for rank in range(processes): - # Setup the number of processes and the rank for each client - config = {"hdf": {"process": {"number": processes, "rank": rank}}} - logging.debug("Sending config to FP odin adapter %i: %s", rank, config) - self._clients[rank].send_configuration(config) - - except Exception as err: - logging.debug("Failed to send rank information to FP applications") - logging.error("Error: %s", err)