diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index f2d4890c..9d91bdea 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -1,6 +1,10 @@ import abc from drunc.exceptions import DruncSetupException from drunc.utils.utils import ControlType, get_control_type_and_uri_from_connectivity_service, get_control_type_and_uri_from_cli +from drunc.utils.grpc_utils import pack_to_any +from druncschema.token_pb2 import Token +from druncschema.request_response_pb2 import Response, ResponseFlag, Description +import os class ChildInterfaceTechnologyUnknown(DruncSetupException): def __init__(self, t, name): @@ -8,13 +12,14 @@ def __init__(self, t, name): class ChildNode(abc.ABC): - def __init__(self, name:str, node_type:ControlType, **kwargs) -> None: + def __init__(self, name:str, configuration, node_type:ControlType, **kwargs) -> None: super().__init__(**kwargs) self.node_type = node_type import logging self.log = logging.getLogger(f"{name}-child-node") self.name = name + self.configuration = configuration @abc.abstractmethod def __str__(self): @@ -39,6 +44,37 @@ def get_status(self, token): def get_endpoint(self): pass + def describe(self, token:Token) -> Response: + descriptionType = None + descriptionName = None + + if hasattr(self.configuration.data, "application_name"): # Get the application name and type + descriptionType = self.configuration.data.application_name + descriptionName = self.configuration.data.id + elif hasattr(self.configuration.data, "controller") and hasattr(self.configuration.data.controller, "application_name"): # Get the controller name and type + descriptionType = self.configuration.data.controller.application_name + descriptionName = self.configuration.data.controller.id + + from drunc.controller.utils import get_detector_name + d = Description( + type = descriptionType, + name = descriptionName, + endpoint = self.get_endpoint(), + info = get_detector_name(self.configuration), + session = os.getenv("DUNEDAQ_SESSION"), + commands = None, + broadcast = None, + ) + + resp = Response( + name = self.name, + token = token, + data = pack_to_any(d), + flag = ResponseFlag.EXECUTED_SUCCESSFULLY, + children = None + ) + return resp + @staticmethod def get_child(name:str, cli, configuration, init_token=None, connectivity_service=None, **kwargs): diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index f263e13c..0b7ec58d 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ b/src/drunc/controller/children_interface/grpc_child.py @@ -19,12 +19,13 @@ class gRPCChildNode(ChildNode): def __init__(self, name, configuration:gRCPChildConfHandler, init_token, uri): super().__init__( name = name, - node_type = ControlType.gRPC + node_type = ControlType.gRPC, + configuration = configuration ) from logging import getLogger self.log = getLogger(f'{self.name}-grpc-child') - self.configuration = configuration + host, port = uri.split(":") port = int(port) @@ -110,7 +111,6 @@ def terminate(self): pass def propagate_command(self, command, data, token) -> Response: - return send_command( controller = self.controller, token = token, diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index f5247345..fb646525 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -1,10 +1,14 @@ from drunc.controller.children_interface.child_node import ChildNode from drunc.exceptions import DruncSetupException from drunc.utils.utils import ControlType -from druncschema.request_response_pb2 import Response +from drunc.utils.grpc_utils import pack_to_any +from druncschema.generic_pb2 import PlainText, Stacktrace +from druncschema.request_response_pb2 import Response, ResponseFlag, Description +from druncschema.controller_pb2 import FSMCommandResponse, FSMResponseFlag from druncschema.token_pb2 import Token import threading from typing import NoReturn +import os class ResponseDispatcher(threading.Thread): @@ -395,7 +399,8 @@ class RESTAPIChildNode(ChildNode): def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configuration:FSMConfHandler, uri): super(RESTAPIChildNode, self).__init__( name = name, - node_type = ControlType.REST_API + node_type = ControlType.REST_API, + configuration = configuration ) from logging import getLogger @@ -403,6 +408,8 @@ def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configur self.response_listener = ResponseListener.get() + self.fsm_configuration = fsm_configuration + import socket response_listener_host = socket.gethostname() @@ -413,7 +420,7 @@ def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configur from drunc.exceptions import DruncSetupException raise DruncSetupException(f"Application {name} does not expose a control service in the configuration, or has not advertised itself to the application registry service, or the application registry service is not reachable.") - proxy_host, proxy_port = getattr(configuration.data, "proxy", [None, None]) + proxy_host, proxy_port = getattr(self.configuration.data, "proxy", [None, None]) proxy_port = int(proxy_port) if proxy_port is not None else None self.commander = AppCommander( @@ -465,10 +472,6 @@ def get_status(self, token): ) def propagate_command(self, command:str, data, token:Token) -> Response: - from druncschema.request_response_pb2 import ResponseFlag - from druncschema.generic_pb2 import PlainText, Stacktrace - from drunc.utils.grpc_utils import pack_to_any - if command == 'exclude': self.state.exclude() return Response( @@ -496,8 +499,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response: children = [] ) - from druncschema.controller_pb2 import FSMCommandResponse, FSMResponseFlag - if self.state.excluded(): return Response( name = self.name, @@ -514,7 +515,11 @@ def propagate_command(self, command:str, data, token:Token) -> Response: ) # here lies the mother of all the problems - if command != 'execute_fsm_command': + if command == 'execute_fsm_command': + return self.propagate_fsm_command(command, data, token) + elif command == 'describe': + return self.describe(token) + else: self.log.info(f'Ignoring command \'{command}\' sent to \'{self.name}\'') return Response( name = self.name, @@ -524,6 +529,7 @@ def propagate_command(self, command:str, data, token:Token) -> Response: children = [] ) + def propagate_fsm_command(self, command:str, data, token:Token) -> Response: from drunc.exceptions import DruncException entry_state = self.state.get_operational_state() transition = self.fsm.get_transition(data.command_name) @@ -532,7 +538,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response: import json self.log.info(f'Sending \'{data.command_name}\' to \'{self.name}\'') - try: self.commander.send_command( cmd_id = data.command_name, @@ -558,7 +563,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response: command_name = data.command_name, data = response_data ) - from drunc.utils.grpc_utils import pack_to_any response = Response( name = self.name, token = token, diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 16fb2a66..2d9edda0 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -1,25 +1,24 @@ -from druncschema.request_response_pb2 import Request, Response, ResponseFlag -from druncschema.token_pb2 import Token -from druncschema.generic_pb2 import PlainText, PlainTextVector +from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.controller_pb2_grpc import ControllerServicer -from druncschema.controller_pb2 import Status -from druncschema.controller_pb2 import FSMCommand, FSMCommandResponse, FSMResponseFlag +from druncschema.controller_pb2 import Status, FSMCommand, FSMCommandResponse, FSMResponseFlag +from druncschema.generic_pb2 import PlainText, PlainTextVector +from druncschema.request_response_pb2 import Request, Response, ResponseFlag +from druncschema.token_pb2 import Token -from drunc.controller.children_interface.child_node import ChildNode -from drunc.controller.stateful_node import StatefulNode +from drunc.authoriser.decorators import authentified_and_authorised from drunc.broadcast.server.broadcast_sender import BroadcastSender -import drunc.controller.exceptions as ctler_excpt -from drunc.utils.grpc_utils import pack_to_any -from typing import Optional, List from drunc.broadcast.server.decorators import broadcasted -from drunc.utils.grpc_utils import unpack_request_data_to, pack_response -from drunc.authoriser.decorators import authentified_and_authorised -from druncschema.authoriser_pb2 import ActionType, SystemType +from drunc.controller.children_interface.child_node import ChildNode from drunc.controller.decorators import in_control +import drunc.controller.exceptions as ctler_excpt +from drunc.controller.stateful_node import StatefulNode from drunc.exceptions import DruncException +from drunc.utils.grpc_utils import pack_to_any +from drunc.utils.grpc_utils import unpack_request_data_to, pack_response import signal +from typing import Optional, List class ControllerActor: def __init__(self, token:Optional[Token]=None): @@ -168,10 +167,10 @@ def __init__(self, configuration, name:str, session:str, token:Token): name = 'describe_fsm', data_type = ['generic_pb2.PlainText', 'None'], help = '''Return a description of the FSM transitions: -if a transition name is provided in its input, return that transition description; -if a state is provided, return the transitions accessible from that state; -if "all-transitions" is provided, return all the transitions; -if nothing (None) is provided, return the transitions accessible from the current state.''', + if a transition name is provided in its input, return that transition description; + if a state is provided, return the transitions accessible from that state; + if "all-transitions" is provided, return all the transitions; + if nothing (None) is provided, return the transitions accessible from the current state.''', return_type = 'request_response_pb2.Description' ), @@ -457,21 +456,22 @@ def status(self, token:Token) -> Response: def describe(self, token:Token) -> Response: from druncschema.request_response_pb2 import Description from drunc.utils.grpc_utils import pack_to_any + from drunc.controller.utils import get_detector_name bd = self.describe_broadcast() d = Description( - # endpoint = self.uri, type = 'controller', name = self.name, + endpoint = self.uri, + info = get_detector_name(self.configuration), session = self.session, commands = self.commands, - # children_endpoints = [child.get_endpoint() for child in self.children_nodes], ) if bd: d.broadcast.CopyFrom(pack_to_any(bd)) - response_children = self.propagate_to_list( + children_description = self.propagate_to_list( 'describe', command_data = None, token = token, @@ -483,7 +483,7 @@ def describe(self, token:Token) -> Response: token = None, data = pack_to_any(d), flag = ResponseFlag.EXECUTED_SUCCESSFULLY, - children = response_children, + children = children_description, ) # ORDER MATTERS! diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index b8fdc0a6..26fbb3d5 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -3,7 +3,7 @@ from druncschema.controller_pb2 import Status from drunc.utils.grpc_utils import unpack_any -from drunc.utils.shell_utils import GRPCDriver +from drunc.utils.shell_utils import GRPCDriver, DecodedResponse class ControllerDriver(GRPCDriver): @@ -19,35 +19,35 @@ def create_stub(self, channel): from druncschema.controller_pb2_grpc import ControllerStub return ControllerStub(channel) - def describe(self) -> Description: + def describe(self) -> DecodedResponse: return self.send_command('describe', outformat = Description) - def describe_fsm(self, key:str=None) -> Description: # key can be: a state name, a transition name, none to get the currently accessible transitions, or all-transition for all the transitions + def describe_fsm(self, key:str=None) -> DecodedResponse: # key can be: a state name, a transition name, none to get the currently accessible transitions, or all-transition for all the transitions from druncschema.controller_pb2 import FSMCommandsDescription input = PlainText(text = key) return self.send_command('describe_fsm', data = input, outformat = FSMCommandsDescription) - def status(self) -> Description: + def status(self) -> DecodedResponse: return self.send_command('status', outformat = Status) - def take_control(self) -> Description: + def take_control(self) -> DecodedResponse: return self.send_command('take_control', outformat = PlainText) - def who_is_in_charge(self, rethrow=None) -> Description: + def who_is_in_charge(self, rethrow=None) -> DecodedResponse: return self.send_command('who_is_in_charge', outformat = PlainText) - def surrender_control(self) -> Description: + def surrender_control(self) -> DecodedResponse: return self.send_command('surrender_control') - def execute_fsm_command(self, arguments) -> Description: + def execute_fsm_command(self, arguments) -> DecodedResponse: from druncschema.controller_pb2 import FSMCommandResponse return self.send_command('execute_fsm_command', data = arguments, outformat = FSMCommandResponse) - def include(self, arguments) -> Description: + def include(self, arguments) -> DecodedResponse: from druncschema.controller_pb2 import FSMCommandResponse return self.send_command('include', data = arguments, outformat = PlainText) - def exclude(self, arguments) -> Description: + def exclude(self, arguments) -> DecodedResponse: from druncschema.controller_pb2 import FSMCommandResponse return self.send_command('exclude', data = arguments, outformat = PlainText) diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index 1d83e999..d4e04882 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -39,10 +39,12 @@ def wait(obj:ControllerContext, sleep_time:int) -> None: @click.command('status') @click.pass_obj def status(obj:ControllerContext) -> None: + # Get the dynamic system information statuses = obj.get_driver('controller').status() - + # Get the static system information + descriptions = obj.get_driver('controller').describe() from drunc.controller.interface.shell_utils import print_status_table - print_status_table(obj,statuses) + print_status_table(obj, statuses, descriptions) @click.command('connect') @click.argument('controller_address', type=str) diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 3ada1ec0..94ce796d 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -1,11 +1,35 @@ from rich import print from druncschema.controller_pb2 import FSMCommandsDescription - +from druncschema.request_response_pb2 import Description +from drunc.utils.shell_utils import DecodedResponse import logging +import inspect log = logging.getLogger('controller_shell_utils') +def match_children(statuses:list, descriptions:list) -> list: + def check_message_type(message:Description, expected_type:str) -> None: + if message.data.DESCRIPTOR.name != expected_type: + raise TypeError("Message {message.name} is not of type 'Description'!") + return + + children = [] + for status in statuses: + check_message_type(status, "Status") + child = {} + child_name = status.name + for description in descriptions: + if description.name == child_name: + check_message_type(description, "Description") + child["status"] = status + child["description"] = description + children.append(child) + break + if len(descriptions) != len(children): + from drunc.controller.exceptions import MalformedCommand + raise MalformedCommand(f"Command {inspect.currentframe().f_code.co_name} has assigned the incorrect number of children!") + return children -def print_status_table(obj, statuses): +def print_status_table(obj, statuses:DecodedResponse, descriptions:DecodedResponse): from druncschema.controller_pb2 import Status if not statuses: return @@ -20,22 +44,26 @@ def print_status_table(obj, statuses): t = Table(title=f'Status') t.add_column('Name') + t.add_column('Info') t.add_column('State') t.add_column('Substate') - t.add_column('In error', justify='center') - t.add_column('Included', justify='center') + t.add_column('In error') + t.add_column('Included') + t.add_column('Endpoint') - def add_status_to_table(status, table, prefix): + def add_status_to_table(table, status, description, prefix): table.add_row( prefix+status.name, + description.data.info, status.data.state, status.data.sub_state, format_bool(status.data.in_error, false_is_good = True), format_bool(status.data.included), + description.data.endpoint ) - for child in status.children: - add_status_to_table(child, table, prefix=prefix+' ') - add_status_to_table(statuses, t, prefix='') + for child in match_children(status.children, description.children): + add_status_to_table(t, child["status"], child["description"], prefix=prefix+' ') + add_status_to_table(t, statuses, descriptions, prefix='') obj.print(t) obj.print_status_summary() @@ -292,11 +320,11 @@ def run_one_fsm_command(controller_name, transition_name, obj, **kwargs): obj.print(f"Running transition \'{transition_name}\' on controller \'{controller_name}\'") from druncschema.controller_pb2 import FSMCommand - description = obj.get_driver('controller').describe_fsm().data + fsm_description = obj.get_driver('controller').describe_fsm().data from drunc.controller.interface.shell_utils import search_fsm_command, validate_and_format_fsm_arguments, ArgumentException - command_desc = search_fsm_command(transition_name, description.commands) + command_desc = search_fsm_command(transition_name, fsm_description.commands) if command_desc is None: obj.error(f'Command "{transition_name}" does not exist, or is not accessible right now') @@ -349,9 +377,10 @@ def add_to_table(table, response, prefix=''): obj.print(t) statuses = obj.get_driver('controller').status() + descriptions = obj.get_driver('controller').describe() from drunc.controller.interface.shell_utils import print_status_table - print_status_table(obj, statuses) + print_status_table(obj, statuses, descriptions) from druncschema.controller_pb2 import FSMCommandDescription diff --git a/src/drunc/controller/utils.py b/src/drunc/controller/utils.py index 605abd1c..ebd624c8 100644 --- a/src/drunc/controller/utils.py +++ b/src/drunc/controller/utils.py @@ -3,6 +3,9 @@ from druncschema.token_pb2 import Token from drunc.utils.grpc_utils import pack_to_any +import logging +log = logging.getLogger('controller_utils') + def get_status_message(stateful:StatefulNode): from druncschema.controller_pb2 import Status state_string = stateful.get_node_operational_state() @@ -16,6 +19,16 @@ def get_status_message(stateful:StatefulNode): included = stateful.node_is_included(), ) +def get_detector_name(configuration) -> str: + detector_name = None + if hasattr(configuration.data, "contains") and len(configuration.data.contains) > 0: + if len(configuration.data.contains) > 0: + log.debug(f"Application {configuration.data.id} has multiple contains, using the first one") + detector_name = configuration.data.contains[0].id.replace("-", "_").replace("_", " ") + else: + log.debug(f"Application {configuration.data.id} has no \"contains\" relation, hence no detector") + return detector_name + def send_command(controller, token, command:str, data=None, rethrow=False): import grpc from google.protobuf import any_pb2 diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index 1cbf879c..1b298c99 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -306,7 +306,7 @@ def print_status_summary(self) -> None: if status.in_error: self.print(f"[red] FSM is in error ({status})[/red], not currently accepting new commands.") else: - available_actions = [command.name.replace('_', '-') for command in self.get_driver('controller').describe_fsm().data.commands] + available_actions = [command.name.replace("_", "-") for command in self.get_driver('controller').describe_fsm().data.commands] self.print(f"Current FSM status is [green]{status.state}[/green]. Available transitions are [green]{'[/green], [green]'.join(available_actions)}[/green].")