From 8a1f301d791a80b8a68d27bb0e50327c67f87b00 Mon Sep 17 00:00:00 2001 From: Joris Mancini <53527338+TheMaskedTurtle@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:44:59 +0100 Subject: [PATCH 1/3] feat: let consumeRun throw to let message broker handle failures (#91) Signed-off-by: Joris Mancini --- .../computation/ComputationException.java | 20 ++++ .../service/AbstractWorkerService.java | 28 ++---- .../service/NotificationService.java | 17 ---- .../computation/ComputationExceptionTest.java | 31 +++++++ .../commons/computation/ComputationTest.java | 91 ++++++++++++------- 5 files changed, 120 insertions(+), 67 deletions(-) create mode 100644 src/main/java/com/powsybl/ws/commons/computation/ComputationException.java create mode 100644 src/test/java/com/powsybl/ws/commons/computation/ComputationExceptionTest.java diff --git a/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java b/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java new file mode 100644 index 0000000..ef7e3e1 --- /dev/null +++ b/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java @@ -0,0 +1,20 @@ +/** + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.powsybl.ws.commons.computation; + +/** + * @author Joris Mancini + */ +public class ComputationException extends RuntimeException { + public ComputationException(String message) { + super(message); + } + + public ComputationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java index 4787ad4..04128f3 100644 --- a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java +++ b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java @@ -13,6 +13,7 @@ import com.powsybl.iidm.network.VariantManagerConstants; import com.powsybl.network.store.client.NetworkStoreService; import com.powsybl.network.store.client.PreloadingStrategy; +import com.powsybl.ws.commons.computation.ComputationException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,10 +24,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -143,15 +141,12 @@ public Consumer> consumeRun() { sendResultMessage(resultContext, result); LOGGER.info("{} complete (resultUuid='{}')", getComputationType(), resultContext.getResultUuid()); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (CancellationException e) { + // Do nothing } catch (Exception e) { - if (!(e instanceof CancellationException)) { - LOGGER.error(NotificationService.getFailedMessage(getComputationType()), e); - publishFail(resultContext, e.getMessage()); - resultService.delete(resultContext.getResultUuid()); - this.handleNonCancellationException(resultContext, e, rootReporter); - } + resultService.delete(resultContext.getResultUuid()); + this.handleNonCancellationException(resultContext, e, rootReporter); + throw new ComputationException(String.format("%s: %s", NotificationService.getFailedMessage(getComputationType()), e.getMessage()), e.getCause()); } finally { clean(resultContext); } @@ -192,11 +187,6 @@ protected void sendResultMessage(AbstractResultContext resultContext, R ignor resultContext.getRunContext().getUserId(), null); } - protected void publishFail(AbstractResultContext resultContext, String message) { - notificationService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), - message, resultContext.getRunContext().getUserId(), getComputationType(), null); - } - /** * Do some extra task before running the computation, e.g. print log or init extra data for the run context * @param ignoredRunContext This context may be used for further computation in overriding classes @@ -205,7 +195,7 @@ protected void preRun(C ignoredRunContext) { LOGGER.info("Run {} computation...", getComputationType()); } - protected R run(C runContext, UUID resultUuid, AtomicReference rootReporter) throws Exception { + protected R run(C runContext, UUID resultUuid, AtomicReference rootReporter) { String provider = runContext.getProvider(); ReportNode reportNode = ReportNode.NO_OP; @@ -223,7 +213,7 @@ protected R run(C runContext, UUID resultUuid, AtomicReference rootR preRun(runContext); CompletableFuture future = runAsync(runContext, provider, resultUuid); - R result = future == null ? null : observer.observeRun("run", runContext, future::get); + R result = future == null ? null : observer.observeRun("run", runContext, future::join); postRun(runContext, rootReporter, result); return result; } diff --git a/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java b/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java index 59d9a01..478b253 100644 --- a/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java +++ b/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java @@ -21,8 +21,6 @@ import java.util.Map; import java.util.UUID; -import static com.powsybl.ws.commons.computation.utils.MessageUtils.shortenMessage; - /** * @author Etienne Homer additionalHeaders) { - MessageBuilder builder = MessageBuilder - .withPayload("") - .setHeader(HEADER_RESULT_UUID, resultUuid.toString()) - .setHeader(HEADER_RECEIVER, receiver) - .setHeader(HEADER_MESSAGE, shortenMessage( - getFailedMessage(computationLabel) + " : " + causeMessage)) - .setHeader(HEADER_USER_ID, userId) - .copyHeaders(additionalHeaders); - Message message = builder.build(); - FAILED_MESSAGE_LOGGER.debug(SENDING_MESSAGE, message); - publisher.send(publishPrefix + "Failed-out-0", message); - } - @PostCompletion public void publishCancelFailed(UUID resultUuid, String receiver, String computationLabel, String userId) { Message message = MessageBuilder diff --git a/src/test/java/com/powsybl/ws/commons/computation/ComputationExceptionTest.java b/src/test/java/com/powsybl/ws/commons/computation/ComputationExceptionTest.java new file mode 100644 index 0000000..b890c83 --- /dev/null +++ b/src/test/java/com/powsybl/ws/commons/computation/ComputationExceptionTest.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.powsybl.ws.commons.computation; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author Joris Mancini + */ +class ComputationExceptionTest { + + @Test + void testMessageConstructor() { + var e = new ComputationException("test"); + assertEquals("test", e.getMessage()); + } + + @Test + void testMessageAndThrowableConstructor() { + var cause = new RuntimeException("test"); + var e = new ComputationException("test", cause); + assertEquals("test", e.getMessage()); + assertEquals(cause, e.getCause()); + } +} diff --git a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java index 8dfd238..0dc3be7 100644 --- a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java +++ b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java @@ -12,7 +12,6 @@ import com.powsybl.ws.commons.computation.service.AbstractComputationService; import com.powsybl.ws.commons.computation.service.AbstractResultContext; import com.powsybl.ws.commons.computation.service.AbstractWorkerService; -import com.powsybl.ws.commons.computation.service.CancelContext; import com.powsybl.ws.commons.computation.service.ExecutionService; import com.powsybl.ws.commons.computation.service.NotificationService; import com.powsybl.ws.commons.computation.service.ReportService; @@ -29,6 +28,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.messaging.Message; @@ -38,12 +38,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RECEIVER; import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RESULT_UUID; import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_USER_ID; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -154,6 +155,7 @@ public UUID runAndSaveResult(MockComputationRunContext runContext) { private enum ComputationResultWanted { SUCCESS, FAIL, + CANCELLED, COMPLETED } @@ -179,6 +181,9 @@ protected String getComputationType() { protected CompletableFuture getCompletableFuture(MockComputationRunContext runContext, String provider, UUID resultUuid) { final CompletableFuture completableFuture = new CompletableFuture<>(); switch (runContext.getComputationResWanted()) { + case CANCELLED: + completableFuture.completeExceptionally(new CancellationException("Computation cancelled")); + break; case FAIL: completableFuture.completeExceptionally(new RuntimeException("Computation failed")); break; @@ -189,6 +194,10 @@ protected CompletableFuture getCompletableFuture(MockComputationRunConte } return completableFuture; } + + public void addFuture(UUID id, CompletableFuture future) { + this.futures.put(id, future); + } } private MockComputationWorkerService workerService; @@ -203,10 +212,11 @@ protected CompletableFuture getCompletableFuture(MockComputationRunConte final String provider = "MockComputation_Provider"; Message message; MockComputationRunContext runContext; + MockComputationResultService resultService; @BeforeEach void init() { - MockComputationResultService resultService = new MockComputationResultService(); + resultService = new MockComputationResultService(); notificationService = new NotificationService(publisher); workerService = new MockComputationWorkerService( networkStoreService, @@ -256,55 +266,74 @@ void testComputationFailed() { initComputationExecution(); runContext.setComputationResWanted(ComputationResultWanted.FAIL); + // execution / cleaning + assertThrows(ComputationException.class, () -> workerService.consumeRun().accept(message)); + assertNull(resultService.findStatus(RESULT_UUID)); + } + + @Test + void testStopComputationSendsCancelMessage() { + computationService.stop(RESULT_UUID, receiver); + verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); + } + + @Test + void testComputationCancelledInConsumeRun() { + // inits + initComputationExecution(); + runContext.setComputationResWanted(ComputationResultWanted.CANCELLED); + // execution / cleaning workerService.consumeRun().accept(message); // test the course - verify(notificationService.getPublisher(), times(1)).send(eq("publishFailed-out-0"), isA(Message.class)); + assertNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(0)).send(eq("publishResult-out-0"), isA(Message.class)); } @Test - void testComputationCancelled() { - MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE; + void testComputationCancelledInConsumeCancel() { + MockComputationStatus baseStatus = MockComputationStatus.RUNNING; computationService.setStatus(List.of(RESULT_UUID), baseStatus); assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); - computationService.stop(RESULT_UUID, receiver); - - // test the course - verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); + CompletableFuture futureThatCouldBeCancelled = Mockito.mock(CompletableFuture.class); + when(futureThatCouldBeCancelled.cancel(true)).thenReturn(true); + workerService.addFuture(RESULT_UUID, futureThatCouldBeCancelled); - Message cancelMessage = MessageBuilder.withPayload("") - .setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString()) - .setHeader(HEADER_RECEIVER, receiver) - .build(); - CancelContext cancelContext = CancelContext.fromMessage(cancelMessage); - assertEquals(RESULT_UUID, cancelContext.resultUuid()); - assertEquals(receiver, cancelContext.receiver()); + workerService.consumeCancel().accept(message); + assertNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class)); } @Test void testComputationCancelFailed() { - MockComputationStatus baseStatus = MockComputationStatus.COMPLETED; + MockComputationStatus baseStatus = MockComputationStatus.RUNNING; computationService.setStatus(List.of(RESULT_UUID), baseStatus); assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); - computationService.stop(RESULT_UUID, receiver, userId); - - // test the course - verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); + CompletableFuture futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class); + when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(false); + workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled); - Message cancelMessage = MessageBuilder.withPayload("") - .setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString()) - .setHeader(HEADER_RECEIVER, receiver) - .setHeader(HEADER_USER_ID, userId) - .build(); - CancelContext cancelContext = CancelContext.fromMessage(cancelMessage); - assertEquals(RESULT_UUID, cancelContext.resultUuid()); - assertEquals(receiver, cancelContext.receiver()); - assertEquals(userId, cancelContext.userId()); + workerService.consumeCancel().accept(message); + assertNotNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class)); + } + @Test + void testComputationCancelFailsIfNoMatchingFuture() { workerService.consumeCancel().accept(message); + assertNull(resultService.findStatus(RESULT_UUID)); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class)); } + + @Test + void testComputationCancelledBeforeRunReturnsNoResult() { + workerService.consumeCancel().accept(message); + + initComputationExecution(); + workerService.consumeRun().accept(message); + verify(notificationService.getPublisher(), times(0)).send(eq("publishResult-out-0"), isA(Message.class)); + } } From 9bfe746ef1537c0ee7292ad2439a661731b768c8 Mon Sep 17 00:00:00 2001 From: Antoine Bouhours Date: Fri, 20 Dec 2024 11:01:05 +0100 Subject: [PATCH 2/3] Release v1.19.0 (#92) Signed-off-by: BOUHOURS Antoine --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 550ac75..19eb406 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ powsybl-ws-commons - 1.19.0-SNAPSHOT + 1.19.0 powsybl ws commons Powsybl WS commons From 59e62cc63dacfbb20f5e40ed86c7bd108c524107 Mon Sep 17 00:00:00 2001 From: Antoine Bouhours Date: Fri, 20 Dec 2024 11:24:36 +0100 Subject: [PATCH 3/3] Bump to v1.20.0-SNAPSHOT (#93) Signed-off-by: BOUHOURS Antoine --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 19eb406..4cf00fd 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ powsybl-ws-commons - 1.19.0 + 1.20.0-SNAPSHOT powsybl ws commons Powsybl WS commons