Browse Source

Add transport protocols and surrounding types (#1660)

Motivation:

The transport protocols provide a lower-level abstraction for different
conection protocols. For example, there will be an HTTP/2 transport and
in the future, when an implementation arises, a separate HTTP/3 transport.
This change adds these interfaces and a handful of related types which
surround the transport protocols.

Modifications:

- Add RPC parts
- Add RPC stream composed of streams inbound and outbound RPC parts
- Add async sequence wrapper
- Add writer protocol and wrapper
- Add various currency types
- Tests

Results:

Transport protocols and related types are in place
George Barnett 2 years ago
parent
commit
2abc8bd2b1

+ 290 - 0
Sources/GRPCCore/Call/ClientRPCExecutionConfiguration.swift

@@ -0,0 +1,290 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// Configuration values for executing an RPC.
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+public struct ClientRPCExecutionConfiguration: Hashable, Sendable {
+  /// The default timeout for the RPC.
+  ///
+  /// If no reply is received in the specified amount of time the request is aborted
+  /// with an ``RPCError`` with code ``RPCError/Code/deadlineExceeded``.
+  ///
+  /// The actual deadline used will be the minimum of the value specified here
+  /// and the value set by the application by the client API. If either one isn't set
+  /// then the other value is used. If neither is set then the request has no deadline.
+  ///
+  /// The timeout applies to the overall execution of an RPC. If, for example, a retry
+  /// policy is set then the timeout begins when the first attempt is started and _isn't_ reset
+  /// when subsequent attempts start.
+  public var timeout: Duration?
+
+  /// The policy determining how many times, and when, the RPC is executed.
+  ///
+  /// There are two policy types:
+  /// 1. Retry
+  /// 2. Hedging
+  ///
+  /// The retry policy allows an RPC to be retried a limited number of times if the RPC
+  /// fails with one of the configured set of status codes. RPCs are only retried if they
+  /// fail immediately, that is, the first response part received from the server is a
+  /// status code.
+  ///
+  /// The hedging policy allows an RPC to be executed multiple times concurrently. Typically
+  /// each execution will be staggered by some delay. The first successful response will be
+  /// reported to the client. Hedging is only suitable for idempotent RPCs.
+  public var executionPolicy: ExecutionPolicy?
+
+  /// Create an execution configuration.
+  ///
+  /// - Parameters:
+  ///   - executionPolicy: The execution policy to use for the RPC.
+  ///   - timeout: The default timeout for the RPC.
+  public init(
+    executionPolicy: ExecutionPolicy?,
+    timeout: Duration?
+  ) {
+    self.executionPolicy = executionPolicy
+    self.timeout = timeout
+  }
+
+  /// Create an execution configuration with a retry policy.
+  ///
+  /// - Parameters:
+  ///   - retryPolicy: The policy for retrying the RPC.
+  ///   - timeout: The default timeout for the RPC.
+  public init(
+    retryPolicy: RetryPolicy,
+    timeout: Duration? = nil
+  ) {
+    self.executionPolicy = .retry(retryPolicy)
+    self.timeout = timeout
+  }
+
+  /// Create an execution configuration with a hedging policy.
+  ///
+  /// - Parameters:
+  ///   - hedgingPolicy: The policy for hedging the RPC.
+  ///   - timeout: The default timeout for the RPC.
+  public init(
+    hedgingPolicy: HedgingPolicy,
+    timeout: Duration? = nil
+  ) {
+    self.executionPolicy = .hedge(hedgingPolicy)
+    self.timeout = timeout
+  }
+}
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+extension ClientRPCExecutionConfiguration {
+  /// The execution policy for an RPC.
+  public enum ExecutionPolicy: Hashable, Sendable {
+    /// Policy for retrying an RPC.
+    ///
+    /// See ``RetryPolicy`` for more details.
+    case retry(RetryPolicy)
+
+    /// Policy for hedging an RPC.
+    ///
+    /// See ``HedgingPolicy`` for more details.
+    case hedge(HedgingPolicy)
+  }
+}
+
+/// Policy for retrying an RPC.
+///
+/// gRPC retries RPCs when the first response from the server is a status code which matches
+/// one of the configured retryable status codes. If the server begins processing the RPC and
+/// first responds with metadata and later responds with a retryable status code then the RPC
+/// won't be retried.
+///
+/// Execution attempts are limited by ``maximumAttempts`` which includes the original attempt. The
+/// maximum number of attempts is limited to five.
+///
+/// Subsequent attempts are executed after some delay. The first _retry_, or second attempt, will
+/// be started after a randomly chosen delay between zero and ``initialBackoff``. More generally,
+/// the nth retry will happen after a randomly chosen delay between zero
+/// and `min(initialBackoff * backoffMultiplier^(n-1), maximumBackoff)`.
+///
+/// For more information see [gRFC A6 Client
+/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+public struct RetryPolicy: Hashable, Sendable {
+  /// The maximum number of RPC attempts, including the original attempt.
+  ///
+  /// Must be greater than one, values greater than five are treated as five.
+  public var maximumAttempts: Int {
+    didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) }
+  }
+
+  /// The initial backoff duration.
+  ///
+  /// The initial retry will occur after a random amount of time up to this value.
+  ///
+  /// - Precondition: Must be greater than zero.
+  public var initialBackoff: Duration {
+    willSet { Self.validateInitialBackoff(newValue) }
+  }
+
+  /// The maximum amount of time to backoff for.
+  ///
+  /// - Precondition: Must be greater than zero.
+  public var maximumBackoff: Duration {
+    willSet { Self.validateMaxBackoff(newValue) }
+  }
+
+  /// The multiplier to apply to backoff.
+  ///
+  /// - Precondition: Must be greater than zero.
+  public var backoffMultiplier: Double {
+    willSet { Self.validateBackoffMultiplier(newValue) }
+  }
+
+  /// The set of status codes which may be retried.
+  ///
+  /// - Precondition: Must not be empty.
+  public var retryableStatusCodes: Set<Status.Code> {
+    willSet { Self.validateRetryableStatusCodes(newValue) }
+  }
+
+  /// Create a new retry policy.
+  ///
+  /// - Parameters:
+  ///   - maximumAttempts: The maximum number of attempts allowed for the RPC.
+  ///   - initialBackoff: The initial backoff period for the first retry attempt. Must be
+  ///       greater than zero.
+  ///   - maximumBackoff: The maximum period of time to wait between attempts. Must be greater than
+  ///       zero.
+  ///   - backoffMultiplier: The exponential backoff multiplier. Must be greater than zero.
+  ///   - retryableStatusCodes: The set of status codes which may be retried. Must not be empty.
+  /// - Precondition: `maximumAttempts`, `initialBackoff`, `maximumBackoff` and `backoffMultiplier`
+  ///     must be greater than zero.
+  /// - Precondition: `retryableStatusCodes` must not be empty.
+  public init(
+    maximumAttempts: Int,
+    initialBackoff: Duration,
+    maximumBackoff: Duration,
+    backoffMultiplier: Double,
+    retryableStatusCodes: Set<Status.Code>
+  ) {
+    self.maximumAttempts = validateMaxAttempts(maximumAttempts)
+
+    Self.validateInitialBackoff(initialBackoff)
+    self.initialBackoff = initialBackoff
+
+    Self.validateMaxBackoff(maximumBackoff)
+    self.maximumBackoff = maximumBackoff
+
+    Self.validateBackoffMultiplier(backoffMultiplier)
+    self.backoffMultiplier = backoffMultiplier
+
+    Self.validateRetryableStatusCodes(retryableStatusCodes)
+    self.retryableStatusCodes = retryableStatusCodes
+  }
+
+  private static func validateInitialBackoff(_ value: Duration) {
+    precondition(value.isGreaterThanZero, "initialBackoff must be greater than zero")
+  }
+
+  private static func validateMaxBackoff(_ value: Duration) {
+    precondition(value.isGreaterThanZero, "maximumBackoff must be greater than zero")
+  }
+
+  private static func validateBackoffMultiplier(_ value: Double) {
+    precondition(value > 0, "backoffMultiplier must be greater than zero")
+  }
+
+  private static func validateRetryableStatusCodes(_ value: Set<Status.Code>) {
+    precondition(!value.isEmpty, "retryableStatusCodes mustn't be empty")
+  }
+}
+
+/// Policy for hedging an RPC.
+///
+/// Hedged RPCs may execute more than once on a server so only idempotent methods should
+/// be hedged.
+///
+/// gRPC executes the RPC at most ``maximumAttempts`` times, staggering each attempt
+/// by ``hedgingDelay``.
+///
+/// For more information see [gRFC A6 Client
+/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+public struct HedgingPolicy: Hashable, Sendable {
+  /// The maximum number of RPC attempts, including the original attempt.
+  ///
+  /// Values greater than five are treated as five.
+  ///
+  /// - Precondition: Must be greater than one.
+  public var maximumAttempts: Int {
+    didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) }
+  }
+
+  /// The first RPC will be sent immediately, but each subsequent RPC will be sent at intervals
+  /// of `hedgingDelay`. Set this to zero to immediately send all RPCs.
+  public var hedgingDelay: Duration {
+    willSet { Self.validateHedgingDelay(newValue) }
+  }
+
+  /// The set of status codes which indicate other hedged RPCs may still succeed.
+  ///
+  /// If a non-fatal status code is returned by the server, hedged RPCs will continue.
+  /// Otherwise, outstanding requests will be cancelled and the error returned to the
+  /// application layer.
+  public var nonFatalStatusCodes: Set<Status.Code>
+
+  /// Create a new hedging policy.
+  ///
+  /// - Parameters:
+  ///   - maximumAttempts: The maximum number of attempts allowed for the RPC.
+  ///   - hedgingDelay: The delay between each hedged RPC.
+  ///   - nonFatalStatusCodes: The set of status codes which indicated other hedged RPCs may still
+  ///       succeed.
+  /// - Precondition: `maximumAttempts` must be greater than zero.
+  public init(
+    maximumAttempts: Int,
+    hedgingDelay: Duration,
+    nonFatalStatusCodes: Set<Status.Code>
+  ) {
+    self.maximumAttempts = validateMaxAttempts(maximumAttempts)
+
+    Self.validateHedgingDelay(hedgingDelay)
+    self.hedgingDelay = hedgingDelay
+    self.nonFatalStatusCodes = nonFatalStatusCodes
+  }
+
+  private static func validateHedgingDelay(_ value: Duration) {
+    precondition(
+      value.isGreaterThanOrEqualToZero,
+      "hedgingDelay must be greater than or equal to zero"
+    )
+  }
+}
+
+private func validateMaxAttempts(_ value: Int) -> Int {
+  precondition(value > 0, "maximumAttempts must be greater than zero")
+  return min(value, 5)
+}
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+extension Duration {
+  fileprivate var isGreaterThanZero: Bool {
+    self.components.seconds > 0 || self.components.attoseconds > 0
+  }
+
+  fileprivate var isGreaterThanOrEqualToZero: Bool {
+    self.components.seconds >= 0 || self.components.attoseconds >= 0
+  }
+}

