Skip to content

Commit

Permalink
Fix race condition in timeout decorator (bugfix) (#1442)
Browse files Browse the repository at this point in the history
* Better error handling and no infinite hang

* Document why the new test was introduced

* Also test system exit and double get

* Also test is_picklable

* Various typos

Co-authored-by: Pierre Equoy <pierre.equoy@canonical.com>

* Explain the need for is_picklable

---------

Co-authored-by: Pierre Equoy <pierre.equoy@canonical.com>
  • Loading branch information
Hook25 and pieqq authored Sep 3, 2024
1 parent 6937537 commit d3078dd
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 51 deletions.
105 changes: 78 additions & 27 deletions checkbox-support/checkbox_support/helpers/timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,29 @@
functions
"""
import os
import pickle
import traceback
import subprocess

from queue import Empty
from functools import partial
from contextlib import wraps
from unittest.mock import patch
from multiprocessing import Process, SimpleQueue
from contextlib import wraps, suppress
from multiprocessing import Process, Queue


def is_picklable(value):
    """
    Report whether ``value`` can be serialized with ``pickle.dumps``.

    This is used here because to propagate a value via a multiprocessing
    queue, it has to be picklable. If it is not, when using normal Queues
    (not SimpleQueues), pushing the value in the queue will crash the encoder
    Thread that the Queue contains, silently failing the operation.

    :param value: the object to probe
    :returns: True if ``pickle.dumps(value)`` succeeds, False if it fails
        with PicklingError, AttributeError or TypeError
    """
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, AttributeError, TypeError):
        # pickle reports an unpicklable object with different exception
        # types depending on the object and the interpreter version, e.g.:
        # - PicklingError: objects that can't be looked up by name
        # - AttributeError: locally defined functions and classes
        # - TypeError: locks, generators, open files, ...
        return False
    return True


def run_with_timeout(f, timeout_s, *args, **kwargs):
Expand All @@ -40,45 +56,80 @@ def run_with_timeout(f, timeout_s, *args, **kwargs):
Note: the function, *args and **kwargs must be picklable to use this.
"""
result_queue = SimpleQueue()
exception_queue = SimpleQueue()
result_queue = Queue()
exception_queue = Queue()

def _f(*args, **kwargs):
os.setsid()
try:
result_queue.put(f(*args, **kwargs))
except BaseException as e:
try:
exception_queue.put(e)
except BaseException:
# raised by pickle.dumps in put when an exception is not
# pickleable. This raises SystemExit as we can't preserve the
# exception type, so any exception handler in the function
# user will not work (or will wrongly handle the exception
# if we change the type)
result = f(*args, **kwargs)
if is_picklable(result):
result_queue.put(result)
else:
exception_queue.put(
SystemExit(
"".join(
[
"Function failed but the timeout decorator is "
"unable to propagate this un-picklable "
"exception:\n"
]
+ traceback.format_exception(type(e), e, None)
)
"Function tried to return non-picklable value "
"but this is not supported by the timeout decorator.\n"
"Returned object:\n" + repr(result)
)
)
result_queue.close()
exception_queue.close()
return
except BaseException as e:
error = e

if is_picklable(error):
exception_queue.put(error)
result_queue.close()
exception_queue.close()
return
# raised by pickle.dumps in put when an exception is not
# pickleable. This raises SystemExit as we can't preserve the
# exception type, so any exception handler in the function
# user will not work (or will wrongly handle the exception
# if we change the type)
exception_queue.put(
SystemExit(
"".join(
[
"Function failed but the timeout decorator is "
"unable to propagate this un-picklable "
"exception:\n"
]
+ traceback.format_exception(type(error), error, None)
)
)
)
result_queue.close()
exception_queue.close()

process = Process(target=_f, args=args, kwargs=kwargs, daemon=True)
process = Process(target=_f, args=args, kwargs=kwargs)
process.start()
process.join(timeout_s)

if process.is_alive():
subprocess.run("kill -9 -- -{}".format(process.pid), shell=True)
# this kills the whole process tree, not just the child
subprocess.run("kill -9 -{}".format(process.pid), shell=True)
raise TimeoutError("Task unable to finish in {}s".format(timeout_s))
if not exception_queue.empty():
raise exception_queue.get()
return result_queue.get()

with suppress(Empty):
return result_queue.get_nowait()
with suppress(Empty):
raise exception_queue.get_nowait()

# unpickling is done in a separate thread that may not have been scheduled
# yet, so retry both queues with a short blocking wait
with suppress(Empty):
return result_queue.get(timeout=0.1)
with suppress(Empty):
raise exception_queue.get(timeout=0.1)

# this should never happen, let's crash the program if it does
raise SystemExit(
"Function failed to propagate either a value or an exception.\n"
"It is unclear why this happened or what the underlying function "
"did."
)


def timeout(timeout_s):
Expand Down
96 changes: 72 additions & 24 deletions checkbox-support/checkbox_support/tests/test_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
#
# You should have received a copy of the GNU General Public License
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.

import os
import time
import multiprocessing

from queue import Empty
from unittest import TestCase
from unittest.mock import patch
from functools import partial

from checkbox_support.helpers.timeout import (
run_with_timeout,
timeout,
is_picklable,
run_with_timeout,
fake_run_with_timeout,
)

Expand Down Expand Up @@ -137,38 +139,84 @@ def test_unpicklable_return_raises(self):
def k():
return lambda x: ...

with self.assertRaises(AttributeError):
with self.assertRaises(SystemExit):
k()

@patch("checkbox_support.helpers.timeout.Process")
@patch("os.setsid")
def test_unpicklable_raise_raises(self, os_setid, process_type_mock):
def test_unpicklable_raise_raises(self):
"""
The reason why this raises is that the timeout decorator pushes the
function to another process. Trying to raise an un-picklable object
will raise a pickle error.
"""

# this mocks process because else the coverage doesn't get the
# coverage
def init(*args, **kwargs):
process_type_mock.target = kwargs["target"]
process_type_mock.args = kwargs["args"]
process_type_mock.kwargs = kwargs["kwargs"]
return process_type_mock

def start():
return process_type_mock.target(
*process_type_mock.args, **process_type_mock.kwargs
)

process_type_mock.side_effect = init
process_type_mock.start = start
process_type_mock.is_alive.return_value = False

@timeout(1)
def k():
raise ValueError(lambda x: ...)

with self.assertRaises(SystemExit):
k()

def test_timeout_kills_subprocess_tree_on_timeout(self):
    """
    This tests that the timeout decorator not only kills the direct child
    (function under test) but also any sub-process it has spawned. This
    is done because one could `subprocess.run` a long-lived process from
    the function and it could cause mayhem (and block the test session as
    Checkbox waits for all children to be done)

    The decorated function spawns `outer`, which spawns `inner`; `inner`
    reports its own pid through a pipe and then sleeps far longer than the
    timeout. After the timeout fires, that pid must no longer exist.
    """

    def inner(pid_pipe):
        # grand-child: publish our pid, then outlive the timeout by far
        pid_pipe.send(os.getpid())
        pid_pipe.close()
        time.sleep(1e4)

    def outer(pid_pipe):
        inner_p = multiprocessing.Process(target=inner, args=(pid_pipe,))
        inner_p.start()
        inner_p.join()

    @timeout(0.1)
    def f(pid_pipe):
        outer_p = multiprocessing.Process(target=outer, args=(pid_pipe,))
        outer_p.start()
        outer_p.join()

    read, write = multiprocessing.Pipe()
    with self.assertRaises(TimeoutError):
        f(write)

    # Fetch the pid outside of assertRaises so that an error raised by the
    # pipe itself can't be mistaken for "the process is gone". Poll first:
    # a bare recv() would block forever if the pid was never sent.
    self.assertTrue(
        read.poll(5),
        "The innermost process never reported its pid",
    )
    pid = read.recv()

    # give the process some time to wind down, probing repeatedly instead
    # of relying on a single fixed delay
    deadline = time.monotonic() + 2
    with self.assertRaises(OSError):
        while time.monotonic() < deadline:
            time.sleep(0.01)
            # this throws an exception if the process we are trying to
            # send a signal to doesn't exist
            os.kill(pid, 0)

@patch("checkbox_support.helpers.timeout.Queue")
@patch("checkbox_support.helpers.timeout.Process")
def test_run_with_timeout_double_get(self, process_mock, queue_mock):
    """
    With a finished process and queues that are empty on the non-blocking
    reads, a ValueError raised by a later blocking `get` must reach the
    caller of run_with_timeout.
    """
    # the (mocked) child process has already terminated
    finished_process = process_mock.return_value
    finished_process.is_alive.return_value = False

    # every Queue() created by the helper is this same mock object
    fake_queue = queue_mock.return_value
    fake_queue.get_nowait.side_effect = Empty()
    # first blocking get finds nothing, the second one raises
    blocking_get_outcomes = [
        Empty(),
        ValueError("Some value error"),
    ]
    fake_queue.get.side_effect = blocking_get_outcomes

    with self.assertRaises(ValueError):
        run_with_timeout(lambda: ..., 0)

@patch("checkbox_support.helpers.timeout.Queue")
@patch("checkbox_support.helpers.timeout.Process")
def test_run_with_timeout_system_exit_no_get(
    self, process_mock, queue_mock
):
    """
    With a finished process and queues that never yield anything (neither
    on non-blocking nor on blocking reads), run_with_timeout must raise
    SystemExit.
    """
    # the (mocked) child process has already terminated
    finished_process = process_mock.return_value
    finished_process.is_alive.return_value = False

    # every read attempt on the (mocked) queues reports an empty queue
    fake_queue = queue_mock.return_value
    fake_queue.get_nowait.side_effect = Empty()
    fake_queue.get.side_effect = Empty()

    with self.assertRaises(SystemExit):
        run_with_timeout(lambda: ..., 0)

def test_is_picklable(self):
    """is_picklable accepts a plain list and rejects a lambda."""
    # a list of integers can always be pickled
    self.assertTrue(is_picklable([1, 2, 3]))
    # a lambda can not be pickled
    self.assertFalse(is_picklable(lambda: ...))

0 comments on commit d3078dd

Please sign in to comment.