GRPCClient.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. private import Synchronization
  17. /// A gRPC client.
  18. ///
  19. /// A ``GRPCClient`` communicates to a server via a ``ClientTransport``.
  20. ///
  21. /// You can start RPCs to the server by calling the corresponding method:
  22. /// - ``unary(request:descriptor:serializer:deserializer:options:handler:)``
  23. /// - ``clientStreaming(request:descriptor:serializer:deserializer:options:handler:)``
  24. /// - ``serverStreaming(request:descriptor:serializer:deserializer:options:handler:)``
  25. /// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``
  26. ///
  27. /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
  28. ///
  29. /// You can set ``ServiceConfig``s on this client to override whatever configurations have been
  30. /// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting
  31. /// logic which apply to all RPCs. Example uses of interceptors include authentication and logging.
  32. ///
  33. /// ## Creating and configuring a client
  34. ///
  35. /// The following example demonstrates how to create and configure a client.
  36. ///
  37. /// ```swift
  38. /// // Create a configuration object for the client and override the timeout for the 'Get' method on
  39. /// // the 'echo.Echo' service. This configuration takes precedence over any set by the transport.
  40. /// var configuration = GRPCClient.Configuration()
  41. /// configuration.service.override = ServiceConfig(
  42. /// methodConfig: [
  43. /// MethodConfig(
  44. /// names: [
  45. /// MethodConfig.Name(service: "echo.Echo", method: "Get")
  46. /// ],
  47. /// timeout: .seconds(5)
  48. /// )
  49. /// ]
  50. /// )
  51. ///
  52. /// // Configure a fallback timeout for all RPCs (indicated by an empty service and method name) if
  53. /// // no configuration is provided in the overrides or by the transport.
  54. /// configuration.service.defaults = ServiceConfig(
  55. /// methodConfig: [
  56. /// MethodConfig(
  57. /// names: [
  58. /// MethodConfig.Name(service: "", method: "")
  59. /// ],
  60. /// timeout: .seconds(10)
  61. /// )
  62. /// ]
  63. /// )
  64. ///
  65. /// // Finally create a transport and instantiate the client, adding an interceptor.
  66. /// let inProcessTransport = InProcessTransport()
  67. ///
  68. /// let client = GRPCClient(
  69. /// transport: inProcessTransport.client,
  70. /// interceptors: [StatsRecordingClientInterceptor()],
  71. /// configuration: configuration
  72. /// )
  73. /// ```
  74. ///
  75. /// ## Starting and stopping the client
  76. ///
  77. /// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the
  78. /// transport to start connecting to the server.
  79. ///
  80. /// ```swift
  81. /// // Start running the client. 'run()' must be running while RPCs are executed, so it's run in
  82. /// // a task group.
  83. /// try await withThrowingTaskGroup(of: Void.self) { group in
  84. /// group.addTask {
  85. /// try await client.run()
  86. /// }
  87. ///
  88. /// // Execute a request against the "echo.Echo" service.
  89. /// try await client.unary(
  90. /// request: ClientRequest<[UInt8]>(message: [72, 101, 108, 108, 111, 33]),
  91. /// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"),
  92. /// serializer: IdentitySerializer(),
  93. /// deserializer: IdentityDeserializer(),
  94. /// ) { response in
  95. /// print(response.message)
  96. /// }
  97. ///
  98. /// // The RPC has completed, close the client.
  99. /// client.beginGracefulShutdown()
  100. /// }
  101. /// ```
  102. ///
  103. /// The ``run()`` method won't return until the client has finished handling all requests. You can
  104. /// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``.
  105. /// This gives the client enough time to drain any requests already in flight. To stop the client
  106. /// more abruptly you can cancel the task running your client. If your application requires
  107. /// additional resources that need their lifecycles managed you should consider using [Swift Service
  108. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  109. public final class GRPCClient: Sendable {
  110. /// The transport which provides a bidirectional communication channel with the server.
  111. private let transport: any ClientTransport
  112. /// The client's state machine (lifecycle state plus cached per-method interceptors), guarded by a mutex.
  113. private let stateMachine: Mutex<StateMachine>
  /// The lifecycle of the client.
  private enum State: Sendable {
    /// `run()` hasn't been called yet. May move to `running` or directly to `stopped`.
    case notStarted
    /// The client is running and RPCs may be started. May move to `stopping`.
    case running
    /// Graceful shutdown has begun: no new RPCs will be started but RPCs already in flight may
    /// run to completion. May move to `stopped`.
    case stopping
    /// Terminal state: no RPCs are in flight and no more will be accepted.
    case stopped

    /// Moves a client which hasn't been started into the `running` state.
    ///
    /// - Throws: A `RuntimeError` with code `.clientIsAlreadyRunning` if the client is running, or
    ///   with code `.clientIsStopped` if the client is stopping or has stopped.
    mutating func run() throws {
      let failure: RuntimeError

      switch self {
      case .notStarted:
        // The only state from which the client may be started.
        self = .running
        return

      case .running:
        failure = RuntimeError(
          code: .clientIsAlreadyRunning,
          message: "The client is already running and can only be started once."
        )

      case .stopping, .stopped:
        failure = RuntimeError(
          code: .clientIsStopped,
          message: "The client has stopped and can only be started once."
        )
      }

      throw failure
    }

    /// Unconditionally moves to the terminal `stopped` state.
    mutating func stopped() {
      self = .stopped
    }

    /// Records that a graceful shutdown was requested.
    ///
    /// - Returns: `true` only if the client was `running` (and is now `stopping`); `false` for
    ///   every other starting state.
    mutating func beginGracefulShutdown() -> Bool {
      let wasRunning: Bool

      switch self {
      case .notStarted:
        // Never started: skip `stopping` and go straight to the terminal state.
        self = .stopped
        wasRunning = false

      case .running:
        self = .stopping
        wasRunning = true

      case .stopping, .stopped:
        // Shutdown already requested or complete; the state is left untouched.
        wasRunning = false
      }

      return wasRunning
    }

    /// Checks whether a new RPC may be started in the current state.
    ///
    /// - Throws: A `RuntimeError` with code `.clientIsStopped` if the client is stopping or has
    ///   stopped.
    func checkExecutable() throws {
      switch self {
      case .stopping, .stopped:
        throw RuntimeError(
          code: .clientIsStopped,
          message: "Client has been stopped. Can't make any more RPCs."
        )

      case .notStarted, .running:
        // `.notStarted` is permitted because starting an RPC can race with 'run()'. Transports
        // should tolerate queuing the request if they haven't been started yet.
        return
      }
    }
  }
  /// The mutable state of the client: its lifecycle state and a per-method cache of the
  /// interceptors which apply to each method.
  private struct StateMachine {
    /// The lifecycle state of the client.
    var state: State

    /// The interceptor pipeline operations the client was created with, in the order supplied.
    private let interceptorPipeline: [ClientInterceptorPipelineOperation]

    /// Interceptors providing cross-cutting functionality to each RPC, keyed by the method to
    /// which they apply.
    ///
    /// The entry for a method is computed from `interceptorPipeline` the first time that method
    /// is called and is reused afterwards, so the applicable interceptors aren't recomputed for
    /// every request.
    ///
    /// Interceptors are stored in the order in which they were added, which is also the order in
    /// which they are called: the first interceptor added is the first to intercept each request
    /// and the last interceptor added is the final one to intercept each request before the
    /// appropriate handler is called.
    var interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]

    /// Creates a state machine in the `notStarted` state with an empty interceptor cache.
    ///
    /// - Parameter interceptorPipeline: The pipeline operations from which the interceptors for
    ///   each method are derived.
    init(interceptorPipeline: [ClientInterceptorPipelineOperation]) {
      self.interceptorPipeline = interceptorPipeline
      self.interceptorsPerMethod = [:]
      self.state = .notStarted
    }

    /// Checks that an RPC may be started and returns the interceptors applicable to `method`.
    ///
    /// - Parameter method: The method about to be called.
    /// - Returns: The interceptors from the pipeline which apply to `method`, in pipeline order.
    /// - Throws: Any error thrown by `State.checkExecutable()`.
    mutating func checkExecutableAndGetApplicableInterceptors(
      for method: MethodDescriptor
    ) throws -> [any ClientInterceptor] {
      try self.state.checkExecutable()

      // Fast path: the interceptors for this method were resolved by an earlier call.
      if let cached = self.interceptorsPerMethod[method] {
        return cached
      }

      // First call for this method: resolve the applicable interceptors and remember them.
      var resolved: [any ClientInterceptor] = []
      for operation in self.interceptorPipeline where operation.applies(to: method) {
        resolved.append(operation.interceptor)
      }
      self.interceptorsPerMethod[method] = resolved
      return resolved
    }
  }
  /// Creates a new client with the given transport and interceptors.
  ///
  /// Each interceptor is wrapped in a pipeline operation targeting `.all` and the resulting
  /// pipeline is forwarded to ``init(transport:interceptorPipeline:)``.
  ///
  /// - Parameters:
  ///   - transport: The transport used to establish a communication channel with a server.
  ///   - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting
  ///       functionality to each accepted RPC. Interceptors are called in the order in which they
  ///       appear: the first interceptor is the first to intercept each request and the last
  ///       interceptor is the final one to intercept each request before the appropriate handler
  ///       is called. Defaults to no interceptors.
  convenience public init(
    transport: some ClientTransport,
    interceptors: [any ClientInterceptor] = []
  ) {
    let pipeline = interceptors.map { interceptor in
      ClientInterceptorPipelineOperation.apply(interceptor, to: .all)
    }
    self.init(transport: transport, interceptorPipeline: pipeline)
  }
  /// Creates a new client with the given transport and interceptor pipeline.
  ///
  /// The client starts in the not-started state with no cached per-method interceptors.
  ///
  /// - Parameters:
  ///   - transport: The transport used to establish a communication channel with a server.
  ///   - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing
  ///       cross-cutting functionality to each accepted RPC. Only the applicable interceptors
  ///       from the pipeline are applied to each RPC. Interceptors are called in the order in
  ///       which they appear: the first is the first to intercept each request and the last is
  ///       the final one to intercept each request before the appropriate handler is called.
  public init(
    transport: some ClientTransport,
    interceptorPipeline: [ClientInterceptorPipelineOperation]
  ) {
    let initialStateMachine = StateMachine(interceptorPipeline: interceptorPipeline)
    self.stateMachine = Mutex(initialStateMachine)
    self.transport = transport
  }
  /// Start the client.
  ///
  /// This asks the transport to connect and returns when that call returns, which is expected to
  /// be once ``beginGracefulShutdown()`` has been called and all in-flight RPCs have finished
  /// executing. If you need to abruptly stop all work you should cancel the task executing this
  /// method.
  ///
  /// The client, and by extension this function, can only be run once.
  ///
  /// - Throws: A ``RuntimeError`` if the client is already running or has already been stopped,
  ///   or a ``RuntimeError`` with code `.transportError` wrapping any error thrown by the
  ///   transport.
  public func run() async throws {
    // Fails without touching the transport if the client can't be started from its current state.
    try self.stateMachine.withLock { machine in
      try machine.state.run()
    }

    // However this function exits from here on, the client must end up stopped.
    defer {
      self.stateMachine.withLock { machine in
        machine.state.stopped()
      }
    }

    do {
      try await self.transport.connect()
    } catch let transportError {
      throw RuntimeError(
        code: .transportError,
        message: "The transport threw an error while connected.",
        cause: transportError
      )
    }
  }
  /// Close the client.
  ///
  /// If the client is running, the transport is asked to shut down gracefully: it will be given
  /// enough time to wait for in-flight RPCs to finish executing, but no new RPCs will be accepted.
  /// If the client isn't running the transport is left untouched. You can cancel the task
  /// executing ``run()`` if you want to abruptly stop in-flight RPCs.
  public func beginGracefulShutdown() {
    let shouldShutDownTransport = self.stateMachine.withLock { machine in
      machine.state.beginGracefulShutdown()
    }

    // Only a running client has a transport which needs to be told to shut down. The transport is
    // called after the lock has been released.
    guard shouldShutDownTransport else { return }
    self.transport.beginGracefulShutdown()
  }
  /// Executes a unary RPC.
  ///
  /// The single request is wrapped in a streaming request and executed via
  /// ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``; the
  /// response stream is converted to a single response before being passed to `handler`.
  ///
  /// - Parameters:
  ///   - request: The unary request.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A unary response handler.
  ///
  /// - Returns: The return value from the `handler`.
  public func unary<Request, Response, ReturnValue: Sendable>(
    request: ClientRequest<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (ClientResponse<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    let streamingRequest = StreamingClientRequest(single: request)

    return try await self.bidirectionalStreaming(
      request: streamingRequest,
      descriptor: descriptor,
      serializer: serializer,
      deserializer: deserializer,
      options: options,
      handler: { responseStream in
        // Collapse the response stream into a single response for the unary handler.
        let response = await ClientResponse(stream: responseStream)
        return try await handler(response)
      }
    )
  }
  /// Start a client-streaming RPC.
  ///
  /// The request stream is executed via
  /// ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``; the
  /// response stream is converted to a single response before being passed to `handler`.
  ///
  /// - Parameters:
  ///   - request: The request stream.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A unary response handler.
  ///
  /// - Returns: The return value from the `handler`.
  public func clientStreaming<Request, Response, ReturnValue: Sendable>(
    request: StreamingClientRequest<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (ClientResponse<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    return try await self.bidirectionalStreaming(
      request: request,
      descriptor: descriptor,
      serializer: serializer,
      deserializer: deserializer,
      options: options,
      handler: { responseStream in
        // Collapse the response stream into a single response for the handler.
        let response = await ClientResponse(stream: responseStream)
        return try await handler(response)
      }
    )
  }
  /// Start a server-streaming RPC.
  ///
  /// The single request is wrapped in a streaming request and executed via
  /// ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``, with
  /// `handler` receiving the response stream directly.
  ///
  /// - Parameters:
  ///   - request: The unary request.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A response stream handler.
  ///
  /// - Returns: The return value from the `handler`.
  public func serverStreaming<Request, Response, ReturnValue: Sendable>(
    request: ClientRequest<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (StreamingClientResponse<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    let streamingRequest = StreamingClientRequest(single: request)

    return try await self.bidirectionalStreaming(
      request: streamingRequest,
      descriptor: descriptor,
      serializer: serializer,
      deserializer: deserializer,
      options: options,
      handler: handler
    )
  }
  /// Start a bidirectional streaming RPC.
  ///
  /// The call options are combined with the transport's configuration for the method before the
  /// RPC is executed with the interceptors applicable to `descriptor`.
  ///
  /// - Note: ``beginGracefulShutdown()`` mustn't have been called and ``run()`` mustn't have
  ///   returned. Starting an RPC before ``run()`` has been called is allowed by the client's
  ///   state check because the two can race; transports should tolerate queuing the request if
  ///   they haven't been started yet.
  ///
  /// - Parameters:
  ///   - request: The streaming request.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A response stream handler.
  ///
  /// - Returns: The return value from the `handler`.
  /// - Throws: A ``RuntimeError`` if the client is stopping or has stopped, or any error thrown
  ///   while executing the RPC.
  public func bidirectionalStreaming<Request, Response, ReturnValue: Sendable>(
    request: StreamingClientRequest<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (StreamingClientResponse<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    // Check the client can still start RPCs and fetch the interceptors for this method under a
    // single lock acquisition.
    let interceptors = try self.stateMachine.withLock { machine in
      try machine.checkExecutableAndGetApplicableInterceptors(for: descriptor)
    }

    // Combine the caller's options with the transport's configuration for this method.
    var effectiveOptions = options
    effectiveOptions.formUnion(with: self.transport.config(forMethod: descriptor))

    return try await ClientRPCExecutor.execute(
      request: request,
      method: descriptor,
      options: effectiveOptions,
      serializer: serializer,
      deserializer: deserializer,
      transport: self.transport,
      interceptors: interceptors,
      handler: handler
    )
  }
  398. }