Browse Source

Adopt the coalescing writer for servers (#1546)

Motivation:

In #1357 we introduced a message frames which coalesces writes into a
single buffer in a write-flush cycle to reduce the number of emitted
DATA frames. This PR adopts those changes for the server.

Modifications:

- Adjust the server state machine and handler to use the coalescing
  writer

Results:

Small messages are coalesced in a flush cycle within a stream.

Co-authored-by: Cory Benfield <lukasa@apple.com>
George Barnett 2 years ago
parent
commit
4715a1dfaf

+ 23 - 14
Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift

@@ -181,6 +181,7 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
     self.isReading = false
 
     if self.flushPending {
+      self.deliverPendingResponses()
       self.flushPending = false
       context.flush()
     }
@@ -188,6 +189,18 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
     context.fireChannelReadComplete()
   }
 
+  private func deliverPendingResponses() {
+    while let (result, promise) = self.state.nextResponse() {
+      switch result {
+      case let .success(buffer):
+        let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
+        self.context.write(self.wrapOutboundOut(payload), promise: promise)
+      case let .failure(error):
+        promise?.fail(error)
+      }
+    }
+  }
+
   /// Called when the pipeline has finished configuring.
   private func configured() {
     switch self.state.pipelineConfigured() {
@@ -288,23 +301,14 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
     metadata: MessageMetadata,
     promise: EventLoopPromise<Void>?
   ) {
-    let writeBuffer = self.state.send(
+    let result = self.state.send(
       buffer: buffer,
-      allocator: self.context.channel.allocator,
-      compress: metadata.compress
+      compress: metadata.compress,
+      promise: promise
     )
 
-    switch writeBuffer {
-    case let .success((buffer, maybeBuffer)):
-      if let actuallyBuffer = maybeBuffer {
-        let payload1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
-        self.context.write(self.wrapOutboundOut(payload1), promise: nil)
-        let payload2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
-        self.context.write(self.wrapOutboundOut(payload2), promise: promise)
-      } else {
-        let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
-        self.context.write(self.wrapOutboundOut(payload), promise: promise)
-      }
+    switch result {
+    case .success:
       if metadata.flush {
         self.markFlushPoint()
       }
@@ -319,6 +323,9 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
     trailers: HPACKHeaders,
     promise: EventLoopPromise<Void>?
   ) {
+    // About to end the stream: send any pending responses.
+    self.deliverPendingResponses()
+
     switch self.state.send(status: status, trailers: trailers) {
     case let .sendTrailers(trailers):
       self.sendTrailers(trailers, promise: promise)
@@ -349,6 +356,8 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
     if self.isReading {
       self.flushPending = true
     } else {
+      // About to flush: send any pending responses.
+      self.deliverPendingResponses()
       self.flushPending = false
       self.context.flush()
     }

+ 59 - 53
Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift

@@ -53,7 +53,7 @@ extension HTTP2ToRawGRPCStateMachine {
     var reader: LengthPrefixedMessageReader
 
     /// A length prefixed message writer for response messages.
-    var writer: LengthPrefixedMessageWriter
+    var writer: CoalescingLengthPrefixedMessageWriter
 
     /// The content type of the RPC.
     var contentType: ContentType
@@ -78,7 +78,7 @@ extension HTTP2ToRawGRPCStateMachine {
     var reader: LengthPrefixedMessageReader
 
     /// A length prefixed message writer for response messages.
-    var writer: LengthPrefixedMessageWriter
+    var writer: CoalescingLengthPrefixedMessageWriter
 
     /// The content type of the RPC.
     var contentType: ContentType
@@ -113,7 +113,7 @@ extension HTTP2ToRawGRPCStateMachine {
     var reader: LengthPrefixedMessageReader
 
     /// A length prefixed message writer for response messages.
-    var writer: LengthPrefixedMessageWriter
+    var writer: CoalescingLengthPrefixedMessageWriter
 
     /// Whether to normalize user-provided metadata.
     var normalizeHeaders: Bool
@@ -130,7 +130,7 @@ extension HTTP2ToRawGRPCStateMachine {
     var reader: LengthPrefixedMessageReader
 
     /// A length prefixed message writer for response messages.
-    var writer: LengthPrefixedMessageWriter
+    var writer: CoalescingLengthPrefixedMessageWriter
 
     /// Whether to normalize user-provided metadata.
     var normalizeHeaders: Bool
@@ -502,8 +502,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
     from headers: HPACKHeaders,
     encoding: ServerMessageEncoding,
     allocator: ByteBufferAllocator
-  ) -> (LengthPrefixedMessageWriter, String?) {
-    let writer: LengthPrefixedMessageWriter
+  ) -> (CoalescingLengthPrefixedMessageWriter, String?) {
+    let writer: CoalescingLengthPrefixedMessageWriter
     let responseEncoding: String?
 
     switch encoding {
@@ -519,12 +519,12 @@ extension HTTP2ToRawGRPCStateMachine.State {
         configuration.enabledAlgorithms.contains($0)
       }
 
-      writer = LengthPrefixedMessageWriter(compression: algorithm, allocator: allocator)
+      writer = .init(compression: algorithm, allocator: allocator)
       responseEncoding = algorithm?.name
 
     case .disabled:
       // The server doesn't have compression enabled.
-      writer = LengthPrefixedMessageWriter(compression: .none, allocator: allocator)
+      writer = .init(compression: .none, allocator: allocator)
       responseEncoding = nil
     }
 
@@ -623,44 +623,23 @@ extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
 
 // MARK: - Send Data
 
-extension HTTP2ToRawGRPCStateMachine {
-  static func writeGRPCFramedMessage(
-    _ buffer: ByteBuffer,
-    compress: Bool,
-    writer: inout LengthPrefixedMessageWriter
-  ) -> Result<(ByteBuffer, ByteBuffer?), Error> {
-    do {
-      let buffers = try writer.write(buffer: buffer, compressed: compress)
-      return .success(buffers)
-    } catch {
-      return .failure(error)
-    }
-  }
-}
-
 extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
   mutating func send(
     buffer: ByteBuffer,
-    compress: Bool
-  ) -> Result<(ByteBuffer, ByteBuffer?), Error> {
-    return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
-      buffer,
-      compress: compress,
-      writer: &self.writer
-    )
+    compress: Bool,
+    promise: EventLoopPromise<Void>?
+  ) {
+    self.writer.append(buffer: buffer, compress: compress, promise: promise)
   }
 }
 
 extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
   mutating func send(
     buffer: ByteBuffer,
-    compress: Bool
-  ) -> Result<(ByteBuffer, ByteBuffer?), Error> {
-    return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
-      buffer,
-      compress: compress,
-      writer: &self.writer
-    )
+    compress: Bool,
+    promise: EventLoopPromise<Void>?
+  ) {
+    self.writer.append(buffer: buffer, compress: compress, promise: promise)
   }
 }
 
@@ -879,10 +858,14 @@ extension HTTP2ToRawGRPCStateMachine {
   /// Send a response buffer.
   mutating func send(
     buffer: ByteBuffer,
-    allocator: ByteBufferAllocator,
-    compress: Bool
-  ) -> Result<(ByteBuffer, ByteBuffer?), Error> {
-    self.state.send(buffer: buffer, allocator: allocator, compress: compress)
+    compress: Bool,
+    promise: EventLoopPromise<Void>?
+  ) -> Result<Void, Error> {
+    self.state.send(buffer: buffer, compress: compress, promise: promise)
+  }
+
+  mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
+    self.state.nextResponse()
   }
 
   /// Send status and trailers.
@@ -1070,9 +1053,9 @@ extension HTTP2ToRawGRPCStateMachine.State {
 
   mutating func send(
     buffer: ByteBuffer,
-    allocator: ByteBufferAllocator,
-    compress: Bool
-  ) -> Result<(ByteBuffer, ByteBuffer?), Error> {
+    compress: Bool,
+    promise: EventLoopPromise<Void>?
+  ) -> Result<Void, Error> {
     switch self {
     case .requestIdleResponseIdle:
       preconditionFailure("Invalid state: the request stream is still closed")
@@ -1083,24 +1066,47 @@ extension HTTP2ToRawGRPCStateMachine.State {
       return .failure(error)
 
     case var .requestOpenResponseOpen(state):
-      let result = state.send(
-        buffer: buffer,
-        compress: compress
-      )
+      self = .requestClosedResponseClosed
+      state.send(buffer: buffer, compress: compress, promise: promise)
+      self = .requestOpenResponseOpen(state)
+      return .success(())
+
+    case var .requestClosedResponseOpen(state):
+      self = .requestClosedResponseClosed
+      state.send(buffer: buffer, compress: compress, promise: promise)
+      self = .requestClosedResponseOpen(state)
+      return .success(())
+
+    case .requestOpenResponseClosed,
+         .requestClosedResponseClosed:
+      return .failure(GRPCError.AlreadyComplete())
+    }
+  }
+
+  mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
+    switch self {
+    case .requestIdleResponseIdle:
+      preconditionFailure("Invalid state: the request stream is still closed")
+
+    case .requestOpenResponseIdle,
+         .requestClosedResponseIdle:
+      return nil
+
+    case var .requestOpenResponseOpen(state):
+      self = .requestClosedResponseClosed
+      let result = state.writer.next()
       self = .requestOpenResponseOpen(state)
       return result
 
     case var .requestClosedResponseOpen(state):
-      let result = state.send(
-        buffer: buffer,
-        compress: compress
-      )
+      self = .requestClosedResponseClosed
+      let result = state.writer.next()
       self = .requestClosedResponseOpen(state)
       return result
 
     case .requestOpenResponseClosed,
          .requestClosedResponseClosed:
-      return .failure(GRPCError.AlreadyComplete())
+      return nil
     }
   }
 

+ 16 - 8
Tests/GRPCTests/CompressionTests.swift

@@ -16,6 +16,7 @@
 import EchoImplementation
 import EchoModel
 import GRPC
+import NIOConcurrencyHelpers
 import NIOCore
 import NIOHPACK
 import NIOPosix
@@ -268,21 +269,27 @@ class MessageCompressionTests: GRPCTestCase {
 
   func testDecompressionLimitIsRespectedByClientForStreamingCall() throws {
     try self.setupServer(encoding: .enabled(.init(decompressionLimit: .absolute(2048))))
-    self
-      .setupClient(encoding: .enabled(.init(
-        forRequests: .gzip,
-        decompressionLimit: .absolute(1024)
-      )))
+    self.setupClient(
+      encoding: .enabled(.init(forRequests: .gzip, decompressionLimit: .absolute(1024)))
+    )
+
+    let responsePromise = self.group.next().makePromise(of: Echo_EchoResponse.self)
+    let lock = NIOLock()
+    var responseCount = 0
 
-    var responses: [Echo_EchoResponse] = []
     let update = self.echo.update {
-      responses.append($0)
+      lock.withLock {
+        responseCount += 1
+      }
+      responsePromise.succeed($0)
     }
 
     let status = self.expectation(description: "received status")
 
     // Smaller than limit.
     update.sendMessage(.with { $0.text = "foo" }, promise: nil)
+    XCTAssertNoThrow(try responsePromise.futureResult.wait())
+
     // Should be just over the limit.
     update.sendMessage(.with { $0.text = String(repeating: "x", count: 1024) }, promise: nil)
     update.sendEnd(promise: nil)
@@ -292,7 +299,8 @@ class MessageCompressionTests: GRPCTestCase {
     }.assertEqual(.resourceExhausted, fulfill: status)
 
     self.wait(for: [status], timeout: self.defaultTimeout)
-    XCTAssertEqual(responses.count, 1)
+    let receivedResponses = lock.withLock { responseCount }
+    XCTAssertEqual(receivedResponses, 1)
   }
 
   func testIdentityCompressionIsntCompression() throws {

+ 47 - 8
Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift

@@ -623,8 +623,8 @@ class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
       for _ in 0 ..< 5 {
         let action = machine.send(
           buffer: buffer,
-          allocator: self.allocator,
-          compress: false
+          compress: false,
+          promise: nil
         )
         assertThat(action, .is(.success()))
       }
@@ -633,8 +633,8 @@ class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
       // write as normal.
       let action = machine.send(
         buffer: buffer,
-        allocator: self.allocator,
-        compress: true
+        compress: true,
+        promise: nil
       )
       assertThat(action, .is(.success()))
     }
@@ -649,8 +649,8 @@ class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
     let buffer = ByteBuffer(repeating: 0, count: 1024)
     let action2 = machine.send(
       buffer: buffer,
-      allocator: self.allocator,
-      compress: false
+      compress: false,
+      promise: nil
     )
     assertThat(action2, .is(.failure()))
   }
@@ -662,12 +662,51 @@ class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
     let buffer = ByteBuffer(repeating: 0, count: 1024)
     let action2 = machine.send(
       buffer: buffer,
-      allocator: self.allocator,
-      compress: false
+      compress: false,
+      promise: nil
     )
     assertThat(action2, .is(.failure()))
   }
 
+  // MARK: Next Response
+
+  func testNextResponseBeforeMetadata() {
+    var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
+    XCTAssertNil(machine.nextResponse())
+  }
+
+  func testNextResponseWhenOpen() throws {
+    for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
+      var machine = self.makeStateMachine(state: startingState)
+
+      // No response buffered yet.
+      XCTAssertNil(machine.nextResponse())
+
+      let buffer = ByteBuffer(repeating: 0, count: 1024)
+      machine.send(buffer: buffer, compress: false, promise: nil).assertSuccess()
+
+      let (framedBuffer, promise) = try XCTUnwrap(machine.nextResponse())
+      XCTAssertNil(promise) // Didn't provide a promise.
+      framedBuffer.assertSuccess()
+
+      // No more responses.
+      XCTAssertNil(machine.nextResponse())
+    }
+  }
+
+  func testNextResponseWhenClosed() throws {
+    var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
+    let action = machine.send(status: .ok, trailers: [:])
+    switch action {
+    case .sendTrailersAndFinish:
+      ()
+    default:
+      XCTFail("Expected 'sendTrailersAndFinish' but got \(action)")
+    }
+
+    XCTAssertNil(machine.nextResponse())
+  }
+
   // MARK: Send End
 
   func testSendEndWhenResponseStreamIsIdle() {