diff --git a/eventtracking/django/apps.py b/eventtracking/django/apps.py index c2fc320a..47d27fcf 100644 --- a/eventtracking/django/apps.py +++ b/eventtracking/django/apps.py @@ -21,3 +21,5 @@ def ready(self): # pylint: disable=import-outside-toplevel from eventtracking.django.django_tracker import override_default_tracker override_default_tracker() + + from eventtracking import handlers # noqa: F401 diff --git a/eventtracking/handlers.py b/eventtracking/handlers.py new file mode 100644 index 00000000..ac85f991 --- /dev/null +++ b/eventtracking/handlers.py @@ -0,0 +1,60 @@ +""" +This module contains the handlers for signals emitted by the analytics app. +""" +import json +import logging + +from django.dispatch import receiver +from eventtracking.backends.event_bus import EventBusRoutingBackend +from eventtracking.processors.exceptions import EventEmissionExit +from eventtracking.tracker import get_tracker +from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED +from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS + +logger = logging.getLogger(__name__) + + +@receiver(TRACKING_EVENT_EMITTED) +def send_tracking_log_to_backends( + sender, signal, **kwargs +): # pylint: disable=unused-argument + """ + Listen for the TRACKING_EVENT_EMITTED signal and send the event to the enabled backends. + + The process is the following: + + 1. Unserialize the tracking log from the signal. + 2. Get the tracker instance to get the enabled backends (mongo, event_bus, logger, etc). + 3. Get the event bus backends that are the interested in the signals (multiple can be configured). + 4. Transform the event with the configured processors. + 5. Send the transformed event to the different event bus backends. + + This allows us to only send the tracking log to the event bus once and the event bus will send + the transformed event to the different configured backends. + """ + if not kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS, False): + logger.debug("Event received from a non-event bus backend, skipping...") + return + tracking_log = kwargs.get("tracking_log") + + event = { + "name": tracking_log.name, + "timestamp": tracking_log.timestamp, + "data": json.loads(tracking_log.data), + "context": json.loads(tracking_log.context), + } + + tracker = get_tracker() + + engines = { + name: engine + for name, engine in tracker.backends.items() + if isinstance(engine, EventBusRoutingBackend) + } + for name, engine in engines.items(): + try: + processed_event = engine.process_event(event) + logger.info('Successfully processed event "{}"'.format(event["name"])) + engine.send_to_backends(processed_event.copy()) + except EventEmissionExit: + logger.info("[EventEmissionExit] skipping event {}".format(event["name"])) diff --git a/eventtracking/tests/test_handlers.py b/eventtracking/tests/test_handlers.py new file mode 100644 index 00000000..367c4ae9 --- /dev/null +++ b/eventtracking/tests/test_handlers.py @@ -0,0 +1,138 @@ +""" +Test handlers for signals emitted by the analytics app +""" + +from unittest.mock import Mock, patch + +from django.test import TestCase +from django.test.utils import override_settings +from eventtracking.django.django_tracker import DjangoTracker +from eventtracking.handlers import send_tracking_log_to_backends +from openedx_events.analytics.data import TrackingLogData +from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS + + +class TestHandlers(TestCase): + """ + Tests handlers for signals emitted by the analytics app + """ + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": {}, + }, + } + ) + @patch("eventtracking.handlers.get_tracker") + def test_send_tracking_log_to_backends( + self, mock_get_tracker + ): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_backend = Mock() + tracker.backends["event_bus"].send_to_backends = mock_backend + kwargs = { + SIGNAL_PROCESSED_FROM_EVENT_BUS: True, + } + + send_tracking_log_to_backends( + sender=None, + signal=None, + tracking_log=TrackingLogData( + name="test_name", + timestamp="test_timestamp", + data="{}", + context="{}", + ), + **kwargs + ) + + mock_backend.assert_called_once_with( + { + "name": "test_name", + "timestamp": "test_timestamp", + "data": {}, + "context": {}, + } + ) + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": { + "processors": [ + { + "ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor", + "OPTIONS": { + "whitelist": ["no_test_name"] + } + } + ], + }, + }, + } + ) + @patch("eventtracking.handlers.get_tracker") + @patch("eventtracking.handlers.isinstance") + @patch("eventtracking.handlers.logger") + def test_send_tracking_log_to_backends_error( + self, mock_logger, mock_is_instance, mock_get_tracker + ): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_is_instance.return_value = True + kwargs = { + SIGNAL_PROCESSED_FROM_EVENT_BUS: True, + } + + x = send_tracking_log_to_backends( + sender=None, + signal=None, + tracking_log=TrackingLogData( + name="test_name", + timestamp="test_timestamp", + data="{}", + context="{}", + ), + **kwargs + ) + + assert x is None + + mock_logger.info.assert_called_once_with( + "[EventEmissionExit] skipping event {}".format("test_name") + ) + + @patch("eventtracking.handlers.logger") + def test_send_tracking_log_to_backends_in_same_runtime( + self, mock_logger + ): + """ + Test for send_tracking_log_to_backends + """ + + x = send_tracking_log_to_backends( + sender=None, + signal=None, + tracking_log=TrackingLogData( + name="test_name", + timestamp="test_timestamp", + data="{}", + context="{}", + ) + ) + + assert x is None + + mock_logger.debug.assert_called_once_with( + "Event received from a non-event bus backend, skipping..." + )