
Add a handful more perf tests. (#1096)

Motivation:

Our embedded server perf tests only run unary RPCs; we should test a
broader range of scenarios. Additionally, the echo service
implementation used by the benchmarks prefixes the request text; with
large requests this operation can dominate the measurement and
introduce unnecessary noise.
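
For illustration, a rough sketch of the difference (the helper names and the
exact prefix used by the example 'EchoProvider' are illustrative, not taken
from the source):

    import EchoModel
    import GRPC
    import NIO

    // Example-style handler: builds a new string for every response, which
    // with a 64 KiB request can dominate the measurement. (Prefix is
    // illustrative.)
    func prefixingGet(
      request: Echo_EchoRequest,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Echo_EchoResponse> {
      return context.eventLoop.makeSucceededFuture(
        .with { $0.text = "echo get: \(request.text)" }
      )
    }

    // Minimal handler: echoes the request text back unchanged, so the
    // benchmark measures the RPC machinery rather than string building.
    func minimalGet(
      request: Echo_EchoRequest,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Echo_EchoResponse> {
      return context.eventLoop.makeSucceededFuture(.with { $0.text = request.text })
    }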

Modifications:

- Add a 'MinimalEchoProvider' which doesn't prefix the request text but
  is otherwise very similar to the normal 'EchoProvider'.
- Add benchmarks (driven as sketched after this list) for:
  - client streaming:
    - 1 RPC with 10k requests
    - 10k RPCs with 1 request each
  - server streaming:
    - 1 RPC with 10k responses
    - 10k RPCs with 1 response each
  - bidirectional streaming:
    - 1 RPC with 10k requests and responses
    - 10k RPCs with 1 request and response each
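
As a sketch (the helper name is hypothetical; this only summarises the run
loop added to EmbeddedServer.swift below), each RPC is driven directly on an
EmbeddedChannel: write the request headers, write all but the last request
with the stream left open, finish with an end-stream frame, then drain
whatever the server wrote back:

    import NIO
    import NIOHTTP2

    // A sketch of how each RPC is pushed through the embedded server channel.
    // The payloads are the pre-built HTTP/2 frame payloads from setUp().
    func driveOneRPC(
      on channel: EmbeddedChannel,
      headers: HTTP2Frame.FramePayload,
      request: HTTP2Frame.FramePayload,
      requestWithEndStream: HTTP2Frame.FramePayload,
      requestsPerRPC: Int
    ) throws {
      try channel.writeInbound(headers)
      // All but the final request leave the stream open...
      for _ in 0 ..< (requestsPerRPC - 1) {
        try channel.writeInbound(request)
      }
      // ...and the final one carries end-stream.
      try channel.writeInbound(requestWithEndStream)
      // Drain the server's output (headers, data frames, trailers).
      while try channel.readOutbound(as: HTTP2Frame.FramePayload.self) != nil {
        ()
      }
    }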

Result:

More insight into performance across streaming as well as unary RPCs.
George Barnett 4 years ago
parent commit 9f7a5d5df0

+ 0 - 1
Package.swift

@@ -133,7 +133,6 @@ let package = Package(
       dependencies: [
         "GRPC",
         "EchoModel",
-        "EchoImplementation",
         "NIO",
         "NIOSSL",
       ]

+ 91 - 29
Sources/GRPCPerformanceTests/Benchmarks/EmbeddedServer.swift

@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import EchoImplementation
 import EchoModel
 import GRPC
 import Logging
@@ -21,25 +20,60 @@ import NIO
 import NIOHPACK
 import NIOHTTP2
 
-final class EmbeddedServerUnaryBenchmark: Benchmark {
-  private let count: Int
+final class EmbeddedServerChildChannelBenchmark: Benchmark {
   private let text: String
   private let providers: [Substring: CallHandlerProvider]
   private let logger: Logger
+  private let mode: Mode
 
-  static let headersPayload = HTTP2Frame.FramePayload.headers(.init(headers: [
-    ":path": "/echo.Echo/Get",
-    ":method": "POST",
-    "content-type": "application/grpc",
-  ]))
+  enum Mode {
+    case unary(rpcs: Int)
+    case clientStreaming(rpcs: Int, requestsPerRPC: Int)
+    case serverStreaming(rpcs: Int, responsesPerRPC: Int)
+    case bidirectional(rpcs: Int, requestsPerRPC: Int)
 
+    var method: String {
+      switch self {
+      case .unary:
+        return "Get"
+      case .clientStreaming:
+        return "Collect"
+      case .serverStreaming:
+        return "Expand"
+      case .bidirectional:
+        return "Update"
+      }
+    }
+  }
+
+  static func makeHeadersPayload(method: String) -> HTTP2Frame.FramePayload {
+    return .headers(.init(headers: [
+      ":path": "/echo.Echo/\(method)",
+      ":method": "POST",
+      "content-type": "application/grpc",
+    ]))
+  }
+
+  private var headersPayload: HTTP2Frame.FramePayload!
   private var requestPayload: HTTP2Frame.FramePayload!
+  private var requestPayloadWithEndStream: HTTP2Frame.FramePayload!
+
+  private func makeChannel() throws -> EmbeddedChannel {
+    let channel = EmbeddedChannel()
+    try channel._configureForEmbeddedServerTest(
+      servicesByName: self.providers,
+      encoding: .disabled,
+      normalizeHeaders: true,
+      logger: self.logger
+    ).wait()
+    return channel
+  }
 
-  init(count: Int, text: String) {
-    self.count = count
+  init(mode: Mode, text: String) {
+    self.mode = mode
     self.text = text
 
-    let echo = EchoProvider()
+    let echo = MinimalEchoProvider()
     self.providers = [echo.serviceName: echo]
     self.logger = Logger(label: "noop") { _ in
       SwiftLogNoOpLogHandler()
@@ -48,33 +82,61 @@ final class EmbeddedServerUnaryBenchmark: Benchmark {
 
   func setUp() throws {
     var buffer = ByteBuffer()
-    let serialized = try Echo_EchoRequest.with { $0.text = self.text }.serializedData()
+    let requestText: String
+
+    switch self.mode {
+    case .unary, .clientStreaming, .bidirectional:
+      requestText = self.text
+    case let .serverStreaming(_, responsesPerRPC):
+      // For server streaming the request is split on spaces. We'll build up a request based on text
+      // and the number of responses we want.
+      var text = String()
+      text.reserveCapacity((self.text.count + 1) * responsesPerRPC)
+      for _ in 0 ..< responsesPerRPC {
+        text.append(self.text)
+        text.append(" ")
+      }
+      requestText = text
+    }
+
+    let serialized = try Echo_EchoRequest.with { $0.text = requestText }.serializedData()
     buffer.reserveCapacity(5 + serialized.count)
     buffer.writeInteger(UInt8(0)) // not compressed
     buffer.writeInteger(UInt32(serialized.count)) // length
     buffer.writeData(serialized)
-    self.requestPayload = .data(.init(data: .byteBuffer(buffer), endStream: true))
+
+    self.requestPayload = .data(.init(data: .byteBuffer(buffer), endStream: false))
+    self.requestPayloadWithEndStream = .data(.init(data: .byteBuffer(buffer), endStream: true))
+    self.headersPayload = Self.makeHeadersPayload(method: self.mode.method)
   }
 
   func tearDown() throws {}
 
   func run() throws {
-    for _ in 0 ..< self.count {
-      let channel = EmbeddedChannel()
-      try channel._configureForEmbeddedServerTest(
-        servicesByName: self.providers,
-        encoding: .disabled,
-        normalizeHeaders: true,
-        logger: self.logger
-      ).wait()
-
-      try channel.writeInbound(Self.headersPayload)
-      try channel.writeInbound(self.requestPayload)
-
-      // headers, data, trailers
-      _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
-      _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
-      _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
+    switch self.mode {
+    case let .unary(rpcs):
+      try self.run(rpcs: rpcs, requestsPerRPC: 1)
+    case let .clientStreaming(rpcs, requestsPerRPC):
+      try self.run(rpcs: rpcs, requestsPerRPC: requestsPerRPC)
+    case let .serverStreaming(rpcs, _):
+      try self.run(rpcs: rpcs, requestsPerRPC: 1)
+    case let .bidirectional(rpcs, requestsPerRPC):
+      try self.run(rpcs: rpcs, requestsPerRPC: requestsPerRPC)
+    }
+  }
+
+  func run(rpcs: Int, requestsPerRPC: Int) throws {
+    for _ in 0 ..< rpcs {
+      let channel = try self.makeChannel()
+      try channel.writeInbound(self.headersPayload)
+      for _ in 0 ..< (requestsPerRPC - 1) {
+        try channel.writeInbound(self.requestPayload)
+      }
+      try channel.writeInbound(self.requestPayloadWithEndStream)
+
+      while try channel.readOutbound(as: HTTP2Frame.FramePayload.self) != nil {
+        ()
+      }
     }
   }
 }

+ 73 - 0
Sources/GRPCPerformanceTests/Benchmarks/MinimalEchoProvider.swift

@@ -0,0 +1,73 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import EchoModel
+import GRPC
+import NIO
+
+/// The echo provider that comes with the example does some string processing; we'll avoid some of
+/// that here so we're looking at the right things.
+public class MinimalEchoProvider: Echo_EchoProvider {
+  public let interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil
+
+  public func get(
+    request: Echo_EchoRequest,
+    context: StatusOnlyCallContext
+  ) -> EventLoopFuture<Echo_EchoResponse> {
+    return context.eventLoop.makeSucceededFuture(.with { $0.text = request.text })
+  }
+
+  public func expand(
+    request: Echo_EchoRequest,
+    context: StreamingResponseCallContext<Echo_EchoResponse>
+  ) -> EventLoopFuture<GRPCStatus> {
+    for part in request.text.utf8.split(separator: UInt8(ascii: " ")) {
+      context.sendResponse(.with { $0.text = String(part)! }, promise: nil)
+    }
+    return context.eventLoop.makeSucceededFuture(.ok)
+  }
+
+  public func collect(
+    context: UnaryResponseCallContext<Echo_EchoResponse>
+  ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
+    var parts: [String] = []
+
+    func onEvent(_ event: StreamEvent<Echo_EchoRequest>) {
+      switch event {
+      case let .message(request):
+        parts.append(request.text)
+      case .end:
+        context.responsePromise.succeed(.with { $0.text = parts.joined(separator: " ") })
+      }
+    }
+
+    return context.eventLoop.makeSucceededFuture(onEvent(_:))
+  }
+
+  public func update(
+    context: StreamingResponseCallContext<Echo_EchoResponse>
+  ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
+    func onEvent(_ event: StreamEvent<Echo_EchoRequest>) {
+      switch event {
+      case let .message(request):
+        context.sendResponse(.with { $0.text = request.text }, promise: nil)
+      case .end:
+        context.statusPromise.succeed(.ok)
+      }
+    }
+
+    return context.eventLoop.makeSucceededFuture(onEvent(_:))
+  }
+}

+ 1 - 2
Sources/GRPCPerformanceTests/Benchmarks/UnaryThroughput.swift

@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import EchoImplementation
 import EchoModel
 import Foundation
 import GRPC
@@ -33,7 +32,7 @@ class Unary: ServerProvidingBenchmark {
   init(requests: Int, text: String) {
     self.requestCount = requests
     self.requestText = text
-    super.init(providers: [EchoProvider()])
+    super.init(providers: [MinimalEchoProvider()])
   }
 
   override func setUp() throws {

+ 61 - 5
Sources/GRPCPerformanceTests/main.swift

@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import EchoImplementation
 import EchoModel
 import Foundation
 import GRPC
@@ -21,11 +20,11 @@ import Logging
 import NIO
 import NIOSSL
 
+let smallRequest = String(repeating: "x", count: 8)
+let largeRequest = String(repeating: "x", count: 1 << 16) // 65k
+
 // Add benchmarks here!
 func runBenchmarks(spec: TestSpec) {
-  let smallRequest = String(repeating: "x", count: 8)
-  let largeRequest = String(repeating: "x", count: 1 << 16) // 65k
-
   measureAndPrint(
     description: "unary_10k_small_requests",
     benchmark: Unary(requests: 10000, text: smallRequest),
@@ -80,7 +79,64 @@ func runBenchmarks(spec: TestSpec) {
 
   measureAndPrint(
     description: "embedded_server_unary_10k_small_requests",
-    benchmark: EmbeddedServerUnaryBenchmark(count: 10000, text: smallRequest),
+    benchmark: EmbeddedServerChildChannelBenchmark(
+      mode: .unary(rpcs: 10000),
+      text: smallRequest
+    ),
+    spec: spec
+  )
+
+  measureAndPrint(
+    description: "embedded_server_client_streaming_1_rpc_10k_small_requests",
+    benchmark: EmbeddedServerChildChannelBenchmark(
+      mode: .clientStreaming(rpcs: 1, requestsPerRPC: 10000),
+      text: smallRequest
+    ),
+    spec: spec
+  )
+
+  measureAndPrint(
+    description: "embedded_server_client_streaming_10k_rpcs_1_small_requests",
+    benchmark: EmbeddedServerChildChannelBenchmark(
+      mode: .clientStreaming(rpcs: 10000, requestsPerRPC: 1),
+      text: smallRequest
+    ),
+    spec: spec
+  )
+
+  measureAndPrint(
+    description: "embedded_server_server_streaming_1_rpc_10k_small_responses",
+    benchmark: EmbeddedServerChildChannelBenchmark(
+      mode: .serverStreaming(rpcs: 1, responsesPerRPC: 10000),
+      text: smallRequest
+    ),
+    spec: spec
+  )
+
+  measureAndPrint(
+    description: "embedded_server_server_streaming_10k_rpcs_1_small_response",
+    benchmark: EmbeddedServerChildChannelBenchmark(
+      mode: .serverStreaming(rpcs: 10000, responsesPerRPC: 1),
+      text: smallRequest
+    ),
+    spec: spec
+  )
+
+  measureAndPrint(
+    description: "embedded_server_bidi_1_rpc_10k_small_requests",
+    benchmark: EmbeddedServerChildChannelBenchmark(
+      mode: .bidirectional(rpcs: 1, requestsPerRPC: 10000),
+      text: smallRequest
+    ),
+    spec: spec
+  )
+
+  measureAndPrint(
+    description: "embedded_server_bidi_10k_rpcs_1_small_request",
+    benchmark: EmbeddedServerChildChannelBenchmark(
+      mode: .bidirectional(rpcs: 10000, requestsPerRPC: 1),
+      text: smallRequest
+    ),
     spec: spec
   )