فهرست منبع

Replace the placeholder client request and response mechanisms (#1264)

Motivation:

The client code for async-await has some placeholder implementations for
receiving responses and sending requests. Now that we have the necessary
types we can replace the placeholders.

Modifications:

- Replace `AsyncStream` usage with a `PassthroughMessageSequence`
- Add a `GRPCAsyncRequestStreamWriter` which request streaming RPCs may use to
  write requests and finish the request stream.
- This writer depends on a delegate which uses callbacks to send
  messages on the underlying `Call`.
- Request streaming RPCs now have a `requestStream` on which to send
  requests.
- Wrap up `AsyncWriterError` as a `public` `struct`.

Result:

Clients may send requests on a `requestStream`.
George Barnett 4 سال پیش
والد
کامیت
1af17c9b38

+ 33 - 8
Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift

@@ -239,7 +239,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
   ///
   /// The call may be suspend if the writer is paused.
   ///
-  /// Throws: ``AsyncWriterError`` if the writer has already been finished or too many write tasks
+  /// Throws: ``GRPCAsyncWriterError`` if the writer has already been finished or too many write tasks
   ///   have been suspended.
   @inlinable
   internal func write(_ element: Element) async throws {
@@ -256,7 +256,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
     // - error (the writer is complete or the queue is full).
 
     if self._completionState.isPendingOrCompleted {
-      continuation.resume(throwing: AsyncWriterError.alreadyFinished)
+      continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
     } else if !self._isPaused, self._pendingElements.isEmpty {
       self._delegate.write(element)
       continuation.resume()
@@ -264,7 +264,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
       // The continuation will be resumed later.
       self._pendingElements.append(PendingElement(element, continuation: continuation))
     } else {
-      continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites)
+      continuation.resume(throwing: GRPCAsyncWriterError.tooManyPendingWrites)
     }
   }
 
@@ -279,7 +279,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
   @inlinable
   internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
     if self._completionState.isPendingOrCompleted {
-      continuation.resume(throwing: AsyncWriterError.alreadyFinished)
+      continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
     } else if !self._isPaused, self._pendingElements.isEmpty {
       self._completionState = .completed
       self._delegate.writeEnd(end)
@@ -291,10 +291,35 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
   }
 }
 
