Skip to content

Commit

Permalink
merge develop
Browse files Browse the repository at this point in the history
  • Loading branch information
plasorak committed Nov 20, 2024
2 parents 03357fc + e08fe75 commit 9bf7234
Show file tree
Hide file tree
Showing 16 changed files with 156 additions and 265 deletions.
8 changes: 0 additions & 8 deletions src/drunc/broadcast/client/broadcast_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ def __init__(self, broadcast_configuration:BroadcastClientConfHandler):
message_format = BroadcastMessage,
conf = self.configuration
)
case BroadcastTypes.gRPC:
raise BroadcastTypeNotHandled("gRPC is not available for broadcasting!")
from drunc.broadcast.client.grpc_stdout_broadcast_handler import gRPCStdoutBroadcastHandler
from druncschema.broadcast_pb2 import BroadcastMessage
self.implementation = gRPCStdoutBroadcastHandler(
conf = broadcast_configuration,
message_format = BroadcastMessage,
)
case _:
self.log.info('Could not understand the BroadcastHandler technology you want to use, you will get no broadcast!')

Expand Down
6 changes: 3 additions & 3 deletions src/drunc/broadcast/client/grpc_stdout_broadcast_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from druncschema.generic_pb2 import Empty
import grpc
from drunc.utils.configuration import ConfTypes

from drunc.controller.configuration import ControllerConfHandler

class gRPCStdoutBroadcastHandler(BroadcastReceiverServicer):
def __init__(self, conf:ConfData, token, **kwargs) -> None:
def __init__(self, conf:ControllerConfHandler, token, **kwargs) -> None:
super(gRPCStdoutBroadcastHandler, self).__init__(
**kwargs
)
Expand All @@ -17,7 +17,7 @@ def __init__(self, conf:ConfData, token, **kwargs) -> None:
self.token = token
from logging import getLogger
self._log = getLogger("BroadcastReceiver")
self._address = f'[::]:{port}'
self._address = None # f'[::]:{port}'
self._log.debug('Broadcast receiver initialised')

def stop_receiving(self)->None:
Expand Down
5 changes: 3 additions & 2 deletions src/drunc/broadcast/server/grpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from druncschema.authoriser_pb2 import ActionType
from drunc.utils.grpc_utils import unpack_any
from google.protobuf.any_pb2 import Any
import drunc.controller.exceptions as ctler_excpt

class ListenerRepresentation:

Expand Down Expand Up @@ -117,7 +118,7 @@ def add_to_broadcast_list(self, request:Request, context) -> Response:


def remove_from_broadcast_list(self, request:Request, context) -> Response:
r = unpack_any(data, BroadcastRequest)
r = unpack_any(request, BroadcastRequest)
if not self.broadcaster.rm_listener(r.broadcast_receiver_address):
raise ctler_excpt.ControllerException(f'Failed to remove {r.broadcast_receiver_address} from broadcast list')
return PlainText(text = f'Removed {r.broadcast_receiver_address} to broadcast list')
Expand Down Expand Up @@ -247,7 +248,7 @@ def serve(port:int) -> None:
receiver_threads.append(server_thread)
except:
pass

from drunc.broadcast.server.broadcast_sender import BroadcastSender
broadcaster = BroadcastSender()
for port in port_list:
broadcaster.add_listener(f'[::]:{port}')
Expand Down
3 changes: 1 addition & 2 deletions src/drunc/broadcast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

class BroadcastTypes(Enum):
Unknown = 0
gRPC = 1
Kafka = 2
Kafka = 1
ERS = 2


Expand Down
38 changes: 37 additions & 1 deletion src/drunc/controller/children_interface/child_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@
from drunc.exceptions import DruncSetupException
from drunc.utils.utils import ControlType, get_control_type_and_uri_from_connectivity_service, get_control_type_and_uri_from_cli
import logging
from drunc.utils.grpc_utils import pack_to_any
from druncschema.token_pb2 import Token
from druncschema.request_response_pb2 import Response, ResponseFlag, Description
import os

class ChildInterfaceTechnologyUnknown(DruncSetupException):
def __init__(self, t, name):
super().__init__(f'The type {t} is not supported for the ChildNode {name}')


class ChildNode(abc.ABC):
def __init__(self, name:str, node_type:ControlType, **kwargs) -> None:
def __init__(self, name:str, configuration, node_type:ControlType, **kwargs) -> None:
super().__init__(**kwargs)

self.node_type = node_type
import logging
self.log = logging.getLogger(f"{name}-child-node")
self.name = name
self.configuration = configuration

@abc.abstractmethod
def __str__(self):
Expand All @@ -40,6 +45,37 @@ def get_status(self, token):
def get_endpoint(self):
pass

def describe(self, token:Token) -> Response:
descriptionType = None
descriptionName = None

