/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import Foundation
import GRPCCore
import NIOConcurrencyHelpers
import Synchronization
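
/// A client which runs a configured number of concurrent RPCs against a benchmark server,
/// recording the latency and outcome of each RPC.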
final class BenchmarkClient: Sendable {
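  /// Set to `true` once `shutdown()` has been called. Relaxed ordering is sufficient: the flag
  /// only signals the RPC loops to stop, it doesn't guard any other state.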
  private let _isShuttingDown = Atomic(false)

  /// Whether the benchmark client is shutting down. Used to control when to stop sending messages
  /// or creating new RPCs.
  private var isShuttingDown: Bool {
    self._isShuttingDown.load(ordering: .relaxed)
  }

  /// The underlying client.
  private let client: GRPCClient

  /// The number of concurrent RPCs to run.
  private let concurrentRPCs: Int

  /// The type of RPC to make against the server.
  private let rpcType: RPCType

  /// The max number of messages to send on a stream before replacing the RPC with a new one. A
  /// value of zero means there is no limit.
  private let messagesPerStream: Int
  private var noMessageLimit: Bool { self.messagesPerStream == 0 }

  /// The message to send for all RPC types to the server.
  private let message: Grpc_Testing_SimpleRequest

  /// Per RPC stats.
  private let rpcStats: NIOLockedValueBox<RPCStats>

  init(
    client: GRPCClient,
    concurrentRPCs: Int,
    rpcType: RPCType,
    messagesPerStream: Int,
    protoParams: Grpc_Testing_SimpleProtoParams,
    histogramParams: Grpc_Testing_HistogramParams?
  ) {
    self.client = client
    self.concurrentRPCs = concurrentRPCs
    self.messagesPerStream = messagesPerStream
    self.rpcType = rpcType
    self.message = .with {
      $0.responseSize = protoParams.respSize
      $0.payload = Grpc_Testing_Payload.with {
        $0.body = Data(count: Int(protoParams.reqSize))
      }
    }

    let histogram: RPCStats.LatencyHistogram
    if let histogramParams = histogramParams {
      histogram = RPCStats.LatencyHistogram(
        resolution: histogramParams.resolution,
        maxBucketStart: histogramParams.maxPossible
      )
    } else {
      histogram = RPCStats.LatencyHistogram()
    }

    self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
  }

  enum RPCType {
    case unary
    case streaming
  }

  internal var currentStats: RPCStats {
    return self.rpcStats.withLockedValue { stats in
      return stats
    }
  }
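
  /// Runs the benchmark: starts the underlying client, then runs `concurrentRPCs` tasks which
  /// each loop making RPCs until `shutdown()` is called, after which the client is shut down
  /// gracefully.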
  internal func run() async throws {
    let benchmarkClient = Grpc_Testing_BenchmarkService.Client(wrapping: self.client)

    return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
      // Start the client.
      clientGroup.addTask {
        try await self.client.run()
      }

      try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
        // Start one task for each concurrent RPC and keep looping in that task until indicated
        // to stop.
        for _ in 0 ..< self.concurrentRPCs {
          rpcsGroup.addTask {
            while !self.isShuttingDown {
              switch self.rpcType {
              case .unary:
                await self.unary(benchmark: benchmarkClient)
              case .streaming:
                await self.streaming(benchmark: benchmarkClient)
              }
            }
          }
        }

        try await rpcsGroup.waitForAll()
      }

      self.client.beginGracefulShutdown()
      try await clientGroup.next()
    }
  }
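
  /// Records the latency of a single RPC and, if it failed, counts its error code.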
  private func record(latencyNanos: Double, errorCode: RPCError.Code?) {
    self.rpcStats.withLockedValue { stats in
      stats.latencyHistogram.record(latencyNanos)
      if let errorCode = errorCode {
        stats.requestResultCount[errorCode, default: 0] += 1
      }
    }
  }

  private func record(errorCode: RPCError.Code) {
    self.rpcStats.withLockedValue { stats in
      stats.requestResultCount[errorCode, default: 0] += 1
    }
  }
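
  /// Measures the elapsed time of `body` in nanoseconds. `DispatchTime` is a monotonic clock,
  /// so the measurement isn't affected by adjustments to the wall clock.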
  private func timeIt<R>(
    _ body: () async throws -> R
  ) async rethrows -> (R, nanoseconds: Double) {
    let startTime = DispatchTime.now().uptimeNanoseconds
    let result = try await body()
    let endTime = DispatchTime.now().uptimeNanoseconds
    return (result, nanoseconds: Double(endTime - startTime))
  }
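
  /// Makes a single unary RPC, timing the whole call and recording its latency and outcome.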
  private func unary(benchmark: Grpc_Testing_BenchmarkService.Client) async {
    let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
      do {
        try await benchmark.unaryCall(request: ClientRequest(message: self.message)) {
          _ = try $0.message
        }
        return nil
      } catch let error as RPCError {
        return error.code
      } catch {
        return .unknown
      }
    }

    self.record(latencyNanos: nanoseconds, errorCode: errorCode)
  }
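
  /// Makes a single streaming RPC which ping-pongs messages with the server, recording the time
  /// between sending each message and receiving its response.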
  private func streaming(benchmark: Grpc_Testing_BenchmarkService.Client) async {
    // Streaming RPCs ping-pong messages back and forth. To achieve this the response message
    // stream is sent to the request closure, and the request closure indicates the outcome back
    // to the response handler to keep the RPC alive for the appropriate amount of time.
    let status = AsyncStream.makeStream(of: RPCError.self)
    let response = AsyncStream.makeStream(
      of: RPCAsyncSequence<Grpc_Testing_SimpleResponse, any Error>.self
    )

    let request = StreamingClientRequest(of: Grpc_Testing_SimpleRequest.self) { writer in
      defer { status.continuation.finish() }

      // The time at which the last message was sent.
      var lastMessageSendTime = DispatchTime.now()
      try await writer.write(self.message)

      // Wait for the response stream.
      var iterator = response.stream.makeAsyncIterator()
      guard let responses = await iterator.next() else {
        throw RPCError(code: .internalError, message: "")
      }

      // Record the first latency.
      let now = DispatchTime.now()
      let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
      lastMessageSendTime = now
      self.record(latencyNanos: Double(nanos), errorCode: nil)

      // Now start looping. Only stop when the max messages per stream is hit or told to stop.
      var responseIterator = responses.makeAsyncIterator()
      var messagesSent = 1

      while !self.isShuttingDown && (self.noMessageLimit || messagesSent < self.messagesPerStream) {
        messagesSent += 1
        do {
          if try await responseIterator.next() != nil {
            let now = DispatchTime.now()
            let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
            lastMessageSendTime = now
            self.record(latencyNanos: Double(nanos), errorCode: nil)
            try await writer.write(self.message)
          } else {
            break
          }
        } catch let error as RPCError {
          status.continuation.yield(error)
          break
        } catch {
          status.continuation.yield(RPCError(code: .unknown, message: ""))
          break
        }
      }
    }

    do {
      try await benchmark.streamingCall(request: request) {
        response.continuation.yield($0.messages)
        response.continuation.finish()
        for await errorCode in status.stream {
          throw errorCode
        }
      }
    } catch let error as RPCError {
      self.record(errorCode: error.code)
    } catch {
      self.record(errorCode: .unknown)
    }
  }
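
  /// Signals to the RPC loops that the benchmark is over and begins gracefully shutting down the
  /// underlying client.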
  internal func shutdown() {
    self._isShuttingDown.store(true, ordering: .relaxed)
    self.client.beginGracefulShutdown()
  }
}
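
// A minimal usage sketch, left as a comment. The `GRPCClient` construction and the parameter
// values below are illustrative assumptions; only the `BenchmarkClient` API comes from this file.
//
//   let benchmark = BenchmarkClient(
//     client: client,  // an already-configured `GRPCClient`
//     concurrentRPCs: 10,
//     rpcType: .unary,
//     messagesPerStream: 0,  // zero means no per-stream limit
//     protoParams: .with {
//       $0.reqSize = 271
//       $0.respSize = 271
//     },
//     histogramParams: nil  // use the default latency histogram
//   )
//
//   try await withThrowingTaskGroup(of: Void.self) { group in
//     group.addTask { try await benchmark.run() }
//     try await Task.sleep(for: .seconds(30))  // run for a fixed period
//     benchmark.shutdown()
//     try await group.waitForAll()
//     let stats = benchmark.currentStats
//   }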