Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RequestBody.Source Sendable #628

Merged
merged 3 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Sources/HummingbirdCore/Request/RequestBody.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ extension RequestBody {

/// Delegate for NIOThrowingAsyncSequenceProducer
@usableFromInline
final class Delegate: NIOAsyncSequenceProducerDelegate {
final class Delegate: NIOAsyncSequenceProducerDelegate, Sendable {
let checkedContinuations: NIOLockedValueBox<Deque<CheckedContinuation<Void, Never>>>

@usableFromInline
Expand Down Expand Up @@ -162,13 +162,13 @@ extension RequestBody {
}

/// A source used for driving a ``RequestBody`` stream.
public final class Source {
public final class Source: Sendable {
@usableFromInline
let source: Producer.Source
@usableFromInline
let delegate: Delegate
@usableFromInline
var waitForProduceMore: Bool
let waitForProduceMore: NIOLockedValueBox<Bool>

@usableFromInline
init(source: Producer.Source, delegate: Delegate) {
Expand All @@ -187,13 +187,13 @@ extension RequestBody {
public func yield(_ element: ByteBuffer) async throws {
// if previous call indicated we should stop producing wait until the delegate
// says we can start producing again
if self.waitForProduceMore {
if self.waitForProduceMore.withLockedValue({ $0 }) {
await self.delegate.waitForProduceMore()
self.waitForProduceMore = false
self.waitForProduceMore.withLockedValue { $0 = false }
}
let result = self.source.yield(element)
if result == .stopProducing {
self.waitForProduceMore = true
self.waitForProduceMore.withLockedValue { $0 = true }
}
}

Expand Down
33 changes: 33 additions & 0 deletions Tests/HummingbirdTests/ApplicationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,39 @@ final class ApplicationTests: XCTestCase {
XCTAssertEqual(format.error.message, message)
}
}

/// Test AsyncSequence returned by RequestBody.makeStream()
func testMakeStream() async throws {
let router = Router()
router.post("streaming") { request, context -> Response in
let body = try await withThrowingTaskGroup(of: Void.self) { group in
let (requestBody, source) = RequestBody.makeStream()
group.addTask {
for try await buffer in request.body {
try await source.yield(buffer)
}
source.finish()
}
var body = ByteBuffer()
for try await buffer in requestBody {
var buffer = buffer
body.writeBuffer(&buffer)
}
return body
}
return Response(status: .ok, body: .init(byteBuffer: body))
}
let app = Application(responder: router.buildResponder())

try await app.test(.router) { client in

let buffer = Self.randomBuffer(size: 640_001)
try await client.execute(uri: "/streaming", method: .post, body: buffer) { response in
XCTAssertEqual(response.status, .ok)
XCTAssertEqual(response.body, buffer)
}
}
}
}

/// HTTPField used during tests
Expand Down
Loading