GRPCClient.swift 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. private import Synchronization
  /// A gRPC client.
  ///
  /// A ``GRPCClient`` communicates to a server via a ``ClientTransport``.
  ///
  /// You can start RPCs to the server by calling the corresponding method:
  /// - ``unary(request:descriptor:serializer:deserializer:options:onResponse:)``
  /// - ``clientStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  /// - ``serverStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  /// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:onResponse:)``
  ///
  /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
  ///
  /// ## Creating a client
  ///
  /// You can create and run a client using ``withGRPCClient(transport:interceptors:isolation:handleClient:)``
  /// or ``withGRPCClient(transport:interceptorPipeline:isolation:handleClient:)`` which create, configure and
  /// run the client providing scoped access to it via the `handleClient` closure. The client will
  /// begin gracefully shutting down when the closure returns.
  ///
  /// ```swift
  /// let transport: any ClientTransport = ...
  /// try await withGRPCClient(transport: transport) { client in
  ///   // ...
  /// }
  /// ```
  ///
  /// ## Creating a client manually
  ///
  /// If the `with`-style methods for creating clients aren't suitable for your application then you
  /// can create and run a client manually. This requires you to call the ``runConnections()`` method in a task
  /// which instructs the client to start connecting to the server.
  ///
  /// The ``runConnections()`` method won't return until the client has finished handling all requests. You can
  /// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``.
  /// This gives the client enough time to drain any requests already in flight. To stop the client
  /// more abruptly you can cancel the task running your client. If your application requires
  /// additional resources that need their lifecycles managed you should consider using [Swift Service
  /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  @available(gRPCSwift 2.0, *)
  @available(*, deprecated, message: "See https://forums.swift.org/t/80177")
  public final class GRPCClient<Transport: ClientTransport>: Sendable {
    /// The transport which provides a bidirectional communication channel with the server.
    private let transport: Transport

    /// The current state of the client, together with the per-method interceptor cache.
    ///
    /// All reads and writes go through the mutex.
    private let stateMachine: Mutex<StateMachine>

    /// The state of the client.
    private enum State: Sendable {
      /// The client hasn't been started yet. Can transition to `running` or `stopped`.
      case notStarted
      /// The client is running and can send RPCs. Can transition to `stopping`.
      case running
      /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to
      /// completion. May transition to `stopped`.
      case stopping
      /// The client has stopped, no RPCs are in flight and no more will be accepted. This state
      /// is terminal.
      case stopped

      /// Moves the client from `notStarted` to `running`.
      ///
      /// - Throws: A `RuntimeError` with code `clientIsAlreadyRunning` if the client is already
      ///   running, or `clientIsStopped` if it is stopping or has stopped.
      mutating func run() throws {
        switch self {
        case .notStarted:
          self = .running

        case .running:
          throw RuntimeError(
            code: .clientIsAlreadyRunning,
            message: "The client is already running and can only be started once."
          )

        case .stopping, .stopped:
          throw RuntimeError(
            code: .clientIsStopped,
            message: """
              Can't call 'runConnections()' as the client is stopped (or is stopping). \
              This can happen if you call 'runConnections()' after shutting the \
              client down or if you used 'withGRPCClient' with an empty body.
              """
          )
        }
      }

      /// Unconditionally moves the client to the terminal `stopped` state.
      mutating func stopped() {
        self = .stopped
      }

      /// Records that a graceful shutdown has been requested.
      ///
      /// A client which was never started goes straight to `stopped`; a running client moves to
      /// `stopping`; a client which is already stopping or stopped is left unchanged.
      ///
      /// - Returns: `true` only if the client was running, i.e. when the caller needs to ask the
      ///   transport to begin shutting down.
      mutating func beginGracefulShutdown() -> Bool {
        switch self {
        case .notStarted:
          self = .stopped
          return false
        case .running:
          self = .stopping
          return true
        case .stopping, .stopped:
          return false
        }
      }

      /// Checks whether a new RPC may be started in the current state.
      ///
      /// - Throws: A `RuntimeError` with code `clientIsStopped` if the client is stopping or has
      ///   stopped.
      func checkExecutable() throws {
        switch self {
        case .notStarted, .running:
          // Allow .notStarted as making a request can race with 'runConnections()'. Transports should tolerate
          // queuing the request if not yet started.
          ()
        case .stopping, .stopped:
          throw RuntimeError(
            code: .clientIsStopped,
            message: "Client has been stopped. Can't make any more RPCs."
          )
        }
      }
    }

    /// The mutable state guarded by the client's mutex: the lifecycle state and the lazily
    /// populated per-method interceptor cache.
    private struct StateMachine {
      /// The lifecycle state of the client.
      var state: State

      /// The interceptor pipeline the client was created with, in the order it was provided.
      private let interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]

      /// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
      ///
      /// The list of interceptors for each method is computed from `interceptorPipeline` when calling a method for the first time.
      /// This caching is done to avoid having to compute the applicable interceptors for each request made.
      ///
      /// The order in which interceptors are added reflects the order in which they are called. The
      /// first interceptor added will be the first interceptor to intercept each request. The last
      /// interceptor added will be the final interceptor to intercept each request before calling
      /// the appropriate handler.
      var interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]

      /// Creates a state machine in the `notStarted` state with an empty interceptor cache.
      ///
      /// - Parameter interceptorPipeline: The pipeline from which per-method interceptors are derived.
      init(interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]) {
        self.state = .notStarted
        self.interceptorPipeline = interceptorPipeline
        self.interceptorsPerMethod = [:]
      }

      /// Verifies that an RPC may be started and returns the interceptors applicable to `method`.
      ///
      /// The result is cached on first use so that later calls for the same method are a
      /// dictionary lookup.
      ///
      /// - Parameter method: The method about to be called.
      /// - Returns: The interceptors from the pipeline which apply to `method`, in pipeline order.
      /// - Throws: A `RuntimeError` with code `clientIsStopped` if the client is stopping or has
      ///   stopped.
      mutating func checkExecutableAndGetApplicableInterceptors(
        for method: MethodDescriptor
      ) throws -> [any ClientInterceptor] {
        try self.state.checkExecutable()

        if let cached = self.interceptorsPerMethod[method] {
          return cached
        }

        // First call for this method: filter the pipeline once and remember the result.
        let applicableInterceptors = self.interceptorPipeline
          .filter { $0.applies(to: method) }
          .map { $0.interceptor }
        self.interceptorsPerMethod[method] = applicableInterceptors
        return applicableInterceptors
      }
    }

    /// Creates a new client with the given transport and interceptors.
    ///
    /// Every interceptor is applied to all methods.
    ///
    /// - Parameters:
    ///   - transport: The transport used to establish a communication channel with a server.
    ///   - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
    ///       accepted RPC. The order in which interceptors are added reflects the order in which they
    ///       are called. The first interceptor added will be the first interceptor to intercept each
    ///       request. The last interceptor added will be the final interceptor to intercept each
    ///       request before calling the appropriate handler.
    convenience public init(
      transport: Transport,
      interceptors: [any ClientInterceptor] = []
    ) {
      self.init(
        transport: transport,
        interceptorPipeline: interceptors.map { .apply($0, to: .all) }
      )
    }

    /// Creates a new client with the given transport and interceptor pipeline.
    ///
    /// - Parameters:
    ///   - transport: The transport used to establish a communication channel with a server.
    ///   - interceptorPipeline: A collection of ``ConditionalInterceptor``s providing cross-cutting
    ///       functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
    ///       The order in which interceptors are added reflects the order in which they are called.
    ///       The first interceptor added will be the first interceptor to intercept each request.
    ///       The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
    public init(
      transport: Transport,
      interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>]
    ) {
      self.transport = transport
      self.stateMachine = Mutex(StateMachine(interceptorPipeline: interceptorPipeline))
    }

    /// Start the client.
    ///
    /// This returns once ``beginGracefulShutdown()`` has been called and all in-flight RPCs have finished executing.
    /// If you need to abruptly stop all work you should cancel the task executing this method.
    ///
    /// The client, and by extension this function, can only be run once. If the client is already
    /// running or has already been closed then a ``RuntimeError`` is thrown.
    ///
    /// - Throws: A ``RuntimeError`` with code `clientIsAlreadyRunning` or `clientIsStopped` if the
    ///   client can't be started, or with code `transportError` (carrying the underlying error as
    ///   its cause) if the transport throws.
    public func runConnections() async throws {
      try self.stateMachine.withLock { try $0.state.run() }

      // When this function exits the client must have stopped.
      defer {
        self.stateMachine.withLock { $0.state.stopped() }
      }

      do {
        try await self.transport.connect()
      } catch {
        throw RuntimeError(
          code: .transportError,
          message: "The transport threw an error while connected.",
          cause: error
        )
      }
    }

    /// Close the client.
    ///
    /// The transport will be closed: this means that it will be given enough time to wait for
    /// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
    /// executing ``runConnections()`` if you want to abruptly stop in-flight RPCs.
    ///
    /// The transport is only asked to shut down if the client is currently running. Calling this
    /// on a client which hasn't been started marks it as stopped, so a later call to
    /// ``runConnections()`` will throw. Calling it on a stopping or stopped client has no effect.
    public func beginGracefulShutdown() {
      let wasRunning = self.stateMachine.withLock { $0.state.beginGracefulShutdown() }
      if wasRunning {
        self.transport.beginGracefulShutdown()
      }
    }

    /// Executes a unary RPC.
    ///
    /// - Parameters:
    ///   - request: The unary request.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - options: Call specific options.
    ///   - handleResponse: A unary response handler.
    ///
    /// - Returns: The return value from the `handleResponse`.
    /// - Throws: A ``RuntimeError`` with code `clientIsStopped` if the client is stopping or has
    ///   stopped, or any error thrown while executing the RPC or by `handleResponse`.
    public func unary<Request, Response, ReturnValue: Sendable>(
      request: ClientRequest<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      options: CallOptions,
      onResponse handleResponse: @Sendable @escaping (
        _ response: ClientResponse<Response>
      ) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      // All RPC shapes are expressed in terms of the bidirectional streaming call.
      try await self.bidirectionalStreaming(
        request: StreamingClientRequest(single: request),
        descriptor: descriptor,
        serializer: serializer,
        deserializer: deserializer,
        options: options
      ) { stream in
        // Build a single response from the response stream before handing it to the caller.
        let singleResponse = await ClientResponse(stream: stream)
        return try await handleResponse(singleResponse)
      }
    }

    /// Start a client-streaming RPC.
    ///
    /// - Parameters:
    ///   - request: The request stream.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - options: Call specific options.
    ///   - handleResponse: A unary response handler.
    ///
    /// - Returns: The return value from the `handleResponse`.
    /// - Throws: A ``RuntimeError`` with code `clientIsStopped` if the client is stopping or has
    ///   stopped, or any error thrown while executing the RPC or by `handleResponse`.
    public func clientStreaming<Request, Response, ReturnValue: Sendable>(
      request: StreamingClientRequest<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      options: CallOptions,
      onResponse handleResponse: @Sendable @escaping (
        _ response: ClientResponse<Response>
      ) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      try await self.bidirectionalStreaming(
        request: request,
        descriptor: descriptor,
        serializer: serializer,
        deserializer: deserializer,
        options: options
      ) { stream in
        // Build a single response from the response stream before handing it to the caller.
        let singleResponse = await ClientResponse(stream: stream)
        return try await handleResponse(singleResponse)
      }
    }

    /// Start a server-streaming RPC.
    ///
    /// - Parameters:
    ///   - request: The unary request.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - options: Call specific options.
    ///   - handleResponse: A response stream handler.
    ///
    /// - Returns: The return value from the `handleResponse`.
    /// - Throws: A ``RuntimeError`` with code `clientIsStopped` if the client is stopping or has
    ///   stopped, or any error thrown while executing the RPC or by `handleResponse`.
    public func serverStreaming<Request, Response, ReturnValue: Sendable>(
      request: ClientRequest<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      options: CallOptions,
      onResponse handleResponse: @Sendable @escaping (
        _ response: StreamingClientResponse<Response>
      ) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      try await self.bidirectionalStreaming(
        request: StreamingClientRequest(single: request),
        descriptor: descriptor,
        serializer: serializer,
        deserializer: deserializer,
        options: options,
        onResponse: handleResponse
      )
    }

    /// Start a bidirectional streaming RPC.
    ///
    /// - Note: ``runConnections()`` must have been called and still executing, and ``beginGracefulShutdown()`` mustn't
    ///   have been called.
    ///
    /// - Parameters:
    ///   - request: The streaming request.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - options: Call specific options.
    ///   - handleResponse: A response stream handler.
    ///
    /// - Returns: The return value from the `handleResponse`.
    /// - Throws: A ``RuntimeError`` with code `clientIsStopped` if the client is stopping or has
    ///   stopped, or any error thrown while executing the RPC or by `handleResponse`.
    public func bidirectionalStreaming<Request, Response, ReturnValue: Sendable>(
      request: StreamingClientRequest<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      options: CallOptions,
      onResponse handleResponse: @Sendable @escaping (
        _ response: StreamingClientResponse<Response>
      ) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      // Check the lifecycle state and resolve the interceptors under a single lock acquisition.
      let applicableInterceptors = try self.stateMachine.withLock {
        try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor)
      }

      // Combine the caller's options with the configuration the transport holds for this method.
      let methodConfig = self.transport.config(forMethod: descriptor)
      var options = options
      options.formUnion(with: methodConfig)

      return try await ClientRPCExecutor.execute(
        request: request,
        method: descriptor,
        options: options,
        serializer: serializer,
        deserializer: deserializer,
        transport: self.transport,
        interceptors: applicableInterceptors,
        handler: handleResponse
      )
    }
  }
  /// Creates and runs a new client whose interceptors are applied to every method.
  ///
  /// This forwards to the `interceptorPipeline` variant of `withGRPCClient` after wrapping each
  /// interceptor so that it applies to `.all`.
  ///
  /// - Parameters:
  ///   - transport: The transport used to establish a communication channel with a server.
  ///   - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
  ///       accepted RPC. Interceptors are called in the order in which they appear: the first
  ///       element intercepts each request first and the last element intercepts each request
  ///       last, just before the appropriate handler is called.
  ///   - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  ///       code is nonisolated.
  ///   - handleClient: A closure which is called with the client. When the closure returns, the
  ///       client is shutdown gracefully.
  /// - Returns: The result of the `handleClient` closure.
  @available(gRPCSwift 2.0, *)
  @available(*, deprecated, message: "See https://forums.swift.org/t/80177")
  public func withGRPCClient<Transport: ClientTransport, Result: Sendable>(
    transport: Transport,
    interceptors: [any ClientInterceptor] = [],
    isolation: isolated (any Actor)? = #isolation,
    handleClient: (GRPCClient<Transport>) async throws -> Result
  ) async throws -> Result {
    // Lift each plain interceptor into the conditional form expected by the pipeline variant.
    let pipeline: [ConditionalInterceptor<any ClientInterceptor>] = interceptors.map { interceptor in
      .apply(interceptor, to: .all)
    }

    return try await withGRPCClient(
      transport: transport,
      interceptorPipeline: pipeline,
      isolation: isolation,
      handleClient: handleClient
    )
  }
  /// Creates and runs a new client with the given transport and interceptor pipeline.
  ///
  /// The client's connections are run in a child task for the duration of `handleClient`. Once
  /// `handleClient` returns, the client is asked to shut down gracefully and this function
  /// returns after the child task has finished.
  ///
  /// - Parameters:
  ///   - transport: The transport used to establish a communication channel with a server.
  ///   - interceptorPipeline: A collection of ``ConditionalInterceptor``s providing cross-cutting
  ///       functionality to each accepted RPC. Only applicable interceptors from the pipeline will be
  ///       applied to each RPC. Interceptors are called in the order in which they appear: the first
  ///       element intercepts each request first and the last element intercepts each request
  ///       last, just before the appropriate handler is called.
  ///   - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
  ///       code is nonisolated.
  ///   - handleClient: A closure which is called with the client. When the closure returns, the
  ///       client is shutdown gracefully.
  /// - Returns: The result of the `handleClient` closure.
  @available(gRPCSwift 2.0, *)
  @available(*, deprecated, message: "See https://forums.swift.org/t/80177")
  public func withGRPCClient<Transport: ClientTransport, Result: Sendable>(
    transport: Transport,
    interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>],
    isolation: isolated (any Actor)? = #isolation,
    handleClient: (GRPCClient<Transport>) async throws -> Result
  ) async throws -> Result {
    let client = GRPCClient(transport: transport, interceptorPipeline: interceptorPipeline)

    return try await withThrowingDiscardingTaskGroup(returning: Result.self) { group in
      // Drive the client's connections in the background while the caller uses the client.
      group.addTask {
        try await client.runConnections()
      }

      let outcome = try await handleClient(client)

      // Only reached when `handleClient` returns normally.
      client.beginGracefulShutdown()
      return outcome
    }
  }