diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 0c3c1c81458..276afba0d13 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -8,6 +8,8 @@ from celery import Task from celery.app import trace from celery.exceptions import WorkerShutdown +from celery.signals import task_postrun +from celery.signals import task_prerun from celery.states import READY_STATES from celery.utils.log import get_task_logger from celery.worker import strategy # type: ignore @@ -34,8 +36,11 @@ from onyx.utils.logger import ColoredFormatter from onyx.utils.logger import PlainFormatter from onyx.utils.logger import setup_logger +from shared_configs.configs import MULTI_TENANT +from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.configs import SENTRY_DSN - +from shared_configs.configs import TENANT_ID_PREFIX +from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR logger = setup_logger() @@ -56,8 +61,8 @@ def on_task_prerun( sender: Any | None = None, task_id: str | None = None, task: Task | None = None, - args: tuple | None = None, - kwargs: dict | None = None, + args: tuple[Any, ...] 
| None = None, + kwargs: dict[str, Any] | None = None, **kwds: Any, ) -> None: pass @@ -346,26 +351,36 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None: def on_setup_logging( - loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any + loglevel: int, + logfile: str | None, + format: str, + colorize: bool, + **kwargs: Any, ) -> None: # TODO: could unhardcode format and colorize and accept these as options from # celery's config - # reformats the root logger root_logger = logging.getLogger() + root_logger.handlers = [] - root_handler = logging.StreamHandler() # Set up a handler for the root logger + # Define the log format + log_format = ( + "%(levelname)-8s %(asctime)s %(filename)15s:%(lineno)-4d: %(name)s %(message)s" + ) + + # Set up the root handler + root_handler = logging.StreamHandler() root_formatter = ColoredFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) root_handler.setFormatter(root_formatter) - root_logger.addHandler(root_handler) # Apply the handler to the root logger + root_logger.addHandler(root_handler) if logfile: root_file_handler = logging.FileHandler(logfile) root_file_formatter = PlainFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) root_file_handler.setFormatter(root_file_formatter) @@ -373,19 +388,23 @@ def on_setup_logging( root_logger.setLevel(loglevel) - # reformats celery's task logger + # Configure the task logger + task_logger.handlers = [] + + task_handler = logging.StreamHandler() + task_handler.addFilter(TenantContextFilter()) task_formatter = CeleryTaskColoredFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) - task_handler = logging.StreamHandler() # Set up a handler for the task logger task_handler.setFormatter(task_formatter) - task_logger.addHandler(task_handler) # Apply the handler to the task logger + 
task_logger.addHandler(task_handler) if logfile: task_file_handler = logging.FileHandler(logfile) + task_file_handler.addFilter(TenantContextFilter()) task_file_formatter = CeleryTaskPlainFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) task_file_handler.setFormatter(task_file_formatter) @@ -394,10 +413,55 @@ def on_setup_logging( task_logger.setLevel(loglevel) task_logger.propagate = False - # hide celery task received spam - # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received" + # Hide celery task received and succeeded/failed messages strategy.logger.setLevel(logging.WARNING) - - # hide celery task succeeded/failed spam - # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None" trace.logger.setLevel(logging.WARNING) + + +class TenantContextFilter(logging.Filter): + + """Logging filter to inject tenant ID into the logger's name.""" + + def filter(self, record: logging.LogRecord) -> bool: + if not MULTI_TENANT: + record.name = "" + return True + + tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get() + if tenant_id: + tenant_id = tenant_id.split(TENANT_ID_PREFIX)[-1][:5] + record.name = f"[t:{tenant_id}]" + else: + record.name = "" + return True + + +@task_prerun.connect +def set_tenant_id( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple[Any, ...] | None = None, + kwargs: dict[str, Any] | None = None, + **other_kwargs: Any, +) -> None: + """Signal handler to set tenant ID in context var before task starts.""" + tenant_id = ( + kwargs.get("tenant_id", POSTGRES_DEFAULT_SCHEMA) + if kwargs + else POSTGRES_DEFAULT_SCHEMA + ) + CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id) + + +@task_postrun.connect +def reset_tenant_id( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple[Any, ...] 
| None = None, + kwargs: dict[str, Any] | None = None, + **other_kwargs: Any, +) -> None: + """Signal handler to reset tenant ID in context var after task ends.""" + CURRENT_TENANT_ID_CONTEXTVAR.set(POSTGRES_DEFAULT_SCHEMA) diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 6880a253a81..6e372f75f97 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -44,18 +44,18 @@ def tick(self) -> float: self._last_reload is None or (now - self._last_reload) > self._reload_interval ): - logger.info("Reload interval reached, initiating tenant task update") + logger.info("Reload interval reached, initiating task update") self._update_tenant_tasks() self._last_reload = now - logger.info("Tenant task update completed, reset reload timer") + logger.info("Task update completed, reset reload timer") return retval def _update_tenant_tasks(self) -> None: - logger.info("Starting tenant task update process") + logger.info("Starting task update process") try: - logger.info("Fetching all tenant IDs") + logger.info("Fetching all IDs") tenant_ids = get_all_tenant_ids() - logger.info(f"Found {len(tenant_ids)} tenants") + logger.info(f"Found {len(tenant_ids)} IDs") logger.info("Fetching tasks to schedule") tasks_to_schedule = fetch_versioned_implementation( @@ -70,7 +70,7 @@ def _update_tenant_tasks(self) -> None: for task_name, _ in current_schedule: if "-" in task_name: existing_tenants.add(task_name.split("-")[-1]) - logger.info(f"Found {len(existing_tenants)} existing tenants in schedule") + logger.info(f"Found {len(existing_tenants)} existing items in schedule") for tenant_id in tenant_ids: if ( @@ -83,7 +83,7 @@ def _update_tenant_tasks(self) -> None: continue if tenant_id not in existing_tenants: - logger.info(f"Processing new tenant: {tenant_id}") + logger.info(f"Processing new item: {tenant_id}") for task in tasks_to_schedule(): task_name = f"{task['name']}-{tenant_id}" @@ 
-129,11 +129,10 @@ def _update_tenant_tasks(self) -> None: logger.info("Schedule update completed successfully") else: logger.info("Schedule is up to date, no changes needed") - - except (AttributeError, KeyError): - logger.exception("Failed to process task configuration") - except Exception: - logger.exception("Unexpected error updating tenant tasks") + except (AttributeError, KeyError) as e: + logger.exception(f"Failed to process task configuration: {str(e)}") + except Exception as e: + logger.exception(f"Unexpected error updating tasks: {str(e)}") def _should_update_schedule( self, current_schedule: dict, new_schedule: dict diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index 31a456286d7..39ed7d25d05 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -76,7 +76,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N "Soft time limit exceeded, task is being terminated gracefully." 
) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during connector deletion check") finally: if lock_beat.owned(): lock_beat.release() @@ -131,14 +131,14 @@ def try_generate_document_cc_pair_cleanup_tasks( redis_connector_index = redis_connector.new_index(search_settings.id) if redis_connector_index.fenced: raise TaskDependencyError( - f"Connector deletion - Delayed (indexing in progress): " + "Connector deletion - Delayed (indexing in progress): " f"cc_pair={cc_pair_id} " f"search_settings={search_settings.id}" ) if redis_connector.prune.fenced: raise TaskDependencyError( - f"Connector deletion - Delayed (pruning in progress): " + "Connector deletion - Delayed (pruning in progress): " f"cc_pair={cc_pair_id}" ) @@ -175,7 +175,7 @@ def try_generate_document_cc_pair_cleanup_tasks( # return 0 task_logger.info( - f"RedisConnectorDeletion.generate_tasks finished. " + "RedisConnectorDeletion.generate_tasks finished. " f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}" ) diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index ad7ac2c9af6..8530a8e760d 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -309,7 +309,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: "Soft time limit exceeded, task is being terminated gracefully." 
)
     except Exception:
