-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Notification to Admins for Ingestor Failure #387
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,17 @@ | |
|
||
from celery.utils.log import get_task_logger | ||
|
||
from django.core.mail import send_mail | ||
from django.conf import settings | ||
from django.contrib.auth import get_user_model | ||
from django.utils import timezone | ||
|
||
from core.celery import app | ||
from gap.models.ingestor import IngestorSession, IngestorType | ||
from core.models import BackgroundTask | ||
from gap.models.ingestor import ( | ||
IngestorSession, IngestorType, | ||
IngestorSessionStatus | ||
) | ||
|
||
logger = get_task_logger(__name__) | ||
|
||
|
@@ -20,7 +29,11 @@ | |
session = IngestorSession.objects.get(id=_id) | ||
session.run() | ||
except IngestorSession.DoesNotExist: | ||
logger.error('Ingestor Session {} does not exists'.format(_id)) | ||
logger.error(f"Ingestor Session {_id} does not exist") | ||
notify_ingestor_failure.delay(_id, "Session not found") | ||
except Exception as e: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please also trigger the task when CollectionSession runs with an error. We could also run the task when the task fails unexpectedly in this BackgroundTask.task_on_errors: To run the notify task in BackgroundTask.task_on_errors, you need to check the task name whether it is ingestor session or collector session. |
||
logger.error(f"Error in Ingestor Session {_id}: {str(e)}") | ||
notify_ingestor_failure.delay(_id, str(e)) | ||
|
||
|
||
@app.task(name='run_daily_ingestor') | ||
|
@@ -41,3 +54,64 @@ | |
else: | ||
# When not created, it is run manually | ||
session.run() | ||
|
||
|
||
@app.task(name="notify_ingestor_failure") | ||
def notify_ingestor_failure(session_id: int, exception: str): | ||
""" | ||
Celery task to notify admins if an ingestor session fails. | ||
|
||
:param session_id: ID of the IngestorSession | ||
:param exception: Exception message describing the failure | ||
""" | ||
logger.error(f"Ingestor Session {session_id} failed: {exception}") | ||
# Get celery task ID | ||
task_id = notify_ingestor_failure.request.id | ||
|
||
# Retrieve the ingestor session | ||
try: | ||
session = IngestorSession.objects.get(id=session_id) | ||
session.status = IngestorSessionStatus.FAILED | ||
session.save() | ||
except IngestorSession.DoesNotExist: | ||
logger.warning(f"IngestorSession {session_id} not found.") | ||
return | ||
|
||
# Log failure in BackgroundTask | ||
BackgroundTask.objects.get_or_create( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BackgroundTask is automatically created when a task is triggered, so we don't need to create one in here. Each IngestorSession task has background task record. We can query by task name and context_id (value is IngestorSessionId). |
||
task_id=task_id, | ||
defaults={ | ||
"task_name": "notify_ingestor_failure", | ||
"status": "FAILED", | ||
"parameters": f"Ingestor ID: {session_id}", | ||
"errors": exception, | ||
"last_update": timezone.now(), | ||
}, | ||
) | ||
|
||
# Send an email notification to admins | ||
# Get admin emails from the database | ||
User = get_user_model() | ||
admin_emails = list( | ||
User.objects.filter( | ||
is_superuser=True).values_list('email', flat=True) | ||
) | ||
if admin_emails: | ||
send_mail( | ||
subject="Ingestor Failure Alert", | ||
message=( | ||
f"Ingestor Session {session_id} has failed.\n\n" | ||
f"Error: {exception}\n\n" | ||
"Please check the logs for more details." | ||
), | ||
from_email=settings.DEFAULT_FROM_EMAIL, | ||
recipient_list=[admin_emails], | ||
fail_silently=False, | ||
) | ||
logger.info(f"Sent ingestor failure email to {admin_emails}") | ||
else: | ||
logger.warning("No admin email found in settings.ADMINS") | ||
|
||
return ( | ||
f"Logged ingestor failure for session {session_id} and notified admins" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to define as periodic task as it is triggered manually.