Skip to content

Commit

Permalink
Remove quiesce helper, replace with withGracefulShutdown in HTTP hand…
Browse files Browse the repository at this point in the history
…ler (#278)
  • Loading branch information
adam-fowler authored Nov 23, 2023
1 parent c375cbb commit 895af49
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .swiftformat
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
--minversion 0.47.4

# Swift version
--swiftversion 5.3
--swiftversion 5.9

# file options
--exclude .build
Expand Down
105 changes: 63 additions & 42 deletions Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
//
//===----------------------------------------------------------------------===//

import Atomics
import Logging
import NIOCore
import NIOHTTP1
import ServiceLifecycle

/// Protocol for HTTP channels
public protocol HTTPChannelHandler: HBChannelSetup {
Expand All @@ -28,58 +30,77 @@ enum HTTPChannelError: Error {
case closeConnection
}

enum HTTPState: Int, AtomicValue {
case idle
case processing
case cancelled
}

extension HTTPChannelHandler {
public func handleHTTP(asyncChannel: NIOAsyncChannel<HTTPServerRequestPart, SendableHTTPServerResponsePart>, logger: Logger) async {
let processingRequest = ManagedAtomic(HTTPState.idle)
do {
try await withThrowingTaskGroup(of: Void.self) { group in
try await asyncChannel.executeThenClose { inbound, outbound in
let responseWriter = HBHTTPServerBodyWriter(outbound: outbound)
var iterator = inbound.makeAsyncIterator()
while let part = try await iterator.next() {
guard case .head(let head) = part else {
throw HTTPChannelError.unexpectedHTTPPart(part)
}
let bodyStream = HBStreamedRequestBody()
let body = HBRequestBody.stream(bodyStream)
let request = HBHTTPRequest(head: head, body: body)
// add task processing request and writing response
group.addTask {
let response: HBHTTPResponse
do {
response = try await self.responder(request, asyncChannel.channel)
} catch {
response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator)
try await withGracefulShutdownHandler {
try await withThrowingTaskGroup(of: Void.self) { group in
try await asyncChannel.executeThenClose { inbound, outbound in
let responseWriter = HBHTTPServerBodyWriter(outbound: outbound)
var iterator = inbound.makeAsyncIterator()
while let part = try await iterator.next() {
// set to processing unless it is cancelled then exit
guard processingRequest.exchange(.processing, ordering: .relaxed) == .idle else { break }
guard case .head(let head) = part else {
throw HTTPChannelError.unexpectedHTTPPart(part)
}
let head = HTTPResponseHead(version: request.head.version, status: response.status, headers: response.headers)
let bodyStream = HBStreamedRequestBody()
let body = HBRequestBody.stream(bodyStream)
let request = HBHTTPRequest(head: head, body: body)
// add task processing request and writing response
group.addTask {
let response: HBHTTPResponse
do {
response = try await self.responder(request, asyncChannel.channel)
} catch {
response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator)
}
let head = HTTPResponseHead(version: request.head.version, status: response.status, headers: response.headers)
do {
try await outbound.write(.head(head))
try await response.body.write(responseWriter)
try await outbound.write(.end(nil))
// flush request body
for try await _ in request.body {}
} catch {
// flush request body
for try await _ in request.body {}
throw error
}
if request.head.headers["connection"].first == "close" {
throw HTTPChannelError.closeConnection
}
}
// send body parts to request
do {
try await outbound.write(.head(head))
try await response.body.write(responseWriter)
try await outbound.write(.end(nil))
// flush request body
for try await _ in request.body {}
// pass body part to request
while case .body(let buffer) = try await iterator.next() {
await bodyStream.send(buffer)
}
bodyStream.finish()
} catch {
// flush request body
for try await _ in request.body {}
throw error
}
if request.head.headers["connection"].first == "close" {
throw HTTPChannelError.closeConnection
// pass failed to read full http body to request
bodyStream.fail(error)
}
try await group.next()
// set to idle unless it is cancelled then exit
guard processingRequest.exchange(.idle, ordering: .relaxed) == .processing else { break }
}
// send body parts to request
do {
// pass body part to request
while case .body(let buffer) = try await iterator.next() {
await bodyStream.send(buffer)
}
bodyStream.finish()
} catch {
// pass failed to read full http body to request
bodyStream.fail(error)
}
try await group.next()
}
}
} onGracefulShutdown: {
// set to cancelled
if processingRequest.exchange(.cancelled, ordering: .relaxed) == .idle {
// only close the channel input if it is idle
asyncChannel.channel.close(mode: .input, promise: nil)
}
}
} catch HTTPChannelError.closeConnection {
// channel is being closed because we received a connection: close header
Expand Down
12 changes: 0 additions & 12 deletions Sources/HummingbirdCore/Server/HTTPUserEventHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,6 @@ public class HBHTTPUserEventHandler: ChannelDuplexHandler, RemovableChannelHandl

public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
switch event {
case is ChannelShouldQuiesceEvent:
// we received a quiesce event. If we have any requests in progress we should
// wait for them to finish.
//
// If we are running with the HTTP pipeline assistance handler then we will never
// receive quiesce events but in the case where we aren't this is needed
if self.requestsInProgress > 0 {
self.closeAfterResponseWritten = true
} else {
context.close(promise: nil)
}

case IdleStateHandler.IdleStateEvent.read:
// if we get an idle read event and we haven't completed reading the request
// close the connection
Expand Down
57 changes: 14 additions & 43 deletions Sources/HummingbirdCore/Server/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
onServerRunning: (@Sendable (Channel) async -> Void)?
)
case starting
case running(
asyncChannel: AsyncServerChannel,
quiescingHelper: ServerQuiescingHelper
)
case running(asyncChannel: AsyncServerChannel)
case shuttingDown(shutdownPromise: EventLoopPromise<Void>)
case shutdown

Expand Down Expand Up @@ -97,7 +94,7 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
self.state = .starting

do {
let (asyncChannel, quiescingHelper) = try await self.makeServer(
let asyncChannel = try await self.makeServer(
childChannelSetup: childChannelSetup,
configuration: configuration
)
Expand All @@ -108,7 +105,7 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
fatalError("We should only be running once")

case .starting:
self.state = .running(asyncChannel: asyncChannel, quiescingHelper: quiescingHelper)
self.state = .running(asyncChannel: asyncChannel)

await withGracefulShutdownHandler {
await onServerRunning?(asyncChannel.channel)
Expand Down Expand Up @@ -144,6 +141,7 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
self.state = .shutdown
throw error
}
self.state = .shutdown
case .starting, .running:
fatalError("Run should only be called once")

Expand All @@ -161,21 +159,10 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
case .initial, .starting:
self.state = .shutdown

case .running(let channel, let quiescingHelper):
// quiesce open channels
case .running(let channel):
let shutdownPromise = channel.channel.eventLoop.makePromise(of: Void.self)
channel.channel.close(promise: shutdownPromise)
self.state = .shuttingDown(shutdownPromise: shutdownPromise)
quiescingHelper.initiateShutdown(promise: shutdownPromise)
try await shutdownPromise.futureResult.get()

// We need to check the state here again since we just awaited above
switch self.state {
case .initial, .starting, .running, .shutdown:
fatalError("Unexpected state")

case .shuttingDown:
self.state = .shutdown
}

case .shuttingDown(let shutdownPromise):
// We are just going to queue up behind the current graceful shutdown
Expand All @@ -189,14 +176,10 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
/// Start server
/// - Parameter responder: Object that provides responses to requests sent to the server
/// - Returns: EventLoopFuture that is fulfilled when server has started
public func makeServer(childChannelSetup: ChannelSetup, configuration: HBServerConfiguration) async throws -> (AsyncServerChannel, ServerQuiescingHelper) {
let quiescingHelper = ServerQuiescingHelper(group: self.eventLoopGroup)
public func makeServer(childChannelSetup: ChannelSetup, configuration: HBServerConfiguration) async throws -> AsyncServerChannel {
let bootstrap: ServerBootstrapProtocol
#if canImport(Network)
if let tsBootstrap = self.createTSBootstrap(
configuration: configuration,
quiescingHelper: quiescingHelper
) {
if let tsBootstrap = self.createTSBootstrap(configuration: configuration) {
bootstrap = tsBootstrap
} else {
#if os(iOS) || os(tvOS)
Expand All @@ -205,15 +188,11 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
if configuration.tlsOptions.options != nil {
self.logger.warning("tlsOptions set in Configuration will not be applied to a BSD sockets server. Please use NIOTSEventLoopGroup, to run with the Network framework")
}
bootstrap = self.createSocketsBootstrap(
configuration: configuration,
quiescingHelper: quiescingHelper
)
bootstrap = self.createSocketsBootstrap(configuration: configuration)
}
#else
bootstrap = self.createSocketsBootstrap(
configuration: configuration,
quiescingHelper: quiescingHelper
configuration: configuration
)
#endif

Expand Down Expand Up @@ -247,25 +226,21 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
}
self.logger.info("Server started and listening on socket path \(path)")
}
return (asyncChannel, quiescingHelper)
return asyncChannel
} catch {
quiescingHelper.initiateShutdown(promise: nil)
// should we close the channel here
throw error
}
}

/// create a BSD sockets based bootstrap
private func createSocketsBootstrap(
configuration: HBServerConfiguration,
quiescingHelper: ServerQuiescingHelper
configuration: HBServerConfiguration
) -> ServerBootstrap {
return ServerBootstrap(group: self.eventLoopGroup)
// Specify backlog and enable SO_REUSEADDR for the server itself
.serverChannelOption(ChannelOptions.backlog, value: numericCast(configuration.backlog))
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
.serverChannelInitializer { channel in
channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel))
}
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
.childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
Expand All @@ -275,14 +250,10 @@ public actor HBServer<ChannelSetup: HBChannelSetup>: Service {
/// create a NIOTransportServices bootstrap using Network.framework
@available(macOS 10.14, iOS 12, tvOS 12, *)
private func createTSBootstrap(
configuration: HBServerConfiguration,
quiescingHelper: ServerQuiescingHelper
configuration: HBServerConfiguration
) -> NIOTSListenerBootstrap? {
guard let bootstrap = NIOTSListenerBootstrap(validatingGroup: self.eventLoopGroup)?
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
.serverChannelInitializer({ channel in
channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel))
})
// Set the handlers that are applied to the accepted Channels
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
Expand Down
2 changes: 1 addition & 1 deletion Sources/HummingbirdHTTP2/HTTP2Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public struct HTTP2Channel: HTTPChannelHandler {
await handleHTTP(asyncChannel: http1, logger: logger)
case .http2((let http2, let multiplexer)):
try await withThrowingDiscardingTaskGroup { group in
for try await client in multiplexer.inbound {
for try await client in multiplexer.inbound.cancelOnGracefulShutdown() {
group.addTask {
await handleHTTP(asyncChannel: client, logger: logger)
}
Expand Down
7 changes: 7 additions & 0 deletions Sources/PerformanceTest/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ router.get("json") { _, _ in
return ["message": "Hello, world"]
}

// return JSON
// ./wrk -c 128 -d 15s -t 8 http://localhost:8080/json
router.get("wait") { _, _ in
try await Task.sleep(for: .seconds(8))
return "I waited"
}

var app = HBApplication(
responder: router.buildResponder(),
configuration: .init(
Expand Down
49 changes: 49 additions & 0 deletions Tests/HummingbirdCoreTests/CoreTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,55 @@ class HummingBirdCoreTests: XCTestCase {
}
}
}

func testChildChannelGracefulShutdown() async throws {
let promise = Promise<Void>()

try await testServer(
childChannelSetup: HTTP1Channel { request, _ in
await promise.complete(())
try await Task.sleep(for: .milliseconds(500))
return HBHTTPResponse(status: .ok, body: .init(asyncSequence: request.body.delayed()))
},
configuration: .init(address: .hostname(port: 0)),
eventLoopGroup: Self.eventLoopGroup,
logger: Logger(label: "HB")
) { server, client in
try await withTimeout(.seconds(5)) {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
do {
let response = try await client.get("/")
XCTAssertEqual(response.status, .ok)
} catch {
XCTFail("Error: \(error)")
}
}
await promise.wait()
try await server.shutdownGracefully()
try await group.waitForAll()
}
}
}
}

func testIdleChildChannelGracefulShutdown() async throws {
try await testServer(
childChannelSetup: HTTP1Channel { request, _ in
try await Task.sleep(for: .milliseconds(500))
return HBHTTPResponse(status: .ok, body: .init(asyncSequence: request.body.delayed()))
},
configuration: .init(address: .hostname(port: 0)),
eventLoopGroup: Self.eventLoopGroup,
logger: Logger(label: "HB")
) { server, client in
try await withTimeout(.seconds(5)) {
let response = try await client.get("/")
XCTAssertEqual(response.status, .ok)
try await server.shutdownGracefully()
}
}
}
}

struct DelayAsyncSequence<CoreSequence: AsyncSequence>: AsyncSequence {
Expand Down
Loading

0 comments on commit 895af49

Please sign in to comment.