Skip to content

Commit

Permalink
Merge branch 'dag_descriptor_converter' of github.com:weibocom/rill-f…
Browse files Browse the repository at this point in the history
…low into task_input_type
  • Loading branch information
zeyu10 committed Oct 25, 2024
2 parents 87700b3 + db25f4d commit 8d61a80
Show file tree
Hide file tree
Showing 15 changed files with 493 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.resource.BaseResource;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.weibo.rill.flow.olympicene.core.model.dag;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Holder for a DAG descriptor string in its "PO" form.
 *
 * <p>In {@code DAGDescriptorConverterImpl} a {@code DescriptorPO} is parsed into a DAG and
 * produced from a DAG by the DAG string parser directly, without any additional task processing.
 * NOTE(review): "PO" presumably stands for "persistent object" (the stored form) - confirm.
 *
 * <p>Lombok {@code @Data} supplies the getter/setter, {@code equals}, {@code hashCode} and
 * {@code toString}; {@code @AllArgsConstructor} supplies the single-argument constructor.
 */
@Data
@AllArgsConstructor
public class DescriptorPO {
// Descriptor text as handed to / returned by the DAG string parser.
private String descriptor;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.weibo.rill.flow.olympicene.core.model.dag;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Holder for a DAG descriptor string in its "VO" form.
 *
 * <p>The descriptor facade wraps the descriptor text received from a caller in a
 * {@code DescriptorVO} before creating a descriptor, and unwraps one when returning a descriptor.
 * NOTE(review): "VO" presumably stands for "view object" (the caller-facing form) - confirm.
 *
 * <p>Lombok {@code @Data} supplies the getter/setter, {@code equals}, {@code hashCode} and
 * {@code toString}; {@code @AllArgsConstructor} supplies the single-argument constructor.
 */
@Data
@AllArgsConstructor
public class DescriptorVO {
// Descriptor text as submitted by / shown to the caller.
private String descriptor;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ObjectMapperFactory {
}

// Shared YAML factory with three generator features switched off so the emitted YAML is plain:
private static final YAMLFactory YAML_FACTORY = new YAMLFactory()
// do not emit type ids as native YAML tags
.configure(YAMLGenerator.Feature.USE_NATIVE_TYPE_ID, false)
// do not write the leading "---" document start marker
.configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)
// do not emit object ids as native YAML anchors/aliases
.configure(YAMLGenerator.Feature.USE_NATIVE_OBJECT_ID, false);
// ObjectMapper bound to the factory above; created once and reused.
private static final ObjectMapper YAML_MAPPER = new ObjectMapper(YAML_FACTORY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@
* limitations under the License.
*/

package com.weibo.rill.flow.service.strategies;
package com.weibo.rill.flow.service.converter;

import com.weibo.rill.flow.olympicene.core.model.dag.DAG;
import org.springframework.stereotype.Component;
import com.weibo.rill.flow.olympicene.core.model.dag.DescriptorPO;
import com.weibo.rill.flow.olympicene.core.model.dag.DescriptorVO;

/**
 * Pass-through {@link DAGProcessStrategy}: both hooks return their argument unchanged.
 * Registered as a Spring component under the name
 * {@code DAGProcessStrategyContext.DEFAULT_STRATEGY}.
 */
@Component(DAGProcessStrategyContext.DEFAULT_STRATEGY)
public class DefaultDAGProcessStrategy implements DAGProcessStrategy {
/**
 * Returns the given DAG as-is (the same instance, not a copy).
 *
 * @param dag the DAG to process
 * @return {@code dag}
 */
@Override
public DAG transformDAGProperties(DAG dag) {
return dag;
}

/**
 * Returns the given descriptor text as-is.
 *
 * @param descriptor the descriptor text to process
 * @return {@code descriptor}
 */
@Override
public String transformDescriptor(String descriptor) {
return descriptor;
}
}
/**
 * Converts between a {@link DAG} and the two descriptor wrappers, {@link DescriptorVO} and
 * {@link DescriptorPO}.
 */
public interface DAGDescriptorConverter {
/**
 * Builds a DAG from a {@link DescriptorVO}.
 *
 * @param descriptorVO wrapper holding the descriptor text
 * @return the resulting DAG
 */
DAG convertDescriptorVOToDAG(DescriptorVO descriptorVO);
/**
 * Builds a DAG from a {@link DescriptorPO}.
 *
 * @param descriptorPO wrapper holding the descriptor text
 * @return the resulting DAG
 */
DAG convertDescriptorPOToDAG(DescriptorPO descriptorPO);
/**
 * Produces the {@link DescriptorVO} form of a DAG.
 *
 * @param dag the DAG to convert
 * @return wrapper holding the descriptor text
 */
DescriptorVO convertDAGToDescriptorVO(DAG dag);
/**
 * Produces the {@link DescriptorPO} form of a DAG.
 *
 * @param dag the DAG to convert
 * @return wrapper holding the descriptor text
 */
DescriptorPO convertDAGToDescriptorPO(DAG dag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.weibo.rill.flow.service.strategies;
package com.weibo.rill.flow.service.converter;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.LinkedHashMultimap;
Expand All @@ -23,34 +23,43 @@
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import com.weibo.rill.flow.olympicene.core.model.dag.DAG;
import com.weibo.rill.flow.olympicene.core.model.dag.DescriptorPO;
import com.weibo.rill.flow.olympicene.core.model.dag.DescriptorVO;
import com.weibo.rill.flow.olympicene.core.model.task.PassTask;
import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

@Component(DAGProcessStrategyContext.CUSTOM_STRATEGY)
public class CustomDAGProcessStrategy implements DAGProcessStrategy {

@Component
public class DAGDescriptorConverterImpl implements DAGDescriptorConverter {
private static final String CONTEXT_PREFIX = "$.context.";
private static final String INPUT_PREFIX = "$.input.";
private static final String DAG_END_TASK_NAME = "endPassTask";

@Resource
@Autowired
private DAGStringParser dagParser;

/**
* 设置 DAG 时的处理
* @param dag 待处理的DAG对象
*/
@Override
public DAG transformDAGProperties(DAG dag) {
public DAG convertDescriptorPOToDAG(DescriptorPO descriptorPO) {
return dagParser.parse(descriptorPO.getDescriptor());
}

/**
 * Serializes a {@link DAG} with the DAG string parser and wraps the resulting text in a
 * {@link DescriptorPO}. The DAG is serialized as given, without any task processing.
 *
 * @param dag the DAG to serialize
 * @return a new wrapper holding the serialized descriptor text
 */
@Override
public DescriptorPO convertDAGToDescriptorPO(DAG dag) {
    return new DescriptorPO(dagParser.serialize(dag));
}

@Override
public DAG convertDescriptorVOToDAG(DescriptorVO descriptorVO) {
DAG dag = dagParser.parse(descriptorVO.getDescriptor());
Map<String, BaseTask> taskMap = getTaskMapByDag(dag);
// 1. 处理 task 的 input 以及 dag 的 output,为任务生成原始的 inputMappings(input 中的来源直接作为 source,如 $.functionA.data.id)
// 返回是否需要后续处理,不需要后续处理则直接返回
Expand All @@ -66,6 +75,34 @@ public DAG transformDAGProperties(DAG dag) {
return dag;
}

/**
 * Produces the {@link DescriptorVO} form of a DAG.
 *
 * <p>When the DAG needs post-processing, every task except the DAG's end task is run through
 * {@code processTask}, the end task is dropped from the task list, and the end task name is
 * cleared before serializing. Otherwise the DAG is serialized unchanged.
 *
 * <p>NOTE(review): in the post-processing case this method mutates the {@code dag} argument
 * ({@code setTasks} and {@code setEndTaskName(null)}); callers that keep using the same
 * instance will see those changes.
 *
 * @param dag the DAG to convert
 * @return a new wrapper holding the serialized descriptor text
 */
@Override
public DescriptorVO convertDAGToDescriptorVO(DAG dag) {
    // Index tasks by name and decide whether the DAG has to be rewritten at all.
    Map<String, BaseTask> tasksByName = getTaskMapByDag(dag);
    if (needsPostProcessing(dag, tasksByName)) {
        // Rewrite every non-end task and leave the end task out of the resulting list.
        String endTaskName = dag.getEndTaskName();
        List<BaseTask> remainingTasks = tasksByName.values().stream()
                .filter(task -> !task.getName().equals(endTaskName))
                .map(task -> processTask(task, endTaskName))
                .toList();
        dag.setTasks(remainingTasks);
        dag.setEndTaskName(null);
    }
    // Serialize whatever the DAG looks like now.
    return new DescriptorVO(dagParser.serialize(dag));
}

/**
 * Builds a lookup from task name to task for the given DAG.
 *
 * <p>The returned map keeps the tasks in the order they appear in {@code dag.getTasks()}, so
 * code that rebuilds the task list from {@code values()} does not reshuffle the descriptor.
 *
 * @param dag the DAG whose tasks are indexed
 * @return a mutable, insertion-ordered map of task name to task; empty when the DAG has no tasks
 * @throws IllegalStateException if two tasks share the same name
 */
private Map<String, BaseTask> getTaskMapByDag(DAG dag) {
    if (CollectionUtils.isEmpty(dag.getTasks())) {
        return new LinkedHashMap<>();
    }
    return dag.getTasks().stream().collect(Collectors.toMap(
            BaseTask::getName,
            Function.identity(),
            // Duplicate names stay an error, exactly as with the two-argument toMap.
            (first, second) -> {
                throw new IllegalStateException("Duplicate task name: " + first.getName());
            },
            LinkedHashMap::new));
}

/**
* 处理各个任务的 inputMappings,实现 inputMappings 的填充,返回 inputMappings 对应的元素列表的列表
* @param dag DAG对象
Expand Down Expand Up @@ -134,16 +171,6 @@ private void updateTaskNext(BaseTask task, String taskName) {
}
}

/**
 * Builds a lookup from task name to task for the given DAG.
 *
 * <p>The returned map keeps the tasks in the order they appear in {@code dag.getTasks()}, so
 * code that rebuilds the task list from {@code values()} does not reshuffle the descriptor.
 *
 * @param dag the DAG whose tasks are indexed
 * @return a mutable, insertion-ordered map of task name to task; empty when the DAG has no tasks
 * @throws IllegalStateException if two tasks share the same name
 */
private Map<String, BaseTask> getTaskMapByDag(DAG dag) {
    if (CollectionUtils.isEmpty(dag.getTasks())) {
        return new LinkedHashMap<>();
    }
    return dag.getTasks().stream().collect(Collectors.toMap(
            BaseTask::getName,
            Function.identity(),
            // Duplicate names stay an error, exactly as with the two-argument toMap.
            (first, second) -> {
                throw new IllegalStateException("Duplicate task name: " + first.getName());
            },
            LinkedHashMap::new));
}

/**
* 将 inputMapping 中的 source 解析为 element 数组,如 $["functionA"]["data"]["ids"], 则返回 ["functionA", "data", "ids"]
*/
Expand Down Expand Up @@ -283,28 +310,6 @@ private PassTask generateEndPassTask(DAG dag, Map<String, BaseTask> taskMap) {
return endPassTask;
}

/**
 * Rewrites descriptor text on the read path.
 *
 * <p>The text is parsed into a DAG. If that DAG needs no post-processing the original text is
 * returned untouched. Otherwise every task except the DAG's end task is run through
 * {@code processTask}, the end task is removed, the end task name is cleared, and the DAG is
 * serialized again.
 *
 * @param descriptor the descriptor text to transform
 * @return the original text, or the re-serialized text after post-processing
 */
@Override
public String transformDescriptor(String descriptor) {
    // Parse and index the tasks by name.
    DAG parsedDag = dagParser.parse(descriptor);
    Map<String, BaseTask> tasksByName = getTaskMapByDag(parsedDag);
    if (!needsPostProcessing(parsedDag, tasksByName)) {
        // Nothing to rewrite: hand back the caller's text, not a re-serialization.
        return descriptor;
    }

    // Rewrite every non-end task and leave the end task out of the resulting list.
    String endTaskName = parsedDag.getEndTaskName();
    List<BaseTask> remainingTasks = tasksByName.values().stream()
            .filter(task -> !task.getName().equals(endTaskName))
            .map(task -> processTask(task, endTaskName))
            .toList();

    // Serialize the trimmed DAG back to descriptor text.
    parsedDag.setTasks(remainingTasks);
    parsedDag.setEndTaskName(null);
    return dagParser.serialize(parsedDag);
}

/**
* 判断是否需要后续处理
* @return 如果DAG有结束任务或任何一个任务有 input 配置,则返回true,否则返回false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.weibo.rill.flow.common.exception.TaskException;
import com.weibo.rill.flow.common.model.BizError;
import com.weibo.rill.flow.interfaces.model.resource.Resource;
import com.weibo.rill.flow.olympicene.core.model.dag.DescriptorVO;
import com.weibo.rill.flow.olympicene.core.model.event.DAGDescriptorEvent;
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import com.weibo.rill.flow.service.manager.DescriptorManager;
Expand Down Expand Up @@ -178,7 +179,8 @@ public Map<String, Object> addDescriptor(String identity, String businessId, Str
if (StringUtils.isNotEmpty(descriptor)) {
dagSubmitChecker.checkDAGInfoLengthByBusinessId(businessId, List.of(descriptor.getBytes(StandardCharsets.UTF_8)));
}
String descriptorId = descriptorManager.createDAGDescriptor(businessId, featureName, alias, descriptor);
DescriptorVO descriptorVO = new DescriptorVO(descriptor);
String descriptorId = descriptorManager.createDAGDescriptor(businessId, featureName, alias, descriptorVO);

Map<String, String> attachments = Maps.newHashMap();
String attachmentName = String.format("descriptor-%s_%s_%s.txt", businessId, featureName, alias);
Expand Down Expand Up @@ -207,15 +209,15 @@ public Map<String, Object> getDescriptor(Long uidOriginal, Map<String, Object> i
.map(it -> Long.parseLong(String.valueOf(it)))
.orElse(0L)
);
String descriptor = descriptorManager.getDescriptor(uid, input, descriptorId);
DescriptorVO descriptorVO = descriptorManager.getDescriptorVO(uid, input, descriptorId);
return ImmutableMap.of(DESCRIPTOR_ID, descriptorId,
"uid", String.valueOf(uid),
DESCRIPTOR, descriptor);
DESCRIPTOR, descriptorVO.getDescriptor());
}

public JSONObject getDescriptor(String descriptorId) {
String descriptor = descriptorManager.getDescriptor(null, null, descriptorId);
JSONObject descriptorObject = yamlToJson(descriptor);
DescriptorVO descriptorVO = descriptorManager.getDescriptorVO(null, null, descriptorId);
JSONObject descriptorObject = yamlToJson(descriptorVO.getDescriptor());
if (descriptorObject == null) {
log.warn("descriptorId:{} descriptor is null", descriptorId);
throw new TaskException(BizError.ERROR_DATA_FORMAT, "descriptor is null: " + descriptorId);
Expand Down
Loading

0 comments on commit 8d61a80

Please sign in to comment.