Skip to content

Commit

Permalink
👌 IMPROVE: Process broadcast subscriber (#212)
Browse files Browse the repository at this point in the history
Filter out `state_changed` broadcasts,
and allow these to pass-through without generating an asynchronous task.

The issue this "fixes" is that, currently, every time a process changes state
it causes a `run_task` asyncio Task to be created for every running process.
This causes an unnecessary burden on the asyncio infrastructure,
reducing the responsiveness of other async tasks.
  • Loading branch information
chrisjsewell authored Mar 9, 2021
1 parent 56edeae commit 17039d2
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ repos:
- id: pylint
additional_dependencies: [
"pyyaml~=5.1.2", "nest_asyncio~=1.4.0", "aio-pika~=6.6",
"aiocontextvars~=0.2.2; python_version<'3.7'", "kiwipy[rmq]~=0.7.1"
"aiocontextvars~=0.2.2; python_version<'3.7'", "kiwipy[rmq]~=0.7.4"
]
args:
[
Expand Down
23 changes: 22 additions & 1 deletion plumpy/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,34 @@ def convert_to_comm(callback: 'Subscriber',
on the given even loop and return a kiwi future representing the future outcome
of the original method.
:param loop: the even loop to schedule the callback in
:param callback: the function to convert
:param loop: the even loop to schedule the callback in
:return: a new callback function that returns a future
"""
if isinstance(callback, kiwipy.BroadcastFilter):

# if the broadcast is filtered for this callback,
# we don't want to go through the (costly) process
# of setting up async tasks and callbacks

def _passthrough(*args: Any, **kwargs: Any) -> bool:
sender = kwargs.get('sender', args[1])
subject = kwargs.get('subject', args[2])
return callback.is_filtered(sender, subject) # type: ignore[attr-defined]
else:

def _passthrough(*args: Any, **kwargs: Any) -> bool: # pylint: disable=unused-argument
return False

coro = ensure_coroutine(callback)

def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> kiwipy.Future:

if _passthrough(*args, **kwargs):
kiwi_future = kiwipy.Future()
kiwi_future.set_result(None)
return kiwi_future

msg_fn = functools.partial(coro, communicator, *args, **kwargs)
task_future = futures.create_task(msg_fn, loop)
return plum_to_kiwi_future(task_future)
Expand Down
7 changes: 4 additions & 3 deletions plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import enum
import functools
import logging
import re
import sys
import time
from types import TracebackType
Expand Down Expand Up @@ -300,9 +301,9 @@ def init(self) -> None:
self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

try:
identifier = self._communicator.add_broadcast_subscriber(
self.broadcast_receive, identifier=str(self.pid)
)
# filter out state change broadcasts
subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*'))
identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid))
self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
except kiwipy.TimeoutError:
self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
python_requires='>=3.6',
install_requires=[
'pyyaml~=5.1.2', 'nest_asyncio~=1.4.0', 'aio-pika~=6.6', 'aiocontextvars~=0.2.2; python_version<"3.7"',
'kiwipy[rmq]~=0.7.1'
'kiwipy[rmq]~=0.7.4'
],
extras_require={
'docs': ['sphinx~=3.2.0', 'myst-nb~=0.11.0', 'sphinx-book-theme~=0.0.39', 'ipython~=7.0'],
Expand Down
29 changes: 28 additions & 1 deletion test/rmq/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import shortuuid

import pytest
from kiwipy import rmq
from kiwipy import BroadcastFilter, rmq

import plumpy
from plumpy import communications, process_comms
Expand Down Expand Up @@ -78,6 +78,33 @@ def get_broadcast(_comm, body, sender, subject, correlation_id):
result = await broadcast_future
assert result == BROADCAST

@pytest.mark.asyncio
async def test_broadcast_filter(self, loop_communicator):

broadcast_future = plumpy.Future()

loop = asyncio.get_event_loop()

def ignore_broadcast(_comm, body, sender, subject, correlation_id):
broadcast_future.set_exception(AssertionError('broadcast received'))

def get_broadcast(_comm, body, sender, subject, correlation_id):
broadcast_future.set_result(True)

loop_communicator.add_broadcast_subscriber(BroadcastFilter(ignore_broadcast, subject='other'))
loop_communicator.add_broadcast_subscriber(get_broadcast)
loop_communicator.broadcast_send(
**{
'body': 'present',
'sender': 'Martin',
'subject': 'sup',
'correlation_id': 420
}
)

result = await broadcast_future
assert result is True

@pytest.mark.asyncio
async def test_rpc(self, loop_communicator):
MSG = 'rpc this'
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ filterwarnings =


[mypy]
show_error_codes = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
Expand Down

0 comments on commit 17039d2

Please sign in to comment.