Browse Source

Add the HTTP/2 connection (#1859)

Motivation:

To build up a subchannel we need the notion of a connection to a
backend as a building block. The connection provides a single HTTP/2
connection to the remote peer and doesn't deal with backoff or
reconnects.

Modifications:

- Add the 'Connection' object which provides multiplexed streams to a
  connected backend
- Add a 'connector' API which provides a NIO channel and a multiplexer
  on which streams can be created
- Add test Utilities and tests

Result:

Can create a connection to a backend and run streams on it.
George Barnett 1 year ago
parent
commit
ec685b3d67

+ 3 - 2
Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift

@@ -18,8 +18,9 @@ import NIOCore
 import NIOHTTP2
 
 /// An event which happens on a client's HTTP/2 connection.
-enum ClientConnectionEvent: Sendable, Hashable {
-  enum CloseReason: Sendable, Hashable {
+@_spi(Package)
+public enum ClientConnectionEvent: Sendable, Hashable {
+  public enum CloseReason: Sendable, Hashable {
     /// The server sent a GOAWAY frame to the client.
     case goAway(HTTP2ErrorCode, String)
     /// The keep alive timer fired and subsequently timed out.

+ 434 - 0
Sources/GRPCHTTP2Core/Client/Connection/Connection.swift

@@ -0,0 +1,434 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+import NIOConcurrencyHelpers
+import NIOCore
+import NIOHTTP2
+
+/// A `Connection` provides communication to a single remote peer.
+///
+/// Each `Connection` object is 'one-shot': it may only be used for a single connection over
+/// its lifetime. If a connect attempt fails then the `Connection` must be discarded and a new one
+/// must be created. However, an active connection may be used multiple times to provide streams
+/// to the backend.
+///
+/// To use the `Connection` you must run it in a task. You can consume event updates by listening
+/// to `events`:
+///
+/// ```swift
+/// await withTaskGroup(of: Void.self) { group in
+///   group.addTask { await connection.run() }
+///
+///   for await event in connection.events {
+///     switch event {
+///     case .connectSucceeded:
+///       // ...
+///     default:
+///       // ...
+///     }
+///   }
+/// }
+/// ```
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+struct Connection: Sendable {
+  /// Events which can happen over the lifetime of the connection.
+  enum Event: Sendable {
+    /// The connect attempt succeeded and the connection is ready to use.
+    case connectSucceeded
+    /// The connect attempt failed.
+    case connectFailed(any Error)
+    /// The connection received a GOAWAY and will close soon. No new streams
+    /// should be opened on this connection.
+    case goingAway(HTTP2ErrorCode, String)
+    /// The connection is closed.
+    case closed(Connection.CloseReason)
+  }
+
+  /// The reason the connection closed.
+  enum CloseReason: Sendable {
+    /// Closed because an idle timeout fired.
+    case idleTimeout
+    /// Closed because a keepalive timer fired.
+    case keepaliveTimeout
+    /// Closed because the caller initiated shutdown and all RPCs on the connection finished.
+    case initiatedLocally
+    /// Closed because the remote peer initiate shutdown (i.e. sent a GOAWAY frame).
+    case remote
+    /// Closed because the connection encountered an unexpected error.
+    case error(Error)
+  }
+
+  /// Inputs to the 'run' method.
+  private enum Input: Sendable {
+    case close
+  }
+
+  /// Events which have happened to the connection.
+  private let event: (stream: AsyncStream<Event>, continuation: AsyncStream<Event>.Continuation)
+
+  /// Events which the connection must react to.
+  private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
+
+  /// The address to connect to.
+  private let address: SocketAddress
+
+  /// The default compression algorithm used for requests.
+  private let defaultCompression: CompressionAlgorithm
+
+  /// The set of enabled compression algorithms.
+  private let enabledCompression: CompressionAlgorithmSet
+
+  /// A connector used to establish a connection.
+  private let http2Connector: any HTTP2Connector
+
+  /// The state of the connection.
+  private let state: NIOLockedValueBox<State>
+
+  /// The default max request message size in bytes, 4 MiB.
+  private static var defaultMaxRequestMessageSizeBytes: Int {
+    4 * 1024 * 1024
+  }
+
+  /// A stream of events which can happen to the connection.
+  var events: AsyncStream<Event> {
+    self.event.stream
+  }
+
+  init(
+    address: SocketAddress,
+    http2Connector: any HTTP2Connector,
+    defaultCompression: CompressionAlgorithm,
+    enabledCompression: CompressionAlgorithmSet
+  ) {
+    self.address = address
+    self.defaultCompression = defaultCompression
+    self.enabledCompression = enabledCompression
+    self.http2Connector = http2Connector
+    self.event = AsyncStream.makeStream(of: Event.self)
+    self.input = AsyncStream.makeStream(of: Input.self)
+    self.state = NIOLockedValueBox(.notConnected)
+  }
+
+  /// Connect and run the connection.
+  ///
+  /// This function returns when the connection has closed. You can observe connection events
+  /// by consuming the ``events`` sequence.
+  func run() async {
+    let connectResult = await Result {
+      try await self.http2Connector.establishConnection(to: self.address)
+    }
+
+    switch connectResult {
+    case .success(let connected):
+      // Connected successfully, update state and report the event.
+      self.state.withLockedValue { state in
+        state.connected(connected)
+      }
+
+      self.event.continuation.yield(.connectSucceeded)
+
+      await withDiscardingTaskGroup { group in
+        // Add a task to run the connection and consume events.
+        group.addTask {
+          try? await connected.channel.executeThenClose { inbound, outbound in
+            await self.consumeConnectionEvents(inbound)
+          }
+        }
+
+        // Meanwhile, consume input events. This sequence will end when the connection has closed.
+        for await input in self.input.stream {
+          switch input {
+          case .close:
+            let asyncChannel = self.state.withLockedValue { $0.beginClosing() }
+            if let channel = asyncChannel?.channel {
+              let event = ClientConnectionHandler.OutboundEvent.closeGracefully
+              channel.triggerUserOutboundEvent(event, promise: nil)
+            }
+          }
+        }
+      }
+
+    case .failure(let error):
+      // Connect failed, this connection is no longer useful.
+      self.state.withLockedValue { $0.closed() }
+      self.finishStreams(withEvent: .connectFailed(error))
+    }
+  }
+
+  /// Gracefully close the connection.
+  func close() {
+    self.input.continuation.yield(.close)
+  }
+
+  /// Make a stream using the connection if it's connected.
+  ///
+  /// - Parameter descriptor: A descriptor of the method to create a stream for.
+  /// - Returns: The open stream.
+  func makeStream(descriptor: MethodDescriptor, options: CallOptions) async throws -> Stream {
+    let (multiplexer, scheme) = try self.state.withLockedValue { state in
+      switch state {
+      case .connected(let connected):
+        return (connected.multiplexer, connected.scheme)
+      case .notConnected, .closing, .closed:
+        throw RPCError(code: .unavailable, message: "subchannel isn't ready")
+      }
+    }
+
+    let compression: CompressionAlgorithm
+    if let override = options.compression {
+      compression = self.enabledCompression.contains(override) ? override : .none
+    } else {
+      compression = self.defaultCompression
+    }
+
+    let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes
+
+    do {
+      let stream = try await multiplexer.openStream { channel in
+        channel.eventLoop.makeCompletedFuture {
+          let streamHandler = GRPCClientStreamHandler(
+            methodDescriptor: descriptor,
+            scheme: scheme,
+            outboundEncoding: compression,
+            acceptedEncodings: self.enabledCompression,
+            maximumPayloadSize: maxRequestSize
+          )
+          try channel.pipeline.syncOperations.addHandler(streamHandler)
+
+          return try NIOAsyncChannel(
+            wrappingChannelSynchronously: channel,
+            configuration: NIOAsyncChannel.Configuration(
+              isOutboundHalfClosureEnabled: true,
+              inboundType: RPCResponsePart.self,
+              outboundType: RPCRequestPart.self
+            )
+          )
+        }
+      }
+
+      return Stream(wrapping: stream, descriptor: descriptor)
+    } catch {
+      throw RPCError(code: .unavailable, message: "subchannel is unavailable", cause: error)
+    }
+  }
+
+  private func consumeConnectionEvents(
+    _ connectionEvents: NIOAsyncChannelInboundStream<ClientConnectionEvent>
+  ) async {
+    do {
+      var channelCloseReason: ClientConnectionEvent.CloseReason?
+
+      for try await connectionEvent in connectionEvents {
+        switch connectionEvent {
+        case .closing(let reason):
+          self.state.withLockedValue { $0.closing() }
+
+          switch reason {
+          case .goAway(let errorCode, let reason):
+            // The connection will close at some point soon, yield a notification for this
+            // because the close might not be imminent and this could result in address resolution.
+            self.event.continuation.yield(.goingAway(errorCode, reason))
+          case .idle, .keepaliveExpired, .initiatedLocally:
+            // The connection will be closed imminently in these cases there's no need to do
+            // anything.
+            ()
+          }
+
+          // Take the reason with the highest precedence. A GOAWAY may be superseded by user
+          // closing, for example.
+          if channelCloseReason.map({ reason.precedence > $0.precedence }) ?? true {
+            channelCloseReason = reason
+          }
+        }
+      }
+
+      let connectionCloseReason: Self.CloseReason
+      switch channelCloseReason {
+      case .keepaliveExpired:
+        connectionCloseReason = .keepaliveTimeout
+
+      case .idle:
+        // Connection became idle, that's fine.
+        connectionCloseReason = .idleTimeout
+
+      case .goAway:
+        // Remote peer told us to GOAWAY.
+        connectionCloseReason = .remote
+
+      case .initiatedLocally, .none:
+        // Shutdown was initiated locally.
+        connectionCloseReason = .initiatedLocally
+      }
+
+      // The connection events sequence has finished: the connection is now closed.
+      self.state.withLockedValue { $0.closed() }
+      self.finishStreams(withEvent: .closed(connectionCloseReason))
+    } catch {
+      // Any error must come from consuming the inbound channel meaning that the connection
+      // must be borked, wrap it up and close.
+      let rpcError = RPCError(code: .unavailable, message: "connection closed", cause: error)
+      self.state.withLockedValue { $0.closed() }
+      self.finishStreams(withEvent: .closed(.error(rpcError)))
+    }
+  }
+
+  private func finishStreams(withEvent event: Event) {
+    self.event.continuation.yield(event)
+    self.event.continuation.finish()
+    self.input.continuation.finish()
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension Connection {
+  struct Stream {
+    typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart>
+
+    struct Outbound: ClosableRPCWriterProtocol {
+      typealias Element = RPCRequestPart
+
+      private let requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>
+      private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
+
+      fileprivate init(
+        requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>,
+        http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
+      ) {
+        self.requestWriter = requestWriter
+        self.http2Stream = http2Stream
+      }
+
+      func write(contentsOf elements: some Sequence<Self.Element>) async throws {
+        try await self.requestWriter.write(contentsOf: elements)
+      }
+
+      func finish() {
+        self.requestWriter.finish()
+      }
+
+      func finish(throwing error: any Error) {
+        // Fire the error inbound; this fails the inbound writer.
+        self.http2Stream.channel.pipeline.fireErrorCaught(error)
+      }
+    }
+
+    let descriptor: MethodDescriptor
+
+    private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
+
+    init(
+      wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
+      descriptor: MethodDescriptor
+    ) {
+      self.http2Stream = stream
+      self.descriptor = descriptor
+    }
+
+    func execute<T>(
+      _ closure: (_ inbound: Inbound, _ outbound: Outbound) async throws -> T
+    ) async throws -> T where T: Sendable {
+      try await self.http2Stream.executeThenClose { inbound, outbound in
+        return try await closure(
+          inbound,
+          Outbound(requestWriter: outbound, http2Stream: self.http2Stream)
+        )
+      }
+    }
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension Connection {
+  private enum State {
+    /// The connection is idle or connecting.
+    case notConnected
+    /// A connection has been established with the remote peer.
+    case connected(Connected)
+    /// The connection has started to close. This may be initiated locally or by the remote.
+    case closing
+    /// The connection has closed. This is a terminal state.
+    case closed
+
+    struct Connected {
+      /// The connection channel.
+      var channel: NIOAsyncChannel<ClientConnectionEvent, Void>
+      /// Multiplexer for creating HTTP/2 streams.
+      var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
+      /// Whether the connection is plaintext, `false` implies TLS is being used.
+      var scheme: Scheme
+
+      init(_ connection: HTTP2Connection) {
+        self.channel = connection.channel
+        self.multiplexer = connection.multiplexer
+        self.scheme = connection.isPlaintext ? .http : .https
+      }
+    }
+
+    mutating func connected(_ channel: HTTP2Connection) {
+      switch self {
+      case .notConnected:
+        self = .connected(State.Connected(channel))
+      case .connected, .closing, .closed:
+        fatalError("Invalid state: 'run()' must only be called once")
+      }
+    }
+
+    mutating func beginClosing() -> NIOAsyncChannel<ClientConnectionEvent, Void>? {
+      switch self {
+      case .notConnected:
+        fatalError("Invalid state: 'run()' must be called first")
+      case .connected(let connected):
+        self = .closing
+        return connected.channel
+      case .closing, .closed:
+        return nil
+      }
+    }
+
+    mutating func closing() {
+      switch self {
+      case .notConnected:
+        // Not reachable: happens as a result of a connection event, that can only happen if
+        // the connection has started (i.e. must be in the 'connected' state or later).
+        fatalError("Invalid state")
+      case .connected:
+        self = .closing
+      case .closing, .closed:
+        ()
+      }
+    }
+
+    mutating func closed() {
+      self = .closed
+    }
+  }
+}
+
+extension ClientConnectionEvent.CloseReason {
+  fileprivate var precedence: Int {
+    switch self {
+    case .goAway:
+      return 0
+    case .idle:
+      return 1
+    case .keepaliveExpired:
+      return 2
+    case .initiatedLocally:
+      return 3
+    }
+  }
+}

+ 48 - 0
Sources/GRPCHTTP2Core/Client/Connection/ConnectionFactory.swift

@@ -0,0 +1,48 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import NIOCore
+import NIOHTTP2
+import NIOPosix
+
+@_spi(Package)
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+public protocol HTTP2Connector: Sendable {
+  func establishConnection(to address: SocketAddress) async throws -> HTTP2Connection
+}
+
+@_spi(Package)
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+public struct HTTP2Connection {
+  /// The underlying TCP connection wrapped up for use with gRPC.
+  var channel: NIOAsyncChannel<ClientConnectionEvent, Void>
+
+  /// An HTTP/2 stream multiplexer.
+  var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
+
+  /// Whether the connection is insecure (i.e. plaintext).
+  var isPlaintext: Bool
+
+  public init(
+    channel: NIOAsyncChannel<ClientConnectionEvent, Void>,
+    multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>,
+    isPlaintext: Bool
+  ) {
+    self.channel = channel
+    self.multiplexer = multiplexer
+    self.isPlaintext = isPlaintext
+  }
+}

+ 32 - 0
Sources/GRPCHTTP2Core/Internal/AsyncStream+MakeStream.swift

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if swift(<5.9)
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension AsyncStream {
+  @inlinable
+  static func makeStream(
+    of elementType: Element.Type = Element.self,
+    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
+  ) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
+    var continuation: AsyncStream<Element>.Continuation!
+    let stream = AsyncStream(Element.self, bufferingPolicy: limit) {
+      continuation = $0
+    }
+    return (stream, continuation)
+  }
+}
+#endif

+ 30 - 0
Sources/GRPCHTTP2Core/Internal/Result+Catching.swift

@@ -0,0 +1,30 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension Result where Failure == any Error {
+  /// Like `Result(catching:)`, but `async`.
+  ///
+  /// - Parameter body: An `async` closure to catch the result of.
+  @inlinable
+  init(catching body: () async throws -> Success) async {
+    do {
+      self = .success(try await body())
+    } catch {
+      self = .failure(error)
+    }
+  }
+}

+ 1 - 2
Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift

@@ -14,13 +14,12 @@
  * limitations under the License.
  */
 
+@_spi(Package) @testable import GRPCHTTP2Core
 import NIOCore
 import NIOEmbedded
 import NIOHTTP2
 import XCTest
 
-@testable import GRPCHTTP2Core
-
 final class ClientConnectionHandlerTests: XCTestCase {
   func testMaxIdleTime() throws {
     let connection = try Connection(maxIdleTime: .minutes(5))

+ 62 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/Connection+Equatable.swift

@@ -0,0 +1,62 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+
+@testable import GRPCHTTP2Core
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension Connection.Event: Equatable {
+  public static func == (lhs: Connection.Event, rhs: Connection.Event) -> Bool {
+    switch (lhs, rhs) {
+    case (.connectSucceeded, .connectSucceeded),
+      (.connectFailed, .connectFailed):
+      return true
+
+    case (.goingAway(let lhsCode, let lhsReason), .goingAway(let rhsCode, let rhsReason)):
+      return lhsCode == rhsCode && lhsReason == rhsReason
+
+    case (.closed(let lhsReason), .closed(let rhsReason)):
+      return lhsReason == rhsReason
+
+    default:
+      return false
+    }
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension Connection.CloseReason: Equatable {
+  public static func == (lhs: Connection.CloseReason, rhs: Connection.CloseReason) -> Bool {
+    switch (lhs, rhs) {
+    case (.idleTimeout, .idleTimeout),
+      (.keepaliveTimeout, .keepaliveTimeout),
+      (.initiatedLocally, .initiatedLocally),
+      (.remote, .remote):
+      return true
+
+    case (.error(let lhsError), .error(let rhsError)):
+      if let lhs = lhsError as? RPCError, let rhs = rhsError as? RPCError {
+        return lhs == rhs
+      } else {
+        return true
+      }
+
+    default:
+      return false
+    }
+  }
+}

+ 219 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/ConnectionTests.swift

@@ -0,0 +1,219 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import DequeModule
+import GRPCCore
+@_spi(Package) @testable import GRPCHTTP2Core
+import NIOConcurrencyHelpers
+import NIOCore
+import NIOHPACK
+import NIOHTTP2
+import NIOPosix
+import XCTest
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+final class ConnectionTests: XCTestCase {
+  func testConnectThenClose() async throws {
+    try await ConnectionTest.run(connector: .posix()) { context, event in
+      switch event {
+      case .connectSucceeded:
+        context.connection.close()
+      default:
+        ()
+      }
+    } validateEvents: { _, events in
+      XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
+    }
+  }
+
+  func testConnectThenIdleTimeout() async throws {
+    try await ConnectionTest.run(connector: .posix(maxIdleTime: .milliseconds(50))) { _, events in
+      XCTAssertEqual(events, [.connectSucceeded, .closed(.idleTimeout)])
+    }
+  }
+
+  func testConnectThenKeepaliveTimeout() async throws {
+    try await ConnectionTest.run(
+      connector: .posix(
+        keepaliveTime: .milliseconds(50),
+        keepaliveTimeout: .milliseconds(10),
+        keepaliveWithoutCalls: true,
+        dropPingAcks: true
+      )
+    ) { _, events in
+      XCTAssertEqual(events, [.connectSucceeded, .closed(.keepaliveTimeout)])
+    }
+  }
+
+  func testGoAwayWhenConnected() async throws {
+    try await ConnectionTest.run(connector: .posix()) { context, event in
+      switch event {
+      case .connectSucceeded:
+        let goAway = HTTP2Frame(
+          streamID: .rootStream,
+          payload: .goAway(
+            lastStreamID: 0,
+            errorCode: .noError,
+            opaqueData: ByteBuffer(string: "Hello!")
+          )
+        )
+
+        let accepted = try context.server.acceptedChannel
+        accepted.writeAndFlush(goAway, promise: nil)
+
+      default:
+        ()
+      }
+    } validateEvents: { _, events in
+      XCTAssertEqual(events, [.connectSucceeded, .goingAway(.noError, "Hello!"), .closed(.remote)])
+    }
+  }
+
+  func testConnectFails() async throws {
+    let error = RPCError(code: .unimplemented, message: "")
+    try await ConnectionTest.run(connector: .throwing(error)) { _, events in
+      XCTAssertEqual(events, [.connectFailed(error)])
+    }
+  }
+
+  func testMakeStreamOnActiveConnection() async throws {
+    try await ConnectionTest.run(connector: .posix()) { context, event in
+      switch event {
+      case .connectSucceeded:
+        let stream = try await context.connection.makeStream(
+          descriptor: .echoGet,
+          options: .defaults
+        )
+        try await stream.execute { inbound, outbound in
+          try await outbound.write(.metadata(["foo": "bar", "bar": "baz"]))
+          try await outbound.write(.message([0, 1, 2]))
+          outbound.finish()
+
+          var parts = [RPCResponsePart]()
+          for try await part in inbound {
+            switch part {
+            case .metadata(let metadata):
+              // Filter out any transport specific metadata
+              parts.append(.metadata(Metadata(metadata.suffix(2))))
+            case .message, .status:
+              parts.append(part)
+            }
+          }
+
+          let expected: [RPCResponsePart] = [
+            .metadata(["foo": "bar", "bar": "baz"]),
+            .message([0, 1, 2]),
+            .status(Status(code: .ok, message: ""), [:]),
+          ]
+          XCTAssertEqual(parts, expected)
+        }
+
+        context.connection.close()
+
+      default:
+        ()
+      }
+    } validateEvents: { _, events in
+      XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
+    }
+  }
+
+  func testMakeStreamOnClosedConnection() async throws {
+    try await ConnectionTest.run(connector: .posix()) { context, event in
+      switch event {
+      case .connectSucceeded:
+        context.connection.close()
+      case .closed:
+        await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
+          _ = try await context.connection.makeStream(descriptor: .echoGet, options: .defaults)
+        } errorHandler: { error in
+          XCTAssertEqual(error.code, .unavailable)
+        }
+      default:
+        ()
+      }
+    } validateEvents: { context, events in
+      XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
+    }
+  }
+
+  func testMakeStreamOnNotRunningConnection() async throws {
+    let connection = Connection(
+      address: .ipv4(host: "ignored", port: 0),
+      http2Connector: .never,
+      defaultCompression: .none,
+      enabledCompression: .none
+    )
+
+    await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
+      _ = try await connection.makeStream(descriptor: .echoGet, options: .defaults)
+    } errorHandler: { error in
+      XCTAssertEqual(error.code, .unavailable)
+    }
+  }
+}
+
+extension ClientBootstrap {
+  func connect<T>(
+    to address: GRPCHTTP2Core.SocketAddress,
+    _ configure: @Sendable @escaping (Channel) -> EventLoopFuture<T>
+  ) async throws -> T {
+    if let ipv4 = address.ipv4 {
+      return try await self.connect(
+        host: ipv4.host,
+        port: ipv4.port,
+        channelInitializer: configure
+      )
+    } else if let ipv6 = address.ipv6 {
+      return try await self.connect(
+        host: ipv6.host,
+        port: ipv6.port,
+        channelInitializer: configure
+      )
+    } else if let uds = address.unixDomainSocket {
+      return try await self.connect(
+        unixDomainSocketPath: uds.path,
+        channelInitializer: configure
+      )
+    } else if let vsock = address.virtualSocket {
+      return try await self.connect(
+        to: VsockAddress(
+          cid: .init(Int(vsock.contextID.rawValue)),
+          port: .init(Int(vsock.port.rawValue))
+        ),
+        channelInitializer: configure
+      )
+    } else {
+      throw RPCError(code: .unimplemented, message: "Unhandled socket address: \(address)")
+    }
+  }
+}
+
+extension Metadata {
+  init(_ sequence: some Sequence<Element>) {
+    var metadata = Metadata()
+    for (key, value) in sequence {
+      switch value {
+      case .string(let value):
+        metadata.addString(value, forKey: key)
+      case .binary(let value):
+        metadata.addBinary(value, forKey: key)
+      }
+    }
+
+    self = metadata
+  }
+}

+ 202 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/ConnectionTest.swift

@@ -0,0 +1,202 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import DequeModule
+import GRPCCore
+@_spi(Package) @testable import GRPCHTTP2Core
+import NIOCore
+import NIOHTTP2
+import NIOPosix
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+enum ConnectionTest {
+  struct Context {
+    var server: Server
+    var connection: Connection
+  }
+
+  static func run(
+    connector: HTTP2Connector,
+    handlEvents: (
+      _ context: Context,
+      _ event: Connection.Event
+    ) async throws -> Void = { _, _ in },
+    validateEvents: (_ context: Context, _ events: [Connection.Event]) -> Void
+  ) async throws {
+    let server = Server()
+    let address = try await server.bind()
+
+    try await withThrowingTaskGroup(of: Void.self) { group in
+      let connection = Connection(
+        address: address,
+        http2Connector: connector,
+        defaultCompression: .none,
+        enabledCompression: .none
+      )
+      let context = Context(server: server, connection: connection)
+      group.addTask { await connection.run() }
+
+      var events: [Connection.Event] = []
+      for await event in connection.events {
+        events.append(event)
+        try await handlEvents(context, event)
+      }
+
+      validateEvents(context, events)
+    }
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension ConnectionTest {
+  /// A server which only expected to accept a single connection.
+  final class Server {
+    private let eventLoop: any EventLoop
+    private var listener: (any Channel)?
+    private let client: EventLoopPromise<Channel>
+
+    init() {
+      self.eventLoop = .singletonMultiThreadedEventLoopGroup.next()
+      self.client = self.eventLoop.next().makePromise()
+    }
+
+    deinit {
+      self.listener?.close(promise: nil)
+      self.client.futureResult.whenSuccess { $0.close(mode: .all, promise: nil) }
+    }
+
+    var acceptedChannel: Channel {
+      get throws {
+        try self.client.futureResult.wait()
+      }
+    }
+
+    func bind() async throws -> GRPCHTTP2Core.SocketAddress {
+      precondition(self.listener == nil, "\(#function) must only be called once")
+
+      let hasAcceptedChannel = try await self.eventLoop.submit {
+        NIOLoopBoundBox(false, eventLoop: self.eventLoop)
+      }.get()
+
+      let bootstrap = ServerBootstrap(group: self.eventLoop).childChannelInitializer { channel in
+        precondition(!hasAcceptedChannel.value, "already accepted a channel")
+        hasAcceptedChannel.value = true
+
+        return channel.eventLoop.makeCompletedFuture {
+          let sync = channel.pipeline.syncOperations
+          let h2 = NIOHTTP2Handler(mode: .server)
+          let mux = HTTP2StreamMultiplexer(mode: .server, channel: channel) { stream in
+            let sync = stream.pipeline.syncOperations
+            let handler = GRPCServerStreamHandler(
+              scheme: .http,
+              acceptedEncodings: .none,
+              maximumPayloadSize: .max
+            )
+
+            return stream.eventLoop.makeCompletedFuture {
+              try sync.addHandler(handler)
+              try sync.addHandler(EchoHandler())
+            }
+          }
+
+          try sync.addHandler(h2)
+          try sync.addHandler(mux)
+          try sync.addHandlers(SucceedOnSettingsAck(promise: self.client))
+        }
+      }
+
+      let channel = try await bootstrap.bind(host: "127.0.0.1", port: 0).get()
+      self.listener = channel
+      return .ipv4(host: "127.0.0.1", port: channel.localAddress!.port!)
+    }
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension ConnectionTest {
+  /// Succeeds a promise when a SETTINGS frame ack has been read.
+  private final class SucceedOnSettingsAck: ChannelInboundHandler {
+    typealias InboundIn = HTTP2Frame
+    typealias InboundOut = HTTP2Frame
+
+    private let promise: EventLoopPromise<Channel>
+
+    init(promise: EventLoopPromise<Channel>) {
+      self.promise = promise
+    }
+
+    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
+      let frame = self.unwrapInboundIn(data)
+      switch frame.payload {
+      case .settings(.ack):
+        self.promise.succeed(context.channel)
+      default:
+        ()
+      }
+
+      context.fireChannelRead(data)
+    }
+  }
+
+  private final class EchoHandler: ChannelInboundHandler {
+    typealias InboundIn = RPCRequestPart
+    typealias OutboundOut = RPCResponsePart
+
+    private var received: Deque<RPCRequestPart> = []
+    private var receivedEnd = false
+
+    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
+      if let event = event as? ChannelEvent, event == .inputClosed {
+        self.receivedEnd = true
+      }
+    }
+
+    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
+      self.received.append(self.unwrapInboundIn(data))
+    }
+
+    func channelReadComplete(context: ChannelHandlerContext) {
+      while let part = self.received.popFirst() {
+        switch part {
+        case .metadata(let metadata):
+          var filtered = Metadata()
+
+          // Remove any pseudo-headers.
+          for (key, value) in metadata where !key.hasPrefix(":") {
+            switch value {
+            case .string(let value):
+              filtered.addString(value, forKey: key)
+            case .binary(let value):
+              filtered.addBinary(value, forKey: key)
+            }
+          }
+
+          context.write(self.wrapOutboundOut(.metadata(filtered)), promise: nil)
+
+        case .message(let message):
+          context.write(self.wrapOutboundOut(.message(message)), promise: nil)
+        }
+      }
+
+      if self.receivedEnd {
+        let status = Status(code: .ok, message: "")
+        context.write(self.wrapOutboundOut(.status(status, [:])), promise: nil)
+      }
+
+      context.flush()
+    }
+  }
+}

+ 155 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/HTTP2Connectors.swift

@@ -0,0 +1,155 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+@_spi(Package) @testable import GRPCHTTP2Core
+import NIOCore
+import NIOHTTP2
+import NIOPosix
+
+@_spi(Package)
+extension HTTP2Connector where Self == ThrowingConnector {
+  /// A connector which throws the given error on a connect attempt.
+  static func throwing(_ error: RPCError) -> Self {
+    return ThrowingConnector(error: error)
+  }
+}
+
+@_spi(Package)
+extension HTTP2Connector where Self == NeverConnector {
+  /// A connector which fatal errors if a connect attempt is made.
+  static var never: Self {
+    NeverConnector()
+  }
+}
+
+@_spi(Package)
+extension HTTP2Connector where Self == NIOPosixConnector {
+  /// A connector which uses NIOPosix to establish a connection.
+  static func posix(
+    maxIdleTime: TimeAmount? = nil,
+    keepaliveTime: TimeAmount? = nil,
+    keepaliveTimeout: TimeAmount? = nil,
+    keepaliveWithoutCalls: Bool = false,
+    dropPingAcks: Bool = false
+  ) -> Self {
+    return NIOPosixConnector(
+      maxIdleTime: maxIdleTime,
+      keepaliveTime: keepaliveTime,
+      keepaliveTimeout: keepaliveTimeout,
+      keepaliveWithoutCalls: keepaliveWithoutCalls,
+      dropPingAcks: dropPingAcks
+    )
+  }
+}
+
+struct ThrowingConnector: HTTP2Connector {
+  private let error: RPCError
+
+  init(error: RPCError) {
+    self.error = error
+  }
+
+  func establishConnection(
+    to address: GRPCHTTP2Core.SocketAddress
+  ) async throws -> HTTP2Connection {
+    throw self.error
+  }
+}
+
+struct NeverConnector: HTTP2Connector {
+  func establishConnection(
+    to address: GRPCHTTP2Core.SocketAddress
+  ) async throws -> HTTP2Connection {
+    fatalError("\(#function) called unexpectedly")
+  }
+}
+
+struct NIOPosixConnector: HTTP2Connector {
+  private let eventLoopGroup: any EventLoopGroup
+  private let maxIdleTime: TimeAmount?
+  private let keepaliveTime: TimeAmount?
+  private let keepaliveTimeout: TimeAmount?
+  private let keepaliveWithoutCalls: Bool
+  private let dropPingAcks: Bool
+
+  init(
+    eventLoopGroup: (any EventLoopGroup)? = nil,
+    maxIdleTime: TimeAmount? = nil,
+    keepaliveTime: TimeAmount? = nil,
+    keepaliveTimeout: TimeAmount? = nil,
+    keepaliveWithoutCalls: Bool = false,
+    dropPingAcks: Bool = false
+  ) {
+    self.eventLoopGroup = eventLoopGroup ?? .singletonMultiThreadedEventLoopGroup
+    self.maxIdleTime = maxIdleTime
+    self.keepaliveTime = keepaliveTime
+    self.keepaliveTimeout = keepaliveTimeout
+    self.keepaliveWithoutCalls = keepaliveWithoutCalls
+    self.dropPingAcks = dropPingAcks
+  }
+
+  func establishConnection(
+    to address: GRPCHTTP2Core.SocketAddress
+  ) async throws -> HTTP2Connection {
+    return try await ClientBootstrap(group: self.eventLoopGroup).connect(to: address) { channel in
+      channel.eventLoop.makeCompletedFuture {
+        let sync = channel.pipeline.syncOperations
+
+        let multiplexer = try sync.configureAsyncHTTP2Pipeline(mode: .client) { stream in
+          // Server shouldn't be opening streams.
+          stream.close()
+        }
+
+        if self.dropPingAcks {
+          try sync.addHandler(PingAckDropper())
+        }
+
+        let connectionHandler = ClientConnectionHandler(
+          eventLoop: channel.eventLoop,
+          maxIdleTime: self.maxIdleTime,
+          keepaliveTime: self.keepaliveTime,
+          keepaliveTimeout: self.keepaliveTimeout,
+          keepaliveWithoutCalls: self.keepaliveWithoutCalls
+        )
+
+        try sync.addHandler(connectionHandler)
+
+        let asyncChannel = try NIOAsyncChannel<ClientConnectionEvent, Void>(
+          wrappingChannelSynchronously: channel
+        )
+
+        return HTTP2Connection(channel: asyncChannel, multiplexer: multiplexer, isPlaintext: true)
+      }
+    }
+  }
+
+  /// Drops all acks for PING frames. This is useful to help trigger the keepalive timeout.
+  final class PingAckDropper: ChannelInboundHandler {
+    typealias InboundIn = HTTP2Frame
+    typealias InboundOut = HTTP2Frame
+
+    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
+      let frame = self.unwrapInboundIn(data)
+      switch frame.payload {
+      case .ping(_, ack: true):
+        ()  // drop-it
+      default:
+        context.fireChannelRead(data)
+      }
+    }
+  }
+}

+ 23 - 0
Tests/GRPCHTTP2CoreTests/Test Utilities/MethodDescriptor+Common.swift

@@ -0,0 +1,23 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+
+extension MethodDescriptor {
+  static var echoGet: Self {
+    MethodDescriptor(service: "echo.Echo", method: "Get")
+  }
+}