QPSWorker.swift 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPC
  17. import Logging
  18. import NIOCore
  19. import NIOPosix
  20. /// Sets up and runs a worker service which listens for instructions on what tests to run.
  21. /// Currently doesn't understand TLS for communication with the driver.
  22. class QPSWorker {
  23. private var driverPort: Int
  24. private var serverPort: Int?
  25. /// Initialise.
  26. /// - parameters:
  27. /// - driverPort: Port to listen for instructions on.
  28. /// - serverPort: Possible override for the port the testing will actually occur on - usually supplied by the driver process.
  29. init(driverPort: Int, serverPort: Int?) {
  30. self.driverPort = driverPort
  31. self.serverPort = serverPort
  32. }
  33. private let logger = Logger(label: "QPSWorker")
  34. private var eventLoopGroup: MultiThreadedEventLoopGroup?
  35. private var server: EventLoopFuture<Server>?
  36. private var workEndFuture: EventLoopFuture<Void>?
  37. /// Start up the server which listens for instructions from the driver.
  38. /// - parameters:
  39. /// - onQuit: Function to call when the driver has indicated that the server should exit.
  40. func start(onQuit: @escaping () -> Void) {
  41. precondition(self.eventLoopGroup == nil)
  42. self.logger.info("Starting")
  43. let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  44. self.eventLoopGroup = eventLoopGroup
  45. let workEndPromise: EventLoopPromise<Void> = eventLoopGroup.next().makePromise()
  46. workEndPromise.futureResult.whenSuccess(onQuit)
  47. let workerService = WorkerServiceImpl(
  48. finishedPromise: workEndPromise,
  49. serverPortOverride: self.serverPort
  50. )
  51. // Start the server.
  52. self.logger.info("Binding to localhost", metadata: ["driverPort": "\(self.driverPort)"])
  53. self.server = Server.insecure(group: eventLoopGroup)
  54. .withServiceProviders([workerService])
  55. .withLogger(Logger(label: "GRPC"))
  56. .bind(host: "localhost", port: self.driverPort)
  57. }
  58. /// Shutdown waiting for completion.
  59. func syncShutdown() throws {
  60. precondition(self.eventLoopGroup != nil)
  61. self.logger.info("Stopping")
  62. try self.eventLoopGroup?.syncShutdownGracefully()
  63. self.logger.info("Stopped")
  64. }
  65. }