AsyncServer.swift 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import GRPC
  18. import Logging
  19. import NIO
  20. /// Server setup for asynchronous requests.
  21. final class AsyncQPSServer: QPSServer {
  22. private let eventLoopGroup: MultiThreadedEventLoopGroup
  23. private let server: EventLoopFuture<Server>
  24. private let threadCount: Int
  25. private var statsPeriodStart: DispatchTime
  26. private var cpuStatsPeriodStart: CPUTime
  27. private let logger = Logger(label: "AsyncQPSServer")
  28. /// Initialisation.
  29. /// - parameters:
  30. /// - config: Description of the type of server required.
  31. /// - whenBound: Called when the server has successful bound to a port.
  32. init(config: Grpc_Testing_ServerConfig, whenBound: @escaping (ServerInfo) -> Void) {
  33. // Setup threads as requested.
  34. let threadCount = config.asyncServerThreads > 0
  35. ? Int(config.asyncServerThreads)
  36. : System.coreCount
  37. self.threadCount = threadCount
  38. self.logger.info("Sizing AsyncQPSServer", metadata: ["threads": "\(threadCount)"])
  39. self.eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: threadCount)
  40. // Start stats gathering.
  41. self.statsPeriodStart = grpcTimeNow()
  42. self.cpuStatsPeriodStart = getResourceUsage()
  43. let workerService = AsyncQPSServerImpl()
  44. // Start the server.
  45. // TODO: Support TLS is requested.
  46. self.server = Server.insecure(group: self.eventLoopGroup)
  47. .withServiceProviders([workerService])
  48. .withLogger(self.logger)
  49. .bind(host: "localhost", port: Int(config.port))
  50. self.server.whenSuccess { server in
  51. let port = server.channel.localAddress?.port ?? 0
  52. whenBound(ServerInfo(threadCount: threadCount, port: port))
  53. }
  54. }
  55. /// Send the status of the current test
  56. /// - parameters:
  57. /// - reset: Indicates if the stats collection should be reset after publication or not.
  58. /// - context: Context to describe where to send the status to.
  59. func sendStatus(reset: Bool, context: StreamingResponseCallContext<Grpc_Testing_ServerStatus>) {
  60. let currentTime = grpcTimeNow()
  61. let currentResourceUsage = getResourceUsage()
  62. var result = Grpc_Testing_ServerStatus()
  63. result.stats.timeElapsed = (currentTime - self.statsPeriodStart).asSeconds()
  64. result.stats.timeSystem = currentResourceUsage.systemTime - self.cpuStatsPeriodStart
  65. .systemTime
  66. result.stats.timeUser = currentResourceUsage.userTime - self.cpuStatsPeriodStart.userTime
  67. result.stats.totalCpuTime = 0
  68. result.stats.idleCpuTime = 0
  69. result.stats.cqPollCount = 0
  70. self.logger.info("Sending server status")
  71. _ = context.sendResponse(result)
  72. if reset {
  73. self.statsPeriodStart = currentTime
  74. self.cpuStatsPeriodStart = currentResourceUsage
  75. }
  76. }
  77. /// Shutdown the service.
  78. /// - parameters:
  79. /// - callbackLoop: Which eventloop should be called back on completion.
  80. /// - returns: A future on the `callbackLoop` which will succeed on completion of shutdown.
  81. func shutdown(callbackLoop: EventLoop) -> EventLoopFuture<Void> {
  82. return self.server.flatMap { server in
  83. server.close()
  84. }.recover { error in
  85. self.logger.error("Error closing server", metadata: ["error": "\(error)"])
  86. // May as well plough on anyway -
  87. // we will hopefully sort outselves out shutting down the eventloops
  88. return ()
  89. }.hop(to: callbackLoop).flatMap { _ in
  90. let promise: EventLoopPromise<Void> = callbackLoop.makePromise()
  91. self.eventLoopGroup.shutdownGracefully { error in
  92. if let error = error {
  93. promise.fail(error)
  94. } else {
  95. promise.succeed(())
  96. }
  97. }
  98. return promise.futureResult
  99. }
  100. }
  101. }