Skip to content

Commit

Permalink
Merge multiplexer.inbound AsyncSequence with timer sequence to implem…
Browse files Browse the repository at this point in the history
…ent timeouts

Also include gracefulShutdown sequence to implement gracefulShutdown
  • Loading branch information
adam-fowler committed Feb 2, 2024
1 parent f8d82c1 commit b4c692d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 21 deletions.
89 changes: 68 additions & 21 deletions Sources/HummingbirdHTTP2/HTTP2Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,27 @@
//
//===----------------------------------------------------------------------===//

import AsyncAlgorithms
import HTTPTypes
import HummingbirdCore
import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP2
import NIOHTTPTypes
import NIOHTTPTypesHTTP1
import NIOHTTPTypesHTTP2
import NIOPosix
import NIOSSL
import ServiceLifecycle

/// Child channel for processing HTTP1 with the option of upgrading to HTTP2
public struct HTTP2UpgradeChannel: HTTPChannelHandler {
public typealias Value = EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Channel.Value, (NIOAsyncChannel<HTTP2Frame, HTTP2Frame>, NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP1Channel.Value>)>>

private let sslContext: NIOSSLContext
private let http1: HTTP1Channel
private let idleTimeout: Duration
private let additionalChannelHandlers: @Sendable () -> [any RemovableChannelHandler]
public var responder: @Sendable (HBRequest, Channel) async throws -> HBResponse { http1.responder }

Expand All @@ -39,12 +43,14 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
/// - responder: Function returning a HTTP response for a HTTP request
public init(
tlsConfiguration: TLSConfiguration,
idleTimeout: Duration = .seconds(30),
additionalChannelHandlers: @escaping @Sendable () -> [any RemovableChannelHandler] = { [] },
responder: @escaping @Sendable (HBRequest, Channel) async throws -> HBResponse = { _, _ in throw HBHTTPError(.notImplemented) }
) throws {
var tlsConfiguration = tlsConfiguration
tlsConfiguration.applicationProtocols = NIOHTTP2SupportedALPNProtocols
self.sslContext = try NIOSSLContext(configuration: tlsConfiguration)
self.idleTimeout = idleTimeout
self.additionalChannelHandlers = additionalChannelHandlers
self.http1 = HTTP1Channel(responder: responder, additionalChannelHandlers: additionalChannelHandlers)
}
Expand Down Expand Up @@ -82,15 +88,11 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
self.additionalChannelHandlers() + [
HBHTTPUserEventHandler(logger: logger),
]

return http2ChildChannel
.pipeline
.addHandler(HTTP2FramePayloadToHTTPServerCodec())
.flatMap {
http2ChildChannel.pipeline.addHandlers(childChannelHandlers)
}.flatMapThrowing {
try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel)
}
return http2ChildChannel.eventLoop.makeCompletedFuture {
try http2ChildChannel.pipeline.syncOperations.addHandler(HTTP2FramePayloadToHTTPServerCodec())
try http2ChildChannel.pipeline.syncOperations.addHandlers(childChannelHandlers)
return try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel)
}
}
}

