Skip to content

Commit

Permalink
Merge pull request #313 from DUNE-DAQ/PawelPlesniak/Ruff
Browse files Browse the repository at this point in the history
`ruff` clearup
  • Loading branch information
plasorak authored Nov 15, 2024
2 parents 5c9e8b4 + e47d81b commit e08fe75
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 205 deletions.
8 changes: 0 additions & 8 deletions src/drunc/broadcast/client/broadcast_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ def __init__(self, broadcast_configuration:BroadcastClientConfHandler):
message_format = BroadcastMessage,
conf = self.configuration
)
case BroadcastTypes.gRPC:
raise BroadcastTypeNotHandled("gRPC is not available for broadcasting!")
from drunc.broadcast.client.grpc_stdout_broadcast_handler import gRPCStdoutBroadcastHandler
from druncschema.broadcast_pb2 import BroadcastMessage
self.implementation = gRPCStdoutBroadcastHandler(
conf = broadcast_configuration,
message_format = BroadcastMessage,
)
case _:
self.log.info('Could not understand the BroadcastHandler technology you want to use, you will get no broadcast!')

Expand Down
6 changes: 3 additions & 3 deletions src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from druncschema.generic_pb2 import Empty
import grpc
from drunc.utils.configuration import ConfTypes

from drunc.controller.configuration import ControllerConfHandler

class gRPCStdoutBroadcastHandler(BroadcastReceiverServicer):
def __init__(self, conf:ConfData, token, **kwargs) -> None:
def __init__(self, conf:ControllerConfHandler, token, **kwargs) -> None:
super(gRPCStdoutBroadcastHandler, self).__init__(
**kwargs
)
Expand All @@ -17,7 +17,7 @@ def __init__(self, conf:ConfData, token, **kwargs) -> None:
self.token = token
from logging import getLogger
self._log = getLogger("BroadcastReceiver")
self._address = f'[::]:{port}'
self._address = None # f'[::]:{port}'
self._log.debug('Broadcast receiver initialised')

def stop_receiving(self)->None:
Expand Down
5 changes: 3 additions & 2 deletions src/drunc/broadcast/server/grpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from druncschema.authoriser_pb2 import ActionType
from drunc.utils.grpc_utils import unpack_any
from google.protobuf.any_pb2 import Any
import drunc.controller.exceptions as ctler_excpt

class ListenerRepresentation:

Expand Down Expand Up @@ -117,7 +118,7 @@ def add_to_broadcast_list(self, request:Request, context) -> Response:


def remove_from_broadcast_list(self, request:Request, context) -> Response:
r = unpack_any(data, BroadcastRequest)
r = unpack_any(request, BroadcastRequest)
if not self.broadcaster.rm_listener(r.broadcast_receiver_address):
raise ctler_excpt.ControllerException(f'Failed to remove {r.broadcast_receiver_address} from broadcast list')
return PlainText(text = f'Removed {r.broadcast_receiver_address} to broadcast list')
Expand Down Expand Up @@ -247,7 +248,7 @@ def serve(port:int) -> None:
receiver_threads.append(server_thread)
except:
pass

from drunc.broadcast.server.broadcast_sender import BroadcastSender
broadcaster = BroadcastSender()
for port in port_list:
broadcaster.add_listener(f'[::]:{port}')
Expand Down
3 changes: 1 addition & 2 deletions src/drunc/broadcast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

class BroadcastTypes(Enum):
Unknown = 0
gRPC = 1
Kafka = 2
Kafka = 1
ERS = 2


Expand Down
3 changes: 2 additions & 1 deletion src/drunc/controller/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def _grab_segment_conf_from_controller(self, configuration):
self.session = self.db.get_dal(class_name="Session", uid=self.oks_key.session)
this_segment = ControllerConfHandler.find_segment(self.session.segment, self.oks_key.obj_uid)
if this_segment is None:
CouldNotFindSegment(self.oks_key.obj_uid)
from drunc.exceptions import DruncSetupException
DruncSetupException(f"Could not find segment with oks_key.obj_uid: {self.oks_key.obj_uid}")
return this_segment

def _post_process_oks(self):
Expand Down
187 changes: 0 additions & 187 deletions src/drunc/controller/interface/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,193 +83,6 @@ def who_is_in_charge(obj:ControllerContext) -> None:
if who:
obj.print(who.text)


