/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

private import Atomics
private import Foundation
internal import GRPCCore
private import NIOConcurrencyHelpers
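
/// A client which runs benchmark RPCs against a benchmark service and records per-RPC latency
/// and error-code statistics.
///
/// A minimal usage sketch (assumes an already configured `GRPCClient`; the surrounding
/// task-group scaffolding is illustrative, not part of this file):
///
/// ```
/// let benchmarkClient = BenchmarkClient(
///   client: grpcClient,
///   concurrentRPCs: 10,
///   rpcType: .unary,
///   messagesPerStream: 0,
///   protoParams: .init(),
///   histogramParams: nil
/// )
///
/// try await withThrowingTaskGroup(of: Void.self) { group in
///   group.addTask { try await benchmarkClient.run() }
///   // ... later, from another task:
///   benchmarkClient.shutdown()
///   try await group.waitForAll()
/// }
/// ```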
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
struct BenchmarkClient {
  private let _isShuttingDown = ManagedAtomic(false)

  /// Whether the benchmark client is shutting down. Used to control when to stop sending messages
  /// or creating new RPCs.
  private var isShuttingDown: Bool {
    self._isShuttingDown.load(ordering: .relaxed)
  }

  /// The underlying client.
  private var client: GRPCClient

  /// The number of concurrent RPCs to run.
  private var concurrentRPCs: Int

  /// The type of RPC to make against the server.
  private var rpcType: RPCType

  /// The max number of messages to send on a stream before replacing the RPC with a new one. A
  /// value of zero means there is no limit.
  private var messagesPerStream: Int
  private var noMessageLimit: Bool { self.messagesPerStream == 0 }

  /// The message to send for all RPC types to the server.
  private let message: Grpc_Testing_SimpleRequest

  /// Per RPC stats.
  private let rpcStats: NIOLockedValueBox<RPCStats>

  init(
    client: GRPCClient,
    concurrentRPCs: Int,
    rpcType: RPCType,
    messagesPerStream: Int,
    protoParams: Grpc_Testing_SimpleProtoParams,
    histogramParams: Grpc_Testing_HistogramParams?
  ) {
    self.client = client
    self.concurrentRPCs = concurrentRPCs
    self.messagesPerStream = messagesPerStream
    self.rpcType = rpcType
    self.message = .with {
      $0.responseSize = protoParams.respSize
      $0.payload = Grpc_Testing_Payload.with {
        $0.body = Data(count: Int(protoParams.reqSize))
      }
    }
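
    // Use the histogram parameters from the driver when provided, otherwise fall back to the
    // default histogram configuration.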
    let histogram: RPCStats.LatencyHistogram
    if let histogramParams = histogramParams {
      histogram = RPCStats.LatencyHistogram(
        resolution: histogramParams.resolution,
        maxBucketStart: histogramParams.maxPossible
      )
    } else {
      histogram = RPCStats.LatencyHistogram()
    }
    self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
  }
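
  /// The supported benchmark RPC types.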
  enum RPCType {
    case unary
    case streaming
  }
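
  /// A snapshot of the current per-RPC stats.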
  internal var currentStats: RPCStats {
    self.rpcStats.withLockedValue { $0 }
  }
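
  /// Runs the benchmark client: starts the underlying `GRPCClient`, then runs `concurrentRPCs`
  /// RPC loops until `shutdown()` is called.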
  internal func run() async throws {
    let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(wrapping: self.client)
    return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
      // Start the client.
      clientGroup.addTask {
        try await self.client.run()
      }

      try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
        // Start one task for each concurrent RPC and keep looping in that task until indicated
        // to stop.
        for _ in 0 ..< self.concurrentRPCs {
          rpcsGroup.addTask {
            while !self.isShuttingDown {
              switch self.rpcType {
              case .unary:
                await self.unary(benchmark: benchmarkClient)
              case .streaming:
                await self.streaming(benchmark: benchmarkClient)
              }
            }
          }
        }

        try await rpcsGroup.waitForAll()
      }
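
      // All RPCs have finished; close the client and wait for it to shut down.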
      self.client.close()
      try await clientGroup.next()
    }
  }
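
  /// Records the latency of a single RPC and, if it failed, its error code.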
  private func record(latencyNanos: Double, errorCode: RPCError.Code?) {
    self.rpcStats.withLockedValue { stats in
      stats.latencyHistogram.record(latencyNanos)
      if let errorCode = errorCode {
        stats.requestResultCount[errorCode, default: 0] += 1
      }
    }
  }
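
  /// Records the error code of a failed RPC.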
  private func record(errorCode: RPCError.Code) {
    self.rpcStats.withLockedValue { stats in
      stats.requestResultCount[errorCode, default: 0] += 1
    }
  }
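
  /// Executes `body` and returns its result together with the elapsed wall-clock time in
  /// nanoseconds.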
  private func timeIt<R>(
    _ body: () async throws -> R
  ) async rethrows -> (R, nanoseconds: Double) {
    let startTime = DispatchTime.now().uptimeNanoseconds
    let result = try await body()
    let endTime = DispatchTime.now().uptimeNanoseconds
    return (result, nanoseconds: Double(endTime - startTime))
  }
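
  /// Makes a unary RPC and records its latency, and its error code if it failed.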
  private func unary(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
    let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
      do {
        try await benchmark.unaryCall(request: ClientRequest.Single(message: self.message)) {
          _ = try $0.message
        }
        return nil
      } catch let error as RPCError {
        return error.code
      } catch {
        return .unknown
      }
    }

    self.record(latencyNanos: nanoseconds, errorCode: errorCode)
  }
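
  /// Makes a streaming RPC which ping-pongs messages with the server, recording the latency of
  /// each round trip and the error code of the RPC if it fails.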
  private func streaming(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
    // Streaming RPCs ping-pong messages back and forth. To achieve this the response message
    // stream is sent to the request closure, and the request closure indicates the outcome back
    // to the response handler to keep the RPC alive for the appropriate amount of time.
    let status = AsyncStream.makeStream(of: RPCError.self)
    let response = AsyncStream.makeStream(
      of: RPCAsyncSequence<Grpc_Testing_SimpleResponse, any Error>.self
    )

    let request = ClientRequest.Stream(of: Grpc_Testing_SimpleRequest.self) { writer in
      defer { status.continuation.finish() }

      // The time at which the last message was sent.
      var lastMessageSendTime = DispatchTime.now()
      try await writer.write(self.message)

      // Wait for the response stream.
      var iterator = response.stream.makeAsyncIterator()
      guard let responses = await iterator.next() else {
        throw RPCError(
          code: .internalError,
          message: "The response stream finished before any responses were received."
        )
      }

      // Record the first latency.
      let now = DispatchTime.now()
      let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
      lastMessageSendTime = now
      self.record(latencyNanos: Double(nanos), errorCode: nil)

      // Now start looping. Only stop when the max messages per stream is hit or told to stop.
      var responseIterator = responses.makeAsyncIterator()
      var messagesSent = 1
      while !self.isShuttingDown && (self.noMessageLimit || messagesSent < self.messagesPerStream) {
        messagesSent += 1
        do {
          if try await responseIterator.next() != nil {
            let now = DispatchTime.now()
            let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
            lastMessageSendTime = now
            self.record(latencyNanos: Double(nanos), errorCode: nil)
            try await writer.write(self.message)
          } else {
            break
          }
        } catch let error as RPCError {
          status.continuation.yield(error)
          break
        } catch {
          status.continuation.yield(RPCError(code: .unknown, message: String(describing: error)))
          break
        }
      }
    }
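
    // Execute the RPC. The response handler passes the message stream to the request closure,
    // then waits for the request closure to report an error (or finish) via the status stream.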
    do {
      try await benchmark.streamingCall(request: request) {
        response.continuation.yield($0.messages)
        response.continuation.finish()
        for await error in status.stream {
          throw error
        }
      }
    } catch let error as RPCError {
      self.record(errorCode: error.code)
    } catch {
      self.record(errorCode: .unknown)
    }
  }
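
  /// Signals to in-flight RPCs that the client is shutting down, and closes the underlying
  /// client.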
  internal func shutdown() {
    self._isShuttingDown.store(true, ordering: .relaxed)
    self.client.close()
  }
}