Преглед на файлове

Add async version of QPS Benchmark Service (#1470)

Motivation:

We want to provide an async/await version of the QPS Benchmark service, and be able to choose whether we want to use that or the ELF version when running the tests.

Modifications:

Added support for an async/await implementation of the Benchmark Service. This is currently not in use, but a follow-up CR will include changes to the Worker Service and other classes, and will enable choosing between the ELF and async/await implementations when running.

Result:

An async/await version of the Benchmark Service will be ready for use.
Gustavo Cairo преди 3 години
родител
ревизия
7e6d8fac42

+ 2 - 1
Performance/QPSBenchmark/Package.swift

@@ -1,4 +1,4 @@
-// swift-tools-version:5.4
+// swift-tools-version:5.6
 /*
  * Copyright 2020, gRPC Authors All rights reserved.
  *
@@ -19,6 +19,7 @@ import PackageDescription
 
 let package = Package(
   name: "QPSBenchmark",
+  platforms: [.macOS(.v12)],
   products: [
     .executable(name: "QPSBenchmark", targets: ["QPSBenchmark"]),
   ],

+ 3 - 3
Performance/QPSBenchmark/Sources/QPSBenchmark/Model/benchmark_service.grpc.swift

@@ -350,7 +350,7 @@ extension Grpc_Testing_BenchmarkServiceAsyncClientProtocol {
   public func streamingCall<RequestStream>(
     _ requests: RequestStream,
     callOptions: CallOptions? = nil
-  ) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_SimpleRequest {
+  ) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_SimpleRequest {
     return self.performAsyncBidirectionalStreamingCall(
       path: Grpc_Testing_BenchmarkServiceClientMetadata.Methods.streamingCall.path,
       requests: requests,
@@ -374,7 +374,7 @@ extension Grpc_Testing_BenchmarkServiceAsyncClientProtocol {
   public func streamingFromClient<RequestStream>(
     _ requests: RequestStream,
     callOptions: CallOptions? = nil
-  ) async throws -> Grpc_Testing_SimpleResponse where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_SimpleRequest {
+  ) async throws -> Grpc_Testing_SimpleResponse where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_SimpleRequest {
     return try await self.performAsyncClientStreamingCall(
       path: Grpc_Testing_BenchmarkServiceClientMetadata.Methods.streamingFromClient.path,
       requests: requests,
@@ -410,7 +410,7 @@ extension Grpc_Testing_BenchmarkServiceAsyncClientProtocol {
   public func streamingBothWays<RequestStream>(
     _ requests: RequestStream,
     callOptions: CallOptions? = nil
-  ) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_SimpleRequest {
+  ) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_SimpleRequest {
     return self.performAsyncBidirectionalStreamingCall(
       path: Grpc_Testing_BenchmarkServiceClientMetadata.Methods.streamingBothWays.path,
       requests: requests,

+ 2 - 2
Performance/QPSBenchmark/Sources/QPSBenchmark/Model/worker_service.grpc.swift

@@ -302,7 +302,7 @@ extension Grpc_Testing_WorkerServiceAsyncClientProtocol {
   public func runServer<RequestStream>(
     _ requests: RequestStream,
     callOptions: CallOptions? = nil
-  ) -> GRPCAsyncResponseStream<Grpc_Testing_ServerStatus> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_ServerArgs {
+  ) -> GRPCAsyncResponseStream<Grpc_Testing_ServerStatus> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_ServerArgs {
     return self.performAsyncBidirectionalStreamingCall(
       path: Grpc_Testing_WorkerServiceClientMetadata.Methods.runServer.path,
       requests: requests,
@@ -326,7 +326,7 @@ extension Grpc_Testing_WorkerServiceAsyncClientProtocol {
   public func runClient<RequestStream>(
     _ requests: RequestStream,
     callOptions: CallOptions? = nil
-  ) -> GRPCAsyncResponseStream<Grpc_Testing_ClientStatus> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_ClientArgs {
+  ) -> GRPCAsyncResponseStream<Grpc_Testing_ClientStatus> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_ClientArgs {
     return self.performAsyncBidirectionalStreamingCall(
       path: Grpc_Testing_WorkerServiceClientMetadata.Methods.runClient.path,
       requests: requests,

+ 121 - 0
Performance/QPSBenchmark/Sources/QPSBenchmark/Runtime/Async/AsyncBenchmarkServiceImpl.swift

@@ -0,0 +1,121 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Foundation
+import GRPC
+import NIOCore
+
+/// Implementation of asynchronous service for benchmarking.
+final class AsyncBenchmarkServiceImpl: Grpc_Testing_BenchmarkServiceAsyncProvider {
+  let interceptors: Grpc_Testing_BenchmarkServiceServerInterceptorFactoryProtocol? = nil
+
+  /// One request followed by one response.
+  /// The server returns the client payload as-is.
+  func unaryCall(
+    request: Grpc_Testing_SimpleRequest,
+    context: GRPCAsyncServerCallContext
+  ) async throws -> Grpc_Testing_SimpleResponse {
+    return try AsyncBenchmarkServiceImpl.processSimpleRPC(request: request)
+  }
+
+  /// Repeated sequence of one request followed by one response.
+  /// Should be called streaming ping-pong
+  /// The server returns the client payload as-is on each response
+  func streamingCall(
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_SimpleRequest>,
+    responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_SimpleResponse>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    for try await request in requestStream {
+      let response = try AsyncBenchmarkServiceImpl.processSimpleRPC(request: request)
+      try await responseStream.send(response)
+    }
+  }
+
+  /// Single-sided unbounded streaming from client to server
+  /// The server returns the client payload as-is once the client does WritesDone
+  func streamingFromClient(
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_SimpleRequest>,
+    context: GRPCAsyncServerCallContext
+  ) async throws -> Grpc_Testing_SimpleResponse {
+    context.request.logger.warning("streamingFromClient not implemented yet")
+    throw GRPCStatus(
+      code: .unimplemented,
+      message: "Not implemented"
+    )
+  }
+
+  /// Single-sided unbounded streaming from server to client
+  /// The server repeatedly returns the client payload as-is
+  func streamingFromServer(
+    request: Grpc_Testing_SimpleRequest,
+    responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_SimpleResponse>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    context.request.logger.warning("streamingFromServer not implemented yet")
+    throw GRPCStatus(
+      code: GRPCStatus.Code.unimplemented,
+      message: "Not implemented"
+    )
+  }
+
+  /// Two-sided unbounded streaming between server to client
+  /// Both sides send the content of their own choice to the other
+  func streamingBothWays(
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_SimpleRequest>,
+    responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_SimpleResponse>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    context.request.logger.warning("streamingBothWays not implemented yet")
+    throw GRPCStatus(
+      code: GRPCStatus.Code.unimplemented,
+      message: "Not implemented"
+    )
+  }
+
+  /// Make a payload for sending back to the client.
+  private static func makePayload(
+    type: Grpc_Testing_PayloadType,
+    size: Int
+  ) throws -> Grpc_Testing_Payload {
+    if type != .compressable {
+      // Making a payload which is not compressable is hard - and not implemented in
+      // other implementations too.
+      throw GRPCStatus(code: .internalError, message: "Failed to make payload")
+    }
+    var payload = Grpc_Testing_Payload()
+    payload.body = Data(count: size)
+    payload.type = type
+    return payload
+  }
+
+  /// Process a simple RPC.
+  /// - parameters:
+  ///     - request: The request from the client.
+  /// - returns: A response to send back to the client.
+  private static func processSimpleRPC(
+    request: Grpc_Testing_SimpleRequest
+  ) throws -> Grpc_Testing_SimpleResponse {
+    var response = Grpc_Testing_SimpleResponse()
+    if request.responseSize > 0 {
+      response.payload = try self.makePayload(
+        type: request.responseType,
+        size: Int(request.responseSize)
+      )
+    }
+    return response
+  }
+}

+ 1 - 1
Performance/QPSBenchmark/Sources/QPSBenchmark/Runtime/AsyncServer.swift

@@ -51,7 +51,7 @@ final class AsyncQPSServer: QPSServer {
     let workerService = AsyncQPSServerImpl()
 
     // Start the server.
-    // TODO: Support TLS is requested.
+    // TODO: Support TLS if requested.
     self.server = Server.insecure(group: self.eventLoopGroup)
       .withServiceProviders([workerService])
       .withLogger(self.logger)