ソースを参照

Simplify client and server init (#1735)

Motivation:

When configuring a client and server must users will specify only the
transport and services (for a server) and potentially interceptors too.
This should be the "easy" path. Moreover these are all reference types
so it makes sense to separate them from "raw" configuration.

Creating a client also diverged somewhat from the server. Having both
follow a similar pattern simplifies things for users.

Modifications:

- Move interceptors from the client config to the init.
- Modify the server to follow a similar pattern to the server.

Results:

Server and client are configured similarly.
George Barnett 2 年 前
コミット
79865359b8

+ 0 - 6
Sources/GRPCCore/ClientError.swift

@@ -109,7 +109,6 @@ extension ClientError {
   public struct Code: Hashable, Sendable {
     private enum Value {
       case clientIsAlreadyRunning
-      case clientIsNotRunning
       case clientIsStopped
       case transportError
     }
@@ -124,11 +123,6 @@ extension ClientError {
       Self(.clientIsAlreadyRunning)
     }
 
-    /// An attempt to start an RPC was made but the client is not running.
-    public static var clientIsNotRunning: Self {
-      Self(.clientIsNotRunning)
-    }
-
     /// At attempt to start the client was made but it has already stopped.
     public static var clientIsStopped: Self {
       Self(.clientIsStopped)

+ 34 - 67
Sources/GRPCCore/GRPCClient.swift

@@ -40,9 +40,6 @@ import Atomics
 /// // Create a configuration object for the client.
 /// var configuration = GRPCClient.Configuration()
 ///
-/// // Create and add an interceptor.
-/// configuration.interceptors.add(StatsRecordingClientInterceptor())
-///
 /// // Override the timeout for the 'Get' method on the 'echo.Echo' service. This configuration
 /// // takes precedence over any set by the transport.
 /// let echoGet = MethodDescriptor(service: "echo.Echo", method: "Get")
@@ -56,10 +53,15 @@ import Atomics
 /// let defaultMethodConfiguration = MethodConfiguration(executionPolicy: nil, timeout: seconds(10))
 /// configuration.method.defaults.setDefaultConfiguration(defaultMethodConfiguration)
 ///
-/// // Finally create a transport and instantiate the client.
+/// // Finally create a transport and instantiate the client, adding an interceptor.
 /// let inProcessServerTransport = InProcessServerTransport()
 /// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport)
-/// let client = GRPCClient(transport: inProcessClientTransport, configuration: configuration)
+///
+/// let client = GRPCClient(
+///   transport: inProcessClientTransport,
+///   interceptors: [StatsRecordingClientInterceptor()],
+///   configuration: configuration
+/// )
 /// ```
 ///
 /// ## Starting and stopping the client
@@ -101,6 +103,14 @@ public struct GRPCClient: Sendable {
   /// The transport which provides a bidirectional communication channel with the server.
   private let transport: any ClientTransport
 
+  /// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
+  ///
+  /// The order in which interceptors are added reflects the order in which they are called. The
+  /// first interceptor added will be the first interceptor to intercept each request. The last
+  /// interceptor added will be the final interceptor to intercept each request before calling
+  /// the appropriate handler.
+  private let interceptors: [any ClientInterceptor]
+
   /// The configuration used by the client.
   public let configuration: Configuration
 
@@ -121,13 +131,23 @@ public struct GRPCClient: Sendable {
     case stopped
   }
 
-  /// Creates a new client with the given transport and configuration.
+  /// Creates a new client with the given transport, interceptors and configuration.
   ///
   /// - Parameters:
   ///   - transport: The transport used to establish a communication channel with a server.
+  ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
+  ///       accepted RPC. The order in which interceptors are added reflects the order in which they
+  ///       are called. The first interceptor added will be the first interceptor to intercept each
+  ///       request. The last interceptor added will be the final interceptor to intercept each
+  ///       request before calling the appropriate handler.
   ///   - configuration: Configuration for the client.
-  public init(transport: some ClientTransport, configuration: Configuration = Configuration()) {
+  public init(
+    transport: some ClientTransport,
+    interceptors: [any ClientInterceptor] = [],
+    configuration: Configuration = Configuration()
+  ) {
     self.transport = transport
+    self.interceptors = interceptors
     self.configuration = configuration
     self.state = ManagedAtomic(.notStarted)
   }
@@ -250,7 +270,7 @@ public struct GRPCClient: Sendable {
     deserializer: some MessageDeserializer<Response>,
     handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
   ) async throws -> ReturnValue {
-    try await bidirectionalStreaming(
+    try await self.bidirectionalStreaming(
       request: ClientRequest.Stream(single: request),
       descriptor: descriptor,
       serializer: serializer,
@@ -278,7 +298,7 @@ public struct GRPCClient: Sendable {
     deserializer: some MessageDeserializer<Response>,
     handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
   ) async throws -> ReturnValue {
-    try await bidirectionalStreaming(
+    try await self.bidirectionalStreaming(
       request: request,
       descriptor: descriptor,
       serializer: serializer,
@@ -306,7 +326,7 @@ public struct GRPCClient: Sendable {
     deserializer: some MessageDeserializer<Response>,
     handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
   ) async throws -> ReturnValue {
-    try await bidirectionalStreaming(
+    try await self.bidirectionalStreaming(
       request: ClientRequest.Stream(single: request),
       descriptor: descriptor,
       serializer: serializer,
@@ -336,13 +356,10 @@ public struct GRPCClient: Sendable {
     handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
   ) async throws -> ReturnValue {
     switch self.state.load(ordering: .sequentiallyConsistent) {
-    case .running:
+    case .notStarted, .running:
+      // Allow .notStarted as making a request can race with 'run()'. Transports should tolerate
+      // queuing the request if not yet started.
       ()
-    case .notStarted:
-      throw ClientError(
-        code: .clientIsNotRunning,
-        message: "Client must be running to make an RPC: call run() first."
-      )
     case .stopping, .stopped:
       throw ClientError(
         code: .clientIsStopped,
@@ -357,7 +374,7 @@ public struct GRPCClient: Sendable {
       serializer: serializer,
       deserializer: deserializer,
       transport: self.transport,
-      interceptors: self.configuration.interceptors.values,
+      interceptors: self.interceptors,
       handler: handler
     )
   }
@@ -383,14 +400,6 @@ public struct GRPCClient: Sendable {
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 extension GRPCClient {
   public struct Configuration: Sendable {
-    /// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
-    ///
-    /// The order in which interceptors are added reflects the order in which they are called. The
-    /// first interceptor added will be the first interceptor to intercept each request. The last
-    /// interceptor added will be the final interceptor to intercept each request before calling
-    /// the appropriate handler.
-    public var interceptors: Interceptors
-
     /// Configuration for how methods are executed.
     ///
     /// Method configuration determines how each RPC is executed by the client. Some services and
@@ -401,7 +410,6 @@ extension GRPCClient {
 
     /// Creates a new default configuration.
     public init() {
-      self.interceptors = Interceptors()
       self.method = Method()
     }
   }
@@ -409,40 +417,6 @@ extension GRPCClient {
 
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 extension GRPCClient.Configuration {
-  /// A collection of ``ClientInterceptor`` implementations which are applied to all accepted
-  /// RPCs.
-  ///
-  /// RPCs are intercepted in the order that interceptors are added. That is, a request sent from the client to
-  /// the server will first be intercepted by the first added interceptor followed by the second, and so on.
-  /// For responses from the server, they'll be applied in the opposite order.
-  public struct Interceptors: Sendable {
-    private(set) var values: [any ClientInterceptor] = []
-
-    /// Add an interceptor to the client.
-    ///
-    /// The order in which interceptors are added reflects the order in which they are called. The
-    /// first interceptor added will be the first interceptor to intercept each request. The last
-    /// interceptor added will be the final interceptor to intercept each request before calling
-    /// the appropriate handler.
-    ///
-    /// - Parameter interceptor: The interceptor to add.
-    public mutating func add(_ interceptor: some ClientInterceptor) {
-      self.values.append(interceptor)
-    }
-
-    /// Adds a sequence of interceptor to the client.
-    ///
-    /// The order in which interceptors are added reflects the order in which they are called. The
-    /// first interceptor added will be the first interceptor to intercept each request. The last
-    /// interceptor added will be the final interceptor to intercept each request before calling
-    /// the appropriate handler.
-    ///
-    /// - Parameter interceptors: The interceptors to add.
-    public mutating func add(contentsOf interceptors: some Sequence<ClientInterceptor>) {
-      self.values.append(contentsOf: interceptors)
-    }
-  }
-
   /// Configuration for how methods should be executed.
   ///
   /// In most cases the client should defer to the configuration provided by the transport as this
@@ -464,10 +438,3 @@ extension GRPCClient.Configuration {
     }
   }
 }
-
-@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
-extension GRPCClient.Configuration.Interceptors: CustomStringConvertible {
-  public var description: String {
-    return String(describing: self.values.map { String(describing: type(of: $0)) })
-  }
-}

+ 132 - 212
Sources/GRPCCore/GRPCServer.swift

@@ -34,18 +34,22 @@ import Atomics
 /// The following example demonstrates how to create and configure a server.
 ///
 /// ```swift
-/// let server = GRPCServer()
-///
-/// // Create and add an in-process transport.
+/// // Create and an in-process transport.
 /// let inProcessTransport = InProcessServerTransport()
-/// server.transports.add(inProcessTransport)
 ///
-/// // Create and register the 'Greeter' and 'Echo' services.
-/// server.services.register(GreeterService())
-/// server.services.register(EchoService())
+/// // Create the 'Greeter' and 'Echo' services.
+/// let greeter = GreeterService()
+/// let echo = EchoService()
+///
+/// // Create an interceptor.
+/// let statsRecorder = StatsRecordingServerInterceptors()
 ///
-/// // Create and add some interceptors.
-/// server.interceptors.add(StatsRecordingServerInterceptors())
+/// // Finally create the server.
+/// let server = GRPCServer(
+///   transports: [inProcessTransport],
+///   services: [greeter, echo],
+///   interceptors: [statsRecorder]
+/// )
 /// ```
 ///
 /// ## Starting and stopping the server
@@ -66,29 +70,15 @@ import Atomics
 /// that need their lifecycles managed you should consider using [Swift Service
 /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
-public final class GRPCServer: Sendable {
+public struct GRPCServer: Sendable {
   typealias Stream = RPCStream<ServerTransport.Inbound, ServerTransport.Outbound>
 
   /// A collection of ``ServerTransport`` implementations that the server uses to listen
   /// for new requests.
-  public var transports: Transports {
-    get {
-      self.storage.withLockedValue { $0.transports }
-    }
-    set {
-      self.storage.withLockedValue { $0.transports = newValue }
-    }
-  }
+  private let transports: [any ServerTransport]
 
   /// The services registered which the server is serving.
-  public var services: Services {
-    get {
-      self.storage.withLockedValue { $0.services }
-    }
-    set {
-      self.storage.withLockedValue { $0.services = newValue }
-    }
-  }
+  private let router: RPCRouter
 
   /// A collection of ``ServerInterceptor`` implementations which are applied to all accepted
   /// RPCs.
@@ -96,34 +86,12 @@ public final class GRPCServer: Sendable {
   /// RPCs are intercepted in the order that interceptors are added. That is, a request received
   /// from the client will first be intercepted by the first added interceptor followed by the
   /// second, and so on.
-  public var interceptors: Interceptors {
-    get {
-      self.storage.withLockedValue { $0.interceptors }
-    }
-    set {
-      self.storage.withLockedValue { $0.interceptors = newValue }
-    }
-  }
-
-  /// Underlying storage for the server.
-  private struct Storage {
-    var transports: Transports
-    var services: Services
-    var interceptors: Interceptors
-    var state: State
-
-    init() {
-      self.transports = Transports()
-      self.services = Services()
-      self.interceptors = Interceptors()
-      self.state = .notStarted
-    }
-  }
-
-  private let storage: LockedValueBox<Storage>
+  private let interceptors: [any ServerInterceptor]
 
   /// The state of the server.
-  private enum State {
+  private let state: ManagedAtomic<State>
+
+  private enum State: UInt8, AtomicValue {
     /// The server hasn't been started yet. Can transition to `starting` or `stopped`.
     case notStarted
     /// The server is starting but isn't accepting requests yet. Can transition to `running`
@@ -141,11 +109,46 @@ public final class GRPCServer: Sendable {
 
   /// Creates a new server with no resources.
   ///
-  /// You can add resources to the server via ``transports-swift.property``,
-  /// ``services-swift.property``, and ``interceptors-swift.property`` and start the server by
-  /// calling ``run()``. Any changes to resources after ``run()`` has been called will be ignored.
-  public init() {
-    self.storage = LockedValueBox(Storage())
+  /// - Parameters:
+  ///   - transports: The transports the server should listen on.
+  ///   - services: Services offered by the server.
+  ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
+  ///       accepted RPC. The order in which interceptors are added reflects the order in which they
+  ///       are called. The first interceptor added will be the first interceptor to intercept each
+  ///       request. The last interceptor added will be the final interceptor to intercept each
+  ///       request before calling the appropriate handler.
+  public init(
+    transports: [any ServerTransport],
+    services: [any RegistrableRPCService],
+    interceptors: [any ServerInterceptor] = []
+  ) {
+    var router = RPCRouter()
+    for service in services {
+      service.registerMethods(with: &router)
+    }
+
+    self.init(transports: transports, router: router, interceptors: interceptors)
+  }
+
+  /// Creates a new server with no resources.
+  ///
+  /// - Parameters:
+  ///   - transports: The transports the server should listen on.
+  ///   - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers.
+  ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
+  ///       accepted RPC. The order in which interceptors are added reflects the order in which they
+  ///       are called. The first interceptor added will be the first interceptor to intercept each
+  ///       request. The last interceptor added will be the final interceptor to intercept each
+  ///       request before calling the appropriate handler.
+  public init(
+    transports: [any ServerTransport],
+    router: RPCRouter,
+    interceptors: [any ServerInterceptor] = []
+  ) {
+    self.state = ManagedAtomic(.notStarted)
+    self.transports = transports
+    self.router = router
+    self.interceptors = interceptors
   }
 
   /// Starts the server and runs until all registered transports have closed.
@@ -159,20 +162,19 @@ public final class GRPCServer: Sendable {
   ///
   /// To stop the server more abruptly you can cancel the task that this function is running in.
   ///
-  /// You must register all resources you wish to use with the server before calling this function
-  /// as changes made after calling ``run()`` won't be reflected.
-  ///
   /// - Note: You can only call this function once, repeated calls will result in a
   ///   ``ServerError`` being thrown.
-  /// - Important: You must register at least one transport by calling
-  ///   ``Transports-swift.struct/add(_:)`` before calling this method.
   public func run() async throws {
-    let (transports, router, interceptors) = try self.storage.withLockedValue { storage in
-      switch storage.state {
+    let (wasNotStarted, actualState) = self.state.compareExchange(
+      expected: .notStarted,
+      desired: .starting,
+      ordering: .sequentiallyConsistent
+    )
+
+    guard wasNotStarted else {
+      switch actualState {
       case .notStarted:
-        storage.state = .starting
-        return (storage.transports, storage.services.router, storage.interceptors)
-
+        fatalError()
       case .starting, .running:
         throw ServerError(
           code: .serverIsAlreadyRunning,
@@ -189,31 +191,31 @@ public final class GRPCServer: Sendable {
 
     // When we exit this function we must have stopped.
     defer {
-      self.storage.withLockedValue { $0.state = .stopped }
+      self.state.store(.stopped, ordering: .sequentiallyConsistent)
     }
 
-    if transports.values.isEmpty {
+    if self.transports.isEmpty {
       throw ServerError(
         code: .noTransportsConfigured,
         message: """
           Can't start server, no transports are configured. You must add at least one transport \
-          to the server using 'transports.add(_:)' before calling 'run()'.
+          to the server before calling 'run()'.
           """
       )
     }
 
     var listeners: [RPCAsyncSequence<Stream>] = []
-    listeners.reserveCapacity(transports.values.count)
+    listeners.reserveCapacity(self.transports.count)
 
-    for transport in transports.values {
+    for transport in self.transports {
       do {
         let listener = try await transport.listen()
         listeners.append(listener)
       } catch let cause {
         // Failed to start, so start stopping.
-        self.storage.withLockedValue { $0.state = .stopping }
+        self.state.store(.stopping, ordering: .sequentiallyConsistent)
         // Some listeners may have started and have streams which need closing.
-        await Self.rejectRequests(listeners, transports: transports)
+        await self.rejectRequests(listeners)
 
         throw ServerError(
           code: .failedToStartTransport,
@@ -227,35 +229,24 @@ public final class GRPCServer: Sendable {
     }
 
     // May have been told to stop listening while starting the transports.
-    let isStopping = self.storage.withLockedValue { storage in
-      switch storage.state {
-      case .notStarted, .running, .stopped:
-        fatalError("Invalid state")
-
-      case .starting:
-        storage.state = .running
-        return false
-
-      case .stopping:
-        return true
-      }
-    }
+    let (wasStarting, _) = self.state.compareExchange(
+      expected: .starting,
+      desired: .running,
+      ordering: .sequentiallyConsistent
+    )
 
     // If the server is stopping then notify the transport and then consume them: there may be
     // streams opened at a lower level (e.g. HTTP/2) which are already open and need to be consumed.
-    if isStopping {
-      await Self.rejectRequests(listeners, transports: transports)
+    if wasStarting {
+      await self.handleRequests(listeners)
     } else {
-      await Self.handleRequests(listeners, router: router, interceptors: interceptors)
+      await self.rejectRequests(listeners)
     }
   }
 
-  private static func rejectRequests(
-    _ listeners: [RPCAsyncSequence<Stream>],
-    transports: Transports
-  ) async {
+  private func rejectRequests(_ listeners: [RPCAsyncSequence<Stream>]) async {
     // Tell the active listeners to stop listening.
-    for transport in transports.values.prefix(listeners.count) {
+    for transport in self.transports.prefix(listeners.count) {
       transport.stopListening()
     }
 
@@ -282,33 +273,21 @@ public final class GRPCServer: Sendable {
     }
   }
 
-  private static func handleRequests(
-    _ listeners: [RPCAsyncSequence<Stream>],
-    router: RPCRouter,
-    interceptors: Interceptors
-  ) async {
+  private func handleRequests(_ listeners: [RPCAsyncSequence<Stream>]) async {
     #if swift(>=5.9)
     if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
-      await Self.handleRequestsInDiscardingTaskGroup(
-        listeners,
-        router: router,
-        interceptors: interceptors
-      )
+      await self.handleRequestsInDiscardingTaskGroup(listeners)
     } else {
-      await Self.handleRequestsInTaskGroup(listeners, router: router, interceptors: interceptors)
+      await self.handleRequestsInTaskGroup(listeners)
     }
     #else
-    await Self.handleRequestsInTaskGroup(listeners, router: router, interceptors: interceptors)
+    await self.handleRequestsInTaskGroup(listeners)
     #endif
   }
 
   #if swift(>=5.9)
   @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
-  private static func handleRequestsInDiscardingTaskGroup(
-    _ listeners: [RPCAsyncSequence<Stream>],
-    router: RPCRouter,
-    interceptors: Interceptors
-  ) async {
+  private func handleRequestsInDiscardingTaskGroup(_ listeners: [RPCAsyncSequence<Stream>]) async {
     await withDiscardingTaskGroup { group in
       for listener in listeners {
         group.addTask {
@@ -316,7 +295,7 @@ public final class GRPCServer: Sendable {
             do {
               for try await stream in listener {
                 subGroup.addTask {
-                  await router.handle(stream: stream, interceptors: interceptors.values)
+                  await self.router.handle(stream: stream, interceptors: self.interceptors)
                 }
               }
             } catch {
@@ -330,11 +309,7 @@ public final class GRPCServer: Sendable {
   }
   #endif
 
-  private static func handleRequestsInTaskGroup(
-    _ listeners: [RPCAsyncSequence<Stream>],
-    router: RPCRouter,
-    interceptors: Interceptors
-  ) async {
+  private func handleRequestsInTaskGroup(_ listeners: [RPCAsyncSequence<Stream>]) async {
     // If the discarding task group isn't available then fall back to using a regular task group
     // with a limit on subtasks. Most servers will use an HTTP/2 based transport, most
     // implementations limit connections to 100 concurrent streams. A limit of 4096 gives the server
@@ -355,7 +330,7 @@ public final class GRPCServer: Sendable {
                 }
 
                 subGroup.addTask {
-                  await router.handle(stream: stream, interceptors: interceptors.values)
+                  await self.router.handle(stream: stream, interceptors: self.interceptors)
                 }
               }
             } catch {
@@ -375,105 +350,50 @@ public final class GRPCServer: Sendable {
   ///
   /// Calling this on a server which is already stopping or has stopped has no effect.
   public func stopListening() {
-    let transports = self.storage.withLockedValue { storage in
-      let transports: Transports?
-
-      switch storage.state {
-      case .notStarted:
-        storage.state = .stopped
-        transports = nil
-      case .starting:
-        storage.state = .stopping
-        transports = nil
-      case .running:
-        storage.state = .stopping
-        transports = storage.transports
-      case .stopping:
-        transports = nil
-      case .stopped:
-        transports = nil
-      }
-
-      return transports
-    }
-
-    if let transports = transports?.values {
-      for transport in transports {
+    let (wasRunning, actual) = self.state.compareExchange(
+      expected: .running,
+      desired: .stopping,
+      ordering: .sequentiallyConsistent
+    )
+
+    if wasRunning {
+      for transport in self.transports {
         transport.stopListening()
       }
-    }
-  }
-}
-
-@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
-extension GRPCServer {
-  /// The transports which provide a bidirectional communication channel with clients.
-  ///
-  /// You can add a new transport by calling ``add(_:)``.
-  public struct Transports: Sendable {
-    private(set) var values: [any (ServerTransport & Sendable)] = []
-
-    /// Add a transport to the server.
-    ///
-    /// - Parameter transport: The transport to add.
-    public mutating func add(_ transport: some (ServerTransport & Sendable)) {
-      self.values.append(transport)
-    }
-  }
+    } else {
+      switch actual {
+      case .notStarted:
+        let (exchanged, _) = self.state.compareExchange(
+          expected: .notStarted,
+          desired: .stopped,
+          ordering: .sequentiallyConsistent
+        )
 
-  /// The services registered with this server.
-  ///
-  /// You can register services by calling ``register(_:)`` or by manually adding handlers for
-  /// methods to the ``router``.
-  public struct Services: Sendable {
-    /// The router storing handlers for known methods.
-    public var router = RPCRouter()
-
-    /// Registers service methods with the ``router``.
-    ///
-    /// - Parameter service: The service to register with the ``router``.
-    public mutating func register(_ service: some RegistrableRPCService) {
-      service.registerMethods(with: &self.router)
-    }
-  }
+        // Lost a race with 'run()', try again.
+        if !exchanged {
+          self.stopListening()
+        }
 
-  /// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
-  public struct Interceptors: Sendable {
-    private(set) var values: [any ServerInterceptor] = []
-
-    /// Add an interceptor to the server.
-    ///
-    /// The order in which interceptors are added reflects the order in which they are called. The
-    /// first interceptor added will be the first interceptor to intercept each request. The last
-    /// interceptor added will be the final interceptor to intercept each request before calling
-    /// the appropriate handler.
-    ///
-    /// - Parameter interceptor: The interceptor to add.
-    public mutating func add(_ interceptor: some ServerInterceptor) {
-      self.values.append(interceptor)
-    }
-  }
-}
+      case .starting:
+        let (exchanged, _) = self.state.compareExchange(
+          expected: .starting,
+          desired: .stopping,
+          ordering: .sequentiallyConsistent
+        )
 
-@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
-extension GRPCServer.Transports: CustomStringConvertible {
-  public var description: String {
-    return String(describing: self.values)
-  }
-}
+        // Lost a race with 'run()', try again.
+        if !exchanged {
+          self.stopListening()
+        }
 
-@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
-extension GRPCServer.Services: CustomStringConvertible {
-  public var description: String {
-    // List the fully qualified all methods ordered by service and then method
-    let rpcs = self.router.methods.map { $0.fullyQualifiedMethod }.sorted()
-    return String(describing: rpcs)
-  }
-}
+      case .running:
+        // Unreachable, this branch only happens when the initial exchange didn't take place.
+        fatalError()
 
-@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
-extension GRPCServer.Interceptors: CustomStringConvertible {
-  public var description: String {
-    return String(describing: self.values.map { String(describing: type(of: $0)) })
+      case .stopping, .stopped:
+        // Already stopping/stopped, ignore.
+        ()
+      }
+    }
   }
 }

+ 1 - 1
Sources/GRPCCore/Transport/ServerTransport.swift

@@ -15,7 +15,7 @@
  */
 
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public protocol ServerTransport {
+public protocol ServerTransport: Sendable {
   typealias Inbound = RPCAsyncSequence<RPCRequestPart>
   typealias Outbound = RPCWriter<RPCResponsePart>.Closable
 

+ 3 - 43
Tests/GRPCCoreTests/GRPCClientTests.swift

@@ -36,16 +36,8 @@ final class GRPCClientTests: XCTestCase {
     _ body: (GRPCClient, GRPCServer) async throws -> Void
   ) async throws {
     let inProcess = self.makeInProcessPair()
-    var configuration = GRPCClient.Configuration()
-    configuration.interceptors.add(contentsOf: interceptors)
-    let client = GRPCClient(transport: inProcess.client, configuration: configuration)
-
-    let server = GRPCServer()
-    server.transports.add(inProcess.server)
-
-    for service in services {
-      server.services.register(service)
-    }
+    let client = GRPCClient(transport: inProcess.client, interceptors: interceptors)
+    let server = GRPCServer(transports: [inProcess.server], services: services)
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
@@ -338,9 +330,7 @@ final class GRPCClientTests: XCTestCase {
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        let server = GRPCServer()
-        server.services.register(BinaryEcho())
-        server.transports.add(inProcess.server)
+        let server = GRPCServer(transports: [inProcess.server], services: [BinaryEcho()])
         try await server.run()
       }
 
@@ -416,34 +406,4 @@ final class GRPCClientTests: XCTestCase {
 
     task.cancel()
   }
-
-  func testRunClientNotRunning() async throws {
-    let (clientTransport, _) = self.makeInProcessPair()
-    let client = GRPCClient(transport: clientTransport)
-
-    // Client is not running, should throw an error.
-    await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
-      try await client.unary(
-        request: .init(message: [3, 1, 4, 1, 5]),
-        descriptor: BinaryEcho.Methods.collect,
-        serializer: IdentitySerializer(),
-        deserializer: IdentityDeserializer()
-      ) { response in
-        let message = try response.message
-        XCTAssertEqual(message, [3, 1, 4, 1, 5])
-      }
-    } errorHandler: { error in
-      XCTAssertEqual(error.code, .clientIsNotRunning)
-    }
-  }
-
-  func testInterceptorsDescription() async throws {
-    var config = GRPCClient.Configuration()
-    config.interceptors.add(.rejectAll(with: .init(code: .aborted, message: "")))
-    config.interceptors.add(.requestCounter(.init(0)))
-
-    let description = String(describing: config.interceptors)
-    let expected = #"["RejectAllClientInterceptor", "RequestCountingClientInterceptor"]"#
-    XCTAssertEqual(description, expected)
-  }
 }

+ 16 - 60
Tests/GRPCCoreTests/GRPCServerTests.swift

@@ -33,16 +33,11 @@ final class GRPCServerTests: XCTestCase {
     _ body: (InProcessClientTransport, GRPCServer) async throws -> Void
   ) async throws {
     let inProcess = self.makeInProcessPair()
-    let server = GRPCServer()
-    server.transports.add(inProcess.server)
-
-    for service in services {
-      server.services.register(service)
-    }
-
-    for interceptor in interceptors {
-      server.interceptors.add(interceptor)
-    }
+    let server = GRPCServer(
+      transports: [inProcess.server],
+      services: services,
+      interceptors: interceptors
+    )
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
@@ -305,9 +300,7 @@ final class GRPCServerTests: XCTestCase {
   func testCancelRunningServer() async throws {
     let inProcess = self.makeInProcessPair()
     let task = Task {
-      let server = GRPCServer()
-      server.services.register(BinaryEcho())
-      server.transports.add(inProcess.server)
+      let server = GRPCServer(transports: [inProcess.server], services: [BinaryEcho()])
       try await server.run()
     }
 
@@ -326,7 +319,7 @@ final class GRPCServerTests: XCTestCase {
   }
 
   func testTestRunServerWithNoTransport() async throws {
-    let server = GRPCServer()
+    let server = GRPCServer(transports: [], services: [])
     await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
       try await server.run()
     } errorHandler: { error in
@@ -335,8 +328,7 @@ final class GRPCServerTests: XCTestCase {
   }
 
   func testTestRunStoppedServer() async throws {
-    let server = GRPCServer()
-    server.transports.add(InProcessServerTransport())
+    let server = GRPCServer(transports: [InProcessServerTransport()], services: [])
     // Run the server.
     let task = Task { try await server.run() }
     task.cancel()
@@ -351,8 +343,7 @@ final class GRPCServerTests: XCTestCase {
   }
 
   func testRunServerWhenTransportThrows() async throws {
-    let server = GRPCServer()
-    server.transports.add(ThrowOnRunServerTransport())
+    let server = GRPCServer(transports: [ThrowOnRunServerTransport()], services: [])
     await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
       try await server.run()
     } errorHandler: { error in
@@ -361,15 +352,17 @@ final class GRPCServerTests: XCTestCase {
   }
 
   func testRunServerDrainsRunningTransportsWhenOneFailsToStart() async throws {
-    let server = GRPCServer()
-
     // Register the in process transport first and allow it to come up.
     let inProcess = self.makeInProcessPair()
-    server.transports.add(inProcess.server)
-
     // Register a transport waits for a signal before throwing.
     let signal = AsyncStream.makeStream(of: Void.self)
-    server.transports.add(ThrowOnSignalServerTransport(signal: signal.stream))
+    let server = GRPCServer(
+      transports: [
+        inProcess.server,
+        ThrowOnSignalServerTransport(signal: signal.stream),
+      ],
+      services: []
+    )
 
     // Connect the in process client and start an RPC. When the stream is opened signal the
     // other transport to throw. This stream should be failed by the server.
@@ -402,43 +395,6 @@ final class GRPCServerTests: XCTestCase {
     }
   }
 
-  func testInterceptorsDescription() async throws {
-    let server = GRPCServer()
-    server.interceptors.add(.rejectAll(with: .init(code: .aborted, message: "")))
-    server.interceptors.add(.requestCounter(.init(0)))
-    let description = String(describing: server.interceptors)
-    let expected = #"["RejectAllServerInterceptor", "RequestCountingServerInterceptor"]"#
-    XCTAssertEqual(description, expected)
-  }
-
-  func testServicesDescription() async throws {
-    let server = GRPCServer()
-    let methods: [(String, String)] = [
-      ("helloworld.Greeter", "SayHello"),
-      ("echo.Echo", "Foo"),
-      ("echo.Echo", "Bar"),
-      ("echo.Echo", "Baz"),
-    ]
-
-    for (service, method) in methods {
-      let descriptor = MethodDescriptor(service: service, method: method)
-      server.services.router.registerHandler(
-        forMethod: descriptor,
-        deserializer: IdentityDeserializer(),
-        serializer: IdentitySerializer()
-      ) { _ in
-        fatalError("Unreachable")
-      }
-    }
-
-    let description = String(describing: server.services)
-    let expected = """
-      ["echo.Echo/Bar", "echo.Echo/Baz", "echo.Echo/Foo", "helloworld.Greeter/SayHello"]
-      """
-
-    XCTAssertEqual(description, expected)
-  }
-
   private func doEchoGet(using transport: some ClientTransport) async throws {
     try await transport.withStream(descriptor: BinaryEcho.Methods.get) { stream in
       try await stream.outbound.write(.metadata([:]))