diff --git a/src/drunc/broadcast/client/broadcast_handler.py b/src/drunc/broadcast/client/broadcast_handler.py index 4d1f806a..6b5940f3 100644 --- a/src/drunc/broadcast/client/broadcast_handler.py +++ b/src/drunc/broadcast/client/broadcast_handler.py @@ -24,14 +24,6 @@ def __init__(self, broadcast_configuration:BroadcastClientConfHandler): message_format = BroadcastMessage, conf = self.configuration ) - case BroadcastTypes.gRPC: - raise BroadcastTypeNotHandled("gRPC is not available for broadcasting!") - from drunc.broadcast.client.grpc_stdout_broadcast_handler import gRPCStdoutBroadcastHandler - from druncschema.broadcast_pb2 import BroadcastMessage - self.implementation = gRPCStdoutBroadcastHandler( - conf = broadcast_configuration, - message_format = BroadcastMessage, - ) case _: self.log.info('Could not understand the BroadcastHandler technology you want to use, you will get no broadcast!') diff --git a/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py b/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py index eabf1526..81149230 100644 --- a/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py +++ b/src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py @@ -3,10 +3,10 @@ from druncschema.generic_pb2 import Empty import grpc from drunc.utils.configuration import ConfTypes - +from drunc.controller.configuration import ControllerConfHandler class gRPCStdoutBroadcastHandler(BroadcastReceiverServicer): - def __init__(self, conf:ConfData, token, **kwargs) -> None: + def __init__(self, conf:ControllerConfHandler, token, **kwargs) -> None: super(gRPCStdoutBroadcastHandler, self).__init__( **kwargs ) @@ -17,7 +17,7 @@ def __init__(self, conf:ConfData, token, **kwargs) -> None: self.token = token from logging import getLogger self._log = getLogger("BroadcastReceiver") - self._address = f'[::]:{port}' + self._address = None # f'[::]:{port}' self._log.debug('Broadcast receiver initialised') def stop_receiving(self)->None: diff --git a/src/drunc/broadcast/server/grpc_servicer.py b/src/drunc/broadcast/server/grpc_servicer.py index 12ebf98b..698d0525 100644 --- a/src/drunc/broadcast/server/grpc_servicer.py +++ b/src/drunc/broadcast/server/grpc_servicer.py @@ -9,6 +9,7 @@ from druncschema.authoriser_pb2 import ActionType from drunc.utils.grpc_utils import unpack_any from google.protobuf.any_pb2 import Any +import drunc.controller.exceptions as ctler_excpt class ListenerRepresentation: @@ -117,7 +118,7 @@ def add_to_broadcast_list(self, request:Request, context) -> Response: def remove_from_broadcast_list(self, request:Request, context) -> Response: - r = unpack_any(data, BroadcastRequest) + r = unpack_any(request, BroadcastRequest) if not self.broadcaster.rm_listener(r.broadcast_receiver_address): raise ctler_excpt.ControllerException(f'Failed to remove {r.broadcast_receiver_address} from broadcast list') return PlainText(text = f'Removed {r.broadcast_receiver_address} to broadcast list') @@ -247,7 +248,7 @@ def serve(port:int) -> None: receiver_threads.append(server_thread) except: pass - + from drunc.broadcast.server.broadcast_sender import BroadcastSender broadcaster = BroadcastSender() for port in port_list: broadcaster.add_listener(f'[::]:{port}') diff --git a/src/drunc/broadcast/types.py b/src/drunc/broadcast/types.py index 3d9ef8db..d3685e79 100644 --- a/src/drunc/broadcast/types.py +++ b/src/drunc/broadcast/types.py @@ -4,8 +4,7 @@ class BroadcastTypes(Enum): Unknown = 0 - gRPC = 1 - Kafka = 2 + Kafka = 1 ERS = 2 diff --git a/src/drunc/controller/configuration.py b/src/drunc/controller/configuration.py index 2f320e00..f90e6704 100644 --- a/src/drunc/controller/configuration.py +++ b/src/drunc/controller/configuration.py @@ -31,7 +31,8 @@ def _grab_segment_conf_from_controller(self, configuration): self.session = self.db.get_dal(class_name="Session", uid=self.oks_key.session) this_segment = ControllerConfHandler.find_segment(self.session.segment, self.oks_key.obj_uid) if this_segment is None: - CouldNotFindSegment(self.oks_key.obj_uid) + from drunc.exceptions import DruncSetupException + DruncSetupException(f"Could not find segment with oks_key.obj_uid: {self.oks_key.obj_uid}") return this_segment def _post_process_oks(self): diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index b6bec5a3..d4e04882 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -83,193 +83,6 @@ def who_is_in_charge(obj:ControllerContext) -> None: if who: obj.print(who.text) - -@click.command('fsm') -@click.argument('fsm_command', type=str, nargs=-1) -@click.pass_obj -def fsm(obj:ControllerContext, fsm_command:str) -> None: - from drunc.controller.interface.shell_utils import format_bool, tree_prefix, search_fsm_command, validate_and_format_fsm_arguments, ArgumentException - from drunc.utils.grpc_utils import unpack_any - from druncschema.controller_pb2 import FSMResponseFlag, FSMCommandResponse, FSMCommand, FSMCommandDescription - from druncschema.request_response_pb2 import ResponseFlag - from rich.table import Table - - def split_FSM_args(obj:ControllerContext, passed_commands:tuple): - # Note this is a placeholder - want to get this from OKS. - available_commands = ["conf", "start", "enable_triggers", "disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop", "scrap", "start_run", "stop_run", "shutdown"] - available_command_mandatory_args = [[], ["run_number"], [], [], [], [], [], [], ["run_number"], [], []] - available_sequences = ["start_run", "stop_run", "shutdown"] - available_sequence_commands = [["conf", "start", "enable_triggers"], ["disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop"], ["disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop", "scrap"]] - available_command_opt_args = [] - available_args = ["run_number"] - - # Get the index of all the commands in the command str - command_list = [command for command in passed_commands if command in available_commands] - if len(command_list) == 0: - if len(passed_commands) == 1: - obj.print(f"The passed command [red]{' '.join(passed_commands)}[/red] was not understood.") - else: - obj.print(f"None of the passed arguments were correctly identified.") - raise SystemExit(1) - - # Get the arguments for each command - command_index = [passed_commands.index(command) for command in command_list] - command_argument_list = [] - for i in range(len(command_index)-1): - command_argument_list.append(list(passed_commands[command_index[i]+1:command_index[i + 1]])) - command_argument_list.append(list(passed_commands[command_index[-1]+1:])) - - # Not elegant, would be better to check at the command level first but it does work - for argument_list in command_argument_list: - argument_names = argument_list[::2] - for argument in argument_names: - # Check if the argument is legal - if argument not in available_args: - from drunc.controller.exceptions import MalformedCommandArgument - raise MalformedCommandArgument(f"Argument '{argument}' not recognised as a valid argument.") - # Check for duplicates - if argument_names.count(argument) != 1: - from drunc.controller.exceptions import MalformedCommand - raise MalformedCommand(f"Argument '{argument}' has been repeated.") - - # Check the mandatory arguments - for command in command_list: - mandatory_command_arguments = available_command_mandatory_args[available_commands.index(command)] - provided_command_arguments = command_argument_list[command_list.index(command)] - if mandatory_command_arguments == []: - continue - for argument in mandatory_command_arguments: - if argument not in provided_command_arguments: - missing_command_arguments = list(set(mandatory_command_arguments) - set(provided_command_arguments)) - obj.print(f"There are missing arguments for command [green]{command}[/green]. Missing mandatory argument(s): [red]{' '.join(missing_command_arguments)}[/red].") - raise SystemExit(1) - - # Extract commands from sequences - for command in command_list: - if command not in available_sequences: - continue - # Define the sequence command parameters - passed_sequence_command_index = command_list.index(command) - passed_sequence_command_args = list(command_argument_list[passed_sequence_command_index]) - sequence_commands = available_sequence_commands[available_sequences.index(command)] - # Replace the sequence command with the correct fsm commands - del command_list[passed_sequence_command_index] - command_list[passed_sequence_command_index:passed_sequence_command_index] = sequence_commands - # Replace the sequence command arguments. Duplicates the sequence arguments for all FSM commands - del command_argument_list[passed_sequence_command_index] - for _ in range(len(sequence_commands)): - command_argument_list.insert(passed_sequence_command_index, passed_sequence_command_args) - - return command_list, command_argument_list - - - def dict_arguments(arguments:str) -> dict: - if len(arguments) % 2 != 0: - raise click.BadParameter('Arguments are pairs of key-value!') - keys = arguments[::2] - values = arguments[1::2] - arguments_dict = {keys[i]:values[i] for i in range(len(keys))} - return arguments_dict - - def bool_to_success(flag_message, FSM): - flag = False - if FSM and flag_message == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY: - flag = True - if not FSM and flag_message == ResponseFlag.EXECUTED_SUCCESSFULLY: - flag = True - return "success" if flag else "failed" - - def add_to_table(table, response, prefix=''): - table.add_row( - prefix+response.name, - bool_to_success(response.flag, FSM=False), - bool_to_success(response.data.flag, FSM=True) if response.flag == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY else "failed", - ) - for child_response in response.children: - add_to_table(table, child_response, " "+prefix) - - def print_execution_report(command:str, result:FSMCommandResponse) -> None: - t = Table(title=f'{command} execution report') - t.add_column('Name') - t.add_column('Command execution') - t.add_column('FSM transition') - - add_to_table(t, result) - obj.print(t) - obj.print("") # For formatting - return - - def filter_arguments(arguments:dict, fsm_command:FSMCommandDescription) -> dict: - if not arguments: - return None - cmd_arguments = {} - command_arguments = fsm_command.arguments - cmd_argument_names = [argument.name for argument in command_arguments] - for argument in list(arguments): - if argument in cmd_argument_names: - cmd_arguments[argument] = arguments[argument] - return cmd_arguments - - def construct_FSM_command(obj:ControllerContext, command:tuple[str, list]) -> FSMCommand: - command_name = command[0] - command_args = dict_arguments(command[1]) - command_desc = search_fsm_command(command_name, obj.get_driver('controller').describe_fsm().data.commands) # FSMCommandDescription - if command_desc == None: - return None - - # Apply the appropriate arguments for this command - arguments = filter_arguments(command_args, command_desc) - # Construct the FSMCommand - from druncschema.controller_pb2 import FSMCommand - cmd = FSMCommand( - command_name = command_name, - arguments = validate_and_format_fsm_arguments(arguments, command_desc.arguments) - ) - return cmd - - def send_FSM_command(obj:ControllerContext, command:FSMCommand) -> FSMCommandResponse: - try: - result = obj.get_driver('controller').execute_fsm_command(command) - except Exception as e: # TODO narrow this exception down - obj.print(f"[red]{command.command_name}[/red] failed.") - raise e - print(f"Response flag: {result.flag=}") - if result.flag == FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR: - self.stateful_node.to_error() - obj.print(f"[red]{command.command_name}[/red] failed, there is a node in error.") - elif result.flag != FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY: - obj.print(f"[red]{command.command_name}[/red] failed.") - else: - obj.print(f"[green]{command.command_name}[/green] executed successfully.") - print_execution_report(command.command_name, result) - obj.print_status_summary() # This prints the table, unsure of why. RETURNTOME - if not result: return - return result - - # When FSM batch mode gets merged this will need updating - if obj.get_driver('controller').get_status().data.in_error: - obj.print(f"[red]{command}[/red] not sent - node is in error.") - return - - # Split command into a list of commands and a list of arguments - command_list, argument_list = split_FSM_args(obj, fsm_command) - - # Execute the FSM commands - result = None - for command in zip(command_list, argument_list): - grpc_command = construct_FSM_command(obj, command) - if grpc_command == None: - obj.print(f"[red]{command[0]}[/red] is not possible in current state, not executing.") - continue - obj.print(f"Sending [green]{command[0]}[/green].") - result = send_FSM_command(obj, grpc_command) - if (result == None): - obj.print(f"Transition {FSMtransition} did not execute. Check logs for more info.") - break - - obj.print_status_summary() - return - @click.command('include') @click.pass_obj def include(obj:ControllerContext) -> None: diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 5febe28c..1068ac4b 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -429,7 +429,7 @@ async def logs(self, lr:LogRequest) -> Response: yield Response( name = self.name, token = None, - data = pack_to_any(resp), + data = None, flag = ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, children = [], ) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index a7a760ff..0a69b22c 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -186,7 +186,7 @@ def cleanup(): from drunc.controller.interface.commands import ( - status, connect, take_control, surrender_control, who_am_i, who_is_in_charge, fsm, include, exclude, wait + status, connect, take_control, surrender_control, who_am_i, who_is_in_charge, include, exclude, wait ) ctx.command.add_command(status, 'status')