Skip to content

Commit

Permalink
[INLONG-9194][Agent] Calc time offset failed if the param is "0" (#9195)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Nov 2, 2023
1 parent 58ecec5 commit 1968152
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public void submitTaskProfiles(List<TaskProfile> taskProfiles) {
while (configQueue.size() != 0) {
configQueue.poll();
}
for (int i = 0; i < taskProfiles.size(); i++) {
LOGGER.info("submitTaskProfiles index {} total {} {}", i, taskProfiles.size(),
taskProfiles.get(i).toJsonStr());
}
configQueue.add(taskProfiles);
}

Expand Down Expand Up @@ -187,7 +191,6 @@ private void dealWithActionQueue(BlockingQueue<TaskAction> queue) {
* NEW and STOP only used in manager
*/
private void keepPaceWithManager(List<TaskProfile> taskProfiles) {
LOGGER.info("deal with List<TaskProfile> {}", taskProfiles);
Map<String, TaskProfile> tasksFromManager = new ConcurrentHashMap<>();
taskProfiles.forEach((profile) -> {
TaskStateEnum state = profile.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String
long recoverTime, boolean isRetry) {
String cycleUnit = conf.getCycleUnit();
if (!isRetry) {
failTime -= NewDateUtils.caclOffset(conf.getTimeOffset());
recoverTime -= NewDateUtils.caclOffset(conf.getTimeOffset());
failTime -= NewDateUtils.calcOffset(conf.getTimeOffset());
recoverTime -= NewDateUtils.calcOffset(conf.getTimeOffset());
}

String startTime = NewDateUtils.millSecConvertToTimeStr(failTime, cycleUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void runForNormal() {
private void scanExistingFile() {
originPatterns.forEach((originPattern) -> {
List<BasicFileInfo> fileInfos = scanExistingFileByPattern(originPattern);
LOGGER.debug("scan {} get file count {}", originPattern, fileInfos.size());
LOGGER.info("scan {} get file count {}", originPattern, fileInfos.size());
fileInfos.forEach((fileInfo) -> {
addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
});
Expand All @@ -279,7 +279,7 @@ private List<BasicFileInfo> scanExistingFileByPattern(String originPattern) {
if (!retry) {
long currentTime = System.currentTimeMillis();
// only scan two cycle, like two hours or two days
long offset = NewDateUtils.caclOffset("-2" + taskProfile.getCycleUnit());
long offset = NewDateUtils.calcOffset("-2" + taskProfile.getCycleUnit());
startScanTime = currentTime - offset;
endScanTime = currentTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ public static boolean isValidCreationTime(String dataTime, String cycleUnit,

// To handle the offset, add the time offset to the timeout period
if (timeOffset.startsWith("-")) {
timeInterval += caclOffset(timeOffset);
timeInterval += calcOffset(timeOffset);
} else { // Process Backward Offset
timeInterval -= caclOffset(timeOffset);
timeInterval -= calcOffset(timeOffset);
}

return isValidCreationTime(dataTime, timeInterval);
Expand All @@ -240,7 +240,7 @@ public static boolean isValidCreationTime(String dataTime, String cycleUnit,
* @param timeOffset offset,such as -1d,-4h,-10m;
* @return
*/
public static long caclOffset(String timeOffset) {
public static long calcOffset(String timeOffset) {
String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
int startIndex = timeOffset.charAt(0) == '-' ? 1 : 0;
// Default Backward Offset
Expand All @@ -250,8 +250,11 @@ public static long caclOffset(String timeOffset) {
} else if (startIndex == 0) { // Forward offset
symbol = -1;
}
int offsetTime = Integer
.parseInt(timeOffset.substring(startIndex, timeOffset.length() - 1));
String strOffset = timeOffset.substring(startIndex, timeOffset.length() - 1);
if (strOffset.length() == 0) {
return 0;
}
int offsetTime = Integer.parseInt(strOffset);
if ("d".equalsIgnoreCase(offsetUnit)) {
return offsetTime * 24 * 3600 * 1000 * symbol;
} else if ("h".equalsIgnoreCase(offsetUnit)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.inlong.agent.plugin.utils;

import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.common.metric.MetricRegister;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +44,15 @@ public class TestUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class);
private static final String RECORD = "This is the test line for file\n";

@Test
public void testCalcOffset() {
Assert.assertTrue(NewDateUtils.calcOffset("-1h") == 3600 * 1000);
Assert.assertTrue(NewDateUtils.calcOffset("1D") == -24 * 3600 * 1000);
Assert.assertTrue(NewDateUtils.calcOffset("0") == 0);
Assert.assertTrue(NewDateUtils.calcOffset("1") == 0);
Assert.assertTrue(NewDateUtils.calcOffset("10") == 0);
}

public static String getTestTriggerProfile() {
return "{\n"
+ " \"job\": {\n"
Expand Down

0 comments on commit 1968152

Please sign in to comment.