From 2c308fa7813a4bfa887d7c1905c6e3cf46369620 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Fri, 9 Feb 2024 13:21:33 -0500 Subject: [PATCH 1/2] fix: avoid infinite recursion of openedx-event chore: quality fixes fix: mark signals as already emitted by event bus automatically test: update test with from_event_bus chore: address suggestion test: implement tests chore: quality fixes chore: address suggestion test: correct tests chore: update debug log for signal processing from event bus Co-authored-by: Tim McCormack chore: handle pr comments chore: handle pr comments --- openedx_events/apps.py | 12 +++++++++- openedx_events/tests/test_producer_config.py | 23 ++++++++++++++++++++ openedx_events/tests/test_tooling.py | 6 +++-- openedx_events/tooling.py | 13 +++++++++-- 4 files changed, 49 insertions(+), 5 deletions(-) diff --git a/openedx_events/apps.py b/openedx_events/apps.py index 6326e620..ea416eb0 100644 --- a/openedx_events/apps.py +++ b/openedx_events/apps.py @@ -1,13 +1,16 @@ """ openedx_events Django application initialization. """ +import logging from django.apps import AppConfig from django.conf import settings from openedx_events.event_bus import get_producer from openedx_events.exceptions import ProducerConfigurationError -from openedx_events.tooling import OpenEdxPublicSignal, load_all_signals +from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS, OpenEdxPublicSignal, load_all_signals + +logger = logging.getLogger(__name__) def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused-argument @@ -20,6 +23,13 @@ def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused- # "topic_a": { "event_key_field": "my.key.field", "enabled": True }, # "topic_b": { "event_key_field": "my.key.field", "enabled": False } # }" + if kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS) is True: + logger.debug( + "Declining to send signal to the Event Bus since that's " + f"where it was sent from: {signal.event_type} (preventing recursion)" + ) + return + event_data = {key: kwargs.get(key) for key in signal.init_data} for topic in event_type_producer_configs.keys(): diff --git a/openedx_events/tests/test_producer_config.py b/openedx_events/tests/test_producer_config.py index 05c4b8df..1b89e07e 100644 --- a/openedx_events/tests/test_producer_config.py +++ b/openedx_events/tests/test_producer_config.py @@ -55,6 +55,29 @@ def test_enabled_disabled_events(self, mock_producer): call_args ) + @patch("openedx_events.apps.logger") + @patch('openedx_events.apps.get_producer') + def test_send_events_with_custom_metadata_not_replayed_by_handler(self, mock_producer, mock_logger): + """ + Check wheter XBLOCK_PUBLISHED is connected to the handler and the handler + do not send any events as the signal is marked "from_event_bus". + + Args: + mock_producer: mock get_producer to inspect the arguments. + mock_logger: mock logger to inspect the arguments. + """ + mock_send = Mock() + mock_producer.return_value = mock_send + metadata = XBLOCK_PUBLISHED.generate_signal_metadata() + + XBLOCK_PUBLISHED.send_event_with_custom_metadata(metadata, xblock_info=self.xblock_info) + + mock_send.send.assert_not_called() + mock_logger.debug.assert_called_once_with( + "Declining to send signal to the Event Bus since that's " + f"where it was sent from: {XBLOCK_PUBLISHED.event_type} (preventing recursion)" + ) + @patch('openedx_events.apps.get_producer') @override_settings(EVENT_BUS_PRODUCER_CONFIG={}) def test_events_not_in_config(self, mock_producer): diff --git a/openedx_events/tests/test_tooling.py b/openedx_events/tests/test_tooling.py index f3aa01d5..da480d3d 100644 --- a/openedx_events/tests/test_tooling.py +++ b/openedx_events/tests/test_tooling.py @@ -179,6 +179,7 @@ def test_send_event_allow_failure_successfully(self, send_mock, fake_metadata): sender=None, user=self.user_mock, metadata=expected_metadata, + from_event_bus=False, ) @patch("openedx_events.tooling.OpenEdxPublicSignal.generate_signal_metadata") @@ -198,7 +199,8 @@ def test_send_robust_event_successfully(self, format_responses_mock, log_mock, f self.public_signal.send_event(user=self.user_mock) self.ok_receiver.assert_called_once_with( - signal=self.public_signal, sender=None, user=self.user_mock, metadata=expected_metadata + signal=self.public_signal, sender=None, user=self.user_mock, metadata=expected_metadata, + from_event_bus=False ) # format_responses is mocked out because its output is # complicated enough to warrant its own set of tests. @@ -253,7 +255,7 @@ def test_send_event_with_custom_metadata(self, mock_send_event_with_metadata): assert response == expected_response mock_send_event_with_metadata.assert_called_once_with( - metadata=metadata, send_robust=True, foo="bar", + metadata=metadata, send_robust=True, foo="bar", from_event_bus=True ) @ddt.data( diff --git a/openedx_events/tooling.py b/openedx_events/tooling.py index 4994c833..2a5b2853 100644 --- a/openedx_events/tooling.py +++ b/openedx_events/tooling.py @@ -30,6 +30,8 @@ "org.openedx.learning.course.notification.requested.v1", ] +SIGNAL_PROCESSED_FROM_EVENT_BUS = "from_event_bus" + class OpenEdxPublicSignal(Signal): """ @@ -115,7 +117,7 @@ def generate_signal_metadata(self, time=None): time=time, ) - def _send_event_with_metadata(self, metadata, send_robust=True, **kwargs): + def _send_event_with_metadata(self, metadata, send_robust=True, from_event_bus=False, **kwargs): """ Send events to all connected receivers with the provided metadata. @@ -124,6 +126,10 @@ def _send_event_with_metadata(self, metadata, send_robust=True, **kwargs): Arguments: metadata (EventsMetadata): The metadata to be sent with the signal. send_robust (bool): Defaults to True. See Django signal docs. + from_event_bus (bool): Defaults to False. If True, the signal is + being sent from the event bus. This is used to prevent infinite + loops when the event bus is consuming events. It should not be + used when sending events from the application. See ``send_event`` docstring for more details on its usage and behavior. """ @@ -163,6 +169,7 @@ def validate_sender(): validate_sender() kwargs["metadata"] = metadata + kwargs[SIGNAL_PROCESSED_FROM_EVENT_BUS] = from_event_bus if self._allow_send_event_failure or settings.DEBUG or not send_robust: return super().send(sender=None, **kwargs) @@ -234,7 +241,9 @@ def send_event_with_custom_metadata( See ``send_event`` docstring for more details. """ - return self._send_event_with_metadata(metadata=metadata, send_robust=send_robust, **kwargs) + return self._send_event_with_metadata( + metadata=metadata, send_robust=send_robust, from_event_bus=True, **kwargs + ) def send(self, sender, **kwargs): # pylint: disable=unused-argument """ From 289128744796b5872203821bcc8fe376c3e03a95 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Mon, 12 Feb 2024 13:13:43 -0500 Subject: [PATCH 2/2] chore: bump version to 9.5.1 --- CHANGELOG.rst | 7 +++++++ openedx_events/__init__.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 615dc20a..bb762d15 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -13,6 +13,13 @@ Change Log Unreleased ---------- + +[9.5.1] - 2024-02-12 +-------------------- +Changed +~~~~~~~ +* Fixed recursion error when consuming events on the same service that produced them. + [9.5.0] - 2024-02-07 -------------------- Added diff --git a/openedx_events/__init__.py b/openedx_events/__init__.py index b1f959b2..c1a3f1e7 100644 --- a/openedx_events/__init__.py +++ b/openedx_events/__init__.py @@ -5,4 +5,4 @@ more information about the project. """ -__version__ = "9.5.0" +__version__ = "9.5.1"