if hasattr(self.configuration.data, "application_name"): # Get the application name and type
descriptionType = self.configuration.data.application_name
descriptionName = self.configuration.data.id
elif hasattr(self.configuration.data, "controller") and hasattr(self.configuration.data.controller, "application_name"): # Get the controller name and type
descriptionType = self.configuration.data.controller.application_name
descriptionName = self.configuration.data.controller.id

from drunc.controller.utils import get_detector_name
d = Description(
type = descriptionType,
name = descriptionName,
endpoint = self.get_endpoint(),
info = get_detector_name(self.configuration),
session = os.getenv("DUNEDAQ_SESSION"),
commands = None,
broadcast = None,
)

resp = Response(
name = self.name,
token = token,
data = pack_to_any(d),
flag = ResponseFlag.EXECUTED_SUCCESSFULLY,
children = None
)
return resp


@staticmethod
def get_child(name:str, cli, configuration, init_token=None, connectivity_service=None, **kwargs):
Expand Down
6 changes: 3 additions & 3 deletions src/drunc/controller/children_interface/grpc_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ class gRPCChildNode(ChildNode):
def __init__(self, name, configuration:gRCPChildConfHandler, init_token, uri):
super().__init__(
name = name,
node_type = ControlType.gRPC
node_type = ControlType.gRPC,
configuration = configuration
)

from logging import getLogger
self.log = getLogger(f'{self.name}-grpc-child')
self.configuration = configuration


host, port = uri.split(":")
port = int(port)
Expand Down Expand Up @@ -110,7 +111,6 @@ def terminate(self):
pass

def propagate_command(self, command, data, token) -> Response:

return send_command(
controller = self.controller,
token = token,
Expand Down
28 changes: 16 additions & 12 deletions src/drunc/controller/children_interface/rest_api_child.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from drunc.controller.children_interface.child_node import ChildNode
from drunc.exceptions import DruncSetupException
from drunc.utils.utils import ControlType
from druncschema.request_response_pb2 import Response
from drunc.utils.grpc_utils import pack_to_any
from druncschema.generic_pb2 import PlainText, Stacktrace
from druncschema.request_response_pb2 import Response, ResponseFlag, Description
from druncschema.controller_pb2 import FSMCommandResponse, FSMResponseFlag
from druncschema.token_pb2 import Token
import threading
from typing import NoReturn
import os

class ResponseDispatcher(threading.Thread):

Expand Down Expand Up @@ -395,14 +399,17 @@ class RESTAPIChildNode(ChildNode):
def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configuration:FSMConfHandler, uri):
super(RESTAPIChildNode, self).__init__(
name = name,
node_type = ControlType.REST_API
node_type = ControlType.REST_API,
configuration = configuration
)

from logging import getLogger
self.log = getLogger(f'{name}-rest-api-child')

self.response_listener = ResponseListener.get()

self.fsm_configuration = fsm_configuration

import socket
response_listener_host = socket.gethostname()

