فهرست منبع

Adopt the coalescing writer for clients (#1539)

Motivation:

In #1357 we introduced a message frames which coalesces writes into a
single buffer in a write-flush cycle to reduce the number of emitted
DATA frames. This PR adopts those changes for the client.

Modifications:

- Adjust the client state machine to use the coalescing writer

Results:

Small messages are coalesced in a flush cycle within a stream.

Co-authored-by: Cory Benfield <lukasa@apple.com>
George Barnett 2 سال پیش
والد
کامیت
254ea135be

+ 57 - 23
Sources/GRPC/GRPCClientChannelHandler.swift

@@ -529,29 +529,13 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
       // Feed the request message into the state machine:
       let result = self.stateMachine.sendRequest(
         request.message,
-        compressed: request.compressed
+        compressed: request.compressed,
+        promise: promise
       )
-      switch result {
-      case let .success((buffer, maybeBuffer)):
-        let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
-        self.logger.trace("writing HTTP2 frame", metadata: [
-          MetadataKey.h2Payload: "DATA",
-          MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
-          MetadataKey.h2EndStream: "false",
-        ])
-        // If there's a second buffer, attach the promise to the second write.
-        let promise1 = maybeBuffer == nil ? promise : nil
-        context.write(self.wrapOutboundOut(frame1), promise: promise1)
 
-        if let actuallyBuffer = maybeBuffer {
-          let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
-          self.logger.trace("writing HTTP2 frame", metadata: [
-            MetadataKey.h2Payload: "DATA",
-            MetadataKey.h2DataBytes: "\(actuallyBuffer.readableBytes)",
-            MetadataKey.h2EndStream: "false",
-          ])
-          context.write(self.wrapOutboundOut(frame2), promise: promise)
-        }
+      switch result {
+      case .success:
+        ()
 
       case let .failure(writeError):
         switch writeError {
@@ -572,13 +556,37 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
       }
 
     case .end:
+      // About to send end: write any outbound messages first.
+      while let (result, promise) = self.stateMachine.nextRequest() {
+        switch result {
+        case let .success(buffer):
+          let framePayload: HTTP2Frame.FramePayload = .data(
+            .init(data: .byteBuffer(buffer), endStream: false)
+          )
+
+          self.logger.trace("writing HTTP2 frame", metadata: [
+            MetadataKey.h2Payload: "DATA",
+            MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
+            MetadataKey.h2EndStream: "false",
+          ])
+          context.write(self.wrapOutboundOut(framePayload), promise: promise)
+
+        case let .failure(error):
+          context.fireErrorCaught(error)
+          promise?.fail(error)
+          return
+        }
+      }
+
       // Okay: can we close the request stream?
       switch self.stateMachine.sendEndOfRequestStream() {
       case .success:
         // We can. Send an empty DATA frame with end-stream set.
         let empty = context.channel.allocator.buffer(capacity: 0)
-        let framePayload = HTTP2Frame.FramePayload
-          .data(.init(data: .byteBuffer(empty), endStream: true))
+        let framePayload: HTTP2Frame.FramePayload = .data(
+          .init(data: .byteBuffer(empty), endStream: true)
+        )
+
         self.logger.trace("writing HTTP2 frame", metadata: [
           MetadataKey.h2Payload: "DATA",
           MetadataKey.h2DataBytes: "0",
@@ -605,4 +613,30 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
       }
     }
   }
+
+  func flush(context: ChannelHandlerContext) {
+    // Drain any requests.
+    while let (result, promise) = self.stateMachine.nextRequest() {
+      switch result {
+      case let .success(buffer):
+        let framePayload: HTTP2Frame.FramePayload = .data(
+          .init(data: .byteBuffer(buffer), endStream: false)
+        )
+
+        self.logger.trace("writing HTTP2 frame", metadata: [
+          MetadataKey.h2Payload: "DATA",
+          MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
+          MetadataKey.h2EndStream: "false",
+        ])
+        context.write(self.wrapOutboundOut(framePayload), promise: promise)
+
+      case let .failure(error):
+        context.fireErrorCaught(error)
+        promise?.fail(error)
+        return
+      }
+    }
+
+    context.flush()
+  }
 }

+ 41 - 8
Sources/GRPC/GRPCClientStateMachine.swift

@@ -196,13 +196,18 @@ struct GRPCClientStateMachine {
   ///     request will be written.
   mutating func sendRequest(
     _ message: ByteBuffer,
-    compressed: Bool
-  ) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
+    compressed: Bool,
+    promise: EventLoopPromise<Void>? = nil
+  ) -> Result<Void, MessageWriteError> {
     return self.withStateAvoidingCoWs { state in
-      state.sendRequest(message, compressed: compressed)
+      state.sendRequest(message, compressed: compressed, promise: promise)
     }
   }
 
+  mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
+    return self.state.nextRequest()
+  }
+
   /// Closes the request stream.
   ///
   /// The client must be streaming requests in order to terminate the request stream. Valid
@@ -394,18 +399,21 @@ extension GRPCClientStateMachine.State {
   /// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
   mutating func sendRequest(
     _ message: ByteBuffer,
-    compressed: Bool
-  ) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
-    let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>
+    compressed: Bool,
+    promise: EventLoopPromise<Void>?
+  ) -> Result<Void, MessageWriteError> {
+    let result: Result<Void, MessageWriteError>
 
     switch self {
     case .clientActiveServerIdle(var writeState, let pendingReadState):
-      result = writeState.write(message, compressed: compressed)
+      let result = writeState.write(message, compressed: compressed, promise: promise)
       self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
+      return result
 
     case .clientActiveServerActive(var writeState, let readState):
-      result = writeState.write(message, compressed: compressed)
+      let result = writeState.write(message, compressed: compressed, promise: promise)
       self = .clientActiveServerActive(writeState: writeState, readState: readState)
+      return result
 
     case .clientClosedServerIdle,
          .clientClosedServerActive,
@@ -422,6 +430,31 @@ extension GRPCClientStateMachine.State {
     return result
   }
 
+  mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
+    switch self {
+    case .clientActiveServerIdle(var writeState, let pendingReadState):
+      self = .modifying
+      let result = writeState.next()
+      self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
+      return result
+
+    case .clientActiveServerActive(var writeState, let readState):
+      self = .modifying
+      let result = writeState.next()
+      self = .clientActiveServerActive(writeState: writeState, readState: readState)
+      return result
+
+    case .clientIdleServerIdle,
+         .clientClosedServerIdle,
+         .clientClosedServerActive,
+         .clientClosedServerClosed:
+      return nil
+
+    case .modifying:
+      preconditionFailure("State left as 'modifying'")
+    }
+  }
+
   /// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
   mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
     let result: Result<Void, SendEndOfRequestStreamError>

+ 1 - 0
Sources/GRPC/LengthPrefixedMessageWriter.swift

@@ -15,6 +15,7 @@
  */
 import Foundation
 import NIOCore
+import NIOHPACK
 
 internal struct LengthPrefixedMessageWriter {
   static let metadataLength = 5

+ 40 - 29
Sources/GRPC/ReadWriteStates.swift

@@ -42,50 +42,61 @@ struct PendingWriteState {
       compression = nil
     }
 
-    let writer = LengthPrefixedMessageWriter(compression: compression, allocator: allocator)
-    return .writing(self.arity, self.contentType, writer)
+    let writer = CoalescingLengthPrefixedMessageWriter(
+      compression: compression,
+      allocator: allocator
+    )
+    return .init(arity: self.arity, contentType: self.contentType, writer: writer)
   }
 }
 
 /// The write state of a stream.
-enum WriteState {
-  /// Writing may be attempted using the given writer.
-  case writing(MessageArity, ContentType, LengthPrefixedMessageWriter)
-
-  /// Writing may not be attempted: either a write previously failed or it is not valid for any
-  /// more messages to be written.
-  case notWriting
+struct WriteState {
+  private var arity: MessageArity
+  private var contentType: ContentType
+  private var writer: CoalescingLengthPrefixedMessageWriter
+  private var canWrite: Bool
+
+  init(
+    arity: MessageArity,
+    contentType: ContentType,
+    writer: CoalescingLengthPrefixedMessageWriter
+  ) {
+    self.arity = arity
+    self.contentType = contentType
+    self.writer = writer
+    self.canWrite = true
+  }
 
   /// Writes a message into a buffer using the `writer`.
   ///
   /// - Parameter message: The `Message` to write.
   mutating func write(
     _ message: ByteBuffer,
-    compressed: Bool
-  ) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
-    switch self {
-    case .notWriting:
+    compressed: Bool,
+    promise: EventLoopPromise<Void>?
+  ) -> Result<Void, MessageWriteError> {
+    guard self.canWrite else {
       return .failure(.cardinalityViolation)
+    }
 
-    case .writing(let writeArity, let contentType, var writer):
-      self = .notWriting
-      let buffers: (ByteBuffer, ByteBuffer?)
+    self.writer.append(buffer: message, compress: compressed, promise: promise)
 
-      do {
-        buffers = try writer.write(buffer: message, compressed: compressed)
-      } catch {
-        self = .notWriting
-        return .failure(.serializationFailed)
-      }
+    switch self.arity {
+    case .one:
+      self.canWrite = false
+    case .many:
+      ()
+    }
 
-      // If we only expect to write one message then we're no longer writable.
-      if case .one = writeArity {
-        self = .notWriting
-      } else {
-        self = .writing(writeArity, contentType, writer)
-      }
+    return .success(())
+  }
 
-      return .success(buffers)
+  mutating func next() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
+    if let next = self.writer.next() {
+      return (next.0.mapError { _ in .serializationFailed }, next.1)
+    } else {
+      return nil
     }
   }
 }

+ 11 - 10
Tests/GRPCTests/GRPCClientStateMachineTests.swift

@@ -154,14 +154,7 @@ extension GRPCClientStateMachineTests {
     stateMachine.sendRequest(
       ByteBuffer(string: request),
       compressed: false
-    ).assertSuccess { buffers in
-      var buffer = buffers.0
-      XCTAssertNil(buffers.1)
-      // Remove the length and compression flag prefix.
-      buffer.moveReaderIndex(forwardBy: 5)
-      let data = buffer.readString(length: buffer.readableBytes)!
-      XCTAssertEqual(request, data)
-    }
+    ).assertSuccess()
   }
 
   func testSendRequestFromIdle() {
@@ -1299,10 +1292,18 @@ extension PendingWriteState {
 
 extension WriteState {
   static func one() -> WriteState {
-    return .writing(.one, .protobuf, LengthPrefixedMessageWriter(compression: .none))
+    return .init(
+      arity: .one,
+      contentType: .protobuf,
+      writer: .init(compression: .none, allocator: .init())
+    )
   }
 
   static func many() -> WriteState {
-    return .writing(.many, .protobuf, LengthPrefixedMessageWriter(compression: .none))
+    return .init(
+      arity: .many,
+      contentType: .protobuf,
+      writer: .init(compression: .none, allocator: .init())
+    )
   }
 }