Răsfoiți Sursa

Expose public creation methods for `GRPCAsyncRequestStream` and `GRPCAsyncResponseStreamWriter` (#1485)

Motivation:

It is highly desirable to be able to write tests against the generated method of a service. Currently, this is close to impossible since both the `GRPCAsyncRequestStream` and the `GRPCAsyncResponseStreamWriter` don't expose a public init so users cannot drive and observe the functions.

Modification:

Adds new public methods to drive and observe the request stream and the response writer.

Result

We can now test functions which use the request stream and response stream writer.
Franz Busch 3 ani în urmă
părinte
comite
b11a8562b3

+ 107 - 13
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift

@@ -15,46 +15,140 @@
  */
 
 #if compiler(>=5.6)
-
-/// This is currently a wrapper around AsyncThrowingStream because we want to be
+/// A type for the stream of request messages send to a gRPC server method.
+///
+/// To enable testability this type provides a static ``GRPCAsyncRequestStream/makeTestingRequestStream()``
+/// method which allows you to create a stream that you can drive.
+///
+/// - Note: This is currently a wrapper around AsyncThrowingStream because we want to be
 /// able to swap out the implementation for something else in the future.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
+  /// A source used for driving a ``GRPCAsyncRequestStream`` during tests.
+  public struct Source {
+    @usableFromInline
+    internal let continuation: AsyncThrowingStream<Element, Error>.Continuation
+
+    @inlinable
+    init(continuation: AsyncThrowingStream<Element, Error>.Continuation) {
+      self.continuation = continuation
+    }
+
+    /// Yields the element to the request stream.
+    ///
+    /// - Parameter element: The element to yield to the request stream.
+    @inlinable
+    public func yield(_ element: Element) {
+      self.continuation.yield(element)
+    }
+
+    /// Finished the request stream.
+    @inlinable
+    public func finish() {
+      self.continuation.finish()
+    }
+
+    /// Finished the request stream.
+    ///
+    /// - Parameter error: An optional `Error` to finish the request stream with.
+    @inlinable
+    public func finish(throwing error: Error?) {
+      self.continuation.finish(throwing: error)
+    }
+  }
+
+  /// Simple struct for the return type of ``GRPCAsyncRequestStream/makeTestingRequestStream()``.
+  ///
+  /// This struct contains two properties:
+  /// 1. The ``stream`` which is the actual ``GRPCAsyncRequestStream`` and should be passed to the method under testing.
+  /// 2. The ``source`` which can be used to drive the stream.
+  public struct TestingStream {
+    /// The actual stream.
+    public let stream: GRPCAsyncRequestStream<Element>
+    /// The source used to drive the stream.
+    public let source: Source
+
+    @inlinable
+    init(stream: GRPCAsyncRequestStream<Element>, source: Source) {
+      self.stream = stream
+      self.source = source
+    }
+  }
+
   @usableFromInline
-  internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>
+  enum Backing: Sendable {
+    case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>)
+    case asyncStream(AsyncThrowingStream<Element, Error>)
+  }
 
   @usableFromInline
-  internal let _stream: _WrappedStream
+  internal let backing: Backing
+
+  @inlinable
+  internal init(_ sequence: PassthroughMessageSequence<Element, Error>) {
+    self.backing = .passthroughMessageSequence(sequence)
+  }
 
   @inlinable
