| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- /*
- * Copyright 2023, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- private import Synchronization
- /// A gRPC server.
- ///
- /// The server accepts connections from clients and listens on each connection for new streams
- /// which are initiated by the client. Each stream maps to a single RPC. The server routes accepted
- /// streams to a service to handle the RPC or rejects them with an appropriate error if no service
- /// can handle the RPC.
- ///
- /// A ``GRPCServer`` listens with a specific transport implementation (for example, HTTP/2 or in-process),
- /// and routes requests from the transport to the service instance. You can also use "interceptors",
- /// to implement cross-cutting logic which apply to all accepted RPCs. Example uses of interceptors
- /// include request filtering, authentication, and logging. Once requests have been intercepted
- /// they are passed to a handler which in turn returns a response to send back to the client.
- ///
- /// ## Configuring and starting a server
- ///
- /// The following example demonstrates how to create and run a server.
- ///
- /// ```swift
- /// // Create a transport
- /// let transport = SomeServerTransport()
- ///
- /// // Create the 'Greeter' and 'Echo' services.
- /// let greeter = GreeterService()
- /// let echo = EchoService()
- ///
- /// // Create an interceptor.
- /// let statsRecorder = StatsRecordingServerInterceptors()
- ///
- /// // Run the server.
- /// try await withGRPCServer(
- /// transport: transport,
- /// services: [greeter, echo],
- /// interceptors: [statsRecorder]
- /// ) { server in
- /// // ...
- /// // The server begins shutting down when this closure returns
- /// // ...
- /// }
- /// ```
- ///
- /// ## Creating a client manually
- ///
- /// If the `with`-style methods for creating a server isn't suitable for your application then you
- /// can create and run it manually. This requires you to call the ``serve()`` method in a task
- /// which instructs the server to start its transport and listen for new RPCs. A ``RuntimeError`` is
- /// thrown if the transport can't be started or encounters some other runtime error.
- ///
- /// ```swift
- /// // Start running the server.
- /// try await server.serve()
- /// ```
- ///
- /// The ``serve()`` method won't return until the server has finished handling all requests. You can
- /// signal to the server that it should stop accepting new requests by calling ``beginGracefulShutdown()``.
- /// This allows the server to drain existing requests gracefully. To stop the server more abruptly
- /// you can cancel the task running your server. If your application requires additional resources
- /// that need their lifecycles managed you should consider using [Swift Service
- /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle) and the
- /// `GRPCServiceLifecycle` module provided by [gRPC Swift Extras](https://github.com/grpc/grpc-swift-extras).
- @available(gRPCSwift 2.0, *)
- @available(*, deprecated, message: "See https://forums.swift.org/t/80177")
- public final class GRPCServer<Transport: ServerTransport>: Sendable {
- typealias Stream = RPCStream<Transport.Inbound, Transport.Outbound>
- /// The ``ServerTransport`` implementation that the server uses to listen for new requests.
- public let transport: Transport
- /// The services registered which the server is serving.
- private let router: RPCRouter<Transport>
- /// The state of the server.
- private let state: Mutex<State>
- private enum State: Sendable {
- /// The server hasn't been started yet. Can transition to `running` or `stopped`.
- case notStarted
- /// The server is running and accepting RPCs. Can transition to `stopping`.
- case running
- /// The server is stopping and no new RPCs will be accepted. Existing RPCs may run to
- /// completion. May transition to `stopped`.
- case stopping
- /// The server has stopped, no RPCs are in flight and no more will be accepted. This state
- /// is terminal.
- case stopped
- mutating func startServing() throws {
- switch self {
- case .notStarted:
- self = .running
- case .running:
- throw RuntimeError(
- code: .serverIsAlreadyRunning,
- message: "The server is already running and can only be started once."
- )
- case .stopping, .stopped:
- throw RuntimeError(
- code: .serverIsStopped,
- message: "The server has stopped and can only be started once."
- )
- }
- }
- mutating func beginGracefulShutdown() -> Bool {
- switch self {
- case .notStarted:
- self = .stopped
- return false
- case .running:
- self = .stopping
- return true
- case .stopping, .stopped:
- // Already stopping/stopped, ignore.
- return false
- }
- }
- mutating func stopped() {
- self = .stopped
- }
- }
- /// Creates a new server.
- ///
- /// - Parameters:
- /// - transport: The transport the server should listen on.
- /// - services: Services offered by the server.
- /// - interceptors: A collection of interceptors providing cross-cutting functionality to each
- /// accepted RPC. The order in which interceptors are added reflects the order in which they
- /// are called. The first interceptor added will be the first interceptor to intercept each
- /// request. The last interceptor added will be the final interceptor to intercept each
- /// request before calling the appropriate handler.
- public convenience init(
- transport: Transport,
- services: [any RegistrableRPCService],
- interceptors: [any ServerInterceptor] = []
- ) {
- self.init(
- transport: transport,
- services: services,
- interceptorPipeline: interceptors.map { .apply($0, to: .all) }
- )
- }
- /// Creates a new server.
- ///
- /// - Parameters:
- /// - transport: The transport the server should listen on.
- /// - services: Services offered by the server.
- /// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each
- /// accepted RPC. The order in which interceptors are added reflects the order in which they
- /// are called. The first interceptor added will be the first interceptor to intercept each
- /// request. The last interceptor added will be the final interceptor to intercept each
- /// request before calling the appropriate handler.
- public convenience init(
- transport: Transport,
- services: [any RegistrableRPCService],
- interceptorPipeline: [ConditionalInterceptor<any ServerInterceptor>]
- ) {
- var router = RPCRouter<Transport>()
- for service in services {
- service.registerMethods(with: &router)
- }
- router.registerInterceptors(pipeline: interceptorPipeline)
- self.init(transport: transport, router: router)
- }
- /// Creates a new server with a pre-configured router.
- ///
- /// - Parameters:
- /// - transport: The transport the server should listen on.
- /// - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers.
- public init(transport: Transport, router: RPCRouter<Transport>) {
- self.state = Mutex(.notStarted)
- self.transport = transport
- self.router = router
- }
- /// Starts the server and runs until the registered transport has closed.
- ///
- /// No RPCs are processed until the configured transport is listening. If the transport fails to start
- /// listening, or if it encounters a runtime error, then ``RuntimeError`` is thrown.
- ///
- /// This function returns when the configured transport has stopped listening and all requests have been
- /// handled. You can signal to the transport that it should stop listening by calling
- /// ``beginGracefulShutdown()``. The server will continue to process existing requests.
- ///
- /// To stop the server more abruptly you can cancel the task that this function is running in.
- ///
- /// - Note: You can only call this function once, repeated calls will result in a
- /// ``RuntimeError`` being thrown.
- public func serve() async throws {
- try self.state.withLock { try $0.startServing() }
- // When we exit this function the server must have stopped.
- defer {
- self.state.withLock { $0.stopped() }
- }
- do {
- try await transport.listen { stream, context in
- await self.router.handle(stream: stream, context: context)
- }
- } catch {
- throw RuntimeError(
- code: .transportError,
- message: "Server transport threw an error.",
- cause: error
- )
- }
- }
- /// Signal to the server that it should stop listening for new requests.
- ///
- /// By calling this function you indicate to clients that they mustn't start new requests
- /// against this server. Once the server has processed all requests the ``serve()`` method returns.
- ///
- /// Calling this on a server which is already stopping or has stopped has no effect.
- public func beginGracefulShutdown() {
- let wasRunning = self.state.withLock { $0.beginGracefulShutdown() }
- if wasRunning {
- self.transport.beginGracefulShutdown()
- }
- }
- }
- /// Creates and runs a gRPC server.
- ///
- /// - Parameters:
- /// - transport: The transport the server should listen on.
- /// - services: Services offered by the server.
- /// - interceptors: A collection of interceptors providing cross-cutting functionality to each
- /// accepted RPC. The order in which interceptors are added reflects the order in which they
- /// are called. The first interceptor added will be the first interceptor to intercept each
- /// request. The last interceptor added will be the final interceptor to intercept each
- /// request before calling the appropriate handler.
- /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
- /// code is nonisolated.
- /// - handleServer: A closure which is called with the server. When the closure returns, the
- /// server is shutdown gracefully.
- /// - Returns: The result of the `handleServer` closure.
- @available(gRPCSwift 2.0, *)
- @available(*, deprecated, message: "See https://forums.swift.org/t/80177")
- public func withGRPCServer<Transport: ServerTransport, Result: Sendable>(
- transport: Transport,
- services: [any RegistrableRPCService],
- interceptors: [any ServerInterceptor] = [],
- isolation: isolated (any Actor)? = #isolation,
- handleServer: (GRPCServer<Transport>) async throws -> Result
- ) async throws -> Result {
- try await withGRPCServer(
- transport: transport,
- services: services,
- interceptorPipeline: interceptors.map { .apply($0, to: .all) },
- isolation: isolation,
- handleServer: handleServer
- )
- }
- /// Creates and runs a gRPC server.
- ///
- /// - Parameters:
- /// - transport: The transport the server should listen on.
- /// - services: Services offered by the server.
- /// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each
- /// accepted RPC. The order in which interceptors are added reflects the order in which they
- /// are called. The first interceptor added will be the first interceptor to intercept each
- /// request. The last interceptor added will be the final interceptor to intercept each
- /// request before calling the appropriate handler.
- /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
- /// code is nonisolated.
- /// - handleServer: A closure which is called with the server. When the closure returns, the
- /// server is shutdown gracefully.
- /// - Returns: The result of the `handleServer` closure.
- @available(gRPCSwift 2.0, *)
- @available(*, deprecated, message: "See https://forums.swift.org/t/80177")
- public func withGRPCServer<Transport: ServerTransport, Result: Sendable>(
- transport: Transport,
- services: [any RegistrableRPCService],
- interceptorPipeline: [ConditionalInterceptor<any ServerInterceptor>],
- isolation: isolated (any Actor)? = #isolation,
- handleServer: (GRPCServer<Transport>) async throws -> Result
- ) async throws -> Result {
- return try await withThrowingDiscardingTaskGroup { group in
- let server = GRPCServer(
- transport: transport,
- services: services,
- interceptorPipeline: interceptorPipeline
- )
- group.addTask {
- try await server.serve()
- }
- let result = try await handleServer(server)
- server.beginGracefulShutdown()
- return result
- }
- }
|