GRPCServer.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. private import Synchronization
  17. /// A gRPC server.
  18. ///
  19. /// The server accepts connections from clients and listens on each connection for new streams
  20. /// which are initiated by the client. Each stream maps to a single RPC. The server routes accepted
  21. /// streams to a service to handle the RPC or rejects them with an appropriate error if no service
  22. /// can handle the RPC.
  23. ///
  24. /// A ``GRPCServer`` listens with a specific transport implementation (for example, HTTP/2 or in-process),
  25. /// and routes requests from the transport to the service instance. You can also use "interceptors",
  26. /// to implement cross-cutting logic which apply to all accepted RPCs. Example uses of interceptors
  27. /// include request filtering, authentication, and logging. Once requests have been intercepted
  28. /// they are passed to a handler which in turn returns a response to send back to the client.
  29. ///
  30. /// ## Configuring and starting a server
  31. ///
  32. /// The following example demonstrates how to create and run a server.
  33. ///
  34. /// ```swift
  35. /// // Create a transport
  36. /// let transport = SomeServerTransport()
  37. ///
  38. /// // Create the 'Greeter' and 'Echo' services.
  39. /// let greeter = GreeterService()
  40. /// let echo = EchoService()
  41. ///
  42. /// // Create an interceptor.
  43. /// let statsRecorder = StatsRecordingServerInterceptors()
  44. ///
  45. /// // Run the server.
  46. /// try await withGRPCServer(
  47. /// transport: transport,
  48. /// services: [greeter, echo],
  49. /// interceptors: [statsRecorder]
  50. /// ) { server in
  51. /// // ...
  52. /// // The server begins shutting down when this closure returns
  53. /// // ...
  54. /// }
  55. /// ```
  56. ///
  57. /// ## Creating a client manually
  58. ///
  59. /// If the `with`-style methods for creating a server isn't suitable for your application then you
  60. /// can create and run it manually. This requires you to call the ``serve()`` method in a task
  61. /// which instructs the server to start its transport and listen for new RPCs. A ``RuntimeError`` is
  62. /// thrown if the transport can't be started or encounters some other runtime error.
  63. ///
  64. /// ```swift
  65. /// // Start running the server.
  66. /// try await server.serve()
  67. /// ```
  68. ///
  69. /// The ``serve()`` method won't return until the server has finished handling all requests. You can
  70. /// signal to the server that it should stop accepting new requests by calling ``beginGracefulShutdown()``.
  71. /// This allows the server to drain existing requests gracefully. To stop the server more abruptly
  72. /// you can cancel the task running your server. If your application requires additional resources
  73. /// that need their lifecycles managed you should consider using [Swift Service
  74. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle) and the
  75. /// `GRPCServiceLifecycle` module provided by [gRPC Swift Extras](https://github.com/grpc/grpc-swift-extras).
  76. public final class GRPCServer<Transport: ServerTransport>: Sendable {
  77. typealias Stream = RPCStream<Transport.Inbound, Transport.Outbound>
  78. /// The ``ServerTransport`` implementation that the server uses to listen for new requests.
  79. public let transport: Transport
  80. /// The services registered which the server is serving.
  81. private let router: RPCRouter<Transport>
  82. /// The state of the server.
  83. private let state: Mutex<State>
  84. private enum State: Sendable {
  85. /// The server hasn't been started yet. Can transition to `running` or `stopped`.
  86. case notStarted
  87. /// The server is running and accepting RPCs. Can transition to `stopping`.
  88. case running
  89. /// The server is stopping and no new RPCs will be accepted. Existing RPCs may run to
  90. /// completion. May transition to `stopped`.
  91. case stopping
  92. /// The server has stopped, no RPCs are in flight and no more will be accepted. This state
  93. /// is terminal.
  94. case stopped
  95. mutating func startServing() throws {
  96. switch self {
  97. case .notStarted:
  98. self = .running
  99. case .running:
  100. throw RuntimeError(
  101. code: .serverIsAlreadyRunning,
  102. message: "The server is already running and can only be started once."
  103. )
  104. case .stopping, .stopped:
  105. throw RuntimeError(
  106. code: .serverIsStopped,
  107. message: "The server has stopped and can only be started once."
  108. )
  109. }
  110. }
  111. mutating func beginGracefulShutdown() -> Bool {
  112. switch self {
  113. case .notStarted:
  114. self = .stopped
  115. return false
  116. case .running:
  117. self = .stopping
  118. return true
  119. case .stopping, .stopped:
  120. // Already stopping/stopped, ignore.
  121. return false
  122. }
  123. }
  124. mutating func stopped() {
  125. self = .stopped
  126. }
  127. }
  128. /// Creates a new server.
  129. ///
  130. /// - Parameters:
  131. /// - transport: The transport the server should listen on.
  132. /// - services: Services offered by the server.
  133. /// - interceptors: A collection of interceptors providing cross-cutting functionality to each
  134. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  135. /// are called. The first interceptor added will be the first interceptor to intercept each
  136. /// request. The last interceptor added will be the final interceptor to intercept each
  137. /// request before calling the appropriate handler.
  138. public convenience init(
  139. transport: Transport,
  140. services: [any RegistrableRPCService],
  141. interceptors: [any ServerInterceptor] = []
  142. ) {
  143. self.init(
  144. transport: transport,
  145. services: services,
  146. interceptorPipeline: interceptors.map { .apply($0, to: .all) }
  147. )
  148. }
  149. /// Creates a new server.
  150. ///
  151. /// - Parameters:
  152. /// - transport: The transport the server should listen on.
  153. /// - services: Services offered by the server.
  154. /// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each
  155. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  156. /// are called. The first interceptor added will be the first interceptor to intercept each
  157. /// request. The last interceptor added will be the final interceptor to intercept each
  158. /// request before calling the appropriate handler.
  159. public convenience init(
  160. transport: Transport,
  161. services: [any RegistrableRPCService],
  162. interceptorPipeline: [ConditionalInterceptor<any ServerInterceptor>]
  163. ) {
  164. var router = RPCRouter<Transport>()
  165. for service in services {
  166. service.registerMethods(with: &router)
  167. }
  168. router.registerInterceptors(pipeline: interceptorPipeline)
  169. self.init(transport: transport, router: router)
  170. }
  171. /// Creates a new server with a pre-configured router.
  172. ///
  173. /// - Parameters:
  174. /// - transport: The transport the server should listen on.
  175. /// - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers.
  176. public init(transport: Transport, router: RPCRouter<Transport>) {
  177. self.state = Mutex(.notStarted)
  178. self.transport = transport
  179. self.router = router
  180. }
  181. /// Starts the server and runs until the registered transport has closed.
  182. ///
  183. /// No RPCs are processed until the configured transport is listening. If the transport fails to start
  184. /// listening, or if it encounters a runtime error, then ``RuntimeError`` is thrown.
  185. ///
  186. /// This function returns when the configured transport has stopped listening and all requests have been
  187. /// handled. You can signal to the transport that it should stop listening by calling
  188. /// ``beginGracefulShutdown()``. The server will continue to process existing requests.
  189. ///
  190. /// To stop the server more abruptly you can cancel the task that this function is running in.
  191. ///
  192. /// - Note: You can only call this function once, repeated calls will result in a
  193. /// ``RuntimeError`` being thrown.
  194. public func serve() async throws {
  195. try self.state.withLock { try $0.startServing() }
  196. // When we exit this function the server must have stopped.
  197. defer {
  198. self.state.withLock { $0.stopped() }
  199. }
  200. do {
  201. try await transport.listen { stream, context in
  202. await self.router.handle(stream: stream, context: context)
  203. }
  204. } catch {
  205. throw RuntimeError(
  206. code: .transportError,
  207. message: "Server transport threw an error.",
  208. cause: error
  209. )
  210. }
  211. }
  212. /// Signal to the server that it should stop listening for new requests.
  213. ///
  214. /// By calling this function you indicate to clients that they mustn't start new requests
  215. /// against this server. Once the server has processed all requests the ``serve()`` method returns.
  216. ///
  217. /// Calling this on a server which is already stopping or has stopped has no effect.
  218. public func beginGracefulShutdown() {
  219. let wasRunning = self.state.withLock { $0.beginGracefulShutdown() }
  220. if wasRunning {
  221. self.transport.beginGracefulShutdown()
  222. }
  223. }
  224. }
  225. /// Creates and runs a gRPC server.
  226. ///
  227. /// - Parameters:
  228. /// - transport: The transport the server should listen on.
  229. /// - services: Services offered by the server.
  230. /// - interceptors: A collection of interceptors providing cross-cutting functionality to each
  231. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  232. /// are called. The first interceptor added will be the first interceptor to intercept each
  233. /// request. The last interceptor added will be the final interceptor to intercept each
  234. /// request before calling the appropriate handler.
  235. /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  236. /// code is nonisolated.
  237. /// - handleServer: A closure which is called with the server. When the closure returns, the
  238. /// server is shutdown gracefully.
  239. /// - Returns: The result of the `handleServer` closure.
  240. public func withGRPCServer<Transport: ServerTransport, Result: Sendable>(
  241. transport: Transport,
  242. services: [any RegistrableRPCService],
  243. interceptors: [any ServerInterceptor] = [],
  244. isolation: isolated (any Actor)? = #isolation,
  245. handleServer: (GRPCServer<Transport>) async throws -> Result
  246. ) async throws -> Result {
  247. try await withGRPCServer(
  248. transport: transport,
  249. services: services,
  250. interceptorPipeline: interceptors.map { .apply($0, to: .all) },
  251. isolation: isolation,
  252. handleServer: handleServer
  253. )
  254. }
  255. /// Creates and runs a gRPC server.
  256. ///
  257. /// - Parameters:
  258. /// - transport: The transport the server should listen on.
  259. /// - services: Services offered by the server.
  260. /// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each
  261. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  262. /// are called. The first interceptor added will be the first interceptor to intercept each
  263. /// request. The last interceptor added will be the final interceptor to intercept each
  264. /// request before calling the appropriate handler.
  265. /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  266. /// code is nonisolated.
  267. /// - handleServer: A closure which is called with the server. When the closure returns, the
  268. /// server is shutdown gracefully.
  269. /// - Returns: The result of the `handleServer` closure.
  270. public func withGRPCServer<Transport: ServerTransport, Result: Sendable>(
  271. transport: Transport,
  272. services: [any RegistrableRPCService],
  273. interceptorPipeline: [ConditionalInterceptor<any ServerInterceptor>],
  274. isolation: isolated (any Actor)? = #isolation,
  275. handleServer: (GRPCServer<Transport>) async throws -> Result
  276. ) async throws -> Result {
  277. return try await withThrowingDiscardingTaskGroup { group in
  278. let server = GRPCServer(
  279. transport: transport,
  280. services: services,
  281. interceptorPipeline: interceptorPipeline
  282. )
  283. group.addTask {
  284. try await server.serve()
  285. }
  286. let result = try await handleServer(server)
  287. server.beginGracefulShutdown()
  288. return result
  289. }
  290. }