Skip to content

Commit

Permalink
HOTFIX - Refactored Celery to Reduce Time Polling SQS (#1996)
Browse files Browse the repository at this point in the history
  • Loading branch information
k-macmillan authored Sep 18, 2024
1 parent a89bf47 commit 97918a6
Show file tree
Hide file tree
Showing 28 changed files with 100 additions and 152 deletions.
6 changes: 6 additions & 0 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ fileignoreconfig:
checksum: b2cbbb8508af49abccae8b35b317f7ce09215f3508430ed31440add78f450e5a
- filename: app/notifications/process_notifications.py
checksum: ae4e31c6eb56d91ec80ae09d13baf4558cf461c65f08893b93fee43f036a17a7
- filename: lambda_functions/pinpoint_callback/pinpoint_callback_lambda.py
checksum: 7bd4900e14b1fa789bbb2568b8a8d7a400e3c8350ba32fb44cc0b5b66a2df037
- filename: lambda_functions/ses_callback/ses_callback_lambda.py
checksum: b20c36921290a9609f158784e2a3278c36190887e6054ea548004a67675fd79b
- filename: poetry.lock
checksum: 34d12acdf749363555c31add4e7e7afa9e2a27afd792bd98c85f331b87bd7112
- filename: scripts/trigger_task.py
checksum: 0e9d244dbe285de23fc84bb643407963dacf7d25a3358373f01f6272fb217778
version: "1.0"
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,10 @@ the script for the command line given that you provide the task name, prefix and
required arguments to the task. For example, running the command

```bash
python scripts/trigger_task.py --task-name generate-daily-notification-status-report --queue-prefix dev-notification- --routing-key delivery-receipts --task-args 2022-01-28
python scripts/trigger_task.py --task-name generate-daily-notification-status-report --queue-prefix dev-notification- --routing-key delivery-status-result-tasks --task-args 2022-01-28
```

Will run the `generate-daily-notification-status-report` task in the queue `dev-notification-delivery-receipts` with
Will run the `generate-daily-notification-status-report` task in the queue `dev-notification-delivery-status-result-tasks` with
a date string of `2022-01-28` passed to the task.

## Adding Environment Variables to Task Definition Files
Expand Down
2 changes: 1 addition & 1 deletion app/celery/provider_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def deliver_sms_with_rate_limiting(
retry_time,
)

self.retry(queue=QueueNames.RATE_LIMIT_RETRY, max_retries=None, countdown=retry_time)
self.retry(queue=QueueNames.RETRY, max_retries=None, countdown=retry_time)
except (NullValueForNonConditionalPlaceholderException, AttributeError, RuntimeError) as e:
log_and_update_technical_failure(notification_id, 'deliver_sms_with_rate_limiting', e)
raise NotificationTechnicalFailureException(f'Found {type(e).__name__}, NOT retrying...', e, e.args)
Expand Down
14 changes: 6 additions & 8 deletions app/celery/reporting_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ def create_nightly_billing(day_start=None):

if is_feature_enabled(FeatureFlag.NIGHTLY_NOTIF_CSV_ENABLED):
tasks = [
create_nightly_billing_for_day.si(process_day.isoformat()).set(queue=QueueNames.REPORTING),
generate_nightly_billing_csv_report.si(process_day.isoformat()).set(queue=QueueNames.REPORTING),
create_nightly_billing_for_day.si(process_day.isoformat()).set(queue=QueueNames.NOTIFY),
generate_nightly_billing_csv_report.si(process_day.isoformat()).set(queue=QueueNames.NOTIFY),
]

chain(*tasks).apply_async()
else:
create_nightly_billing_for_day.apply_async(
kwargs={'process_day': process_day.isoformat()}, queue=QueueNames.REPORTING
kwargs={'process_day': process_day.isoformat()}, queue=QueueNames.NOTIFY
)


Expand Down Expand Up @@ -123,16 +123,14 @@ def create_nightly_notification_status(day_start=None):

if is_feature_enabled(FeatureFlag.NIGHTLY_NOTIF_CSV_ENABLED):
tasks = [
create_nightly_notification_status_for_day.si(process_day.isoformat()).set(queue=QueueNames.REPORTING),
generate_daily_notification_status_csv_report.si(process_day.isoformat()).set(
queue=QueueNames.REPORTING
),
create_nightly_notification_status_for_day.si(process_day.isoformat()).set(queue=QueueNames.NOTIFY),
generate_daily_notification_status_csv_report.si(process_day.isoformat()).set(queue=QueueNames.NOTIFY),
]
chain(*tasks).apply_async()

else:
create_nightly_notification_status_for_day.apply_async(
kwargs={'process_day': process_day.isoformat()}, queue=QueueNames.REPORTING
kwargs={'process_day': process_day.isoformat()}, queue=QueueNames.NOTIFY
)


Expand Down
4 changes: 2 additions & 2 deletions app/celery/research_mode_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def send_sms_response(
headers = {'Content-type': 'application/json'}
elif provider == 'pinpoint':
body = pinpoint_notification_callback_record(reference)
process_pinpoint_receipt_tasks.process_pinpoint_results.apply_async([body], queue=QueueNames.RESEARCH_MODE)
process_pinpoint_receipt_tasks.process_pinpoint_results.apply_async([body], queue=QueueNames.NOTIFY)
return
else:
headers = {'Content-type': 'application/x-www-form-urlencoded'}
Expand All @@ -75,7 +75,7 @@ def send_email_response(
else:
body = ses_notification_callback(reference)

process_ses_receipts_tasks.process_ses_results.apply_async([body], queue=QueueNames.RESEARCH_MODE)
process_ses_receipts_tasks.process_ses_results.apply_async([body], queue=QueueNames.NOTIFY)


def make_request(
Expand Down
4 changes: 2 additions & 2 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
def run_scheduled_jobs():
try:
for job in dao_set_scheduled_jobs_to_pending():
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
process_job.apply_async([str(job.id)], queue=QueueNames.NOTIFY)
current_app.logger.info('Job ID {} added to process job queue'.format(job.id))
except SQLAlchemyError:
current_app.logger.exception('Failed to run scheduled jobs')
Expand Down Expand Up @@ -128,7 +128,7 @@ def check_job_status():
job_ids.append(str(job.id))

if job_ids:
notify_celery.send_task(name=TaskNames.PROCESS_INCOMPLETE_JOBS, args=(job_ids,), queue=QueueNames.JOBS)
notify_celery.send_task(name=TaskNames.PROCESS_INCOMPLETE_JOBS, args=(job_ids,), queue=QueueNames.NOTIFY)
raise JobIncompleteError('Job(s) {} have not completed.'.format(job_ids))


Expand Down
11 changes: 6 additions & 5 deletions app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def process_row(
encrypted,
),
task_kwargs,
queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE,
queue=QueueNames.NOTIFY,
)


Expand Down Expand Up @@ -229,12 +229,12 @@ def save_sms(
if is_feature_enabled(FeatureFlag.SMS_SENDER_RATE_LIMIT_ENABLED) and sms_sender and sms_sender.rate_limit:
provider_tasks.deliver_sms_with_rate_limiting.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.RESEARCH_MODE,
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.NOTIFY,
)
else:
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.RESEARCH_MODE,
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.NOTIFY,
)

current_app.logger.debug(
Expand Down Expand Up @@ -290,7 +290,7 @@ def save_email(

provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.RESEARCH_MODE,
queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.NOTIFY,
)

current_app.logger.debug('Email %s created at %s', saved_notification.id, saved_notification.created_at)
Expand Down Expand Up @@ -338,7 +338,8 @@ def save_letter(
)
if current_app.config['NOTIFY_ENVIRONMENT'] in ['preview', 'development']:
research_mode_tasks.create_fake_letter_response_file.apply_async(
(saved_notification.reference,), queue=QueueNames.RESEARCH_MODE
(saved_notification.reference,),
queue=QueueNames.NOTIFY,
)
else:
update_notification_status_by_reference(saved_notification.reference, 'delivered')
Expand Down
55 changes: 18 additions & 37 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,50 +32,30 @@


class QueueNames(object):
PERIODIC = 'periodic-tasks'
PRIORITY = 'priority-tasks'
DATABASE = 'database-tasks'
SEND_SMS = 'send-sms-tasks'
SEND_EMAIL = 'send-email-tasks'
RESEARCH_MODE = 'research-mode-tasks'
REPORTING = 'reporting-tasks'
JOBS = 'job-tasks'
RETRY = 'retry-tasks'
RATE_LIMIT_RETRY = 'rate-limit-retry-tasks'
NOTIFY = 'notify-internal-tasks'
PROCESS_FTP = 'process-ftp-tasks'
CALLBACKS = 'service-callbacks'
ANTIVIRUS = 'antivirus-tasks'
LOOKUP_CONTACT_INFO = 'lookup-contact-info-tasks'
LOOKUP_VA_PROFILE_ID = 'lookup-va-profile-id-tasks'
DELIVERY_RECEIPTS = 'delivery-receipts'
COMMUNICATION_ITEM_PERMISSIONS = 'communication-item-permissions'
SEND_ONSITE_NOTIFICATION = 'onsite-notification-tasks'
DELIVERY_STATUS_RESULT_TASKS = 'delivery-status-result-tasks'
LOOKUP_CONTACT_INFO = 'lookup-contact-info-tasks'
LOOKUP_VA_PROFILE_ID = 'lookup-va-profile-id-tasks'
NOTIFY = 'notify-internal-tasks'
PERIODIC = 'periodic-tasks'
RETRY = 'retry-tasks'
SEND_EMAIL = 'send-email-tasks'
SEND_SMS = 'send-sms-tasks'

@staticmethod
def all_queues():
return [
QueueNames.PRIORITY,
QueueNames.PERIODIC,
QueueNames.DATABASE,
QueueNames.SEND_SMS,
QueueNames.SEND_EMAIL,
QueueNames.RESEARCH_MODE,
QueueNames.REPORTING,
QueueNames.JOBS,
QueueNames.RETRY,
QueueNames.RATE_LIMIT_RETRY,
QueueNames.NOTIFY,
# QueueNames.CREATE_LETTERS_PDF,
QueueNames.CALLBACKS,
# QueueNames.LETTERS,
QueueNames.LOOKUP_CONTACT_INFO,
QueueNames.LOOKUP_VA_PROFILE_ID,
QueueNames.DELIVERY_RECEIPTS,
QueueNames.COMMUNICATION_ITEM_PERMISSIONS,
QueueNames.SEND_ONSITE_NOTIFICATION,
QueueNames.DELIVERY_STATUS_RESULT_TASKS,
QueueNames.LOOKUP_CONTACT_INFO,
QueueNames.LOOKUP_VA_PROFILE_ID,
QueueNames.NOTIFY,
QueueNames.PERIODIC,
QueueNames.RETRY,
QueueNames.SEND_EMAIL,
QueueNames.SEND_SMS,
]


Expand Down Expand Up @@ -233,12 +213,13 @@ class Config(object):
'broker_url': os.getenv('BROKER_URL', 'sqs://sqs.us-gov-west-1.amazonaws.com'),
'broker_transport_options': {
'region': AWS_REGION,
'polling_interval': 1, # 1 second
'polling_interval': 0.5, # seconds
'visibility_timeout': 310,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX,
'is_secure': os.getenv('BROKER_SSL_ENABLED', 'True') == 'True',
},
'worker_enable_remote_control': False,
'worker_prefetch_multiplier': 16,
'enable_utc': True,
'timezone': os.getenv('TIMEZONE', 'America/New_York'),
'accept_content': ['json', 'pickle'],
Expand Down Expand Up @@ -291,12 +272,12 @@ class Config(object):
'create-nightly-billing': {
'task': 'create-nightly-billing',
'schedule': crontab(hour=0, minute=15),
'options': {'queue': QueueNames.REPORTING},
'options': {'queue': QueueNames.NOTIFY},
},
'create-nightly-notification-status': {
'task': 'create-nightly-notification-status',
'schedule': crontab(hour=0, minute=30),
'options': {'queue': QueueNames.REPORTING},
'options': {'queue': QueueNames.NOTIFY},
},
'delete-sms-notifications': {
'task': 'delete-sms-notifications',
Expand Down
2 changes: 1 addition & 1 deletion app/job/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def create_job(service_id):
sender_id = data.get('sender_id')

if job.job_status == JOB_STATUS_PENDING:
process_job.apply_async([str(job.id)], {'sender_id': sender_id}, queue=QueueNames.JOBS)
process_job.apply_async([str(job.id)], {'sender_id': sender_id}, queue=QueueNames.NOTIFY)

job_json = job_schema.dump(job)
job_json['statistics'] = []
Expand Down
2 changes: 1 addition & 1 deletion app/letters/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
def create_process_returned_letters_job():
references = validate(request.get_json(), letter_references)

process_returned_letters_list.apply_async([references['references']], queue=QueueNames.DATABASE)
process_returned_letters_list.apply_async([references['references']], queue=QueueNames.NOTIFY)

return jsonify(references=references['references']), 200
6 changes: 3 additions & 3 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def _get_delivery_task(
"""

if research_mode or notification.key_type == KEY_TYPE_TEST:
queue = QueueNames.RESEARCH_MODE
queue = QueueNames.NOTIFY

if notification.notification_type == SMS_TYPE:
if not queue:
Expand Down Expand Up @@ -261,15 +261,15 @@ def send_to_queue_for_recipient_info_based_on_recipient_identifier(
if id_type == IdentifierType.VA_PROFILE_ID.value:
tasks = [
send_va_onsite_notification_task.s(id_value, str(notification.template.id), onsite_enabled).set(
queue=QueueNames.SEND_ONSITE_NOTIFICATION
queue=QueueNames.NOTIFY
),
]

else:
tasks = [
lookup_va_profile_id.si(notification.id).set(queue=QueueNames.LOOKUP_VA_PROFILE_ID),
send_va_onsite_notification_task.s(str(notification.template.id), onsite_enabled).set(
queue=QueueNames.SEND_ONSITE_NOTIFICATION
queue=QueueNames.NOTIFY
),
]

Expand Down
5 changes: 1 addition & 4 deletions app/notifications/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ def send_notification(notification_type):
if simulated:
current_app.logger.debug('POST simulated notification for id: %s', notification_model.id)
else:
queue_name = QueueNames.PRIORITY if template.process_type == PRIORITY else None
send_notification_to_queue(
notification=notification_model, research_mode=authenticated_service.research_mode, queue=queue_name
)
send_notification_to_queue(notification=notification_model, research_mode=authenticated_service.research_mode)

notification_form['template_version'] = template.version

Expand Down
3 changes: 0 additions & 3 deletions app/service/send_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ def send_one_off_notification(
reference=create_one_off_reference(template.template_type),
)

queue_name = QueueNames.PRIORITY if template.process_type == PRIORITY else None

if template.template_type == LETTER_TYPE and service.research_mode:
_update_notification_status(
notification,
Expand All @@ -93,7 +91,6 @@ def send_one_off_notification(
send_notification_to_queue(
notification=notification,
research_mode=service.research_mode,
queue=queue_name,
)

return {'id': str(notification.id)}
Expand Down
6 changes: 1 addition & 5 deletions app/v2/notifications/post_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ def post_notification(notification_type): # noqa: C901
reply_to = get_reply_to_text(notification_type, form, template)

if notification_type == LETTER_TYPE:
notification = process_letter_notification(
letter_data=form, api_key=api_user, template=template, reply_to_text=reply_to
)
return jsonify(result='error', message='Not Implemented'), 501
else:
if 'email_address' in form or 'phone_number' in form:
notification = process_sms_or_email_notification(
Expand Down Expand Up @@ -206,12 +204,10 @@ def process_sms_or_email_notification(
if simulated:
current_app.logger.debug('POST simulated notification for id: %s', notification.id)
else:
queue_name = QueueNames.PRIORITY if template.process_type == PRIORITY else None
recipient_id_type = recipient_identifier.get('id_type') if recipient_identifier else None
send_notification_to_queue(
notification=notification,
research_mode=service.research_mode,
queue=queue_name,
recipient_id_type=recipient_id_type,
sms_sender_id=form.get('sms_sender_id'),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import uuid

ROUTING_KEY = 'delivery-receipts'
ROUTING_KEY = 'delivery-status-result-tasks'


def lambda_handler(
Expand Down
2 changes: 1 addition & 1 deletion lambda_functions/ses_callback/ses_callback_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import uuid

ROUTING_KEY = 'delivery-receipts'
ROUTING_KEY = 'delivery-status-result-tasks'


def lambda_handler(
Expand Down
2 changes: 1 addition & 1 deletion scripts/run_celery.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
set -e

# Necessary to run as exec so the PID is transferred to Celery for the `SIGTERM` sent from ECS
exec celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=4
exec celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=16
2 changes: 1 addition & 1 deletion scripts/trigger_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def send_task(
'--task-name', dest='task_name', type=str, default='generate-daily-notification-status-csv-report'
)
parser.add_argument('--queue-prefix', dest='queue_prefix', type=str, default='dev-notification-')
parser.add_argument('--routing-key', dest='routing_key', type=str, default='delivery-receipts')
parser.add_argument('--routing-key', dest='routing_key', type=str, default='delivery-status-result-tasks')
parser.add_argument('--task-args', dest='task_args', nargs='+', default=[])

args = parser.parse_args()
Expand Down
2 changes: 1 addition & 1 deletion tests/app/celery/test_provider_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def test_deliver_sms_with_rate_limiting_should_retry_if_rate_limit_exceeded(
deliver_sms_with_rate_limiting(notification.id)

retry.assert_called_once_with(
queue=QueueNames.RATE_LIMIT_RETRY,
queue=QueueNames.RETRY,
max_retries=None,
countdown=sms_sender.rate_limit_interval / sms_sender.rate_limit,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/app/celery/test_research_mode_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_make_ses_callback(notify_api, mocker):
some_ref = str(uuid.uuid4())
send_email_response(reference=some_ref, to='test@test.com')

mock_task.apply_async.assert_called_once_with(ANY, queue=QueueNames.RESEARCH_MODE)
mock_task.apply_async.assert_called_once_with(ANY, queue=QueueNames.NOTIFY)
assert mock_task.apply_async.call_args[0][0][0] == ses_notification_callback(some_ref)


Expand Down
Loading

0 comments on commit 97918a6

Please sign in to comment.