-  internal init(_ stream: _WrappedStream) {
-    self._stream = stream
+  internal init(_ stream: AsyncThrowingStream<Element, Error>) {
+    self.backing = .asyncStream(stream)
+  }
+
+  /// Creates a new testing stream.
+  ///
+  /// This is useful for writing unit tests for your gRPC method implementations since it allows you to drive the stream passed
+  /// to your method.
+  ///
+  /// - Returns: A new ``TestingStream`` containing the actual ``GRPCAsyncRequestStream`` and a ``Source``.
+  @inlinable
+  public static func makeTestingRequestStream() -> TestingStream {
+    var continuation: AsyncThrowingStream<Element, Error>.Continuation!
+    let stream = AsyncThrowingStream<Element, Error> { continuation = $0 }
+    let source = Source(continuation: continuation)
+    let requestStream = Self(stream)
+    return TestingStream(stream: requestStream, source: source)
   }
 
   @inlinable
   public func makeAsyncIterator() -> Iterator {
-    Self.AsyncIterator(self._stream)
+    switch self.backing {
+    case let .passthroughMessageSequence(sequence):
+      return Self.AsyncIterator(.passthroughMessageSequence(sequence.makeAsyncIterator()))
+    case let .asyncStream(stream):
+      return Self.AsyncIterator(.asyncStream(stream.makeAsyncIterator()))
+    }
   }
 
   public struct Iterator: AsyncIteratorProtocol {
     @usableFromInline
-    internal var iterator: _WrappedStream.AsyncIterator
+    enum BackingIterator {
+      case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>.Iterator)
+      case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
+    }
+
+    @usableFromInline
+    internal var iterator: BackingIterator
 
     @usableFromInline
-    internal init(_ stream: _WrappedStream) {
-      self.iterator = stream.makeAsyncIterator()
+    internal init(_ iterator: BackingIterator) {
+      self.iterator = iterator
     }
 
     @inlinable
     public mutating func next() async throws -> Element? {
-      try await self.iterator.next()
+      switch self.iterator {
+      case let .passthroughMessageSequence(iterator):
+        return try await iterator.next()
+      case var .asyncStream(iterator):
+        let element = try await iterator.next()
+        self.iterator = .asyncStream(iterator)
+        return element
+      }
     }
   }
 }
 
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 extension GRPCAsyncRequestStream: Sendable where Element: Sendable {}
-@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-extension GRPCAsyncRequestStream.Iterator: Sendable where Element: Sendable {}
 
 #endif

+ 112 - 3
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift

@@ -17,8 +17,84 @@
 #if compiler(>=5.6)
 
 /// Writer for server-streaming RPC handlers to provide responses.
+///
+/// To enable testability this type provides a static ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``
+/// method which allows you to create a stream that you can drive.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
+  /// An `AsyncSequence` backing a ``GRPCAsyncResponseStreamWriter`` for testing purposes.
+  ///
+  /// - Important: This `AsyncSequence` is never finishing.
+  public struct ResponseStream: AsyncSequence {
+    public typealias Element = (Response, Compression)
+
+    @usableFromInline
+    internal let stream: AsyncStream<(Response, Compression)>
+
+    @usableFromInline
+    internal let continuation: AsyncStream<(Response, Compression)>.Continuation
+
+    @inlinable
+    init(
+      stream: AsyncStream<(Response, Compression)>,
+      continuation: AsyncStream<(Response, Compression)>.Continuation
+    ) {
+      self.stream = stream
+      self.continuation = continuation
+    }
+
+    public func makeAsyncIterator() -> AsyncIterator {
+      AsyncIterator(iterator: self.stream.makeAsyncIterator())
+    }
+
+    /// Finishes the response stream.
+    ///
+    /// This is useful in tests to finish the stream after the async method finished and allows you to collect all written responses.
+    public func finish() {
+      self.continuation.finish()
+    }
+
+    public struct AsyncIterator: AsyncIteratorProtocol {
+      @usableFromInline
+      internal var iterator: AsyncStream<(Response, Compression)>.AsyncIterator
+
+      @inlinable
+      init(iterator: AsyncStream<(Response, Compression)>.AsyncIterator) {
+        self.iterator = iterator
+      }
+
+      public mutating func next() async -> Element? {
+        await self.iterator.next()
+      }
+    }
+  }
+
+  /// Simple struct for the return type of ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``.
+  ///
+  /// This struct contains two properties:
+  /// 1. The ``writer`` which is the actual ``GRPCAsyncResponseStreamWriter`` and should be passed to the method under testing.
+  /// 2. The ``stream`` which can be used to observe the written responses.
+  public struct TestingStreamWriter {
+    /// The actual writer.
+    public let writer: GRPCAsyncResponseStreamWriter<Response>
+    /// The written responses in a stream.
+    ///
+    /// - Important: This `AsyncSequence` is never finishing.
+    public let stream: ResponseStream
+
+    @inlinable
+    init(writer: GRPCAsyncResponseStreamWriter<Response>, stream: ResponseStream) {
+      self.writer = writer
+      self.stream = stream
+    }
+  }
+
+  @usableFromInline
+  enum Backing: Sendable {
+    case asyncWriter(AsyncWriter<Delegate>)
+    case closure(@Sendable ((Response, Compression)) async -> Void)
+  }
+
   @usableFromInline
   internal typealias Element = (Response, Compression)
 
