diff --git a/Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift b/Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift index adf30f554..44450717b 100644 --- a/Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift @@ -18,8 +18,9 @@ import NIOCore import NIOHTTP2 /// An event which happens on a client's HTTP/2 connection. -enum ClientConnectionEvent: Sendable, Hashable { - enum CloseReason: Sendable, Hashable { +@_spi(Package) +public enum ClientConnectionEvent: Sendable, Hashable { + public enum CloseReason: Sendable, Hashable { /// The server sent a GOAWAY frame to the client. case goAway(HTTP2ErrorCode, String) /// The keep alive timer fired and subsequently timed out. diff --git a/Sources/GRPCHTTP2Core/Client/Connection/Connection.swift b/Sources/GRPCHTTP2Core/Client/Connection/Connection.swift new file mode 100644 index 000000000..d9cbe0fd7 --- /dev/null +++ b/Sources/GRPCHTTP2Core/Client/Connection/Connection.swift @@ -0,0 +1,434 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import NIOConcurrencyHelpers +import NIOCore +import NIOHTTP2 + +/// A `Connection` provides communication to a single remote peer. +/// +/// Each `Connection` object is 'one-shot': it may only be used for a single connection over +/// its lifetime. If a connect attempt fails then the `Connection` must be discarded and a new one +/// must be created. However, an active connection may be used multiple times to provide streams +/// to the backend. +/// +/// To use the `Connection` you must run it in a task. You can consume event updates by listening +/// to `events`: +/// +/// ```swift +/// await withTaskGroup(of: Void.self) { group in +/// group.addTask { await connection.run() } +/// +/// for await event in connection.events { +/// switch event { +/// case .connectSucceeded: +/// // ... +/// default: +/// // ... +/// } +/// } +/// } +/// ``` +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +struct Connection: Sendable { + /// Events which can happen over the lifetime of the connection. + enum Event: Sendable { + /// The connect attempt succeeded and the connection is ready to use. + case connectSucceeded + /// The connect attempt failed. + case connectFailed(any Error) + /// The connection received a GOAWAY and will close soon. No new streams + /// should be opened on this connection. + case goingAway(HTTP2ErrorCode, String) + /// The connection is closed. + case closed(Connection.CloseReason) + } + + /// The reason the connection closed. + enum CloseReason: Sendable { + /// Closed because an idle timeout fired. + case idleTimeout + /// Closed because a keepalive timer fired. + case keepaliveTimeout + /// Closed because the caller initiated shutdown and all RPCs on the connection finished. + case initiatedLocally + /// Closed because the remote peer initiate shutdown (i.e. sent a GOAWAY frame). + case remote + /// Closed because the connection encountered an unexpected error. + case error(Error) + } + + /// Inputs to the 'run' method. + private enum Input: Sendable { + case close + } + + /// Events which have happened to the connection. + private let event: (stream: AsyncStream, continuation: AsyncStream.Continuation) + + /// Events which the connection must react to. + private let input: (stream: AsyncStream, continuation: AsyncStream.Continuation) + + /// The address to connect to. + private let address: SocketAddress + + /// The default compression algorithm used for requests. + private let defaultCompression: CompressionAlgorithm + + /// The set of enabled compression algorithms. + private let enabledCompression: CompressionAlgorithmSet + + /// A connector used to establish a connection. + private let http2Connector: any HTTP2Connector + + /// The state of the connection. + private let state: NIOLockedValueBox + + /// The default max request message size in bytes, 4 MiB. + private static var defaultMaxRequestMessageSizeBytes: Int { + 4 * 1024 * 1024 + } + + /// A stream of events which can happen to the connection. + var events: AsyncStream { + self.event.stream + } + + init( + address: SocketAddress, + http2Connector: any HTTP2Connector, + defaultCompression: CompressionAlgorithm, + enabledCompression: CompressionAlgorithmSet + ) { + self.address = address + self.defaultCompression = defaultCompression + self.enabledCompression = enabledCompression + self.http2Connector = http2Connector + self.event = AsyncStream.makeStream(of: Event.self) + self.input = AsyncStream.makeStream(of: Input.self) + self.state = NIOLockedValueBox(.notConnected) + } + + /// Connect and run the connection. + /// + /// This function returns when the connection has closed. You can observe connection events + /// by consuming the ``events`` sequence. + func run() async { + let connectResult = await Result { + try await self.http2Connector.establishConnection(to: self.address) + } + + switch connectResult { + case .success(let connected): + // Connected successfully, update state and report the event. + self.state.withLockedValue { state in + state.connected(connected) + } + + self.event.continuation.yield(.connectSucceeded) + + await withDiscardingTaskGroup { group in + // Add a task to run the connection and consume events. + group.addTask { + try? await connected.channel.executeThenClose { inbound, outbound in + await self.consumeConnectionEvents(inbound) + } + } + + // Meanwhile, consume input events. This sequence will end when the connection has closed. + for await input in self.input.stream { + switch input { + case .close: + let asyncChannel = self.state.withLockedValue { $0.beginClosing() } + if let channel = asyncChannel?.channel { + let event = ClientConnectionHandler.OutboundEvent.closeGracefully + channel.triggerUserOutboundEvent(event, promise: nil) + } + } + } + } + + case .failure(let error): + // Connect failed, this connection is no longer useful. + self.state.withLockedValue { $0.closed() } + self.finishStreams(withEvent: .connectFailed(error)) + } + } + + /// Gracefully close the connection. + func close() { + self.input.continuation.yield(.close) + } + + /// Make a stream using the connection if it's connected. + /// + /// - Parameter descriptor: A descriptor of the method to create a stream for. + /// - Returns: The open stream. + func makeStream(descriptor: MethodDescriptor, options: CallOptions) async throws -> Stream { + let (multiplexer, scheme) = try self.state.withLockedValue { state in + switch state { + case .connected(let connected): + return (connected.multiplexer, connected.scheme) + case .notConnected, .closing, .closed: + throw RPCError(code: .unavailable, message: "subchannel isn't ready") + } + } + + let compression: CompressionAlgorithm + if let override = options.compression { + compression = self.enabledCompression.contains(override) ? override : .none + } else { + compression = self.defaultCompression + } + + let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes + + do { + let stream = try await multiplexer.openStream { channel in + channel.eventLoop.makeCompletedFuture { + let streamHandler = GRPCClientStreamHandler( + methodDescriptor: descriptor, + scheme: scheme, + outboundEncoding: compression, + acceptedEncodings: self.enabledCompression, + maximumPayloadSize: maxRequestSize + ) + try channel.pipeline.syncOperations.addHandler(streamHandler) + + return try NIOAsyncChannel( + wrappingChannelSynchronously: channel, + configuration: NIOAsyncChannel.Configuration( + isOutboundHalfClosureEnabled: true, + inboundType: RPCResponsePart.self, + outboundType: RPCRequestPart.self + ) + ) + } + } + + return Stream(wrapping: stream, descriptor: descriptor) + } catch { + throw RPCError(code: .unavailable, message: "subchannel is unavailable", cause: error) + } + } + + private func consumeConnectionEvents( + _ connectionEvents: NIOAsyncChannelInboundStream + ) async { + do { + var channelCloseReason: ClientConnectionEvent.CloseReason? + + for try await connectionEvent in connectionEvents { + switch connectionEvent { + case .closing(let reason): + self.state.withLockedValue { $0.closing() } + + switch reason { + case .goAway(let errorCode, let reason): + // The connection will close at some point soon, yield a notification for this + // because the close might not be imminent and this could result in address resolution. + self.event.continuation.yield(.goingAway(errorCode, reason)) + case .idle, .keepaliveExpired, .initiatedLocally: + // The connection will be closed imminently in these cases there's no need to do + // anything. + () + } + + // Take the reason with the highest precedence. A GOAWAY may be superseded by user + // closing, for example. + if channelCloseReason.map({ reason.precedence > $0.precedence }) ?? true { + channelCloseReason = reason + } + } + } + + let connectionCloseReason: Self.CloseReason + switch channelCloseReason { + case .keepaliveExpired: + connectionCloseReason = .keepaliveTimeout + + case .idle: + // Connection became idle, that's fine. + connectionCloseReason = .idleTimeout + + case .goAway: + // Remote peer told us to GOAWAY. + connectionCloseReason = .remote + + case .initiatedLocally, .none: + // Shutdown was initiated locally. + connectionCloseReason = .initiatedLocally + } + + // The connection events sequence has finished: the connection is now closed. + self.state.withLockedValue { $0.closed() } + self.finishStreams(withEvent: .closed(connectionCloseReason)) + } catch { + // Any error must come from consuming the inbound channel meaning that the connection + // must be borked, wrap it up and close. + let rpcError = RPCError(code: .unavailable, message: "connection closed", cause: error) + self.state.withLockedValue { $0.closed() } + self.finishStreams(withEvent: .closed(.error(rpcError))) + } + } + + private func finishStreams(withEvent event: Event) { + self.event.continuation.yield(event) + self.event.continuation.finish() + self.input.continuation.finish() + } +} + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +extension Connection { + struct Stream { + typealias Inbound = NIOAsyncChannelInboundStream + + struct Outbound: ClosableRPCWriterProtocol { + typealias Element = RPCRequestPart + + private let requestWriter: NIOAsyncChannelOutboundWriter + private let http2Stream: NIOAsyncChannel + + fileprivate init( + requestWriter: NIOAsyncChannelOutboundWriter, + http2Stream: NIOAsyncChannel + ) { + self.requestWriter = requestWriter + self.http2Stream = http2Stream + } + + func write(contentsOf elements: some Sequence) async throws { + try await self.requestWriter.write(contentsOf: elements) + } + + func finish() { + self.requestWriter.finish() + } + + func finish(throwing error: any Error) { + // Fire the error inbound; this fails the inbound writer. + self.http2Stream.channel.pipeline.fireErrorCaught(error) + } + } + + let descriptor: MethodDescriptor + + private let http2Stream: NIOAsyncChannel + + init( + wrapping stream: NIOAsyncChannel, + descriptor: MethodDescriptor + ) { + self.http2Stream = stream + self.descriptor = descriptor + } + + func execute( + _ closure: (_ inbound: Inbound, _ outbound: Outbound) async throws -> T + ) async throws -> T where T: Sendable { + try await self.http2Stream.executeThenClose { inbound, outbound in + return try await closure( + inbound, + Outbound(requestWriter: outbound, http2Stream: self.http2Stream) + ) + } + } + } +} + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +extension Connection { + private enum State { + /// The connection is idle or connecting. + case notConnected + /// A connection has been established with the remote peer. + case connected(Connected) + /// The connection has started to close. This may be initiated locally or by the remote. + case closing + /// The connection has closed. This is a terminal state. + case closed + + struct Connected { + /// The connection channel. + var channel: NIOAsyncChannel + /// Multiplexer for creating HTTP/2 streams. + var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer + /// Whether the connection is plaintext, `false` implies TLS is being used. + var scheme: Scheme + + init(_ connection: HTTP2Connection) { + self.channel = connection.channel + self.multiplexer = connection.multiplexer + self.scheme = connection.isPlaintext ? .http : .https + } + } + + mutating func connected(_ channel: HTTP2Connection) { + switch self { + case .notConnected: + self = .connected(State.Connected(channel)) + case .connected, .closing, .closed: + fatalError("Invalid state: 'run()' must only be called once") + } + } + + mutating func beginClosing() -> NIOAsyncChannel? { + switch self { + case .notConnected: + fatalError("Invalid state: 'run()' must be called first") + case .connected(let connected): + self = .closing + return connected.channel + case .closing, .closed: + return nil + } + } + + mutating func closing() { + switch self { + case .notConnected: + // Not reachable: happens as a result of a connection event, that can only happen if + // the connection has started (i.e. must be in the 'connected' state or later). + fatalError("Invalid state") + case .connected: + self = .closing + case .closing, .closed: + () + } + } + + mutating func closed() { + self = .closed + } + } +} + +extension ClientConnectionEvent.CloseReason { + fileprivate var precedence: Int { + switch self { + case .goAway: + return 0 + case .idle: + return 1 + case .keepaliveExpired: + return 2 + case .initiatedLocally: + return 3 + } + } +} diff --git a/Sources/GRPCHTTP2Core/Client/Connection/ConnectionFactory.swift b/Sources/GRPCHTTP2Core/Client/Connection/ConnectionFactory.swift new file mode 100644 index 000000000..1ec651ff3 --- /dev/null +++ b/Sources/GRPCHTTP2Core/Client/Connection/ConnectionFactory.swift @@ -0,0 +1,48 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import NIOCore +import NIOHTTP2 +import NIOPosix + +@_spi(Package) +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +public protocol HTTP2Connector: Sendable { + func establishConnection(to address: SocketAddress) async throws -> HTTP2Connection +} + +@_spi(Package) +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +public struct HTTP2Connection { + /// The underlying TCP connection wrapped up for use with gRPC. + var channel: NIOAsyncChannel + + /// An HTTP/2 stream multiplexer. + var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer + + /// Whether the connection is insecure (i.e. plaintext). + var isPlaintext: Bool + + public init( + channel: NIOAsyncChannel, + multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer, + isPlaintext: Bool + ) { + self.channel = channel + self.multiplexer = multiplexer + self.isPlaintext = isPlaintext + } +} diff --git a/Sources/GRPCHTTP2Core/Internal/AsyncStream+MakeStream.swift b/Sources/GRPCHTTP2Core/Internal/AsyncStream+MakeStream.swift new file mode 100644 index 000000000..5f1b75dd6 --- /dev/null +++ b/Sources/GRPCHTTP2Core/Internal/AsyncStream+MakeStream.swift @@ -0,0 +1,32 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if swift(<5.9) +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension AsyncStream { + @inlinable + static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: AsyncStream.Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(Element.self, bufferingPolicy: limit) { + continuation = $0 + } + return (stream, continuation) + } +} +#endif diff --git a/Sources/GRPCHTTP2Core/Internal/Result+Catching.swift b/Sources/GRPCHTTP2Core/Internal/Result+Catching.swift new file mode 100644 index 000000000..1cd809e42 --- /dev/null +++ b/Sources/GRPCHTTP2Core/Internal/Result+Catching.swift @@ -0,0 +1,30 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension Result where Failure == any Error { + /// Like `Result(catching:)`, but `async`. + /// + /// - Parameter body: An `async` closure to catch the result of. + @inlinable + init(catching body: () async throws -> Success) async { + do { + self = .success(try await body()) + } catch { + self = .failure(error) + } + } +} diff --git a/Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift b/Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift index c9d828645..d258b2ace 100644 --- a/Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift @@ -14,13 +14,12 @@ * limitations under the License. */ +@_spi(Package) @testable import GRPCHTTP2Core import NIOCore import NIOEmbedded import NIOHTTP2 import XCTest -@testable import GRPCHTTP2Core - final class ClientConnectionHandlerTests: XCTestCase { func testMaxIdleTime() throws { let connection = try Connection(maxIdleTime: .minutes(5)) diff --git a/Tests/GRPCHTTP2CoreTests/Client/Connection/Connection+Equatable.swift b/Tests/GRPCHTTP2CoreTests/Client/Connection/Connection+Equatable.swift new file mode 100644 index 000000000..e493881cb --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/Client/Connection/Connection+Equatable.swift @@ -0,0 +1,62 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +@testable import GRPCHTTP2Core + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +extension Connection.Event: Equatable { + public static func == (lhs: Connection.Event, rhs: Connection.Event) -> Bool { + switch (lhs, rhs) { + case (.connectSucceeded, .connectSucceeded), + (.connectFailed, .connectFailed): + return true + + case (.goingAway(let lhsCode, let lhsReason), .goingAway(let rhsCode, let rhsReason)): + return lhsCode == rhsCode && lhsReason == rhsReason + + case (.closed(let lhsReason), .closed(let rhsReason)): + return lhsReason == rhsReason + + default: + return false + } + } +} + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +extension Connection.CloseReason: Equatable { + public static func == (lhs: Connection.CloseReason, rhs: Connection.CloseReason) -> Bool { + switch (lhs, rhs) { + case (.idleTimeout, .idleTimeout), + (.keepaliveTimeout, .keepaliveTimeout), + (.initiatedLocally, .initiatedLocally), + (.remote, .remote): + return true + + case (.error(let lhsError), .error(let rhsError)): + if let lhs = lhsError as? RPCError, let rhs = rhsError as? RPCError { + return lhs == rhs + } else { + return true + } + + default: + return false + } + } +} diff --git a/Tests/GRPCHTTP2CoreTests/Client/Connection/ConnectionTests.swift b/Tests/GRPCHTTP2CoreTests/Client/Connection/ConnectionTests.swift new file mode 100644 index 000000000..2be26da0f --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/Client/Connection/ConnectionTests.swift @@ -0,0 +1,219 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import DequeModule +import GRPCCore +@_spi(Package) @testable import GRPCHTTP2Core +import NIOConcurrencyHelpers +import NIOCore +import NIOHPACK +import NIOHTTP2 +import NIOPosix +import XCTest + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +final class ConnectionTests: XCTestCase { + func testConnectThenClose() async throws { + try await ConnectionTest.run(connector: .posix()) { context, event in + switch event { + case .connectSucceeded: + context.connection.close() + default: + () + } + } validateEvents: { _, events in + XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)]) + } + } + + func testConnectThenIdleTimeout() async throws { + try await ConnectionTest.run(connector: .posix(maxIdleTime: .milliseconds(50))) { _, events in + XCTAssertEqual(events, [.connectSucceeded, .closed(.idleTimeout)]) + } + } + + func testConnectThenKeepaliveTimeout() async throws { + try await ConnectionTest.run( + connector: .posix( + keepaliveTime: .milliseconds(50), + keepaliveTimeout: .milliseconds(10), + keepaliveWithoutCalls: true, + dropPingAcks: true + ) + ) { _, events in + XCTAssertEqual(events, [.connectSucceeded, .closed(.keepaliveTimeout)]) + } + } + + func testGoAwayWhenConnected() async throws { + try await ConnectionTest.run(connector: .posix()) { context, event in + switch event { + case .connectSucceeded: + let goAway = HTTP2Frame( + streamID: .rootStream, + payload: .goAway( + lastStreamID: 0, + errorCode: .noError, + opaqueData: ByteBuffer(string: "Hello!") + ) + ) + + let accepted = try context.server.acceptedChannel + accepted.writeAndFlush(goAway, promise: nil) + + default: + () + } + } validateEvents: { _, events in + XCTAssertEqual(events, [.connectSucceeded, .goingAway(.noError, "Hello!"), .closed(.remote)]) + } + } + + func testConnectFails() async throws { + let error = RPCError(code: .unimplemented, message: "") + try await ConnectionTest.run(connector: .throwing(error)) { _, events in + XCTAssertEqual(events, [.connectFailed(error)]) + } + } + + func testMakeStreamOnActiveConnection() async throws { + try await ConnectionTest.run(connector: .posix()) { context, event in + switch event { + case .connectSucceeded: + let stream = try await context.connection.makeStream( + descriptor: .echoGet, + options: .defaults + ) + try await stream.execute { inbound, outbound in + try await outbound.write(.metadata(["foo": "bar", "bar": "baz"])) + try await outbound.write(.message([0, 1, 2])) + outbound.finish() + + var parts = [RPCResponsePart]() + for try await part in inbound { + switch part { + case .metadata(let metadata): + // Filter out any transport specific metadata + parts.append(.metadata(Metadata(metadata.suffix(2)))) + case .message, .status: + parts.append(part) + } + } + + let expected: [RPCResponsePart] = [ + .metadata(["foo": "bar", "bar": "baz"]), + .message([0, 1, 2]), + .status(Status(code: .ok, message: ""), [:]), + ] + XCTAssertEqual(parts, expected) + } + + context.connection.close() + + default: + () + } + } validateEvents: { _, events in + XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)]) + } + } + + func testMakeStreamOnClosedConnection() async throws { + try await ConnectionTest.run(connector: .posix()) { context, event in + switch event { + case .connectSucceeded: + context.connection.close() + case .closed: + await XCTAssertThrowsErrorAsync(ofType: RPCError.self) { + _ = try await context.connection.makeStream(descriptor: .echoGet, options: .defaults) + } errorHandler: { error in + XCTAssertEqual(error.code, .unavailable) + } + default: + () + } + } validateEvents: { context, events in + XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)]) + } + } + + func testMakeStreamOnNotRunningConnection() async throws { + let connection = Connection( + address: .ipv4(host: "ignored", port: 0), + http2Connector: .never, + defaultCompression: .none, + enabledCompression: .none + ) + + await XCTAssertThrowsErrorAsync(ofType: RPCError.self) { + _ = try await connection.makeStream(descriptor: .echoGet, options: .defaults) + } errorHandler: { error in + XCTAssertEqual(error.code, .unavailable) + } + } +} + +extension ClientBootstrap { + func connect( + to address: GRPCHTTP2Core.SocketAddress, + _ configure: @Sendable @escaping (Channel) -> EventLoopFuture + ) async throws -> T { + if let ipv4 = address.ipv4 { + return try await self.connect( + host: ipv4.host, + port: ipv4.port, + channelInitializer: configure + ) + } else if let ipv6 = address.ipv6 { + return try await self.connect( + host: ipv6.host, + port: ipv6.port, + channelInitializer: configure + ) + } else if let uds = address.unixDomainSocket { + return try await self.connect( + unixDomainSocketPath: uds.path, + channelInitializer: configure + ) + } else if let vsock = address.virtualSocket { + return try await self.connect( + to: VsockAddress( + cid: .init(Int(vsock.contextID.rawValue)), + port: .init(Int(vsock.port.rawValue)) + ), + channelInitializer: configure + ) + } else { + throw RPCError(code: .unimplemented, message: "Unhandled socket address: \(address)") + } + } +} + +extension Metadata { + init(_ sequence: some Sequence) { + var metadata = Metadata() + for (key, value) in sequence { + switch value { + case .string(let value): + metadata.addString(value, forKey: key) + case .binary(let value): + metadata.addBinary(value, forKey: key) + } + } + + self = metadata + } +} diff --git a/Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/ConnectionTest.swift b/Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/ConnectionTest.swift new file mode 100644 index 000000000..5e2b3c520 --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/ConnectionTest.swift @@ -0,0 +1,202 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import DequeModule +import GRPCCore +@_spi(Package) @testable import GRPCHTTP2Core +import NIOCore +import NIOHTTP2 +import NIOPosix + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +enum ConnectionTest { + struct Context { + var server: Server + var connection: Connection + } + + static func run( + connector: HTTP2Connector, + handlEvents: ( + _ context: Context, + _ event: Connection.Event + ) async throws -> Void = { _, _ in }, + validateEvents: (_ context: Context, _ events: [Connection.Event]) -> Void + ) async throws { + let server = Server() + let address = try await server.bind() + + try await withThrowingTaskGroup(of: Void.self) { group in + let connection = Connection( + address: address, + http2Connector: connector, + defaultCompression: .none, + enabledCompression: .none + ) + let context = Context(server: server, connection: connection) + group.addTask { await connection.run() } + + var events: [Connection.Event] = [] + for await event in connection.events { + events.append(event) + try await handlEvents(context, event) + } + + validateEvents(context, events) + } + } +} + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +extension ConnectionTest { + /// A server which only expected to accept a single connection. + final class Server { + private let eventLoop: any EventLoop + private var listener: (any Channel)? + private let client: EventLoopPromise + + init() { + self.eventLoop = .singletonMultiThreadedEventLoopGroup.next() + self.client = self.eventLoop.next().makePromise() + } + + deinit { + self.listener?.close(promise: nil) + self.client.futureResult.whenSuccess { $0.close(mode: .all, promise: nil) } + } + + var acceptedChannel: Channel { + get throws { + try self.client.futureResult.wait() + } + } + + func bind() async throws -> GRPCHTTP2Core.SocketAddress { + precondition(self.listener == nil, "\(#function) must only be called once") + + let hasAcceptedChannel = try await self.eventLoop.submit { + NIOLoopBoundBox(false, eventLoop: self.eventLoop) + }.get() + + let bootstrap = ServerBootstrap(group: self.eventLoop).childChannelInitializer { channel in + precondition(!hasAcceptedChannel.value, "already accepted a channel") + hasAcceptedChannel.value = true + + return channel.eventLoop.makeCompletedFuture { + let sync = channel.pipeline.syncOperations + let h2 = NIOHTTP2Handler(mode: .server) + let mux = HTTP2StreamMultiplexer(mode: .server, channel: channel) { stream in + let sync = stream.pipeline.syncOperations + let handler = GRPCServerStreamHandler( + scheme: .http, + acceptedEncodings: .none, + maximumPayloadSize: .max + ) + + return stream.eventLoop.makeCompletedFuture { + try sync.addHandler(handler) + try sync.addHandler(EchoHandler()) + } + } + + try sync.addHandler(h2) + try sync.addHandler(mux) + try sync.addHandlers(SucceedOnSettingsAck(promise: self.client)) + } + } + + let channel = try await bootstrap.bind(host: "127.0.0.1", port: 0).get() + self.listener = channel + return .ipv4(host: "127.0.0.1", port: channel.localAddress!.port!) + } + } +} + +@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) +extension ConnectionTest { + /// Succeeds a promise when a SETTINGS frame ack has been read. + private final class SucceedOnSettingsAck: ChannelInboundHandler { + typealias InboundIn = HTTP2Frame + typealias InboundOut = HTTP2Frame + + private let promise: EventLoopPromise + + init(promise: EventLoopPromise) { + self.promise = promise + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let frame = self.unwrapInboundIn(data) + switch frame.payload { + case .settings(.ack): + self.promise.succeed(context.channel) + default: + () + } + + context.fireChannelRead(data) + } + } + + private final class EchoHandler: ChannelInboundHandler { + typealias InboundIn = RPCRequestPart + typealias OutboundOut = RPCResponsePart + + private var received: Deque = [] + private var receivedEnd = false + + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + if let event = event as? ChannelEvent, event == .inputClosed { + self.receivedEnd = true + } + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + self.received.append(self.unwrapInboundIn(data)) + } + + func channelReadComplete(context: ChannelHandlerContext) { + while let part = self.received.popFirst() { + switch part { + case .metadata(let metadata): + var filtered = Metadata() + + // Remove any pseudo-headers. + for (key, value) in metadata where !key.hasPrefix(":") { + switch value { + case .string(let value): + filtered.addString(value, forKey: key) + case .binary(let value): + filtered.addBinary(value, forKey: key) + } + } + + context.write(self.wrapOutboundOut(.metadata(filtered)), promise: nil) + + case .message(let message): + context.write(self.wrapOutboundOut(.message(message)), promise: nil) + } + } + + if self.receivedEnd { + let status = Status(code: .ok, message: "") + context.write(self.wrapOutboundOut(.status(status, [:])), promise: nil) + } + + context.flush() + } + } +} diff --git a/Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/HTTP2Connectors.swift b/Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/HTTP2Connectors.swift new file mode 100644 index 000000000..0463ac157 --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/HTTP2Connectors.swift @@ -0,0 +1,155 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +@_spi(Package) @testable import GRPCHTTP2Core +import NIOCore +import NIOHTTP2 +import NIOPosix + +@_spi(Package) +extension HTTP2Connector where Self == ThrowingConnector { + /// A connector which throws the given error on a connect attempt. + static func throwing(_ error: RPCError) -> Self { + return ThrowingConnector(error: error) + } +} + +@_spi(Package) +extension HTTP2Connector where Self == NeverConnector { + /// A connector which fatal errors if a connect attempt is made. + static var never: Self { + NeverConnector() + } +} + +@_spi(Package) +extension HTTP2Connector where Self == NIOPosixConnector { + /// A connector which uses NIOPosix to establish a connection. + static func posix( + maxIdleTime: TimeAmount? = nil, + keepaliveTime: TimeAmount? = nil, + keepaliveTimeout: TimeAmount? = nil, + keepaliveWithoutCalls: Bool = false, + dropPingAcks: Bool = false + ) -> Self { + return NIOPosixConnector( + maxIdleTime: maxIdleTime, + keepaliveTime: keepaliveTime, + keepaliveTimeout: keepaliveTimeout, + keepaliveWithoutCalls: keepaliveWithoutCalls, + dropPingAcks: dropPingAcks + ) + } +} + +struct ThrowingConnector: HTTP2Connector { + private let error: RPCError + + init(error: RPCError) { + self.error = error + } + + func establishConnection( + to address: GRPCHTTP2Core.SocketAddress + ) async throws -> HTTP2Connection { + throw self.error + } +} + +struct NeverConnector: HTTP2Connector { + func establishConnection( + to address: GRPCHTTP2Core.SocketAddress + ) async throws -> HTTP2Connection { + fatalError("\(#function) called unexpectedly") + } +} + +struct NIOPosixConnector: HTTP2Connector { + private let eventLoopGroup: any EventLoopGroup + private let maxIdleTime: TimeAmount? + private let keepaliveTime: TimeAmount? + private let keepaliveTimeout: TimeAmount? + private let keepaliveWithoutCalls: Bool + private let dropPingAcks: Bool + + init( + eventLoopGroup: (any EventLoopGroup)? = nil, + maxIdleTime: TimeAmount? = nil, + keepaliveTime: TimeAmount? = nil, + keepaliveTimeout: TimeAmount? = nil, + keepaliveWithoutCalls: Bool = false, + dropPingAcks: Bool = false + ) { + self.eventLoopGroup = eventLoopGroup ?? .singletonMultiThreadedEventLoopGroup + self.maxIdleTime = maxIdleTime + self.keepaliveTime = keepaliveTime + self.keepaliveTimeout = keepaliveTimeout + self.keepaliveWithoutCalls = keepaliveWithoutCalls + self.dropPingAcks = dropPingAcks + } + + func establishConnection( + to address: GRPCHTTP2Core.SocketAddress + ) async throws -> HTTP2Connection { + return try await ClientBootstrap(group: self.eventLoopGroup).connect(to: address) { channel in + channel.eventLoop.makeCompletedFuture { + let sync = channel.pipeline.syncOperations + + let multiplexer = try sync.configureAsyncHTTP2Pipeline(mode: .client) { stream in + // Server shouldn't be opening streams. + stream.close() + } + + if self.dropPingAcks { + try sync.addHandler(PingAckDropper()) + } + + let connectionHandler = ClientConnectionHandler( + eventLoop: channel.eventLoop, + maxIdleTime: self.maxIdleTime, + keepaliveTime: self.keepaliveTime, + keepaliveTimeout: self.keepaliveTimeout, + keepaliveWithoutCalls: self.keepaliveWithoutCalls + ) + + try sync.addHandler(connectionHandler) + + let asyncChannel = try NIOAsyncChannel( + wrappingChannelSynchronously: channel + ) + + return HTTP2Connection(channel: asyncChannel, multiplexer: multiplexer, isPlaintext: true) + } + } + } + + /// Drops all acks for PING frames. This is useful to help trigger the keepalive timeout. + final class PingAckDropper: ChannelInboundHandler { + typealias InboundIn = HTTP2Frame + typealias InboundOut = HTTP2Frame + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let frame = self.unwrapInboundIn(data) + switch frame.payload { + case .ping(_, ack: true): + () // drop-it + default: + context.fireChannelRead(data) + } + } + } +} diff --git a/Tests/GRPCHTTP2CoreTests/Test Utilities/MethodDescriptor+Common.swift b/Tests/GRPCHTTP2CoreTests/Test Utilities/MethodDescriptor+Common.swift new file mode 100644 index 000000000..a9a31c688 --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/Test Utilities/MethodDescriptor+Common.swift @@ -0,0 +1,23 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +extension MethodDescriptor { + static var echoGet: Self { + MethodDescriptor(service: "echo.Echo", method: "Get") + } +}