@click.command('fsm')
@click.argument('fsm_command', type=str, nargs=-1)
@click.pass_obj
def fsm(obj:ControllerContext, fsm_command:str) -> None:
from drunc.controller.interface.shell_utils import format_bool, tree_prefix, search_fsm_command, validate_and_format_fsm_arguments, ArgumentException
from drunc.utils.grpc_utils import unpack_any
from druncschema.controller_pb2 import FSMResponseFlag, FSMCommandResponse, FSMCommand, FSMCommandDescription
from druncschema.request_response_pb2 import ResponseFlag
from rich.table import Table

def split_FSM_args(obj:ControllerContext, passed_commands:tuple):
# Note this is a placeholder - want to get this from OKS.
available_commands = ["conf", "start", "enable_triggers", "disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop", "scrap", "start_run", "stop_run", "shutdown"]
available_command_mandatory_args = [[], ["run_number"], [], [], [], [], [], [], ["run_number"], [], []]
available_sequences = ["start_run", "stop_run", "shutdown"]
available_sequence_commands = [["conf", "start", "enable_triggers"], ["disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop"], ["disable_triggers", "drain_dataflow", "stop_trigger_sources", "stop", "scrap"]]
available_command_opt_args = []
available_args = ["run_number"]

# Get the index of all the commands in the command str
command_list = [command for command in passed_commands if command in available_commands]
if len(command_list) == 0:
if len(passed_commands) == 1:
obj.print(f"The passed command [red]{' '.join(passed_commands)}[/red] was not understood.")
else:
obj.print(f"None of the passed arguments were correctly identified.")
raise SystemExit(1)

# Get the arguments for each command
command_index = [passed_commands.index(command) for command in command_list]
command_argument_list = []
for i in range(len(command_index)-1):
command_argument_list.append(list(passed_commands[command_index[i]+1:command_index[i + 1]]))
command_argument_list.append(list(passed_commands[command_index[-1]+1:]))

# Not elegant, would be better to check at the command level first but it does work
for argument_list in command_argument_list:
argument_names = argument_list[::2]
for argument in argument_names:
# Check if the argument is legal
if argument not in available_args:
from drunc.controller.exceptions import MalformedCommandArgument
raise MalformedCommandArgument(f"Argument '{argument}' not recognised as a valid argument.")
# Check for duplicates
if argument_names.count(argument) != 1:
from drunc.controller.exceptions import MalformedCommand
raise MalformedCommand(f"Argument '{argument}' has been repeated.")

# Check the mandatory arguments
for command in command_list:
mandatory_command_arguments = available_command_mandatory_args[available_commands.index(command)]
provided_command_arguments = command_argument_list[command_list.index(command)]
if mandatory_command_arguments == []:
continue
for argument in mandatory_command_arguments:
if argument not in provided_command_arguments:
missing_command_arguments = list(set(mandatory_command_arguments) - set(provided_command_arguments))
obj.print(f"There are missing arguments for command [green]{command}[/green]. Missing mandatory argument(s): [red]{' '.join(missing_command_arguments)}[/red].")
raise SystemExit(1)

# Extract commands from sequences
for command in command_list:
if command not in available_sequences:
continue
# Define the sequence command parameters
passed_sequence_command_index = command_list.index(command)
passed_sequence_command_args = list(command_argument_list[passed_sequence_command_index])
sequence_commands = available_sequence_commands[available_sequences.index(command)]
# Replace the sequence command with the correct fsm commands
del command_list[passed_sequence_command_index]
command_list[passed_sequence_command_index:passed_sequence_command_index] = sequence_commands
# Replace the sequence command arguments. Duplicates the sequence arguments for all FSM commands
del command_argument_list[passed_sequence_command_index]
for _ in range(len(sequence_commands)):
command_argument_list.insert(passed_sequence_command_index, passed_sequence_command_args)

return command_list, command_argument_list


def dict_arguments(arguments:str) -> dict:
if len(arguments) % 2 != 0:
raise click.BadParameter('Arguments are pairs of key-value!')
keys = arguments[::2]
values = arguments[1::2]
arguments_dict = {keys[i]:values[i] for i in range(len(keys))}
return arguments_dict

