Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Reset request cache before handling event #201

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[5.5.0] - 2023-09-21
********************
Changed
=======
* Reset edx-django-utils RequestCache before handling each event

[5.4.0] - 2023-08-28
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '5.4.0'
__version__ = '5.5.0'
33 changes: 31 additions & 2 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from django.db import connection
from django.dispatch import receiver
from django.test.signals import setting_changed
from edx_django_utils.cache import RequestCache
from edx_django_utils.monitoring import function_trace, record_exception, set_custom_attribute
from edx_toggles.toggles import SettingToggle
from openedx_events.event_bus import EventBusConsumer
Expand Down Expand Up @@ -106,6 +107,31 @@ def _reconnect_to_db_if_needed():
connection.connect()


def _clear_request_cache():
"""
Clear the RequestCache so that each event consumption starts fresh.

Signal handlers may be written with the assumption that they are called in the context
of a web request, so we clear the request cache just in case.
"""
RequestCache.clear_all_namespaces()


def _prepare_for_new_work_cycle():
"""
Ensure that the application state is appropriate for performing a new unit of work.

This mimics some setup/teardown that is normally performed by Django in its
request/response based architecture and that is needed for ensuring a clean and
usable state in this worker-based application.
"""
# Ensure that the database connection is active and usable.
_reconnect_to_db_if_needed()

# Clear the request cache, in case anything in the signal handlers rely on it.
_clear_request_cache()


class KafkaEventConsumer(EventBusConsumer):
"""
Construct consumer for the given topic and group. The consumer can then
Expand Down Expand Up @@ -278,8 +304,11 @@ def _consume_indefinitely(self):
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
with function_trace('_consume_indefinitely_consume_single_message'):
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
# Before processing, try to make sure our application state is cleaned
# up as would happen at the start of a Django request/response cycle.
# See https://github.com/openedx/openedx-events/issues/236 for details.
_prepare_for_new_work_cycle()

signal = self.determine_signal(msg)
msg.set_value(self._deserialize_message_value(msg, signal))
self.emit_signals_from_message(msg, signal)
Expand Down