Bläddra i källkod

Add client request and response objects (#1665)

George Barnett 2 år sedan
förälder
incheckning
9f01202193

+ 116 - 0
Sources/GRPCCore/Call/ClientRequest.swift

@@ -0,0 +1,116 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A namespace for request message types used by clients.
+public enum ClientRequest {}
+
+extension ClientRequest {
+  /// A request created by the client for a single message.
+  ///
+  /// This is used for unary and server-streaming RPCs.
+  ///
+  /// See ``ClientRequest/Stream`` for streaming requests and ``ServerRequest/Single`` for the
+  /// servers representation of a single-message request.
+  ///
+  /// ## Creating ``Single`` requests
+  ///
+  /// ```swift
+  /// let request = ClientRequest.Single<String>(message: "Hello, gRPC!")
+  /// print(request.metadata)  // prints '[:]'
+  /// print(request.message)  // prints 'Hello, gRPC!'
+  /// ```
+  public struct Single<Message: Sendable>: Sendable {
+    /// Caller-specified metadata to send to the server at the start of the RPC.
+    ///
+    /// Both gRPC Swift and its transport layer may insert additional metadata. Keys prefixed with
+    /// "grpc-" are prohibited and may result in undefined behaviour. Transports may also insert
+    /// their own metadata, you should avoid using key names which may clash with transport specific
+    /// metadata. Note that transports may also impose limits in the amount of metadata which may
+    /// be sent.
+    public var metadata: Metadata
+
+    /// The message to send to the server.
+    public var message: Message
+
+    /// Create a new single client request.
+    ///
+    /// - Parameters:
+    ///   - message: The message to send to the server.
+    ///   - metadata: Metadata to send to the server at the start of the request. Defaults to empty.
+    public init(
+      message: Message,
+      metadata: Metadata = [:]
+    ) {
+      self.metadata = metadata
+      self.message = message
+    }
+  }
+}
+
+extension ClientRequest {
+  /// A request created by the client for a stream of messages.
+  ///
+  /// This is used for client-streaming and bidirectional-streaming RPCs.
+  ///
+  /// See ``ClientRequest/Single`` for single-message requests and ``ServerRequest/Stream`` for the
+  /// servers representation of a streaming-message request.
+  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+  public struct Stream<Message: Sendable>: Sendable {
+    /// Caller-specified metadata sent to the server at the start of the RPC.
+    ///
+    /// Both gRPC Swift and its transport layer may insert additional metadata. Keys prefixed with
+    /// "grpc-" are prohibited and may result in undefined behaviour. Transports may also insert
+    /// their own metadata, you should avoid using key names which may clash with transport specific
+    /// metadata. Note that transports may also impose limits in the amount of metadata which may
+    /// be sent.
+    public var metadata: Metadata
+
+    /// A closure which, when called, writes messages in the writer.
+    ///
+    /// The producer will only be consumed once by gRPC and therefore isn't required to be
+    /// idempotent. If the producer throws an error then the RPC will be cancelled. Once the
+    /// producer returns the request stream is closed.
+    public var producer: @Sendable (RPCWriter<Message>) async throws -> Void
+
+    /// Create a new streaming client request.
+    ///
+    /// - Parameters:
+    ///   - messageType: The type of message contained in this request, defaults to `Message.self`.
+    ///   - metadata: Metadata to send to the server at the start of the request. Defaults to empty.
+    ///   - producer: A closure which writes messages to send to the server. The closure is called
+    ///       at most once and may not be called.
+    public init(
+      of messageType: Message.Type = Message.self,
+      metadata: Metadata = [:],
+      producer: @escaping @Sendable (RPCWriter<Message>) async throws -> Void
+    ) {
+      self.metadata = metadata
+      self.producer = producer
+    }
+  }
+}
+
+// MARK: - Conversion
+
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension ClientRequest.Stream {
+  @_spi(Testing)
+  public init(single request: ClientRequest.Single<Message>) {
+    self.init(metadata: request.metadata) {
+      try await $0.write(request.message)
+    }
+  }
+}

+ 436 - 0
Sources/GRPCCore/Call/ClientResponse.swift

@@ -0,0 +1,436 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A namespace for response message types used by clients.
+public enum ClientResponse {}
+
+extension ClientResponse {
+  /// A response for a single message received by a client.
+  ///
+  /// Single responses are used for unary and client-streaming RPCs. For streaming responses
+  /// see ``ClientResponse/Stream``.
+  ///
+  /// A single response captures every part of the response stream and distinguishes successful
+  /// and unsuccessful responses via the ``accepted`` property. The value for the `success` case
+  /// contains the initial metadata, response message, and the trailing metadata and implicitly
+  /// has an ``Status/Code-swift.struct/ok`` status code.
+  ///
+  /// The `failure` case indicates that the server chose not to process the RPC, or the processing
+  /// of the RPC failed, or the client failed to execute the request. The failure case contains
+  /// an ``RPCError`` describing why the RPC failed, including an error code, error message and any
+  /// metadata sent by the server.
+  ///
+  /// ### Using ``Single`` responses
+  ///
+  /// Each response has a ``accepted`` property which contains all RPC information. You can create
+  /// one by calling ``init(accepted:)`` or one of the two convenience initializers:
+  /// - ``init(message:metadata:trailingMetadata:)`` to create a successful response, or
+  /// - ``init(of:error:)`` to create a failed response.
+  ///
+  /// You can interrogate a response by inspecting the ``accepted`` property directly or by using
+  /// its convenience properties:
+  /// - ``metadata`` extracts the initial metadata,
+  /// - ``message`` extracts the message, or throws if the response failed, and
+  /// - ``trailingMetadata`` extracts the trailing metadata.
+  ///
+  /// The following example demonstrates how you can use the API:
+  ///
+  /// ```swift
+  /// // Create a successful response
+  /// let response = ClientResponse.Single<String>(
+  ///   message: "Hello, World!",
+  ///   metadata: ["hello": "initial metadata"],
+  ///   trailingMetadata: ["goodbye": "trailing metadata"]
+  /// )
+  ///
+  /// // The explicit API:
+  /// switch response {
+  /// case .success(let contents):
+  ///   print("Received response with message '\(contents.message)'")
+  /// case .failure(let error):
+  ///   print("RPC failed with code '\(error.code)'")
+  /// }
+  ///
+  /// // The convenience API:
+  /// do {
+  ///   print("Received response with message '\(try response.message)'")
+  /// } catch let error as RPCError {
+  ///   print("RPC failed with code '\(error.code)'")
+  /// }
+  /// ```
+  public struct Single<Message: Sendable>: Sendable {
+    /// The contents of an accepted response with a single message.
+    public struct Contents: Sendable {
+      /// Metadata received from the server at the beginning of the response.
+      ///
+      /// The metadata may contain transport-specific information in addition to any application
+      /// level metadata provided by the service.
+      public var metadata: Metadata
+
+      /// The response message received from the server.
+      public var message: Message
+
+      /// Metadata received from the server at the end of the response.
+      ///
+      /// The metadata may contain transport-specific information in addition to any application
+      /// level metadata provided by the service.
+      public var trailingMetadata: Metadata
+
+      /// Creates a `Contents`.
+      ///
+      /// - Parameters:
+      ///   - metadata: Metadata received from the server at the beginning of the response.
+      ///   - message: The response message received from the server.
+      ///   - trailingMetadata: Metadata received from the server at the end of the response.
+      public init(
+        metadata: Metadata,
+        message: Message,
+        trailingMetadata: Metadata
+      ) {
+        self.metadata = metadata
+        self.message = message
+        self.trailingMetadata = trailingMetadata
+      }
+    }
+
+    /// Whether the RPC was accepted or rejected.
+    ///
+    /// The `success` case indicates the RPC completed successfully with an
+    /// ``Status/Code-swift.struct/ok`` status code. The `failure` case indicates that the RPC was
+    /// rejected by the server and wasn't processed or couldn't be processed successfully.
+    public var accepted: Result<Contents, RPCError>
+
+    /// Creates a new response.
+    ///
+    /// - Parameter accepted: The result of the RPC.
+    public init(accepted: Result<Contents, RPCError>) {
+      self.accepted = accepted
+    }
+  }
+}
+
+extension ClientResponse {
+  /// A response for a stream of messages received by a client.
+  ///
+  /// Stream responses are used for server-streaming and bidirectional-streaming RPCs. For single
+  /// responses see ``ClientResponse/Single``.
+  ///
+  /// A stream response captures every part of the response stream over time and distinguishes
+  /// accepted and rejected requests via the ``accepted`` property. An "accepted" request is one
+  /// where the the server responds with initial metadata and attempts to process the request. A
+  /// "rejected" request is one where the server responds with a status as the first and only
+  /// response part and doesn't process the request body.
+  ///
+  /// The value for the `success` case contains the initial metadata and a ``RPCAsyncSequence`` of
+  /// message parts (messages followed by a single status). If the sequence completes without
+  /// throwing then the response implicitly has an ``Status/Code-swift.struct/ok`` status code.
+  /// However, the response sequence may also throw an ``RPCError`` if the server fails to complete
+  /// processing the request.
+  ///
+  /// The `failure` case indicates that the server chose not to process the RPC or the client failed
+  /// to execute the request. The failure case contains an ``RPCError`` describing why the RPC
+  /// failed, including an error code, error message and any metadata sent by the server.
+  ///
+  /// ### Using ``Stream`` responses
+  ///
+  /// Each response has a ``accepted`` property which contains RPC information. You can create
+  /// one by calling ``init(accepted:)`` or one of the two convenience initializers:
+  /// - ``init(of:metadata:bodyParts:)`` to create an accepted response, or
+  /// - ``init(of:error:)`` to create a failed response.
+  ///
+  /// You can interrogate a response by inspecting the ``accepted`` property directly or by using
+  /// its convenience properties:
+  /// - ``metadata`` extracts the initial metadata,
+  /// - ``messages`` extracts the sequence of response message, or throws if the response failed.
+  ///
+  /// The following example demonstrates how you can use the API:
+  ///
+  /// ```swift
+  /// // Create a failed response
+  /// let response = ClientResponse.Stream(
+  ///   of: String.self,
+  ///   error: RPCError(code: .notFound, message: "The requested resource couldn't be located")
+  /// )
+  ///
+  /// // The explicit API:
+  /// switch response {
+  /// case .success(let contents):
+  ///   for try await part in contents.bodyParts {
+  ///     switch part {
+  ///     case .message(let message):
+  ///       print("Received message '\(message)'")
+  ///     case .trailingMetadata(let metadata):
+  ///       print("Received trailing metadata '\(metadata)'")
+  ///     }
+  ///   }
+  /// case .failure(let error):
+  ///   print("RPC failed with code '\(error.code)'")
+  /// }
+  ///
+  /// // The convenience API:
+  /// do {
+  ///   for try await message in response.messages {
+  ///     print("Received message '\(message)'")
+  ///   }
+  /// } catch let error as RPCError {
+  ///   print("RPC failed with code '\(error.code)'")
+  /// }
+  /// ```
+  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+  public struct Stream<Message: Sendable>: Sendable {
+    public struct Contents: Sendable {
+      /// Metadata received from the server at the beginning of the response.
+      ///
+      /// The metadata may contain transport-specific information in addition to any application
+      /// level metadata provided by the service.
+      public var metadata: Metadata
+
+      /// A sequence of stream parts received from the server ending with metadata if the RPC
+      /// succeeded.
+      ///
+      /// If the RPC fails then the sequence will throw an ``RPCError``.
+      ///
+      /// The sequence may only be iterated once.
+      public var bodyParts: RPCAsyncSequence<BodyPart>
+
+      /// Parts received from the server.
+      public enum BodyPart: Sendable {
+        /// A response message.
+        case message(Message)
+        /// Metadata. Must be the final value of the sequence unless the stream throws an error.
+        case trailingMetadata(Metadata)
+      }
+
+      /// Creates a ``Contents``.
+      ///
+      /// - Parameters:
+      ///   - metadata: Metadata received from the server at the beginning of the response.
+      ///   - bodyParts: An `AsyncSequence` of parts received from the server.
+      public init(
+        metadata: Metadata,
+        bodyParts: RPCAsyncSequence<BodyPart>
+      ) {
+        self.metadata = metadata
+        self.bodyParts = bodyParts
+      }
+    }
+
+    /// Whether the RPC was accepted or rejected.
+    ///
+    /// The `success` case indicates the RPC was accepted by the server for
+    /// processing, however, the RPC may still fail by throwing an error from its
+    /// `messages` sequence. The `failure` case indicates that the RPC was
+    /// rejected by the server.
+    public var accepted: Result<Contents, RPCError>
+
+    /// Creates a new response.
+    ///
+    /// - Parameter accepted: The result of the RPC.
+    public init(accepted: Result<Contents, RPCError>) {
+      self.accepted = accepted
+    }
+  }
+}
+
+// MARK: - Convenience API
+
+extension ClientResponse.Single {
+  /// Creates a new accepted response.
+  ///
+  /// - Parameters:
+  ///   - metadata: Metadata received from the server at the beginning of the response.
+  ///   - message: The response message received from the server.
+  ///   - trailingMetadata: Metadata received from the server at the end of the response.
+  public init(message: Message, metadata: Metadata = [:], trailingMetadata: Metadata = [:]) {
+    let contents = Contents(
+      metadata: metadata,
+      message: message,
+      trailingMetadata: trailingMetadata
+    )
+    self.accepted = .success(contents)
+  }
+
+  /// Creates a new failed response.
+  ///
+  /// - Parameters:
+  ///   - messageType: The type of message.
+  ///   - error: An error describing why the RPC failed.
+  public init(of messageType: Message.Type = Message.self, error: RPCError) {
+    self.accepted = .failure(error)
+  }
+
+  /// Returns metadata received from the server at the start of the response.
+  ///
+  /// For rejected RPCs (in other words, where ``accepted`` is `failure`) the metadata is empty.
+  public var metadata: Metadata {
+    switch self.accepted {
+    case let .success(contents):
+      return contents.metadata
+    case .failure:
+      return [:]
+    }
+  }
+
+  /// Returns the message received from the server.
+  ///
+  /// - Throws: ``RPCError`` if the request failed.
+  public var message: Message {
+    get throws {
+      try self.accepted.map { $0.message }.get()
+    }
+  }
+
+  /// Returns metadata received from the server at the end of the response.
+  ///
+  /// Unlike ``metadata``, for rejected RPCs the metadata returned may contain values.
+  public var trailingMetadata: Metadata {
+    switch self.accepted {
+    case let .success(contents):
+      return contents.trailingMetadata
+    case let .failure(error):
+      return error.metadata
+    }
+  }
+}
+
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension ClientResponse.Stream {
+  /// Creates a new accepted response.
+  ///
+  /// - Parameters:
+  ///   - messageType: The type of message.
+  ///   - metadata: Metadata received from the server at the beginning of the response.
+  ///   - bodyParts: An ``RPCAsyncSequence`` of response parts received from the server.
+  public init(
+    of messageType: Message.Type = Message.self,
+    metadata: Metadata,
+    bodyParts: RPCAsyncSequence<Contents.BodyPart>
+  ) {
+    let contents = Contents(metadata: metadata, bodyParts: bodyParts)
+    self.accepted = .success(contents)
+  }
+
+  /// Creates a new failed response.
+  ///
+  /// - Parameters:
+  ///   - messageType: The type of message.
+  ///   - error: An error describing why the RPC failed.
+  public init(of messageType: Message.Type = Message.self, error: RPCError) {
+    self.accepted = .failure(error)
+  }
+
+  /// Returns metadata received from the server at the start of the response.
+  ///
+  /// For rejected RPCs (in other words, where ``accepted`` is `failure`) the metadata is empty.
+  public var metadata: Metadata {
+    switch self.accepted {
+    case let .success(contents):
+      return contents.metadata
+    case .failure:
+      return [:]
+    }
+  }
+
+  /// Returns metadata received from the server at the end of the response.
+  ///
+  /// Unlike ``metadata``, for rejected RPCs the metadata returned may contain values.
+  public var messages: RPCAsyncSequence<Message> {
+    switch self.accepted {
+    case let .success(contents):
+      let filtered = contents.bodyParts.compactMap {
+        switch $0 {
+        case let .message(message):
+          return message
+        case .trailingMetadata:
+          return nil
+        }
+      }
+
+      return RPCAsyncSequence(wrapping: filtered)
+
+    case let .failure(error):
+      return RPCAsyncSequence.throwing(error)
+    }
+  }
+}
+
+// MARK: - Conversion
+
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension ClientResponse.Single {
+  @_spi(Testing)
+  public init(stream response: ClientResponse.Stream<Message>) async {
+    switch response.accepted {
+    case .success(let contents):
+      do {
+        let metadata = contents.metadata
+        var iterator = contents.bodyParts.makeAsyncIterator()
+
+        // Happy path: message, trailing metadata, nil.
+        let part1 = try await iterator.next()
+        let part2 = try await iterator.next()
+        let part3 = try await iterator.next()
+
+        switch (part1, part2, part3) {
+        case (.some(.message(let message)), .some(.trailingMetadata(let trailingMetadata)), .none):
+          let contents = Contents(
+            metadata: metadata,
+            message: message,
+            trailingMetadata: trailingMetadata
+          )
+          self.accepted = .success(contents)
+
+        case (.some(.message), .some(.message), _):
+          let error = RPCError(
+            code: .unimplemented,
+            message: """
+              Multiple messages received, but only one is expected. The server may have \
+              incorrectly implemented the RPC or the client and server may have a different \
+              opinion on whether this RPC streams responses.
+              """
+          )
+          self.accepted = .failure(error)
+
+        case (.some(.trailingMetadata), .none, .none):
+          let error = RPCError(
+            code: .unimplemented,
+            message: "No messages received, exactly one was expected."
+          )
+          self.accepted = .failure(error)
+
+        case (_, _, _):
+          let error = RPCError(
+            code: .internalError,
+            message: """
+              The stream from the client transport is invalid. This is likely to be an incorrectly \
+              implemented transport. Received parts: \([part1, part2, part3])."
+              """
+          )
+          self.accepted = .failure(error)
+        }
+      } catch let error as RPCError {
+        // Known error type.
+        self.accepted = .failure(error)
+      } catch {
+        // Unexpected, but should be handled nonetheless.
+        self.accepted = .failure(RPCError(code: .unknown, message: String(describing: error)))
+      }
+
+    case .failure(let error):
+      self.accepted = .failure(error)
+    }
+  }
+}

+ 58 - 0
Sources/GRPCCore/Stream/AsyncSequenceOfOne.swift

@@ -0,0 +1,58 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension RPCAsyncSequence {
+  /// Returns an ``RPCAsyncSequence`` containing just the given element.
+  @_spi(Testing)
+  public static func one(_ element: Element) -> Self {
+    return Self(wrapping: AsyncSequenceOfOne<Element, Never>(result: .success(element)))
+  }
+
+  /// Returns an ``RPCAsyncSequence`` throwing the given error.
+  @_spi(Testing)
+  public static func throwing<E: Error>(_ error: E) -> Self {
+    return Self(wrapping: AsyncSequenceOfOne<Element, E>(result: .failure(error)))
+  }
+}
+
+/// An `AsyncSequence` of a single value.
+private struct AsyncSequenceOfOne<Element: Sendable, Failure: Error>: AsyncSequence {
+  private let result: Result<Element, Failure>
+
+  init(result: Result<Element, Failure>) {
+    self.result = result
+  }
+
+  func makeAsyncIterator() -> AsyncIterator {
+    AsyncIterator(result: self.result)
+  }
+
+  struct AsyncIterator: AsyncIteratorProtocol {
+    private var result: Result<Element, Failure>?
+
+    fileprivate init(result: Result<Element, Failure>) {
+      self.result = result
+    }
+
+    mutating func next() async throws -> Element? {
+      guard let result = self.result else { return nil }
+
+      self.result = nil
+      return try result.get()
+    }
+  }
+}

+ 31 - 0
Tests/GRPCCoreTests/Call/ClientRequestTests.swift

@@ -0,0 +1,31 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@_spi(Testing) import GRPCCore
+import XCTest
+
+final class ClientRequestTests: XCTestCase {
+  func testSingleToStreamConversion() async throws {
+    let (messages, continuation) = AsyncStream.makeStream(of: String.self)
+    let single = ClientRequest.Single(message: "foo", metadata: ["bar": "baz"])
+    let stream = ClientRequest.Stream(single: single)
+
+    XCTAssertEqual(stream.metadata, ["bar": "baz"])
+    try await stream.producer(.gathering(into: continuation))
+    continuation.finish()
+    let collected = try await messages.collect()
+    XCTAssertEqual(collected, ["foo"])
+  }
+}

+ 171 - 0
Tests/GRPCCoreTests/Call/ClientResponseTests.swift

@@ -0,0 +1,171 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@_spi(Testing) import GRPCCore
+import XCTest
+
+final class ClientResponseTests: XCTestCase {
+  func testAcceptedSingleResponseConvenienceMethods() {
+    let response = ClientResponse.Single(
+      message: "message",
+      metadata: ["foo": "bar"],
+      trailingMetadata: ["bar": "baz"]
+    )
+
+    XCTAssertEqual(response.metadata, ["foo": "bar"])
+    XCTAssertEqual(try response.message, "message")
+    XCTAssertEqual(response.trailingMetadata, ["bar": "baz"])
+  }
+
+  func testRejectedSingleResponseConvenienceMethods() {
+    let error = RPCError(code: .aborted, message: "error message", metadata: ["bar": "baz"])
+    let response = ClientResponse.Single(of: String.self, error: error)
+
+    XCTAssertEqual(response.metadata, [:])
+    XCTAssertThrowsRPCError(try response.message) {
+      XCTAssertEqual($0, error)
+    }
+    XCTAssertEqual(response.trailingMetadata, ["bar": "baz"])
+  }
+
+  func testAcceptedStreamResponseConvenienceMethods() async throws {
+    let response = ClientResponse.Stream(
+      of: String.self,
+      metadata: ["foo": "bar"],
+      bodyParts: RPCAsyncSequence(
+        wrapping: AsyncStream {
+          $0.yield(.message("foo"))
+          $0.yield(.message("bar"))
+          $0.yield(.message("baz"))
+          $0.yield(.trailingMetadata(["baz": "baz"]))
+          $0.finish()
+        }
+      )
+    )
+
+    XCTAssertEqual(response.metadata, ["foo": "bar"])
+    let messages = try await response.messages.collect()
+    XCTAssertEqual(messages, ["foo", "bar", "baz"])
+  }
+
+  func testRejectedStreamResponseConvenienceMethods() async throws {
+    let error = RPCError(code: .aborted, message: "error message", metadata: ["bar": "baz"])
+    let response = ClientResponse.Stream(of: String.self, error: error)
+
+    XCTAssertEqual(response.metadata, [:])
+    await XCTAssertThrowsRPCErrorAsync {
+      try await response.messages.collect()
+    } errorHandler: {
+      XCTAssertEqual($0, error)
+    }
+  }
+
+  func testStreamToSingleConversionForValidStream() async throws {
+    let stream = ClientResponse.Stream(
+      of: String.self,
+      metadata: ["foo": "bar"],
+      bodyParts: .elements(.message("foo"), .trailingMetadata(["bar": "baz"]))
+    )
+
+    let single = await ClientResponse.Single(stream: stream)
+    XCTAssertEqual(single.metadata, ["foo": "bar"])
+    XCTAssertEqual(try single.message, "foo")
+    XCTAssertEqual(single.trailingMetadata, ["bar": "baz"])
+  }
+
+  func testStreamToSingleConversionForFailedStream() async throws {
+    let error = RPCError(code: .aborted, message: "aborted", metadata: ["bar": "baz"])
+    let stream = ClientResponse.Stream(of: String.self, error: error)
+
+    let single = await ClientResponse.Single(stream: stream)
+    XCTAssertEqual(single.metadata, [:])
+    XCTAssertThrowsRPCError(try single.message) {
+      XCTAssertEqual($0, error)
+    }
+    XCTAssertEqual(single.trailingMetadata, ["bar": "baz"])
+  }
+
+  func testStreamToSingleConversionForInvalidSingleStream() async throws {
+    let bodies: [[ClientResponse.Stream<String>.Contents.BodyPart]] = [
+      [.message("1"), .message("2")],  // Too many messages.
+      [.trailingMetadata([:])],  // Too few messages
+    ]
+
+    for body in bodies {
+      let stream = ClientResponse.Stream(
+        of: String.self,
+        metadata: ["foo": "bar"],
+        bodyParts: .elements(body)
+      )
+
+      let single = await ClientResponse.Single(stream: stream)
+      XCTAssertEqual(single.metadata, [:])
+      XCTAssertThrowsRPCError(try single.message) { error in
+        XCTAssertEqual(error.code, .unimplemented)
+      }
+      XCTAssertEqual(single.trailingMetadata, [:])
+    }
+  }
+
+  func testStreamToSingleConversionForInvalidStream() async throws {
+    let bodies: [[ClientResponse.Stream<String>.Contents.BodyPart]] = [
+      [],  // Empty stream
+      [.trailingMetadata([:]), .trailingMetadata([:])],  // Multiple metadatas
+      [.trailingMetadata([:]), .message("")],  // Metadata then message
+    ]
+
+    for body in bodies {
+      let stream = ClientResponse.Stream(
+        of: String.self,
+        metadata: ["foo": "bar"],
+        bodyParts: .elements(body)
+      )
+
+      let single = await ClientResponse.Single(stream: stream)
+      XCTAssertEqual(single.metadata, [:])
+      XCTAssertThrowsRPCError(try single.message) { error in
+        XCTAssertEqual(error.code, .internalError)
+      }
+      XCTAssertEqual(single.trailingMetadata, [:])
+    }
+  }
+
+  func testStreamToSingleConversionForStreamThrowingRPCError() async throws {
+    let error = RPCError(code: .dataLoss, message: "oops")
+    let stream = ClientResponse.Stream(
+      of: String.self,
+      metadata: [:],
+      bodyParts: .throwing(error)
+    )
+
+    let single = await ClientResponse.Single(stream: stream)
+    XCTAssertThrowsRPCError(try single.message) {
+      XCTAssertEqual($0, error)
+    }
+  }
+
+  func testStreamToSingleConversionForStreamThrowingUnknownError() async throws {
+    let stream = ClientResponse.Stream(
+      of: String.self,
+      metadata: [:],
+      bodyParts: .throwing(CancellationError())
+    )
+
+    let single = await ClientResponse.Single(stream: stream)
+    XCTAssertThrowsRPCError(try single.message) { error in
+      XCTAssertEqual(error.code, .unknown)
+    }
+  }
+}

+ 39 - 0
Tests/GRPCCoreTests/Stream/AsyncSequenceOfOne.swift

@@ -0,0 +1,39 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@_spi(Testing) import GRPCCore
+import XCTest
+
+internal final class AsyncSequenceOfOneTests: XCTestCase {
+  func testSuccessPath() async throws {
+    let sequence = RPCAsyncSequence.one("foo")
+    let contents = try await sequence.collect()
+    XCTAssertEqual(contents, ["foo"])
+  }
+
+  func testFailurePath() async throws {
+    let sequence = RPCAsyncSequence<String>.throwing(RPCError(code: .cancelled, message: "foo"))
+
+    do {
+      let _ = try await sequence.collect()
+      XCTFail("Expected an error to be thrown")
+    } catch let error as RPCError {
+      XCTAssertEqual(error.code, .cancelled)
+      XCTAssertEqual(error.message, "foo")
+    } catch {
+      XCTFail("Expected error of type RPCError to be thrown")
+    }
+  }
+}

+ 36 - 0
Tests/GRPCCoreTests/Test Utilities/AsyncSequence+Utilities.swift

@@ -0,0 +1,36 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+extension AsyncSequence {
+  func collect() async throws -> [Element] {
+    return try await self.reduce(into: []) { $0.append($1) }
+  }
+}
+
+#if swift(<5.9)
+extension AsyncStream {
+  static func makeStream(
+    of elementType: Element.Type = Element.self,
+    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
+  ) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
+    var continuation: AsyncStream<Element>.Continuation!
+    let stream = AsyncStream(Element.self, bufferingPolicy: limit) {
+      continuation = $0
+    }
+    return (stream, continuation)
+  }
+}
+#endif

