GRPCServer.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. /// A gRPC server.
  18. ///
  19. /// The server accepts connections from clients and listens on each connection for new streams
  20. /// which are initiated by the client. Each stream maps to a single RPC. The server routes accepted
  21. /// streams to a service to handle the RPC or rejects them with an appropriate error if no service
  22. /// can handle the RPC.
  23. ///
  24. /// A ``Server`` may listen with multiple transports (for example, HTTP/2 and in-process) and route
  25. /// requests from each transport to the same service instance. You can also use "interceptors",
  26. /// to implement cross-cutting logic which applies to all accepted RPCs. Example uses of interceptors
  27. /// include request filtering, authentication, and logging. Once requests have been intercepted
  28. /// they are passed to a handler which in turn returns a response to send back to the client.
  29. ///
  30. /// ## Creating and configuring a server
  31. ///
  32. /// The following example demonstrates how to create and configure a server.
  33. ///
  34. /// ```swift
  35. /// let server = GRPCServer()
  36. ///
  37. /// // Create and add an in-process transport.
  38. /// let inProcessTransport = InProcessServerTransport()
  39. /// server.transports.add(inProcessTransport)
  40. ///
  41. /// // Create and register the 'Greeter' and 'Echo' services.
  42. /// server.services.register(GreeterService())
  43. /// server.services.register(EchoService())
  44. ///
  45. /// // Create and add some interceptors.
  46. /// server.interceptors.add(StatsRecordingServerInterceptors())
  47. /// ```
  48. ///
  49. /// ## Starting and stopping the server
  50. ///
  51. /// Once you have configured the server call ``run()`` to start it. Calling ``run()`` starts each
  52. /// of the server's transports. A ``ServerError`` is thrown if any of the transports can't be
  53. /// started.
  54. ///
  55. /// ```swift
  56. /// // Start running the server.
  57. /// try await server.run()
  58. /// ```
  59. ///
  60. /// The ``run()`` method won't return until the server has finished handling all requests. You can
  61. /// signal to the server that it should stop accepting new requests by calling ``stopListening()``.
  62. /// This allows the server to drain existing requests gracefully. To stop the server more abruptly
  63. /// you can cancel the task running your server. If your application requires additional resources
  64. /// that need their lifecycles managed you should consider using [Swift Service
  65. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  66. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  67. public final class GRPCServer: Sendable {
  68. typealias Stream = RPCStream<ServerTransport.Inbound, ServerTransport.Outbound>
  /// The ``ServerTransport`` implementations which the server listens on for new requests.
  ///
  /// The value is read from and written to the server's `storage` via `withLockedValue`.
  /// ``run()`` copies the transports when it starts, so changes made afterwards aren't picked up
  /// by a server which is already running.
  public var transports: Transports {
    get {
      return self.storage.withLockedValue { storage in storage.transports }
    }
    set(updated) {
      self.storage.withLockedValue { storage in storage.transports = updated }
    }
  }
  /// The services which have been registered with the server.
  ///
  /// The value is read from and written to the server's `storage` via `withLockedValue`.
  /// ``run()`` copies the services' router when it starts, so changes made afterwards aren't
  /// picked up by a server which is already running.
  public var services: Services {
    get {
      return self.storage.withLockedValue { storage in storage.services }
    }
    set(updated) {
      self.storage.withLockedValue { storage in storage.services = updated }
    }
  }
  /// The ``ServerInterceptor`` implementations which are applied to every accepted RPC.
  ///
  /// Interceptors run in the order in which they were added: a request received from the client
  /// is intercepted by the first added interceptor, then by the second, and so on.
  ///
  /// The value is read from and written to the server's `storage` via `withLockedValue`.
  /// ``run()`` copies the interceptors when it starts, so changes made afterwards aren't picked
  /// up by a server which is already running.
  public var interceptors: Interceptors {
    get {
      return self.storage.withLockedValue { storage in storage.interceptors }
    }
    set(updated) {
      self.storage.withLockedValue { storage in storage.interceptors = updated }
    }
  }
  /// The mutable state of the server, grouped into a single value.
  ///
  /// A newly created value has no transports, no services, no interceptors and is in the
  /// `notStarted` state.
  private struct Storage {
    /// The transports to listen on.
    var transports = Transports()
    /// The registered services.
    var services = Services()
    /// The interceptors applied to accepted RPCs.
    var interceptors = Interceptors()
    /// The lifecycle state of the server.
    var state = State.notStarted

    init() {}
  }
  115. private let storage: LockedValueBox<Storage>
  /// The lifecycle states of the server.
  ///
  /// Once ``run()`` has successfully claimed the `notStarted` state it always leaves the server
  /// in the `stopped` state when it exits, whether it returns normally or throws.
  private enum State {
    /// ``run()`` hasn't been called yet. ``run()`` moves the server to `starting`, while
    /// ``stopListening()`` moves it straight to `stopped`.
    case notStarted

    /// ``run()`` is starting the transports and no requests are being accepted yet. Becomes
    /// `running` once every transport is listening, or `stopping` if ``stopListening()`` is
    /// called or a transport fails to start.
    case starting

    /// The server is accepting RPCs. ``stopListening()`` moves the server to `stopping`.
    case running

    /// No new RPCs will be accepted but existing RPCs may run to completion. Becomes `stopped`
    /// when ``run()`` exits.
    case stopping

    /// No RPCs are in flight and no more will be accepted. This state is terminal.
    case stopped
  }
  /// Creates a server which has no transports, services or interceptors.
  ///
  /// Add resources via ``transports-swift.property``, ``services-swift.property``, and
  /// ``interceptors-swift.property`` and then call ``run()`` to start the server. Changes made
  /// to resources after ``run()`` has been called are ignored.
  public init() {
    let emptyStorage = Storage()
    self.storage = LockedValueBox(emptyStorage)
  }
  /// Starts the server and runs it until every registered transport has closed.
  ///
  /// RPCs aren't processed until all transports are listening. If any transport fails to start
  /// listening then the transports which did start are told to stop, any streams they already
  /// produced are rejected, and a ``ServerError`` is thrown.
  ///
  /// The function returns once all transports have stopped listening and all requests have been
  /// handled. Call ``stopListening()`` to signal to the transports that they should stop
  /// listening; existing requests continue to be processed. To stop the server more abruptly,
  /// cancel the task in which this function is running.
  ///
  /// Resources are copied when the server starts: register everything you need before calling
  /// this function as later changes won't be reflected.
  ///
  /// - Note: You can only call this function once, repeated calls will result in a
  ///   ``ServerError`` being thrown.
  /// - Important: You must register at least one transport by calling
  ///   ``Transports-swift.struct/add(_:)`` before calling this method.
  /// - Throws: A ``ServerError`` if the server has already been started or stopped, if no
  ///   transports are configured, or if a transport fails to start listening.
  public func run() async throws {
    // Check the state and snapshot the configured resources in a single locked step so that
    // only one caller can ever move the server out of 'notStarted'.
    let (configuredTransports, methodRouter, interceptorChain) =
      try self.storage.withLockedValue { storage in
        switch storage.state {
        case .notStarted:
          storage.state = .starting
          return (storage.transports, storage.services.router, storage.interceptors)

        case .starting, .running:
          throw ServerError(
            code: .serverIsAlreadyRunning,
            message: "The server is already running and can only be started once."
          )

        case .stopping, .stopped:
          throw ServerError(
            code: .serverIsStopped,
            message: "The server has stopped and can only be started once."
          )
        }
      }

    // From here on, every way out of this function leaves the server stopped.
    defer {
      self.storage.withLockedValue { storage in storage.state = .stopped }
    }

    guard !configuredTransports.values.isEmpty else {
      throw ServerError(
        code: .noTransportsConfigured,
        message: """
          Can't start server, no transports are configured. You must add at least one transport \
          to the server using 'transports.add(_:)' before calling 'run()'.
          """
      )
    }

    // Start the transports one at a time, in the order they were added. 'startedListeners[i]'
    // always belongs to 'configuredTransports.values[i]'.
    var startedListeners: [RPCAsyncSequence<Stream>] = []
    startedListeners.reserveCapacity(configuredTransports.values.count)

    for transport in configuredTransports.values {
      do {
        let started = try await transport.listen()
        startedListeners.append(started)
      } catch let cause {
        // This transport couldn't start so the server as a whole must stop.
        self.storage.withLockedValue { storage in storage.state = .stopping }

        // Transports started earlier may already have streams open, reject them.
        await Self.rejectRequests(startedListeners, transports: configuredTransports)

        throw ServerError(
          code: .failedToStartTransport,
          message: """
            Server didn't start because the '\(type(of: transport))' transport threw an error \
            while starting.
            """,
          cause: cause
        )
      }
    }

    // 'stopListening()' may have been called while the transports were starting.
    let stopRequestedWhileStarting = self.storage.withLockedValue { storage in
      switch storage.state {
      case .starting:
        storage.state = .running
        return false

      case .stopping:
        return true

      case .notStarted, .running, .stopped:
        fatalError("Invalid state")
      }
    }

    // If the server is stopping then tell the transports and drain their listeners: streams may
    // already have been opened at a lower level (e.g. HTTP/2) and they need to be consumed.
    guard !stopRequestedWhileStarting else {
      await Self.rejectRequests(startedListeners, transports: configuredTransports)
      return
    }

    await Self.handleRequests(
      startedListeners,
      router: methodRouter,
      interceptors: interceptorChain
    )
  }
  /// Tells the transports backing `listeners` to stop listening and rejects every stream the
  /// listeners still produce with an `unavailable` status.
  ///
  /// - Parameters:
  ///   - listeners: The listeners of the transports which were started.
  ///   - transports: The configured transports. Only the first `listeners.count` transports are
  ///     told to stop listening.
  private static func rejectRequests(
    _ listeners: [RPCAsyncSequence<Stream>],
    transports: Transports
  ) async {
    // Only ask the transports which produced a listener to stop listening.
    let listeningTransports = transports.values.prefix(listeners.count)
    for listeningTransport in listeningTransports {
      listeningTransport.stopListening()
    }

    let notReady = Status(
      code: .unavailable,
      message: "The server isn't ready to accept requests."
    )

    // Drain each listener in turn, rejecting every stream in its own child task.
    await withTaskGroup(of: Void.self) { rejections in
      for listener in listeners {
        do {
          for try await stream in listener {
            rejections.addTask {
              try? await stream.outbound.write(.status(notReady, [:]))
              stream.outbound.finish()
            }
          }
        } catch {
          // Deliberately ignored: the caller reports the error from the transport which failed
          // to start rather than errors hit while draining.
        }
      }
    }
  }
  /// Handles the streams produced by `listeners` until every listener has finished.
  ///
  /// A discarding task group is used when the compiler and the platform support it, otherwise
  /// a regular task group is used.
  ///
  /// - Parameters:
  ///   - listeners: The sequences of accepted streams, one per transport.
  ///   - router: The router used to handle each stream.
  ///   - interceptors: The interceptors to apply to each stream.
  private static func handleRequests(
    _ listeners: [RPCAsyncSequence<Stream>],
    router: RPCRouter,
    interceptors: Interceptors
  ) async {
    #if swift(>=5.9)
    if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
      await Self.handleRequestsInDiscardingTaskGroup(
        listeners,
        router: router,
        interceptors: interceptors
      )
      return
    }
    #endif

    // Either the compiler or the platform is too old for discarding task groups.
    await Self.handleRequestsInTaskGroup(listeners, router: router, interceptors: interceptors)
  }
  #if swift(>=5.9)
  /// Handles the streams produced by `listeners` using discarding task groups.
  ///
  /// Each listener is consumed in its own child task and each stream is handled in a child task
  /// of its listener's task.
  ///
  /// - Parameters:
  ///   - listeners: The sequences of accepted streams, one per transport.
  ///   - router: The router used to handle each stream.
  ///   - interceptors: The interceptors to apply to each stream.
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  private static func handleRequestsInDiscardingTaskGroup(
    _ listeners: [RPCAsyncSequence<Stream>],
    router: RPCRouter,
    interceptors: Interceptors
  ) async {
    await withDiscardingTaskGroup { listenerGroup in
      for listener in listeners {
        listenerGroup.addTask {
          await withDiscardingTaskGroup { streamGroup in
            do {
              for try await acceptedStream in listener {
                streamGroup.addTask {
                  await router.handle(
                    stream: acceptedStream,
                    interceptors: interceptors.values
                  )
                }
              }
            } catch {
              // A throwing listener means the connection must be broken: cancel the RPCs which
              // are still running on it.
              streamGroup.cancelAll()
            }
          }
        }
      }
    }
  }
  #endif
  /// Handles the streams produced by `listeners` using regular task groups.
  ///
  /// This is the fallback for when discarding task groups aren't available. Each listener is
  /// consumed in its own child task and each stream is handled in a child task of its
  /// listener's task. Once the number of subtasks which have been added but not yet collected
  /// reaches a fixed limit, a listener collects one of its own finished subtasks before adding
  /// another.
  ///
  /// - Parameters:
  ///   - listeners: The sequences of accepted streams, one per transport.
  ///   - router: The router used to handle each stream.
  ///   - interceptors: The interceptors to apply to each stream.
  private static func handleRequestsInTaskGroup(
    _ listeners: [RPCAsyncSequence<Stream>],
    router: RPCRouter,
    interceptors: Interceptors
  ) async {
    // If the discarding task group isn't available then fall back to using a regular task group
    // with a limit on subtasks. Most servers will use an HTTP/2 based transport, most
    // implementations limit connections to 100 concurrent streams. A limit of 4096 gives the server
    // scope to handle nearly 41 completely saturated connections.
    let maxConcurrentSubTasks = 4096

    // The number of subtasks, across all listeners, which have been added to a group and not yet
    // collected from it.
    let tasks = ManagedAtomic(0)

    await withTaskGroup(of: Void.self) { group in
      for listener in listeners {
        group.addTask {
          await withTaskGroup(of: Void.self) { subGroup in
            // This listener's share of 'tasks'. Tracked so that it can be handed back when the
            // listener finishes, otherwise the shared count would never drop again and the
            // remaining listeners would stay throttled.
            var outstanding = 0

            do {
              for try await stream in listener {
                let taskCount = tasks.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent)
                outstanding += 1

                if taskCount >= maxConcurrentSubTasks {
                  // 'next()' returns 'nil' when this listener has no subtasks of its own to wait
                  // for (the limit was reached by other listeners). Nothing was collected in
                  // that case so the count mustn't be reduced.
                  if await subGroup.next() != nil {
                    tasks.wrappingDecrement(ordering: .sequentiallyConsistent)
                    outstanding -= 1
                  }
                }

                subGroup.addTask {
                  await router.handle(stream: stream, interceptors: interceptors.values)
                }
              }
            } catch {
              // If the listener threw then the connection must be broken, cancel all work.
              subGroup.cancelAll()
            }

            // The group would wait for its subtasks on exit anyway; wait explicitly so that the
            // listener's share of the count is only released once its subtasks have finished.
            await subGroup.waitForAll()
            tasks.wrappingDecrement(by: outstanding, ordering: .sequentiallyConsistent)
          }
        }
      }
    }
  }
  /// Signals to the server that it should stop listening for new requests.
  ///
  /// Calling this function indicates to clients that they mustn't start new requests against
  /// this server. Once the server has processed all requests the ``run()`` method returns.
  ///
  /// The effect depends on the state of the server:
  /// - If it hasn't been started it is marked as stopped.
  /// - If it is starting it is marked as stopping; ``run()`` then tells the transports to stop.
  /// - If it is running it is marked as stopping and each transport is told to stop listening.
  /// - If it is already stopping or has stopped, calling this function has no effect.
  public func stopListening() {
    // Update the state while holding the lock but call out to the transports after releasing it.
    let transportsToStop = self.storage.withLockedValue {
      storage -> [any (ServerTransport & Sendable)] in
      switch storage.state {
      case .notStarted:
        storage.state = .stopped
        return []

      case .starting:
        storage.state = .stopping
        return []

      case .running:
        storage.state = .stopping
        return storage.transports.values

      case .stopping, .stopped:
        return []
      }
    }

    for transport in transportsToStop {
      transport.stopListening()
    }
  }
  372. }
  373. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  374. extension GRPCServer {
  /// The transports which provide a bidirectional communication channel with clients.
  ///
  /// Call ``add(_:)`` to add a new transport.
  public struct Transports: Sendable {
    /// The transports, in the order in which they were added.
    private(set) var values: [any (ServerTransport & Sendable)] = []

    /// Adds a transport to the server.
    ///
    /// - Parameter transport: The transport to add.
    public mutating func add(_ transport: some (ServerTransport & Sendable)) {
      values.append(transport)
    }
  }
  /// The services registered with this server.
  ///
  /// Register a service by calling ``register(_:)``, or add handlers for individual methods
  /// directly to the ``router``.
  public struct Services: Sendable {
    /// The router which stores the handlers for known methods.
    public var router: RPCRouter = RPCRouter()

    /// Registers the methods of a service with the ``router``.
    ///
    /// - Parameter service: The service whose methods should be registered with the ``router``.
    public mutating func register(_ service: some RegistrableRPCService) {
      service.registerMethods(with: &router)
    }
  }
  /// A collection of interceptors which provide cross-cutting functionality to each accepted
  /// RPC.
  public struct Interceptors: Sendable {
    /// The interceptors, in the order in which they were added.
    private(set) var values: [any ServerInterceptor] = []

    /// Adds an interceptor to the server.
    ///
    /// Interceptors are called in the order in which they are added. The first interceptor
    /// added is the first to intercept each request and the last interceptor added is the final
    /// one to intercept each request before the appropriate handler is called.
    ///
    /// - Parameter interceptor: The interceptor to add.
    public mutating func add(_ interceptor: some ServerInterceptor) {
      values.append(interceptor)
    }
  }
  416. }
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension GRPCServer.Transports: CustomStringConvertible {
    /// A textual description of the transports, in the order in which they were added.
    public var description: String {
      "\(self.values)"
    }
  }
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension GRPCServer.Services: CustomStringConvertible {
    /// A textual description of the sorted, fully qualified names of the methods known to the
    /// router.
    public var description: String {
      // List all methods by their fully qualified name, sorted so that they are ordered by
      // service and then by method.
      let sortedNames = self.router.methods.map { method in method.fullyQualifiedMethod }.sorted()
      return String(describing: sortedNames)
    }
  }
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension GRPCServer.Interceptors: CustomStringConvertible {
    /// A textual description of the type names of the interceptors, in the order in which they
    /// were added.
    public var description: String {
      let typeNames = self.values.map { interceptor in "\(type(of: interceptor))" }
      return String(describing: typeNames)
    }
  }