Browse Source

BenchmarkClient RPCs implementation (#1861)

Motivation:

When the WorkerService holds and monitors BenchmarkClients, they need to make the requests to the server, as
configured in the config input the WorkerService receives from the Test Driver.

Modifications:

- implemented a helper function that computes the latency and extracts the error code for a RPC
- implemented the body of the makrRPC function that makes one of the 5 possible RPCs

Result:

The BenchmarkClient implementation for performance testing will be completed.
Stefana-Ioana Dranca 1 year ago
parent
commit
bdb7458e45

+ 149 - 14
Sources/performance-worker/BenchmarkClient.swift

@@ -22,17 +22,23 @@ import NIOConcurrencyHelpers
 struct BenchmarkClient {
   private var client: GRPCClient
   private var rpcNumber: Int32
-  private var rpcType: Grpc_Testing_RpcType
+  private var rpcType: RPCType
+  private var messagesPerStream: Int32
+  private var protoParams: Grpc_Testing_SimpleProtoParams
   private let rpcStats: NIOLockedValueBox<RPCStats>
 
   init(
     client: GRPCClient,
     rpcNumber: Int32,
-    rpcType: Grpc_Testing_RpcType,
+    rpcType: RPCType,
+    messagesPerStream: Int32,
+    protoParams: Grpc_Testing_SimpleProtoParams,
     histogramParams: Grpc_Testing_HistogramParams?
   ) {
     self.client = client
     self.rpcNumber = rpcNumber
+    self.messagesPerStream = messagesPerStream
+    self.protoParams = protoParams
     self.rpcType = rpcType
 
     let histogram: RPCStats.LatencyHistogram
@@ -48,6 +54,14 @@ struct BenchmarkClient {
     self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
   }
 
+  enum RPCType {
+    case unary
+    case streaming
+    case streamingFromClient
+    case streamingFromServer
+    case streamingBothWays
+  }
+
   internal var currentStats: RPCStats {
     return self.rpcStats.withLockedValue { stats in
       return stats
@@ -64,7 +78,9 @@ struct BenchmarkClient {
       try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
         for _ in 0 ..< self.rpcNumber {
           rpcsGroup.addTask {
-            let (latency, errorCode) = self.makeRPC(client: benchmarkClient, rpcType: self.rpcType)
+            let (latency, errorCode) = try await self.makeRPC(
+              benchmarkClient: benchmarkClient
+            )
             self.rpcStats.withLockedValue {
               $0.latencyHistogram.record(latency)
               if let errorCode = errorCode {
@@ -80,19 +96,138 @@ struct BenchmarkClient {
     }
   }
 
+  private func timeIt<R>(
+    _ body: () async throws -> R
+  ) async rethrows -> (R, nanoseconds: Double) {
+    let startTime = DispatchTime.now().uptimeNanoseconds
+    let result = try await body()
+    let endTime = DispatchTime.now().uptimeNanoseconds
+    return (result, nanoseconds: Double(endTime - startTime))
+  }
+
   // The result is the number of nanoseconds for processing the RPC.
   private func makeRPC(
-    client: Grpc_Testing_BenchmarkServiceClient,
-    rpcType: Grpc_Testing_RpcType
-  ) -> (latency: Double, errorCode: RPCError.Code?) {
-    switch rpcType {
-    case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays,
-      .UNRECOGNIZED:
-      let startTime = DispatchTime.now().uptimeNanoseconds
-      let endTime = DispatchTime.now().uptimeNanoseconds
-      return (
-        latency: Double(endTime - startTime), errorCode: RPCError.Code(.unimplemented)
-      )
+    benchmarkClient: Grpc_Testing_BenchmarkServiceClient
+  ) async throws -> (latency: Double, errorCode: RPCError.Code?) {
+    let message = Grpc_Testing_SimpleRequest.with {
+      $0.responseSize = self.protoParams.respSize
+      $0.payload = Grpc_Testing_Payload.with {
+        $0.body = Data(count: Int(self.protoParams.reqSize))
+      }
+    }
+
+    switch self.rpcType {
+    case .unary:
+      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
+        do {
+          try await benchmarkClient.unaryCall(
+            request: ClientRequest.Single(message: message)
+          ) { response in
+            _ = try response.message
+          }
+          return nil
+        } catch let error as RPCError {
+          return error.code
+        } catch {
+          return .unknown
+        }
+      }
+      return (latency: nanoseconds, errorCode)
+
+    // Repeated sequence of one request followed by one response.
+    // It is a ping-pong of messages between the client and the server.
+    case .streaming:
+      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
+        do {
+          let ids = AsyncStream.makeStream(of: Int.self)
+          let streamingRequest = ClientRequest.Stream { writer in
+            for try await id in ids.stream {
+              if id <= self.messagesPerStream {
+                try await writer.write(message)
+              } else {
+                return
+              }
+            }
+          }
+
+          ids.continuation.yield(1)
+
+          try await benchmarkClient.streamingCall(request: streamingRequest) { response in
+            var id = 1
+            for try await _ in response.messages {
+              id += 1
+              ids.continuation.yield(id)
+            }
+          }
+          return nil
+        } catch let error as RPCError {
+          return error.code
+        } catch {
+          return .unknown
+        }
+      }
+      return (latency: nanoseconds, errorCode)
+
+    case .streamingFromClient:
+      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
+        do {
+          let streamingRequest = ClientRequest.Stream { writer in
+            for _ in 1 ... self.messagesPerStream {
+              try await writer.write(message)
+            }
+          }
+
+          try await benchmarkClient.streamingFromClient(
+            request: streamingRequest
+          ) { response in
+            _ = try response.message
+          }
+          return nil
+        } catch let error as RPCError {
+          return error.code
+        } catch {
+          return .unknown
+        }
+      }
+      return (latency: nanoseconds, errorCode)
+
+    case .streamingFromServer:
+      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
+        do {
+          try await benchmarkClient.streamingFromServer(
+            request: ClientRequest.Single(message: message)
+          ) { response in
+            for try await _ in response.messages {}
+          }
+          return nil
+        } catch let error as RPCError {
+          return error.code
+        } catch {
+          return .unknown
+        }
+      }
+      return (latency: nanoseconds, errorCode)
+
+    case .streamingBothWays:
+      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
+        do {
+          let streamingRequest = ClientRequest.Stream { writer in
+            for _ in 1 ... self.messagesPerStream {
+              try await writer.write(message)
+            }
+          }
+
+          try await benchmarkClient.streamingBothWays(request: streamingRequest) { response in
+            for try await _ in response.messages {}
+          }
+          return nil
+        } catch let error as RPCError {
+          return error.code
+        } catch {
+          return .unknown
+        }
+      }
+      return (latency: nanoseconds, errorCode)
     }
   }
 

+ 32 - 0
Sources/performance-worker/Internal/AsyncStream+MakeStream.swift

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if swift(<5.9)
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension AsyncStream {
+  @inlinable
+  static func makeStream(
+    of elementType: Element.Type = Element.self,
+    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
+  ) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
+    var continuation: AsyncStream<Element>.Continuation!
+    let stream = AsyncStream(Element.self, bufferingPolicy: limit) {
+      continuation = $0
+    }
+    return (stream, continuation)
+  }
+}
+#endif

+ 19 - 1
Sources/performance-worker/WorkerService.swift

@@ -325,6 +325,22 @@ extension WorkerService {
   }
 
   private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] {
+    let rpcType: BenchmarkClient.RPCType
+    switch config.rpcType {
+    case .unary:
+      rpcType = .unary
+    case .streaming:
+      rpcType = .streaming
+    case .streamingFromClient:
+      rpcType = .streamingFromClient
+    case .streamingFromServer:
+      rpcType = .streamingFromServer
+    case .streamingBothWays:
+      rpcType = .streamingBothWays
+    case .UNRECOGNIZED:
+      throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.")
+    }
+
     var clients = [BenchmarkClient]()
     for _ in 0 ..< config.clientChannels {
       let grpcClient = self.makeGRPCClient()
@@ -332,7 +348,9 @@ extension WorkerService {
         BenchmarkClient(
           client: grpcClient,
           rpcNumber: config.outstandingRpcsPerChannel,
-          rpcType: config.rpcType,
+          rpcType: rpcType,
+          messagesPerStream: config.messagesPerStream,
+          protoParams: config.payloadConfig.simpleParams,
           histogramParams: config.histogramParams
         )
       )