Skip to content

Commit

Permalink
Merge pull request #362 from jsouter/controllers
Browse files Browse the repository at this point in the history
Create FR and FP controllers inheriting from OdinController
  • Loading branch information
GDYendell authored Oct 14, 2024
2 parents 1d19d16 + b0c8822 commit 9b02fe8
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 263 deletions.
2 changes: 1 addition & 1 deletion cpp/frameReceiver/src/DummyTCPFrameDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void DummyTCPFrameDecoder::monitor_buffers(void) {}

void DummyTCPFrameDecoder::get_status(const std::string param_prefix,
OdinData::IpcMessage &status_msg) {
status_msg.set_param(param_prefix + "name",
status_msg.set_param(param_prefix + "class",
std::string("DummyTCPFrameDecoder"));
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/frameReceiver/src/DummyUDPFrameDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ void DummyUDPFrameDecoder::get_status(const std::string param_prefix,
OdinData::IpcMessage& status_msg)
{
status_get_count_++;
status_msg.set_param(param_prefix + "name", std::string("DummyUDPFrameDecoder"));
status_msg.set_param(param_prefix + "class", std::string("DummyUDPFrameDecoder"));
status_msg.set_param(param_prefix + "status_get_count", status_get_count_);
status_msg.set_param(param_prefix + "packets_received", packets_received_);
status_msg.set_param(param_prefix + "packets_lost", packets_lost_);
Expand Down
6 changes: 6 additions & 0 deletions python/src/odin_data/control/frame_processor_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from odin_data.control.odin_data_adapter import OdinDataAdapter
from odin_data.control.frame_processor_controller import FrameProcessorController


class FrameProcessorAdapter(OdinDataAdapter):
_controller_cls = FrameProcessorController
148 changes: 148 additions & 0 deletions python/src/odin_data/control/frame_processor_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from odin_data.control.odin_data_controller import OdinDataController
import logging


class FrameProcessorController(OdinDataController):
def __init__(self, name, endpoints, update_interval=0.5):
super().__init__(name, endpoints, update_interval)
# If we are a FP controller then we need to track the writing state
self._first_update = False
self._writing = [False]*len(self._clients)

@property
def first_update(self):
return self._first_update

def setup_parameter_tree(self):
super(FrameProcessorController, self).setup_parameter_tree()
self._acquisition_id = ""
self._write = False
self._frames = 0
self._file_path = ""
self._file_prefix = ""
self._file_extension = "h5"
self._tree["config"] = {
"hdf": {
"acquisition_id": (
self._get("_acquisition_id"),
lambda v: self._set("_acquisition_id", v),
{},
),
"frames": (
self._get("_frames"),
lambda v: self._set("_frames", v),
{},
),
"file": {
"path": (
self._get("_file_path"),
lambda v: self._set("_file_path", v),
{},
),
"prefix": (
self._get("_file_prefix"),
lambda v: self._set("_file_prefix", v),
{},
),
"extension": (
self._get("_file_extension"),
lambda v: self._set("_file_extension", v),
{},
),
},
"write": (
self._get("_write"),
self.execute_write,
{}
)
},
}

def execute_write(self, value):
# Queue the write command
logging.debug("Executing write command with value: {}".format(value))
processes = len(self._clients)

if value:
# Before attempting to write files, make some simple error checks

# Check if we have a valid buffer status from the FR adapter

# valid, reason = self.check_controller_status()
# if not valid:
# raise RuntimeError(reason)

# Check the file prefix is not empty
if str(self._file_prefix) == '':
raise RuntimeError("File prefix must not be empty")

# First setup the rank
self.setup_rank()

try:
for rank in range(processes):
# Setup the number of processes and the rank for each client
config = {
'hdf': {
'frames': self._frames
}
}
logging.info("Sending config to FP odin adapter %i: %s", rank, config)
self._clients[rank].send_configuration(config)
config = {
'hdf': {
'acquisition_id': self._acquisition_id,
'file': {
'path': str(self._file_path),
'prefix': str(self._file_prefix),
'extension': str(self._file_extension)
}
}
}
logging.info("Sending config to FP odin adapter %i: %s", rank, config)
self._clients[rank].send_configuration(config)
except Exception as err:
logging.error("Failed to send information to FP applications")
logging.error("Error: %s", err)
try:
config = {'hdf': {'write': value}}
for rank in range(processes):
logging.info("Sending config to FP odin adapter %i: %s", rank, config)
#self._odin_adapter_fps._controller.put(f"{rank}/config", config)
self._clients[rank].send_configuration(config)
except Exception as err:
logging.error("Failed to send write command to FP applications")
logging.error("Error: %s", err)

def handle_client(self, client, index):
if "hdf" in client.parameters["status"]:
self._writing[index] = client.parameters["status"]["hdf"]["writing"]
# self._params.set("{}/config/hdf/write".format(index), writing[index])

def setup_rank(self):
# Attempt initialisation of the connected clients
processes = len(self._clients)
logging.info(
"Setting up rank information for {} FP processes".format(processes)
)
rank = 0
try:
for rank in range(processes):
# Setup the number of processes and the rank for each client
config = {"hdf": {"process": {"number": processes, "rank": rank}}}
logging.debug("Sending config to FP odin adapter %i: %s", rank, config)
self._clients[rank].send_configuration(config)

except Exception as err:
logging.debug("Failed to send rank information to FP applications")
logging.error("Error: %s", err)

def process_updates(self):
if not self._first_update:
self.setup_rank()
self._first_update = True
self._write = all(self._writing)

# def check_controller_status(self):
# TODO: Need to check FR buffer status
# return True, ""
99 changes: 2 additions & 97 deletions python/src/odin_data/control/frame_receiver_adapter.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,6 @@
"""
Created on 2nd August 2018
:author: Alan Greer
"""
import copy
import logging

from odin_data.control.odin_data_adapter import OdinDataAdapter
from odin_data.control.frame_receiver_controller import FrameReceiverController


class FrameReceiverAdapter(OdinDataAdapter):
"""
OdinDataAdapter class
This class provides the adapter interface between the ODIN server and the ODIN-DATA detector system,
transforming the REST-like API HTTP verbs into the appropriate frameProcessor ZeroMQ control messages
"""
VERSION_CHECK_CONFIG_ITEMS = ['decoder_path', 'decoder_type']

def __init__(self, **kwargs):
"""
Initialise the OdinDataAdapter object
:param kwargs:
"""
logging.debug("FrameReceiverAdapter init called")

super(FrameReceiverAdapter, self).__init__(**kwargs)

self._decoder_config = []
for ep in self._endpoints:
self._decoder_config.append(None)

def require_version_check(self, param):
# If the parameter is in the version check list then request a version update
if param in self.VERSION_CHECK_CONFIG_ITEMS:
return True
return False

def send_to_clients(self, request_command, parameters, client_index=-1):
"""
Intercept the base class send_to_clients method.
Keep a record of any decoder specific configuration items and then if a single decoder config
item is later changed send the full decoder configuration to the Frame Receiver application.
This is necessary as often a decoder config change will result in the complete tear down and re-init
of the Decoder class and so a full and consistent set of decoder config parameters are required.
:param request_command:
:param parameters:
:param client_index:
"""
logging.debug("Original index: {} request_command: {} and parameters: {}".format(client_index, request_command, parameters))
command, parameters = self.uri_params_to_dictionary(request_command, parameters)

decoder_config = None
if command is None:
if 'decoder_config' in parameters:
logging.debug("Found decoder config: {}".format(parameters['decoder_config']))
decoder_config = parameters['decoder_config']
elif command == 'decoder_config':
logging.debug("Found decoder config: {}".format(parameters))
decoder_config = parameters

if decoder_config is not None:
if client_index == -1:
for index in range(len(self._decoder_config)):
if self._decoder_config[index] is None:
self._decoder_config[index] = decoder_config
else:
for item in decoder_config:
self._decoder_config[index][item] = decoder_config[item]
else:
if self._decoder_config[client_index] is None:
self._decoder_config[client_index] = decoder_config
else:
for item in decoder_config:
self._decoder_config[client_index][item] = decoder_config[item]

# Now construct the new full message, inserting the full decoder config back into the parameters
if client_index == -1:
new_param_set = []
# Loop over each decoder config item and send a list of commands
for dc in self._decoder_config:
if command is None:
if 'decoder_config' in parameters and dc is not None:
parameters['decoder_config'] = dc
elif command == 'decoder_config':
parameters = dc
new_param_set.append(copy.deepcopy(parameters))
else:
if command is None:
if 'decoder_config' in parameters and self._decoder_config[client_index] is not None:
parameters['decoder_config'] = self._decoder_config[client_index]
elif command == 'decoder_config':
parameters = self._decoder_config[client_index]
new_param_set = parameters

logging.debug("Updated full command: {} and parameter set: {}".format(command, new_param_set))

return super(FrameReceiverAdapter, self).send_to_clients(command, new_param_set, client_index)
_controller_cls = FrameReceiverController
5 changes: 5 additions & 0 deletions python/src/odin_data/control/frame_receiver_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from odin_data.control.odin_data_controller import OdinDataController


class FrameReceiverController(OdinDataController):
...
12 changes: 5 additions & 7 deletions python/src/odin_data/control/odin_data_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class OdinDataAdapter(ApiAdapter):
This class provides the adapter interface between the ODIN server and the ODIN-DATA detector system,
transforming the REST-like API HTTP verbs into the appropriate frameProcessor ZeroMQ control messages
"""
_controller_cls = OdinDataController

def __init__(self, **kwargs):
"""
Expand Down Expand Up @@ -57,12 +58,9 @@ def __init__(self, **kwargs):
# Setup the time between client update requests
self._update_interval = float(self.options.get("update_interval", 0.5))

# Check for a Frame Processor flag
self._frame_processor_flag = bool(self.options.get("frame_processor", False))

# Create the Frame Processor Controller object
self._controller = OdinDataController(
self.name, self._endpoint_arg, self._update_interval, self._frame_processor_flag
self._controller = self._controller_cls(
self.name, self._endpoint_arg, self._update_interval
)

@request_types("application/json", "application/vnd.odin-native")
Expand All @@ -85,7 +83,7 @@ def get(self, path, request):
# logging.error("{}".format(response))
except ParameterTreeError as param_error:
response = {
"response": "OdinDatatAdapter GET error: {}".format(param_error)
"response": f"{type(self).__name__} GET error: {param_error}"
}
status_code = 400

Expand All @@ -112,7 +110,7 @@ def put(self, path, request): # pylint: disable=W0613
self._controller.put(path, json_decode(request.body))
except ParameterTreeError as param_error:
response = {
"response": "OdinDatatAdapter GET error: {}".format(param_error)
"response": f"{type(self).__name__} PUT error: {param_error}"
}
status_code = 400

Expand Down
Loading

0 comments on commit 9b02fe8

Please sign in to comment.