Skip to content

Commit

Permalink
Ability to retry a task on timeout (#53)
Browse files Browse the repository at this point in the history
* Ability to retry a task on timeout
* Fix issues with control message on retry
* Integration test to test basic restart scenarios
  • Loading branch information
Ankit Nanglia authored Jul 3, 2019
1 parent 72e5c2d commit 288775a
Show file tree
Hide file tree
Showing 29 changed files with 451 additions and 97 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM openjdk:8u151-jre-alpine

ENV KRONOS_VERSION 3.0.0-RC1
ENV KRONOS_VERSION 3.0.0-RC2
ENV KRONOS_HOME /home/kronos-${KRONOS_VERSION}
ENV MODE all

Expand Down
2 changes: 1 addition & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>kronos</artifactId>
<groupId>com.cognitree</groupId>
<version>3.0.0-RC1</version>
<version>3.0.0-RC2</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<artifactId>kronos</artifactId>
<groupId>com.cognitree</groupId>
<version>3.0.0-RC1</version>
<version>3.0.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
69 changes: 69 additions & 0 deletions app/src/test/java/com/cognitree/kronos/ApplicationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.cognitree.kronos;

import com.cognitree.kronos.scheduler.JobService;
import com.cognitree.kronos.scheduler.ServiceTest;
import com.cognitree.kronos.scheduler.model.Job;
import com.cognitree.kronos.scheduler.model.WorkflowTrigger;
import com.cognitree.kronos.scheduler.store.StoreService;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

import static com.cognitree.kronos.TestUtil.scheduleWorkflow;
import static com.cognitree.kronos.TestUtil.waitForJobsToTriggerAndComplete;

/**
 * Integration tests for basic restart scenarios: workflows are scheduled while either the
 * scheduler or the executor is taken down and brought back up.
 * <p>
 * Both tests return immediately, without asserting anything, when the configured store
 * reports that it is not persistent.
 */
public class ApplicationTest extends ServiceTest {

    /**
     * Schedules two workflows, restarts the scheduler, waits for the jobs and then checks
     * that each trigger's namespace holds exactly one retrievable job. The job of the first
     * workflow must additionally be in the SUCCESSFUL state.
     */
    @Test
    public void testSchedulerDown() throws Exception {
        if (!isStorePersistent()) {
            return;
        }
        final WorkflowTrigger firstTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_YAML);
        final WorkflowTrigger secondTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_YAML);

        // bounce the scheduler while both workflows are already scheduled
        SCHEDULER_APP.stop();
        Thread.sleep(1000);
        SCHEDULER_APP.start();

        waitForJobsToTriggerAndComplete(firstTrigger);
        waitForJobsToTriggerAndComplete(secondTrigger);

        final JobService jobService = JobService.getService();
        final List<Job> firstJobs = fetchAndCheckSingleJob(jobService, firstTrigger);
        Assert.assertTrue(firstJobs.stream().allMatch(job -> job.getStatus() == Job.Status.SUCCESSFUL));

        fetchAndCheckSingleJob(jobService, secondTrigger);
    }

    /**
     * Stops the executor, schedules two workflows, restarts the executor, waits for the jobs
     * and then checks that each trigger's namespace holds exactly one retrievable job.
     */
    @Test
    public void testExecutorDown() throws Exception {
        if (!isStorePersistent()) {
            return;
        }
        // the executor is already down when the workflows get scheduled
        EXECUTOR_APP.stop();
        final WorkflowTrigger firstTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_YAML);
        final WorkflowTrigger secondTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_YAML);
        Thread.sleep(1000);
        EXECUTOR_APP.start();

        waitForJobsToTriggerAndComplete(firstTrigger);
        waitForJobsToTriggerAndComplete(secondTrigger);

        final JobService jobService = JobService.getService();
        fetchAndCheckSingleJob(jobService, firstTrigger);
        fetchAndCheckSingleJob(jobService, secondTrigger);
    }

    /**
     * Looks up the registered {@link StoreService} and reports whether it is persistent.
     */
    private static boolean isStorePersistent() throws Exception {
        final String serviceName = StoreService.class.getSimpleName();
        final StoreService storeService = (StoreService) ServiceProvider.getService(serviceName);
        return storeService.isPersistent();
    }

    /**
     * Fetches the jobs in the trigger's namespace, asserts that there is exactly one and that
     * it can be looked up again through the job service.
     *
     * @return the jobs found in the namespace of the given trigger
     */
    private static List<Job> fetchAndCheckSingleJob(JobService jobService, WorkflowTrigger trigger)
            throws Exception {
        final List<Job> jobs = jobService.get(trigger.getNamespace());
        Assert.assertEquals(1, jobs.size());
        Assert.assertNotNull(jobService.get(jobs.get(0)));
        return jobs;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.cognitree.kronos.executor.handlers;

import com.cognitree.kronos.executor.model.TaskResult;
import com.cognitree.kronos.model.Task;
import com.cognitree.kronos.model.TaskId;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Mock {@link TaskHandler} used by tests that exercise retrying a task.
 * <p>
 * {@link #execute()} keeps polling until either {@link #abort()} has been called or the
 * task's retry count has reached 2, and then reports {@link TaskResult#SUCCESS}. Every task
 * handed to {@link #execute()} is recorded so tests can query {@link #isHandled(TaskId)}.
 */
public class MockRetryTaskHandler implements TaskHandler {
    private static final Logger logger = LoggerFactory.getLogger(MockRetryTaskHandler.class);

    // tasks seen by execute(); static, so shared by all instances of this handler
    private static final List<TaskId> tasks = Collections.synchronizedList(new ArrayList<>());
    // volatile: written by abort() while execute() polls it in a loop; without it the
    // polling thread is not guaranteed to ever observe the update
    private volatile boolean abort = false;
    private Task task;

    /**
     * @param taskId identity of the task to look up
     * @return {@code true} if a task equal to {@code taskId} has been passed to {@link #execute()}
     */
    public static boolean isHandled(TaskId taskId) {
        return tasks.contains(taskId);
    }

    /**
     * Remembers the task to execute; the handler configuration is not used.
     */
    @Override
    public void init(Task task, ObjectNode config) {
        this.task = task;
    }

    /**
     * Records the task and then waits, in 50 ms steps, until the handler is aborted or the
     * task's retry count is at least 2.
     * <p>
     * An interrupt does not end the wait (same as before), but the interrupt status is no
     * longer lost: it is restored on the current thread before returning.
     *
     * @return always {@link TaskResult#SUCCESS}
     */
    @Override
    public TaskResult execute() {
        logger.info("Received request to execute task {}", task);
        tasks.add(task);
        boolean interrupted = false;
        try {
            // passes straight through once the retry count has reached 2
            // (presumably the third attempt overall - confirm against the retry logic)
            while (!abort && task.getRetryCount() < 2) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    logger.error("Thread has been interrupted");
                    // sleep() cleared the flag; remember it so it can be restored below
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return TaskResult.SUCCESS;
    }

    /**
     * Asks a running {@link #execute()} to leave its wait loop.
     */
    @Override
    public void abort() {
        abort = true;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public static void init() throws IOException, ServiceException {
QUEUE_SERVICE.init();
QUEUE_SERVICE.start();
// initial call so that the topics are created
QUEUE_SERVICE.consumeTask(TASK_TYPE_A, 1);
QUEUE_SERVICE.consumeTask(TASK_TYPE_B, 1);
QUEUE_SERVICE.consumeTasks(TASK_TYPE_A, 1);
QUEUE_SERVICE.consumeTasks(TASK_TYPE_B, 1);
QUEUE_SERVICE.consumeTaskStatusUpdates();
QUEUE_SERVICE.consumeControlMessages();
}
Expand Down Expand Up @@ -192,7 +192,7 @@ private List<TaskStatusUpdate> getTaskStatusUpdate() throws ServiceException {
private List<Task> getTasks(String taskType, int size) throws ServiceException {
int count = 10;
while (count > 0) {
List<Task> tasks = QUEUE_SERVICE.consumeTask(taskType, size);
List<Task> tasks = QUEUE_SERVICE.consumeTasks(taskType, size);
if (!tasks.isEmpty()) {
return tasks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

package com.cognitree.kronos.scheduler;

import com.cognitree.kronos.ServiceProvider;
import com.cognitree.kronos.executor.handlers.MockAbortTaskHandler;
import com.cognitree.kronos.executor.handlers.MockFailureTaskHandler;
import com.cognitree.kronos.executor.handlers.MockRetryTaskHandler;
import com.cognitree.kronos.executor.handlers.MockSuccessTaskHandler;
import com.cognitree.kronos.model.Messages;
import com.cognitree.kronos.model.Task;
import com.cognitree.kronos.scheduler.model.Job;
import com.cognitree.kronos.scheduler.model.JobId;
import com.cognitree.kronos.scheduler.model.WorkflowTrigger;
import com.cognitree.kronos.scheduler.store.StoreService;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -33,7 +36,10 @@

import static com.cognitree.kronos.TestUtil.scheduleWorkflow;
import static com.cognitree.kronos.TestUtil.waitForJobsToTriggerAndComplete;
import static com.cognitree.kronos.TestUtil.waitForTaskToBeRunning;
import static com.cognitree.kronos.TestUtil.waitForTriggerToComplete;
import static com.cognitree.kronos.model.Task.Status.ABORTED;
import static com.cognitree.kronos.model.Task.Status.SKIPPED;

public class JobServiceTest extends ServiceTest {

Expand Down Expand Up @@ -167,7 +173,7 @@ public void testGetJobTasksFailedDueToTimeout() throws Exception {
Assert.assertTrue(MockSuccessTaskHandler.isHandled(task.getIdentity()));
break;
case "taskThree":
Assert.assertEquals(Task.Status.ABORTED, task.getStatus());
Assert.assertEquals(Task.Status.TIMED_OUT, task.getStatus());
Assert.assertEquals(Messages.TIMED_OUT_EXECUTING_TASK_MESSAGE, task.getStatusMessage());
Assert.assertTrue(MockAbortTaskHandler.isHandled(task.getIdentity()));
break;
Expand All @@ -177,6 +183,41 @@ public void testGetJobTasksFailedDueToTimeout() throws Exception {
}
}

/**
 * Schedules the timeout-with-retry workflow and verifies that its single job ends with all
 * three tasks SUCCESSFUL, with "taskTwo" handled by {@link MockRetryTaskHandler} and
 * reporting a retry count of 2.
 */
@Test
public void testGetJobTasksFailedDueToTimeoutWithRetry() throws Exception {
    final WorkflowTrigger trigger = scheduleWorkflow(WORKFLOW_TEMPLATE_TIMEOUT_TASKS_WITH_RETRY_YAML);

    waitForJobsToTriggerAndComplete(trigger);

    final JobService jobService = JobService.getService();
    final List<Job> triggeredJobs = jobService.get(
            trigger.getNamespace(),
            trigger.getWorkflow(),
            trigger.getName(),
            0,
            System.currentTimeMillis());
    Assert.assertEquals(1, triggeredJobs.size());

    final List<Task> jobTasks = jobService.getTasks(triggeredJobs.get(0));
    Assert.assertEquals(3, jobTasks.size());
    for (Task current : jobTasks) {
        switch (current.getName()) {
            // first and last task share the same expectations
            case "taskOne":
            case "taskThree":
                Assert.assertEquals(Task.Status.SUCCESSFUL, current.getStatus());
                Assert.assertTrue(MockSuccessTaskHandler.isHandled(current.getIdentity()));
                break;
            case "taskTwo":
                Assert.assertEquals(Task.Status.SUCCESSFUL, current.getStatus());
                Assert.assertEquals(2, current.getRetryCount());
                Assert.assertTrue(MockRetryTaskHandler.isHandled(current.getIdentity()));
                break;
            default:
                // any other task name is unexpected in this workflow
                Assert.fail();
        }
    }
}

@Test
public void testGetJobTasksFailedDueToHandler() throws Exception {
final WorkflowTrigger workflowTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_FAILED_HANDLER_YAML);
Expand All @@ -203,7 +244,7 @@ public void testGetJobTasksFailedDueToHandler() throws Exception {
Assert.assertTrue(MockFailureTaskHandler.isHandled(task.getIdentity()));
break;
case "taskThree":
Assert.assertEquals(Task.Status.SKIPPED, task.getStatus());
Assert.assertEquals(SKIPPED, task.getStatus());
Assert.assertEquals(Messages.FAILED_DEPENDEE_TASK_MESSAGE, task.getStatusMessage());
break;
default:
Expand All @@ -229,6 +270,10 @@ public void testAbortJob() throws Exception {

waitForTriggerToComplete(workflowTrigger, WorkflowSchedulerService.getService().getScheduler());

Task taskOne = TaskService.getService().get(workflowTrigger.getNamespace())
.stream().filter(task -> task.getName().equals("taskOne")).findFirst().get();
waitForTaskToBeRunning(taskOne);

List<Job> jobs = JobService.getService().get(workflowTrigger.getNamespace());
JobService.getService().abortJob(jobs.get(0).getIdentity());

Expand All @@ -237,11 +282,12 @@ public void testAbortJob() throws Exception {
for (Task tsk : tasks) {
switch (tsk.getName()) {
case "taskOne":
Assert.assertEquals(Task.Status.ABORTED, tsk.getStatus());
Assert.assertEquals(ABORTED, tsk.getStatus());
Assert.assertTrue(MockAbortTaskHandler.isHandled(tsk.getIdentity()));
break;
case "taskTwo":
Assert.assertEquals(Task.Status.ABORTED, tsk.getStatus());
Assert.assertTrue((tsk.getStatus() == ABORTED) ||
(tsk.getStatus() == SKIPPED));
break;
default:
Assert.fail();
Expand All @@ -252,6 +298,22 @@ public void testAbortJob() throws Exception {
JobService.getService().get(workflowTrigger.getNamespace()).get(0).getStatus());
}

/**
 * Runs a workflow to completion, rewrites the stored state so that the job is RUNNING and
 * its first task is SCHEDULED, and then aborts the job. The test passes only if a
 * {@link ValidationException} is thrown.
 */
@Test(expected = ValidationException.class)
public void testAbortJobWithTaskInScheduledState() throws Exception {
    final WorkflowTrigger trigger = scheduleWorkflow(WORKFLOW_TEMPLATE_YAML);
    waitForJobsToTriggerAndComplete(trigger);

    final JobService jobService = JobService.getService();
    final List<Job> triggeredJobs = jobService.get(trigger.getNamespace());
    Assert.assertFalse(triggeredJobs.isEmpty());

    final String storeServiceName = StoreService.class.getSimpleName();
    final StoreService store = (StoreService) ServiceProvider.getService(storeServiceName);
    final List<Task> namespaceTasks = TaskService.getService().get(trigger.getNamespace());

    // move back the job to running state and task to scheduled state
    final Job firstJob = triggeredJobs.get(0);
    jobService.updateStatus(firstJob.getIdentity(), Job.Status.RUNNING);
    final Task firstTask = namespaceTasks.get(0);
    firstTask.setStatus(Task.Status.SCHEDULED);
    store.getTaskStore().update(firstTask);

    jobService.abortJob(firstJob);
}

@Test
public void testDeleteJob() throws Exception {
Expand Down
25 changes: 13 additions & 12 deletions app/src/test/java/com/cognitree/kronos/scheduler/ServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,20 @@
import java.util.List;

public class ServiceTest {
static final String WORKFLOW_TEMPLATE_YAML = "workflows/workflow-template.yaml";
static final String WORKFLOW_TEMPLATE_TIMEOUT_TASKS_YAML = "workflows/workflow-template-timeout-tasks.yaml";
static final String WORKFLOW_TEMPLATE_FAILED_HANDLER_YAML = "workflows/workflow-template-failed-handler.yaml";
static final String INVALID_WORKFLOW_MISSING_TASKS_TEMPLATE_YAML = "workflows/invalid-workflow-missing-tasks-template.yaml";
static final String INVALID_WORKFLOW_DISABLED_TASKS_TEMPLATE_YAML = "workflows/invalid-workflow-disabled-tasks-template.yaml";
static final String WORKFLOW_TEMPLATE_ABORT_TASKS_YAML = "workflows/workflow-template-abort-tasks.yaml";
static final String WORKFLOW_TEMPLATE_WITH_TASK_CONTEXT_YAML = "workflows/workflow-template-with-task-context.yaml";
static final String WORKFLOW_TEMPLATE_WITH_PROPERTIES_YAML = "workflows/workflow-template-with-properties.yaml";
static final String WORKFLOW_TEMPLATE_WITH_DUPLICATE_POLICY_YAML = "workflows/workflow-template-with-duplicate-policy.yaml";
protected static final SchedulerApp SCHEDULER_APP = new SchedulerApp();
protected static final ExecutorApp EXECUTOR_APP = new ExecutorApp();
protected static final String WORKFLOW_TEMPLATE_YAML = "workflows/workflow-template.yaml";
protected static final String WORKFLOW_TEMPLATE_TIMEOUT_TASKS_YAML = "workflows/workflow-template-timeout-tasks.yaml";
protected static final String WORKFLOW_TEMPLATE_TIMEOUT_TASKS_WITH_RETRY_YAML = "workflows/workflow-template-timeout-tasks-with-retry.yaml";
protected static final String WORKFLOW_TEMPLATE_FAILED_HANDLER_YAML = "workflows/workflow-template-failed-handler.yaml";
protected static final String INVALID_WORKFLOW_MISSING_TASKS_TEMPLATE_YAML = "workflows/invalid-workflow-missing-tasks-template.yaml";
protected static final String INVALID_WORKFLOW_DISABLED_TASKS_TEMPLATE_YAML = "workflows/invalid-workflow-disabled-tasks-template.yaml";
protected static final String WORKFLOW_TEMPLATE_ABORT_TASKS_YAML = "workflows/workflow-template-abort-tasks.yaml";
protected static final String WORKFLOW_TEMPLATE_WITH_TASK_CONTEXT_YAML = "workflows/workflow-template-with-task-context.yaml";
protected static final String WORKFLOW_TEMPLATE_WITH_PROPERTIES_YAML = "workflows/workflow-template-with-properties.yaml";
protected static final String WORKFLOW_TEMPLATE_WITH_DUPLICATE_POLICY_YAML = "workflows/workflow-template-with-duplicate-policy.yaml";

private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory());
private static final SchedulerApp SCHEDULER_APP = new SchedulerApp();
private static final ExecutorApp EXECUTOR_APP = new ExecutorApp();
private static final List<Namespace> EXISTING_NAMESPACE = new ArrayList<>();

@BeforeClass
Expand All @@ -69,7 +70,7 @@ private static void createTopics() throws ServiceException, java.io.IOException
executorConfig.getTaskHandlerConfig().forEach((type, taskHandlerConfig) -> {
try {
QueueService.getService(QueueService.EXECUTOR_QUEUE)
.consumeTask(type, 0);
.consumeTasks(type, 0);
} catch (ServiceException e) {
// do nothing
}
Expand Down
Loading

0 comments on commit 288775a

Please sign in to comment.