+ 32 - 0
Tests/GRPCCoreTests/Test Utilities/RPCAsyncSequence+Utilities.swift

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPCCore
+
+extension RPCAsyncSequence {
+  static func elements(_ elements: Element...) -> Self {
+    return .elements(elements)
+  }
+
+  static func elements(_ elements: [Element]) -> Self {
+    let stream = AsyncStream<Element> {
+      for element in elements {
+        $0.yield(element)
+      }
+      $0.finish()
+    }
+    return RPCAsyncSequence(wrapping: stream)
+  }
+}

+ 49 - 0
Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift

@@ -0,0 +1,49 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPCCore
+import XCTest
+
+extension RPCWriter {
+  /// Returns a writer which calls `XCTFail(_:)` on every write.
+  static func failTestOnWrite(elementType: Element.Type = Element.self) -> Self {
+    return RPCWriter(wrapping: FailOnWrite())
+  }
+
+  /// Returns a writer which gathers writes into an `AsyncStream`.
+  static func gathering(into continuation: AsyncStream<Element>.Continuation) -> Self {
+    return RPCWriter(wrapping: AsyncStreamGatheringWriter(continuation: continuation))
+  }
+}
+
+private struct FailOnWrite<Element>: RPCWriterProtocol {
+  func write(contentsOf elements: some Sequence<Element>) async throws {
+    XCTFail("Unexpected write")
+  }
+}
+
+private struct AsyncStreamGatheringWriter<Element>: RPCWriterProtocol {
+  let continuation: AsyncStream<Element>.Continuation
+
+  init(continuation: AsyncStream<Element>.Continuation) {
+    self.continuation = continuation
+  }
+
+  func write(contentsOf elements: some Sequence<Element>) async throws {
+    for element in elements {
+      self.continuation.yield(element)
+    }
+  }
+}

+ 28 - 0
Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift

@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import GRPCCore
 import XCTest
 
 func XCTAssertDescription(
@@ -23,3 +24,30 @@ func XCTAssertDescription(
 ) {
   XCTAssertEqual(String(describing: subject), expected, file: file, line: line)
 }
+
+func XCTAssertThrowsRPCError<T>(
+  _ expression: @autoclosure () throws -> T,
+  _ errorHandler: (RPCError) -> Void
+) {
+  XCTAssertThrowsError(try expression()) { error in
+    guard let error = error as? RPCError else {
+      return XCTFail("Error had unexpected type '\(type(of: error))'")
+    }
+
+    errorHandler(error)
+  }
+}
+
+func XCTAssertThrowsRPCErrorAsync<T>(
+  _ expression: () async throws -> T,
+  errorHandler: (RPCError) -> Void
+) async {
+  do {
+    _ = try await expression()
+    XCTFail("Expression didn't throw")
+  } catch let error as RPCError {
+    errorHandler(error)
+  } catch {
+    XCTFail("Error had unexpected type '\(type(of: error))'")
+  }
+}