Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dag descriptor converter #91

Open
wants to merge 110 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
8a45c15
add todo mark
Oct 16, 2024
e360e8c
add unit test to drive the development
Oct 16, 2024
c6959d8
auto generate output mappings
Oct 16, 2024
cff7e49
modify gitignore
Oct 16, 2024
c3bc1da
add the situation that contains dot in mapping elements
Oct 16, 2024
1f0b816
remove useless variable
Oct 16, 2024
638ee3b
optimit code
Oct 16, 2024
2918395
adapt v2.0 when get descriptors
Oct 16, 2024
7cc01d4
adapt v2.0 when get descriptors
Oct 16, 2024
ff5a68a
remove context keyword in v2.0 version
Oct 16, 2024
e48f401
process inputMappings when set and get descriptor
Oct 17, 2024
03442df
fix index error in processInputMappingsWhenGetDescriptor and add unit…
Oct 17, 2024
a8ea584
optimit code
Oct 17, 2024
70a27d7
add input keyword to generate inputMappings and outputMappings
Oct 17, 2024
4d66de3
fix unit test
Oct 17, 2024
a84e51a
not limit version
Oct 17, 2024
d7384f5
remove useless variable
Oct 17, 2024
558d063
optimit code
Oct 17, 2024
0bf14af
generate end pass task
Oct 17, 2024
3a551e4
generate end pass task
Oct 17, 2024
1dfa9ad
fix unit test
Oct 17, 2024
7a6000f
fix unit test
Oct 17, 2024
a8ff214
fix sonar
Oct 17, 2024
e5fe076
fix sonar
Oct 17, 2024
1b2d113
change endtaskname
Oct 17, 2024
2181dfd
add unit test for generating end pass task
Oct 18, 2024
66a026d
optimit code
Oct 18, 2024
3147b08
optimit code
Oct 18, 2024
9e2c73e
optimit code
Oct 18, 2024
739a839
extract code into DescriptorParseService and implement
Oct 18, 2024
a051061
extract code into DescriptorParseService and implement
Oct 18, 2024
442b26f
add unit test for processWhenGetDescriptor and only use this method f…
Oct 18, 2024
e0f9612
fix error when element is number
Oct 18, 2024
611339f
allow array in json path when mapping
Oct 18, 2024
477daf2
allow array in json path when mapping
Oct 18, 2024
456cf81
allow array in json path when mapping
Oct 18, 2024
8d551cf
recovery useless modify
Oct 18, 2024
67509ed
remove logic to merge common prefixes and add logic to remove duplica…
Oct 21, 2024
b436b9d
remove useless modify
Oct 21, 2024
6d5bde5
modify unit test
Oct 21, 2024
db9994a
optimit code
Oct 21, 2024
f583634
optimit code
Oct 21, 2024
2c9309e
add comments
Oct 21, 2024
ef28da5
add comments
Oct 21, 2024
47ed905
add comments
Oct 21, 2024
7a560b2
add comments
Oct 21, 2024
878671f
add comments
Oct 21, 2024
5808911
recovery useless modify
Oct 21, 2024
49aca15
recovery useless modify
Oct 21, 2024
0172462
remove useless modify
Oct 21, 2024
fc33979
modify comments
Oct 21, 2024
e2ddfcd
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
21cbac1
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
d230d82
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
605d76e
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
fd188b0
optimit code
Oct 21, 2024
a5b288d
add comments
Oct 21, 2024
d448a64
add comments
Oct 21, 2024
7dbd534
add comments
Oct 21, 2024
35bd99a
add comments
Oct 21, 2024
20edf5f
modify return type from array to List
Oct 21, 2024
9741249
move all the calls of descriptorParseService into DescriptorManager
Oct 23, 2024
d9b721c
use strategy design pattern to redesign dag process
Oct 23, 2024
ad0be5f
optimit code
Oct 23, 2024
f41980a
add unit tests
Oct 23, 2024
5d1dac1
fix sonar
Oct 23, 2024
8f96160
fix sonar
Oct 23, 2024
2999042
fix sonar
Oct 23, 2024
f89dba3
optimit code
Oct 23, 2024
eaa760f
optimit code
Oct 23, 2024
cf97496
optimit code
Oct 23, 2024
525ab96
optimit code
Oct 23, 2024
1e1efaf
optimit code
Oct 23, 2024
0ed46a7
modify enter
Oct 24, 2024
23440b1
add license and comments
Oct 24, 2024
1dc06fb
add license and comments
Oct 24, 2024
d58cbe4
rename method name
Oct 24, 2024
420b4fd
rename method name
Oct 24, 2024
6b1f0f8
redesign DescriptorManager method
Oct 24, 2024
bd0b905
fix unit test
Oct 24, 2024
e9ee796
modify method name
Oct 24, 2024
d88ac16
modify method name
Oct 24, 2024
6a6af2b
add comments
Oct 24, 2024
efb2c42
create dag descriptor converter
Oct 24, 2024
7790437
create dag descriptor converter
Oct 24, 2024
6022894
create dag descriptor converter
Oct 24, 2024
b0a2c02
fix unit test
Oct 24, 2024
5b8b6fa
move converter into converter package
Oct 24, 2024
8748aef
modify DO to PO
Oct 24, 2024
587d375
modify DO to PO
Oct 24, 2024
33de274
remove type id comment after polymorphism serialize
Oct 25, 2024
a5e5d35
add unit tests
Oct 25, 2024
743be89
add unit tests
Oct 25, 2024
1ee9504
add unit tests to cover if conditions in DescriptorManager
Oct 25, 2024
db25f4d
add unit test for createDAGDescriptor method in DescriptorManage
Oct 25, 2024
b3e8348
regard input as output directly in PassTaskRunner
Oct 25, 2024
a816361
Merge branch 'generate_output_mappings' of github.com:weibocom/rill-f…
Oct 25, 2024
8e5d239
use output keyword in outputMappings in pass task
Oct 25, 2024
b0fe87f
fix unit test
Oct 25, 2024
d73eefe
add unit test for JSONPathInputOutputMapping and fix problem
Oct 28, 2024
d4cd047
remove useless comments
Oct 28, 2024
bb21e6c
add unit test for JSONPathInputOutputMapping
Oct 28, 2024
96d6fe3
use LinkedHashMultimap to rebuild DAGDescriptorConverterImpl
Oct 28, 2024
887a214
optimit code
Oct 28, 2024
75efbbf
modify comments
Oct 28, 2024
94a5683
optimit code
Oct 28, 2024
36d657f
optimit code
Oct 28, 2024
5c7a535
rename method name
Oct 28, 2024
e3dfd2d
rename method name
Oct 28, 2024
625c419
rename DescriptorManager into DAGDescriptorManager
Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ dependency-reduced-pom.xml
.DS_Store
/rill-flow-web/src/main/resources/static/
.vscode/
logs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.resource.BaseResource;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
Expand All @@ -39,6 +39,7 @@
private String version;
private DAGType type;
private Timeline timeline;
@Setter
private List<BaseTask> tasks;
@Setter
private List<BaseResource> resources;
Expand All @@ -48,6 +49,9 @@
@Setter
private Map<String, List<Mapping>> commonMapping;
private String inputSchema;
private Map<String, Object> output;
@Setter
private String endTaskName;

