2
0
Эх сурвалжийг харах

Provide an error when cancelling async writer (#1456)

Motivation:

The `AsyncWriter` fails any pending writes with a `CancellationError` if
the writer is cancelled.

This can be misleading if the writer is cancelled because a connection
could not be established, for example.

Modifications:

- Pass an error through `AsyncWriter.cancel`

Result:

Pending writes which are failed because the writer has been cancelled
have more relevant errors.
George Barnett 3 жил өмнө
parent
commit
babcff1f9d

+ 7 - 9
Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift

@@ -198,9 +198,9 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
 
   /// As ``cancel()`` but executed asynchronously.
   @usableFromInline
-  internal nonisolated func cancelAsynchronously() {
+  internal nonisolated func cancelAsynchronously(withError error: Error) {
     Task {
-      await self.cancel()
+      await self.cancel(withError: error)
     }
   }
 
@@ -209,7 +209,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
   /// Any pending writes will be dropped and their continuations will be resumed with
   /// a `CancellationError`. Any writes after cancellation has completed will also fail.
   @usableFromInline
-  internal func cancel() {
+  internal func cancel(withError error: Error) {
     // If there's an end we should fail that last.
     let pendingEnd: PendingEnd?
 
@@ -228,13 +228,11 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
       pendingEnd = nil
     }
 
-    let cancellationError = CancellationError()
-
     while let pending = self._pendingElements.popFirst() {
-      pending.continuation.resume(throwing: cancellationError)
+      pending.continuation.resume(throwing: error)
     }
 
-    pendingEnd?.continuation.resume(throwing: cancellationError)
+    pendingEnd?.continuation.resume(throwing: error)
   }
 
   /// Write an `element`.
@@ -263,7 +261,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
         throw GRPCAsyncWriterError.tooManyPendingWrites
       }
     } onCancel: {
-      self.cancelAsynchronously()
+      self.cancelAsynchronously(withError: CancellationError())
     }
   }
 
@@ -283,7 +281,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
         }
       }
     } onCancel: {
-      self.cancelAsynchronously()
+      self.cancelAsynchronously(withError: CancellationError())
     }
   }
 }

+ 4 - 4
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

@@ -92,7 +92,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
       onError: { error in
         asyncCall.responseParts.handleError(error)
         asyncCall.responseSource.finish(throwing: error)
-        asyncCall.requestStream.asyncWriter.cancelAsynchronously()
+        asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
       },
       onResponsePart: AsyncCall.makeResponsePartHandler(
         responseParts: asyncCall.responseParts,
@@ -133,7 +133,7 @@ internal enum AsyncCall {
           responseSource.finish(throwing: status)
         }
 
-        requestStream?.asyncWriter.cancelAsynchronously()
+        requestStream?.asyncWriter.cancelAsynchronously(withError: status)
       }
     }
   }
@@ -152,8 +152,8 @@ internal enum AsyncCall {
       switch responsePart {
       case .metadata, .message:
         ()
-      case .end:
-        requestStream?.asyncWriter.cancelAsynchronously()
+      case let .end(status, _):
+        requestStream?.asyncWriter.cancelAsynchronously(withError: status)
       }
     }
   }

+ 1 - 1
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift

@@ -90,7 +90,7 @@ public struct GRPCAsyncClientStreamingCall<Request: Sendable, Response: Sendable
       },
       onError: { error in
         asyncCall.responseParts.handleError(error)
-        asyncCall.requestStream.asyncWriter.cancelAsynchronously()
+        asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
       },
       onResponsePart: AsyncCall.makeResponsePartHandler(
         responseParts: asyncCall.responseParts,

+ 2 - 2
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift

@@ -487,7 +487,7 @@ internal final class AsyncServerHandler<
       try await responseStreamWriter.finish(.ok)
     } catch {
       // Drop pending writes as we're on the error path.
-      await responseStreamWriter.cancel()
+      await responseStreamWriter.cancel(withError: error)
 
       if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk {
         throw GRPCStatus(code: .unknown, message: "Handler threw error with status code 'ok'.")
@@ -831,7 +831,7 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
     // written. This should reduce how long the user handler runs for as it can no longer do
     // anything useful.
     self.requestSource.finish(throwing: CancellationError())
-    self.responseWriter.cancelAsynchronously()
+    self.responseWriter.cancelAsynchronously(withError: CancellationError())
     self.task.cancel()
   }
 }

+ 22 - 1
Tests/GRPCTests/AsyncAwaitSupport/AsyncClientTests.swift

@@ -64,7 +64,10 @@ final class AsyncClientCancellationTests: GRPCTestCase {
     return try self.makeClient(port: self.server.channel.localAddress!.port!)
   }
 
-  private func makeClient(port: Int) throws -> Echo_EchoAsyncClient {
+  private func makeClient(
+    port: Int,
+    configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
+  ) throws -> Echo_EchoAsyncClient {
     precondition(self.pool == nil)
 
     self.pool = try GRPCChannelPool.with(
@@ -73,6 +76,7 @@ final class AsyncClientCancellationTests: GRPCTestCase {
       eventLoopGroup: self.group
     ) {
       $0.backgroundActivityLogger = self.clientLogger
+      configure(&$0)
     }
 
     return Echo_EchoAsyncClient(channel: self.pool)
@@ -394,6 +398,23 @@ final class AsyncClientCancellationTests: GRPCTestCase {
       return .bidirectionalStreaming(echo.makeUpdateCall())
     }
   }
+
+  func testConnectionFailureCancelsRequestStreamWithError() async throws {
+    let echo = try self.makeClient(port: 0) {
+      // Configure a short wait time; we will not start a server so fail quickly.
+      $0.connectionPool.maxWaitTime = .milliseconds(10)
+    }
+
+    let update = echo.makeUpdateCall()
+    await XCTAssertThrowsError(try await update.requestStream.send(.init())) { error in
+      XCTAssertFalse(error is CancellationError)
+    }
+
+    let collect = echo.makeCollectCall()
+    await XCTAssertThrowsError(try await collect.requestStream.send(.init())) { error in
+      XCTAssertFalse(error is CancellationError)
+    }
+  }
 }
 
 #endif // compiler(>=5.6)

+ 4 - 4
Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift

@@ -170,7 +170,7 @@ internal class AsyncWriterTests: GRPCTestCase {
 
     async let pendingWrite: Void = writer.write("foo")
 
-    await writer.cancel()
+    await writer.cancel(withError: CancellationError())
 
     do {
       try await pendingWrite
@@ -202,7 +202,7 @@ internal class AsyncWriterTests: GRPCTestCase {
 
     async let pendingWrite: Void = writer.finish(42)
 
-    await writer.cancel()
+    await writer.cancel(withError: CancellationError())
 
     do {
       try await pendingWrite
@@ -229,13 +229,13 @@ internal class AsyncWriterTests: GRPCTestCase {
     let delegate = CollectingDelegate<String, Int>()
     let writer = AsyncWriter(delegate: delegate)
 
-    await writer.cancel()
+    await writer.cancel(withError: CancellationError())
     await XCTAssertThrowsError(try await writer.write("1")) { error in
       XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
     }
 
     // Fine, no need to throw. Nothing should change.
-    await writer.cancel()
+    await writer.cancel(withError: CancellationError())
     await XCTAssertThrowsError(try await writer.write("2")) { error in
       XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
     }