Skip to content

Commit

Permalink
feat: refactor EventsRouter bettween sync and async router
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Feb 16, 2024
1 parent 2363d2d commit 9d5e704
Show file tree
Hide file tree
Showing 16 changed files with 1,003 additions and 391 deletions.
49 changes: 49 additions & 0 deletions event_routing_backends/backends/async_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Events router to send events to hosts via celery.
This events router will trigger a celery task to send the events to the
configured hosts.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent


class AsyncEventsRouter(EventsRouter):
"""
Router to send events to hosts via celery using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event.delay(event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_bulk_events.delay(events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event_persistent.delay(event_name, updated_event, router_type, host_configurations)
46 changes: 40 additions & 6 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -138,7 +137,7 @@ def bulk_send(self, events):
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
self.dispatch_bulk_events(
prepared_events,
host['router_type'],
host['host_configurations']
Expand All @@ -160,18 +159,18 @@ def send(self, event):
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
dispatch_event_persistent.delay(
self.dispatch_event_persistent(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)
else:
dispatch_event.delay(
self.dispatch_event(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)

def process_event(self, event):
Expand Down Expand Up @@ -215,3 +214,38 @@ def overwrite_event_data(self, event, host, event_name):
host['override_args']
))
return event

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event is not implemented')

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_bulk_events is not implemented')

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event_persistent is not implemented')
51 changes: 51 additions & 0 deletions event_routing_backends/backends/sync_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Events router to send events to hosts in sync mode.
This router is expected to be used with the event bus, which
can be configured to use this router to send events to hosts
in the same thread as it process the events.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import bulk_send_events, send_event


class SyncEventsRouter(EventsRouter):
"""
Router to send events to hosts using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
send_event(None, event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
bulk_send_events(None, events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
In this case, the event bus is expected to provide the persistent storage layer.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
self.dispatch_event(event_name, updated_event, router_type, host_configurations)
Loading

0 comments on commit 9d5e704

Please sign in to comment.