-@usableFromInline
-internal enum AsyncWriterError: Error, Hashable {
-  case tooManyPendingWrites
-  case alreadyFinished
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+extension AsyncWriter where End == Void {
+  @inlinable
+  internal func finish() async throws {
+    try await self.finish(())
+  }
+}
+
+public struct GRPCAsyncWriterError: Error, Hashable {
+  private let wrapped: Wrapped
+
+  @usableFromInline
+  internal enum Wrapped {
+    case tooManyPendingWrites
+    case alreadyFinished
+  }
+
+  @usableFromInline
+  internal init(_ wrapped: Wrapped) {
+    self.wrapped = wrapped
+  }
+
+  /// There are too many writes pending. This may occur when too many Tasks are writing
+  /// concurrently.
+  public static let tooManyPendingWrites = Self(.tooManyPendingWrites)
+
+  /// The writer has already finished. This may occur when the RPC completes prematurely, or when
+  /// a user calls finish more than once.
+  public static let alreadyFinished = Self(.alreadyFinished)
 }
 
 @usableFromInline

+ 33 - 0
Sources/GRPC/AsyncAwaitSupport/Call+AsyncRequestStreamWriter.swift

@@ -0,0 +1,33 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+extension Call {
+  internal func makeRequestStreamWriter() -> GRPCAsyncRequestStreamWriter<Request> {
+    let delegate = GRPCAsyncRequestStreamWriter<Request>.Delegate(
+      compressionEnabled: self.options.messageEncoding.enabledForRequests
+    ) { request, metadata in
+      self.send(.message(request, metadata), promise: nil)
+    } finish: {
+      self.send(.end, promise: nil)
+    }
+
+    return GRPCAsyncRequestStreamWriter(asyncWriter: .init(delegate: delegate))
+  }
+}
+
+#endif // compiler(>=5.5)

+ 49 - 80
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

@@ -15,7 +15,6 @@
  */
 #if compiler(>=5.5)
 
-import _NIOConcurrency
 import NIOHPACK
 
 /// Async-await variant of BidirectionalStreamingCall.
@@ -23,6 +22,10 @@ import NIOHPACK
 public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
   private let call: Call<Request, Response>
   private let responseParts: StreamingResponseParts<Response>
+  private let responseSource: PassthroughMessageSource<Response, Error>
+
+  /// A request stream writer for sending messages to the server.
+  public let requestStream: GRPCAsyncRequestStreamWriter<Request>
 
   /// The stream of responses from the server.
   public let responses: GRPCAsyncResponseStream<Response>
@@ -74,93 +77,59 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
 
   private init(call: Call<Request, Response>) {
     self.call = call
-    // Initialise `responseParts` with an empty response handler because we
-    // provide the responses as an AsyncSequence in `responseStream`.
     self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
-
-    // Call and StreamingResponseParts are reference types so we grab a
-    // referecence to them here to avoid capturing mutable self in the  closure
-    // passed to the AsyncThrowingStream initializer.
-    //
-    // The alternative would be to declare the responseStream as:
-    // ```
-    // public private(set) var responseStream: AsyncThrowingStream<ResponsePayload>!
-    // ```
-    //
-    // UPDATE: Additionally we expect to replace this soon with an AsyncSequence
-    // implementation that supports yielding values from outside the closure.
-    let call = self.call
-    let responseParts = self.responseParts
-    let responseStream = AsyncThrowingStream(Response.self) { continuation in
-      call.invokeStreamingRequests { error in
-        responseParts.handleError(error)
-        continuation.finish(throwing: error)
-      } onResponsePart: { responsePart in
-        responseParts.handle(responsePart)
-        switch responsePart {
-        case let .message(response):
-          continuation.yield(response)
-        case .metadata:
-          break
-        case .end:
-          continuation.finish()
-        }
-      }
-    }
-    self.responses = .init(responseStream)
+    self.responseSource = PassthroughMessageSource<Response, Error>()
+    self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource))
+    self.requestStream = call.makeRequestStreamWriter()
   }
 
   /// We expose this as the only non-private initializer so that the caller
   /// knows that invocation is part of initialisation.
   internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
-    Self(call: call)
-  }
-
-  // MARK: - Requests
-
-  /// Sends a message to the service.
-  ///
-  /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
-  ///
-  /// - Parameters:
-  ///   - message: The message to send.
-  ///   - compression: Whether compression should be used for this message. Ignored if compression
-  ///     was not enabled for the RPC.
-  public func sendMessage(
-    _ message: Request,
-    compression: Compression = .deferToCallDefault
-  ) async throws {
-    let compress = self.call.compress(compression)
-    let promise = self.call.eventLoop.makePromise(of: Void.self)
-    self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
-    // TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
-    try await promise.futureResult.get()
-  }
-
-  /// Sends a sequence of messages to the service.
-  ///
-  /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
-  ///
-  /// - Parameters:
-  ///   - messages: The sequence of messages to send.
-  ///   - compression: Whether compression should be used for this message. Ignored if compression
-  ///     was not enabled for the RPC.
-  public func sendMessages<S>(
-    _ messages: S,
-    compression: Compression = .deferToCallDefault
-  ) async throws where S: Sequence, S.Element == Request {
-    let promise = self.call.eventLoop.makePromise(of: Void.self)
-    self.call.sendMessages(messages, compression: compression, promise: promise)
-    try await promise.futureResult.get()
+    let asyncCall = Self(call: call)
+
+    asyncCall.call.invokeStreamingRequests(
+      onError: { error in
+        asyncCall.responseParts.handleError(error)
+        asyncCall.responseSource.finish(throwing: error)
+      },
+      onResponsePart: AsyncCall.makeResponsePartHandler(
+        responseParts: asyncCall.responseParts,
+        responseSource: asyncCall.responseSource
+      )
+    )
+
+    return asyncCall
   }
+}
 
