From e10e43635587a0338487d251d76045830e71bd1b Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Wed, 18 Dec 2024 10:38:22 +0100 Subject: [PATCH] General error for Coordinator communicator --- src/plumpy/coordinator.py | 2 +- src/plumpy/exceptions.py | 4 ++++ src/plumpy/processes.py | 13 +++++++------ src/plumpy/rmq/process_control.py | 13 ++++++++++++- tests/rmq/__init__.py | 14 +++++++++----- 5 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/plumpy/coordinator.py b/src/plumpy/coordinator.py index 57418683..b3dcbec5 100644 --- a/src/plumpy/coordinator.py +++ b/src/plumpy/coordinator.py @@ -38,7 +38,7 @@ def rpc_send(self, recipient_id: Hashable, msg: Any) -> Any: ... def broadcast_send( self, body: Any | None, - sender: Hashable | str | None = None, + sender: 'ID_TYPE | None' = None, subject: str | None = None, correlation_id: 'ID_TYPE | None' = None, ) -> Any: ... diff --git a/src/plumpy/exceptions.py b/src/plumpy/exceptions.py index 51898c70..5d05ea4b 100644 --- a/src/plumpy/exceptions.py +++ b/src/plumpy/exceptions.py @@ -46,6 +46,10 @@ class TaskRejectedError(Exception): """A task was rejected by the coordinacor""" +class CoordinatorCommunicationError(Exception): + """Generic coordinator communication error""" + + class CoordinatorConnectionError(ConnectionError): """Raised when coordinator cannot be connected""" diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index 5d2c0218..754d501c 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -746,12 +746,13 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None: self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject) try: self._coordinator.broadcast_send(body=None, sender=self.pid, subject=subject) - except exceptions.CoordinatorConnectionError: - message = 'Process<%s>: no connection available to broadcast state change from %s to %s' - self.logger.warning(message, self.pid, from_label, self.state.value) - except 
exceptions.CoordinatorTimeoutError: - message = 'Process<%s>: sending broadcast of state change from %s to %s timed out' - self.logger.warning(message, self.pid, from_label, self.state.value) + except exceptions.CoordinatorCommunicationError: + message = f'Process<{self.pid}>: cannot broadcast state change from {from_label} to {self.state.value}' + self.logger.warning(message) + self.logger.debug(message, exc_info=True) + except Exception: + # bubble up for unknown exception + raise def on_exiting(self) -> None: state = self.state diff --git a/src/plumpy/rmq/process_control.py b/src/plumpy/rmq/process_control.py index fc04a99f..fe040aa5 100644 --- a/src/plumpy/rmq/process_control.py +++ b/src/plumpy/rmq/process_control.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio -from typing import Any, Dict, Optional, Sequence, Union +from typing import Any, Dict, Hashable, Optional, Sequence, Union import kiwipy @@ -274,6 +274,17 @@ def kill_all(self, msg: Optional[MessageType]) -> None: self._coordinator.broadcast_send(msg, subject=Intent.KILL) + def notify_all(self, msg: MessageType | None, sender: Hashable | None = None, subject: str | None = None) -> None: + """ + Notify all processes by broadcasting + + :param msg: an optional pause message + """ + if msg is None: + msg = MessageBuilder.kill() + + self._coordinator.broadcast_send(msg, sender=sender, subject=subject) + def continue_process( self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False ) -> Union[None, PID_TYPE, ProcessResult]: diff --git a/tests/rmq/__init__.py b/tests/rmq/__init__.py index 50913d33..72078829 100644 --- a/tests/rmq/__init__.py +++ b/tests/rmq/__init__.py @@ -2,16 +2,18 @@ import kiwipy import concurrent.futures -from plumpy.exceptions import CoordinatorConnectionError, CoordinatorTimeoutError +from plumpy.exceptions import CoordinatorConnectionError class RmqCoordinator: def __init__(self, comm: kiwipy.Communicator): self._comm = comm 
+ # XXX: naming - `add_receiver_rpc` def add_rpc_subscriber(self, subscriber, identifier=None): return self._comm.add_rpc_subscriber(subscriber, identifier) + # XXX: naming - `add_receiver_broadcast` def add_broadcast_subscriber( self, subscriber, @@ -21,6 +23,7 @@ def add_broadcast_subscriber( subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter) return self._comm.add_broadcast_subscriber(subscriber, identifier) + # XXX: naming - `add_receiver_task` (can maybe be combined with the two above) def add_task_subscriber(self, subscriber, identifier=None): return self._comm.add_task_subscriber(subscriber, identifier) @@ -33,9 +36,11 @@ def remove_broadcast_subscriber(self, identifier): def remove_task_subscriber(self, identifier): return self._comm.remove_task_subscriber(identifier) + # XXX: naming - `send_to` def rpc_send(self, recipient_id, msg): return self._comm.rpc_send(recipient_id, msg) + # XXX: naming - `broadcast` def broadcast_send( self, body, @@ -43,17 +48,16 @@ def broadcast_send( subject=None, correlation_id=None, ): - from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed + from aio_pika.exceptions import ChannelInvalidStateError, AMQPConnectionError try: rsp = self._comm.broadcast_send(body, sender, subject, correlation_id) - except (ChannelInvalidStateError, ConnectionClosed) as exc: + except (ChannelInvalidStateError, AMQPConnectionError, concurrent.futures.TimeoutError) as exc: raise CoordinatorConnectionError from exc - except concurrent.futures.TimeoutError as exc: - raise CoordinatorTimeoutError from exc else: return rsp + # XXX: naming - `assign_task` (this may be able to be combined with send_to) def task_send(self, task, no_reply=False): return self._comm.task_send(task, no_reply)