-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: Fix/129/quartz jobs miss fired problem (#131)
- Loading branch information
1 parent
20b242d
commit f8fe911
Showing
12 changed files
with
248 additions
and
244 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
46 changes: 46 additions & 0 deletions
46
src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobDataProviderSurrogate.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
namespace KafkaFlow.Retry.IntegrationTests.PollingTests | ||
{ | ||
using System.Collections.Generic; | ||
using global::KafkaFlow.Retry.Durable.Definitions.Polling; | ||
using global::KafkaFlow.Retry.Durable.Polling; | ||
using Quartz; | ||
|
||
internal class JobDataProviderSurrogate : IJobDataProvider | ||
{ | ||
public JobDataProviderSurrogate(string schedulerId, PollingDefinition pollingDefinition, ITrigger trigger, List<IJobExecutionContext> jobExecutionContexts) | ||
{ | ||
this.PollingDefinition = pollingDefinition; | ||
|
||
this.Trigger = trigger; | ||
this.TriggerName = this.GetTriggerName(schedulerId); | ||
|
||
this.JobExecutionContexts = jobExecutionContexts; | ||
this.JobDetail = this.CreateJobDetail(); | ||
} | ||
|
||
public IJobDetail JobDetail { get; } | ||
|
||
public List<IJobExecutionContext> JobExecutionContexts { get; } | ||
|
||
public PollingDefinition PollingDefinition { get; } | ||
|
||
public ITrigger Trigger { get; } | ||
|
||
public string TriggerName { get; } | ||
|
||
private IJobDetail CreateJobDetail() | ||
{ | ||
var dataMap = new JobDataMap { { "JobExecution", this.JobExecutionContexts } }; | ||
|
||
return JobBuilder | ||
.Create<JobSurrogate>() | ||
.SetJobData(dataMap) | ||
.Build(); | ||
} | ||
|
||
private string GetTriggerName(string schedulerId) | ||
{ | ||
return $"pollingJobTrigger_{schedulerId}_{this.PollingDefinition.PollingJobType}"; | ||
} | ||
} | ||
} |
17 changes: 17 additions & 0 deletions
17
src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobSurrogate.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
namespace KafkaFlow.Retry.IntegrationTests.PollingTests | ||
{ | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using Quartz; | ||
|
||
internal class JobSurrogate : IJob | ||
{ | ||
public Task Execute(IJobExecutionContext context) | ||
{ | ||
var jobExecutionContexts = context.JobDetail.JobDataMap["JobExecution"] as List<IJobExecutionContext>; | ||
jobExecutionContexts.Add(context); | ||
|
||
return Task.CompletedTask; | ||
} | ||
} | ||
} |
171 changes: 171 additions & 0 deletions
171
src/KafkaFlow.Retry.IntegrationTests/PollingTests/QueueTrackerCoordinatorTests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
namespace KafkaFlow.Retry.IntegrationTests.PollingTests | ||
{ | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using FluentAssertions; | ||
using global::KafkaFlow.Retry.Durable.Definitions.Polling; | ||
using global::KafkaFlow.Retry.Durable.Polling; | ||
using Moq; | ||
using Quartz; | ||
using Xunit; | ||
|
||
public class QueueTrackerCoordinatorTests | ||
{ | ||
private readonly Mock<IJobDataProvidersFactory> mockJobDataProvidersFactory; | ||
private readonly ITriggerProvider triggerProvider; | ||
|
||
public QueueTrackerCoordinatorTests() | ||
{ | ||
this.triggerProvider = new TriggerProvider(); | ||
|
||
this.mockJobDataProvidersFactory = new Mock<IJobDataProvidersFactory>(); | ||
} | ||
|
||
[Fact] | ||
public async Task QueueTrackerCoordinator_ForceMisfireJob_SuccessWithCorrectScheduledFiredTimes() | ||
{ | ||
// arrange | ||
var schedulerId = "MisfiredJobsDoesNothing"; | ||
var jobExecutionContexts = new List<IJobExecutionContext>(); | ||
|
||
var waitForScheduleInSeconds = 5; | ||
var jobActiveTimeInSeconds = 8; | ||
var pollingInSeconds = 2; | ||
|
||
var cronExpression = $"*/{pollingInSeconds} * * ? * * *"; | ||
|
||
var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, cronExpression, jobExecutionContexts); | ||
|
||
this.mockJobDataProvidersFactory | ||
.Setup(m => m.Create(It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>())) | ||
.Returns(new[] { retryDurableJobDataProvider }); | ||
|
||
var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId); | ||
|
||
// act | ||
|
||
Thread.Sleep(waitForScheduleInSeconds * 1000); | ||
|
||
await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of<IMessageProducer>(), Mock.Of<ILogHandler>()); | ||
|
||
Thread.Sleep(jobActiveTimeInSeconds * 1000); | ||
|
||
await queueTrackerCoordinator.UnscheduleJobsAsync(); | ||
|
||
// assert | ||
var scheduledFiredTimes = jobExecutionContexts | ||
.Where(ctx => ctx.ScheduledFireTimeUtc.HasValue) | ||
.Select(ctx => ctx.ScheduledFireTimeUtc.Value) | ||
.OrderBy(x => x) | ||
.ToList(); | ||
|
||
var currentScheduledFiredTime = scheduledFiredTimes.First(); | ||
var otherScheduledFiredTimes = scheduledFiredTimes.Skip(1).ToList(); | ||
|
||
foreach (var scheduledFiredTime in otherScheduledFiredTimes) | ||
{ | ||
currentScheduledFiredTime.AddSeconds(pollingInSeconds).Should().Be(scheduledFiredTime); | ||
|
||
currentScheduledFiredTime = scheduledFiredTime; | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task QueueTrackerCoordinator_ScheduleAndUnscheduleDifferentJobs_Success() | ||
{ | ||
// arrange | ||
var schedulerId = "twoJobsSchedulerId"; | ||
var jobExecutionContexts = new List<IJobExecutionContext>(); | ||
|
||
var timePollingActiveInSeconds = 4; | ||
|
||
var retryDurableCronExpression = "*/2 * * ? * * *"; | ||
var cleanupCronExpression = "*/4 * * ? * * *"; | ||
|
||
var retryDurableMinExpectedJobsFired = 2; | ||
var retryDurableMaxExpectedJobsFired = 3; | ||
var cleanupMinExpectedJobsFired = 1; | ||
var cleanupMaxExpectedJobsFired = 2; | ||
|
||
var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, retryDurableCronExpression, jobExecutionContexts); | ||
var cleanupJobDataProvider = this.CreateCleanupJobDataProvider(schedulerId, cleanupCronExpression, jobExecutionContexts); | ||
|
||
this.mockJobDataProvidersFactory | ||
.Setup(m => m.Create(It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>())) | ||
.Returns(new[] { retryDurableJobDataProvider, cleanupJobDataProvider }); | ||
|
||
var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId); | ||
|
||
// act | ||
await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of<IMessageProducer>(), Mock.Of<ILogHandler>()); | ||
|
||
Thread.Sleep(timePollingActiveInSeconds * 1000); | ||
|
||
await queueTrackerCoordinator.UnscheduleJobsAsync(); | ||
|
||
// assert | ||
jobExecutionContexts.Where(ctx => !ctx.PreviousFireTimeUtc.HasValue).Should().HaveCount(2); | ||
|
||
var retryDurableFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == retryDurableJobDataProvider.TriggerName); | ||
var cleanupFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == cleanupJobDataProvider.TriggerName); | ||
|
||
retryDurableFiresContexts | ||
.Should() | ||
.HaveCountGreaterThanOrEqualTo(retryDurableMinExpectedJobsFired) | ||
.And | ||
.HaveCountLessThanOrEqualTo(retryDurableMaxExpectedJobsFired); | ||
|
||
retryDurableFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue); | ||
|
||
cleanupFiresContexts | ||
.Should() | ||
.HaveCountGreaterThanOrEqualTo(cleanupMinExpectedJobsFired) | ||
.And | ||
.HaveCountLessThanOrEqualTo(cleanupMaxExpectedJobsFired); | ||
|
||
cleanupFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue); | ||
} | ||
|
||
private JobDataProviderSurrogate CreateCleanupJobDataProvider(string schedulerId, string cronExpression, List<IJobExecutionContext> jobExecutionContexts) | ||
{ | ||
var cleanupPollingDefinition = | ||
new CleanupPollingDefinition( | ||
enabled: true, | ||
cronExpression: cronExpression, | ||
timeToLiveInDays: 1, | ||
rowsPerRequest: 10 | ||
); | ||
|
||
return this.CreateJobDataProvider(schedulerId, cleanupPollingDefinition, jobExecutionContexts); | ||
} | ||
|
||
private JobDataProviderSurrogate CreateJobDataProvider(string schedulerId, PollingDefinition pollingDefinition, List<IJobExecutionContext> jobExecutionContexts) | ||
{ | ||
var trigger = this.triggerProvider.GetPollingTrigger(schedulerId, pollingDefinition); | ||
|
||
return new JobDataProviderSurrogate(schedulerId, pollingDefinition, trigger, jobExecutionContexts); | ||
} | ||
|
||
private IQueueTrackerCoordinator CreateQueueTrackerCoordinator(string schedulerId) | ||
{ | ||
var queueTrackerFactory = new QueueTrackerFactory(schedulerId, this.mockJobDataProvidersFactory.Object); | ||
|
||
return new QueueTrackerCoordinator(queueTrackerFactory); | ||
} | ||
|
||
private JobDataProviderSurrogate CreateRetryDurableJobDataProvider(string schedulerId, string cronExpression, List<IJobExecutionContext> jobExecutionContexts) | ||
{ | ||
var retryDurablePollingDefinition = | ||
new RetryDurablePollingDefinition( | ||
enabled: true, | ||
cronExpression: cronExpression, | ||
fetchSize: 100, | ||
expirationIntervalFactor: 1 | ||
); | ||
|
||
return this.CreateJobDataProvider(schedulerId, retryDurablePollingDefinition, jobExecutionContexts); | ||
} | ||
} | ||
} |
4 changes: 3 additions & 1 deletion
4
src/KafkaFlow.Retry.IntegrationTests/Properties/AssemblyMetadata.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
using System.Diagnostics.CodeAnalysis; | ||
using System.Runtime.CompilerServices; | ||
using Xunit; | ||
|
||
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] | ||
[assembly: ExcludeFromCodeCoverage] | ||
[assembly: ExcludeFromCodeCoverage] | ||
[assembly: CollectionBehavior(DisableTestParallelization = true)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,4 +141,4 @@ internal async Task RetryDurableTest( | |
} | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.