GRPCClient.swift 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. private import Synchronization
  17. /// A gRPC client.
  18. ///
  19. /// A ``GRPCClient`` communicates to a server via a ``ClientTransport``.
  20. ///
  21. /// You can start RPCs to the server by calling the corresponding method:
  22. /// - ``unary(request:descriptor:serializer:deserializer:options:onResponse:)``
  23. /// - ``clientStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  24. /// - ``serverStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  25. /// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  26. ///
  27. /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
  28. ///
  29. /// ## Creating a client
  30. ///
  31. /// You can create and run a client using ``withGRPCClient(transport:interceptors:isolation:handleClient:)``
  32. /// or ``withGRPCClient(transport:interceptorPipeline:isolation:handleClient:)`` which create, configure and
  33. /// run the client providing scoped access to it via the `handleClient` closure. The client will
  34. /// begin gracefully shutting down when the closure returns.
  35. ///
  36. /// ```swift
  37. /// let transport: any ClientTransport = ...
  38. /// try await withGRPCClient(transport: transport) { client in
  39. /// // ...
  40. /// }
  41. /// ```
  42. ///
  43. /// ## Creating a client manually
  44. ///
  45. /// If the `with`-style methods for creating clients isn't suitable for your application then you
  46. /// can create and run a client manually. This requires you to call the ``runConnections()`` method in a task
  47. /// which instructs the client to start connecting to the server.
  48. ///
  49. /// The ``runConnections()`` method won't return until the client has finished handling all requests. You can
  50. /// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``.
  51. /// This gives the client enough time to drain any requests already in flight. To stop the client
  52. /// more abruptly you can cancel the task running your client. If your application requires
  53. /// additional resources that need their lifecycles managed you should consider using [Swift Service
  54. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  55. public final class GRPCClient<Transport: ClientTransport>: Sendable {
  56. /// The transport which provides a bidirectional communication channel with the server.
  57. private let transport: Transport
  58. /// The current state of the client.
  59. private let stateMachine: Mutex<StateMachine>
  60. /// The state of the client.
  61. private enum State: Sendable {
  62. /// The client hasn't been started yet. Can transition to `running` or `stopped`.
  63. case notStarted
  64. /// The client is running and can send RPCs. Can transition to `stopping`.
  65. case running
  66. /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to
  67. /// completion. May transition to `stopped`.
  68. case stopping
  69. /// The client has stopped, no RPCs are in flight and no more will be accepted. This state
  70. /// is terminal.
  71. case stopped
  72. mutating func run() throws {
  73. switch self {
  74. case .notStarted:
  75. self = .running
  76. case .running:
  77. throw RuntimeError(
  78. code: .clientIsAlreadyRunning,
  79. message: "The client is already running and can only be started once."
  80. )
  81. case .stopping, .stopped:
  82. throw RuntimeError(
  83. code: .clientIsStopped,
  84. message: "The client has stopped and can only be started once."
  85. )
  86. }
  87. }
  88. mutating func stopped() {
  89. self = .stopped
  90. }
  91. mutating func beginGracefulShutdown() -> Bool {
  92. switch self {
  93. case .notStarted:
  94. self = .stopped
  95. return false
  96. case .running:
  97. self = .stopping
  98. return true
  99. case .stopping, .stopped:
  100. return false
  101. }
  102. }
  103. func checkExecutable() throws {
  104. switch self {
  105. case .notStarted, .running:
  106. // Allow .notStarted as making a request can race with 'runConnections()'. Transports should tolerate
  107. // queuing the request if not yet started.
  108. ()
  109. case .stopping, .stopped:
  110. throw RuntimeError(
  111. code: .clientIsStopped,
  112. message: "Client has been stopped. Can't make any more RPCs."
  113. )
  114. }
  115. }
  116. }
  117. private struct StateMachine {
  118. var state: State
  119. private let interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]
  120. /// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
  121. ///
  122. /// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
  123. /// This caching is done to avoid having to compute the applicable interceptors for each request made.
  124. ///
  125. /// The order in which interceptors are added reflects the order in which they are called. The
  126. /// first interceptor added will be the first interceptor to intercept each request. The last
  127. /// interceptor added will be the final interceptor to intercept each request before calling
  128. /// the appropriate handler.
  129. var interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]
  130. init(interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]) {
  131. self.state = .notStarted
  132. self.interceptorPipeline = interceptorPipeline
  133. self.interceptorsPerMethod = [:]
  134. }
  135. mutating func checkExecutableAndGetApplicableInterceptors(
  136. for method: MethodDescriptor
  137. ) throws -> [any ClientInterceptor] {
  138. try self.state.checkExecutable()
  139. guard let applicableInterceptors = self.interceptorsPerMethod[method] else {
  140. let applicableInterceptors = self.interceptorPipeline
  141. .filter { $0.applies(to: method) }
  142. .map { $0.interceptor }
  143. self.interceptorsPerMethod[method] = applicableInterceptors
  144. return applicableInterceptors
  145. }
  146. return applicableInterceptors
  147. }
  148. }
  149. /// Creates a new client with the given transport, interceptors and configuration.
  150. ///
  151. /// - Parameters:
  152. /// - transport: The transport used to establish a communication channel with a server.
  153. /// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
  154. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  155. /// are called. The first interceptor added will be the first interceptor to intercept each
  156. /// request. The last interceptor added will be the final interceptor to intercept each
  157. /// request before calling the appropriate handler.
  158. convenience public init(
  159. transport: Transport,
  160. interceptors: [any ClientInterceptor] = []
  161. ) {
  162. self.init(
  163. transport: transport,
  164. interceptorPipeline: interceptors.map { .apply($0, to: .all) }
  165. )
  166. }
  167. /// Creates a new client with the given transport, interceptors and configuration.
  168. ///
  169. /// - Parameters:
  170. /// - transport: The transport used to establish a communication channel with a server.
  171. /// - interceptorPipeline: A collection of ``ConditionalInterceptor``s providing cross-cutting
  172. /// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
  173. /// The order in which interceptors are added reflects the order in which they are called.
  174. /// The first interceptor added will be the first interceptor to intercept each request.
  175. /// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
  176. public init(
  177. transport: Transport,
  178. interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]
  179. ) {
  180. self.transport = transport
  181. self.stateMachine = Mutex(StateMachine(interceptorPipeline: interceptorPipeline))
  182. }
  183. /// Start the client.
  184. ///
  185. /// This returns once ``beginGracefulShutdown()`` has been called and all in-flight RPCs have finished executing.
  186. /// If you need to abruptly stop all work you should cancel the task executing this method.
  187. ///
  188. /// The client, and by extension this function, can only be run once. If the client is already
  189. /// running or has already been closed then a ``RuntimeError`` is thrown.
  190. public func runConnections() async throws {
  191. try self.stateMachine.withLock { try $0.state.run() }
  192. // When this function exits the client must have stopped.
  193. defer {
  194. self.stateMachine.withLock { $0.state.stopped() }
  195. }
  196. do {
  197. try await self.transport.connect()
  198. } catch {
  199. throw RuntimeError(
  200. code: .transportError,
  201. message: "The transport threw an error while connected.",
  202. cause: error
  203. )
  204. }
  205. }
  206. @available(*, deprecated, renamed: "runConnections", message: "It'll be removed before v2.")
  207. public func run() async throws {
  208. try await self.runConnections()
  209. }
  210. /// Close the client.
  211. ///
  212. /// The transport will be closed: this means that it will be given enough time to wait for
  213. /// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
  214. /// executing ``run()`` if you want to abruptly stop in-flight RPCs.
  215. public func beginGracefulShutdown() {
  216. let wasRunning = self.stateMachine.withLock { $0.state.beginGracefulShutdown() }
  217. if wasRunning {
  218. self.transport.beginGracefulShutdown()
  219. }
  220. }
  221. /// Executes a unary RPC.
  222. ///
  223. /// - Parameters:
  224. /// - request: The unary request.
  225. /// - descriptor: The method descriptor for which to execute this request.
  226. /// - serializer: A request serializer.
  227. /// - deserializer: A response deserializer.
  228. /// - options: Call specific options.
  229. /// - handleResponse: A unary response handler.
  230. ///
  231. /// - Returns: The return value from the `handleResponse`.
  232. public func unary<Request, Response, ReturnValue: Sendable>(
  233. request: ClientRequest<Request>,
  234. descriptor: MethodDescriptor,
  235. serializer: some MessageSerializer<Request>,
  236. deserializer: some MessageDeserializer<Response>,
  237. options: CallOptions,
  238. onResponse handleResponse: @Sendable @escaping (
  239. _ response: ClientResponse<Response>
  240. ) async throws -> ReturnValue
  241. ) async throws -> ReturnValue {
  242. try await self.bidirectionalStreaming(
  243. request: StreamingClientRequest(single: request),
  244. descriptor: descriptor,
  245. serializer: serializer,
  246. deserializer: deserializer,
  247. options: options
  248. ) { stream in
  249. let singleResponse = await ClientResponse(stream: stream)
  250. return try await handleResponse(singleResponse)
  251. }
  252. }
  253. /// Start a client-streaming RPC.
  254. ///
  255. /// - Parameters:
  256. /// - request: The request stream.
  257. /// - descriptor: The method descriptor for which to execute this request.
  258. /// - serializer: A request serializer.
  259. /// - deserializer: A response deserializer.
  260. /// - options: Call specific options.
  261. /// - handleResponse: A unary response handler.
  262. ///
  263. /// - Returns: The return value from the `handleResponse`.
  264. public func clientStreaming<Request, Response, ReturnValue: Sendable>(
  265. request: StreamingClientRequest<Request>,
  266. descriptor: MethodDescriptor,
  267. serializer: some MessageSerializer<Request>,
  268. deserializer: some MessageDeserializer<Response>,
  269. options: CallOptions,
  270. onResponse handleResponse: @Sendable @escaping (
  271. _ response: ClientResponse<Response>
  272. ) async throws -> ReturnValue
  273. ) async throws -> ReturnValue {
  274. try await self.bidirectionalStreaming(
  275. request: request,
  276. descriptor: descriptor,
  277. serializer: serializer,
  278. deserializer: deserializer,
  279. options: options
  280. ) { stream in
  281. let singleResponse = await ClientResponse(stream: stream)
  282. return try await handleResponse(singleResponse)
  283. }
  284. }
  285. /// Start a server-streaming RPC.
  286. ///
  287. /// - Parameters:
  288. /// - request: The unary request.
  289. /// - descriptor: The method descriptor for which to execute this request.
  290. /// - serializer: A request serializer.
  291. /// - deserializer: A response deserializer.
  292. /// - options: Call specific options.
  293. /// - handleResponse: A response stream handler.
  294. ///
  295. /// - Returns: The return value from the `handleResponse`.
  296. public func serverStreaming<Request, Response, ReturnValue: Sendable>(
  297. request: ClientRequest<Request>,
  298. descriptor: MethodDescriptor,
  299. serializer: some MessageSerializer<Request>,
  300. deserializer: some MessageDeserializer<Response>,
  301. options: CallOptions,
  302. onResponse handleResponse: @Sendable @escaping (
  303. _ response: StreamingClientResponse<Response>
  304. ) async throws -> ReturnValue
  305. ) async throws -> ReturnValue {
  306. try await self.bidirectionalStreaming(
  307. request: StreamingClientRequest(single: request),
  308. descriptor: descriptor,
  309. serializer: serializer,
  310. deserializer: deserializer,
  311. options: options,
  312. onResponse: handleResponse
  313. )
  314. }
  315. /// Start a bidirectional streaming RPC.
  316. ///
  317. /// - Note: ``runConnections()`` must have been called and still executing, and ``beginGracefulShutdown()`` mustn't
  318. /// have been called.
  319. ///
  320. /// - Parameters:
  321. /// - request: The streaming request.
  322. /// - descriptor: The method descriptor for which to execute this request.
  323. /// - serializer: A request serializer.
  324. /// - deserializer: A response deserializer.
  325. /// - options: Call specific options.
  326. /// - handleResponse: A response stream handler.
  327. ///
  328. /// - Returns: The return value from the `handleResponse`.
  329. public func bidirectionalStreaming<Request, Response, ReturnValue: Sendable>(
  330. request: StreamingClientRequest<Request>,
  331. descriptor: MethodDescriptor,
  332. serializer: some MessageSerializer<Request>,
  333. deserializer: some MessageDeserializer<Response>,
  334. options: CallOptions,
  335. onResponse handleResponse: @Sendable @escaping (
  336. _ response: StreamingClientResponse<Response>
  337. ) async throws -> ReturnValue
  338. ) async throws -> ReturnValue {
  339. let applicableInterceptors = try self.stateMachine.withLock {
  340. try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor)
  341. }
  342. let methodConfig = self.transport.config(forMethod: descriptor)
  343. var options = options
  344. options.formUnion(with: methodConfig)
  345. return try await ClientRPCExecutor.execute(
  346. request: request,
  347. method: descriptor,
  348. options: options,
  349. serializer: serializer,
  350. deserializer: deserializer,
  351. transport: self.transport,
  352. interceptors: applicableInterceptors,
  353. handler: handleResponse
  354. )
  355. }
  356. }
  357. /// Creates and runs a new client with the given transport and interceptors.
  358. ///
  359. /// - Parameters:
  360. /// - transport: The transport used to establish a communication channel with a server.
  361. /// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
  362. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  363. /// are called. The first interceptor added will be the first interceptor to intercept each
  364. /// request. The last interceptor added will be the final interceptor to intercept each
  365. /// request before calling the appropriate handler.
  366. /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  367. /// code is nonisolated.
  368. /// - handleClient: A closure which is called with the client. When the closure returns, the
  369. /// client is shutdown gracefully.
  370. public func withGRPCClient<Transport: ClientTransport, Result: Sendable>(
  371. transport: Transport,
  372. interceptors: [any ClientInterceptor] = [],
  373. isolation: isolated (any Actor)? = #isolation,
  374. handleClient: (GRPCClient<Transport>) async throws -> Result
  375. ) async throws -> Result {
  376. try await withGRPCClient(
  377. transport: transport,
  378. interceptorPipeline: interceptors.map { .apply($0, to: .all) },
  379. isolation: isolation,
  380. handleClient: handleClient
  381. )
  382. }
  383. /// Creates and runs a new client with the given transport and interceptors.
  384. ///
  385. /// - Parameters:
  386. /// - transport: The transport used to establish a communication channel with a server.
  387. /// - interceptorPipeline: A collection of ``ConditionalInterceptor``s providing cross-cutting
  388. /// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
  389. /// The order in which interceptors are added reflects the order in which they are called.
  390. /// The first interceptor added will be the first interceptor to intercept each request.
  391. /// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
  392. /// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  393. /// code is nonisolated.
  394. /// - handleClient: A closure which is called with the client. When the closure returns, the
  395. /// client is shutdown gracefully.
  396. /// - Returns: The result of the `handleClient` closure.
  397. public func withGRPCClient<Transport: ClientTransport, Result: Sendable>(
  398. transport: Transport,
  399. interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>],
  400. isolation: isolated (any Actor)? = #isolation,
  401. handleClient: (GRPCClient<Transport>) async throws -> Result
  402. ) async throws -> Result {
  403. try await withThrowingDiscardingTaskGroup { group in
  404. let client = GRPCClient(transport: transport, interceptorPipeline: interceptorPipeline)
  405. group.addTask {
  406. try await client.runConnections()
  407. }
  408. let result = try await handleClient(client)
  409. client.beginGracefulShutdown()
  410. return result
  411. }
  412. }