Skip to content

Commit

Permalink
Move job registry inside queue handler
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Feb 26, 2024
1 parent 63c24d9 commit 16a77c2
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 147 deletions.
17 changes: 0 additions & 17 deletions Sources/HummingbirdJobs/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,3 @@ extension HBJob {
id.name
}
}

/// Type used internally by job queue implementations to encode a job request
public struct _HBJobRequest<Parameters: Codable & Sendable>: Encodable, Sendable {
let id: HBJobIdentifier<Parameters>
let parameters: Parameters

public init(id: HBJobIdentifier<Parameters>, parameters: Parameters) {
self.id = id
self.parameters = parameters
}

public func encode(to encoder: Encoder) throws {
var container = encoder.container(keyedBy: _HBJobCodingKey.self)
let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil))
try self.parameters.encode(to: childEncoder)
}
}
30 changes: 30 additions & 0 deletions Sources/HummingbirdJobs/JobDefinition.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// Job definition type
public struct HBJobDefinition<Parameters: Codable & Sendable> {
let id: HBJobIdentifier<Parameters>
let maxRetryCount: Int
let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void

public init(id: HBJobIdentifier<Parameters>, maxRetryCount: Int = 0, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) {
self.id = id
self.maxRetryCount = maxRetryCount
self._execute = execute
}

func execute(_ parameters: Parameters, context: HBJobContext) async throws {
try await self._execute(parameters, context)
}
}
26 changes: 25 additions & 1 deletion Sources/HummingbirdJobs/JobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob
func onInit() async throws
/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult func push<Parameters: Codable & Sendable>(id: HBJobIdentifier<Parameters>, parameters: Parameters) async throws -> JobID
@discardableResult func _push(data: Data) async throws -> JobID
/// This is called to say job has finished processing and it can be deleted
func finished(jobId: JobID) async throws
/// This is called to say job has failed to run and should be put aside
Expand All @@ -40,4 +40,28 @@ public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob
extension HBJobQueue {
// default version of onInit doing nothing
public func onInit() async throws {}
/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult public func push<Parameters: Codable & Sendable>(id: HBJobIdentifier<Parameters>, parameters: Parameters) async throws -> JobID {
let jobRequest = HBJobRequest(id: id, parameters: parameters)
let data = try JSONEncoder().encode(jobRequest)
return try await _push(data: data)
}
}

