Skip to content

Commit

Permalink
Handler failed to decode, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Oct 29, 2023
1 parent 0cd32aa commit 4db2c8a
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 187 deletions.
33 changes: 0 additions & 33 deletions Sources/HummingbirdJobs/AsyncAwaitSupport/AsyncJob.swift

This file was deleted.

2 changes: 1 addition & 1 deletion Sources/HummingbirdJobs/JobInstance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public struct HBJobInstance: Codable {
public let job: HBJob

/// Initialize a queue job
init(_ job: HBJob) {
public init(_ job: HBJob) {
self.job = job
self.createdAt = Date()
}
Expand Down
15 changes: 13 additions & 2 deletions Sources/HummingbirdJobs/JobQueueHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ public final class HBJobQueueHandler<Queue: HBJobQueue>: Service {
try await withThrowingTaskGroup(of: Void.self) { group in
var iterator = self.queue.makeAsyncIterator()
for _ in 0..<self.numWorkers {
if let job = try await iterator.next() {
if let job = try await self.getNextJob(&iterator) {
group.addTask {
try await self.runJob(job)
}
}
}
for try await job in self.queue {
while let job = try await self.getNextJob(&iterator) {
try await group.next()
group.addTask {
try await self.runJob(job)
Expand All @@ -58,6 +58,17 @@ public final class HBJobQueueHandler<Queue: HBJobQueue>: Service {
}
}

func getNextJob(_ queueIterator: inout Queue.AsyncIterator) async throws -> HBQueuedJob? {
while true {
do {
let job = try await queueIterator.next()
return job
} catch let error as JobQueueError where error == JobQueueError.decodeJobFailed {
self.logger.error("Job failed to decode.")
}
}
}

func runJob(_ queuedJob: HBQueuedJob) async throws {
var logger = logger
logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id)
Expand Down
218 changes: 67 additions & 151 deletions Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -152,162 +152,78 @@ final class HummingbirdJobsTests: XCTestCase {
}
XCTAssertEqual(failedJobCount.load(ordering: .relaxed), 1)
}
/*
func testShutdown() throws {
let app = HBApplication(testing: .live)
app.logger.logLevel = .trace
app.addJobs(using: .memory, numWorkers: 1)
try app.start()
app.stop()
app.wait()
}

func testShutdownJob() throws {
struct TestJob: HBJob {
static let name = "testShutdownJob"
static var started: Bool = false
static var finished: Bool = false
func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<Void> {
Self.started = true
let job = eventLoop.scheduleTask(in: .milliseconds(500)) {
Self.finished = true
}
return job.futureResult
}
}
TestJob.register()

let app = HBApplication(testing: .live)
app.logger.logLevel = .trace
app.addJobs(using: .memory, numWorkers: 1)
try app.start()
let job = TestJob()
app.jobs.queue.enqueue(job, on: app.eventLoopGroup.next())
// stall to give job chance to start running
Thread.sleep(forTimeInterval: 0.1)
app.stop()
app.wait()

XCTAssertTrue(TestJob.started)
XCTAssertTrue(TestJob.finished)
}

func testJobSerialization() throws {
struct TestJob: HBJob, Equatable {
static let name = "testJobSerialization"
let value: Int
func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<Void> {
return eventLoop.makeSucceededVoidFuture()
}
}
TestJob.register()
let job = TestJob(value: 2)
let queuedJob = HBJobContainer(job)
let data = try JSONEncoder().encode(queuedJob)
let queuedJob2 = try JSONDecoder().decode(HBJobContainer.self, from: data)
XCTAssertEqual(queuedJob2.job as? TestJob, job)
}

/// test job fails to decode but queue continues to process
func testFailToDecode() throws {
struct TestJob1: HBJob {
static let name = "testFailToDecode"
func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<Void> {
return eventLoop.makeSucceededVoidFuture()
}
}
struct TestJob2: HBJob {
static let name = "testFailToDecode"
static var value: String?
let value: String
func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<Void> {
Self.value = self.value
return eventLoop.makeSucceededVoidFuture()
}
}
TestJob2.register()

let app = HBApplication(testing: .live)
app.logger.logLevel = .trace
app.addJobs(using: .memory, numWorkers: 1)

try app.start()

app.jobs.queue.enqueue(TestJob1(), on: app.eventLoopGroup.next())
app.jobs.queue.enqueue(TestJob2(value: "test"), on: app.eventLoopGroup.next())

// stall to give job chance to start running
Thread.sleep(forTimeInterval: 0.1)

app.stop()
app.wait()

XCTAssertEqual(TestJob2.value, "test")
}

/// test access via `HBRequest`
func testAccessViaRequest() throws {
struct TestJob: HBJob {
static let name = "testBasic"
static let expectation = XCTestExpectation(description: "Jobs Completed")

func execute(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<Void> {
Self.expectation.fulfill()
return eventLoop.makeSucceededVoidFuture()
}
}
TestJob.register()
TestJob.expectation.expectedFulfillmentCount = 1

let app = HBApplication(testing: .live)
app.logger.logLevel = .trace
app.addJobs(using: .memory, numWorkers: 1)

app.router.get("/job") { request -> EventLoopFuture<HTTPResponseStatus> in
return request.jobs.enqueue(job: TestJob()).map { _ in .ok }
}

try app.XCTStart()
defer { app.XCTStop() }

try app.XCTExecute(uri: "/job", method: .GET) { response in
XCTAssertEqual(response.status, .ok)
}

wait(for: [TestJob.expectation], timeout: 5)
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
func testAsyncJob() throws {
#if os(macOS)
// disable macOS tests in CI. GH Actions are currently running this when they shouldn't
guard HBEnvironment().get("CI") != "true" else { throw XCTSkip() }
#endif
struct TestAsyncJob: HBAsyncJob {
static let name = "testAsyncJob"
static let expectation = XCTestExpectation(description: "Jobs Completed")
func testJobSerialization() throws {
struct TestJob: HBJob, Equatable {
static let name = "testJobSerialization"
let value: Int
func execute(logger: Logger) async throws {}
}
TestJob.register()
let job = TestJob(value: 2)
let jobInstance = HBJobInstance(job)
let data = try JSONEncoder().encode(jobInstance)
let jobInstance2 = try JSONDecoder().decode(HBJobInstance.self, from: data)
XCTAssertEqual(jobInstance2.job as? TestJob, job)
}

func execute(logger: Logger) async throws {
try await Task.sleep(nanoseconds: 1_000_000)
Self.expectation.fulfill()
}
}
/// Test job is cancelled on shutdown
func testShutdownJob() async throws {
struct TestJob: HBJob {
static let name = "testShutdownJob"
func execute(logger: Logger) async throws {
try await Task.sleep(for: .milliseconds(100))
}
}
TestJob.register()

TestAsyncJob.register()
TestAsyncJob.expectation.expectedFulfillmentCount = 3
let cancelledJobCount = ManagedAtomic(0)
var logger = Logger(label: "HummingbirdJobsTests")
logger.logLevel = .trace
let jobQueueHandler = HBJobQueueHandler(
queue: HBMemoryJobQueue { _, error in
if error is CancellationError {
cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed)
}
},
numWorkers: 4,
logger: logger
)
try await testJobQueue(jobQueueHandler) {
try await jobQueueHandler.enqueue(TestJob())
}

let app = HBApplication(testing: .live)
app.logger.logLevel = .trace
app.addJobs(using: .memory, numWorkers: 2)
XCTAssertEqual(cancelledJobCount.load(ordering: .relaxed), 1)
}

try app.start()
defer { app.stop() }
/// test job fails to decode but queue continues to process
func testFailToDecode() async throws {
struct TestJob1: HBJob {
static let name = "testFailToDecode"
func execute(logger: Logger) async throws {}
}
struct TestJob2: HBJob {
static let name = "testFailToDecode"
static var value: String?
let value: String
func execute(logger: Logger) async throws {
Self.value = self.value
}
}
TestJob2.register()

app.jobs.queue.enqueue(TestAsyncJob(), on: app.eventLoopGroup.next())
app.jobs.queue.enqueue(TestAsyncJob(), on: app.eventLoopGroup.next())
app.jobs.queue.enqueue(TestAsyncJob(), on: app.eventLoopGroup.next())
let jobQueueHandler = HBJobQueueHandler(
queue: HBMemoryJobQueue(),
numWorkers: 1,
logger: Logger(label: "HummingbirdJobsTests")
)
try await testJobQueue(jobQueueHandler) {
try await jobQueueHandler.enqueue(TestJob1())
try await jobQueueHandler.enqueue(TestJob2(value: "test"))
// stall to give job chance to start running
try await Task.sleep(for: .milliseconds(500))
}

wait(for: [TestAsyncJob.expectation], timeout: 5)
}
*/
XCTAssertEqual(TestJob2.value, "test")
}
}

0 comments on commit 4db2c8a

Please sign in to comment.