Skip to content

Commit

Permalink
Fix up HTTPCHannelHandler after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Nov 27, 2023
1 parent c280749 commit 29bc8f5
Showing 1 changed file with 55 additions and 41 deletions.
96 changes: 55 additions & 41 deletions Sources/HummingbirdCore/Server/HTTP/HTTPChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import HTTPTypes
import Logging
import NIOCore
import NIOHTTPTypes
import ServiceLifecycle

/// Protocol for HTTP channels
public protocol HTTPChannelHandler: HBChannelSetup {
Expand All @@ -38,55 +39,68 @@ enum HTTPState: Int, AtomicValue {

extension HTTPChannelHandler {
public func handleHTTP(asyncChannel: NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, logger: Logger) async {
let processingRequest = ManagedAtomic(HTTPState.idle)
do {
try await withThrowingTaskGroup(of: Void.self) { group in
try await asyncChannel.executeThenClose { inbound, outbound in
let responseWriter = HBHTTPServerBodyWriter(outbound: outbound)
var iterator = inbound.makeAsyncIterator()
while let part = try await iterator.next() {
guard case .head(let head) = part else {
throw HTTPChannelError.unexpectedHTTPPart(part)
}
let bodyStream = HBStreamedRequestBody()
let body = HBRequestBody.stream(bodyStream)
let request = HBHTTPRequest(head: head, body: body)
// add task processing request and writing response
group.addTask {
let response: HBHTTPResponse
do {
response = try await self.responder(request, asyncChannel.channel)
} catch {
response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator)
try await withGracefulShutdownHandler {
try await withThrowingTaskGroup(of: Void.self) { group in
try await asyncChannel.executeThenClose { inbound, outbound in
let responseWriter = HBHTTPServerBodyWriter(outbound: outbound)
var iterator = inbound.makeAsyncIterator()
while let part = try await iterator.next() {
// set to processing unless it is cancelled then exit
guard processingRequest.exchange(.processing, ordering: .relaxed) == .idle else { break }
guard case .head(let head) = part else {
throw HTTPChannelError.unexpectedHTTPPart(part)
}
let bodyStream = HBStreamedRequestBody()
let body = HBRequestBody.stream(bodyStream)
let request = HBHTTPRequest(head: head, body: body)
// add task processing request and writing response
group.addTask {
let response: HBHTTPResponse
do {
response = try await self.responder(request, asyncChannel.channel)
} catch {
response = self.getErrorResponse(from: error, allocator: asyncChannel.channel.allocator)
}
do {
try await outbound.write(.head(response.head))
try await response.body.write(responseWriter)
try await outbound.write(.end(nil))
// flush request body
for try await _ in request.body {}
} catch {
// flush request body
for try await _ in request.body {}
throw error
}
if request.headers[.connection] == "close" {
throw HTTPChannelError.closeConnection
}
}
// send body parts to request
do {
try await outbound.write(.head(response.head))
try await response.body.write(responseWriter)
try await outbound.write(.end(nil))
// flush request body
for try await _ in request.body {}
// pass body part to request
while case .body(let buffer) = try await iterator.next() {
await bodyStream.send(buffer)
}
bodyStream.finish()
} catch {
// flush request body
for try await _ in request.body {}
throw error
// pass failed to read full http body to request
bodyStream.fail(error)
}
if request.headers[.connection] == "close" {
throw HTTPChannelError.closeConnection
}
}
// send body parts to request
do {
// pass body part to request
while case .body(let buffer) = try await iterator.next() {
await bodyStream.send(buffer)
}
bodyStream.finish()
} catch {
// pass failed to read full http body to request
bodyStream.fail(error)
try await group.next()
// set to idle unless it is cancelled then exit
guard processingRequest.exchange(.idle, ordering: .relaxed) == .processing else { break }
}
try await group.next()
}
}
} onGracefulShutdown: {
// set to cancelled
if processingRequest.exchange(.cancelled, ordering: .relaxed) == .idle {
// only close the channel input if it is idle
asyncChannel.channel.close(mode: .input, promise: nil)
}
}
} catch HTTPChannelError.closeConnection {
// channel is being closed because we received a connection: close header
Expand Down

0 comments on commit 29bc8f5

Please sign in to comment.