WorkerService.swift 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  20. final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable {
  21. private let state: NIOLockedValueBox<State>
  22. init() {
  23. let clientAndServer = State()
  24. self.state = NIOLockedValueBox(clientAndServer)
  25. }
  26. private struct State {
  27. var role: Role?
  28. enum Role {
  29. case client(GRPCClient)
  30. case server(ServerState)
  31. }
  32. struct ServerState {
  33. var server: GRPCServer
  34. var stats: ServerStats
  35. init(server: GRPCServer, stats: ServerStats) {
  36. self.server = server
  37. self.stats = stats
  38. }
  39. }
  40. init() {}
  41. init(role: Role) {
  42. self.role = role
  43. }
  44. var server: GRPCServer? {
  45. switch self.role {
  46. case let .server(serverState):
  47. return serverState.server
  48. case .client, .none:
  49. return nil
  50. }
  51. }
  52. mutating func serverStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
  53. switch self.role {
  54. case var .server(serverState):
  55. let stats = serverState.stats
  56. if let newStats = newStats {
  57. serverState.stats = newStats
  58. self.role = .server(serverState)
  59. }
  60. return stats
  61. case .client, .none:
  62. return nil
  63. }
  64. }
  65. mutating func setupServer(server: GRPCServer, stats: ServerStats) throws {
  66. let serverState = State.ServerState(server: server, stats: stats)
  67. switch self.role {
  68. case .server(_):
  69. throw RPCError(code: .alreadyExists, message: "A server has already been set up.")
  70. case .client(_):
  71. throw RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
  72. case .none:
  73. self.role = .server(serverState)
  74. }
  75. }
  76. }
  77. func quitWorker(
  78. request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Input>
  79. ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Output> {
  80. let role = self.state.withLockedValue { state in
  81. defer { state.role = nil }
  82. return state.role
  83. }
  84. if let role = role {
  85. switch role {
  86. case .client(let client):
  87. client.close()
  88. case .server(let serverState):
  89. serverState.server.stopListening()
  90. }
  91. }
  92. return ServerResponse.Single(message: Grpc_Testing_WorkerService.Method.QuitWorker.Output())
  93. }
  94. func coreCount(
  95. request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.CoreCount.Input>
  96. ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.CoreCount.Output> {
  97. let coreCount = System.coreCount
  98. return ServerResponse.Single(
  99. message: Grpc_Testing_WorkerService.Method.CoreCount.Output.with {
  100. $0.cores = Int32(coreCount)
  101. }
  102. )
  103. }
  104. func runServer(
  105. request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunServer.Input>
  106. ) async throws
  107. -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunServer.Output>
  108. {
  109. return ServerResponse.Stream { writer in
  110. try await withThrowingTaskGroup(of: Void.self) { group in
  111. for try await message in request.messages {
  112. switch message.argtype {
  113. case let .some(.setup(serverConfig)):
  114. let server = try await self.setupServer(serverConfig)
  115. group.addTask { try await server.run() }
  116. case let .some(.mark(mark)):
  117. let response = try await self.makeServerStatsResponse(reset: mark.reset)
  118. try await writer.write(response)
  119. case .none:
  120. ()
  121. }
  122. }
  123. try await group.next()
  124. }
  125. let server = self.state.withLockedValue { state in
  126. defer { state.role = nil }
  127. return state.server
  128. }
  129. server?.stopListening()
  130. return [:]
  131. }
  132. }
  133. func runClient(
  134. request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunClient.Input>
  135. ) async throws
  136. -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunClient.Output>
  137. {
  138. throw RPCError(code: .unimplemented, message: "This RPC has not been implemented yet.")
  139. }
  140. }
  141. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  142. extension WorkerService {
  143. private func setupServer(_ config: Grpc_Testing_ServerConfig) async throws -> GRPCServer {
  144. let server = GRPCServer(transports: [], services: [BenchmarkService()])
  145. let stats = try await ServerStats()
  146. try self.state.withLockedValue { state in
  147. try state.setupServer(server: server, stats: stats)
  148. }
  149. return server
  150. }
  151. private func makeServerStatsResponse(
  152. reset: Bool
  153. ) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
  154. let currentStats = try await ServerStats()
  155. let initialStats = self.state.withLockedValue { state in
  156. return state.serverStats(replaceWith: reset ? currentStats : nil)
  157. }
  158. guard let initialStats = initialStats else {
  159. throw RPCError(
  160. code: .notFound,
  161. message: "There are no initial server stats. A server must be setup before calling 'mark'."
  162. )
  163. }
  164. let differences = currentStats.difference(to: initialStats)
  165. return Grpc_Testing_WorkerService.Method.RunServer.Output.with {
  166. $0.stats = Grpc_Testing_ServerStats.with {
  167. $0.idleCpuTime = differences.idleCPUTime
  168. $0.timeElapsed = differences.time
  169. $0.timeSystem = differences.systemTime
  170. $0.timeUser = differences.userTime
  171. $0.totalCpuTime = differences.totalCPUTime
  172. }
  173. }
  174. }
  175. }