diff --git a/src/plumpy/exceptions.py b/src/plumpy/exceptions.py index 9dca8fdb..ed87f357 100644 --- a/src/plumpy/exceptions.py +++ b/src/plumpy/exceptions.py @@ -7,6 +7,8 @@ 'KilledError', 'PersistenceError', 'UnsuccessfulResult', + 'CoordinatorConnectionError', + 'CoordinatorTimeoutError', ] @@ -42,3 +44,11 @@ class ClosedError(Exception): class TaskRejectedError(Exception): """A task was rejected by the coordinacor""" + + +class CoordinatorConnectionError(ConnectionError): + """Raised when coordinator cannot be connected""" + + +class CoordinatorTimeoutError(TimeoutError): + """Raised when communicate with coordinator timeout""" diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index 4da9cc06..c08836e1 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -741,19 +741,15 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None: call_with_super_check(self.on_killed) if self._coordinator and isinstance(self.state, enum.Enum): - # FIXME: this part should be tested first - # FIXME: move all to `coordinator.broadcast()` call and in rmq implement coordinator - from plumpy.rmq.exceptions import CommunicatorChannelInvalidStateError, CommunicatorConnectionClosed - from_label = cast(enum.Enum, from_state.LABEL).value if from_state is not None else None subject = f'state_changed.{from_label}.{self.state.value}' self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject) try: self._coordinator.broadcast_send(body=None, sender=self.pid, subject=subject) - except (CommunicatorConnectionClosed, CommunicatorChannelInvalidStateError): + except exceptions.CoordinatorConnectionError: message = 'Process<%s>: no connection available to broadcast state change from %s to %s' self.logger.warning(message, self.pid, from_label, self.state.value) - except concurrent.futures.TimeoutError: + except exceptions.CoordinatorTimeoutError: message = 'Process<%s>: sending broadcast of state change from %s to %s timed out' self.logger.warning(message, self.pid, from_label, self.state.value) diff --git a/src/plumpy/rmq/__init__.py b/src/plumpy/rmq/__init__.py index fbb9f243..c44c5a2e 100644 --- a/src/plumpy/rmq/__init__.py +++ b/src/plumpy/rmq/__init__.py @@ -1,8 +1,7 @@ # -*- coding: utf-8 -*- # mypy: disable-error-code=name-defined from .communications import * -from .exceptions import * from .futures import * from .process_control import * -__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_control.__all__ +__all__ = communications.__all__ + futures.__all__ + process_control.__all__ diff --git a/src/plumpy/rmq/exceptions.py b/src/plumpy/rmq/exceptions.py deleted file mode 100644 index 02eb3c97..00000000 --- a/src/plumpy/rmq/exceptions.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- -import kiwipy -from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed - -__all__ = [ - 'CommunicatorChannelInvalidStateError', - 'CommunicatorConnectionClosed', -] - -# Alias aio_pika -CommunicatorConnectionClosed = ConnectionClosed -CommunicatorChannelInvalidStateError = ChannelInvalidStateError - -CancelledError = kiwipy.CancelledError diff --git a/tests/rmq/__init__.py b/tests/rmq/__init__.py index e0f263b8..50913d33 100644 --- a/tests/rmq/__init__.py +++ b/tests/rmq/__init__.py @@ -1,5 +1,8 @@ - +# -*- coding: utf-8 -*- import kiwipy +import concurrent.futures + +from plumpy.exceptions import CoordinatorConnectionError, CoordinatorTimeoutError class RmqCoordinator: @@ -40,11 +43,19 @@ def broadcast_send( subject=None, correlation_id=None, ): - return self._comm.broadcast_send(body, sender, subject, correlation_id) + from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed + + try: + rsp = self._comm.broadcast_send(body, sender, subject, correlation_id) + except (ChannelInvalidStateError, ConnectionClosed) as exc: + raise CoordinatorConnectionError from exc + except concurrent.futures.TimeoutError as exc: + raise CoordinatorTimeoutError from exc + else: + return rsp def task_send(self, task, no_reply=False): return self._comm.task_send(task, no_reply) def close(self): self._comm.close() -