Skip to content

Commit

Permalink
pause receiving while submitting tasks
Browse files Browse the repository at this point in the history
results coming in during submission cause thread contention while submitting tasks

pause receiving messages while we are preparing tasks to be submitted
  • Loading branch information
minrk committed Jul 16, 2021
1 parent 3093386 commit bc9c32e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 22 deletions.
59 changes: 51 additions & 8 deletions ipyparallel/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import warnings
from collections.abc import Iterable
from concurrent.futures import Future
from contextlib import contextmanager
from getpass import getpass
from pprint import pprint
from threading import current_thread
Expand Down Expand Up @@ -990,21 +991,39 @@ def _stop_io_thread(self):
self._io_thread.join()

def _setup_streams(self):
    """Wrap every client socket in a ZMQStream on the IO loop and start receiving.

    Each stream is also collected in ``self._streams`` so that receiving can be
    paused and resumed as a group (see ``_stop_receiving`` / ``_start_receiving``,
    used by the ``_pause_results`` context manager).
    """
    # NOTE(review): the diff rendering had left a stale duplicate
    # `self._query_stream = ZMQStream(...)` assignment (the pre-commit line)
    # interleaved here; it is removed so the stream is only constructed once.
    self._streams = []
    self._query_stream = s = ZMQStream(self._query_socket, self._io_loop)
    self._streams.append(s)

    self._control_stream = s = ZMQStream(self._control_socket, self._io_loop)
    self._streams.append(s)
    self._mux_stream = s = ZMQStream(self._mux_socket, self._io_loop)
    self._streams.append(s)
    self._task_stream = s = ZMQStream(self._task_socket, self._io_loop)
    self._streams.append(s)
    self._broadcast_stream = s = ZMQStream(self._broadcast_socket, self._io_loop)
    self._streams.append(s)
    self._iopub_stream = s = ZMQStream(self._iopub_socket, self._io_loop)
    self._streams.append(s)
    self._notification_stream = s = ZMQStream(
        self._notification_socket, self._io_loop
    )
    self._streams.append(s)
    self._start_receiving()

def _start_receiving(self):
self._query_stream.on_recv(self._dispatch_single_reply, copy=False)
self._control_stream = ZMQStream(self._control_socket, self._io_loop)
self._control_stream.on_recv(self._dispatch_single_reply, copy=False)
self._mux_stream = ZMQStream(self._mux_socket, self._io_loop)
self._mux_stream.on_recv(self._dispatch_reply, copy=False)
self._task_stream = ZMQStream(self._task_socket, self._io_loop)
self._task_stream.on_recv(self._dispatch_reply, copy=False)
self._iopub_stream = ZMQStream(self._iopub_socket, self._io_loop)
self._broadcast_stream.on_recv(self._dispatch_reply, copy=False)
self._iopub_stream.on_recv(self._dispatch_iopub, copy=False)
self._notification_stream = ZMQStream(self._notification_socket, self._io_loop)
self._notification_stream.on_recv(self._dispatch_notification, copy=False)

self._broadcast_stream = ZMQStream(self._broadcast_socket, self._io_loop)
self._broadcast_stream.on_recv(self._dispatch_reply, copy=False)
def _stop_receiving(self):
"""Stop receiving on engine streams"""
for s in self._streams:
s.stop_on_recv()

def _start_io_thread(self):
"""Start IOLoop in a background thread."""
Expand Down Expand Up @@ -1034,6 +1053,30 @@ def _io_main(self, start_evt=None):
self._io_loop.start()
self._io_loop.close()

@contextmanager
def _pause_results(self):
"""Context manager to pause receiving results
When submitting lots of tasks,
the arrival of results can disrupt the processing
of new submissions.
Threadsafe.
"""
f = Future()

def _stop():
self._stop_receiving()
f.set_result(None)

# use add_callback to make it threadsafe
self._io_loop.add_callback(_stop)
f.result()
try:
yield
finally:
self._io_loop.add_callback(self._start_receiving)

@unpack_message
def _dispatch_single_reply(self, msg):
"""Dispatch single (non-execution) replies"""
Expand Down
44 changes: 30 additions & 14 deletions ipyparallel/client/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,12 @@ def _really_apply(
pargs = [PrePickled(arg) for arg in args]
pkwargs = {k: PrePickled(v) for k, v in kwargs.items()}

for ident in _idents:
future = self.client.send_apply_request(
self._socket, pf, pargs, pkwargs, track=track, ident=ident
)
futures.append(future)
with self.client._pause_results():
for ident in _idents:
future = self.client.send_apply_request(
self._socket, pf, pargs, pkwargs, track=track, ident=ident
)
futures.append(future)
if track:
trackers = [_.tracker for _ in futures]
else:
Expand Down Expand Up @@ -641,9 +642,16 @@ def map(self, f, *sequences, block=None, track=False, return_exceptions=False):

assert len(sequences) > 0, "must have some sequences to map onto!"
pf = ParallelFunction(
self, f, block=block, track=track, return_exceptions=return_exceptions
self, f, block=False, track=track, return_exceptions=return_exceptions
)
return pf.map(*sequences)
with self.client._pause_results():
ar = pf.map(*sequences)
if block:
try:
return ar.get()
except KeyboardInterrupt:
return ar
return ar

@sync_results
@save_ids
Expand All @@ -665,11 +673,12 @@ def execute(self, code, silent=True, targets=None, block=None):

_idents, _targets = self.client._build_targets(targets)
futures = []
for ident in _idents:
future = self.client.send_execute_request(
self._socket, code, silent=silent, ident=ident
)
futures.append(future)
with self.client._pause_results():
for ident in _idents:
future = self.client.send_execute_request(
self._socket, code, silent=silent, ident=ident
)
futures.append(future)
if isinstance(targets, int):
futures = futures[0]
ar = AsyncResult(
Expand Down Expand Up @@ -1292,12 +1301,19 @@ def map(
pf = ParallelFunction(
self,
f,
block=block,
block=False,
chunksize=chunksize,
ordered=ordered,
return_exceptions=return_exceptions,
)
return pf.map(*sequences)
with self.client._pause_results():
ar = pf.map(*sequences)
if block:
try:
return ar.get()
except KeyboardInterrupt:
return ar
return ar

def imap(
self,
Expand Down

0 comments on commit bc9c32e

Please sign in to comment.