BenchmarkClient.swift
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import Foundation
import GRPCCore
import NIOConcurrencyHelpers
import Synchronization

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
final class BenchmarkClient: Sendable {
  private let _isShuttingDown = Atomic(false)

  /// Whether the benchmark client is shutting down. Used to control when to stop sending messages
  /// or creating new RPCs.
  private var isShuttingDown: Bool {
    self._isShuttingDown.load(ordering: .relaxed)
  }

  /// The underlying client.
  private let client: GRPCClient

  /// The number of concurrent RPCs to run.
  private let concurrentRPCs: Int

  /// The type of RPC to make against the server.
  private let rpcType: RPCType

  /// The max number of messages to send on a stream before replacing the RPC with a new one. A
  /// value of zero means there is no limit.
  private let messagesPerStream: Int
  private var noMessageLimit: Bool { self.messagesPerStream == 0 }

  /// The message to send for all RPC types to the server.
  private let message: Grpc_Testing_SimpleRequest

  /// Per RPC stats.
  private let rpcStats: NIOLockedValueBox<RPCStats>

  init(
    client: GRPCClient,
    concurrentRPCs: Int,
    rpcType: RPCType,
    messagesPerStream: Int,
    protoParams: Grpc_Testing_SimpleProtoParams,
    histogramParams: Grpc_Testing_HistogramParams?
  ) {
    self.client = client
    self.concurrentRPCs = concurrentRPCs
    self.messagesPerStream = messagesPerStream
    self.rpcType = rpcType
    self.message = .with {
      $0.responseSize = protoParams.respSize
      $0.payload = Grpc_Testing_Payload.with {
        $0.body = Data(count: Int(protoParams.reqSize))
      }
    }

    let histogram: RPCStats.LatencyHistogram
    if let histogramParams = histogramParams {
      histogram = RPCStats.LatencyHistogram(
        resolution: histogramParams.resolution,
        maxBucketStart: histogramParams.maxPossible
      )
    } else {
      histogram = RPCStats.LatencyHistogram()
    }

    self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
  }
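
  /// The type of RPC to benchmark: one-shot unary calls, or long-lived streams which ping-pong
  /// messages back and forth.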
  enum RPCType {
    case unary
    case streaming
  }
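
  /// A snapshot of the current per-RPC stats, copied out under the lock.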
  internal var currentStats: RPCStats {
    return self.rpcStats.withLockedValue { $0 }
  }
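
  /// Runs the underlying client, then starts `concurrentRPCs` tasks which repeatedly open RPCs
  /// of the configured type. Returns once `shutdown()` has been called and all in-flight RPCs
  /// have finished.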
  internal func run() async throws {
    let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(wrapping: self.client)

    return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
      // Start the client.
      clientGroup.addTask {
        try await self.client.run()
      }

      try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
        // Start one task for each concurrent RPC and keep looping in that task until indicated
        // to stop.
        for _ in 0 ..< self.concurrentRPCs {
          rpcsGroup.addTask {
            while !self.isShuttingDown {
              switch self.rpcType {
              case .unary:
                await self.unary(benchmark: benchmarkClient)
              case .streaming:
                await self.streaming(benchmark: benchmarkClient)
              }
            }
          }
        }

        try await rpcsGroup.waitForAll()
      }

      self.client.beginGracefulShutdown()
      try await clientGroup.next()
    }
  }
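
  /// Records the latency of a single round-trip and, if the RPC failed, its error code.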
  private func record(latencyNanos: Double, errorCode: RPCError.Code?) {
    self.rpcStats.withLockedValue { stats in
      stats.latencyHistogram.record(latencyNanos)
      if let errorCode = errorCode {
        stats.requestResultCount[errorCode, default: 0] += 1
      }
    }
  }
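
  /// Records a failed RPC against its error code.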
  private func record(errorCode: RPCError.Code) {
    self.rpcStats.withLockedValue { stats in
      stats.requestResultCount[errorCode, default: 0] += 1
    }
  }
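
  /// Returns the result of `body` along with how long it took to run, measured in nanoseconds
  /// on the monotonic `DispatchTime` clock.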
  private func timeIt<R>(
    _ body: () async throws -> R
  ) async rethrows -> (R, nanoseconds: Double) {
    let startTime = DispatchTime.now().uptimeNanoseconds
    let result = try await body()
    let endTime = DispatchTime.now().uptimeNanoseconds
    return (result, nanoseconds: Double(endTime - startTime))
  }
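
  /// Runs a single unary RPC, recording its latency and, on failure, its error code.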
  private func unary(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
    let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
      do {
        try await benchmark.unaryCall(request: ClientRequest.Single(message: self.message)) {
          _ = try $0.message
        }
        return nil
      } catch let error as RPCError {
        return error.code
      } catch {
        return .unknown
      }
    }

    self.record(latencyNanos: nanoseconds, errorCode: errorCode)
  }
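
  /// Runs a single bidirectional streaming RPC, ping-ponging one message at a time and recording
  /// the latency of each round-trip. The stream ends on shutdown, on error, or once
  /// `messagesPerStream` messages have been sent.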
  private func streaming(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
    // Streaming RPCs ping-pong messages back and forth. To achieve this the response message
    // stream is sent to the request closure, and the request closure indicates the outcome back
    // to the response handler to keep the RPC alive for the appropriate amount of time.
    let status = AsyncStream.makeStream(of: RPCError.self)
    let response = AsyncStream.makeStream(
      of: RPCAsyncSequence<Grpc_Testing_SimpleResponse, any Error>.self
    )

    let request = ClientRequest.Stream(of: Grpc_Testing_SimpleRequest.self) { writer in
      defer { status.continuation.finish() }

      // The time at which the last message was sent.
      var lastMessageSendTime = DispatchTime.now()
      try await writer.write(self.message)

      // Wait for the response stream.
      var iterator = response.stream.makeAsyncIterator()
      guard let responses = await iterator.next() else {
        throw RPCError(code: .internalError, message: "Response stream wasn't made available.")
      }

      // Record the first latency.
      let now = DispatchTime.now()
      let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
      lastMessageSendTime = now
      self.record(latencyNanos: Double(nanos), errorCode: nil)

      // Now start looping. Only stop when the max messages per stream is hit or told to stop.
      var responseIterator = responses.makeAsyncIterator()
      var messagesSent = 1

      while !self.isShuttingDown && (self.noMessageLimit || messagesSent < self.messagesPerStream) {
        messagesSent += 1
        do {
          if try await responseIterator.next() != nil {
            let now = DispatchTime.now()
            let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
            lastMessageSendTime = now
            self.record(latencyNanos: Double(nanos), errorCode: nil)
            try await writer.write(self.message)
          } else {
            break
          }
        } catch let error as RPCError {
          status.continuation.yield(error)
          break
        } catch {
          status.continuation.yield(RPCError(code: .unknown, message: String(describing: error)))
          break
        }
      }
    }

    do {
      try await benchmark.streamingCall(request: request) {
        response.continuation.yield($0.messages)
        response.continuation.finish()
        for await errorCode in status.stream {
          throw errorCode
        }
      }
    } catch let error as RPCError {
      self.record(errorCode: error.code)
    } catch {
      self.record(errorCode: .unknown)
    }
  }
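
  /// Signals to the RPC loops that they should stop and begins gracefully shutting down the
  /// underlying client.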
  internal func shutdown() {
    self._isShuttingDown.store(true, ordering: .relaxed)
    self.client.beginGracefulShutdown()
  }
}
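
// A minimal usage sketch. The transport value and the parameter choices below are assumptions
// for illustration; they aren't defined in this file.
//
//   let client = GRPCClient(transport: someTransport)
//   let benchmark = BenchmarkClient(
//     client: client,
//     concurrentRPCs: 10,
//     rpcType: .streaming,
//     messagesPerStream: 0,  // zero means no per-stream limit
//     protoParams: .with {
//       $0.reqSize = 1_024
//       $0.respSize = 1_024
//     },
//     histogramParams: nil
//   )
//
//   // Run the benchmark and stop it after some interval.
//   try await withThrowingTaskGroup(of: Void.self) { group in
//     group.addTask { try await benchmark.run() }
//     try await Task.sleep(for: .seconds(30))
//     benchmark.shutdown()
//     try await group.waitForAll()
//   }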