/// Type used internally to encode a request
struct HBJobRequest<Parameters: Codable & Sendable>: Encodable, Sendable {
let id: HBJobIdentifier<Parameters>
let parameters: Parameters

public init(id: HBJobIdentifier<Parameters>, parameters: Parameters) {
self.id = id
self.parameters = parameters
}

public func encode(to encoder: Encoder) throws {
var container = encoder.container(keyedBy: _HBJobCodingKey.self)
let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil))
try self.parameters.encode(to: childEncoder)
}
}
3 changes: 3 additions & 0 deletions Sources/HummingbirdJobs/JobQueueError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ public struct JobQueueError: Error, Equatable {
/// failed to decode job. Possibly because it hasn't been registered or data that was expected
/// is not available
public static var decodeJobFailed: Self { .init(.decodeJobFailed) }
/// failed to decode job as the job id is not recognised
public static var unrecognisedJobId: Self { .init(.unrecognisedJobId) }
/// failed to get job from queue
public static var dequeueError: Self { .init(.dequeueError) }

private enum QueueError {
case decodeJobFailed
case unrecognisedJobId
case dequeueError
}

Expand Down
64 changes: 46 additions & 18 deletions Sources/HummingbirdJobs/JobQueueHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,33 @@ public final class HBJobQueueHandler<Queue: HBJobQueue>: Service {
self.queue = queue
self.numWorkers = numWorkers
self.logger = logger
self.jobRegister = .init()
}

/// Register job
/// - Parameters:
/// - id: Job Identifier
/// - maxRetryCount: Maximum number of times job is retried before being flagged as failed
/// - execute: Job code
public func registerJob<Parameters: Codable & Sendable>(
_ id: HBJobIdentifier<Parameters>,
maxRetryCount: Int = 0,
execute: @escaping @Sendable (
Parameters,
HBJobContext
) async throws -> Void
) {
let definition = HBJobDefinition<Parameters>(id: id, maxRetryCount: maxRetryCount, execute: execute)
self.jobRegister.registerJob(job: definition)
}

/// Register job
/// - Parameters:
/// - id: Job Identifier
/// - maxRetryCount: Maximum number of times job is retried before being flagged as failed
/// - execute: Job code
public func registerJob(_ job: HBJobDefinition<some Codable & Sendable>) {
self.jobRegister.registerJob(job: job)
}

public func run() async throws {
Expand All @@ -32,16 +59,16 @@ public final class HBJobQueueHandler<Queue: HBJobQueue>: Service {
try await withThrowingTaskGroup(of: Void.self) { group in
var iterator = self.queue.makeAsyncIterator()
for _ in 0..<self.numWorkers {
if let job = try await self.getNextJob(&iterator) {
if let job = try await iterator.next() {
group.addTask {
await self.runJob(job)
try await self.runJob(job)
}
}
}
while let job = try await self.getNextJob(&iterator) {
while let job = try await iterator.next() {
try await group.next()
group.addTask {
await self.runJob(job)
try await self.runJob(job)
}
}
group.cancelAll()
Expand All @@ -54,23 +81,23 @@ public final class HBJobQueueHandler<Queue: HBJobQueue>: Service {
}
}

func getNextJob(_ queueIterator: inout Queue.AsyncIterator) async throws -> HBQueuedJob<Queue.JobID>? {
while true {
do {
let job = try await queueIterator.next()
return job
} catch let error as JobQueueError where error == JobQueueError.decodeJobFailed {
self.logger.error("Job failed to decode.")
}
}
}

func runJob(_ queuedJob: HBQueuedJob<Queue.JobID>) async {
func runJob(_ queuedJob: HBQueuedJob<Queue.JobID>) async throws {
var logger = logger
logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id)
logger[metadataKey: "hb_job_type"] = .string(queuedJob.job.name)
let job: any HBJob
do {
job = try self.jobRegister.decode(data: queuedJob.jobData)
} catch let error as JobQueueError where error == .unrecognisedJobId {
logger.debug("Failed to find Job with ID while decoding")
try await self.queue.failed(jobId: queuedJob.id, error: error)
return
} catch {
logger.debug("Job failed to decode")
try await self.queue.failed(jobId: queuedJob.id, error: JobQueueError.decodeJobFailed)
return
}
logger[metadataKey: "hb_job_type"] = .string(job.name)

let job = queuedJob.job
var count = job.maxRetryCount
logger.debug("Starting Job")

Expand Down Expand Up @@ -103,6 +130,7 @@ public final class HBJobQueueHandler<Queue: HBJobQueue>: Service {
}
}

private let jobRegister: HBJobRegister
private let queue: Queue
private let numWorkers: Int
let logger: Logger
Expand Down
77 changes: 40 additions & 37 deletions Sources/HummingbirdJobs/JobRegister.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,69 +12,51 @@
//
//===----------------------------------------------------------------------===//

import Foundation
import NIOConcurrencyHelpers

/// Registry for job types
public enum HBJobRegister {
struct HBJobRegister: Sendable {
/// Register job
/// - Parameters:
/// - id: Job Identifier
/// - maxRetryCount: Maximum number of times job is retried before being flagged as failed
/// - execute: Job code
public static func registerJob<Parameters: Codable & Sendable>(
_ id: HBJobIdentifier<Parameters>,
maxRetryCount: Int = 0,
execute: @escaping @Sendable (
Parameters,
HBJobContext
) async throws -> Void
public func registerJob<Parameters: Codable & Sendable>(
job: HBJobDefinition<Parameters>
) {
let definition = HBJobInstance<Parameters>.Definition(id: id, maxRetryCount: maxRetryCount, execute: execute)
let builder = { (decoder: Decoder) in
let parameters = try Parameters(from: decoder)
return try HBJobInstance(job: definition, parameters: parameters)
return try HBJobInstance<Parameters>(job: job, parameters: parameters)
}
self.idTypeMap.withLockedValue {
precondition($0[id.name] == nil, "There is a job already registered under id \"\(id.name)\"")
$0[id.name] = builder
self.builderTypeMap.withLockedValue {
precondition($0[job.id.name] == nil, "There is a job already registered under id \"\(job.id.name)\"")
$0[job.id.name] = builder
}
}

static func decode(from decoder: Decoder) throws -> any HBJob {
func decode(data: Data) throws -> any HBJob {
return try JSONDecoder().decode(HBAnyCodableJob.self, from: data, configuration: self).job
}

func decode(from decoder: Decoder) throws -> any HBJob {
let container = try decoder.container(keyedBy: _HBJobCodingKey.self)
let key = container.allKeys.first!
let childDecoder = try container.superDecoder(forKey: key)
let jobDefinitionBuilder = try Self.idTypeMap.withLockedValue {
guard let job = $0[key.stringValue] else { throw JobQueueError.decodeJobFailed }
let jobDefinitionBuilder = try self.builderTypeMap.withLockedValue {
guard let job = $0[key.stringValue] else { throw JobQueueError.unrecognisedJobId }
return job
}
return try jobDefinitionBuilder(childDecoder)
}

static let idTypeMap: NIOLockedValueBox < [String: (Decoder) throws -> any HBJob]> = .init([:])
let builderTypeMap: NIOLockedValueBox < [String: @Sendable (Decoder) throws -> any HBJob]> = .init([:])
}

/// Internal job instance type
struct HBJobInstance<Parameters: Codable & Sendable>: HBJob {
/// Job definition type
struct Definition {
let id: HBJobIdentifier<Parameters>
let maxRetryCount: Int
let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void

init(id: HBJobIdentifier<Parameters>, maxRetryCount: Int, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) {
self.id = id
self.maxRetryCount = maxRetryCount
self._execute = execute
}

public func execute(_ parameters: Parameters, context: HBJobContext) async throws {
try await self._execute(parameters, context)
}
}

internal struct HBJobInstance<Parameters: Codable & Sendable>: HBJob {
/// job definition
let job: Definition
let job: HBJobDefinition<Parameters>
/// job parameters
let parameters: Parameters

Expand All @@ -86,12 +68,33 @@ struct HBJobInstance<Parameters: Codable & Sendable>: HBJob {
try await self.job.execute(self.parameters, context: context)
}

init(job: Definition, parameters: Parameters) throws {
init(job: HBJobDefinition<Parameters>, parameters: Parameters) throws {
self.job = job
self.parameters = parameters
}
}

/// Add codable support for decoding/encoding any HBJob
internal struct HBAnyCodableJob: DecodableWithConfiguration, Sendable {
typealias DecodingConfiguration = HBJobRegister

init(from decoder: Decoder, configuration register: DecodingConfiguration) throws {
self.job = try register.decode(from: decoder)
}

/// Job data
let job: any HBJob

/// Initialize a queue job
init(_ job: any HBJob) {
self.job = job
}

private enum CodingKeys: String, CodingKey {
case job
}
}

internal struct _HBJobCodingKey: CodingKey {
public var stringValue: String
public var intValue: Int?
Expand Down
23 changes: 10 additions & 13 deletions Sources/HummingbirdJobs/MemoryJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ public final class HBMemoryJobQueue: HBJobQueue {
/// - job: Job
/// - eventLoop: Eventloop to run process on (ignored in this case)
/// - Returns: Queued job
@discardableResult public func push<Parameters: Codable & Sendable>(id: HBJobIdentifier<Parameters>, parameters: Parameters) async throws -> JobID {
let job = _HBJobRequest(id: id, parameters: parameters)
return try await self.queue.push(job)
@discardableResult public func _push(data: Data) async throws -> JobID {
return try await self.queue.push(data)
}

public func finished(jobId: JobID) async throws {
Expand All @@ -56,14 +55,14 @@ public final class HBMemoryJobQueue: HBJobQueue {

public func failed(jobId: JobID, error: any Error) async throws {
if let job = await self.queue.clearAndReturnPendingJob(jobId: jobId) {
self.onFailedJob(.init(id: jobId, job: job), error)
self.onFailedJob(.init(id: jobId, jobData: job), error)
}
}

/// Internal actor managing the job queue
fileprivate actor Internal {
var queue: Deque<(JobID, Data)>
var pendingJobs: [JobID: any HBJob]
var queue: Deque<HBQueuedJob<JobID>>
var pendingJobs: [JobID: Data]
var isStopped: Bool

init() {
Expand All @@ -72,18 +71,17 @@ public final class HBMemoryJobQueue: HBJobQueue {
self.pendingJobs = .init()
}

func push(_ jobRequest: _HBJobRequest<some Codable>) throws -> JobID {
func push(_ jobData: Data) throws -> JobID {
let id = JobID()
let request = try (id, JSONEncoder().encode(jobRequest))
self.queue.append(request)
self.queue.append(HBQueuedJob(id: id, jobData: jobData))
return id
}

func clearPendingJob(jobId: JobID) {
self.pendingJobs[jobId] = nil
}

func clearAndReturnPendingJob(jobId: JobID) -> (any HBJob)? {
func clearAndReturnPendingJob(jobId: JobID) -> Data? {
let instance = self.pendingJobs[jobId]
self.pendingJobs[jobId] = nil
return instance
Expand All @@ -96,9 +94,8 @@ public final class HBMemoryJobQueue: HBJobQueue {
}
if let request = queue.popFirst() {
do {
let job = try JSONDecoder().decode(HBAnyCodableJob.self, from: request.1)
self.pendingJobs[request.0] = job.job
return HBQueuedJob(id: request.0, job: job.job)
self.pendingJobs[request.id] = request.jobData
return request
} catch {
throw JobQueueError.decodeJobFailed
}
Expand Down
Loading

0 comments on commit 16a77c2

Please sign in to comment.