Browse Source

Various fixes to the benchmark client and server (#1944)

Motivation:

The benchmark client and server were implemented before we had a working
transport, making them impossible to test. As it turns out, there were a
few issues.

Modifications:

- BenchmarkClient:
  - Some RPC types have been removed. All other implementations only
    support unary and streaming, and we never run scenarios using the
    other types.
  - The implementation of the streaming RPC has been altered to record
    the latency per request/response message pair. This aligns with
    other implementations.
  - The implementation of the streaming RPC has also been changed to
    send the response message sequence into the request writer. This
    yields a fairly substantial performance improvement (~3.5x) over
    the existing implementation.
  - The streaming RPC now respects the messages per stream config being
    zero (meaning no limit).
  - An 'is shutting down' atomic is used to stop the client from
    initiating new RPCs before closing.
- BenchmarkService:
  - No semantic changes; the typealiases have been desugared following
    from changes in fea1b722
- WorkerService:
  - The state machine has been tightened up a bit to more clearly
    separate state from side effects and to avoid leaking the
    implementation of the state machine into the service.
  - Added logic for creating clients and servers with an HTTP/2
    transport.

Result:

Can run perf tests
George Barnett 1 year ago
parent
commit
cd00d5cdcc

+ 2 - 0
Package.swift

@@ -259,6 +259,8 @@ extension Target {
     name: "performance-worker",
     dependencies: [
       .grpcCore,
+      .grpcHTTP2Core,
+      .grpcHTTP2TransportNIOPosix,
       .grpcProtobuf,
       .nioCore,
       .nioFileSystem

+ 130 - 127
Sources/performance-worker/BenchmarkClient.swift

@@ -14,41 +14,68 @@
  * limitations under the License.
  */
 
+import Atomics
 import Foundation
 import GRPCCore
 import NIOConcurrencyHelpers
 
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 struct BenchmarkClient {
+  private let _isShuttingDown = ManagedAtomic(false)
+
+  /// Whether the benchmark client is shutting down. Used to control when to stop sending messages
+  /// or creating new RPCs.
+  private var isShuttingDown: Bool {
+    self._isShuttingDown.load(ordering: .relaxed)
+  }
+
+  /// The underlying client.
   private var client: GRPCClient
-  private var rpcNumber: Int32
+
+  /// The number of concurrent RPCs to run.
+  private var concurrentRPCs: Int
+
+  /// The type of RPC to make against the server.
   private var rpcType: RPCType
-  private var messagesPerStream: Int32
-  private var protoParams: Grpc_Testing_SimpleProtoParams
+
+  /// The max number of messages to send on a stream before replacing the RPC with a new one. A
+  /// value of zero means there is no limit.
+  private var messagesPerStream: Int
+  private var noMessageLimit: Bool { self.messagesPerStream == 0 }
+
+  /// The message to send for all RPC types to the server.
+  private let message: Grpc_Testing_SimpleRequest
+
+  /// Per RPC stats.
   private let rpcStats: NIOLockedValueBox<RPCStats>
 
   init(
     client: GRPCClient,
-    rpcNumber: Int32,
+    concurrentRPCs: Int,
     rpcType: RPCType,
-    messagesPerStream: Int32,
+    messagesPerStream: Int,
     protoParams: Grpc_Testing_SimpleProtoParams,
     histogramParams: Grpc_Testing_HistogramParams?
   ) {
     self.client = client
-    self.rpcNumber = rpcNumber
+    self.concurrentRPCs = concurrentRPCs
     self.messagesPerStream = messagesPerStream
-    self.protoParams = protoParams
     self.rpcType = rpcType
+    self.message = .with {
+      $0.responseSize = protoParams.respSize
+      $0.payload = Grpc_Testing_Payload.with {
+        $0.body = Data(count: Int(protoParams.reqSize))
+      }
+    }
 
     let histogram: RPCStats.LatencyHistogram
     if let histogramParams = histogramParams {
-      histogram = .init(
+      histogram = RPCStats.LatencyHistogram(
         resolution: histogramParams.resolution,
         maxBucketStart: histogramParams.maxPossible
       )
     } else {
-      histogram = .init()
+      histogram = RPCStats.LatencyHistogram()
     }
 
     self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
@@ -57,9 +84,6 @@ struct BenchmarkClient {
   enum RPCType {
     case unary
     case streaming
-    case streamingFromClient
-    case streamingFromServer
-    case streamingBothWays
   }
 
   internal var currentStats: RPCStats {
@@ -69,33 +93,53 @@ struct BenchmarkClient {
   }
 
   internal func run() async throws {
-    let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(client: client)
+    let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(client: self.client)
     return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
       // Start the client.
-      clientGroup.addTask { try await client.run() }
+      clientGroup.addTask {
+        try await self.client.run()
+      }
 
-      // Make the requests to the server and register the latency for each one.
       try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
-        for _ in 0 ..< self.rpcNumber {
+        // Start one task for each concurrent RPC and keep looping in that task until indicated
+        // to stop.
+        for _ in 0 ..< self.concurrentRPCs {
           rpcsGroup.addTask {
-            let (latency, errorCode) = try await self.makeRPC(
-              benchmarkClient: benchmarkClient
-            )
-            self.rpcStats.withLockedValue {
-              $0.latencyHistogram.record(latency)
-              if let errorCode = errorCode {
-                $0.requestResultCount[errorCode, default: 1] += 1
+            while !self.isShuttingDown {
+              switch self.rpcType {
+              case .unary:
+                await self.unary(benchmark: benchmarkClient)
+
+              case .streaming:
+                await self.streaming(benchmark: benchmarkClient)
               }
             }
           }
         }
+
         try await rpcsGroup.waitForAll()
       }
 
+      self.client.close()
       try await clientGroup.next()
     }
   }
 
+  private func record(latencyNanos: Double, errorCode: RPCError.Code?) {
+    self.rpcStats.withLockedValue { stats in
+      stats.latencyHistogram.record(latencyNanos)
+      if let errorCode = errorCode {
+        stats.requestResultCount[errorCode, default: 0] += 1
+      }
+    }
+  }
+
+  private func record(errorCode: RPCError.Code) {
+    self.rpcStats.withLockedValue { stats in
+      stats.requestResultCount[errorCode, default: 0] += 1
+    }
+  }
+
   private func timeIt<R>(
     _ body: () async throws -> R
   ) async rethrows -> (R, nanoseconds: Double) {
@@ -105,133 +149,92 @@ struct BenchmarkClient {
     return (result, nanoseconds: Double(endTime - startTime))
   }
 
-  // The result is the number of nanoseconds for processing the RPC.
-  private func makeRPC(
-    benchmarkClient: Grpc_Testing_BenchmarkServiceClient
-  ) async throws -> (latency: Double, errorCode: RPCError.Code?) {
-    let message = Grpc_Testing_SimpleRequest.with {
-      $0.responseSize = self.protoParams.respSize
-      $0.payload = Grpc_Testing_Payload.with {
-        $0.body = Data(count: Int(self.protoParams.reqSize))
+  private func unary(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
+    let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
+      do {
+        try await benchmark.unaryCall(request: ClientRequest.Single(message: self.message)) {
+          _ = try $0.message
+        }
+        return nil
+      } catch let error as RPCError {
+        return error.code
+      } catch {
+        return .unknown
       }
     }
 
-    switch self.rpcType {
-    case .unary:
-      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
-        do {
-          try await benchmarkClient.unaryCall(
-            request: ClientRequest.Single(message: message)
-          ) { response in
-            _ = try response.message
-          }
-          return nil
-        } catch let error as RPCError {
-          return error.code
-        } catch {
-          return .unknown
-        }
-      }
-      return (latency: nanoseconds, errorCode)
+    self.record(latencyNanos: nanoseconds, errorCode: errorCode)
+  }
 
-    // Repeated sequence of one request followed by one response.
-    // It is a ping-pong of messages between the client and the server.
-    case .streaming:
-      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
-        do {
-          let ids = AsyncStream.makeStream(of: Int.self)
-          let streamingRequest = ClientRequest.Stream { writer in
-            for try await id in ids.stream {
-              if id <= self.messagesPerStream {
-                try await writer.write(message)
-              } else {
-                return
-              }
-            }
-          }
+  private func streaming(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
+    // Streaming RPCs ping-pong messages back and forth. To achieve this the response message
+    // stream is sent to the request closure, and the request closure indicates the outcome back
+    // to the response handler to keep the RPC alive for the appropriate amount of time.
+    let status = AsyncStream.makeStream(of: RPCError.self)
+    let response = AsyncStream.makeStream(of: RPCAsyncSequence<Grpc_Testing_SimpleResponse>.self)
 
-          ids.continuation.yield(1)
+    let request = ClientRequest.Stream(of: Grpc_Testing_SimpleRequest.self) { writer in
+      defer { status.continuation.finish() }
 
-          try await benchmarkClient.streamingCall(request: streamingRequest) { response in
-            var id = 1
-            for try await _ in response.messages {
-              id += 1
-              ids.continuation.yield(id)
-            }
-          }
-          return nil
-        } catch let error as RPCError {
-          return error.code
-        } catch {
-          return .unknown
-        }
+      // The time at which the last message was sent.
+      var lastMessageSendTime = DispatchTime.now()
+      try await writer.write(self.message)
+
+      // Wait for the response stream.
+      var iterator = response.stream.makeAsyncIterator()
+      guard let responses = await iterator.next() else {
+        throw RPCError(code: .internalError, message: "")
       }
-      return (latency: nanoseconds, errorCode)
 
-    case .streamingFromClient:
-      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
-        do {
-          let streamingRequest = ClientRequest.Stream { writer in
-            for _ in 1 ... self.messagesPerStream {
-              try await writer.write(message)
-            }
-          }
+      // Record the first latency.
+      let now = DispatchTime.now()
+      let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
+      lastMessageSendTime = now
+      self.record(latencyNanos: Double(nanos), errorCode: nil)
 
-          try await benchmarkClient.streamingFromClient(
-            request: streamingRequest
-          ) { response in
-            _ = try response.message
-          }
-          return nil
-        } catch let error as RPCError {
-          return error.code
-        } catch {
-          return .unknown
-        }
-      }
-      return (latency: nanoseconds, errorCode)
+      // Now start looping. Only stop when the max messages per stream is hit or told to stop.
+      var responseIterator = responses.makeAsyncIterator()
+      var messagesSent = 1
 
-    case .streamingFromServer:
-      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
+      while !self.isShuttingDown && (self.noMessageLimit || messagesSent < self.messagesPerStream) {
+        messagesSent += 1
         do {
-          try await benchmarkClient.streamingFromServer(
-            request: ClientRequest.Single(message: message)
-          ) { response in
-            for try await _ in response.messages {}
+          if try await responseIterator.next() != nil {
+            let now = DispatchTime.now()
+            let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
+            lastMessageSendTime = now
+            self.record(latencyNanos: Double(nanos), errorCode: nil)
+            try await writer.write(message)
+          } else {
+            break
           }
-          return nil
         } catch let error as RPCError {
-          return error.code
+          status.continuation.yield(error)
+          break
         } catch {
-          return .unknown
+          status.continuation.yield(RPCError(code: .unknown, message: ""))
+          break
         }
       }
-      return (latency: nanoseconds, errorCode)
-
-    case .streamingBothWays:
-      let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
-        do {
-          let streamingRequest = ClientRequest.Stream { writer in
-            for _ in 1 ... self.messagesPerStream {
-              try await writer.write(message)
-            }
-          }
+    }
 
-          try await benchmarkClient.streamingBothWays(request: streamingRequest) { response in
-            for try await _ in response.messages {}
-          }
-          return nil
-        } catch let error as RPCError {
-          return error.code
-        } catch {
-          return .unknown
+    do {
+      try await benchmark.streamingCall(request: request) {
+        response.continuation.yield($0.messages)
+        response.continuation.finish()
+        for await errorCode in status.stream {
+          throw errorCode
         }
       }
-      return (latency: nanoseconds, errorCode)
+    } catch let error as RPCError {
+      self.record(errorCode: error.code)
+    } catch {
+      self.record(errorCode: .unknown)
     }
   }
 
   internal func shutdown() {
+    self._isShuttingDown.store(true, ordering: .relaxed)
     self.client.close()
   }
 }

+ 30 - 36
Sources/performance-worker/BenchmarkService.swift

@@ -27,10 +27,8 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
   /// One request followed by one response.
   /// The server returns a client payload with the size requested by the client.
   func unaryCall(
-    request: GRPCCore.ServerRequest.Single<Grpc_Testing_BenchmarkService.Method.UnaryCall.Input>
-  ) async throws
-    -> GRPCCore.ServerResponse.Single<Grpc_Testing_BenchmarkService.Method.UnaryCall.Output>
-  {
+    request: ServerRequest.Single<Grpc_Testing_SimpleRequest>
+  ) async throws -> ServerResponse.Single<Grpc_Testing_SimpleResponse> {
     // Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent
     // if the request is successful.
     if request.message.responseStatus.isInitialized {
@@ -38,7 +36,7 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
     }
 
     return ServerResponse.Single(
-      message: Grpc_Testing_BenchmarkService.Method.UnaryCall.Output.with {
+      message: .with {
         $0.payload = Grpc_Testing_Payload.with {
           $0.body = Data(count: Int(request.message.responseSize))
         }
@@ -49,23 +47,23 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
   /// Repeated sequence of one request followed by one response.
   /// The server returns a payload with the size requested by the client for each received message.
   func streamingCall(
-    request: GRPCCore.ServerRequest.Stream<Grpc_Testing_BenchmarkService.Method.StreamingCall.Input>
-  ) async throws
-    -> GRPCCore.ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingCall.Output>
-  {
+    request: ServerRequest.Stream<Grpc_Testing_SimpleRequest>
+  ) async throws -> ServerResponse.Stream<Grpc_Testing_SimpleResponse> {
     return ServerResponse.Stream { writer in
       for try await message in request.messages {
         if message.responseStatus.isInitialized {
           try self.checkOkStatus(message.responseStatus)
         }
-        try await writer.write(
-          Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
-            $0.payload = Grpc_Testing_Payload.with {
-              $0.body = Data(count: Int(message.responseSize))
-            }
+
+        let responseMessage = Grpc_Testing_SimpleResponse.with {
+          $0.payload = Grpc_Testing_Payload.with {
+            $0.body = Data(count: Int(message.responseSize))
           }
-        )
+        }
+
+        try await writer.write(responseMessage)
       }
+
       return [:]
     }
   }
@@ -73,10 +71,8 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
   /// Single-sided unbounded streaming from client to server.
   /// The server returns a payload with the size requested by the client once the client does WritesDone.
   func streamingFromClient(
-    request: ServerRequest.Stream<Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Input>
-  ) async throws
-    -> ServerResponse.Single<Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output>
-  {
+    request: ServerRequest.Stream<Grpc_Testing_SimpleRequest>
+  ) async throws -> ServerResponse.Single<Grpc_Testing_SimpleResponse> {
     var responseSize = 0
     for try await message in request.messages {
       if message.responseStatus.isInitialized {
@@ -86,8 +82,8 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
     }
 
     return ServerResponse.Single(
-      message: Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output.with {
-        $0.payload = Grpc_Testing_Payload.with {
+      message: .with {
+        $0.payload = .with {
           $0.body = Data(count: responseSize)
         }
       }
@@ -97,20 +93,20 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
   /// Single-sided unbounded streaming from server to client.
   /// The server repeatedly returns a payload with the size requested by the client.
   func streamingFromServer(
-    request: ServerRequest.Single<Grpc_Testing_BenchmarkService.Method.StreamingFromServer.Input>
-  ) async throws
-    -> ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingFromServer.Output>
-  {
+    request: ServerRequest.Single<Grpc_Testing_SimpleRequest>
+  ) async throws -> ServerResponse.Stream<Grpc_Testing_SimpleResponse> {
     if request.message.responseStatus.isInitialized {
       try self.checkOkStatus(request.message.responseStatus)
     }
-    let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
-      $0.payload = Grpc_Testing_Payload.with {
+
+    let response = Grpc_Testing_SimpleResponse.with {
+      $0.payload = .with {
         $0.body = Data(count: Int(request.message.responseSize))
       }
     }
+
     return ServerResponse.Stream { writer in
-      while working.load(ordering: .relaxed) {
+      while self.working.load(ordering: .relaxed) {
         try await writer.write(response)
       }
       return [:]
@@ -120,17 +116,13 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
   /// Two-sided unbounded streaming between server to client.
   /// Both sides send the content of their own choice to the other.
   func streamingBothWays(
-    request: GRPCCore.ServerRequest.Stream<
-      Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Input
-    >
-  ) async throws
-    -> ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Output>
-  {
+    request: ServerRequest.Stream<Grpc_Testing_SimpleRequest>
+  ) async throws -> ServerResponse.Stream<Grpc_Testing_SimpleResponse> {
     // The 100 size is used by the other implementations as well.
     // We are using the same canned response size for all responses
     // as it is allowed by the spec.
-    let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
-      $0.payload = Grpc_Testing_Payload.with {
+    let response = Grpc_Testing_SimpleResponse.with {
+      $0.payload = .with {
         $0.body = Data(count: 100)
       }
     }
@@ -148,6 +140,7 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
           }
           inboundStreaming.store(false, ordering: .relaxed)
         }
+
         group.addTask {
           while inboundStreaming.load(ordering: .relaxed)
             && self.working.load(ordering: .acquiring)
@@ -155,6 +148,7 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
             try await writer.write(response)
           }
         }
+
         try await group.next()
         group.cancelAll()
         return [:]

+ 1 - 3
Sources/performance-worker/RPCStats.swift

@@ -132,9 +132,7 @@ struct RPCStats {
 
   @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
   mutating func merge(_ other: RPCStats) throws {
-    try self.latencyHistogram.merge(
-      other.latencyHistogram
-    )
+    try self.latencyHistogram.merge(other.latencyHistogram)
     self.requestResultCount.merge(other.requestResultCount) { (current, new) in
       current + new
     }

+ 327 - 183
Sources/performance-worker/WorkerService.swift

@@ -15,92 +15,46 @@
  */
 
 import GRPCCore
+import GRPCHTTP2Core
+import GRPCHTTP2TransportNIOPosix
 import NIOConcurrencyHelpers
 import NIOCore
+import NIOPosix
 
 @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
-final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable {
+final class WorkerService: Sendable {
   private let state: NIOLockedValueBox<State>
 
   init() {
-    let clientAndServer = State()
-    self.state = NIOLockedValueBox(clientAndServer)
+    self.state = NIOLockedValueBox(State())
   }
 
   private struct State {
-    var role: Role?
+    private var role: Role
 
     enum Role {
-      case client(ClientState)
-      case server(ServerState)
+      case none
+      case client(Client)
+      case server(Server)
     }
 
-    struct ServerState {
+    struct Server {
       var server: GRPCServer
       var stats: ServerStats
-
-      init(server: GRPCServer, stats: ServerStats) {
-        self.server = server
-        self.stats = stats
-      }
+      var eventLoopGroup: MultiThreadedEventLoopGroup
     }
 
-    struct ClientState {
+    struct Client {
       var clients: [BenchmarkClient]
       var stats: ClientStats
       var rpcStats: RPCStats
-
-      init(
-        clients: [BenchmarkClient],
-        stats: ClientStats,
-        rpcStats: RPCStats
-      ) {
-        self.clients = clients
-        self.stats = stats
-        self.rpcStats = rpcStats
-      }
-
-      func shutdownClients() throws {
-        for benchmarkClient in self.clients {
-          benchmarkClient.shutdown()
-        }
-      }
-    }
-
-    init() {}
-
-    init(role: Role) {
-      self.role = role
-    }
-
-    var server: GRPCServer? {
-      switch self.role {
-      case let .server(serverState):
-        return serverState.server
-      case .client, .none:
-        return nil
-      }
-    }
-
-    var clients: [BenchmarkClient]? {
-      switch self.role {
-      case let .client(clientState):
-        return clientState.clients
-      case .server, .none:
-        return nil
-      }
     }
 
-    var clientRPCStats: RPCStats? {
-      switch self.role {
-      case let .client(clientState):
-        return clientState.rpcStats
-      case .server, .none:
-        return nil
-      }
+    init() {
+      self.role = .none
     }
 
-    mutating func serverStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
+    mutating func collectServerStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
       switch self.role {
       case var .server(serverState):
         let stats = serverState.stats
@@ -114,98 +68,186 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
       }
     }
 
-    mutating func clientStats(replaceWith newStats: ClientStats? = nil) -> ClientStats? {
+    mutating func collectClientStats(
+      replaceWith newStats: ClientStats? = nil
+    ) -> (ClientStats, RPCStats)? {
       switch self.role {
-      case var .client(clientState):
-        let stats = clientState.stats
+      case var .client(state):
+        // Grab the existing stats and update if necessary.
+        let stats = state.stats
         if let newStats = newStats {
-          clientState.stats = newStats
-          self.role = .client(clientState)
+          state.stats = newStats
         }
-        return stats
+
+        // Merge in RPC stats from each client.
+        for client in state.clients {
+          try? state.rpcStats.merge(client.currentStats)
+        }
+
+        self.role = .client(state)
+        return (stats, state.rpcStats)
+
       case .server, .none:
         return nil
       }
     }
 
-    mutating func setupServer(server: GRPCServer, stats: ServerStats) throws {
-      let serverState = State.ServerState(server: server, stats: stats)
-      switch self.role {
-      case .server(_):
-        throw RPCError(code: .alreadyExists, message: "A server has already been set up.")
+    enum OnStartedServer {
+      case runServer
+      case invalidState(RPCError)
+    }
 
-      case .client(_):
-        throw RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
+    mutating func startedServer(
+      _ server: GRPCServer,
+      stats: ServerStats,
+      eventLoopGroup: MultiThreadedEventLoopGroup
+    ) -> OnStartedServer {
+      let action: OnStartedServer
 
+      switch self.role {
       case .none:
-        self.role = .server(serverState)
+        let state = State.Server(server: server, stats: stats, eventLoopGroup: eventLoopGroup)
+        self.role = .server(state)
+        action = .runServer
+      case .server:
+        let error = RPCError(code: .alreadyExists, message: "A server has already been set up.")
+        action = .invalidState(error)
+      case .client:
+        let error = RPCError(code: .failedPrecondition, message: "This worker has a client setup.")
+        action = .invalidState(error)
       }
+
+      return action
     }
 
-    mutating func setupClients(
-      benchmarkClients: [BenchmarkClient],
+    enum OnStartedClients {
+      case runClients
+      case invalidState(RPCError)
+    }
+
+    mutating func startedClients(
+      _ clients: [BenchmarkClient],
       stats: ClientStats,
       rpcStats: RPCStats
-    ) throws {
-      let clientState = State.ClientState(
-        clients: benchmarkClients,
-        stats: stats,
-        rpcStats: rpcStats
-      )
+    ) -> OnStartedClients {
+      let action: OnStartedClients
+
       switch self.role {
-      case .server(_):
-        throw RPCError(code: .alreadyExists, message: "This worker has a server setup.")
+      case .none:
+        let state = State.Client(clients: clients, stats: stats, rpcStats: rpcStats)
+        self.role = .client(state)
+        action = .runClients
+      case .server:
+        let error = RPCError(code: .alreadyExists, message: "This worker has a server setup.")
+        action = .invalidState(error)
+      case .client:
+        let error = RPCError(
+          code: .failedPrecondition,
+          message: "Clients have already been set up."
+        )
+        action = .invalidState(error)
+      }
 
-      case .client(_):
-        throw RPCError(code: .failedPrecondition, message: "Clients have already been set up.")
+      return action
+    }
+
+    enum OnServerShutDown {
+      case shutdown(MultiThreadedEventLoopGroup)
+      case nothing
+    }
 
+    mutating func serverShutdown() -> OnServerShutDown {
+      switch self.role {
+      case .client:
+        preconditionFailure("Invalid state")
+      case .server(let state):
+        self.role = .none
+        return .shutdown(state.eventLoopGroup)
       case .none:
-        self.role = .client(clientState)
+        return .nothing
       }
     }
 
-    mutating func updateRPCStats() throws {
+    enum OnStopListening {
+      case stopListening(GRPCServer)
+      case nothing
+    }
+
+    func stopListening() -> OnStopListening {
       switch self.role {
-      case var .client(clientState):
-        let benchmarkClients = clientState.clients
-        var rpcStats = clientState.rpcStats
-        for benchmarkClient in benchmarkClients {
-          try rpcStats.merge(benchmarkClient.currentStats)
-        }
+      case .client:
+        preconditionFailure("Invalid state")
+      case .server(let state):
+        return .stopListening(state.server)
+      case .none:
+        return .nothing
+      }
+    }
 
-        clientState.rpcStats = rpcStats
-        self.role = .client(clientState)
+    enum OnCloseClient {
+      case close([BenchmarkClient])
+      case nothing
+    }
 
-      case .server, .none:
-        ()
+    mutating func closeClients() -> OnCloseClient {
+      switch self.role {
+      case .client(let state):
+        self.role = .none
+        return .close(state.clients)
+      case .server:
+        preconditionFailure("Invalid state")
+      case .none:
+        return .nothing
+      }
+    }
+
+    enum OnQuitWorker {
+      case shutDownServer(GRPCServer)
+      case shutDownClients([BenchmarkClient])
+      case nothing
+    }
+
+    mutating func quit() -> OnQuitWorker {
+      switch self.role {
+      case .none:
+        return .nothing
+      case .client(let state):
+        self.role = .none
+        return .shutDownClients(state.clients)
+      case .server(let state):
+        self.role = .none
+        return .shutDownServer(state.server)
       }
     }
   }
+}
 
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension WorkerService: Grpc_Testing_WorkerService.ServiceProtocol {
   func quitWorker(
-    request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Input>
-  ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.QuitWorker.Output> {
+    request: ServerRequest.Single<Grpc_Testing_Void>
+  ) async throws -> ServerResponse.Single<Grpc_Testing_Void> {
+    let onQuit = self.state.withLockedValue { $0.quit() }
 
-    let role = self.state.withLockedValue { state in
-      defer { state.role = nil }
-      return state.role
-    }
+    switch onQuit {
+    case .nothing:
+      ()
 
-    if let role = role {
-      switch role {
-      case .client(let clientState):
-        try clientState.shutdownClients()
-      case .server(let serverState):
-        serverState.server.stopListening()
+    case .shutDownClients(let clients):
+      for client in clients {
+        client.shutdown()
       }
+
+    case .shutDownServer(let server):
+      server.stopListening()
     }
 
-    return ServerResponse.Single(message: Grpc_Testing_WorkerService.Method.QuitWorker.Output())
+    return ServerResponse.Single(message: Grpc_Testing_Void())
   }
 
   func coreCount(
-    request: ServerRequest.Single<Grpc_Testing_WorkerService.Method.CoreCount.Input>
-  ) async throws -> ServerResponse.Single<Grpc_Testing_WorkerService.Method.CoreCount.Output> {
+    request: ServerRequest.Single<Grpc_Testing_CoreRequest>
+  ) async throws -> ServerResponse.Single<Grpc_Testing_CoreResponse> {
     let coreCount = System.coreCount
     return ServerResponse.Single(
       message: Grpc_Testing_WorkerService.Method.CoreCount.Output.with {
@@ -215,17 +257,52 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
   }
 
   func runServer(
-    request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunServer.Input>
-  ) async throws
-    -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunServer.Output>
-  {
+    request: ServerRequest.Stream<Grpc_Testing_ServerArgs>
+  ) async throws -> ServerResponse.Stream<Grpc_Testing_ServerStatus> {
     return ServerResponse.Stream { writer in
       try await withThrowingTaskGroup(of: Void.self) { group in
         for try await message in request.messages {
           switch message.argtype {
           case let .some(.setup(serverConfig)):
-            let server = try await self.setupServer(serverConfig)
-            group.addTask { try await server.run() }
+            let (server, transport) = try await self.startServer(serverConfig)
+            group.addTask {
+              let result: Result<Void, Error>
+
+              do {
+                try await server.run()
+                result = .success(())
+              } catch {
+                result = .failure(error)
+              }
+
+              switch self.state.withLockedValue({ $0.serverShutdown() }) {
+              case .shutdown(let eventLoopGroup):
+                try await eventLoopGroup.shutdownGracefully()
+              case .nothing:
+                ()
+              }
+
+              try result.get()
+            }
+
+            // Wait for the server to bind.
+            let address = try await transport.listeningAddress
+
+            let port: Int
+            if let ipv4 = address.ipv4 {
+              port = ipv4.port
+            } else if let ipv6 = address.ipv6 {
+              port = ipv6.port
+            } else {
+              throw RPCError(
+                code: .internalError,
+                message: "Server listening on unsupported address '\(address)'"
+              )
+            }
+
+            // Tell the client what port the server is listening on.
+            let message = Grpc_Testing_ServerStatus.with { $0.port = Int32(port) }
+            try await writer.write(message)
 
           case let .some(.mark(mark)):
             let response = try await self.makeServerStatsResponse(reset: mark.reset)
@@ -236,24 +313,23 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
           }
         }
 
-        try await group.next()
-      }
-
-      let server = self.state.withLockedValue { state in
-        defer { state.role = nil }
-        return state.server
+        // Request stream ended, tell the server to stop listening. Once it's finished it will
+        // shutdown its ELG.
+        switch self.state.withLockedValue({ $0.stopListening() }) {
+        case .stopListening(let server):
+          server.stopListening()
+        case .nothing:
+          ()
+        }
       }
 
-      server?.stopListening()
       return [:]
     }
   }
 
   func runClient(
-    request: GRPCCore.ServerRequest.Stream<Grpc_Testing_WorkerService.Method.RunClient.Input>
-  ) async throws
-    -> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunClient.Output>
-  {
+    request: ServerRequest.Stream<Grpc_Testing_ClientArgs>
+  ) async throws -> ServerResponse.Stream<Grpc_Testing_ClientStatus> {
     return ServerResponse.Stream { writer in
       try await withThrowingTaskGroup(of: Void.self) { group in
         for try await message in request.messages {
@@ -268,6 +344,9 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
               }
             }
 
+            let message = try await self.makeClientStatsResponse(reset: false)
+            try await writer.write(message)
+
           case let .mark(mark):
             let response = try await self.makeClientStatsResponse(reset: mark.reset)
             try await writer.write(response)
@@ -276,6 +355,16 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
             ()
           }
         }
+
+        switch self.state.withLockedValue({ $0.closeClients() }) {
+        case .close(let clients):
+          for client in clients {
+            client.shutdown()
+          }
+        case .nothing:
+          ()
+        }
+
         try await group.waitForAll()
 
         return [:]
@@ -286,15 +375,44 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
 
 @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
 extension WorkerService {
-  private func setupServer(_ config: Grpc_Testing_ServerConfig) async throws -> GRPCServer {
-    let server = GRPCServer(transport: NoOpServerTransport(), services: [BenchmarkService()])
+  private func startServer(
+    _ serverConfig: Grpc_Testing_ServerConfig
+  ) async throws -> (GRPCServer, HTTP2ServerTransport.Posix) {
+    // Prepare an ELG, the test might require more than the default of one.
+    let numberOfThreads: Int
+    if serverConfig.asyncServerThreads > 0 {
+      numberOfThreads = Int(serverConfig.asyncServerThreads)
+    } else {
+      numberOfThreads = System.coreCount
+    }
+    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: numberOfThreads)
+
+    // Don't restrict the max payload size, the client is always trusted.
+    var config = HTTP2ServerTransport.Posix.Config.defaults
+    config.rpc.maxRequestPayloadSize = .max
+
+    let transport = HTTP2ServerTransport.Posix(
+      address: .ipv4(host: "127.0.0.1", port: Int(serverConfig.port)),
+      config: config,
+      eventLoopGroup: eventLoopGroup
+    )
+
+    let server = GRPCServer(transport: transport, services: [BenchmarkService()])
     let stats = try await ServerStats()
 
-    try self.state.withLockedValue { state in
-      try state.setupServer(server: server, stats: stats)
+    // Hold on to the server and ELG in the state machine.
+    let action = self.state.withLockedValue {
+      $0.startedServer(server, stats: stats, eventLoopGroup: eventLoopGroup)
     }
 
-    return server
+    switch action {
+    case .runServer:
+      return (server, transport)
+    case .invalidState(let error):
+      server.stopListening()
+      try await eventLoopGroup.shutdownGracefully()
+      throw error
+    }
   }
 
   private func makeServerStatsResponse(
@@ -302,7 +420,7 @@ extension WorkerService {
   ) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
     let currentStats = try await ServerStats()
     let initialStats = self.state.withLockedValue { state in
-      return state.serverStats(replaceWith: reset ? currentStats : nil)
+      return state.collectServerStats(replaceWith: reset ? currentStats : nil)
     }
 
     guard let initialStats = initialStats else {
@@ -325,69 +443,90 @@ extension WorkerService {
   }
 
   private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] {
-    let rpcType: BenchmarkClient.RPCType
-    switch config.rpcType {
-    case .unary:
-      rpcType = .unary
-    case .streaming:
-      rpcType = .streaming
-    case .streamingFromClient:
-      rpcType = .streamingFromClient
-    case .streamingFromServer:
-      rpcType = .streamingFromServer
-    case .streamingBothWays:
-      rpcType = .streamingBothWays
-    case .UNRECOGNIZED:
-      throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.")
+    guard let rpcType = BenchmarkClient.RPCType(config.rpcType) else {
+      throw RPCError(code: .invalidArgument, message: "Unknown RPC type")
     }
 
+    // Parse the server targets into resolvable targets.
+    let ipv4Addresses = try self.parseServerTargets(config.serverTargets)
+    let target = ResolvableTargets.IPv4(addresses: ipv4Addresses)
+
     var clients = [BenchmarkClient]()
     for _ in 0 ..< config.clientChannels {
-      let grpcClient = self.makeGRPCClient()
-      clients.append(
-        BenchmarkClient(
-          client: grpcClient,
-          rpcNumber: config.outstandingRpcsPerChannel,
-          rpcType: rpcType,
-          messagesPerStream: config.messagesPerStream,
-          protoParams: config.payloadConfig.simpleParams,
-          histogramParams: config.histogramParams
-        )
+      let transport = try HTTP2ClientTransport.Posix(target: target)
+      let client = BenchmarkClient(
+        client: GRPCClient(transport: transport),
+        concurrentRPCs: Int(config.outstandingRpcsPerChannel),
+        rpcType: rpcType,
+        messagesPerStream: Int(config.messagesPerStream),
+        protoParams: config.payloadConfig.simpleParams,
+        histogramParams: config.histogramParams
       )
+
+      clients.append(client)
     }
+
     let stats = ClientStats()
     let histogram = RPCStats.LatencyHistogram(
       resolution: config.histogramParams.resolution,
       maxBucketStart: config.histogramParams.maxPossible
     )
+    let rpcStats = RPCStats(latencyHistogram: histogram)
 
-    try self.state.withLockedValue { state in
-      try state.setupClients(
-        benchmarkClients: clients,
-        stats: stats,
-        rpcStats: RPCStats(latencyHistogram: histogram)
-      )
+    let action = self.state.withLockedValue { state in
+      state.startedClients(clients, stats: stats, rpcStats: rpcStats)
+    }
+
+    switch action {
+    case .runClients:
+      return clients
+    case .invalidState(let error):
+      for client in clients {
+        client.shutdown()
+      }
+      throw error
     }
+  }
 
-    return clients
+  private func parseServerTarget(_ target: String) -> GRPCHTTP2Core.SocketAddress.IPv4? {
+    guard let index = target.firstIndex(of: ":") else { return nil }
+
+    let host = target[..<index]
+    if let port = Int(target[target.index(after: index)...]) {
+      return SocketAddress.IPv4(host: String(host), port: port)
+    } else {
+      return nil
+    }
   }
 
-  func makeGRPCClient() -> GRPCClient {
-    fatalError()
+  private func parseServerTargets(
+    _ targets: [String]
+  ) throws -> [GRPCHTTP2Core.SocketAddress.IPv4] {
+    try targets.map { target in
+      if let ipv4 = self.parseServerTarget(target) {
+        return ipv4
+      } else {
+        throw RPCError(
+          code: .invalidArgument,
+          message: """
+            Couldn't parse target '\(target)'. Must be in the format '<host>:<port>' for IPv4 \
+            or '[<host>]:<port>' for IPv6.
+            """
+        )
+      }
+    }
   }
 
   private func makeClientStatsResponse(
     reset: Bool
   ) async throws -> Grpc_Testing_WorkerService.Method.RunClient.Output {
     let currentUsageStats = ClientStats()
-    let (initialUsageStats, rpcStats) = try self.state.withLockedValue { state in
-      let initialUsageStats = state.clientStats(replaceWith: reset ? currentUsageStats : nil)
-      try state.updateRPCStats()
-      let rpcStats = state.clientRPCStats
-      return (initialUsageStats, rpcStats)
+
+    let stats = self.state.withLockedValue { state in
+      state.collectClientStats(replaceWith: reset ? currentUsageStats : nil)
     }
 
-    guard let initialUsageStats = initialUsageStats, let rpcStats = rpcStats else {
+    guard let (initialUsageStats, rpcStats) = stats else {
       throw RPCError(
         code: .notFound,
         message: "There are no initial client stats. Clients must be setup before calling 'mark'."
@@ -422,11 +561,16 @@ extension WorkerService {
   }
 }
 
-@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
-struct NoOpServerTransport: ServerTransport {
-  func listen(
-    _ streamHandler: @escaping (RPCStream<Inbound, Outbound>) async -> Void
-  ) async throws {}
-
-  func stopListening() {}
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+extension BenchmarkClient.RPCType {
+  init?(_ rpcType: Grpc_Testing_RpcType) {
+    switch rpcType {
+    case .unary:
+      self = .unary
+    case .streaming:
+      self = .streaming
+    default:
+      return nil
+    }
+  }
 }