
Add a connection pool manager (#1189)

Motivation:

In #1176 we added a per-event-loop connection pool. In order to make a
client which uses these pools, we need a way to manage them. This PR adds
a connection pool manager.

Modifications:

- Add a pool manager state machine which tracks connection pools and
  per-pool state such as the number of streams available and reserved
  for each pool.
- Add a pool manager which wraps the state machine.
- Add tests for the pool manager state machine; the pool manager itself
  isn't tested here but will be tested indirectly in a later PR (when a
  client is added to wrap the pool manager).

Result:

We can manage connection pools.
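
Example (illustrative):

A rough sketch of how a client wrapper might drive the manager added here (the client itself lands in a later PR). It assumes a `manager` created with `makeInitializedPoolManager(using:perPoolConfiguration:logger:)`, the manager's `EventLoopGroup` in `group`, and a `GRPCLogger` in `logger`; since these types are internal, such code would have to live inside the GRPC module.

import NIO

// Ask the manager for a stream. With no preferred event loop it picks the pool
// with the most available streams.
let pooledStream = manager.makeStream(
  preferredEventLoop: nil,
  deadline: .now() + .seconds(5), // fail the future if no stream was reserved by then
  logger: logger
) { channel in
  // Configure the stream channel's pipeline; succeeding immediately accepts it as-is.
  channel.eventLoop.makeSucceededVoidFuture()
}

pooledStream.futureResult.whenSuccess { channel in
  // `channel` runs on `pooledStream.eventLoop`; an RPC transport would be set up on it here.
}

// The manager must be shut down explicitly before it is dropped (its `deinit` asserts this).
let shutdownPromise = group.next().makePromise(of: Void.self)
manager.shutdown(promise: shutdownPromise)
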
George Barnett, 4 years ago
Commit: a59aa82fbc

+ 1 - 1
Sources/GRPC/ConnectionPool/ConnectionPool.swift

@@ -553,7 +553,7 @@ extension ConnectionPool: ConnectionManagerHTTP2Delegate {
     }
 
     if delta != 0 {
-      self.streamLender.increaseStreamCapacity(by: delta, for: self)
+      self.streamLender.changeStreamCapacity(by: delta, for: self)
     }
 
     // We always check, even if `delta` isn't greater than zero as this might be a new connection.

+ 313 - 0
Sources/GRPC/ConnectionPool/PoolManager.swift

@@ -0,0 +1,313 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Logging
+import NIO
+import NIOConcurrencyHelpers
+
+internal final class PoolManager {
+  /// Configuration used for each connection pool.
+  internal struct PerPoolConfiguration {
+    /// The maximum number of connections per pool.
+    var maxConnections: Int
+
+    /// The maximum number of waiters per pool.
+    var maxWaiters: Int
+
+    /// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started
+    /// (assuming there is a connection available to start).
+    var loadThreshold: Double
+
+    /// The assumed value of HTTP/2 'SETTINGS_MAX_CONCURRENT_STREAMS'.
+    var assumedMaxConcurrentStreams: Int = 100
+
+    /// The assumed maximum number of streams concurrently available in the pool.
+    var assumedStreamCapacity: Int {
+      return self.maxConnections * self.assumedMaxConcurrentStreams
+    }
+
+    /// A `Channel` provider.
+    var channelProvider: DefaultChannelProvider
+  }
+
+  /// Logging metadata keys
+  private enum Metadata {
+    /// The ID of the pool manager.
+    static let id = "poolmanager.id"
+    /// The number of managed connection pools.
+    static let poolCount = "poolmanager.pools.count"
+    /// The maximum number of connections per pool.
+    static let connectionsPerPool = "poolmanager.pools.conns_per_pool"
+    /// The maximum number of waiters per pool.
+    static let waitersPerPool = "poolmanager.pools.waiters_per_pool"
+  }
+
+  /// The current state of the pool manager; `lock` must be held when accessing or
+  /// modifying `state`.
+  private var state: PoolManagerStateMachine
+  private var pools: [ConnectionPool]
+  private let lock = Lock()
+
+  /// The `EventLoopGroup` providing `EventLoop`s for connection pools. Once initialized the manager
+  /// will hold as many pools as there are loops in this `EventLoopGroup`.
+  private let group: EventLoopGroup
+
+  /// Make a new pool manager and initialize it.
+  ///
+  /// The pool manager manages one connection pool per event loop in the provided `EventLoopGroup`.
+  /// Each connection pool is configured using the `perPoolConfiguration`.
+  ///
+  /// - Parameters:
+  ///   - group: The `EventLoopGroup` providing `EventLoop`s to connections managed by the pool
+  ///       manager.
+  ///   - perPoolConfiguration: Configuration used by each connection pool managed by the manager.
+  ///   - logger: A logger.
+  /// - Returns: An initialized pool manager.
+  internal static func makeInitializedPoolManager(
+    using group: EventLoopGroup,
+    perPoolConfiguration: PerPoolConfiguration,
+    logger: GRPCLogger
+  ) -> PoolManager {
+    let manager = PoolManager(group: group)
+    manager.initialize(perPoolConfiguration: perPoolConfiguration, logger: logger)
+    return manager
+  }
+
+  private init(group: EventLoopGroup) {
+    self.state = PoolManagerStateMachine(.inactive)
+    self.pools = []
+    self.group = group
+
+    // The pool manager relies on the identity of each `EventLoop` in the `EventLoopGroup` being
+    // unique. In practice duplicate loops are unlikely unless a custom `EventLoopGroup` is
+    // constructed, so we only check this when running in debug mode.
+    debugOnly {
+      let eventLoopIDs = group.makeIterator().map { ObjectIdentifier($0) }
+      let uniqueEventLoopIDs = Set(eventLoopIDs)
+      assert(
+        eventLoopIDs.count == uniqueEventLoopIDs.count,
+        "'group' contains non-unique event loops"
+      )
+    }
+  }
+
+  deinit {
+    self.lock.withLockVoid {
+      assert(
+        self.state.isShutdownOrShuttingDown,
+        "The pool manager (\(ObjectIdentifier(self))) must be shutdown before going out of scope."
+      )
+    }
+  }
+
+  /// Initialize the pool manager, creating and initializing one connection pool per event loop
+  /// in the manager's `EventLoopGroup`.
+  ///
+  /// - Important: Must only be called once.
+  /// - Parameters:
+  ///   - configuration: The configuration used for each connection pool.
+  ///   - logger: A logger.
+  private func initialize(
+    perPoolConfiguration configuration: PerPoolConfiguration,
+    logger: GRPCLogger
+  ) {
+    var logger = logger
+    logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"
+
+    let pools = self.makePools(perPoolConfiguration: configuration, logger: logger)
+
+    logger.debug("initializing connection pool manager", metadata: [
+      Metadata.poolCount: "\(pools.count)",
+      Metadata.connectionsPerPool: "\(configuration.maxConnections)",
+      Metadata.waitersPerPool: "\(configuration.maxWaiters)",
+    ])
+
+    // The assumed maximum number of streams concurrently available in each pool.
+    let assumedCapacity = configuration.assumedStreamCapacity
+
+    // The state machine stores the per-pool state keyed by each pool's `EventLoopID` and tells the
+    // pool manager which pool to use/operate on via the pool's index in `self.pools`.
+    let poolKeys = pools.indices.map { index in
+      return ConnectionPoolKey(
+        index: ConnectionPoolIndex(index),
+        eventLoopID: pools[index].eventLoop.id
+      )
+    }
+
+    self.lock.withLockVoid {
+      assert(self.pools.isEmpty)
+      self.pools = pools
+
+      // We'll blow up if we've already been initialized; that's fine, since we don't allow
+      // callers to call `initialize` directly.
+      self.state.activatePools(keyedBy: poolKeys, assumingPerPoolCapacity: assumedCapacity)
+    }
+
+    for pool in pools {
+      pool.initialize(connections: configuration.maxConnections)
+    }
+  }
+
+  /// Make one pool per `EventLoop` in the manager's `EventLoopGroup`.
+  /// - Parameters:
+  ///   - configuration: The configuration to make each pool with.
+  ///   - logger: A logger.
+  /// - Returns: An array of `ConnectionPool`s.
+  private func makePools(
+    perPoolConfiguration configuration: PerPoolConfiguration,
+    logger: GRPCLogger
+  ) -> [ConnectionPool] {
+    let eventLoops = self.group.makeIterator()
+    return eventLoops.map { eventLoop in
+      // We're creating a retain cycle here, as each pool will reference the manager and the
+      // per-pool state will hold the pool, which will in turn be held by the pool manager. That's
+      // okay: when the pool manager is shut down the per-pool state, and in turn each connection
+      // pool, will be dropped and the cycle will be broken.
+      return ConnectionPool(
+        eventLoop: eventLoop,
+        maxWaiters: configuration.maxWaiters,
+        reservationLoadThreshold: configuration.loadThreshold,
+        assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
+        channelProvider: configuration.channelProvider,
+        streamLender: self,
+        logger: logger
+      )
+    }
+  }
+
+  // MARK: Stream Creation
+
+  /// A future for a `Channel` from a managed connection pool. The `eventLoop` indicates the loop
+  /// that the `Channel` is running on and therefore which event loop the RPC will use for its
+  /// transport.
+  internal struct PooledStreamChannel {
+    /// The future `Channel`.
+    var futureResult: EventLoopFuture<Channel>
+
+    /// The `EventLoop` that the `Channel` is using.
+    var eventLoop: EventLoop {
+      return self.futureResult.eventLoop
+    }
+  }
+
+  /// Make a stream and initialize it.
+  ///
+  /// - Parameters:
+  ///   - preferredEventLoop: The `EventLoop` that the stream should be created on, if possible. If
+  ///       a pool exists on this `EventLoop` then it will be chosen over all other pools,
+  ///       regardless of the load on that pool. If no pool exists on the preferred `EventLoop`, or
+  ///       no preference is given, then the pool with the most available streams will be selected.
+  ///       The `EventLoop` of the selected pool will be the same as the `EventLoop` of
+  ///       the `EventLoopFuture<Channel>` returned from this call.
+  ///   - deadline: The point in time by which the stream must have been selected. If this deadline
+  ///       is passed then the returned `EventLoopFuture` will be failed.
+  ///   - logger: A logger.
+  ///   - initializer: A closure to initialize the `Channel` with.
+  /// - Returns: A `PooledStreamChannel` wrapping the future `Channel` and the `EventLoop` that the
+  ///     `Channel` is using. The future will be failed if the pool manager has been shut down, if
+  ///     the deadline passed before a stream was created, or if the selected connection pool
+  ///     is unable to create a stream (if there is too much demand on that pool, for example).
+  internal func makeStream(
+    preferredEventLoop: EventLoop?,
+    deadline: NIODeadline,
+    logger: GRPCLogger,
+    streamInitializer initializer: @escaping (Channel) -> EventLoopFuture<Void>
+  ) -> PooledStreamChannel {
+    let preferredEventLoopID = preferredEventLoop.map { EventLoopID($0) }
+    let reservedPool = self.lock.withLock {
+      return self.state.reserveStream(preferringPoolWithEventLoopID: preferredEventLoopID).map {
+        return self.pools[$0.value]
+      }
+    }
+
+    switch reservedPool {
+    case let .success(pool):
+      let channel = pool.makeStream(deadline: deadline, logger: logger, initializer: initializer)
+      return PooledStreamChannel(futureResult: channel)
+
+    case let .failure(error):
+      let eventLoop = preferredEventLoop ?? self.group.next()
+      return PooledStreamChannel(futureResult: eventLoop.makeFailedFuture(error))
+    }
+  }
+
+  // MARK: Shutdown
+
+  /// Shut down the pool manager and all connection pools it manages.
+  internal func shutdown(promise: EventLoopPromise<Void>) {
+    let (action, pools): (PoolManagerStateMachine.ShutdownAction, [ConnectionPool]?) = self.lock
+      .withLock {
+        let action = self.state.shutdown(promise: promise)
+
+        switch action {
+        case .shutdownPools:
+          // Clear out the pools; we need to shut them down.
+          let pools = self.pools
+          self.pools.removeAll(keepingCapacity: true)
+          return (action, pools)
+
+        case .alreadyShutdown, .alreadyShuttingDown:
+          return (action, nil)
+        }
+      }
+
+    switch (action, pools) {
+    case let (.shutdownPools, .some(pools)):
+      promise.futureResult.whenComplete { _ in self.shutdownComplete() }
+      EventLoopFuture.andAllSucceed(pools.map { $0.shutdown() }, promise: promise)
+
+    case let (.alreadyShuttingDown(future), .none):
+      promise.completeWith(future)
+
+    case (.alreadyShutdown, .none):
+      promise.succeed(())
+
+    case (.shutdownPools, .none),
+         (.alreadyShuttingDown, .some),
+         (.alreadyShutdown, .some):
+      preconditionFailure()
+    }
+  }
+
+  private func shutdownComplete() {
+    self.lock.withLockVoid {
+      self.state.shutdownComplete()
+    }
+  }
+}
+
+// MARK: - Connection Pool to Pool Manager
+
+extension PoolManager: StreamLender {
+  internal func returnStreams(_ count: Int, to pool: ConnectionPool) {
+    self.lock.withLockVoid {
+      self.state.returnStreams(count, toPoolOnEventLoopWithID: pool.eventLoop.id)
+    }
+  }
+
+  internal func changeStreamCapacity(by delta: Int, for pool: ConnectionPool) {
+    self.lock.withLockVoid {
+      self.state.changeStreamCapacity(by: delta, forPoolOnEventLoopWithID: pool.eventLoop.id)
+    }
+  }
+}
+
+internal enum PoolManagerError: Error {
+  /// The pool manager has not been initialized yet.
+  case notInitialized
+
+  /// The pool manager has been shutdown or is in the process of shutting down.
+  case shutdown
+}

+ 88 - 0
Sources/GRPC/ConnectionPool/PoolManagerStateMachine+PerPoolState.swift

@@ -0,0 +1,88 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import NIO
+
+extension PoolManagerStateMachine.ActiveState {
+  internal struct PerPoolState {
+    /// The index of the connection pool associated with this state.
+    internal var poolIndex: PoolManager.ConnectionPoolIndex
+
+    /// The number of streams reserved in the pool.
+    internal private(set) var reservedStreams: Int
+
+    /// The total number of streams which may be available in the pool.
+    internal var maxAvailableStreams: Int
+
+    /// The number of available streams.
+    internal var availableStreams: Int {
+      return self.maxAvailableStreams - self.reservedStreams
+    }
+
+    init(poolIndex: PoolManager.ConnectionPoolIndex, assumedMaxAvailableStreams: Int) {
+      self.poolIndex = poolIndex
+      self.reservedStreams = 0
+      self.maxAvailableStreams = assumedMaxAvailableStreams
+    }
+
+    /// Reserve a stream and return the index of the pool it was reserved from.
+    internal mutating func reserveStream() -> PoolManager.ConnectionPoolIndex {
+      self.reservedStreams += 1
+      return self.poolIndex
+    }
+
+    /// Return `count` reserved streams.
+    internal mutating func returnReservedStreams(_ count: Int) {
+      self.reservedStreams -= count
+      assert(self.reservedStreams >= 0)
+    }
+  }
+}
+
+extension PoolManager {
+  internal struct ConnectionPoolIndex: Hashable {
+    var value: Int
+
+    init(_ value: Int) {
+      self.value = value
+    }
+  }
+
+  internal struct ConnectionPoolKey: Hashable {
+    /// The index of the connection pool.
+    var index: ConnectionPoolIndex
+
+    /// The ID of the `EventLoop` the connection pool uses.
+    var eventLoopID: EventLoopID
+  }
+}
+
+internal struct EventLoopID: Hashable, CustomStringConvertible {
+  private let id: ObjectIdentifier
+
+  internal init(_ eventLoop: EventLoop) {
+    self.id = ObjectIdentifier(eventLoop)
+  }
+
+  internal var description: String {
+    return String(describing: self.id)
+  }
+}
+
+extension EventLoop {
+  internal var id: EventLoopID {
+    return EventLoopID(self)
+  }
+}

+ 253 - 0
Sources/GRPC/ConnectionPool/PoolManagerStateMachine.swift

@@ -0,0 +1,253 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import NIO
+
+internal struct PoolManagerStateMachine {
+  /// The current state.
+  private var state: State
+
+  internal init(_ state: State) {
+    self.state = state
+  }
+
+  internal enum State {
+    case inactive
+    case active(ActiveState)
+    case shuttingDown(EventLoopFuture<Void>)
+    case shutdown
+    case _modifying
+  }
+
+  internal struct ActiveState {
+    internal var pools: [EventLoopID: PerPoolState]
+
+    internal init(
+      poolKeys: [PoolManager.ConnectionPoolKey],
+      assumedMaxAvailableStreamsPerPool: Int
+    ) {
+      self.pools = Dictionary(uniqueKeysWithValues: poolKeys.map { key in
+        let value = PerPoolState(
+          poolIndex: key.index,
+          assumedMaxAvailableStreams: assumedMaxAvailableStreamsPerPool
+        )
+        return (key.eventLoopID, value)
+      })
+    }
+  }
+
+  /// Temporarily sets `self.state` to `._modifying` before calling the provided closure and setting
+  /// `self.state` to the `State` modified by the closure.
+  private mutating func modifyingState<Result>(_ modify: (inout State) -> Result) -> Result {
+    var state = State._modifying
+    swap(&self.state, &state)
+    defer {
+      self.state = state
+    }
+    return modify(&state)
+  }
+
+  /// Returns whether the pool manager is shut down or in the process of shutting down.
+  internal var isShutdownOrShuttingDown: Bool {
+    switch self.state {
+    case .shuttingDown, .shutdown:
+      return true
+    case .inactive, .active:
+      return false
+    case ._modifying:
+      preconditionFailure()
+    }
+  }
+
+  /// Activate the pool manager by providing an array of connection pools.
+  ///
+  /// - Parameters:
+  ///   - keys: The index and `EventLoopID` of the pools.
+  ///   - capacity: The *assumed* maximum number of streams concurrently available to a pool (that
+  ///       is, the product of the assumed value of max concurrent streams and the number of
+  ///       connections per pool).
+  internal mutating func activatePools(
+    keyedBy keys: [PoolManager.ConnectionPoolKey],
+    assumingPerPoolCapacity capacity: Int
+  ) {
+    self.modifyingState { state in
+      switch state {
+      case .inactive:
+        state = .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: capacity))
+
+      case .active, .shuttingDown, .shutdown, ._modifying:
+        preconditionFailure()
+      }
+    }
+  }
+
+  /// Select and reserve a stream from a connection pool.
+  mutating func reserveStream(
+    preferringPoolWithEventLoopID eventLoopID: EventLoopID?
+  ) -> Result<PoolManager.ConnectionPoolIndex, PoolManagerError> {
+    return self.modifyingState { state in
+      switch state {
+      case var .active(active):
+        let connectionPoolIndex: PoolManager.ConnectionPoolIndex
+
+        if let index = eventLoopID.flatMap({ eventLoopID in
+          active.reserveStreamFromPool(onEventLoopWithID: eventLoopID)
+        }) {
+          connectionPoolIndex = index
+        } else {
+          // Nothing on the preferred event loop; fall back to the pool with the most available
+          // streams.
+          connectionPoolIndex = active.reserveStreamFromPoolWithMostAvailableStreams()
+        }
+
+        state = .active(active)
+        return .success(connectionPoolIndex)
+
+      case .inactive:
+        return .failure(.notInitialized)
+
+      case .shuttingDown, .shutdown:
+        return .failure(.shutdown)
+
+      case ._modifying:
+        preconditionFailure()
+      }
+    }
+  }
+
+  /// Return streams to the given pool.
+  mutating func returnStreams(_ count: Int, toPoolOnEventLoopWithID eventLoopID: EventLoopID) {
+    self.modifyingState { state in
+      switch state {
+      case var .active(active):
+        active.returnStreams(count, toPoolOnEventLoopWithID: eventLoopID)
+        state = .active(active)
+
+      case .shuttingDown, .shutdown:
+        ()
+
+      case .inactive, ._modifying:
+        // If the manager is inactive there are no pools which can return streams.
+        preconditionFailure()
+      }
+    }
+  }
+
+  /// Update the capacity for the given pool.
+  mutating func changeStreamCapacity(
+    by delta: Int,
+    forPoolOnEventLoopWithID eventLoopID: EventLoopID
+  ) {
+    self.modifyingState { state in
+      switch state {
+      case var .active(active):
+        active.increaseMaxAvailableStreams(by: delta, forPoolOnEventLoopWithID: eventLoopID)
+        state = .active(active)
+
+      case .shuttingDown, .shutdown:
+        ()
+
+      case .inactive, ._modifying:
+        // If the manager is inactive there are no pools which can update their capacity.
+        preconditionFailure()
+      }
+    }
+  }
+
+  enum ShutdownAction {
+    case shutdownPools
+    case alreadyShutdown
+    case alreadyShuttingDown(EventLoopFuture<Void>)
+  }
+
+  mutating func shutdown(promise: EventLoopPromise<Void>) -> ShutdownAction {
+    self.modifyingState { state in
+      switch state {
+      case .inactive:
+        state = .shutdown
+        return .alreadyShutdown
+
+      case .active:
+        state = .shuttingDown(promise.futureResult)
+        return .shutdownPools
+
+      case let .shuttingDown(future):
+        return .alreadyShuttingDown(future)
+
+      case .shutdown:
+        return .alreadyShutdown
+
+      case ._modifying:
+        preconditionFailure()
+      }
+    }
+  }
+
+  mutating func shutdownComplete() {
+    self.modifyingState { state in
+      switch state {
+      case .shuttingDown:
+        state = .shutdown
+
+      case .inactive, .active, .shutdown, ._modifying:
+        preconditionFailure()
+      }
+    }
+  }
+}
+
+extension PoolManagerStateMachine.ActiveState {
+  mutating func reserveStreamFromPool(
+    onEventLoopWithID eventLoopID: EventLoopID
+  ) -> PoolManager.ConnectionPoolIndex? {
+    return self.pools[eventLoopID]?.reserveStream()
+  }
+
+  mutating func reserveStreamFromPoolWithMostAvailableStreams() -> PoolManager.ConnectionPoolIndex {
+    // We don't allow pools to be empty (while active).
+    assert(!self.pools.isEmpty)
+
+    var mostAvailableStreams = Int.min
+    var mostAvailableIndex = self.pools.values.startIndex
+    var index = mostAvailableIndex
+
+    while index != self.pools.values.endIndex {
+      let availableStreams = self.pools.values[index].availableStreams
+
+      if availableStreams > mostAvailableStreams {
+        mostAvailableIndex = index
+        mostAvailableStreams = availableStreams
+      }
+
+      self.pools.values.formIndex(after: &index)
+    }
+
+    return self.pools.values[mostAvailableIndex].reserveStream()
+  }
+
+  mutating func returnStreams(
+    _ count: Int,
+    toPoolOnEventLoopWithID eventLoopID: EventLoopID
+  ) {
+    self.pools[eventLoopID]?.returnReservedStreams(count)
+  }
+
+  mutating func increaseMaxAvailableStreams(
+    by delta: Int,
+    forPoolOnEventLoopWithID eventLoopID: EventLoopID
+  ) {
+    self.pools[eventLoopID]?.maxAvailableStreams += delta
+  }
+}
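
A note on the `modifyingState` helper above: it follows a pattern common in SwiftNIO-style state machines. A sentinel `._modifying` case is swapped in while the state is mutated, so the associated value being mutated is uniquely referenced (no copy-on-write copy is forced by `case var .active(active)`) and accidental re-entrant mutation traps on the `._modifying` case. Below is a minimal standalone sketch of the same idea; the `Counter` type is illustrative and not part of this PR.

struct Counter {
  private enum State {
    case counting([Int])
    case _modifying
  }

  private var state: State = .counting([])

  mutating func record(_ value: Int) {
    // Swap the sentinel in: `self.state` is `._modifying` until the defer runs.
    var state = State._modifying
    swap(&self.state, &state)
    defer { self.state = state }

    switch state {
    case var .counting(values):
      values.append(value) // uniquely referenced, so no copy-on-write copy
      state = .counting(values)
    case ._modifying:
      // Re-entrant mutation would land here.
      preconditionFailure()
    }
  }
}
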

+ 1 - 1
Sources/GRPC/ConnectionPool/StreamLender.swift

@@ -19,5 +19,5 @@ internal protocol StreamLender {
   func returnStreams(_ count: Int, to pool: ConnectionPool)
 
   /// Update the total number of streams which may be available at any given time for `pool` by `delta`.
-  func increaseStreamCapacity(by delta: Int, for pool: ConnectionPool)
+  func changeStreamCapacity(by delta: Int, for pool: ConnectionPool)
 }

+ 19 - 0
Sources/GRPC/DebugOnly.swift

@@ -0,0 +1,19 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+internal func debugOnly(_ body: () -> Void) {
+  assert({ body(); return true }())
+}

+ 1 - 1
Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift

@@ -789,7 +789,7 @@ internal struct HookedStreamLender: StreamLender {
     self.onReturnStreams(count)
   }
 
-  internal func increaseStreamCapacity(by max: Int, for pool: ConnectionPool) {
+  internal func changeStreamCapacity(by max: Int, for pool: ConnectionPool) {
     self.onUpdateMaxAvailableStreams(max)
   }
 }

+ 365 - 0
Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift

@@ -0,0 +1,365 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@testable import GRPC
+import NIO
+import NIOConcurrencyHelpers
+import XCTest
+
+class PoolManagerStateMachineTests: GRPCTestCase {
+  private func makeConnectionPool(
+    on eventLoop: EventLoop,
+    maxWaiters: Int = 100,
+    maxConcurrentStreams: Int = 100,
+    loadThreshold: Double = 0.9,
+    makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
+  ) -> ConnectionPool {
+    return ConnectionPool(
+      eventLoop: eventLoop,
+      maxWaiters: maxWaiters,
+      reservationLoadThreshold: loadThreshold,
+      assumedMaxConcurrentStreams: maxConcurrentStreams,
+      channelProvider: HookedChannelProvider(makeChannel),
+      streamLender: HookedStreamLender(
+        onReturnStreams: { _ in },
+        onUpdateMaxAvailableStreams: { _ in }
+      ),
+      logger: self.logger.wrapped
+    )
+  }
+
+  private func makeInitializedPools(
+    group: EmbeddedEventLoopGroup,
+    connectionsPerPool: Int = 1
+  ) -> [ConnectionPool] {
+    let pools = group.loops.map {
+      self.makeConnectionPool(on: $0) { _, _ in fatalError() }
+    }
+
+    for pool in pools {
+      pool.initialize(connections: connectionsPerPool)
+    }
+
+    return pools
+  }
+
+  private func makeConnectionPoolKeys(
+    for pools: [ConnectionPool]
+  ) -> [PoolManager.ConnectionPoolKey] {
+    return pools.enumerated().map { index, pool in
+      return .init(index: .init(index), eventLoopID: pool.eventLoop.id)
+    }
+  }
+
+  func testReserveStreamOnPreferredEventLoop() {
+    let group = EmbeddedEventLoopGroup(loops: 5)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
+    let keys = self.makeConnectionPoolKeys(for: pools)
+    var state = PoolManagerStateMachine(
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+    )
+
+    for (index, loop) in group.loops.enumerated() {
+      let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: loop.id)
+      reservePreferredLoop.assertSuccess {
+        XCTAssertEqual($0, PoolManager.ConnectionPoolIndex(index))
+      }
+    }
+  }
+
+  func testReserveStreamOnPreferredEventLoopWhichNoPoolUses() {
+    let group = EmbeddedEventLoopGroup(loops: 1)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
+    let keys = self.makeConnectionPoolKeys(for: pools)
+    var state = PoolManagerStateMachine(
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+    )
+
+    let anotherLoop = EmbeddedEventLoop()
+    let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: anotherLoop.id)
+    reservePreferredLoop.assertSuccess {
+      XCTAssert((0 ..< pools.count).contains($0.value))
+    }
+  }
+
+  func testReserveStreamWithNoPreferenceReturnsPoolWithHighestAvailability() {
+    let group = EmbeddedEventLoopGroup(loops: 5)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
+    let keys = self.makeConnectionPoolKeys(for: pools)
+    var state = PoolManagerStateMachine(.inactive)
+    state.activatePools(keyedBy: keys, assumingPerPoolCapacity: 100)
+
+    // Reserve some streams.
+    for (index, loop) in group.loops.enumerated() {
+      for _ in 0 ..< 2 * index {
+        state.reserveStream(preferringPoolWithEventLoopID: loop.id).assertSuccess()
+      }
+    }
+
+    // We expect pools[0] to be reserved.
+    //     index:   0   1   2   3   4
+    // available: 100  98  96  94  92
+    state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
+      XCTAssertEqual(poolIndex.value, 0)
+    }
+
+    // We expect pools[0] to be reserved again.
+    //     index:   0   1   2   3   4
+    // available:  99  98  96  94  92
+    state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
+      XCTAssertEqual(poolIndex.value, 0)
+    }
+
+    // Return some streams to pools[3].
+    state.returnStreams(5, toPoolOnEventLoopWithID: pools[3].eventLoop.id)
+
+    // As we returned streams to pools[3] we expect this to be the current state:
+    //     index:   0   1   2   3   4
+    // available:  98  98  96  99  92
+    state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
+      XCTAssertEqual(poolIndex.value, 3)
+    }
+
+    // Give an event loop preference for a pool which has more streams reserved.
+    state.reserveStream(
+      preferringPoolWithEventLoopID: pools[2].eventLoop.id
+    ).assertSuccess { poolIndex in
+      XCTAssertEqual(poolIndex.value, 2)
+    }
+
+    // Update the capacity for one pool, this makes it relatively more available.
+    state.changeStreamCapacity(by: 900, forPoolOnEventLoopWithID: pools[4].eventLoop.id)
+    // pools[4] has a bunch more streams now:
+    //     index:   0   1   2   3    4
+    // available:  98  98  96  99  992
+    state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
+      XCTAssertEqual(poolIndex.value, 4)
+    }
+  }
+
+  func testReserveStreamWithNoEventLoopPreference() {
+    let group = EmbeddedEventLoopGroup(loops: 1)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
+    let keys = self.makeConnectionPoolKeys(for: pools)
+    var state = PoolManagerStateMachine(
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+    )
+
+    let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: nil)
+    reservePreferredLoop.assertSuccess()
+  }
+
+  func testReserveStreamWhenInactive() {
+    var state = PoolManagerStateMachine(.inactive)
+    let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
+    action.assertFailure { error in
+      XCTAssertEqual(error, .notInitialized)
+    }
+  }
+
+  func testReserveStreamWhenShuttingDown() {
+    let future = EmbeddedEventLoop().makeSucceededFuture(())
+    var state = PoolManagerStateMachine(.shuttingDown(future))
+    let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
+    action.assertFailure { error in
+      XCTAssertEqual(error, .shutdown)
+    }
+  }
+
+  func testReserveStreamWhenShutdown() {
+    var state = PoolManagerStateMachine(.shutdown)
+    let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
+    action.assertFailure { error in
+      XCTAssertEqual(error, .shutdown)
+    }
+  }
+
+  func testShutdownWhenInactive() {
+    let loop = EmbeddedEventLoop()
+    let promise = loop.makePromise(of: Void.self)
+
+    var state = PoolManagerStateMachine(.inactive)
+    let action = state.shutdown(promise: promise)
+    action.assertAlreadyShutdown()
+
+    // Don't leak the promise.
+    promise.succeed(())
+  }
+
+  func testShutdownWhenActive() {
+    let group = EmbeddedEventLoopGroup(loops: 5)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
+    let keys = self.makeConnectionPoolKeys(for: pools)
+    var state = PoolManagerStateMachine(
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+    )
+
+    let promise = group.loops[0].makePromise(of: Void.self)
+    promise.succeed(())
+
+    state.shutdown(promise: promise).assertShutdownPools()
+  }
+
+  func testShutdownWhenShuttingDown() {
+    let loop = EmbeddedEventLoop()
+    let future = loop.makeSucceededVoidFuture()
+    var state = PoolManagerStateMachine(.shuttingDown(future))
+
+    let promise = loop.makePromise(of: Void.self)
+    promise.succeed(())
+
+    let action = state.shutdown(promise: promise)
+    action.assertAlreadyShuttingDown {
+      XCTAssert($0 === future)
+    }
+
+    // Fully shutdown.
+    state.shutdownComplete()
+    state.shutdown(promise: promise).assertAlreadyShutdown()
+  }
+
+  func testShutdownWhenShutdown() {
+    let loop = EmbeddedEventLoop()
+    var state = PoolManagerStateMachine(.shutdown)
+
+    let promise = loop.makePromise(of: Void.self)
+    promise.succeed(())
+
+    let action = state.shutdown(promise: promise)
+    action.assertAlreadyShutdown()
+  }
+}
+
+// MARK: - Test Helpers
+
+extension Result {
+  internal func assertSuccess(
+    file: StaticString = #file,
+    line: UInt = #line,
+    verify: (Success) -> Void = { _ in }
+  ) {
+    if case let .success(value) = self {
+      verify(value)
+    } else {
+      XCTFail("Expected '.success' but got '\(self)'", file: file, line: line)
+    }
+  }
+
+  internal func assertFailure(
+    file: StaticString = #file,
+    line: UInt = #line,
+    verify: (Failure) -> Void = { _ in }
+  ) {
+    if case let .failure(value) = self {
+      verify(value)
+    } else {
+      XCTFail("Expected '.failure' but got '\(self)'", file: file, line: line)
+    }
+  }
+}
+
+extension PoolManagerStateMachine.ShutdownAction {
+  internal func assertShutdownPools(
+    file: StaticString = #file,
+    line: UInt = #line
+  ) {
+    if case .shutdownPools = self {
+      ()
+    } else {
+      XCTFail("Expected '.shutdownPools' but got '\(self)'", file: file, line: line)
+    }
+  }
+
+  internal func assertAlreadyShuttingDown(
+    file: StaticString = #file,
+    line: UInt = #line,
+    verify: (EventLoopFuture<Void>) -> Void = { _ in }
+  ) {
+    if case let .alreadyShuttingDown(future) = self {
+      verify(future)
+    } else {
+      XCTFail("Expected '.alreadyShuttingDown' but got '\(self)'", file: file, line: line)
+    }
+  }
+
+  internal func assertAlreadyShutdown(file: StaticString = #file, line: UInt = #line) {
+    if case .alreadyShutdown = self {
+      ()
+    } else {
+      XCTFail("Expected '.alreadyShutdown' but got '\(self)'", file: file, line: line)
+    }
+  }
+}
+
+/// An `EventLoopGroup` of `EmbeddedEventLoop`s.
+private final class EmbeddedEventLoopGroup: EventLoopGroup {
+  internal let loops: [EmbeddedEventLoop]
+
+  internal let lock = Lock()
+  internal var index = 0
+
+  internal init(loops: Int) {
+    self.loops = (0 ..< loops).map { _ in EmbeddedEventLoop() }
+  }
+
+  internal func next() -> EventLoop {
+    let index: Int = self.lock.withLock {
+      let index = self.index
+      self.index += 1
+      return index
+    }
+    return self.loops[index % self.loops.count]
+  }
+
+  internal func makeIterator() -> EventLoopIterator {
+    return EventLoopIterator(self.loops)
+  }
+
+  internal func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
+    var shutdownError: Error?
+
+    for loop in self.loops {
+      loop.shutdownGracefully(queue: queue) { error in
+        if let error = error {
+          shutdownError = error
+        }
+      }
+    }
+
+    queue.sync {
+      callback(shutdownError)
+    }
+  }
+}