diff --git a/.gitignore b/.gitignore index 25e009317..936416a92 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ dependency-reduced-pom.xml .DS_Store /rill-flow-web/src/main/resources/static/ .vscode/ +logs diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java index e1e72b6f8..8d33bd868 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/dag/DAG.java @@ -39,6 +39,7 @@ public class DAG { private String version; private DAGType type; private Timeline timeline; + @Setter private List tasks; @Setter private List resources; @@ -48,6 +49,9 @@ public class DAG { @Setter private Map> commonMapping; private String inputSchema; + private Map output; + @Setter + private String endTaskName; @JsonCreator public DAG(@JsonProperty("workspace") String workspace, @@ -62,7 +66,8 @@ public DAG(@JsonProperty("workspace") String workspace, @JsonProperty("commonMapping") Map> commonMapping, @JsonProperty("namespace") String namespace, @JsonProperty("service") String service, - @JsonProperty("inputSchema") String inputSchema) { + @JsonProperty("inputSchema") String inputSchema, + @JsonProperty("output") Map output) { this.workspace = StringUtils.isBlank(workspace) ? namespace : workspace; this.dagName = StringUtils.isBlank(dagName) ? service : dagName; this.version = version; @@ -74,5 +79,6 @@ public DAG(@JsonProperty("workspace") String workspace, this.defaultContext = defaultContext; this.commonMapping = commonMapping; this.inputSchema = inputSchema; + this.output = output; } } diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java index 371542a14..6c4c68c72 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java @@ -45,6 +45,7 @@ public ChoiceTask( @JsonProperty("description") String description, @JsonProperty("category") String category, @JsonProperty("next") String next, + @JsonProperty("input") Map input, @JsonProperty("inputMappings") List inputMappings, @JsonProperty("outputMappings") List outputMappings, @JsonProperty("choices") List choices, @@ -55,7 +56,8 @@ public ChoiceTask( @JsonProperty("keyExp") String keyExp, @JsonProperty("parameters") Map parameters, @JsonProperty("templateId") String templateId) { - super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId); + super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, + timeline, isKeyCallback, keyExp, parameters, templateId, input); this.choices = choices; } diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ForeachTask.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ForeachTask.java index fa41071c7..ca0ffd003 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ForeachTask.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ForeachTask.java @@ -58,8 +58,10 @@ public ForeachTask( @JsonProperty("isKeyCallback") boolean isKeyCallback, @JsonProperty("keyExp") String keyExp, @JsonProperty("parameters") Map parameters, - @JsonProperty("templateId") String templateId) { - super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId); + @JsonProperty("templateId") String templateId, + @JsonProperty("input") Map input) { + super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, + timeline, isKeyCallback, keyExp, parameters, templateId, input); this.synchronization = synchronization; this.iterationMapping = iterationMapping; this.tasks = tasks; diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java index eb1a5367f..8dd51faee 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java @@ -25,6 +25,7 @@ import com.weibo.rill.flow.interfaces.model.strategy.Timeline; import com.weibo.rill.flow.interfaces.model.task.BaseTask; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import java.util.ArrayList; @@ -35,6 +36,7 @@ @Getter @Setter @JsonTypeName("pass") +@NoArgsConstructor public class PassTask extends BaseTask { @JsonCreator @@ -51,8 +53,10 @@ public PassTask(@JsonProperty("name") String name, @JsonProperty("isKeyCallback") boolean isKeyCallback, @JsonProperty("keyExp") String keyExp, @JsonProperty("parameters") Map parameters, - @JsonProperty("templateId") String templateId) { - super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId); + @JsonProperty("templateId") String templateId, + @JsonProperty("input") Map input) { + super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, + timeline, isKeyCallback, keyExp, parameters, templateId, input); Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null)); } diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ReturnTask.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ReturnTask.java index 9594129e0..b6ca188c1 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ReturnTask.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ReturnTask.java @@ -52,8 +52,10 @@ public ReturnTask(@JsonProperty("name") String name, @JsonProperty("isKeyCallback") boolean isKeyCallback, @JsonProperty("keyExp") String keyExp, @JsonProperty("parameters") Map parameters, - @JsonProperty("templateId") String templateId) { - super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId); + @JsonProperty("templateId") String templateId, + @JsonProperty("input") Map input) { + super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, + timeline, isKeyCallback, keyExp, parameters, templateId, input); Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null)); this.conditions = conditions; } diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SuspenseTask.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SuspenseTask.java index d7d51cb16..37ebac729 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SuspenseTask.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SuspenseTask.java @@ -55,8 +55,10 @@ public SuspenseTask(@JsonProperty("name") String name, @JsonProperty("isKeyCallback") boolean isKeyCallback, @JsonProperty("keyExp") String keyExp, @JsonProperty("parameters") Map parameters, - @JsonProperty("templateId") String templateId) { - super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId); + @JsonProperty("templateId") String templateId, + @JsonProperty("input") Map input) { + super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade, + timeline, isKeyCallback, keyExp, parameters, templateId, input); this.conditions = conditions; this.interruptions = interruptions; } diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SwitchTask.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SwitchTask.java index 9365ea266..ca4c9dafc 100755 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SwitchTask.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/SwitchTask.java @@ -51,9 +51,10 @@ public SwitchTask( @JsonProperty("isKeyCallback") boolean isKeyCallback, @JsonProperty("keyExp") String keyExp, @JsonProperty("parameters") Map parameters, - @JsonProperty("templateId") String templateId) { + @JsonProperty("templateId") String templateId, + @JsonProperty("input") Map input) { super(name, title, description, category, null, false, inputMappings, null, progress, - degrade, timeline, isKeyCallback, keyExp, parameters, templateId); + degrade, timeline, isKeyCallback, keyExp, parameters, templateId, input); this.switches = switches; this.setNext(switches.stream().map(Switch::getNext).collect(Collectors.joining(","))); } diff --git a/rill-flow-dag/olympicene-core/src/test/groovy/com/weibo/rill/flow/olympicene/core/helper/DAGWalkHelperTest.groovy b/rill-flow-dag/olympicene-core/src/test/groovy/com/weibo/rill/flow/olympicene/core/helper/DAGWalkHelperTest.groovy index 07f736ac7..e4c084c8b 100644 --- a/rill-flow-dag/olympicene-core/src/test/groovy/com/weibo/rill/flow/olympicene/core/helper/DAGWalkHelperTest.groovy +++ b/rill-flow-dag/olympicene-core/src/test/groovy/com/weibo/rill/flow/olympicene/core/helper/DAGWalkHelperTest.groovy @@ -20,7 +20,7 @@ class DAGWalkHelperTest extends Specification { def "task status of parent task should be correct"() { given: - ForeachTask baseTask = new ForeachTask('base_1', '', '',TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null, null) + ForeachTask baseTask = new ForeachTask('base_1', '', '',TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null, null, null) TaskInfo parentTask = new TaskInfo(name: 'parent', task: baseTask, diff --git a/rill-flow-dag/olympicene-ddl/src/main/java/com/weibo/rill/flow/olympicene/ddl/serialize/ObjectMapperFactory.java b/rill-flow-dag/olympicene-ddl/src/main/java/com/weibo/rill/flow/olympicene/ddl/serialize/ObjectMapperFactory.java index 715a22bff..44501f94d 100644 --- a/rill-flow-dag/olympicene-ddl/src/main/java/com/weibo/rill/flow/olympicene/ddl/serialize/ObjectMapperFactory.java +++ b/rill-flow-dag/olympicene-ddl/src/main/java/com/weibo/rill/flow/olympicene/ddl/serialize/ObjectMapperFactory.java @@ -63,6 +63,7 @@ public class ObjectMapperFactory { YAML_MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); YAML_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); YAML_MAPPER.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true); + YAML_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); YAML_MAPPER.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); YAML_MAPPER.registerSubtypes( new NamedType(FunctionTask.class, "function"), diff --git a/rill-flow-dag/olympicene-storage/src/test/groovy/com/weibo/rill/flow/olympicene/storage/save/impl/DAGInfoDAOTest.groovy b/rill-flow-dag/olympicene-storage/src/test/groovy/com/weibo/rill/flow/olympicene/storage/save/impl/DAGInfoDAOTest.groovy index df72cdd6f..3c8d8b8f3 100644 --- a/rill-flow-dag/olympicene-storage/src/test/groovy/com/weibo/rill/flow/olympicene/storage/save/impl/DAGInfoDAOTest.groovy +++ b/rill-flow-dag/olympicene-storage/src/test/groovy/com/weibo/rill/flow/olympicene/storage/save/impl/DAGInfoDAOTest.groovy @@ -20,7 +20,7 @@ class DAGInfoDAOTest extends Specification { def setup() { dagInfo.executionId = executionId dagInfo.dagStatus = DAGStatus.NOT_STARTED - dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null) + dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null, null) } def "updateDagInfo redis eval shardingKey check"() { diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/mappings/JSONPathInputOutputMapping.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/mappings/JSONPathInputOutputMapping.java index 8ff75049b..5af1226e6 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/mappings/JSONPathInputOutputMapping.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/mappings/JSONPathInputOutputMapping.java @@ -42,7 +42,7 @@ @Slf4j public class JSONPathInputOutputMapping implements InputOutputMapping, JSONPath { Configuration conf = Configuration.builder().options(Option.DEFAULT_PATH_LEAF_TO_NULL).build(); - private static final Pattern JSONPATH_PATTERN = Pattern.compile("\\[\"(.*?)\"]|\\['(.*?)']"); + private static final Pattern JSONPATH_PATTERN = Pattern.compile("\\[(.*?)]"); @Value("${rill.flow.function.trigger.uri}") private String rillFlowFunctionTriggerUri; @@ -201,26 +201,68 @@ public Map setValue(Map map, Object value, Strin while (matcher.find()) { if (matcher.group(1) != null) { jsonPathParts.add(matcher.group(1)); - } else if (matcher.group(2) != null) { - jsonPathParts.add(matcher.group(2)); } } - jsonPathParts.remove(jsonPathParts.size() - 1); - Object current = map; - for (String part: jsonPathParts) { + for (int i = 0; i < jsonPathParts.size() - 1; i++) { + String part = jsonPathParts.get(i); + if (part.startsWith("\"") || part.startsWith("'")) { + part = part.substring(1, part.length() - 1); + } if (current instanceof Map) { - Map mapCurrent = (Map) current; - current = mapCurrent.computeIfAbsent(part, k -> new HashMap<>()); - } else { - break; + current = processMapJsonPathPart(current, part, jsonPathParts, i); + } else if (current instanceof List) { + current = processListJsonPathPart(current, part, jsonPathParts, i); } } return JsonPath.using(conf).parse(map).set(path, value).json(); } + private Object processListJsonPathPart(Object current, String part, List jsonPathParts, int i) { + List listCurrent = (List) current; + int index = Integer.parseInt(part); + Object insertPosition = listCurrent.get(index); + if (insertPosition != null) { + return insertPosition; + } + if (jsonPathParts.get(i + 1).matches("\\d+")) { + List nextArray = createAndFillNextArrayPart(jsonPathParts, i); + listCurrent.set(index, nextArray); + } else if (i + 1 < jsonPathParts.size() ) { + listCurrent.set(index, new HashMap<>()); + } + return listCurrent.get(index); + } + + private Object processMapJsonPathPart(Object current, String part, List jsonPathParts, int i) { + Map mapCurrent = (Map) current; + Object currentValue = mapCurrent.get(part); + if (currentValue != null) { + return currentValue; + } + if (jsonPathParts.get(i + 1).matches("\\d+")) { + List nextArray = createAndFillNextArrayPart(jsonPathParts, i); + mapCurrent.put(part, nextArray); + } else if (i + 1 < jsonPathParts.size()) { + mapCurrent.put(part, new HashMap<>()); + } + return mapCurrent.get(part); + } + + /** + * 下一个元素创建数组类型对象,并用 null 值填充指定元素个数 + */ + private List createAndFillNextArrayPart(List jsonPathParts, int i) { + List nextArray = new ArrayList<>(); + int nextIndex = Integer.parseInt(jsonPathParts.get(i + 1)); + for (int j = 0; j <= nextIndex; j++) { + nextArray.add(null); + } + return nextArray; + } + @Override public Map> delete(Map> map, String path) { if (map == null) { diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/PassTaskRunner.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/PassTaskRunner.java index 0920257f4..ca568837b 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/PassTaskRunner.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/PassTaskRunner.java @@ -60,7 +60,7 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo); - outputMappings(context, new HashMap<>(), new HashMap<>(), taskInfo.getTask().getOutputMappings()); + outputMappings(context, input, new HashMap<>(), taskInfo.getTask().getOutputMappings()); saveContext(executionId, context, Sets.newHashSet(taskInfo)); } diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/json/JSONPathTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/json/JSONPathTest.groovy index 48057d534..f5b0259a1 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/json/JSONPathTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/json/JSONPathTest.groovy @@ -73,6 +73,42 @@ class JSONPathTest extends Specification { ['input': ['type': 'gif']] | _ } + def "set value when there is an array in the path"() { + given: + String path = "\$.input.meta.user[1].id" + long value = 1 + + when: + mapping.setValue(map, value, path) + + then: + mapping.getValue(map, path) == value + + where: + map | _ + [:] | _ + ['context': 123] | _ + ['input': ['type': 'gif']] | _ + } + + def "set value when the last part of the path is an array"() { + given: + String path = "\$.input.meta.user[1]" + long value = 1 + + when: + mapping.setValue(map, value, path) + + then: + mapping.getValue(map, path) == value + + where: + map | _ + [:] | _ + ['context': 123] | _ + ['input': ['type': 'gif']] | _ + } + def "set value intermediate route test contains dot"() { given: String path = "\$.input.meta.user.[\"id.key\"]" diff --git a/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/DagInfoDeserializeServiceImplTest.groovy b/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/DagInfoDeserializeServiceImplTest.groovy index 3158878ea..072591b72 100644 --- a/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/DagInfoDeserializeServiceImplTest.groovy +++ b/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/DagInfoDeserializeServiceImplTest.groovy @@ -21,7 +21,7 @@ class DagInfoDeserializeServiceImplTest extends Specification { def setup() { dagInfo.executionId = executionId dagInfo.dagStatus = DAGStatus.NOT_STARTED - dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null) + dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null, null) } def "delDagInfo invoke setting if time above zero"() { diff --git a/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/mapping/Mapping.java b/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/mapping/Mapping.java index 4943b17a1..66b6c0e0b 100644 --- a/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/mapping/Mapping.java +++ b/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/mapping/Mapping.java @@ -18,6 +18,8 @@ import lombok.*; +import java.util.Objects; + @AllArgsConstructor @NoArgsConstructor @Getter @@ -50,4 +52,19 @@ public Mapping(String source, String target) { this.source = source; this.target = target; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Mapping mapping = (Mapping) o; + return Objects.equals(reference, mapping.reference) && Objects.equals(source, mapping.source) + && Objects.equals(transform, mapping.transform) && Objects.equals(target, mapping.target) + && Objects.equals(variable, mapping.variable); + } + + @Override + public int hashCode() { + return Objects.hash(reference, source, transform, target, variable); + } } diff --git a/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/BaseTask.java b/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/BaseTask.java index 449681caa..46e23bca7 100644 --- a/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/BaseTask.java +++ b/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/BaseTask.java @@ -51,6 +51,7 @@ public abstract class BaseTask { private String keyExp; private Map parameters; private String templateId; + private Map input; public abstract List subTasks(); } diff --git a/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/FunctionTask.java b/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/FunctionTask.java index 959636237..8e6f8e2b1 100644 --- a/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/FunctionTask.java +++ b/rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/task/FunctionTask.java @@ -69,8 +69,10 @@ public FunctionTask(@JsonProperty("name") String name, @JsonProperty("isKeyCallback") boolean isKeyCallback, @JsonProperty("keyExp") String keyExp, @JsonProperty("parameters") Map parameters, - @JsonProperty("templateId") String templateId) { - super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId); + @JsonProperty("templateId") String templateId, + @JsonProperty("input") Map input) { + super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade, + timeline, isKeyCallback, keyExp, parameters, templateId, input); this.resourceProtocol = resourceProtocol; this.resourceName = resourceName; this.resource = resource; diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java index b0147d5e3..30a667713 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java @@ -70,8 +70,7 @@ public String handle(Resource resource, DispatchInfo dispatchInfo) { Map data = Maps.newHashMap(); Optional.ofNullable(dispatchInfo.getInput()).ifPresent(data::putAll); Long uid = Optional.ofNullable(data.get("uid")).map(it -> Long.parseLong(String.valueOf(it))).orElse(0L); - String dagDescriptor = descriptorManager.getDagDescriptor(uid, data, resource.getSchemeValue()); - DAG dag = dagBuilder.parse(dagDescriptor); + DAG dag = descriptorManager.getDAG(uid, data, resource.getSchemeValue()); String executionId = ExecutionIdUtil.generateExecutionId(dag); data.put("flow_execution_id", executionId); DAGSettings dagSettings = DAGSettings.builder() diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java index 6c5b8c378..777214051 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java @@ -30,13 +30,10 @@ import com.google.common.collect.Maps; import com.weibo.rill.flow.common.exception.TaskException; import com.weibo.rill.flow.common.model.BizError; -import com.weibo.rill.flow.common.model.Node; -import com.weibo.rill.flow.common.model.NodeType; import com.weibo.rill.flow.interfaces.model.resource.Resource; import com.weibo.rill.flow.olympicene.core.model.event.DAGDescriptorEvent; import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient; import com.weibo.rill.flow.service.manager.DescriptorManager; -import com.weibo.rill.flow.service.service.ProtocolPluginService; import com.weibo.rill.flow.service.statistic.DAGSubmitChecker; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; @@ -48,7 +45,10 @@ import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; @@ -67,8 +67,6 @@ public class DAGDescriptorFacade { private static final String DESCRIPTOR_ID = "descriptor_id"; private static final String DESCRIPTOR = "descriptor"; @Autowired - ProtocolPluginService protocolPluginService; - @Autowired private DescriptorManager descriptorManager; @Autowired @Qualifier("descriptorRedisClient") @@ -191,7 +189,7 @@ public Map addDescriptor(String identity, String businessId, Str .alias(alias) .attachments(attachments) .type(DAGDescriptorEvent.Type.addDescriptor) - .build(); + .add(false).build(); applicationEventPublisher.publishEvent(new DAGDescriptorEvent(operation)); return ImmutableMap.of(RET, descriptorId != null, DESCRIPTOR_ID, Optional.ofNullable(descriptorId).orElse("")); @@ -208,13 +206,14 @@ public Map getDescriptor(Long uidOriginal, Map i .map(it -> Long.parseLong(String.valueOf(it))) .orElse(0L) ); + String descriptor = descriptorManager.getDescriptor(uid, input, descriptorId); return ImmutableMap.of(DESCRIPTOR_ID, descriptorId, "uid", String.valueOf(uid), - DESCRIPTOR, descriptorManager.getDagDescriptor(uid, input, descriptorId)); + DESCRIPTOR, descriptor); } public JSONObject getDescriptor(String descriptorId) { - String descriptor = descriptorManager.getDagDescriptor(null, null, descriptorId); + String descriptor = descriptorManager.getDescriptor(null, null, descriptorId); JSONObject descriptorObject = yamlToJson(descriptor); if (descriptorObject == null) { log.warn("descriptorId:{} descriptor is null", descriptorId); diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGRuntimeFacade.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGRuntimeFacade.java index b392d765c..18f8d4dd6 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGRuntimeFacade.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGRuntimeFacade.java @@ -377,9 +377,8 @@ public Map functionDispatchParams(String executionId, String tas } public Map dependencyCheck(String descriptorId, String descriptor) { - String dagDescriptor = StringUtils.isNotBlank(descriptorId) ? - descriptorManager.getDagDescriptor(0L, Collections.emptyMap(), descriptorId) : descriptor; - DAG dag = dagStringParser.parse(dagDescriptor); + DAG dag = StringUtils.isNotBlank(descriptorId) ? + descriptorManager.getDAG(0L, Collections.emptyMap(), descriptorId) : dagStringParser.parse(descriptor); Map> dependencies = dagWalkHelper.getDependedResources(dag); List> resourceToNames = dependencies.entrySet().stream() .map(entry -> ImmutableMap.of("resource_name", entry.getKey(), "names", entry.getValue())) diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java index 2cebe2ac9..f6a771192 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java @@ -119,8 +119,7 @@ public Map submit(User flowUser, String descriptorId, Map submit(Long uid, String descriptorId, Map context, String callback, ResourceCheckConfig resourceCheckConfig) { - String dagDescriptor = descriptorManager.getDagDescriptor(uid, context, descriptorId); - DAG dag = dagStringParser.parse(dagDescriptor); + DAG dag = descriptorManager.getDAG(uid, context, descriptorId); String executionId = ExecutionIdUtil.generateExecutionId(dag); dagSubmitChecker.check(executionId, resourceCheckConfig); diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/manager/DescriptorManager.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/manager/DescriptorManager.java index a92fc06c2..2f431b1cb 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/manager/DescriptorManager.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/manager/DescriptorManager.java @@ -30,6 +30,7 @@ import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager; import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser; import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient; +import com.weibo.rill.flow.service.strategies.DAGProcessStrategyContext; import com.weibo.rill.flow.service.util.ExecutionIdUtil; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -140,6 +141,9 @@ public class DescriptorManager { private AviatorCache aviatorCache; @Autowired private SwitcherManager switcherManagerImpl; + @Autowired + @Qualifier("dagProcessStrategyContext") + private DAGProcessStrategyContext dagProcessStrategyContext; private final Cache descriptorRedisKeyToYamlCache = CacheBuilder.newBuilder() .maximumSize(300) @@ -150,9 +154,24 @@ public class DescriptorManager { .expireAfterWrite(60, TimeUnit.SECONDS) .build(); - public String getDagDescriptor(Long uid, Map input, String dagDescriptorId) { + /** + * 获取 DAG 对象,用于 rill-flow 系统内部流程执行 + */ + public DAG getDAG(Long uid, Map input, String dagDescriptorId) { + // 调用量比较小 useCache为false 实时取最新的yaml保证更新会立即生效 + String descriptor = getDagDescriptorWithCache(uid, input, dagDescriptorId, false); + descriptor = dagProcessStrategyContext.transformDescriptor(descriptor, DAGProcessStrategyContext.DEFAULT_STRATEGY); + return dagParser.parse(descriptor); + } + + /** + * 获取 DAG 描述符,用于提供用户编辑和展示 + */ + public String getDescriptor(Long uid, Map input, String dagDescriptorId) { // 调用量比较小 useCache为false 实时取最新的yaml保证更新会立即生效 - return getDagDescriptorWithCache(uid, input, dagDescriptorId, false); + String descriptor = getDagDescriptorWithCache(uid, input, dagDescriptorId, false); + // 在下发给用户展示和编辑之前,对工作流描述符的属性进行处理,去除系统运行所需的属性 + return dagProcessStrategyContext.transformDescriptor(descriptor, DAGProcessStrategyContext.CUSTOM_STRATEGY); } /** @@ -175,7 +194,7 @@ public String getDagDescriptorWithCache(Long uid, Map input, Str // 校验dagDescriptorId String[] fields = StringUtils.isEmpty(dagDescriptorId) ? new String[0] : dagDescriptorId.trim().split(ReservedConstant.COLON); if (fields.length < 2 || nameInvalid(fields[0], fields[1])) { - log.info("getDagDescriptor dagDescriptorId data format error, dagDescriptorId:{}", dagDescriptorId); + log.info("getDescriptor dagDescriptorId data format error, dagDescriptorId:{}", dagDescriptorId); throw new TaskException(BizError.ERROR_DATA_FORMAT.getCode(), "dagDescriptorId:" + dagDescriptorId + " format error"); } @@ -185,7 +204,7 @@ public String getDagDescriptorWithCache(Long uid, Map input, Str String thirdField = fields.length > 2 ? fields[2] : null; if (StringUtils.isEmpty(thirdField)) { thirdField = getDescriptorAliasByGrayRule(uid, input, businessId, featureName); - log.info("getDagDescriptor result businessId:{} featureName:{} alias:{}", businessId, featureName, thirdField); + log.info("getDescriptor result businessId:{} featureName:{} alias:{}", businessId, featureName, thirdField); } String descriptorRedisKey; if (thirdField.startsWith(MD5_PREFIX)) { @@ -209,7 +228,7 @@ public String getDagDescriptorWithCache(Long uid, Map input, Str } catch (TaskException taskException) { throw taskException; } catch (Exception e) { - log.warn("getDagDescriptor fails, uid:{}, dagDescriptorId:{}", uid, dagDescriptorId, e); + log.warn("getDescriptor fails, uid:{}, dagDescriptorId:{}", uid, dagDescriptorId, e); throw new TaskException(BizError.ERROR_PROCESS_FAIL.getCode(), String.format("get descriptor:%s fails", dagDescriptorId)); } } @@ -482,6 +501,8 @@ public String createDAGDescriptor(String businessId, String featureName, String businessId, dag.getWorkspace(), featureName, dag.getDagName()); throw new TaskException(BizError.ERROR_DATA_FORMAT, "name not match"); } + // 在存储到 redis 之前,对 DAG 属性进行处理,增加系统运行所需的属性 + dag = dagProcessStrategyContext.transformDAGProperties(dag, DAGProcessStrategyContext.CUSTOM_STRATEGY); createAlias(businessId, featureName, alias); @@ -494,7 +515,7 @@ public String createDAGDescriptor(String businessId, String featureName, String argv.add(String.valueOf(versionMaxCount)); argv.add(String.valueOf(System.currentTimeMillis())); argv.add(md5); - argv.add(descriptor); + argv.add(dagParser.serialize(dag)); redisClient.eval(VERSION_ADD, businessId, keys, argv); return buildDescriptorId(businessId, featureName, MD5_PREFIX + md5); diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/CustomDAGProcessStrategy.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/CustomDAGProcessStrategy.java new file mode 100644 index 000000000..702a43b2d --- /dev/null +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/CustomDAGProcessStrategy.java @@ -0,0 +1,371 @@ +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.strategies; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Maps; +import com.jayway.jsonpath.JsonPath; +import com.weibo.rill.flow.interfaces.model.mapping.Mapping; +import com.weibo.rill.flow.interfaces.model.task.BaseTask; +import com.weibo.rill.flow.olympicene.core.model.dag.DAG; +import com.weibo.rill.flow.olympicene.core.model.task.PassTask; +import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Component(DAGProcessStrategyContext.CUSTOM_STRATEGY) +public class CustomDAGProcessStrategy implements DAGProcessStrategy { + + private static final String CONTEXT_PREFIX = "$.context."; + private static final String INPUT_PREFIX = "$.input."; + private static final String DAG_END_TASK_NAME = "endPassTask"; + + @Resource + private DAGStringParser dagParser; + + /** + * 设置 DAG 时的处理 + * @param dag 待处理的DAG对象 + */ + @Override + public DAG transformDAGProperties(DAG dag) { + Map taskMap = getTaskMapByDag(dag); + // 1. 处理 task 的 input 以及 dag 的 output,为任务生成原始的 inputMappings(input 中的来源直接作为 source,如 $.functionA.data.id) + // 返回是否需要后续处理,不需要后续处理则直接返回 + if (!processInputToGenerateInputMappings(dag, taskMap)) { + return dag; + } + // 2. 处理任务的 inputMappings,返回各任务 inputMappings 的 source 对应的元素列表的列表 + Map>> taskPathsMap = processTaskInputMappings(dag, taskMap); + // 3. 通过各个任务 inputMappings 对应的元素列表的列表,生成任务的 outputMappings + LinkedHashMultimap outputMappingsMultimap = getOutputMappingsByPaths(taskPathsMap); + // 4. 将生成的 outputMappings 设置到对应的 task + generateOutputMappingsIntoTasks(outputMappingsMultimap, taskMap); + return dag; + } + + /** + * 处理各个任务的 inputMappings,实现 inputMappings 的填充,返回 inputMappings 对应的元素列表的列表 + * @param dag DAG对象 + * @param taskMap 任务映射 + * @return 任务路径映射,如 functionB 任务的 inputMappings 包含两条,source 分别为: + * $["functionA"]["data"][0]["id"] 和 $["functionA"]["data"][0]["name"] + * 则返回: ["functionB": [["data", "0", "id"], ["data", "1", "id"]]] + */ + private Map>> processTaskInputMappings(DAG dag, Map taskMap) { + Map>> taskPathsMap = new HashMap<>(); + dag.getTasks().forEach(task -> processTaskInputMapping(task, taskMap, taskPathsMap, dag.getEndTaskName())); + return taskPathsMap; + } + + /** + * 分析和处理任务的 inputMappings,并将结果填充到 taskPathsMap + * @param task 待处理的任务 + * @param taskMap 任务映射 + * @param taskPathsMap 任务路径映射 + * @param endTaskName 结束任务名称 + */ + private void processTaskInputMapping(BaseTask task, Map taskMap, + Map>> taskPathsMap, String endTaskName) { + for (Mapping inputMapping : task.getInputMappings()) { + List elements = getSourcePathElementsByMapping(inputMapping); + if (elements.size() < 2) { + continue; + } + String outputTaskName = elements.get(1); + if (taskMap.containsKey(outputTaskName)) { + // 更新 inputMappings,并填充 taskPathsMap + updateInputMapping(inputMapping, outputTaskName, elements, taskPathsMap); + if (task.getName().equalsIgnoreCase(endTaskName)) { + // 如果当前任务为图的结束任务,则需要更新提供来源数据的任务的 next 属性,让它指向结束任务 + updateTaskNext(taskMap.get(outputTaskName), endTaskName); + } + } + } + } + + /** + * 更新 inputMappings,将 source 中的 $.functionA.id 变成 $.context.functionA.id,并填充 taskPathsMap + * @param inputMapping 输入映射 + * @param outputTaskName 输出任务名称 + * @param elements 路径元素 + * @param taskPathsMap 任务路径映射 + */ + private void updateInputMapping(Mapping inputMapping, String outputTaskName, List elements, + Map>> taskPathsMap) { + inputMapping.setSource("$.context" + inputMapping.getSource().substring(1)); + taskPathsMap.computeIfAbsent(outputTaskName, k -> new ArrayList<>()) + .add(elements.subList(2, elements.size())); + } + + /** + * 将 taskName 放入到 task 的 next 中 + */ + private void updateTaskNext(BaseTask task, String taskName) { + String next = task.getNext(); + if (StringUtils.isEmpty(next)) { + task.setNext(taskName); + } else { + Set nextSet = new LinkedHashSet<>(Arrays.asList(next.split(","))); + nextSet.add(taskName); + task.setNext(String.join(",", nextSet)); + } + } + + /** + * DAG获取任务名称到任务的映射 + */ + private Map getTaskMapByDag(DAG dag) { + if (CollectionUtils.isEmpty(dag.getTasks())) { + return Maps.newHashMap(); + } + return dag.getTasks().stream().collect(Collectors.toMap(BaseTask::getName, Function.identity())); + } + + /** + * 将 inputMapping 中的 source 解析为 element 数组,如 $["functionA"]["data"]["ids"], 则返回 ["functionA", "data", "ids"] + */ + private List getSourcePathElementsByMapping(Mapping inputMapping) { + if (inputMapping.getSource() == null || !inputMapping.getSource().startsWith("$.")) { + return new ArrayList<>(); + } + String source = inputMapping.getSource(); + String path; + try { + path = JsonPath.compile(source).getPath(); + } catch (Exception e) { + return new ArrayList<>(); + } + + String normalizedPath = path.replace("\"", "'"); + return Arrays.stream(normalizedPath.split("\\['|']")) + .filter(StringUtils::isNotEmpty) + .toList(); + } + + /** + * 将 outputMappings 设置到对应的任务中 + */ + private void generateOutputMappingsIntoTasks(LinkedHashMultimap outputMappingsMultimap, Map taskMap) { + if (outputMappingsMultimap.isEmpty()) { + return; + } + outputMappingsMultimap.forEach((taskName, path) -> { + BaseTask task = taskMap.get(taskName); + if (task != null) { + List outputMappings = Optional.ofNullable(task.getOutputMappings()).orElse(new ArrayList<>()); + Set targets = outputMappings.stream().map(Mapping::getTarget).collect(Collectors.toSet()); + String target = CONTEXT_PREFIX + taskName + path; + if (!targets.contains(target)) { + outputMappings.add(new Mapping("$.output" + path, target)); + task.setOutputMappings(outputMappings); + } + } + }); + } + + /** + * 根据 path 元素列表,生成任务的 outputMappings + */ + private LinkedHashMultimap getOutputMappingsByPaths(Map>> taskPathsMap) { + LinkedHashMultimap result = LinkedHashMultimap.create(); + + for (Map.Entry>> taskPathElementsEntry: taskPathsMap.entrySet()) { + String taskName = taskPathElementsEntry.getKey(); + List> elementsList = taskPathElementsEntry.getValue(); + for (List elements: elementsList) { + processPathElements(elements, result, taskName); + } + } + return result; + } + + /** + * 处理 jsonpath 对应的元素,并将结果设置到 result 中 + * 如 elements 为 ["$", "context", "functionA", "data", "text"] + * 则生成的结果为:$.context.functionA.data.text + */ + private void processPathElements(List elements, LinkedHashMultimap result, String taskName) { + StringBuilder mappingSb = new StringBuilder(); + elements.forEach(element -> { + if (element.contains(".")) { + mappingSb.append("['").append(element).append("']"); + } else if (element.matches("\\[\\d+]") || element.equals("[*]")) { + mappingSb.append(element); + } else { + mappingSb.append(".").append(element); + } + }); + if (!result.containsKey(mappingSb.toString())) { + result.put(taskName, mappingSb.toString()); + } + } + + /** + * 根据 task 的 input 以及 dag 的 output,生成任务的 inputMappings + * @return 是否通过新版本的任务 input 或 DAG output 配置,意即是否需要后续处理 + */ + private boolean processInputToGenerateInputMappings(DAG dag, Map taskMap) { + if (CollectionUtils.isEmpty(dag.getTasks())) { + return false; + } + // 1. 根据 dag 的 output 生成图的 end 任务 + PassTask endPassTask = generateEndPassTask(dag, taskMap); + boolean needPostProcess = endPassTask != null; + // 2. 生成各个任务原始的 inputMappings,此时 source 内容仍然为 input 中的原始配置 + // 如 input 为 "id: $.functionA.id" 则生成的 inputMapping 的 source 为 $.functionA.id,target 为 $.input.id + for (BaseTask task : dag.getTasks()) { + Map taskInput = task.getInput(); + if (MapUtils.isEmpty(taskInput)) { + continue; + } else { + needPostProcess = true; + } + List inputMappings = task.getInputMappings() == null ? new ArrayList<>() : task.getInputMappings(); + taskInput.entrySet().stream().filter(entry -> entry.getKey() != null && entry.getValue() != null).forEach(entry -> { + String key = entry.getKey(); + Object value = entry.getValue(); + String target = INPUT_PREFIX + key; + Mapping inputMapping; + if (value instanceof Map) { + inputMapping = JSON.parseObject(JSON.toJSONString(value), Mapping.class); + inputMapping.setTarget(target); + } else { + inputMapping = new Mapping(value.toString(), target); + } + inputMappings.add(inputMapping); + }); + task.setInputMappings(inputMappings); + } + return needPostProcess; + } + + /** + * 生成 end 任务,用于设置图的最终输出信息 + */ + private PassTask generateEndPassTask(DAG dag, Map taskMap) { + if (MapUtils.isEmpty(dag.getOutput())) { + return null; + } + dag.setEndTaskName(DAG_END_TASK_NAME); + PassTask endPassTask = new PassTask(); + endPassTask.setName(DAG_END_TASK_NAME); + endPassTask.setInput(dag.getOutput()); + endPassTask.setCategory("pass"); + // 由于图的 end 任务没有后续任务,所以需要生成它的 outputMappings 来实现将参数传递的信息放入到 context 中 + endPassTask.setOutputMappings(dag.getOutput().keySet().stream() + .map(key -> new Mapping(INPUT_PREFIX + key, CONTEXT_PREFIX + key)).toList()); + + taskMap.put(DAG_END_TASK_NAME, endPassTask); + dag.getTasks().add(endPassTask); + return endPassTask; + } + + /** + * 处理获取描述符时的逻辑 + */ + @Override + public String transformDescriptor(String descriptor) { + // 1. 解析 descriptor,获取 taskName 到 task 的映射 map,并判断是否需要后续处理 + DAG dag = dagParser.parse(descriptor); + Map taskMap = getTaskMapByDag(dag); + if (!needsPostProcessing(dag, taskMap)) { + return descriptor; + } + + // 2. 对非结束节点的任务进行处理,包括 inputMappings、outputMappings 等的处理,同时在任务列表中删除 DAG 结束节点 + List tasks = taskMap.values().stream().filter(task -> !task.getName().equals(dag.getEndTaskName())) + .map(task -> processTask(task, dag.getEndTaskName())).toList(); + + // 3. 重新序列化生成 descriptor + dag.setTasks(tasks); + dag.setEndTaskName(null); + return dagParser.serialize(dag); + } + + /** + * 判断是否需要后续处理 + * @return 如果DAG有结束任务或任何一个任务有 input 配置,则返回true,否则返回false + */ + private boolean needsPostProcessing(DAG dag, Map taskMap) { + return StringUtils.isNotBlank(dag.getEndTaskName()) || + taskMap.values().stream().anyMatch(task -> MapUtils.isNotEmpty(task.getInput())); + } + + /** + * 处理单个任务,包括 outputMappings、inputMappings、next 的处理 + */ + private BaseTask processTask(BaseTask task, String endTaskName) { + // 1. 处理 outputMappings,删除自动生成的配置项 + processOutputMappingsWhenGetDescriptor(task); + // 2. 处理 inputMappings,删除存在于 input 中的配置项 + processInputMappingsWhenGetDescriptor(task); + // 3. 处理 next,删除指向 endTask 的指针 + removeFromNext(task, endTaskName); + return task; + } + + /** + * 处理任务的 next 属性,删除指向 taskName 的指针 + */ + private void removeFromNext(BaseTask task, String taskName) { + if (task.getNext() == null || StringUtils.isEmpty(taskName)) { + return; + } + Set nextSet = new LinkedHashSet<>(Arrays.asList(task.getNext().split(","))); + nextSet.remove(taskName); + task.setNext(String.join(",", nextSet)); + } + + /** + * 处理获取描述符时的 outputMappings,删除自动生成的配置项 + */ + private void processOutputMappingsWhenGetDescriptor(BaseTask task) { + List outputMappings = task.getOutputMappings(); + if (CollectionUtils.isEmpty(outputMappings)) { + return; + } + List newOutputMappings = outputMappings.stream() + .filter(mapping -> !mapping.getTarget().startsWith(CONTEXT_PREFIX + task.getName() + ".")).toList(); + task.setOutputMappings(CollectionUtils.isEmpty(newOutputMappings)? null: newOutputMappings); + } + + /** + * 处理获取描述符时的 inputMappings, 从任务的 inputMappings 中删除存在于 input 中的配置项 + */ + private void processInputMappingsWhenGetDescriptor(BaseTask task) { + List inputMappings = task.getInputMappings(); + if (CollectionUtils.isEmpty(inputMappings)) { + return; + } + Set inputTargets = task.getInput().keySet().stream() + .map(key -> INPUT_PREFIX + key).collect(Collectors.toSet()); + + List filteredMappings = inputMappings.stream() + .filter(mapping -> !inputTargets.contains(mapping.getTarget())).toList(); + + task.setInputMappings(CollectionUtils.isEmpty(filteredMappings) ? null : filteredMappings); + } +} diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DAGProcessStrategy.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DAGProcessStrategy.java new file mode 100644 index 000000000..cc4e3cc96 --- /dev/null +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DAGProcessStrategy.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.strategies; + +import com.weibo.rill.flow.olympicene.core.model.dag.DAG; + +public interface DAGProcessStrategy { + /** + * 对 DAG 对象的某些属性进行增删和修改 + * @param dag 工作流对象 + * @return 处理后的工作流对象 + */ + DAG transformDAGProperties(DAG dag); + + /** + * 对工作流描述符字符串进行处理,增删和修改某些属性 + * @param descriptor 工作流描述 + * @return 处理后的工作流描述 + */ + String transformDescriptor(String descriptor); +} diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DAGProcessStrategyContext.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DAGProcessStrategyContext.java new file mode 100644 index 000000000..bfd723112 --- /dev/null +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DAGProcessStrategyContext.java @@ -0,0 +1,55 @@ +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.strategies; + +import com.weibo.rill.flow.olympicene.core.model.dag.DAG; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Map; + +@Slf4j +@Component("dagProcessStrategyContext") +// 策略模式上下文,管理和保存 Strategy 对象,实现具体策略行为的执行 +public class DAGProcessStrategyContext { + @Resource + private Map strategies; + + // 默认处理策略,返回原数据 + public static final String DEFAULT_STRATEGY = "defaultDAGProcessStrategy"; + // 供用户编辑使用的策略,上行时添加系统所需的额外属性,下行时去除相应的额外属性 + public static final String CUSTOM_STRATEGY = "customDAGProcessStrategy"; + + public DAG transformDAGProperties(DAG dag, String strategyName) { + DAGProcessStrategy strategy = strategies.get(strategyName); + if (strategy == null) { + log.warn("strategy {} not found on storage", strategyName); + strategy = strategies.get(DEFAULT_STRATEGY); + } + return strategy.transformDAGProperties(dag); + } + + public String transformDescriptor(String descriptor, String strategyName) { + DAGProcessStrategy strategy = strategies.get(strategyName); + if (strategy == null) { + log.warn("strategy {} not found on retrieval", strategyName); + strategy = strategies.get(DEFAULT_STRATEGY); + } + return strategy.transformDescriptor(descriptor); + } +} diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DefaultDAGProcessStrategy.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DefaultDAGProcessStrategy.java new file mode 100644 index 000000000..63d647dd2 --- /dev/null +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/strategies/DefaultDAGProcessStrategy.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.strategies; + +import com.weibo.rill.flow.olympicene.core.model.dag.DAG; +import org.springframework.stereotype.Component; + +@Component(DAGProcessStrategyContext.DEFAULT_STRATEGY) +public class DefaultDAGProcessStrategy implements DAGProcessStrategy { + @Override + public DAG transformDAGProperties(DAG dag) { + return dag; + } + + @Override + public String transformDescriptor(String descriptor) { + return descriptor; + } +} \ No newline at end of file diff --git a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy index 32afb66be..62e26f8f1 100644 --- a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy +++ b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy @@ -63,7 +63,8 @@ class OlympiceneFacadeTest extends Specification { dagSubmitChecker.getCheckConfig(_) >> null dagSubmitChecker.check(*_) >> null - descriptorManager.getDagDescriptor(*_) >> null + descriptorManager.getDAG(*_) >> dag + descriptorManager.getDescriptor(*_) >> null dagStringParser.parse(_) >> dag } diff --git a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGResourceStatisticTest.groovy b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGResourceStatisticTest.groovy index f472488da..0a7c017ff 100755 --- a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGResourceStatisticTest.groovy +++ b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGResourceStatisticTest.groovy @@ -1,21 +1,37 @@ -package com.weibo.rill.flow.service.statistic - -import com.alibaba.fastjson.JSONObject -import spock.lang.Specification -import spock.lang.Unroll - -class DAGResourceStatisticTest extends Specification { - DAGResourceStatistic statistic = new DAGResourceStatistic() - - @Unroll - def "test getRetryIntervalSeconds"() { - when: - JSONObject urlRet1 = new JSONObject(Map.of("data", "message")) - JSONObject urlRet2 = new JSONObject(Map.of("data", Map.of("sys_info", Map.of("retry_interval_seconds", 100)))) - JSONObject urlRet3 = new JSONObject(Map.of("error_detail", Map.of("retry_interval_seconds", 100))) - then: - statistic.getRetryIntervalSeconds(urlRet1) == 0 - statistic.getRetryIntervalSeconds(urlRet2) == 100 - statistic.getRetryIntervalSeconds(urlRet3) == 100 - } -} +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.statistic + +import com.alibaba.fastjson.JSONObject +import spock.lang.Specification +import spock.lang.Unroll + +class DAGResourceStatisticTest extends Specification { + DAGResourceStatistic statistic = new DAGResourceStatistic() + + @Unroll + def "test getRetryIntervalSeconds"() { + when: + JSONObject urlRet1 = new JSONObject(Map.of("data", "message")) + JSONObject urlRet2 = new JSONObject(Map.of("data", Map.of("sys_info", Map.of("retry_interval_seconds", 100)))) + JSONObject urlRet3 = new JSONObject(Map.of("error_detail", Map.of("retry_interval_seconds", 100))) + then: + statistic.getRetryIntervalSeconds(urlRet1) == 0 + statistic.getRetryIntervalSeconds(urlRet2) == 100 + statistic.getRetryIntervalSeconds(urlRet3) == 100 + } +} diff --git a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGSubmitCheckerTest.groovy b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGSubmitCheckerTest.groovy index 3da30ad3e..4d7987394 100755 --- a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGSubmitCheckerTest.groovy +++ b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/statistic/DAGSubmitCheckerTest.groovy @@ -1,46 +1,62 @@ -package com.weibo.rill.flow.service.statistic - -import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager -import com.weibo.rill.flow.olympicene.storage.exception.StorageException -import com.weibo.rill.flow.service.dconfs.BizDConfs -import spock.lang.Specification - -class DAGSubmitCheckerTest extends Specification { - SwitcherManager switcherManager = Mock(SwitcherManager) - BizDConfs bizDConfs = Mock(BizDConfs) - DAGSubmitChecker dagSubmitChecker = new DAGSubmitChecker(switcherManagerImpl: switcherManager, bizDConfs: bizDConfs) - - def "test checkDAGInfoLength when switcher is off"() { - given: - switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> false - expect: - dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) - } - - def "test checkDAGInfoLength when switcher is on and be limited"() { - given: - switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true - bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> ["testBusiness1": 5] - when: - dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) - then: - thrown StorageException - } - - def "test checkDAGInfoLength when switcher is on"() { - given: - switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true - bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> ["testBusiness1": 5] - expect: - dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", null) - dagSubmitChecker.checkDAGInfoLength("testBusiness2:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) - } - - def "test checkDAGInfoLength when switcher is on and dconfs return null"() { - given: - switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true - bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> null - expect: - dagSubmitChecker.checkDAGInfoLength("testBusiness2:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) - } -} +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.statistic + +import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager +import com.weibo.rill.flow.olympicene.storage.exception.StorageException +import com.weibo.rill.flow.service.dconfs.BizDConfs +import spock.lang.Specification + +class DAGSubmitCheckerTest extends Specification { + SwitcherManager switcherManager = Mock(SwitcherManager) + BizDConfs bizDConfs = Mock(BizDConfs) + DAGSubmitChecker dagSubmitChecker = new DAGSubmitChecker(switcherManagerImpl: switcherManager, bizDConfs: bizDConfs) + + def "test checkDAGInfoLength when switcher is off"() { + given: + switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> false + expect: + dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) + } + + def "test checkDAGInfoLength when switcher is on and be limited"() { + given: + switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true + bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> ["testBusiness1": 5] + when: + dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) + then: + thrown StorageException + } + + def "test checkDAGInfoLength when switcher is on"() { + given: + switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true + bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> ["testBusiness1": 5] + expect: + dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", null) + dagSubmitChecker.checkDAGInfoLength("testBusiness2:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) + } + + def "test checkDAGInfoLength when switcher is on and dconfs return null"() { + given: + switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true + bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> null + expect: + dagSubmitChecker.checkDAGInfoLength("testBusiness2:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes]) + } +} diff --git a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/CustomDAGProcessStrategyTest.groovy b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/CustomDAGProcessStrategyTest.groovy new file mode 100644 index 000000000..e80c9c379 --- /dev/null +++ b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/CustomDAGProcessStrategyTest.groovy @@ -0,0 +1,452 @@ +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.strategies + +import com.weibo.rill.flow.interfaces.model.mapping.Mapping +import com.weibo.rill.flow.interfaces.model.task.BaseTask +import com.weibo.rill.flow.olympicene.core.model.dag.DAG +import com.weibo.rill.flow.olympicene.core.model.task.PassTask +import com.weibo.rill.flow.olympicene.core.runtime.DAGParser +import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser +import com.weibo.rill.flow.olympicene.ddl.serialize.YAMLSerializer +import com.weibo.rill.flow.olympicene.ddl.validation.dag.impl.FlowDAGValidator +import com.weibo.rill.flow.olympicene.ddl.validation.task.impl.NotSupportedTaskValidator +import org.junit.platform.commons.util.StringUtils +import spock.lang.Specification + +class CustomDAGProcessStrategyTest extends Specification { + DAGParser dagParser = new DAGStringParser(new YAMLSerializer(), [new FlowDAGValidator([new NotSupportedTaskValidator()])]) + CustomDAGProcessStrategy strategy = new CustomDAGProcessStrategy(dagParser: dagParser) + + /** + * 测试常规 input 的解析与 inputMappings 及 outputMappings 的生成 + */ + def "test transformDescriptor"() { + given: + String descriptor = "workspace: default\n" + + "dagName: testGenerateOutputMappings\n" + + "alias: release\n" + + "type: flow\n" + + "tasks:\n" + + " - name: functionA\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " pattern: task_sync\n" + + " next: functionB\n" + + " input:\n" + + " body.world: hello\n" + + " resourceProtocol: http\n" + + " - name: functionB\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " input:\n" + + " body.datax.y: \$.functionA.datax.y\n" + + " body.datax.y.z: \$.functionA.datax.y.z\n" + + " body.dataz: \$.functionA.dataz[\"a.b\"]\n" + + " body.datax.a: \$.functionA.datax.a\n" + + " body.datay.hello: \$.functionA.datay.hello\n" + + " body.id: \$.functionA.objs[0].id\n" + + " body.x: \$.functionA.objs[1].x\n" + + " body.hello.id: \$.context.hello.objs[0].id\n" + + " body.world:\n" + + " transform: return \"hello world\";\n" + + " resourceProtocol: http\n" + + " pattern: task_sync\n" + DAG dag = dagParser.parse(descriptor) + when: + strategy.transformDAGProperties(dag) + then: + assert dag.getTasks().size() == 2 + for (BaseTask task : dag.getTasks()) { + if (task.getName() == "functionA") { + Set outputMappings = new HashSet<>(task.getOutputMappings()) + assert outputMappings.size() == 7 + assert outputMappings.contains(new Mapping("\$.output.datax.a", "\$.context.functionA.datax.a")) + assert outputMappings.contains(new Mapping("\$.output.datax.y", "\$.context.functionA.datax.y")) + assert outputMappings.contains(new Mapping("\$.output.datay.hello", "\$.context.functionA.datay.hello")) + assert outputMappings.contains(new Mapping("\$.output.objs[0].id", "\$.context.functionA.objs[0].id")) + assert outputMappings.contains(new Mapping("\$.output.objs[1].x", "\$.context.functionA.objs[1].x")) + assert outputMappings.contains(new Mapping("\$.output.dataz['a.b']", "\$.context.functionA.dataz['a.b']")) + } else { + HashSet inputMappings = new HashSet<>(task.getInputMappings()) + assert inputMappings.size() == 9 + assert inputMappings.contains(new Mapping("\$.context.functionA.datax.y", "\$.input.body.datax.y")) + assert inputMappings.contains(new Mapping("\$.context.functionA.datax.y.z", "\$.input.body.datax.y.z")) + assert inputMappings.contains(new Mapping("\$.context.functionA.dataz[\"a.b\"]", "\$.input.body.dataz")) + assert inputMappings.contains(new Mapping("\$.context.hello.objs[0].id", "\$.input.body.hello.id")) + assert inputMappings.contains(new Mapping("\$.context.functionA.objs[1].x", "\$.input.body.x")) + Mapping inputMapping = new Mapping(); + inputMapping.setTransform("return \"hello world\";") + inputMapping.setTarget("\$.input.body.world") + assert inputMappings.contains(inputMapping) + } + } + } + + /** + * 生成 jsonpath 包含数组情况的 inputMappings 与 outputMappings 的生成 + */ + def "test processDAG when include *"() { + given: + String descriptor = "workspace: default\n" + + "dagName: testGenerateOutputMappings\n" + + "alias: release\n" + + "type: flow\n" + + "tasks:\n" + + " - name: functionA\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " pattern: task_sync\n" + + " next: functionB\n" + + " input:\n" + + " body.world: hello\n" + + " resourceProtocol: http\n" + + " - name: functionB\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " input:\n" + + " body.elements: \$.functionA.elements.*.name\n" + + " body.first_id: \$.functionA.elements[0].id\n" + + " resourceProtocol: http\n" + + " pattern: task_sync\n" + DAG dag = dagParser.parse(descriptor) + when: + strategy.transformDAGProperties(dag) + then: + assert dag.getTasks().size() == 2 + for (BaseTask task : dag.getTasks()) { + if (task.getName() == "functionA") { + Set outputMappings = new HashSet<>(task.getOutputMappings()) + assert outputMappings.size() == 2 + assert outputMappings.contains(new Mapping("\$.output.elements[*].name", "\$.context.functionA.elements[*].name")) + assert outputMappings.contains(new Mapping("\$.output.elements[0].id", "\$.context.functionA.elements[0].id")) + } else { + HashSet inputMappings = new HashSet<>(task.getInputMappings()) + inputMappings.size() == 2 + assert inputMappings.contains(new Mapping("\$.context.functionA.elements.*.name", "\$.input.body.elements")) + assert inputMappings.contains(new Mapping("\$.context.functionA.elements[0].id", "\$.input.body.first_id")) + } + } + } + + /** + * 测试 outputMappings 已经存在指定项的情况 + */ + def 'test transformDescriptor when target exists'() { + given: + String descriptor = "workspace: default\n" + + "dagName: testGenerateOutputMappings\n" + + "alias: release\n" + + "type: flow\n" + + "tasks:\n" + + " - name: functionA\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " pattern: task_sync\n" + + " next: functionB\n" + + " input:\n" + + " body.world: hello\n" + + " resourceProtocol: http\n" + + " outputMappings:\n" + + " - source: \$.output.id\n" + + " target: \$.context.functionA.id\n" + + " - name: functionB\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " input:\n" + + " body.functionA.id: \$.functionA.id\n" + + " resourceProtocol: http\n" + + " pattern: task_sync\n" + DAG dag = dagParser.parse(descriptor) + when: + strategy.transformDAGProperties(dag) + then: + assert dag.getTasks().size() == 2 + for (BaseTask task : dag.getTasks()) { + if (task.getName() == "functionA") { + Set outputMappings = new HashSet<>(task.getOutputMappings()) + assert outputMappings.size() == 1 + assert outputMappings.contains(new Mapping("\$.output.id", "\$.context.functionA.id")) + } else { + HashSet inputMappings = new HashSet<>(task.getInputMappings()) + inputMappings.size() == 1 + assert inputMappings.contains(new Mapping("\$.context.functionA.id", "\$.input.body.functionA.id")) + } + } + + } + + /** + * 测试下发时处理 descriptor 的情况 + */ + def "testTransformDescriptor"() { + given: + String descriptor = "workspace: \"default\"\n" + + "dagName: \"testGenerateOutputMappings\"\n" + + "type: \"flow\"\n" + + "tasks:\n" + + "- name: \"functionA\"\n" + + " category: \"function\"\n" + + " next: \"functionB\"\n" + + " tolerance: false\n" + + " resourceName: \"http://test.url\"\n" + + " resourceProtocol: \"http\"\n" + + " pattern: \"task_sync\"\n" + + " inputMappings:\n" + + " - source: \"hello\"\n" + + " target: \"\$.input.body.world\"\n" + + " outputMappings:\n" + + " - source: \"\$.output.datay.hello\"\n" + + " target: \"\$.context.functionA.datay.hello\"\n" + + " - source: \"\$.output.datax\"\n" + + " target: \"\$.context.functionA.datax\"\n" + + " - source: \"\$.output.dataz['a.b']\"\n" + + " target: \"\$.context.functionA.dataz['a.b']\"\n" + + " - source: \"\$.output.objs\"\n" + + " target: \"\$.context.functionA.objs\"\n" + + " requestType: \"POST\"\n" + + " input:\n" + + " body.world: \"hello\"\n" + + " key_callback: false\n" + + "- name: \"functionB\"\n" + + " category: \"function\"\n" + + " tolerance: false\n" + + " resourceName: \"http://test.url\"\n" + + " resourceProtocol: \"http\"\n" + + " pattern: \"task_sync\"\n" + + " inputMappings:\n" + + " - source: \"\$.context.functionA.datax.y\"\n" + + " target: \"\$.input.body.datax.y\"\n" + + " - source: \"\$.context.functionA.dataz[\\\"a.b\\\"]\"\n" + + " target: \"\$.input.body.dataz\"\n" + + " - source: \"\$.context.functionA.datax.a\"\n" + + " target: \"\$.input.body.datax.a\"\n" + + " - source: \"\$.context.functionA.datay.hello\"\n" + + " target: \"\$.input.body.datay.hello\"\n" + + " - source: \"\$.context.functionA.objs.0.id\"\n" + + " target: \"\$.input.body.id\"\n" + + " - source: \"\$.context.hello.objs.0.id\"\n" + + " target: \"\$.input.body.hello.id\"\n" + + " - transform: \"return \\\"hello world\\\";\"\n" + + " target: \"\$.input.body.world\"\n" + + " requestType: \"POST\"\n" + + " input:\n" + + " body.datax.y: \"\$.functionA.datax.y\"\n" + + " body.dataz: \"\$.functionA.dataz[\\\"a.b\\\"]\"\n" + + " body.datax.a: \"\$.functionA.datax.a\"\n" + + " body.datay.hello: \"\$.functionA.datay.hello\"\n" + + " body.id: \"\$.functionA.objs.0.id\"\n" + + " body.hello.id: \"\$.context.hello.objs.0.id\"\n" + + " body.world:\n" + + " transform: \"return \\\"hello world\\\";\"\n" + when: + String newDescriptor = strategy.transformDescriptor(descriptor) + DAG newDag = dagParser.parse(newDescriptor) + then: + newDag.getTasks().forEach(it -> { + assert it.inputMappings == null + assert it.outputMappings == null + }) + } + + /** + * 测试 dag 输出节点的生成以及对应 outputMappings 的生成 + */ + def "test dag output"() { + given: + String descriptor = "workspace: default\n" + + "dagName: testGenerateOutputMappings\n" + + "alias: release\n" + + "type: flow\n" + + "tasks:\n" + + " - name: functionA\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " pattern: task_sync\n" + + " next: functionB\n" + + " input:\n" + + " body.world: hello\n" + + " resourceProtocol: http\n" + + " - name: functionB\n" + + " category: function\n" + + " resourceName: http://test.url\n" + + " requestType: POST\n" + + " input:\n" + + " body.datax.y: \$.functionA.datax.y\n" + + " body.dataz: \$.functionA.dataz[\"a.b\"]\n" + + " body.datax.a: \$.functionA.datax.a\n" + + " body.datay.hello: \$.functionA.datay.hello\n" + + " body.id: \$.functionA.objs.0.id\n" + + " body.hello.id: \$.context.hello.objs.0.id\n" + + " body.world:\n" + + " transform: return \"hello world\";\n" + + " resourceProtocol: http\n" + + " pattern: task_sync\n" + + "output:\n" + + " end.x: \$.functionB.output.x\n" + + " end.y: \$.functionB.output.y\n" + + " end.as: \$.functionA.objs.*\n" + DAG dag = dagParser.parse(descriptor) + when: + strategy.transformDAGProperties(dag) + then: + assert dag.getTasks().size() == 3 + assert StringUtils.isNotBlank(dag.getEndTaskName()) + dag.getTasks().forEach {task -> { + Set outputMappings = new HashSet<>(task.getOutputMappings()) + if (task.getName().equals("functionA")) { + assert outputMappings.size() == 6 + assert outputMappings.contains(new Mapping("\$.output.objs[*]", "\$.context.functionA.objs[*]")) + assert task.next.equals("functionB," + dag.getEndTaskName()) || task.next.equals(dag.getEndTaskName() + ",functionB") + } else if (task.getName().equals("functionB")) { + assert outputMappings.size() == 2 + assert outputMappings.contains(new Mapping("\$.output.output.x", "\$.context.functionB.output.x")) + assert outputMappings.contains(new Mapping("\$.output.output.y", "\$.context.functionB.output.y")) + assert task.getNext().equals(dag.getEndTaskName()) + } else { + assert task instanceof PassTask + assert task.getName() == dag.getEndTaskName() + Set inputMappings = new HashSet<>(task.getInputMappings()) + assert inputMappings.size() == 3 + assert inputMappings.contains(new Mapping("\$.context.functionB.output.x", "\$.input.end.x")) + assert inputMappings.contains(new Mapping("\$.context.functionB.output.y", "\$.input.end.y")) + assert inputMappings.contains(new Mapping("\$.context.functionA.objs.*", "\$.input.end.as")) + assert outputMappings.size() == 3 + assert outputMappings.contains(new Mapping("\$.input.end.x", "\$.context.end.x")) + assert outputMappings.contains(new Mapping("\$.input.end.y", "\$.context.end.y")) + assert outputMappings.contains(new Mapping("\$.input.end.as", "\$.context.end.as")) + } + }} + } + + /** + * 测试 dag 包含 output 时 descriptor 的下发 + */ + def "test get descriptor with output"() { + given: + String descriptor = "workspace: \"default\"\n" + + "dagName: \"testGenerateOutputMappings\"\n" + + "type: \"flow\"\n" + + "tasks:\n" + + "- !\n" + + " name: \"functionA\"\n" + + " category: \"function\"\n" + + " next: \"functionB,endPassTask20241018\"\n" + + " tolerance: false\n" + + " resourceName: \"http://test.url\"\n" + + " resourceProtocol: \"http\"\n" + + " pattern: \"task_sync\"\n" + + " inputMappings:\n" + + " - source: \"hello\"\n" + + " target: \"\$.input.body.world\"\n" + + " outputMappings:\n" + + " - source: \"\$.output.datay.hello\"\n" + + " target: \"\$.context.functionA.datay.hello\"\n" + + " - source: \"\$.output.datax\"\n" + + " target: \"\$.context.functionA.datax\"\n" + + " - source: \"\$.output.dataz['a.b']\"\n" + + " target: \"\$.context.functionA.dataz['a.b']\"\n" + + " - source: \"\$.output.objs\"\n" + + " target: \"\$.context.functionA.objs\"\n" + + " requestType: \"POST\"\n" + + " input:\n" + + " body.world: \"hello\"\n" + + " key_callback: false\n" + + "- !\n" + + " name: \"functionB\"\n" + + " category: \"function\"\n" + + " next: \"endPassTask20241018\"\n" + + " tolerance: false\n" + + " resourceName: \"http://test.url\"\n" + + " resourceProtocol: \"http\"\n" + + " pattern: \"task_sync\"\n" + + " inputMappings:\n" + + " - source: \"\$.context.functionA.datax.y\"\n" + + " target: \"\$.input.body.datax.y\"\n" + + " - source: \"\$.context.functionA.dataz[\\\"a.b\\\"]\"\n" + + " target: \"\$.input.body.dataz\"\n" + + " - source: \"\$.context.functionA.datax.a\"\n" + + " target: \"\$.input.body.datax.a\"\n" + + " - source: \"\$.context.functionA.datay.hello\"\n" + + " target: \"\$.input.body.datay.hello\"\n" + + " - source: \"\$.context.functionA.objs.0.id\"\n" + + " target: \"\$.input.body.id\"\n" + + " - source: \"\$.context.hello.objs.0.id\"\n" + + " target: \"\$.input.body.hello.id\"\n" + + " - transform: \"return \\\"hello world\\\";\"\n" + + " target: \"\$.input.body.world\"\n" + + " outputMappings:\n" + + " - source: \"\$.output.output\"\n" + + " target: \"\$.context.functionB.output\"\n" + + " requestType: \"POST\"\n" + + " input:\n" + + " body.datax.y: \"\$.functionA.datax.y\"\n" + + " body.dataz: \"\$.functionA.dataz[\\\"a.b\\\"]\"\n" + + " body.datax.a: \"\$.functionA.datax.a\"\n" + + " body.datay.hello: \"\$.functionA.datay.hello\"\n" + + " body.id: \"\$.functionA.objs.0.id\"\n" + + " body.hello.id: \"\$.context.hello.objs.0.id\"\n" + + " body.world:\n" + + " transform: \"return \\\"hello world\\\";\"\n" + + " key_callback: false\n" + + "- !\n" + + " name: \"endPassTask20241018\"\n" + + " category: \"pass\"\n" + + " inputMappings:\n" + + " - source: \"\$.context.functionB.output.x\"\n" + + " target: \"\$.input.end.x\"\n" + + " - source: \"\$.context.functionB.output.y\"\n" + + " target: \"\$.input.end.y\"\n" + + " - source: \"\$.context.functionA.objs.*\"\n" + + " target: \"\$.input.end.as\"\n" + + " outputMappings:\n" + + " - source: \"\$.input.end.x\"\n" + + " target: \"\$.context.end.x\"\n" + + " - source: \"\$.input.end.y\"\n" + + " target: \"\$.context.end.y\"\n" + + " - source: \"\$.input.end.as\"\n" + + " target: \"\$.context.end.as\"\n" + + " input:\n" + + " end.x: \"\$.functionB.output.x\"\n" + + " end.y: \"\$.functionB.output.y\"\n" + + " end.as: \"\$.functionA.objs.*\"\n" + + " tolerance: false\n" + + " key_callback: false\n" + + "output:\n" + + " end.x: \"\$.functionB.output.x\"\n" + + " end.y: \"\$.functionB.output.y\"\n" + + " end.as: \"\$.functionA.objs.*\"\n" + + "end_task_name: \"endPassTask20241018\"\n" + when: + String resultDescriptor = strategy.transformDescriptor(descriptor) + DAG dag = dagParser.parse(resultDescriptor) + then: + assert dag.tasks.size() == 2 + assert dag.getEndTaskName() == null + dag.tasks.forEach {task -> { + assert task.getInputMappings() == null + assert task.getOutputMappings() == null + }} + } +} diff --git a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/DAGProcessStrategyContextTest.groovy b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/DAGProcessStrategyContextTest.groovy new file mode 100644 index 000000000..9ec429483 --- /dev/null +++ b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/DAGProcessStrategyContextTest.groovy @@ -0,0 +1,89 @@ +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.strategies + +import com.weibo.rill.flow.olympicene.core.model.dag.DAG +import spock.lang.Specification + +class DAGProcessStrategyContextTest extends Specification { + + DAGProcessStrategyContext strategyContext + DAGProcessStrategy defaultStrategy + DAGProcessStrategy customStrategy + + def setup() { + strategyContext = new DAGProcessStrategyContext() + defaultStrategy = Mock(DAGProcessStrategy) + customStrategy = Mock(DAGProcessStrategy) + strategyContext.strategies = [ + (DAGProcessStrategyContext.DEFAULT_STRATEGY): defaultStrategy, + (DAGProcessStrategyContext.CUSTOM_STRATEGY): customStrategy + ] + } + + def "processDAG should use the specified strategy when it exists"() { + given: + DAG inputDag = new DAG() + DAG outputDag = new DAG() + + when: + def result = strategyContext.transformDAGProperties(inputDag, DAGProcessStrategyContext.CUSTOM_STRATEGY) + + then: + 1 * customStrategy.transformDAGProperties(inputDag) >> outputDag + result == outputDag + } + + def "processDAG should use the default strategy when specified strategy doesn't exist"() { + given: + DAG inputDag = new DAG() + DAG outputDag = new DAG() + + when: + def result = strategyContext.transformDAGProperties(inputDag, "nonExistentStrategy") + + then: + 1 * defaultStrategy.transformDAGProperties(inputDag) >> outputDag + result == outputDag + } + + def "transformDescriptor should use the specified strategy when it exists"() { + given: + String descriptor = "testDescriptor" + String processedDescriptor = "processedTestDescriptor" + + when: + def result = strategyContext.transformDescriptor(descriptor, DAGProcessStrategyContext.CUSTOM_STRATEGY) + + then: + 1 * customStrategy.transformDescriptor(descriptor) >> processedDescriptor + result == processedDescriptor + } + + def "transformDescriptor should use the default strategy when specified strategy doesn't exist"() { + given: + String descriptor = "testDescriptor" + String processedDescriptor = "processedTestDescriptor" + + when: + def result = strategyContext.transformDescriptor(descriptor, "nonExistentStrategy") + + then: + 1 * defaultStrategy.transformDescriptor(descriptor) >> processedDescriptor + result == processedDescriptor + } +} diff --git a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/DefaultDAGProcessStrategyTest.groovy b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/DefaultDAGProcessStrategyTest.groovy new file mode 100644 index 000000000..c25afdba1 --- /dev/null +++ b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/strategies/DefaultDAGProcessStrategyTest.groovy @@ -0,0 +1,47 @@ +/* + * Copyright 2021-2023 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weibo.rill.flow.service.strategies + +import com.weibo.rill.flow.olympicene.core.model.dag.DAG +import spock.lang.Specification + +class DefaultDAGProcessStrategyTest extends Specification { + + def defaultStrategy = new DefaultDAGProcessStrategy() + + def 'test processDAG method'() { + given: + def inputDAG = Mock(DAG) + + when: + def result = defaultStrategy.transformDAGProperties(inputDAG) + + then: + result == inputDAG + } + + def 'test transformDescriptor method'() { + given: + def inputDescriptor = "test descriptor" + + when: + def result = defaultStrategy.transformDescriptor(inputDescriptor) + + then: + result == inputDescriptor + } +}