diff --git a/src/plumpy/__init__.py b/src/plumpy/__init__.py index 237617ac..8bb5b0d0 100644 --- a/src/plumpy/__init__.py +++ b/src/plumpy/__init__.py @@ -18,6 +18,10 @@ from .utils import * from .workchains import * +# interfaces +from .controller import ProcessController +from .coordinator import Coordinator + __all__ = ( events.__all__ + exceptions.__all__ @@ -32,7 +36,7 @@ + loaders.__all__ + ports.__all__ + process_states.__all__ -) +) + ['ProcessController', 'Coordinator'] # Do this se we don't get the "No handlers could be found..." warnings that will be produced diff --git a/src/plumpy/controller.py b/src/plumpy/controller.py new file mode 100644 index 00000000..5a411fd1 --- /dev/null +++ b/src/plumpy/controller.py @@ -0,0 +1,113 @@ +from collections.abc import Sequence +from typing import Any, Protocol + +from plumpy import loaders +from plumpy.message import MessageType +from plumpy.utils import PID_TYPE + +ProcessResult = Any +ProcessStatus = Any + + +class ProcessController(Protocol): + """ + Control processes using coroutines that will send messages and wait + (in a non-blocking way) for their response + """ + + def get_status(self, pid: 'PID_TYPE') -> ProcessStatus: + """ + Get the status of a process with the given PID + :param pid: the process id + :return: the status response from the process + """ + ... + + def pause_process(self, pid: 'PID_TYPE', msg: Any | None = None) -> ProcessResult: + """ + Pause the process + + :param pid: the pid of the process to pause + :param msg: optional pause message + :return: True if paused, False otherwise + """ + ... + + def play_process(self, pid: 'PID_TYPE') -> ProcessResult: + """ + Play the process + + :param pid: the pid of the process to play + :return: True if played, False otherwise + """ + ... + + def kill_process(self, pid: 'PID_TYPE', msg: MessageType | None = None) -> ProcessResult: + """ + Kill the process + + :param pid: the pid of the process to kill + :param msg: optional kill message + :return: True if killed, False otherwise + """ + ... + + def continue_process( + self, pid: 'PID_TYPE', tag: str|None = None, nowait: bool = False, no_reply: bool = False + ) -> ProcessResult | None: + """ + Continue the process + + :param _communicator: the communicator + :param pid: the pid of the process to continue + :param tag: the checkpoint tag to continue from + """ + ... + + async def launch_process( + self, + process_class: str, + init_args: Sequence[Any] | None = None, + init_kwargs: dict[str, Any] | None = None, + persist: bool = False, + loader: loaders.ObjectLoader | None = None, + nowait: bool = False, + no_reply: bool = False, + ) -> ProcessResult: + """ + Launch a process given the class and constructor arguments + + :param process_class: the class of the process to launch + :param init_args: the constructor positional arguments + :param init_kwargs: the constructor keyword arguments + :param persist: should the process be persisted + :param loader: the classloader to use + :param nowait: if True, don't wait for the process to send a response, just return the pid + :param no_reply: if True, this call will be fire-and-forget, i.e. no return value + :return: the result of launching the process + """ + ... + + async def execute_process( + self, + process_class: str, + init_args: Sequence[Any] | None = None, + init_kwargs: dict[str, Any] | None = None, + loader: loaders.ObjectLoader | None = None, + nowait: bool = False, + no_reply: bool = False, + ) -> ProcessResult: + """ + Execute a process. This call will first send a create task and then a continue task over + the communicator. This means that if communicator messages are durable then the process + will run until the end even if this interpreter instance ceases to exist. + + :param process_class: the process class to execute + :param init_args: the positional arguments to the class constructor + :param init_kwargs: the keyword arguments to the class constructor + :param loader: the class loader to use + :param nowait: if True, don't wait for the process to send a response + :param no_reply: if True, this call will be fire-and-forget, i.e. no return value + :return: the result of executing the process + """ + ... diff --git a/src/plumpy/coordinator.py b/src/plumpy/coordinator.py index cd66a883..c229a6c4 100644 --- a/src/plumpy/coordinator.py +++ b/src/plumpy/coordinator.py @@ -1,15 +1,25 @@ # -*- coding: utf-8 -*- -from typing import Any, Callable, Pattern, Protocol +import concurrent.futures +from typing import TYPE_CHECKING, Any, Callable, Hashable, Pattern, Protocol -RpcSubscriber = Callable[['Communicator', Any], Any] -BroadcastSubscriber = Callable[['Communicator', Any, Any, Any, Any], Any] + +if TYPE_CHECKING: + # identifiers for subscribers + ID_TYPE = Hashable + Subscriber = Callable[..., Any] + # RPC subscriber params: communicator, msg + RpcSubscriber = Callable[['Coordinator', Any], Any] + # Task subscriber params: communicator, task + TaskSubscriber = Callable[['Coordinator', Any], Any] + # Broadcast subscribers params: communicator, body, sender, subject, correlation id + BroadcastSubscriber = Callable[['Coordinator', Any, Any, Any, ID_TYPE], Any] class Communicator(Protocol): - def add_rpc_subscriber(self, subscriber: RpcSubscriber, identifier=None) -> Any: ... + def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ... def add_broadcast_subscriber( - self, subscriber: BroadcastSubscriber, subject_filter: str | Pattern[str] | None = None, identifier=None + self, subscriber: 'BroadcastSubscriber', subject_filter: str | Pattern[str] | None = None, identifier=None ) -> Any: ... def remove_rpc_subscriber(self, identifier): ... @@ -18,15 +28,33 @@ def remove_broadcast_subscriber(self, identifier): ... def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool: ... + class Coordinator(Protocol): - def add_rpc_subscriber(self, subscriber: RpcSubscriber, identifier=None) -> Any: ... + def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier=None) -> Any: ... def add_broadcast_subscriber( - self, subscriber: BroadcastSubscriber, subject_filter: str | Pattern[str] | None = None, identifier=None + self, + subscriber: 'BroadcastSubscriber', + subject_filter: str | Pattern[str] | None = None, + identifier: 'ID_TYPE | None' = None, ) -> Any: ... + def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: 'ID_TYPE | None' = None) -> 'ID_TYPE': ... + def remove_rpc_subscriber(self, identifier): ... def remove_broadcast_subscriber(self, identifier): ... - def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool: ... + def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: ... + + def rpc_send(self, recipient_id: Hashable, msg: Any) -> Any: ... + + def broadcast_send( + self, + body: Any | None, + sender: str | None = None, + subject: str | None = None, + correlation_id: 'ID_TYPE | None' = None, + ) -> Any: ... + + def task_send(self, task: Any, no_reply: bool = False) -> Any: ... diff --git a/src/plumpy/rmq/process_comms.py b/src/plumpy/rmq/process_comms.py index 8b63c39e..fc04a99f 100644 --- a/src/plumpy/rmq/process_comms.py +++ b/src/plumpy/rmq/process_comms.py @@ -9,6 +9,7 @@ import kiwipy from plumpy import loaders +from plumpy.coordinator import Coordinator from plumpy.message import ( Intent, MessageBuilder, @@ -34,8 +35,8 @@ class RemoteProcessController: (in a non-blocking way) for their response """ - def __init__(self, communicator: kiwipy.Communicator) -> None: - self._communicator = communicator + def __init__(self, coordinator: Coordinator) -> None: + self._coordinator = coordinator async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus': """ @@ -43,7 +44,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus': :param pid: the process id :return: the status response from the process """ - future = self._communicator.rpc_send(pid, MessageBuilder.status()) + future = self._coordinator.rpc_send(pid, MessageBuilder.status()) result = await asyncio.wrap_future(future) return result @@ -57,8 +58,8 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr """ msg = MessageBuilder.pause(text=msg) - pause_future = self._communicator.rpc_send(pid, msg) - # rpc_send return a thread future from communicator + pause_future = self._coordinator.rpc_send(pid, msg) + # rpc_send return a thread future from coordinator future = await asyncio.wrap_future(pause_future) # future is just returned from rpc call which return a kiwipy future result = await asyncio.wrap_future(future) @@ -71,7 +72,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult': :param pid: the pid of the process to play :return: True if played, False otherwise """ - play_future = self._communicator.rpc_send(pid, MessageBuilder.play()) + play_future = self._coordinator.rpc_send(pid, MessageBuilder.play()) future = await asyncio.wrap_future(play_future) result = await asyncio.wrap_future(future) return result @@ -88,7 +89,7 @@ async def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) msg = MessageBuilder.kill() # Wait for the communication to go through - kill_future = self._communicator.rpc_send(pid, msg) + kill_future = self._coordinator.rpc_send(pid, msg) future = await asyncio.wrap_future(kill_future) # Now wait for the kill to be enacted result = await asyncio.wrap_future(future) @@ -100,13 +101,13 @@ async def continue_process( """ Continue the process - :param _communicator: the communicator + :param _coordinator: the coordinator :param pid: the pid of the process to continue :param tag: the checkpoint tag to continue from """ message = create_continue_body(pid=pid, tag=tag, nowait=nowait) # Wait for the communication to go through - continue_future = self._communicator.task_send(message, no_reply=no_reply) + continue_future = self._coordinator.task_send(message, no_reply=no_reply) future = await asyncio.wrap_future(continue_future) if no_reply: @@ -140,7 +141,7 @@ async def launch_process( """ message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) - launch_future = self._communicator.task_send(message, no_reply=no_reply) + launch_future = self._coordinator.task_send(message, no_reply=no_reply) future = await asyncio.wrap_future(launch_future) if no_reply: @@ -160,7 +161,7 @@ async def execute_process( ) -> 'ProcessResult': """ Execute a process. This call will first send a create task and then a continue task over - the communicator. This means that if communicator messages are durable then the process + the coordinator. This means that if coordinator messages are durable then the process will run until the end even if this interpreter instance ceases to exist. :param process_class: the process class to execute @@ -174,12 +175,12 @@ async def execute_process( message = create_create_body(process_class, init_args, init_kwargs, persist=True, loader=loader) - create_future = self._communicator.task_send(message) + create_future = self._coordinator.task_send(message) future = await asyncio.wrap_future(create_future) pid: 'PID_TYPE' = await asyncio.wrap_future(future) message = create_continue_body(pid, nowait=nowait) - continue_future = self._communicator.task_send(message, no_reply=no_reply) + continue_future = self._coordinator.task_send(message, no_reply=no_reply) future = await asyncio.wrap_future(continue_future) if no_reply: @@ -194,14 +195,14 @@ class RemoteProcessThreadController: A class that can be used to control and launch remote processes """ - def __init__(self, communicator: kiwipy.Communicator): + def __init__(self, coordinator: Coordinator): """ Create a new process controller - :param communicator: the communicator to use + :param coordinator: the coordinator to use """ - self._communicator = communicator + self._coordinator = coordinator def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future: """Get the status of a process with the given PID. @@ -209,7 +210,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future: :param pid: the process id :return: the status response from the process """ - return self._communicator.rpc_send(pid, MessageBuilder.status()) + return self._coordinator.rpc_send(pid, MessageBuilder.status()) def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future: """ @@ -222,15 +223,15 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu """ msg = MessageBuilder.pause(text=msg) - return self._communicator.rpc_send(pid, msg) + return self._coordinator.rpc_send(pid, msg) def pause_all(self, msg: Any) -> None: """ - Pause all processes that are subscribed to the same communicator + Pause all processes that are subscribed to the same coordinator :param msg: an optional pause message """ - self._communicator.broadcast_send(msg, subject=Intent.PAUSE) + self._coordinator.broadcast_send(msg, subject=Intent.PAUSE) def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: """ @@ -240,13 +241,13 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: :return: a response future from the process to be played """ - return self._communicator.rpc_send(pid, MessageBuilder.play()) + return self._coordinator.rpc_send(pid, MessageBuilder.play()) def play_all(self) -> None: """ - Play all processes that are subscribed to the same communicator + Play all processes that are subscribed to the same coordinator """ - self._communicator.broadcast_send(None, subject=Intent.PLAY) + self._coordinator.broadcast_send(None, subject=Intent.PLAY) def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) -> kiwipy.Future: """ @@ -260,24 +261,24 @@ def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) -> ki if msg is None: msg = MessageBuilder.kill() - return self._communicator.rpc_send(pid, msg) + return self._coordinator.rpc_send(pid, msg) def kill_all(self, msg: Optional[MessageType]) -> None: """ - Kill all processes that are subscribed to the same communicator + Kill all processes that are subscribed to the same coordinator :param msg: an optional pause message """ if msg is None: msg = MessageBuilder.kill() - self._communicator.broadcast_send(msg, subject=Intent.KILL) + self._coordinator.broadcast_send(msg, subject=Intent.KILL) def continue_process( self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False ) -> Union[None, PID_TYPE, ProcessResult]: message = create_continue_body(pid=pid, tag=tag, nowait=nowait) - return self._communicator.task_send(message, no_reply=no_reply) + return self._coordinator.task_send(message, no_reply=no_reply) def launch_process( self, @@ -302,7 +303,7 @@ def launch_process( :return: the pid of the created process or the outputs (if nowait=False) """ message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) - return self._communicator.task_send(message, no_reply=no_reply) + return self._coordinator.task_send(message, no_reply=no_reply) def execute_process( self, @@ -315,7 +316,7 @@ def execute_process( ) -> Union[None, PID_TYPE, ProcessResult]: """ Execute a process. This call will first send a create task and then a continue task over - the communicator. This means that if communicator messages are durable then the process + the coordinator. This means that if coordinator messages are durable then the process will run until the end even if this interpreter instance ceases to exist. :param process_class: the process class to execute @@ -329,7 +330,7 @@ def execute_process( message = create_create_body(process_class, init_args, init_kwargs, persist=True, loader=loader) execute_future = kiwipy.Future() - create_future = self._communicator.task_send(message) + create_future = self._coordinator.task_send(message) def on_created(_: Any) -> None: with kiwipy.capture_exceptions(execute_future):