Browse Source

Wait for status to be written before closing (#103)

Motivation:

It's possible for the server to drop writes if the HTTP/2 stream's
outbound flow control buffer is full. The main reason for this is that
NIO's async channel has an 'executeThenClose' function within which we
execute the service provided handler for the RPC, after that returns
NIO's async channel closes the stream by doing a 'close(mode: .all)'.

If writes have been buffered in the HTTP/2 handler then the close will
cause the writes to be dropped.

Modifications:

- Wait for the HTTP/2 stream to close as a result of the status being
written in 'executeThenClose' rather than as a result of 'close'.

Result:

- Resolves #96
George Barnett 8 months ago
parent
commit
4c96199043

+ 5 - 0
Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift

@@ -300,6 +300,11 @@ package final class CommonHTTP2ServerTransport<
         }
         await streamHandler(rpcStream, context)
       }
+
+      // Wait for the stream to close (i.e. when the final status has been written or an error
+      // occurs.) This is done to avoid closing too early as 'executeThenClose' may forcefully
+      // close the stream and drop buffered writes.
+      try await stream.channel.closeFuture.get()
     }
   }
 

+ 42 - 5
Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift

@@ -145,6 +145,7 @@ final class HTTP2TransportTests: XCTestCase {
             transportSecurity: .plaintext,
             config: .defaults {
               $0.compression.enabledAlgorithms = compression
+              $0.rpc.maxRequestPayloadSize = .max
             }
           )
         ),
@@ -167,6 +168,7 @@ final class HTTP2TransportTests: XCTestCase {
             transportSecurity: .plaintext,
             config: .defaults {
               $0.compression.enabledAlgorithms = compression
+              $0.rpc.maxRequestPayloadSize = .max
             }
           )
         ),
@@ -193,11 +195,19 @@ final class HTTP2TransportTests: XCTestCase {
     enabledCompression: CompressionAlgorithmSet
   ) throws -> GRPCClient<NIOClientTransport> {
     let transport: NIOClientTransport
+    var serviceConfig = ServiceConfig()
+    serviceConfig.loadBalancingConfig = [.roundRobin]
+    serviceConfig.methodConfig = [
+      MethodConfig(
+        // Applies to all RPCs
+        names: [MethodConfig.Name(service: "", method: "")],
+        maxRequestMessageBytes: .max,
+        maxResponseMessageBytes: .max
+      )
+    ]
 
     switch kind {
     case .posix:
-      var serviceConfig = ServiceConfig()
-      serviceConfig.loadBalancingConfig = [.roundRobin]
       let posix = try HTTP2ClientTransport.Posix(
         target: target,
         transportSecurity: .plaintext,
@@ -211,8 +221,6 @@ final class HTTP2TransportTests: XCTestCase {
 
     #if canImport(Network)
     case .transportServices:
-      var serviceConfig = ServiceConfig()
-      serviceConfig.loadBalancingConfig = [.roundRobin]
       let transportServices = try HTTP2ClientTransport.TransportServices(
         target: target,
         transportSecurity: .plaintext,
@@ -251,7 +259,11 @@ final class HTTP2TransportTests: XCTestCase {
         )
       }
 
-      let wrapped = HTTP2ClientTransport.WrappedChannel(takingOwnershipOf: channel, config: config)
+      let wrapped = HTTP2ClientTransport.WrappedChannel(
+        takingOwnershipOf: channel,
+        config: config,
+        serviceConfig: serviceConfig
+      )
       transport = NIOClientTransport(wrapped)
     }
 
@@ -1729,6 +1741,31 @@ final class HTTP2TransportTests: XCTestCase {
       }
     }
   }
+
+  func testLargeResponse() async throws {
+    let sizeInMiB = 8
+    let sizeInBytes = sizeInMiB * 1024 * 1024
+
+    try await self.forEachTransportPair { control, _, pair in
+      let input = ControlInput.with {
+        $0.echoMetadataInHeaders = false
+        $0.echoMetadataInTrailers = false
+        $0.numberOfMessages = 1
+        $0.payloadParameters = .with {
+          $0.content = 0
+          $0.size = sizeInBytes
+        }
+      }
+
+      let metadata: Metadata = ["test-key": "test-value"]
+      let request = ClientRequest(message: input, metadata: metadata)
+
+      try await control.unary(request: request) { response in
+        let message = try response.message
+        XCTAssertEqual(message.payload, Data(repeating: 0, count: sizeInBytes), "\(pair)")
+      }
+    }
+  }
 }
 
 extension [HTTP2TransportTests.Transport] {