Browse Source

Performance worker - client stats for runClient RPC (#1851)

* Performance worker - configuring clients for runClient RPC

Motivation:

The `runClient()` RPC receives a stream of messages that can either request to set up clients based on
some parameters sent in the message or send back stats on the currently running clients.

Modifications:

- implemented the resource usage class for clients similar to the server stats one
- implemented the LatencyHistogram struct
- added a new abstraction layer over the GRPCClient - BenchmarkClient. It stores
parameters for the client extracted from tne input messages and it implements the
`run()` method that starts the client abd makes the given number of RPCs, while
registering the latency for each one, and at the end it creates a histogram containing all the
latencies.

Result:

Configuring the clients for the performance tests is now possible and the latency histogram
gets created.
Stefana-Ioana Dranca 1 year ago
parent
commit
4fc6c25881

+ 96 - 0
Sources/performance-worker/BenchmarkClient.swift

@@ -0,0 +1,96 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Foundation
+import GRPCCore
+import NIOConcurrencyHelpers
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+struct BenchmarkClient {
+  private var client: GRPCClient
+  private var rpcNumber: Int32
+  private var rpcType: Grpc_Testing_RpcType
+  private let rpcStats: NIOLockedValueBox<RPCStats>
+
+  init(
+    client: GRPCClient,
+    rpcNumber: Int32,
+    rpcType: Grpc_Testing_RpcType,
+    histogramParams: Grpc_Testing_HistogramParams?
+  ) {
+    self.client = client
+    self.rpcNumber = rpcNumber
+    self.rpcType = rpcType
+
+    let histogram: RPCStats.LatencyHistogram
+    if let histogramParams = histogramParams {
+      histogram = .init(
+        resolution: histogramParams.resolution,
+        maxBucketStart: histogramParams.maxPossible
+      )
+    } else {
+      histogram = .init()
+    }
+
+    self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
+  }
+
+  internal func run() async throws {
+    let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(client: client)
+    return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
+      // Start the client.
+      clientGroup.addTask { try await client.run() }
+
+      // Make the requests to the server and register the latency for each one.
+      try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
+        for _ in 0 ..< self.rpcNumber {
+          rpcsGroup.addTask {
+            let (latency, errorCode) = self.makeRPC(client: benchmarkClient, rpcType: self.rpcType)
+            self.rpcStats.withLockedValue {
+              $0.latencyHistogram.record(latency)
+              if let errorCode = errorCode {
+                $0.requestResultCount[errorCode, default: 1] += 1
+              }
+            }
+          }
+        }
+        try await rpcsGroup.waitForAll()
+      }
+
+      try await clientGroup.next()
+    }
+  }
+
+  // The result is the number of nanoseconds for processing the RPC.
+  private func makeRPC(
+    client: Grpc_Testing_BenchmarkServiceClient,
+    rpcType: Grpc_Testing_RpcType
+  ) -> (latency: Double, errorCode: RPCError.Code?) {
+    switch rpcType {
+    case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays,
+      .UNRECOGNIZED:
+      let startTime = DispatchTime.now().uptimeNanoseconds
+      let endTime = DispatchTime.now().uptimeNanoseconds
+      return (
+        latency: Double(endTime - startTime), errorCode: RPCError.Code(.unimplemented)
+      )
+    }
+  }
+
+  internal func shutdown() {
+    self.client.close()
+  }
+}

+ 132 - 0
Sources/performance-worker/RPCStats.swift

