From eeb86a69e560d864f846a761da7d93b83adbc970 Mon Sep 17 00:00:00 2001 From: fudongying Date: Fri, 30 Jun 2023 20:10:39 +0800 Subject: [PATCH 1/3] fix: time out of range Signed-off-by: fudongying --- .../jobscheduler/scheduler/JobScheduler.java | 13 ++++++++- .../scheduler/JobSchedulerTests.java | 27 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java index afbcf4ce..faf4f495 100644 --- a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java +++ b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java @@ -148,7 +148,18 @@ boolean reschedule( log.info("No next execution time for job {}", jobParameter.getName()); return true; } - Duration duration = Duration.between(this.clock.instant(), nextExecutionTime); + Instant now = this.clock.instant(); + Duration duration = Duration.between(now, nextExecutionTime); + if (duration.toNanos() < 0) { + log.info( + "job {} expect time: {} < current time: {}, set next excute time to current", + jobParameter.getName(), + nextExecutionTime.toEpochMilli(), + now.toEpochMilli() + ); + nextExecutionTime = now; + duration = Duration.ZERO; + } // Too many jobs start at the same time point will bring burst. Add random jitter delay to spread out load. // Example, if interval is 10 minutes, jitter is 0.6, next job run will be randomly delayed by 0 to 10*0.6 minutes. diff --git a/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java b/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java index 51d4c5f1..a6597b33 100644 --- a/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java +++ b/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java @@ -143,6 +143,33 @@ public void testReschedule_noEnableTime() { Assert.assertFalse(this.scheduler.reschedule(jobParameter, null, null, dummyVersion, jitterLimit)); } + public void testReschedule_outOfExpectTime() { + Schedule schedule = Mockito.mock(Schedule.class); + ScheduledJobParameter jobParameter = buildScheduledJobParameter( + "job-id", + "dummy job name", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + schedule, + false, + 0.6 + ); + JobSchedulingInfo jobSchedulingInfo = new JobSchedulingInfo("job-index", "job-id", jobParameter); + Instant now = Instant.now(); + jobSchedulingInfo.setDescheduled(false); + + Mockito.when(schedule.getNextExecutionTime(Mockito.any())) + .thenReturn(now.minus(10, ChronoUnit.MINUTES)) + .thenReturn(now.plus(2, ChronoUnit.MINUTES)); + + Scheduler.ScheduledCancellable cancellable = Mockito.mock(Scheduler.ScheduledCancellable.class); + Mockito.when(this.threadPool.schedule(Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(cancellable); + + Assert.assertTrue(this.scheduler.reschedule(jobParameter, jobSchedulingInfo, null, dummyVersion, jitterLimit)); + Assert.assertEquals(cancellable, jobSchedulingInfo.getScheduledCancellable()); + Mockito.verify(this.threadPool).schedule(Mockito.any(), Mockito.any(), Mockito.anyString()); + } + public void testReschedule_jobDescheduled() { Schedule schedule = Mockito.mock(Schedule.class); ScheduledJobParameter jobParameter = buildScheduledJobParameter( From 6defabce645d82edcef8962c03830f2d474e63c5 Mon Sep 17 00:00:00 2001 From: fudongying Date: Mon, 3 Jul 2023 15:52:07 +0800 Subject: [PATCH 2/3] fix: deschedule failed after schedule exception Signed-off-by: fudongying --- .../opensearch/jobscheduler/scheduler/JobScheduler.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java index faf4f495..ad015f07 100644 --- a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java +++ b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java @@ -119,13 +119,10 @@ public boolean deschedule(String indexName, String id) { jobInfo.setExpectedPreviousExecutionTime(null); Scheduler.ScheduledCancellable scheduledCancellable = jobInfo.getScheduledCancellable(); - if (scheduledCancellable != null) { - if (scheduledCancellable.cancel()) { - this.scheduledJobInfo.removeJob(indexName, id); - } else { - return false; - } + if (scheduledCancellable != null && scheduledCancellable.cancel() == false) { + return false; } + this.scheduledJobInfo.removeJob(indexName, id); return true; } From c7fadeec5be348a97c1eb4932fa35e776f1ca892 Mon Sep 17 00:00:00 2001 From: fudongying Date: Mon, 10 Jul 2023 10:32:13 +0800 Subject: [PATCH 3/3] chore: dbwiddis's comments Signed-off-by: fudongying --- .../org/opensearch/jobscheduler/scheduler/JobScheduler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java index ad015f07..21219ee6 100644 --- a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java +++ b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java @@ -119,7 +119,7 @@ public boolean deschedule(String indexName, String id) { jobInfo.setExpectedPreviousExecutionTime(null); Scheduler.ScheduledCancellable scheduledCancellable = jobInfo.getScheduledCancellable(); - if (scheduledCancellable != null && scheduledCancellable.cancel() == false) { + if (scheduledCancellable != null && !scheduledCancellable.cancel()) { return false; } this.scheduledJobInfo.removeJob(indexName, id); @@ -147,9 +147,9 @@ boolean reschedule( } Instant now = this.clock.instant(); Duration duration = Duration.between(now, nextExecutionTime); - if (duration.toNanos() < 0) { + if (duration.isNegative()) { log.info( - "job {} expect time: {} < current time: {}, set next excute time to current", + "job {} expected time: {} < current time: {}, setting next execute time to current", jobParameter.getName(), nextExecutionTime.toEpochMilli(), now.toEpochMilli()