diff --git a/app/celery/process_pinpoint_receipt_tasks.py b/app/celery/process_pinpoint_receipt_tasks.py index cb54b90c53..e7c8b6d940 100644 --- a/app/celery/process_pinpoint_receipt_tasks.py +++ b/app/celery/process_pinpoint_receipt_tasks.py @@ -10,7 +10,11 @@ from app import notify_celery, statsd_client from app.config import QueueNames -from app.dao.notifications_dao import update_notification_status_by_id, dao_get_notification_by_reference +from app.dao.notifications_dao import ( + dao_get_notification_by_reference, + dao_update_notification, + update_notification_status_by_id, +) from app.feature_flags import FeatureFlag, is_feature_enabled from app.models import ( NOTIFICATION_DELIVERED, @@ -42,16 +46,6 @@ } -def event_type_is_optout(event_type, reference): - is_optout = event_type == '_SMS.OPTOUT' - - if is_optout: - current_app.logger.info( - f"event type is OPTOUT for notification with reference {reference})" - ) - return is_optout - - def _map_record_status_to_notification_status(record_status): return _record_status_status_mapping[record_status] @@ -59,19 +53,45 @@ def _map_record_status_to_notification_status(record_status): @notify_celery.task(bind=True, name="process-pinpoint-result", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def process_pinpoint_results(self, response): + """ + Process a Pinpoint SMS stream event. Messages long enough to require multiple segments only + result in one event that contains the aggregate cost. + + https://docs.aws.amazon.com/pinpoint/latest/developerguide/event-streams-data-sms.html + """ + if not is_feature_enabled(FeatureFlag.PINPOINT_RECEIPTS_ENABLED): - current_app.logger.info('Pinpoint receipts toggle is disabled, skipping callback task') + current_app.logger.info('Pinpoint receipts toggle is disabled. Skipping callback task.') return True try: pinpoint_message = json.loads(base64.b64decode(response['Message'])) - reference = pinpoint_message['attributes']['message_id'] - event_type = pinpoint_message.get('event_type') - record_status = pinpoint_message['attributes']['record_status'] - current_app.logger.info( - f'received callback from Pinpoint with event_type of {event_type} and record_status of {record_status}' - f' with reference {reference}' - ) + except (json.decoder.JSONDecodeError, ValueError, TypeError, KeyError) as e: + current_app.logger.exception(e) + self.retry(queue=QueueNames.RETRY) + return None + + try: + pinpoint_attributes = pinpoint_message["attributes"] + reference = pinpoint_attributes["message_id"] + event_type = pinpoint_message["event_type"] + record_status = pinpoint_attributes["record_status"] + number_of_message_parts = pinpoint_attributes["number_of_message_parts"] + price_in_millicents_usd = pinpoint_message["metrics"]["price_in_millicents_usd"] + except KeyError as e: + current_app.logger.error("The event stream message data is missing expected attributes.") + current_app.logger.exception(e) + current_app.logger.debug(pinpoint_message) + self.retry(queue=QueueNames.RETRY) + return None + + current_app.logger.info( + "Processing Pinpoint result. | reference=%s | event_type=%s | record_status=%s | " + "number_of_message_parts=%s | price_in_millicents_usd=%s", + reference, event_type, record_status, number_of_message_parts, price_in_millicents_usd + ) + + try: notification_status = get_notification_status(event_type, record_status, reference) notification, should_retry, should_exit = attempt_to_get_notification( @@ -84,13 +104,22 @@ def process_pinpoint_results(self, response): if should_exit: return - update_notification_status_by_id( - notification_id=notification.id, - status=notification_status - ) + assert notification is not None + + if price_in_millicents_usd > 0.0: + notification.status = notification_status + notification.segments_count = number_of_message_parts + notification.cost_in_millicents = price_in_millicents_usd + dao_update_notification(notification) + else: + update_notification_status_by_id( + notification_id=notification.id, + status=notification_status + ) current_app.logger.info( - f"Pinpoint callback return status of {notification_status} for notification: {notification.id}" + "Pinpoint callback return status of %s for notification: %s", + notification_status, notification.id ) statsd_client.incr(f"callback.pinpoint.{notification_status}") @@ -104,15 +133,19 @@ def process_pinpoint_results(self, response): return True except Retry: + # This block exists to preempt executing the "Exception" logic below. A better approach is + # to catch specific exceptions where they might occur. raise - except Exception as e: - current_app.logger.exception(f"Error processing Pinpoint results: {type(e)}") + current_app.logger.exception(e) self.retry(queue=QueueNames.RETRY) + return None + def get_notification_status(event_type: str, record_status: str, reference: str) -> str: - if event_type_is_optout(event_type, reference): + if event_type == '_SMS.OPTOUT': + current_app.logger.info("event type is OPTOUT for notification with reference %s", reference) statsd_client.incr(f"callback.pinpoint.optout") notification_status = NOTIFICATION_PERMANENT_FAILURE else: @@ -133,18 +166,18 @@ def attempt_to_get_notification( message_time = datetime.datetime.fromtimestamp(int(event_timestamp_in_ms) / 1000) if datetime.datetime.utcnow() - message_time < datetime.timedelta(minutes=5): current_app.logger.info( - f'Pinpoint callback event for reference {reference} was received less than five minutes ago.' + 'Pinpoint callback event for reference %s was received less than five minutes ago.', reference ) should_retry = True else: current_app.logger.warning( - f'notification not found for reference: {reference} (update to {notification_status})' + 'notification not found for reference: %s (update to %s)', reference, notification_status ) statsd_client.incr('callback.pinpoint.no_notification_found') should_exit = True except MultipleResultsFound: current_app.logger.warning( - f'multiple notifications found for reference: {reference} (update to {notification_status})' + 'multiple notifications found for reference: %s (update to %s)', reference, notification_status ) statsd_client.incr('callback.pinpoint.multiple_notifications_found') should_exit = True @@ -153,25 +186,26 @@ def attempt_to_get_notification( def check_notification_status(notification: Notification, notification_status: str) -> bool: - should_exit = False - # do not update if status has not changed + # Do not update if the status has not changed. if notification_status == notification.status: current_app.logger.info( - f'Pinpoint callback received the same status of {notification_status} for' - f' notification {notification_status})' + 'Pinpoint callback received the same status of %s for notification %s)', + notification_status, notification_status ) - should_exit = True - # do not update if notification status is in a final state + return True + + # Do not update if notification status is in a final state. if notification.status in FINAL_STATUS_STATES: log_notification_status_warning(notification, notification_status) - should_exit = True - return should_exit + return True + + return False def log_notification_status_warning(notification, status: str) -> None: time_diff = datetime.datetime.utcnow() - (notification.updated_at or notification.created_at) current_app.logger.warning( - f'Invalid callback received. Notification id {notification.id} received a status update to {status}' - f' {time_diff} after being set to {notification.status}. {notification.notification_type}' - f' sent by {notification.sent_by}' + 'Invalid callback received. Notification id %s received a status update to %s ' + '%s after being set to %s. %s sent by %s', + notification.id, status, time_diff, notification.status, notification.notification_type, notification.sent_by ) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 4c9b88f13e..6a87ecd324 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -150,7 +150,7 @@ def update_notification_status_by_reference(reference, status): notification = Notification.query.filter(Notification.reference == reference).first() if not notification: - current_app.logger.error('notification not found for reference {} (update to {})'.format(reference, status)) + current_app.logger.error("Notification not found for reference %s (update to %s)", reference, status) return None if notification.status not in { @@ -295,7 +295,8 @@ def _filter_query(query, filter_dict=None): @statsd(namespace="dao") def delete_notifications_older_than_retention_by_type(notification_type, qry_limit=10000): current_app.logger.info( - 'Deleting {} notifications for services with flexible data retention'.format(notification_type)) + 'Deleting %s notifications for services with flexible data retention', notification_type + ) flexible_data_retention = ServiceDataRetention.query.filter( ServiceDataRetention.notification_type == notification_type @@ -313,11 +314,13 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim insert_update_notification_history(notification_type, days_of_retention, f.service_id) current_app.logger.info( - "Deleting {} notifications for service id: {}".format(notification_type, f.service_id)) + "Deleting %s notifications for service id: %s", notification_type, f.service_id + ) deleted += _delete_notifications(notification_type, days_of_retention, f.service_id, qry_limit) current_app.logger.info( - 'Deleting {} notifications for services without flexible data retention'.format(notification_type)) + 'Deleting %s notifications for services without flexible data retention', notification_type + ) seven_days_ago = get_local_timezone_midnight_in_utc( convert_utc_to_local_timezone(datetime.utcnow()).date()) - timedelta(days=7) @@ -332,7 +335,7 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim insert_update_notification_history(notification_type, seven_days_ago, service_id) deleted += _delete_notifications(notification_type, seven_days_ago, service_id, qry_limit) - current_app.logger.info('Finished deleting {} notifications'.format(notification_type)) + current_app.logger.info('Finished deleting %s notifications', notification_type) return deleted @@ -432,8 +435,9 @@ def _delete_letters_from_s3( try: remove_s3_object(bucket_name, s3_object['Key']) except ClientError: - current_app.logger.exception( - "Could not delete S3 object with filename: {}".format(s3_object['Key'])) + current_app.logger.error( + "Could not delete S3 object with filename: %s", s3_object['Key'] + ) @statsd(namespace="dao") @@ -518,9 +522,10 @@ def is_delivery_slow_for_provider( slow_notifications = counts.get(True, 0) if total_notifications: - current_app.logger.info("Slow delivery notifications count for provider {}: {} out of {}. Ratio {}".format( + current_app.logger.info( + "Slow delivery notifications count for provider %s: %d out of %d. Ratio %.3f", provider, slow_notifications, total_notifications, slow_notifications / total_notifications - )) + ) return slow_notifications / total_notifications >= threshold else: return False @@ -737,9 +742,7 @@ def duplicate_update_warning(notification, status): time_diff = datetime.utcnow() - (notification.updated_at or notification.created_at) current_app.logger.info( - ( - f'Duplicate callback received. Notification id {notification.id} received a status update to ' # nosec - f'{status} {time_diff} after being set to {notification.status}. {notification.notification_type} ' - f'sent by {notification.sent_by}' - ) + 'Duplicate callback received. Notification id %s received a status update to ' + '%s %s after being set to %s. %s sent by %s', + notification.id, status, time_diff, notification.status, notification.notification_type, notification.sent_by ) diff --git a/app/models.py b/app/models.py index e5b9dd16ef..2deaae60cc 100644 --- a/app/models.py +++ b/app/models.py @@ -1368,6 +1368,12 @@ class Notification(db.Model): reply_to_text = db.Column(db.String, nullable=True) status_reason = db.Column(db.String, nullable=True) + # These attributes are for SMS billing stats. AWS Pinpoint relays price in millicents. + # ex. 645.0 millicents -> 0.654 cents -> $0.00645 + # A message that exceeds the SMS length limit is broken into "segments." + segments_count = db.Column(db.Integer, nullable=False, default=0) + cost_in_millicents = db.Column(db.Float, nullable=False, default=0) + postage = db.Column(db.String, nullable=True) billing_code = db.Column(db.String(256), nullable=True) CheckConstraint(""" @@ -1594,6 +1600,8 @@ def serialize(self): ], "billing_code": self.billing_code, "sms_sender_id": self.sms_sender_id, + "segments_count": self.segments_count, + "cost_in_millicents": self.cost_in_millicents, } if self.notification_type == LETTER_TYPE: @@ -1654,6 +1662,9 @@ class NotificationHistory(db.Model, HistoryModel): sms_sender = db.relationship(ServiceSmsSender) sms_sender_id = db.Column(UUID(as_uuid=True), db.ForeignKey('service_sms_senders.id'), nullable=True) + segments_count = db.Column(db.Integer, nullable=False, default=0) + cost_in_millicents = db.Column(db.Float, nullable=False, default=0) + postage = db.Column(db.String, nullable=True) status_reason = db.Column(db.String, nullable=True) billing_code = db.Column(db.String(256), nullable=True) diff --git a/documents/openapi/openapi-with-push.yaml b/documents/openapi/openapi-with-push.yaml index adfb07558d..13303f8932 100644 --- a/documents/openapi/openapi-with-push.yaml +++ b/documents/openapi/openapi-with-push.yaml @@ -1108,6 +1108,75 @@ paths: application/json: schema: $ref: '#/components/schemas/NotificationStatusResponse' + examples: + sms: + value: + billing_code: string + body: string + completed_at: 2022-11-28T18:59:00.542420Z + cost_in_millicents: 634.0 + created_at: 2022-11-28T18:58:18.998715Z + created_by_name: string + email_address: null + id: string + line_1: string + line_2: string + line_3: string + line_4: string + line_5: string + line_6: string + phone_number: "+10123456789" + postage: string + postcode: string + provider_reference: egsqaoi2hjok3mi0mlelijfk3i8ckalt5tsu6s00 + recipient_identifiers: [] + reference: string + scheduled_for: 2022-11-30T19:00:00.542420Z + segments_count: 2 + sent_at: 2022-11-28T18:59:18.998715Z + sent_by: string + status: created + status_reason: string + subject: string + template: + id: a21a1888-af44-4cb4-8d20-19bd0d3902f9 + uri: string + version: 0 + type: sms + email: + value: + billing_code: string + body: string + completed_at: 2022-11-28T18:59:18.998715Z + cost_in_millicents: 0.0 + created_at: 2022-11-28T18:58:18.998715Z + created_by_name: string + email_address: user@example.com + id: string + line_1: string + line_2: string + line_3: string + line_4: string + line_5: string + line_6: string + phone_number: null + postage: string + postcode: string + provider_reference: egsqaoi2hjok3mi0mlelijfk3i8ckalt5tsu6s00 + recipient_identifiers: [] + reference: string + scheduled_for: 2022-11-30T19:00:00.542420Z + segments_count: 0 + sent_at: 2022-11-28T18:59:18.998715Z + sent_by: string + status: created + status_reason: string + subject: string + template: + id: a21a1888-af44-4cb4-8d20-19bd0d3902f9 + uri: string + version: 0 + type: email '401': $ref: '#/components/responses/Unauthorized' '403': @@ -2014,6 +2083,8 @@ components: type: string format: date-time nullable: true + cost_in_millicents: + type: number created_at: type: string format: date-time @@ -2063,6 +2134,8 @@ components: type: string format: date-time nullable: true + segments_count: + type: integer sent_at: type: string format: date-time diff --git a/documents/openapi/openapi.yaml b/documents/openapi/openapi.yaml index cdecbb5334..fc94c76782 100644 --- a/documents/openapi/openapi.yaml +++ b/documents/openapi/openapi.yaml @@ -1183,6 +1183,7 @@ paths: billing_code: string body: string completed_at: 2022-11-28T18:59:00.542420Z + cost_in_millicents: 634.0 created_at: 2022-11-28T18:58:18.998715Z created_by_name: string email_address: null @@ -1200,6 +1201,7 @@ paths: recipient_identifiers: [] reference: string scheduled_for: 2022-11-30T19:00:00.542420Z + segments_count: 2 sent_at: 2022-11-28T18:59:18.998715Z sent_by: string status: created @@ -1216,6 +1218,7 @@ paths: billing_code: string body: string completed_at: 2022-11-28T18:59:18.998715Z + cost_in_millicents: 0.0 created_at: 2022-11-28T18:58:18.998715Z created_by_name: string email_address: user@example.com @@ -1233,6 +1236,7 @@ paths: recipient_identifiers: [] reference: string scheduled_for: 2022-11-30T19:00:00.542420Z + segments_count: 0 sent_at: 2022-11-28T18:59:18.998715Z sent_by: string status: created @@ -1243,7 +1247,6 @@ paths: uri: string version: 0 type: email - '401': $ref: '#/components/responses/Unauthorized' '403': @@ -2169,6 +2172,8 @@ components: type: string format: date-time nullable: true + cost_in_millicents: + type: number created_at: type: string format: date-time @@ -2224,6 +2229,8 @@ components: type: string format: date-time nullable: true + segments_count: + type: integer sent_at: type: string format: date-time diff --git a/migrations/versions/0355_sms_billing.py b/migrations/versions/0355_sms_billing.py new file mode 100644 index 0000000000..5ac10f7a0f --- /dev/null +++ b/migrations/versions/0355_sms_billing.py @@ -0,0 +1,25 @@ +""" +Revision ID: 0355_sms_billing +Revises: 0354_notification_sms_sender_id +Create Date: 2023-01-11 20:09:58.840087 +""" + +from alembic import op +import sqlalchemy as sa + +revision = '0355_sms_billing' +down_revision = '0354_notification_sms_sender_id' + + +def upgrade(): + op.add_column('notifications', sa.Column('segments_count', sa.Integer(), nullable=False, server_default='0')) + op.add_column('notifications', sa.Column('cost_in_millicents', sa.Float(), nullable=False, server_default="0.0")) + op.add_column('notification_history', sa.Column('segments_count', sa.Integer(), nullable=False, server_default='0')) + op.add_column('notification_history', sa.Column('cost_in_millicents', sa.Float(), nullable=False, server_default="0.0")) + + +def downgrade(): + op.drop_column('notification_history', 'cost_in_millicents') + op.drop_column('notification_history', 'segments_count') + op.drop_column('notifications', 'cost_in_millicents') + op.drop_column('notifications', 'segments_count') diff --git a/tests/app/celery/test_process_pinpoint_receipt_tasks.py b/tests/app/celery/test_process_pinpoint_receipt_tasks.py index b26e4a06c9..02b7779ce4 100644 --- a/tests/app/celery/test_process_pinpoint_receipt_tasks.py +++ b/tests/app/celery/test_process_pinpoint_receipt_tasks.py @@ -130,7 +130,95 @@ def test_process_pinpoint_results_should_not_update_notification_status_if_statu mock_callback.assert_not_called() -def pinpoint_notification_callback_record(reference, event_type='_SMS.SUCCESS', record_status='DELIVERED'): +def test_process_pinpoint_results_segments_and_price_buffered_first( + mocker, + db_session, + sample_template +): + """ + Test process a Pinpoint SMS stream event. Messages long enough to require multiple segments only + result in one event that contains the aggregate cost. + """ + + mocker.patch('app.celery.process_pinpoint_receipt_tasks.is_feature_enabled', return_value=True) + test_reference = 'sms-reference-1' + create_notification(sample_template, reference=test_reference, sent_at=datetime.datetime.utcnow(), status='sending') + notification = notifications_dao.dao_get_notification_by_reference(test_reference) + assert notification.segments_count == 0, "This is the default." + assert notification.cost_in_millicents == 0.0, "This is the default." + + # Receiving a _SMS.BUFFERED+SUCCESSFUL event first should update the notification. + + process_pinpoint_receipt_tasks.process_pinpoint_results( + response=pinpoint_notification_callback_record( + reference=test_reference, + event_type='_SMS.BUFFERED', + record_status='SUCCESSFUL', + number_of_message_parts=6, + price=4986.0 + ) + ) + + notification = notifications_dao.dao_get_notification_by_reference(test_reference) + assert notification.segments_count == 6 + assert notification.cost_in_millicents == 4986.0 + + # A subsequent _SMS.SUCCESS+DELIVERED event should not alter the segments and price columns. + + process_pinpoint_receipt_tasks.process_pinpoint_results( + response=pinpoint_notification_callback_record( + reference=test_reference, + event_type='_SMS.SUCCESS', + record_status='DELIVERED', + number_of_message_parts=6, + price=4986.0 + ) + ) + + notification = notifications_dao.dao_get_notification_by_reference(test_reference) + assert notification.segments_count == 6 + assert notification.cost_in_millicents == 4986.0 + + +def test_process_pinpoint_results_segments_and_price_success_first( + mocker, + db_session, + sample_template +): + """ + Test process a Pinpoint SMS stream event. Messages long enough to require multiple segments only + result in one event that contains the aggregate cost. + + Receiving a _SMS.SUCCESS+DELIVERED without any preceeding _SMS.BUFFERED event should update the + notification. + """ + + mocker.patch('app.celery.process_pinpoint_receipt_tasks.is_feature_enabled', return_value=True) + test_reference = 'sms-reference-1' + create_notification(sample_template, reference=test_reference, sent_at=datetime.datetime.utcnow(), status='sending') + + process_pinpoint_receipt_tasks.process_pinpoint_results( + response=pinpoint_notification_callback_record( + reference=test_reference, + event_type='_SMS.SUCCESS', + record_status='DELIVERED', + number_of_message_parts=4, + price=2986.0 + ) + ) + + notification = notifications_dao.dao_get_notification_by_reference(test_reference) + assert notification.segments_count == 4 + assert notification.cost_in_millicents == 2986.0 + + +def pinpoint_notification_callback_record( + reference, + event_type='_SMS.SUCCESS', + record_status='DELIVERED', + number_of_message_parts=1, + price=645.0 +): pinpoint_message = { "event_type": event_type, "event_timestamp": 1553104954322, @@ -155,13 +243,13 @@ def pinpoint_notification_callback_record(reference, event_type='_SMS.SUCCESS', "record_status": record_status, "iso_country_code": "US", "treatment_id": "0", - "number_of_message_parts": "1", + "number_of_message_parts": number_of_message_parts, "message_id": reference, "message_type": "Transactional", "campaign_id": "12345" }, "metrics": { - "price_in_millicents_usd": 645.0 + "price_in_millicents_usd": price, }, "awsAccountId": "123456789012" } diff --git a/tests/app/v2/notifications/test_get_notifications.py b/tests/app/v2/notifications/test_get_notifications.py index e7666e52da..42e3415e6c 100644 --- a/tests/app/v2/notifications/test_get_notifications.py +++ b/tests/app/v2/notifications/test_get_notifications.py @@ -75,6 +75,8 @@ def test_get_notification_by_id_returns_200(client, billable_units, provider, sa 'recipient_identifiers': [], 'billing_code': sample_notification.billing_code, 'sms_sender_id': None, + 'cost_in_millicents': 0.0, + 'segments_count': 0, } assert json_response == expected_response @@ -138,6 +140,8 @@ def test_get_notification_by_id_with_placeholders_and_recipient_identifiers_retu 'recipient_identifiers': recipient_identifiers if recipient_identifiers else [], 'billing_code': None, 'sms_sender_id': None, + 'cost_in_millicents': 0.0, + 'segments_count': 0, } assert json_response == expected_response