+ 46 - 0
Sources/GRPCCore/MethodDescriptor.swift

@@ -0,0 +1,46 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A description of a method on a service.
+public struct MethodDescriptor: Sendable, Hashable {
+  /// The name of the service, including the package name.
+  ///
+  /// For example, the name of the "Greeter" service in "helloworld" package
+  /// is "helloworld.Greeter".
+  public var service: String
+
+  /// The name of the method in the service, excluding the service name.
+  public var method: String
+
+  /// The fully qualified method name in the format "package.service/method".
+  ///
+  /// For example, the fully qualified name of the "SayHello" method of the "Greeter" service in
+  /// "helloworld" package is "helloworld.Greeter/SayHelllo".
+  public var fullyQualifiedMethod: String {
+    "\(self.service)/\(self.method)"
+  }
+
+  /// Creates a new method descriptor.
+  ///
+  /// - Parameters:
+  ///   - service: The name of the service, including the package name. For example,
+  ///       "helloworld.Greeter".
+  ///   - method: The name of the method. For example, "SayHello".
+  public init(service: String, method: String) {
+    self.service = service
+    self.method = method
+  }
+}

+ 49 - 0
Sources/GRPCCore/Stream/RPCAsyncSequence.swift

@@ -0,0 +1,49 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A type-erasing `AsyncSequence`.
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+public struct RPCAsyncSequence<Element>: AsyncSequence, Sendable {
+  private let _makeAsyncIterator: @Sendable () -> AsyncIterator
+
+  /// Creates an ``RPCAsyncSequence`` by wrapping another `AsyncSequence`.
+  public init<S: AsyncSequence>(wrapping other: S) where S.Element == Element {
+    self._makeAsyncIterator = {
+      AsyncIterator(wrapping: other.makeAsyncIterator())
+    }
+  }
+
+  public func makeAsyncIterator() -> AsyncIterator {
+    self._makeAsyncIterator()
+  }
+
+  public struct AsyncIterator: AsyncIteratorProtocol {
+    private var iterator: any AsyncIteratorProtocol
+
+    fileprivate init<Iterator>(
+      wrapping other: Iterator
+    ) where Iterator: AsyncIteratorProtocol, Iterator.Element == Element {
+      self.iterator = other
+    }
+
+    public mutating func next() async throws -> Element? {
+      return try await self.iterator.next() as? Element
+    }
+  }
+}
+
+@available(*, unavailable)
+extension RPCAsyncSequence.AsyncIterator: Sendable {}

