Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid infinite recursion of openedx-event #312

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ Change Log
Unreleased
----------

[9.5.1] - 2024-02-12
--------------------
Changed
~~~~~~~
* Fixed recursion error when consuming events on the same service that produced them.

[9.5.0] - 2024-02-07
--------------------
Added
Expand Down
2 changes: 1 addition & 1 deletion openedx_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
more information about the project.
"""

__version__ = "9.5.0"
__version__ = "9.5.1"
12 changes: 11 additions & 1 deletion openedx_events/apps.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""
openedx_events Django application initialization.
"""
import logging

from django.apps import AppConfig
from django.conf import settings

from openedx_events.event_bus import get_producer
from openedx_events.exceptions import ProducerConfigurationError
from openedx_events.tooling import OpenEdxPublicSignal, load_all_signals
from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS, OpenEdxPublicSignal, load_all_signals

logger = logging.getLogger(__name__)


def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused-argument
Expand All @@ -20,6 +23,13 @@ def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused-
# "topic_a": { "event_key_field": "my.key.field", "enabled": True },
# "topic_b": { "event_key_field": "my.key.field", "enabled": False }
# }"
if kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS) is True:
logger.debug(
"Declining to send signal to the Event Bus since that's "
f"where it was sent from: {signal.event_type} (preventing recursion)"
)
return

event_data = {key: kwargs.get(key) for key in signal.init_data}

for topic in event_type_producer_configs.keys():
Expand Down
23 changes: 23 additions & 0 deletions openedx_events/tests/test_producer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ def test_enabled_disabled_events(self, mock_producer):
call_args
)

@patch("openedx_events.apps.logger")
@patch('openedx_events.apps.get_producer')
def test_send_events_with_custom_metadata_not_replayed_by_handler(self, mock_producer, mock_logger):
"""
Check wheter XBLOCK_PUBLISHED is connected to the handler and the handler
do not send any events as the signal is marked "from_event_bus".

Args:
mock_producer: mock get_producer to inspect the arguments.
mock_logger: mock logger to inspect the arguments.
"""
mock_send = Mock()
mock_producer.return_value = mock_send
metadata = XBLOCK_PUBLISHED.generate_signal_metadata()

XBLOCK_PUBLISHED.send_event_with_custom_metadata(metadata, xblock_info=self.xblock_info)

mock_send.send.assert_not_called()
mock_logger.debug.assert_called_once_with(
"Declining to send signal to the Event Bus since that's "
f"where it was sent from: {XBLOCK_PUBLISHED.event_type} (preventing recursion)"
)

@patch('openedx_events.apps.get_producer')
@override_settings(EVENT_BUS_PRODUCER_CONFIG={})
def test_events_not_in_config(self, mock_producer):
Expand Down
6 changes: 4 additions & 2 deletions openedx_events/tests/test_tooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def test_send_event_allow_failure_successfully(self, send_mock, fake_metadata):
sender=None,
user=self.user_mock,
metadata=expected_metadata,
from_event_bus=False,
)

@patch("openedx_events.tooling.OpenEdxPublicSignal.generate_signal_metadata")
Expand All @@ -198,7 +199,8 @@ def test_send_robust_event_successfully(self, format_responses_mock, log_mock, f
self.public_signal.send_event(user=self.user_mock)

self.ok_receiver.assert_called_once_with(
signal=self.public_signal, sender=None, user=self.user_mock, metadata=expected_metadata
signal=self.public_signal, sender=None, user=self.user_mock, metadata=expected_metadata,
from_event_bus=False
)
# format_responses is mocked out because its output is
# complicated enough to warrant its own set of tests.
Expand Down Expand Up @@ -253,7 +255,7 @@ def test_send_event_with_custom_metadata(self, mock_send_event_with_metadata):

assert response == expected_response
mock_send_event_with_metadata.assert_called_once_with(
metadata=metadata, send_robust=True, foo="bar",
metadata=metadata, send_robust=True, foo="bar", from_event_bus=True
)

@ddt.data(
Expand Down
13 changes: 11 additions & 2 deletions openedx_events/tooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
"org.openedx.learning.course.notification.requested.v1",
]

SIGNAL_PROCESSED_FROM_EVENT_BUS = "from_event_bus"


class OpenEdxPublicSignal(Signal):
"""
Expand Down Expand Up @@ -115,7 +117,7 @@ def generate_signal_metadata(self, time=None):
time=time,
)

def _send_event_with_metadata(self, metadata, send_robust=True, **kwargs):
def _send_event_with_metadata(self, metadata, send_robust=True, from_event_bus=False, **kwargs):
"""
Send events to all connected receivers with the provided metadata.

Expand All @@ -124,6 +126,10 @@ def _send_event_with_metadata(self, metadata, send_robust=True, **kwargs):
Arguments:
metadata (EventsMetadata): The metadata to be sent with the signal.
send_robust (bool): Defaults to True. See Django signal docs.
from_event_bus (bool): Defaults to False. If True, the signal is
being sent from the event bus. This is used to prevent infinite
loops when the event bus is consuming events. It should not be
used when sending events from the application.

See ``send_event`` docstring for more details on its usage and behavior.
"""
Expand Down Expand Up @@ -163,6 +169,7 @@ def validate_sender():
validate_sender()

kwargs["metadata"] = metadata
kwargs[SIGNAL_PROCESSED_FROM_EVENT_BUS] = from_event_bus

if self._allow_send_event_failure or settings.DEBUG or not send_robust:
return super().send(sender=None, **kwargs)
Expand Down Expand Up @@ -234,7 +241,9 @@ def send_event_with_custom_metadata(
See ``send_event`` docstring for more details.

"""
return self._send_event_with_metadata(metadata=metadata, send_robust=send_robust, **kwargs)
return self._send_event_with_metadata(
metadata=metadata, send_robust=send_robust, from_event_bus=True, **kwargs
)

def send(self, sender, **kwargs): # pylint: disable=unused-argument
"""
Expand Down
Loading