WorkerService.swift 17 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCHTTP2Core
  18. import GRPCHTTP2TransportNIOPosix
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOPosix
  /// The benchmark worker service.
  ///
  /// A worker takes on at most one role at a time: it either hosts a benchmark server or runs a
  /// set of benchmark clients. The current role and the resources it owns are tracked by
  /// `State`, which is only ever accessed while holding the lock in `state`.
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  final class WorkerService: Sendable {
    /// The worker's role state machine, guarded by a lock.
    private let state: NIOLockedValueBox<State>

    init() {
      self.state = NIOLockedValueBox(State())
    }

    /// Tracks which role the worker currently plays and the resources owned by that role.
    ///
    /// Transitions that require side effects (shutting down a server or clients, releasing an
    /// event loop group) return an action describing them. Callers perform those side effects
    /// after the lock has been released.
    private struct State {
      private var role: Role

      enum Role {
        /// Neither a server nor clients have been set up.
        case none
        /// The worker is running benchmark clients.
        case client(Client)
        /// The worker is hosting a benchmark server.
        case server(Server)
      }

      /// Resources owned by the server role.
      struct Server {
        var server: GRPCServer
        /// Baseline resource-usage stats; replaced via `collectServerStats(replaceWith:)`.
        var stats: ServerStats
        /// The event loop group the server's transport runs on.
        var eventLoopGroup: MultiThreadedEventLoopGroup
      }

      /// Resources owned by the client role.
      struct Client {
        var clients: [BenchmarkClient]
        /// Baseline resource-usage stats; replaced via `collectClientStats(replaceWith:)`.
        var stats: ClientStats
        /// RPC stats, into which each client's current stats are merged when collected.
        var rpcStats: RPCStats
      }

      init() {
        self.role = .none
      }

      /// Returns the server's baseline stats, optionally replacing them with `newStats`.
      ///
      /// - Parameter newStats: If non-nil, becomes the new baseline.
      /// - Returns: The baseline as it was before any replacement, or `nil` if no server is set up.
      mutating func collectServerStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
        switch self.role {
        case var .server(serverState):
          let stats = serverState.stats
          if let newStats = newStats {
            serverState.stats = newStats
            self.role = .server(serverState)
          }
          return stats
        case .client, .none:
          return nil
        }
      }

      /// Returns the clients' baseline stats and their accumulated RPC stats.
      ///
      /// - Parameter newStats: If non-nil, becomes the new baseline usage stats.
      /// - Returns: The baseline as it was before any replacement, paired with the RPC stats after
      ///   merging in each client's current stats; or `nil` if no clients are set up.
      mutating func collectClientStats(
        replaceWith newStats: ClientStats? = nil
      ) -> (ClientStats, RPCStats)? {
        switch self.role {
        case var .client(state):
          // Grab the existing stats and update if necessary.
          let stats = state.stats
          if let newStats = newStats {
            state.stats = newStats
          }

          // Merge in RPC stats from each client; merge failures are deliberately ignored.
          for client in state.clients {
            try? state.rpcStats.merge(client.currentStats)
          }

          self.role = .client(state)
          return (stats, state.rpcStats)

        case .server, .none:
          return nil
        }
      }

      enum OnStartedServer {
        case runServer
        case invalidState(RPCError)
      }

      /// Records a newly created server, moving the worker into the server role.
      ///
      /// - Returns: `.runServer` if the worker had no role. Otherwise returns `.invalidState` and
      ///   leaves the state unchanged; the caller owns (and must clean up) the passed resources.
      mutating func startedServer(
        _ server: GRPCServer,
        stats: ServerStats,
        eventLoopGroup: MultiThreadedEventLoopGroup
      ) -> OnStartedServer {
        switch self.role {
        case .none:
          let state = State.Server(server: server, stats: stats, eventLoopGroup: eventLoopGroup)
          self.role = .server(state)
          return .runServer

        case .server:
          let error = RPCError(code: .alreadyExists, message: "A server has already been set up.")
          return .invalidState(error)

        case .client:
          let error = RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
          return .invalidState(error)
        }
      }

      enum OnStartedClients {
        case runClients
        case invalidState(RPCError)
      }

      /// Records newly created clients, moving the worker into the client role.
      ///
      /// Error codes mirror `startedServer`: repeating a setup for the same role is
      /// `.alreadyExists`, while conflicting with the other role is `.failedPrecondition`.
      ///
      /// - Returns: `.runClients` if the worker had no role. Otherwise returns `.invalidState` and
      ///   leaves the state unchanged; the caller owns (and must clean up) the passed clients.
      mutating func startedClients(
        _ clients: [BenchmarkClient],
        stats: ClientStats,
        rpcStats: RPCStats
      ) -> OnStartedClients {
        switch self.role {
        case .none:
          let state = State.Client(clients: clients, stats: stats, rpcStats: rpcStats)
          self.role = .client(state)
          return .runClients

        case .server:
          let error = RPCError(
            code: .failedPrecondition,
            message: "This worker has a server setup."
          )
          return .invalidState(error)

        case .client:
          let error = RPCError(
            code: .alreadyExists,
            message: "Clients have already been set up."
          )
          return .invalidState(error)
        }
      }

      enum OnServerShutDown {
        case shutdown(MultiThreadedEventLoopGroup)
        case nothing
      }

      /// Called once the server has stopped serving.
      ///
      /// Clears the server role and hands back its event loop group so the caller can shut it
      /// down. The server role is only ever cleared here, so the client role can't be observed.
      mutating func serverShutdown() -> OnServerShutDown {
        switch self.role {
        case .client:
          preconditionFailure("Invalid state")
        case .server(let state):
          self.role = .none
          return .shutdown(state.eventLoopGroup)
        case .none:
          return .nothing
        }
      }

      enum OnStopListening {
        case stopListening(GRPCServer)
        case nothing
      }

      /// Returns the server so the caller can ask it to stop listening.
      ///
      /// The role is left in place; `serverShutdown()` clears it once the server has stopped.
      /// If the worker is running clients instead, there is no server to stop: this can happen
      /// when a server RPC ends without having set up a server. That case is not an error.
      func stopListening() -> OnStopListening {
        switch self.role {
        case .server(let state):
          return .stopListening(state.server)
        case .client, .none:
          return .nothing
        }
      }

      enum OnCloseClient {
        case close([BenchmarkClient])
        case nothing
      }

      /// Clears the client role and returns the clients so the caller can shut them down.
      ///
      /// Returns `.nothing` if the worker has no clients. This includes the case where it is now
      /// hosting a server: `quit()` may have cleared the client role and a server may have been
      /// set up before the client RPC finished.
      mutating func closeClients() -> OnCloseClient {
        switch self.role {
        case .client(let state):
          self.role = .none
          return .close(state.clients)
        case .server, .none:
          return .nothing
        }
      }

      enum OnQuitWorker {
        case shutDownServer(GRPCServer)
        case shutDownClients([BenchmarkClient])
        case nothing
      }

      /// Returns whatever the worker is running so the caller can shut it down.
      mutating func quit() -> OnQuitWorker {
        switch self.role {
        case .none:
          return .nothing

        case .client(let state):
          self.role = .none
          return .shutDownClients(state.clients)

        case .server(let state):
          // Keep the server role: once the server stops serving, `serverShutdown()` clears the
          // role and returns the event loop group so it can be shut down. Clearing the role here
          // would turn that call into a no-op and leak the group's threads.
          return .shutDownServer(state.server)
        }
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension WorkerService: Grpc_Testing_WorkerService.ServiceProtocol {
    /// Handles `QuitWorker` by tearing down whatever the worker is currently running.
    ///
    /// Clients are shut down directly; a server is asked to begin a graceful shutdown.
    func quitWorker(
      request: ServerRequest.Single<Grpc_Testing_Void>,
      context: ServerContext
    ) async throws -> ServerResponse.Single<Grpc_Testing_Void> {
      let onQuit = self.state.withLockedValue { $0.quit() }

      switch onQuit {
      case .nothing:
        ()

      case .shutDownClients(let clients):
        for client in clients {
          client.shutdown()
        }

      case .shutDownServer(let server):
        server.beginGracefulShutdown()
      }

      return ServerResponse.Single(message: Grpc_Testing_Void())
    }

    /// Handles `CoreCount` by reporting the number of cores available to this process.
    func coreCount(
      request: ServerRequest.Single<Grpc_Testing_CoreRequest>,
      context: ServerContext
    ) async throws -> ServerResponse.Single<Grpc_Testing_CoreResponse> {
      let coreCount = System.coreCount
      return ServerResponse.Single(
        message: Grpc_Testing_WorkerService.Method.CoreCount.Output.with {
          $0.cores = Int32(coreCount)
        }
      )
    }

    /// Handles `RunServer`.
    ///
    /// A `setup` message starts a benchmark server and replies with the port it bound to. A
    /// `mark` message replies with server stats relative to the current baseline. When the
    /// request stream ends, the server is asked to stop listening. The response completes once
    /// the serving task has finished, and any error it raised is propagated.
    func runServer(
      request: ServerRequest.Stream<Grpc_Testing_ServerArgs>,
      context: ServerContext
    ) async throws -> ServerResponse.Stream<Grpc_Testing_ServerStatus> {
      return ServerResponse.Stream { writer in
        try await withThrowingTaskGroup(of: Void.self) { group in
          for try await message in request.messages {
            switch message.argtype {
            case let .setup(serverConfig):
              let (server, transport) = try await self.startServer(serverConfig)
              group.addTask {
                let result: Result<Void, any Error>
                do {
                  try await server.serve()
                  result = .success(())
                } catch {
                  result = .failure(error)
                }

                // Release the event loop group whether or not serving succeeded, then surface
                // the outcome of `serve()`.
                switch self.state.withLockedValue({ $0.serverShutdown() }) {
                case .shutdown(let eventLoopGroup):
                  try await eventLoopGroup.shutdownGracefully()
                case .nothing:
                  ()
                }

                try result.get()
              }

              // Wait for the server to bind.
              let address = try await transport.listeningAddress
              let port: Int
              if let ipv4 = address.ipv4 {
                port = ipv4.port
              } else if let ipv6 = address.ipv6 {
                port = ipv6.port
              } else {
                throw RPCError(
                  code: .internalError,
                  message: "Server listening on unsupported address '\(address)'"
                )
              }

              // Tell the client what port the server is listening on.
              let message = Grpc_Testing_ServerStatus.with { $0.port = Int32(port) }
              try await writer.write(message)

            case let .mark(mark):
              let response = try await self.makeServerStatsResponse(reset: mark.reset)
              try await writer.write(response)

            case .none:
              ()
            }
          }

          // Request stream ended, tell the server to stop listening. Once it's finished it will
          // shutdown its ELG.
          switch self.state.withLockedValue({ $0.stopListening() }) {
          case .stopListening(let server):
            server.beginGracefulShutdown()
          case .nothing:
            ()
          }

          // Errors from child tasks are discarded unless explicitly awaited. Wait for the
          // serving task so failures from `serve()` or the ELG shutdown reach the driver.
          try await group.waitForAll()
        }

        return [:]
      }
    }

    /// Handles `RunClient`.
    ///
    /// A `setup` message creates and starts the benchmark clients and replies with initial
    /// stats. A `mark` message replies with client stats relative to the current baseline. When
    /// the request stream ends, the clients are shut down and the response completes once they
    /// have all finished running. If the request stream fails after this RPC set up clients,
    /// those clients are shut down too, so the worker can accept a new setup.
    func runClient(
      request: ServerRequest.Stream<Grpc_Testing_ClientArgs>,
      context: ServerContext
    ) async throws -> ServerResponse.Stream<Grpc_Testing_ClientStatus> {
      return ServerResponse.Stream { writer in
        try await withThrowingTaskGroup(of: Void.self) { group in
          // Whether this RPC registered clients with the state machine, and is therefore
          // responsible for closing them if the request stream fails.
          var didStartClients = false

          do {
            for try await message in request.messages {
              switch message.argtype {
              case let .setup(config):
                // Create the clients with the initial stats.
                let clients = try await self.setupClients(config)
                didStartClients = true
                for client in clients {
                  group.addTask {
                    try await client.run()
                  }
                }

                let message = try await self.makeClientStatsResponse(reset: false)
                try await writer.write(message)

              case let .mark(mark):
                let response = try await self.makeClientStatsResponse(reset: mark.reset)
                try await writer.write(response)

              case .none:
                ()
              }
            }
          } catch {
            // Without this the client role would stay registered, and every later setup would
            // be rejected as an invalid state.
            if didStartClients {
              self.closeRunningClients()
            }
            throw error
          }

          self.closeRunningClients()
          try await group.waitForAll()
        }

        return [:]
      }
    }

    /// Clears the client role (if present) and shuts down the clients it held.
    private func closeRunningClients() {
      switch self.state.withLockedValue({ $0.closeClients() }) {
      case .close(let clients):
        for client in clients {
          client.shutdown()
        }
      case .nothing:
        ()
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension WorkerService {
    /// Creates a benchmark server for `serverConfig` and registers it with the state machine.
    ///
    /// The server is created but not started; the caller is responsible for calling `serve()`.
    ///
    /// - Returns: The server and its transport (which can be queried for the bound address).
    /// - Throws: An `RPCError` if the worker already has a role, or any error raised while
    ///   capturing the initial server stats. On both paths the event loop group created here is
    ///   shut down before returning.
    private func startServer(
      _ serverConfig: Grpc_Testing_ServerConfig
    ) async throws -> (GRPCServer, HTTP2ServerTransport.Posix) {
      // Prepare an ELG, the test might require more than the default of one.
      let numberOfThreads: Int
      if serverConfig.asyncServerThreads > 0 {
        numberOfThreads = Int(serverConfig.asyncServerThreads)
      } else {
        numberOfThreads = System.coreCount
      }
      let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: numberOfThreads)

      // Don't restrict the max payload size, the client is always trusted.
      var config = HTTP2ServerTransport.Posix.Config.defaults(transportSecurity: .plaintext)
      config.rpc.maxRequestPayloadSize = .max
      let transport = HTTP2ServerTransport.Posix(
        address: .ipv4(host: "127.0.0.1", port: Int(serverConfig.port)),
        config: config,
        eventLoopGroup: eventLoopGroup
      )
      let server = GRPCServer(transport: transport, services: [BenchmarkService()])

      let stats: ServerStats
      do {
        stats = try await ServerStats()
      } catch {
        // Nothing else owns the ELG yet: release its threads rather than leaking them. A
        // shutdown failure is ignored so the original error is the one reported.
        try? await eventLoopGroup.shutdownGracefully()
        throw error
      }

      // Hold on to the server and ELG in the state machine.
      let action = self.state.withLockedValue {
        $0.startedServer(server, stats: stats, eventLoopGroup: eventLoopGroup)
      }

      switch action {
      case .runServer:
        return (server, transport)
      case .invalidState(let error):
        server.beginGracefulShutdown()
        try await eventLoopGroup.shutdownGracefully()
        throw error
      }
    }

    /// Builds a `RunServer` response containing server stats relative to the current baseline.
    ///
    /// - Parameter reset: If true, the baseline is replaced with the stats captured now.
    /// - Throws: `RPCError(.notFound)` if no server has been set up.
    private func makeServerStatsResponse(
      reset: Bool
    ) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
      let currentStats = try await ServerStats()
      let initialStats = self.state.withLockedValue { state in
        return state.collectServerStats(replaceWith: reset ? currentStats : nil)
      }

      guard let initialStats = initialStats else {
        throw RPCError(
          code: .notFound,
          message: "There are no initial server stats. A server must be setup before calling 'mark'."
        )
      }

      let differences = currentStats.difference(to: initialStats)
      return Grpc_Testing_WorkerService.Method.RunServer.Output.with {
        $0.stats = Grpc_Testing_ServerStats.with {
          $0.idleCpuTime = differences.idleCPUTime
          $0.timeElapsed = differences.time
          $0.timeSystem = differences.systemTime
          $0.timeUser = differences.userTime
          $0.totalCpuTime = differences.totalCPUTime
        }
      }
    }

    /// Creates the benchmark clients described by `config` and registers them with the state
    /// machine.
    ///
    /// The clients are created but not run; the caller is responsible for calling `run()`.
    ///
    /// - Throws: `RPCError(.invalidArgument)` for an unsupported RPC type, a negative channel
    ///   count, or an unparsable target; any error from creating a client's transport; or the
    ///   state machine's `RPCError` if the worker already has a role. Any clients already
    ///   created are shut down before an error is thrown.
    private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] {
      guard let rpcType = BenchmarkClient.RPCType(config.rpcType) else {
        throw RPCError(code: .invalidArgument, message: "Unknown RPC type")
      }

      // `0 ..< n` traps for negative `n`, so reject it up front rather than crashing.
      guard config.clientChannels >= 0 else {
        throw RPCError(
          code: .invalidArgument,
          message: "The number of client channels must not be negative."
        )
      }

      // Parse the server targets into resolvable targets.
      let ipv4Addresses = try self.parseServerTargets(config.serverTargets)
      let target = ResolvableTargets.IPv4(addresses: ipv4Addresses)

      var clients = [BenchmarkClient]()
      clients.reserveCapacity(Int(config.clientChannels))
      do {
        for _ in 0 ..< config.clientChannels {
          let client = BenchmarkClient(
            client: GRPCClient(
              transport: try .http2NIOPosix(
                target: target,
                config: .defaults(transportSecurity: .plaintext)
              )
            ),
            concurrentRPCs: Int(config.outstandingRpcsPerChannel),
            rpcType: rpcType,
            messagesPerStream: Int(config.messagesPerStream),
            protoParams: config.payloadConfig.simpleParams,
            histogramParams: config.histogramParams
          )
          clients.append(client)
        }
      } catch {
        // Clean up the clients built so far, as the invalid-state path below does.
        for client in clients {
          client.shutdown()
        }
        throw error
      }

      let stats = ClientStats()
      let histogram = RPCStats.LatencyHistogram(
        resolution: config.histogramParams.resolution,
        maxBucketStart: config.histogramParams.maxPossible
      )
      let rpcStats = RPCStats(latencyHistogram: histogram)

      let action = self.state.withLockedValue { state in
        state.startedClients(clients, stats: stats, rpcStats: rpcStats)
      }

      switch action {
      case .runClients:
        return clients
      case .invalidState(let error):
        for client in clients {
          client.shutdown()
        }
        throw error
      }
    }

    /// Parses a `<host>:<port>` string into an IPv4 socket address.
    ///
    /// - Returns: The address, or `nil` if `target` has no `:`, the host is empty, or the port is
    ///   not an integer in `0...65535`.
    private func parseServerTarget(_ target: String) -> GRPCHTTP2Core.SocketAddress.IPv4? {
      guard let colon = target.firstIndex(of: ":") else { return nil }

      let host = target[..<colon]
      // `UInt16` rejects negative and out-of-range ports that `Int` would accept.
      guard !host.isEmpty, let port = UInt16(target[target.index(after: colon)...]) else {
        return nil
      }

      return GRPCHTTP2Core.SocketAddress.IPv4(host: String(host), port: Int(port))
    }

    /// Parses every target with `parseServerTarget(_:)`.
    ///
    /// - Throws: `RPCError(.invalidArgument)` naming the first target that can't be parsed.
    private func parseServerTargets(
      _ targets: [String]
    ) throws -> [GRPCHTTP2Core.SocketAddress.IPv4] {
      try targets.map { target in
        if let ipv4 = self.parseServerTarget(target) {
          return ipv4
        } else {
          throw RPCError(
            code: .invalidArgument,
            message: """
              Couldn't parse target '\(target)'. Must be in the format '<host>:<port>'; \
              only IPv4 targets are supported.
              """
          )
        }
      }
    }

    /// Builds a `RunClient` response containing client usage stats relative to the current
    /// baseline, together with the accumulated RPC stats.
    ///
    /// - Parameter reset: If true, the usage-stats baseline is replaced with the stats captured
    ///   now.
    /// - Throws: `RPCError(.notFound)` if no clients have been set up.
    private func makeClientStatsResponse(
      reset: Bool
    ) async throws -> Grpc_Testing_WorkerService.Method.RunClient.Output {
      let currentUsageStats = ClientStats()
      let stats = self.state.withLockedValue { state in
        state.collectClientStats(replaceWith: reset ? currentUsageStats : nil)
      }

      guard let (initialUsageStats, rpcStats) = stats else {
        throw RPCError(
          code: .notFound,
          message: "There are no initial client stats. Clients must be setup before calling 'mark'."
        )
      }

      let differences = currentUsageStats.difference(to: initialUsageStats)
      let requestResults = rpcStats.requestResultCount.map { (key, value) in
        return Grpc_Testing_RequestResultCount.with {
          $0.statusCode = Int32(key.rawValue)
          $0.count = value
        }
      }

      return Grpc_Testing_WorkerService.Method.RunClient.Output.with {
        $0.stats = Grpc_Testing_ClientStats.with {
          $0.timeElapsed = differences.time
          $0.timeSystem = differences.systemTime
          $0.timeUser = differences.userTime
          $0.requestResults = requestResults
          $0.latencies = Grpc_Testing_HistogramData.with {
            $0.bucket = rpcStats.latencyHistogram.buckets
            $0.minSeen = rpcStats.latencyHistogram.minSeen
            $0.maxSeen = rpcStats.latencyHistogram.maxSeen
            $0.sum = rpcStats.latencyHistogram.sum
            $0.sumOfSquares = rpcStats.latencyHistogram.sumOfSquares
            $0.count = rpcStats.latencyHistogram.countOfValuesSeen
          }
        }
      }
    }
  }
  /// Bridges the driver's protobuf RPC type to the RPC types `BenchmarkClient` can run.
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension BenchmarkClient.RPCType {
    /// Creates an RPC type from its protobuf representation.
    ///
    /// Only unary and streaming RPCs are mapped; every other value yields `nil`.
    init?(_ rpcType: Grpc_Testing_RpcType) {
      let converted: Self? =
        switch rpcType {
        case .unary: Self.unary
        case .streaming: Self.streaming
        default: nil
        }

      guard let converted else { return nil }
      self = converted
    }
  }