@@ -0,0 +1,132 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Foundation
+import GRPCCore
+import NIOConcurrencyHelpers
+
+/// Stores the real time latency histogram and error code count dictionary,
+/// for the RPCs made by a particular GRPCClient. It gets updated after
+/// each finished RPC.
+///
+/// The time latency is measured in nanoseconds.
+struct RPCStats {
+  var latencyHistogram: LatencyHistogram
+  var requestResultCount: [RPCError.Code: Int64]
+
+  init(latencyHistogram: LatencyHistogram, requestResultCount: [RPCError.Code: Int64] = [:]) {
+    self.latencyHistogram = latencyHistogram
+    self.requestResultCount = requestResultCount
+  }
+
+  /// Histograms are stored with exponentially increasing bucket sizes.
+  /// The first bucket is [0, `multiplier`) where `multiplier` = 1 + resolution
+  /// Bucket n (n>=1) contains [`multiplier`**n, `multiplier`**(n+1))
+  /// There are sufficient buckets to reach max_bucket_start
+  struct LatencyHistogram {
+    var sum: Double
+    var sumOfSquares: Double
+    var countOfValuesSeen: Double
+    var multiplier: Double
+    var oneOnLogMultiplier: Double
+    var minSeen: Double
+    var maxSeen: Double
+    var maxPossible: Double
+    var buckets: [UInt32]
+
+    /// Initialise a histogram.
+    /// - parameters:
+    ///     - resolution: Defines the width of the buckets - see the description of this structure.
+    ///     - maxBucketStart: Defines the start of the greatest valued bucket.
+    init(resolution: Double = 0.01, maxBucketStart: Double = 60e9) {
+      precondition(resolution > 0.0)
+      precondition(maxBucketStart > resolution)
+      self.sum = 0.0
+      self.sumOfSquares = 0.0
+      self.multiplier = 1.0 + resolution
+      self.oneOnLogMultiplier = 1.0 / log(1.0 + resolution)
+      self.maxPossible = maxBucketStart
+      self.countOfValuesSeen = 0.0
+      self.minSeen = maxBucketStart
+      self.maxSeen = 0.0
+      let numBuckets =
+        LatencyHistogram.uncheckedBucket(
+          forValue: maxBucketStart,
+          oneOnLogMultiplier: self.oneOnLogMultiplier
+        ) + 1
+      precondition(numBuckets > 1)
+      precondition(numBuckets < 100_000_000)
+      self.buckets = .init(repeating: 0, count: numBuckets)
+    }
+
+    struct HistorgramShapeMismatch: Error {}
+
+    /// Determine a bucket index given a value - does no bounds checking
+    private static func uncheckedBucket(forValue value: Double, oneOnLogMultiplier: Double) -> Int {
+      return Int(log(value) * oneOnLogMultiplier)
+    }
+
+    private func bucket(forValue value: Double) -> Int {
+      let bucket = LatencyHistogram.uncheckedBucket(
+        forValue: min(self.maxPossible, max(0, value)),
+        oneOnLogMultiplier: self.oneOnLogMultiplier
+      )
+      assert(bucket < self.buckets.count)
+      assert(bucket >= 0)
+      return bucket
+    }
+
+    /// Add a value to this histogram, updating buckets and stats
+    /// - parameters:
+    ///     - value: The value to add.
+    public mutating func record(_ value: Double) {
+      self.sum += value
+      self.sumOfSquares += value * value
+      self.countOfValuesSeen += 1
+      if value < self.minSeen {
+        self.minSeen = value
+      }
+      if value > self.maxSeen {
+        self.maxSeen = value
+      }
+      self.buckets[self.bucket(forValue: value)] += 1
+    }
+
+    /// Merge two histograms together updating `self`
+    /// - parameters:
+    ///    - source: the other histogram to merge into this.
+    public mutating func merge(_ other: LatencyHistogram) throws {
+      guard (self.buckets.count == other.buckets.count) || (self.multiplier == other.multiplier)
+      else {
+        // Fail because these histograms don't match.
+        throw HistorgramShapeMismatch()
+      }
+
+      self.sum += other.sum
+      self.sumOfSquares += other.sumOfSquares
+      self.countOfValuesSeen += other.countOfValuesSeen
+      if other.minSeen < self.minSeen {
+        self.minSeen = other.minSeen
+      }
+      if other.maxSeen > self.maxSeen {
+        self.maxSeen = other.maxSeen
+      }
+      for bucket in 0 ..< self.buckets.count {
+        self.buckets[bucket] += other.buckets[bucket]
+      }
+    }
+  }
+}

+ 51 - 5
Sources/performance-worker/ServerStats.swift → Sources/performance-worker/ResourceUsage.swift

