Skip to content

Commit

Permalink
[INLONG-9300][Agent] Divide data time into source time and sink time (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Nov 16, 2023
1 parent ab3f2cc commit f782760
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,26 @@ public void setInstanceId(String instanceId) {
set(TaskConstants.INSTANCE_ID, instanceId);
}

public void setDataTime(String dataTime) {
set(TaskConstants.JOB_DATA_TIME, dataTime);
public void setSourceDataTime(String dataTime) {
set(TaskConstants.SOURCE_DATA_TIME, dataTime);
}

public String getDataTime() {
return get(TaskConstants.JOB_DATA_TIME);
public String getSourceDataTime() {
return get(TaskConstants.SOURCE_DATA_TIME);
}

public void setSinkDataTime(Long dataTime) {
setLong(TaskConstants.SINK_DATA_TIME, dataTime);
}

public Long getSinkDataTime() {
return getLong(TaskConstants.SINK_DATA_TIME, 0);
}

@Override
public int compareTo(InstanceProfile object) {
int ret = ComparisonChain.start()
.compare(getDataTime(), object.getDataTime())
.compare(getSourceDataTime(), object.getSourceDataTime())
.compare(FileUtils.getFileCreationTime(getInstanceId()),
FileUtils.getFileCreationTime(object.getInstanceId()))
.compare(FileUtils.getFileLastModifyTime(getInstanceId()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.pojo.TaskProfileDto;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;

import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.TimeZone;

import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
Expand All @@ -35,6 +41,7 @@
public class TaskProfile extends AbstractConfiguration {

private static final Gson GSON = new Gson();
private static final Logger logger = LoggerFactory.getLogger(TaskProfile.class);

/**
* Get a TaskProfile from a DataConfig
Expand All @@ -58,6 +65,10 @@ public String getTimeOffset() {
return get(TaskConstants.TASK_FILE_TIME_OFFSET);
}

public String getTimeZone() {
return get(TaskConstants.TASK_FILE_TIME_ZONE);
}

public TaskStateEnum getState() {
return TaskStateEnum.getTaskState(getInt(TASK_STATE));
}
Expand Down Expand Up @@ -111,7 +122,16 @@ public InstanceProfile createInstanceProfile(String instanceClass, String fileNa
InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr());
instanceProfile.setInstanceClass(instanceClass);
instanceProfile.setInstanceId(fileName);
instanceProfile.setDataTime(dataTime);
instanceProfile.setSourceDataTime(dataTime);
Long sinkDataTime = 0L;
try {
sinkDataTime = DateTransUtils.timeStrConvertTomillSec(dataTime, getCycleUnit(),
TimeZone.getTimeZone(getTimeZone()));
} catch (ParseException e) {
logger.error("createInstanceProfile error: ", e);
return null;
}
instanceProfile.setSinkDataTime(sinkDataTime);
instanceProfile.setCreateTime(AgentUtils.getCurrentTime());
instanceProfile.setModifyTime(AgentUtils.getCurrentTime());
instanceProfile.setState(InstanceStateEnum.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated
public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
public static final String TASK_FILE_TRIGGER_TYPE = "task.fileTask.collectType";
Expand Down Expand Up @@ -179,8 +180,11 @@ public class TaskConstants extends CommonConstants {
// job delivery time
public static final String JOB_DELIVERY_TIME = "job.deliveryTime";

// job time reading file
public static final String JOB_DATA_TIME = "job.dataTime";
// data time reading file
public static final String SOURCE_DATA_TIME = "source.dataTime";

// data time for sink
public static final String SINK_DATA_TIME = "sink.dataTime";

// job of the number of seconds to wait before starting the task
public static final String JOB_TASK_BEGIN_WAIT_SECONDS = "job.taskWaitSeconds";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
this.streamId = streamId;
this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
try {
dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(),
dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getSourceDataTime(),
instanceProfile.get(TASK_CYCLE_UNIT));
} catch (ParseException e) {
LOGGER.info("trans dataTime error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class FileTask {
private Long startTime;
private Long endTime;
private String timeOffset;
private String timeZone;
private String addictiveString;
private String collectType;
private Line line;
Expand Down Expand Up @@ -109,6 +110,8 @@ public static class FileTaskConfig {
// '1d' means one day after, '-1d' means one day before
// Null means from current timestamp
private String timeOffset;
// Asia/Shanghai
private String timeZone;
// For example: a=b&c=b&e=f
private String additionalAttr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ private static FileTask getFileJob(DataConfig dataConfig) {
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
}
if (taskConfig.getTimeZone() != null) {
fileTask.setTimeZone(taskConfig.getTimeZone());
}

if (taskConfig.getAdditionalAttr() != null) {
fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0d");
// GMT-8:00 same with Asia/Shanghai
fileTaskConfig.setTimeZone("GMT-8:00");
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("D");
fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static void teardown() {
public void testInstanceManager() {
long timeBefore = AgentUtils.getCurrentTime();
InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime());
helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime());
String instanceId = profile.getInstanceId();
InstanceAction action = new InstanceAction();
action.setActionType(ActionType.ADD);
Expand All @@ -87,7 +87,7 @@ public void testInstanceManager() {

// test continue
profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime());
helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime());
action = new InstanceAction();
action.setActionType(ActionType.ADD);
action.setProfile(profile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void init(InstanceProfile profile) {
linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(),
dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getSourceDataTime(),
profile.get(TASK_CYCLE_UNIT));
try {
registerMeta(profile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0d");
// GMT-8:00 same with Asia/Shanghai
fileTaskConfig.setTimeZone("GMT-8:00");
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("D");
fileTaskConfig.setRetry(retry);
Expand Down

0 comments on commit f782760

Please sign in to comment.