kiwipy/rmq related modules into independent module #297

Open · wants to merge 22 commits into base: dev
2 changes: 1 addition & 1 deletion docs/source/concepts.md
@@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c

A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process.

The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.
The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.

The thread communicator runs on an independent thread (event loop) and so will not be blocked by the sometimes long waiting times in the process event loop.
Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and the corresponding tasks can be resumed once the computer restarts.
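For orientation on the renamed import path, a minimal sketch of driving a process through the controller is shown below. The broker URL and PID are placeholders, and the assumption that the controller is constructed directly from a kiwipy thread communicator follows the pre-existing `process_comms` usage rather than anything stated in this diff.

```python
import kiwipy.rmq

from plumpy.rmq.process_control import RemoteProcessThreadController  # path per this PR

# Connect a thread communicator to a local RabbitMQ broker (placeholder URL)
communicator = kiwipy.rmq.RmqThreadCommunicator.connect(
    connection_params={'url': 'amqp://guest:guest@127.0.0.1/'}
)
controller = RemoteProcessThreadController(communicator)

# Control a running process by PID: pause, inspect, then resume
controller.pause_process(1234, msg='pausing for maintenance')
print(controller.get_status(1234))
controller.play_process(1234)
```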
2 changes: 1 addition & 1 deletion docs/source/nitpick-exceptions
@@ -23,7 +23,7 @@ py:class plumpy.base.state_machine.State
py:class State
py:class Process
py:class plumpy.futures.CancellableAction
py:class plumpy.communications.LoopCommunicator
py:class plumpy.rmq.communications.LoopCommunicator
py:class plumpy.persistence.PersistedPickle
py:class plumpy.utils.AttributesFrozendict
py:class plumpy.workchains._FunctionCall
2 changes: 1 addition & 1 deletion docs/source/tutorial.ipynb
@@ -66,7 +66,7 @@
"The {py:class}`~plumpy.workchains.WorkChain`\n",
": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n",
"\n",
"The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n",
"The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n",
": To control the process or workchain throughout its lifetime."
]
},
12 changes: 7 additions & 5 deletions src/plumpy/__init__.py
@@ -4,18 +4,21 @@

import logging

from .communications import *
# interfaces
from .controller import ProcessController
from .coordinator import Coordinator
from .events import *
from .exceptions import *
from .futures import *
from .loaders import *
from .message import *
from .mixins import *
from .persistence import *
from .ports import *
from .process_comms import *
from .process_listener import *
from .process_states import *
from .processes import *
from .rmq import *
from .utils import *
from .workchains import *

@@ -27,14 +30,13 @@
+ futures.__all__
+ mixins.__all__
+ persistence.__all__
+ communications.__all__
+ process_comms.__all__
+ message.__all__
+ process_listener.__all__
+ workchains.__all__
+ loaders.__all__
+ ports.__all__
+ process_states.__all__
)
) + ['ProcessController', 'Coordinator']


# Do this so we don't get the "No handlers could be found..." warnings that will be produced
116 changes: 116 additions & 0 deletions src/plumpy/controller.py
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Protocol

from plumpy import loaders
from plumpy.message import MessageType
from plumpy.utils import PID_TYPE

ProcessResult = Any
ProcessStatus = Any


class ProcessController(Protocol):
"""
Control processes using coroutines that will send messages and wait
(in a non-blocking way) for their response
"""

def get_status(self, pid: 'PID_TYPE') -> ProcessStatus:
"""
Get the status of a process with the given PID
:param pid: the process id
:return: the status response from the process
"""
...

def pause_process(self, pid: 'PID_TYPE', msg: Any | None = None) -> ProcessResult:
"""
Pause the process

:param pid: the pid of the process to pause
:param msg: optional pause message
:return: True if paused, False otherwise
"""
...

def play_process(self, pid: 'PID_TYPE') -> ProcessResult:
"""
Play the process

:param pid: the pid of the process to play
:return: True if played, False otherwise
"""
...

def kill_process(self, pid: 'PID_TYPE', msg: MessageType | None = None) -> ProcessResult:
"""
Kill the process

:param pid: the pid of the process to kill
:param msg: optional kill message
:return: True if killed, False otherwise
"""
...

def continue_process(
self, pid: 'PID_TYPE', tag: str | None = None, nowait: bool = False, no_reply: bool = False
) -> ProcessResult | None:
"""
Continue the process

:param pid: the pid of the process to continue
:param tag: the checkpoint tag to continue from
:param nowait: if True, don't wait for the process to send a response
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
"""
...

async def launch_process(
self,
process_class: str,
init_args: Sequence[Any] | None = None,
init_kwargs: dict[str, Any] | None = None,
persist: bool = False,
loader: loaders.ObjectLoader | None = None,
nowait: bool = False,
no_reply: bool = False,
) -> ProcessResult:
"""
Launch a process given the class and constructor arguments

:param process_class: the class of the process to launch
:param init_args: the constructor positional arguments
:param init_kwargs: the constructor keyword arguments
:param persist: should the process be persisted
:param loader: the classloader to use
:param nowait: if True, don't wait for the process to send a response, just return the pid
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
:return: the result of launching the process
"""
...

async def execute_process(
self,
process_class: str,
init_args: Sequence[Any] | None = None,
init_kwargs: dict[str, Any] | None = None,
loader: loaders.ObjectLoader | None = None,
nowait: bool = False,
no_reply: bool = False,
) -> ProcessResult:
"""
Execute a process. This call will first send a create task and then a continue task over
the communicator. This means that if communicator messages are durable then the process
will run until the end even if this interpreter instance ceases to exist.

:param process_class: the process class to execute
:param init_args: the positional arguments to the class constructor
:param init_kwargs: the keyword arguments to the class constructor
:param loader: the class loader to use
:param nowait: if True, don't wait for the process to send a response
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
:return: the result of executing the process
"""
...
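Since `ProcessController` is a `typing.Protocol`, calling code can be typed against it without importing a concrete backend such as the RMQ thread controller. A minimal sketch; the helper function below is hypothetical and not part of this PR:

```python
from plumpy.controller import ProcessController  # module added by this PR
from plumpy.utils import PID_TYPE


def restart_process(controller: ProcessController, pid: PID_TYPE) -> bool:
    """Pause a process and, if that succeeded, immediately resume it."""
    # Works with any object satisfying the protocol, e.g. an RMQ-backed controller
    if controller.pause_process(pid, msg=None):
        return bool(controller.play_process(pid))
    return False
```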
52 changes: 52 additions & 0 deletions src/plumpy/coordinator.py
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Callable, Hashable, Pattern, Protocol

if TYPE_CHECKING:
# identifiers for subscribers
ID_TYPE = Hashable
Subscriber = Callable[..., Any]
# RPC subscriber params: communicator, msg
RpcSubscriber = Callable[['Coordinator', Any], Any]
# Task subscriber params: communicator, task
TaskSubscriber = Callable[['Coordinator', Any], Any]
# Broadcast subscribers params: communicator, body, sender, subject, correlation id
BroadcastSubscriber = Callable[['Coordinator', Any, Any, Any, ID_TYPE], Any]


class Coordinator(Protocol):
# XXX: naming - 'add_message_handler'
def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ...

# XXX: naming - 'add_broadcast_handler'
def add_broadcast_subscriber(
self,
subscriber: 'BroadcastSubscriber',
subject_filters: list[Hashable | Pattern[str]] | None = None,
sender_filters: list[Hashable | Pattern[str]] | None = None,
identifier: 'ID_TYPE | None' = None,
) -> Any: ...

# XXX: naming - absorbed into 'add_message_handler'
def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: 'ID_TYPE | None' = None) -> 'ID_TYPE': ...

def remove_rpc_subscriber(self, identifier: 'ID_TYPE | None') -> None: ...

def remove_broadcast_subscriber(self, identifier: 'ID_TYPE | None') -> None: ...

def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: ...

def rpc_send(self, recipient_id: Hashable, msg: Any) -> Any: ...

def broadcast_send(
self,
body: Any | None,
sender: 'ID_TYPE | None' = None,
subject: str | None = None,
correlation_id: 'ID_TYPE | None' = None,
) -> Any: ...

def task_send(self, task: Any, no_reply: bool = False) -> Any: ...

def close(self) -> None: ...
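As with the controller, `Coordinator` is a structural protocol, so any messaging backend exposing these methods can be used interchangeably. A hypothetical sketch of registering an RPC handler against the protocol (the handler and identifier are illustrative, not taken from this PR):

```python
from __future__ import annotations

from typing import Any

from plumpy.coordinator import Coordinator  # module added by this PR


def register_status_rpc(coordinator: Coordinator, statuses: dict[Any, str]) -> Any:
    """Register an RPC handler on any object that satisfies the Coordinator protocol."""

    def handler(_coordinator: Coordinator, pid: Any) -> str:
        # RpcSubscriber signature: the coordinator itself is passed as the first argument
        return statuses.get(pid, 'unknown')

    return coordinator.add_rpc_subscriber(handler, identifier='status')
```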
29 changes: 26 additions & 3 deletions src/plumpy/exceptions.py
@@ -1,16 +1,23 @@
# -*- coding: utf-8 -*-
from typing import Optional

__all__ = ['ClosedError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult']
__all__ = [
'ClosedError',
'CoordinatorConnectionError',
'CoordinatorTimeoutError',
'InvalidStateError',
'KilledError',
'PersistenceError',
'UnsuccessfulResult',
]


class KilledError(Exception):
"""The process was killed."""


class InvalidStateError(Exception):
"""
Raised when an operation is attempted that requires the process to be in a state
"""Raised when an operation is attempted that requires the process to be in a state
that is different from the current state
"""

@@ -33,3 +40,19 @@ class PersistenceError(Exception):

class ClosedError(Exception):
"""Raised when an mutable operation is attempted on a closed process"""


class TaskRejectedError(Exception):
"""A task was rejected by the coordinacor"""


class CoordinatorCommunicationError(Exception):
"""Generic coordinator communication error"""


class CoordinatorConnectionError(ConnectionError):
"""Raised when coordinator cannot be connected"""


class CoordinatorTimeoutError(TimeoutError):
"""Raised when communicate with coordinator timeout"""
84 changes: 34 additions & 50 deletions src/plumpy/futures.py
@@ -3,25 +3,36 @@
Module containing future related methods and classes
"""

from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Optional
import contextlib
from typing import Any, Awaitable, Callable, Generator, Optional

import kiwipy
__all__ = ['CancellableAction', 'capture_exceptions', 'create_task', 'Future']

__all__ = ['CancelledError', 'Future', 'chain', 'copy_future', 'create_task', 'gather']

CancelledError = kiwipy.CancelledError
class InvalidFutureError(Exception):
"""Exception for when a future or action is in an invalid state"""


class InvalidStateError(Exception):
"""Exception for when a future or action is in an invalid state"""
Future = asyncio.Future


copy_future = kiwipy.copy_future
chain = kiwipy.chain
gather = asyncio.gather
@contextlib.contextmanager
def capture_exceptions(future, ignore: tuple[type[BaseException], ...] = ()) -> Generator[None, Any, None]: # type: ignore[no-untyped-def]
"""
Capture any exceptions in the context and set them as the result of the given future

Future = asyncio.Future
:param future: The future to set the exception on
:param ignore: An optional tuple of exception types to ignore; these will be re-raised and not set on the future
"""
try:
yield
except ignore:
raise
except Exception as exception:
future.set_exception(exception)


class CancellableAction(Future):
@@ -46,10 +57,10 @@ def run(self, *args: Any, **kwargs: Any) -> None:
:param kwargs: the keyword arguments to the action
"""
if self.done():
raise InvalidStateError('Action has already been ran')
raise InvalidFutureError('Action has already been ran')

try:
with kiwipy.capture_exceptions(self):
with capture_exceptions(self):
self.set_result(self._action(*args, **kwargs))
finally:
self._action = None # type: ignore
@@ -67,41 +78,14 @@ def create_task(coro: Callable[[], Awaitable[Any]], loop: Optional[asyncio.Abstr
"""
loop = loop or asyncio.get_event_loop()

future = loop.create_future()

async def run_task() -> None:
with kiwipy.capture_exceptions(future):
res = await coro()
future.set_result(res)

asyncio.run_coroutine_threadsafe(run_task(), loop)
return future


def unwrap_kiwi_future(future: kiwipy.Future) -> kiwipy.Future:
"""
Create a kiwi future that represents the final results of a nested series of futures,
meaning that if the futures provided itself resolves to a future the returned
future will not resolve to a value until the final chain of futures is not a future
but a concrete value. If at any point in the chain a future resolves to an exception
then the returned future will also resolve to that exception.

:param future: the future to unwrap
:return: the unwrapping future

"""
unwrapping = kiwipy.Future()

def unwrap(fut: kiwipy.Future) -> None:
if fut.cancelled():
unwrapping.cancel()
else:
with kiwipy.capture_exceptions(unwrapping):
result = fut.result()
if isinstance(result, kiwipy.Future):
result.add_done_callback(unwrap)
else:
unwrapping.set_result(result)

future.add_done_callback(unwrap)
return unwrapping
# future = loop.create_future()
#
# async def run_task() -> None:
# with capture_exceptions(future):
# res = await coro()
# future.set_result(res)
#
# asyncio.run_coroutine_threadsafe(run_task(), loop)
# return future

return asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro(), loop))
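A short, illustrative sketch of how the reworked helpers could be exercised (not a test taken from this PR): `capture_exceptions` stores the raised error on the future, and `create_task` wraps the thread-safe handle in an awaitable asyncio future.

```python
import asyncio

from plumpy.futures import capture_exceptions, create_task


async def main() -> None:
    loop = asyncio.get_running_loop()

    # The ValueError is captured and set on the future instead of propagating
    outcome = loop.create_future()
    with capture_exceptions(outcome, ignore=(KeyboardInterrupt,)):
        raise ValueError('boom')
    assert isinstance(outcome.exception(), ValueError)

    # create_task takes a coroutine *function*; the returned future can be awaited directly
    async def answer() -> int:
        return 42

    assert await create_task(answer, loop) == 42


asyncio.run(main())
```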