-        task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
+        task_logger.exception("Unexpected exception during indexing check")
     finally:
         if locked:
             if lock_beat.owned():
@@ -508,7 +508,6 @@ def try_creating_indexing_task(
     except Exception:
         task_logger.exception(
             f"try_creating_indexing_task - Unexpected exception: "
-            f"tenant={tenant_id} "
             f"cc_pair={cc_pair.id} "
             f"search_settings={search_settings.id}"
         )
@@ -540,7 +539,6 @@ def connector_indexing_proxy_task(
     """celery tasks are forked, but forking is unstable.  This proxies work to a spawned task."""
     task_logger.info(
         f"Indexing watchdog - starting: attempt={index_attempt_id} "
-        f"tenant={tenant_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"
     )
@@ -563,15 +561,13 @@ def connector_indexing_proxy_task(
     if not job:
         task_logger.info(
             f"Indexing watchdog - spawn failed: attempt={index_attempt_id} "
-            f"tenant={tenant_id} "
             f"cc_pair={cc_pair_id} "
             f"search_settings={search_settings_id}"
         )
         return

     task_logger.info(
         f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} "
-        f"tenant={tenant_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"
     )
@@ -586,7 +582,6 @@ def connector_indexing_proxy_task(
         task_logger.warning(
             "Indexing watchdog - termination signal detected: "
             f"attempt={index_attempt_id} "
-            f"tenant={tenant_id} "
             f"cc_pair={cc_pair_id} "
             f"search_settings={search_settings_id}"
         )
@@ -681,7 +676,6 @@ def connector_indexing_proxy_task(

     task_logger.info(
         f"Indexing watchdog - finished: attempt={index_attempt_id} "
-        f"tenant={tenant_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"
     )
@@ -906,7 +900,6 @@ def connector_indexing_task(

     logger.info(
         f"Indexing spawned task finished: attempt={index_attempt_id} "
-        f"tenant={tenant_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"
     )
diff --git 
a/backend/onyx/background/celery/tasks/pruning/tasks.py b/backend/onyx/background/celery/tasks/pruning/tasks.py index 1d76ecc7e0e..d4cca6e1cdf 100644 --- a/backend/onyx/background/celery/tasks/pruning/tasks.py +++ b/backend/onyx/background/celery/tasks/pruning/tasks.py @@ -122,7 +122,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None: "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during pruning check") finally: if lock_beat.owned(): lock_beat.release() @@ -308,7 +308,7 @@ def connector_pruning_generator_task( doc_ids_to_remove = list(all_indexed_document_ids - all_connector_doc_ids) task_logger.info( - f"Pruning set collected: " + "Pruning set collected: " f"cc_pair={cc_pair_id} " f"connector_source={cc_pair.connector.source} " f"docs_to_remove={len(doc_ids_to_remove)}" @@ -324,7 +324,7 @@ def connector_pruning_generator_task( return None task_logger.info( - f"RedisConnector.prune.generate_tasks finished. " + "RedisConnector.prune.generate_tasks finished. 
" f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}" ) diff --git a/backend/onyx/background/celery/tasks/shared/tasks.py b/backend/onyx/background/celery/tasks/shared/tasks.py index 2686689708d..03a7ab83701 100644 --- a/backend/onyx/background/celery/tasks/shared/tasks.py +++ b/backend/onyx/background/celery/tasks/shared/tasks.py @@ -60,7 +60,7 @@ def document_by_cc_pair_cleanup_task( connector / credential pair from the access list (6) delete all relevant entries from postgres """ - task_logger.debug(f"Task start: tenant={tenant_id} doc={document_id}") + task_logger.debug(f"Task start: doc={document_id}") try: with get_session_with_tenant(tenant_id) as db_session: @@ -129,16 +129,13 @@ def document_by_cc_pair_cleanup_task( db_session.commit() task_logger.info( - f"tenant={tenant_id} " f"doc={document_id} " f"action={action} " f"refcount={count} " f"chunks={chunks_affected}" ) except SoftTimeLimitExceeded: - task_logger.info( - f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}" - ) + task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}") return False except Exception as ex: if isinstance(ex, RetryError): @@ -157,15 +154,12 @@ def document_by_cc_pair_cleanup_task( if e.response.status_code == HTTPStatus.BAD_REQUEST: task_logger.exception( f"Non-retryable HTTPStatusError: " - f"tenant={tenant_id} " f"doc={document_id} " f"status={e.response.status_code}" ) return False - task_logger.exception( - f"Unexpected exception: tenant={tenant_id} doc={document_id}" - ) + task_logger.exception(f"Unexpected exception: doc={document_id}") if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES: # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 @@ -176,7 +170,7 @@ def document_by_cc_pair_cleanup_task( # eventually gets fixed out of band via stale document reconciliation task_logger.warning( f"Max celery task retries reached. 
Marking doc as dirty for reconciliation: " - f"tenant={tenant_id} doc={document_id}" + f"doc={document_id}" ) with get_session_with_tenant(tenant_id) as db_session: # delete the cc pair relationship now and let reconciliation clean it up diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 05e74f7f06c..704378a741c 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -156,7 +156,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None: "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during vespa metadata sync") finally: if lock_beat.owned(): lock_beat.release() @@ -873,13 +873,9 @@ def vespa_metadata_sync_task( # the sync might repeat again later mark_document_as_synced(document_id, db_session) - task_logger.info( - f"tenant={tenant_id} doc={document_id} action=sync chunks={chunks_affected}" - ) + task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}") except SoftTimeLimitExceeded: - task_logger.info( - f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}" - ) + task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}") except Exception as ex: if isinstance(ex, RetryError): task_logger.warning( @@ -897,14 +893,13 @@ def vespa_metadata_sync_task( if e.response.status_code == HTTPStatus.BAD_REQUEST: task_logger.exception( f"Non-retryable HTTPStatusError: " - f"tenant={tenant_id} " f"doc={document_id} " f"status={e.response.status_code}" ) return False task_logger.exception( - f"Unexpected exception: tenant={tenant_id} doc={document_id}" + f"Unexpected exception during vespa metadata sync: doc={document_id}" ) # Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64