From 17039d2601fcc46735f83d5fa965c0e9ac532bfc Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Tue, 9 Mar 2021 14:50:11 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=91=8C=20IMPROVE:=20Process=20broadcast?= =?UTF-8?q?=20subscriber=20(#212)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Filter out `state_changed` broadcasts, and allow these to pass-through without generating an asynchronous task. The issue this "fixes" is that, currently, every time a process changes state it causes a `run_task` asyncio Task to be created for every running process. This causes an unnecessary burden on the asyncio infrastructure, reducing the responsiveness of other async tasks. --- .pre-commit-config.yaml | 2 +- plumpy/communications.py | 23 ++++++++++++++++++++++- plumpy/processes.py | 7 ++++--- setup.py | 2 +- test/rmq/test_communicator.py | 29 ++++++++++++++++++++++++++++- tox.ini | 1 + 6 files changed, 57 insertions(+), 7 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 26ad6c5e..c7a38cc5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -27,7 +27,7 @@ repos: - id: pylint additional_dependencies: [ "pyyaml~=5.1.2", "nest_asyncio~=1.4.0", "aio-pika~=6.6", - "aiocontextvars~=0.2.2; python_version<'3.7'", "kiwipy[rmq]~=0.7.1" + "aiocontextvars~=0.2.2; python_version<'3.7'", "kiwipy[rmq]~=0.7.4" ] args: [ diff --git a/plumpy/communications.py b/plumpy/communications.py index 36be71de..8f4035b1 100644 --- a/plumpy/communications.py +++ b/plumpy/communications.py @@ -62,13 +62,34 @@ def convert_to_comm(callback: 'Subscriber', on the given even loop and return a kiwi future representing the future outcome of the original method. - :param loop: the even loop to schedule the callback in :param callback: the function to convert + :param loop: the even loop to schedule the callback in :return: a new callback function that returns a future """ + if isinstance(callback, kiwipy.BroadcastFilter): + + # if the broadcast is filtered for this callback, + # we don't want to go through the (costly) process + # of setting up async tasks and callbacks + + def _passthrough(*args: Any, **kwargs: Any) -> bool: + sender = kwargs.get('sender', args[1]) + subject = kwargs.get('subject', args[2]) + return callback.is_filtered(sender, subject) # type: ignore[attr-defined] + else: + + def _passthrough(*args: Any, **kwargs: Any) -> bool: # pylint: disable=unused-argument + return False + coro = ensure_coroutine(callback) def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> kiwipy.Future: + + if _passthrough(*args, **kwargs): + kiwi_future = kiwipy.Future() + kiwi_future.set_result(None) + return kiwi_future + msg_fn = functools.partial(coro, communicator, *args, **kwargs) task_future = futures.create_task(msg_fn, loop) return plum_to_kiwi_future(task_future) diff --git a/plumpy/processes.py b/plumpy/processes.py index 8fc495ef..540b248b 100644 --- a/plumpy/processes.py +++ b/plumpy/processes.py @@ -7,6 +7,7 @@ import enum import functools import logging +import re import sys import time from types import TracebackType @@ -300,9 +301,9 @@ def init(self) -> None: self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid) try: - identifier = self._communicator.add_broadcast_subscriber( - self.broadcast_receive, identifier=str(self.pid) - ) + # filter out state change broadcasts + subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*')) + identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid)) self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier)) except kiwipy.TimeoutError: self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid) diff --git a/setup.py b/setup.py index 1a2f6aba..4947387c 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ python_requires='>=3.6', install_requires=[ 'pyyaml~=5.1.2', 'nest_asyncio~=1.4.0', 'aio-pika~=6.6', 'aiocontextvars~=0.2.2; python_version<"3.7"', - 'kiwipy[rmq]~=0.7.1' + 'kiwipy[rmq]~=0.7.4' ], extras_require={ 'docs': ['sphinx~=3.2.0', 'myst-nb~=0.11.0', 'sphinx-book-theme~=0.0.39', 'ipython~=7.0'], diff --git a/test/rmq/test_communicator.py b/test/rmq/test_communicator.py index 5e50380e..a4e8f27b 100644 --- a/test/rmq/test_communicator.py +++ b/test/rmq/test_communicator.py @@ -7,7 +7,7 @@ import shortuuid import pytest -from kiwipy import rmq +from kiwipy import BroadcastFilter, rmq import plumpy from plumpy import communications, process_comms @@ -78,6 +78,33 @@ def get_broadcast(_comm, body, sender, subject, correlation_id): result = await broadcast_future assert result == BROADCAST + @pytest.mark.asyncio + async def test_broadcast_filter(self, loop_communicator): + + broadcast_future = plumpy.Future() + + loop = asyncio.get_event_loop() + + def ignore_broadcast(_comm, body, sender, subject, correlation_id): + broadcast_future.set_exception(AssertionError('broadcast received')) + + def get_broadcast(_comm, body, sender, subject, correlation_id): + broadcast_future.set_result(True) + + loop_communicator.add_broadcast_subscriber(BroadcastFilter(ignore_broadcast, subject='other')) + loop_communicator.add_broadcast_subscriber(get_broadcast) + loop_communicator.broadcast_send( + **{ + 'body': 'present', + 'sender': 'Martin', + 'subject': 'sup', + 'correlation_id': 420 + } + ) + + result = await broadcast_future + assert result is True + @pytest.mark.asyncio async def test_rpc(self, loop_communicator): MSG = 'rpc this' diff --git a/tox.ini b/tox.ini index 6bf99e89..4abfe77c 100644 --- a/tox.ini +++ b/tox.ini @@ -50,6 +50,7 @@ filterwarnings = [mypy] +show_error_codes = True disallow_untyped_defs = True disallow_incomplete_defs = True check_untyped_defs = True