From 043f63d7c9ed0cda2dea74c917341ccda54881f3 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Thu, 14 Nov 2024 21:12:21 +0100 Subject: [PATCH 1/8] Adding information to ps tables --- .../children_interface/child_node.py | 38 +++++++++++++++- .../children_interface/grpc_child.py | 6 +-- .../children_interface/rest_api_child.py | 28 +++++++----- src/drunc/controller/controller.py | 44 +++++++++---------- src/drunc/controller/controller_driver.py | 20 ++++----- src/drunc/controller/interface/commands.py | 7 ++- src/drunc/controller/interface/shell_utils.py | 28 +++++++----- src/drunc/controller/utils.py | 13 ++++++ src/drunc/utils/shell_utils.py | 2 +- 9 files changed, 123 insertions(+), 63 deletions(-) diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index f2d4890c..9d91bdea 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -1,6 +1,10 @@ import abc from drunc.exceptions import DruncSetupException from drunc.utils.utils import ControlType, get_control_type_and_uri_from_connectivity_service, get_control_type_and_uri_from_cli +from drunc.utils.grpc_utils import pack_to_any +from druncschema.token_pb2 import Token +from druncschema.request_response_pb2 import Response, ResponseFlag, Description +import os class ChildInterfaceTechnologyUnknown(DruncSetupException): def __init__(self, t, name): @@ -8,13 +12,14 @@ def __init__(self, t, name): class ChildNode(abc.ABC): - def __init__(self, name:str, node_type:ControlType, **kwargs) -> None: + def __init__(self, name:str, configuration, node_type:ControlType, **kwargs) -> None: super().__init__(**kwargs) self.node_type = node_type import logging self.log = logging.getLogger(f"{name}-child-node") self.name = name + self.configuration = configuration @abc.abstractmethod def __str__(self): @@ -39,6 +44,37 @@ def get_status(self, token): def get_endpoint(self): pass + def describe(self, token:Token) -> Response: + descriptionType = None + descriptionName = None + + if hasattr(self.configuration.data, "application_name"): # Get the application name and type + descriptionType = self.configuration.data.application_name + descriptionName = self.configuration.data.id + elif hasattr(self.configuration.data, "controller") and hasattr(self.configuration.data.controller, "application_name"): # Get the controller name and type + descriptionType = self.configuration.data.controller.application_name + descriptionName = self.configuration.data.controller.id + + from drunc.controller.utils import get_detector_name + d = Description( + type = descriptionType, + name = descriptionName, + endpoint = self.get_endpoint(), + info = get_detector_name(self.configuration), + session = os.getenv("DUNEDAQ_SESSION"), + commands = None, + broadcast = None, + ) + + resp = Response( + name = self.name, + token = token, + data = pack_to_any(d), + flag = ResponseFlag.EXECUTED_SUCCESSFULLY, + children = None + ) + return resp + @staticmethod def get_child(name:str, cli, configuration, init_token=None, connectivity_service=None, **kwargs): diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index f263e13c..0b7ec58d 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ b/src/drunc/controller/children_interface/grpc_child.py @@ -19,12 +19,13 @@ class gRPCChildNode(ChildNode): def __init__(self, name, configuration:gRCPChildConfHandler, init_token, uri): super().__init__( name = name, - node_type = ControlType.gRPC + node_type = ControlType.gRPC, + configuration = configuration ) from logging import getLogger self.log = getLogger(f'{self.name}-grpc-child') - self.configuration = configuration + host, port = uri.split(":") port = int(port) @@ -110,7 +111,6 @@ def terminate(self): pass def propagate_command(self, command, data, token) -> Response: - return send_command( controller = self.controller, token = token, diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index d146e03a..d8f91353 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -1,9 +1,13 @@ from drunc.controller.children_interface.child_node import ChildNode from drunc.utils.utils import ControlType -from druncschema.request_response_pb2 import Response +from drunc.utils.grpc_utils import pack_to_any +from druncschema.generic_pb2 import PlainText, Stacktrace +from druncschema.request_response_pb2 import Response, ResponseFlag, Description +from druncschema.controller_pb2 import FSMCommandResponse, FSMResponseFlag from druncschema.token_pb2 import Token import threading from typing import NoReturn +import os class ResponseDispatcher(threading.Thread): @@ -394,7 +398,8 @@ class RESTAPIChildNode(ChildNode): def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configuration:FSMConfHandler, uri): super(RESTAPIChildNode, self).__init__( name = name, - node_type = ControlType.REST_API + node_type = ControlType.REST_API, + configuration = configuration ) from logging import getLogger @@ -402,6 +407,8 @@ def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configur self.response_listener = ResponseListener.get() + self.fsm_configuration = fsm_configuration + import socket response_listener_host = socket.gethostname() @@ -412,7 +419,7 @@ def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configur from drunc.exceptions import DruncSetupException raise DruncSetupException(f"Application {name} does not expose a control service in the configuration, or has not advertised itself to the application registry service, or the application registry service is not reachable.") - proxy_host, proxy_port = getattr(configuration.data, "proxy", [None, None]) + proxy_host, proxy_port = getattr(self.configuration.data, "proxy", [None, None]) proxy_port = int(proxy_port) if proxy_port is not None else None self.commander = AppCommander( @@ -464,10 +471,6 @@ def get_status(self, token): ) def propagate_command(self, command:str, data, token:Token) -> Response: - from druncschema.request_response_pb2 import ResponseFlag - from druncschema.generic_pb2 import PlainText, Stacktrace - from drunc.utils.grpc_utils import pack_to_any - if command == 'exclude': self.state.exclude() return Response( @@ -495,8 +498,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response: children = [] ) - from druncschema.controller_pb2 import FSMCommandResponse, FSMResponseFlag - if self.state.excluded(): return Response( name = self.name, @@ -513,7 +514,11 @@ def propagate_command(self, command:str, data, token:Token) -> Response: ) # here lies the mother of all the problems - if command != 'execute_fsm_command': + if command == 'execute_fsm_command': + return self.propagate_fsm_command(command, data, token) + elif command == 'describe': + return self.describe(token) + else: self.log.info(f'Ignoring command \'{command}\' sent to \'{self.name}\'') return Response( name = self.name, @@ -523,6 +528,7 @@ def propagate_command(self, command:str, data, token:Token) -> Response: children = [] ) + def propagate_fsm_command(self, command:str, data, token:Token) -> Response: from drunc.exceptions import DruncException entry_state = self.state.get_operational_state() transition = self.fsm.get_transition(data.command_name) @@ -531,7 +537,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response: import json self.log.info(f'Sending \'{data.command_name}\' to \'{self.name}\'') - try: self.commander.send_command( cmd_id = data.command_name, @@ -557,7 +562,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response: command_name = data.command_name, data = response_data ) - from drunc.utils.grpc_utils import pack_to_any response = Response( name = self.name, token = token, diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 16fb2a66..8f358a2c 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -1,25 +1,24 @@ -from druncschema.request_response_pb2 import Request, Response, ResponseFlag -from druncschema.token_pb2 import Token -from druncschema.generic_pb2 import PlainText, PlainTextVector +from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.controller_pb2_grpc import ControllerServicer -from druncschema.controller_pb2 import Status -from druncschema.controller_pb2 import FSMCommand, FSMCommandResponse, FSMResponseFlag +from druncschema.controller_pb2 import Status, FSMCommand, FSMCommandResponse, FSMResponseFlag +from druncschema.generic_pb2 import PlainText, PlainTextVector +from druncschema.request_response_pb2 import Request, Response, ResponseFlag +from druncschema.token_pb2 import Token -from drunc.controller.children_interface.child_node import ChildNode -from drunc.controller.stateful_node import StatefulNode +from drunc.authoriser.decorators import authentified_and_authorised from drunc.broadcast.server.broadcast_sender import BroadcastSender -import drunc.controller.exceptions as ctler_excpt -from drunc.utils.grpc_utils import pack_to_any -from typing import Optional, List from drunc.broadcast.server.decorators import broadcasted -from drunc.utils.grpc_utils import unpack_request_data_to, pack_response -from drunc.authoriser.decorators import authentified_and_authorised -from druncschema.authoriser_pb2 import ActionType, SystemType +from drunc.controller.children_interface.child_node import ChildNode from drunc.controller.decorators import in_control +import drunc.controller.exceptions as ctler_excpt +from drunc.controller.stateful_node import StatefulNode from drunc.exceptions import DruncException +from drunc.utils.grpc_utils import pack_to_any +from drunc.utils.grpc_utils import unpack_request_data_to, pack_response import signal +from typing import Optional, List class ControllerActor: def __init__(self, token:Optional[Token]=None): @@ -168,10 +167,10 @@ def __init__(self, configuration, name:str, session:str, token:Token): name = 'describe_fsm', data_type = ['generic_pb2.PlainText', 'None'], help = '''Return a description of the FSM transitions: -if a transition name is provided in its input, return that transition description; -if a state is provided, return the transitions accessible from that state; -if "all-transitions" is provided, return all the transitions; -if nothing (None) is provided, return the transitions accessible from the current state.''', + if a transition name is provided in its input, return that transition description; + if a state is provided, return the transitions accessible from that state; + if "all-transitions" is provided, return all the transitions; + if nothing (None) is provided, return the transitions accessible from the current state.''', return_type = 'request_response_pb2.Description' ), @@ -435,7 +434,7 @@ def propagate_to_child(child, command, command_data, token, response_lock, respo ) # 2nd step @unpack_request_data_to(None, pass_token=True) # 3rd step - def status(self, token:Token) -> Response: + def status(self, token:Token) -> Status: from drunc.controller.utils import get_status_message status = get_status_message(self.stateful_node) return Response ( @@ -457,21 +456,22 @@ def status(self, token:Token) -> Response: def describe(self, token:Token) -> Response: from druncschema.request_response_pb2 import Description from drunc.utils.grpc_utils import pack_to_any + from drunc.controller.utils import get_detector_name bd = self.describe_broadcast() d = Description( - # endpoint = self.uri, type = 'controller', name = self.name, + endpoint = self.uri, + info = get_detector_name(self.configuration), session = self.session, commands = self.commands, - # children_endpoints = [child.get_endpoint() for child in self.children_nodes], ) if bd: d.broadcast.CopyFrom(pack_to_any(bd)) - response_children = self.propagate_to_list( + children_description = self.propagate_to_list( 'describe', command_data = None, token = token, @@ -483,7 +483,7 @@ def describe(self, token:Token) -> Response: token = None, data = pack_to_any(d), flag = ResponseFlag.EXECUTED_SUCCESSFULLY, - children = response_children, + children = children_description, ) # ORDER MATTERS! diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index b8fdc0a6..26fbb3d5 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -3,7 +3,7 @@ from druncschema.controller_pb2 import Status from drunc.utils.grpc_utils import unpack_any -from drunc.utils.shell_utils import GRPCDriver +from drunc.utils.shell_utils import GRPCDriver, DecodedResponse class ControllerDriver(GRPCDriver): @@ -19,35 +19,35 @@ def create_stub(self, channel): from druncschema.controller_pb2_grpc import ControllerStub return ControllerStub(channel) - def describe(self) -> Description: + def describe(self) -> DecodedResponse: return self.send_command('describe', outformat = Description) - def describe_fsm(self, key:str=None) -> Description: # key can be: a state name, a transition name, none to get the currently accessible transitions, or all-transition for all the transitions + def describe_fsm(self, key:str=None) -> DecodedResponse: # key can be: a state name, a transition name, none to get the currently accessible transitions, or all-transition for all the transitions from druncschema.controller_pb2 import FSMCommandsDescription input = PlainText(text = key) return self.send_command('describe_fsm', data = input, outformat = FSMCommandsDescription) - def status(self) -> Description: + def status(self) -> DecodedResponse: return self.send_command('status', outformat = Status) - def take_control(self) -> Description: + def take_control(self) -> DecodedResponse: return self.send_command('take_control', outformat = PlainText) - def who_is_in_charge(self, rethrow=None) -> Description: + def who_is_in_charge(self, rethrow=None) -> DecodedResponse: return self.send_command('who_is_in_charge', outformat = PlainText) - def surrender_control(self) -> Description: + def surrender_control(self) -> DecodedResponse: return self.send_command('surrender_control') - def execute_fsm_command(self, arguments) -> Description: + def execute_fsm_command(self, arguments) -> DecodedResponse: from druncschema.controller_pb2 import FSMCommandResponse return self.send_command('execute_fsm_command', data = arguments, outformat = FSMCommandResponse) - def include(self, arguments) -> Description: + def include(self, arguments) -> DecodedResponse: from druncschema.controller_pb2 import FSMCommandResponse return self.send_command('include', data = arguments, outformat = PlainText) - def exclude(self, arguments) -> Description: + def exclude(self, arguments) -> DecodedResponse: from druncschema.controller_pb2 import FSMCommandResponse return self.send_command('exclude', data = arguments, outformat = PlainText) diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index 3ced89d9..b6bec5a3 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -39,10 +39,12 @@ def wait(obj:ControllerContext, sleep_time:int) -> None: @click.command('status') @click.pass_obj def status(obj:ControllerContext) -> None: + # Get the dynamic system information statuses = obj.get_driver('controller').status() - + # Get the static system information + descriptions = obj.get_driver('controller').describe() from drunc.controller.interface.shell_utils import print_status_table - print_status_table(obj,statuses) + print_status_table(obj, statuses, descriptions) @click.command('connect') @click.argument('controller_address', type=str) @@ -240,6 +242,7 @@ def send_FSM_command(obj:ControllerContext, command:FSMCommand) -> FSMCommandRes else: obj.print(f"[green]{command.command_name}[/green] executed successfully.") print_execution_report(command.command_name, result) + obj.print_status_summary() # This prints the table, unsure of why. RETURNTOME if not result: return return result diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 3ada1ec0..9e80271b 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -1,11 +1,10 @@ from rich import print from druncschema.controller_pb2 import FSMCommandsDescription - +from drunc.utils.shell_utils import DecodedResponse import logging log = logging.getLogger('controller_shell_utils') - -def print_status_table(obj, statuses): +def print_status_table(obj, statuses:DecodedResponse, descriptions:DecodedResponse): from druncschema.controller_pb2 import Status if not statuses: return @@ -20,22 +19,26 @@ def print_status_table(obj, statuses): t = Table(title=f'Status') t.add_column('Name') + t.add_column('Endpoint') + t.add_column('Info') t.add_column('State') t.add_column('Substate') - t.add_column('In error', justify='center') - t.add_column('Included', justify='center') + t.add_column('In error') + t.add_column('Included') - def add_status_to_table(status, table, prefix): + def add_status_to_table(table, status, description, prefix): table.add_row( prefix+status.name, + description.data.endpoint, + description.data.info, status.data.state, status.data.sub_state, format_bool(status.data.in_error, false_is_good = True), format_bool(status.data.included), ) - for child in status.children: - add_status_to_table(child, table, prefix=prefix+' ') - add_status_to_table(statuses, t, prefix='') + for child_status, child_description in zip(status.children, description.children): + add_status_to_table(t, child_status, child_description, prefix=prefix+' ') + add_status_to_table(t, statuses, descriptions, prefix='') obj.print(t) obj.print_status_summary() @@ -292,11 +295,11 @@ def run_one_fsm_command(controller_name, transition_name, obj, **kwargs): obj.print(f"Running transition \'{transition_name}\' on controller \'{controller_name}\'") from druncschema.controller_pb2 import FSMCommand - description = obj.get_driver('controller').describe_fsm().data + fsm_description = obj.get_driver('controller').describe_fsm().data from drunc.controller.interface.shell_utils import search_fsm_command, validate_and_format_fsm_arguments, ArgumentException - command_desc = search_fsm_command(transition_name, description.commands) + command_desc = search_fsm_command(transition_name, fsm_description.commands) if command_desc is None: obj.error(f'Command "{transition_name}" does not exist, or is not accessible right now') @@ -349,9 +352,10 @@ def add_to_table(table, response, prefix=''): obj.print(t) statuses = obj.get_driver('controller').status() + descriptions = obj.get_driver('controller').describe() from drunc.controller.interface.shell_utils import print_status_table - print_status_table(obj, statuses) + print_status_table(obj, statuses, descriptions) from druncschema.controller_pb2 import FSMCommandDescription diff --git a/src/drunc/controller/utils.py b/src/drunc/controller/utils.py index 605abd1c..ebd624c8 100644 --- a/src/drunc/controller/utils.py +++ b/src/drunc/controller/utils.py @@ -3,6 +3,9 @@ from druncschema.token_pb2 import Token from drunc.utils.grpc_utils import pack_to_any +import logging +log = logging.getLogger('controller_utils') + def get_status_message(stateful:StatefulNode): from druncschema.controller_pb2 import Status state_string = stateful.get_node_operational_state() @@ -16,6 +19,16 @@ def get_status_message(stateful:StatefulNode): included = stateful.node_is_included(), ) +def get_detector_name(configuration) -> str: + detector_name = None + if hasattr(configuration.data, "contains") and len(configuration.data.contains) > 0: + if len(configuration.data.contains) > 0: + log.debug(f"Application {configuration.data.id} has multiple contains, using the first one") + detector_name = configuration.data.contains[0].id.replace("-", "_").replace("_", " ") + else: + log.debug(f"Application {configuration.data.id} has no \"contains\" relation, hence no detector") + return detector_name + def send_command(controller, token, command:str, data=None, rethrow=False): import grpc from google.protobuf import any_pb2 diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index 35d03022..1b298c99 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -306,7 +306,7 @@ def print_status_summary(self) -> None: if status.in_error: self.print(f"[red] FSM is in error ({status})[/red], not currently accepting new commands.") else: - available_actions = [command.name for command in self.get_driver('controller').describe_fsm().data.commands] + available_actions = [command.name.replace("_", "-") for command in self.get_driver('controller').describe_fsm().data.commands] self.print(f"Current FSM status is [green]{status.state}[/green]. Available transitions are [green]{'[/green], [green]'.join(available_actions)}[/green].") From c7a75942b6526deaab931c524d3243e9402069fc Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 15 Nov 2024 15:25:02 +0100 Subject: [PATCH 2/8] Correcting incorrect return type --- src/drunc/controller/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 8f358a2c..2d9edda0 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -434,7 +434,7 @@ def propagate_to_child(child, command, command_data, token, response_lock, respo ) # 2nd step @unpack_request_data_to(None, pass_token=True) # 3rd step - def status(self, token:Token) -> Status: + def status(self, token:Token) -> Response: from drunc.controller.utils import get_status_message status = get_status_message(self.stateful_node) return Response ( From 53372b2491fe6eeef8939ce4fef7b633e4abbc46 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 15 Nov 2024 15:28:30 +0100 Subject: [PATCH 3/8] Reordering the status table --- src/drunc/controller/interface/shell_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 9e80271b..694bc804 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -19,22 +19,22 @@ def print_status_table(obj, statuses:DecodedResponse, descriptions:DecodedRespon t = Table(title=f'Status') t.add_column('Name') - t.add_column('Endpoint') t.add_column('Info') t.add_column('State') t.add_column('Substate') t.add_column('In error') t.add_column('Included') + t.add_column('Endpoint') def add_status_to_table(table, status, description, prefix): table.add_row( prefix+status.name, - description.data.endpoint, description.data.info, status.data.state, status.data.sub_state, format_bool(status.data.in_error, false_is_good = True), format_bool(status.data.included), + description.data.endpoint ) for child_status, child_description in zip(status.children, description.children): add_status_to_table(t, child_status, child_description, prefix=prefix+' ') From a829a66d9dccc5e70df6b86ef8522058810a2ff1 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 15 Nov 2024 16:34:10 +0100 Subject: [PATCH 4/8] Matching the children functions using an appropriate function --- src/drunc/controller/interface/shell_utils.py | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 694bc804..2c964fce 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -2,8 +2,25 @@ from druncschema.controller_pb2 import FSMCommandsDescription from drunc.utils.shell_utils import DecodedResponse import logging +import inspect log = logging.getLogger('controller_shell_utils') +def match_children(statuses:list, descriptions:list) -> list: + children = [] + for status in statuses: + child = {} + child_name = status.name + for description in descriptions: + if description.name == child_name: + child["status"] = status + child["description"] = description + children.append(child) + break + if len(descriptions) != len(children): + from drunc.controller.exceptions import MalformedCommand + raise MalformedCommand(f"Command {inspect.currentframe().f_code.co_name} has assigned the incorrect number of children!") + return children + def print_status_table(obj, statuses:DecodedResponse, descriptions:DecodedResponse): from druncschema.controller_pb2 import Status if not statuses: return @@ -36,8 +53,8 @@ def add_status_to_table(table, status, description, prefix): format_bool(status.data.included), description.data.endpoint ) - for child_status, child_description in zip(status.children, description.children): - add_status_to_table(t, child_status, child_description, prefix=prefix+' ') + for child in match_children(status.children, description.children): + add_status_to_table(t, child["status"], child["description"], prefix=prefix+' ') add_status_to_table(t, statuses, descriptions, prefix='') obj.print(t) obj.print_status_summary() From b49ec73d5815095f46de988508ee8afb390e1def Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 15 Nov 2024 17:08:03 +0100 Subject: [PATCH 5/8] Adding type check --- src/drunc/controller/interface/shell_utils.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 2c964fce..94ce796d 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -1,17 +1,25 @@ from rich import print from druncschema.controller_pb2 import FSMCommandsDescription +from druncschema.request_response_pb2 import Description from drunc.utils.shell_utils import DecodedResponse import logging import inspect log = logging.getLogger('controller_shell_utils') def match_children(statuses:list, descriptions:list) -> list: + def check_message_type(message:Description, expected_type:str) -> None: + if message.data.DESCRIPTOR.name != expected_type: + raise TypeError("Message {message.name} is not of type 'Description'!") + return + children = [] for status in statuses: + check_message_type(status, "Status") child = {} child_name = status.name for description in descriptions: if description.name == child_name: + check_message_type(description, "Description") child["status"] = status child["description"] = description children.append(child) From e9548e604f3835e04c7a065b9db55bc9ede0dab9 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 15 Nov 2024 18:13:23 +0100 Subject: [PATCH 6/8] Removing redundant fsm code --- .../client/grpc_stdout_broadcast_handler.py | 4 +- src/drunc/broadcast/server/grpc_servicer.py | 3 +- src/drunc/controller/interface/commands.py | 186 ------------------ src/drunc/process_manager/process_manager.py | 2 +- src/drunc/unified_shell/shell.py | 2 +- 5 files changed, 6 insertions(+), 191 deletions(-) diff --git a/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py b/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py index eabf1526..6079c920 100644 --- a/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py +++ b/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py @@ -3,10 +3,10 @@ from druncschema.generic_pb2 import Empty import grpc from drunc.utils.configuration import ConfTypes - +from drunc.controller.configuration import ControllerConfHandler class gRPCStdoutBroadcastHandler(BroadcastReceiverServicer): - def __init__(self, conf:ConfData, token, **kwargs) -> None: + def __init__(self, conf:ControllerConfHandler, token, **kwargs) -> None: super(gRPCStdoutBroadcastHandler, self).__init__( **kwargs ) diff --git a/src/drunc/broadcast/server/grpc_servicer.py b/src/drunc/broadcast/server/grpc_servicer.py index 12ebf98b..ecfcac34 100644 --- a/src/drunc/broadcast/server/grpc_servicer.py +++ b/src/drunc/broadcast/server/grpc_servicer.py @@ -9,6 +9,7 @@ from druncschema.authoriser_pb2 import ActionType from drunc.utils.grpc_utils import unpack_any from google.protobuf.any_pb2 import Any +import drunc.controller.exceptions as ctler_excpt class ListenerRepresentation: @@ -247,7 +248,7 @@ def serve(port:int) -> None: receiver_threads.append(server_thread) except: pass - + from drunc.broadcast.server.broadcast_sender import BroadcastSender broadcaster = BroadcastSender() for port in port_list: broadcaster.add_listener(f'[::]:{port}') diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index 3ced89d9..1d83e999 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -81,192 +81,6 @@ def who_is_in_charge(obj:ControllerContext) -> None: if who: obj.print(who.text) - -@click.command('fsm') -@click.argument('fsm_command', type=str, nargs=-1) -@click.pass_obj -def fsm(obj:ControllerContext, fsm_command:str) -> None: - from drunc.controller.interface.shell_utils import format_bool, tree_prefix, search_fsm_command, validate_and_format_fsm_arguments, ArgumentException - from drunc.utils.grpc_utils import unpack_any - from druncschema.controller_pb2 import FSMResponseFlag, FSMCommandResponse, FSMCommand, FSMCommandDescription - from druncschema.request_response_pb2 import ResponseFlag - from rich.table import Table - - def split_FSM_args(obj:ControllerContext, passed_commands:tuple): - # Note this is a placeholder - want to get this from OKS. - available_commands = ["conf", "start", "enable_triggers", "disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop", "scrap", "start_run", "stop_run", "shutdown"] - available_command_mandatory_args = [[], ["run_number"], [], [], [], [], [], [], ["run_number"], [], []] - available_sequences = ["start_run", "stop_run", "shutdown"] - available_sequence_commands = [["conf", "start", "enable_triggers"], ["disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop"], ["disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop", "scrap"]] - available_command_opt_args = [] - available_args = ["run_number"] - - # Get the index of all the commands in the command str - command_list = [command for command in passed_commands if command in available_commands] - if len(command_list) == 0: - if len(passed_commands) == 1: - obj.print(f"The passed command [red]{' '.join(passed_commands)}[/red] was not understood.") - else: - obj.print(f"None of the passed arguments were correctly identified.") - raise SystemExit(1) - - # Get the arguments for each command - command_index = [passed_commands.index(command) for command in command_list] - command_argument_list = [] - for i in range(len(command_index)-1): - command_argument_list.append(list(passed_commands[command_index[i]+1:command_index[i + 1]])) - command_argument_list.append(list(passed_commands[command_index[-1]+1:])) - - # Not elegant, would be better to check at the command level first but it does work - for argument_list in command_argument_list: - argument_names = argument_list[::2] - for argument in argument_names: - # Check if the argument is legal - if argument not in available_args: - from drunc.controller.exceptions import MalformedCommandArgument - raise MalformedCommandArgument(f"Argument '{argument}' not recognised as a valid argument.") - # Check for duplicates - if argument_names.count(argument) != 1: - from drunc.controller.exceptions import MalformedCommand - raise MalformedCommand(f"Argument '{argument}' has been repeated.") - - # Check the mandatory arguments - for command in command_list: - mandatory_command_arguments = available_command_mandatory_args[available_commands.index(command)] - provided_command_arguments = command_argument_list[command_list.index(command)] - if mandatory_command_arguments == []: - continue - for argument in mandatory_command_arguments: - if argument not in provided_command_arguments: - missing_command_arguments = list(set(mandatory_command_arguments) - set(provided_command_arguments)) - obj.print(f"There are missing arguments for command [green]{command}[/green]. Missing mandatory argument(s): [red]{' '.join(missing_command_arguments)}[/red].") - raise SystemExit(1) - - # Extract commands from sequences - for command in command_list: - if command not in available_sequences: - continue - # Define the sequence command parameters - passed_sequence_command_index = command_list.index(command) - passed_sequence_command_args = list(command_argument_list[passed_sequence_command_index]) - sequence_commands = available_sequence_commands[available_sequences.index(command)] - # Replace the sequence command with the correct fsm commands - del command_list[passed_sequence_command_index] - command_list[passed_sequence_command_index:passed_sequence_command_index] = sequence_commands - # Replace the sequence command arguments. Duplicates the sequence arguments for all FSM commands - del command_argument_list[passed_sequence_command_index] - for _ in range(len(sequence_commands)): - command_argument_list.insert(passed_sequence_command_index, passed_sequence_command_args) - - return command_list, command_argument_list - - - def dict_arguments(arguments:str) -> dict: - if len(arguments) % 2 != 0: - raise click.BadParameter('Arguments are pairs of key-value!') - keys = arguments[::2] - values = arguments[1::2] - arguments_dict = {keys[i]:values[i] for i in range(len(keys))} - return arguments_dict - - def bool_to_success(flag_message, FSM): - flag = False - if FSM and flag_message == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY: - flag = True - if not FSM and flag_message == ResponseFlag.EXECUTED_SUCCESSFULLY: - flag = True - return "success" if flag else "failed" - - def add_to_table(table, response, prefix=''): - table.add_row( - prefix+response.name, - bool_to_success(response.flag, FSM=False), - bool_to_success(response.data.flag, FSM=True) if response.flag == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY else "failed", - ) - for child_response in response.children: - add_to_table(table, child_response, " "+prefix) - - def print_execution_report(command:str, result:FSMCommandResponse) -> None: - t = Table(title=f'{command} execution report') - t.add_column('Name') - t.add_column('Command execution') - t.add_column('FSM transition') - - add_to_table(t, result) - obj.print(t) - obj.print("") # For formatting - return - - def filter_arguments(arguments:dict, fsm_command:FSMCommandDescription) -> dict: - if not arguments: - return None - cmd_arguments = {} - command_arguments = fsm_command.arguments - cmd_argument_names = [argument.name for argument in command_arguments] - for argument in list(arguments): - if argument in cmd_argument_names: - cmd_arguments[argument] = arguments[argument] - return cmd_arguments - - def construct_FSM_command(obj:ControllerContext, command:tuple[str, list]) -> FSMCommand: - command_name = command[0] - command_args = dict_arguments(command[1]) - command_desc = search_fsm_command(command_name, obj.get_driver('controller').describe_fsm().data.commands) # FSMCommandDescription - if command_desc == None: - return None - - # Apply the appropriate arguments for this command - arguments = filter_arguments(command_args, command_desc) - # Construct the FSMCommand - from druncschema.controller_pb2 import FSMCommand - cmd = FSMCommand( - command_name = command_name, - arguments = validate_and_format_fsm_arguments(arguments, command_desc.arguments) - ) - return cmd - - def send_FSM_command(obj:ControllerContext, command:FSMCommand) -> FSMCommandResponse: - try: - result = obj.get_driver('controller').execute_fsm_command(command) - except Exception as e: # TODO narrow this exception down - obj.print(f"[red]{command.command_name}[/red] failed.") - raise e - print(f"Response flag: {result.flag=}") - if result.flag == FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR: - self.stateful_node.to_error() - obj.print(f"[red]{command.command_name}[/red] failed, there is a node in error.") - elif result.flag != FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY: - obj.print(f"[red]{command.command_name}[/red] failed.") - else: - obj.print(f"[green]{command.command_name}[/green] executed successfully.") - print_execution_report(command.command_name, result) - if not result: return - return result - - # When FSM batch mode gets merged this will need updating - if obj.get_driver('controller').get_status().data.in_error: - obj.print(f"[red]{command}[/red] not sent - node is in error.") - return - - # Split command into a list of commands and a list of arguments - command_list, argument_list = split_FSM_args(obj, fsm_command) - - # Execute the FSM commands - result = None - for command in zip(command_list, argument_list): - grpc_command = construct_FSM_command(obj, command) - if grpc_command == None: - obj.print(f"[red]{command[0]}[/red] is not possible in current state, not executing.") - continue - obj.print(f"Sending [green]{command[0]}[/green].") - result = send_FSM_command(obj, grpc_command) - if (result == None): - obj.print(f"Transition {FSMtransition} did not execute. Check logs for more info.") - break - - obj.print_status_summary() - return - @click.command('include') @click.pass_obj def include(obj:ControllerContext) -> None: diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 5febe28c..1068ac4b 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -429,7 +429,7 @@ async def logs(self, lr:LogRequest) -> Response: yield Response( name = self.name, token = None, - data = pack_to_any(resp), + data = None, flag = ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, children = [], ) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index a7a760ff..0a69b22c 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -186,7 +186,7 @@ def cleanup(): from drunc.controller.interface.commands import ( - status, connect, take_control, surrender_control, who_am_i, who_is_in_charge, fsm, include, exclude, wait + status, connect, take_control, surrender_control, who_am_i, who_is_in_charge, include, exclude, wait ) ctx.command.add_command(status, 'status') From 16dc1363b58e8cbc591e2a30f0bc20746b6d3bf2 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 15 Nov 2024 18:45:58 +0100 Subject: [PATCH 7/8] Removing grpc_stdout_broadcast_handler --- src/drunc/broadcast/client/broadcast_handler.py | 8 -------- .../broadcast/client/grpc_stdout_broadcast_handler.py | 2 +- src/drunc/broadcast/types.py | 3 +-- src/drunc/controller/configuration.py | 3 ++- 4 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/drunc/broadcast/client/broadcast_handler.py b/src/drunc/broadcast/client/broadcast_handler.py index 4d1f806a..6b5940f3 100644 --- a/src/drunc/broadcast/client/broadcast_handler.py +++ b/src/drunc/broadcast/client/broadcast_handler.py @@ -24,14 +24,6 @@ def __init__(self, broadcast_configuration:BroadcastClientConfHandler): message_format = BroadcastMessage, conf = self.configuration ) - case BroadcastTypes.gRPC: - raise BroadcastTypeNotHandled("gRPC is not available for broadcasting!") - from drunc.broadcast.client.grpc_stdout_broadcast_handler import gRPCStdoutBroadcastHandler - from druncschema.broadcast_pb2 import BroadcastMessage - self.implementation = gRPCStdoutBroadcastHandler( - conf = broadcast_configuration, - message_format = BroadcastMessage, - ) case _: self.log.info('Could not understand the BroadcastHandler technology you want to use, you will get no broadcast!') diff --git a/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py b/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py index 6079c920..81149230 100644 --- a/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py +++ b/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py @@ -17,7 +17,7 @@ def __init__(self, conf:ControllerConfHandler, token, **kwargs) -> None: self.token = token from logging import getLogger self._log = getLogger("BroadcastReceiver") - self._address = f'[::]:{port}' + self._address = None # f'[::]:{port}' self._log.debug('Broadcast receiver initialised') def stop_receiving(self)->None: diff --git a/src/drunc/broadcast/types.py b/src/drunc/broadcast/types.py index 3d9ef8db..d3685e79 100644 --- a/src/drunc/broadcast/types.py +++ b/src/drunc/broadcast/types.py @@ -4,8 +4,7 @@ class BroadcastTypes(Enum): Unknown = 0 - gRPC = 1 - Kafka = 2 + Kafka = 1 ERS = 2 diff --git a/src/drunc/controller/configuration.py b/src/drunc/controller/configuration.py index 2f320e00..f90e6704 100644 --- a/src/drunc/controller/configuration.py +++ b/src/drunc/controller/configuration.py @@ -31,7 +31,8 @@ def _grab_segment_conf_from_controller(self, configuration): self.session = self.db.get_dal(class_name="Session", uid=self.oks_key.session) this_segment = ControllerConfHandler.find_segment(self.session.segment, self.oks_key.obj_uid) if this_segment is None: - CouldNotFindSegment(self.oks_key.obj_uid) + from drunc.exceptions import DruncSetupException + DruncSetupException(f"Could not find segment with oks_key.obj_uid: {self.oks_key.obj_uid}") return this_segment def _post_process_oks(self): From e47d81b01aa3f70d1f27ce35e0730ab752491a87 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 15 Nov 2024 18:56:29 +0100 Subject: [PATCH 8/8] Final ruff change --- src/drunc/broadcast/server/grpc_servicer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drunc/broadcast/server/grpc_servicer.py b/src/drunc/broadcast/server/grpc_servicer.py index ecfcac34..698d0525 100644 --- a/src/drunc/broadcast/server/grpc_servicer.py +++ b/src/drunc/broadcast/server/grpc_servicer.py @@ -118,7 +118,7 @@ def add_to_broadcast_list(self, request:Request, context) -> Response: def remove_from_broadcast_list(self, request:Request, context) -> Response: - r = unpack_any(data, BroadcastRequest) + r = unpack_any(request, BroadcastRequest) if not self.broadcaster.rm_listener(r.broadcast_receiver_address): raise ctler_excpt.ControllerException(f'Failed to remove {r.broadcast_receiver_address} from broadcast list') return PlainText(text = f'Removed {r.broadcast_receiver_address} to broadcast list')