#1049 Populate SMS cost and segments count for notifications (#1059)
kalbfled authored Jan 23, 2023
1 parent 2795cad commit 9bdf81e
Showing 8 changed files with 304 additions and 59 deletions.
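
For orientation before the diffs: the Celery task in app/celery/process_pinpoint_receipt_tasks.py below reads a fixed set of keys from the decoded Pinpoint SMS event. A decoded payload of roughly the shape it expects is sketched here; the field names come from the keys the task accesses (and from the AWS event-stream documentation linked in its docstring), while every value is purely illustrative.

# Illustrative sketch only: structure inferred from the keys read by process_pinpoint_results below.
pinpoint_message = {
    'event_type': '_SMS.SUCCESS',  # '_SMS.OPTOUT' marks an opt-out
    'attributes': {
        'message_id': '11111111-2222-3333-4444-555555555555',  # matched against Notification.reference
        'record_status': 'DELIVERED',  # mapped to a notification status via _record_status_status_mapping
        'number_of_message_parts': 2,  # stored on the notification as segments_count
    },
    'metrics': {
        'price_in_millicents_usd': 1290.0,  # stored on the notification as cost_in_millicents
    },
}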
116 changes: 75 additions & 41 deletions app/celery/process_pinpoint_receipt_tasks.py
@@ -10,7 +10,11 @@

from app import notify_celery, statsd_client
from app.config import QueueNames
from app.dao.notifications_dao import update_notification_status_by_id, dao_get_notification_by_reference
from app.dao.notifications_dao import (
dao_get_notification_by_reference,
dao_update_notification,
update_notification_status_by_id,
)
from app.feature_flags import FeatureFlag, is_feature_enabled
from app.models import (
NOTIFICATION_DELIVERED,
@@ -42,36 +46,52 @@
}


def event_type_is_optout(event_type, reference):
is_optout = event_type == '_SMS.OPTOUT'

if is_optout:
current_app.logger.info(
f"event type is OPTOUT for notification with reference {reference})"
)
return is_optout


def _map_record_status_to_notification_status(record_status):
return _record_status_status_mapping[record_status]


@notify_celery.task(bind=True, name="process-pinpoint-result", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def process_pinpoint_results(self, response):
"""
Process a Pinpoint SMS stream event. Messages long enough to require multiple segments still
result in only one event, which contains the aggregate cost.
https://docs.aws.amazon.com/pinpoint/latest/developerguide/event-streams-data-sms.html
"""

if not is_feature_enabled(FeatureFlag.PINPOINT_RECEIPTS_ENABLED):
current_app.logger.info('Pinpoint receipts toggle is disabled, skipping callback task')
current_app.logger.info('Pinpoint receipts toggle is disabled. Skipping callback task.')
return True

try:
pinpoint_message = json.loads(base64.b64decode(response['Message']))
reference = pinpoint_message['attributes']['message_id']
event_type = pinpoint_message.get('event_type')
record_status = pinpoint_message['attributes']['record_status']
current_app.logger.info(
f'received callback from Pinpoint with event_type of {event_type} and record_status of {record_status}'
f' with reference {reference}'
)
except (json.decoder.JSONDecodeError, ValueError, TypeError, KeyError) as e:
current_app.logger.exception(e)
self.retry(queue=QueueNames.RETRY)
return None

try:
pinpoint_attributes = pinpoint_message["attributes"]
reference = pinpoint_attributes["message_id"]
event_type = pinpoint_message["event_type"]
record_status = pinpoint_attributes["record_status"]
number_of_message_parts = pinpoint_attributes["number_of_message_parts"]
price_in_millicents_usd = pinpoint_message["metrics"]["price_in_millicents_usd"]
except KeyError as e:
current_app.logger.error("The event stream message data is missing expected attributes.")
current_app.logger.exception(e)
current_app.logger.debug(pinpoint_message)
self.retry(queue=QueueNames.RETRY)
return None

current_app.logger.info(
"Processing Pinpoint result. | reference=%s | event_type=%s | record_status=%s | "
"number_of_message_parts=%s | price_in_millicents_usd=%s",
reference, event_type, record_status, number_of_message_parts, price_in_millicents_usd
)

try:
notification_status = get_notification_status(event_type, record_status, reference)

notification, should_retry, should_exit = attempt_to_get_notification(
@@ -84,13 +104,22 @@ def process_pinpoint_results(self, response):
if should_exit:
return

update_notification_status_by_id(
notification_id=notification.id,
status=notification_status
)
assert notification is not None

if price_in_millicents_usd > 0.0:
notification.status = notification_status
notification.segments_count = number_of_message_parts
notification.cost_in_millicents = price_in_millicents_usd
dao_update_notification(notification)
else:
update_notification_status_by_id(
notification_id=notification.id,
status=notification_status
)

current_app.logger.info(
f"Pinpoint callback return status of {notification_status} for notification: {notification.id}"
"Pinpoint callback return status of %s for notification: %s",
notification_status, notification.id
)

statsd_client.incr(f"callback.pinpoint.{notification_status}")
@@ -104,15 +133,19 @@ def process_pinpoint_results(self, response):
return True

except Retry:
# This block exists to preempt executing the "Exception" logic below. A better approach is
# to catch specific exceptions where they might occur.
raise

except Exception as e:
current_app.logger.exception(f"Error processing Pinpoint results: {type(e)}")
current_app.logger.exception(e)
self.retry(queue=QueueNames.RETRY)

return None


def get_notification_status(event_type: str, record_status: str, reference: str) -> str:
if event_type_is_optout(event_type, reference):
if event_type == '_SMS.OPTOUT':
current_app.logger.info("event type is OPTOUT for notification with reference %s", reference)
statsd_client.incr("callback.pinpoint.optout")
notification_status = NOTIFICATION_PERMANENT_FAILURE
else:
@@ -133,18 +166,18 @@ def attempt_to_get_notification(
message_time = datetime.datetime.fromtimestamp(int(event_timestamp_in_ms) / 1000)
if datetime.datetime.utcnow() - message_time < datetime.timedelta(minutes=5):
current_app.logger.info(
f'Pinpoint callback event for reference {reference} was received less than five minutes ago.'
'Pinpoint callback event for reference %s was received less than five minutes ago.', reference
)
should_retry = True
else:
current_app.logger.warning(
f'notification not found for reference: {reference} (update to {notification_status})'
'notification not found for reference: %s (update to %s)', reference, notification_status
)
statsd_client.incr('callback.pinpoint.no_notification_found')
should_exit = True
except MultipleResultsFound:
current_app.logger.warning(
f'multiple notifications found for reference: {reference} (update to {notification_status})'
'multiple notifications found for reference: %s (update to %s)', reference, notification_status
)
statsd_client.incr('callback.pinpoint.multiple_notifications_found')
should_exit = True
@@ -153,25 +186,26 @@


def check_notification_status(notification: Notification, notification_status: str) -> bool:
should_exit = False
# do not update if status has not changed
# Do not update if the status has not changed.
if notification_status == notification.status:
current_app.logger.info(
f'Pinpoint callback received the same status of {notification_status} for'
f' notification {notification_status})'
'Pinpoint callback received the same status of %s for notification %s',
notification_status, notification.id
)
should_exit = True
# do not update if notification status is in a final state
return True

# Do not update if notification status is in a final state.
if notification.status in FINAL_STATUS_STATES:
log_notification_status_warning(notification, notification_status)
should_exit = True
return should_exit
return True

return False


def log_notification_status_warning(notification, status: str) -> None:
time_diff = datetime.datetime.utcnow() - (notification.updated_at or notification.created_at)
current_app.logger.warning(
f'Invalid callback received. Notification id {notification.id} received a status update to {status}'
f' {time_diff} after being set to {notification.status}. {notification.notification_type}'
f' sent by {notification.sent_by}'
'Invalid callback received. Notification id %s received a status update to %s '
'%s after being set to %s. %s sent by %s',
notification.id, status, time_diff, notification.status, notification.notification_type, notification.sent_by
)
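
The task above receives this event base64-encoded under the SNS envelope's Message key (see the json.loads(base64.b64decode(response['Message'])) call). A minimal sketch of how a caller or test might wrap a decoded payload into that envelope; the synchronous .apply() dispatch is an assumption for illustration, not taken from this repository.

import base64
import json

# 'pinpoint_message' is the illustrative payload sketched after the commit summary above.
response = {
    'Message': base64.b64encode(json.dumps(pinpoint_message).encode('utf-8')).decode('utf-8'),
}

# Hypothetical dispatch; a test might run the task eagerly instead of enqueuing it.
process_pinpoint_results.apply(args=[response])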
31 changes: 17 additions & 14 deletions app/dao/notifications_dao.py
@@ -150,7 +150,7 @@ def update_notification_status_by_reference(reference, status):
notification = Notification.query.filter(Notification.reference == reference).first()

if not notification:
current_app.logger.error('notification not found for reference {} (update to {})'.format(reference, status))
current_app.logger.error("Notification not found for reference %s (update to %s)", reference, status)
return None

if notification.status not in {
@@ -295,7 +295,8 @@ def _filter_query(query, filter_dict=None):
@statsd(namespace="dao")
def delete_notifications_older_than_retention_by_type(notification_type, qry_limit=10000):
current_app.logger.info(
'Deleting {} notifications for services with flexible data retention'.format(notification_type))
'Deleting %s notifications for services with flexible data retention', notification_type
)

flexible_data_retention = ServiceDataRetention.query.filter(
ServiceDataRetention.notification_type == notification_type
@@ -313,11 +314,13 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim
insert_update_notification_history(notification_type, days_of_retention, f.service_id)

current_app.logger.info(
"Deleting {} notifications for service id: {}".format(notification_type, f.service_id))
"Deleting %s notifications for service id: %s", notification_type, f.service_id
)
deleted += _delete_notifications(notification_type, days_of_retention, f.service_id, qry_limit)

current_app.logger.info(
'Deleting {} notifications for services without flexible data retention'.format(notification_type))
'Deleting %s notifications for services without flexible data retention', notification_type
)

seven_days_ago = get_local_timezone_midnight_in_utc(
convert_utc_to_local_timezone(datetime.utcnow()).date()) - timedelta(days=7)
@@ -332,7 +335,7 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim
insert_update_notification_history(notification_type, seven_days_ago, service_id)
deleted += _delete_notifications(notification_type, seven_days_ago, service_id, qry_limit)

current_app.logger.info('Finished deleting {} notifications'.format(notification_type))
current_app.logger.info('Finished deleting %s notifications', notification_type)

return deleted

@@ -432,8 +435,9 @@ def _delete_letters_from_s3(
try:
remove_s3_object(bucket_name, s3_object['Key'])
except ClientError:
current_app.logger.exception(
"Could not delete S3 object with filename: {}".format(s3_object['Key']))
current_app.logger.error(
"Could not delete S3 object with filename: %s", s3_object['Key']
)


@statsd(namespace="dao")
@@ -518,9 +522,10 @@ def is_delivery_slow_for_provider(
slow_notifications = counts.get(True, 0)

if total_notifications:
current_app.logger.info("Slow delivery notifications count for provider {}: {} out of {}. Ratio {}".format(
current_app.logger.info(
"Slow delivery notifications count for provider %s: %d out of %d. Ratio %.3f",
provider, slow_notifications, total_notifications, slow_notifications / total_notifications
))
)
return slow_notifications / total_notifications >= threshold
else:
return False
@@ -737,9 +742,7 @@ def duplicate_update_warning(notification, status):
time_diff = datetime.utcnow() - (notification.updated_at or notification.created_at)

current_app.logger.info(
(
f'Duplicate callback received. Notification id {notification.id} received a status update to ' # nosec
f'{status} {time_diff} after being set to {notification.status}. {notification.notification_type} '
f'sent by {notification.sent_by}'
)
'Duplicate callback received. Notification id %s received a status update to '
'%s %s after being set to %s. %s sent by %s',
notification.id, status, time_diff, notification.status, notification.notification_type, notification.sent_by
)
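
Most of the notifications_dao.py edits above (and several in the Celery task) swap f-strings and str.format() in log calls for %-style arguments. The point of the lazy form is that the logging framework interpolates the message only if a handler actually emits the record. A minimal stand-alone illustration, not code from this repository:

import logging

logger = logging.getLogger(__name__)
reference, status = 'abc-123', 'delivered'

# Eager: the string is always built, even when INFO is disabled for this logger.
logger.info(f'notification not found for reference: {reference} (update to {status})')

# Lazy: interpolation is deferred until (and unless) a handler emits the record.
logger.info('notification not found for reference: %s (update to %s)', reference, status)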
11 changes: 11 additions & 0 deletions app/models.py
@@ -1368,6 +1368,12 @@ class Notification(db.Model):
reply_to_text = db.Column(db.String, nullable=True)
status_reason = db.Column(db.String, nullable=True)

# These attributes are for SMS billing stats. AWS Pinpoint relays price in millicents.
# ex. 645.0 millicents -> 0.645 cents -> $0.00645
# A message that exceeds the SMS length limit is broken into "segments."
segments_count = db.Column(db.Integer, nullable=False, default=0)
cost_in_millicents = db.Column(db.Float, nullable=False, default=0)

postage = db.Column(db.String, nullable=True)
billing_code = db.Column(db.String(256), nullable=True)
CheckConstraint("""
@@ -1594,6 +1600,8 @@ def serialize(self):
],
"billing_code": self.billing_code,
"sms_sender_id": self.sms_sender_id,
"segments_count": self.segments_count,
"cost_in_millicents": self.cost_in_millicents,
}

if self.notification_type == LETTER_TYPE:
@@ -1654,6 +1662,9 @@ class NotificationHistory(db.Model, HistoryModel):
sms_sender = db.relationship(ServiceSmsSender)
sms_sender_id = db.Column(UUID(as_uuid=True), db.ForeignKey('service_sms_senders.id'), nullable=True)

segments_count = db.Column(db.Integer, nullable=False, default=0)
cost_in_millicents = db.Column(db.Float, nullable=False, default=0)

postage = db.Column(db.String, nullable=True)
status_reason = db.Column(db.String, nullable=True)
billing_code = db.Column(db.String(256), nullable=True)
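
The comment on the new Notification columns above gives the unit chain for price_in_millicents_usd: a millicent is a thousandth of a cent, so dividing by 1,000 yields cents and by 100,000 yields dollars. A small stand-alone illustration using the number from that comment; the helper name is hypothetical.

def millicents_to_dollars(price_in_millicents_usd: float) -> float:
    # 1 millicent = 1/1000 of a cent = 1/100000 of a dollar
    return price_in_millicents_usd / 100_000

print(millicents_to_dollars(645.0))  # 0.00645, i.e. 645.0 millicents -> 0.645 cents -> $0.00645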