-  /// Terminates a stream of messages sent to the service.
-  ///
-  /// - Important: This should only ever be called once.
-  public func sendEnd() async throws {
-    let promise = self.call.eventLoop.makePromise(of: Void.self)
-    self.call.send(.end, promise: promise)
-    try await promise.futureResult.get()
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+internal enum AsyncCall {
+  internal static func makeResponsePartHandler<Response>(
+    responseParts: StreamingResponseParts<Response>,
+    responseSource: PassthroughMessageSource<Response, Error>
+  ) -> (GRPCClientResponsePart<Response>) -> Void {
+    return { responsePart in
+      // Handle the metadata, trailers and status.
+      responseParts.handle(responsePart)
+
+      // Handle the response messages and status.
+      switch responsePart {
+      case .metadata:
+        ()
+
+      case let .message(response):
+        // TODO: when we support backpressure we will need to stop ignoring the return value.
+        _ = responseSource.yield(response)
+
+      case let .end(status, _):
+        if status.isOk {
+          responseSource.finish()
+        } else {
+          responseSource.finish(throwing: status)
+        }
+      }
+    }
   }
 }
 

+ 4 - 47
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift

@@ -23,6 +23,9 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
   private let call: Call<Request, Response>
   private let responseParts: UnaryResponseParts<Response>
 
+  /// A request stream writer for sending messages to the server.
+  public let requestStream: GRPCAsyncRequestStreamWriter<Request>
+
   /// The options used to make the RPC.
   public var options: CallOptions {
     return self.call.options
@@ -81,6 +84,7 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
       onError: self.responseParts.handleError(_:),
       onResponsePart: self.responseParts.handle(_:)
     )
+    self.requestStream = call.makeRequestStreamWriter()
   }
 
   /// We expose this as the only non-private initializer so that the caller
@@ -88,53 +92,6 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
   internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
     Self(call: call)
   }
-
-  // MARK: - Requests
-
-  /// Sends a message to the service.
-  ///
-  /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
-  ///
-  /// - Parameters:
-  ///   - message: The message to send.
-  ///   - compression: Whether compression should be used for this message. Ignored if compression
-  ///     was not enabled for the RPC.
-  public func sendMessage(
-    _ message: Request,
-    compression: Compression = .deferToCallDefault
-  ) async throws {
-    let compress = self.call.compress(compression)
-    let promise = self.call.eventLoop.makePromise(of: Void.self)
-    self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
-    // TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
-    try await promise.futureResult.get()
-  }
-
-  /// Sends a sequence of messages to the service.
-  ///
-  /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
-  ///
-  /// - Parameters:
-  ///   - messages: The sequence of messages to send.
-  ///   - compression: Whether compression should be used for this message. Ignored if compression
-  ///     was not enabled for the RPC.
-  public func sendMessages<S>(
-    _ messages: S,
-    compression: Compression = .deferToCallDefault
-  ) async throws where S: Sequence, S.Element == Request {
-    let promise = self.call.eventLoop.makePromise(of: Void.self)
-    self.call.sendMessages(messages, compression: compression, promise: promise)
-    try await promise.futureResult.get()
-  }
-
-  /// Terminates a stream of messages sent to the service.
-  ///
-  /// - Important: This should only ever be called once.
-  public func sendEnd() async throws {
-    let promise = self.call.eventLoop.makePromise(of: Void.self)
-    self.call.send(.end, promise: promise)
-    try await promise.futureResult.get()
-  }
 }
 
 #endif

+ 127 - 0
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift

@@ -0,0 +1,127 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+
+/// An object allowing the holder -- a client -- to send requests on an RPC.
+///
+/// Requests may be sent using ``send(_:compression:)``. After all requests have been sent
+/// the user is responsible for closing the request stream by calling ``finish()``.
+///
+/// ```
+/// // Send a request on the request stream, use the compression setting configured for the RPC.
+/// try await stream.send(request)
+///
+/// // Send a request and explicitly disable compression.
+/// try await stream.send(request, compression: .disabled)
+///
+/// // Finish the stream to indicate that no more messages will be sent.
+/// try await stream.finish()
+/// ```
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public struct GRPCAsyncRequestStreamWriter<Request> {
+  @usableFromInline
+  internal let asyncWriter: AsyncWriter<Delegate<Request>>
+
+  @inlinable
+  internal init(asyncWriter: AsyncWriter<Delegate<Request>>) {
+    self.asyncWriter = asyncWriter
+  }
+
+  /// Send a single request.
+  ///
+  /// To ensure requests are delivered in order callers should `await` the result of this call
+  /// before sending another request. Callers who do not need this guarantee do not have to `await`
+  /// the completion of this call and may send messages concurrently from multiple ``Task``s.
+  /// However, it is important to note that no more than 16 writes may be pending at any one time
+  /// and attempting to exceed this will result in an ``GRPCAsyncWriterError.tooManyPendingWrites``
+  /// error being thrown.
+  ///
+  /// Callers must call ``finish()`` when they have no more requests left to send.
+  ///
+  /// - Parameters:
+  ///   - request: The request to send.
+  ///   - compression: Whether the request should be compressed or not. Ignored if compression was
+  ///       not enabled for the RPC.
+  /// - Throws: ``GRPCAsyncWriterError`` if there are too many pending writes or the request stream
+  ///     has already been finished.
+  @inlinable
+  public func send(
+    _ request: Request,
+    compression: Compression = .deferToCallDefault
+  ) async throws {
+    try await self.asyncWriter.write((request, compression))
+  }
+
+  /// Finish the request stream for the RPC. This must be called when there are no more requests to
+  /// be sent.
+  ///
+  /// - Throws: ``GRPCAsyncWriterError`` if the request stream has already been finished.
+  public func finish() async throws {
+    try await self.asyncWriter.finish()
+  }
+}
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+extension GRPCAsyncRequestStreamWriter {
+  /// A delegate for the writer which writes messages to an underlying receiver.`
+  @usableFromInline
+  internal final class Delegate<Request>: AsyncWriterDelegate {
+    @usableFromInline
+    internal typealias Element = (Request, Compression)
+
+    @usableFromInline
+    internal typealias End = Void
+
+    @usableFromInline
+    internal let _compressionEnabled: Bool
+
+    @usableFromInline
+    internal let _send: (Request, MessageMetadata) -> Void
+
+    @usableFromInline
+    internal let _finish: () -> Void
+
+    @inlinable
+    internal init(
+      compressionEnabled: Bool,
+      send: @escaping (Request, MessageMetadata) -> Void,
+      finish: @escaping () -> Void
+    ) {
+      self._compressionEnabled = compressionEnabled
+      self._send = send
+      self._finish = finish
+    }
+
+    @inlinable
+    internal func write(_ element: (Request, Compression)) {
+      let (request, compression) = element
+      let compress = compression.isEnabled(callDefault: self._compressionEnabled)
+
+      // TODO: be smarter about inserting flushes.
+      //
+      // We currently always flush after every write which may trigger more syscalls than necessary.
+      let metadata = MessageMetadata(compress: compress, flush: true)
+      self._send(request, metadata)
+    }
+
+    @inlinable
+    internal func writeEnd(_ end: Void) {
+      self._finish()
+    }
+  }
+}
+
+#endif // compiler(>=5.5)

+ 1 - 1
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift

@@ -20,7 +20,7 @@
 @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
 public struct GRPCAsyncResponseStream<Element>: AsyncSequence {
   @usableFromInline
-  internal typealias WrappedStream = AsyncThrowingStream<Element, Error>
+  internal typealias WrappedStream = PassthroughMessageSequence<Element, Error>
 
   @usableFromInline
   internal let stream: WrappedStream

+ 21 - 38
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift

@@ -22,6 +22,7 @@ import NIOHPACK
 public struct GRPCAsyncServerStreamingCall<Request, Response> {
   private let call: Call<Request, Response>
   private let responseParts: StreamingResponseParts<Response>
+  private let responseSource: PassthroughMessageSource<Response, Error>
 
   /// The stream of responses from the server.
   public let responses: GRPCAsyncResponseStream<Response>
@@ -71,45 +72,13 @@ public struct GRPCAsyncServerStreamingCall<Request, Response> {
     }
   }
 
-  private init(
-    call: Call<Request, Response>,
-    _ request: Request
-  ) {
+  private init(call: Call<Request, Response>) {
     self.call = call
-    // Initialise `responseParts` with an empty response handler because we
-    // provide the responses as an AsyncSequence in `responseStream`.
+    // We ignore messages in the closure and instead feed them into the response source when we
+    // invoke the `call`.
     self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
-
-    // Call and StreamingResponseParts are reference types so we grab a
-    // referecence to them here to avoid capturing mutable self in the  closure
-    // passed to the AsyncThrowingStream initializer.
-    //
-    // The alternative would be to declare the responseStream as:
-    // ```
-    // public private(set) var responseStream: AsyncThrowingStream<ResponsePayload>!
-    // ```
-    //
-    // UPDATE: Additionally we expect to replace this soon with an AsyncSequence
-    // implementation that supports yielding values from outside the closure.
-    let call = self.call
-    let responseParts = self.responseParts
-    self
-      .responses = GRPCAsyncResponseStream(AsyncThrowingStream(Response.self) { continuation in
-        call.invokeUnaryRequest(request) { error in
-          responseParts.handleError(error)
-          continuation.finish(throwing: error)
-        } onResponsePart: { responsePart in
-          responseParts.handle(responsePart)
-          switch responsePart {
-          case let .message(response):
-            continuation.yield(response)
-          case .metadata:
-            break
-          case .end:
-            continuation.finish()
-          }
-        }
-      })
+    self.responseSource = PassthroughMessageSource<Response, Error>()
+    self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource))
   }
 
   /// We expose this as the only non-private initializer so that the caller
@@ -118,7 +87,21 @@ public struct GRPCAsyncServerStreamingCall<Request, Response> {
     call: Call<Request, Response>,
     _ request: Request
   ) -> Self {
-    Self(call: call, request)
+    let asyncCall = Self(call: call)
+
+    asyncCall.call.invokeUnaryRequest(
+      request,
+      onError: { error in
+        asyncCall.responseParts.handleError(error)
+        asyncCall.responseSource.finish(throwing: error)
+      },
+      onResponsePart: AsyncCall.makeResponsePartHandler(
+        responseParts: asyncCall.responseParts,
+        responseSource: asyncCall.responseSource
+      )
+    )
+
+    return asyncCall
   }
 }
 

+ 6 - 6
Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift

@@ -77,10 +77,10 @@ final class AsyncIntegrationTests: GRPCTestCase {
     XCTAsyncTest {
       let collect = self.echo.makeCollectCall()
 
-      try await collect.sendMessage(.with { $0.text = "boyle" })
-      try await collect.sendMessage(.with { $0.text = "jeffers" })
-      try await collect.sendMessage(.with { $0.text = "holt" })
-      try await collect.sendEnd()
+      try await collect.requestStream.send(.with { $0.text = "boyle" })
+      try await collect.requestStream.send(.with { $0.text = "jeffers" })
+      try await collect.requestStream.send(.with { $0.text = "holt" })
+      try await collect.requestStream.finish()
 
       let initialMetadata = try await collect.initialMetadata
       initialMetadata.assertFirst("200", forName: ":status")
@@ -125,12 +125,12 @@ final class AsyncIntegrationTests: GRPCTestCase {
       var responseIterator = update.responses.map { $0.text }.makeAsyncIterator()
 
       for (i, name) in ["boyle", "jeffers", "holt"].enumerated() {
-        try await update.sendMessage(.with { $0.text = name })
+        try await update.requestStream.send(.with { $0.text = name })
         let response = try await responseIterator.next()
         XCTAssertEqual(response, "Swift echo update (\(i)): \(name)")
       }
 
-      try await update.sendEnd()
+      try await update.requestStream.finish()
 
       // This isn't right after we make the call as servers are not guaranteed to send metadata back
       // immediately. Concretely, we don't send initial metadata back until the first response

+ 12 - 12
Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift

@@ -67,7 +67,7 @@ internal class AsyncWriterTests: GRPCTestCase {
       await writer.toggleWritability()
 
       await XCTAssertThrowsError(try await writer.write("pontiac")) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .tooManyPendingWrites)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .tooManyPendingWrites)
       }
 
       // resume (we must finish the writer.)
@@ -87,7 +87,7 @@ internal class AsyncWriterTests: GRPCTestCase {
       XCTAssertEqual(delegate.end, 0)
 
       await XCTAssertThrowsError(try await writer.write("cheddar")) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
       }
 
       XCTAssertTrue(delegate.elements.isEmpty)
@@ -103,7 +103,7 @@ internal class AsyncWriterTests: GRPCTestCase {
       XCTAssertEqual(delegate.end, 0)
 
       await XCTAssertThrowsError(try await writer.finish(1)) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
       }
 
       // Still 0.
@@ -151,7 +151,7 @@ internal class AsyncWriterTests: GRPCTestCase {
           do {
             try await writer.finish(1)
           } catch {
-            XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+            XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
             // Resume.
             await writer.toggleWritability()
           }
@@ -161,7 +161,7 @@ internal class AsyncWriterTests: GRPCTestCase {
           do {
             try await writer.finish(2)
           } catch {
-            XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+            XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
             // Resume.
             await writer.toggleWritability()
           }
@@ -170,7 +170,7 @@ internal class AsyncWriterTests: GRPCTestCase {
 
       // We should definitely be finished by this point.
       await XCTAssertThrowsError(try await writer.finish(3)) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
       }
     }
   }
@@ -193,7 +193,7 @@ internal class AsyncWriterTests: GRPCTestCase {
       } catch is CancellationError {
         // Cancellation is fine: we cancelled while the write was pending.
         ()
-      } catch let error as AsyncWriterError {
+      } catch let error as GRPCAsyncWriterError {
         // Already finish is also fine: we cancelled before the write was enqueued.
         XCTAssertEqual(error, .alreadyFinished)
       } catch {
@@ -201,7 +201,7 @@ internal class AsyncWriterTests: GRPCTestCase {
       }
 
       await XCTAssertThrowsError(try await writer.write("bar")) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
       }
 
       XCTAssertTrue(delegate.elements.isEmpty)
@@ -227,7 +227,7 @@ internal class AsyncWriterTests: GRPCTestCase {
       } catch is CancellationError {
         // Cancellation is fine: we cancelled while the write was pending.
         ()
-      } catch let error as AsyncWriterError {
+      } catch let error as GRPCAsyncWriterError {
         // Already finish is also fine: we cancelled before the write was enqueued.
         XCTAssertEqual(error, .alreadyFinished)
       } catch {
@@ -235,7 +235,7 @@ internal class AsyncWriterTests: GRPCTestCase {
       }
 
       await XCTAssertThrowsError(try await writer.finish(42)) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
       }
 
       XCTAssertTrue(delegate.elements.isEmpty)
@@ -250,13 +250,13 @@ internal class AsyncWriterTests: GRPCTestCase {
 
       await writer.cancel()
       await XCTAssertThrowsError(try await writer.write("1")) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
       }
 
       // Fine, no need to throw. Nothing should change.
       await writer.cancel()
       await XCTAssertThrowsError(try await writer.write("2")) { error in
-        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+        XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
       }
 
       XCTAssertTrue(delegate.elements.isEmpty)

+ 8 - 8
Tests/GRPCTests/GRPCAsyncClientCallTests.swift

@@ -96,9 +96,9 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
       )
 
     for word in ["boyle", "jeffers", "holt"] {
-      try await collect.sendMessage(.with { $0.text = word })
+      try await collect.requestStream.send(.with { $0.text = word })
     }
-    try await collect.sendEnd()
+    try await collect.requestStream.finish()
 
     await assertThat(try await collect.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
     await assertThat(try await collect.response, .doesNotThrow())
@@ -133,9 +133,9 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
       )
 
     for word in ["boyle", "jeffers", "holt"] {
-      try await update.sendMessage(.with { $0.text = word })
+      try await update.requestStream.send(.with { $0.text = word })
     }
-    try await update.sendEnd()
+    try await update.requestStream.finish()
 
     let numResponses = try await update.responses.map { _ in 1 }.reduce(0, +)
 
@@ -156,11 +156,11 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
 
     var responseStreamIterator = update.responses.makeAsyncIterator()
     for word in ["boyle", "jeffers", "holt"] {
-      try await update.sendMessage(.with { $0.text = word })
+      try await update.requestStream.send(.with { $0.text = word })
       await assertThat(try await responseStreamIterator.next(), .is(.notNil()))
     }
 
-    try await update.sendEnd()
+    try await update.requestStream.finish()
 
     await assertThat(try await responseStreamIterator.next(), .is(.nil()))
 
@@ -185,10 +185,10 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
       // Send requests, then end, in a task.
       taskGroup.addTask {
         for word in ["boyle", "jeffers", "holt"] {
-          try await update.sendMessage(.with { $0.text = word })
+          try await update.requestStream.send(.with { $0.text = word })
           await counter.incrementRequests()
         }
-        try await update.sendEnd()
+        try await update.requestStream.finish()
       }
       // Get responses in a separate task.
       taskGroup.addTask {