Skip to content

Commit

Permalink
Job service (#255)
Browse files Browse the repository at this point in the history
* Initial job queue changes

* Job queue running in taskgroup

* Move group.next()

* Fix concurrent running jobs, add failed job handler

* Handler failed to decode, add tests
  • Loading branch information
adam-fowler authored Nov 1, 2023
1 parent de6927a commit 622b8f0
Show file tree
Hide file tree
Showing 10 changed files with 410 additions and 683 deletions.
33 changes: 0 additions & 33 deletions Sources/HummingbirdJobs/AsyncAwaitSupport/AsyncJob.swift

This file was deleted.

6 changes: 3 additions & 3 deletions Sources/HummingbirdJobs/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2021 the Hummingbird authors
// Copyright (c) 2021-2023 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -19,7 +19,7 @@ import NIO
/// Protocol for job description
///
/// For a job to be decodable, it has to be registered. Call `MyJob.register()` to register a job.
public protocol HBJob: Codable {
public protocol HBJob: Codable, Sendable {
/// Unique Job name
static var name: String { get }

Expand All @@ -28,7 +28,7 @@ public protocol HBJob: Codable {

/// Execute job
/// - Returns: EventLoopFuture that is fulfulled when job is done
func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<Void>
func execute(logger: Logger) async throws
}

extension HBJob {
Expand Down
43 changes: 43 additions & 0 deletions Sources/HummingbirdJobs/JobIdentifier.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2023 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation

/// Identifier for Job
public struct JobIdentifier: Sendable, CustomStringConvertible, Codable, Hashable {
let id: String

init() {
self.id = UUID().uuidString
}

/// Initialize JobIdentifier from String
/// - Parameter value: string value
public init(_ value: String) {
self.id = value
}

public init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
self.id = try container.decode(String.self)
}

public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(self.id)
}

/// String description of Identifier
public var description: String { self.id }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2021 the Hummingbird authors
// Copyright (c) 2021-2023 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -15,14 +15,14 @@
import Foundation

/// Holder for all data related to a job
public struct HBJobContainer: Codable {
public struct HBJobInstance: Codable {
/// Time created
public let createdAt: Date
/// Job data
public let job: HBJob

/// Initialize a queue job
init(_ job: HBJob) {
public init(_ job: HBJob) {
self.job = job
self.createdAt = Date()
}
Expand Down Expand Up @@ -52,7 +52,7 @@ public struct HBQueuedJob: Codable {
/// Job id
public let id: JobIdentifier
/// Job data
public let job: HBJobContainer
public let job: HBJobInstance

/// Initialize a queue job
public init(_ job: HBJob) {
Expand All @@ -61,7 +61,7 @@ public struct HBQueuedJob: Codable {
}

/// Initialize a queue job
public init(id: JobIdentifier, job: HBJobContainer) {
public init(id: JobIdentifier, job: HBJobInstance) {
self.job = job
self.id = id
}
Expand Down
105 changes: 9 additions & 96 deletions Sources/HummingbirdJobs/JobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2021 the Hummingbird authors
// Copyright (c) 2021-2023 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -16,106 +16,19 @@ import Foundation
import Hummingbird
import Logging

/// Identifier for Job
public struct JobIdentifier: Sendable, CustomStringConvertible, Codable {
let id: String

init() {
self.id = UUID().uuidString
}

/// Initialize JobIdentifier from String
/// - Parameter value: string value
public init(_ value: String) {
self.id = value
}

public init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
self.id = try container.decode(String.self)
}

public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(self.id)
}

/// String description of Identifier
public var description: String { self.id }
}

/// Job queue protocol.
///
/// Defines how to push and pop jobs off a queue
public protocol HBJobQueue: AnyObject {
/// Process to run at initialisation of Job Queue
/// - Returns: When queue initialisation is finished
func onInit(on: EventLoop) -> EventLoopFuture<Void>
public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob {
/// Push Job onto queue
/// - Returns: Queued job information
func push(_ job: HBJob, on: EventLoop) -> EventLoopFuture<HBQueuedJob>
/// Pop job off queue. Future will wait until a job is available
/// - Returns: Queued job information
func pop(on: EventLoop) -> EventLoopFuture<HBQueuedJob?>
@discardableResult func push(_ job: HBJob) async throws -> JobIdentifier
/// This is called to say job has finished processing and it can be deleted
/// - Returns: When deletion of job has finished
func finished(jobId: JobIdentifier, on: EventLoop) -> EventLoopFuture<Void>
func finished(jobId: JobIdentifier) async throws
/// This is called to say job has failed to run and should be put aside
func failed(jobId: JobIdentifier, error: any Error) async throws
/// stop serving jobs
func stop() async
/// shutdown queue
func shutdown(on: EventLoop) -> EventLoopFuture<Void>
/// time amount between each poll of queue
var pollTime: TimeAmount { get }
}

extension HBJobQueue {
/// Default implememtatoin of `finish`. Does nothing
/// - Parameters:
/// - jobId: Job Identifier
/// - eventLoop: EventLoop to run process on
/// - Returns: When deletion of job has finished. In this case immediately
public func finished(jobId: JobIdentifier, on eventLoop: EventLoop) -> EventLoopFuture<Void> {
return eventLoop.makeSucceededVoidFuture()
}

/// Default implementation of `onInit`. Does nothing
/// - Parameter eventLoop: EventLoop to run process on
/// - Returns: When initialisation has finished. In this case immediately
public func onInit(on eventLoop: EventLoop) -> EventLoopFuture<Void> {
return eventLoop.makeSucceededVoidFuture()
}

/// Default implementation of `shutdown`. Does nothing
public func shutdown(on eventLoop: EventLoop) -> EventLoopFuture<Void> {
return eventLoop.makeSucceededVoidFuture()
}

public var shutdownError: Error { return HBJobQueueShutdownError() }

/// Push job onto queue
/// - Parameters:
/// - job: Job descriptor
/// - maxRetryCount: Number of times you should retry job
/// - Returns: ID for job
@discardableResult public func enqueue(_ job: HBJob, on eventLoop: EventLoop) -> EventLoopFuture<JobIdentifier> {
return self.push(job, on: eventLoop).map(\.id)
}

/// Queue workers poll the queue to get the latest jobs off the queue. This indicates the time amount
/// between each poll of the queue
public var pollTime: TimeAmount { .milliseconds(100) }
}

/// Job queue asynchronous enqueue
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension HBJobQueue {
/// Push job onto queue
/// - Parameters:
/// - job: Job descriptor
/// - maxRetryCount: Number of times you should retry job
/// - Returns: ID for job
@discardableResult public func enqueue(_ job: HBJob, on eventLoop: EventLoop) async throws -> JobIdentifier {
return try await self.push(job, on: eventLoop).map(\.id).get()
}
func shutdownGracefully() async
}

/// Error type for when a job queue is being shutdown
struct HBJobQueueShutdownError: Error {}
3 changes: 3 additions & 0 deletions Sources/HummingbirdJobs/JobQueueError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ public struct JobQueueError: Error, Equatable {
/// failed to decode job. Possibly because it hasn't been registered or data that was expected
/// is not available
public static var decodeJobFailed: Self { .init(.decodeJobFailed) }
/// failed to get job from queue
public static var dequeueError: Self { .init(.dequeueError) }

private enum QueueError {
case decodeJobFailed
case dequeueError
}

private let error: QueueError
Expand Down
Loading

0 comments on commit 622b8f0

Please sign in to comment.