diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ca71c17f..27218ad1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,10 @@ Change Log Unreleased ~~~~~~~~~~ +[8.3.0] + +* Allow to use any configured engine to replay tracking logs + [8.2.0] * Add support for batching for EventsRouter. diff --git a/event_routing_backends/__init__.py b/event_routing_backends/__init__.py index f2df879a..75ba1222 100644 --- a/event_routing_backends/__init__.py +++ b/event_routing_backends/__init__.py @@ -2,4 +2,4 @@ Various backends for receiving edX LMS events.. """ -__version__ = '8.2.0' +__version__ = '8.3.0' diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py index d6ddf5c0..f8282982 100644 --- a/event_routing_backends/management/commands/helpers/queued_sender.py +++ b/event_routing_backends/management/commands/helpers/queued_sender.py @@ -7,11 +7,9 @@ from io import BytesIO from time import sleep -from event_routing_backends.backends.events_router import EventsRouter +from eventtracking.tracker import get_tracker + from event_routing_backends.management.commands.helpers.event_log_parser import parse_json_event -from event_routing_backends.models import RouterConfiguration -from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor -from event_routing_backends.processors.xapi.transformer_processor import XApiProcessor class QueuedSender: @@ -43,23 +41,15 @@ def __init__( self.unparsable_lines = 0 self.batches_sent = 0 - if self.transformer_type == "xapi": - self.router = EventsRouter( - backend_name=RouterConfiguration.XAPI_BACKEND, - processors=[XApiProcessor()] - ) - else: - self.router = EventsRouter( - backend_name=RouterConfiguration.CALIPER_BACKEND, - processors=[CaliperProcessor()] - ) + self.tracker = get_tracker() + self.engine = self.tracker.backends[self.transformer_type] def is_known_event(self, event): """ Check whether any processor cares about this event. """ if "name" in event: - for processor in self.router.processors: + for processor in self.engine.processors: if event["name"] in processor.registry.mapping: return True return False @@ -108,7 +98,8 @@ def send(self): """ if self.destination == "LRS": print(f"Sending {len(self.event_queue)} events to LRS...") - self.router.bulk_send(self.event_queue) + for backend in self.engine.backends.values(): + backend.bulk_send(self.event_queue) else: print("Skipping send, we're storing with libcloud instead of an LRS.") @@ -133,7 +124,7 @@ def store(self): out = BytesIO() for event in self.event_queue: - transformed_event = self.router.processors[0](event) + transformed_event = self.engine.processors[0](event) out.write(str.encode(json.dumps(transformed_event))) out.write(str.encode("\n")) out.seek(0) diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py index 11bc6c89..057714e7 100644 --- a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py +++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py @@ -7,9 +7,15 @@ import pytest from django.core.management import call_command +from django.test.utils import override_settings +from eventtracking.backends.async_routing import AsyncRoutingBackend +from eventtracking.backends.event_bus import EventBusRoutingBackend +from eventtracking.django.django_tracker import override_default_tracker +from eventtracking.tracker import get_tracker from libcloud.storage.types import ContainerDoesNotExistError import event_routing_backends.management.commands.transform_tracking_logs as transform_tracking_logs +from event_routing_backends.backends.events_router import EventsRouter from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender from event_routing_backends.management.commands.transform_tracking_logs import ( _get_chunks, @@ -19,6 +25,11 @@ validate_source_and_files, ) +override_default_tracker() + +tracker = get_tracker() + + LOCAL_CONFIG = json.dumps({"key": "/openedx/", "container": "data", "prefix": ""}) REMOTE_CONFIG = json.dumps({ "key": "api key", @@ -37,11 +48,9 @@ def mock_common_calls(): Mock out calls that we test elsewhere and aren't relevant to the command tests. """ command_path = "event_routing_backends.management.commands.transform_tracking_logs" - helper_path = "event_routing_backends.management.commands.helpers" with patch(command_path+".Provider") as mock_libcloud_provider: with patch(command_path+".get_driver") as mock_libcloud_get_driver: - with patch(helper_path + ".queued_sender.EventsRouter") as mock_eventsrouter: - yield mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter + yield mock_libcloud_provider, mock_libcloud_get_driver def command_options(): @@ -180,7 +189,7 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys): """ Test the command and QueuedSender with a variety of options. """ - mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls expected_results = command_opts.pop("expected_results") transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024*1024*2) @@ -202,16 +211,15 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys): mm2.registry.mapping = {"problem_check": 1} # Fake a process response that can be serialized to json mm2.return_value = {"foo": "bar"} - mock_eventsrouter.return_value.processors = [mm2] + tracker.backends["xapi"].processors = [mm2] + for backend in tracker.backends["xapi"].backends.values(): + backend.bulk_send = MagicMock() call_command( 'transform_tracking_logs', **command_opts ) - # Router should only be set up once - assert mock_eventsrouter.call_count == 1 - captured = capsys.readouterr() print(captured.out) @@ -290,7 +298,7 @@ def test_invalid_libcloud_source_driver(capsys, mock_common_calls): """ Check error cases when non-existent libcloud drivers are passed in. """ - mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls mock_libcloud_get_driver.side_effect = [AttributeError(), MagicMock()] @@ -303,7 +311,7 @@ def test_invalid_libcloud_source_driver(capsys, mock_common_calls): def test_invalid_libcloud_dest_driver(capsys, mock_common_calls): - mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls mock_libcloud_get_driver.side_effect = [MagicMock(), AttributeError()] with pytest.raises(AttributeError): diff --git a/test_settings.py b/test_settings.py index 0bc4d976..132478f4 100644 --- a/test_settings.py +++ b/test_settings.py @@ -49,5 +49,46 @@ def root(*args): XAPI_AGENT_IFI_TYPE = 'external_id' EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False EVENT_ROUTING_BACKEND_BATCH_INTERVAL = 100 - +EVENT_TRACKING_ENABLED = True +EVENT_TRACKING_BACKENDS = { + "xapi": { + "ENGINE": "eventtracking.backends.async_routing.AsyncRoutingBackend", + "OPTIONS": { + "backends": { + "xapi": { + "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", + "OPTIONS": { + "processors": [ + { + "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", + "OPTIONS": {}, + } + ], + "backend_name": "xapi", + }, + }, + }, + }, + }, + "caliper": { + "ENGINE": "eventtracking.backends.async_routing.AsyncRoutingBackend", + "OPTIONS": { + "backends": { + "caliper": { + "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", + "OPTIONS": { + "processors": [ + { + "ENGINE": "event_routing_backends.processors." + "caliper.transformer_processor.CaliperProcessor", + "OPTIONS": {}, + }, + ], + "backend_name": "caliper", + }, + }, + } + }, + }, +} _mock_third_party_modules()