Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into add_current_computation_count
  • Loading branch information
Slimane AMAR committed Dec 26, 2024
2 parents f424fa7 + 59e62cc commit 8699ee2
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 68 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</parent>

<artifactId>powsybl-ws-commons</artifactId>
<version>1.19.0-SNAPSHOT</version>
<version>1.20.0-SNAPSHOT</version>
<name>powsybl ws commons</name>
<description>Powsybl WS commons</description>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (c) 2024, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.ws.commons.computation;

/**
* @author Joris Mancini <joris.mancini_externe at rte-france.com>
*/
public class ComputationException extends RuntimeException {
public ComputationException(String message) {
super(message);
}

public ComputationException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.powsybl.iidm.network.VariantManagerConstants;
import com.powsybl.network.store.client.NetworkStoreService;
import com.powsybl.network.store.client.PreloadingStrategy;
import com.powsybl.ws.commons.computation.ComputationException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,10 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -143,15 +141,12 @@ public Consumer<Message<String>> consumeRun() {
sendResultMessage(resultContext, result);
LOGGER.info("{} complete (resultUuid='{}')", getComputationType(), resultContext.getResultUuid());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException e) {
// Do nothing
} catch (Exception e) {
if (!(e instanceof CancellationException)) {
LOGGER.error(NotificationService.getFailedMessage(getComputationType()), e);
publishFail(resultContext, e.getMessage());
resultService.delete(resultContext.getResultUuid());
this.handleNonCancellationException(resultContext, e, rootReporter);
}
resultService.delete(resultContext.getResultUuid());
this.handleNonCancellationException(resultContext, e, rootReporter);
throw new ComputationException(String.format("%s: %s", NotificationService.getFailedMessage(getComputationType()), e.getMessage()), e.getCause());
} finally {
clean(resultContext);
}
Expand Down Expand Up @@ -192,11 +187,6 @@ protected void sendResultMessage(AbstractResultContext<C> resultContext, R ignor
resultContext.getRunContext().getUserId(), null);
}

protected void publishFail(AbstractResultContext<C> resultContext, String message) {
notificationService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(),
message, resultContext.getRunContext().getUserId(), getComputationType(), null);
}

/**
* Do some extra task before running the computation, e.g. print log or init extra data for the run context
* @param ignoredRunContext This context may be used for further computation in overriding classes
Expand All @@ -205,7 +195,7 @@ protected void preRun(C ignoredRunContext) {
LOGGER.info("Run {} computation...", getComputationType());
}

protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootReporter) throws Exception {
protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootReporter) {
String provider = runContext.getProvider();
ReportNode reportNode = ReportNode.NO_OP;

Expand All @@ -223,7 +213,7 @@ protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootR

preRun(runContext);
CompletableFuture<R> future = runAsync(runContext, provider, resultUuid);
R result = future == null ? null : observer.observeRun("run", runContext, future::get);
R result = future == null ? null : observer.observeRun("run", runContext, future::join);
postRun(runContext, rootReporter, result);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.Map;
import java.util.UUID;

import static com.powsybl.ws.commons.computation.utils.MessageUtils.shortenMessage;

/**
* @author Etienne Homer <etienne.homer at rte-france.com
*/
Expand Down Expand Up @@ -94,21 +92,6 @@ public void publishStop(UUID resultUuid, String receiver, String computationLabe
publisher.send(publishPrefix + "Stopped-out-0", message);
}

@PostCompletion
public void publishFail(UUID resultUuid, String receiver, String causeMessage, String userId, String computationLabel, Map<String, Object> additionalHeaders) {
MessageBuilder<String> builder = MessageBuilder
.withPayload("")
.setHeader(HEADER_RESULT_UUID, resultUuid.toString())
.setHeader(HEADER_RECEIVER, receiver)
.setHeader(HEADER_MESSAGE, shortenMessage(
getFailedMessage(computationLabel) + " : " + causeMessage))
.setHeader(HEADER_USER_ID, userId)
.copyHeaders(additionalHeaders);
Message<String> message = builder.build();
FAILED_MESSAGE_LOGGER.debug(SENDING_MESSAGE, message);
publisher.send(publishPrefix + "Failed-out-0", message);
}