+ 37 - 0
Sources/GRPCCore/Stream/RPCWriter.swift

@@ -0,0 +1,37 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A type-erasing ``RPCWriterProtocol``.
+public struct RPCWriter<Element>: Sendable, RPCWriterProtocol {
+  private let writer: any RPCWriterProtocol<Element>
+
+  /// Creates an ``RPCWriter`` by wrapping the `other` writer.
+  ///
+  /// - Parameter other: The writer to wrap.
+  public init(wrapping other: some RPCWriterProtocol<Element>) {
+    self.writer = other
+  }
+
+  /// Writes a sequence of elements.
+  ///
+  /// This function suspends until the elements have been accepted. Implements can use this
+  /// to exert backpressure on callers.
+  ///
+  /// - Parameter elements: The elements to write.
+  public func write(contentsOf elements: some Sequence<Element>) async throws {
+    try await self.writer.write(contentsOf: elements)
+  }
+}

+ 58 - 0
Sources/GRPCCore/Stream/RPCWriterProtocol.swift

@@ -0,0 +1,58 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A sink for values which are produced over time.
+public protocol RPCWriterProtocol<Element>: Sendable {
+  /// The type of value written.
+  associatedtype Element
+
+  /// Writes a sequence of elements.
+  ///
+  /// This function suspends until the elements have been accepted. Implements can use this
+  /// to exert backpressure on callers.
+  ///
+  /// - Parameter elements: The elements to write.
+  func write(contentsOf elements: some Sequence<Element>) async throws
+}
+
+extension RPCWriterProtocol {
+  /// Writes a single element into the sink.
+  ///
+  /// - Parameter element: The element to write.
+  public func write(_ element: Element) async throws {
+    try await self.write(contentsOf: CollectionOfOne(element))
+  }
+
+  /// Writes an `AsyncSequence` of values into the sink.
+  ///
+  /// - Parameter elements: The elements to write.
+  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+  public func write<Elements: AsyncSequence>(
+    _ elements: Elements
+  ) async throws where Elements.Element == Element {
+    for try await element in elements {
+      try await self.write(element)
+    }
+  }
+}
+
+public protocol ClosableRPCWriterProtocol<Element>: RPCWriterProtocol {
+  /// Indicate to the writer that no more writes are to be accepted.
+  ///
+  /// All writes after ``finish()`` has been called should result in an error
+  /// being thrown.
+  func finish()
+}

