Browse Source

'runServer' implementation for Performance Worker Service (#1840)

Motivation:

The run server RPC starts a server and reports stats back to the client. We can’t yet start a real server, because we don’t yet have the http/2 transport, but we can stub it out and do the rest of the RPC which reports stats back to the client.

Modifications:

- implemented the methods that start the server and collects the stats
- added the initial stats property used in computing the requested server stats

Result:

We will be able to start the server from a performance worker.
Stefana-Ioana Dranca 1 year ago
parent
commit
f84657dce1
1 changed files with 115 additions and 8 deletions
  1. 115 8
      Sources/performance-worker/WorkerService.swift

+ 115 - 8
Sources/performance-worker/WorkerService.swift

@@ -32,7 +32,17 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
 
     enum Role {
       case client(GRPCClient)
-      case server(GRPCServer)
+      case server(ServerState)
+    }
+
+    struct ServerState {
+      var server: GRPCServer
+      var stats: ServerStats
+
+      init(server: GRPCServer, stats: ServerStats) {
+        self.server = server
+        self.stats = stats
+      }
     }
 
     init() {}
@@ -41,12 +51,41 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
       self.role = role
     }
 
-    init(server: GRPCServer) {
-      self.role = .server(server)
+    var server: GRPCServer? {
+      switch self.role {
+      case let .server(serverState):
+        return serverState.server
+      case .client, .none:
+        return nil
+      }
+    }
+
+    mutating func serverStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
+      switch self.role {
+      case var .server(serverState):
+        let stats = serverState.stats
+        if let newStats = newStats {
+          serverState.stats = newStats
+          self.role = .server(serverState)
+        }
+        return stats
+      case .client, .none:
+        return nil
+      }
     }
 
-    init(client: GRPCClient) {
-      self.role = .client(client)
+    mutating func setupServer(server: GRPCServer, stats: ServerStats) throws {
+      let serverState = State.ServerState(server: server, stats: stats)
+      switch self.role {
+      case .server(_):
+        throw RPCError(code: .alreadyExists, message: "A server has already been set up.")
+
+      case .client(_):
+        throw RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
+
+      case .none:
+        self.role = .server(serverState)
+      }
     }
   }
 
@@ -63,8 +102,8 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
       switch role {
       case .client(let client):
         client.close()
-      case .server(let server):
-        server.stopListening()
+      case .server(let serverState):
+        serverState.server.stopListening()
       }
     }
 
@@ -87,7 +126,34 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
   ) async throws
     -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunServer.Output>
   {
-    throw RPCError(code: .unimplemented, message: "This RPC has not been implemented yet.")
+    return ServerResponse.Stream { writer in
+      try await withThrowingTaskGroup(of: Void.self) { group in
+        for try await message in request.messages {
+          switch message.argtype {
+          case let .some(.setup(serverConfig)):
+            let server = try await self.setupServer(serverConfig)
+            group.addTask { try await server.run() }
+
+          case let .some(.mark(mark)):
+            let response = try await self.makeServerStatsResponse(reset: mark.reset)
+            try await writer.write(response)
+
+          case .none:
+            ()
+          }
+        }
+
+        try await group.next()
+      }
+
+      let server = self.state.withLockedValue { state in
+        defer { state.role = nil }
+        return state.server
+      }
+
+      server?.stopListening()
+      return [:]
+    }
   }
 
   func runClient(
@@ -98,3 +164,44 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
     throw RPCError(code: .unimplemented, message: "This RPC has not been implemented yet.")
   }
 }
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+extension WorkerService {
+  private func setupServer(_ config: Grpc_Testing_ServerConfig) async throws -> GRPCServer {
+    let server = GRPCServer(transports: [], services: [BenchmarkService()])
+    let stats = try await ServerStats()
+
+    try self.state.withLockedValue { state in
+      try state.setupServer(server: server, stats: stats)
+    }
+
+    return server
+  }
+
+  private func makeServerStatsResponse(
+    reset: Bool
+  ) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
+    let currentStats = try await ServerStats()
+    let initialStats = self.state.withLockedValue { state in
+      return state.serverStats(replaceWith: reset ? currentStats : nil)
+    }
+
+    guard let initialStats = initialStats else {
+      throw RPCError(
+        code: .notFound,
+        message: "There are no initial server stats. A server must be setup before calling 'mark'."
+      )
+    }
+
+    let differences = currentStats.difference(to: initialStats)
+    return Grpc_Testing_WorkerService.Method.RunServer.Output.with {
+      $0.stats = Grpc_Testing_ServerStats.with {
+        $0.idleCpuTime = differences.idleCPUTime
+        $0.timeElapsed = differences.time
+        $0.timeSystem = differences.systemTime
+        $0.timeUser = differences.userTime
+        $0.totalCpuTime = differences.totalCPUTime
+      }
+    }
+  }
+}