Hello. I am using your library to schedule tasks in directed acyclic graphs (DAGs). I implemented the "shouldExecute" method so that I can choose whether a node is executed based on whether at least one of its parent nodes finished successfully (ANY) or all of them did (ALL).
@Override
public boolean shouldExecute(ExecutionResults<String, Boolean> parentResults) {
    boolean shouldExecute = true;
    if (parentResults.hasAnyResult()) {
        List<Boolean> results = parentResults.getAll().stream()
                .map(ExecutionResult::getResult)
                .collect(Collectors.toList());
        if (shouldExecuteConfig.equals(SHOULD_EXECUTE_ALL)) {
            // All parents must end with success
            shouldExecute = results.stream().allMatch(Boolean.TRUE::equals);
        } else if (shouldExecuteConfig.equals(SHOULD_EXECUTE_ANY)) {
            // At least one parent must end with success
            shouldExecute = results.stream().anyMatch(Boolean.TRUE::equals);
        }
    }
    return shouldExecute;
}
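To make the ANY/ALL rule easy to check in isolation, here is a minimal, self-contained sketch of the same decision outside the library. The `ShouldExecuteRule` class and its `Mode` enum are hypothetical stand-ins for the method above and the `SHOULD_EXECUTE_ALL`/`SHOULD_EXECUTE_ANY` config values; parent results are assumed to be plain `Boolean`s, and `List.of` needs Java 9 or later.

```java
import java.util.List;

// Hypothetical, self-contained version of the ANY/ALL rule above; not the library's API.
class ShouldExecuteRule {

    // Stand-ins for the SHOULD_EXECUTE_ALL / SHOULD_EXECUTE_ANY config values.
    enum Mode { ALL, ANY }

    // Returns true if a node should run, given the Boolean results of its parents.
    // With no parent results (a root node) the node always runs, as in the method above.
    static boolean shouldExecute(Mode mode, List<Boolean> parentResults) {
        if (parentResults.isEmpty()) {
            return true;
        }
        return mode == Mode.ALL
                ? parentResults.stream().allMatch(Boolean.TRUE::equals)  // every parent succeeded
                : parentResults.stream().anyMatch(Boolean.TRUE::equals); // at least one succeeded
    }

    public static void main(String[] args) {
        // Stage 1 succeeded and Stage 2 failed.
        List<Boolean> results = List.of(true, false);
        System.out.println(shouldExecute(Mode.ALL, results)); // false
        System.out.println(shouldExecute(Mode.ANY, results)); // true
    }
}
```

`Boolean.TRUE::equals` also treats a `null` result as a failure, which keeps the rule safe if a parent produced no value.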
By doing that, I encountered an error when executing the following DAG (Stage 1 → Stage 2 → Stage 3, with all three stages also feeding Stage final, as the path log below shows), where stages 1, 2 and 3 are of type ALL and Stage final is of type ANY.
In this scenario, Stage final should be executed once, after stages 1, 2 and 3 have all finished, provided that at least one of them finished successfully.
However, when Stage 1 finishes successfully and Stage 2 finishes with an error, Stage 3 is skipped and Stage final is executed twice, which is wrong.
2024-07-12 11:24:45 INFO Paths:
Path #0
Stage 1[]
Stage 2[Stage 1]
Stage 3[Stage 2]
Stage final[Stage 1, Stage 2, Stage 3]
2024-07-12 11:25:26 INFO Finished with success Stage 1
2024-07-12 11:26:02 ERROR Finished with error Stage 2
2024-07-12 11:26:02 INFO Started Stage final
2024-07-12 11:26:02 INFO Started Stage final
....
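The double start can be reproduced with a simplified, single-threaded model of the `doExecute` loop shown further down. This is a sketch under stated assumptions, not the library's actual code: `DoubleSubmitDemo`, `run` and `finish` are hypothetical, "submitting" a task only records its name, and Stage 2's outgoing nodes are assumed to be visited in the order [Stage 3, Stage final]. When Stage 2 fails, Stage 3 is skipped and the recursive call on Stage 3's outgoing nodes submits Stage final. The outer loop then reaches Stage final again as Stage 2's second outgoing node; because the first submission has not completed, the node still counts as not processed and is submitted a second time. Turning on `guardSubmitted` applies the SUBMITTED idea.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified model of the doExecute traversal (not the library's code).
class DoubleSubmitDemo {

    enum Status { NOT_PROCESSED, SUBMITTED, SUCCESS, ERROR, SKIPPED }

    static final class Node {
        final String name;
        final boolean any;                       // true = ANY rule, false = ALL rule
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();
        Status status = Status.NOT_PROCESSED;

        Node(String name, boolean any) { this.name = name; this.any = any; }

        // SUBMITTED is deliberately not a processed state.
        boolean isProcessed() {
            return status == Status.SUCCESS || status == Status.ERROR || status == Status.SKIPPED;
        }
    }

    static void link(Node from, Node to) { from.children.add(to); to.parents.add(from); }

    final boolean guardSubmitted;                // true = apply the SUBMITTED workaround
    final List<String> submissions = new ArrayList<>();

    DoubleSubmitDemo(boolean guardSubmitted) { this.guardSubmitted = guardSubmitted; }

    static boolean shouldExecute(Node n) {
        if (n.parents.isEmpty()) return true;
        return n.any
                ? n.parents.stream().anyMatch(p -> p.status == Status.SUCCESS)
                : n.parents.stream().allMatch(p -> p.status == Status.SUCCESS);
    }

    void doExecute(Collection<Node> nodes) {
        for (Node node : nodes) {
            // Like state.shouldProcess(node): wait until every parent is processed.
            boolean ready = node.parents.stream().allMatch(Node::isProcessed);
            if (!ready || (guardSubmitted && node.status == Status.SUBMITTED)) {
                continue;
            }
            if (!node.isProcessed() && shouldExecute(node)) {
                submissions.add(node.name);      // stands in for executionEngine.submit(task)
                if (guardSubmitted) node.status = Status.SUBMITTED;
            } else if (!node.isProcessed()) {
                node.status = Status.SKIPPED;
                doExecute(node.children);        // recursion into a skipped node's children
            }
        }
    }

    // Stands in for a task completing: record its result, then visit its children.
    void finish(Node node, boolean success) {
        node.status = success ? Status.SUCCESS : Status.ERROR;
        doExecute(node.children);
    }

    static List<String> run(boolean guardSubmitted) {
        Node s1 = new Node("Stage 1", false), s2 = new Node("Stage 2", false);
        Node s3 = new Node("Stage 3", false), fin = new Node("Stage final", true);
        link(s1, s2); link(s2, s3);              // Stage 2 -> [Stage 3, Stage final]
        link(s1, fin); link(s2, fin); link(s3, fin);
        DoubleSubmitDemo demo = new DoubleSubmitDemo(guardSubmitted);
        demo.doExecute(List.of(s1));
        demo.finish(s1, true);                   // Stage 1 succeeds
        demo.finish(s2, false);                  // Stage 2 fails, so Stage 3 is skipped
        return demo.submissions;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [Stage 1, Stage 2, Stage final, Stage final]
        System.out.println(run(true));  // [Stage 1, Stage 2, Stage final]
    }
}
```

Without the guard, Stage final is submitted twice, matching the two "Started Stage final" lines in the log; with the guard, it is submitted once.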
A workaround for this problem is to add another status to the node entity: SUBMITTED. This way, a task is submitted to the execution engine only once.
private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
    for (Node<T, R> node : nodes) {
        forceStopIfRequired();
        if (this.state.shouldProcess(node) && !node.isSubmitted()) { // check if node is already submitted
            Task<T, R> task = newTask(config, node);
            ExecutionResults<T, R> parentResults = parentResults(task, node);
            task.setParentResults(parentResults);
            task.setNodeProvider(new DefaultNodeProvider<T, R>(state));
            if (node.isNotProcessed() && task.shouldExecute(parentResults)) {
                this.state.incrementUnProcessedNodesCount();
                logger.debug("Submitting {} node for execution", node.getValue());
                this.executionEngine.submit(task);
                node.setSubmitted(); // set node status to SUBMITTED
            } else if (node.isNotProcessed()) {
                node.setSkipped();
                logger.debug("Execution Skipped for node # {} ", node.getValue());
                this.state.markProcessingDone(node);
                doExecute(node.getOutGoingNodes(), config);
            }
        } else {
            logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
        }
    }
}
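The change above relies on `isSubmitted()` and `setSubmitted()` on the node, which are not shown here. A minimal sketch of such a status, assuming the node keeps a single status field (the `NodeState` class and its `Status` enum are hypothetical, not the library's `Node`):

```java
// Hypothetical node-state sketch; the library's real Node class is not shown in this issue.
class NodeState {

    // SUBMITTED sits between "not yet visited" and the terminal states.
    enum Status { NOT_PROCESSED, SUBMITTED, SUCCESS, ERRORED, SKIPPED }

    private Status status = Status.NOT_PROCESSED;

    boolean isSubmitted() { return status == Status.SUBMITTED; }

    void setSubmitted() { status = Status.SUBMITTED; }
    void setSuccess()   { status = Status.SUCCESS; }
    void setErrored()   { status = Status.ERRORED; }
    void setSkipped()   { status = Status.SKIPPED; }

    // Only terminal states count as processed, so children of a SUBMITTED node
    // keep waiting until its task actually finishes.
    boolean isProcessed() {
        return status == Status.SUCCESS || status == Status.ERRORED || status == Status.SKIPPED;
    }

    boolean isNotProcessed() { return !isProcessed(); }

    public static void main(String[] args) {
        NodeState node = new NodeState();
        node.setSubmitted();
        System.out.println(node.isSubmitted() + " " + node.isProcessed()); // true false
        node.setSuccess();
        System.out.println(node.isSubmitted() + " " + node.isProcessed()); // false true
    }
}
```

The key design choice is that SUBMITTED must not count as processed: otherwise a child whose parent has only been submitted could be considered ready before that parent's result exists.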
I would appreciate your comments on this solution, or on any other approach you can think of. Feel free to add these changes to your repo if you think they are adequate. Thank you.