diff --git a/Sources/HummingbirdJobs/Job.swift b/Sources/HummingbirdJobs/Job.swift index 5a794140b..eb1197cad 100644 --- a/Sources/HummingbirdJobs/Job.swift +++ b/Sources/HummingbirdJobs/Job.swift @@ -30,20 +30,3 @@ extension HBJob { id.name } } - -/// Type used internally by job queue implementations to encode a job request -public struct _HBJobRequest: Encodable, Sendable { - let id: HBJobIdentifier - let parameters: Parameters - - public init(id: HBJobIdentifier, parameters: Parameters) { - self.id = id - self.parameters = parameters - } - - public func encode(to encoder: Encoder) throws { - var container = encoder.container(keyedBy: _HBJobCodingKey.self) - let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil)) - try self.parameters.encode(to: childEncoder) - } -} diff --git a/Sources/HummingbirdJobs/JobDefinition.swift b/Sources/HummingbirdJobs/JobDefinition.swift new file mode 100644 index 000000000..bb9f4705d --- /dev/null +++ b/Sources/HummingbirdJobs/JobDefinition.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Job definition type +public struct HBJobDefinition { + let id: HBJobIdentifier + let maxRetryCount: Int + let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void + + public init(id: HBJobIdentifier, maxRetryCount: Int = 0, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) { + self.id = id + self.maxRetryCount = maxRetryCount + self._execute = execute + } + + func execute(_ parameters: Parameters, context: HBJobContext) async throws { + try await self._execute(parameters, context) + } +} diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 46fd22ab9..1506a8791 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -26,7 +26,7 @@ public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob func onInit() async throws /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID + @discardableResult func _push(data: Data) async throws -> JobID /// This is called to say job has finished processing and it can be deleted func finished(jobId: JobID) async throws /// This is called to say job has failed to run and should be put aside @@ -40,4 +40,28 @@ public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob extension HBJobQueue { // default version of onInit doing nothing public func onInit() async throws {} + /// Push Job onto queue + /// - Returns: Identifier of queued job + @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID { + let jobRequest = HBJobRequest(id: id, parameters: parameters) + let data = try JSONEncoder().encode(jobRequest) + return try await _push(data: data) + } +} + +/// Type used internally to encode a request +struct HBJobRequest: Encodable, Sendable { + let id: HBJobIdentifier + let parameters: Parameters + + public init(id: HBJobIdentifier, parameters: Parameters) { + self.id = id + self.parameters = parameters + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: _HBJobCodingKey.self) + let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil)) + try self.parameters.encode(to: childEncoder) + } } diff --git a/Sources/HummingbirdJobs/JobQueueError.swift b/Sources/HummingbirdJobs/JobQueueError.swift index 4d24665cd..3ec07b125 100644 --- a/Sources/HummingbirdJobs/JobQueueError.swift +++ b/Sources/HummingbirdJobs/JobQueueError.swift @@ -17,11 +17,14 @@ public struct JobQueueError: Error, Equatable { /// failed to decode job. Possibly because it hasn't been registered or data that was expected /// is not available public static var decodeJobFailed: Self { .init(.decodeJobFailed) } + /// failed to decode job as the job id is not recognised + public static var unrecognisedJobId: Self { .init(.unrecognisedJobId) } /// failed to get job from queue public static var dequeueError: Self { .init(.dequeueError) } private enum QueueError { case decodeJobFailed + case unrecognisedJobId case dequeueError } diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 67cc6edfe..991d9003f 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -23,6 +23,33 @@ public final class HBJobQueueHandler: Service { self.queue = queue self.numWorkers = numWorkers self.logger = logger + self.jobRegister = .init() + } + + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + _ id: HBJobIdentifier, + maxRetryCount: Int = 0, + execute: @escaping @Sendable ( + Parameters, + HBJobContext + ) async throws -> Void + ) { + let definition = HBJobDefinition(id: id, maxRetryCount: maxRetryCount, execute: execute) + self.jobRegister.registerJob(job: definition) + } + + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob(_ job: HBJobDefinition) { + self.jobRegister.registerJob(job: job) } public func run() async throws { @@ -32,16 +59,16 @@ public final class HBJobQueueHandler: Service { try await withThrowingTaskGroup(of: Void.self) { group in var iterator = self.queue.makeAsyncIterator() for _ in 0..: Service { } } - func getNextJob(_ queueIterator: inout Queue.AsyncIterator) async throws -> HBQueuedJob? { - while true { - do { - let job = try await queueIterator.next() - return job - } catch let error as JobQueueError where error == JobQueueError.decodeJobFailed { - self.logger.error("Job failed to decode.") - } - } - } - - func runJob(_ queuedJob: HBQueuedJob) async { + func runJob(_ queuedJob: HBQueuedJob) async throws { var logger = logger logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) - logger[metadataKey: "hb_job_type"] = .string(queuedJob.job.name) + let job: any HBJob + do { + job = try self.jobRegister.decode(data: queuedJob.jobData) + } catch let error as JobQueueError where error == .unrecognisedJobId { + logger.debug("Failed to find Job with ID while decoding") + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } catch { + logger.debug("Job failed to decode") + try await self.queue.failed(jobId: queuedJob.id, error: JobQueueError.decodeJobFailed) + return + } + logger[metadataKey: "hb_job_type"] = .string(job.name) - let job = queuedJob.job var count = job.maxRetryCount logger.debug("Starting Job") @@ -103,6 +130,7 @@ public final class HBJobQueueHandler: Service { } } + private let jobRegister: HBJobRegister private let queue: Queue private let numWorkers: Int let logger: Logger diff --git a/Sources/HummingbirdJobs/JobRegister.swift b/Sources/HummingbirdJobs/JobRegister.swift index d32d110e1..de2416cdb 100644 --- a/Sources/HummingbirdJobs/JobRegister.swift +++ b/Sources/HummingbirdJobs/JobRegister.swift @@ -12,69 +12,51 @@ // //===----------------------------------------------------------------------===// +import Foundation import NIOConcurrencyHelpers /// Registry for job types -public enum HBJobRegister { +struct HBJobRegister: Sendable { /// Register job /// - Parameters: /// - id: Job Identifier /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed /// - execute: Job code - public static func registerJob( - _ id: HBJobIdentifier, - maxRetryCount: Int = 0, - execute: @escaping @Sendable ( - Parameters, - HBJobContext - ) async throws -> Void + public func registerJob( + job: HBJobDefinition ) { - let definition = HBJobInstance.Definition(id: id, maxRetryCount: maxRetryCount, execute: execute) let builder = { (decoder: Decoder) in let parameters = try Parameters(from: decoder) - return try HBJobInstance(job: definition, parameters: parameters) + return try HBJobInstance(job: job, parameters: parameters) } - self.idTypeMap.withLockedValue { - precondition($0[id.name] == nil, "There is a job already registered under id \"\(id.name)\"") - $0[id.name] = builder + self.builderTypeMap.withLockedValue { + precondition($0[job.id.name] == nil, "There is a job already registered under id \"\(job.id.name)\"") + $0[job.id.name] = builder } } - static func decode(from decoder: Decoder) throws -> any HBJob { + func decode(data: Data) throws -> any HBJob { + return try JSONDecoder().decode(HBAnyCodableJob.self, from: data, configuration: self).job + } + + func decode(from decoder: Decoder) throws -> any HBJob { let container = try decoder.container(keyedBy: _HBJobCodingKey.self) let key = container.allKeys.first! let childDecoder = try container.superDecoder(forKey: key) - let jobDefinitionBuilder = try Self.idTypeMap.withLockedValue { - guard let job = $0[key.stringValue] else { throw JobQueueError.decodeJobFailed } + let jobDefinitionBuilder = try self.builderTypeMap.withLockedValue { + guard let job = $0[key.stringValue] else { throw JobQueueError.unrecognisedJobId } return job } return try jobDefinitionBuilder(childDecoder) } - static let idTypeMap: NIOLockedValueBox < [String: (Decoder) throws -> any HBJob]> = .init([:]) + let builderTypeMap: NIOLockedValueBox < [String: @Sendable (Decoder) throws -> any HBJob]> = .init([:]) } /// Internal job instance type -struct HBJobInstance: HBJob { - /// Job definition type - struct Definition { - let id: HBJobIdentifier - let maxRetryCount: Int - let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void - - init(id: HBJobIdentifier, maxRetryCount: Int, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) { - self.id = id - self.maxRetryCount = maxRetryCount - self._execute = execute - } - - public func execute(_ parameters: Parameters, context: HBJobContext) async throws { - try await self._execute(parameters, context) - } - } - +internal struct HBJobInstance: HBJob { /// job definition - let job: Definition + let job: HBJobDefinition /// job parameters let parameters: Parameters @@ -86,12 +68,33 @@ struct HBJobInstance: HBJob { try await self.job.execute(self.parameters, context: context) } - init(job: Definition, parameters: Parameters) throws { + init(job: HBJobDefinition, parameters: Parameters) throws { self.job = job self.parameters = parameters } } +/// Add codable support for decoding/encoding any HBJob +internal struct HBAnyCodableJob: DecodableWithConfiguration, Sendable { + typealias DecodingConfiguration = HBJobRegister + + init(from decoder: Decoder, configuration register: DecodingConfiguration) throws { + self.job = try register.decode(from: decoder) + } + + /// Job data + let job: any HBJob + + /// Initialize a queue job + init(_ job: any HBJob) { + self.job = job + } + + private enum CodingKeys: String, CodingKey { + case job + } +} + internal struct _HBJobCodingKey: CodingKey { public var stringValue: String public var intValue: Int? diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index 0a17ee482..e333c2de5 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -45,9 +45,8 @@ public final class HBMemoryJobQueue: HBJobQueue { /// - job: Job /// - eventLoop: Eventloop to run process on (ignored in this case) /// - Returns: Queued job - @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID { - let job = _HBJobRequest(id: id, parameters: parameters) - return try await self.queue.push(job) + @discardableResult public func _push(data: Data) async throws -> JobID { + return try await self.queue.push(data) } public func finished(jobId: JobID) async throws { @@ -56,14 +55,14 @@ public final class HBMemoryJobQueue: HBJobQueue { public func failed(jobId: JobID, error: any Error) async throws { if let job = await self.queue.clearAndReturnPendingJob(jobId: jobId) { - self.onFailedJob(.init(id: jobId, job: job), error) + self.onFailedJob(.init(id: jobId, jobData: job), error) } } /// Internal actor managing the job queue fileprivate actor Internal { - var queue: Deque<(JobID, Data)> - var pendingJobs: [JobID: any HBJob] + var queue: Deque> + var pendingJobs: [JobID: Data] var isStopped: Bool init() { @@ -72,10 +71,9 @@ public final class HBMemoryJobQueue: HBJobQueue { self.pendingJobs = .init() } - func push(_ jobRequest: _HBJobRequest) throws -> JobID { + func push(_ jobData: Data) throws -> JobID { let id = JobID() - let request = try (id, JSONEncoder().encode(jobRequest)) - self.queue.append(request) + self.queue.append(HBQueuedJob(id: id, jobData: jobData)) return id } @@ -83,7 +81,7 @@ public final class HBMemoryJobQueue: HBJobQueue { self.pendingJobs[jobId] = nil } - func clearAndReturnPendingJob(jobId: JobID) -> (any HBJob)? { + func clearAndReturnPendingJob(jobId: JobID) -> Data? { let instance = self.pendingJobs[jobId] self.pendingJobs[jobId] = nil return instance @@ -96,9 +94,8 @@ public final class HBMemoryJobQueue: HBJobQueue { } if let request = queue.popFirst() { do { - let job = try JSONDecoder().decode(HBAnyCodableJob.self, from: request.1) - self.pendingJobs[request.0] = job.job - return HBQueuedJob(id: request.0, job: job.job) + self.pendingJobs[request.id] = request.jobData + return request } catch { throw JobQueueError.decodeJobFailed } diff --git a/Sources/HummingbirdJobs/QueuedJob.swift b/Sources/HummingbirdJobs/QueuedJob.swift index b3ef0f996..1e0e1737d 100644 --- a/Sources/HummingbirdJobs/QueuedJob.swift +++ b/Sources/HummingbirdJobs/QueuedJob.swift @@ -14,35 +14,16 @@ import Foundation -/// Add codable support for decoding/encoding any HBJob -public struct HBAnyCodableJob: Decodable, Sendable { - /// Job data - public let job: any HBJob - - /// Initialize a queue job - public init(_ job: any HBJob) { - self.job = job - } - - public init(from decoder: Decoder) throws { - self.job = try HBJobRegister.decode(from: decoder) - } - - private enum CodingKeys: String, CodingKey { - case job - } -} - -/// Queued job. Includes job, plus the id for the job +/// Queued job. Includes job data, plus the id for the job public struct HBQueuedJob: Sendable { /// Job instance id public let id: JobID /// Job data - public let job: any HBJob + public let jobData: Data /// Initialize a queue job - public init(id: JobID, job: any HBJob) { - self.job = job + public init(id: JobID, jobData: Data) { + self.jobData = jobData self.id = id } } diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index 2955c8e54..cc7209fff 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -63,18 +63,18 @@ final class HummingbirdJobsTests: XCTestCase { func testBasic() async throws { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) let jobIdentifer = HBJobIdentifier(#function) - HBJobRegister.registerJob(jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - expectation.fulfill() - } let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( queue: jobQueue, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests") ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 1) try await jobQueue.push(id: jobIdentifer, parameters: 2) try await jobQueue.push(id: jobIdentifer, parameters: 3) @@ -95,7 +95,14 @@ final class HummingbirdJobsTests: XCTestCase { let runningJobCounter = ManagedAtomic(0) let maxRunningJobCounter = ManagedAtomic(0) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - HBJobRegister.registerJob(jobIdentifer) { parameters, context in + + let jobQueue = HBMemoryJobQueue() + let jobQueueHandler = HBJobQueueHandler( + queue: jobQueue, + numWorkers: 4, + logger: Logger(label: "HummingbirdJobsTests") + ) + jobQueueHandler.registerJob(jobIdentifer) { parameters, context in let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { maxRunningJobCounter.store(runningJobs, ordering: .relaxed) @@ -105,14 +112,7 @@ final class HummingbirdJobsTests: XCTestCase { expectation.fulfill() runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) } - - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 4, - logger: Logger(label: "HummingbirdJobsTests") - ) - try await testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 1) try await jobQueue.push(id: jobIdentifer, parameters: 2) try await jobQueue.push(id: jobIdentifer, parameters: 3) @@ -136,10 +136,6 @@ final class HummingbirdJobsTests: XCTestCase { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) let failedJobCount = ManagedAtomic(0) struct FailedError: Error {} - HBJobRegister.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in - expectation.fulfill() - throw FailedError() - } var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace let jobQueue = HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) } @@ -148,7 +144,11 @@ final class HummingbirdJobsTests: XCTestCase { numWorkers: 4, logger: logger ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in + expectation.fulfill() + throw FailedError() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) @@ -163,18 +163,18 @@ final class HummingbirdJobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called") let jobIdentifer = HBJobIdentifier(#function) - HBJobRegister.registerJob(jobIdentifer) { parameters, _ in - XCTAssertEqual(parameters.id, 23) - XCTAssertEqual(parameters.message, "Hello!") - expectation.fulfill() - } let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( queue: jobQueue, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests") ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer) { parameters, _ in + XCTAssertEqual(parameters.id, 23) + XCTAssertEqual(parameters.message, "Hello!") + expectation.fulfill() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) await self.wait(for: [expectation], timeout: 5) @@ -185,10 +185,6 @@ final class HummingbirdJobsTests: XCTestCase { func testShutdownJob() async throws { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - HBJobRegister.registerJob(jobIdentifer) { _, _ in - expectation.fulfill() - try await Task.sleep(for: .milliseconds(1000)) - } let cancelledJobCount = ManagedAtomic(0) var logger = Logger(label: "HummingbirdJobsTests") @@ -203,7 +199,11 @@ final class HummingbirdJobsTests: XCTestCase { numWorkers: 4, logger: logger ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer) { _, _ in + expectation.fulfill() + try await Task.sleep(for: .milliseconds(1000)) + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) } @@ -217,18 +217,20 @@ final class HummingbirdJobsTests: XCTestCase { let jobIdentifer1 = HBJobIdentifier(#function) let jobIdentifer2 = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) - HBJobRegister.registerJob(jobIdentifer2) { parameters, _ in - string.withLockedValue { $0 = parameters } - expectation.fulfill() - } + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( queue: jobQueue, numWorkers: 1, - logger: Logger(label: "HummingbirdJobsTests") + logger: logger ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer2) { parameters, _ in + string.withLockedValue { $0 = parameters } + expectation.fulfill() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer1, parameters: 2) try await jobQueue.push(id: jobIdentifer2, parameters: "test") await self.wait(for: [expectation], timeout: 5) @@ -241,7 +243,7 @@ final class HummingbirdJobsTests: XCTestCase { func testMultipleJobQueueHandlers() async throws { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) - HBJobRegister.registerJob(jobIdentifer) { parameters, context in + let job = HBJobDefinition(id: jobIdentifer) { parameters, context in context.logger.info("Parameters=\(parameters)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() @@ -257,12 +259,14 @@ final class HummingbirdJobsTests: XCTestCase { numWorkers: 2, logger: logger ) + jobQueueHandler.registerJob(job) let jobQueue2 = HBMemoryJobQueue() let jobQueueHandler2 = HBJobQueueHandler( queue: jobQueue2, numWorkers: 2, logger: logger ) + jobQueueHandler2.registerJob(job) try await withThrowingTaskGroup(of: Void.self) { group in let serviceGroup = ServiceGroup(