Skip to content

Commit

Permalink
feat: [wip] emit transaction lifecycle events
Browse files Browse the repository at this point in the history
ENT-8761
  • Loading branch information
iloveagent57 committed May 20, 2024
1 parent 55d3025 commit ae5e1ed
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 3 deletions.
91 changes: 91 additions & 0 deletions enterprise_subsidy/apps/core/event_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
Functions for serializing and emiting Open edX event bus signals.
"""
from openedx_events.enterprise.data import LedgerTransaction, LedgerTransactionReversal
from openedx_events.enterprise.signals import (
LEDGER_TRANSACTION_CREATED,
LEDGER_TRANSACTION_COMMITTED,
LEDGER_TRANSACTION_FAILED,
LEDGER_TRANSACTION_REVERSED,
)


def serialize_transaction(transaction_record):
"""
Serializes the ``transaction_record``into a defined set of attributes
for use in the event-bus signal.
"""
reversal_data = None
if reversal_record := transaction_record.get_reversal():
reversal_data = LedgerTransactionReversal(
uuid=reversal_record.uuid,
created=reversal_record.created,
modified=reversal_record.modified,
idempotency_key=reversal_record.idempotency_key,
quantity=reversal_record.quantity,
state=reversal_record.state,
)
data = LedgerTransaction(
uuid=transaction_record.uuid,
created=transaction_record.created,
modified=transaction_record.modified,
idempotency_key=transaction_record.idempotency_key,
quantity=transaction_record.quantity,
state=transaction_record.state,
ledger_uuid=transaction_record.ledger.uuid,
subsidy_access_policy_uuid=transaction_record.subsidy_access_policy_uuid,
lms_user_id=transaction_record.lms_user_id,
content_key=transaction_record.content_key,
parent_content_key=transaction_record.parent_content_key,
fulfillment_identifier=transaction_record.fulfillment_identifier,
reversal=reversal_data,
)
return data


def send_transaction_created_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_CREATED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_CREATED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_committed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_COMMITTED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_COMMITTED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_failed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_FAILED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_FAILED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_reversed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_REVERSED open edx event for the given ``transaction_record``.
Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_REVERSED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)
7 changes: 6 additions & 1 deletion enterprise_subsidy/apps/subsidy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient
from enterprise_subsidy.apps.api_client.lms_user import LmsUserApiClient
from enterprise_subsidy.apps.content_metadata.api import ContentMetadataApi
from enterprise_subsidy.apps.core import event_bus
from enterprise_subsidy.apps.core.utils import localized_utcnow
from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler

Expand Down Expand Up @@ -370,7 +371,7 @@ def create_transaction(
openedx_ledger.api.LedgerBalanceExceeded:
Raises this if the transaction would cause the balance of the ledger to become negative.
"""
return ledger_api.create_transaction(
ledger_transaction = ledger_api.create_transaction(
ledger=self.ledger,
quantity=quantity,
idempotency_key=idempotency_key,
Expand All @@ -382,6 +383,8 @@ def create_transaction(
subsidy_access_policy_uuid=subsidy_access_policy_uuid,
**transaction_metadata,
)
event_bus.send_transaction_created_event(ledger_transaction)
return ledger_transaction

def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, external_reference=None):
"""
Expand All @@ -402,6 +405,7 @@ def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, ex
ledger_transaction.external_reference.set([external_reference])
ledger_transaction.state = TransactionStateChoices.COMMITTED
ledger_transaction.save()
event_bus.send_transaction_committed_event(ledger_transaction)

def rollback_transaction(self, ledger_transaction):
"""
Expand All @@ -410,6 +414,7 @@ def rollback_transaction(self, ledger_transaction):
logger.info(f'Setting transaction {ledger_transaction.uuid} state to failed.')
ledger_transaction.state = TransactionStateChoices.FAILED
ledger_transaction.save()
event_bus.send_transaction_failed_event(ledger_transaction)

def redeem(
self,
Expand Down
6 changes: 5 additions & 1 deletion enterprise_subsidy/apps/transaction/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from openedx_ledger.models import TransactionStateChoices

from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient
from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event
from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler
from enterprise_subsidy.apps.transaction.utils import generate_transaction_reversal_idempotency_key

Expand Down Expand Up @@ -88,7 +89,10 @@ def reverse_transaction(transaction, unenroll_time=None):
transaction.fulfillment_identifier,
unenroll_time or timezone.now(),
)
return reverse_full_transaction(
reversal = reverse_full_transaction(
transaction=transaction,
idempotency_key=idempotency_key,
)
transaction.refresh_from_db()
send_transaction_reversed_event(transaction)
return reversal
3 changes: 3 additions & 0 deletions enterprise_subsidy/apps/transaction/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from django.dispatch import receiver
from openedx_ledger.signals.signals import TRANSACTION_REVERSED

from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event

from ..api import cancel_transaction_external_fulfillment, cancel_transaction_fulfillment
from ..exceptions import TransactionFulfillmentCancelationException

Expand All @@ -29,6 +31,7 @@ def listen_for_transaction_reversal(sender, **kwargs):
try:
cancel_transaction_external_fulfillment(transaction)
cancel_transaction_fulfillment(transaction)
send_transaction_reversed_event(transaction)
except TransactionFulfillmentCancelationException as exc:
error_msg = f"Error canceling platform fulfillment {transaction.fulfillment_identifier}: {exc}"
logger.exception(error_msg)
Expand Down
44 changes: 44 additions & 0 deletions enterprise_subsidy/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,47 @@ def root(*path_fragments):
# be more drift between the time of allocation and the time of redemption.
ALLOCATION_PRICE_VALIDATION_LOWER_BOUND_RATIO = .80
ALLOCATION_PRICE_VALIDATION_UPPER_BOUND_RATIO = 1.20

# Kafka and event broker settings
TRANSACTION_LIFECYCLE_TOPIC = "enterprise-subsidies-transaction-lifecycle"
TRANSACTION_CREATED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.created.v1"
TRANSACTION_COMMITTED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.committed.v1"
TRANSACTION_FAILED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.failed.v1"
TRANSACTION_REVERSED_EVENT_NAME = "org.openedx.enterprise_subsidies.ledger_transaction.reversed.v1"

# .. setting_name: EVENT_BUS_PRODUCER_CONFIG
# .. setting_default: all events disabled
# .. setting_description: Dictionary of event_types mapped to dictionaries of topic to topic-related configuration.
# Each topic configuration dictionary contains
# * `enabled`: a toggle denoting whether the event will be published to the topic. These should be annotated
# according to
# https://edx.readthedocs.io/projects/edx-toggles/en/latest/how_to/documenting_new_feature_toggles.html
# * `event_key_field` which is a period-delimited string path to event data field to use as event key.
# Note: The topic names should not include environment prefix as it will be dynamically added based on
# EVENT_BUS_TOPIC_PREFIX setting.
EVENT_BUS_PRODUCER_CONFIG = {
TRANSACTION_CREATED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
TRANSACTION_COMMITTED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
TRANSACTION_FAILED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
TRANSACTION_REVERSED_EVENT_NAME: {
TRANSACTION_LIFECYCLE_TOPIC: {
'event_key_field': 'ledger_transaction.uuid',
'enabled': False,
},
},
}
12 changes: 11 additions & 1 deletion enterprise_subsidy/settings/devstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@
EVENT_BUS_CONSUMER = 'edx_event_bus_kafka.KafkaEventConsumer'
EVENT_BUS_TOPIC_PREFIX = 'dev'

# Application settings go here...
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_CREATED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_COMMITTED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_FAILED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True
EVENT_BUS_PRODUCER_CONFIG[TRANSACTION_REVERSED_EVENT_NAME][TRANSACTION_LIFECYCLE_TOPIC]['enabled'] = True

# Private settings
# The local.py settings file also does this, but then this current file (devstack.py)
# imports *from* local.py, so anything earlier in this file overrides what's in private.py
# We want private.py to have the highest precedence, so re-import private settings again here.
if os.path.isfile(join(dirname(abspath(__file__)), 'private.py')):
from .private import * # pylint: disable=import-error

0 comments on commit ae5e1ed

Please sign in to comment.