Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-128340: thread safe handle for loop.call_soon_threadsafe #128369

Merged
merged 9 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,10 @@ def call_soon_threadsafe(self, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args, context)
handle = events._ThreadSafeHandle(callback, args, self, context)
self._ready.append(handle)
if handle._source_traceback:
del handle._source_traceback[-1]
if handle._source_traceback:
del handle._source_traceback[-1]
self._write_to_self()
Expand Down
28 changes: 28 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,34 @@ def _run(self):
self._loop.call_exception_handler(context)
self = None # Needed to break cycles when an exception occurs.

# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe
# and is thread safe unlike Handle which is not thread safe.
class _ThreadSafeHandle(Handle):

__slots__ = ('_lock',)

def __init__(self, callback, args, loop, context=None):
super().__init__(callback, args, loop, context)
self._lock = threading.RLock()

def cancel(self):
with self._lock:
return super().cancel()

def cancelled(self):
with self._lock:
return super().cancelled()

def _run(self):
# The event loop checks for cancellation without holding the lock
# It is possible that the handle is cancelled after the check
# but before the callback is called so check it again after acquiring
# the lock and return without calling the callback if it is cancelled.
with self._lock:
if self._cancelled:
return
return super()._run()
kumaraditya303 marked this conversation as resolved.
Show resolved Hide resolved


class TimerHandle(Handle):
"""Object returned by timed callback registration methods."""
Expand Down
118 changes: 118 additions & 0 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,124 @@ def run_in_thread():
t.join()
self.assertEqual(results, ['hello', 'world'])

def test_call_soon_threadsafe_handle_block_check_cancelled(self):
results = []

callback_started = threading.Event()
callback_finished = threading.Event()
def callback(arg):
callback_started.set()
results.append(arg)
time.sleep(1)
callback_finished.set()

def run_in_thread():
handle = self.loop.call_soon_threadsafe(callback, 'hello')
self.assertIsInstance(handle, events._ThreadSafeHandle)
callback_started.wait()
# callback started so it should block checking for cancellation
# until it finishes
self.assertFalse(handle.cancelled())
self.assertTrue(callback_finished.is_set())
self.loop.call_soon_threadsafe(self.loop.stop)

t = threading.Thread(target=run_in_thread)
t.start()

self.loop.run_forever()
t.join()
self.assertEqual(results, ['hello'])

def test_call_soon_threadsafe_handle_block_cancellation(self):
results = []

callback_started = threading.Event()
callback_finished = threading.Event()
def callback(arg):
callback_started.set()
results.append(arg)
time.sleep(1)
callback_finished.set()

def run_in_thread():
handle = self.loop.call_soon_threadsafe(callback, 'hello')
self.assertIsInstance(handle, events._ThreadSafeHandle)
callback_started.wait()
# callback started so it cannot be cancelled from other thread until
# it finishes
handle.cancel()
self.assertTrue(callback_finished.is_set())
self.loop.call_soon_threadsafe(self.loop.stop)

t = threading.Thread(target=run_in_thread)
t.start()

self.loop.run_forever()
t.join()
self.assertEqual(results, ['hello'])

def test_call_soon_threadsafe_handle_cancel_same_thread(self):
results = []
callback_started = threading.Event()
callback_finished = threading.Event()

fut = concurrent.futures.Future()
def callback(arg):
callback_started.set()
handle = fut.result()
handle.cancel()
results.append(arg)
callback_finished.set()
self.loop.stop()

def run_in_thread():
handle = self.loop.call_soon_threadsafe(callback, 'hello')
fut.set_result(handle)
self.assertIsInstance(handle, events._ThreadSafeHandle)
callback_started.wait()
# callback cancels itself from same thread so it has no effect
# it runs to completion
self.assertTrue(handle.cancelled())
self.assertTrue(callback_finished.is_set())
self.loop.call_soon_threadsafe(self.loop.stop)

t = threading.Thread(target=run_in_thread)
t.start()

self.loop.run_forever()
t.join()
self.assertEqual(results, ['hello'])

def test_call_soon_threadsafe_handle_cancel_other_thread(self):
results = []
ev = threading.Event()

callback_finished = threading.Event()
def callback(arg):
results.append(arg)
callback_finished.set()
self.loop.stop()

def run_in_thread():
handle = self.loop.call_soon_threadsafe(callback, 'hello')
# handle can be cancelled from other thread if not started yet
self.assertIsInstance(handle, events._ThreadSafeHandle)
handle.cancel()
self.assertTrue(handle.cancelled())
self.assertFalse(callback_finished.is_set())
ev.set()
self.loop.call_soon_threadsafe(self.loop.stop)

# block the main loop until the callback is added and cancelled in the
# other thread
self.loop.call_soon(ev.wait)
t = threading.Thread(target=run_in_thread)
t.start()
self.loop.run_forever()
t.join()
self.assertEqual(results, [])
self.assertFalse(callback_finished.is_set())

def test_call_soon_threadsafe_same_thread(self):
results = []

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add internal thread safe handle to be used in :meth:`asyncio.loop.call_soon_threadsafe` for thread safe cancellation.
Loading