Przeglądaj źródła

Wire up connection backoff of channel pool (#1289)

Motivation:

`GRPCChannelPool.Configuration` has an entry controlling connection
backoff configuration. However, this was never wired up to the underlying
connection pool, so connections just used the defaults.

Modifications:

- Propagate connection backoff through the API
- Add a test

Result:

Connection backoff is wired up as expected for `GRPCChannelPool`.
George Barnett 4 lat temu
rodzic
commit
a0f38ff379

+ 7 - 1
Sources/GRPC/ConnectionPool/ConnectionPool.swift

@@ -70,6 +70,10 @@ internal final class ConnectionPool {
   @usableFromInline
   internal let maxWaiters: Int
 
+  /// Configuration for backoff between subsequent connection attempts.
+  @usableFromInline
+  internal let connectionBackoff: ConnectionBackoff
+
   /// Provides a channel factory to the `ConnectionManager`.
   @usableFromInline
   internal let channelProvider: ConnectionManagerChannelProvider
@@ -125,6 +129,7 @@ internal final class ConnectionPool {
     maxWaiters: Int,
     reservationLoadThreshold: Double,
     assumedMaxConcurrentStreams: Int,
+    connectionBackoff: ConnectionBackoff,
     channelProvider: ConnectionManagerChannelProvider,
     streamLender: StreamLender,
     logger: GRPCLogger,
@@ -142,6 +147,7 @@ internal final class ConnectionPool {
     self.waiters = CircularBuffer(initialCapacity: 16)
 
     self.eventLoop = eventLoop
+    self.connectionBackoff = connectionBackoff
     self.channelProvider = channelProvider
     self.streamLender = streamLender
     self.logger = logger
@@ -165,7 +171,7 @@ internal final class ConnectionPool {
       eventLoop: self.eventLoop,
       channelProvider: self.channelProvider,
       callStartBehavior: .waitsForConnectivity,
-      connectionBackoff: ConnectionBackoff(),
+      connectionBackoff: self.connectionBackoff,
       connectivityDelegate: self,
       http2Delegate: self,
       logger: self.logger.unwrapped

+ 6 - 0
Sources/GRPC/ConnectionPool/PoolManager.swift

@@ -45,6 +45,9 @@ internal final class PoolManager {
       return self.maxConnections * self.assumedMaxConcurrentStreams
     }
 
+    @usableFromInline
+    var connectionBackoff: ConnectionBackoff
+
     /// A `Channel` provider.
     @usableFromInline
     var channelProvider: DefaultChannelProvider
@@ -55,12 +58,14 @@ internal final class PoolManager {
       maxWaiters: Int,
       loadThreshold: Double,
       assumedMaxConcurrentStreams: Int = 100,
+      connectionBackoff: ConnectionBackoff,
       channelProvider: DefaultChannelProvider
     ) {
       self.maxConnections = maxConnections
       self.maxWaiters = maxWaiters
       self.loadThreshold = loadThreshold
       self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
+      self.connectionBackoff = connectionBackoff
       self.channelProvider = channelProvider
     }
   }
@@ -211,6 +216,7 @@ internal final class PoolManager {
         maxWaiters: configuration.maxWaiters,
         reservationLoadThreshold: configuration.loadThreshold,
         assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
+        connectionBackoff: configuration.connectionBackoff,
         channelProvider: configuration.channelProvider,
         streamLender: self,
         logger: logger

+ 1 - 0
Sources/GRPC/ConnectionPool/PooledChannel.swift

@@ -72,6 +72,7 @@ internal final class PooledChannel: GRPCChannel {
         maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
         loadThreshold: configuration.connectionPool.reservationLoadThreshold,
         assumedMaxConcurrentStreams: 100,
+        connectionBackoff: configuration.connectionBackoff,
         channelProvider: provider
       ),
       logger: configuration.backgroundActivityLogger.wrapped

+ 69 - 0
Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift

@@ -52,6 +52,7 @@ final class ConnectionPoolTests: GRPCTestCase {
     waiters: Int = 1000,
     reservationLoadThreshold: Double = 0.9,
     now: @escaping () -> NIODeadline = { .now() },
+    connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
     onReservationReturned: @escaping (Int) -> Void = { _ in },
     onMaximumReservationsChange: @escaping (Int) -> Void = { _ in },
     channelProvider: ConnectionManagerChannelProvider
@@ -61,6 +62,7 @@ final class ConnectionPoolTests: GRPCTestCase {
       maxWaiters: waiters,
       reservationLoadThreshold: reservationLoadThreshold,
       assumedMaxConcurrentStreams: 100,
+      connectionBackoff: connectionBackoff,
       channelProvider: channelProvider,
       streamLender: HookedStreamLender(
         onReturnStreams: onReservationReturned,
@@ -85,6 +87,7 @@ final class ConnectionPoolTests: GRPCTestCase {
     waiters: Int = 1000,
     reservationLoadThreshold: Double = 0.9,
     now: @escaping () -> NIODeadline = { .now() },
+    connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
     onReservationReturned: @escaping (Int) -> Void = { _ in },
     onMaximumReservationsChange: @escaping (Int) -> Void = { _ in }
   ) -> (ConnectionPool, ChannelController) {
@@ -93,6 +96,7 @@ final class ConnectionPoolTests: GRPCTestCase {
       waiters: waiters,
       reservationLoadThreshold: reservationLoadThreshold,
       now: now,
+      connectionBackoff: connectionBackoff,
       onReservationReturned: onReservationReturned,
       onMaximumReservationsChange: onMaximumReservationsChange,
       channelProvider: controller
@@ -631,6 +635,71 @@ final class ConnectionPoolTests: GRPCTestCase {
     XCTAssertEqual(pool.sync.reservedStreams, 0)
     XCTAssertEqual(pool.sync.availableStreams, 100)
   }
+
+  func testBackoffIsUsedForReconnections() {
+    // Fix backoff to always be 1 second.
+    let backoff = ConnectionBackoff(
+      initialBackoff: 1.0,
+      maximumBackoff: 1.0,
+      multiplier: 1.0,
+      jitter: 0.0
+    )
+
+    let (pool, controller) = self.setUpPoolAndController(connectionBackoff: backoff)
+    pool.initialize(connections: 1)
+    XCTAssertEqual(pool.sync.connections, 1)
+
+    let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
+      $0.eventLoop.makeSucceededVoidFuture()
+    }
+    // Start creating the channel.
+    self.eventLoop.run()
+
+    // Make the connection 'ready'.
+    controller.connectChannel(atIndex: 0)
+    controller.sendSettingsToChannel(atIndex: 0)
+    self.eventLoop.run()
+    XCTAssertNoThrow(try w1.wait())
+    controller.openStreamInChannel(atIndex: 0)
+
+    // Close the connection. It should hit the transient failure state.
+    controller.fireChannelInactiveForChannel(atIndex: 0)
+    // Now nothing is available in the pool.
+    XCTAssertEqual(pool.sync.waiters, 0)
+    XCTAssertEqual(pool.sync.availableStreams, 0)
+    XCTAssertEqual(pool.sync.reservedStreams, 0)
+    XCTAssertEqual(pool.sync.idleConnections, 0)
+
+    // Enqueue two waiters; one will time out before the reconnect happens.
+    let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
+      $0.eventLoop.makeSucceededVoidFuture()
+    }
+
+    let w3 = pool.makeStream(
+      deadline: .uptimeNanoseconds(UInt64(TimeAmount.milliseconds(500).nanoseconds)),
+      logger: self.logger.wrapped
+    ) {
+      $0.eventLoop.makeSucceededVoidFuture()
+    }
+
+    XCTAssertEqual(pool.sync.waiters, 2)
+
+    // Time out w3.
+    self.eventLoop.advanceTime(by: .milliseconds(500))
+    XCTAssertThrowsError(try w3.wait())
+    XCTAssertEqual(pool.sync.waiters, 1)
+
+    // Wait a little more for the backoff to pass. The controller should now have a second channel.
+    self.eventLoop.advanceTime(by: .milliseconds(500))
+    XCTAssertEqual(controller.count, 2)
+
+    // Start up the next channel.
+    controller.connectChannel(atIndex: 1)
+    controller.sendSettingsToChannel(atIndex: 1)
+    self.eventLoop.run()
+    XCTAssertNoThrow(try w2.wait())
+    controller.openStreamInChannel(atIndex: 1)
+  }
 }
 
 // MARK: - Helpers

+ 2 - 0
Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift

@@ -25,6 +25,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
     maxWaiters: Int = 100,
     maxConcurrentStreams: Int = 100,
     loadThreshold: Double = 0.9,
+    connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
     makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
   ) -> ConnectionPool {
     return ConnectionPool(
@@ -32,6 +33,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
       maxWaiters: maxWaiters,
       reservationLoadThreshold: loadThreshold,
       assumedMaxConcurrentStreams: maxConcurrentStreams,
+      connectionBackoff: connectionBackoff,
       channelProvider: HookedChannelProvider(makeChannel),
       streamLender: HookedStreamLender(
         onReturnStreams: { _ in },