WorkerService.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  20. final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable {
  21. private let state: NIOLockedValueBox<State>
  22. init() {
  23. let clientAndServer = State()
  24. self.state = NIOLockedValueBox(clientAndServer)
  25. }
  26. private struct State {
  27. var role: Role?
  28. enum Role {
  29. case client(ClientState)
  30. case server(ServerState)
  31. }
  32. struct ServerState {
  33. var server: GRPCServer
  34. var stats: ServerStats
  35. init(server: GRPCServer, stats: ServerStats) {
  36. self.server = server
  37. self.stats = stats
  38. }
  39. }
  40. struct ClientState {
  41. var clients: [BenchmarkClient]
  42. var stats: ClientStats
  43. var rpcStats: RPCStats
  44. init(
  45. clients: [BenchmarkClient],
  46. stats: ClientStats,
  47. rpcStats: RPCStats
  48. ) {
  49. self.clients = clients
  50. self.stats = stats
  51. self.rpcStats = rpcStats
  52. }
  53. func shutdownClients() throws {
  54. for benchmarkClient in self.clients {
  55. benchmarkClient.shutdown()
  56. }
  57. }
  58. }
  59. init() {}
  60. init(role: Role) {
  61. self.role = role
  62. }
  63. var server: GRPCServer? {
  64. switch self.role {
  65. case let .server(serverState):
  66. return serverState.server
  67. case .client, .none:
  68. return nil
  69. }
  70. }
  71. var clients: [BenchmarkClient]? {
  72. switch self.role {
  73. case let .client(clientState):
  74. return clientState.clients
  75. case .server, .none:
  76. return nil
  77. }
  78. }
  79. var clientRPCStats: RPCStats? {
  80. switch self.role {
  81. case let .client(clientState):
  82. return clientState.rpcStats
  83. case .server, .none:
  84. return nil
  85. }
  86. }
  87. mutating func serverStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
  88. switch self.role {
  89. case var .server(serverState):
  90. let stats = serverState.stats
  91. if let newStats = newStats {
  92. serverState.stats = newStats
  93. self.role = .server(serverState)
  94. }
  95. return stats
  96. case .client, .none:
  97. return nil
  98. }
  99. }
  100. mutating func clientStats(replaceWith newStats: ClientStats? = nil) -> ClientStats? {
  101. switch self.role {
  102. case var .client(clientState):
  103. let stats = clientState.stats
  104. if let newStats = newStats {
  105. clientState.stats = newStats
  106. self.role = .client(clientState)
  107. }
  108. return stats
  109. case .server, .none:
  110. return nil
  111. }
  112. }
  113. mutating func setupServer(server: GRPCServer, stats: ServerStats) throws {
  114. let serverState = State.ServerState(server: server, stats: stats)
  115. switch self.role {
  116. case .server(_):
  117. throw RPCError(code: .alreadyExists, message: "A server has already been set up.")
  118. case .client(_):
  119. throw RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
  120. case .none:
  121. self.role = .server(serverState)
  122. }
  123. }
  124. mutating func setupClients(
  125. benchmarkClients: [BenchmarkClient],
  126. stats: ClientStats,
  127. rpcStats: RPCStats
  128. ) throws {
  129. let clientState = State.ClientState(
  130. clients: benchmarkClients,
  131. stats: stats,
  132. rpcStats: rpcStats
  133. )
  134. switch self.role {
  135. case .server(_):
  136. throw RPCError(code: .alreadyExists, message: "This worker has a server setup.")
  137. case .client(_):
  138. throw RPCError(code: .failedPrecondition, message: "Clients have already been set up.")
  139. case .none:
  140. self.role = .client(clientState)
  141. }
  142. }
  143. mutating func updateRPCStats() throws {
  144. switch self.role {
  145. case var .client(clientState):
  146. let benchmarkClients = clientState.clients
  147. var rpcStats = clientState.rpcStats
  148. for benchmarkClient in benchmarkClients {
  149. try rpcStats.merge(benchmarkClient.currentStats)
  150. }
  151. clientState.rpcStats = rpcStats
  152. self.role = .client(clientState)
  153. case .server, .none:
  154. ()
  155. }
  156. }
  157. }
  158. func quitWorker(
  159. request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Input>
  160. ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Output> {
  161. let role = self.state.withLockedValue { state in
  162. defer { state.role = nil }
  163. return state.role
  164. }
  165. if let role = role {
  166. switch role {
  167. case .client(let clientState):
  168. try clientState.shutdownClients()
  169. case .server(let serverState):
  170. serverState.server.stopListening()
  171. }
  172. }
  173. return ServerResponse.Single(message: Grpc_Testing_WorkerService.Method.QuitWorker.Output())
  174. }
  175. func coreCount(
  176. request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.CoreCount.Input>
  177. ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.CoreCount.Output> {
  178. let coreCount = System.coreCount
  179. return ServerResponse.Single(
  180. message: Grpc_Testing_WorkerService.Method.CoreCount.Output.with {
  181. $0.cores = Int32(coreCount)
  182. }
  183. )
  184. }
  185. func runServer(
  186. request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunServer.Input>
  187. ) async throws
  188. -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunServer.Output>
  189. {
  190. return ServerResponse.Stream { writer in
  191. try await withThrowingTaskGroup(of: Void.self) { group in
  192. for try await message in request.messages {
  193. switch message.argtype {
  194. case let .some(.setup(serverConfig)):
  195. let server = try await self.setupServer(serverConfig)
  196. group.addTask { try await server.run() }
  197. case let .some(.mark(mark)):
  198. let response = try await self.makeServerStatsResponse(reset: mark.reset)
  199. try await writer.write(response)
  200. case .none:
  201. ()
  202. }
  203. }
  204. try await group.next()
  205. }
  206. let server = self.state.withLockedValue { state in
  207. defer { state.role = nil }
  208. return state.server
  209. }
  210. server?.stopListening()
  211. return [:]
  212. }
  213. }
  214. func runClient(
  215. request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunClient.Input>
  216. ) async throws
  217. -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunClient.Output>
  218. {
  219. return ServerResponse.Stream { writer in
  220. try await withThrowingTaskGroup(of: Void.self) { group in
  221. for try await message in request.messages {
  222. switch message.argtype {
  223. case let .setup(config):
  224. // Create the clients with the initial stats.
  225. let clients = try await self.setupClients(config)
  226. for client in clients {
  227. group.addTask {
  228. try await client.run()
  229. }
  230. }
  231. case let .mark(mark):
  232. let response = try await self.makeClientStatsResponse(reset: mark.reset)
  233. try await writer.write(response)
  234. case .none:
  235. ()
  236. }
  237. }
  238. try await group.waitForAll()
  239. return [:]
  240. }
  241. }
  242. }
  243. }
  244. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  245. extension WorkerService {
  246. private func setupServer(_ config: Grpc_Testing_ServerConfig) async throws -> GRPCServer {
  247. let server = GRPCServer(transports: [], services: [BenchmarkService()])
  248. let stats = try await ServerStats()
  249. try self.state.withLockedValue { state in
  250. try state.setupServer(server: server, stats: stats)
  251. }
  252. return server
  253. }
  254. private func makeServerStatsResponse(
  255. reset: Bool
  256. ) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
  257. let currentStats = try await ServerStats()
  258. let initialStats = self.state.withLockedValue { state in
  259. return state.serverStats(replaceWith: reset ? currentStats : nil)
  260. }
  261. guard let initialStats = initialStats else {
  262. throw RPCError(
  263. code: .notFound,
  264. message: "There are no initial server stats. A server must be setup before calling 'mark'."
  265. )
  266. }
  267. let differences = currentStats.difference(to: initialStats)
  268. return Grpc_Testing_WorkerService.Method.RunServer.Output.with {
  269. $0.stats = Grpc_Testing_ServerStats.with {
  270. $0.idleCpuTime = differences.idleCPUTime
  271. $0.timeElapsed = differences.time
  272. $0.timeSystem = differences.systemTime
  273. $0.timeUser = differences.userTime
  274. $0.totalCpuTime = differences.totalCPUTime
  275. }
  276. }
  277. }
  278. private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] {
  279. var clients = [BenchmarkClient]()
  280. for _ in 0 ..< config.clientChannels {
  281. let grpcClient = self.makeGRPCClient()
  282. clients.append(
  283. BenchmarkClient(
  284. client: grpcClient,
  285. rpcNumber: config.outstandingRpcsPerChannel,
  286. rpcType: config.rpcType,
  287. histogramParams: config.histogramParams
  288. )
  289. )
  290. }
  291. let stats = ClientStats()
  292. let histogram = RPCStats.LatencyHistogram(
  293. resolution: config.histogramParams.resolution,
  294. maxBucketStart: config.histogramParams.maxPossible
  295. )
  296. try self.state.withLockedValue { state in
  297. try state.setupClients(
  298. benchmarkClients: clients,
  299. stats: stats,
  300. rpcStats: RPCStats(latencyHistogram: histogram)
  301. )
  302. }
  303. return clients
  304. }
  305. func makeGRPCClient() -> GRPCClient {
  306. fatalError()
  307. }
  308. private func makeClientStatsResponse(
  309. reset: Bool
  310. ) async throws -> Grpc_Testing_WorkerService.Method.RunClient.Output {
  311. let currentUsageStats = ClientStats()
  312. let (initialUsageStats, rpcStats) = try self.state.withLockedValue { state in
  313. let initialUsageStats = state.clientStats(replaceWith: reset ? currentUsageStats : nil)
  314. try state.updateRPCStats()
  315. let rpcStats = state.clientRPCStats
  316. return (initialUsageStats, rpcStats)
  317. }
  318. guard let initialUsageStats = initialUsageStats, let rpcStats = rpcStats else {
  319. throw RPCError(
  320. code: .notFound,
  321. message: "There are no initial client stats. Clients must be setup before calling 'mark'."
  322. )
  323. }
  324. let differences = currentUsageStats.difference(to: initialUsageStats)
  325. let requestResults = rpcStats.requestResultCount.map { (key, value) in
  326. return Grpc_Testing_RequestResultCount.with {
  327. $0.statusCode = Int32(key.rawValue)
  328. $0.count = value
  329. }
  330. }
  331. return Grpc_Testing_WorkerService.Method.RunClient.Output.with {
  332. $0.stats = Grpc_Testing_ClientStats.with {
  333. $0.timeElapsed = differences.time
  334. $0.timeSystem = differences.systemTime
  335. $0.timeUser = differences.userTime
  336. $0.requestResults = requestResults
  337. $0.latencies = Grpc_Testing_HistogramData.with {
  338. $0.bucket = rpcStats.latencyHistogram.buckets
  339. $0.minSeen = rpcStats.latencyHistogram.minSeen
  340. $0.maxSeen = rpcStats.latencyHistogram.maxSeen
  341. $0.sum = rpcStats.latencyHistogram.sum
  342. $0.sumOfSquares = rpcStats.latencyHistogram.sumOfSquares
  343. $0.count = rpcStats.latencyHistogram.countOfValuesSeen
  344. }
  345. }
  346. }
  347. }
  348. }