Selaa lähdekoodia

Fix continuation misuse in wrapped channel (#136)

Motivation:

The wrapped channel had a code path which failed pending stream
continuations...except that it didn't which in some cases led to
continuations being leaked.

I also noticed a tiny timing window where it was possible for the
regular clients to leak a continuation.

Modifications:

- Actually resume the continuations
- Fix tiny timing window

Result:

Fewer leaking continuations
George Barnett 1 kuukausi sitten
vanhempi
commit
407dee8336

+ 5 - 5
Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift

@@ -337,11 +337,6 @@ extension GRPCChannel {
     let id = QueueEntryID()
     return try await withTaskCancellationHandler {
       try await withCheckedThrowingContinuation { continuation in
-        if Task.isCancelled {
-          continuation.resume(throwing: CancellationError())
-          return
-        }
-
         // Explicitly adding the types works around: https://github.com/swiftlang/swift/issues/78112
         let (enqueued, loadBalancer) = self.state.withLock { state -> (Bool, LoadBalancer?) in
           state.enqueue(continuation: continuation, waitForReady: waitForReady, id: id)
@@ -356,6 +351,11 @@ extension GRPCChannel {
         if !enqueued {
           let error = RPCError(code: .unavailable, message: "channel is shutdown")
           continuation.resume(throwing: error)
+        } else if Task.isCancelled {
+          let dequeued = self.state.withLock { state in
+            state.dequeueContinuation(id: id)
+          }
+          dequeued?.resume(throwing: CancellationError())
         }
       }
     } onCancel: {

+ 13 - 8
Sources/GRPCNIOTransportCore/Client/WrappedChannel/WrappedChannel+State.swift

@@ -219,26 +219,31 @@ extension HTTP2ClientTransport.WrappedChannel.State {
 
   enum ConnectionClosedAction {
     case none
-    case failQueuedStreams
+    case failQueuedStreams(
+      [CheckedContinuation<NIOHTTP2Handler.AsyncStreamMultiplexer<Void>, any Error>]
+    )
   }
 
   mutating func connectionClosed() -> ConnectionClosedAction {
     switch consume self {
-    case .idle:
+    case .idle(var state):
       self = .shutDown
-      return .none
+      let continuations = state.queue.removeAll()
+      return .failQueuedStreams(continuations)
 
-    case .configuring:
+    case .configuring(var state):
       self = .shutDown
-      return .failQueuedStreams
+      let continuations = state.queue.removeAll()
+      return .failQueuedStreams(continuations)
 
-    case .configured:
+    case .configured(var state):
       self = .shutDown
-      return .failQueuedStreams
+      let continuations = state.queue.removeAll()
+      return .failQueuedStreams(continuations)
 
     case .ready:
       self = .shutDown
-      return .failQueuedStreams
+      return .none
 
     case .shuttingDown:
       self = .shuttingDown

+ 15 - 3
Sources/GRPCNIOTransportCore/Client/WrappedChannel/WrappedChannel.swift

@@ -133,8 +133,12 @@ extension HTTP2ClientTransport {
           switch self.state.withLock({ $0.connectionClosed() }) {
           case .none:
             ()
-          case .failQueuedStreams:
-            ()
+          case .failQueuedStreams(let continuations):
+            for continuation in continuations {
+              continuation.resume(
+                throwing: RPCError(code: .unavailable, message: "The channel was closed")
+              )
+            }
           }
 
         case .shutDown:
@@ -194,7 +198,15 @@ extension HTTP2ClientTransport {
             case .resume(.failure(let error)):
               continuation.resume(throwing: error)
             case .none:
-              ()
+              if Task.isCancelled {
+                let action = self.state.withLock { $0.dequeue(id: id) }
+                switch action {
+                case .dequeued(let continuation):
+                  continuation.resume(throwing: CancellationError())
+                case .none:
+                  ()
+                }
+              }
             }
           }
         } onCancel: {