+ 69 - 0
Sources/GRPCCore/Transport/ClientTransport.swift

@@ -0,0 +1,69 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+public protocol ClientTransport: Sendable {
+  associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCResponsePart
+  associatedtype Outbound: ClosableRPCWriterProtocol<RPCRequestPart>
+
+  /// Establish and maintain a connection to the remote destination.
+  ///
+  /// Maintains a long-lived connection, or set of connections, to a remote destination.
+  /// Connections may be added or removed over time as required by the implementation and the
+  /// demand for streams by the client.
+  ///
+  /// Implementations of this function will typically create a long-lived task group which
+  /// maintains connections. The function exits when connections are no longer required by
+  /// the caller who signals this by calling ``close()`` to indicate that no new streams are
+  /// required or by cancelling the task this function runs in.
+  ///
+  /// - Parameter lazily: Whether the transport should establish connections lazily, that is,
+  ///     when the first stream is opened or eagerly, when this function is called. If `false`
+  ///     then the transport should attempt to establish a connection immediately. Note that
+  ///     this is a _hint_: transports aren't required to respect this value and you should
+  ///     refer to the documentation of the transport you're using to check whether it's supported.
+  func connect(lazily: Bool) async throws
+
+  /// Signal to the transport that no new streams may be created.
+  ///
+  /// Existing streams may run to completion naturally but calling ``openStream(descriptor:)``
+  /// should result in an ``RPCError`` with code ``RPCError/Code/failedPrecondition`` being thrown.
+  ///
+  /// If you want to forcefully cancel all active streams then cancel the task
+  /// running ``connect(lazily:)``.
+  func close()
+
+  /// Open a stream using the transport.
+  ///
+  /// Transport implementations should throw an ``RPCError`` with the following error codes:
+  /// - ``RPCError/Code/failedPrecondition`` if the transport is closing or has been closed.
+  /// - ``RPCError/Code/unavailable`` if it's temporarily not possible to create a stream and it
+  ///   may be possible after some backoff period.
+  ///
+  /// - Parameter descriptor: A description of the method to open a stream for.
+  /// - Returns: A stream.
+  func openStream(
+    descriptor: MethodDescriptor
+  ) async throws -> RPCStream<Inbound, Outbound>
+
+  /// Returns the execution configuration for a given method.
+  ///
+  /// - Parameter descriptor: The method to lookup configuration for.
+  /// - Returns: Execution configuration for the method, if it exists.
+  func executionConfiguration(
+    forMethod descriptor: MethodDescriptor
+  ) -> ClientRPCExecutionConfiguration?
+}

