diff --git a/internal/events/blockchain_event.go b/internal/events/blockchain_event.go index 8708c9cd3..337cadcff 100644 --- a/internal/events/blockchain_event.go +++ b/internal/events/blockchain_event.go @@ -71,20 +71,21 @@ func (em *eventManager) getChainListenerByProtocolIDCached(ctx context.Context, return l, nil } -func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error { +// handleBlockchainBatchPinEvent handles a blockchain event, returning true if the event was created, false if it was a duplicate along with an error if any failures occur +func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) (bool, error) { existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent) if err != nil { - return err + return false, err } if existing != nil { log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID) // Return the ID of the existing event chainEvent.ID = existing.ID - return nil + return false, nil } topic := em.getTopicForChainListener(listener) ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic) - return em.database.InsertEvent(ctx, ffEvent) + return true, em.database.InsertEvent(ctx, ffEvent) } func (em *eventManager) getChainListenerCached(cacheKey string, getter func() (*core.ContractListener, error)) (*core.ContractListener, error) { diff --git a/internal/events/blockchain_event_test.go b/internal/events/blockchain_event_test.go index 67c293a5c..4823eff53 100644 --- a/internal/events/blockchain_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -151,6 +151,7 @@ func TestContractEventWrongNS(t *testing.T) { } +// TODO: Add test case for event not existing func TestPersistBlockchainEventDuplicate(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) @@ -173,9 +174,10 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) { em.mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev). Return(&core.BlockchainEvent{ID: existingID}, nil) - err := em.maybePersistBlockchainEvent(em.ctx, ev, nil) + created, err := em.maybePersistBlockchainEvent(em.ctx, ev, nil) assert.NoError(t, err) assert.Equal(t, existingID, ev.ID) + assert.False(t, created) } diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index f634108c4..ac06c88c0 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -681,7 +681,7 @@ func TestEventFilterOnSubscriptionMatchesEventType(t *testing.T) { filteredEvents, _ = em.FilterHistoricalEventsOnSubscription(context.Background(), events, subscription) assert.NotNil(t, filteredEvents) assert.Equal(t, 1, len(filteredEvents)) - + listenerUuid := fftypes.NewUUID() events[0].Event.Topic = "" diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index dec69f446..2fb4c1ea2 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -62,10 +62,13 @@ func (em *eventManager) confirmPool(ctx context.Context, pool *core.TokenPool, e Type: pool.TX.Type, BlockchainID: blockchainID, }) - if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil { + created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil) + if err != nil { return err } - em.emitBlockchainEventMetric(ev) + if created { + em.emitBlockchainEventMetric(ev) + } } if _, err := em.txHelper.PersistTransaction(ctx, pool.TX.ID, pool.TX.Type, blockchainID); err != nil { return err diff --git a/internal/events/tokens_approved.go b/internal/events/tokens_approved.go index f93b71968..272641f54 100644 --- a/internal/events/tokens_approved.go +++ b/internal/events/tokens_approved.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -97,10 +97,13 @@ func (em *eventManager) persistTokenApproval(ctx context.Context, approval *toke Type: approval.TX.Type, BlockchainID: approval.Event.BlockchainTXID, }) - if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil { + created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil) + if err != nil { return false, err } - em.emitBlockchainEventMetric(approval.Event) + if created { + em.emitBlockchainEventMetric(approval.Event) + } approval.BlockchainEvent = chainEvent.ID fb := database.TokenApprovalQueryFactory.NewFilter(ctx) diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 74cccc89d..c4048611f 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -89,10 +89,13 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke Type: transfer.TX.Type, BlockchainID: transfer.Event.BlockchainTXID, }) - if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil { + created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil) + if err != nil { return false, err } - em.emitBlockchainEventMetric(transfer.Event) + if created { + em.emitBlockchainEventMetric(transfer.Event) + } transfer.BlockchainEvent = chainEvent.ID // This is a no-op if we've already persisted this token transfer