Browse Source

runClient RPC response in performance worker (#1857)

Motivation:
We need the `runClient` RPC to monitor the performance of the benchmark clients.

Modifications:

- Implemented the methods that merge the latency histograms and the request result counts
- Implemented the method that creates the response for `runClient`
- Implemented the `runClient` method

Result:

We can construct a latency histogram that represents the data from all clients, and the `runClient` method returns stats about the clients started by the worker.
Stefana-Ioana Dranca 1 year ago
parent
commit
192a0d37ef

+ 6 - 0
Sources/performance-worker/BenchmarkClient.swift

@@ -48,6 +48,12 @@ struct BenchmarkClient {
     self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
   }
 
+  /// A copy of this client's RPC stats, read while holding the lock that guards them.
+  internal var currentStats: RPCStats {
+    self.rpcStats.withLockedValue { $0 }
+  }
+
   internal func run() async throws {
     let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(client: client)
     return try await withThrowingTaskGroup(of: Void.self) { clientGroup in

+ 11 - 1
Sources/performance-worker/RPCStats.swift

@@ -107,7 +107,7 @@ struct RPCStats {
 
     /// Merge two histograms together updating `self`
     /// - parameters:
-    ///    - source: the other histogram to merge into this.
+    ///    - other: the other histogram to merge into this.
     public mutating func merge(_ other: LatencyHistogram) throws {
       guard (self.buckets.count == other.buckets.count) || (self.multiplier == other.multiplier)
       else {
@@ -129,4 +129,14 @@ struct RPCStats {
       }
     }
   }
+
+  /// Merges `other` into `self`.
+  ///
+  /// The latency histograms are merged first; if that throws, the request result counts are
+  /// left untouched. Counts recorded under the same key in both values are added together.
+  ///
+  /// - Parameter other: The stats to fold into this value.
+  /// - Throws: Any error thrown when merging the latency histograms.
+  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+  mutating func merge(_ other: RPCStats) throws {
+    try self.latencyHistogram.merge(other.latencyHistogram)
+    self.requestResultCount.merge(other.requestResultCount, uniquingKeysWith: +)
+  }
 }

+ 167 - 2
Sources/performance-worker/WorkerService.swift

@@ -48,13 +48,16 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
     struct ClientState {
       var clients: [BenchmarkClient]
       var stats: ClientStats
+      var rpcStats: RPCStats
 
       init(
         clients: [BenchmarkClient],
-        stats: ClientStats
+        stats: ClientStats,
+        rpcStats: RPCStats
       ) {
         self.clients = clients
         self.stats = stats
+        self.rpcStats = rpcStats
       }
 
       func shutdownClients() throws {
@@ -79,6 +82,24 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
       }
     }
 
+    /// The benchmark clients stored in the state, or `nil` when the role is not `.client`.
+    var clients: [BenchmarkClient]? {
+      guard case let .client(state) = self.role else {
+        return nil
+      }
+      return state.clients
+    }
+
+    /// The RPC stats stored in the client state, or `nil` when the role is not `.client`.
+    var clientRPCStats: RPCStats? {
+      guard case let .client(state) = self.role else {
+        return nil
+      }
+      return state.rpcStats
+    }
+
     mutating func serverStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
       switch self.role {
       case var .server(serverState):
@@ -120,6 +141,45 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
         self.role = .server(serverState)
       }
     }
+
+    /// Moves the state into the `.client` role, storing the given clients and stats.
+    ///
+    /// - Parameters:
+    ///   - benchmarkClients: The clients to store in the new client state.
+    ///   - stats: The usage stats to store in the new client state.
+    ///   - rpcStats: The RPC stats to store in the new client state.
+    /// - Throws: `RPCError` with code `.alreadyExists` if the role is already `.server`, or
+    ///   with code `.failedPrecondition` if it is already `.client`.
+    mutating func setupClients(
+      benchmarkClients: [BenchmarkClient],
+      stats: ClientStats,
+      rpcStats: RPCStats
+    ) throws {
+      switch self.role {
+      case .none:
+        // Only a worker without a role may become a client.
+        self.role = .client(
+          State.ClientState(clients: benchmarkClients, stats: stats, rpcStats: rpcStats)
+        )
+
+      case .client:
+        throw RPCError(code: .failedPrecondition, message: "Clients have already been set up.")
+
+      case .server:
+        throw RPCError(code: .alreadyExists, message: "This worker has a server setup.")
+      }
+    }
+
+    /// Recomputes the stored RPC stats as the merge of every client's current stats.
+    ///
+    /// `BenchmarkClient.currentStats` returns a client's stats without resetting them, so each
+    /// snapshot already contains everything the client has recorded so far. Merging those
+    /// snapshots into the previously stored aggregate would count earlier samples again on
+    /// every call, so the aggregate is rebuilt from the snapshots alone.
+    ///
+    /// Does nothing when the role is not `.client` or when there are no clients.
+    ///
+    /// NOTE(review): this assumes the stats stored at setup start out empty and use the same
+    /// histogram configuration as the clients - confirm against `BenchmarkClient.init`.
+    ///
+    /// - Throws: Any error thrown while merging the clients' stats; the stored stats are left
+    ///   unchanged in that case.
+    mutating func updateRPCStats() throws {
+      switch self.role {
+      case var .client(clientState):
+        guard let firstClient = clientState.clients.first else {
+          // Nothing to aggregate; keep the stored stats as they are.
+          return
+        }
+
+        var aggregate = firstClient.currentStats
+        for benchmarkClient in clientState.clients.dropFirst() {
+          try aggregate.merge(benchmarkClient.currentStats)
+        }
+
+        clientState.rpcStats = aggregate
+        self.role = .client(clientState)
+
+      case .server, .none:
+        ()
+      }
+    }
   }
 
   func quitWorker(
@@ -194,7 +254,33 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
   ) async throws
     -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunClient.Output>
   {
-    throw RPCError(code: .unimplemented, message: "This RPC has not been implemented yet.")
+    return ServerResponse.Stream { writer in
+      try await withThrowingTaskGroup(of: Void.self) { group in
+        for try await message in request.messages {
+          switch message.argtype {
+          case let .setup(config):
+            // Create the clients with the initial stats.
+            let clients = try await self.setupClients(config)
+
+            for client in clients {
+              group.addTask {
+                try await client.run()
+              }
+            }
+
+          case let .mark(mark):
+            let response = try await self.makeClientStatsResponse(reset: mark.reset)
+            try await writer.write(response)
+
+          case .none:
+            ()
+          }
+        }
+        try await group.waitForAll()
+
+        return [:]
+      }
+    }
   }
 }
 
