diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index 8ca3745d..52cf875b 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -32,6 +32,8 @@ cast, ) +import kiwipy + from plumpy.coordinator import Coordinator try: @@ -324,9 +326,11 @@ def init(self) -> None: try: # filter out state change broadcasts - identifier = self._coordinator.add_broadcast_subscriber( - self.broadcast_receive, subject_filters=[re.compile(r'^(?!state_changed).*')], identifier=str(self.pid) - ) + subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*')) + identifier = self._coordinator.add_broadcast_subscriber(subscriber, identifier=str(self.pid)) + # identifier = self._coordinator.add_broadcast_subscriber( + # subscriber, subject_filters=[re.compile(r'^(?!state_changed).*')], identifier=str(self.pid) + # ) self.add_cleanup(functools.partial(self._coordinator.remove_broadcast_subscriber, identifier)) except concurrent.futures.TimeoutError: self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid) @@ -788,6 +792,8 @@ def recursively_copy_dictionaries(value: Any) -> Any: self._uuid = uuid.uuid4() if self._pid is None: self._pid = self._uuid + # __import__('ipdb').set_trace() + # print("!!!!! ") @super_check def on_exit_running(self) -> None: