Browse Source

Allow to write a sequence of requests/responses (#1499)

* Allow to write a sequence of requests/responses

# Motivation
We recently adopted the `NIOAsyncWriter` to back the `GRPCAsyncRequestStreamWriter` and the `GRPCAsyncResponseStreamWriter`; however, we did not expose the functionality to write a sequence of the elements that the `NIOAsyncWriter` offers. This can be useful in cases where you want to write a batch of requests/responses since it reduces the amount of locks and thread hops.

# Modification
Expose new methods on the `GRPCAsyncRequestStreamWriter` and the `GRPCAsyncResponseStreamWriter` to enable to write a sequence. I also fixed up the comments for the `GRPCAsyncRequestStreamWriter` since they were outdated.

# Result
Users can write a batch of requests/responses.

* Update tests
Franz Busch 3 years ago
parent
commit
cfa950fbe1

+ 21 - 8
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift

@@ -49,12 +49,7 @@ public struct GRPCAsyncRequestStreamWriter<Request: Sendable>: Sendable {
 
   /// Send a single request.
   ///
-  /// To ensure requests are delivered in order callers should `await` the result of this call
-  /// before sending another request. Callers who do not need this guarantee do not have to `await`
-  /// the completion of this call and may send messages concurrently from multiple ``Task``s.
-  /// However, it is important to note that no more than 16 writes may be pending at any one time
-  /// and attempting to exceed this will result in an ``GRPCAsyncWriterError/tooManyPendingWrites``
-  /// error being thrown.
+  /// It is safe to send multiple requests concurrently by sharing the ``GRPCAsyncRequestStreamWriter`` across tasks.
   ///
   /// Callers must call ``finish()`` when they have no more requests left to send.
   ///
@@ -62,8 +57,7 @@ public struct GRPCAsyncRequestStreamWriter<Request: Sendable>: Sendable {
   ///   - request: The request to send.
   ///   - compression: Whether the request should be compressed or not. Ignored if compression was
   ///       not enabled for the RPC.
-  /// - Throws: ``GRPCAsyncWriterError`` if there are too many pending writes or the request stream
-  ///     has already been finished.
+  /// - Throws: If the request stream has already been finished.
   @inlinable
   public func send(
     _ request: Request,
@@ -72,6 +66,25 @@ public struct GRPCAsyncRequestStreamWriter<Request: Sendable>: Sendable {
     try await self.asyncWriter.yield((request, compression))
   }
 
+  /// Send a sequence of requests.
+  ///
+  /// It is safe to send multiple requests concurrently by sharing the ``GRPCAsyncRequestStreamWriter`` across tasks.
+  ///
+  /// Callers must call ``finish()`` when they have no more requests left to send.
+  ///
+  /// - Parameters:
+  ///   - requests: The requests to send.
+  ///   - compression: Whether the requests should be compressed or not. Ignored if compression was
+  ///       not enabled for the RPC.
+  /// - Throws: If the request stream has already been finished.
+  @inlinable
+  public func send<S: Sequence>(
+    _ requests: S,
+    compression: Compression = .deferToCallDefault
+  ) async throws where S.Element == Request {
+    try await self.asyncWriter.yield(contentsOf: requests.lazy.map { ($0, compression) })
+  }
+
   /// Finish the request stream for the RPC. This must be called when there are no more requests to be sent.
   public func finish() async throws {
     self.asyncWriter.finish()

+ 17 - 0
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift

@@ -129,6 +129,23 @@ public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
     }
   }
 
+  @inlinable
+  public func send<S: Sequence>(
+    contentsOf responses: S,
+    compression: Compression = .deferToCallDefault
+  ) async throws where S.Element == Response {
+    let responsesWithCompression = responses.lazy.map { ($0, compression) }
+    switch self.backing {
+    case let .asyncWriter(writer):
+      try await writer.yield(contentsOf: responsesWithCompression)
+
+    case let .closure(closure):
+      for response in responsesWithCompression {
+        await closure(response)
+      }
+    }
+  }
+
   /// Creates a new `GRPCAsyncResponseStreamWriter` backed by a ``ResponseStream``.
   /// This is mostly useful for testing purposes where one wants to observe the written responses.
   ///

+ 6 - 3
Tests/GRPCTests/GRPCAsyncClientCallTests.swift

@@ -132,14 +132,17 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
         callOptions: .init()
       )
 
-    for word in ["boyle", "jeffers", "holt"] {
-      try await update.requestStream.send(.with { $0.text = word })
+    let requests = ["boyle", "jeffers", "holt"]
+      .map { word in Echo_EchoRequest.with { $0.text = word } }
+    for request in requests {
+      try await update.requestStream.send(request)
     }
+    try await update.requestStream.send(requests)
     try await update.requestStream.finish()
 
     let numResponses = try await update.responseStream.map { _ in 1 }.reduce(0, +)
 
-    await assertThat(numResponses, .is(.equalTo(3)))
+    await assertThat(numResponses, .is(.equalTo(6)))
     await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
     await assertThat(await update.status, .hasCode(.ok))
   }

+ 22 - 0
Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift

@@ -230,6 +230,28 @@ class AsyncServerHandlerTests: GRPCTestCase {
     await responseStream.next().assertNil()
   }
 
+  func testResponseSequence() async throws {
+    let handler = self.makeHandler { _, responseStreamWriter, _ in
+      try await responseStreamWriter.send(contentsOf: ["1", "2", "3"])
+    }
+    defer {
+      XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
+    }
+
+    self.loop.execute {
+      handler.receiveMetadata([:])
+      handler.receiveEnd()
+    }
+
+    let responseStream = self.recorder.responseSequence.makeAsyncIterator()
+    await responseStream.next().assertMetadata { _ in }
+    await responseStream.next().assertMessage()
+    await responseStream.next().assertMessage()
+    await responseStream.next().assertMessage()
+    await responseStream.next().assertStatus { _, _ in }
+    await responseStream.next().assertNil()
+  }
+
   func testThrowingDeserializer() async throws {
     let handler = AsyncServerHandler(
       context: self.makeCallHandlerContext(),