From 895af49c0245e17f8457ca3105b711419c1e0167 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 23 Nov 2023 14:49:23 +0000 Subject: [PATCH] Remove quiesce helper, replace with withGracefulShutdown in HTTP handler (#278) --- .swiftformat | 2 +- .../Server/HTTP/HTTPChannelHandler.swift | 105 +++++++++++------- .../Server/HTTPUserEventHandler.swift | 12 -- Sources/HummingbirdCore/Server/Server.swift | 57 +++------- Sources/HummingbirdHTTP2/HTTP2Channel.swift | 2 +- Sources/PerformanceTest/main.swift | 7 ++ Tests/HummingbirdCoreTests/CoreTests.swift | 49 ++++++++ Tests/HummingbirdCoreTests/TestUtils.swift | 23 +++- 8 files changed, 156 insertions(+), 101 deletions(-) diff --git a/.swiftformat b/.swiftformat index 3a7c14dad..9f3690883 100644 --- a/.swiftformat +++ b/.swiftformat @@ -2,7 +2,7 @@ --minversion 0.47.4 # Swift version ---swiftversion 5.3 +--swiftversion 5.9 # file options --exclude .build diff --git a/Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift b/Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift index b4d858f91..5b08dc6a8 100644 --- a/Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift +++ b/Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift @@ -12,9 +12,11 @@ // //===----------------------------------------------------------------------===// +import Atomics import Logging import NIOCore import NIOHTTP1 +import ServiceLifecycle /// Protocol for HTTP channels public protocol HTTPChannelHandler: HBChannelSetup { @@ -28,58 +30,77 @@ enum HTTPChannelError: Error { case closeConnection } +enum HTTPState: Int, AtomicValue { + case idle + case processing + case cancelled +} + extension HTTPChannelHandler { public func handleHTTP(asyncChannel: NIOAsyncChannel, logger: Logger) async { + let processingRequest = ManagedAtomic(HTTPState.idle) do { - try await withThrowingTaskGroup(of: Void.self) { group in - try await asyncChannel.executeThenClose { inbound, outbound in - let responseWriter = HBHTTPServerBodyWriter(outbound: outbound) - var iterator = inbound.makeAsyncIterator() - while let part = try await iterator.next() { - guard case .head(let head) = part else { - throw HTTPChannelError.unexpectedHTTPPart(part) - } - let bodyStream = HBStreamedRequestBody() - let body = HBRequestBody.stream(bodyStream) - let request = HBHTTPRequest(head: head, body: body) - // add task processing request and writing response - group.addTask { - let response: HBHTTPResponse - do { - response = try await self.responder(request, asyncChannel.channel) - } catch { - response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator) + try await withGracefulShutdownHandler { + try await withThrowingTaskGroup(of: Void.self) { group in + try await asyncChannel.executeThenClose { inbound, outbound in + let responseWriter = HBHTTPServerBodyWriter(outbound: outbound) + var iterator = inbound.makeAsyncIterator() + while let part = try await iterator.next() { + // set to processing unless it is cancelled then exit + guard processingRequest.exchange(.processing, ordering: .relaxed) == .idle else { break } + guard case .head(let head) = part else { + throw HTTPChannelError.unexpectedHTTPPart(part) } - let head = HTTPResponseHead(version: request.head.version, status: response.status, headers: response.headers) + let bodyStream = HBStreamedRequestBody() + let body = HBRequestBody.stream(bodyStream) + let request = HBHTTPRequest(head: head, body: body) + // add task processing request and writing response + group.addTask { + let response: HBHTTPResponse + do { + response = try await self.responder(request, asyncChannel.channel) + } catch { + response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator) + } + let head = HTTPResponseHead(version: request.head.version, status: response.status, headers: response.headers) + do { + try await outbound.write(.head(head)) + try await response.body.write(responseWriter) + try await outbound.write(.end(nil)) + // flush request body + for try await _ in request.body {} + } catch { + // flush request body + for try await _ in request.body {} + throw error + } + if request.head.headers["connection"].first == "close" { + throw HTTPChannelError.closeConnection + } + } + // send body parts to request do { - try await outbound.write(.head(head)) - try await response.body.write(responseWriter) - try await outbound.write(.end(nil)) - // flush request body - for try await _ in request.body {} + // pass body part to request + while case .body(let buffer) = try await iterator.next() { + await bodyStream.send(buffer) + } + bodyStream.finish() } catch { - // flush request body - for try await _ in request.body {} - throw error - } - if request.head.headers["connection"].first == "close" { - throw HTTPChannelError.closeConnection + // pass failed to read full http body to request + bodyStream.fail(error) } + try await group.next() + // set to idle unless it is cancelled then exit + guard processingRequest.exchange(.idle, ordering: .relaxed) == .processing else { break } } - // send body parts to request - do { - // pass body part to request - while case .body(let buffer) = try await iterator.next() { - await bodyStream.send(buffer) - } - bodyStream.finish() - } catch { - // pass failed to read full http body to request - bodyStream.fail(error) - } - try await group.next() } } + } onGracefulShutdown: { + // set to cancelled + if processingRequest.exchange(.cancelled, ordering: .relaxed) == .idle { + // only close the channel input if it is idle + asyncChannel.channel.close(mode: .input, promise: nil) + } } } catch HTTPChannelError.closeConnection { // channel is being closed because we received a connection: close header diff --git a/Sources/HummingbirdCore/Server/HTTPUserEventHandler.swift b/Sources/HummingbirdCore/Server/HTTPUserEventHandler.swift index e68b03aa7..d13d313d5 100644 --- a/Sources/HummingbirdCore/Server/HTTPUserEventHandler.swift +++ b/Sources/HummingbirdCore/Server/HTTPUserEventHandler.swift @@ -61,18 +61,6 @@ public class HBHTTPUserEventHandler: ChannelDuplexHandler, RemovableChannelHandl public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { switch event { - case is ChannelShouldQuiesceEvent: - // we received a quiesce event. If we have any requests in progress we should - // wait for them to finish. - // - // If we are running with the HTTP pipeline assistance handler then we will never - // receive quiesce events but in the case where we aren't this is needed - if self.requestsInProgress > 0 { - self.closeAfterResponseWritten = true - } else { - context.close(promise: nil) - } - case IdleStateHandler.IdleStateEvent.read: // if we get an idle read event and we haven't completed reading the request // close the connection diff --git a/Sources/HummingbirdCore/Server/Server.swift b/Sources/HummingbirdCore/Server/Server.swift index 503d0ea9c..5c2ecccc4 100644 --- a/Sources/HummingbirdCore/Server/Server.swift +++ b/Sources/HummingbirdCore/Server/Server.swift @@ -34,10 +34,7 @@ public actor HBServer: Service { onServerRunning: (@Sendable (Channel) async -> Void)? ) case starting - case running( - asyncChannel: AsyncServerChannel, - quiescingHelper: ServerQuiescingHelper - ) + case running(asyncChannel: AsyncServerChannel) case shuttingDown(shutdownPromise: EventLoopPromise) case shutdown @@ -97,7 +94,7 @@ public actor HBServer: Service { self.state = .starting do { - let (asyncChannel, quiescingHelper) = try await self.makeServer( + let asyncChannel = try await self.makeServer( childChannelSetup: childChannelSetup, configuration: configuration ) @@ -108,7 +105,7 @@ public actor HBServer: Service { fatalError("We should only be running once") case .starting: - self.state = .running(asyncChannel: asyncChannel, quiescingHelper: quiescingHelper) + self.state = .running(asyncChannel: asyncChannel) await withGracefulShutdownHandler { await onServerRunning?(asyncChannel.channel) @@ -144,6 +141,7 @@ public actor HBServer: Service { self.state = .shutdown throw error } + self.state = .shutdown case .starting, .running: fatalError("Run should only be called once") @@ -161,21 +159,10 @@ public actor HBServer: Service { case .initial, .starting: self.state = .shutdown - case .running(let channel, let quiescingHelper): - // quiesce open channels + case .running(let channel): let shutdownPromise = channel.channel.eventLoop.makePromise(of: Void.self) + channel.channel.close(promise: shutdownPromise) self.state = .shuttingDown(shutdownPromise: shutdownPromise) - quiescingHelper.initiateShutdown(promise: shutdownPromise) - try await shutdownPromise.futureResult.get() - - // We need to check the state here again since we just awaited above - switch self.state { - case .initial, .starting, .running, .shutdown: - fatalError("Unexpected state") - - case .shuttingDown: - self.state = .shutdown - } case .shuttingDown(let shutdownPromise): // We are just going to queue up behind the current graceful shutdown @@ -189,14 +176,10 @@ public actor HBServer: Service { /// Start server /// - Parameter responder: Object that provides responses to requests sent to the server /// - Returns: EventLoopFuture that is fulfilled when server has started - public func makeServer(childChannelSetup: ChannelSetup, configuration: HBServerConfiguration) async throws -> (AsyncServerChannel, ServerQuiescingHelper) { - let quiescingHelper = ServerQuiescingHelper(group: self.eventLoopGroup) + public func makeServer(childChannelSetup: ChannelSetup, configuration: HBServerConfiguration) async throws -> AsyncServerChannel { let bootstrap: ServerBootstrapProtocol #if canImport(Network) - if let tsBootstrap = self.createTSBootstrap( - configuration: configuration, - quiescingHelper: quiescingHelper - ) { + if let tsBootstrap = self.createTSBootstrap(configuration: configuration) { bootstrap = tsBootstrap } else { #if os(iOS) || os(tvOS) @@ -205,15 +188,11 @@ public actor HBServer: Service { if configuration.tlsOptions.options != nil { self.logger.warning("tlsOptions set in Configuration will not be applied to a BSD sockets server. Please use NIOTSEventLoopGroup, to run with the Network framework") } - bootstrap = self.createSocketsBootstrap( - configuration: configuration, - quiescingHelper: quiescingHelper - ) + bootstrap = self.createSocketsBootstrap(configuration: configuration) } #else bootstrap = self.createSocketsBootstrap( - configuration: configuration, - quiescingHelper: quiescingHelper + configuration: configuration ) #endif @@ -247,25 +226,21 @@ public actor HBServer: Service { } self.logger.info("Server started and listening on socket path \(path)") } - return (asyncChannel, quiescingHelper) + return asyncChannel } catch { - quiescingHelper.initiateShutdown(promise: nil) + // should we close the channel here throw error } } /// create a BSD sockets based bootstrap private func createSocketsBootstrap( - configuration: HBServerConfiguration, - quiescingHelper: ServerQuiescingHelper + configuration: HBServerConfiguration ) -> ServerBootstrap { return ServerBootstrap(group: self.eventLoopGroup) // Specify backlog and enable SO_REUSEADDR for the server itself .serverChannelOption(ChannelOptions.backlog, value: numericCast(configuration.backlog)) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) - .serverChannelInitializer { channel in - channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel)) - } .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1) .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) @@ -275,14 +250,10 @@ public actor HBServer: Service { /// create a NIOTransportServices bootstrap using Network.framework @available(macOS 10.14, iOS 12, tvOS 12, *) private func createTSBootstrap( - configuration: HBServerConfiguration, - quiescingHelper: ServerQuiescingHelper + configuration: HBServerConfiguration ) -> NIOTSListenerBootstrap? { guard let bootstrap = NIOTSListenerBootstrap(validatingGroup: self.eventLoopGroup)? .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) - .serverChannelInitializer({ channel in - channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel)) - }) // Set the handlers that are applied to the accepted Channels .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) diff --git a/Sources/HummingbirdHTTP2/HTTP2Channel.swift b/Sources/HummingbirdHTTP2/HTTP2Channel.swift index d602ce1ce..fb5cf8401 100644 --- a/Sources/HummingbirdHTTP2/HTTP2Channel.swift +++ b/Sources/HummingbirdHTTP2/HTTP2Channel.swift @@ -86,7 +86,7 @@ public struct HTTP2Channel: HTTPChannelHandler { await handleHTTP(asyncChannel: http1, logger: logger) case .http2((let http2, let multiplexer)): try await withThrowingDiscardingTaskGroup { group in - for try await client in multiplexer.inbound { + for try await client in multiplexer.inbound.cancelOnGracefulShutdown() { group.addTask { await handleHTTP(asyncChannel: client, logger: logger) } diff --git a/Sources/PerformanceTest/main.swift b/Sources/PerformanceTest/main.swift index 78f6c7069..b19353915 100644 --- a/Sources/PerformanceTest/main.swift +++ b/Sources/PerformanceTest/main.swift @@ -44,6 +44,13 @@ router.get("json") { _, _ in return ["message": "Hello, world"] } +// return JSON +// ./wrk -c 128 -d 15s -t 8 http://localhost:8080/json +router.get("wait") { _, _ in + try await Task.sleep(for: .seconds(8)) + return "I waited" +} + var app = HBApplication( responder: router.buildResponder(), configuration: .init( diff --git a/Tests/HummingbirdCoreTests/CoreTests.swift b/Tests/HummingbirdCoreTests/CoreTests.swift index aa00b527a..d9f06e2a8 100644 --- a/Tests/HummingbirdCoreTests/CoreTests.swift +++ b/Tests/HummingbirdCoreTests/CoreTests.swift @@ -301,6 +301,55 @@ class HummingBirdCoreTests: XCTestCase { } } } + + func testChildChannelGracefulShutdown() async throws { + let promise = Promise() + + try await testServer( + childChannelSetup: HTTP1Channel { request, _ in + await promise.complete(()) + try await Task.sleep(for: .milliseconds(500)) + return HBHTTPResponse(status: .ok, body: .init(asyncSequence: request.body.delayed())) + }, + configuration: .init(address: .hostname(port: 0)), + eventLoopGroup: Self.eventLoopGroup, + logger: Logger(label: "HB") + ) { server, client in + try await withTimeout(.seconds(5)) { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + do { + let response = try await client.get("/") + XCTAssertEqual(response.status, .ok) + } catch { + XCTFail("Error: \(error)") + } + } + await promise.wait() + try await server.shutdownGracefully() + try await group.waitForAll() + } + } + } + } + + func testIdleChildChannelGracefulShutdown() async throws { + try await testServer( + childChannelSetup: HTTP1Channel { request, _ in + try await Task.sleep(for: .milliseconds(500)) + return HBHTTPResponse(status: .ok, body: .init(asyncSequence: request.body.delayed())) + }, + configuration: .init(address: .hostname(port: 0)), + eventLoopGroup: Self.eventLoopGroup, + logger: Logger(label: "HB") + ) { server, client in + try await withTimeout(.seconds(5)) { + let response = try await client.get("/") + XCTAssertEqual(response.status, .ok) + try await server.shutdownGracefully() + } + } + } } struct DelayAsyncSequence: AsyncSequence { diff --git a/Tests/HummingbirdCoreTests/TestUtils.swift b/Tests/HummingbirdCoreTests/TestUtils.swift index b90cc1e46..a08fbc60d 100644 --- a/Tests/HummingbirdCoreTests/TestUtils.swift +++ b/Tests/HummingbirdCoreTests/TestUtils.swift @@ -41,7 +41,7 @@ public func testServer( eventLoopGroup: EventLoopGroup, logger: Logger, clientConfiguration: HBXCTClient.Configuration = .init(), - _ test: @escaping @Sendable (HBXCTClient) async throws -> Value + _ test: @escaping @Sendable (HBServer, HBXCTClient) async throws -> Value ) async throws -> Value { try await withThrowingTaskGroup(of: Void.self) { group in let promise = Promise() @@ -69,13 +69,32 @@ public func testServer( eventLoopGroupProvider: .createNew ) client.connect() - let value = try await test(client) + let value = try await test(server, client) await serviceGroup.triggerGracefulShutdown() try await client.shutdown() return value } } +public func testServer( + childChannelSetup: ChannelSetup, + configuration: HBServerConfiguration, + eventLoopGroup: EventLoopGroup, + logger: Logger, + clientConfiguration: HBXCTClient.Configuration = .init(), + _ test: @escaping @Sendable (HBXCTClient) async throws -> Value +) async throws -> Value { + try await testServer( + childChannelSetup: childChannelSetup, + configuration: configuration, + eventLoopGroup: eventLoopGroup, + logger: logger, + clientConfiguration: clientConfiguration + ) { _, client in + try await test(client) + } +} + /// Run process with a timeout /// - Parameters: /// - timeout: Amount of time before timeout error is thrown