Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit transaction lifecycle events #244

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions enterprise_subsidy/apps/core/event_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
Functions for serializing and emiting Open edX event bus signals.
"""
from openedx_events.enterprise.data import LedgerTransaction, LedgerTransactionReversal
from openedx_events.enterprise.signals import (
LEDGER_TRANSACTION_COMMITTED,
LEDGER_TRANSACTION_CREATED,
LEDGER_TRANSACTION_FAILED,
LEDGER_TRANSACTION_REVERSED
)


def serialize_transaction(transaction_record):
"""
Serializes the ``transaction_record``into a defined set of attributes
for use in the event-bus signal.
"""
reversal_data = None
if reversal_record := transaction_record.get_reversal():
reversal_data = LedgerTransactionReversal(
uuid=reversal_record.uuid,
created=reversal_record.created,
modified=reversal_record.modified,
idempotency_key=reversal_record.idempotency_key,
quantity=reversal_record.quantity,
state=reversal_record.state,
)
data = LedgerTransaction(
uuid=transaction_record.uuid,
created=transaction_record.created,
modified=transaction_record.modified,
idempotency_key=transaction_record.idempotency_key,
quantity=transaction_record.quantity,
state=transaction_record.state,
ledger_uuid=transaction_record.ledger.uuid,
subsidy_access_policy_uuid=transaction_record.subsidy_access_policy_uuid,
lms_user_id=transaction_record.lms_user_id,
content_key=transaction_record.content_key,
parent_content_key=transaction_record.parent_content_key,
fulfillment_identifier=transaction_record.fulfillment_identifier,
reversal=reversal_data,
)
return data


def send_transaction_created_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_CREATED open edx event for the given ``transaction_record``.

Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_CREATED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_committed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_COMMITTED open edx event for the given ``transaction_record``.

Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_COMMITTED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_failed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_FAILED open edx event for the given ``transaction_record``.

Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_FAILED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)


def send_transaction_reversed_event(transaction_record):
"""
Sends the LEDGER_TRANSACTION_REVERSED open edx event for the given ``transaction_record``.

