GRPCClient.swift 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. private import Synchronization
  17. /// A gRPC client.
  18. ///
  19. /// A ``GRPCClient`` communicates to a server via a ``ClientTransport``.
  20. ///
  21. /// You can start RPCs to the server by calling the corresponding method:
  22. /// - ``unary(request:descriptor:serializer:deserializer:options:onResponse:)``
  23. /// - ``clientStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  24. /// - ``serverStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  25. /// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  26. ///
  27. /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
  28. ///
  29. /// ## Creating a client
  30. ///
  31. /// You can create and run a client using ``withGRPCClient(transport:interceptors:isolation:handleClient:)``
  32. /// or ``withGRPCClient(transport:interceptorPipeline:isolation:handleClient:)`` which create, configure and
  33. /// run the client providing scoped access to it via the `handleClient` closure. The client will
  34. /// begin gracefully shutting down when the closure returns.
  35. ///
  36. /// ```swift
  37. /// let transport: any ClientTransport = ...
  38. /// try await withGRPCClient(transport: transport) { client in
  39. /// // ...
  40. /// }
  41. /// ```
  42. ///
  43. /// ## Creating a client manually
  44. ///
  45. /// If the `with`-style methods for creating clients isn't suitable for your application then you
  46. /// can create and run a client manually. This requires you to call the ``runConnections()`` method in a task
  47. /// which instructs the client to start connecting to the server.
  48. ///
  49. /// The ``runConnections()`` method won't return until the client has finished handling all requests. You can
  50. /// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``.
  51. /// This gives the client enough time to drain any requests already in flight. To stop the client
  52. /// more abruptly you can cancel the task running your client. If your application requires
  53. /// additional resources that need their lifecycles managed you should consider using [Swift Service
  54. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  55. @available(gRPCSwift 2.0, *)
  56. public final class GRPCClient<Transport: ClientTransport>: Sendable {
  57. /// The transport which provides a bidirectional communication channel with the server.
  58. private let transport: Transport
  59. /// The current state of the client.
  60. private let stateMachine: Mutex<StateMachine>
  61. /// The state of the client.
  62. private enum State: Sendable {
  63. /// The client hasn't been started yet. Can transition to `running` or `stopped`.
  64. case notStarted
  65. /// The client is running and can send RPCs. Can transition to `stopping`.
  66. case running
  67. /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to
  68. /// completion. May transition to `stopped`.
  69. case stopping
  70. /// The client has stopped, no RPCs are in flight and no more will be accepted. This state
  71. /// is terminal.
  72. case stopped
  73. mutating func run() throws {
  74. switch self {
  75. case .notStarted:
  76. self = .running
  77. case .running:
  78. throw RuntimeError(
  79. code: .clientIsAlreadyRunning,
  80. message: "The client is already running and can only be started once."
  81. )
  82. case .stopping, .stopped:
  83. throw RuntimeError(
  84. code: .clientIsStopped,
  85. message: """
  86. Can't call 'runConnections()' as the client is stopped (or is stopping). \
  87. This can happen if the you call 'runConnections()' after shutting the \
  88. client down or if you used 'withGRPCClient' with an empty body.
  89. """
  90. )
  91. }
  92. }
  93. mutating func stopped() {
  94. self = .stopped
  95. }
  96. mutating func beginGracefulShutdown() -> Bool {
  97. switch self {
  98. case .notStarted:
  99. self = .stopped
  100. return false
  101. case .running:
  102. self = .stopping
  103. return true
  104. case .stopping, .stopped:
  105. return false
  106. }
  107. }
  108. func checkExecutable() throws {
  109. switch self {
  110. case .notStarted, .running:
  111. // Allow .notStarted as making a request can race with 'runConnections()'. Transports should tolerate
  112. // queuing the request if not yet started.
  113. ()
  114. case .stopping, .stopped:
  115. throw RuntimeError(
  116. code: .clientIsStopped,
  117. message: "Client has been stopped. Can't make any more RPCs."
  118. )
  119. }
  120. }
  121. }
  122. private struct StateMachine {
  123. var state: State
  124. private let interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]
  125. /// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
  126. ///
  127. /// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
  128. /// This caching is done to avoid having to compute the applicable interceptors for each request made.
  129. ///
  130. /// The order in which interceptors are added reflects the order in which they are called. The
  131. /// first interceptor added will be the first interceptor to intercept each request. The last
  132. /// interceptor added will be the final interceptor to intercept each request before calling
  133. /// the appropriate handler.
  134. var interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]
  135. init(interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]) {
  136. self.state = .notStarted
  137. self.interceptorPipeline = interceptorPipeline
  138. self.interceptorsPerMethod = [:]
  139. }
  140. mutating func checkExecutableAndGetApplicableInterceptors(
  141. for method: MethodDescriptor
  142. ) throws -> [any ClientInterceptor] {
  143. try self.state.checkExecutable()
  144. guard let applicableInterceptors = self.interceptorsPerMethod[method] else {
  145. let applicableInterceptors = self.interceptorPipeline
  146. .filter { $0.applies(to: method) }
  147. .map { $0.interceptor }
  148. self.interceptorsPerMethod[method] = applicableInterceptors
  149. return applicableInterceptors
  150. }
  151. return applicableInterceptors
  152. }
  153. }
  154. /// Creates a new client with the given transport, interceptors and configuration.
  155. ///
  156. /// - Parameters:
  157. /// - transport: The transport used to establish a communication channel with a server.
  158. /// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
  159. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  160. /// are called. The first interceptor added will be the first interceptor to intercept each
  161. /// request. The last interceptor added will be the final interceptor to intercept each
  162. /// request before calling the appropriate handler.
  163. convenience public init(
  164. transport: Transport,
  165. interceptors: [any ClientInterceptor] = []
  166. ) {
  167. self.init(
  168. transport: transport,
  169. interceptorPipeline: interceptors.map { .apply($0, to: .all) }
  170. )
  171. }
  172. /// Creates a new client with the given transport, interceptors and configuration.
  173. ///
  174. /// - Parameters:
  175. /// - transport: The transport used to establish a communication channel with a server.
  176. /// - interceptorPipeline: A collection of ``ConditionalInterceptor``s providing cross-cutting
  177. /// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
  178. /// The order in which interceptors are added reflects the order in which they are called.
  179. /// The first interceptor added will be the first interceptor to intercept each request.
  180. /// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
  181. public init(
  182. transport: Transport,
  183. interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]
  184. ) {
  185. self.transport = transport
  186. self.stateMachine = Mutex(StateMachine(interceptorPipeline: interceptorPipeline))
  187. }
  188. /// Start the client.
  189. ///
  190. /// This returns once ``beginGracefulShutdown()`` has been called and all in-flight RPCs have finished executing.
  191. /// If you need to abruptly stop all work you should cancel the task executing this method.
  192. ///
  193. /// The client, and by extension this function, can only be run once. If the client is already
  194. /// running or has already been closed then a ``RuntimeError`` is thrown.
  195. public func runConnections() async throws {
  196. try self.stateMachine.withLock { try $0.state.run() }
  197. // When this function exits the client must have stopped.
  198. defer {
  199. self.stateMachine.withLock { $0.state.stopped() }
  200. }
  201. do {
  202. try await self.transport.connect()
  203. } catch {
  204. throw RuntimeError(
  205. code: .transportError,
  206. message: "The transport threw an error while connected.",
  207. cause: error
  208. )
  209. }
  210. }
  211. /// Close the client.
  212. ///
  213. /// The transport will be closed: this means that it will be given enough time to wait for
  214. /// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
  215. /// executing ``runConnections()`` if you want to abruptly stop in-flight RPCs.
  216. public func beginGracefulShutdown() {
  217. let wasRunning = self.stateMachine.withLock { $0.state.beginGracefulShutdown() }
  218. if wasRunning {
  219. self.transport.beginGracefulShutdown()
  220. }
  221. }
  222. /// Executes a unary RPC.
  223. ///
  224. /// - Parameters:
  225. /// - request: The unary request.
  226. /// - descriptor: The method descriptor for which to execute this request.
  227. /// - serializer: A request serializer.
  228. /// - deserializer: A response deserializer.
  229. /// - options: Call specific options.
  230. /// - handleResponse: A unary response handler.
  231. ///
  232. /// - Returns: The return value from the `handleResponse`.
  233. public func unary<Request, Response, ReturnValue: Sendable>(
  234. request: ClientRequest<Request>,
  235. descriptor: MethodDescriptor,
  236. serializer: some MessageSerializer<Request>,
  237. deserializer: some MessageDeserializer<Response>,
  238. options: CallOptions,
  239. onResponse handleResponse: @Sendable @escaping (
  240. _ response: ClientResponse<Response>
  241. ) async throws -> ReturnValue
  242. ) async throws -> ReturnValue {
  243. try await self.bidirectionalStreaming(
  244. request: StreamingClientRequest(single: request),
  245. descriptor: descriptor,
  246. serializer: serializer,
  247. deserializer: deserializer,
  248. options: options
  249. ) { stream in
  250. let singleResponse = await ClientResponse(stream: stream)
  251. return try await handleResponse(singleResponse)
  252. }
  253. }
  254. /// Start a client-streaming RPC.
  255. ///
  256. /// - Parameters:
  257. /// - request: The request stream.
  258. /// - descriptor: The method descriptor for which to execute this request.
  259. /// - serializer: A request serializer.
  260. /// - deserializer: A response deserializer.
  261. /// - options: Call specific options.
  262. /// - handleResponse: A unary response handler.
  263. ///
  264. /// - Returns: The return value from the `handleResponse`.
  265. public func clientStreaming<Request, Response, ReturnValue: Sendable>(
  266. request: StreamingClientRequest<Request>,
  267. descriptor: MethodDescriptor,
  268. serializer: some MessageSerializer<Request>,
  269. deserializer: some MessageDeserializer<Response>,
  270. options: CallOptions,
  271. onResponse handleResponse: @Sendable @escaping (
  272. _ response: ClientResponse<Response>
  273. ) async throws -> ReturnValue
  274. ) async throws -> ReturnValue {
  275. try await self.bidirectionalStreaming(
  276. request: request,
  277. descriptor: descriptor,
  278. serializer: serializer,
  279. deserializer: deserializer,
  280. options: options
  281. ) { stream in
  282. let singleResponse = await ClientResponse(stream: stream)
  283. return try await handleResponse(singleResponse)
  284. }
  285. }
  286. /// Start a server-streaming RPC.
  287. ///
  288. /// - Parameters:
  289. /// - request: The unary request.
  290. /// - descriptor: The method descriptor for which to execute this request.
  291. /// - serializer: A request serializer.
  292. /// - deserializer: A response deserializer.
  293. /// - options: Call specific options.
  294. /// - handleResponse: A response stream handler.
  295. ///
  296. /// - Returns: The return value from the `handleResponse`.
  297. public func serverStreaming<Request, Response, ReturnValue: Sendable>(
  298. request: ClientRequest<Request>,
  299. descriptor: MethodDescriptor,
  300. serializer: some MessageSerializer<Request>,
  301. deserializer: some MessageDeserializer<Response>,
  302. options: CallOptions,
  303. onResponse handleResponse: @Sendable @escaping (
  304. _ response: StreamingClientResponse<Response>
  305. ) async throws -> ReturnValue
  306. ) async throws -> ReturnValue {
  307. try await self.bidirectionalStreaming(
  308. request: StreamingClientRequest(single: request),
  309. descriptor: descriptor,
  310. serializer: serializer,
  311. deserializer: deserializer,
  312. options: options,
  313. onResponse: handleResponse
  314. )
  315. }
  316. /// Start a bidirectional streaming RPC.
  317. ///
  318. /// - Note: ``runConnections()`` must have been called and still executing, and ``beginGracefulShutdown()`` mustn't
  319. /// have been called.
  320. ///
  321. /// - Parameters:
  322. /// - request: The streaming request.
  323. /// - descriptor: The method descriptor for which to execute this request.
  324. /// - serializer: A request serializer.
  325. /// - deserializer: A response deserializer.
  326. /// - options: Call specific options.
  327. /// - handleResponse: A response stream handler.
  328. ///
  329. /// - Returns: The return value from the `handleResponse`.
  330. public func bidirectionalStreaming<Request, Response, ReturnValue: Sendable>(
  331. request: StreamingClientRequest<Request>,
  332. descriptor: MethodDescriptor,
  333. serializer: some MessageSerializer<Request>,
  334. deserializer: some MessageDeserializer<Response>,
  335. options: CallOptions,
  336. onResponse handleResponse: @Sendable @escaping (
  337. _ response: StreamingClientResponse<Response>
  338. ) async throws -> ReturnValue
  339. ) async throws -> ReturnValue {
  340. let applicableInterceptors = try self.stateMachine.withLock {
  341. try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor)
  342. }
  343. let methodConfig = self.transport.config(forMethod: descriptor)
  344. var options = options
  345. options.formUnion(with: methodConfig)
  346. return try await ClientRPCExecutor.execute(
  347. request: request,
  348. method: descriptor,
  349. options: options,
  350. serializer: serializer,
  351. deserializer: deserializer,
  352. transport: self.transport,
  353. interceptors: applicableInterceptors,
  354. handler: handleResponse
  355. )
  356. }
  357. }
  358. /// Creates and runs a new client with the given transport and interceptors.
  359. ///
  360. /// - Parameters:
  361. /// - transport: The transport used to establish a communication channel with a server.
  362. /// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
  363. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  364. /// are called. The first interceptor added will be the first interceptor to intercept each
  365. /// request. The last interceptor added will be the final interceptor to intercept each
  366. /// request before calling the appropriate handler.
  367. /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  368. /// code is nonisolated.
  369. /// - handleClient: A closure which is called with the client. When the closure returns, the
  370. /// client is shutdown gracefully.
  371. @available(gRPCSwift 2.0, *)
  372. public func withGRPCClient<Transport: ClientTransport, Result: Sendable>(
  373. transport: Transport,
  374. interceptors: [any ClientInterceptor] = [],
  375. isolation: isolated (any Actor)? = #isolation,
  376. handleClient: (GRPCClient<Transport>) async throws -> Result
  377. ) async throws -> Result {
  378. try await withGRPCClient(
  379. transport: transport,
  380. interceptorPipeline: interceptors.map { .apply($0, to: .all) },
  381. isolation: isolation,
  382. handleClient: handleClient
  383. )
  384. }
  385. /// Creates and runs a new client with the given transport and interceptors.
  386. ///
  387. /// - Parameters:
  388. /// - transport: The transport used to establish a communication channel with a server.
  389. /// - interceptorPipeline: A collection of ``ConditionalInterceptor``s providing cross-cutting
  390. /// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
  391. /// The order in which interceptors are added reflects the order in which they are called.
  392. /// The first interceptor added will be the first interceptor to intercept each request.
  393. /// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
  394. /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  395. /// code is nonisolated.
  396. /// - handleClient: A closure which is called with the client. When the closure returns, the
  397. /// client is shutdown gracefully.
  398. /// - Returns: The result of the `handleClient` closure.
  399. @available(gRPCSwift 2.0, *)
  400. public func withGRPCClient<Transport: ClientTransport, Result: Sendable>(
  401. transport: Transport,
  402. interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>],
  403. isolation: isolated (any Actor)? = #isolation,
  404. handleClient: (GRPCClient<Transport>) async throws -> Result
  405. ) async throws -> Result {
  406. try await withThrowingDiscardingTaskGroup { group in
  407. let client = GRPCClient(transport: transport, interceptorPipeline: interceptorPipeline)
  408. group.addTask {
  409. try await client.runConnections()
  410. }
  411. let result = try await handleClient(client)
  412. client.beginGracefulShutdown()
  413. return result
  414. }
  415. }