
Delay client async writer starting (#1531)

Motivation:

It's possible for async streaming clients to fail and drop messages in
some situations. Streaming calls set up state and then write out the
request headers. While the state is being set up, the HTTP/2 stream
channel is configured; when the channel becomes active, gRPC calls out
to allow the async writer to start emitting writes. This can happen
before the headers have been written, so a pending write can race the
headers. If a message is written first, its write promise is failed and
the message is dropped.

Modifications:

Delay letting the async writer emit writes until the headers have been
written.

Result:

Correct ordering is enforced.
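The gating pattern behind the fix can be sketched in isolation. The
following is a hypothetical, simplified model (not the grpc-swift API):
a writer buffers messages until `start()` is called, mirroring how
`onStart()` is now invoked only after the request head has been written.

```swift
// Hypothetical sketch: a writer that buffers messages until start()
// is called, so messages can never be emitted before the headers.
final class GatedWriter {
    private var isStarted = false
    private var buffer: [String] = []
    private(set) var emitted: [String] = []

    // Writes made before start() are buffered rather than emitted.
    func write(_ message: String) {
        if self.isStarted {
            self.emitted.append(message)
        } else {
            self.buffer.append(message)
        }
    }

    // Called once the headers have been written; flushes the buffer
    // in order, then allows subsequent writes straight through.
    func start() {
        self.isStarted = true
        self.emitted.append(contentsOf: self.buffer)
        self.buffer.removeAll()
    }
}

let writer = GatedWriter()
writer.write("message-1") // buffered: headers not yet written
writer.start()            // headers written; buffered message flushed
writer.write("message-2") // emitted immediately
assert(writer.emitted == ["message-1", "message-2"])
```

Before the fix, the equivalent of `start()` ran when the channel became
active, which could precede the headers write and let a buffered
message jump ahead of the request head.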
George Barnett, 3 years ago
parent commit 4312579fc8

+ 4 - 1
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -337,7 +337,6 @@ extension ClientTransport {
       self._pipeline?.logger = self.logger
       self.logger.debug("activated stream channel")
       self.channel = channel
-      self.onStart()
       self.unbuffer()
 
     case .close:
@@ -943,6 +942,10 @@ extension ClientTransport {
     case let .metadata(headers):
       let head = self.makeRequestHead(with: headers)
       channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
+      // Messages are buffered by this class and in the async writer for async calls. Initially the
+      // async writer is not allowed to emit messages; the call to 'onStart()' signals that messages
+      // may be emitted. We call it here to avoid races between writing headers and messages.
+      self.onStart()
 
     case let .message(request, metadata):
       do {

+ 39 - 0
Tests/GRPCTests/ClientCallTests.swift

@@ -206,4 +206,43 @@ class ClientCallTests: GRPCTestCase {
     // Cancellation should now fail, we've already cancelled.
     assertThat(try get.cancel().wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
   }
+
+  func testWriteMessageOnStart() throws {
+    // This test isn't deterministic, so run a number of iterations.
+    for _ in 0 ..< 100 {
+      let call = self.update()
+      let promise = call.eventLoop.makePromise(of: Void.self)
+      let finished = call.eventLoop.makePromise(of: Void.self)
+
+      call.invokeStreamingRequests {
+        // Send in onStart.
+        call.send(
+          .message(.with { $0.text = "foo" }, .init(compress: false, flush: false)),
+          promise: promise
+        )
+      } onError: { _ in // ignore errors
+      } onResponsePart: {
+        switch $0 {
+        case .metadata, .message:
+          ()
+        case .end:
+          finished.succeed(())
+        }
+      }
+
+      // End the stream.
+      promise.futureResult.whenComplete { _ in
+        call.send(.end, promise: nil)
+      }
+
+      do {
+        try promise.futureResult.wait()
+        try finished.futureResult.wait()
+      } catch {
+        // Stop on the first error.
+        XCTFail("Unexpected error: \(error)")
+        return
+      }
+    }
+  }
 }