From b0ad6f2321d3ca2ac2cc1af11645e631ebca02ab Mon Sep 17 00:00:00 2001 From: Adam Saghy Date: Tue, 16 Apr 2024 16:28:39 +0200 Subject: [PATCH] FINERACT-1971: Fix marking external events sent in parallel --- .../event/external/jobs/SendAsynchronousEventsTasklet.java | 5 +++++ .../event/external/repository/ExternalEventRepository.java | 2 ++ 2 files changed, 7 insertions(+) diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java index 7277cfae645..91746f13ba7 100644 --- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java +++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java @@ -34,7 +34,9 @@ import org.apache.fineract.avro.MessageV1; import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService; import org.apache.fineract.infrastructure.core.config.FineractProperties; +import org.apache.fineract.infrastructure.core.domain.FineractContext; import org.apache.fineract.infrastructure.core.service.DateUtils; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer; import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository; import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus; @@ -104,11 +106,14 @@ private void markEventsAsSent(List eventIds) { // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters final int partitionSize = fineractProperties.getEvents().getExternal().getPartitionSize(); List> partitions = Lists.partition(eventIds, partitionSize); + FineractContext context = ThreadLocalContextUtil.getContext(); partitions.stream() // .parallel() // .forEach(partitionedEventIds -> { measure(() -> { + ThreadLocalContextUtil.init(context); repository.markEventsSent(partitionedEventIds, sentAt); + ThreadLocalContextUtil.reset(); }, timeTaken -> { log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size()); }); diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java index 9f7ccbddce8..49f772b1b50 100644 --- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java +++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java @@ -18,6 +18,7 @@ */ package org.apache.fineract.infrastructure.event.external.repository; +import jakarta.transaction.Transactional; import java.time.LocalDate; import java.time.OffsetDateTime; import java.util.List; @@ -40,6 +41,7 @@ public interface ExternalEventRepository extends JpaRepository ids, @Param("sentAt") OffsetDateTime sentAt);