  /*
   * Copyright 2023, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import Atomics
  /// A gRPC server.
  ///
  /// The server accepts connections from clients and listens on each connection for new streams
  /// which are initiated by the client. Each stream maps to a single RPC. The server routes accepted
  /// streams to a service to handle the RPC or rejects them with an appropriate error if no service
  /// can handle the RPC.
  ///
  /// A ``GRPCServer`` listens with a specific transport implementation (for example, HTTP/2 or
  /// in-process), and routes requests from the transport to the service instance. You can also use
  /// "interceptors" to implement cross-cutting logic which applies to all accepted RPCs. Example
  /// uses of interceptors include request filtering, authentication, and logging. Once requests have
  /// been intercepted they are passed to a handler which in turn returns a response to send back to
  /// the client.
  ///
  /// ## Creating and configuring a server
  ///
  /// The following example demonstrates how to create and configure a server.
  ///
  /// ```swift
  /// // Create an in-process transport.
  /// let inProcessTransport = InProcessServerTransport()
  ///
  /// // Create the 'Greeter' and 'Echo' services.
  /// let greeter = GreeterService()
  /// let echo = EchoService()
  ///
  /// // Create an interceptor.
  /// let statsRecorder = StatsRecordingServerInterceptors()
  ///
  /// // Finally create the server.
  /// let server = GRPCServer(
  ///   transport: inProcessTransport,
  ///   services: [greeter, echo],
  ///   interceptors: [statsRecorder]
  /// )
  /// ```
  ///
  /// ## Starting and stopping the server
  ///
  /// Once you have configured the server call ``run()`` to start it. Calling ``run()`` starts the
  /// server's transport too. A ``RuntimeError`` is thrown if the transport can't be started or
  /// encounters some other runtime error.
  ///
  /// ```swift
  /// // Start running the server.
  /// try await server.run()
  /// ```
  ///
  /// The ``run()`` method won't return until the server has finished handling all requests. You can
  /// signal to the server that it should stop accepting new requests by calling ``stopListening()``.
  /// This allows the server to drain existing requests gracefully. To stop the server more abruptly
  /// you can cancel the task running your server. If your application requires additional resources
  /// that need their lifecycles managed you should consider using [Swift Service
  /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  public struct GRPCServer: Sendable {
    typealias Stream = RPCStream<ServerTransport.Inbound, ServerTransport.Outbound>

    /// The ``ServerTransport`` implementation that the server uses to listen for new requests.
    private let transport: any ServerTransport

    /// The router holding the methods of the registered services; accepted streams are handed to it.
    private let router: RPCRouter

    /// A collection of ``ServerInterceptor`` implementations which are applied to all accepted
    /// RPCs.
    ///
    /// RPCs are intercepted in the order that interceptors are added. That is, a request received
    /// from the client will first be intercepted by the first added interceptor followed by the
    /// second, and so on.
    private let interceptors: [any ServerInterceptor]

    /// The state of the server.
    private let state: ManagedAtomic<State>

    private enum State: UInt8, AtomicValue {
      /// The server hasn't been started yet. Can transition to `running` or `stopped`.
      case notStarted
      /// The server is running and accepting RPCs. Can transition to `stopping` or `stopped`.
      case running
      /// The server is stopping and no new RPCs will be accepted. Existing RPCs may run to
      /// completion. May transition to `stopped`.
      case stopping
      /// The server has stopped, no RPCs are in flight and no more will be accepted. This state
      /// is terminal.
      case stopped
    }

    /// Creates a new server with no resources.
    ///
    /// Each service registers its methods with a fresh ``RPCRouter`` which is then used to create
    /// the server.
    ///
    /// - Parameters:
    ///   - transport: The transport the server should listen on.
    ///   - services: Services offered by the server.
    ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
    ///       accepted RPC. The order in which interceptors are added reflects the order in which they
    ///       are called. The first interceptor added will be the first interceptor to intercept each
    ///       request. The last interceptor added will be the final interceptor to intercept each
    ///       request before calling the appropriate handler.
    public init(
      transport: any ServerTransport,
      services: [any RegistrableRPCService],
      interceptors: [any ServerInterceptor] = []
    ) {
      var router = RPCRouter()
      for service in services {
        service.registerMethods(with: &router)
      }

      self.init(transport: transport, router: router, interceptors: interceptors)
    }

    /// Creates a new server with no resources.
    ///
    /// - Parameters:
    ///   - transport: The transport the server should listen on.
    ///   - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers.
    ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
    ///       accepted RPC. The order in which interceptors are added reflects the order in which they
    ///       are called. The first interceptor added will be the first interceptor to intercept each
    ///       request. The last interceptor added will be the final interceptor to intercept each
    ///       request before calling the appropriate handler.
    public init(
      transport: any ServerTransport,
      router: RPCRouter,
      interceptors: [any ServerInterceptor] = []
    ) {
      self.state = ManagedAtomic(.notStarted)
      self.transport = transport
      self.router = router
      self.interceptors = interceptors
    }

    /// Starts the server and runs until the registered transport has closed.
    ///
    /// No RPCs are processed until the configured transport is listening. If the transport fails to
    /// start listening, or if it encounters a runtime error, then ``RuntimeError`` is thrown.
    ///
    /// This function returns when the configured transport has stopped listening and all requests
    /// have been handled. You can signal to the transport that it should stop listening by calling
    /// ``stopListening()``. The server will continue to process existing requests.
    ///
    /// To stop the server more abruptly you can cancel the task that this function is running in.
    ///
    /// - Note: You can only call this function once, repeated calls will result in a
    ///   ``RuntimeError`` being thrown.
    /// - Throws: ``RuntimeError`` with code `serverIsAlreadyRunning` if the server is running,
    ///   `serverIsStopped` if the server is stopping or has stopped, or `transportError` (wrapping
    ///   the underlying error as its cause) if the transport throws.
    public func run() async throws {
      let (wasNotStarted, actualState) = self.state.compareExchange(
        expected: .notStarted,
        desired: .running,
        ordering: .sequentiallyConsistent
      )

      guard wasNotStarted else {
        switch actualState {
        case .notStarted:
          fatalError(
            "Unreachable: the exchange from 'notStarted' only fails if the state isn't 'notStarted'."
          )
        case .running:
          throw RuntimeError(
            code: .serverIsAlreadyRunning,
            message: "The server is already running and can only be started once."
          )
        case .stopping, .stopped:
          throw RuntimeError(
            code: .serverIsStopped,
            message: "The server has stopped and can only be started once."
          )
        }
      }

      // When we exit this function we must have stopped.
      defer {
        self.state.store(.stopped, ordering: .sequentiallyConsistent)
      }

      do {
        try await self.transport.listen { stream in
          await self.router.handle(stream: stream, interceptors: self.interceptors)
        }
      } catch {
        // Surface every transport failure as a 'RuntimeError', keeping the original as the cause.
        throw RuntimeError(
          code: .transportError,
          message: "Server transport threw an error.",
          cause: error
        )
      }
    }

    /// Signal to the server that it should stop listening for new requests.
    ///
    /// By calling this function you indicate to clients that they mustn't start new requests
    /// against this server. Once the server has processed all requests the ``run()`` method returns.
    ///
    /// Calling this on a server which is already stopping or has stopped has no effect. Calling it
    /// on a server which hasn't been started moves it directly to the stopped state, so a
    /// subsequent call to ``run()`` throws.
    public func stopListening() {
      let (wasRunning, actual) = self.state.compareExchange(
        expected: .running,
        desired: .stopping,
        ordering: .sequentiallyConsistent
      )

      if wasRunning {
        self.transport.stopListening()
      } else {
        switch actual {
        case .notStarted:
          let (exchanged, _) = self.state.compareExchange(
            expected: .notStarted,
            desired: .stopped,
            ordering: .sequentiallyConsistent
          )

          // Lost a race with 'run()', try again.
          if !exchanged {
            self.stopListening()
          }

        case .running:
          // This branch is only taken when the initial exchange from 'running' didn't take place.
          fatalError(
            "Unreachable: the exchange from 'running' only fails if the state isn't 'running'."
          )

        case .stopping, .stopped:
          // Already stopping/stopped, ignore.
          ()
        }
      }
    }
  }