Skip to content

Commit

Permalink
Make RequestBody.Source Sendable (#628)
Browse files Browse the repository at this point in the history
* Make RequestBody.Source Sendable

* final class, no longer breaking
  • Loading branch information
adam-fowler authored Dec 7, 2024
1 parent 08bc8f6 commit 96a3529
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
12 changes: 6 additions & 6 deletions Sources/HummingbirdCore/Request/RequestBody.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ extension RequestBody {

/// Delegate for NIOThrowingAsyncSequenceProducer
@usableFromInline
final class Delegate: NIOAsyncSequenceProducerDelegate {
final class Delegate: NIOAsyncSequenceProducerDelegate, Sendable {
let checkedContinuations: NIOLockedValueBox<Deque<CheckedContinuation<Void, Never>>>

@usableFromInline
Expand Down Expand Up @@ -162,13 +162,13 @@ extension RequestBody {
}

/// A source used for driving a ``RequestBody`` stream.
public final class Source {
public final class Source: Sendable {
@usableFromInline
let source: Producer.Source
@usableFromInline
let delegate: Delegate
@usableFromInline
var waitForProduceMore: Bool
let waitForProduceMore: NIOLockedValueBox<Bool>

@usableFromInline
init(source: Producer.Source, delegate: Delegate) {
Expand All @@ -187,13 +187,13 @@ extension RequestBody {
public func yield(_ element: ByteBuffer) async throws {
// if previous call indicated we should stop producing wait until the delegate
// says we can start producing again
if self.waitForProduceMore {
if self.waitForProduceMore.withLockedValue({ $0 }) {
await self.delegate.waitForProduceMore()
self.waitForProduceMore = false
self.waitForProduceMore.withLockedValue { $0 = false }
}
let result = self.source.yield(element)
if result == .stopProducing {
self.waitForProduceMore = true
self.waitForProduceMore.withLockedValue { $0 = true }
}
}

Expand Down
33 changes: 33 additions & 0 deletions Tests/HummingbirdTests/ApplicationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,39 @@ final class ApplicationTests: XCTestCase {
XCTAssertEqual(format.error.message, message)
}
}

/// Test AsyncSequence returned by RequestBody.makeStream()
func testMakeStream() async throws {
let router = Router()
router.post("streaming") { request, context -> Response in
let body = try await withThrowingTaskGroup(of: Void.self) { group in
let (requestBody, source) = RequestBody.makeStream()
group.addTask {
for try await buffer in request.body {
try await source.yield(buffer)
}
source.finish()
}
var body = ByteBuffer()
for try await buffer in requestBody {
var buffer = buffer
body.writeBuffer(&buffer)
}
return body
}
return Response(status: .ok, body: .init(byteBuffer: body))
}
let app = Application(responder: router.buildResponder())

try await app.test(.router) { client in

let buffer = Self.randomBuffer(size: 640_001)
try await client.execute(uri: "/streaming", method: .post, body: buffer) { response in
XCTAssertEqual(response.status, .ok)
XCTAssertEqual(response.body, buffer)
}
}
}
}

/// HTTPField used during tests
Expand Down

0 comments on commit 96a3529

Please sign in to comment.