@@ -237,4 +323,83 @@ extension WorkerService {
       }
     }
   }
+
+  /// Creates the benchmark clients described by `config` and stores them in the worker's
+  /// state together with fresh usage stats and empty RPC stats.
+  ///
+  /// The clients are only created and recorded here; they are not started.
+  ///
+  /// - Parameter config: The client configuration to build the clients from.
+  /// - Returns: The newly created clients.
+  /// - Throws: `RPCError` with code `.invalidArgument` if `config.clientChannels` is negative,
+  ///   or the error thrown by the state when the worker already has a role.
+  private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] {
+    // Forming `0 ..< n` traps for a negative `n`, and the count comes from the request, so
+    // reject it with an RPC error instead of crashing the worker.
+    guard config.clientChannels >= 0 else {
+      throw RPCError(
+        code: .invalidArgument,
+        message: "The number of client channels must not be negative."
+      )
+    }
+
+    let clients = (0 ..< config.clientChannels).map { _ in
+      BenchmarkClient(
+        client: self.makeGRPCClient(),
+        rpcNumber: config.outstandingRpcsPerChannel,
+        rpcType: config.rpcType,
+        histogramParams: config.histogramParams
+      )
+    }
+
+    let stats = ClientStats()
+    let histogram = RPCStats.LatencyHistogram(
+      resolution: config.histogramParams.resolution,
+      maxBucketStart: config.histogramParams.maxPossible
+    )
+
+    try self.state.withLockedValue { state in
+      try state.setupClients(
+        benchmarkClients: clients,
+        stats: stats,
+        rpcStats: RPCStats(latencyHistogram: histogram)
+      )
+    }
+
+    return clients
+  }
+
+  /// Placeholder for creating the `GRPCClient` used by a benchmark client.
+  ///
+  /// Not implemented yet: calling this always traps.
+  func makeGRPCClient() -> GRPCClient {
+    fatalError("makeGRPCClient() has not been implemented yet.")
+  }
+
+  /// Builds the `runClient` response carrying the client usage and RPC stats.
+  ///
+  /// Under the state lock this reads the stored usage stats (replacing them with a fresh
+  /// snapshot when `reset` is `true`), updates the stored RPC stats and reads them back.
+  ///
+  /// - Parameter reset: Whether the stored usage stats are replaced with the snapshot taken
+  ///   at the start of this call.
+  /// - Returns: A response whose stats hold the usage differences, the request result counts
+  ///   and the latency histogram.
+  /// - Throws: `RPCError` with code `.notFound` if no client stats are stored, or any error
+  ///   thrown while updating the RPC stats.
+  private func makeClientStatsResponse(
+    reset: Bool
+  ) async throws -> Grpc_Testing_WorkerService.Method.RunClient.Output {
+    let usageNow = ClientStats()
+    let (previousUsage, mergedStats) = try self.state.withLockedValue { state in
+      let previousUsage = state.clientStats(replaceWith: reset ? usageNow : nil)
+      try state.updateRPCStats()
+      return (previousUsage, state.clientRPCStats)
+    }
+
+    guard let previousUsage, let mergedStats else {
+      throw RPCError(
+        code: .notFound,
+        message: "There are no initial client stats. Clients must be setup before calling 'mark'."
+      )
+    }
+
+    let usageDelta = usageNow.difference(to: previousUsage)
+    let histogram = mergedStats.latencyHistogram
+
+    let resultCounts = mergedStats.requestResultCount.map { (code, count) in
+      Grpc_Testing_RequestResultCount.with { result in
+        result.statusCode = Int32(code.rawValue)
+        result.count = count
+      }
+    }
+
+    let latencies = Grpc_Testing_HistogramData.with { data in
+      data.bucket = histogram.buckets
+      data.minSeen = histogram.minSeen
+      data.maxSeen = histogram.maxSeen
+      data.sum = histogram.sum
+      data.sumOfSquares = histogram.sumOfSquares
+      data.count = histogram.countOfValuesSeen
+    }
+
+    let clientStats = Grpc_Testing_ClientStats.with { stats in
+      stats.timeElapsed = usageDelta.time
+      stats.timeSystem = usageDelta.systemTime
+      stats.timeUser = usageDelta.userTime
+      stats.requestResults = resultCounts
+      stats.latencies = latencies
+    }
+
+    return Grpc_Testing_WorkerService.Method.RunClient.Output.with { response in
+      response.stats = clientStats
+    }
+  }
 }