GRPCServer.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  /// A gRPC server.
  ///
  /// The server accepts connections from clients and listens on each connection for new streams
  /// which are initiated by the client. Each stream maps to a single RPC. The server routes accepted
  /// streams to a service to handle the RPC or rejects them with an appropriate error if no service
  /// can handle the RPC.
  ///
  /// A ``GRPCServer`` may listen with multiple transports (for example, HTTP/2 and in-process) and route
  /// requests from each transport to the same service instance. You can also use "interceptors",
  /// to implement cross-cutting logic which apply to all accepted RPCs. Example uses of interceptors
  /// include request filtering, authentication, and logging. Once requests have been intercepted
  /// they are passed to a handler which in turn returns a response to send back to the client.
  ///
  /// ## Creating and configuring a server
  ///
  /// The following example demonstrates how to create and configure a server.
  ///
  /// ```swift
  /// // Create an in-process transport.
  /// let inProcessTransport = InProcessServerTransport()
  ///
  /// // Create the 'Greeter' and 'Echo' services.
  /// let greeter = GreeterService()
  /// let echo = EchoService()
  ///
  /// // Create an interceptor.
  /// let statsRecorder = StatsRecordingServerInterceptors()
  ///
  /// // Finally create the server.
  /// let server = GRPCServer(
  ///   transports: [inProcessTransport],
  ///   services: [greeter, echo],
  ///   interceptors: [statsRecorder]
  /// )
  /// ```
  ///
  /// ## Starting and stopping the server
  ///
  /// Once you have configured the server call ``run()`` to start it. Calling ``run()`` starts each
  /// of the server's transports. A ``RuntimeError`` is thrown if any of the transports can't be
  /// started.
  ///
  /// ```swift
  /// // Start running the server.
  /// try await server.run()
  /// ```
  ///
  /// The ``run()`` method won't return until the server has finished handling all requests. You can
  /// signal to the server that it should stop accepting new requests by calling ``stopListening()``.
  /// This allows the server to drain existing requests gracefully. To stop the server more abruptly
  /// you can cancel the task running your server. If your application requires additional resources
  /// that need their lifecycles managed you should consider using [Swift Service
  /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  public struct GRPCServer: Sendable {
    typealias Stream = RPCStream<ServerTransport.Inbound, ServerTransport.Outbound>

    /// A collection of ``ServerTransport`` implementations that the server uses to listen
    /// for new requests.
    private let transports: [any ServerTransport]

    /// The router which holds the methods registered by the server's services. Every accepted
    /// stream is handed to the router to be handled.
    private let router: RPCRouter

    /// A collection of ``ServerInterceptor`` implementations which are applied to all accepted
    /// RPCs.
    ///
    /// RPCs are intercepted in the order that interceptors are added. That is, a request received
    /// from the client will first be intercepted by the first added interceptor followed by the
    /// second, and so on.
    private let interceptors: [any ServerInterceptor]

    /// The state of the server.
    private let state: ManagedAtomic<State>

    private enum State: UInt8, AtomicValue {
      /// The server hasn't been started yet. Can transition to `starting` or `stopped`.
      case notStarted
      /// The server is starting but isn't accepting requests yet. Can transition to `running`
      /// and `stopping`.
      case starting
      /// The server is running and accepting RPCs. Can transition to `stopping`.
      case running
      /// The server is stopping and no new RPCs will be accepted. Existing RPCs may run to
      /// completion. May transition to `stopped`.
      case stopping
      /// The server has stopped, no RPCs are in flight and no more will be accepted. This state
      /// is terminal.
      case stopped
    }

    /// Creates a new server which offers the given services on the given transports.
    ///
    /// The methods of each service are registered with a new ``RPCRouter`` which is then used to
    /// create the server.
    ///
    /// - Parameters:
    ///   - transports: The transports the server should listen on.
    ///   - services: Services offered by the server.
    ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
    ///       accepted RPC. The order in which interceptors are added reflects the order in which they
    ///       are called. The first interceptor added will be the first interceptor to intercept each
    ///       request. The last interceptor added will be the final interceptor to intercept each
    ///       request before calling the appropriate handler.
    public init(
      transports: [any ServerTransport],
      services: [any RegistrableRPCService],
      interceptors: [any ServerInterceptor] = []
    ) {
      var router = RPCRouter()
      for service in services {
        service.registerMethods(with: &router)
      }

      self.init(transports: transports, router: router, interceptors: interceptors)
    }

    /// Creates a new server which routes accepted streams using the given router.
    ///
    /// The server is created in the 'not started' state; call ``run()`` to start it.
    ///
    /// - Parameters:
    ///   - transports: The transports the server should listen on.
    ///   - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers.
    ///   - interceptors: A collection of interceptors providing cross-cutting functionality to each
    ///       accepted RPC. The order in which interceptors are added reflects the order in which they
    ///       are called. The first interceptor added will be the first interceptor to intercept each
    ///       request. The last interceptor added will be the final interceptor to intercept each
    ///       request before calling the appropriate handler.
    public init(
      transports: [any ServerTransport],
      router: RPCRouter,
      interceptors: [any ServerInterceptor] = []
    ) {
      self.state = ManagedAtomic(.notStarted)
      self.transports = transports
      self.router = router
      self.interceptors = interceptors
    }

    /// Starts the server and runs until all registered transports have closed.
    ///
    /// No RPCs are processed until all transports are listening. If a transport fails to start
    /// listening then all open transports are closed and a ``RuntimeError`` is thrown.
    ///
    /// This function returns when all transports have stopped listening and all requests have been
    /// handled. You can signal to transports that they should stop listening by calling
    /// ``stopListening()``. The server will continue to process existing requests.
    ///
    /// To stop the server more abruptly you can cancel the task that this function is running in.
    ///
    /// - Note: You can only call this function once, repeated calls will result in a
    ///   ``RuntimeError`` being thrown.
    /// - Throws: A ``RuntimeError`` if the server has already been started or stopped, if no
    ///   transports are configured, or if any transport fails to start listening.
    public func run() async throws {
      let (wasNotStarted, actualState) = self.state.compareExchange(
        expected: .notStarted,
        desired: .starting,
        ordering: .sequentiallyConsistent
      )

      guard wasNotStarted else {
        switch actualState {
        case .notStarted:
          // The exchange only fails when the stored value differs from the expected value.
          fatalError("Invalid state: exchange failed but the server state was 'notStarted'.")
        case .starting, .running:
          throw RuntimeError(
            code: .serverIsAlreadyRunning,
            message: "The server is already running and can only be started once."
          )
        case .stopping, .stopped:
          throw RuntimeError(
            code: .serverIsStopped,
            message: "The server has stopped and can only be started once."
          )
        }
      }

      // When we exit this function we must have stopped.
      defer {
        self.state.store(.stopped, ordering: .sequentiallyConsistent)
      }

      if self.transports.isEmpty {
        throw RuntimeError(
          code: .noTransportsConfigured,
          message: """
            Can't start server, no transports are configured. You must add at least one transport \
            to the server before calling 'run()'.
            """
        )
      }

      // Listeners are appended in the same order as 'self.transports' so that the first
      // 'listeners.count' transports are exactly those which started successfully.
      var listeners: [RPCAsyncSequence<Stream>] = []
      listeners.reserveCapacity(self.transports.count)

      for transport in self.transports {
        do {
          let listener = try await transport.listen()
          listeners.append(listener)
        } catch let cause {
          // Failed to start, so start stopping.
          self.state.store(.stopping, ordering: .sequentiallyConsistent)
          // Some listeners may have started and have streams which need closing.
          await self.rejectRequests(listeners)

          throw RuntimeError(
            code: .failedToStartTransport,
            message: """
              Server didn't start because the '\(type(of: transport))' transport threw an error \
              while starting.
              """,
            cause: cause
          )
        }
      }

      // May have been told to stop listening while starting the transports.
      let (wasStarting, _) = self.state.compareExchange(
        expected: .starting,
        desired: .running,
        ordering: .sequentiallyConsistent
      )

      // If the server is stopping then notify the transport and then consume them: there may be
      // streams opened at a lower level (e.g. HTTP/2) which are already open and need to be consumed.
      if wasStarting {
        await self.handleRequests(listeners)
      } else {
        await self.rejectRequests(listeners)
      }
    }

    /// Stops the transports which have started listening and rejects every stream they yield
    /// with an 'unavailable' status.
    ///
    /// - Parameter listeners: The listeners of the transports which started successfully, in the
    ///   same order as the corresponding entries of `transports`.
    private func rejectRequests(_ listeners: [RPCAsyncSequence<Stream>]) async {
      // Tell the active listeners to stop listening.
      for transport in self.transports.prefix(listeners.count) {
        transport.stopListening()
      }

      // Drain any open streams on active listeners. Each listener is drained in its own child
      // task so that a listener which is slow to finish doesn't delay the rejection of streams
      // which are pending on the other listeners.
      await withTaskGroup(of: Void.self) { group in
        let unavailable = Status(
          code: .unavailable,
          message: "The server isn't ready to accept requests."
        )

        for listener in listeners {
          group.addTask {
            await withTaskGroup(of: Void.self) { subGroup in
              do {
                for try await stream in listener {
                  subGroup.addTask {
                    // Best effort: the peer may already have gone away.
                    try? await stream.outbound.write(.status(unavailable, [:]))
                    stream.outbound.finish()
                  }
                }
              } catch {
                // Suppress any errors, the original error from the transport which failed to start
                // should be thrown.
              }
            }
          }
        }
      }
    }

    /// Handles every stream yielded by the given listeners, returning once all listeners have
    /// finished and all accepted streams have been handled.
    ///
    /// Uses a discarding task group where one is available and falls back to a regular task
    /// group otherwise.
    private func handleRequests(_ listeners: [RPCAsyncSequence<Stream>]) async {
      #if swift(>=5.9)
      if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
        await self.handleRequestsInDiscardingTaskGroup(listeners)
      } else {
        await self.handleRequestsInTaskGroup(listeners)
      }
      #else
      await self.handleRequestsInTaskGroup(listeners)
      #endif
    }

    #if swift(>=5.9)
    /// Handles streams using discarding task groups: one child task per listener, each of which
    /// runs one subtask per accepted stream.
    @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
    private func handleRequestsInDiscardingTaskGroup(_ listeners: [RPCAsyncSequence<Stream>]) async {
      await withDiscardingTaskGroup { group in
        for listener in listeners {
          group.addTask {
            await withDiscardingTaskGroup { subGroup in
              do {
                for try await stream in listener {
                  subGroup.addTask {
                    await self.router.handle(stream: stream, interceptors: self.interceptors)
                  }
                }
              } catch {
                // If the listener threw then the connection must be broken, cancel all work.
                subGroup.cancelAll()
              }
            }
          }
        }
      }
    }
    #endif

    /// Handles streams using regular task groups: one child task per listener, each of which
    /// runs one subtask per accepted stream, with a cap on the number of unconsumed subtasks.
    private func handleRequestsInTaskGroup(_ listeners: [RPCAsyncSequence<Stream>]) async {
      // If the discarding task group isn't available then fall back to using a regular task group
      // with a limit on subtasks. Most servers will use an HTTP/2 based transport, most
      // implementations limit connections to 100 concurrent streams. A limit of 4096 gives each
      // listener scope to handle nearly 41 completely saturated connections.
      let maxConcurrentSubTasks = 4096

      await withTaskGroup(of: Void.self) { group in
        for listener in listeners {
          group.addTask {
            await withTaskGroup(of: Void.self) { subGroup in
              // The number of subtasks added to 'subGroup' whose results haven't been consumed
              // yet. The count is kept per listener because a listener can only consume results
              // from its own group: a count shared between listeners would let one busy (or
              // finished) listener use up the budget and force the others to run one RPC at a time.
              var unconsumedSubTasks = 0

              do {
                for try await stream in listener {
                  if unconsumedSubTasks >= maxConcurrentSubTasks {
                    // The group holds 'unconsumedSubTasks' results so 'next()' always yields one.
                    _ = await subGroup.next()
                    unconsumedSubTasks -= 1
                  }

                  subGroup.addTask {
                    await self.router.handle(stream: stream, interceptors: self.interceptors)
                  }
                  unconsumedSubTasks += 1
                }
              } catch {
                // If the listener threw then the connection must be broken, cancel all work.
                subGroup.cancelAll()
              }
            }
          }
        }
      }
    }

    /// Signal to the server that it should stop listening for new requests.
    ///
    /// By calling this function you indicate to clients that they mustn't start new requests
    /// against this server. Once the server has processed all requests the ``run()`` method returns.
    ///
    /// Calling this on a server which is already stopping or has stopped has no effect.
    public func stopListening() {
      let (wasRunning, actual) = self.state.compareExchange(
        expected: .running,
        desired: .stopping,
        ordering: .sequentiallyConsistent
      )

      if wasRunning {
        for transport in self.transports {
          transport.stopListening()
        }
      } else {
        switch actual {
        case .notStarted:
          // Never started: go straight to the terminal state so a later 'run()' throws.
          let (exchanged, _) = self.state.compareExchange(
            expected: .notStarted,
            desired: .stopped,
            ordering: .sequentiallyConsistent
          )

          // Lost a race with 'run()', try again.
          if !exchanged {
            self.stopListening()
          }

        case .starting:
          // 'run()' observes this state once the transports have started and then stops them
          // and rejects any streams they have accepted.
          let (exchanged, _) = self.state.compareExchange(
            expected: .starting,
            desired: .stopping,
            ordering: .sequentiallyConsistent
          )

          // Lost a race with 'run()', try again.
          if !exchanged {
            self.stopListening()
          }

        case .running:
          // Unreachable, this branch only happens when the initial exchange didn't take place.
          fatalError("Invalid state: exchange failed but the server state was 'running'.")

        case .stopping, .stopped:
          // Already stopping/stopped, ignore.
          ()
        }
      }
    }
  }