Expand All @@ -99,29 +101,74 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
/// - value: Object to process input/output on child channel
/// - logger: Logger to use while processing messages
public func handle(value: Value, logger: Logger) async {
struct HTTP2StreamState {
var numberOfStreams: Int = 0
var lastClose: ContinuousClock.Instant = .now
}
do {
let channel = try await value.get()
switch channel {
case .http1_1(let http1):
await handleHTTP(asyncChannel: http1, logger: logger)
case .http2((let http2, let multiplexer)):
try await withThrowingDiscardingTaskGroup { group in
for try await client in multiplexer.inbound.cancelOnGracefulShutdown() {
group.addTask {
await handleHTTP(asyncChannel: client, logger: logger)
enum MergeResult {
case gracefulShutdown
case timer
case stream(HTTP1Channel.Value)
}
let (gracefulShutdownSequence, gracefulShutdownSource) = AsyncStream<Void>.makeStream()
let timerSequence = AsyncTimerSequence(interval: .seconds(1), clock: .continuous)
let mergedSequence = merge(
multiplexer.inbound.map { MergeResult.stream($0) },
timerSequence.map { _ in .timer },
gracefulShutdownSequence.map { .gracefulShutdown }
)
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
let streamState = NIOLockedValueBox(HTTP2StreamState())
loop:
for try await element in mergedSequence
{
switch element {
case .stream(let client):
streamState.withLockedValue {
$0.numberOfStreams += 1
}
group.addTask {
await handleHTTP(asyncChannel: client, logger: logger)
streamState.withLockedValue {
$0.numberOfStreams -= 1
$0.lastClose = .now
}
}
case .timer:
let state = streamState.withLockedValue { $0 }
if state.numberOfStreams == 0, state.lastClose + self.idleTimeout < .now {
break loop
}
case .gracefulShutdown:
break loop
}
}
}
} onGracefulShutdown: {
gracefulShutdownSource.yield()
}
} catch {
logger.error("Error handling inbound connection for HTTP2 handler: \(error)")
}
// have to run this to ensure http2 channel is closed. If we ended reading from the
// multiplexer because of graceful shutdown or because an error was thrown then the
// channel will still be open
do {
try await http2.channel.close()
} catch let error as ChannelError where error == .alreadyClosed {
// can ignore already closed errors
}

// Close the `http2` NIOAsyncCannel here. Closing it here ensures we retain the `http2` instance,
// preventing it from being `deinit`-ed.
// Not having this will cause HTTP2 connections to close shortly after the first request
// is handled. When NIOAsyncChannel `deinit`s, it closes the channel. So this ensures
// that closing the HTTP2 channel happens when we need it to.
try await http2.channel.close()
}
} catch {
logger.error("Error handling inbound connection for HTTP2 handler: \(error)")
logger.error("Error getting HTTP2 upgrade negotiated value: \(error)")
}
}
}
2 changes: 2 additions & 0 deletions Sources/HummingbirdHTTP2/HTTP2ChannelBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ extension HBHTTPChannelBuilder {
/// - Returns: HTTPChannelHandler builder
public static func http2Upgrade(
tlsConfiguration: TLSConfiguration,
idleTimeout: Duration = .seconds(30),
additionalChannelHandlers: @autoclosure @escaping @Sendable () -> [any RemovableChannelHandler] = []
) throws -> HBHTTPChannelBuilder<HTTP2UpgradeChannel> {
return .init { responder in
return try HTTP2UpgradeChannel(
tlsConfiguration: tlsConfiguration,
idleTimeout: idleTimeout,
additionalChannelHandlers: additionalChannelHandlers,
responder: responder
)
Expand Down
29 changes: 29 additions & 0 deletions Tests/HummingbirdCoreTests/HTTP2Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,33 @@ class HummingBirdHTTP2Tests: XCTestCase {
XCTAssertEqual(response.status, .ok)
}
}

// test timeout doesn't kill long running task
func testTimeout() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
try await testServer(
responder: { _, _ in
try await Task.sleep(for: .seconds(2))
return .init(status: .ok, body: .init(byteBuffer: .init(string: "Hello")))
},
httpChannelSetup: .http2Upgrade(tlsConfiguration: getServerTLSConfiguration(), idleTimeout: .seconds(0.5)),
configuration: .init(address: .hostname(port: 0), serverName: testServerName),
eventLoopGroup: eventLoopGroup,
logger: Logger(label: "HB")
) { _, port in
var tlsConfiguration = try getClientTLSConfiguration()
// no way to override the SSL server name with AsyncHTTPClient so need to set
// hostname verification off
tlsConfiguration.certificateVerification = .noHostnameVerification
let httpClient = HTTPClient(
eventLoopGroupProvider: .shared(eventLoopGroup),
configuration: .init(tlsConfiguration: tlsConfiguration)
)
defer { try? httpClient.syncShutdown() }

let response = try await httpClient.get(url: "https://localhost:\(port)/").get()
XCTAssertEqual(response.status, .ok)
}
}
}

0 comments on commit b4c692d

Please sign in to comment.