/*
 * Copyright 2023, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

private import Synchronization

/// A gRPC server.
///
/// The server accepts connections from clients and listens on each connection for new streams
/// which are initiated by the client. Each stream maps to a single RPC. The server routes accepted
/// streams to a service to handle the RPC or rejects them with an appropriate error if no service
/// can handle the RPC.
///
/// A ``GRPCServer`` listens with a specific transport implementation (for example, HTTP/2 or in-process),
/// and routes requests from the transport to the service instance. You can also use "interceptors",
/// to implement cross-cutting logic which apply to all accepted RPCs. Example uses of interceptors
/// include request filtering, authentication, and logging. Once requests have been intercepted
/// they are passed to a handler which in turn returns a response to send back to the client.
///
/// ## Creating and configuring a server
///
/// The following example demonstrates how to create and configure a server.
///
/// ```swift
/// // Create an in-process transport.
/// let inProcessTransport = InProcessTransport()
///
/// // Create the 'Greeter' and 'Echo' services.
/// let greeter = GreeterService()
/// let echo = EchoService()
///
/// // Create an interceptor.
/// let statsRecorder = StatsRecordingServerInterceptors()
///
/// // Finally create the server.
/// let server = GRPCServer(
///   transport: inProcessTransport.server,
///   services: [greeter, echo],
///   interceptors: [statsRecorder]
/// )
/// ```
///
/// ## Starting and stopping the server
///
/// Once you have configured the server call ``serve()`` to start it. Calling ``serve()`` starts the server's
/// transport too. A ``RuntimeError`` is thrown if the transport can't be started or encounters some other
/// runtime error.
///
/// ```swift
/// // Start running the server.
/// try await server.serve()
/// ```
///
/// The ``serve()`` method won't return until the server has finished handling all requests. You can
/// signal to the server that it should stop accepting new requests by calling ``beginGracefulShutdown()``.
/// This allows the server to drain existing requests gracefully. To stop the server more abruptly
/// you can cancel the task running your server. If your application requires additional resources
/// that need their lifecycles managed you should consider using [Swift Service
/// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
public final class GRPCServer: Sendable {
  typealias Stream = RPCStream

  /// The ``ServerTransport`` implementation that the server uses to listen for new requests.
  public let transport: any ServerTransport

  /// The services registered which the server is serving.
  private let router: RPCRouter

  /// A collection of ``ServerInterceptor`` implementations which are applied to all accepted
  /// RPCs.
  ///
  /// RPCs are intercepted in the order that interceptors are added. That is, a request received
  /// from the client will first be intercepted by the first added interceptor followed by the
  /// second, and so on.
  private let interceptors: [any ServerInterceptor]

  /// The state of the server, guarded by a lock so it can be read and updated from any task.
  private let state: Mutex<State>

  private enum State: Sendable {
    /// The server hasn't been started yet. Can transition to `running` or `stopped`.
    case notStarted
    /// The server is running and accepting RPCs. Can transition to `stopping`, or directly to
    /// `stopped` if ``GRPCServer/serve()`` exits without a graceful shutdown being requested.
    case running
    /// The server is stopping and no new RPCs will be accepted. Existing RPCs may run to
    /// completion. May transition to `stopped`.
    case stopping
    /// The server has stopped, no RPCs are in flight and no more will be accepted. This state
    /// is terminal.
    case stopped

    /// Moves a not-yet-started server to `running`.
    ///
    /// - Throws: ``RuntimeError`` with code `serverIsAlreadyRunning` if the server is running,
    ///   or `serverIsStopped` if it is stopping or has stopped.
    mutating func startServing() throws {
      switch self {
      case .notStarted:
        self = .running

      case .running:
        throw RuntimeError(
          code: .serverIsAlreadyRunning,
          message: "The server is already running and can only be started once."
        )

      case .stopping, .stopped:
        throw RuntimeError(
          code: .serverIsStopped,
          message: "The server has stopped and can only be started once."
        )
      }
    }

    /// Records that a graceful shutdown was requested.
    ///
    /// - Returns: `true` only if the server was running, i.e. the caller should now ask the
    ///   transport to begin shutting down.
    mutating func beginGracefulShutdown() -> Bool {
      switch self {
      case .notStarted:
        // Never started: there is nothing to drain, so go straight to the terminal state.
        self = .stopped
        return false

      case .running:
        self = .stopping
        return true

      case .stopping, .stopped:
        // Already stopping/stopped, ignore.
        return false
      }
    }

    /// Unconditionally moves to the terminal `stopped` state.
    mutating func stopped() {
      self = .stopped
    }
  }

  /// Creates a new server from a list of services.
  ///
  /// Each service registers its methods with a fresh ``RPCRouter`` which the server then uses
  /// to route accepted streams.
  ///
  /// - Parameters:
  ///   - transport: The transport the server should listen on.
  ///   - services: Services offered by the server.
  ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
  ///       accepted RPC. The order in which interceptors are added reflects the order in which they
  ///       are called. The first interceptor added will be the first interceptor to intercept each
  ///       request. The last interceptor added will be the final interceptor to intercept each
  ///       request before calling the appropriate handler.
  public convenience init(
    transport: any ServerTransport,
    services: [any RegistrableRPCService],
    interceptors: [any ServerInterceptor] = []
  ) {
    var router = RPCRouter()
    for service in services {
      service.registerMethods(with: &router)
    }

    self.init(transport: transport, router: router, interceptors: interceptors)
  }

  /// Creates a new server from a pre-configured router.
  ///
  /// - Parameters:
  ///   - transport: The transport the server should listen on.
  ///   - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers.
  ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
  ///       accepted RPC. The order in which interceptors are added reflects the order in which they
  ///       are called. The first interceptor added will be the first interceptor to intercept each
  ///       request. The last interceptor added will be the final interceptor to intercept each
  ///       request before calling the appropriate handler.
  public init(
    transport: any ServerTransport,
    router: RPCRouter,
    interceptors: [any ServerInterceptor] = []
  ) {
    self.state = Mutex(.notStarted)
    self.transport = transport
    self.router = router
    self.interceptors = interceptors
  }

  /// Starts the server and runs until the registered transport has closed.
  ///
  /// No RPCs are processed until the configured transport is listening. If the transport fails to start
  /// listening, or if it encounters a runtime error, then ``RuntimeError`` is thrown.
  ///
  /// This function returns when the configured transport has stopped listening and all requests have been
  /// handled. You can signal to the transport that it should stop listening by calling
  /// ``beginGracefulShutdown()``. The server will continue to process existing requests.
  ///
  /// To stop the server more abruptly you can cancel the task that this function is running in.
  ///
  /// - Note: You can only call this function once, repeated calls will result in a
  ///   ``RuntimeError`` being thrown.
  public func serve() async throws {
    try self.state.withLock { try $0.startServing() }

    // When we exit this function the server must have stopped.
    defer {
      self.state.withLock { $0.stopped() }
    }

    do {
      try await transport.listen { stream, context in
        await self.router.handle(stream: stream, context: context, interceptors: self.interceptors)
      }
    } catch {
      // Surface every transport failure to the caller as a single, well-known error type,
      // keeping the underlying error as the cause.
      throw RuntimeError(
        code: .transportError,
        message: "Server transport threw an error.",
        cause: error
      )
    }
  }

  /// Signal to the server that it should stop listening for new requests.
  ///
  /// By calling this function you indicate to clients that they mustn't start new requests
  /// against this server. Once the server has processed all requests the ``serve()`` method returns.
  ///
  /// Calling this on a server which is already stopping or has stopped has no effect.
  public func beginGracefulShutdown() {
    let wasRunning = self.state.withLock { $0.beginGracefulShutdown() }
    // Only a running server has a listening transport to shut down.
    if wasRunning {
      self.transport.beginGracefulShutdown()
    }
  }
}