@PostCompletion
public void publishCancelFailed(UUID resultUuid, String receiver, String computationLabel, String userId) {
Message<String> message = MessageBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright (c) 2024, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.ws.commons.computation;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

/**
* @author Joris Mancini <joris.mancini_externe at rte-france.com>
*/
class ComputationExceptionTest {

@Test
void testMessageConstructor() {
var e = new ComputationException("test");
assertEquals("test", e.getMessage());
}

@Test
void testMessageAndThrowableConstructor() {
var cause = new RuntimeException("test");
var e = new ComputationException("test", cause);
assertEquals("test", e.getMessage());
assertEquals(cause, e.getCause());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.powsybl.ws.commons.computation.service.AbstractComputationService;
import com.powsybl.ws.commons.computation.service.AbstractResultContext;
import com.powsybl.ws.commons.computation.service.AbstractWorkerService;
import com.powsybl.ws.commons.computation.service.CancelContext;
import com.powsybl.ws.commons.computation.service.ExecutionService;
import com.powsybl.ws.commons.computation.service.NotificationService;
import com.powsybl.ws.commons.computation.service.ReportService;
Expand All @@ -29,6 +28,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
Expand All @@ -38,12 +38,13 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RECEIVER;
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RESULT_UUID;
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_USER_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand Down Expand Up @@ -154,6 +155,7 @@ public UUID runAndSaveResult(MockComputationRunContext runContext) {
private enum ComputationResultWanted {
SUCCESS,
FAIL,
CANCELLED,
COMPLETED
}

Expand All @@ -179,6 +181,9 @@ protected String getComputationType() {
protected CompletableFuture<Object> getCompletableFuture(MockComputationRunContext runContext, String provider, UUID resultUuid) {
final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
switch (runContext.getComputationResWanted()) {
case CANCELLED:
completableFuture.completeExceptionally(new CancellationException("Computation cancelled"));
break;
case FAIL:
completableFuture.completeExceptionally(new RuntimeException("Computation failed"));
break;
Expand All @@ -189,6 +194,10 @@ protected CompletableFuture<Object> getCompletableFuture(MockComputationRunConte
}
return completableFuture;
}

public void addFuture(UUID id, CompletableFuture<Object> future) {
this.futures.put(id, future);
}
}

private MockComputationWorkerService workerService;
Expand All @@ -203,10 +212,11 @@ protected CompletableFuture<Object> getCompletableFuture(MockComputationRunConte
final String provider = "MockComputation_Provider";
Message<String> message;
MockComputationRunContext runContext;
MockComputationResultService resultService;

@BeforeEach
void init() {
MockComputationResultService resultService = new MockComputationResultService();
resultService = new MockComputationResultService();
notificationService = new NotificationService(publisher);
workerService = new MockComputationWorkerService(
networkStoreService,
Expand Down Expand Up @@ -256,55 +266,74 @@ void testComputationFailed() {
initComputationExecution();
runContext.setComputationResWanted(ComputationResultWanted.FAIL);

// execution / cleaning
assertThrows(ComputationException.class, () -> workerService.consumeRun().accept(message));
assertNull(resultService.findStatus(RESULT_UUID));
}

@Test
void testStopComputationSendsCancelMessage() {
computationService.stop(RESULT_UUID, receiver);
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));
}

@Test
void testComputationCancelledInConsumeRun() {
// inits
initComputationExecution();
runContext.setComputationResWanted(ComputationResultWanted.CANCELLED);

// execution / cleaning
workerService.consumeRun().accept(message);

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishFailed-out-0"), isA(Message.class));
assertNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(0)).send(eq("publishResult-out-0"), isA(Message.class));
}

@Test
void testComputationCancelled() {
MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE;
void testComputationCancelledInConsumeCancel() {
MockComputationStatus baseStatus = MockComputationStatus.RUNNING;
computationService.setStatus(List.of(RESULT_UUID), baseStatus);
assertEquals(baseStatus, computationService.getStatus(RESULT_UUID));

computationService.stop(RESULT_UUID, receiver);

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));
CompletableFuture<Object> futureThatCouldBeCancelled = Mockito.mock(CompletableFuture.class);
when(futureThatCouldBeCancelled.cancel(true)).thenReturn(true);
workerService.addFuture(RESULT_UUID, futureThatCouldBeCancelled);

Message<String> cancelMessage = MessageBuilder.withPayload("")
.setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString())
.setHeader(HEADER_RECEIVER, receiver)
.build();
CancelContext cancelContext = CancelContext.fromMessage(cancelMessage);
assertEquals(RESULT_UUID, cancelContext.resultUuid());
assertEquals(receiver, cancelContext.receiver());
workerService.consumeCancel().accept(message);
assertNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class));
}

@Test
void testComputationCancelFailed() {
MockComputationStatus baseStatus = MockComputationStatus.COMPLETED;
MockComputationStatus baseStatus = MockComputationStatus.RUNNING;
computationService.setStatus(List.of(RESULT_UUID), baseStatus);
assertEquals(baseStatus, computationService.getStatus(RESULT_UUID));

computationService.stop(RESULT_UUID, receiver, userId);

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));
CompletableFuture<Object> futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class);
when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(false);
workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled);

Message<String> cancelMessage = MessageBuilder.withPayload("")
.setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString())
.setHeader(HEADER_RECEIVER, receiver)
.setHeader(HEADER_USER_ID, userId)
.build();
CancelContext cancelContext = CancelContext.fromMessage(cancelMessage);
assertEquals(RESULT_UUID, cancelContext.resultUuid());
assertEquals(receiver, cancelContext.receiver());
assertEquals(userId, cancelContext.userId());
workerService.consumeCancel().accept(message);
assertNotNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class));
}

@Test
void testComputationCancelFailsIfNoMatchingFuture() {
workerService.consumeCancel().accept(message);
assertNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class));
}

@Test
void testComputationCancelledBeforeRunReturnsNoResult() {
workerService.consumeCancel().accept(message);

initComputationExecution();
workerService.consumeRun().accept(message);
verify(notificationService.getPublisher(), times(0)).send(eq("publishResult-out-0"), isA(Message.class));
}
}

0 comments on commit 8699ee2

Please sign in to comment.