diff --git a/Sources/HummingbirdJobs/AsyncAwaitSupport/AsyncJob.swift b/Sources/HummingbirdJobs/AsyncAwaitSupport/AsyncJob.swift deleted file mode 100644 index 439b8585f..000000000 --- a/Sources/HummingbirdJobs/AsyncAwaitSupport/AsyncJob.swift +++ /dev/null @@ -1,33 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2021-2021 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOCore - -/// Job with asynchronous handler -@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) -public protocol HBAsyncJob: HBJob { - /// Execute job - func execute(logger: Logger) async throws -} - -@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) -extension HBAsyncJob { - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - let promise = eventLoop.makePromise(of: Void.self) - promise.completeWithTask { - try await self.execute(logger: logger) - } - return promise.futureResult - } -} diff --git a/Sources/HummingbirdJobs/Job.swift b/Sources/HummingbirdJobs/Job.swift index 5dce11a8f..9df845ed6 100644 --- a/Sources/HummingbirdJobs/Job.swift +++ b/Sources/HummingbirdJobs/Job.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2023 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -19,7 +19,7 @@ import NIO /// Protocol for job description /// /// For a job to be decodable, it has to be registered. Call `MyJob.register()` to register a job. -public protocol HBJob: Codable { +public protocol HBJob: Codable, Sendable { /// Unique Job name static var name: String { get } @@ -28,7 +28,7 @@ public protocol HBJob: Codable { /// Execute job /// - Returns: EventLoopFuture that is fulfulled when job is done - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture + func execute(logger: Logger) async throws } extension HBJob { diff --git a/Sources/HummingbirdJobs/JobIdentifier.swift b/Sources/HummingbirdJobs/JobIdentifier.swift new file mode 100644 index 000000000..2dece379a --- /dev/null +++ b/Sources/HummingbirdJobs/JobIdentifier.swift @@ -0,0 +1,43 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2023 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation + +/// Identifier for Job +public struct JobIdentifier: Sendable, CustomStringConvertible, Codable, Hashable { + let id: String + + init() { + self.id = UUID().uuidString + } + + /// Initialize JobIdentifier from String + /// - Parameter value: string value + public init(_ value: String) { + self.id = value + } + + public init(from decoder: Decoder) throws { + let container = try decoder.singleValueContainer() + self.id = try container.decode(String.self) + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.singleValueContainer() + try container.encode(self.id) + } + + /// String description of Identifier + public var description: String { self.id } +} diff --git a/Sources/HummingbirdJobs/JobContainer.swift b/Sources/HummingbirdJobs/JobInstance.swift similarity index 89% rename from Sources/HummingbirdJobs/JobContainer.swift rename to Sources/HummingbirdJobs/JobInstance.swift index 3b7c9cc12..c43be9985 100644 --- a/Sources/HummingbirdJobs/JobContainer.swift +++ b/Sources/HummingbirdJobs/JobInstance.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2023 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -15,14 +15,14 @@ import Foundation /// Holder for all data related to a job -public struct HBJobContainer: Codable { +public struct HBJobInstance: Codable { /// Time created public let createdAt: Date /// Job data public let job: HBJob /// Initialize a queue job - init(_ job: HBJob) { + public init(_ job: HBJob) { self.job = job self.createdAt = Date() } @@ -52,7 +52,7 @@ public struct HBQueuedJob: Codable { /// Job id public let id: JobIdentifier /// Job data - public let job: HBJobContainer + public let job: HBJobInstance /// Initialize a queue job public init(_ job: HBJob) { @@ -61,7 +61,7 @@ public struct HBQueuedJob: Codable { } /// Initialize a queue job - public init(id: JobIdentifier, job: HBJobContainer) { + public init(id: JobIdentifier, job: HBJobInstance) { self.job = job self.id = id } diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 4cd57b32a..16ac2e020 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2023 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -16,106 +16,19 @@ import Foundation import Hummingbird import Logging -/// Identifier for Job -public struct JobIdentifier: Sendable, CustomStringConvertible, Codable { - let id: String - - init() { - self.id = UUID().uuidString - } - - /// Initialize JobIdentifier from String - /// - Parameter value: string value - public init(_ value: String) { - self.id = value - } - - public init(from decoder: Decoder) throws { - let container = try decoder.singleValueContainer() - self.id = try container.decode(String.self) - } - - public func encode(to encoder: Encoder) throws { - var container = encoder.singleValueContainer() - try container.encode(self.id) - } - - /// String description of Identifier - public var description: String { self.id } -} - /// Job queue protocol. /// /// Defines how to push and pop jobs off a queue -public protocol HBJobQueue: AnyObject { - /// Process to run at initialisation of Job Queue - /// - Returns: When queue initialisation is finished - func onInit(on: EventLoop) -> EventLoopFuture +public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob { /// Push Job onto queue /// - Returns: Queued job information - func push(_ job: HBJob, on: EventLoop) -> EventLoopFuture - /// Pop job off queue. Future will wait until a job is available - /// - Returns: Queued job information - func pop(on: EventLoop) -> EventLoopFuture + @discardableResult func push(_ job: HBJob) async throws -> JobIdentifier /// This is called to say job has finished processing and it can be deleted - /// - Returns: When deletion of job has finished - func finished(jobId: JobIdentifier, on: EventLoop) -> EventLoopFuture + func finished(jobId: JobIdentifier) async throws + /// This is called to say job has failed to run and should be put aside + func failed(jobId: JobIdentifier, error: any Error) async throws + /// stop serving jobs + func stop() async /// shutdown queue - func shutdown(on: EventLoop) -> EventLoopFuture - /// time amount between each poll of queue - var pollTime: TimeAmount { get } -} - -extension HBJobQueue { - /// Default implememtatoin of `finish`. Does nothing - /// - Parameters: - /// - jobId: Job Identifier - /// - eventLoop: EventLoop to run process on - /// - Returns: When deletion of job has finished. In this case immediately - public func finished(jobId: JobIdentifier, on eventLoop: EventLoop) -> EventLoopFuture { - return eventLoop.makeSucceededVoidFuture() - } - - /// Default implementation of `onInit`. Does nothing - /// - Parameter eventLoop: EventLoop to run process on - /// - Returns: When initialisation has finished. In this case immediately - public func onInit(on eventLoop: EventLoop) -> EventLoopFuture { - return eventLoop.makeSucceededVoidFuture() - } - - /// Default implementation of `shutdown`. Does nothing - public func shutdown(on eventLoop: EventLoop) -> EventLoopFuture { - return eventLoop.makeSucceededVoidFuture() - } - - public var shutdownError: Error { return HBJobQueueShutdownError() } - - /// Push job onto queue - /// - Parameters: - /// - job: Job descriptor - /// - maxRetryCount: Number of times you should retry job - /// - Returns: ID for job - @discardableResult public func enqueue(_ job: HBJob, on eventLoop: EventLoop) -> EventLoopFuture { - return self.push(job, on: eventLoop).map(\.id) - } - - /// Queue workers poll the queue to get the latest jobs off the queue. This indicates the time amount - /// between each poll of the queue - public var pollTime: TimeAmount { .milliseconds(100) } -} - -/// Job queue asynchronous enqueue -@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) -extension HBJobQueue { - /// Push job onto queue - /// - Parameters: - /// - job: Job descriptor - /// - maxRetryCount: Number of times you should retry job - /// - Returns: ID for job - @discardableResult public func enqueue(_ job: HBJob, on eventLoop: EventLoop) async throws -> JobIdentifier { - return try await self.push(job, on: eventLoop).map(\.id).get() - } + func shutdownGracefully() async } - -/// Error type for when a job queue is being shutdown -struct HBJobQueueShutdownError: Error {} diff --git a/Sources/HummingbirdJobs/JobQueueError.swift b/Sources/HummingbirdJobs/JobQueueError.swift index f2d338c1e..4d24665cd 100644 --- a/Sources/HummingbirdJobs/JobQueueError.swift +++ b/Sources/HummingbirdJobs/JobQueueError.swift @@ -17,9 +17,12 @@ public struct JobQueueError: Error, Equatable { /// failed to decode job. Possibly because it hasn't been registered or data that was expected /// is not available public static var decodeJobFailed: Self { .init(.decodeJobFailed) } + /// failed to get job from queue + public static var dequeueError: Self { .init(.dequeueError) } private enum QueueError { case decodeJobFailed + case dequeueError } private let error: QueueError diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index afbd41dcf..b2ba286a6 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -12,89 +12,94 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Hummingbird import Logging +import ServiceLifecycle /// Object handling a single job queue -public final class HBJobQueueHandler { - public init(queue: HBJobQueue, numWorkers: Int, eventLoopGroup: EventLoopGroup, logger: Logger) { +public final class HBJobQueueHandler: Service { + public init(queue: Queue, numWorkers: Int, logger: Logger) { self.queue = queue - self.workers = (0.. EventLoopFuture { - self.queue.enqueue(job, on: eventLoop) + @discardableResult public func enqueue(_ job: HBJob) async throws -> JobIdentifier { + try await self.queue.push(job) } - /// Start queue workers - public func start() { - self.queue.onInit(on: self.eventLoop).whenComplete { _ in - self.workers.forEach { - $0.start() + public func run() async throws { + try await withGracefulShutdownHandler { + try await withThrowingTaskGroup(of: Void.self) { group in + var iterator = self.queue.makeAsyncIterator() + for _ in 0.. EventLoopFuture { - // shutdown all workers - let shutdownFutures: [EventLoopFuture] = self.workers.map { $0.shutdown() } - return EventLoopFuture.andAllComplete(shutdownFutures, on: self.eventLoop).flatMap { _ in - self.queue.shutdown(on: self.eventLoop) + func getNextJob(_ queueIterator: inout Queue.AsyncIterator) async throws -> HBQueuedJob? { + while true { + do { + let job = try await queueIterator.next() + return job + } catch let error as JobQueueError where error == JobQueueError.decodeJobFailed { + self.logger.error("Job failed to decode.") + } } } - private let eventLoop: EventLoop - private let queue: HBJobQueue - private let workers: [HBJobQueueWorker] -} - -/// Job queue handler asynchronous enqueue -@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) -extension HBJobQueueHandler { - /// Push job onto queue - /// - Parameters: - /// - job: Job descriptor - /// - maxRetryCount: Number of times you should retry job - /// - Returns: ID for job - @discardableResult public func enqueue(_ job: HBJob, on eventLoop: EventLoop) async throws -> JobIdentifier { - try await self.enqueue(job, on: eventLoop).get() - } - - /// Shutdown queue workers and queue - public func shutdown() async throws { - try await self.shutdown().get() - } -} + func runJob(_ queuedJob: HBQueuedJob) async throws { + var logger = logger + logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) + logger[metadataKey: "hb_job_type"] = .string(String(describing: type(of: queuedJob.job.job))) -/// Job queue id -/// -/// If you want to add a new task queue. Extend this class to include a new id -/// ``` -/// extension HBJobQueueId { -/// public static var `myQueue`: HBJobQueueId { "myQueue" } -/// } -/// ``` -/// and register new queue with tasks handler -/// ``` -/// app.jobs.registerQueue(.myQueue, queue: .memory) -/// ``` -/// If you don't register the queue your application will crash as soon as you try to use it -public struct HBJobQueueId: Hashable, ExpressibleByStringLiteral { - public let id: String - - public init(stringLiteral: String) { - self.id = stringLiteral - } - - public init(_ string: String) { - self.id = string + let job = queuedJob.job + var count = type(of: job.job).maxRetryCount + logger.trace("Starting Job") + while true { + do { + try await job.job.execute(logger: self.logger) + break + } catch let error as CancellationError { + logger.error("Job cancelled") + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } catch { + if count == 0 { + logger.error("Job failed") + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } + count -= 1 + logger.debug("Retrying Job") + } + } + logger.trace("Finished Job") + try await self.queue.finished(jobId: queuedJob.id) } - public static var `default`: HBJobQueueId { "_hb_default_" } + private let queue: Queue + private let numWorkers: Int + let logger: Logger } diff --git a/Sources/HummingbirdJobs/JobQueueWorker.swift b/Sources/HummingbirdJobs/JobQueueWorker.swift deleted file mode 100644 index e242d0be0..000000000 --- a/Sources/HummingbirdJobs/JobQueueWorker.swift +++ /dev/null @@ -1,145 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2021-2021 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import NIOCore - -/// Job queue worker. Pops job off the queue and runs it. Once the job is complete goes to -/// pop off a new job from the queue. If no job exists then if will poll the queue at regular -/// intervals until a job is available -class HBJobQueueWorker { - let queue: HBJobQueue - let eventLoop: EventLoop - var promise: EventLoopPromise - var isShutdown: Bool - var logger: Logger - - init(queue: HBJobQueue, eventLoop: EventLoop, logger: Logger) { - self.queue = queue - self.eventLoop = eventLoop - self.promise = self.eventLoop.makePromise() - self.promise.succeed(()) - self.isShutdown = false - self.logger = logger - } - - /// start worker - func start() { - self.executeNextJob() - } - - /// shutdown worker. Waits until current job is complete - func shutdown() -> EventLoopFuture { - return self.eventLoop.flatSubmit { - self.isShutdown = true - return self.promise.futureResult - } - } - - /// execute next job on the queue. Once that job is complete call this function again - func executeNextJob() { - self.eventLoop.execute { - self.pop(on: self.eventLoop) - .whenComplete { result in - guard self.isShutdown == false else { return } - self.promise = self.eventLoop.makePromise() - switch result { - case .success(let queuedJob): - self.executeJob(queuedJob, eventLoop: self.eventLoop, logger: self.logger) - .flatMap { _ in - self.queue.finished(jobId: queuedJob.id, on: self.eventLoop) - } - .whenComplete { _ in - self.promise.succeed(()) - if self.isShutdown == false { - self.executeNextJob() - } - } - case .failure(let error): - switch error { - case is HBJobQueueShutdownError: - break - default: - self.logger.error("Job queue error: \(error)") - } - self.promise.succeed(()) - if self.isShutdown == false { - self.executeNextJob() - } - } - } - } - } - - /// pop job off queue, or wait until a job is available and then pop that job off the queue - func pop(on eventLoop: EventLoop) -> EventLoopFuture { - let promise = eventLoop.makePromise(of: HBQueuedJob.self) - - eventLoop.scheduleRepeatedAsyncTask(initialDelay: .zero, delay: self.queue.pollTime) { task in - guard !self.isShutdown else { - promise.fail(self.queue.shutdownError) - task.cancel() - return eventLoop.makeFailedFuture(self.queue.shutdownError) - } - return self.queue.pop(on: eventLoop) - .map { value in - if let value = value { - promise.succeed(value) - task.cancel() - } - } - .flatMapErrorThrowing { error in - promise.fail(error) - task.cancel() - throw error - } - } - return promise.futureResult - } - - /// execute single job - func executeJob(_ queuedJob: HBQueuedJob, eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - var logger = logger - logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) - logger[metadataKey: "hb_job_type"] = .string(String(describing: type(of: queuedJob.job.job))) - - logger.debug("Executing job") - return self.executeJob(queuedJob, attemptNumber: 0, eventLoop: eventLoop, logger: logger).always { result in - switch result { - case .success: - logger.debug("Completed job") - case .failure: - logger.error("Failed to complete job") - } - } - } - - /// execute single job, retrying if it fails - func executeJob(_ queuedJob: HBQueuedJob, attemptNumber: Int, eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - let job = queuedJob.job.job - return job.execute(on: eventLoop, logger: logger) - .flatMapError { error in - guard attemptNumber < type(of: job).maxRetryCount else { - return eventLoop.makeFailedFuture(error) - } - logger.trace( - "Retrying job", - metadata: [ - "hb_job_attempt": .stringConvertible(attemptNumber + 1), - ] - ) - return self.executeJob(queuedJob, attemptNumber: attemptNumber + 1, eventLoop: eventLoop, logger: logger) - } - } -} diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index b5cc49809..0a5238b2a 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -12,33 +12,31 @@ // //===----------------------------------------------------------------------===// +import Collections import Foundation -import NIOCore /// In memory implementation of job queue driver. Stores jobs in a circular buffer -public class HBMemoryJobQueue: HBJobQueue { - public let eventLoop: EventLoop +public final class HBMemoryJobQueue: HBJobQueue { + public typealias Element = HBQueuedJob /// queue of jobs - var queue: CircularBuffer - /// queue of workers waiting for a new job - var waitingQueue: CircularBuffer> + fileprivate let queue: Internal + private let onFailedJob: @Sendable (HBQueuedJob, any Error) -> Void /// Initialise In memory job queue - /// - Parameter eventLoopGroup: EventLoop to run access to queue - public init(eventLoop: EventLoop) { - self.eventLoop = eventLoop - self.queue = .init(initialCapacity: 16) - self.waitingQueue = .init(initialCapacity: 4) + public init(onFailedJob: @escaping @Sendable (HBQueuedJob, any Error) -> Void = { _, _ in }) { + self.queue = .init() + self.onFailedJob = onFailedJob } /// Shutdown queue - public func shutdown(on eventLoop: EventLoop) -> EventLoopFuture { - self.eventLoop.submit { - self.waitingQueue.forEach { - $0.fail(self.shutdownError) - } - } + public func stop() async { + await self.queue.stop() + } + + /// Shutdown queue + public func shutdownGracefully() async { + await self.queue.shutdown() } /// Push job onto queue @@ -46,39 +44,88 @@ public class HBMemoryJobQueue: HBJobQueue { /// - job: Job /// - eventLoop: Eventloop to run process on (ignored in this case) /// - Returns: Queued job - public func push(_ job: HBJob, on eventLoop: EventLoop) -> EventLoopFuture { - return self.eventLoop.submit { () -> HBQueuedJob in + public func push(_ job: HBJob) async throws -> JobIdentifier { + try await self.queue.push(job) + } + + public func finished(jobId: JobIdentifier) async throws { + await self.queue.clearPendingJob(jobId: jobId) + } + + public func failed(jobId: JobIdentifier, error: any Error) async throws { + if let job = await self.queue.clearAndReturnPendingJob(jobId: jobId) { + self.onFailedJob(.init(id: jobId, job: job), error) + } + } + + /// Internal actor managing the job queue + fileprivate actor Internal { + var queue: Deque + var pendingJobs: [JobIdentifier: HBJobInstance] + var isStopped: Bool + + init() { + self.queue = .init() + self.isStopped = false + self.pendingJobs = .init() + } + + func push(_ job: HBJob) throws -> JobIdentifier { let queuedJob = HBQueuedJob(job) let jsonData = try JSONEncoder().encode(queuedJob) self.queue.append(jsonData) - if let waiting = self.waitingQueue.popFirst() { - waiting.succeed(()) - } - return queuedJob + return queuedJob.id + } + + func clearPendingJob(jobId: JobIdentifier) { + self.pendingJobs[jobId] = nil + } + + func clearAndReturnPendingJob(jobId: JobIdentifier) -> HBJobInstance? { + let instance = self.pendingJobs[jobId] + self.pendingJobs[jobId] = nil + return instance } - } - /// Pop job off queue - /// - Parameter eventLoop: Eventloop to run process on (ignored in this case) - /// - Returns: Queued Job if available - public func pop(on eventLoop: EventLoop) -> EventLoopFuture { - self.eventLoop.flatSubmit { - if self.waitingQueue.count > 0 || self.queue.count == 0 { - let promise = self.eventLoop.makePromise(of: Void.self) - self.waitingQueue.append(promise) - return promise.futureResult.flatMapThrowing { _ in - let jsonData = self.queue.popFirst()! - return try JSONDecoder().decode(HBQueuedJob.self, from: jsonData) + func next() async throws -> HBQueuedJob? { + while true { + if self.isStopped { + return nil } - } else { - let jsonData = self.queue.popFirst()! - do { - let value = try JSONDecoder().decode(HBQueuedJob.self, from: jsonData) - return self.eventLoop.makeSucceededFuture(value) - } catch { - return self.eventLoop.makeFailedFuture(error) + if let data = queue.popFirst() { + do { + let job = try JSONDecoder().decode(HBQueuedJob.self, from: data) + self.pendingJobs[job.id] = job.job + return job + } catch { + throw JobQueueError.decodeJobFailed + } } + try await Task.sleep(for: .milliseconds(100)) } } + + func stop() { + self.isStopped = true + } + + func shutdown() { + assert(self.pendingJobs.count == 0) + self.isStopped = true + } + } +} + +extension HBMemoryJobQueue { + public struct AsyncIterator: AsyncIteratorProtocol { + fileprivate let queue: Internal + + public func next() async throws -> Element? { + try await self.queue.next() + } + } + + public func makeAsyncIterator() -> AsyncIterator { + .init(queue: self.queue) } } diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index 82113f9eb..a4a743918 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -12,324 +12,218 @@ // //===----------------------------------------------------------------------===// -import Hummingbird -@testable import HummingbirdJobs -import HummingbirdXCT +import Atomics +import HummingbirdJobs +import ServiceLifecycle import XCTest -/* - final class HummingbirdJobsTests: XCTestCase { - func testBasic() throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let expectation = XCTestExpectation(description: "Jobs Completed") - let value: Int - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - print(self.value) - return eventLoop.scheduleTask(in: .milliseconds(Int64.random(in: 10..<50))) { - Self.expectation.fulfill() - }.futureResult - } - } - TestJob.register() - TestJob.expectation.expectedFulfillmentCount = 10 - - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 1) - - try app.start() - defer { app.stop() } - - app.jobs.queue.enqueue(TestJob(value: 1), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 2), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 3), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 4), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 5), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 6), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 7), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 8), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 9), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 0), on: app.eventLoopGroup.next()) - - wait(for: [TestJob.expectation], timeout: 5) - } - - func testMultipleWorkers() throws { - struct TestJob: HBJob { - static let name = "testMultipleWorkers" - static let expectation = XCTestExpectation(description: "Jobs Completed") - - let value: Int - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - print(self.value) - return eventLoop.scheduleTask(in: .milliseconds(Int64.random(in: 10..<50))) { - Self.expectation.fulfill() - }.futureResult - } - } - TestJob.register() - TestJob.expectation.expectedFulfillmentCount = 10 - - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 4) - - try app.start() - defer { app.stop() } - - app.jobs.queue.enqueue(TestJob(value: 1), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 2), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 3), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 4), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 5), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 6), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 7), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 8), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 9), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob(value: 0), on: app.eventLoopGroup.next()) - - wait(for: [TestJob.expectation], timeout: 5) - } - - func testErrorRetryCount() throws { - struct FailedError: Error {} - - struct TestJob: HBJob { - static let name = "testErrorRetryCount" - static let maxRetryCount = 3 - static let expectation = XCTestExpectation(description: "Jobs Completed") - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - Self.expectation.fulfill() - return eventLoop.makeFailedFuture(FailedError()) - } - } - TestJob.register() - TestJob.expectation.expectedFulfillmentCount = 4 - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 1) +final class HummingbirdJobsTests: XCTestCase { + /// Helper function for test a server + /// + /// Creates test client, runs test function abd ensures everything is + /// shutdown correctly + public func testJobQueue( + _ jobQueueHandler: HBJobQueueHandler, + _ test: () async throws -> Void + ) async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [jobQueueHandler], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: Logger(label: "JobQueueService") + ) + ) + group.addTask { + try await serviceGroup.run() + } + try await test() + await serviceGroup.triggerGracefulShutdown() + } + } - try app.start() - defer { app.stop() } + func testBasic() async throws { + struct TestJob: HBJob { + static let name = "testBasic" + static let expectation = XCTestExpectation(description: "Jobs Completed") - app.jobs.queue.enqueue(TestJob(), on: app.eventLoopGroup.next()) + let value: Int + func execute(logger: Logger) async throws { + print(self.value) + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + Self.expectation.fulfill() + } + } + TestJob.register() + TestJob.expectation.expectedFulfillmentCount = 10 - wait(for: [TestJob.expectation], timeout: 1) - } + let jobQueueHandler = HBJobQueueHandler( + queue: HBMemoryJobQueue(), + numWorkers: 1, + logger: Logger(label: "HummingbirdJobsTests") + ) + try await testJobQueue(jobQueueHandler) { + try await jobQueueHandler.enqueue(TestJob(value: 1)) + try await jobQueueHandler.enqueue(TestJob(value: 2)) + try await jobQueueHandler.enqueue(TestJob(value: 3)) + try await jobQueueHandler.enqueue(TestJob(value: 4)) + try await jobQueueHandler.enqueue(TestJob(value: 5)) + try await jobQueueHandler.enqueue(TestJob(value: 6)) + try await jobQueueHandler.enqueue(TestJob(value: 7)) + try await jobQueueHandler.enqueue(TestJob(value: 8)) + try await jobQueueHandler.enqueue(TestJob(value: 9)) + try await jobQueueHandler.enqueue(TestJob(value: 10)) + + wait(for: [TestJob.expectation], timeout: 5) + } + } - func testSecondQueue() throws { - struct TestJob: HBJob { - static let name = "testSecondQueue" - static let expectation = XCTestExpectation(description: "Jobs Completed") - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - Self.expectation.fulfill() - return eventLoop.makeSucceededVoidFuture() - } - } - TestJob.register() - TestJob.expectation.expectedFulfillmentCount = 1 - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 1) - app.jobs.registerQueue(.test, queue: .memory, numWorkers: 1) + func testMultipleWorkers() async throws { + struct TestJob: HBJob { + static let name = "testBasic" + static let runningJobCounter = ManagedAtomic(0) + static let maxRunningJobCounter = ManagedAtomic(0) + static let expectation = XCTestExpectation(description: "Jobs Completed") - try app.start() - defer { app.stop() } + let value: Int + func execute(logger: Logger) async throws { + let runningJobs = Self.runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) + if runningJobs > Self.maxRunningJobCounter.load(ordering: .relaxed) { + Self.maxRunningJobCounter.store(runningJobs, ordering: .relaxed) + } + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + print(self.value) + Self.expectation.fulfill() + Self.runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) + } + } + TestJob.register() + TestJob.expectation.expectedFulfillmentCount = 10 - app.jobs.queues(.test).enqueue(TestJob(), on: app.eventLoopGroup.next()) + let jobQueueHandler = HBJobQueueHandler( + queue: HBMemoryJobQueue(), + numWorkers: 4, + logger: Logger(label: "HummingbirdJobsTests") + ) + try await testJobQueue(jobQueueHandler) { + try await jobQueueHandler.enqueue(TestJob(value: 1)) + try await jobQueueHandler.enqueue(TestJob(value: 2)) + try await jobQueueHandler.enqueue(TestJob(value: 3)) + try await jobQueueHandler.enqueue(TestJob(value: 4)) + try await jobQueueHandler.enqueue(TestJob(value: 5)) + try await jobQueueHandler.enqueue(TestJob(value: 6)) + try await jobQueueHandler.enqueue(TestJob(value: 7)) + try await jobQueueHandler.enqueue(TestJob(value: 8)) + try await jobQueueHandler.enqueue(TestJob(value: 9)) + try await jobQueueHandler.enqueue(TestJob(value: 10)) + + wait(for: [TestJob.expectation], timeout: 5) + XCTAssertGreaterThan(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 1) + XCTAssertLessThanOrEqual(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 4) + } + } - wait(for: [TestJob.expectation], timeout: 1) - } + func testErrorRetryCount() async throws { + let failedJobCount = ManagedAtomic(0) + struct FailedError: Error {} - func testQueueOutsideApplication() throws { struct TestJob: HBJob { - static let name = "testSecondQueue" + static let name = "testErrorRetryCount" + static let maxRetryCount = 3 static let expectation = XCTestExpectation(description: "Jobs Completed") - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { + func execute(logger: Logger) async throws { Self.expectation.fulfill() - return eventLoop.makeSucceededVoidFuture() + throw FailedError() } } TestJob.register() - TestJob.expectation.expectedFulfillmentCount = 1 - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - let jobQueue = HBMemoryJobQueue(eventLoop: app.eventLoopGroup.any()) - let jobQueueHandler = HBJobQueueHandler(queue: jobQueue, numWorkers: 4, eventLoopGroup: app.eventLoopGroup, logger: app.logger) - app.lifecycle.register( - label: "MyJobQueue", - start: .sync { jobQueueHandler.start() }, - shutdown: .eventLoopFuture { jobQueueHandler.shutdown() } + TestJob.expectation.expectedFulfillmentCount = 4 + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .trace + let jobQueueHandler = HBJobQueueHandler( + queue: HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) }, + numWorkers: 4, + logger: logger ) + try await testJobQueue(jobQueueHandler) { + try await jobQueueHandler.enqueue(TestJob()) - try app.start() - defer { app.stop() } - - jobQueueHandler.enqueue(TestJob(), on: app.eventLoopGroup.next()) - - wait(for: [TestJob.expectation], timeout: 1) + wait(for: [TestJob.expectation], timeout: 5) + } + XCTAssertEqual(failedJobCount.load(ordering: .relaxed), 1) } - func testShutdown() throws { - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 1) - try app.start() - app.stop() - app.wait() - } - - func testShutdownJob() throws { - struct TestJob: HBJob { - static let name = "testShutdownJob" - static var started: Bool = false - static var finished: Bool = false - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - Self.started = true - let job = eventLoop.scheduleTask(in: .milliseconds(500)) { - Self.finished = true - } - return job.futureResult - } - } - TestJob.register() - - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 1) - try app.start() - let job = TestJob() - app.jobs.queue.enqueue(job, on: app.eventLoopGroup.next()) - // stall to give job chance to start running - Thread.sleep(forTimeInterval: 0.1) - app.stop() - app.wait() - - XCTAssertTrue(TestJob.started) - XCTAssertTrue(TestJob.finished) - } - - func testJobSerialization() throws { - struct TestJob: HBJob, Equatable { - static let name = "testJobSerialization" - let value: Int - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - return eventLoop.makeSucceededVoidFuture() - } - } - TestJob.register() - let job = TestJob(value: 2) - let queuedJob = HBJobContainer(job) - let data = try JSONEncoder().encode(queuedJob) - let queuedJob2 = try JSONDecoder().decode(HBJobContainer.self, from: data) - XCTAssertEqual(queuedJob2.job as? TestJob, job) - } - - /// test job fails to decode but queue continues to process - func testFailToDecode() throws { - struct TestJob1: HBJob { - static let name = "testFailToDecode" - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - return eventLoop.makeSucceededVoidFuture() - } - } - struct TestJob2: HBJob { - static let name = "testFailToDecode" - static var value: String? - let value: String - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - Self.value = self.value - return eventLoop.makeSucceededVoidFuture() - } - } - TestJob2.register() - - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 1) - - try app.start() - - app.jobs.queue.enqueue(TestJob1(), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestJob2(value: "test"), on: app.eventLoopGroup.next()) - - // stall to give job chance to start running - Thread.sleep(forTimeInterval: 0.1) - - app.stop() - app.wait() - - XCTAssertEqual(TestJob2.value, "test") - } - - /// test access via `HBRequest` - func testAccessViaRequest() throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let expectation = XCTestExpectation(description: "Jobs Completed") - - func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - Self.expectation.fulfill() - return eventLoop.makeSucceededVoidFuture() - } - } - TestJob.register() - TestJob.expectation.expectedFulfillmentCount = 1 - - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 1) - - app.router.get("/job") { request -> EventLoopFuture in - return request.jobs.enqueue(job: TestJob()).map { _ in .ok } - } - - try app.XCTStart() - defer { app.XCTStop() } - - try app.XCTExecute(uri: "/job", method: .GET) { response in - XCTAssertEqual(response.status, .ok) - } - - wait(for: [TestJob.expectation], timeout: 5) - } - - @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) - func testAsyncJob() throws { - #if os(macOS) - // disable macOS tests in CI. GH Actions are currently running this when they shouldn't - guard HBEnvironment().get("CI") != "true" else { throw XCTSkip() } - #endif - struct TestAsyncJob: HBAsyncJob { - static let name = "testAsyncJob" - static let expectation = XCTestExpectation(description: "Jobs Completed") - - func execute(logger: Logger) async throws { - try await Task.sleep(nanoseconds: 1_000_000) - Self.expectation.fulfill() - } - } + func testJobSerialization() throws { + struct TestJob: HBJob, Equatable { + static let name = "testJobSerialization" + let value: Int + func execute(logger: Logger) async throws {} + } + TestJob.register() + let job = TestJob(value: 2) + let jobInstance = HBJobInstance(job) + let data = try JSONEncoder().encode(jobInstance) + let jobInstance2 = try JSONDecoder().decode(HBJobInstance.self, from: data) + XCTAssertEqual(jobInstance2.job as? TestJob, job) + } - TestAsyncJob.register() - TestAsyncJob.expectation.expectedFulfillmentCount = 3 + /// Test job is cancelled on shutdown + func testShutdownJob() async throws { + struct TestJob: HBJob { + static let name = "testShutdownJob" + func execute(logger: Logger) async throws { + try await Task.sleep(for: .milliseconds(100)) + } + } + TestJob.register() - let app = HBApplication(testing: .live) - app.logger.logLevel = .trace - app.addJobs(using: .memory, numWorkers: 2) + let cancelledJobCount = ManagedAtomic(0) + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .trace + let jobQueueHandler = HBJobQueueHandler( + queue: HBMemoryJobQueue { _, error in + if error is CancellationError { + cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed) + } + }, + numWorkers: 4, + logger: logger + ) + try await testJobQueue(jobQueueHandler) { + try await jobQueueHandler.enqueue(TestJob()) + } - try app.start() - defer { app.stop() } + XCTAssertEqual(cancelledJobCount.load(ordering: .relaxed), 1) + } - app.jobs.queue.enqueue(TestAsyncJob(), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestAsyncJob(), on: app.eventLoopGroup.next()) - app.jobs.queue.enqueue(TestAsyncJob(), on: app.eventLoopGroup.next()) + /// test job fails to decode but queue continues to process + func testFailToDecode() async throws { + struct TestJob1: HBJob { + static let name = "testFailToDecode" + func execute(logger: Logger) async throws {} + } + struct TestJob2: HBJob { + static let name = "testFailToDecode" + static var value: String? + let value: String + func execute(logger: Logger) async throws { + Self.value = self.value + } + } + TestJob2.register() - wait(for: [TestAsyncJob.expectation], timeout: 5) - } - } + let jobQueueHandler = HBJobQueueHandler( + queue: HBMemoryJobQueue(), + numWorkers: 1, + logger: Logger(label: "HummingbirdJobsTests") + ) + try await testJobQueue(jobQueueHandler) { + try await jobQueueHandler.enqueue(TestJob1()) + try await jobQueueHandler.enqueue(TestJob2(value: "test")) + // stall to give job chance to start running + try await Task.sleep(for: .milliseconds(500)) + } - extension HBJobQueueId { - static var test: HBJobQueueId { "test" } - } - */ + XCTAssertEqual(TestJob2.value, "test") + } +}