
Allow responses to be sent without a promise (#1020)

Motivation:

For server streaming RPCs, users must send each response one at a time,
and there is no option for callers to avoid allocating the promise
associated with writing that response.

Modifications:

- Add methods to send a single response or a sequence of responses with
  an optional promise
- Have the old future-returning implementations delegate to these
  methods
- Update the echo service provider so that we hit these paths
- Update the QPS streaming benchmark to avoid allocating the future.
- Update the docs for the QPS benchmark to be a little easier to follow
  and move the scenarios from an inline JSON string to JSON files.

Result:

- Users can send responses on streaming RPCs without allocating a future
- Users can send multiple responses at a time (avoiding excessive
  flushing)
- A small (~1%) improvement in QPS in the streaming benchmark
- Resolves #539
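
A minimal sketch of the new surface, using only names that appear in the
diffs below (`context` is the `StreamingResponseCallContext` available inside
a streaming handler):

```swift
// Sketch only: the message type is taken from the echo example below.
let response = Echo_EchoResponse.with {
  $0.text = "Swift echo"
}

// Fire-and-forget: no future is allocated for this write.
context.sendResponse(response, promise: nil)

// Batched: the messages are written with a single flush; the optional
// promise, if supplied, is completed once the last message has been sent.
context.sendResponses([response, response], promise: nil)
```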
George Barnett, 5 years ago
Commit dc05942d34

+ 54 - 19
Performance/QPSBenchmark/README.md

@@ -1,34 +1,69 @@
 #  QPS Benchmark worker
 
-An implementation of the QPS worker for benchmarking described in the  
+An implementation of the QPS worker for benchmarking described in the
 [gRPC benchmarking guide](https://grpc.io/docs/guides/benchmarking/)
 
 ## Building
+
 To rebuild the proto files run `make generate-qps-worker`.
 
-The benchmarks can be built in the usual SPM way but release mode is strongly recommended - `swift build -c release`
+The benchmarks can be built in the usual Swift Package Manager way but release
+mode is strongly recommended: `swift build -c release`
 
 ## Running the benchmarks
 
-To date the changes to gRPC to run the tests automatically have not been pushed upstream.
+To date, the changes that let gRPC run these tests automatically have not
+been pushed upstream. You can easily run the tests locally using the C++
+driver program.
+
+This can be built using Bazel from the root of a checkout of the
+[grpc/grpc](https://github.com/grpc/grpc) repository with:
+
+```sh
+bazel build test/cpp/qps:qps_json_driver
+```
+
+The `qps_json_driver` binary will be in `bazel-bin/test/cpp/qps/`.
+
+To run the benchmarks, proceed as follows.
+
+> **Note:** the driver may also be built (via CMake) as a side effect of
+> running the performance testing script (`./tools/run_tests/run_performance_tests.py`)
+> from [grpc/grpc](https://github.com/grpc/grpc).
+>
+> The script is also the source of the scenarios listed below.
+
+### Setting Up the Environment
+
+1. Open a terminal window and run QPSBenchmark; this will become the server when instructed by the driver.
+
+   ```sh
+   swift run -c release QPSBenchmark --driver_port 10400
+   ```
+
+2. Open another terminal window and run QPSBenchmark; this will become the client when instructed by the driver.
+
+   ```sh
+   swift run -c release QPSBenchmark --driver_port 10410
+   ```
+
+3. Configure the environment for the driver:
+
+   ```sh
+   export QPS_WORKERS="localhost:10400,localhost:10410"
+   ```
 
-You can easily run the tests locally using the C++ driver program from gRPC - note this is built as a side effect 
-of running the C++ tests which can be done in a gRPC checkout with 
-`./tools/run_tests/run_performance_tests.py -l c++ -r cpp_protobuf_async_unary_qps_unconstrained_insecure`
+4. Invoke the driver with a scenario file, for example:
 
-For examples of running benchmarking tests proceed as follows
+   ```sh
+   /path/to/qps_json_driver --scenario_file=/path/to/scenario.json
+   ```
 
-### Unary Benchmark
-1. Open a terminal window and run the QPSBenchmark - `swift run -c release QPSBenchmark --driver_port 10400`.  
-This will become the server when instructed by the driver.
-2. Open another terminal window and run QPSBenchmark - `swift run -c release QPSBenchmark --driver_port 10410`.
-This will become the client when instructed by the driver.
-3. Use the driver to control the test.  In your checkout of [gRPC](https://github.com/grpc/grpc) 
-configure the environment with `export QPS_WORKERS="localhost:10400,localhost:10410"` then run
-`cmake/build/qps_json_driver '--scenarios_json={"scenarios": [{"name": "swift_protobuf_async_unary_qps_unconstrained_insecure", "warmup_seconds": 5, "benchmark_seconds": 30, "num_servers": 1, "server_config": {"async_server_threads": 0, "channel_args": [{"str_value": "throughput", "name": "grpc.optimization_target"}], "server_type": "ASYNC_SERVER", "security_params": null, "threads_per_cq": 0, "server_processes": 0}, "client_config": {"security_params": null, "channel_args": [{"str_value": "throughput", "name": "grpc.optimization_target"}], "async_client_threads": 0, "outstanding_rpcs_per_channel": 100, "rpc_type": "UNARY", "payload_config": {"simple_params": {"resp_size": 0, "req_size": 0}}, "client_channels": 64, "threads_per_cq": 0, "load_params": {"closed_loop": {}}, "client_type": "ASYNC_CLIENT", "histogram_params": {"max_possible": 60000000000.0, "resolution": 0.01}, "client_processes": 0}, "num_clients": 0}]}' --scenario_result_file=scenario_result.json`
-This will run a test of asynchronous unary client and server, using all the cores on the machine.  
-64 channels each with 100 outstanding requests.
+### Scenarios
 
-### Ping Pong Benchmark
+- `scenarios/unary-unconstrained.json`: runs unary RPCs using all cores on the
+  machine. 64 channels connect to the server, each with up to 100 outstanding
+  requests.
+- `scenarios/bidirectional-ping-pong.json`: runs a latency-oriented ping-pong
+  test over a single bidirectional streaming RPC.
 
-As above but drive with `cmake/build/qps_json_driver '--scenarios_json={"scenarios": [{"name": "swift_protobuf_async_streaming_ping_pong_insecure", "warmup_seconds": 5, "benchmark_seconds": 30, "num_servers": 1, "server_config": {"async_server_threads": 1, "channel_args": [{"str_value": "latency", "name": "grpc.optimization_target"}, {"int_value": 1, "name": "grpc.minimal_stack"}], "server_type": "ASYNC_SERVER", "security_params": null, "threads_per_cq": 0, "server_processes": 0}, "client_config": {"security_params": null, "channel_args": [{"str_value": "latency", "name": "grpc.optimization_target"}, {"int_value": 1, "name": "grpc.minimal_stack"}], "async_client_threads": 1, "outstanding_rpcs_per_channel": 1, "rpc_type": "STREAMING", "payload_config": {"simple_params": {"resp_size": 0, "req_size": 0}}, "client_channels": 1, "threads_per_cq": 0, "load_params": {"closed_loop": {}}, "client_type": "ASYNC_CLIENT", "histogram_params": {"max_possible": 60000000000.0, "resolution": 0.01}, "client_processes": 0}, "num_clients": 1}]}' --scenario_result_file=scenario_result.json`

+ 1 - 1
Performance/QPSBenchmark/Sources/QPSBenchmark/Runtime/BenchmarkServiceImpl.swift

@@ -43,7 +43,7 @@ final class AsyncQPSServerImpl: Grpc_Testing_BenchmarkServiceProvider {
       case let .message(request):
         do {
           let response = try AsyncQPSServerImpl.processSimpleRPC(request: request)
-          _ = context.sendResponse(response)
+          context.sendResponse(response, promise: nil)
         } catch {
           context.statusPromise.fail(error)
         }
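
The `promise: nil` write above is fire-and-forget. A hedged sketch of the alternative, should a handler need to observe the write result (the promise allocation here is for illustration only):

```swift
// Sketch: allocate a promise only when the write result matters.
let promise = context.eventLoop.makePromise(of: Void.self)
context.sendResponse(response, promise: promise)
promise.futureResult.whenFailure { error in
  // Fail the RPC if the write fails.
  context.statusPromise.fail(error)
}
```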

+ 61 - 0
Performance/QPSBenchmark/scenarios/bidirectional-ping-pong.json

@@ -0,0 +1,61 @@
+{
+  "scenarios": [
+    {
+      "name": "swift_protobuf_async_streaming_ping_pong_insecure",
+      "warmup_seconds": 5,
+      "benchmark_seconds": 30,
+      "num_servers": 1,
+      "server_config": {
+        "async_server_threads": 1,
+        "channel_args": [
+          {
+            "str_value": "latency",
+            "name": "grpc.optimization_target"
+          },
+          {
+            "int_value": 1,
+            "name": "grpc.minimal_stack"
+          }
+        ],
+        "server_type": "ASYNC_SERVER",
+        "security_params": null,
+        "threads_per_cq": 0,
+        "server_processes": 0
+      },
+      "client_config": {
+        "security_params": null,
+        "channel_args": [
+          {
+            "str_value": "latency",
+            "name": "grpc.optimization_target"
+          },
+          {
+            "int_value": 1,
+            "name": "grpc.minimal_stack"
+          }
+        ],
+        "async_client_threads": 1,
+        "outstanding_rpcs_per_channel": 1,
+        "rpc_type": "STREAMING",
+        "payload_config": {
+          "simple_params": {
+            "resp_size": 0,
+            "req_size": 0
+          }
+        },
+        "client_channels": 1,
+        "threads_per_cq": 0,
+        "load_params": {
+          "closed_loop": {}
+        },
+        "client_type": "ASYNC_CLIENT",
+        "histogram_params": {
+          "max_possible": 60000000000,
+          "resolution": 0.01
+        },
+        "client_processes": 0
+      },
+      "num_clients": 1
+    }
+  ]
+}

+ 53 - 0
Performance/QPSBenchmark/scenarios/unary-unconstrained.json

@@ -0,0 +1,53 @@
+{
+  "scenarios": [
+    {
+      "name": "swift_protobuf_async_unary_qps_unconstrained_insecure",
+      "warmup_seconds": 5,
+      "benchmark_seconds": 30,
+      "num_servers": 1,
+      "server_config": {
+        "async_server_threads": 0,
+        "channel_args": [
+          {
+            "str_value": "throughput",
+            "name": "grpc.optimization_target"
+          }
+        ],
+        "server_type": "ASYNC_SERVER",
+        "security_params": null,
+        "threads_per_cq": 0,
+        "server_processes": 0
+      },
+      "client_config": {
+        "security_params": null,
+        "channel_args": [
+          {
+            "str_value": "throughput",
+            "name": "grpc.optimization_target"
+          }
+        ],
+        "async_client_threads": 0,
+        "outstanding_rpcs_per_channel": 100,
+        "rpc_type": "UNARY",
+        "payload_config": {
+          "simple_params": {
+            "resp_size": 0,
+            "req_size": 0
+          }
+        },
+        "client_channels": 64,
+        "threads_per_cq": 0,
+        "load_params": {
+          "closed_loop": {}
+        },
+        "client_type": "ASYNC_CLIENT",
+        "histogram_params": {
+          "max_possible": 60000000000,
+          "resolution": 0.01
+        },
+        "client_processes": 0
+      },
+      "num_clients": 0
+    }
+  ]
+}

+ 28 - 24
Sources/Examples/Echo/Implementation/EchoProvider.swift

@@ -21,10 +21,13 @@ import NIO
 public class EchoProvider: Echo_EchoProvider {
   public init() {}
 
-  public func get(request: Echo_EchoRequest,
-                  context: StatusOnlyCallContext) -> EventLoopFuture<Echo_EchoResponse> {
-    var response = Echo_EchoResponse()
-    response.text = "Swift echo get: " + request.text
+  public func get(
+    request: Echo_EchoRequest,
+    context: StatusOnlyCallContext
+  ) -> EventLoopFuture<Echo_EchoResponse> {
+    let response = Echo_EchoResponse.with {
+      $0.text = "Swift echo get: " + request.text
+    }
     return context.eventLoop.makeSucceededFuture(response)
   }
 
@@ -32,18 +35,19 @@ public class EchoProvider: Echo_EchoProvider {
     request: Echo_EchoRequest,
     context: StreamingResponseCallContext<Echo_EchoResponse>
   ) -> EventLoopFuture<GRPCStatus> {
-    var endOfSendOperationQueue = context.eventLoop.makeSucceededFuture(())
-    let parts = request.text.components(separatedBy: " ")
-    for (i, part) in parts.enumerated() {
-      var response = Echo_EchoResponse()
-      response.text = "Swift echo expand (\(i)): \(part)"
-      endOfSendOperationQueue = endOfSendOperationQueue.flatMap { context.sendResponse(response) }
+    let responses = request.text.components(separatedBy: " ").lazy.enumerated().map { i, part in
+      Echo_EchoResponse.with {
+        $0.text = "Swift echo expand (\(i)): \(part)"
+      }
     }
-    return endOfSendOperationQueue.map { GRPCStatus.ok }
+
+    context.sendResponses(responses, promise: nil)
+    return context.eventLoop.makeSucceededFuture(.ok)
   }
 
-  public func collect(context: UnaryResponseCallContext<Echo_EchoResponse>)
-    -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
+  public func collect(
+    context: UnaryResponseCallContext<Echo_EchoResponse>
+  ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
     var parts: [String] = []
     return context.eventLoop.makeSucceededFuture({ event in
       switch event {
@@ -51,29 +55,29 @@ public class EchoProvider: Echo_EchoProvider {
         parts.append(message.text)
 
       case .end:
-        var response = Echo_EchoResponse()
-        response.text = "Swift echo collect: " + parts.joined(separator: " ")
+        let response = Echo_EchoResponse.with {
+          $0.text = "Swift echo collect: " + parts.joined(separator: " ")
+        }
         context.responsePromise.succeed(response)
       }
     })
   }
 
-  public func update(context: StreamingResponseCallContext<Echo_EchoResponse>)
-    -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
-    var endOfSendOperationQueue = context.eventLoop.makeSucceededFuture(())
+  public func update(
+    context: StreamingResponseCallContext<Echo_EchoResponse>
+  ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
     var count = 0
     return context.eventLoop.makeSucceededFuture({ event in
       switch event {
       case let .message(message):
-        var response = Echo_EchoResponse()
-        response.text = "Swift echo update (\(count)): \(message.text)"
-        endOfSendOperationQueue = endOfSendOperationQueue.flatMap { context.sendResponse(response) }
+        let response = Echo_EchoResponse.with {
+          $0.text = "Swift echo update (\(count)): \(message.text)"
+        }
         count += 1
+        context.sendResponse(response, promise: nil)
 
       case .end:
-        endOfSendOperationQueue
-          .map { GRPCStatus.ok }
-          .cascade(to: context.statusPromise)
+        context.statusPromise.succeed(.ok)
       }
     })
   }

+ 98 - 13
Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift

@@ -44,14 +44,65 @@ open class StreamingResponseCallContext<ResponsePayload>: ServerCallContextBase
 
   /// Send a response to the client.
   ///
-  /// - Parameter message: The message to send to the client.
-  /// - Parameter compression: Whether compression should be used for this response. If compression
-  ///   is enabled in the call context, the value passed here takes precedence. Defaults to deferring
-  ///   to the value set on the call context.
-  open func sendResponse(_ message: ResponsePayload,
-                         compression: Compression = .deferToCallDefault) -> EventLoopFuture<Void> {
+  /// - Parameters:
+  ///   - message: The message to send to the client.
+  ///   - compression: Whether compression should be used for this response. If compression
+  ///     is enabled in the call context, the value passed here takes precedence. Defaults to
+  ///     deferring to the value set on the call context.
+  ///   - promise: A promise to complete once the message has been sent.
+  open func sendResponse(
+    _ message: ResponsePayload,
+    compression: Compression = .deferToCallDefault,
+    promise: EventLoopPromise<Void>?
+  ) {
     fatalError("needs to be overridden")
   }
+
+  /// Send a response to the client.
+  ///
+  /// - Parameters:
+  ///   - message: The message to send to the client.
+  ///   - compression: Whether compression should be used for this response. If compression
+  ///     is enabled in the call context, the value passed here takes precedence. Defaults to
+  ///     deferring to the value set on the call context.
+  open func sendResponse(
+    _ message: ResponsePayload,
+    compression: Compression = .deferToCallDefault
+  ) -> EventLoopFuture<Void> {
+    let promise = self.eventLoop.makePromise(of: Void.self)
+    self.sendResponse(message, compression: compression, promise: promise)
+    return promise.futureResult
+  }
+
+  /// Sends a sequence of responses to the client.
+  /// - Parameters:
+  ///   - messages: The messages to send to the client.
+  ///   - compression: Whether compression should be used for this response. If compression
+  ///     is enabled in the call context, the value passed here takes precedence. Defaults to
+  ///     deferring to the value set on the call context.
+  ///   - promise: A promise to complete once the messages have been sent.
+  open func sendResponses<Messages: Sequence>(
+    _ messages: Messages,
+    compression: Compression = .deferToCallDefault,
+    promise: EventLoopPromise<Void>?
+  ) where Messages.Element == ResponsePayload {
+    fatalError("needs to be overridden")
+  }
+
+  /// Sends a sequence of responses to the client.
+  /// - Parameters:
+  ///   - messages: The messages to send to the client.
+  ///   - compression: Whether compression should be used for this response. If compression
+  ///     is enabled in the call context, the value passed here takes precedence. Defaults to
+  ///     deferring to the value set on the call context.
+  open func sendResponses<Messages: Sequence>(
+    _ messages: Messages,
+    compression: Compression = .deferToCallDefault
+  ) -> EventLoopFuture<Void> where Messages.Element == ResponsePayload {
+    let promise = self.eventLoop.makePromise(of: Void.self)
+    self.sendResponses(messages, compression: compression, promise: promise)
+    return promise.futureResult
+  }
 }
 
 /// Concrete implementation of `StreamingResponseCallContext` used by our generated code.
@@ -113,13 +164,37 @@ open class StreamingResponseCallContextImpl<ResponsePayload>: StreamingResponseC
 
   override open func sendResponse(
     _ message: ResponsePayload,
-    compression: Compression = .deferToCallDefault
-  ) -> EventLoopFuture<Void> {
-    let messageContext = _MessageContext(
+    compression: Compression = .deferToCallDefault,
+    promise: EventLoopPromise<Void>?
+  ) {
+    let response = _MessageContext(
       message,
       compressed: compression.isEnabled(callDefault: self.compressionEnabled)
     )
-    return self.channel.writeAndFlush(NIOAny(WrappedResponse.message(messageContext)))
+    self.channel.writeAndFlush(self.wrap(.message(response)), promise: promise)
+  }
+
+  override open func sendResponses<Messages: Sequence>(
+    _ messages: Messages,
+    compression: Compression = .deferToCallDefault,
+    promise: EventLoopPromise<Void>?
+  ) where ResponsePayload == Messages.Element {
+    let compress = compression.isEnabled(callDefault: self.compressionEnabled)
+
+    var iterator = messages.makeIterator()
+    var next = iterator.next()
+
+    while let current = next {
+      next = iterator.next()
+      // Attach the promise, if present, to the last message.
+      let isLast = next == nil
+      self.channel.write(
+        self.wrap(.message(.init(current, compressed: compress))),
+        promise: isLast ? promise : nil
+      )
+    }
+
+    self.channel.flush()
   }
 }
 
@@ -131,9 +206,19 @@ open class StreamingResponseCallContextTestStub<ResponsePayload>: StreamingRespo
 
   override open func sendResponse(
     _ message: ResponsePayload,
-    compression: Compression = .deferToCallDefault
-  ) -> EventLoopFuture<Void> {
+    compression: Compression = .deferToCallDefault,
+    promise: EventLoopPromise<Void>?
+  ) {
     self.recordedResponses.append(message)
-    return eventLoop.makeSucceededFuture(())
+    promise?.succeed(())
+  }
+
+  override open func sendResponses<Messages: Sequence>(
+    _ messages: Messages,
+    compression: Compression = .deferToCallDefault,
+    promise: EventLoopPromise<Void>?
+  ) where ResponsePayload == Messages.Element {
+    self.recordedResponses.append(contentsOf: messages)
+    promise?.succeed(())
   }
 }
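
Taken together, a streaming handler can now batch its writes and tie the RPC status to the final write. A minimal sketch mirroring the `expand` handler from the echo provider above, but observing completion (assumes the generated Echo types, GRPC, and NIO are in scope):

```swift
func expand(
  request: Echo_EchoRequest,
  context: StreamingResponseCallContext<Echo_EchoResponse>
) -> EventLoopFuture<GRPCStatus> {
  let responses = request.text.components(separatedBy: " ").map { part in
    Echo_EchoResponse.with { $0.text = part }
  }

  // One promise covers the whole batch: the implementation attaches it to
  // the last write and then flushes once.
  let promise = context.eventLoop.makePromise(of: Void.self)
  context.sendResponses(responses, promise: promise)
  return promise.futureResult.map { GRPCStatus.ok }
}
```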