瀏覽代碼

Reconnect when making streams and the LB is idle (#41)

Motivation:

When a LB becomes idle and a stream is created it's enqueued but doesn't
cause the LB to start reconnecting again.

Modifications:

- Trigger a reconnect when enqueuing a waiter and the LB is idle

Result:

- Resolves #40
George Barnett 1 年之前
父節點
當前提交
cad03dbd30

+ 11 - 5
Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift

@@ -337,10 +337,15 @@ extension GRPCChannel {
           return
         }
 
-        let enqueued = self.state.withLock { state in
+        let (enqueued, loadBalancer) = self.state.withLock { state in
           state.enqueue(continuation: continuation, waitForReady: waitForReady, id: id)
         }
 
+        if let loadBalancer = loadBalancer {
+          // Attempting to pick a subchannel will trigger a connect event if the subchannel is idle.
+          _ = loadBalancer.pickSubchannel()
+        }
+
         // Not enqueued because the channel is shutdown or shutting down.
         if !enqueued {
           let error = RPCError(code: .unavailable, message: "channel is shutdown")
@@ -896,20 +901,21 @@ extension GRPCChannel.StateMachine {
     continuation: CheckedContinuation<LoadBalancer, any Error>,
     waitForReady: Bool,
     id: QueueEntryID
-  ) -> Bool {
+  ) -> (enqueued: Bool, loadBalancer: LoadBalancer?) {
     switch self.state {
     case .notRunning(var state):
       self.state = ._modifying
       state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
       self.state = .notRunning(state)
-      return true
+      return (true, nil)
     case .running(var state):
       self.state = ._modifying
       state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
       self.state = .running(state)
-      return true
+      // If idle then return the current load balancer so that it can be told to start connecting.
+      return (true, state.connectivityState == .idle ? state.current : nil)
     case .stopping, .stopped:
-      return false
+      return (false, nil)
     case ._modifying:
       fatalError("Invalid state")
     }

+ 54 - 0
Tests/GRPCNIOTransportCoreTests/Client/Connection/GRPCChannelTests.swift

@@ -795,6 +795,60 @@ final class GRPCChannelTests: XCTestCase {
       group.cancelAll()
     }
   }
+
+  func testMakeStreamAfterIdleTimeout() async throws {
+    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
+    let address = try await server.bind()
+
+    // Configure a low idle time.
+    let channel = GRPCChannel(
+      resolver: .static(endpoints: [Endpoint(address)]),
+      connector: .posix(maxIdleTime: .milliseconds(50)),
+      config: .defaults,
+      defaultServiceConfig: ServiceConfig()
+    )
+
+    try await withThrowingDiscardingTaskGroup { group in
+      group.addTask {
+        // Just respond with 'ok'.
+        try await server.run { inbound, outbound in
+          let status = Status(code: .ok, message: "")
+          try await outbound.write(.status(status, [:]))
+          outbound.finish()
+        }
+      }
+
+      group.addTask {
+        await channel.connect()
+      }
+
+      func doAnRPC() async throws {
+        try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
+          try await stream.outbound.write(.metadata([:]))
+          await stream.outbound.finish()
+
+          let response = try await stream.inbound.reduce(into: []) { $0.append($1) }
+          switch response.first {
+          case .status(let status, _):
+            XCTAssertEqual(status.code, .ok)
+          default:
+            XCTFail("Expected status")
+          }
+        }
+      }
+
+      // Do an RPC.
+      try await doAnRPC()
+
+      // Wait for the idle time to pass.
+      try await Task.sleep(for: .milliseconds(100))
+
+      // Do another RPC.
+      try await doAnRPC()
+
+      group.cancelAll()
+    }
+  }
 }
 
 extension GRPCChannel.Config {