Skip to content

Commit

Permalink
feat: let consume run throw to let message broker handle failures
Browse files Browse the repository at this point in the history
Signed-off-by: Joris Mancini <joris.mancini_externe@rte-france.com>
  • Loading branch information
TheMaskedTurtle committed Dec 11, 2024
1 parent 978c569 commit ed6c931
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 32 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@
<artifactId>spring-cloud-stream</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Copyright (c) 2024, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.ws.commons.computation;

/**
* @author Joris Mancini <joris.mancini_externe at rte-france.com>
*/
public class ComputationException extends RuntimeException {
public ComputationException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.powsybl.iidm.network.VariantManagerConstants;
import com.powsybl.network.store.client.NetworkStoreService;
import com.powsybl.network.store.client.PreloadingStrategy;
import com.powsybl.ws.commons.computation.ComputationException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,10 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -147,10 +145,9 @@ public Consumer<Message<String>> consumeRun() {
Thread.currentThread().interrupt();
} catch (Exception e) {
if (!(e instanceof CancellationException)) {
LOGGER.error(NotificationService.getFailedMessage(getComputationType()), e);
publishFail(resultContext, e.getMessage());
resultService.delete(resultContext.getResultUuid());
this.handleNonCancellationException(resultContext, e, rootReporter);
throw new ComputationException(NotificationService.getFailedMessage(getComputationType()), e);
}
} finally {
clean(resultContext);
Expand Down Expand Up @@ -192,11 +189,6 @@ protected void sendResultMessage(AbstractResultContext<C> resultContext, R ignor
resultContext.getRunContext().getUserId(), null);
}

protected void publishFail(AbstractResultContext<C> resultContext, String message) {
notificationService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(),
message, resultContext.getRunContext().getUserId(), getComputationType(), null);
}

/**
* Do some extra task before running the computation, e.g. print log or init extra data for the run context
* @param ignoredRunContext This context may be used for further computation in overriding classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.Map;
import java.util.UUID;

import static com.powsybl.ws.commons.computation.utils.MessageUtils.shortenMessage;

/**
* @author Etienne Homer <etienne.homer at rte-france.com
*/
Expand Down Expand Up @@ -94,21 +92,6 @@ public void publishStop(UUID resultUuid, String receiver, String computationLabe
publisher.send(publishPrefix + "Stopped-out-0", message);
}

@PostCompletion
public void publishFail(UUID resultUuid, String receiver, String causeMessage, String userId, String computationLabel, Map<String, Object> additionalHeaders) {
MessageBuilder<String> builder = MessageBuilder
.withPayload("")
.setHeader(HEADER_RESULT_UUID, resultUuid.toString())
.setHeader(HEADER_RECEIVER, receiver)
.setHeader(HEADER_MESSAGE, shortenMessage(
getFailedMessage(computationLabel) + " : " + causeMessage))
.setHeader(HEADER_USER_ID, userId)
.copyHeaders(additionalHeaders);
Message<String> message = builder.build();
FAILED_MESSAGE_LOGGER.debug(SENDING_MESSAGE, message);
publisher.send(publishPrefix + "Failed-out-0", message);
}

@PostCompletion
public void publishCancelFailed(UUID resultUuid, String receiver, String computationLabel, String userId) {
Message<String> message = MessageBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RESULT_UUID;
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_USER_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand Down Expand Up @@ -257,10 +258,7 @@ void testComputationFailed() {
runContext.setComputationResWanted(ComputationResultWanted.FAIL);

// execution / cleaning
workerService.consumeRun().accept(message);

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishFailed-out-0"), isA(Message.class));
assertThrows(ComputationException.class, () -> workerService.consumeRun().accept(message));
}

@Test
Expand Down

0 comments on commit ed6c931

Please sign in to comment.