@JsonCreator
public DAG(@JsonProperty("workspace") String workspace,
Expand All @@ -62,7 +66,8 @@
@JsonProperty("commonMapping") Map<String, List<Mapping>> commonMapping,
@JsonProperty("namespace") String namespace,
@JsonProperty("service") String service,
@JsonProperty("inputSchema") String inputSchema) {
@JsonProperty("inputSchema") String inputSchema,
@JsonProperty("output") Map<String, Object> output) {

Check warning on line 70 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java#L70

Added line #L70 was not covered by tests
this.workspace = StringUtils.isBlank(workspace) ? namespace : workspace;
this.dagName = StringUtils.isBlank(dagName) ? service : dagName;
this.version = version;
Expand All @@ -74,5 +79,6 @@
this.defaultContext = defaultContext;
this.commonMapping = commonMapping;
this.inputSchema = inputSchema;
this.output = output;

Check warning on line 82 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java#L82

Added line #L82 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.weibo.rill.flow.olympicene.core.model.dag;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class DescriptorPO {
private String descriptor;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.weibo.rill.flow.olympicene.core.model.dag;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class DescriptorVO {
private String descriptor;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
@JsonProperty("description") String description,
@JsonProperty("category") String category,
@JsonProperty("next") String next,
@JsonProperty("input") Map<String, Object> input,
@JsonProperty("inputMappings") List<Mapping> inputMappings,
@JsonProperty("outputMappings") List<Mapping> outputMappings,
@JsonProperty("choices") List<Choice> choices,
Expand All @@ -55,7 +56,8 @@
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,

Check warning on line 59 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java#L59

Added line #L59 was not covered by tests
timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.choices = choices;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public ForeachTask(
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.synchronization = synchronization;
this.iterationMapping = iterationMapping;
this.tasks = tasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.ArrayList;
Expand All @@ -35,6 +36,7 @@
@Getter
@Setter
@JsonTypeName("pass")
@NoArgsConstructor
public class PassTask extends BaseTask {

@JsonCreator
Expand All @@ -51,8 +53,10 @@
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,

Check warning on line 58 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java#L58

Added line #L58 was not covered by tests
timeline, isKeyCallback, keyExp, parameters, templateId, input);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,

Check warning on line 57 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ReturnTask.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ReturnTask.java#L57

Added line #L57 was not covered by tests
timeline, isKeyCallback, keyExp, parameters, templateId, input);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
this.conditions = conditions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade,

Check warning on line 60 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SuspenseTask.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SuspenseTask.java#L60

Added line #L60 was not covered by tests
timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.conditions = conditions;
this.interruptions = interruptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ public SwitchTask(
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, null, false, inputMappings, null, progress,
degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
degrade, timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.switches = switches;
this.setNext(switches.stream().map(Switch::getNext).collect(Collectors.joining(",")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class DAGWalkHelperTest extends Specification {

def "task status of parent task should be correct"() {
given:
ForeachTask baseTask = new ForeachTask('base_1', '', '',TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null, null)
ForeachTask baseTask = new ForeachTask('base_1', '', '',TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null, null, null)

TaskInfo parentTask = new TaskInfo(name: 'parent',
task: baseTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ObjectMapperFactory {
}

private static final YAMLFactory YAML_FACTORY = new YAMLFactory()
.configure(YAMLGenerator.Feature.USE_NATIVE_TYPE_ID, false)
.configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)
.configure(YAMLGenerator.Feature.USE_NATIVE_OBJECT_ID, false);
private static final ObjectMapper YAML_MAPPER = new ObjectMapper(YAML_FACTORY);
Expand All @@ -63,6 +64,7 @@ public class ObjectMapperFactory {
YAML_MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
YAML_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
YAML_MAPPER.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
YAML_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
YAML_MAPPER.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
YAML_MAPPER.registerSubtypes(
new NamedType(FunctionTask.class, "function"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class DAGInfoDAOTest extends Specification {
def setup() {
dagInfo.executionId = executionId
dagInfo.dagStatus = DAGStatus.NOT_STARTED
dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null)
dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null, null)
}

def "updateDagInfo redis eval shardingKey check"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@Slf4j
public class JSONPathInputOutputMapping implements InputOutputMapping, JSONPath {
Configuration conf = Configuration.builder().options(Option.DEFAULT_PATH_LEAF_TO_NULL).build();
private static final Pattern JSONPATH_PATTERN = Pattern.compile("\\[\"(.*?)\"]|\\['(.*?)']");
private static final Pattern JSONPATH_PATTERN = Pattern.compile("\\[(.*?)]");

@Value("${rill.flow.function.trigger.uri}")
private String rillFlowFunctionTriggerUri;
Expand Down Expand Up @@ -201,26 +201,69 @@
while (matcher.find()) {
if (matcher.group(1) != null) {
jsonPathParts.add(matcher.group(1));
} else if (matcher.group(2) != null) {
jsonPathParts.add(matcher.group(2));
}
}

jsonPathParts.remove(jsonPathParts.size() - 1);

Object current = map;
for (String part: jsonPathParts) {
for (int i = 0; i < jsonPathParts.size() - 1; i++) {
String part = jsonPathParts.get(i);
if (part.startsWith("\"") || part.startsWith("'")) {
part = part.substring(1, part.length() - 1);
}
if (current instanceof Map) {
Map<String, Object> mapCurrent = (Map<String, Object>) current;
current = mapCurrent.computeIfAbsent(part, k -> new HashMap<>());
} else {
break;
current = processMapJsonPathPart(current, part, jsonPathParts, i);
} else if (current instanceof List) {
current = processListJsonPathPart(current, part, jsonPathParts, i);
}
}

return JsonPath.using(conf).parse(map).set(path, value).json();
}

private Object processListJsonPathPart(Object current, String part, List<String> jsonPathParts, int i) {
List<Object> listCurrent = (List<Object>) current;
int index = Integer.parseInt(part);
Object insertPosition = listCurrent.get(index);
if (jsonPathParts.get(i + 1).matches("\\d+")) {
// 1. 下一个元素是数字,也就是数组的索引,所以需要创建数组并且填充到索引位置
List<Object> nextArray = createAndFillNextArrayPart(insertPosition, jsonPathParts, i);
listCurrent.set(index, nextArray);

Check warning on line 230 in rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/mappings/JSONPathInputOutputMapping.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/mappings/JSONPathInputOutputMapping.java#L229-L230

Added lines #L229 - L230 were not covered by tests
} else if (i + 1 < jsonPathParts.size() && insertPosition == null) {
// 2. 下一个元素不是数字,则创建 map
listCurrent.set(index, new HashMap<>());
}
return listCurrent.get(index);
}

private Object processMapJsonPathPart(Object current, String part, List<String> jsonPathParts, int i) {
Map<String, Object> mapCurrent = (Map<String, Object>) current;
Object currentValue = mapCurrent.get(part);
if (jsonPathParts.get(i + 1).matches("\\d+")) {
List<Object> nextArray = createAndFillNextArrayPart(currentValue, jsonPathParts, i);
mapCurrent.put(part, nextArray);
} else if (i + 1 < jsonPathParts.size() && currentValue == null) {
mapCurrent.put(part, new HashMap<>());
}
return mapCurrent.get(part);
}

/**
* 为下一个元素创建数组类型对象,并用 null 值填充指定元素个数
*/
private List<Object> createAndFillNextArrayPart(Object nextPart, List<String> jsonPathParts, int i) {
List<Object> nextArray;
if (nextPart instanceof List) {
nextArray = (List<Object>) nextPart;
} else {
nextArray = new ArrayList<>();
}
int nextIndex = Integer.parseInt(jsonPathParts.get(i + 1));
for (int j = nextArray.size(); j <= nextIndex; j++) {
nextArray.add(null);
}
return nextArray;
}

@Override
public Map<String, Map<String, Object>> delete(Map<String, Map<String, Object>> map, String path) {
if (map == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import com.google.common.collect.Sets;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
Expand Down Expand Up @@ -60,7 +60,7 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map<Strin
log.info("pass task begin to run executionId:{}, taskInfoName:{}", executionId, taskInfo.getName());
if (CollectionUtils.isNotEmpty(taskInfo.getTask().getOutputMappings())) {
Map<String, Object> context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo);
outputMappings(context, new HashMap<>(), new HashMap<>(), taskInfo.getTask().getOutputMappings());
outputMappings(context, new HashMap<>(), input, taskInfo.getTask().getOutputMappings());
saveContext(executionId, context, Sets.newHashSet(taskInfo));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,44 @@ class JSONPathTest extends Specification {
['input': ['type': 'gif']] | _
}

def "set value when there is an array in the path"() {
given:
String path = "\$.input.meta.user[1].id"
long value = 1

when:
mapping.setValue(map, value, path)

then:
mapping.getValue(map, path) == value

where:
map | _
[:] | _
['context': 123] | _
['input': ['type': 'gif']] | _
['input': ['meta': ['user': [['id': 1]]]]] | _
}

def "set value when the last part of the path is an array"() {
given:
String path = "\$.input.meta.user[1]"
long value = 1

when:
mapping.setValue(map, value, path)

then:
mapping.getValue(map, path) == value

where:
map | _
[:] | _
['context': 123] | _
['input': ['type': 'gif']] | _
['input': ['meta': ['user': [['id': 1]]]]] | _
}

def "set value intermediate route test contains dot"() {
given:
String path = "\$.input.meta.user.[\"id.key\"]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class DagInfoDeserializeServiceImplTest extends Specification {
def setup() {
dagInfo.executionId = executionId
dagInfo.dagStatus = DAGStatus.NOT_STARTED
dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null)
dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null, null)
}

def "delDagInfo invoke setting if time above zero"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import lombok.*;

import java.util.Objects;

@AllArgsConstructor
@NoArgsConstructor
@Getter
Expand Down Expand Up @@ -50,4 +52,19 @@ public Mapping(String source, String target) {
this.source = source;
this.target = target;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Mapping mapping = (Mapping) o;
return Objects.equals(reference, mapping.reference) && Objects.equals(source, mapping.source)
&& Objects.equals(transform, mapping.transform) && Objects.equals(target, mapping.target)
&& Objects.equals(variable, mapping.variable);
}

@Override
public int hashCode() {
return Objects.hash(reference, source, transform, target, variable);
}
}
Loading
Loading