AsyncClient.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import BenchmarkUtils
  17. import Foundation
  18. import GRPC
  19. import Logging
  20. import NIO
  21. /// Client to make a series of asynchronous calls.
  22. final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
  23. private let eventLoopGroup: MultiThreadedEventLoopGroup
  24. private let threadCount: Int
  25. private let logger = Logger(label: "AsyncQPSClient")
  26. private let channelRepeaters: [ChannelRepeater]
  27. private var statsPeriodStart: DispatchTime
  28. private var cpuStatsPeriodStart: CPUTime
  29. /// Initialise a client to send requests.
  30. /// - parameters:
  31. /// - config: Config from the driver specifying how the client should behave.
  32. init(config: Grpc_Testing_ClientConfig) throws {
  33. // Parse possible invalid targets before code with side effects.
  34. let serverTargets = try config.parsedServerTargets()
  35. precondition(serverTargets.count > 0)
  36. // Setup threads
  37. let threadCount = config.threadsToUse()
  38. self.threadCount = threadCount
  39. self.logger.info("Sizing AsyncQPSClient", metadata: ["threads": "\(threadCount)"])
  40. let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: threadCount)
  41. self.eventLoopGroup = eventLoopGroup
  42. // Start recording stats.
  43. self.statsPeriodStart = grpcTimeNow()
  44. self.cpuStatsPeriodStart = getResourceUsage()
  45. let requestMessage = try AsyncQPSClient
  46. .makeClientRequest(payloadConfig: config.payloadConfig)
  47. // Start the requested number of channels.
  48. self.channelRepeaters = (0 ..< Int(config.clientChannels)).map { channelNumber in
  49. ChannelRepeater(
  50. target: serverTargets[channelNumber % serverTargets.count],
  51. requestMessage: requestMessage,
  52. config: config,
  53. eventLoopGroup: eventLoopGroup
  54. )
  55. }
  56. // Start the train.
  57. for channelRepeater in self.channelRepeaters {
  58. channelRepeater.start()
  59. }
  60. }
  61. /// Send current status back to the driver process.
  62. /// - parameters:
  63. /// - reset: Should the stats reset after being sent.
  64. /// - context: Calling context to allow results to be sent back to the driver.
  65. func sendStatus(reset: Bool, context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>) {
  66. let currentTime = grpcTimeNow()
  67. let currentResourceUsage = getResourceUsage()
  68. var result = Grpc_Testing_ClientStatus()
  69. result.stats.timeElapsed = (currentTime - self.statsPeriodStart).asSeconds()
  70. result.stats.timeSystem = currentResourceUsage.systemTime - self.cpuStatsPeriodStart
  71. .systemTime
  72. result.stats.timeUser = currentResourceUsage.userTime - self.cpuStatsPeriodStart.userTime
  73. result.stats.cqPollCount = 0
  74. // Collect stats from each of the channels.
  75. var latencyHistogram = Histogram()
  76. var statusCounts = StatusCounts()
  77. for channelRepeater in self.channelRepeaters {
  78. let stats = channelRepeater.getStats(reset: reset)
  79. try! latencyHistogram.merge(source: stats.latencies)
  80. statusCounts.merge(source: stats.statuses)
  81. }
  82. result.stats.latencies = Grpc_Testing_HistogramData(from: latencyHistogram)
  83. result.stats.requestResults = statusCounts.toRequestResultCounts()
  84. self.logger.info("Sending client status")
  85. _ = context.sendResponse(result)
  86. if reset {
  87. self.statsPeriodStart = currentTime
  88. self.cpuStatsPeriodStart = currentResourceUsage
  89. }
  90. }
  91. /// Shutdown the service.
  92. /// - parameters:
  93. /// - callbackLoop: Which eventloop should be called back on completion.
  94. /// - returns: A future on the `callbackLoop` which will succeed on completion of shutdown.
  95. func shutdown(callbackLoop: EventLoop) -> EventLoopFuture<Void> {
  96. let stoppedFutures = self.channelRepeaters.map { repeater in repeater.stop() }
  97. let allStopped = EventLoopFuture.andAllComplete(stoppedFutures, on: callbackLoop)
  98. return allStopped.flatMap { _ in
  99. let promise: EventLoopPromise<Void> = callbackLoop.makePromise()
  100. self.eventLoopGroup.shutdownGracefully { error in
  101. if let error = error {
  102. promise.fail(error)
  103. } else {
  104. promise.succeed(())
  105. }
  106. }
  107. return promise.futureResult
  108. }
  109. }
  110. /// Make a request which can be sent to the server.
  111. private static func makeClientRequest(payloadConfig: Grpc_Testing_PayloadConfig) throws
  112. -> Grpc_Testing_SimpleRequest {
  113. if let payload = payloadConfig.payload {
  114. switch payload {
  115. case .bytebufParams:
  116. throw GRPCStatus(code: .invalidArgument, message: "Byte buffer not supported.")
  117. case let .simpleParams(simpleParams):
  118. var result = Grpc_Testing_SimpleRequest()
  119. result.responseType = .compressable
  120. result.responseSize = simpleParams.respSize
  121. result.payload.type = .compressable
  122. let size = Int(simpleParams.reqSize)
  123. let body = Data(count: size)
  124. result.payload.body = body
  125. return result
  126. case .complexParams:
  127. throw GRPCStatus(
  128. code: .invalidArgument,
  129. message: "Complex params not supported."
  130. )
  131. }
  132. } else {
  133. // Default - simple proto without payloads.
  134. var result = Grpc_Testing_SimpleRequest()
  135. result.responseType = .compressable
  136. result.responseSize = 0
  137. result.payload.type = .compressable
  138. return result
  139. }
  140. }
  141. /// Class to manage a channel. Repeatedly makes requests on that channel and records what happens.
  142. private class ChannelRepeater {
  143. private let connection: ClientConnection
  144. private let maxPermittedOutstandingRequests: Int
  145. private let stats: StatsWithLock
  146. /// Has a stop been requested - if it has don't submit any more
  147. /// requests and when all existing requests are complete signal
  148. /// using `stopComplete`
  149. private var stopRequested = false
  150. /// Succeeds after a stop has been requested and all outstanding requests have completed.
  151. private var stopComplete: EventLoopPromise<Void>
  152. private var numberOfOutstandingRequests = 0
  153. private var requestMaker: RequestMakerType
  154. init(target: HostAndPort,
  155. requestMessage: Grpc_Testing_SimpleRequest,
  156. config: Grpc_Testing_ClientConfig,
  157. eventLoopGroup: EventLoopGroup) {
  158. // TODO: Support TLS if requested.
  159. self.connection = ClientConnection.insecure(group: eventLoopGroup)
  160. .connect(host: target.host, port: target.port)
  161. let logger = Logger(label: "ChannelRepeater")
  162. let client = Grpc_Testing_BenchmarkServiceClient(channel: self.connection)
  163. self.maxPermittedOutstandingRequests = Int(config.outstandingRpcsPerChannel)
  164. self.stopComplete = self.connection.eventLoop.makePromise()
  165. self.stats = StatsWithLock()
  166. self.requestMaker = RequestMakerType(
  167. config: config,
  168. client: client,
  169. requestMessage: requestMessage,
  170. logger: logger,
  171. stats: self.stats
  172. )
  173. }
  174. /// Launch as many requests as allowed on the channel.
  175. /// This must be called from the connection eventLoop.
  176. private func launchRequests() {
  177. self.connection.eventLoop.preconditionInEventLoop()
  178. while self.canMakeRequest {
  179. self.makeRequestAndRepeat()
  180. }
  181. }
  182. /// Returns if it is permissible to make another request - ie we've not been asked to stop, and we're not at the limit of outstanding requests.
  183. private var canMakeRequest: Bool {
  184. self.connection.eventLoop.assertInEventLoop()
  185. return !self.stopRequested
  186. && self.numberOfOutstandingRequests < self.maxPermittedOutstandingRequests
  187. }
  188. /// If there is spare permitted capacity make a request and repeat when it is done.
  189. private func makeRequestAndRepeat() {
  190. self.connection.eventLoop.preconditionInEventLoop()
  191. // Check for capacity.
  192. if !self.canMakeRequest {
  193. return
  194. }
  195. self.numberOfOutstandingRequests += 1
  196. let resultStatus = self.requestMaker.makeRequest()
  197. // Wait for the request to complete.
  198. resultStatus.whenSuccess { status in
  199. self.requestCompleted(status: status)
  200. }
  201. }
  202. /// Call when a request has completed.
  203. /// Records stats and attempts to make more requests if there is available capacity.
  204. private func requestCompleted(status: GRPCStatus) {
  205. self.connection.eventLoop.preconditionInEventLoop()
  206. self.numberOfOutstandingRequests -= 1
  207. if self.stopRequested, self.numberOfOutstandingRequests == 0 {
  208. self.stopIsComplete()
  209. } else {
  210. // Try scheduling another request.
  211. self.makeRequestAndRepeat()
  212. }
  213. }
  214. /// Get stats for sending to the driver.
  215. /// - parameters:
  216. /// - reset: Should the stats reset after copying.
  217. /// - returns: The statistics for this channel.
  218. func getStats(reset: Bool) -> Stats {
  219. return self.stats.copyData(reset: reset)
  220. }
  221. /// Start sending requests to the server.
  222. func start() {
  223. if self.connection.eventLoop.inEventLoop {
  224. self.launchRequests()
  225. } else {
  226. self.connection.eventLoop.execute {
  227. self.launchRequests()
  228. }
  229. }
  230. }
  231. private func stopIsComplete() {
  232. assert(self.stopRequested)
  233. assert(self.numberOfOutstandingRequests == 0)
  234. // Close the connection then signal done.
  235. self.connection.close().cascade(to: self.stopComplete)
  236. }
  237. /// Stop sending requests to the server.
  238. /// - returns: A future which can be waited on to signal when all activity has ceased.
  239. func stop() -> EventLoopFuture<Void> {
  240. self.connection.eventLoop.execute {
  241. self.stopRequested = true
  242. self.requestMaker.requestStop()
  243. if self.numberOfOutstandingRequests == 0 {
  244. self.stopIsComplete()
  245. }
  246. }
  247. return self.stopComplete.futureResult
  248. }
  249. }
  250. }
  251. /// Create an asynchronous client of the requested type.
  252. /// - parameters:
  253. /// - config: Description of the client required.
  254. /// - returns: The client created.
  255. func makeAsyncClient(config: Grpc_Testing_ClientConfig) throws -> QPSClient {
  256. switch config.rpcType {
  257. case .unary:
  258. return try AsyncQPSClient<AsyncUnaryRequestMaker>(config: config)
  259. case .streaming:
  260. return try AsyncQPSClient<AsyncPingPongRequestMaker>(config: config)
  261. case .streamingFromClient:
  262. throw GRPCStatus(code: .unimplemented, message: "Client Type not implemented")
  263. case .streamingFromServer:
  264. throw GRPCStatus(code: .unimplemented, message: "Client Type not implemented")
  265. case .streamingBothWays:
  266. throw GRPCStatus(code: .unimplemented, message: "Client Type not implemented")
  267. case .UNRECOGNIZED:
  268. throw GRPCStatus(code: .invalidArgument, message: "Unrecognised client rpc type")
  269. }
  270. }