Browse Source

Emit fewer flushes when handling RPCs on the server (#1110)

Motivation:

The changes in #1101 included a change in when flushes were emitted.
For unary RPCs this led to an additional flush, which in some
circumstances led to a drop in QPS of approximately 5%.

Modifications:

- Introduce an option as to whether a flush should be emitted when
  writing initial response metadata
- Fix a bug where the request to flush or not for messages was ignored

Result:

A 5% increase in QPS for the unary-single benchmark (single connection,
one rpc at a time).
George Barnett 5 years ago
parent
commit
6f920564d9

+ 1 - 1
Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift

@@ -353,7 +353,7 @@ public final class BidirectionalStreamingServerHandler<
   ) {
   ) {
     switch part {
     switch part {
     case let .metadata(headers):
     case let .metadata(headers):
-      self.context.responseWriter.sendMetadata(headers, promise: promise)
+      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
 
 
     case let .message(message, metadata):
     case let .message(message, metadata):
       do {
       do {

+ 1 - 1
Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift

@@ -339,7 +339,7 @@ public final class ClientStreamingServerHandler<
   ) {
   ) {
     switch part {
     switch part {
     case let .metadata(headers):
     case let .metadata(headers):
-      self.context.responseWriter.sendMetadata(headers, promise: promise)
+      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
 
 
     case let .message(message, metadata):
     case let .message(message, metadata):
       do {
       do {

+ 2 - 1
Sources/GRPC/CallHandlers/ServerHandlerProtocol.swift

@@ -49,8 +49,9 @@ internal protocol GRPCServerResponseWriter {
   /// Send the initial response metadata.
   /// Send the initial response metadata.
   /// - Parameters:
   /// - Parameters:
   ///   - metadata: The user-provided metadata to send to the client.
   ///   - metadata: The user-provided metadata to send to the client.
+  ///   - flush: Whether a flush should be emitted after writing the metadata.
   ///   - promise: A promise to complete once the metadata has been handled.
   ///   - promise: A promise to complete once the metadata has been handled.
-  func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?)
+  func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?)
 
 
   /// Send the serialized bytes of a response message.
   /// Send the serialized bytes of a response message.
   /// - Parameters:
   /// - Parameters:

+ 1 - 1
Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift

@@ -279,7 +279,7 @@ public final class ServerStreamingServerHandler<
   ) {
   ) {
     switch part {
     switch part {
     case let .metadata(headers):
     case let .metadata(headers):
-      self.context.responseWriter.sendMetadata(headers, promise: promise)
+      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
 
 
     case let .message(message, metadata):
     case let .message(message, metadata):
       do {
       do {

+ 2 - 1
Sources/GRPC/CallHandlers/UnaryServerHandler.swift

@@ -262,7 +262,8 @@ public final class UnaryServerHandler<
   ) {
   ) {
     switch part {
     switch part {
     case let .metadata(headers):
     case let .metadata(headers):
-      self.context.responseWriter.sendMetadata(headers, promise: promise)
+      // We can delay this flush until the end of the RPC.
+      self.context.responseWriter.sendMetadata(headers, flush: false, promise: promise)
 
 
     case let .message(message, metadata):
     case let .message(message, metadata):
       do {
       do {

+ 22 - 24
Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift

@@ -170,7 +170,9 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
 
 
     switch responsePart {
     switch responsePart {
     case let .metadata(headers):
     case let .metadata(headers):
-      self.sendMetadata(headers, promise: promise)
+      // We're in 'write' so we're using the old type of RPC handler which emits its own flushes,
+      // no need to emit an extra one.
+      self.sendMetadata(headers, flush: false, promise: promise)
 
 
     case let .message(buffer, metadata):
     case let .message(buffer, metadata):
       self.sendMessage(buffer, metadata: metadata, promise: promise)
       self.sendMessage(buffer, metadata: metadata, promise: promise)
@@ -181,13 +183,7 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
   }
   }
 
 
   internal func flush(context: ChannelHandlerContext) {
   internal func flush(context: ChannelHandlerContext) {
-    if self.isReading {
-      // We're already reading; record the flush and emit it when the read completes.
-      self.flushPending = true
-    } else {
-      // Not reading: flush now.
-      context.flush()
-    }
+    self.markFlushPoint()
   }
   }
 
 
   /// Called when the pipeline has finished configuring.
   /// Called when the pipeline has finished configuring.
@@ -280,17 +276,15 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
 
 
   internal func sendMetadata(
   internal func sendMetadata(
     _ headers: HPACKHeaders,
     _ headers: HPACKHeaders,
+    flush: Bool,
     promise: EventLoopPromise<Void>?
     promise: EventLoopPromise<Void>?
   ) {
   ) {
     switch self.state.send(headers: headers) {
     switch self.state.send(headers: headers) {
     case let .success(headers):
     case let .success(headers):
       let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
       let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
       self.context.write(self.wrapOutboundOut(payload), promise: promise)
       self.context.write(self.wrapOutboundOut(payload), promise: promise)
-
-      if self.isReading {
-        self.flushPending = true
-      } else {
-        self.context.flush()
+      if flush {
+        self.markFlushPoint()
       }
       }
 
 
     case let .failure(error):
     case let .failure(error):
@@ -313,11 +307,8 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
     case let .success(buffer):
     case let .success(buffer):
       let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
       let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
       self.context.write(self.wrapOutboundOut(payload), promise: promise)
       self.context.write(self.wrapOutboundOut(payload), promise: promise)
-
-      if self.isReading {
-        self.flushPending = true
-      } else {
-        self.context.flush()
+      if metadata.flush {
+        self.markFlushPoint()
       }
       }
 
 
     case let .failure(error):
     case let .failure(error):
@@ -335,15 +326,22 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
       // Always end stream for status and trailers.
       // Always end stream for status and trailers.
       let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
       let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
       self.context.write(self.wrapOutboundOut(payload), promise: promise)
       self.context.write(self.wrapOutboundOut(payload), promise: promise)
-
-      if self.isReading {
-        self.flushPending = true
-      } else {
-        self.context.flush()
-      }
+      // We'll always flush on end.
+      self.markFlushPoint()
 
 
     case let .failure(error):
     case let .failure(error):
       promise?.fail(error)
       promise?.fail(error)
     }
     }
   }
   }
+
+  /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
+  /// or emit a flush now if we are not.
+  private func markFlushPoint() {
+    if self.isReading {
+      self.flushPending = true
+    } else {
+      self.flushPending = false
+      self.context.flush()
+    }
+  }
 }
 }

+ 1 - 1
Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift

@@ -656,7 +656,7 @@ extension ServerMessageEncoding {
 }
 }
 
 
 class NoOpResponseWriter: GRPCServerResponseWriter {
 class NoOpResponseWriter: GRPCServerResponseWriter {
-  func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?) {
+  func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
     promise?.succeed(())
     promise?.succeed(())
   }
   }
 
 

+ 1 - 1
Tests/GRPCTests/UnaryServerHandlerTests.swift

@@ -27,7 +27,7 @@ final class ResponseRecorder: GRPCServerResponseWriter {
   var status: GRPCStatus?
   var status: GRPCStatus?
   var trailers: HPACKHeaders?
   var trailers: HPACKHeaders?
 
 
-  func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?) {
+  func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
     XCTAssertNil(self.metadata)
     XCTAssertNil(self.metadata)
     self.metadata = metadata
     self.metadata = metadata
     promise?.succeed(())
     promise?.succeed(())