Pārlūkot izejas kodu

Generate async echo client and server, add async server implementation and basic tests (#1260)

Motivation:

In #1259 code generation was added for async client and servers. We
should generate the echo service and have tests exercising the generated
async code.

Modifications:

Switch on async code generation for echo
Implement an async echo service and add some basic tests
Results:

We run tests with an async client and server
George Barnett 4 gadi atpakaļ
vecāks
revīzija
4d3986f9e4

+ 1 - 1
Makefile

@@ -78,7 +78,7 @@ ${ECHO_GRPC}: ${ECHO_PROTO} ${PROTOC_GEN_GRPC_SWIFT}
 	protoc $< \
 		--proto_path=$(dir $<) \
 		--plugin=${PROTOC_GEN_GRPC_SWIFT} \
-		--grpc-swift_opt=Visibility=Public,TestClient=true \
+		--grpc-swift_opt=Visibility=Public,TestClient=true,ExperimentalAsyncClient=true,ExperimentalAsyncServer=true \
 		--grpc-swift_out=$(dir $<)
 
 # Generates protobufs and gRPC client and server for the Echo example

+ 72 - 0
Sources/Examples/Echo/Implementation/EchoAsyncProvider.swift

@@ -0,0 +1,72 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+import EchoModel
+import GRPC
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public final class EchoAsyncProvider: Echo_EchoAsyncProvider {
+  public let interceptors: Echo_EchoServerInterceptorFactoryProtocol?
+
+  public init(interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil) {
+    self.interceptors = interceptors
+  }
+
+  public func get(
+    request: Echo_EchoRequest,
+    context: GRPCAsyncServerCallContext
+  ) async throws -> Echo_EchoResponse {
+    return .with {
+      $0.text = "Swift echo get: " + request.text
+    }
+  }
+
+  public func expand(
+    request: Echo_EchoRequest,
+    responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    for (i, part) in request.text.components(separatedBy: " ").lazy.enumerated() {
+      try await responseStream.send(.with { $0.text = "Swift echo expand (\(i)): \(part)" })
+    }
+  }
+
+  public func collect(
+    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    context: GRPCAsyncServerCallContext
+  ) async throws -> Echo_EchoResponse {
+    let text = try await requests.reduce(into: "Swift echo collect:") { result, request in
+      result += " \(request.text)"
+    }
+
+    return .with { $0.text = text }
+  }
+
+  public func update(
+    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    var counter = 0
+    for try await request in requests {
+      let text = "Swift echo update (\(counter)): \(request.text)"
+      try await responseStream.send(.with { $0.text = text })
+      counter += 1
+    }
+  }
+}
+
+#endif // compiler(>=5.5)

+ 193 - 4
Sources/Examples/Echo/Model/echo.grpc.swift

@@ -172,6 +172,101 @@ public final class Echo_EchoClient: Echo_EchoClientProtocol {
   }
 }
 
