Skip to content

Commit

Permalink
General error for Coordinator communicatior
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 18, 2024
1 parent da644ac commit 3f6c38b
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/plumpy/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def rpc_send(self, recipient_id: Hashable, msg: Any) -> Any: ...
def broadcast_send(
self,
body: Any | None,
sender: Hashable | str | None = None,
sender: 'ID_TYPE | None' = None,
subject: str | None = None,
correlation_id: 'ID_TYPE | None' = None,
) -> Any: ...
Expand Down
4 changes: 4 additions & 0 deletions src/plumpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class TaskRejectedError(Exception):
"""A task was rejected by the coordinacor"""


class CoordinatorCommunicationError(Exception):
"""Generic coordinator communication error"""


class CoordinatorConnectionError(ConnectionError):
"""Raised when coordinator cannot be connected"""

Expand Down
13 changes: 7 additions & 6 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,12 +746,13 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None:
self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
try:
self._coordinator.broadcast_send(body=None, sender=self.pid, subject=subject)
except exceptions.CoordinatorConnectionError:
message = 'Process<%s>: no connection available to broadcast state change from %s to %s'
self.logger.warning(message, self.pid, from_label, self.state.value)
except exceptions.CoordinatorTimeoutError:
message = 'Process<%s>: sending broadcast of state change from %s to %s timed out'
self.logger.warning(message, self.pid, from_label, self.state.value)
except exceptions.CoordinatorCommunicationError:
message = f'Process<{self.pid}>: cannot broadcast state change from {from_label} to {self.state.value}'
self.logger.warning(message)
self.logger.debug(message, exc_info=True)
except Exception:
# bubble up for unknown exception
raise

def on_exiting(self) -> None:
state = self.state
Expand Down
13 changes: 12 additions & 1 deletion src/plumpy/rmq/process_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

import asyncio
from typing import Any, Dict, Optional, Sequence, Union
from typing import Any, Dict, Hashable, Optional, Sequence, Union

import kiwipy

Expand Down Expand Up @@ -274,6 +274,17 @@ def kill_all(self, msg: Optional[MessageType]) -> None:

self._coordinator.broadcast_send(msg, subject=Intent.KILL)

def notify_all(self, msg: MessageType | None, sender: Hashable | None = None, subject: str | None = None) -> None:
"""
Notify all processes by broadcasting
:param msg: an optional pause message
"""
if msg is None:
msg = MessageBuilder.kill()

self._coordinator.broadcast_send(msg, sender=sender, subject=subject)

def continue_process(
self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False
) -> Union[None, PID_TYPE, ProcessResult]:
Expand Down
14 changes: 9 additions & 5 deletions tests/rmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
import kiwipy
import concurrent.futures

from plumpy.exceptions import CoordinatorConnectionError, CoordinatorTimeoutError
from plumpy.exceptions import CoordinatorConnectionError


class RmqCoordinator:
def __init__(self, comm: kiwipy.Communicator):
self._comm = comm

# XXX: naming - `add_receiver_rpc`
def add_rpc_subscriber(self, subscriber, identifier=None):
return self._comm.add_rpc_subscriber(subscriber, identifier)

# XXX: naming - `add_receiver_broadcast`
def add_broadcast_subscriber(
self,
subscriber,
Expand All @@ -21,6 +23,7 @@ def add_broadcast_subscriber(
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

# XXX: naming - `add_reciver_task` (can be combined with two above maybe??)
def add_task_subscriber(self, subscriber, identifier=None):
return self._comm.add_task_subscriber(subscriber, identifier)

Expand All @@ -33,27 +36,28 @@ def remove_broadcast_subscriber(self, identifier):
def remove_task_subscriber(self, identifier):
return self._comm.remove_task_subscriber(identifier)

# XXX: naming - `send_to`
def rpc_send(self, recipient_id, msg):
return self._comm.rpc_send(recipient_id, msg)

# XXX: naming - `broadcast`
def broadcast_send(
self,
body,
sender=None,
subject=None,
correlation_id=None,
):
from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed
from aio_pika.exceptions import ChannelInvalidStateError, AMQPConnectionError

try:
rsp = self._comm.broadcast_send(body, sender, subject, correlation_id)
except (ChannelInvalidStateError, ConnectionClosed) as exc:
except (ChannelInvalidStateError, AMQPConnectionError, concurrent.futures.TimeoutError) as exc:
raise CoordinatorConnectionError from exc
except concurrent.futures.TimeoutError as exc:
raise CoordinatorTimeoutError from exc
else:
return rsp

# XXX: naming - `assign_task` (this may able to be combined with send_to)
def task_send(self, task, no_reply=False):
return self._comm.task_send(task, no_reply)

Expand Down

0 comments on commit 3f6c38b

Please sign in to comment.