Parameters:
transaction_record (openedx_ledger.models.Transaction): A transaction record.
"""
LEDGER_TRANSACTION_REVERSED.send_event(
ledger_transaction=serialize_transaction(transaction_record),
)
7 changes: 6 additions & 1 deletion enterprise_subsidy/apps/subsidy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient
from enterprise_subsidy.apps.api_client.lms_user import LmsUserApiClient
from enterprise_subsidy.apps.content_metadata.api import ContentMetadataApi
from enterprise_subsidy.apps.core import event_bus
from enterprise_subsidy.apps.core.utils import localized_utcnow
from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler

Expand Down Expand Up @@ -384,7 +385,7 @@ def create_transaction(
openedx_ledger.api.LedgerBalanceExceeded:
Raises this if the transaction would cause the balance of the ledger to become negative.
"""
return ledger_api.create_transaction(
ledger_transaction = ledger_api.create_transaction(
ledger=self.ledger,
quantity=quantity,
idempotency_key=idempotency_key,
Expand All @@ -396,6 +397,8 @@ def create_transaction(
subsidy_access_policy_uuid=subsidy_access_policy_uuid,
**transaction_metadata,
)
event_bus.send_transaction_created_event(ledger_transaction)
macdiesel marked this conversation as resolved.
Show resolved Hide resolved
return ledger_transaction

def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, external_reference=None):
"""
Expand All @@ -416,6 +419,7 @@ def commit_transaction(self, ledger_transaction, fulfillment_identifier=None, ex
ledger_transaction.external_reference.set([external_reference])
ledger_transaction.state = TransactionStateChoices.COMMITTED
ledger_transaction.save()
event_bus.send_transaction_committed_event(ledger_transaction)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the event bus send fails for these calls is the ledger_transaction also rolled back?

I think my overall question is this: When these event emissions fail to send, should the related DB writes also fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because then we're making the communication pattern synchronous instead of asynchronous.

Copy link
Contributor

@macdiesel macdiesel Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if the event doesn't make it on to the bus, and the record is written, systems downstream of this message will not be able to act accordingly. Not having this event hit the bus means that whatever actions were supposed to happen because of this do not happen leaving the system in an inconsistent state.

I don't believe this makes it synchronous. It's more that the transaction isn't complete until the event is sent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also suggest that this does not make it synchronous either as we are simply writing that the event happened. Not waiting for downstream systems to execute on the message.

I think that writing these messages on to the bus and ensuring that call succeeds is just as important now as ensuring the DB record is written.

Copy link
Contributor Author

@iloveagent57 iloveagent57 Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronous was the wrong word, it's more like you'd be making the send of the message part of the "atomic block" of the flow.
Part of the goal of using an async event broker, though, is to move toward "eventual consistency", and force our services to adapt to inconsistent states. The ideal thing to do if kafka is down/unreachable/whatever isn't to fail the redemption flow; it's to have a mechanism for replaying events that occurred in a given time window, and to be able to rely on idempotent consumers to process those replayed events. Or to implement a dead-letter queue (here on the producer side).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, having said all that, I should go through and check if this is already called from within a transaction.atomic() block.


def rollback_transaction(self, ledger_transaction, external_transaction_reference=None):
"""
Expand Down Expand Up @@ -463,6 +467,7 @@ def rollback_transaction(self, ledger_transaction, external_transaction_referenc
logger.info('[rollback_transaction] Setting transaction %s state to failed.', ledger_transaction.uuid)
ledger_transaction.state = TransactionStateChoices.FAILED
ledger_transaction.save()
event_bus.send_transaction_failed_event(ledger_transaction)

def redeem(
self,
Expand Down
16 changes: 14 additions & 2 deletions enterprise_subsidy/apps/transaction/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from openedx_ledger.models import TransactionStateChoices

from enterprise_subsidy.apps.api_client.enterprise import EnterpriseApiClient
from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event
from enterprise_subsidy.apps.fulfillment.api import GEAGFulfillmentHandler
from enterprise_subsidy.apps.transaction.utils import generate_transaction_reversal_idempotency_key

Expand Down Expand Up @@ -72,18 +73,26 @@ def cancel_transaction_external_fulfillment(transaction):
"Transaction is not committed"
)

for external_reference in transaction.external_reference.all():
references = list(transaction.external_reference.all())
if not references:
return True

fulfillment_cancelation_successful = False
for external_reference in references:
provider_slug = external_reference.external_fulfillment_provider.slug
geag_handler = GEAGFulfillmentHandler()
if provider_slug == geag_handler.EXTERNAL_FULFILLMENT_PROVIDER_SLUG:
geag_handler.cancel_fulfillment(external_reference)
fulfillment_cancelation_successful = True
else:
logger.warning(
'[fulfillment cancelation] dont know how to cancel transaction %s with provider %s',
transaction.uuid,
provider_slug,
)

return fulfillment_cancelation_successful


def reverse_transaction(transaction, unenroll_time=None):
"""
Expand All @@ -93,7 +102,10 @@ def reverse_transaction(transaction, unenroll_time=None):
transaction.fulfillment_identifier,
unenroll_time or timezone.now(),
)
return reverse_full_transaction(
reversal = reverse_full_transaction(
transaction=transaction,
idempotency_key=idempotency_key,
)
transaction.refresh_from_db()
send_transaction_reversed_event(transaction)
return reversal
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,16 @@ def handle_reversing_enterprise_course_unenrollment(self, unenrollment):
)

if not self.dry_run:
cancel_transaction_external_fulfillment(related_transaction)
reverse_transaction(related_transaction, unenroll_time=enrollment_unenrolled_at)
return 1
successfully_canceled = cancel_transaction_external_fulfillment(related_transaction)
if successfully_canceled:
reverse_transaction(related_transaction, unenroll_time=enrollment_unenrolled_at)
return 1
else:
logger.warning(
'Could not cancel external fulfillment for transaction %s, no reversal written',
related_transaction.uuid,
)
return 0
else:
logger.info(
f"{self.dry_run_prefix}Would have written Reversal record for enterprise fulfillment: "
Expand Down
3 changes: 3 additions & 0 deletions enterprise_subsidy/apps/transaction/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from django.dispatch import receiver
from openedx_ledger.signals.signals import TRANSACTION_REVERSED

from enterprise_subsidy.apps.core.event_bus import send_transaction_reversed_event

from ..api import cancel_transaction_external_fulfillment, cancel_transaction_fulfillment
from ..exceptions import TransactionFulfillmentCancelationException

Expand All @@ -29,6 +31,7 @@ def listen_for_transaction_reversal(sender, **kwargs):
try:
cancel_transaction_external_fulfillment(transaction)
cancel_transaction_fulfillment(transaction)
send_transaction_reversed_event(transaction)
except TransactionFulfillmentCancelationException as exc:
error_msg = f"Error canceling platform fulfillment {transaction.fulfillment_identifier}: {exc}"
logger.exception(error_msg)
Expand Down
Loading
Loading