+#if compiler(>=5.5)
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public protocol Echo_EchoAsyncClientProtocol: GRPCClient {
+  var serviceName: String { get }
+  var interceptors: Echo_EchoClientInterceptorFactoryProtocol? { get }
+
+  func makeGetCall(
+    _ request: Echo_EchoRequest,
+    callOptions: CallOptions?
+  ) -> GRPCAsyncUnaryCall<Echo_EchoRequest, Echo_EchoResponse>
+
+  func makeExpandCall(
+    _ request: Echo_EchoRequest,
+    callOptions: CallOptions?
+  ) -> GRPCAsyncServerStreamingCall<Echo_EchoRequest, Echo_EchoResponse>
+
+  func makeCollectCall(
+    callOptions: CallOptions?
+  ) -> GRPCAsyncClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse>
+
+  func makeUpdateCall(
+    callOptions: CallOptions?
+  ) -> GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse>
+}
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+extension Echo_EchoAsyncClientProtocol {
+  public var serviceName: String {
+    return "echo.Echo"
+  }
+
+  public var interceptors: Echo_EchoClientInterceptorFactoryProtocol? {
+    return nil
+  }
+
+  public func makeGetCall(
+    _ request: Echo_EchoRequest,
+    callOptions: CallOptions? = nil
+  ) -> GRPCAsyncUnaryCall<Echo_EchoRequest, Echo_EchoResponse> {
+    return self.makeAsyncUnaryCall(
+      path: "/echo.Echo/Get",
+      request: request,
+      callOptions: callOptions ?? self.defaultCallOptions
+    )
+  }
+
+  public func makeExpandCall(
+    _ request: Echo_EchoRequest,
+    callOptions: CallOptions? = nil
+  ) -> GRPCAsyncServerStreamingCall<Echo_EchoRequest, Echo_EchoResponse> {
+    return self.makeAsyncServerStreamingCall(
+      path: "/echo.Echo/Expand",
+      request: request,
+      callOptions: callOptions ?? self.defaultCallOptions
+    )
+  }
+
+  public func makeCollectCall(
+    callOptions: CallOptions? = nil
+  ) -> GRPCAsyncClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse> {
+    return self.makeAsyncClientStreamingCall(
+      path: "/echo.Echo/Collect",
+      callOptions: callOptions ?? self.defaultCallOptions
+    )
+  }
+
+  public func makeUpdateCall(
+    callOptions: CallOptions? = nil
+  ) -> GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> {
+    return self.makeAsyncBidirectionalStreamingCall(
+      path: "/echo.Echo/Update",
+      callOptions: callOptions ?? self.defaultCallOptions
+    )
+  }
+}
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public struct Echo_EchoAsyncClient: Echo_EchoAsyncClientProtocol {
+  public var channel: GRPCChannel
+  public var defaultCallOptions: CallOptions
+  public var interceptors: Echo_EchoClientInterceptorFactoryProtocol?
+
+  public init(
+    channel: GRPCChannel,
+    defaultCallOptions: CallOptions = CallOptions(),
+    interceptors: Echo_EchoClientInterceptorFactoryProtocol? = nil
+  ) {
+    self.channel = channel
+    self.defaultCallOptions = defaultCallOptions
+    self.interceptors = interceptors
+  }
+}
+
+#endif // compiler(>=5.5)
+
 public final class Echo_EchoTestClient: Echo_EchoClientProtocol {
   private let fakeChannel: FakeChannel
   public var defaultCallOptions: CallOptions
@@ -204,7 +299,7 @@ public final class Echo_EchoTestClient: Echo_EchoClientProtocol {
   public func enqueueGetResponse(
     _ response: Echo_EchoResponse,
     _ requestHandler: @escaping (FakeRequestPart<Echo_EchoRequest>) -> () = { _ in }
-  )  {
+  ) {
     let stream = self.makeGetResponseStream(requestHandler)
     // This is the only operation on the stream; try! is fine.
     try! stream.sendMessage(response)
@@ -228,7 +323,7 @@ public final class Echo_EchoTestClient: Echo_EchoClientProtocol {
   public func enqueueExpandResponses(
     _ responses: [Echo_EchoResponse],
     _ requestHandler: @escaping (FakeRequestPart<Echo_EchoRequest>) -> () = { _ in }
-  )  {
+  ) {
     let stream = self.makeExpandResponseStream(requestHandler)
     // These are the only operation on the stream; try! is fine.
     responses.forEach { try! stream.sendMessage($0) }
@@ -253,7 +348,7 @@ public final class Echo_EchoTestClient: Echo_EchoClientProtocol {
   public func enqueueCollectResponse(
     _ response: Echo_EchoResponse,
     _ requestHandler: @escaping (FakeRequestPart<Echo_EchoRequest>) -> () = { _ in }
-  )  {
+  ) {
     let stream = self.makeCollectResponseStream(requestHandler)
     // This is the only operation on the stream; try! is fine.
     try! stream.sendMessage(response)
@@ -277,7 +372,7 @@ public final class Echo_EchoTestClient: Echo_EchoClientProtocol {
   public func enqueueUpdateResponses(
     _ responses: [Echo_EchoResponse],
     _ requestHandler: @escaping (FakeRequestPart<Echo_EchoRequest>) -> () = { _ in }
-  )  {
+  ) {
     let stream = self.makeUpdateResponseStream(requestHandler)
     // These are the only operation on the stream; try! is fine.
     responses.forEach { try! stream.sendMessage($0) }
@@ -377,3 +472,97 @@ public protocol Echo_EchoServerInterceptorFactoryProtocol {
   ///   Defaults to calling `self.makeInterceptors()`.
   func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>]
 }
+
+#if compiler(>=5.5)
+
+/// To implement a server, implement an object which conforms to this protocol.
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public protocol Echo_EchoAsyncProvider: CallHandlerProvider {
+  var interceptors: Echo_EchoServerInterceptorFactoryProtocol? { get }
+
+  /// Immediately returns an echo of a request.
+  @Sendable func get(
+    request: Echo_EchoRequest,
+    context: GRPCAsyncServerCallContext
+  ) async throws -> Echo_EchoResponse
+
+  /// Splits a request into words and returns each word in a stream of messages.
+  @Sendable func expand(
+    request: Echo_EchoRequest,
+    responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
+    context: GRPCAsyncServerCallContext
+  ) async throws
+
+  /// Collects a stream of messages and returns them concatenated when the caller closes.
+  @Sendable func collect(
+    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    context: GRPCAsyncServerCallContext
+  ) async throws -> Echo_EchoResponse
+
+  /// Streams back messages as they are received in an input stream.
+  @Sendable func update(
+    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
+    context: GRPCAsyncServerCallContext
+  ) async throws
+}
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+extension Echo_EchoAsyncProvider {
+  public var serviceName: Substring {
+    return "echo.Echo"
+  }
+
+  public var interceptors: Echo_EchoServerInterceptorFactoryProtocol? {
+    return nil
+  }
+
+  public func handle(
+    method name: Substring,
+    context: CallHandlerContext
+  ) -> GRPCServerHandlerProtocol? {
+    switch name {
+    case "Get":
+      return GRPCAsyncServerHandler(
+        context: context,
+        requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
+        responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
+        interceptors: self.interceptors?.makeGetInterceptors() ?? [],
+        wrapping: self.get(request:context:)
+      )
+
+    case "Expand":
+      return GRPCAsyncServerHandler(
+        context: context,
+        requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
+        responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
+        interceptors: self.interceptors?.makeExpandInterceptors() ?? [],
+        wrapping: self.expand(request:responseStream:context:)
+      )
+
+    case "Collect":
+      return GRPCAsyncServerHandler(
+        context: context,
+        requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
+        responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
+        interceptors: self.interceptors?.makeCollectInterceptors() ?? [],
+        wrapping: self.collect(requests:context:)
+      )
+
+    case "Update":
+      return GRPCAsyncServerHandler(
+        context: context,
+        requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
+        responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
+        interceptors: self.interceptors?.makeUpdateInterceptors() ?? [],
+        wrapping: self.update(requests:responseStream:context:)
+      )
+
+    default:
+      return nil
+    }
+  }
+}
+
+#endif // compiler(>=5.5)
+

+ 156 - 0
Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift

@@ -0,0 +1,156 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+import EchoImplementation
+import EchoModel
+import GRPC
+import NIOCore
+import NIOHPACK
+import NIOPosix
+import XCTest
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+final class AsyncIntegrationTests: GRPCTestCase {
+  private var group: EventLoopGroup!
+  private var server: Server!
+  private var client: GRPCChannel!
+
+  private var echo: Echo_EchoAsyncClient {
+    return .init(channel: self.client, defaultCallOptions: self.callOptionsWithLogger)
+  }
+
+  override func setUp() {
+    super.setUp()
+
+    self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
+    self.server = try! Server.insecure(group: self.group)
+      .withLogger(self.serverLogger)
+      .withServiceProviders([EchoAsyncProvider()])
+      .bind(host: "127.0.0.1", port: 0)
+      .wait()
+
+    let port = self.server.channel.localAddress!.port!
+    self.client = ClientConnection.insecure(group: self.group)
+      .withBackgroundActivityLogger(self.clientLogger)
+      .connect(host: "127.0.0.1", port: port)
+  }
+
+  override func tearDown() {
+    XCTAssertNoThrow(try self.client.close().wait())
+    XCTAssertNoThrow(try self.server.close().wait())
+    XCTAssertNoThrow(try self.group.syncShutdownGracefully())
+    super.tearDown()
+  }
+
+  func testUnary() {
+    XCTAsyncTest {
+      let get = self.echo.makeGetCall(.with { $0.text = "hello" })
+
+      let initialMetadata = try await get.initialMetadata
+      initialMetadata.assertFirst("200", forName: ":status")
+
+      let response = try await get.response
+      XCTAssertEqual(response.text, "Swift echo get: hello")
+
+      let trailingMetadata = try await get.trailingMetadata
+      trailingMetadata.assertFirst("0", forName: "grpc-status")
+
+      let status = await get.status
+      XCTAssertTrue(status.isOk)
+    }
+  }
+
+  func testClientStreaming() {
+    XCTAsyncTest {
+      let collect = self.echo.makeCollectCall()
+
+      try await collect.sendMessage(.with { $0.text = "boyle" })
+      try await collect.sendMessage(.with { $0.text = "jeffers" })
+      try await collect.sendMessage(.with { $0.text = "holt" })
+      try await collect.sendEnd()
+
+      let initialMetadata = try await collect.initialMetadata
+      initialMetadata.assertFirst("200", forName: ":status")
+
+      let response = try await collect.response
+      XCTAssertEqual(response.text, "Swift echo collect: boyle jeffers holt")
+
+      let trailingMetadata = try await collect.trailingMetadata
+      trailingMetadata.assertFirst("0", forName: "grpc-status")
+
+      let status = await collect.status
+      XCTAssertTrue(status.isOk)
+    }
+  }
+
+  func testServerStreaming() {
+    XCTAsyncTest {
+      let expand = self.echo.makeExpandCall(.with { $0.text = "boyle jeffers holt" })
+
+      let initialMetadata = try await expand.initialMetadata
+      initialMetadata.assertFirst("200", forName: ":status")
+
+      let respones = try await expand.responses.map { $0.text }.collect()
+      XCTAssertEqual(respones, [
+        "Swift echo expand (0): boyle",
+        "Swift echo expand (1): jeffers",
+        "Swift echo expand (2): holt",
+      ])
+
+      let trailingMetadata = try await expand.trailingMetadata
+      trailingMetadata.assertFirst("0", forName: "grpc-status")
+
+      let status = await expand.status
+      XCTAssertTrue(status.isOk)
+    }
+  }
+
+  func testBidirectionalStreaming() {
+    XCTAsyncTest {
+      let update = self.echo.makeUpdateCall()
+
+      var responseIterator = update.responses.map { $0.text }.makeAsyncIterator()
+
+      for (i, name) in ["boyle", "jeffers", "holt"].enumerated() {
+        try await update.sendMessage(.with { $0.text = name })
+        let response = try await responseIterator.next()
+        XCTAssertEqual(response, "Swift echo update (\(i)): \(name)")
+      }
+
+      try await update.sendEnd()
+
+      // This isn't right after we make the call as servers are not guaranteed to send metadata back
+      // immediately. Concretely, we don't send initial metadata back until the first response
+      // message is sent by the server.
+      let initialMetadata = try await update.initialMetadata
+      initialMetadata.assertFirst("200", forName: ":status")
+
+      let trailingMetadata = try await update.trailingMetadata
+      trailingMetadata.assertFirst("0", forName: "grpc-status")
+
+      let status = await update.status
+      XCTAssertTrue(status.isOk)
+    }
+  }
+}
+
+extension HPACKHeaders {
+  func assertFirst(_ value: String, forName name: String) {
+    XCTAssertEqual(self.first(name: name), value)
+  }
+}
+
+#endif // compiler(>=5.5)