+ 45 - 0
Sources/GRPCCore/Transport/RPCParts.swift

@@ -0,0 +1,45 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// Part of a request sent from a client to a server in a stream.
+public enum RPCRequestPart: Hashable, Sendable {
+  /// Key-value pairs sent at the start of a request stream. Only one ``metadata(_:)`` value may
+  /// be sent to the server.
+  case metadata(Metadata)
+
+  /// The bytes of a serialized message to send to the server. A stream may have any number of
+  /// messages sent on it. Restrictions for unary request or response streams are imposed at a
+  /// higher level.
+  case message([UInt8])
+}
+
+/// Part of a response sent from a server to a client in a stream.
+public enum RPCResponsePart: Hashable, Sendable {
+  /// Key-value pairs sent at the start of the response stream. At most one ``metadata(_:)`` value
+  /// may be sent to the client. If the server sends ``metadata(_:)`` it must be the first part in
+  /// the response stream.
+  case metadata(Metadata)
+
+  /// The bytes of a serialized message to send to the client. A stream may have any number of
+  /// messages sent on it. Restrictions for unary request or response streams are imposed at a
+  /// higher level.
+  case message([UInt8])
+
+  /// A status and key-value pairs sent to the client at the end of the response stream. Every
+  /// response stream must have exactly one ``status(_:_:)`` as the final part of the request
+  /// stream.
+  case status(Status, Metadata)
+}

+ 37 - 0
Sources/GRPCCore/Transport/RPCStream.swift

