Skip to content

Commit

Permalink
Merge branch 'support/0.21.x' into async-run
Browse files Browse the repository at this point in the history
  • Loading branch information
khsrali authored Dec 5, 2024
2 parents ff1b699 + 0bf9961 commit 8b1b2cc
Show file tree
Hide file tree
Showing 16 changed files with 280 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:

strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']

services:
postgres:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:

strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']

services:
rabbitmq:
Expand Down
6 changes: 5 additions & 1 deletion .readthedocs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
version: 2

build:
os: ubuntu-22.04
tools:
python: '3.8'

python:
version: 3.8
install:
- method: pip
path: .
Expand Down
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
# Changelog

## v0.21.11 - 2024-07-01

### Fixes
- Make `Waiting.resume()` idempotent [[a79497b]](https://github.com/aiidateam/plumpy/commit/a79497ba37cef7bc609cee90535ad86708fc48f9)

### Dependencies
- Add requirement `nbdime<4` [[94df0df]](https://github.com/aiidateam/plumpy/commit/94df0dfd0a3ea93174aa4de83ac5e06246350c27)


## v0.21.10 - 2023-11-13

### Dependencies

- Dependencies: Add support for Python 3.12 [[2af3907]](https://github.com/aiidateam/plumpy/commit/2af390738df3f151c8225c01e265527b65d7a005)


## v0.21.9 - 2023-11-10

### Features
- Make `ProcessListener` instances persistable [[98a375f]](https://github.com/aiidateam/plumpy/commit/98a375f07db0cacaacdc1545d4d12f25dd00bf1d)

### Fixes
- Catch `ChannelInvalidStateError` in process state change [[db2af9a]](https://github.com/aiidateam/plumpy/commit/db2af9acf7c139798a21e574d6308ae21b3b7513)

### Devops
- Update ReadTheDocs configuration file [[31f85c7]](https://github.com/aiidateam/plumpy/commit/31f85c71730b488aafd680f240485a51884722b7)


## v0.21.8 - 2023-06-07

### Devops
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ classifiers = [
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
]
keywords = ['workflow', 'multithreaded', 'rabbitmq']
requires-python = '>=3.7'
Expand Down Expand Up @@ -62,6 +63,7 @@ tests = [
'pytest-asyncio==0.16.0',
'pytest-cov==3.0.0',
'pytest-notebook>=0.8.1',
'nbdime<4',
'shortuuid==1.0.8',
]

Expand Down
2 changes: 1 addition & 1 deletion src/plumpy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# pylint: disable=undefined-variable
# mypy: disable-error-code="name-defined"
__version__ = '0.21.8'
__version__ = '0.21.11'

import logging

Expand Down
2 changes: 2 additions & 0 deletions src/plumpy/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def wrapper(self: Any, *args: Any, **kwargs: Any) -> None:
wrapped(self, *args, **kwargs)
self._called -= 1

# Forward wrapped function name to the decorator to show the correct name in the ``call_with_super_check``
wrapper.__name__ = wrapped.__name__
return wrapper


Expand Down
54 changes: 54 additions & 0 deletions src/plumpy/event_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
import logging
from typing import TYPE_CHECKING, Any, Callable

from . import persistence

if TYPE_CHECKING:
from typing import Set, Type

from .process_listener import ProcessListener # pylint: disable=cyclic-import

_LOGGER = logging.getLogger(__name__)


@persistence.auto_persist('_listeners', '_listener_type')
class EventHelper(persistence.Savable):

def __init__(self, listener_type: 'Type[ProcessListener]'):
assert listener_type is not None, 'Must provide valid listener type'

self._listener_type = listener_type
self._listeners: 'Set[ProcessListener]' = set()

def add_listener(self, listener: 'ProcessListener') -> None:
assert isinstance(listener, self._listener_type), 'Listener is not of right type'
self._listeners.add(listener)

def remove_listener(self, listener: 'ProcessListener') -> None:
self._listeners.discard(listener)

def remove_all_listeners(self) -> None:
self._listeners.clear()

@property
def listeners(self) -> 'Set[ProcessListener]':
return self._listeners

def fire_event(self, event_function: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
"""Call an event method on all listeners.
:param event_function: the method of the ProcessListener
:param args: arguments to pass to the method
:param kwargs: keyword arguments to pass to the method
"""
if event_function is None:
raise ValueError('Must provide valid event method')

# Make a copy of the list for iteration just in case it changes in a callback
for listener in list(self.listeners):
try:
getattr(listener, event_function.__name__)(*args, **kwargs)
except Exception as exception: # pylint: disable=broad-except
_LOGGER.error("Listener '%s' produced an exception:\n%s", listener, exception)
26 changes: 24 additions & 2 deletions src/plumpy/process_listener.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,36 @@
# -*- coding: utf-8 -*-
import abc
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Dict, Optional

from . import persistence
from .utils import SAVED_STATE_TYPE, protected

__all__ = ['ProcessListener']

if TYPE_CHECKING:
from .processes import Process # pylint: disable=cyclic-import


class ProcessListener(metaclass=abc.ABCMeta):
@persistence.auto_persist('_params')
class ProcessListener(persistence.Savable, metaclass=abc.ABCMeta):

# region Persistence methods

def __init__(self) -> None:
super().__init__()
self._params: Dict[str, Any] = {}

def init(self, **kwargs: Any) -> None:
self._params = kwargs

@protected
def load_instance_state(
self, saved_state: SAVED_STATE_TYPE, load_context: Optional[persistence.LoadSaveContext]
) -> None:
super().load_instance_state(saved_state, load_context)
self.init(**saved_state['_params'])

# endregion

def on_process_created(self, process: 'Process') -> None:
"""
Expand Down
4 changes: 4 additions & 0 deletions src/plumpy/process_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ async def execute(self) -> State: # type: ignore # pylint: disable=invalid-over

def resume(self, value: Any = NULL) -> None:
assert self._waiting_future is not None, 'Not yet waiting'

if self._waiting_future.done():
return

self._waiting_future.set_result(value)


Expand Down
21 changes: 12 additions & 9 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@
except ModuleNotFoundError:
from contextvars import ContextVar

from aio_pika.exceptions import ConnectionClosed
from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed
import kiwipy
import yaml

from . import events, exceptions, futures, persistence, ports, process_comms, process_states, utils
from .base import state_machine
from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event
from .base.utils import call_with_super_check, super_check
from .event_helper import EventHelper
from .process_listener import ProcessListener
from .process_spec import ProcessSpec
from .utils import PID_TYPE, SAVED_STATE_TYPE, protected
Expand Down Expand Up @@ -91,7 +92,9 @@ def func_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
return func_wrapper


@persistence.auto_persist('_pid', '_creation_time', '_future', '_paused', '_status', '_pre_paused_status')
@persistence.auto_persist(
'_pid', '_creation_time', '_future', '_paused', '_status', '_pre_paused_status', '_event_helper'
)
class Process(StateMachine, persistence.Savable, metaclass=ProcessStateMachineMeta):
"""
The Process class is the base for any unit of work in plumpy.
Expand Down Expand Up @@ -289,7 +292,7 @@ def __init__(

# Runtime variables
self._future = persistence.SavableFuture(loop=self._loop)
self.__event_helper = utils.EventHelper(ProcessListener)
self._event_helper = EventHelper(ProcessListener)
self._logger = logger
self._communicator = communicator

Expand Down Expand Up @@ -612,7 +615,7 @@ def load_instance_state(self, saved_state: SAVED_STATE_TYPE, load_context: persi

# Runtime variables, set initial states
self._future = persistence.SavableFuture()
self.__event_helper = utils.EventHelper(ProcessListener)
self._event_helper = EventHelper(ProcessListener)
self._logger = None
self._communicator = None

Expand Down Expand Up @@ -661,11 +664,11 @@ def add_process_listener(self, listener: ProcessListener) -> None:
"""
assert (listener != self), 'Cannot listen to yourself!'
self.__event_helper.add_listener(listener)
self._event_helper.add_listener(listener)

def remove_process_listener(self, listener: ProcessListener) -> None:
"""Remove a process listener from the process."""
self.__event_helper.remove_listener(listener)
self._event_helper.remove_listener(listener)

@protected
def set_logger(self, logger: logging.Logger) -> None:
Expand Down Expand Up @@ -715,7 +718,7 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None:
self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
try:
self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
except ConnectionClosed:
except (ConnectionClosed, ChannelInvalidStateError):
message = 'Process<%s>: no connection available to broadcast state change from %s to %s'
self.logger.warning(message, self.pid, from_label, self.state.value)
except kiwipy.TimeoutError:
Expand Down Expand Up @@ -778,7 +781,7 @@ def on_output_emitting(self, output_port: str, value: Any) -> None:
"""Output is about to be emitted."""

def on_output_emitted(self, output_port: str, value: Any, dynamic: bool) -> None:
self.__event_helper.fire_event(ProcessListener.on_output_emitted, self, output_port, value, dynamic)
self._event_helper.fire_event(ProcessListener.on_output_emitted, self, output_port, value, dynamic)

@super_check
def on_wait(self, awaitables: Sequence[Awaitable]) -> None:
Expand Down Expand Up @@ -891,7 +894,7 @@ def on_close(self) -> None:
self._closed = True

def _fire_event(self, evt: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
self.__event_helper.fire_event(evt, self, *args, **kwargs)
self._event_helper.fire_event(evt, self, *args, **kwargs)

# endregion

Expand Down
41 changes: 0 additions & 41 deletions src/plumpy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,47 +39,6 @@
PID_TYPE = Hashable # pylint: disable=invalid-name


class EventHelper:

def __init__(self, listener_type: 'Type[ProcessListener]'):
assert listener_type is not None, 'Must provide valid listener type'

self._listener_type = listener_type
self._listeners: 'Set[ProcessListener]' = set()

def add_listener(self, listener: 'ProcessListener') -> None:
assert isinstance(listener, self._listener_type), 'Listener is not of right type'
self._listeners.add(listener)

def remove_listener(self, listener: 'ProcessListener') -> None:
self._listeners.discard(listener)

def remove_all_listeners(self) -> None:
self._listeners.clear()

@property
def listeners(self) -> 'Set[ProcessListener]':
return self._listeners

def fire_event(self, event_function: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
"""Call an event method on all listeners.
:param event_function: the method of the ProcessListener
:param args: arguments to pass to the method
:param kwargs: keyword arguments to pass to the method
"""
if event_function is None:
raise ValueError('Must provide valid event method')

# Make a copy of the list for iteration just in case it changes in a callback
for listener in list(self.listeners):
try:
getattr(listener, event_function.__name__)(*args, **kwargs)
except Exception as exception: # pylint: disable=broad-except
_LOGGER.error("Listener '%s' produced an exception:\n%s", listener, exception)


class Frozendict(Mapping):
"""
An immutable wrapper around dictionaries that implements the complete :py:class:`collections.abc.Mapping`
Expand Down
11 changes: 5 additions & 6 deletions test/rmq/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@
version: '3.4'

services:

rabbit:
image: rabbitmq:3.8.3-management
container_name: plumpy-rmq
image: rabbitmq:3-management-alpine
container_name: plumpy_rmq
ports:
- 5672:5672
- 15672:15672
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
ports:
- '5672:5672'
- '15672:15672'
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
Expand Down
Loading

0 comments on commit 8b1b2cc

Please sign in to comment.