@@ -34,7 +34,44 @@ private let OUR_RUSAGE_SELF: Int32 = RUSAGE_SELF
 private let OUR_RUSAGE_SELF: Int32 = RUSAGE_SELF.rawValue
 #endif
 
-/// Current server stats.
+/// Client resource usage stats.
+@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
+internal struct ClientStats: Sendable {
+  var time: Double
+  var userTime: Double
+  var systemTime: Double
+
+  init(
+    time: Double,
+    userTime: Double,
+    systemTime: Double
+  ) {
+    self.time = time
+    self.userTime = userTime
+    self.systemTime = systemTime
+  }
+
+  init() {
+    self.time = Double(DispatchTime.now().uptimeNanoseconds) * 1e-9
+    if let usage = System.resourceUsage() {
+      self.userTime = Double(usage.ru_utime.tv_sec) + Double(usage.ru_utime.tv_usec) * 1e-6
+      self.systemTime = Double(usage.ru_stime.tv_sec) + Double(usage.ru_stime.tv_usec) * 1e-6
+    } else {
+      self.userTime = 0
+      self.systemTime = 0
+    }
+  }
+
+  internal func difference(to state: ClientStats) -> ClientStats {
+    return ClientStats(
+      time: self.time - state.time,
+      userTime: self.userTime - state.userTime,
+      systemTime: self.systemTime - state.systemTime
+    )
+  }
+}
+
+/// Server resource usage stats.
 @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
 internal struct ServerStats: Sendable {
   var time: Double
@@ -59,10 +96,7 @@ internal struct ServerStats: Sendable {
 
   init() async throws {
     self.time = Double(DispatchTime.now().uptimeNanoseconds) * 1e-9
-    var usage = rusage()
-    if getrusage(OUR_RUSAGE_SELF, &usage) == 0 {
-      // Adding the seconds with the microseconds transformed into seconds to get the
-      // real number of seconds as a `Double`.
+    if let usage = System.resourceUsage() {
       self.userTime = Double(usage.ru_utime.tv_sec) + Double(usage.ru_utime.tv_usec) * 1e-6
       self.systemTime = Double(usage.ru_stime.tv_sec) + Double(usage.ru_stime.tv_usec) * 1e-6
     } else {
@@ -128,3 +162,15 @@ internal struct ServerStats: Sendable {
     #endif
   }
 }
+
+extension System {
+  fileprivate static func resourceUsage() -> rusage? {
+    var usage = rusage()
+
+    if getrusage(OUR_RUSAGE_SELF, &usage) == 0 {
+      return usage
+    } else {
+      return nil
+    }
+  }
+}

+ 36 - 3
Sources/performance-worker/WorkerService.swift

@@ -31,7 +31,7 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
     var role: Role?
 
     enum Role {
-      case client(GRPCClient)
+      case client(ClientState)
       case server(ServerState)
     }
 
@@ -45,6 +45,25 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
       }
     }
 
+    struct ClientState {
+      var clients: [BenchmarkClient]
+      var stats: ClientStats
+
+      init(
+        clients: [BenchmarkClient],
+        stats: ClientStats
+      ) {
+        self.clients = clients
+        self.stats = stats
+      }
+
+      func shutdownClients() throws {
+        for benchmarkClient in self.clients {
+          benchmarkClient.shutdown()
+        }
+      }
+    }
+
     init() {}
 
     init(role: Role) {
@@ -74,6 +93,20 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
       }
     }
 
+    mutating func clientStats(replaceWith newStats: ClientStats? = nil) -> ClientStats? {
+      switch self.role {
+      case var .client(clientState):
+        let stats = clientState.stats
+        if let newStats = newStats {
+          clientState.stats = newStats
+          self.role = .client(clientState)
+        }
+        return stats
+      case .server, .none:
+        return nil
+      }
+    }
+
     mutating func setupServer(server: GRPCServer, stats: ServerStats) throws {
       let serverState = State.ServerState(server: server, stats: stats)
       switch self.role {
@@ -100,8 +133,8 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
 
     if let role = role {
       switch role {
-      case .client(let client):
-        client.close()
+      case .client(let clientState):
+        try clientState.shutdownClients()
       case .server(let serverState):
         serverState.server.stopListening()
       }