From cfed57d6cff4fd15fe4b25578573190d53b9942c Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 23 May 2024 15:51:29 +0200 Subject: [PATCH] fix: Set a timeout for Actor cleanup (#206) - closes #200 I could not find the actual reason why the linked run got stuck, but this should make the Actor cleanup more robust as a whole. --- CHANGELOG.md | 4 +++- src/apify/actor.py | 28 +++++++++++++++++----------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3003dbea..3a2614a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,9 @@ ## [1.7.1](../../releases/tag/v1.7.1) - Unreleased -... +### Fixed + +- Set a timeout for Actor cleanup ## [1.7.0](../../releases/tag/v1.7.0) - 2024-03-12 diff --git a/src/apify/actor.py b/src/apify/actor.py index a7c98eea..2c0b2239 100644 --- a/src/apify/actor.py +++ b/src/apify/actor.py @@ -5,7 +5,7 @@ import inspect import os import sys -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, Awaitable, Callable, TypeVar, cast from apify_client import ApifyClientAsync @@ -301,6 +301,7 @@ async def exit( exit_code: int = 0, event_listeners_timeout_secs: float | None = EVENT_LISTENERS_TIMEOUT_SECS, status_message: str | None = None, + cleanup_timeout: timedelta = timedelta(seconds=30), ) -> None: """Exit the actor instance. @@ -314,11 +315,13 @@ async def exit( exit_code (int, optional): The exit code with which the actor should fail (defaults to `0`). event_listeners_timeout_secs (float, optional): How long should the actor wait for actor event listeners to finish before exiting. status_message (str, optional): The final status message that the actor should display. + cleanup_timeout (timedelta, optional): How long to wait for the whole cleanup (final persist state event, status message, event manager shutdown) to finish. 
""" return await cls._get_default_instance().exit( exit_code=exit_code, event_listeners_timeout_secs=event_listeners_timeout_secs, status_message=status_message, + cleanup_timeout=cleanup_timeout, ) async def _exit_internal( @@ -327,6 +330,7 @@ async def _exit_internal( exit_code: int = 0, event_listeners_timeout_secs: float | None = EVENT_LISTENERS_TIMEOUT_SECS, status_message: str | None = None, + cleanup_timeout: timedelta = timedelta(seconds=30), ) -> None: self._raise_if_not_initialized() @@ -336,21 +340,23 @@ async def _exit_internal( self.log.info('Exiting actor', extra={'exit_code': exit_code}) - await self._cancel_event_emitting_intervals() + async def finalize() -> None: + await self._cancel_event_emitting_intervals() - # Send final persist state event - if not self._was_final_persist_state_emitted: - self._event_manager.emit(ActorEventTypes.PERSIST_STATE, {'isMigrating': False}) - self._was_final_persist_state_emitted = True + # Send final persist state event + if not self._was_final_persist_state_emitted: + self._event_manager.emit(ActorEventTypes.PERSIST_STATE, {'isMigrating': False}) + self._was_final_persist_state_emitted = True - if status_message is not None: - await self.set_status_message(status_message, is_terminal=True) + if status_message is not None: + await self.set_status_message(status_message, is_terminal=True) - # Sleep for a bit so that the listeners have a chance to trigger - await asyncio.sleep(0.1) + # Sleep for a bit so that the listeners have a chance to trigger + await asyncio.sleep(0.1) - await self._event_manager.close(event_listeners_timeout_secs=event_listeners_timeout_secs) + await self._event_manager.close(event_listeners_timeout_secs=event_listeners_timeout_secs) + await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds()) self._is_initialized = False if is_running_in_ipython():