WorkerServiceImpl.swift 12 KB


/*
 * Copyright 2020, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import GRPC
  17. import NIO
  /// Implementation of the control service for communication with the driver process.
  ///
  /// The worker tracks at most one running `QPSServer` and at most one running `QPSClient`.
  /// A setup request received while the corresponding slot is occupied is rejected with
  /// `resourceExhausted`.
  class WorkerServiceImpl: Grpc_Testing_WorkerServiceProvider {
    let interceptors: Grpc_Testing_WorkerServiceServerInterceptorFactoryProtocol? = nil

    /// Succeeded when the driver calls `quitWorker`.
    private let finishedPromise: EventLoopPromise<Void>
    /// When non-nil, replaces the port requested in the driver's server configuration.
    private let serverPortOverride: Int?

    /// The server started by a `runServer` setup message, if any.
    private var runningServer: QPSServer?
    /// The client started by a `runClient` setup message, if any.
    private var runningClient: QPSClient?

    /// Initialise.
    /// - parameters:
    ///     - finishedPromise: Promise succeeded when the driver asks this worker to quit.
    ///     - serverPortOverride: An override to port number requested by the driver process.
    init(finishedPromise: EventLoopPromise<Void>, serverPortOverride: Int?) {
      self.finishedPromise = finishedPromise
      self.serverPortOverride = serverPortOverride
    }

    /// Start server with specified workload.
    /// First request sent specifies the ServerConfig followed by ServerStatus
    /// response. After that, a "Mark" can be sent anytime to request the latest
    /// stats. Closing the stream will initiate shutdown of the test server
    /// and once the shutdown has finished, the OK status is sent to terminate
    /// this RPC.
    func runServer(context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>)
      -> EventLoopFuture<(StreamEvent<Grpc_Testing_ServerArgs>) -> Void> {
      context.logger.info("runServer stream started.")
      return context.eventLoop.makeSucceededFuture({ event in
        switch event {
        case let .message(serverArgs):
          self.handleServerMessage(context: context, args: serverArgs)
        case .end:
          self.handleServerEnd(context: context)
        }
      })
    }

    /// Start client with specified workload.
    /// First request sent specifies the ClientConfig followed by ClientStatus
    /// response. After that, a "Mark" can be sent anytime to request the latest
    /// stats. Closing the stream will initiate shutdown of the test client
    /// and once the shutdown has finished, the OK status is sent to terminate
    /// this RPC.
    func runClient(context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>)
      -> EventLoopFuture<(StreamEvent<Grpc_Testing_ClientArgs>) -> Void> {
      context.logger.info("runClient stream started")
      return context.eventLoop.makeSucceededFuture({ event in
        switch event {
        case let .message(clientArgs):
          self.handleClientMessage(context: context, args: clientArgs)
        case .end:
          self.handleClientEnd(context: context)
        }
      })
    }

    /// Just return the core count - unary call
    func coreCount(request: Grpc_Testing_CoreRequest,
                   context: StatusOnlyCallContext) -> EventLoopFuture<Grpc_Testing_CoreResponse> {
      context.logger.notice("coreCount queried")
      let cores = Grpc_Testing_CoreResponse.with { $0.cores = Int32(System.coreCount) }
      return context.eventLoop.makeSucceededFuture(cores)
    }

    /// Quit this worker by succeeding the promise handed to `init`.
    func quitWorker(request: Grpc_Testing_Void,
                    context: StatusOnlyCallContext) -> EventLoopFuture<Grpc_Testing_Void> {
      context.logger.warning("quitWorker called")
      self.finishedPromise.succeed(())
      return context.eventLoop.makeSucceededFuture(Grpc_Testing_Void())
    }

    // MARK: Run Server

    /// Handle a message received from the driver about operating as a server.
    ///
    /// Dispatches on the message's `argtype`: a setup starts a server, a mark reports stats.
    private func handleServerMessage(
      context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>,
      args: Grpc_Testing_ServerArgs
    ) {
      switch args.argtype {
      case let .some(.setup(serverConfig)):
        self.handleServerSetup(context: context, config: serverConfig)
      case let .some(.mark(mark)):
        self.handleServerMarkRequested(context: context, mark: mark)
      case .none:
        // Nothing to act on; record it so a misbehaving driver is visible in the logs.
        context.logger.warning("server args contained no argtype; ignoring")
      }
    }

    /// Handle a request to setup a server.
    /// Makes a new server and sets it running, unless one is already running in which case the
    /// RPC is failed with `resourceExhausted`.
    private func handleServerSetup(context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>,
                                   config: Grpc_Testing_ServerConfig) {
      context.logger.info("server setup requested")
      guard self.runningServer == nil else {
        context.logger.error("server already running")
        context.statusPromise.fail(GRPCStatus(
          code: .resourceExhausted,
          message: "Server worker busy"
        ))
        return
      }
      self.runServerBody(context: context, serverConfig: config)
    }

    /// Gathers stats and returns them to the driver process.
    /// Fails the RPC with `failedPrecondition` when no server is running.
    private func handleServerMarkRequested(
      context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>,
      mark: Grpc_Testing_Mark
    ) {
      context.logger.info("server mark requested")
      guard let server = self.runningServer else {
        context.logger.error("server not running")
        context.statusPromise.fail(GRPCStatus(
          code: .failedPrecondition,
          message: "Server not running"
        ))
        return
      }
      server.sendStatus(reset: mark.reset, context: context)
    }

    /// Handle the end of the driver's request stream: shut down the running server (if any) and
    /// complete the RPC with OK once shutdown has finished. A shutdown failure fails the RPC.
    private func handleServerEnd(context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>) {
      context.logger.info("runServer stream ended.")
      guard let server = self.runningServer else {
        context.statusPromise.succeed(.ok)
        return
      }
      // Clear the slot before shutting down so a new setup request is not rejected as busy.
      self.runningServer = nil
      server.shutdown(callbackLoop: context.eventLoop)
        .map { _ in GRPCStatus.ok }
        .cascade(to: context.statusPromise)
    }

    // MARK: Create Server

    /// Start a server running of the requested type.
    ///
    /// Applies `serverPortOverride` (when set) to the configuration first. If the override does
    /// not fit in the configuration's `Int32` port field, or the server cannot be created, the
    /// RPC's status promise is failed and no server is stored.
    private func runServerBody(context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>,
                               serverConfig: Grpc_Testing_ServerConfig) {
      var serverConfig = serverConfig
      if let portOverride = self.serverPortOverride {
        // A plain `Int32(portOverride)` would trap on an out-of-range value.
        guard let port = Int32(exactly: portOverride) else {
          context.logger.error("server port override out of range")
          context.statusPromise.fail(GRPCStatus(
            code: .invalidArgument,
            message: "Server port override out of range"
          ))
          return
        }
        serverConfig.port = port
      }

      do {
        self.runningServer = try WorkerServiceImpl.createServer(
          context: context,
          config: serverConfig
        )
      } catch {
        context.statusPromise.fail(error)
      }
    }

    /// Create a server of the requested type.
    ///
    /// Only `.asyncServer` is implemented. Once it is bound, a `ServerStatus` carrying the thread
    /// count and port is sent on `context`.
    /// - Throws: `GRPCStatus` with `.unimplemented` for the other known server types and
    ///   `.invalidArgument` for an unrecognised one.
    private static func createServer(
      context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>,
      config: Grpc_Testing_ServerConfig
    ) throws -> QPSServer {
      context.logger.info(
        "Starting server",
        metadata: ["type": .stringConvertible(config.serverType)]
      )

      switch config.serverType {
      case .asyncServer:
        return AsyncQPSServer(
          config: config,
          whenBound: { serverInfo in
            let response = Grpc_Testing_ServerStatus.with {
              $0.cores = Int32(serverInfo.threadCount)
              $0.port = Int32(serverInfo.port)
            }
            _ = context.sendResponse(response)
          }
        )
      case .syncServer, .asyncGenericServer, .otherServer, .callbackServer:
        throw GRPCStatus(code: .unimplemented, message: "Server Type not implemented")
      case .UNRECOGNIZED:
        throw GRPCStatus(code: .invalidArgument, message: "Unrecognised server type")
      }
    }

    // MARK: Run Client

    /// Handle a message from the driver about operating as a client.
    ///
    /// Dispatches on the message's `argtype`: a setup starts a client, a mark reports stats.
    private func handleClientMessage(
      context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>,
      args: Grpc_Testing_ClientArgs
    ) {
      switch args.argtype {
      case let .some(.setup(clientConfig)):
        self.handleClientSetup(context: context, config: clientConfig)
      case let .some(.mark(mark)):
        // Capture stats
        self.handleClientMarkRequested(context: context, mark: mark)
      case .none:
        // Nothing to act on; record it so a misbehaving driver is visible in the logs.
        context.logger.warning("client args contained no argtype; ignoring")
      }
    }

    /// Setup a client as described by the message from the driver.
    /// Fails the RPC with `resourceExhausted` when a client is already running.
    private func handleClientSetup(context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>,
                                   config: Grpc_Testing_ClientConfig) {
      context.logger.info("client setup requested")
      guard self.runningClient == nil else {
        context.logger.error("client already running")
        context.statusPromise.fail(GRPCStatus(
          code: .resourceExhausted,
          message: "Client worker busy"
        ))
        return
      }

      self.runClientBody(context: context, clientConfig: config)

      // `runClientBody` fails the status promise when the client cannot be created; the RPC is
      // then already terminating, so do not send an initial status on it.
      guard self.runningClient != nil else {
        return
      }

      // Initial status is the default (in C++)
      _ = context.sendResponse(Grpc_Testing_ClientStatus())
    }

    /// Captures stats and send back to driver process.
    /// Fails the RPC with `failedPrecondition` when no client is running.
    private func handleClientMarkRequested(
      context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>,
      mark: Grpc_Testing_Mark
    ) {
      context.logger.info("client mark requested")
      guard let client = self.runningClient else {
        context.logger.error("client not running")
        context.statusPromise.fail(GRPCStatus(
          code: .failedPrecondition,
          message: "Client not running"
        ))
        return
      }
      client.sendStatus(reset: mark.reset, context: context)
    }

    /// Call when an end message has been received.
    /// Shuts down the running client (if any) and completes the RPC with OK once shutdown has
    /// finished. A shutdown failure fails the RPC.
    private func handleClientEnd(context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>) {
      context.logger.info("runClient ended")
      guard let client = self.runningClient else {
        context.statusPromise.succeed(.ok)
        return
      }
      // Clear the slot before shutting down so a new setup request is not rejected as busy.
      self.runningClient = nil
      client.shutdown(callbackLoop: context.eventLoop)
        .map { _ in GRPCStatus.ok }
        .cascade(to: context.statusPromise)
    }

    // MARK: Create Client

    /// Setup and run a client of the requested type.
    /// If the client cannot be created the RPC's status promise is failed and no client is stored.
    private func runClientBody(context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>,
                               clientConfig: Grpc_Testing_ClientConfig) {
      do {
        self.runningClient = try WorkerServiceImpl.makeClient(
          context: context,
          clientConfig: clientConfig
        )
      } catch {
        context.statusPromise.fail(error)
      }
    }

    /// Create a client of the requested type.
    ///
    /// Only `.asyncClient` is implemented, and only for simple, complex or absent payload
    /// parameters.
    /// - Throws: `GRPCStatus` with `.unimplemented` for the other known client types and for
    ///   byte-buffer payloads, `.invalidArgument` for an unrecognised client type, or whatever
    ///   `makeAsyncClient(config:)` throws.
    private static func makeClient(
      context: StreamingResponseCallContext<Grpc_Testing_ClientStatus>,
      clientConfig: Grpc_Testing_ClientConfig
    ) throws -> QPSClient {
      switch clientConfig.clientType {
      case .asyncClient:
        switch clientConfig.payloadConfig.payload {
        case .some(.bytebufParams):
          throw GRPCStatus(code: .unimplemented, message: "Client Type not implemented")
        case .some(.simpleParams), .some(.complexParams), .none:
          // If there are no parameters assume simple.
          return try makeAsyncClient(config: clientConfig)
        }
      case .syncClient, .otherClient, .callbackClient:
        throw GRPCStatus(code: .unimplemented, message: "Client Type not implemented")
      case .UNRECOGNIZED:
        throw GRPCStatus(code: .invalidArgument, message: "Unrecognised client type")
      }
    }
  }