Skip to content

Commit

Permalink
1800 Delivery Status Callbacks (revision 1)
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbfled authored and EvanParish committed Aug 15, 2024
1 parent 8dca303 commit 7f902b0
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 146 deletions.
6 changes: 3 additions & 3 deletions app/callback/queue_callback_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ def send_callback(
except ClientError as e:
statsd_client.incr(f'callback.queue.{callback.callback_type}.non_retryable_error')
raise NonRetryableException(e)
else:
current_app.logger.info(f'Callback sent to {callback.url}, {tags}')
statsd_client.incr(f'callback.queue.{callback.callback_type}.success')

current_app.logger.info('Callback sent to %s, %s', callback.url, tags)
statsd_client.incr(f'callback.queue.{callback.callback_type}.success')
13 changes: 5 additions & 8 deletions app/celery/contact_information_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,18 @@ def lookup_contact_info(
current_app.logger.info('Looking up contact information for notification_id: %s.', notification_id)

notification = get_notification_by_id(notification_id)
va_profile_id = notification.recipient_identifiers[IdentifierType.VA_PROFILE_ID.value]
recipient_identifier = notification.recipient_identifiers[IdentifierType.VA_PROFILE_ID.value]

try:
if EMAIL_TYPE == notification.notification_type:
recipient = va_profile_client.get_email(va_profile_id)
recipient = va_profile_client.get_email(recipient_identifier)
elif SMS_TYPE == notification.notification_type:
recipient = va_profile_client.get_telephone(va_profile_id)
recipient = va_profile_client.get_telephone(recipient_identifier)
else:
raise NotImplementedError(
f'The task lookup_contact_info failed for notification {notification_id}. '
f'{notification.notification_type} is not supported'
)

except (Timeout, VAProfileRetryableException) as e:
if can_retry(self.request.retries, self.max_retries, notification_id):
current_app.logger.warning('Unable to get contact info for notification id: %s, retrying', notification_id)
Expand All @@ -63,7 +62,6 @@ def lookup_contact_info(
)
check_and_queue_callback_task(notification)
raise NotificationPermanentFailureException(message) from e

except (VAProfileIDNotFoundException, VAProfileNonRetryableException) as e:
current_app.logger.exception(e)
message = (
Expand All @@ -76,6 +74,5 @@ def lookup_contact_info(
check_and_queue_callback_task(notification)
raise NotificationPermanentFailureException(message) from e

else:
notification.to = recipient
dao_update_notification(notification)
notification.to = recipient
dao_update_notification(notification)
47 changes: 28 additions & 19 deletions app/celery/lookup_recipient_communication_permissions_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from app import notify_celery, va_profile_client
from app.celery.common import can_retry, handle_max_retries_exceeded
from app.celery.exceptions import AutoRetryException
from app.celery.service_callback_tasks import check_and_queue_callback_task
from app.dao.communication_item_dao import get_communication_item
from app.dao.notifications_dao import get_notification_by_id, update_notification_status_by_id
from app.exceptions import NotificationTechnicalFailureException
from app.exceptions import NotificationTechnicalFailureException, NotificationPermanentFailureException
from app.models import RecipientIdentifier, NOTIFICATION_PREFERENCES_DECLINED
from app.va.va_profile import VAProfileRetryableException
from app.va.va_profile.exceptions import CommunicationItemNotFoundException
Expand All @@ -30,44 +31,52 @@ def lookup_recipient_communication_permissions(
self,
notification_id: str,
) -> None:
current_app.logger.info(f'Looking up communication preferences for notification_id:{notification_id}')
current_app.logger.info('Looking up communication preferences for notification_id: %s', notification_id)

notification = get_notification_by_id(notification_id)

try:
notification.recipient_identifiers[IdentifierType.VA_PROFILE_ID.value]
va_profile_recipient_identifier = notification.recipient_identifiers[IdentifierType.VA_PROFILE_ID.value]
except KeyError as e:
current_app.logger.info(f'{VAProfileIdNotFoundException.failure_reason} on notification ' f'{notification_id}')
raise VAProfileIdNotFoundException from e

va_profile_recipient_identifier = notification.recipient_identifiers[IdentifierType.VA_PROFILE_ID.value]
current_app.logger.info('No VA Profile ID for notification %s.', notification_id)
raise VAProfileIdNotFoundException(f'No VA Profile ID for notification {notification_id}.') from e

va_profile_id = va_profile_recipient_identifier.id_value
communication_item_id = notification.template.communication_item_id
notification_type = notification.notification_type

status_reason = recipient_has_given_permission(
self,
IdentifierType.VA_PROFILE_ID.value,
va_profile_id,
notification_id,
notification_type,
communication_item_id,
)
try:
status_reason = recipient_has_given_permission(
self,
IdentifierType.VA_PROFILE_ID.value,
va_profile_id,
notification_id,
notification_type,
communication_item_id,
)
except NotificationTechnicalFailureException:
check_and_queue_callback_task(notification)
raise

if status_reason is not None:
# The recipient doesn't grant permission. a.k.a. preferences-declined

update_notification_status_by_id(
notification_id, NOTIFICATION_PREFERENCES_DECLINED, status_reason=status_reason
)
current_app.logger.info(
f'Recipient for notification {notification_id}' f'has declined permission to receive notifications'
)
self.request.chain = None
message = f'The recipient for notification {notification_id} has declined permission to receive notifications.'
current_app.logger.info(message)
check_and_queue_callback_task(notification)
raise NotificationPermanentFailureException(message)


def recipient_has_given_permission(
task, id_type: str, id_value: str, notification_id: str, notification_type: str, communication_item_id: str
) -> Optional[str]:
"""
Return None if the recipient has the permissions. Otherwise, return a string to explain the lack of permission.
"""

default_send_flag = True
communication_item = None
identifier = RecipientIdentifier(id_type=id_type, id_value=id_value)
Expand Down
7 changes: 6 additions & 1 deletion app/celery/onsite_notification_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ def send_va_onsite_notification_task(
template_id: str,
onsite_enabled: bool = False,
):
"""This function is used by celery to POST a notification to VA_Onsite."""
"""
POST a notification to VA_Onsite.
"""

current_app.logger.info(
'Calling va_onsite_notification_task with va_profile_id: %s\ntemplate_id: %s\nonsite_notification set to: %s',
va_profile_id,
Expand All @@ -19,4 +22,6 @@ def send_va_onsite_notification_task(

if onsite_enabled and va_profile_id:
data = {'onsite_notification': {'template_id': template_id, 'va_profile_id': va_profile_id}}

# This method catches exceptions. It should never disrupt execution of the Celery task chain.
va_onsite_client.post_onsite_notification(data)
34 changes: 26 additions & 8 deletions app/celery/provider_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@
log_and_update_technical_failure,
)
from app.celery.exceptions import NonRetryableException, AutoRetryException
from app.celery.service_callback_tasks import check_and_queue_callback_task
from app.clients.email.aws_ses import AwsSesClientThrottlingSendRateException
from app.config import QueueNames
from app.dao import notifications_dao
from app.dao.notifications_dao import update_notification_status_by_id
from app.dao.service_sms_sender_dao import dao_get_service_sms_sender_by_service_id_and_number
from app.delivery import send_to_providers
from app.exceptions import NotificationTechnicalFailureException, MalwarePendingException, InvalidProviderException
from app.exceptions import (
MalwarePendingException,
NotificationPermanentFailureException,
NotificationTechnicalFailureException,
InvalidProviderException,
)
from app.models import NOTIFICATION_TECHNICAL_FAILURE
from app.v2.errors import RateLimitError
from flask import current_app
Expand All @@ -37,8 +43,9 @@ def deliver_sms(
notification_id,
sms_sender_id=None,
):
current_app.logger.info('Start sending SMS for notification id: %s', notification_id)

try:
current_app.logger.info('Start sending SMS for notification id: %s', notification_id)
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
# Distributed computing race condition
Expand All @@ -57,14 +64,15 @@ def deliver_sms(
e,
'SMS provider configuration invalid',
)
raise NotificationTechnicalFailureException(str(e))
raise NotificationTechnicalFailureException from e
except InvalidPhoneError as e:
log_and_update_permanent_failure(
notification.id,
'deliver_sms',
e,
'Phone number is invalid',
)
raise NotificationTechnicalFailureException from e
except NonRetryableException as e:
# Max retries exceeded, celery raised exception
log_and_update_permanent_failure(
Expand All @@ -73,6 +81,7 @@ def deliver_sms(
e,
'ERROR: NonRetryableException - permanent failure, not retrying',
)
raise NotificationPermanentFailureException from e
except (NullValueForNonConditionalPlaceholderException, AttributeError, RuntimeError) as e:
log_and_update_technical_failure(notification_id, 'deliver_sms', e)
raise NotificationTechnicalFailureException(f'Found {type(e).__name__}, NOT retrying...', e, e.args)
Expand All @@ -83,6 +92,7 @@ def deliver_sms(
raise AutoRetryException(f'Found {type(e).__name__}, autoretrying...', e, e.args)
else:
msg = handle_max_retries_exceeded(notification_id, 'deliver_sms')
check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(msg)


Expand All @@ -104,8 +114,9 @@ def deliver_sms_with_rate_limiting(
):
from app.notifications.validators import check_sms_sender_over_rate_limit

current_app.logger.info('Start sending SMS with rate limiting for notification id: %s', notification_id)

try:
current_app.logger.info('Start sending SMS with rate limiting for notification id: %s', notification_id)
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
current_app.logger.warning('Notification not found for: %s, retrying', notification_id)
Expand Down Expand Up @@ -135,6 +146,7 @@ def deliver_sms_with_rate_limiting(
e,
'Phone number is invalid',
)
raise NotificationPermanentFailureException from e
except NonRetryableException as e:
# Max retries exceeded, celery raised exception
log_and_update_permanent_failure(
Expand All @@ -143,6 +155,7 @@ def deliver_sms_with_rate_limiting(
e,
'ERROR: NonRetryableException - permanent failure, not retrying',
)
raise NotificationTechnicalFailureException from e
except RateLimitError:
retry_time = sms_sender.rate_limit_interval / sms_sender.rate_limit
current_app.logger.info(
Expand All @@ -165,6 +178,7 @@ def deliver_sms_with_rate_limiting(
raise AutoRetryException(f'Found {type(e).__name__}, autoretrying...', e, e.args)
else:
msg = handle_max_retries_exceeded(notification_id, 'deliver_sms_with_rate_limiting')
check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(msg)


Expand All @@ -184,8 +198,9 @@ def deliver_email(
notification_id: str,
sms_sender_id=None,
):
current_app.logger.info('Start sending email for notification id: %s', notification_id)

try:
current_app.logger.info('Start sending email for notification id: %s', notification_id)
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
current_app.logger.warning('Notification not found for: %s, retrying', notification_id)
Expand All @@ -201,7 +216,8 @@ def deliver_email(
update_notification_status_by_id(
notification_id, NOTIFICATION_TECHNICAL_FAILURE, status_reason='Email address is in invalid format'
)
raise NotificationTechnicalFailureException(str(e))
check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException from e
except MalwarePendingException:
current_app.logger.info(
'RETRY number %s: Email notification %s is pending malware scans', self.request.retries, notification_id
Expand All @@ -212,10 +228,11 @@ def deliver_email(
update_notification_status_by_id(
notification_id, NOTIFICATION_TECHNICAL_FAILURE, status_reason='Email provider configuration invalid'
)
raise NotificationTechnicalFailureException(str(e))
check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException from e
except (NullValueForNonConditionalPlaceholderException, AttributeError, RuntimeError) as e:
log_and_update_technical_failure(notification_id, 'deliver_email', e)
raise NotificationTechnicalFailureException(f'Found {type(e).__name__}, NOT retrying...', e, e.args)
raise NotificationTechnicalFailureException('NOT retrying...') from e
except Exception as e:
current_app.logger.exception('Email delivery for notification id: %s failed', notification_id)
if can_retry(self.request.retries, self.max_retries, notification_id):
Expand All @@ -230,4 +247,5 @@ def deliver_email(
raise AutoRetryException(f'Found {type(e).__name__}, autoretrying...', e, e.args)
else:
msg = handle_max_retries_exceeded(notification_id, 'deliver_email')
check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(msg)
Loading

0 comments on commit 7f902b0

Please sign in to comment.