def bool_to_success(flag_message, FSM):
flag = False
if FSM and flag_message == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY:
flag = True
if not FSM and flag_message == ResponseFlag.EXECUTED_SUCCESSFULLY:
flag = True
return "success" if flag else "failed"

def add_to_table(table, response, prefix=''):
table.add_row(
prefix+response.name,
bool_to_success(response.flag, FSM=False),
bool_to_success(response.data.flag, FSM=True) if response.flag == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY else "failed",
)
for child_response in response.children:
add_to_table(table, child_response, " "+prefix)

def print_execution_report(command:str, result:FSMCommandResponse) -> None:
t = Table(title=f'{command} execution report')
t.add_column('Name')
t.add_column('Command execution')
t.add_column('FSM transition')

add_to_table(t, result)
obj.print(t)
obj.print("") # For formatting
return

def filter_arguments(arguments:dict, fsm_command:FSMCommandDescription) -> dict:
if not arguments:
return None
cmd_arguments = {}
command_arguments = fsm_command.arguments
cmd_argument_names = [argument.name for argument in command_arguments]
for argument in list(arguments):
if argument in cmd_argument_names:
cmd_arguments[argument] = arguments[argument]
return cmd_arguments

def construct_FSM_command(obj:ControllerContext, command:tuple[str, list]) -> FSMCommand:
command_name = command[0]
command_args = dict_arguments(command[1])
command_desc = search_fsm_command(command_name, obj.get_driver('controller').describe_fsm().data.commands) # FSMCommandDescription
if command_desc == None:
return None

# Apply the appropriate arguments for this command
arguments = filter_arguments(command_args, command_desc)
# Construct the FSMCommand
from druncschema.controller_pb2 import FSMCommand
cmd = FSMCommand(
command_name = command_name,
arguments = validate_and_format_fsm_arguments(arguments, command_desc.arguments)
)
return cmd

def send_FSM_command(obj:ControllerContext, command:FSMCommand) -> FSMCommandResponse:
try:
result = obj.get_driver('controller').execute_fsm_command(command)
except Exception as e: # TODO narrow this exception down
obj.print(f"[red]{command.command_name}[/red] failed.")
raise e
print(f"Response flag: {result.flag=}")
if result.flag == FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR:
self.stateful_node.to_error()
obj.print(f"[red]{command.command_name}[/red] failed, there is a node in error.")
elif result.flag != FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY:
obj.print(f"[red]{command.command_name}[/red] failed.")
else:
obj.print(f"[green]{command.command_name}[/green] executed successfully.")
print_execution_report(command.command_name, result)
obj.print_status_summary() # This prints the table, unsure of why. RETURNTOME
if not result: return
return result

# When FSM batch mode gets merged this will need updating
if obj.get_driver('controller').get_status().data.in_error:
obj.print(f"[red]{command}[/red] not sent - node is in error.")
return

# Split command into a list of commands and a list of arguments
command_list, argument_list = split_FSM_args(obj, fsm_command)

# Execute the FSM commands
result = None
for command in zip(command_list, argument_list):
grpc_command = construct_FSM_command(obj, command)
if grpc_command == None:
obj.print(f"[red]{command[0]}[/red] is not possible in current state, not executing.")
continue
obj.print(f"Sending [green]{command[0]}[/green].")
result = send_FSM_command(obj, grpc_command)
if (result == None):
obj.print(f"Transition {FSMtransition} did not execute. Check logs for more info.")
break

obj.print_status_summary()
return

@click.command('include')
@click.pass_obj
def include(obj:ControllerContext) -> None:
Expand Down
2 changes: 1 addition & 1 deletion src/drunc/process_manager/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ async def logs(self, lr:LogRequest) -> Response:
yield Response(
name = self.name,
token = None,
data = pack_to_any(resp),
data = None,
flag = ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED,
children = [],
)
Expand Down
2 changes: 1 addition & 1 deletion src/drunc/unified_shell/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def cleanup():


from drunc.controller.interface.commands import (
status, connect, take_control, surrender_control, who_am_i, who_is_in_charge, fsm, include, exclude, wait
status, connect, take_control, surrender_control, who_am_i, who_is_in_charge, include, exclude, wait
)

ctx.command.add_command(status, 'status')
Expand Down

0 comments on commit e08fe75

Please sign in to comment.