Skip to content

Commit

Permalink
HTTP2 improvements (#601)
Browse files Browse the repository at this point in the history
* Use configureHTTP2AsyncSecureUpgrade

* Add HTTP2ServerConnectionManager that records streams being added/removed

* HTTP2 connection state machine

* Move HTTP2 tests to own target

* Handle closing connection

* remove immediate close in triggerGracefulShutdown

* Fixed HTTP2 channel shutdown

* Add HTTP2StreamChannel to handle http2 stream setup

* graceful shutdown timeout

* Re-order code

* Add maxAge timeout, handleInputClosed

* Updated comments to answer some PR comments

* Set closed state on inputClosed

* enhanceYourCalm

* Update function header comments

* Fix breaking changes

* Remove HTTPChannelHandler conformance from HTTP2StreamChannel

* minor comment change
  • Loading branch information
adam-fowler authored Nov 6, 2024
1 parent 61a7d64 commit 49e4a08
Show file tree
Hide file tree
Showing 16 changed files with 1,469 additions and 115 deletions.
11 changes: 10 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,22 @@ let package = Package(
dependencies:
[
.byName(name: "HummingbirdCore"),
.byName(name: "HummingbirdHTTP2"),
.byName(name: "HummingbirdTLS"),
.byName(name: "HummingbirdTesting"),
.product(name: "AsyncHTTPClient", package: "async-http-client"),
],
resources: [.process("Certificates")]
),
.testTarget(
name: "HummingbirdHTTP2Tests",
dependencies:
[
.byName(name: "HummingbirdCore"),
.byName(name: "HummingbirdHTTP2"),
.byName(name: "HummingbirdTesting"),
.product(name: "AsyncHTTPClient", package: "async-http-client"),
]
),
],
swiftLanguageVersions: [.v5, .version("6")]
)
15 changes: 6 additions & 9 deletions Sources/HummingbirdCore/Request/RequestBody.swift
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,9 @@ extension RequestBody {
/// Request body that is a stream of ByteBuffers sourced from a NIOAsyncChannelInboundStream.
///
/// This is a unicast async sequence that allows a single iterator to be created.
@usableFromInline
final class NIOAsyncChannelRequestBody: Sendable, AsyncSequence {
@usableFromInline
typealias Element = ByteBuffer
@usableFromInline
typealias InboundStream = NIOAsyncChannelInboundStream<HTTPRequestPart>
public final class NIOAsyncChannelRequestBody: Sendable, AsyncSequence {
public typealias Element = ByteBuffer
public typealias InboundStream = NIOAsyncChannelInboundStream<HTTPRequestPart>

@usableFromInline
internal let underlyingIterator: UnsafeTransfer<NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator>
Expand All @@ -209,7 +206,7 @@ final class NIOAsyncChannelRequestBody: Sendable, AsyncSequence {

/// Initialize NIOAsyncChannelRequestBody from AsyncIterator of a NIOAsyncChannelInboundStream
@inlinable
init(iterator: InboundStream.AsyncIterator) {
public init(iterator: InboundStream.AsyncIterator) {
self.underlyingIterator = .init(iterator)
self.alreadyIterated = .init(false)
}
Expand All @@ -228,7 +225,7 @@ final class NIOAsyncChannelRequestBody: Sendable, AsyncSequence {
}

@inlinable
mutating func next() async throws -> ByteBuffer? {
public mutating func next() async throws -> ByteBuffer? {
if self.done { return nil }
// if we are still expecting parts and the iterator finishes.
// In this case I think we can just assume we hit an .end
Expand All @@ -246,7 +243,7 @@ final class NIOAsyncChannelRequestBody: Sendable, AsyncSequence {
}

@inlinable
func makeAsyncIterator() -> AsyncIterator {
public func makeAsyncIterator() -> AsyncIterator {
// verify if an iterator has already been created. If it has then create an
// iterator that returns nothing. This could be a precondition failure (currently
// an assert) as you should not be allowed to do this.
Expand Down
4 changes: 4 additions & 0 deletions Sources/HummingbirdCore/Response/ResponseWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public struct ResponseWriter: ~Copyable {
@usableFromInline
let outbound: NIOAsyncChannelOutboundWriter<HTTPResponsePart>

public init(outbound: NIOAsyncChannelOutboundWriter<HTTPResponsePart>) {
self.outbound = outbound
}

/// Write HTTP head part and return ``ResponseBodyWriter`` to write response body
///
/// - Parameter head: Response head
Expand Down
3 changes: 2 additions & 1 deletion Sources/HummingbirdCore/Server/HTTP/HTTP1Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler {

/// HTTP1Channel configuration
public struct Configuration: Sendable {
/// Additional channel handlers to add to channel after HTTP part decoding and before HTTP request processing
/// Additional channel handlers to add to channel pipeline after HTTP part decoding and before HTTP request handling
public var additionalChannelHandlers: @Sendable () -> [any RemovableChannelHandler]
/// Time before closing an idle channel
public var idleTimeout: TimeAmount?
Expand Down Expand Up @@ -98,6 +98,7 @@ public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler {
/// - Parameters:
/// - asyncChannel: NIOAsyncChannel handling HTTP parts
/// - logger: Logger to use while processing messages
@inlinable
public func handle(
value asyncChannel: NIOCore.NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
logger: Logging.Logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public protocol HTTPChannelHandler: ServerChildChannel {
/// Internal error thrown when an unexpected HTTP part is received eg we didn't receive
/// a head part when we expected one
@usableFromInline
enum HTTPChannelError: Error {
package enum HTTPChannelError: Error {
case unexpectedHTTPPart(HTTPRequestPart)
}

Expand Down
158 changes: 121 additions & 37 deletions Sources/HummingbirdHTTP2/HTTP2Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2023 the Hummingbird authors
// Copyright (c) 2023-2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -16,44 +16,65 @@ import HTTPTypes
import HummingbirdCore
import Logging
import NIOCore
import NIOHTTP1
import NIOHTTP2
import NIOHTTPTypes
import NIOHTTPTypesHTTP1
import NIOHTTPTypesHTTP2
import NIOPosix
import NIOSSL
import NIOTLS

/// Child channel for processing HTTP1 with the option of upgrading to HTTP2
public struct HTTP2UpgradeChannel: HTTPChannelHandler {
typealias HTTP1ConnectionOutput = HTTP1Channel.Value
typealias HTTP2ConnectionOutput = NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP2StreamChannel.Value>
public struct Value: ServerChildChannelValue {
let negotiatedHTTPVersion: EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Channel.Value, (NIOAsyncChannel<HTTP2Frame, HTTP2Frame>, NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP1Channel.Value>)>>
let negotiatedHTTPVersion: EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1ConnectionOutput, HTTP2ConnectionOutput>>
public let channel: Channel
}

/// HTTP2 Upgrade configuration
public struct Configuration: Sendable {
/// Idle timeout, how long connection is kept idle before closing
public var idleTimeout: Duration?
/// Maximum amount of time to wait for client response before all streams are closed after second GOAWAY has been sent
public var gracefulCloseTimeout: Duration?
/// Maximum amount of time a connection can be open
public var maxAgeTimeout: Duration?
/// Configuration applied to HTTP2 stream channels
public var streamConfiguration: HTTP1Channel.Configuration

/// Initialize HTTP2UpgradeChannel.Configuration
/// - Parameters:
/// - additionalChannelHandlers: Additional channel handlers to add to HTTP2 connection channel
/// - streamConfiguration: Configuration applied to HTTP2 stream channels
/// - idleTimeout: How long connection is kept idle before closing
/// - maxGraceCloseTimeout: Maximum amount of time to wait for client response before all streams are closed after second GOAWAY
/// - streamConfiguration: Configuration applieds to HTTP2 stream channels
public init(
idleTimeout: Duration? = nil,
gracefulCloseTimeout: Duration? = nil,
maxAgeTimeout: Duration? = nil,
streamConfiguration: HTTP1Channel.Configuration = .init()
) {
self.idleTimeout = idleTimeout
self.gracefulCloseTimeout = gracefulCloseTimeout
self.streamConfiguration = streamConfiguration
}
}

private let sslContext: NIOSSLContext
private let http1: HTTP1Channel
public var responder: HTTPChannelHandler.Responder { self.http1.responder }
private let http2Stream: HTTP2StreamChannel
public let configuration: Configuration
public var responder: Responder {
self.http2Stream.responder
}

/// Initialize HTTP2Channel
/// - Parameters:
/// - tlsConfiguration: TLS configuration
/// - additionalChannelHandlers: Additional channel handlers to add to channel pipeline
/// - additionalChannelHandlers: Additional channel handlers to add to stream channel pipeline after HTTP part decoding and
/// before HTTP request handling
/// - responder: Function returning a HTTP response for a HTTP request
@available(*, deprecated, renamed: "HTTP1Channel(tlsConfiguration:configuration:responder:)")
public init(
Expand All @@ -64,13 +85,21 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
var tlsConfiguration = tlsConfiguration
tlsConfiguration.applicationProtocols = NIOHTTP2SupportedALPNProtocols
self.sslContext = try NIOSSLContext(configuration: tlsConfiguration)
self.http1 = HTTP1Channel(responder: responder, configuration: .init(additionalChannelHandlers: additionalChannelHandlers()))
self.configuration = .init()
self.http1 = HTTP1Channel(
responder: responder,
configuration: .init(additionalChannelHandlers: additionalChannelHandlers())
)
self.http2Stream = HTTP2StreamChannel(
responder: responder,
configuration: .init(additionalChannelHandlers: additionalChannelHandlers())
)
}

/// Initialize HTTP2Channel
/// - Parameters:
/// - tlsConfiguration: TLS configuration
/// - additionalChannelHandlers: Additional channel handlers to add to channel pipeline
/// - configuration: HTTP2 channel configuration
/// - responder: Function returning a HTTP response for a HTTP request
public init(
tlsConfiguration: TLSConfiguration,
Expand All @@ -80,7 +109,9 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
var tlsConfiguration = tlsConfiguration
tlsConfiguration.applicationProtocols = NIOHTTP2SupportedALPNProtocols
self.sslContext = try NIOSSLContext(configuration: tlsConfiguration)
self.configuration = configuration
self.http1 = HTTP1Channel(responder: responder, configuration: configuration.streamConfiguration)
self.http2Stream = HTTP2StreamChannel(responder: responder, configuration: configuration.streamConfiguration)
}

/// Setup child channel for HTTP1 with HTTP2 upgrade
Expand All @@ -95,31 +126,28 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
return channel.eventLoop.makeFailedFuture(error)
}

return channel.configureAsyncHTTPServerPipeline { http1Channel -> EventLoopFuture<HTTP1Channel.Value> in
return http1Channel.eventLoop.makeCompletedFuture {
try http1Channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: true))
try http1Channel.pipeline.syncOperations.addHandlers(self.http1.configuration.additionalChannelHandlers())
if let idleTimeout = self.http1.configuration.idleTimeout {
try http1Channel.pipeline.syncOperations.addHandler(IdleStateHandler(readTimeout: idleTimeout))
}
try http1Channel.pipeline.syncOperations.addHandler(HTTPUserEventHandler(logger: logger))
return try HTTP1Channel.Value(wrappingChannelSynchronously: http1Channel)
}
} http2ConnectionInitializer: { http2Channel -> EventLoopFuture<NIOAsyncChannel<HTTP2Frame, HTTP2Frame>> in
http2Channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel<HTTP2Frame, HTTP2Frame>(wrappingChannelSynchronously: http2Channel)
}
} http2StreamInitializer: { http2ChildChannel -> EventLoopFuture<HTTP1Channel.Value> in
return http2ChildChannel.eventLoop.makeCompletedFuture {
try http2ChildChannel.pipeline.syncOperations.addHandler(HTTP2FramePayloadToHTTPServerCodec())
try http2ChildChannel.pipeline.syncOperations.addHandlers(self.http1.configuration.additionalChannelHandlers())
if let idleTimeout = self.http1.configuration.idleTimeout {
try http2ChildChannel.pipeline.syncOperations.addHandler(IdleStateHandler(readTimeout: idleTimeout))
return channel.configureHTTP2AsyncSecureUpgrade { channel in
self.http1.setup(channel: channel, logger: logger)
} http2ConnectionInitializer: { channel in
channel.eventLoop.makeCompletedFuture {
let connectionManager = HTTP2ServerConnectionManager(
eventLoop: channel.eventLoop,
idleTimeout: self.configuration.idleTimeout,
maxAgeTimeout: self.configuration.maxAgeTimeout,
gracefulCloseTimeout: self.configuration.gracefulCloseTimeout
)
let handler: HTTP2ConnectionOutput = try channel.pipeline.syncOperations.configureAsyncHTTP2Pipeline(
mode: .server,
streamDelegate: connectionManager.streamDelegate,
configuration: .init()
) { http2ChildChannel in
self.http2Stream.setup(channel: http2ChildChannel, logger: logger)
}
try http2ChildChannel.pipeline.syncOperations.addHandler(HTTPUserEventHandler(logger: logger))
return try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel)
try channel.pipeline.syncOperations.addHandler(connectionManager)
return handler
}
}.map {
}
.map {
.init(negotiatedHTTPVersion: $0, channel: channel)
}
}
Expand All @@ -133,24 +161,80 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
let channel = try await value.negotiatedHTTPVersion.get()
switch channel {
case .http1_1(let http1):
await handleHTTP(asyncChannel: http1, logger: logger)
case .http2((let http2, let multiplexer)):
await self.http1.handle(value: http1, logger: logger)
case .http2(let multiplexer):
do {
try await withThrowingDiscardingTaskGroup { group in
for try await client in multiplexer.inbound.cancelOnGracefulShutdown() {
for try await client in multiplexer.inbound {
group.addTask {
await handleHTTP(asyncChannel: client, logger: logger)
await self.http2Stream.handle(value: client, logger: logger)
}
}
}
} catch {
logger.error("Error handling inbound connection for HTTP2 handler: \(error)")
}
// have to run this to ensure http2 channel outbound writer is closed
try await http2.executeThenClose { _, _ in }
}
} catch {
logger.error("Error getting HTTP2 upgrade negotiated value: \(error)")
}
}
}

// Code taken from NIOHTTP2
extension Channel {
/// Configures a channel to perform an HTTP/2 secure upgrade with typed negotiation results.
///
/// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to
/// negotiate the inner protocol as part of the TLS handshake. For this reason, until the TLS
/// handshake is complete, the ultimate configuration of the channel pipeline cannot be known.
///
/// This function configures the channel with a pair of callbacks that will handle the result
/// of the negotiation. It explicitly **does not** configure a TLS handler to actually attempt
/// to negotiate ALPN. The supported ALPN protocols are provided in
/// `NIOHTTP2SupportedALPNProtocols`: please ensure that the TLS handler you are using for your
/// pipeline is appropriately configured to perform this protocol negotiation.
///
/// If negotiation results in an unexpected protocol, the pipeline will close the connection
/// and no callback will fire.
///
/// This configuration is acceptable for use on both client and server channel pipelines.
///
/// - Parameters:
/// - http1ConnectionInitializer: A callback that will be invoked if HTTP/1.1 has been explicitly
/// negotiated, or if no protocol was negotiated. Must return a future that completes when the
/// channel has been fully mutated.
/// - http2ConnectionInitializer: A callback that will be invoked if HTTP/2 has been negotiated, and that
/// should configure the channel for HTTP/2 use. Must return a future that completes when the
/// channel has been fully mutated.
/// - Returns: An `EventLoopFuture` of an `EventLoopFuture` containing the `NIOProtocolNegotiationResult` that completes when the channel
/// is ready to negotiate.
@inlinable
internal func configureHTTP2AsyncSecureUpgrade<HTTP1Output: Sendable, HTTP2Output: Sendable>(
http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP1Output>,
http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP2Output>
) -> EventLoopFuture<EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>>> {
let alpnHandler = NIOTypedApplicationProtocolNegotiationHandler<NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>>() { result in
switch result {
case .negotiated("h2"):
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
return http2ConnectionInitializer(self).map { http2Output in .http2(http2Output) }
case .negotiated("http/1.1"), .fallback:
// Explicit or implicit HTTP/1.1 choice.
return http1ConnectionInitializer(self).map { http1Output in .http1_1(http1Output) }
case .negotiated:
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
// of a user configuration error. We're going to close the connection directly.
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
}
}

return self.pipeline
.addHandler(alpnHandler)
.flatMap { _ in
self.pipeline.handler(type: NIOTypedApplicationProtocolNegotiationHandler<NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>>.self).map { alpnHandler in
alpnHandler.protocolNegotiationResult
}
}
}
}
Loading

0 comments on commit 49e4a08

Please sign in to comment.