Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x.x - Wrap InboundStream iterator in request body #369

Merged
merged 8 commits into from
Feb 5, 2024
28 changes: 15 additions & 13 deletions Benchmarks/Benchmarks/Router/RouterBenchmarks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ extension Benchmark {
context: Context.Type = BasicBenchmarkContext.self,
configuration: Benchmark.Configuration = Benchmark.defaultConfiguration,
request: HTTPRequest,
writeBody: @escaping @Sendable (HBStreamedRequestBody) async throws -> Void = { _ in },
writeBody: @escaping @Sendable (HBStreamedRequestBody.InboundStream.TestSource) async throws -> Void = { _ in },
setupRouter: @escaping @Sendable (HBRouter<Context>) async throws -> Void
) {
let router = HBRouter(context: Context.self)
Expand All @@ -60,15 +60,17 @@ extension Benchmark {
allocator: ByteBufferAllocator(),
logger: Logger(label: "Benchmark")
)
let requestBodyStream = HBStreamedRequestBody()
let requestBody = HBRequestBody.stream(requestBodyStream)
let (inbound, source) = NIOAsyncChannelInboundStream<HTTPRequestPart>.makeTestingStream()
let streamer = HBStreamedRequestBody(iterator: inbound.makeAsyncIterator())
let requestBody = HBRequestBody.stream(streamer)
let hbRequest = HBRequest(head: request, body: requestBody)
group.addTask {
let response = try await responder.respond(to: hbRequest, context: context)
_ = try await response.body.write(BenchmarkBodyWriter())
}
try await writeBody(requestBodyStream)
requestBodyStream.finish()
try await writeBody(source)
source.yield(.end(nil))
source.finish()
}
}
}
Expand Down Expand Up @@ -100,10 +102,10 @@ func routerBenchmarks() {
configuration: .init(warmupIterations: 10),
request: .init(method: .put, scheme: "http", authority: "localhost", path: "/")
) { bodyStream in
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
} setupRouter: { router in
router.put { request, _ in
let body = try await request.body.collate(maxSize: .max)
Expand All @@ -116,10 +118,10 @@ func routerBenchmarks() {
configuration: .init(warmupIterations: 10),
request: .init(method: .post, scheme: "http", authority: "localhost", path: "/")
) { bodyStream in
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
} setupRouter: { router in
router.post { request, _ in
HBResponse(status: .ok, headers: [:], body: .init { writer in
Expand Down
92 changes: 56 additions & 36 deletions Sources/HummingbirdCore/Request/RequestBody.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2023 the Hummingbird authors
// Copyright (c) 2023-2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -12,9 +12,13 @@
//
//===----------------------------------------------------------------------===//

import AsyncAlgorithms
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTPTypes

/// Request Body
///
/// Can be either a stream of ByteBuffers or a single ByteBuffer
public enum HBRequestBody: Sendable, AsyncSequence {
case byteBuffer(ByteBuffer)
case stream(HBStreamedRequestBody)
Expand All @@ -24,10 +28,11 @@

public func makeAsyncIterator() -> HBStreamedRequestBody.AsyncIterator {
switch self {
case .byteBuffer:
/// The server always creates the HBRequestBody as a stream. If it is converted
/// into a single ByteBuffer it cannot be treated as a stream after that
preconditionFailure("Cannot convert collapsed request body back into a sequence")
case .byteBuffer(let buffer):
let (stream, source) = NIOAsyncChannelInboundStream<HTTPRequestPart>.makeTestingStream()
source.yield(.body(buffer))
source.finish()
return HBStreamedRequestBody(iterator: stream.makeAsyncIterator()).makeAsyncIterator()
case .stream(let streamer):
return streamer.makeAsyncIterator()
}
Expand All @@ -45,46 +50,61 @@
}
}

/// A type that represents an HTTP request body.
public struct HBStreamedRequestBody: Sendable, AsyncSequence {
/// Request body that is a stream of ByteBuffers.
///
/// This is a unicast async sequence that allows a single iterator to be created.
public final class HBStreamedRequestBody: Sendable, AsyncSequence {
public typealias Element = ByteBuffer
public typealias InboundStream = NIOAsyncChannelInboundStream<HTTPRequestPart>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should prevent exposing this type, as we're locking ourselves in a corner

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point we are going to have to expose a public way to create a HBRequestBody that is initialised with a stream of data and at that time we'll have to expose the internal type to avoid having to do type conversions.


private let underlyingIterator: UnsafeTransfer<NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator>
private let alreadyIterated: NIOLockedValueBox<Bool>

/// Initialize HBStreamedRequestBody from AsyncIterator of a NIOAsyncChannelInboundStream
public init(iterator: InboundStream.AsyncIterator) {
self.underlyingIterator = .init(iterator)
self.alreadyIterated = .init(false)
}

/// Async Iterator for HBStreamedRequestBody
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = ByteBuffer

fileprivate var underlyingIterator: AsyncThrowingChannel<ByteBuffer, Error>.AsyncIterator
private var underlyingIterator: InboundStream.AsyncIterator
private var done: Bool

public mutating func next() async throws -> ByteBuffer? {
try await self.underlyingIterator.next()
init(underlyingIterator: InboundStream.AsyncIterator, done: Bool = false) {
self.underlyingIterator = underlyingIterator
self.done = done
}
}

/// HBRequestBody is internally represented by AsyncThrowingChannel
private var channel: AsyncThrowingChannel<ByteBuffer, Error>

/// Creates a new HTTP request body
@_spi(HBXCT) public init() {
self.channel = .init()
public mutating func next() async throws -> ByteBuffer? {
if self.done { return nil }
// if we are still expecting parts and the iterator finishes.
// In this case I think we can just assume we hit an .end
guard let part = try await self.underlyingIterator.next() else { return nil }
switch part {
case .body(let buffer):
return buffer
case .end:
self.done = true
return nil
default:
throw HTTPChannelError.unexpectedHTTPPart(part)

Check warning on line 93 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L93

Added line #L93 was not covered by tests
}
}
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(underlyingIterator: self.channel.makeAsyncIterator())
}
}

extension HBStreamedRequestBody {
/// push a single ByteBuffer to the HTTP request body stream
@_spi(HBXCT) public func send(_ buffer: ByteBuffer) async {
await self.channel.send(buffer)
}

/// pass error to HTTP request body
@_spi(HBXCT) public func fail(_ error: Error) {
self.channel.fail(error)
}

/// Finish HTTP request body stream
@_spi(HBXCT) public func finish() {
self.channel.finish()
// verify if an iterator has already been created. If it has then create an
// iterator that returns nothing. This could be a precondition failure (currently
// an assert) as you should not be allowed to do this.
let done = self.alreadyIterated.withLockedValue {
assert($0 == false, "Can only create iterator from request body once")
let done = $0
$0 = true
return done
}
return AsyncIterator(underlyingIterator: self.underlyingIterator.wrappedValue, done: done)
}
}
104 changes: 54 additions & 50 deletions Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,57 +43,61 @@
let processingRequest = ManagedAtomic(HTTPState.idle)
do {
try await withGracefulShutdownHandler {
try await withThrowingTaskGroup(of: Void.self) { group in
try await asyncChannel.executeThenClose { inbound, outbound in
let responseWriter = HBHTTPServerBodyWriter(outbound: outbound)
var iterator = inbound.makeAsyncIterator()
while let part = try await iterator.next() {
// set to processing unless it is cancelled then exit
guard processingRequest.exchange(.processing, ordering: .relaxed) == .idle else { break }
guard case .head(let head) = part else {
throw HTTPChannelError.unexpectedHTTPPart(part)
}
let bodyStream = HBStreamedRequestBody()
let body = HBRequestBody.stream(bodyStream)
let request = HBRequest(head: head, body: body)
// add task processing request and writing response
group.addTask {
let response: HBResponse
do {
response = try await self.responder(request, asyncChannel.channel)
} catch {
response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator)
}
do {
try await outbound.write(.head(response.head))
let tailHeaders = try await response.body.write(responseWriter)
try await outbound.write(.end(tailHeaders))
// flush request body
for try await _ in request.body {}
} catch {
// flush request body
for try await _ in request.body {}
throw error
}
if request.headers[.connection] == "close" {
throw HTTPChannelError.closeConnection
}
}
// send body parts to request
do {
// pass body part to request
while case .body(let buffer) = try await iterator.next() {
await bodyStream.send(buffer)
}
bodyStream.finish()
} catch {
// pass failed to read full http body to request
bodyStream.fail(error)
}
try await group.next()
// set to idle unless it is cancelled then exit
guard processingRequest.exchange(.idle, ordering: .relaxed) == .processing else { break }
try await asyncChannel.executeThenClose { inbound, outbound in
let responseWriter = HBHTTPServerBodyWriter(outbound: outbound)
var iterator = inbound.makeAsyncIterator()

// read first part, verify it is a head
guard let part = try await iterator.next() else { return }
guard case .head(var head) = part else {
throw HTTPChannelError.unexpectedHTTPPart(part)

Check warning on line 53 in Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift#L53

Added line #L53 was not covered by tests
}

while true {
// set to processing unless it is cancelled then exit
guard processingRequest.exchange(.processing, ordering: .relaxed) == .idle else { break }

let bodyStream = HBStreamedRequestBody(iterator: iterator)
let request = HBRequest(head: head, body: .stream(bodyStream))
let response: HBResponse
do {
response = try await self.responder(request, asyncChannel.channel)
} catch {
response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator)
}
do {
try await outbound.write(.head(response.head))
let tailHeaders = try await response.body.write(responseWriter)
try await outbound.write(.end(tailHeaders))
} catch {
throw error
}
if request.headers[.connection] == "close" {
throw HTTPChannelError.closeConnection
}
// set to idle unless it is cancelled then exit
guard processingRequest.exchange(.idle, ordering: .relaxed) == .processing else { break }

// Flush current request
// read until we don't have a body part
var part: HTTPRequestPart?
while true {
part = try await iterator.next()
guard case .body = part else { break }
}
// if we have an end then read the next part
if case .end = part {
part = try await iterator.next()
}

// if part is nil break out of loop
guard let part else {
break
}

// part should be a head, if not throw error
guard case .head(let newHead) = part else { throw HTTPChannelError.unexpectedHTTPPart(part) }
head = newHead
}
}
} onGracefulShutdown: {
Expand Down
62 changes: 62 additions & 0 deletions Sources/HummingbirdCore/Utils/UnsafeTransfer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2021-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler.
/// It can be used similar to `@unsafe Sendable` but for values instead of types.
@usableFromInline
struct UnsafeTransfer<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped

@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}

extension UnsafeTransfer: @unchecked Sendable {}

extension UnsafeTransfer: Equatable where Wrapped: Equatable {}
extension UnsafeTransfer: Hashable where Wrapped: Hashable {}

/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable.
/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation.
@usableFromInline
final class UnsafeMutableTransferBox<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped

@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}

Check warning on line 59 in Sources/HummingbirdCore/Utils/UnsafeTransfer.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Utils/UnsafeTransfer.swift#L57-L59

Added lines #L57 - L59 were not covered by tests
}

extension UnsafeMutableTransferBox: @unchecked Sendable {}
Loading
Loading