
Add subpool stats (#1852)

Motivation:

It can be helpful to track the state of the connection pool as a whole
over time to understand how heavily utilised it is. This is awkward to
do at the moment because the delegate functions are only called on
individual connection state changes, so callers have to reconstruct an
aggregate view themselves. There's also no insight into how many RPCs
are queued waiting for a stream.

Modifications:

- Add a 'GRPCSubPoolStats' type capturing stats from each subpool. This
  includes:
  - counts of connections in each state
  - streams in use
  - streams which are free to use
  - number of RPCs waiting for a stream
- Add a delegate method which is passed an array of subpool stats (one
  per subpool) and add a no-op default implementation.
- Add configuration to determine how frequently stats are collected

Result:

More insight into pool stats
George Barnett, 1 year ago
commit 26e177e634
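
A hypothetical sketch of the caller-side opt-in, using only the new 'statsPeriod' and 'delegate' configuration options added below. The target, transport security and 'myPoolDelegate' are placeholders, not part of this change:

import GRPC
import NIOCore
import NIOPosix

// Minimal sketch: 'myPoolDelegate' is assumed to be an existing
// GRPCConnectionPoolDelegate implementation (not shown here).
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let channel = try GRPCChannelPool.with(
  target: .host("localhost", port: 8080),  // placeholder target
  transportSecurity: .plaintext,           // placeholder transport security
  eventLoopGroup: group
) {
  // Deliver an array of GRPCSubPoolStats (one per sub-pool) to the delegate
  // every 30 seconds. Stats are only published when both values are set.
  $0.statsPeriod = .seconds(30)
  $0.delegate = myPoolDelegate
}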

+ 4 - 4
Sources/GRPC/ConnectionManager.swift

@@ -338,12 +338,12 @@ internal final class ConnectionManager: @unchecked Sendable {
   /// A logger.
   internal var logger: Logger
 
-  private let connectionID: String
+  internal let id: ConnectionManagerID
   private var channelNumber: UInt64
   private var channelNumberLock = NIOLock()
 
   private var _connectionIDAndNumber: String {
-    return "\(self.connectionID)/\(self.channelNumber)"
+    return "\(self.id)/\(self.channelNumber)"
   }
 
   private var connectionIDAndNumber: String {
@@ -394,7 +394,7 @@ internal final class ConnectionManager: @unchecked Sendable {
   ) {
     // Setup the logger.
     var logger = logger
-    let connectionID = UUID().uuidString
+    let connectionID = ConnectionManagerID()
     let channelNumber: UInt64 = 0
     logger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)"
 
@@ -408,7 +408,7 @@ internal final class ConnectionManager: @unchecked Sendable {
     self.http2Delegate = http2Delegate
     self.idleBehavior = idleBehavior
 
-    self.connectionID = connectionID
+    self.id = connectionID
     self.channelNumber = channelNumber
     self.logger = logger
   }

+ 6 - 11
Sources/GRPC/ConnectionPool/ConnectionManagerID.swift

@@ -14,25 +14,20 @@
  * limitations under the License.
  */
 
+import struct Foundation.UUID
+
 @usableFromInline
 internal struct ConnectionManagerID: Hashable, CustomStringConvertible, Sendable {
   @usableFromInline
-  internal let _id: ObjectIdentifier
+  internal let id: String
 
   @usableFromInline
-  internal init(_ manager: ConnectionManager) {
-    self._id = ObjectIdentifier(manager)
+  internal init() {
+    self.id = UUID().uuidString
   }
 
   @usableFromInline
   internal var description: String {
-    return String(describing: self._id)
-  }
-}
-
-extension ConnectionManager {
-  @usableFromInline
-  internal var id: ConnectionManagerID {
-    return ConnectionManagerID(self)
+    return String(describing: self.id)
   }
 }

+ 54 - 3
Sources/GRPC/ConnectionPool/ConnectionPool.swift

@@ -13,6 +13,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+import Atomics
 import Logging
 import NIOConcurrencyHelpers
 import NIOCore
@@ -108,12 +110,16 @@ internal final class ConnectionPool {
 
   /// A logger which always sets "GRPC" as its source.
   @usableFromInline
-  private(set) var logger: GRPCLogger
+  internal let logger: GRPCLogger
 
   /// Returns `NIODeadline` representing 'now'. This is useful for testing.
   @usableFromInline
   internal let now: () -> NIODeadline
 
+  /// The ID of this sub-pool.
+  @usableFromInline
+  internal let id: GRPCSubPoolID
+
   /// Logging metadata keys.
   @usableFromInline
   internal enum Metadata {
@@ -190,8 +196,14 @@ internal final class ConnectionPool {
     self.channelProvider = channelProvider
     self.streamLender = streamLender
     self.delegate = delegate
-    self.logger = logger
     self.now = now
+
+    let id = GRPCSubPoolID.next()
+    var logger = logger
+    logger[metadataKey: Metadata.id] = "\(id)"
+
+    self.id = id
+    self.logger = logger
   }
 
   /// Initialize the connection pool.
@@ -199,7 +211,6 @@ internal final class ConnectionPool {
   /// - Parameter connections: The number of connections to add to the pool.
   internal func initialize(connections: Int) {
     assert(self._connections.isEmpty)
-    self.logger.logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"
     self.logger.debug(
       "initializing new sub-pool",
       metadata: [
@@ -628,6 +639,46 @@ internal final class ConnectionPool {
       promise.succeed(())
     }
   }
+
+  internal func stats() -> EventLoopFuture<GRPCSubPoolStats> {
+    let promise = self.eventLoop.makePromise(of: GRPCSubPoolStats.self)
+
+    if self.eventLoop.inEventLoop {
+      self._stats(promise: promise)
+    } else {
+      self.eventLoop.execute {
+        self._stats(promise: promise)
+      }
+    }
+
+    return promise.futureResult
+  }
+
+  private func _stats(promise: EventLoopPromise<GRPCSubPoolStats>) {
+    self.eventLoop.assertInEventLoop()
+
+    var stats = GRPCSubPoolStats(id: self.id)
+
+    for connection in self._connections.values {
+      let sync = connection.manager.sync
+      if sync.isIdle {
+        stats.connectionStates.idle += 1
+      } else if sync.isConnecting {
+        stats.connectionStates.connecting += 1
+      } else if sync.isReady {
+        stats.connectionStates.ready += 1
+      } else if sync.isTransientFailure {
+        stats.connectionStates.transientFailure += 1
+      }
+
+      stats.streamsInUse += connection.reservedStreams
+      stats.streamsFreeToUse += connection.availableStreams
+    }
+
+    stats.rpcsWaiting += self.waiters.count
+
+    promise.succeed(stats)
+  }
 }
 
 extension ConnectionPool: ConnectionManagerConnectivityDelegate {
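
The new stats() accessor follows the usual SwiftNIO pattern for reading event-loop-confined state from arbitrary threads: make a promise, complete it on the pool's event loop (hopping onto it if the caller is elsewhere), and return the future. A self-contained sketch of the same shape, with a hypothetical Counter type standing in for the pool:

import NIOCore

// Hypothetical example of the hop-to-event-loop pattern used by
// ConnectionPool.stats(). 'value' is only read or written on 'eventLoop'.
final class Counter {
  private let eventLoop: EventLoop
  private var value = 0

  init(eventLoop: EventLoop) {
    self.eventLoop = eventLoop
  }

  func read() -> EventLoopFuture<Int> {
    let promise = self.eventLoop.makePromise(of: Int.self)
    if self.eventLoop.inEventLoop {
      promise.succeed(self.value)
    } else {
      self.eventLoop.execute {
        promise.succeed(self.value)
      }
    }
    return promise.futureResult
  }
}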

+ 59 - 0
Sources/GRPC/ConnectionPool/ConnectionPoolIDs.swift

@@ -0,0 +1,59 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Atomics
+
+enum RawID {
+  private static let source = ManagedAtomic(0)
+
+  static func next() -> Int {
+    self.source.loadThenWrappingIncrement(ordering: .relaxed)
+  }
+}
+
+/// The ID of a connection pool.
+public struct GRPCConnectionPoolID: Hashable, Sendable, CustomStringConvertible {
+  private var rawValue: Int
+
+  private init(rawValue: Int) {
+    self.rawValue = rawValue
+  }
+
+  public static func next() -> Self {
+    return Self(rawValue: RawID.next())
+  }
+
+  public var description: String {
+    "ConnectionPool(\(self.rawValue))"
+  }
+}
+
+/// The ID of a sub-pool in a connection pool.
+public struct GRPCSubPoolID: Hashable, Sendable, CustomStringConvertible {
+  private var rawValue: Int
+
+  private init(rawValue: Int) {
+    self.rawValue = rawValue
+  }
+
+  public static func next() -> Self {
+    return Self(rawValue: RawID.next())
+  }
+
+  public var description: String {
+    "SubPool(\(self.rawValue))"
+  }
+}
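
RawID gives both ID types process-wide, monotonically increasing values, so an ID stays unique for the lifetime of the process (unlike the previous ObjectIdentifier-based IDs, which can be reused once an object is deallocated). A small sketch of the same pattern using swift-atomics, with a hypothetical RequestID type:

import Atomics

// Hypothetical RequestID mirroring the RawID/GRPCSubPoolID mechanism above.
struct RequestID: Hashable, CustomStringConvertible {
  private static let counter = ManagedAtomic(0)

  private let rawValue: Int

  init() {
    // Atomically returns the previous value and bumps the counter, so
    // concurrent callers can never observe the same ID.
    self.rawValue = Self.counter.loadThenWrappingIncrement(ordering: .relaxed)
  }

  var description: String { "RequestID(\(self.rawValue))" }
}

let first = RequestID()
let second = RequestID()
assert(first != second)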

+ 68 - 1
Sources/GRPC/ConnectionPool/GRPCChannelPool.swift

@@ -179,6 +179,11 @@ extension GRPCChannelPool {
     /// pool.
     public var delegate: GRPCConnectionPoolDelegate?
 
+    /// The period at which connection pool stats are published to the ``delegate``.
+    ///
+    /// Ignored if either this value or ``delegate`` is `nil`.
+    public var statsPeriod: TimeAmount?
+
     /// A logger used for background activity, such as connection state changes.
     public var backgroundActivityLogger = Logger(
       label: "io.grpc",
@@ -354,7 +359,7 @@ public protocol GRPCConnectionPoolDelegate: Sendable {
   /// time and is reported via ``connectionUtilizationChanged(id:streamsUsed:streamCapacity:)``. The
   func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int)
 
-  /// The utlization of the connection changed; a stream may have been used, returned or the
+  /// The utilization of the connection changed; a stream may have been used, returned or the
   /// maximum number of concurrent streams available on the connection changed.
   func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int)
 
@@ -365,4 +370,66 @@ public protocol GRPCConnectionPoolDelegate: Sendable {
   /// The connection was closed. The connection may be established again in the future (notified
   /// via ``startedConnecting(id:)``).
   func connectionClosed(id: GRPCConnectionID, error: Error?)
+
+  /// Stats about the current state of the connection pool.
+  ///
+  /// Each ``GRPCSubPoolStats`` includes the stats for a sub-pool. Each sub-pool is tied
+  /// to an `EventLoop`.
+  ///
+  /// Unlike the other delegate methods, this is called periodically based on the value
+  /// of ``GRPCChannelPool/Configuration/statsPeriod``.
+  func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID)
+}
+
+extension GRPCConnectionPoolDelegate {
+  public func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID) {
+    // Default implementation to avoid breaking changes.
+  }
+}
+
+public struct GRPCSubPoolStats: Sendable, Hashable {
+  public struct ConnectionStates: Sendable, Hashable {
+    /// The number of idle connections.
+    public var idle: Int
+    /// The number of connections trying to establish a connection.
+    public var connecting: Int
+    /// The number of connections which are ready to use.
+    public var ready: Int
+    /// The number of connections which are backing off waiting to attempt to connect.
+    public var transientFailure: Int
+
+    public init() {
+      self.idle = 0
+      self.connecting = 0
+      self.ready = 0
+      self.transientFailure = 0
+    }
+  }
+
+  /// The ID of the subpool.
+  public var id: GRPCSubPoolID
+
+  /// Counts of connection states.
+  public var connectionStates: ConnectionStates
+
+  /// The number of streams currently being used.
+  public var streamsInUse: Int
+
+  /// The number of streams which are currently free to use.
+  ///
+  /// The sum of this value and `streamsInUse` gives the capacity of the pool.
+  public var streamsFreeToUse: Int
+
+  /// The number of RPCs currently waiting for a stream.
+  ///
+  /// RPCs waiting for a stream are also known as 'waiters'.
+  public var rpcsWaiting: Int
+
+  public init(id: GRPCSubPoolID) {
+    self.id = id
+    self.connectionStates = ConnectionStates()
+    self.streamsInUse = 0
+    self.streamsFreeToUse = 0
+    self.rpcsWaiting = 0
+  }
 }
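
A delegate receiving these stats will typically want a whole-pool view rather than per-sub-pool numbers. A hypothetical helper, using only the GRPCSubPoolStats fields introduced above (the function name and returned tuple are illustrative):

import GRPC

// Sums the per-sub-pool stats delivered to connectionPoolStats(_:id:) into
// whole-pool totals.
func poolTotals(
  of stats: [GRPCSubPoolStats]
) -> (readyConnections: Int, streamsInUse: Int, streamsFree: Int, rpcsWaiting: Int) {
  var ready = 0
  var inUse = 0
  var free = 0
  var waiting = 0

  for subPool in stats {
    ready += subPool.connectionStates.ready
    inUse += subPool.streamsInUse
    free += subPool.streamsFreeToUse
    waiting += subPool.rpcsWaiting
  }

  // streamsInUse + streamsFreeToUse is the pool's current stream capacity.
  return (ready, inUse, free, waiting)
}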

+ 41 - 5
Sources/GRPC/ConnectionPool/PoolManager.swift

@@ -63,6 +63,9 @@ internal final class PoolManager {
     @usableFromInline
     var delegate: GRPCConnectionPoolDelegate?
 
+    @usableFromInline
+    var statsPeriod: TimeAmount?
+
     @usableFromInline
     internal init(
       maxConnections: Int,
@@ -72,7 +75,8 @@ internal final class PoolManager {
       assumedMaxConcurrentStreams: Int = 100,
       connectionBackoff: ConnectionBackoff,
       channelProvider: DefaultChannelProvider,
-      delegate: GRPCConnectionPoolDelegate?
+      delegate: GRPCConnectionPoolDelegate?,
+      statsPeriod: TimeAmount?
     ) {
       self.maxConnections = maxConnections
       self.maxWaiters = maxWaiters
@@ -82,6 +86,7 @@ internal final class PoolManager {
       self.connectionBackoff = connectionBackoff
       self.channelProvider = channelProvider
       self.delegate = delegate
+      self.statsPeriod = statsPeriod
     }
   }
 
@@ -113,6 +118,9 @@ internal final class PoolManager {
   @usableFromInline
   internal let group: EventLoopGroup
 
+  @usableFromInline
+  internal let id: GRPCConnectionPoolID
+
   /// Make a new pool manager and initialize it.
   ///
   /// The pool manager manages one connection pool per event loop in the provided `EventLoopGroup`.
@@ -140,6 +148,7 @@ internal final class PoolManager {
     self._state = PoolManagerStateMachine(.inactive)
     self._pools = []
     self.group = group
+    self.id = .next()
 
     // The pool relies on the identity of each `EventLoop` in the `EventLoopGroup` being unique. In
     // practice this is unlikely to happen unless a custom `EventLoopGroup` is constructed, because
@@ -158,7 +167,7 @@ internal final class PoolManager {
     self.lock.withLock {
       assert(
         self._state.isShutdownOrShuttingDown,
-        "The pool manager (\(ObjectIdentifier(self))) must be shutdown before going out of scope."
+        "The pool manager (\(self.id)) must be shutdown before going out of scope."
       )
     }
   }
@@ -175,7 +184,7 @@ internal final class PoolManager {
     logger: GRPCLogger
   ) {
     var logger = logger
-    logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"
+    logger[metadataKey: Metadata.id] = "\(self.id)"
 
     let pools = self.makePools(perPoolConfiguration: configuration, logger: logger)
 
@@ -200,13 +209,27 @@ internal final class PoolManager {
       )
     }
 
+    let statsTask: RepeatedTask?
+    if let period = configuration.statsPeriod, let delegate = configuration.delegate {
+      let loop = self.group.next()
+      statsTask = loop.scheduleRepeatedTask(initialDelay: period, delay: period) { _ in
+        self.emitStats(delegate: delegate)
+      }
+    } else {
+      statsTask = nil
+    }
+
     self.lock.withLock {
       assert(self._pools.isEmpty)
       self._pools = pools
 
       // We'll blow up if we've already been initialized, that's fine, we don't allow callers to
       // call `initialize` directly.
-      self._state.activatePools(keyedBy: poolKeys, assumingPerPoolCapacity: assumedCapacity)
+      self._state.activatePools(
+        keyedBy: poolKeys,
+        assumingPerPoolCapacity: assumedCapacity,
+        statsTask: statsTask
+      )
     }
 
     for pool in pools {
@@ -331,7 +354,8 @@ internal final class PoolManager {
       }
 
     switch (action, pools) {
-    case let (.shutdownPools, .some(pools)):
+    case let (.shutdownPools(statsTask), .some(pools)):
+      statsTask?.cancel(promise: nil)
       promise.futureResult.whenComplete { _ in self.shutdownComplete() }
       EventLoopFuture.andAllSucceed(pools.map { $0.shutdown(mode: mode) }, promise: promise)
 
@@ -353,6 +377,18 @@ internal final class PoolManager {
       self._state.shutdownComplete()
     }
   }
+
+  // MARK: - Stats
+
+  private func emitStats(delegate: GRPCConnectionPoolDelegate) {
+    let pools = self.lock.withLock { self._pools }
+    if pools.isEmpty { return }
+
+    let statsFutures = pools.map { $0.stats() }
+    EventLoopFuture.whenAllSucceed(statsFutures, on: self.group.any()).whenSuccess { stats in
+      delegate.connectionPoolStats(stats, id: self.id)
+    }
+  }
 }
 
 // MARK: - Connection Pool to Pool Manager
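
When both statsPeriod and a delegate are configured, the pool manager schedules a RepeatedTask on one of the group's event loops; each tick gathers the stats futures from every sub-pool and hands the combined array to the delegate, and the task is cancelled as part of shutdown. A stripped-down sketch of that shape, with placeholder names:

import NIOCore

// Hypothetical sketch of periodic collection, mirroring PoolManager.emitStats.
// 'collect' stands in for asking each sub-pool for its stats.
func schedulePeriodicStats(
  on group: EventLoopGroup,
  every period: TimeAmount,
  collect: @escaping () -> [EventLoopFuture<Int>],
  deliver: @escaping ([Int]) -> Void
) -> RepeatedTask {
  let loop = group.next()
  return loop.scheduleRepeatedTask(initialDelay: period, delay: period) { _ in
    let futures = collect()
    if futures.isEmpty { return }
    EventLoopFuture.whenAllSucceed(futures, on: loop).whenSuccess(deliver)
  }
}

// Cancelled during shutdown, as the state machine's shutdownPools action does:
// task.cancel(promise: nil)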

+ 17 - 6
Sources/GRPC/ConnectionPool/PoolManagerStateMachine.swift

@@ -40,10 +40,14 @@ internal struct PoolManagerStateMachine {
     @usableFromInline
     internal var pools: [EventLoopID: PerPoolState]
 
+    @usableFromInline
+    internal var statsTask: RepeatedTask?
+
     @usableFromInline
     internal init(
       poolKeys: [PoolManager.ConnectionPoolKey],
-      assumedMaxAvailableStreamsPerPool: Int
+      assumedMaxAvailableStreamsPerPool: Int,
+      statsTask: RepeatedTask?
     ) {
       self.pools = Dictionary(
         uniqueKeysWithValues: poolKeys.map { key in
@@ -54,6 +58,7 @@ internal struct PoolManagerStateMachine {
           return (key.eventLoopID, value)
         }
       )
+      self.statsTask = statsTask
     }
   }
 
@@ -92,12 +97,18 @@ internal struct PoolManagerStateMachine {
   @usableFromInline
   internal mutating func activatePools(
     keyedBy keys: [PoolManager.ConnectionPoolKey],
-    assumingPerPoolCapacity capacity: Int
+    assumingPerPoolCapacity capacity: Int,
+    statsTask: RepeatedTask?
   ) {
     self.modifyingState { state in
       switch state {
       case .inactive:
-        state = .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: capacity))
+        let active = ActiveState(
+          poolKeys: keys,
+          assumedMaxAvailableStreamsPerPool: capacity,
+          statsTask: statsTask
+        )
+        state = .active(active)
 
       case .active, .shuttingDown, .shutdown, ._modifying:
         preconditionFailure()
@@ -180,7 +191,7 @@ internal struct PoolManagerStateMachine {
   }
 
   enum ShutdownAction {
-    case shutdownPools
+    case shutdownPools(RepeatedTask?)
     case alreadyShutdown
     case alreadyShuttingDown(EventLoopFuture<Void>)
   }
@@ -192,9 +203,9 @@ internal struct PoolManagerStateMachine {
         state = .shutdown
         return .alreadyShutdown
 
-      case .active:
+      case .active(let active):
         state = .shuttingDown(promise.futureResult)
-        return .shutdownPools
+        return .shutdownPools(active.statsTask)
 
       case let .shuttingDown(future):
         return .alreadyShuttingDown(future)

+ 2 - 1
Sources/GRPC/ConnectionPool/PooledChannel.swift

@@ -101,7 +101,8 @@ internal final class PooledChannel: GRPCChannel {
         assumedMaxConcurrentStreams: 100,
         connectionBackoff: configuration.connectionBackoff,
         channelProvider: provider,
-        delegate: configuration.delegate
+        delegate: configuration.delegate,
+        statsPeriod: configuration.statsPeriod
       ),
       logger: configuration.backgroundActivityLogger.wrapped
     )

+ 17 - 1
Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift

@@ -104,8 +104,9 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate {
     case connectionUtilizationChanged(GRPCConnectionID, Int, Int)
     case connectionQuiescing(GRPCConnectionID)
     case connectionRemoved(GRPCConnectionID)
+    case stats([GRPCSubPoolStats], GRPCConnectionPoolID)
 
-    var id: GRPCConnectionID {
+    var id: GRPCConnectionID? {
       switch self {
       case let .connectionAdded(id),
         let .startedConnecting(id),
@@ -116,6 +117,8 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate {
         let .connectionQuiescing(id),
         let .connectionRemoved(id):
         return id
+      case .stats:
+        return nil
       }
     }
   }
@@ -139,6 +142,13 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate {
     }
   }
 
+  func removeAll() -> CircularBuffer<Event> {
+    return self.lock.withLock {
+      defer { self.events.removeAll() }
+      return self.events
+    }
+  }
+
   func connectionAdded(id: GRPCConnectionID) {
     self.lock.withLock {
       self.events.append(.connectionAdded(id))
@@ -186,6 +196,12 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate {
       self.events.append(.connectionRemoved(id))
     }
   }
+
+  func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID) {
+    self.lock.withLock {
+      self.events.append(.stats(stats, id))
+    }
+  }
 }
 
 extension EventRecordingConnectionPoolDelegate: @unchecked Sendable {}

+ 1 - 2
Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift

@@ -1046,8 +1046,7 @@ final class ConnectionPoolTests: GRPCTestCase {
     // Two connections must be removed.
     for _ in 0 ..< 2 {
       if let event = recorder.popFirst() {
-        let id = event.id
-        XCTAssertEqual(event, .connectionRemoved(id))
+        XCTAssertEqual(event, event.id.map { .connectionRemoved($0) })
       } else {
         XCTFail("Expected .connectionRemoved")
       }

+ 29 - 0
Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift

@@ -589,5 +589,34 @@ final class GRPCChannelPoolTests: GRPCTestCase {
     }
     XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(rpcs, on: self.group.any()).wait())
   }
+
+  func testDelegateGetsCalledWithStats() throws {
+    let recorder = EventRecordingConnectionPoolDelegate()
+
+    self.configureEventLoopGroup(threads: 4)
+    self.startServer(withTLS: false)
+    self.startChannel(withTLS: false) {
+      $0.statsPeriod = .milliseconds(1)
+      $0.delegate = recorder
+    }
+
+    let scheduled = self.group.next().scheduleTask(in: .milliseconds(100)) {
+      _ = self.channel?.close()
+    }
+
+    try scheduled.futureResult.wait()
+
+    let events = recorder.removeAll()
+    let statsEvents = events.compactMap { event in
+      switch event {
+      case .stats(let stats, _):
+        return stats
+      default:
+        return nil
+      }
+    }
+
+    XCTAssertGreaterThan(statsEvents.count, 0)
+  }
 }
 #endif  // canImport(NIOSSL)

+ 5 - 5
Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift

@@ -79,7 +79,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
     let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
     let keys = self.makeConnectionPoolKeys(for: pools)
     var state = PoolManagerStateMachine(
-      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil))
     )
 
     for (index, loop) in group.loops.enumerated() {
@@ -99,7 +99,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
     let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
     let keys = self.makeConnectionPoolKeys(for: pools)
     var state = PoolManagerStateMachine(
-      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil))
     )
 
     let anotherLoop = EmbeddedEventLoop()
@@ -118,7 +118,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
     let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
     let keys = self.makeConnectionPoolKeys(for: pools)
     var state = PoolManagerStateMachine(.inactive)
-    state.activatePools(keyedBy: keys, assumingPerPoolCapacity: 100)
+    state.activatePools(keyedBy: keys, assumingPerPoolCapacity: 100, statsTask: nil)
 
     // Reserve some streams.
     for (index, loop) in group.loops.enumerated() {
@@ -177,7 +177,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
     let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
     let keys = self.makeConnectionPoolKeys(for: pools)
     var state = PoolManagerStateMachine(
-      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil))
     )
 
     let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: nil)
@@ -230,7 +230,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
     let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
     let keys = self.makeConnectionPoolKeys(for: pools)
     var state = PoolManagerStateMachine(
-      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
+      .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil))
     )
 
     let promise = group.loops[0].makePromise(of: Void.self)