Expand All @@ -413,7 +420,7 @@ def __init__(self, name, configuration:RESTAPIChildNodeConfHandler, fsm_configur
from drunc.exceptions import DruncSetupException
raise DruncSetupException(f"Application {name} does not expose a control service in the configuration, or has not advertised itself to the application registry service, or the application registry service is not reachable.")

proxy_host, proxy_port = getattr(configuration.data, "proxy", [None, None])
proxy_host, proxy_port = getattr(self.configuration.data, "proxy", [None, None])
proxy_port = int(proxy_port) if proxy_port is not None else None

self.commander = AppCommander(
Expand Down Expand Up @@ -465,10 +472,6 @@ def get_status(self, token):
)

def propagate_command(self, command:str, data, token:Token) -> Response:
from druncschema.request_response_pb2 import ResponseFlag
from druncschema.generic_pb2 import PlainText, Stacktrace
from drunc.utils.grpc_utils import pack_to_any

if command == 'exclude':
self.state.exclude()
return Response(
Expand Down Expand Up @@ -496,8 +499,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response:
children = []
)

from druncschema.controller_pb2 import FSMCommandResponse, FSMResponseFlag

if self.state.excluded():
return Response(
name = self.name,
Expand All @@ -514,7 +515,11 @@ def propagate_command(self, command:str, data, token:Token) -> Response:
)

# here lies the mother of all the problems
if command != 'execute_fsm_command':
if command == 'execute_fsm_command':
return self.propagate_fsm_command(command, data, token)
elif command == 'describe':
return self.describe(token)
else:
self.log.info(f'Ignoring command \'{command}\' sent to \'{self.name}\'')
return Response(
name = self.name,
Expand All @@ -524,6 +529,7 @@ def propagate_command(self, command:str, data, token:Token) -> Response:
children = []
)

def propagate_fsm_command(self, command:str, data, token:Token) -> Response:
from drunc.exceptions import DruncException
entry_state = self.state.get_operational_state()
transition = self.fsm.get_transition(data.command_name)
Expand All @@ -532,7 +538,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response:
import json
self.log.info(f'Sending \'{data.command_name}\' to \'{self.name}\'')


try:
self.commander.send_command(
cmd_id = data.command_name,
Expand All @@ -558,7 +563,6 @@ def propagate_command(self, command:str, data, token:Token) -> Response:
command_name = data.command_name,
data = response_data
)
from drunc.utils.grpc_utils import pack_to_any
response = Response(
name = self.name,
token = token,
Expand Down
3 changes: 2 additions & 1 deletion src/drunc/controller/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def _grab_segment_conf_from_controller(self, configuration):
self.session = self.db.get_dal(class_name="Session", uid=self.oks_key.session)
this_segment = ControllerConfHandler.find_segment(self.session.segment, self.oks_key.obj_uid)
if this_segment is None:
CouldNotFindSegment(self.oks_key.obj_uid)
from drunc.exceptions import DruncSetupException
DruncSetupException(f"Could not find segment with oks_key.obj_uid: {self.oks_key.obj_uid}")
return this_segment

def _post_process_oks(self):
Expand Down
42 changes: 21 additions & 21 deletions src/drunc/controller/controller.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
from druncschema.request_response_pb2 import Request, Response, ResponseFlag
from druncschema.token_pb2 import Token
from druncschema.generic_pb2 import PlainText, PlainTextVector
from druncschema.authoriser_pb2 import ActionType, SystemType
from druncschema.broadcast_pb2 import BroadcastType
from druncschema.controller_pb2_grpc import ControllerServicer
from druncschema.controller_pb2 import Status
from druncschema.controller_pb2 import FSMCommand, FSMCommandResponse, FSMResponseFlag
from druncschema.controller_pb2 import Status, FSMCommand, FSMCommandResponse, FSMResponseFlag
from druncschema.generic_pb2 import PlainText, PlainTextVector
from druncschema.request_response_pb2 import Request, Response, ResponseFlag
from druncschema.token_pb2 import Token

from drunc.controller.children_interface.child_node import ChildNode
from drunc.controller.stateful_node import StatefulNode
from drunc.authoriser.decorators import authentified_and_authorised
from drunc.broadcast.server.broadcast_sender import BroadcastSender
import drunc.controller.exceptions as ctler_excpt
from drunc.utils.grpc_utils import pack_to_any
from typing import Optional, List
from drunc.broadcast.server.decorators import broadcasted
from drunc.utils.grpc_utils import unpack_request_data_to, pack_response
from drunc.authoriser.decorators import authentified_and_authorised
from druncschema.authoriser_pb2 import ActionType, SystemType
from drunc.controller.children_interface.child_node import ChildNode
from drunc.controller.decorators import in_control
import drunc.controller.exceptions as ctler_excpt
from drunc.controller.stateful_node import StatefulNode
from drunc.exceptions import DruncException
from drunc.utils.grpc_utils import pack_to_any
from drunc.utils.grpc_utils import unpack_request_data_to, pack_response

import signal
from typing import Optional, List

class ControllerActor:
def __init__(self, token:Optional[Token]=None):
Expand Down Expand Up @@ -168,10 +167,10 @@ def __init__(self, configuration, name:str, session:str, token:Token):
name = 'describe_fsm',
data_type = ['generic_pb2.PlainText', 'None'],
help = '''Return a description of the FSM transitions:
if a transition name is provided in its input, return that transition description;
if a state is provided, return the transitions accessible from that state;
if "all-transitions" is provided, return all the transitions;
if nothing (None) is provided, return the transitions accessible from the current state.''',
if a transition name is provided in its input, return that transition description;
if a state is provided, return the transitions accessible from that state;
if "all-transitions" is provided, return all the transitions;
if nothing (None) is provided, return the transitions accessible from the current state.''',
return_type = 'request_response_pb2.Description'
),

Expand Down Expand Up @@ -457,21 +456,22 @@ def status(self, token:Token) -> Response:
def describe(self, token:Token) -> Response:
from druncschema.request_response_pb2 import Description
from drunc.utils.grpc_utils import pack_to_any
from drunc.controller.utils import get_detector_name
bd = self.describe_broadcast()
d = Description(
# endpoint = self.uri,
type = 'controller',
name = self.name,
endpoint = self.uri,
info = get_detector_name(self.configuration),
session = self.session,
commands = self.commands,
# children_endpoints = [child.get_endpoint() for child in self.children_nodes],
)

if bd:
d.broadcast.CopyFrom(pack_to_any(bd))


response_children = self.propagate_to_list(
children_description = self.propagate_to_list(
'describe',
command_data = None,
token = token,
Expand All @@ -483,7 +483,7 @@ def describe(self, token:Token) -> Response:
token = None,
data = pack_to_any(d),
flag = ResponseFlag.EXECUTED_SUCCESSFULLY,
children = response_children,
children = children_description,
)

# ORDER MATTERS!
Expand Down
Loading

0 comments on commit 9bf7234

Please sign in to comment.