diff --git a/enterprise_subsidy/apps/core/event_bus.py b/enterprise_subsidy/apps/core/event_bus.py new file mode 100644 index 00000000..51ea1b5f --- /dev/null +++ b/enterprise_subsidy/apps/core/event_bus.py @@ -0,0 +1,91 @@ +""" +Functions for serializing and emiting Open edX event bus signals. +""" +from openedx_events.enterprise.data import LedgerTransaction, LedgerTransactionReversal +from openedx_events.enterprise.signals import ( + LEDGER_TRANSACTION_CREATED, + LEDGER_TRANSACTION_COMMITTED, + LEDGER_TRANSACTION_FAILED, + LEDGER_TRANSACTION_REVERSED, +) + + +def serialize_transaction(transaction_record): + """ + Serializes the ``transaction_record``into a defined set of attributes + for use in the event-bus signal. + """ + reversal_data = None + if reversal_record := transaction_record.get_reversal(): + reversal_data = LedgerTransactionReversal( + uuid=reversal_record.uuid, + created=reversal_record.created, + modified=reversal_record.modified, + idempotency_key=reversal_record.idempotency_key, + quantity=reversal_record.quantity, + state=reversal_record.state, + ) + data = LedgerTransaction( + uuid=transaction_record.uuid, + created=transaction_record.created, + modified=transaction_record.modified, + idempotency_key=transaction_record.idempotency_key, + quantity=transaction_record.quantity, + state=transaction_record.state, + ledger_uuid=transaction_record.ledger.uuid, + subsidy_access_policy_uuid=transaction_record.subsidy_access_policy_uuid, + lms_user_id=transaction_record.lms_user_id, + content_key=transaction_record.content_key, + parent_content_key=transaction_record.parent_content_key, + fulfillment_identifier=transaction_record.fulfillment_identifier, + reversal=reversal_data, + ) + return data + + +def send_transaction_created_event(transaction_record): + """ + Sends the LEDGER_TRANSACTION_CREATED open edx event for the given ``transaction_record``. + + Parameters: + transaction_record (openedx_ledger.models.Transaction): A transaction record. + """ + LEDGER_TRANSACTION_CREATED.send_event( + ledger_transaction=serialize_transaction(transaction_record), + ) + + +def send_transaction_committed_event(transaction_record): + """ + Sends the LEDGER_TRANSACTION_COMMITTED open edx event for the given ``transaction_record``. + + Parameters: + transaction_record (openedx_ledger.models.Transaction): A transaction record. + """ + LEDGER_TRANSACTION_COMMITTED.send_event( + ledger_transaction=serialize_transaction(transaction_record), + ) + + +def send_transaction_failed_event(transaction_record): + """ + Sends the LEDGER_TRANSACTION_FAILED open edx event for the given ``transaction_record``. + + Parameters: + transaction_record (openedx_ledger.models.Transaction): A transaction record. + """ + LEDGER_TRANSACTION_FAILED.send_event( + ledger_transaction=serialize_transaction(transaction_record), + ) + + +def send_transaction_reversed_event(transaction_record): + """ + Sends the LEDGER_TRANSACTION_REVERSED open edx event for the given ``transaction_record``. + + Parameters: + transaction_record (openedx_ledger.models.Transaction): A transaction record. + """ + LEDGER_TRANSACTION_REVERSED.send_event( + ledger_transaction=serialize_transaction(transaction_record), + ) diff --git a/enterprise_subsidy/apps/subsidy/models.py b/enterprise_subsidy/apps/subsidy/models.py index 3b2fff9a..e9d98d98 100644 --- a/enterprise_subsidy/apps/subsidy/models.py +++ b/enterprise_subsidy/apps/subsidy/models.py @@ -30,6 +30,7 @@ from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient from enterprise_subsidy.apps.api_client.lms_user import LmsUserApiClient from enterprise_subsidy.apps.content_metadata.api import ContentMetadataApi +from enterprise_subsidy.apps.core import event_bus from enterprise_subsidy.apps.core.utils import localized_utcnow from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler @@ -370,7 +371,7 @@ def create_transaction( openedx_ledger.api.LedgerBalanceExceeded: Raises this if the transaction would cause the balance of the ledger to become negative. """ - return ledger_api.create_transaction( + ledger_transaction = ledger_api.create_transaction( ledger=self.ledger, quantity=quantity, idempotency_key=idempotency_key, @@ -382,6 +383,8 @@ def create_transaction( subsidy_access_policy_uuid=subsidy_access_policy_uuid, **transaction_metadata, ) + event_bus.send_transaction_created_event(ledger_transaction) + return ledger_transaction def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, external_reference=None): """ @@ -402,6 +405,7 @@ def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, ex ledger_transaction.external_reference.set([external_reference]) ledger_transaction.state = TransactionStateChoices.COMMITTED ledger_transaction.save() + event_bus.send_transaction_committed_event(ledger_transaction) def rollback_transaction(self, ledger_transaction): """ @@ -410,6 +414,7 @@ def rollback_transaction(self, ledger_transaction): logger.info(f'Setting transaction {ledger_transaction.uuid} state to failed.') ledger_transaction.state = TransactionStateChoices.FAILED ledger_transaction.save() + event_bus.send_transaction_failed_event(ledger_transaction) def redeem( self, diff --git a/enterprise_subsidy/apps/transaction/api.py b/enterprise_subsidy/apps/transaction/api.py index 26dae08e..a877566b 100644 --- a/enterprise_subsidy/apps/transaction/api.py +++ b/enterprise_subsidy/apps/transaction/api.py @@ -9,6 +9,7 @@ from openedx_ledger.models import TransactionStateChoices from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient +from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler from enterprise_subsidy.apps.transaction.utils import generate_transaction_reversal_idempotency_key @@ -88,7 +89,10 @@ def reverse_transaction(transaction, unenroll_time=None): transaction.fulfillment_identifier, unenroll_time or timezone.now(), ) - return reverse_full_transaction( + reversal = reverse_full_transaction( transaction=transaction, idempotency_key=idempotency_key, ) + transaction.refresh_from_db() + send_transaction_reversed_event(transaction) + return reversal diff --git a/enterprise_subsidy/apps/transaction/signals/handlers.py b/enterprise_subsidy/apps/transaction/signals/handlers.py index 13644311..0ceb9e8c 100644 --- a/enterprise_subsidy/apps/transaction/signals/handlers.py +++ b/enterprise_subsidy/apps/transaction/signals/handlers.py @@ -6,6 +6,8 @@ from django.dispatch import receiver from openedx_ledger.signals.signals import TRANSACTION_REVERSED +from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event + from ..api import cancel_transaction_external_fulfillment, cancel_transaction_fulfillment from ..exceptions import TransactionFulfillmentCancelationException @@ -29,6 +31,7 @@ def listen_for_transaction_reversal(sender, **kwargs): try: cancel_transaction_external_fulfillment(transaction) cancel_transaction_fulfillment(transaction) + send_transaction_reversed_event(transaction) except TransactionFulfillmentCancelationException as exc: error_msg = f"Error canceling platform fulfillment {transaction.fulfillment_identifier}: {exc}" logger.exception(error_msg) diff --git a/enterprise_subsidy/settings/base.py b/enterprise_subsidy/settings/base.py index 27feb89e..d923794c 100644 --- a/enterprise_subsidy/settings/base.py +++ b/enterprise_subsidy/settings/base.py @@ -377,3 +377,47 @@ def root(*path_fragments): # be more drift between the time of allocation and the time of redemption. ALLOCATION_PRICE_VALIDATION_LOWER_BOUND_RATIO = .80 ALLOCATION_PRICE_VALIDATION_UPPER_BOUND_RATIO = 1.20 + +# Kafka and event broker settings +TRANSACTION_LIFECYCLE_TOPIC = "enterprise-subsidies-transaction-lifecycle" +TRANSACTION_CREATED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.created.v1" +TRANSACTION_COMMITTED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.committed.v1" +TRANSACTION_FAILED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.failed.v1" +TRANSACTION_REVERSED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.reversed.v1" + +# .. setting_name: EVENT_BUS_PRODUCER_CONFIG +# .. setting_default: all events disabled +# .. setting_description: Dictionary of event_types mapped to dictionaries of topic to topic-related configuration. +# Each topic configuration dictionary contains +# * `enabled`: a toggle denoting whether the event will be published to the topic. These should be annotated +# according to +# https://edx.readthedocs.io/projects/edx-toggles/en/latest/how_to/documenting_new_feature_toggles.html +# * `event_key_field` which is a period-delimited string path to event data field to use as event key. +# Note: The topic names should not include environment prefix as it will be dynamically added based on +# EVENT_BUS_TOPIC_PREFIX setting. +EVENT_BUS_PRODUCER_CONFIG = { + TRANSACTION_CREATED_EVENT_NAME: { + TRANSACTION_LIFECYCLE_TOPIC: { + 'event_key_field': 'ledger_transaction.uuid', + 'enabled': False, + }, + }, + TRANSACTION_COMMITTED_EVENT_NAME: { + TRANSACTION_LIFECYCLE_TOPIC: { + 'event_key_field': 'ledger_transaction.uuid', + 'enabled': False, + }, + }, + TRANSACTION_FAILED_EVENT_NAME: { + TRANSACTION_LIFECYCLE_TOPIC: { + 'event_key_field': 'ledger_transaction.uuid', + 'enabled': False, + }, + }, + TRANSACTION_REVERSED_EVENT_NAME: { + TRANSACTION_LIFECYCLE_TOPIC: { + 'event_key_field': 'ledger_transaction.uuid', + 'enabled': False, + }, + }, +} diff --git a/enterprise_subsidy/settings/devstack.py b/enterprise_subsidy/settings/devstack.py index 1bd7ec2e..ff98832c 100644 --- a/enterprise_subsidy/settings/devstack.py +++ b/enterprise_subsidy/settings/devstack.py @@ -90,4 +90,14 @@ EVENT_BUS_CONSUMER = 'edx_event_bus_kafka.KafkaEventConsumer' EVENT_BUS_TOPIC_PREFIX = 'dev' -# Application settings go here... +EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_CREATED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True +EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_COMMITTED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True +EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_FAILED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True +EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_REVERSED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True + +# Private settings +# The local.py settings file also does this, but then this current file (devstack.py) +# imports *from* local.py, so anything earlier in this file overrides what's in private.py +# We want private.py to have the highest precedence, so re-import private settings again here. +if os.path.isfile(join(dirname(abspath(__file__)), 'private.py')): + from .private import * # pylint: disable=import-error