@@ -0,0 +1,37 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A bidirectional communication channel between a client and server for a given method.
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+public struct RPCStream<
+  Inbound: AsyncSequence & Sendable,
+  Outbound: ClosableRPCWriterProtocol & Sendable
+>: Sendable {
+  /// Information about the method this stream is for.
+  public var descriptor: MethodDescriptor
+
+  /// A sequence of messages received from the network.
+  public var inbound: Inbound
+
+  /// A writer for messages sent across the network.
+  public var outbound: Outbound
+
+  public init(descriptor: MethodDescriptor, inbound: Inbound, outbound: Outbound) {
+    self.descriptor = descriptor
+    self.inbound = inbound
+    self.outbound = outbound
+  }
+}

+ 39 - 0
Sources/GRPCCore/Transport/ServerTransport.swift

@@ -0,0 +1,39 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+public protocol ServerTransport {
+  associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCRequestPart
+  associatedtype Outbound: ClosableRPCWriterProtocol<RPCResponsePart>
+
+  /// Starts the transport and returns a sequence of accepted streams to handle.
+  ///
+  /// Implementations will typically bind to a listening port when this function is called
+  /// and start accepting new connections. Each accepted inbound RPC stream should be published
+  /// to the async sequence returned by the function.
+  ///
+  /// You can call ``stopListening()`` to stop the transport from accepting new streams. Existing
+  /// streams must be allowed to complete naturally. However, transports may also enforce a grace
+  /// period after which any open streams may be cancelled. You can also cancel the task running
+  /// ``listen()`` to abruptly close connections and streams.
+  func listen() async throws -> RPCAsyncSequence<RPCStream<Inbound, Outbound>>
+
+  /// Indicates to the transport that no new streams should be accepted.
+  ///
+  /// Existing streams are permitted to run to completion. However, the transport may also enforce
+  /// a grace period, after which remaining streams are cancelled.
+  func stopListening()
+}

+ 49 - 0
Tests/GRPCCoreTests/Call/ClientRPCExecutionConfigurationTests.swift

@@ -0,0 +1,49 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPCCore
+import XCTest
+
+final class ClientRPCExecutionConfigurationTests: XCTestCase {
+  func testRetryPolicyClampsMaxAttempts() {
+    var policy = RetryPolicy(
+      maximumAttempts: 10,
+      initialBackoff: .seconds(1),
+      maximumBackoff: .seconds(1),
+      backoffMultiplier: 1.0,
+      retryableStatusCodes: [.unavailable]
+    )
+
+    // Should be clamped on init
+    XCTAssertEqual(policy.maximumAttempts, 5)
+    // and when modifying
+    policy.maximumAttempts = 10
+    XCTAssertEqual(policy.maximumAttempts, 5)
+  }
+
+  func testHedgingPolicyClampsMaxAttempts() {
+    var policy = HedgingPolicy(
+      maximumAttempts: 10,
+      hedgingDelay: .seconds(1),
+      nonFatalStatusCodes: []
+    )
+
+    // Should be clamped on init
+    XCTAssertEqual(policy.maximumAttempts, 5)
+    // and when modifying
+    policy.maximumAttempts = 10
+    XCTAssertEqual(policy.maximumAttempts, 5)
+  }
+}

+ 26 - 0
Tests/GRPCCoreTests/MethodDescriptorTests.swift

@@ -0,0 +1,26 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPCCore
+import XCTest
+
+final class MethodDescriptorTests: XCTestCase {
+  func testFullyQualifiedName() {
+    let descriptor = MethodDescriptor(service: "foo.bar", method: "Baz")
+    XCTAssertEqual(descriptor.service, "foo.bar")
+    XCTAssertEqual(descriptor.method, "Baz")
+    XCTAssertEqual(descriptor.fullyQualifiedMethod, "foo.bar/Baz")
+  }
+}

+ 24 - 0
Tests/GRPCCoreTests/RPCPartsTests.swift

@@ -0,0 +1,24 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPCCore
+import XCTest
+
+final class RPCPartsTests: XCTestCase {
+  func testPartsFitInExistentialContainer() {
+    XCTAssertLessThanOrEqual(MemoryLayout<RPCRequestPart>.size, 24)
+    XCTAssertLessThanOrEqual(MemoryLayout<RPCResponsePart>.size, 24)
+  }
+}