소스 검색

Better handle errors on client connections (#1908)

Motivation:

At the moment if an error is encountered on an active client connection
an error is thrown from the async channel. This results in the
`Connection` being closed and, without further information, the
subchannel is returned to the idle state. This can lead to some tests
being flaky.

Rather than throwing the error down the pipeline, we should store it and
close the connection so that the client connection handler can fire an
approriate close reason (with the error) down the pipeline before
becoming inactive. This allows the `Connection` to inform the
`Subchannel` whether it should attempt to reconnect or remain idle.

Modifications:

- Call close when receiving an error in `ClientConnectionHandler` rather
  than forwarding it
- Make `testConnectionDropWithOpenStreams` more reliable

Result:

More consistent behavior
George Barnett 1 년 전
부모
커밋
1ac484d73f

+ 4 - 1
Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift

@@ -173,8 +173,11 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
   }
 
   func errorCaught(context: ChannelHandlerContext, error: any Error) {
+    // Store the error and close, this will result in the final close event being fired down
+    // the pipeline with an appropriate close reason and appropriate error. (This avoids
+    // the async channel just throwing the error.)
     self.state.receivedError(error)
-    context.fireErrorCaught(error)
+    context.close(mode: .all, promise: nil)
   }
 
   func channelRead(context: ChannelHandlerContext, data: NIOAny) {

+ 18 - 10
Tests/GRPCHTTP2CoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift

@@ -404,6 +404,8 @@ final class SubchannelTests: XCTestCase {
       }
 
       var events = [Subchannel.Event]()
+      var readyCount = 0
+
       for await event in subchannel.events {
         events.append(event)
         switch event {
@@ -411,19 +413,24 @@ final class SubchannelTests: XCTestCase {
           subchannel.connect()
 
         case .connectivityStateChanged(.ready):
-          let stream = try await subchannel.makeStream(descriptor: .echoGet, options: .defaults)
-          try await stream.execute { inbound, outbound in
-            try await outbound.write(.metadata([:]))
-            // Stream is definitely open. Bork the connection.
-            server.clients.first?.close(mode: .all, promise: nil)
-            for try await _ in inbound {
-              ()
+          readyCount += 1
+          // When the connection becomes ready the first time, open a stream and forcibly close the
+          // channel. This will result in an automatic reconnect. Close the subchannel when that
+          // happens.
+          if readyCount == 1 {
+            let stream = try await subchannel.makeStream(descriptor: .echoGet, options: .defaults)
+            try await stream.execute { inbound, outbound in
+              try await outbound.write(.metadata([:]))
+              // Stream is definitely open. Bork the connection.
+              server.clients.first?.close(mode: .all, promise: nil)
+              for try await _ in inbound {
+                ()
+              }
             }
+          } else if readyCount == 2 {
+            subchannel.close()
           }
 
-        case .connectivityStateChanged(.transientFailure):
-          subchannel.close()
-
         case .connectivityStateChanged(.shutdown):
           group.cancelAll()
 
@@ -439,6 +446,7 @@ final class SubchannelTests: XCTestCase {
         .connectivityStateChanged(.transientFailure),
         .requiresNameResolution,
         .connectivityStateChanged(.connecting),
+        .connectivityStateChanged(.ready),
         .connectivityStateChanged(.shutdown),
       ]