WorkerService.swift 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  20. final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable {
  21. private let state: NIOLockedValueBox<State>
  22. init() {
  23. let clientAndServer = State()
  24. self.state = NIOLockedValueBox(clientAndServer)
  25. }
  26. private struct State {
  27. var role: Role?
  28. enum Role {
  29. case client(ClientState)
  30. case server(ServerState)
  31. }
  32. struct ServerState {
  33. var server: GRPCServer
  34. var stats: ServerStats
  35. init(server: GRPCServer, stats: ServerStats) {
  36. self.server = server
  37. self.stats = stats
  38. }
  39. }
  40. struct ClientState {
  41. var clients: [BenchmarkClient]
  42. var stats: ClientStats
  43. init(
  44. clients: [BenchmarkClient],
  45. stats: ClientStats
  46. ) {
  47. self.clients = clients
  48. self.stats = stats
  49. }
  50. func shutdownClients() throws {
  51. for benchmarkClient in self.clients {
  52. benchmarkClient.shutdown()
  53. }
  54. }
  55. }
  56. init() {}
  57. init(role: Role) {
  58. self.role = role
  59. }
  60. var server: GRPCServer? {
  61. switch self.role {
  62. case let .server(serverState):
  63. return serverState.server
  64. case .client, .none:
  65. return nil
  66. }
  67. }
  68. mutating func serverStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
  69. switch self.role {
  70. case var .server(serverState):
  71. let stats = serverState.stats
  72. if let newStats = newStats {
  73. serverState.stats = newStats
  74. self.role = .server(serverState)
  75. }
  76. return stats
  77. case .client, .none:
  78. return nil
  79. }
  80. }
  81. mutating func clientStats(replaceWith newStats: ClientStats? = nil) -> ClientStats? {
  82. switch self.role {
  83. case var .client(clientState):
  84. let stats = clientState.stats
  85. if let newStats = newStats {
  86. clientState.stats = newStats
  87. self.role = .client(clientState)
  88. }
  89. return stats
  90. case .server, .none:
  91. return nil
  92. }
  93. }
  94. mutating func setupServer(server: GRPCServer, stats: ServerStats) throws {
  95. let serverState = State.ServerState(server: server, stats: stats)
  96. switch self.role {
  97. case .server(_):
  98. throw RPCError(code: .alreadyExists, message: "A server has already been set up.")
  99. case .client(_):
  100. throw RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
  101. case .none:
  102. self.role = .server(serverState)
  103. }
  104. }
  105. }
  106. func quitWorker(
  107. request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Input>
  108. ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Output> {
  109. let role = self.state.withLockedValue { state in
  110. defer { state.role = nil }
  111. return state.role
  112. }
  113. if let role = role {
  114. switch role {
  115. case .client(let clientState):
  116. try clientState.shutdownClients()
  117. case .server(let serverState):
  118. serverState.server.stopListening()
  119. }
  120. }
  121. return ServerResponse.Single(message: Grpc_Testing_WorkerService.Method.QuitWorker.Output())
  122. }
  123. func coreCount(
  124. request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.CoreCount.Input>
  125. ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.CoreCount.Output> {
  126. let coreCount = System.coreCount
  127. return ServerResponse.Single(
  128. message: Grpc_Testing_WorkerService.Method.CoreCount.Output.with {
  129. $0.cores = Int32(coreCount)
  130. }
  131. )
  132. }
  133. func runServer(
  134. request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunServer.Input>
  135. ) async throws
  136. -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunServer.Output>
  137. {
  138. return ServerResponse.Stream { writer in
  139. try await withThrowingTaskGroup(of: Void.self) { group in
  140. for try await message in request.messages {
  141. switch message.argtype {
  142. case let .some(.setup(serverConfig)):
  143. let server = try await self.setupServer(serverConfig)
  144. group.addTask { try await server.run() }
  145. case let .some(.mark(mark)):
  146. let response = try await self.makeServerStatsResponse(reset: mark.reset)
  147. try await writer.write(response)
  148. case .none:
  149. ()
  150. }
  151. }
  152. try await group.next()
  153. }
  154. let server = self.state.withLockedValue { state in
  155. defer { state.role = nil }
  156. return state.server
  157. }
  158. server?.stopListening()
  159. return [:]
  160. }
  161. }
  162. func runClient(
  163. request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunClient.Input>
  164. ) async throws
  165. -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunClient.Output>
  166. {
  167. throw RPCError(code: .unimplemented, message: "This RPC has not been implemented yet.")
  168. }
  169. }
  170. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  171. extension WorkerService {
  172. private func setupServer(_ config: Grpc_Testing_ServerConfig) async throws -> GRPCServer {
  173. let server = GRPCServer(transports: [], services: [BenchmarkService()])
  174. let stats = try await ServerStats()
  175. try self.state.withLockedValue { state in
  176. try state.setupServer(server: server, stats: stats)
  177. }
  178. return server
  179. }
  180. private func makeServerStatsResponse(
  181. reset: Bool
  182. ) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
  183. let currentStats = try await ServerStats()
  184. let initialStats = self.state.withLockedValue { state in
  185. return state.serverStats(replaceWith: reset ? currentStats : nil)
  186. }
  187. guard let initialStats = initialStats else {
  188. throw RPCError(
  189. code: .notFound,
  190. message: "There are no initial server stats. A server must be setup before calling 'mark'."
  191. )
  192. }
  193. let differences = currentStats.difference(to: initialStats)
  194. return Grpc_Testing_WorkerService.Method.RunServer.Output.with {
  195. $0.stats = Grpc_Testing_ServerStats.with {
  196. $0.idleCpuTime = differences.idleCPUTime
  197. $0.timeElapsed = differences.time
  198. $0.timeSystem = differences.systemTime
  199. $0.timeUser = differences.userTime
  200. $0.totalCpuTime = differences.totalCPUTime
  201. }
  202. }
  203. }
  204. }