@@ -26,11 +102,16 @@ public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
   internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>
 
   @usableFromInline
-  internal let asyncWriter: AsyncWriter<Delegate>
+  internal let backing: Backing
 
   @inlinable
   internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
-    self.asyncWriter = asyncWriter
+    self.backing = .asyncWriter(asyncWriter)
+  }
+
+  @inlinable
+  internal init(onWrite: @escaping @Sendable ((Response, Compression)) async -> Void) {
+    self.backing = .closure(onWrite)
   }
 
   @inlinable
@@ -38,7 +119,35 @@ public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
     _ response: Response,
     compression: Compression = .deferToCallDefault
   ) async throws {
-    try await self.asyncWriter.write((response, compression))
+    switch self.backing {
+    case let .asyncWriter(writer):
+      try await writer.write((response, compression))
+
+    case let .closure(closure):
+      await closure((response, compression))
+    }
+  }
+
+  /// Creates a new `GRPCAsyncResponseStreamWriter` backed by a ``ResponseStream``.
+  /// This is mostly useful for testing purposes where one wants to observe the written responses.
+  ///
+  /// - Note: For most tests it is useful to call ``ResponseStream/finish()`` after the async method under testing
+  /// resumed. This allows you to easily collect all written responses.
+  @inlinable
+  public static func makeTestingResponseStreamWriter() -> TestingStreamWriter {
+    var continuation: AsyncStream<(Response, Compression)>.Continuation!
+    let asyncStream = AsyncStream<(Response, Compression)> { cont in
+      continuation = cont
+    }
+    let writer = Self.init { [continuation] in
+      continuation!.yield($0)
+    }
+    let responseStream = ResponseStream(
+      stream: asyncStream,
+      continuation: continuation
+    )
+
+    return TestingStreamWriter(writer: writer, stream: responseStream)
   }
 }
 

+ 1 - 2
Sources/GRPC/Interceptor/ClientTransportFactory.swift

@@ -304,8 +304,7 @@ internal struct FakeClientTransportFactory<Request, Response> {
   ) where RequestSerializer.Input == Request,
     RequestDeserializer.Output == Request,
     ResponseSerializer.Input == Response,
-    ResponseDeserializer.Output == Response
-  {
+    ResponseDeserializer.Output == Response {
     self.fakeResponseStream = fakeResponseStream
     self.requestSerializer = AnySerializer(wrapping: requestSerializer)
     self.responseDeserializer = AnyDeserializer(wrapping: responseDeserializer)

+ 34 - 0
Tests/GRPCTests/AsyncAwaitSupport/GRPCAsyncRequestStreamTests.swift

@@ -0,0 +1,34 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+
+import GRPC
+import XCTest
+
+@available(macOS 12, iOS 13, tvOS 13, watchOS 6, *)
+final class GRPCAsyncRequestStreamTests: XCTestCase {
+  func testRecorder() async throws {
+    let testingStream = GRPCAsyncRequestStream<Int>.makeTestingRequestStream()
+
+    testingStream.source.yield(1)
+    testingStream.source.finish(throwing: nil)
+
+    let results = try await testingStream.stream.collect()
+
+    XCTAssertEqual(results, [1])
+  }
+}
+#endif

+ 34 - 0
Tests/GRPCTests/AsyncAwaitSupport/GRPCAsyncResponseStreamWriterTests.swift

@@ -0,0 +1,34 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+
+import GRPC
+import XCTest
+
+@available(macOS 12, iOS 13, tvOS 13, watchOS 6, *)
+final class GRPCAsyncResponseStreamWriterTests: XCTestCase {
+  func testRecorder() async throws {
+    let responseStreamWriter = GRPCAsyncResponseStreamWriter<Int>.makeTestingResponseStreamWriter()
+
+    try await responseStreamWriter.writer.send(1, compression: .disabled)
+    responseStreamWriter.stream.finish()
+
+    let results = try await responseStreamWriter.stream.collect()
+    XCTAssertEqual(results[0].0, 1)
+    XCTAssertEqual(results[0].1, .disabled)
+  }
+}
+#endif