diff --git a/backend/danswer/background/celery/apps/app_base.py b/backend/danswer/background/celery/apps/app_base.py index d041ce0d2bc..6c448f64449 100644 --- a/backend/danswer/background/celery/apps/app_base.py +++ b/backend/danswer/background/celery/apps/app_base.py @@ -8,6 +8,8 @@ from celery import Task from celery.app import trace from celery.exceptions import WorkerShutdown +from celery.signals import task_postrun +from celery.signals import task_prerun from celery.states import READY_STATES from celery.utils.log import get_task_logger from celery.worker import strategy # type: ignore @@ -33,8 +35,11 @@ from danswer.utils.logger import ColoredFormatter from danswer.utils.logger import PlainFormatter from danswer.utils.logger import setup_logger +from shared_configs.configs import MULTI_TENANT +from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.configs import SENTRY_DSN - +from shared_configs.configs import TENANT_ID_PREFIX +from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR logger = setup_logger() @@ -55,8 +60,8 @@ def on_task_prerun( sender: Any | None = None, task_id: str | None = None, task: Task | None = None, - args: tuple | None = None, - kwargs: dict | None = None, + args: tuple[Any, ...] 
| None = None, + kwargs: dict[str, Any] | None = None, **kwds: Any, ) -> None: pass @@ -345,26 +350,36 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None: def on_setup_logging( - loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any + loglevel: int, + logfile: str | None, + format: str, + colorize: bool, + **kwargs: Any, ) -> None: # TODO: could unhardcode format and colorize and accept these as options from # celery's config - # reformats the root logger root_logger = logging.getLogger() + root_logger.handlers = [] - root_handler = logging.StreamHandler() # Set up a handler for the root logger + # Define the log format + log_format = ( + "%(levelname)-8s %(asctime)s %(filename)15s:%(lineno)-4d: %(name)s %(message)s" + ) + + # Set up the root handler + root_handler = logging.StreamHandler() root_formatter = ColoredFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) root_handler.setFormatter(root_formatter) - root_logger.addHandler(root_handler) # Apply the handler to the root logger + root_logger.addHandler(root_handler) if logfile: root_file_handler = logging.FileHandler(logfile) root_file_formatter = PlainFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) root_file_handler.setFormatter(root_file_formatter) @@ -372,19 +387,23 @@ def on_setup_logging( root_logger.setLevel(loglevel) - # reformats celery's task logger + # Configure the task logger + task_logger.handlers = [] + + task_handler = logging.StreamHandler() + task_handler.addFilter(TenantContextFilter()) task_formatter = CeleryTaskColoredFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) - task_handler = logging.StreamHandler() # Set up a handler for the task logger task_handler.setFormatter(task_formatter) - task_logger.addHandler(task_handler) # Apply the handler to the task logger + 
task_logger.addHandler(task_handler) if logfile: task_file_handler = logging.FileHandler(logfile) + task_file_handler.addFilter(TenantContextFilter()) task_file_formatter = CeleryTaskPlainFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) task_file_handler.setFormatter(task_file_formatter) @@ -393,10 +412,54 @@ def on_setup_logging( task_logger.setLevel(loglevel) task_logger.propagate = False - # hide celery task received spam - # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received" + # Hide celery task received and succeeded/failed messages strategy.logger.setLevel(logging.WARNING) - - # hide celery task succeeded/failed spam - # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None" trace.logger.setLevel(logging.WARNING) + + +class TenantContextFilter(logging.Filter): + """Logging filter to inject tenant ID into the logger's name.""" + + def filter(self, record: logging.LogRecord) -> bool: + if not MULTI_TENANT: + record.name = "" + return True + + tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get() + if tenant_id: + tenant_id = tenant_id.split(TENANT_ID_PREFIX)[-1][:5] + record.name = f"[t:{tenant_id}]" + else: + record.name = "" + return True + + +@task_prerun.connect +def set_tenant_id( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple[Any, ...] | None = None, + kwargs: dict[str, Any] | None = None, + **other_kwargs: Any, +) -> None: + """Signal handler to set tenant ID in context var before task starts.""" + tenant_id = ( + kwargs.get("tenant_id", POSTGRES_DEFAULT_SCHEMA) + if kwargs + else POSTGRES_DEFAULT_SCHEMA + ) + CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id) + + +@task_postrun.connect +def reset_tenant_id( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple[Any, ...] 
| None = None, + kwargs: dict[str, Any] | None = None, + **other_kwargs: Any, +) -> None: + """Signal handler to reset tenant ID in context var after task ends.""" + CURRENT_TENANT_ID_CONTEXTVAR.set(POSTGRES_DEFAULT_SCHEMA) diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py index f7ae3ec2655..4d0f8c348e4 100644 --- a/backend/danswer/background/celery/apps/beat.py +++ b/backend/danswer/background/celery/apps/beat.py @@ -44,18 +44,18 @@ def tick(self) -> float: self._last_reload is None or (now - self._last_reload) > self._reload_interval ): - logger.info("Reload interval reached, initiating tenant task update") + logger.info("Reload interval reached, initiating task update") self._update_tenant_tasks() self._last_reload = now - logger.info("Tenant task update completed, reset reload timer") + logger.info("Task update completed, reset reload timer") return retval def _update_tenant_tasks(self) -> None: - logger.info("Starting tenant task update process") + logger.info("Starting task update process") try: - logger.info("Fetching all tenant IDs") + logger.info("Fetching all IDs") tenant_ids = get_all_tenant_ids() - logger.info(f"Found {len(tenant_ids)} tenants") + logger.info(f"Found {len(tenant_ids)} IDs") logger.info("Fetching tasks to schedule") tasks_to_schedule = fetch_versioned_implementation( @@ -70,7 +70,7 @@ def _update_tenant_tasks(self) -> None: for task_name, _ in current_schedule: if "-" in task_name: existing_tenants.add(task_name.split("-")[-1]) - logger.info(f"Found {len(existing_tenants)} existing tenants in schedule") + logger.info(f"Found {len(existing_tenants)} existing items in schedule") for tenant_id in tenant_ids: if ( @@ -83,7 +83,7 @@ def _update_tenant_tasks(self) -> None: continue if tenant_id not in existing_tenants: - logger.info(f"Processing new tenant: {tenant_id}") + logger.info(f"Processing new item: {tenant_id}") for task in tasks_to_schedule(): task_name = 
f"{task['name']}-{tenant_id}" @@ -129,11 +129,10 @@ def _update_tenant_tasks(self) -> None: logger.info("Schedule update completed successfully") else: logger.info("Schedule is up to date, no changes needed") - - except (AttributeError, KeyError): - logger.exception("Failed to process task configuration") - except Exception: - logger.exception("Unexpected error updating tenant tasks") + except (AttributeError, KeyError) as e: + logger.exception(f"Failed to process task configuration: {str(e)}") + except Exception as e: + logger.exception(f"Unexpected error updating tasks: {str(e)}") def _should_update_schedule( self, current_schedule: dict, new_schedule: dict diff --git a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py index 9413dd97854..27fc3cf280c 100644 --- a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py @@ -76,7 +76,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N "Soft time limit exceeded, task is being terminated gracefully." 
) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during connector deletion check") finally: if lock_beat.owned(): lock_beat.release() @@ -132,14 +132,14 @@ def try_generate_document_cc_pair_cleanup_tasks( redis_connector_index = redis_connector.new_index(search_settings.id) if redis_connector_index.fenced: raise TaskDependencyError( - f"Connector deletion - Delayed (indexing in progress): " + "Connector deletion - Delayed (indexing in progress): " f"cc_pair={cc_pair_id} " f"search_settings={search_settings.id}" ) if redis_connector.prune.fenced: raise TaskDependencyError( - f"Connector deletion - Delayed (pruning in progress): " + "Connector deletion - Delayed (pruning in progress): " f"cc_pair={cc_pair_id}" ) @@ -176,7 +176,7 @@ def try_generate_document_cc_pair_cleanup_tasks( # return 0 task_logger.info( - f"RedisConnectorDeletion.generate_tasks finished. " + "RedisConnectorDeletion.generate_tasks finished. " f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}" ) diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 14bf66cdc56..f0aa3ed297f 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -287,7 +287,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: "Soft time limit exceeded, task is being terminated gracefully." 
) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during indexing check") finally: if lock_beat.owned(): lock_beat.release() @@ -479,7 +479,6 @@ def try_creating_indexing_task( except Exception: task_logger.exception( f"try_creating_indexing_task - Unexpected exception: " - f"tenant={tenant_id} " f"cc_pair={cc_pair.id} " f"search_settings={search_settings.id}" ) @@ -505,7 +504,6 @@ def connector_indexing_proxy_task( """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" task_logger.info( f"Indexing watchdog - starting: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) @@ -524,15 +522,13 @@ def connector_indexing_proxy_task( if not job: task_logger.info( f"Indexing watchdog - spawn failed: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) return task_logger.info( f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) @@ -557,7 +553,6 @@ def connector_indexing_proxy_task( task_logger.error( f"Indexing watchdog - spawned task exceptioned: " f"attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id} " f"error={job.exception()}" ) @@ -568,7 +563,6 @@ def connector_indexing_proxy_task( task_logger.info( f"Indexing watchdog - finished: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) @@ -790,7 +784,6 @@ def connector_indexing_task( logger.info( f"Indexing spawned task finished: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) diff --git 
a/backend/danswer/background/celery/tasks/pruning/tasks.py b/backend/danswer/background/celery/tasks/pruning/tasks.py index 67b781f228f..ec49e8a8562 100644 --- a/backend/danswer/background/celery/tasks/pruning/tasks.py +++ b/backend/danswer/background/celery/tasks/pruning/tasks.py @@ -120,7 +120,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None: "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during pruning check") finally: if lock_beat.owned(): lock_beat.release() @@ -303,7 +303,7 @@ def connector_pruning_generator_task( doc_ids_to_remove = list(all_indexed_document_ids - all_connector_doc_ids) task_logger.info( - f"Pruning set collected: " + "Pruning set collected: " f"cc_pair={cc_pair_id} " f"connector_source={cc_pair.connector.source} " f"docs_to_remove={len(doc_ids_to_remove)}" @@ -319,7 +319,7 @@ def connector_pruning_generator_task( return None task_logger.info( - f"RedisConnector.prune.generate_tasks finished. " + "RedisConnector.prune.generate_tasks finished. 
" f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}" ) diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py index 2719a4d0665..bc58effa906 100644 --- a/backend/danswer/background/celery/tasks/shared/tasks.py +++ b/backend/danswer/background/celery/tasks/shared/tasks.py @@ -59,7 +59,7 @@ def document_by_cc_pair_cleanup_task( connector / credential pair from the access list (6) delete all relevant entries from postgres """ - task_logger.debug(f"Task start: tenant={tenant_id} doc={document_id}") + task_logger.debug(f"Task start: doc={document_id}") try: with get_session_with_tenant(tenant_id) as db_session: @@ -128,16 +128,13 @@ def document_by_cc_pair_cleanup_task( db_session.commit() task_logger.info( - f"tenant={tenant_id} " f"doc={document_id} " f"action={action} " f"refcount={count} " f"chunks={chunks_affected}" ) except SoftTimeLimitExceeded: - task_logger.info( - f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}" - ) + task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}") return False except Exception as ex: if isinstance(ex, RetryError): @@ -156,15 +153,12 @@ def document_by_cc_pair_cleanup_task( if e.response.status_code == HTTPStatus.BAD_REQUEST: task_logger.exception( f"Non-retryable HTTPStatusError: " - f"tenant={tenant_id} " f"doc={document_id} " f"status={e.response.status_code}" ) return False - task_logger.exception( - f"Unexpected exception: tenant={tenant_id} doc={document_id}" - ) + task_logger.exception(f"Unexpected exception: doc={document_id}") if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES: # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 @@ -175,7 +169,7 @@ def document_by_cc_pair_cleanup_task( # eventually gets fixed out of band via stale document reconciliation task_logger.warning( f"Max celery task retries reached. 
Marking doc as dirty for reconciliation: " - f"tenant={tenant_id} doc={document_id}" + f"doc={document_id}" ) with get_session_with_tenant(tenant_id) as db_session: # delete the cc pair relationship now and let reconciliation clean it up diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index ec7f52bc03c..a4ba2a8a421 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -154,7 +154,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None: "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during vespa metadata sync") finally: if lock_beat.owned(): lock_beat.release() @@ -855,13 +855,9 @@ def vespa_metadata_sync_task( # the sync might repeat again later mark_document_as_synced(document_id, db_session) - task_logger.info( - f"tenant={tenant_id} doc={document_id} action=sync chunks={chunks_affected}" - ) + task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}") except SoftTimeLimitExceeded: - task_logger.info( - f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}" - ) + task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}") except Exception as ex: if isinstance(ex, RetryError): task_logger.warning( @@ -879,14 +875,13 @@ def vespa_metadata_sync_task( if e.response.status_code == HTTPStatus.BAD_REQUEST: task_logger.exception( f"Non-retryable HTTPStatusError: " - f"tenant={tenant_id} " f"doc={document_id} " f"status={e.response.status_code}" ) return False task_logger.exception( - f"Unexpected exception: tenant={tenant_id} doc={document_id}" + f"Unexpected exception during vespa metadata sync: doc={document_id}" ) # Exponential backoff from 2^4 to 2^6 ... i.e. 
16, 32, 64 diff --git a/backend/tests/integration/tests/personas/test_persona_categories.py b/backend/tests/integration/tests/personas/test_persona_categories.py index fdd0e645814..1ac2d3b3000 100644 --- a/backend/tests/integration/tests/personas/test_persona_categories.py +++ b/backend/tests/integration/tests/personas/test_persona_categories.py @@ -44,6 +44,7 @@ def test_persona_category_management(reset: None) -> None: category=updated_persona_category, user_performing_action=regular_user, ) + assert exc_info.value.response is not None assert exc_info.value.response.status_code == 403 assert PersonaCategoryManager.verify(