diff --git a/nbgrader/preprocessors/instantiatetests.py b/nbgrader/preprocessors/instantiatetests.py index c208af9a8..165a873b0 100644 --- a/nbgrader/preprocessors/instantiatetests.py +++ b/nbgrader/preprocessors/instantiatetests.py @@ -5,7 +5,8 @@ from .. import utils from traitlets import Bool, List, Integer, Unicode, Dict, Callable from textwrap import dedent -from . import Execute +from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError +from . import NbGraderPreprocessor import secrets import asyncio import inspect @@ -14,13 +15,6 @@ from queue import Empty import datetime from typing import Optional -from nbclient.exceptions import ( - CellControlSignal, - CellExecutionComplete, - CellExecutionError, - CellTimeoutError, - DeadKernelError, -) try: from time import monotonic # Py 3 @@ -28,76 +22,8 @@ from time import time as monotonic # Py 2 -######################################################################################### -class CellExecutionComplete(Exception): - """ - Used as a control signal for cell execution across run_cell and - process_message function calls. Raised when all execution requests - are completed and no further messages are expected from the kernel - over zeromq channels. - """ - pass - -######################################################################################### -class CellExecutionError(Exception): - """ - Custom exception to propagate exceptions that are raised during - notebook execution to the caller. This is mostly useful when - using nbconvert as a library, since it allows dealing with - failures gracefully. - """ - - # ------------------------------------------------------------------------------------- - def __init__(self, traceback): - super(CellExecutionError, self).__init__(traceback) - self.traceback = traceback - - # ------------------------------------------------------------------------------------- - def __str__(self): - s = self.__unicode__() - if not isinstance(s, str): - s = s.encode('utf8', 'replace') - return s - - # ------------------------------------------------------------------------------------- - def __unicode__(self): - return self.traceback - - # ------------------------------------------------------------------------------------- - @classmethod - def from_code_and_msg(cls, code, msg): - """Instantiate from a code cell object and a message contents - (message is either execute_reply or error) - """ - tb = '\n'.join(msg.get('traceback', [])) - return cls(exec_err_msg.format(code=code, traceback=tb)) - # ------------------------------------------------------------------------------------- - - -######################################################################################### -class CodeExecutionError(Exception): - """ - Custom exception to propagate exceptions that are raised during - code snippet execution to the caller. This is mostly useful when - using nbconvert as a library, since it allows dealing with - failures gracefully. - """ - - -######################################################################################### - -exec_err_msg = u"""\ -An error occurred while executing the following code: ------------------- -{code} ------------------- -{traceback} -""" - - -######################################################################################### -class InstantiateTests(Execute): +class InstantiateTests(NbGraderPreprocessor, ExecutePreprocessor): tests = None autotest_filename = Unicode( @@ -169,6 +95,12 @@ class InstantiateTests(Execute): sanitizer = None global_tests_loaded = False + # COPIED from NbGrader Execute preprocessor + def __init__(self, *args, **kwargs): + # nbconvert < 7.3.1 used the sync version of this, which doesn't work for us. + kwargs.setdefault('kernel_manager_class', AsyncKernelManager) + super().__init__(*args, **kwargs) + def preprocess(self, nb, resources): # avoid starting the kernel at all/processing the notebook if there are no autotest delimiters for index, cell in enumerate(nb.cells): @@ -227,7 +159,7 @@ def preprocess_cell(self, cell, resources, index): continue # run all code lines prior to the current line containing the autotest_delimiter - asyncio.run(self._async_execute_code_snippet("\n".join(non_autotest_code_lines))) + self._execute_code_snippet("\n".join(non_autotest_code_lines)) non_autotest_code_lines = [] # there are autotests; we should check that it is a grading cell @@ -266,7 +198,7 @@ def preprocess_cell(self, cell, resources, index): if (self.setup_code is not None) and (not setup_code_inserted_into_cell): new_lines.append(self.setup_code) setup_code_inserted_into_cell = True - asyncio.run(self._async_execute_code_snippet(self.setup_code)) + self._execute_code_snippet(self.setup_code) # decide whether to use hashing based on whether the self.hashed_delimiter token # appears in the line before the self.autotest_delimiter token @@ -319,7 +251,7 @@ def preprocess_cell(self, cell, resources, index): # run the trailing non-autotest lines, if any remain if len(non_autotest_code_lines) > 0: - asyncio.run(self._async_execute_code_snippet("\n".join(non_autotest_code_lines))) + self._execute_code_snippet("\n".join(non_autotest_code_lines)) # add the final success message if is_grade_flag and self.global_tests_loaded: @@ -399,7 +331,7 @@ def _instantiate_tests(self, snippet, salt=None): # get the type of the snippet output (used to dispatch autotest) template = j2.Environment(loader=j2.BaseLoader).from_string(self.dispatch_template) dispatch_code = template.render(snippet=snippet) - dispatch_result = asyncio.run(self._async_execute_code_snippet(dispatch_code)) + dispatch_result = self._execute_code_snippet(dispatch_code) self.log.debug('Dispatch result returned by kernel: ', dispatch_result) # get the test code; if the type isn't in our dict, just default to 'default' # if default isn't in the tests code, this will throw an error @@ -449,296 +381,44 @@ def _instantiate_tests(self, snippet, salt=None): template = j2.Environment(loader=j2.BaseLoader).from_string(templ) instantiated_test = template.render(snippet=snippet) # run the instantiated template code - test_value = asyncio.run(self._async_execute_code_snippet(instantiated_test)) + test_value = self._execute_code_snippet(instantiated_test) instantiated_tests.append(instantiated_test) test_values.append(test_value) return instantiated_tests, test_values, rendered_fail_msgs - # ------------------------------------------------------------------------------------- - - ######################### - # async version of nbgrader interaction with kernel - # the below functions were adapted from the jupyter/nbclient GitHub repo, commit: - # https://github.com/jupyter/nbclient/commit/0c08e27c1ec655cffe9b35cf637da742cdab36e8 - ######################### - - # ------------------------------------------------------------------------------------- - # adapted from nbclient.util.ensure_async - async def _ensure_async(self, obj): - """Convert a non-awaitable object to a coroutine if needed, - and await it if it was not already awaited. - adapted from nbclient.util._ensure_async - """ - if inspect.isawaitable(obj): - try: - result = await obj - except RuntimeError as e: - if str(e) == 'cannot reuse already awaited coroutine': - return obj - raise - return result - return obj - - # ------------------------------------------------------------------------------------- - # adapted from nbclient.client._async_handle_timeout - async def _async_handle_timeout(self, timeout: int) -> None: - - self.log.error("Timeout waiting for execute reply (%is)." % timeout) - if self.interrupt_on_timeout: - self.log.error("Interrupting kernel") - assert self.km is not None - await _ensure_async(self.km.interrupt_kernel()) - else: - raise CellTimeoutError.error_from_timeout_and_cell( - "Cell execution timed out", timeout - ) - - # ------------------------------------------------------------------------------------- - # adapted from nbclient.client._async_check_alive - async def _async_check_alive(self) -> None: - assert self.kc is not None - if not await self._ensure_async(self.kc.is_alive()): - self.log.error("Kernel died while waiting for execute reply.") - raise DeadKernelError("Kernel died") - - # ------------------------------------------------------------------------------------- - # adapted from nbclient.client._async_poll_output_msg - async def _async_poll_output_msg_code( - self, parent_msg_id: str, code - ) -> None: - - assert self.kc is not None - while True: - msg = await self._ensure_async(self.kc.iopub_channel.get_msg(timeout=None)) - if msg['parent_header'].get('msg_id') == parent_msg_id: - try: - msg_type = msg['msg_type'] - self.log.debug("msg_type: %s", msg_type) - content = msg['content'] - self.log.debug("content: %s", content) - - if msg_type in {'execute_result', 'display_data', 'update_display_data'}: - return self.sanitizer(content['data']['text/plain']) - - if msg_type == 'error': - self.log.error("Failed to run code: \n%s", code) - self.log.error("Runtime error from the kernel: \n%s", content['evalue']) - raise CodeExecutionError() - - if msg_type == 'status': - if content['execution_state'] == 'idle': - raise CellExecutionComplete() - - except CellExecutionComplete: - return - - # ------------------------------------------------------------------------------------- - # adapted from nbclient.client.async_wait_for_reply - async def _async_wait_for_reply( - self, msg_id: str, cell: t.Optional[NotebookNode] = None - ) -> t.Optional[t.Dict]: - - assert self.kc is not None - # wait for finish, with timeout - timeout = self._get_timeout(cell) - cummulative_time = 0 - while True: - try: - msg = await _ensure_async( - self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval) - ) - except Empty: - await self._async_check_alive() - cummulative_time += self.shell_timeout_interval - if timeout and cummulative_time > timeout: - await self._async_async_handle_timeout(timeout, cell) - break - else: - if msg['parent_header'].get('msg_id') == msg_id: - return msg - return None - - # ------------------------------------------------------------------------------------- - # adapted from nbclient.client._async_poll_for_reply - async def _async_poll_for_reply_code( - self, - msg_id: str, - timeout: t.Optional[int], - task_poll_output_msg: asyncio.Future, - task_poll_kernel_alive: asyncio.Future, - ) -> t.Dict: - - assert self.kc is not None - - self.log.debug("Executing _async_poll_for_reply:\n%s", msg_id) - if timeout is not None: - deadline = monotonic() + timeout - new_timeout = float(timeout) - - while True: - try: - shell_msg = await self._ensure_async(self.kc.shell_channel.get_msg(timeout=new_timeout)) - if shell_msg['parent_header'].get('msg_id') == msg_id: - try: - msg = await asyncio.wait_for(task_poll_output_msg, new_timeout) - except (asyncio.TimeoutError, Empty): - task_poll_kernel_alive.cancel() - raise CellExecutionError("Timeout waiting for IOPub output") - self.log.debug("Get _async_poll_for_reply:\n%s", msg) - - return msg if msg != None else "" - else: - if new_timeout is not None: - new_timeout = max(0, deadline - monotonic()) - except Empty: - self.log.debug("Empty _async_poll_for_reply:\n%s", msg_id) - task_poll_kernel_alive.cancel() - await self._async_check_alive() - await self._async_handle_timeout() - - # ------------------------------------------------------------------------------------- - # adapted from nbclient.client.async_execute_cell - async def _async_execute_code_snippet(self, code): - assert self.kc is not None + def _execute_code_snippet(self, code): + # append a temporary cell to the end of the notebook + tmp_cell = NotebookNode(cell_type = "code", source = code) + tmp_index = len(self.nb['cells']) + self.nb['cells'].append(tmp_cell) - self.log.debug("Executing cell:\n%s", code) + # execute that cell using NotebookClient's pre-existing functionality + tmp_cell = self.execute_cell(tmp_cell, tmp_index) - parent_msg_id = await self._ensure_async(self.kc.execute(code, stop_on_error=not self.allow_errors)) + # remove the temporary cell + self.nb['cells'].pop() - task_poll_kernel_alive = asyncio.ensure_future(self._async_check_alive()) + # on_cell_executed will inject an `execute_reply` attribute into the cell + # gives us access to the raw kernel reply, so we can process it here + reply = tmp_cell.execute_reply - task_poll_output_msg = asyncio.ensure_future(self._async_poll_output_msg_code(parent_msg_id, code)) + # if error, log and raise + if reply['content']['status'] == 'error': + self.log.error("Failed to run code: \n%s", code) + self.log.error("Runtime error from the kernel: \n%s\n%s\n%s", reply['content']['ename'], reply['content']['evalue'], reply['content']['traceback']) + raise CodeExecutionError() - task_poll_for_reply = asyncio.ensure_future( - self._async_poll_for_reply_code(parent_msg_id, self.timeout, task_poll_output_msg, task_poll_kernel_alive)) - - try: - msg = await task_poll_for_reply - except asyncio.CancelledError: - # can only be cancelled by task_poll_kernel_alive when the kernel is dead - task_poll_output_msg.cancel() - raise DeadKernelError("Kernel died") - except Exception as e: - # Best effort to cancel request if it hasn't been resolved - try: - # Check if the task_poll_output is doing the raising for us - if not isinstance(e, CellControlSignal): - task_poll_output_msg.cancel() - finally: - raise - - return msg - - # ------------------------------------------------------------------------------------- - async def async_execute_cell( - self, - cell: NotebookNode, - cell_index: int, - execution_count: t.Optional[int] = None, - store_history: bool = True, - ) -> NotebookNode: - """ - Executes a single code cell. - - To execute all cells see :meth:`execute`. - - Parameters - ---------- - cell : nbformat.NotebookNode - The cell which is currently being processed. - cell_index : int - The position of the cell within the notebook object. - execution_count : int - The execution count to be assigned to the cell (default: Use kernel response) - store_history : bool - Determines if history should be stored in the kernel (default: False). - Specific to ipython kernels, which can store command histories. - - Returns - ------- - output : dict - The execution output payload (or None for no output). - - Raises - ------ - CellExecutionError - If execution failed and should raise an exception, this will be raised - with defaults about the failure. - - Returns - ------- - cell : NotebookNode - The cell which was just processed. - """ - assert self.kc is not None - - await run_hook(self.on_cell_start, cell=cell, cell_index=cell_index) - - if cell.cell_type != 'code' or not cell.source.strip(): - self.log.debug("Skipping non-executing cell %s", cell_index) - return cell - - if self.skip_cells_with_tag in cell.metadata.get("tags", []): - self.log.debug("Skipping tagged cell %s", cell_index) - return cell - - if self.record_timing: # clear execution metadata prior to execution - cell['metadata']['execution'] = {} - - self.log.debug("Executing cell:\n%s", cell.source) - - cell_allows_errors = (not self.force_raise_errors) and ( - self.allow_errors or "raises-exception" in cell.metadata.get("tags", []) - ) - - await run_hook(self.on_cell_execute, cell=cell, cell_index=cell_index) - parent_msg_id = await _ensure_async( - self.kc.execute( - cell.source, store_history=store_history, stop_on_error=not cell_allows_errors - ) - ) - await run_hook(self.on_cell_complete, cell=cell, cell_index=cell_index) - # We launched a code cell to execute - self.code_cells_executed += 1 - exec_timeout = self._get_timeout(cell) - - cell.outputs = [] - self.clear_before_next_output = False - - task_poll_kernel_alive = asyncio.ensure_future(self._async_poll_kernel_alive()) - task_poll_output_msg = asyncio.ensure_future( - self._async_poll_output_msg_code(parent_msg_id, code) - ) - self.task_poll_for_reply = asyncio.ensure_future( - self._async_poll_for_reply_code( - parent_msg_id, exec_timeout, task_poll_output_msg, task_poll_kernel_alive - ) - ) - try: - exec_reply = await self.task_poll_for_reply - except asyncio.CancelledError: - # can only be cancelled by task_poll_kernel_alive when the kernel is dead - task_poll_output_msg.cancel() - raise DeadKernelError("Kernel died") - except Exception as e: - # Best effort to cancel request if it hasn't been resolved - try: - # Check if the task_poll_output is doing the raising for us - if not isinstance(e, CellControlSignal): - task_poll_output_msg.cancel() - finally: - raise - - if execution_count: - cell['execution_count'] = execution_count - await self._check_raise_for_error(cell, cell_index, exec_reply) - self.nb['cells'][cell_index] = cell - return cell - # ------------------------------------------------------------------------------------- + # if no error, return the result + return self.sanitizer(reply['content']['data']['text/plain']) + def on_cell_executed(self, **kwargs): + # this hook is run by nbclient.NotebookClient after cell execution + # here append raw reply to the cell so that _execute_code_snippet can access it + kwargs['cell'].execute_reply = kwargs['execute_reply'] +# TODO: do we need this? def timestamp(msg: Optional[Dict] = None) -> str: if msg and 'header' in msg: # The test mocks don't provide a header, so tolerate that msg_header = msg['header']