WorkerService.swift 17 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCHTTP2Core
  18. import GRPCHTTP2TransportNIOPosix
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOPosix
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  final class WorkerService: Sendable {
    /// Lock-protected state machine recording which role (if any) this worker currently has.
    private let state: NIOLockedValueBox<State>

    init() {
      self.state = NIOLockedValueBox(State())
    }

    /// The worker's state machine.
    ///
    /// The methods here only read or update `role` and return an action enum telling the caller
    /// what to do next; none of them start or stop anything themselves.
    private struct State {
      private var role: Role

      /// What the worker is currently set up as.
      enum Role {
        case none
        case client(Client)
        case server(Server)
      }

      /// Everything held while the worker is set up as a server.
      struct Server {
        var server: GRPCServer
        var stats: ServerStats
        var eventLoopGroup: MultiThreadedEventLoopGroup
      }

      /// Everything held while the worker is set up as a group of clients.
      struct Client {
        var clients: [BenchmarkClient]
        var stats: ClientStats
        var rpcStats: RPCStats
      }

      init() {
        self.role = .none
      }

      /// Returns the stored server stats, optionally replacing them.
      ///
      /// - Parameter newStats: If non-`nil`, stored in place of the stats being returned.
      /// - Returns: The stats stored before any replacement, or `nil` if the worker isn't
      ///   currently a server.
      mutating func collectServerStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
        switch self.role {
        case var .server(serverState):
          let stats = serverState.stats
          if let newStats {
            serverState.stats = newStats
            self.role = .server(serverState)
          }
          return stats

        case .client, .none:
          return nil
        }
      }

      /// Returns the stored client stats together with the accumulated RPC stats, optionally
      /// replacing the stored client stats.
      ///
      /// - Parameter newStats: If non-`nil`, stored in place of the client stats being returned.
      /// - Returns: The client stats stored before any replacement and the RPC stats after
      ///   merging in each client's `currentStats`, or `nil` if the worker isn't currently a
      ///   client.
      mutating func collectClientStats(
        replaceWith newStats: ClientStats? = nil
      ) -> (ClientStats, RPCStats)? {
        switch self.role {
        case var .client(state):
          // Grab the existing stats and update if necessary.
          let stats = state.stats
          if let newStats {
            state.stats = newStats
          }

          // Merge in RPC stats from each client. A failed merge is ignored (`try?`), so the
          // returned RPC stats simply won't include that client's contribution.
          for client in state.clients {
            try? state.rpcStats.merge(client.currentStats)
          }

          self.role = .client(state)
          return (stats, state.rpcStats)

        case .server, .none:
          return nil
        }
      }

      enum OnStartedServer {
        case runServer
        case invalidState(RPCError)
      }

      /// Records a newly created server if the worker has no role yet.
      ///
      /// - Returns: `.runServer` if the server was stored. Otherwise `.invalidState` carrying
      ///   `.alreadyExists` when a server is already stored, or `.failedPrecondition` when the
      ///   worker is set up as a client.
      mutating func startedServer(
        _ server: GRPCServer,
        stats: ServerStats,
        eventLoopGroup: MultiThreadedEventLoopGroup
      ) -> OnStartedServer {
        let action: OnStartedServer

        switch self.role {
        case .none:
          let state = State.Server(server: server, stats: stats, eventLoopGroup: eventLoopGroup)
          self.role = .server(state)
          action = .runServer

        case .server:
          let error = RPCError(code: .alreadyExists, message: "A server has already been set up.")
          action = .invalidState(error)

        case .client:
          let error = RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
          action = .invalidState(error)
        }

        return action
      }

      enum OnStartedClients {
        case runClients
        case invalidState(RPCError)
      }

      /// Records newly created clients if the worker has no role yet.
      ///
      /// The status codes mirror `startedServer`: a conflict with the same role is
      /// `.alreadyExists`, a conflict with the other role is `.failedPrecondition`.
      ///
      /// - Returns: `.runClients` if the clients were stored. Otherwise `.invalidState` carrying
      ///   `.alreadyExists` when clients are already stored, or `.failedPrecondition` when the
      ///   worker is set up as a server.
      mutating func startedClients(
        _ clients: [BenchmarkClient],
        stats: ClientStats,
        rpcStats: RPCStats
      ) -> OnStartedClients {
        let action: OnStartedClients

        switch self.role {
        case .none:
          let state = State.Client(clients: clients, stats: stats, rpcStats: rpcStats)
          self.role = .client(state)
          action = .runClients

        case .server:
          let error = RPCError(
            code: .failedPrecondition,
            message: "This worker has a server setup."
          )
          action = .invalidState(error)

        case .client:
          let error = RPCError(code: .alreadyExists, message: "Clients have already been set up.")
          action = .invalidState(error)
        }

        return action
      }

      enum OnServerShutDown {
        case shutdown(MultiThreadedEventLoopGroup)
        case nothing
      }

      /// Clears the server role and hands back its event loop group so the caller can shut it
      /// down.
      ///
      /// Returns `.nothing` if the role has already been reset to `.none` (`quit()` does this),
      /// in which case no event loop group is handed back. Traps if the worker is a client.
      mutating func serverShutdown() -> OnServerShutDown {
        switch self.role {
        case .client:
          preconditionFailure("Invalid state")
        case .server(let state):
          self.role = .none
          return .shutdown(state.eventLoopGroup)
        case .none:
          return .nothing
        }
      }

      enum OnStopListening {
        case stopListening(GRPCServer)
        case nothing
      }

      /// Returns the stored server, if any, without changing the role.
      ///
      /// Traps if the worker is a client.
      func stopListening() -> OnStopListening {
        switch self.role {
        case .client:
          preconditionFailure("Invalid state")
        case .server(let state):
          return .stopListening(state.server)
        case .none:
          return .nothing
        }
      }

      enum OnCloseClient {
        case close([BenchmarkClient])
        case nothing
      }

      /// Clears the client role and hands back the clients so the caller can shut them down.
      ///
      /// Traps if the worker is a server.
      mutating func closeClients() -> OnCloseClient {
        switch self.role {
        case .client(let state):
          self.role = .none
          return .close(state.clients)
        case .server:
          preconditionFailure("Invalid state")
        case .none:
          return .nothing
        }
      }

      enum OnQuitWorker {
        case shutDownServer(GRPCServer)
        case shutDownClients([BenchmarkClient])
        case nothing
      }

      /// Resets the role to `.none` and returns whatever was stored so the caller can shut it
      /// down.
      mutating func quit() -> OnQuitWorker {
        switch self.role {
        case .none:
          return .nothing

        case .client(let state):
          self.role = .none
          return .shutDownClients(state.clients)

        case .server(let state):
          self.role = .none
          return .shutDownServer(state.server)
        }
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension WorkerService: Grpc_Testing_WorkerService.ServiceProtocol {
    /// Resets the worker's state and shuts down whichever server or clients it was holding.
    func quitWorker(
      request: ServerRequest.Single<Grpc_Testing_Void>
    ) async throws -> ServerResponse.Single<Grpc_Testing_Void> {
      switch self.state.withLockedValue({ $0.quit() }) {
      case .shutDownServer(let server):
        server.beginGracefulShutdown()

      case .shutDownClients(let clients):
        for client in clients {
          client.shutdown()
        }

      case .nothing:
        break
      }

      return ServerResponse.Single(message: Grpc_Testing_Void())
    }

    /// Reports `System.coreCount` to the caller.
    func coreCount(
      request: ServerRequest.Single<Grpc_Testing_CoreRequest>
    ) async throws -> ServerResponse.Single<Grpc_Testing_CoreResponse> {
      let response = Grpc_Testing_WorkerService.Method.CoreCount.Output.with {
        $0.cores = Int32(System.coreCount)
      }
      return ServerResponse.Single(message: response)
    }

    /// Handles the server side of a benchmark run.
    ///
    /// A `setup` message starts a server in a child task and replies with the port it is
    /// listening on; a `mark` message replies with the current server stats. When the request
    /// stream ends the server is asked to shut down gracefully.
    func runServer(
      request: ServerRequest.Stream<Grpc_Testing_ServerArgs>
    ) async throws -> ServerResponse.Stream<Grpc_Testing_ServerStatus> {
      return ServerResponse.Stream { writer in
        try await withThrowingTaskGroup(of: Void.self) { group in
          for try await message in request.messages {
            switch message.argtype {
            case let .some(.setup(serverConfig)):
              let (server, transport) = try await self.startServer(serverConfig)

              group.addTask {
                // Capture the outcome of serving so that the ELG is shut down before any
                // error from `serve()` is rethrown.
                let outcome: Result<Void, any Error>
                do {
                  try await server.serve()
                  outcome = .success(())
                } catch {
                  outcome = .failure(error)
                }

                if case .shutdown(let eventLoopGroup) = self.state.withLockedValue({
                  $0.serverShutdown()
                }) {
                  try await eventLoopGroup.shutdownGracefully()
                }

                try outcome.get()
              }

              // Wait for the server to bind.
              let address = try await transport.listeningAddress
              guard let port = address.ipv4?.port ?? address.ipv6?.port else {
                throw RPCError(
                  code: .internalError,
                  message: "Server listening on unsupported address '\(address)'"
                )
              }

              // Tell the client what port the server is listening on.
              let status = Grpc_Testing_ServerStatus.with { $0.port = Int32(port) }
              try await writer.write(status)

            case let .some(.mark(mark)):
              let status = try await self.makeServerStatsResponse(reset: mark.reset)
              try await writer.write(status)

            case .none:
              break
            }
          }

          // Request stream ended, tell the server to stop listening. Once it's finished it will
          // shutdown its ELG.
          if case .stopListening(let server) = self.state.withLockedValue({ $0.stopListening() }) {
            server.beginGracefulShutdown()
          }
        }

        return [:]
      }
    }

    /// Handles the client side of a benchmark run.
    ///
    /// A `setup` message creates the clients, runs each in a child task and replies with the
    /// client stats; a `mark` message replies with the current client stats. When the request
    /// stream ends the clients are shut down and their tasks awaited.
    func runClient(
      request: ServerRequest.Stream<Grpc_Testing_ClientArgs>
    ) async throws -> ServerResponse.Stream<Grpc_Testing_ClientStatus> {
      return ServerResponse.Stream { writer in
        try await withThrowingTaskGroup(of: Void.self) { group in
          for try await message in request.messages {
            switch message.argtype {
            case let .some(.setup(config)):
              // Create the clients with the initial stats.
              let clients = try await self.setupClients(config)
              for client in clients {
                group.addTask {
                  try await client.run()
                }
              }

              let status = try await self.makeClientStatsResponse(reset: false)
              try await writer.write(status)

            case let .some(.mark(mark)):
              let status = try await self.makeClientStatsResponse(reset: mark.reset)
              try await writer.write(status)

            case .none:
              break
            }
          }

          // Request stream ended: shut down any clients, then wait for their tasks to finish.
          if case .close(let clients) = self.state.withLockedValue({ $0.closeClients() }) {
            for client in clients {
              client.shutdown()
            }
          }

          try await group.waitForAll()
          return [:]
        }
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension WorkerService {
    /// Creates a server (and its event loop group) for the given config and records it in the
    /// state machine.
    ///
    /// The server is bound to `127.0.0.1` on `serverConfig.port` and is not started here; the
    /// caller is expected to call `serve()` on it.
    ///
    /// - Returns: The server and its transport.
    /// - Throws: Any error from creating `ServerStats`, or the `RPCError` from the state machine
    ///   if the worker already has a role. In both cases the event loop group created here is
    ///   shut down before the error is thrown.
    private func startServer(
      _ serverConfig: Grpc_Testing_ServerConfig
    ) async throws -> (GRPCServer, HTTP2ServerTransport.Posix) {
      // Prepare an ELG, the test might require more than the default of one.
      let numberOfThreads: Int
      if serverConfig.asyncServerThreads > 0 {
        numberOfThreads = Int(serverConfig.asyncServerThreads)
      } else {
        numberOfThreads = System.coreCount
      }
      let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: numberOfThreads)

      // Don't restrict the max payload size, the client is always trusted.
      var config = HTTP2ServerTransport.Posix.Config.defaults(transportSecurity: .plaintext)
      config.rpc.maxRequestPayloadSize = .max

      let transport = HTTP2ServerTransport.Posix(
        address: .ipv4(host: "127.0.0.1", port: Int(serverConfig.port)),
        config: config,
        eventLoopGroup: eventLoopGroup
      )

      let server = GRPCServer(transport: transport, services: [BenchmarkService()])

      let stats: ServerStats
      do {
        stats = try await ServerStats()
      } catch {
        // The ELG's threads already exist; don't leak them if the stats can't be collected.
        try? await eventLoopGroup.shutdownGracefully()
        throw error
      }

      // Hold on to the server and ELG in the state machine.
      let action = self.state.withLockedValue {
        $0.startedServer(server, stats: stats, eventLoopGroup: eventLoopGroup)
      }

      switch action {
      case .runServer:
        return (server, transport)

      case .invalidState(let error):
        server.beginGracefulShutdown()
        // Best-effort cleanup: the invalid-state error is the one the caller needs to see, so
        // don't let a shutdown failure replace it.
        try? await eventLoopGroup.shutdownGracefully()
        throw error
      }
    }

    /// Builds a `RunServer` response holding the difference between the server stats now and
    /// the stats stored in the state machine.
    ///
    /// - Parameter reset: If `true`, the stats collected now replace the stored stats.
    /// - Throws: `RPCError` with code `.notFound` if the worker isn't set up as a server.
    private func makeServerStatsResponse(
      reset: Bool
    ) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
      let currentStats = try await ServerStats()
      let initialStats = self.state.withLockedValue { state in
        return state.collectServerStats(replaceWith: reset ? currentStats : nil)
      }

      guard let initialStats else {
        throw RPCError(
          code: .notFound,
          message: "There are no initial server stats. A server must be setup before calling 'mark'."
        )
      }

      let differences = currentStats.difference(to: initialStats)
      return Grpc_Testing_WorkerService.Method.RunServer.Output.with {
        $0.stats = Grpc_Testing_ServerStats.with {
          $0.idleCpuTime = differences.idleCPUTime
          $0.timeElapsed = differences.time
          $0.timeSystem = differences.systemTime
          $0.timeUser = differences.userTime
          $0.totalCpuTime = differences.totalCPUTime
        }
      }
    }

    /// Creates one benchmark client per configured channel and records them in the state
    /// machine.
    ///
    /// - Returns: The clients; they have not been run.
    /// - Throws: `RPCError` with code `.invalidArgument` for an unknown RPC type, a negative
    ///   channel count or an unparsable server target; the `RPCError` from the state machine if
    ///   the worker already has a role (the new clients are shut down first); or any error from
    ///   creating a client transport.
    private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] {
      guard let rpcType = BenchmarkClient.RPCType(config.rpcType) else {
        throw RPCError(code: .invalidArgument, message: "Unknown RPC type")
      }

      // Building the range `0 ..< n` traps for negative `n`; reject it as a bad argument instead.
      guard config.clientChannels >= 0 else {
        throw RPCError(
          code: .invalidArgument,
          message: "The number of client channels must not be negative."
        )
      }

      // Parse the server targets into resolvable targets.
      let ipv4Addresses = try self.parseServerTargets(config.serverTargets)
      let target = ResolvableTargets.IPv4(addresses: ipv4Addresses)

      var clients = [BenchmarkClient]()
      clients.reserveCapacity(Int(config.clientChannels))

      for _ in 0 ..< config.clientChannels {
        let client = BenchmarkClient(
          client: GRPCClient(
            transport: try .http2NIOPosix(
              target: target,
              config: .defaults(transportSecurity: .plaintext)
            )
          ),
          concurrentRPCs: Int(config.outstandingRpcsPerChannel),
          rpcType: rpcType,
          messagesPerStream: Int(config.messagesPerStream),
          protoParams: config.payloadConfig.simpleParams,
          histogramParams: config.histogramParams
        )
        clients.append(client)
      }

      let stats = ClientStats()
      let histogram = RPCStats.LatencyHistogram(
        resolution: config.histogramParams.resolution,
        maxBucketStart: config.histogramParams.maxPossible
      )
      let rpcStats = RPCStats(latencyHistogram: histogram)

      let action = self.state.withLockedValue { state in
        state.startedClients(clients, stats: stats, rpcStats: rpcStats)
      }

      switch action {
      case .runClients:
        return clients

      case .invalidState(let error):
        for client in clients {
          client.shutdown()
        }
        throw error
      }
    }

    /// Splits `target` at its first `:` into a host and an integer port.
    ///
    /// - Returns: The address, or `nil` if there is no `:` or the text after it isn't an `Int`.
    private func parseServerTarget(_ target: String) -> GRPCHTTP2Core.SocketAddress.IPv4? {
      guard let index = target.firstIndex(of: ":") else { return nil }

      let host = target[..<index]
      guard let port = Int(target[target.index(after: index)...]) else { return nil }

      return SocketAddress.IPv4(host: String(host), port: port)
    }

    /// Parses every target as an IPv4 `<host>:<port>` pair.
    ///
    /// - Throws: `RPCError` with code `.invalidArgument` for the first target which can't be
    ///   parsed.
    private func parseServerTargets(
      _ targets: [String]
    ) throws -> [GRPCHTTP2Core.SocketAddress.IPv4] {
      try targets.map { target in
        guard let ipv4 = self.parseServerTarget(target) else {
          // Only IPv4 targets can be parsed and returned here, so don't advertise an IPv6 format.
          throw RPCError(
            code: .invalidArgument,
            message: "Couldn't parse target '\(target)'. Must be an IPv4 target in the format '<host>:<port>'."
          )
        }
        return ipv4
      }
    }

    /// Builds a `RunClient` response from the difference between the client stats now and the
    /// stats stored in the state machine, plus the accumulated RPC stats.
    ///
    /// - Parameter reset: If `true`, the stats collected now replace the stored client stats.
    /// - Throws: `RPCError` with code `.notFound` if the worker isn't set up as a client.
    private func makeClientStatsResponse(
      reset: Bool
    ) async throws -> Grpc_Testing_WorkerService.Method.RunClient.Output {
      let currentUsageStats = ClientStats()
      let stats = self.state.withLockedValue { state in
        state.collectClientStats(replaceWith: reset ? currentUsageStats : nil)
      }

      guard let (initialUsageStats, rpcStats) = stats else {
        throw RPCError(
          code: .notFound,
          message: "There are no initial client stats. Clients must be setup before calling 'mark'."
        )
      }

      let differences = currentUsageStats.difference(to: initialUsageStats)

      let requestResults = rpcStats.requestResultCount.map { (key, value) in
        return Grpc_Testing_RequestResultCount.with {
          $0.statusCode = Int32(key.rawValue)
          $0.count = value
        }
      }

      return Grpc_Testing_WorkerService.Method.RunClient.Output.with {
        $0.stats = Grpc_Testing_ClientStats.with {
          $0.timeElapsed = differences.time
          $0.timeSystem = differences.systemTime
          $0.timeUser = differences.userTime
          $0.requestResults = requestResults
          $0.latencies = Grpc_Testing_HistogramData.with {
            $0.bucket = rpcStats.latencyHistogram.buckets
            $0.minSeen = rpcStats.latencyHistogram.minSeen
            $0.maxSeen = rpcStats.latencyHistogram.maxSeen
            $0.sum = rpcStats.latencyHistogram.sum
            $0.sumOfSquares = rpcStats.latencyHistogram.sumOfSquares
            $0.count = rpcStats.latencyHistogram.countOfValuesSeen
          }
        }
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension BenchmarkClient.RPCType {
    /// Converts a protobuf RPC type into the benchmark client's RPC type.
    ///
    /// Only `.unary` and `.streaming` have a counterpart; any other value fails the
    /// initializer.
    init?(_ rpcType: Grpc_Testing_RpcType) {
      let converted: BenchmarkClient.RPCType? =
        switch rpcType {
        case .unary: .unary
        case .streaming: .streaming
        default: nil
        }

      guard let converted else { return nil }
      self = converted
    }
  }