Browse Source

Add a pooled channel

Motivation:

We added the various internals of the connection pool but are yet to
surface it as public API. This PR does just that.

Modifications:

- Add a `PooledChannel`, a `GRPCChannel` which uses the connection pool
- Add `GRPCPooledChannel` and `GRPCPooledChannel.Configuration` which
  produces `PooledChannel`s as `GRPCChannel`s.
- Add a handful of tests

Result:

We have a usable connection pool in the public API.
George Barnett 4 years ago
parent
commit
d6336e075e

+ 13 - 13
.github/workflows/ci.yaml

@@ -50,16 +50,6 @@ jobs:
       matrix:
         include:
           - image: swift:5.4-focal
-            env:
-              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 503000
-              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 215000
-              MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
-              MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
-              MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
-              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 204000
-              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 211000
-              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 211000
-          - image: swift:5.3-focal
             env:
               MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 504000
               MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 216000
@@ -69,16 +59,26 @@ jobs:
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 205000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 212000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 212000
-          - image: swift:5.2-bionic
+          - image: swift:5.3-focal
             env:
-              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 515000
-              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 218000
+              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 505000
+              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 217000
               MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
               MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
               MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 206000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 213000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 213000
+          - image: swift:5.2-bionic
+            env:
+              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 516000
+              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 219000
+              MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
+              MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
+              MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 207000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 214000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 214000
     name: Performance Tests on ${{ matrix.image }}
     runs-on: ubuntu-latest
     container:

+ 35 - 2
Sources/GRPC/ClientConnection.swift

