remove tenant id logs
pablonyx committed Nov 23, 2024
1 parent 3a466a4 commit 4e847b0
Showing 8 changed files with 112 additions and 65 deletions.
102 changes: 83 additions & 19 deletions backend/danswer/background/celery/apps/app_base.py
@@ -8,6 +8,8 @@
from celery import Task
from celery.app import trace
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun
from celery.signals import task_prerun
from celery.states import READY_STATES
from celery.utils.log import get_task_logger
from celery.worker import strategy # type: ignore
@@ -33,8 +35,11 @@
from danswer.utils.logger import ColoredFormatter
from danswer.utils.logger import PlainFormatter
from danswer.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
from shared_configs.configs import SENTRY_DSN

from shared_configs.configs import TENANT_ID_PREFIX
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR

logger = setup_logger()

@@ -55,8 +60,8 @@ def on_task_prerun(
sender: Any | None = None,
task_id: str | None = None,
task: Task | None = None,
args: tuple | None = None,
kwargs: dict | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
**kwds: Any,
) -> None:
pass
@@ -345,46 +350,60 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:


def on_setup_logging(
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any
loglevel: int,
logfile: str | None,
format: str,
colorize: bool,
**kwargs: Any,
) -> None:
# TODO: could unhardcode format and colorize and accept these as options from
# celery's config

# reformats the root logger
root_logger = logging.getLogger()
root_logger.handlers = []

root_handler = logging.StreamHandler() # Set up a handler for the root logger
# Define the log format
log_format = (
"%(levelname)-8s %(asctime)s %(filename)15s:%(lineno)-4d: %(name)s %(message)s"
)

# Set up the root handler
root_handler = logging.StreamHandler()
root_formatter = ColoredFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
root_handler.setFormatter(root_formatter)
root_logger.addHandler(root_handler) # Apply the handler to the root logger
root_logger.addHandler(root_handler)

if logfile:
root_file_handler = logging.FileHandler(logfile)
root_file_formatter = PlainFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
root_file_handler.setFormatter(root_file_formatter)
root_logger.addHandler(root_file_handler)

root_logger.setLevel(loglevel)

# reformats celery's task logger
# Configure the task logger
task_logger.handlers = []

task_handler = logging.StreamHandler()
task_handler.addFilter(TenantContextFilter())
task_formatter = CeleryTaskColoredFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
task_handler = logging.StreamHandler() # Set up a handler for the task logger
task_handler.setFormatter(task_formatter)
task_logger.addHandler(task_handler) # Apply the handler to the task logger
task_logger.addHandler(task_handler)

if logfile:
task_file_handler = logging.FileHandler(logfile)
task_file_handler.addFilter(TenantContextFilter())
task_file_formatter = CeleryTaskPlainFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
task_file_handler.setFormatter(task_file_formatter)
@@ -393,10 +412,55 @@ def on_setup_logging(
task_logger.setLevel(loglevel)
task_logger.propagate = False

# hide celery task received spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received"
# Hide celery task received and succeeded/failed messages
strategy.logger.setLevel(logging.WARNING)

# hide celery task succeeded/failed spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None"
trace.logger.setLevel(logging.WARNING)


class TenantContextFilter(logging.Filter):
"""Logging filter to inject tenant ID into the logger's name."""

def filter(self, record: logging.LogRecord) -> bool:
if not MULTI_TENANT:
record.name = ""
return True

tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get()
if tenant_id:
tenant_id = tenant_id.split(TENANT_ID_PREFIX)[-1][:5]
record.name = f"[t:{tenant_id}]"
else:
record.name = ""
return True


@task_prerun.connect
def set_tenant_id(
sender: Any | None = None,
task_id: str | None = None,
task: Task | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
**other_kwargs: Any,
) -> None:
"""Signal handler to set tenant ID in context var before task starts."""
tenant_id = (
kwargs.get("tenant_id", POSTGRES_DEFAULT_SCHEMA)
if kwargs
else POSTGRES_DEFAULT_SCHEMA
)
CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)


@task_postrun.connect
def reset_tenant_id(
sender: Any | None = None,
task_id: str | None = None,
task: Task | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
**other_kwargs: Any,
) -> None:
"""Signal handler to reset tenant ID in context var after task ends."""
CURRENT_TENANT_ID_CONTEXTVAR.set(POSTGRES_DEFAULT_SCHEMA)
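
Taken together, the additions above route a tenant ID from task kwargs into a ContextVar via the task_prerun/task_postrun signals, and TenantContextFilter rewrites each record's logger name into a short [t:xxxxx] tag. A minimal standalone sketch of the same pattern follows; the TENANT_ID_PREFIX value and the demo logger setup are illustrative stand-ins, not the actual danswer modules:

# Minimal sketch of the tenant-tagged logging pattern; TENANT_ID_PREFIX and
# the contextvar mirror the diff, but the rest is illustrative scaffolding
# (the MULTI_TENANT short-circuit is omitted for brevity).
import logging
from contextvars import ContextVar

TENANT_ID_PREFIX = "tenant_"  # assumed value, for illustration only
CURRENT_TENANT_ID_CONTEXTVAR: ContextVar[str | None] = ContextVar(
    "current_tenant_id", default=None
)


class TenantContextFilter(logging.Filter):
    """Inject a shortened tenant ID into the record's logger name."""

    def filter(self, record: logging.LogRecord) -> bool:
        tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get()
        if tenant_id:
            # Drop the schema prefix and keep the first five characters
            record.name = f"[t:{tenant_id.split(TENANT_ID_PREFIX)[-1][:5]}]"
        else:
            record.name = ""
        return True


handler = logging.StreamHandler()
handler.addFilter(TenantContextFilter())
handler.setFormatter(logging.Formatter("%(levelname)-8s %(name)s %(message)s"))
demo_logger = logging.getLogger("demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)

CURRENT_TENANT_ID_CONTEXTVAR.set("tenant_abc123def")
demo_logger.info("indexing started")  # -> INFO     [t:abc12] indexing started
CURRENT_TENANT_ID_CONTEXTVAR.set(None)
demo_logger.info("no tenant set")     # -> INFO      no tenant set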
23 changes: 11 additions & 12 deletions backend/danswer/background/celery/apps/beat.py
@@ -44,18 +44,18 @@ def tick(self) -> float:
self._last_reload is None
or (now - self._last_reload) > self._reload_interval
):
logger.info("Reload interval reached, initiating tenant task update")
logger.info("Reload interval reached, initiating task update")
self._update_tenant_tasks()
self._last_reload = now
logger.info("Tenant task update completed, reset reload timer")
logger.info("Task update completed, reset reload timer")
return retval

def _update_tenant_tasks(self) -> None:
logger.info("Starting tenant task update process")
logger.info("Starting task update process")
try:
logger.info("Fetching all tenant IDs")
logger.info("Fetching all IDs")
tenant_ids = get_all_tenant_ids()
logger.info(f"Found {len(tenant_ids)} tenants")
logger.info(f"Found {len(tenant_ids)} IDs")

logger.info("Fetching tasks to schedule")
tasks_to_schedule = fetch_versioned_implementation(
@@ -70,7 +70,7 @@ def _update_tenant_tasks(self) -> None:
for task_name, _ in current_schedule:
if "-" in task_name:
existing_tenants.add(task_name.split("-")[-1])
logger.info(f"Found {len(existing_tenants)} existing tenants in schedule")
logger.info(f"Found {len(existing_tenants)} existing items in schedule")

for tenant_id in tenant_ids:
if (
@@ -83,7 +83,7 @@
continue

if tenant_id not in existing_tenants:
logger.info(f"Processing new tenant: {tenant_id}")
logger.info(f"Processing new item: {tenant_id}")

for task in tasks_to_schedule():
task_name = f"{task['name']}-{tenant_id}"
@@ -129,11 +129,10 @@ def _update_tenant_tasks(self) -> None:
logger.info("Schedule update completed successfully")
else:
logger.info("Schedule is up to date, no changes needed")

except (AttributeError, KeyError):
logger.exception("Failed to process task configuration")
except Exception:
logger.exception("Unexpected error updating tenant tasks")
except (AttributeError, KeyError) as e:
logger.exception(f"Failed to process task configuration: {str(e)}")
except Exception as e:
logger.exception(f"Unexpected error updating tasks: {str(e)}")

def _should_update_schedule(
self, current_schedule: dict, new_schedule: dict
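
The scheduler above keys each beat entry as the task name suffixed with the tenant ID, then diffs the rebuilt schedule against the current one before applying changes. A toy sketch of that naming and diff check, assuming a simplified entry shape (the real scheduler pulls tasks via fetch_versioned_implementation and tenants via get_all_tenant_ids):

# Toy sketch of the per-tenant beat-entry naming used above; the real
# DynamicTenantScheduler fetches tasks and tenant IDs dynamically, and the
# entry shape here is a simplified assumption.
from typing import Any


def build_schedule(
    tasks: list[dict[str, Any]], tenant_ids: list[str]
) -> dict[str, dict[str, Any]]:
    schedule: dict[str, dict[str, Any]] = {}
    for tenant_id in tenant_ids:
        for task in tasks:
            # Entry name mirrors f"{task['name']}-{tenant_id}" in the diff
            schedule[f"{task['name']}-{tenant_id}"] = {
                "task": task["task"],
                "schedule": task["schedule"],
                "kwargs": {"tenant_id": tenant_id},  # assumed payload shape
            }
    return schedule


def should_update(current: dict[str, Any], new: dict[str, Any]) -> bool:
    # Rewrite the schedule only when entries were added or removed
    return set(current) != set(new)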
backend/danswer/background/celery/tasks/connector_deletion/tasks.py
@@ -76,7 +76,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
task_logger.exception("Unexpected exception during connector deletion check")
finally:
if lock_beat.owned():
lock_beat.release()
@@ -132,14 +132,14 @@ def try_generate_document_cc_pair_cleanup_tasks(
redis_connector_index = redis_connector.new_index(search_settings.id)
if redis_connector_index.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (indexing in progress): "
"Connector deletion - Delayed (indexing in progress): "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings.id}"
)

if redis_connector.prune.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (pruning in progress): "
"Connector deletion - Delayed (pruning in progress): "
f"cc_pair={cc_pair_id}"
)

@@ -176,7 +176,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
# return 0

task_logger.info(
f"RedisConnectorDeletion.generate_tasks finished. "
"RedisConnectorDeletion.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)

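
For context, try_generate_document_cc_pair_cleanup_tasks delays deletion whenever an indexing or pruning fence is held for the connector pair. A rough sketch of that guard pattern, with hypothetical Redis key names:

# Illustrative fence-guard sketch: deletion is delayed while another job type
# holds a fence key in Redis. The key names here are hypothetical; danswer's
# actual fences live behind RedisConnector / RedisConnectorIndex.
import redis


def deletion_delay_reason(
    r: redis.Redis, cc_pair_id: int, search_settings_id: int
) -> str | None:
    if r.exists(f"indexing_fence:{cc_pair_id}:{search_settings_id}"):
        return "indexing in progress"
    if r.exists(f"pruning_fence:{cc_pair_id}"):
        return "pruning in progress"
    return None  # safe to generate deletion tasks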
10 changes: 2 additions & 8 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -287,7 +287,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
task_logger.exception("Unexpected exception during indexing check")
finally:
if lock_beat.owned():
lock_beat.release()
@@ -479,7 +479,6 @@ def try_creating_indexing_task(
except Exception:
task_logger.exception(
f"try_creating_indexing_task - Unexpected exception: "
f"tenant={tenant_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
)
@@ -505,7 +504,6 @@ def connector_indexing_proxy_task(
"""celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""
task_logger.info(
f"Indexing watchdog - starting: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
@@ -524,15 +522,14 @@
if not job:
task_logger.info(
f"Indexing watchdog - spawn failed: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
return

task_logger.info(
f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
@@ -557,7 +554,6 @@
task_logger.error(
f"Indexing watchdog - spawned task exceptioned: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"error={job.exception()}"
@@ -568,7 +564,6 @@

task_logger.info(
f"Indexing watchdog - finished: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
@@ -790,7 +785,6 @@ def connector_indexing_task(

logger.info(
f"Indexing spawned task finished: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
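
The proxy task exists because forked Celery workers are unstable for heavy indexing, so the real work runs in a spawned process that the Celery task merely watches. A simplified sketch of the watchdog loop, using multiprocessing in place of danswer's job client:

# Simplified watchdog sketch: spawn the indexing work in its own process and
# poll until it exits, logging without the tenant= field this commit removes.
import logging
import multiprocessing
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def run_indexing(index_attempt_id: int) -> None:
    logger.info(f"Indexing spawned task running: attempt={index_attempt_id}")


def indexing_watchdog(index_attempt_id: int, cc_pair_id: int) -> None:
    job = multiprocessing.Process(target=run_indexing, args=(index_attempt_id,))
    job.start()
    logger.info(
        f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} "
        f"cc_pair={cc_pair_id}"
    )
    while job.is_alive():
        time.sleep(5)  # poll the spawned process
    if job.exitcode != 0:
        logger.error(
            f"Indexing watchdog - spawned task failed: "
            f"attempt={index_attempt_id} exitcode={job.exitcode}"
        )
    logger.info(f"Indexing watchdog - finished: attempt={index_attempt_id}")


if __name__ == "__main__":
    indexing_watchdog(index_attempt_id=1, cc_pair_id=42)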
6 changes: 3 additions & 3 deletions backend/danswer/background/celery/tasks/pruning/tasks.py
@@ -120,7 +120,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
task_logger.exception("Unexpected exception during pruning check")
finally:
if lock_beat.owned():
lock_beat.release()
@@ -303,7 +303,7 @@ def connector_pruning_generator_task(
doc_ids_to_remove = list(all_indexed_document_ids - all_connector_doc_ids)

task_logger.info(
f"Pruning set collected: "
"Pruning set collected: "
f"cc_pair={cc_pair_id} "
f"connector_source={cc_pair.connector.source} "
f"docs_to_remove={len(doc_ids_to_remove)}"
@@ -319,7 +319,7 @@
return None

task_logger.info(
f"RedisConnector.prune.generate_tasks finished. "
"RedisConnector.prune.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)

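
The removal set computed above is a plain set difference: everything currently indexed that the connector no longer returns. For example:

# The pruning removal set, as computed in the diff above
all_indexed_document_ids = {"doc-1", "doc-2", "doc-3"}
all_connector_doc_ids = {"doc-1", "doc-3"}
doc_ids_to_remove = list(all_indexed_document_ids - all_connector_doc_ids)
# -> ["doc-2"]: still indexed, but no longer returned by the connector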
(Diffs for the remaining changed files did not load.)
