Ver código fonte

Add a minimum connections configuration to the ConnectionPool (#1822)

Motivation:

Sometimes, depending on the characteristics of the traffic, we may need to execute requests in spikes.
When a large spike of requests arrives, many connections may be opened and then closed by the idle timer once the spike subsides. When the next spike arrives there are no open connections available, so latency suffers while new connections are established.

Modifications:

Add a ConnectionPool configuration option for the minimum number of connections to keep open; these connections are never allowed to go idle.

Result:

A number of connections will remain open even when there are no open streams on them.
Gustavo Cairo 1 ano atrás
pai
commit
ec5b39631f

+ 1 - 0
Sources/GRPC/ClientConnection.swift

@@ -131,6 +131,7 @@ public final class ClientConnection: Sendable {
     self.connectionManager = ConnectionManager(
       configuration: configuration,
       connectivityDelegate: monitor,
+      idleBehavior: .closeWhenIdleTimeout,
       logger: configuration.backgroundActivityLogger
     )
   }

+ 16 - 1
Sources/GRPC/ConnectionManager.swift

@@ -25,6 +25,14 @@ import NIOHTTP2
 // event loop is being used.
 @usableFromInline
 internal final class ConnectionManager: @unchecked Sendable {
+
+  /// Whether the connection managed by this manager should be allowed to go idle and be closed, or
+  /// if it should remain open indefinitely even when there are no active streams.
+  internal enum IdleBehavior {
+    case closeWhenIdleTimeout
+    case neverGoIdle
+  }
+
   internal enum Reconnect {
     case none
     case after(TimeInterval)
@@ -324,6 +332,9 @@ internal final class ConnectionManager: @unchecked Sendable {
   /// attempts should be made at all.
   private let connectionBackoff: ConnectionBackoff?
 
+  /// Whether this connection should be allowed to go idle (and thus be closed when the idle timer fires).
+  internal let idleBehavior: IdleBehavior
+
   /// A logger.
   internal var logger: Logger
 
@@ -356,12 +367,14 @@ internal final class ConnectionManager: @unchecked Sendable {
     configuration: ClientConnection.Configuration,
     channelProvider: ConnectionManagerChannelProvider? = nil,
     connectivityDelegate: ConnectionManagerConnectivityDelegate?,
+    idleBehavior: IdleBehavior,
     logger: Logger
   ) {
     self.init(
       eventLoop: configuration.eventLoopGroup.next(),
       channelProvider: channelProvider ?? DefaultChannelProvider(configuration: configuration),
       callStartBehavior: configuration.callStartBehavior.wrapped,
+      idleBehavior: idleBehavior,
       connectionBackoff: configuration.connectionBackoff,
       connectivityDelegate: connectivityDelegate,
       http2Delegate: nil,
@@ -373,6 +386,7 @@ internal final class ConnectionManager: @unchecked Sendable {
     eventLoop: EventLoop,
     channelProvider: ConnectionManagerChannelProvider,
     callStartBehavior: CallStartBehavior.Behavior,
+    idleBehavior: IdleBehavior,
     connectionBackoff: ConnectionBackoff?,
     connectivityDelegate: ConnectionManagerConnectivityDelegate?,
     http2Delegate: ConnectionManagerHTTP2Delegate?,
@@ -392,6 +406,7 @@ internal final class ConnectionManager: @unchecked Sendable {
     self.connectionBackoff = connectionBackoff
     self.connectivityDelegate = connectivityDelegate
     self.http2Delegate = http2Delegate
+    self.idleBehavior = idleBehavior
 
     self.connectionID = connectionID
     self.channelNumber = channelNumber
@@ -799,7 +814,7 @@ internal final class ConnectionManager: @unchecked Sendable {
       // Yes, after some time.
       case let .after(delay):
         let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
-        // Fail the candidate mux promise. KEep the 'readyChannelMuxPromise' as we'll try again.
+        // Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
         connecting.candidateMuxPromise.fail(error)
 
         let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {

+ 49 - 4
Sources/GRPC/ConnectionPool/ConnectionPool.swift

@@ -85,6 +85,11 @@ internal final class ConnectionPool {
   @usableFromInline
   internal let maxWaiters: Int
 
+  /// The number of connections in the pool that should always be kept open (i.e. they won't go idle).
+  /// In other words, it's the number of connections for which we should ignore idle timers.
+  @usableFromInline
+  internal let minConnections: Int
+
   /// Configuration for backoff between subsequent connection attempts.
   @usableFromInline
   internal let connectionBackoff: ConnectionBackoff
@@ -157,6 +162,7 @@ internal final class ConnectionPool {
   init(
     eventLoop: EventLoop,
     maxWaiters: Int,
+    minConnections: Int,
     reservationLoadThreshold: Double,
     assumedMaxConcurrentStreams: Int,
     connectionBackoff: ConnectionBackoff,
@@ -176,6 +182,7 @@ internal final class ConnectionPool {
 
     self._connections = [:]
     self.maxWaiters = maxWaiters
+    self.minConnections = minConnections
     self.waiters = CircularBuffer(initialCapacity: 16)
 
     self.eventLoop = eventLoop
@@ -201,17 +208,25 @@ internal final class ConnectionPool {
       ]
     )
     self._connections.reserveCapacity(connections)
+    var numberOfKeepOpenConnections = self.minConnections
     while self._connections.count < connections {
-      self.addConnectionToPool()
+      // The first `minConnections` connections added are marked to never go
+      // idle; any remaining connections may be closed by the idle timer.
+      let idleBehavior =
+        numberOfKeepOpenConnections > 0
+        ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout
+      numberOfKeepOpenConnections -= 1
+      self.addConnectionToPool(idleBehavior: idleBehavior)
     }
   }
 
   /// Make and add a new connection to the pool.
-  private func addConnectionToPool() {
+  private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) {
     let manager = ConnectionManager(
       eventLoop: self.eventLoop,
       channelProvider: self.channelProvider,
       callStartBehavior: .waitsForConnectivity,
+      idleBehavior: idleBehavior,
       connectionBackoff: self.connectionBackoff,
       connectivityDelegate: self,
       http2Delegate: self,
@@ -220,6 +235,19 @@ internal final class ConnectionPool {
     let id = manager.id
     self._connections[id] = PerConnectionState(manager: manager)
     self.delegate?.connectionAdded(id: .init(id))
+
+    // If it's one of the connections that should be kept open, then connect
+    // straight away.
+    switch idleBehavior {
+    case .neverGoIdle:
+      self.eventLoop.execute {
+        if manager.sync.isIdle {
+          manager.sync.startConnecting()
+        }
+      }
+    case .closeWhenIdleTimeout:
+      ()
+    }
   }
 
   // MARK: - Called from the pool manager
@@ -689,8 +717,9 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate {
     // Grab the number of reserved streams (before invalidating the index by adding a connection).
     let reservedStreams = self._connections.values[index].reservedStreams
 
-    // Replace the connection with a new idle one.
-    self.addConnectionToPool()
+    // Replace the connection with a new idle one. Keep the idle behavior, so that
+    // if it's a connection that should be kept alive, we maintain it.
+    self.addConnectionToPool(idleBehavior: manager.idleBehavior)
 
     // Since we're removing this connection from the pool (and no new streams can be created on
     // the connection), the pool manager can ignore any streams reserved against this connection.
@@ -881,6 +910,22 @@ extension ConnectionPool {
       return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) }
     }
 
+    /// The number of active (i.e. connecting or ready) connections in the pool.
+    internal var activeConnections: Int {
+      self.pool.eventLoop.assertInEventLoop()
+      return self.pool._connections.values.reduce(0) {
+        $0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0)
+      }
+    }
+
+    /// The number of connections in the pool in transient failure state.
+    internal var transientFailureConnections: Int {
+      self.pool.eventLoop.assertInEventLoop()
+      return self.pool._connections.values.reduce(0) {
+        $0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0)
+      }
+    }
+
     /// The number of streams currently available to reserve across all connections in the pool.
     internal var availableStreams: Int {
       self.pool.eventLoop.assertInEventLoop()

+ 4 - 0
Sources/GRPC/ConnectionPool/GRPCChannelPool.swift

@@ -275,6 +275,10 @@ extension GRPCChannelPool.Configuration {
     /// Defaults to 100.
     public var maxWaitersPerEventLoop: Int = 100
 
+    /// The minimum number of connections to keep open in this pool, per EventLoop.
+    /// This number of connections per EventLoop will never go idle and be closed. Defaults to 0.
+    public var minConnectionsPerEventLoop: Int = 0
+
     /// The maximum amount of time a caller is willing to wait for a stream for before timing out.
     ///
     /// Defaults to 30 seconds.

+ 8 - 0
Sources/GRPC/ConnectionPool/PoolManager.swift

@@ -33,6 +33,11 @@ internal final class PoolManager {
     @usableFromInline
     var maxWaiters: Int
 
+    /// The minimum number of connections to keep open per pool.
+    /// This number of connections will never go idle and be closed.
+    @usableFromInline
+    var minConnections: Int
+
     /// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started
     /// (assuming there is a connection available to start).
     @usableFromInline
@@ -62,6 +67,7 @@ internal final class PoolManager {
     internal init(
       maxConnections: Int,
       maxWaiters: Int,
+      minConnections: Int,
       loadThreshold: Double,
       assumedMaxConcurrentStreams: Int = 100,
       connectionBackoff: ConnectionBackoff,
@@ -70,6 +76,7 @@ internal final class PoolManager {
     ) {
       self.maxConnections = maxConnections
       self.maxWaiters = maxWaiters
+      self.minConnections = minConnections
       self.loadThreshold = loadThreshold
       self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
       self.connectionBackoff = connectionBackoff
@@ -225,6 +232,7 @@ internal final class PoolManager {
       return ConnectionPool(
         eventLoop: eventLoop,
         maxWaiters: configuration.maxWaiters,
+        minConnections: configuration.minConnections,
         reservationLoadThreshold: configuration.loadThreshold,
         assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
         connectionBackoff: configuration.connectionBackoff,

+ 1 - 0
Sources/GRPC/ConnectionPool/PooledChannel.swift

@@ -96,6 +96,7 @@ internal final class PooledChannel: GRPCChannel {
       perPoolConfiguration: .init(
         maxConnections: configuration.connectionPool.connectionsPerEventLoop,
         maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
+        minConnections: configuration.connectionPool.minConnectionsPerEventLoop,
         loadThreshold: configuration.connectionPool.reservationLoadThreshold,
         assumedMaxConcurrentStreams: 100,
         connectionBackoff: configuration.connectionBackoff,

+ 11 - 5
Sources/GRPC/GRPCIdleHandler.swift

@@ -24,7 +24,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
   typealias OutboundOut = HTTP2Frame
 
   /// The amount of time to wait before closing the channel when there are no active streams.
-  private let idleTimeout: TimeAmount
+  /// If nil, then we shouldn't schedule idle tasks.
+  private let idleTimeout: TimeAmount?
 
   /// The ping handler.
   private var pingHandler: PingHandler
@@ -78,7 +79,12 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
     logger: Logger
   ) {
     self.mode = .client(connectionManager, multiplexer)
-    self.idleTimeout = idleTimeout
+    switch connectionManager.idleBehavior {
+    case .neverGoIdle:
+      self.idleTimeout = nil
+    case .closeWhenIdleTimeout:
+      self.idleTimeout = idleTimeout
+    }
     self.stateMachine = .init(role: .client, logger: logger)
     self.pingHandler = PingHandler(
       pingCode: 5,
@@ -135,7 +141,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
     }
 
     // Handle idle timeout creation/cancellation.
-    if let idleTask = operations.idleTask {
+    if let idleTimeout = self.idleTimeout, let idleTask = operations.idleTask {
       switch idleTask {
       case let .cancel(task):
         self.stateMachine.logger.debug("idle timeout task cancelled")
@@ -145,9 +151,9 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
         if self.idleTimeout != .nanoseconds(.max), let context = self.context {
           self.stateMachine.logger.debug(
             "scheduling idle timeout task",
-            metadata: [MetadataKey.delayMs: "\(self.idleTimeout.milliseconds)"]
+            metadata: [MetadataKey.delayMs: "\(idleTimeout.milliseconds)"]
           )
-          let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
+          let task = context.eventLoop.scheduleTask(in: idleTimeout) {
             self.stateMachine.logger.debug("idle timeout task fired")
             self.idleTimeoutFired()
           }

+ 4 - 0
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -59,6 +59,7 @@ class ConnectionManagerTests: GRPCTestCase {
       configuration: configuration,
       channelProvider: channelProvider.map { HookedChannelProvider($0) },
       connectivityDelegate: self.monitor,
+      idleBehavior: .closeWhenIdleTimeout,
       logger: self.logger
     )
   }
@@ -948,6 +949,7 @@ extension ConnectionManagerTests {
         return loop.makeFailedFuture(DoomedChannelError())
       },
       connectivityDelegate: nil,
+      idleBehavior: .closeWhenIdleTimeout,
       logger: self.logger
     )
     let candidate = manager.getHTTP2Multiplexer()
@@ -1207,6 +1209,7 @@ extension ConnectionManagerTests {
         return eventLoop.makeSucceededFuture(channel)
       },
       callStartBehavior: .waitsForConnectivity,
+      idleBehavior: .closeWhenIdleTimeout,
       connectionBackoff: ConnectionBackoff(),
       connectivityDelegate: nil,
       http2Delegate: http2,
@@ -1383,6 +1386,7 @@ extension ConnectionManagerTests {
       configuration: configuration,
       channelProvider: Provider(),
       connectivityDelegate: self.monitor,
+      idleBehavior: .closeWhenIdleTimeout,
       logger: self.logger
     )
 

+ 85 - 1
Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift

@@ -53,6 +53,8 @@ final class ConnectionPoolTests: GRPCTestCase {
   private func makePool(
     waiters: Int = 1000,
     reservationLoadThreshold: Double = 0.9,
+    minConnections: Int = 0,
+    assumedMaxConcurrentStreams: Int = 100,
     now: @escaping () -> NIODeadline = { .now() },
     connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
     delegate: GRPCConnectionPoolDelegate? = nil,
@@ -63,8 +65,9 @@ final class ConnectionPoolTests: GRPCTestCase {
     return ConnectionPool(
       eventLoop: self.eventLoop,
       maxWaiters: waiters,
+      minConnections: minConnections,
       reservationLoadThreshold: reservationLoadThreshold,
-      assumedMaxConcurrentStreams: 100,
+      assumedMaxConcurrentStreams: assumedMaxConcurrentStreams,
       connectionBackoff: connectionBackoff,
       channelProvider: channelProvider,
       streamLender: HookedStreamLender(
@@ -1069,6 +1072,87 @@ final class ConnectionPoolTests: GRPCTestCase {
     XCTAssertEqual(error.code, .deadlineExceeded)
     XCTAssertNotEqual(error.code, .shutdown)
   }
+
+  func testMinimumConnectionsAreOpenRightAfterInitializing() {
+    let controller = ChannelController()
+    let pool = self.makePool(minConnections: 5, channelProvider: controller)
+
+    pool.initialize(connections: 20)
+    self.eventLoop.run()
+
+    XCTAssertEqual(pool.sync.connections, 20)
+    XCTAssertEqual(pool.sync.idleConnections, 15)
+    XCTAssertEqual(pool.sync.activeConnections, 5)
+    XCTAssertEqual(pool.sync.waiters, 0)
+    XCTAssertEqual(pool.sync.availableStreams, 0)
+    XCTAssertEqual(pool.sync.reservedStreams, 0)
+    XCTAssertEqual(pool.sync.transientFailureConnections, 0)
+  }
+
+  func testMinimumConnectionsAreOpenAfterOneIsQuiesced() {
+    let controller = ChannelController()
+    let pool = self.makePool(
+      minConnections: 1,
+      assumedMaxConcurrentStreams: 1,
+      channelProvider: controller
+    )
+
+    // Initialize two connections, and make sure that only one of them is active,
+    // since we have set minConnections to 1.
+    pool.initialize(connections: 2)
+    self.eventLoop.run()
+    XCTAssertEqual(pool.sync.connections, 2)
+    XCTAssertEqual(pool.sync.idleConnections, 1)
+    XCTAssertEqual(pool.sync.activeConnections, 1)
+    XCTAssertEqual(pool.sync.transientFailureConnections, 0)
+
+    // Open two streams, which, because the maxConcurrentStreams is 1, will
+    // create two channels.
+    let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
+      $0.eventLoop.makeSucceededVoidFuture()
+    }
+    let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
+      $0.eventLoop.makeSucceededVoidFuture()
+    }
+
+    // Start creating the channels.
+    self.eventLoop.run()
+
+    // Make both connections ready.
+    controller.connectChannel(atIndex: 0)
+    controller.sendSettingsToChannel(atIndex: 0)
+    controller.connectChannel(atIndex: 1)
+    controller.sendSettingsToChannel(atIndex: 1)
+
+    // Run the loop to create the streams/connections.
+    self.eventLoop.run()
+    XCTAssertNoThrow(try w1.wait())
+    controller.openStreamInChannel(atIndex: 0)
+    XCTAssertNoThrow(try w2.wait())
+    controller.openStreamInChannel(atIndex: 1)
+
+    XCTAssertEqual(pool.sync.connections, 2)
+    XCTAssertEqual(pool.sync.idleConnections, 0)
+    XCTAssertEqual(pool.sync.activeConnections, 2)
+    XCTAssertEqual(pool.sync.transientFailureConnections, 0)
+
+    // Quiesce the connection that should be kept alive.
+    // Another connection should be brought back up immediately after, to maintain
+    // the minimum number of active connections that won't go idle.
+    controller.sendGoAwayToChannel(atIndex: 0)
+    XCTAssertEqual(pool.sync.connections, 3)
+    XCTAssertEqual(pool.sync.idleConnections, 1)
+    XCTAssertEqual(pool.sync.activeConnections, 2)
+    XCTAssertEqual(pool.sync.transientFailureConnections, 0)
+
+    // Now quiesce the other one. This will add a new idle connection, but it
+    // won't connect it right away.
+    controller.sendGoAwayToChannel(atIndex: 1)
+    XCTAssertEqual(pool.sync.connections, 4)
+    XCTAssertEqual(pool.sync.idleConnections, 2)
+    XCTAssertEqual(pool.sync.activeConnections, 2)
+    XCTAssertEqual(pool.sync.transientFailureConnections, 0)
+  }
 }
 
 extension ConnectionPool {

+ 1 - 0
Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift

@@ -33,6 +33,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
     return ConnectionPool(
       eventLoop: eventLoop,
       maxWaiters: maxWaiters,
+      minConnections: 0,
       reservationLoadThreshold: loadThreshold,
       assumedMaxConcurrentStreams: maxConcurrentStreams,
       connectionBackoff: connectionBackoff,