@@ -161,6 +161,13 @@ extension ClientConnection: GRPCChannel {
     let multiplexer = self.getMultiplexer()
     let eventLoop = callOptions.eventLoopPreference.exact ?? multiplexer.eventLoop
 
+    // This should be on the same event loop as the multiplexer (i.e. the event loop of the
+    // underlying `Channel`.
+    let channel = multiplexer.eventLoop.makePromise(of: Channel.self)
+    multiplexer.whenComplete {
+      ClientConnection.makeStreamChannel(using: $0, promise: channel)
+    }
+
     return Call(
       path: path,
       type: type,
@@ -168,7 +175,7 @@ extension ClientConnection: GRPCChannel {
       options: options,
       interceptors: interceptors,
       transportFactory: .http2(
-        multiplexer: multiplexer,
+        channel: channel.futureResult,
         authority: self.authority,
         scheme: self.scheme,
         maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
@@ -188,6 +195,13 @@ extension ClientConnection: GRPCChannel {
     let multiplexer = self.getMultiplexer()
     let eventLoop = callOptions.eventLoopPreference.exact ?? multiplexer.eventLoop
 
+    // This should be on the same event loop as the multiplexer (i.e. the event loop of the
+    // underlying `Channel`.
+    let channel = multiplexer.eventLoop.makePromise(of: Channel.self)
+    multiplexer.whenComplete {
+      ClientConnection.makeStreamChannel(using: $0, promise: channel)
+    }
+
     return Call(
       path: path,
       type: type,
@@ -195,7 +209,7 @@ extension ClientConnection: GRPCChannel {
       options: options,
       interceptors: interceptors,
       transportFactory: .http2(
-        multiplexer: multiplexer,
+        channel: channel.futureResult,
         authority: self.authority,
         scheme: self.scheme,
         maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
@@ -203,6 +217,20 @@ extension ClientConnection: GRPCChannel {
       )
     )
   }
+
+  private static func makeStreamChannel(
+    using result: Result<HTTP2StreamMultiplexer, Error>,
+    promise: EventLoopPromise<Channel>
+  ) {
+    switch result {
+    case let .success(multiplexer):
+      multiplexer.createStreamChannel(promise: promise) {
+        $0.eventLoop.makeSucceededVoidFuture()
+      }
+    case let .failure(error):
+      promise.fail(error)
+    }
+  }
 }
 
 // MARK: - Configuration structures
@@ -220,6 +248,11 @@ public struct ConnectionTarget {
     self.wrapped = wrapped
   }
 
+  /// The host and port. The port is 443 by default.
+  public static func host(_ host: String, port: Int = 443) -> ConnectionTarget {
+    return ConnectionTarget(.hostAndPort(host, port))
+  }
+
   /// The host and port.
   public static func hostAndPort(_ host: String, _ port: Int) -> ConnectionTarget {
     return ConnectionTarget(.hostAndPort(host, port))

+ 24 - 0
Sources/GRPC/ConnectionPool/ConnectionPool.swift

@@ -659,3 +659,27 @@ internal enum ConnectionPoolError: Error {
   /// The deadline for creating a stream has passed.
   case deadlineExceeded
 }
+
+extension ConnectionPoolError: GRPCStatusTransformable {
+  internal func makeGRPCStatus() -> GRPCStatus {
+    switch self {
+    case .shutdown:
+      return GRPCStatus(
+        code: .unavailable,
+        message: "The connection pool is shutdown"
+      )
+
+    case .tooManyWaiters:
+      return GRPCStatus(
+        code: .resourceExhausted,
+        message: "The connection pool has no capacity for new RPCs or RPC waiters"
+      )
+
+    case .deadlineExceeded:
+      return GRPCStatus(
+        code: .deadlineExceeded,
+        message: "Timed out waiting for an HTTP/2 stream from the connection pool"
+      )
+    }
+  }
+}

+ 279 - 0
Sources/GRPC/ConnectionPool/GRPCChannelPool.swift

@@ -0,0 +1,279 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Logging
+import NIOCore
+import NIOPosix
+
+public enum GRPCChannelPool {
+  /// Make a new ``GRPCChannel`` on which calls may be made to gRPC services.
+  ///
+  /// The channel is backed by one connection pool per event loop, each of which may make multiple
+  /// connections to the given target. The size of the connection pool, and therefore the maximum
+  /// number of connections it may create at a given time is determined by the number of event loops
+  /// in the provided `EventLoopGroup` and the value of
+  /// ``GRPCChannelPool/Configuration/ConnectionPool-swift.struct/connectionsPerEventLoop``.
+  ///
+  /// The event loop and therefore connection chosen for a call is determined by
+  /// ``CallOptions/eventLoopPreference-swift.property``. If the `indifferent` preference is used
+  /// then the least-used event loop is chosen and a connection on that event loop will be selected.
+  /// If an `exact` preference is used then a connection on that event loop will be chosen provided
+  /// the given event loop belongs to the `EventLoopGroup` used to create this ``GRPCChannel``.
+  ///
+  /// Each connection in the pool is initially idle, and no connections will be established until
+  /// a call is made. The pool also closes connections after they have been inactive (i.e. are not
+  /// being used for calls) for some period of time. This is determined by
+  /// ``GRPCChannelPool/Configuration/idleTimeout``.
+  ///
+  /// > Important: The values of `transportSecurity` and `eventLoopGroup` **must** be compatible.
+  /// >
+  /// >   For ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/tls(_:)`` the allowed
+  /// >   `EventLoopGroup`s depends on the value of ``GRPCTLSConfiguration``. If a TLS configuration
+  /// >   is known ahead of time, ``PlatformSupport/makeEventLoopGroup(compatibleWith:loopCount:)``
+  /// >   may be used to construct a compatible `EventLoopGroup`.
+  /// >
+  /// >   If the `EventLoopGroup` is known ahead of time then a default TLS configuration may be
+  /// >   constructed with ``GRPCTLSConfiguration/makeClientDefault(compatibleWith:)``.
+  /// >
+  /// >   For ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/plaintext`` transport
+  /// >   security both `MultiThreadedEventLoopGroup` and `NIOTSEventLoopGroup` (and `EventLoop`s
+  /// >   from either) may be used.
+  ///
+  /// - Parameters:
+  ///   - target: The target to connect to.
+  ///   - transportSecurity: Transport layer security for connections.
+  ///   - eventLoopGroup: The `EventLoopGroup` to run connections on.
+  ///   - configure: A closure which may be used to modify defaulted configuration before
+  ///        constructing the ``GRPCChannel``.
+  /// - Throws: If it is not possible to construct an SSL context. This will never happen when
+  ///     using the ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/plaintext``
+  ///     transport security.
+  /// - Returns: A ``GRPCChannel``.
+  public static func with(
+    target: ConnectionTarget,
+    transportSecurity: GRPCChannelPool.Configuration.TransportSecurity,
+    eventLoopGroup: EventLoopGroup,
+    _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
+  ) throws -> GRPCChannel {
+    let configuration = GRPCChannelPool.Configuration.with(
+      target: target,
+      transportSecurity: transportSecurity,
+      eventLoopGroup: eventLoopGroup,
+      configure
+    )
+
+    return try PooledChannel(configuration: configuration)
+  }
+
+  /// See ``GRPCChannelPool/with(target:transportSecurity:eventLoopGroup:_:)``.
+  public static func with(
+    configuration: GRPCChannelPool.Configuration
+  ) throws -> GRPCChannel {
+    return try PooledChannel(configuration: configuration)
+  }
+}
+
+extension GRPCChannelPool {
+  public struct Configuration {
+    private init(
+      target: ConnectionTarget,
+      transportSecurity: TransportSecurity,
+      eventLoopGroup: EventLoopGroup
+    ) {
+      self.target = target
+      self.transportSecurity = transportSecurity
+      self.eventLoopGroup = eventLoopGroup
+    }
+
+    // Note: we use `configure` blocks to avoid having to add new initializers when properties are
+    // added to the configuration while allowing the configuration to be constructed as a constant.
+
+    /// Construct and configure a ``GRPCChannelPool/Configuration``.
+    ///
+    /// - Parameters:
+    ///   - target: The target to connect to.
+    ///   - transportSecurity: Transport layer security for connections. Note that the value of
+    ///       `eventLoopGroup` must be compatible with the value
+    ///   - eventLoopGroup: The `EventLoopGroup` to run connections on.
+    ///   - configure: A closure which may be used to modify defaulted configuration.
+    public static func with(
+      target: ConnectionTarget,
+      transportSecurity: TransportSecurity,
+      eventLoopGroup: EventLoopGroup,
+      _ configure: (inout Configuration) -> Void = { _ in }
+    ) -> Configuration {
+      var configuration = Configuration(
+        target: target,
+        transportSecurity: transportSecurity,
+        eventLoopGroup: eventLoopGroup
+      )
+      configure(&configuration)
+      return configuration
+    }
+
+    /// The target to connect to.
+    public var target: ConnectionTarget
+
+    /// Connection security.
+    public var transportSecurity: TransportSecurity
+
+    /// The `EventLoopGroup` used by the connection pool.
+    public var eventLoopGroup: EventLoopGroup
+
+    /// Connection pool configuration.
+    public var connectionPool: ConnectionPool = .defaults
+
+    /// HTTP/2 configuration.
+    public var http2: HTTP2 = .defaults
+
+    /// The connection backoff configuration.
+    public var connectionBackoff = ConnectionBackoff()
+
+    /// The amount of time to wait before closing the connection. The idle timeout will start only
+    /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
+    ///
+    /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
+    public var idleTimeout = TimeAmount.minutes(30)
+
+    /// The connection keepalive configuration.
+    public var keepalive = ClientConnectionKeepalive()
+
+    /// The maximum size in bytes of a message which may be received from a server. Defaults to 4MB.
+    ///
+    /// Any received messages whose size exceeds this limit will cause RPCs to fail with
+    /// a `.resourceExhausted` status code.
+    public var maximumReceiveMessageLength: Int = 4 * 1024 * 1024 {
+      willSet {
+        precondition(newValue >= 0, "maximumReceiveMessageLength must be positive")
+      }
+    }
+
+    /// A channel initializer which will be run after gRPC has initialized each `NIOCore.Channel`.
+    /// This may be used to add additional handlers to the pipeline and is intended for debugging.
+    ///
+    /// - Warning: The initializer closure may be invoked *multiple times*.
+    public var debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?
+
+    /// An error delegate which is called when errors are caught.
+    public var errorDelegate: ClientErrorDelegate?
+
+    /// A logger used for background activity, such as connection state changes.
+    public var backgroundActivityLogger = Logger(
+      label: "io.grpc",
+      factory: { _ in
+        return SwiftLogNoOpLogHandler()
+      }
+    )
+  }
+}
+
+extension GRPCChannelPool.Configuration {
+  public struct TransportSecurity {
+    private init(_ configuration: GRPCTLSConfiguration?) {
+      self.tlsConfiguration = configuration
+    }
+
+    /// The TLS configuration used. A `nil` value means that no TLS will be used and
+    /// communication at the transport layer will be plaintext.
+    public var tlsConfiguration: Optional<GRPCTLSConfiguration>
+
+    /// Secure the transport layer with TLS.
+    ///
+    /// The TLS backend used depends on the value of `configuration`. See ``GRPCTLSConfiguration``
+    /// for more details.
+    ///
+    /// > Important: the value of `configuration` **must** be compatible with
+    /// > ``GRPCChannelPool/Configuration/eventLoopGroup``. See the documentation of
+    /// > ``GRPCChannelPool/with(target:transportSecurity:eventLoopGroup:_:)`` for more details.
+    public static func tls(_ configuration: GRPCTLSConfiguration) -> TransportSecurity {
+      return TransportSecurity(configuration)
+    }
+
+    /// Insecure plaintext communication.
+    public static let plaintext = TransportSecurity(nil)
+  }
+}
+
+extension GRPCChannelPool.Configuration {
+  public struct HTTP2: Hashable {
+    private static let allowedTargetWindowSizes = (1 ... Int(Int32.max))
+    private static let allowedMaxFrameSizes = (1 << 14) ... ((1 << 24) - 1)
+
+    /// Default HTTP/2 configuration.
+    public static let defaults = HTTP2()
+
+    public static func with(_ configure: (inout HTTP2) -> Void) -> HTTP2 {
+      var configuration = Self.defaults
+      configure(&configuration)
+      return configuration
+    }
+
+    /// The HTTP/2 max frame size. Defaults to 8MB. Values are clamped between 2^14 and 2^24-1
+    /// octets inclusive (RFC 7540 § 4.2).
+    public var targetWindowSize = 8 * 1024 * 1024 {
+      didSet {
+        self.targetWindowSize = self.targetWindowSize.clamped(to: Self.allowedTargetWindowSizes)
+      }
+    }
+
+    /// The HTTP/2 max frame size. Defaults to 16384. Value is clamped between 2^14 and 2^24-1
+    /// octets inclusive (the minimum and maximum allowable values - HTTP/2 RFC 7540 4.2).
+    public var maxFrameSize: Int = 16384 {
+      didSet {
+        self.maxFrameSize = self.maxFrameSize.clamped(to: Self.allowedMaxFrameSizes)
+      }
+    }
+  }
+}
+
+extension GRPCChannelPool.Configuration {
+  public struct ConnectionPool: Hashable {
+    /// Default connection pool configuration.
+    public static let defaults = ConnectionPool()
+
+    public static func with(_ configure: (inout ConnectionPool) -> Void) -> ConnectionPool {
+      var configuration = Self.defaults
+      configure(&configuration)
+      return configuration
+    }
+
+    /// The maximum number of connections per `EventLoop` that may be created at a given time.
+    ///
+    /// Defaults to 1.
+    public var connectionsPerEventLoop: Int = 1
+
+    /// The maximum number of callers which may be waiting for a stream at any given time on a
+    /// given `EventLoop`.
+    ///
+    /// Any requests for a stream which would cause this limit to be exceeded will be failed
+    /// immediately.
+    ///
+    /// Defaults to 100.
+    public var maxWaitersPerEventLoop: Int = 100
+
+    /// The maximum amount of time a caller is willing to wait for a stream for before timing out.
+    ///
+    /// Defaults to 30 seconds.
+    public var maxWaitTime: TimeAmount = .seconds(30)
+
+    /// The threshold which, if exceeded, when creating a stream determines whether the pool will
+    /// establish another connection (if doing so will not violate ``connectionsPerEventLoop``).
+    ///
+    /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of
+    /// waiters and the number of reserved streams) and the total number of streams which each
+    /// thread _could support.
+    public var reservationLoadThreshold: Double = 0.9
+  }
+}

+ 152 - 0
Sources/GRPC/ConnectionPool/PooledChannel.swift

@@ -0,0 +1,152 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Logging
+import NIO
+import NIOHTTP2
+import NIOSSL
+import SwiftProtobuf
+
+internal final class PooledChannel: GRPCChannel {
+  private let configuration: GRPCChannelPool.Configuration
+  private let pool: PoolManager
+  private let authority: String
+  private let scheme: String
+
+  internal init(configuration: GRPCChannelPool.Configuration) throws {
+    self.configuration = configuration
+    self.authority = configuration.target.host
+
+    let tlsMode: DefaultChannelProvider.TLSMode
+    let scheme: String
+
+    if let tlsConfiguration = configuration.transportSecurity.tlsConfiguration {
+      scheme = "https"
+      if let sslContext = try tlsConfiguration.makeNIOSSLContext() {
+        tlsMode = .configureWithNIOSSL(.success(sslContext))
+      } else {
+        // No SSL context means we're using Network.framework.
+        tlsMode = .configureWithNetworkFramework
+      }
+    } else {
+      scheme = "http"
+      tlsMode = .disabled
+    }
+
+    self.scheme = scheme
+
+    let provider = DefaultChannelProvider(
+      connectionTarget: configuration.target,
+      connectionKeepalive: configuration.keepalive,
+      connectionIdleTimeout: configuration.idleTimeout,
+      tlsMode: tlsMode,
+      tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
+      httpTargetWindowSize: configuration.http2.targetWindowSize,
+      httpMaxFrameSize: configuration.http2.targetWindowSize,
+      errorDelegate: configuration.errorDelegate,
+      debugChannelInitializer: configuration.debugChannelInitializer
+    )
+
+    self.pool = PoolManager.makeInitializedPoolManager(
+      using: configuration.eventLoopGroup,
+      perPoolConfiguration: .init(
+        maxConnections: configuration.connectionPool.connectionsPerEventLoop,
+        maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
+        loadThreshold: configuration.connectionPool.reservationLoadThreshold,
+        assumedMaxConcurrentStreams: 100,
+        channelProvider: provider
+      ),
+      logger: configuration.backgroundActivityLogger.wrapped
+    )
+  }
+
+  private func makeStreamChannel(
+    callOptions: CallOptions
+  ) -> (EventLoopFuture<Channel>, EventLoop) {
+    let preferredEventLoop = callOptions.eventLoopPreference.exact
+    let connectionWaitDeadline = NIODeadline.now() + self.configuration.connectionPool.maxWaitTime
+    let deadline = min(callOptions.timeLimit.makeDeadline(), connectionWaitDeadline)
+
+    let streamChannel = self.pool.makeStream(
+      preferredEventLoop: preferredEventLoop,
+      deadline: deadline,
+      logger: GRPCLogger(wrapping: callOptions.logger)
+    ) { channel in
+      return channel.eventLoop.makeSucceededVoidFuture()
+    }
+
+    return (streamChannel.futureResult, preferredEventLoop ?? streamChannel.eventLoop)
+  }
+
+  // MARK: GRPCChannel conformance
+
+  internal func makeCall<Request, Response>(
+    path: String,
+    type: GRPCCallType,
+    callOptions: CallOptions,
+    interceptors: [ClientInterceptor<Request, Response>]
+  ) -> Call<Request, Response> where Request: Message, Response: Message {
+    let (stream, eventLoop) = self.makeStreamChannel(callOptions: callOptions)
+
+    return Call(
+      path: path,
+      type: type,
+      eventLoop: eventLoop,
+      options: callOptions,
+      interceptors: interceptors,
+      transportFactory: .http2(
+        channel: stream,
+        authority: self.authority,
+        scheme: self.scheme,
+        maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
+        errorDelegate: self.configuration.errorDelegate
+      )
+    )
+  }
+
+  internal func makeCall<Request, Response>(
+    path: String,
+    type: GRPCCallType,
+    callOptions: CallOptions,
+    interceptors: [ClientInterceptor<Request, Response>]
+  ) -> Call<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
+    let (stream, eventLoop) = self.makeStreamChannel(callOptions: callOptions)
+
+    return Call(
+      path: path,
+      type: type,
+      eventLoop: eventLoop,
+      options: callOptions,
+      interceptors: interceptors,
+      transportFactory: .http2(
+        channel: stream,
+        authority: self.authority,
+        scheme: self.scheme,
+        maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
+        errorDelegate: self.configuration.errorDelegate
+      )
+    )
+  }
+
+  internal func close(promise: EventLoopPromise<Void>) {
+    self.pool.shutdown(promise: promise)
+  }
+
+  internal func close() -> EventLoopFuture<Void> {
+    let promise = self.configuration.eventLoopGroup.next().makePromise(of: Void.self)
+    self.pool.shutdown(promise: promise)
+    return promise.futureResult
+  }
+}

+ 12 - 2
Sources/GRPC/GRPCChannel/EmbeddedGRPCChannel.swift

@@ -68,7 +68,7 @@ class EmbeddedGRPCChannel: GRPCChannel {
       options: callOptions,
       interceptors: interceptors,
       transportFactory: .http2(
-        multiplexer: self.multiplexer,
+        channel: self.makeStreamChannel(),
         authority: self.authority,
         scheme: self.scheme,
         // This is internal and only for testing, so max is fine here.
@@ -91,7 +91,7 @@ class EmbeddedGRPCChannel: GRPCChannel {
       options: callOptions,
       interceptors: interceptors,
       transportFactory: .http2(
-        multiplexer: self.multiplexer,
+        channel: self.makeStreamChannel(),
         authority: self.authority,
         scheme: self.scheme,
         // This is internal and only for testing, so max is fine here.
@@ -100,4 +100,14 @@ class EmbeddedGRPCChannel: GRPCChannel {
       )
     )
   }
+
+  private func makeStreamChannel() -> EventLoopFuture<Channel> {
+    let promise = self.eventLoop.makePromise(of: Channel.self)
+    self.multiplexer.whenSuccess {
+      $0.createStreamChannel(promise: promise) {
+        $0.eventLoop.makeSucceededVoidFuture()
+      }
+    }
+    return promise.futureResult
+  }
 }

+ 25 - 7
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -268,6 +268,13 @@ extension ClientTransport: ChannelInboundHandler {
     self.dropReferences()
   }
 
+  @usableFromInline
+  internal func handlerAdded(context: ChannelHandlerContext) {
+    if context.channel.isActive {
+      self.transportActivated(channel: context.channel)
+    }
+  }
+
   @usableFromInline
   internal func errorCaught(context: ChannelHandlerContext, error: Error) {
     self.handleError(error)
@@ -319,15 +326,19 @@ extension ClientTransport {
   private func _transportActivated(channel: Channel) {
     self.callEventLoop.assertInEventLoop()
 
-    if self.state.activate() {
+    switch self.state.activate() {
+    case .unbuffer:
       self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress)
-
       self._pipeline?.logger = self.logger
       self.logger.debug("activated stream channel")
       self.channel = channel
       self.unbuffer()
-    } else {
+
+    case .close:
       channel.close(mode: .all, promise: nil)
+
+    case .doNothing:
+      ()
     }
   }
 
@@ -660,8 +671,14 @@ extension ClientTransportState {
     }
   }
 
+  enum ActivateAction {
+    case unbuffer
+    case close
+    case doNothing
+  }
+
   /// `channelActive` was invoked on the transport by the `Channel`.
-  mutating func activate() -> Bool {
+  mutating func activate() -> ActivateAction {
     // The channel has become active: what now?
     switch self {
     case .idle:
@@ -669,14 +686,15 @@ extension ClientTransportState {
 
     case .awaitingTransport:
       self = .activatingTransport
-      return true
+      return .unbuffer
 
     case .activatingTransport, .active:
-      preconditionFailure("Invalid state: stream is already active")
+      // Already activated.
+      return .doNothing
 
     case .closing:
       // We remain in closing: we only transition to closed on 'channelInactive'.
-      return false
+      return .close
 
     case .closed:
       preconditionFailure("Invalid state: stream is already inactive")

+ 19 - 30
Sources/GRPC/Interceptor/ClientTransportFactory.swift

@@ -44,14 +44,14 @@ internal struct ClientTransportFactory<Request, Response> {
   ///   - errorDelegate: A client error delegate.
   /// - Returns: A factory for making and configuring HTTP/2 based transport.
   internal static func http2<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
-    multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
+    channel: EventLoopFuture<Channel>,
     authority: String,
     scheme: String,
     maximumReceiveMessageLength: Int,
     errorDelegate: ClientErrorDelegate?
   ) -> ClientTransportFactory<Request, Response> {
     let http2 = HTTP2ClientTransportFactory<Request, Response>(
-      multiplexer: multiplexer,
+      streamChannel: channel,
       scheme: scheme,
       authority: authority,
       serializer: ProtobufSerializer(),
@@ -70,14 +70,14 @@ internal struct ClientTransportFactory<Request, Response> {
   ///   - errorDelegate: A client error delegate.
   /// - Returns: A factory for making and configuring HTTP/2 based transport.
   internal static func http2<Request: GRPCPayload, Response: GRPCPayload>(
-    multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
+    channel: EventLoopFuture<Channel>,
     authority: String,
     scheme: String,
     maximumReceiveMessageLength: Int,
     errorDelegate: ClientErrorDelegate?
   ) -> ClientTransportFactory<Request, Response> {
     let http2 = HTTP2ClientTransportFactory<Request, Response>(
-      multiplexer: multiplexer,
+      streamChannel: channel,
       scheme: scheme,
       authority: authority,
       serializer: AnySerializer(wrapping: GRPCPayloadSerializer()),
@@ -169,7 +169,7 @@ internal struct ClientTransportFactory<Request, Response> {
 
 private struct HTTP2ClientTransportFactory<Request, Response> {
   /// The multiplexer providing an HTTP/2 stream for the call.
-  private var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
+  private var streamChannel: EventLoopFuture<Channel>
 
   /// The ":authority" pseudo-header.
   private var authority: String
@@ -190,7 +190,7 @@ private struct HTTP2ClientTransportFactory<Request, Response> {
   private let maximumReceiveMessageLength: Int
 
   fileprivate init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
-    multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
+    streamChannel: EventLoopFuture<Channel>,
     scheme: String,
     authority: String,
     serializer: Serializer,
@@ -198,7 +198,7 @@ private struct HTTP2ClientTransportFactory<Request, Response> {
     maximumReceiveMessageLength: Int,
     errorDelegate: ClientErrorDelegate?
   ) where Serializer.Input == Request, Deserializer.Output == Response {
-    self.multiplexer = multiplexer
+    self.streamChannel = streamChannel
     self.scheme = scheme
     self.authority = authority
     self.serializer = AnySerializer(wrapping: serializer)
@@ -230,31 +230,20 @@ private struct HTTP2ClientTransportFactory<Request, Response> {
 
   fileprivate func configure<Request, Response>(_ transport: ClientTransport<Request, Response>) {
     transport.configure { _ in
-      self.multiplexer.flatMap { multiplexer in
-        let streamPromise = self.multiplexer.eventLoop.makePromise(of: Channel.self)
+      self.streamChannel.flatMapThrowing { channel in
+        // This initializer will always occur on the appropriate event loop, sync operations are
+        // fine here.
+        let syncOperations = channel.pipeline.syncOperations
 
-        multiplexer.createStreamChannel(promise: streamPromise) { streamChannel in
-          // This initializer will always occur on the appropriate event loop, sync operations are
-          // fine here.
-          let syncOperations = streamChannel.pipeline.syncOperations
-
-          do {
-            let clientHandler = GRPCClientChannelHandler(
-              callType: transport.callDetails.type,
-              maximumReceiveMessageLength: self.maximumReceiveMessageLength,
-              logger: transport.logger
-            )
-            try syncOperations.addHandler(clientHandler)
-            try syncOperations.addHandler(transport)
-          } catch {
-            return streamChannel.eventLoop.makeFailedFuture(error)
-          }
-
-          return streamChannel.eventLoop.makeSucceededVoidFuture()
+        do {
+          let clientHandler = GRPCClientChannelHandler(
+            callType: transport.callDetails.type,
+            maximumReceiveMessageLength: self.maximumReceiveMessageLength,
+            logger: transport.logger
+          )
+          try syncOperations.addHandler(clientHandler)
+          try syncOperations.addHandler(transport)
         }
-
-        // We don't need the stream, but we do need to know it was correctly configured.
-        return streamPromise.futureResult.map { _ in }
       }
     }
   }

+ 30 - 0
Sources/GRPC/PlatformSupport.swift

@@ -303,7 +303,37 @@ public enum PlatformSupport {
   }
 }
 
+extension PlatformSupport {
+  /// Make an `EventLoopGroup` which is compatible with the given TLS configuration/
+  ///
+  /// - Parameters:
+  ///   - configuration: The configuration to make a compatible `EventLoopGroup` for.
+  ///   - loopCount: The number of loops the `EventLoopGroup` should have.
+  /// - Returns: An `EventLoopGroup` compatible with the given `configuration`.
+  public static func makeEventLoopGroup(
+    compatibleWith configuration: GRPCTLSConfiguration,
+    loopCount: Int
+  ) -> EventLoopGroup {
+    #if canImport(Network)
+    if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
+      if configuration.isNetworkFrameworkTLSBackend {
+        return NIOTSEventLoopGroup(loopCount: loopCount)
+      }
+    }
+    #endif
+    return MultiThreadedEventLoopGroup(numberOfThreads: loopCount)
+  }
+}
+
 extension GRPCTLSConfiguration {
+  /// Provides a `GRPCTLSConfiguration` suitable for the given `EventLoopGroup`.
+  public static func makeClientDefault(
+    compatibleWith eventLoopGroup: EventLoopGroup
+  ) -> GRPCTLSConfiguration {
+    let networkImplementation: NetworkImplementation = .matchingEventLoopGroup(eventLoopGroup)
+    return GRPCTLSConfiguration.makeClientDefault(for: .userDefined(networkImplementation))
+  }
+
   /// Provides a `GRPCTLSConfiguration` suitable for the given network preference.
   public static func makeClientDefault(
     for networkPreference: NetworkPreference

+ 384 - 0
Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift

@@ -0,0 +1,384 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import EchoImplementation
+import EchoModel
+import GRPC
+import GRPCSampleData
+import NIO
+import NIOConcurrencyHelpers
+import XCTest
+
+final class GRPCChannelPoolTests: GRPCTestCase {
+  private var group: MultiThreadedEventLoopGroup!
+  private var server: Server?
+  private var channel: GRPCChannel?
+
+  private var serverPort: Int? {
+    return self.server?.channel.localAddress?.port
+  }
+
+  private var echo: Echo_EchoClient {
+    return Echo_EchoClient(channel: self.channel!)
+  }
+
+  override func tearDown() {
+    if let channel = self.channel {
+      XCTAssertNoThrow(try channel.close().wait())
+    }
+
+    if let server = self.server {
+      XCTAssertNoThrow(try server.close().wait())
+    }
+
+    XCTAssertNoThrow(try self.group.syncShutdownGracefully())
+    super.tearDown()
+  }
+
+  private func configureEventLoopGroup(threads: Int = System.coreCount) {
+    self.group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
+  }
+
+  private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
+    let builder: Server.Builder
+
+    if withTLS {
+      builder = Server.usingTLSBackedByNIOSSL(
+        on: self.group,
+        certificateChain: [SampleCertificate.server.certificate],
+        privateKey: SamplePrivateKey.server
+      ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
+    } else {
+      builder = Server.insecure(group: self.group)
+    }
+
+    return builder
+      .withLogger(self.serverLogger)
+      .withServiceProviders([EchoProvider()])
+  }
+
+  private func startServer(withTLS: Bool = false) {
+    self.server = try! self.makeServerBuilder(withTLS: withTLS)
+      .bind(host: "localhost", port: 0)
+      .wait()
+  }
+
+  private func startChannel(
+    withTLS: Bool = false,
+    overrideTarget targetOverride: ConnectionTarget? = nil,
+    _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
+  ) {
+    let transportSecurity: GRPCChannelPool.Configuration.TransportSecurity
+
+    if withTLS {
+      let configuration = GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
+        trustRoots: .certificates([SampleCertificate.ca.certificate])
+      )
+      transportSecurity = .tls(configuration)
+    } else {
+      transportSecurity = .plaintext
+    }
+
+    self.channel = try! GRPCChannelPool.with(
+      target: targetOverride ?? .hostAndPort("localhost", self.serverPort!),
+      transportSecurity: transportSecurity,
+      eventLoopGroup: self.group
+    ) { configuration in
+      configuration.backgroundActivityLogger = self.clientLogger
+      configure(&configuration)
+    }
+  }
+
+  private func setUpClientAndServer(withTLS tls: Bool) {
+    self.configureEventLoopGroup()
+    self.startServer(withTLS: tls)
+    self.startChannel(withTLS: tls) {
+      // We'll allow any number of waiters since we immediately fire off a bunch of RPCs and don't
+      // want to bounce off the limit as we wait for a connection to come up.
+      $0.connectionPool.maxWaitersPerEventLoop = .max
+    }
+  }
+
+  private func doTestUnaryRPCs(count: Int) throws {
+    var futures: [EventLoopFuture<GRPCStatus>] = []
+    futures.reserveCapacity(count)
+
+    for i in 1 ... count {
+      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
+      let get = self.echo.get(request)
+      futures.append(get.status)
+    }
+
+    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
+    XCTAssert(statuses.allSatisfy { $0.isOk })
+  }
+
+  func testUnaryRPCs_plaintext() throws {
+    self.setUpClientAndServer(withTLS: false)
+    try self.doTestUnaryRPCs(count: 100)
+  }
+
+  func testUnaryRPCs_tls() throws {
+    self.setUpClientAndServer(withTLS: true)
+    try self.doTestUnaryRPCs(count: 100)
+  }
+
+  private func doTestClientStreamingRPCs(count: Int) throws {
+    var futures: [EventLoopFuture<GRPCStatus>] = []
+    futures.reserveCapacity(count)
+
+    for i in 1 ... count {
+      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
+      let collect = self.echo.collect()
+      collect.sendMessage(request, promise: nil)
+      collect.sendMessage(request, promise: nil)
+      collect.sendMessage(request, promise: nil)
+      collect.sendEnd(promise: nil)
+      futures.append(collect.status)
+    }
+
+    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
+    XCTAssert(statuses.allSatisfy { $0.isOk })
+  }
+
+  func testClientStreamingRPCs_plaintext() throws {
+    self.setUpClientAndServer(withTLS: false)
+    try self.doTestClientStreamingRPCs(count: 100)
+  }
+
+  func testClientStreamingRPCs() throws {
+    self.setUpClientAndServer(withTLS: true)
+    try self.doTestClientStreamingRPCs(count: 100)
+  }
+
+  private func doTestServerStreamingRPCs(count: Int) throws {
+    var futures: [EventLoopFuture<GRPCStatus>] = []
+    futures.reserveCapacity(count)
+
+    for i in 1 ... count {
+      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
+      let expand = self.echo.expand(request) { _ in }
+      futures.append(expand.status)
+    }
+
+    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
+    XCTAssert(statuses.allSatisfy { $0.isOk })
+  }
+
+  func testServerStreamingRPCs_plaintext() throws {
+    self.setUpClientAndServer(withTLS: false)
+    try self.doTestServerStreamingRPCs(count: 100)
+  }
+
+  func testServerStreamingRPCs() throws {
+    self.setUpClientAndServer(withTLS: true)
+    try self.doTestServerStreamingRPCs(count: 100)
+  }
+
+  private func doTestBidiStreamingRPCs(count: Int) throws {
+    var futures: [EventLoopFuture<GRPCStatus>] = []
+    futures.reserveCapacity(count)
+
+    for i in 1 ... count {
+      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
+      let update = self.echo.update { _ in }
+      update.sendMessage(request, promise: nil)
+      update.sendMessage(request, promise: nil)
+      update.sendMessage(request, promise: nil)
+      update.sendEnd(promise: nil)
+      futures.append(update.status)
+    }
+
+    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
+    XCTAssert(statuses.allSatisfy { $0.isOk })
+  }
+
+  func testBidiStreamingRPCs_plaintext() throws {
+    self.setUpClientAndServer(withTLS: false)
+    try self.doTestBidiStreamingRPCs(count: 100)
+  }
+
+  func testBidiStreamingRPCs() throws {
+    self.setUpClientAndServer(withTLS: true)
+    try self.doTestBidiStreamingRPCs(count: 100)
+  }
+
+  func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
+    // 4 threads == 4 pools
+    self.configureEventLoopGroup(threads: 4)
+    // Don't start a server; override the target (otherwise we'll fail to unwrap `serverPort`).
+    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
+      // Tiny wait time for waiters.
+      $0.connectionPool.maxWaitTime = .milliseconds(50)
+    }
+
+    var statuses: [EventLoopFuture<GRPCStatus>] = []
+    statuses.reserveCapacity(40)
+
+    // Queue RPCs on each loop.
+    for eventLoop in self.group.makeIterator() {
+      let options = CallOptions(eventLoopPreference: .exact(eventLoop))
+      for i in 0 ..< 10 {
+        let get = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
+        statuses.append(get.status)
+      }
+    }
+
+    let results = try EventLoopFuture.whenAllComplete(statuses, on: self.group.next()).wait()
+    for result in results {
+      result.assertSuccess {
+        XCTAssertEqual($0.code, .deadlineExceeded)
+      }
+    }
+  }
+
+  func testRPCsAreDistributedAcrossEventLoops() throws {
+    self.configureEventLoopGroup(threads: 4)
+
+    // We don't need a server here, but we do need a different target
+    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
+      // Increase the max wait time: we're relying on the server will never coming up, so the RPCs
+      // never complete and streams are not returned back to pools.
+      $0.connectionPool.maxWaitTime = .hours(1)
+    }
+
+    let echo = self.echo
+    echo.defaultCallOptions.eventLoopPreference = .indifferent
+
+    let rpcs = (0 ..< 40).map { _ in echo.update { _ in } }
+
+    let rpcsByEventLoop = Dictionary(grouping: rpcs, by: { ObjectIdentifier($0.eventLoop) })
+    for rpcs in rpcsByEventLoop.values {
+      // 40 RPCs over 4 ELs should be 10 RPCs per EL.
+      XCTAssertEqual(rpcs.count, 10)
+    }
+
+    // All RPCs are waiting for connections since we never brought up a server. Each will fail when
+    // we shutdown the pool.
+    XCTAssertNoThrow(try self.channel?.close().wait())
+    // Unset the channel to avoid shutting down again in tearDown().
+    self.channel = nil
+
+    for rpc in rpcs {
+      XCTAssertEqual(try rpc.status.wait().code, .unavailable)
+    }
+  }
+
+  func testWaiterLimitPerEventLoop() throws {
+    self.configureEventLoopGroup(threads: 4)
+    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
+      $0.connectionPool.maxWaitersPerEventLoop = 10
+      $0.connectionPool.maxWaitTime = .hours(1)
+    }
+
+    let loop = self.group.next()
+    let options = CallOptions(eventLoopPreference: .exact(loop))
+
+    // The first 10 will be waiting for the connection. The 11th should be failed immediately.
+    let rpcs = (1 ... 11).map { _ in
+      self.echo.get(.with { $0.text = "" }, callOptions: options)
+    }
+
+    XCTAssertEqual(try rpcs.last?.status.wait().code, .resourceExhausted)
+
+    // If we express no event loop preference then we should not get the loaded loop.
+    let indifferentLoopRPCs = (1 ... 10).map {
+      _ in echo.get(.with { $0.text = "" })
+    }
+
+    XCTAssert(indifferentLoopRPCs.map { $0.eventLoop }.allSatisfy { $0 !== loop })
+  }
+
+  func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
+    self.configureEventLoopGroup(threads: 1)
+    self.startServer()
+    self.startChannel {
+      $0.connectionPool.connectionsPerEventLoop = 1
+      $0.connectionPool.maxWaitTime = .hours(1)
+    }
+
+    let lock = Lock()
+    var order = 0
+
+    // We need a connection to be up and running to avoid hitting the waiter limit when creating a
+    // batch of RPCs in one go.
+    let warmup = self.echo.get(.with { $0.text = "" })
+    XCTAssert(try warmup.status.wait().isOk)
+
+    // MAX_CONCURRENT_STREAMS should be 100, we'll create 101 RPCs, 100 of which should not have to
+    // wait because there's already an active connection.
+    let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in }}
+    // The first RPC should (obviously) complete first.
+    rpcs.first!.status.whenComplete { _ in
+      lock.withLock {
+        XCTAssertEqual(order, 0)
+        order += 1
+      }
+    }
+
+    // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
+    // RPC below).
+    rpcs.last!.status.whenComplete { _ in
+      lock.withLock {
+        XCTAssertEqual(order, 1)
+        order += 1
+      }
+    }
+
+    // Still zero: the first RPC is still active.
+    lock.withLockVoid { XCTAssertEqual(order, 0) }
+    // End the first RPC.
+    XCTAssertNoThrow(try rpcs.first!.sendEnd().wait())
+    XCTAssertNoThrow(try rpcs.first!.status.wait())
+    lock.withLockVoid { XCTAssertEqual(order, 1) }
+    // End the last RPC.
+    XCTAssertNoThrow(try rpcs.last!.sendEnd().wait())
+    XCTAssertNoThrow(try rpcs.last!.status.wait())
+    lock.withLockVoid { XCTAssertEqual(order, 2) }
+
+    // End the rest.
+    for rpc in rpcs.dropFirst().dropLast() {
+      XCTAssertNoThrow(try rpc.sendEnd().wait())
+    }
+  }
+
+  func testRPCOnShutdownPool() {
+    self.configureEventLoopGroup(threads: 1)
+    self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))
+
+    let echo = self.echo
+
+    XCTAssertNoThrow(try self.channel?.close().wait())
+    // Avoid shutting down again in tearDown()
+    self.channel = nil
+
+    let get = echo.get(.with { $0.text = "" })
+    XCTAssertEqual(try get.status.wait().code, .unavailable)
+  }
+
+  func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
+    self.configureEventLoopGroup(threads: 1)
+    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
+      $0.connectionPool.maxWaitTime = .hours(24)
+    }
+
+    // Deadline is sooner than the 24 hour waiter time, we expect to time out sooner rather than
+    // (much) later!
+    let options = CallOptions(timeLimit: .deadline(.now()))
+    let timedOutOnOwnDeadline = self.echo.get(.with { $0.text = "" }, callOptions: options)
+
+    XCTAssertEqual(try timedOutOnOwnDeadline.status.wait().code, .deadlineExceeded)
+  }
+}

+ 27 - 0
Tests/GRPCTests/PlatformSupportTests.swift

@@ -20,6 +20,10 @@ import NIOPosix
 import NIOTransportServices
 import XCTest
 
+#if canImport(Network)
+import Network
+#endif
+
 class PlatformSupportTests: GRPCTestCase {
   var group: EventLoopGroup!
 
@@ -230,4 +234,27 @@ class PlatformSupportTests: GRPCTestCase {
 
     #endif
   }
+
+  func testMakeCompatibleEventLoopGroupForNIOSSL() {
+    let configuration = GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL()
+    let group = PlatformSupport.makeEventLoopGroup(compatibleWith: configuration, loopCount: 1)
+    XCTAssertNoThrow(try group.syncShutdownGracefully())
+    XCTAssert(group is MultiThreadedEventLoopGroup)
+  }
+
+  func testMakeCompatibleEventLoopGroupForNetworkFramework() {
+    #if canImport(Network)
+    guard #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) else { return }
+
+    let options = NWProtocolTLS.Options()
+    let configuration = GRPCTLSConfiguration.makeClientConfigurationBackedByNetworkFramework(
+      options: options
+    )
+
+    let group = PlatformSupport.makeEventLoopGroup(compatibleWith: configuration, loopCount: 1)
+    XCTAssertNoThrow(try group.syncShutdownGracefully())
+    XCTAssert(group is NIOTSEventLoopGroup)
+
+    #endif
+  }
 }