GRPCClient.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. /// A gRPC client.
  18. ///
  19. /// A ``GRPCClient`` communicates to a server via a ``ClientTransport``.
  20. ///
  21. /// You can start RPCs to the server by calling the corresponding method:
  22. /// - ``unary(request:descriptor:serializer:deserializer:handler:)``
  23. /// - ``clientStreaming(request:descriptor:serializer:deserializer:handler:)``
  24. /// - ``serverStreaming(request:descriptor:serializer:deserializer:handler:)``
  25. /// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:handler:)``
  26. ///
  27. /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
  28. ///
  29. /// You can set ``MethodConfiguration``s on this client to override whatever configurations have been
  30. /// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting
  31. /// logic which apply to all RPCs. Example uses of interceptors include authentication and logging.
  32. ///
  33. /// ## Creating and configuring a client
  34. ///
  35. /// The following example demonstrates how to create and configure a client.
  36. ///
  37. /// ```swift
  38. /// // Create a configuration object for the client.
  39. /// var configuration = GRPCClient.Configuration()
  40. ///
  41. /// // Override the timeout for the 'Get' method on the 'echo.Echo' service. This configuration
  42. /// // takes precedence over any set by the transport.
  43. /// let echoGet = MethodDescriptor(service: "echo.Echo", method: "Get")
  44. /// configuration.method.overrides[echoGet] = MethodConfiguration(
  45. /// executionPolicy: nil,
  46. /// timeout: .seconds(5)
  47. /// )
  48. ///
  49. /// // Configure a fallback timeout for all RPCs if no configuration is provided in the overrides
  50. /// // or by the transport.
  51. /// let defaultMethodConfiguration = MethodConfiguration(executionPolicy: nil, timeout: seconds(10))
  52. /// configuration.method.defaults.setDefaultConfiguration(defaultMethodConfiguration)
  53. ///
  54. /// // Finally create a transport and instantiate the client, adding an interceptor.
  55. /// let inProcessServerTransport = InProcessServerTransport()
  56. /// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport)
  57. ///
  58. /// let client = GRPCClient(
  59. /// transport: inProcessClientTransport,
  60. /// interceptors: [StatsRecordingClientInterceptor()],
  61. /// configuration: configuration
  62. /// )
  63. /// ```
  64. ///
  65. /// ## Starting and stopping the client
  66. ///
  67. /// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the
  68. /// transport to start connecting to the server.
  69. ///
  70. /// ```swift
  71. /// // Start running the client. 'run()' must be running while RPCs are execute so it's executed in
  72. /// // a task group.
  73. /// try await withThrowingTaskGroup(of: Void.self) { group in
  74. /// group.addTask {
  75. /// try await client.run()
  76. /// }
  77. ///
  78. /// // Execute a request against the "echo.Echo" service.
  79. /// try await client.unary(
  80. /// request: ClientRequest.Single<[UInt8]>(message: [72, 101, 108, 108, 111, 33]),
  81. /// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"),
  82. /// serializer: IdentitySerializer(),
  83. /// deserializer: IdentityDeserializer(),
  84. /// ) { response in
  85. /// print(response.message)
  86. /// }
  87. ///
  88. /// // The RPC has completed, close the client.
  89. /// client.close()
  90. /// }
  91. /// ```
  92. ///
  93. /// The ``run()`` method won't return until the client has finished handling all requests. You can
  94. /// signal to the client that it should stop creating new request streams by calling ``close()``.
  95. /// This gives the client enough time to drain any requests already in flight. To stop the client
  96. /// more abruptly you can cancel the task running your client. If your application requires
  97. /// additional resources that need their lifecycles managed you should consider using [Swift Service
  98. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  99. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  100. public struct GRPCClient: Sendable {
  101. /// The transport which provides a bidirectional communication channel with the server.
  102. private let transport: any ClientTransport
  103. /// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
  104. ///
  105. /// The order in which interceptors are added reflects the order in which they are called. The
  106. /// first interceptor added will be the first interceptor to intercept each request. The last
  107. /// interceptor added will be the final interceptor to intercept each request before calling
  108. /// the appropriate handler.
  109. private let interceptors: [any ClientInterceptor]
  110. /// The configuration used by the client.
  111. public let configuration: Configuration
  112. /// The current state of the client.
  113. private let state: ManagedAtomic<State>
  114. /// The state of the client.
  115. private enum State: UInt8, AtomicValue {
  116. /// The client hasn't been started yet. Can transition to `running` or `stopped`.
  117. case notStarted
  118. /// The client is running and can send RPCs. Can transition to `stopping`.
  119. case running
  120. /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to
  121. /// completion. May transition to `stopped`.
  122. case stopping
  123. /// The client has stopped, no RPCs are in flight and no more will be accepted. This state
  124. /// is terminal.
  125. case stopped
  126. }
  127. /// Creates a new client with the given transport, interceptors and configuration.
  128. ///
  129. /// - Parameters:
  130. /// - transport: The transport used to establish a communication channel with a server.
  131. /// - interceptors: A collection of interceptors providing cross-cutting functionality to each
  132. /// accepted RPC. The order in which interceptors are added reflects the order in which they
  133. /// are called. The first interceptor added will be the first interceptor to intercept each
  134. /// request. The last interceptor added will be the final interceptor to intercept each
  135. /// request before calling the appropriate handler.
  136. /// - configuration: Configuration for the client.
  137. public init(
  138. transport: some ClientTransport,
  139. interceptors: [any ClientInterceptor] = [],
  140. configuration: Configuration = Configuration()
  141. ) {
  142. self.transport = transport
  143. self.interceptors = interceptors
  144. self.configuration = configuration
  145. self.state = ManagedAtomic(.notStarted)
  146. }
  147. /// Start the client.
  148. ///
  149. /// This returns once ``close()`` has been called and all in-flight RPCs have finished executing.
  150. /// If you need to abruptly stop all work you should cancel the task executing this method.
  151. ///
  152. /// The client, and by extension this function, can only be run once. If the client is already
  153. /// running or has already been closed then a ``ClientError`` is thrown.
  154. public func run() async throws {
  155. let (wasNotStarted, original) = self.state.compareExchange(
  156. expected: .notStarted,
  157. desired: .running,
  158. ordering: .sequentiallyConsistent
  159. )
  160. guard wasNotStarted else {
  161. switch original {
  162. case .notStarted:
  163. // The value wasn't exchanged so the original value can't be 'notStarted'.
  164. fatalError()
  165. case .running:
  166. throw ClientError(
  167. code: .clientIsAlreadyRunning,
  168. message: "The client is already running and can only be started once."
  169. )
  170. case .stopping, .stopped:
  171. throw ClientError(
  172. code: .clientIsStopped,
  173. message: "The client has stopped and can only be started once."
  174. )
  175. }
  176. }
  177. // When we exit this function we must have stopped.
  178. defer {
  179. self.state.store(.stopped, ordering: .sequentiallyConsistent)
  180. }
  181. do {
  182. try await self.transport.connect(lazily: false)
  183. } catch {
  184. throw ClientError(
  185. code: .transportError,
  186. message: "The transport threw an error while connected.",
  187. cause: error
  188. )
  189. }
  190. }
  191. /// Close the client.
  192. ///
  193. /// The transport will be closed: this means that it will be given enough time to wait for
  194. /// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
  195. /// executing ``run()`` if you want to abruptly stop in-flight RPCs.
  196. public func close() {
  197. while true {
  198. let (wasRunning, actualState) = self.state.compareExchange(
  199. expected: .running,
  200. desired: .stopping,
  201. ordering: .sequentiallyConsistent
  202. )
  203. // Transition from running to stopping: close the transport.
  204. if wasRunning {
  205. self.transport.close()
  206. return
  207. }
  208. // The expected state wasn't 'running'. There are two options:
  209. // 1. The client isn't running yet.
  210. // 2. The client is already stopping or stopped.
  211. switch actualState {
  212. case .notStarted:
  213. // Not started: try going straight to stopped.
  214. let (wasNotStarted, _) = self.state.compareExchange(
  215. expected: .notStarted,
  216. desired: .stopped,
  217. ordering: .sequentiallyConsistent
  218. )
  219. // If the exchange happened then just return: the client wasn't started so there's no
  220. // transport to start.
  221. //
  222. // If the exchange didn't happen then continue looping: the client must've been started by
  223. // another thread.
  224. if wasNotStarted {
  225. return
  226. } else {
  227. continue
  228. }
  229. case .running:
  230. // Unreachable: the value was exchanged and this was the expected value.
  231. fatalError()
  232. case .stopping, .stopped:
  233. // No exchange happened but the client is already stopping.
  234. return
  235. }
  236. }
  237. }
  238. /// Executes a unary RPC.
  239. ///
  240. /// - Parameters:
  241. /// - request: The unary request.
  242. /// - descriptor: The method descriptor for which to execute this request.
  243. /// - serializer: A request serializer.
  244. /// - deserializer: A response deserializer.
  245. /// - handler: A unary response handler.
  246. ///
  247. /// - Returns: The return value from the `handler`.
  248. public func unary<Request, Response, ReturnValue>(
  249. request: ClientRequest.Single<Request>,
  250. descriptor: MethodDescriptor,
  251. serializer: some MessageSerializer<Request>,
  252. deserializer: some MessageDeserializer<Response>,
  253. handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
  254. ) async throws -> ReturnValue {
  255. try await self.bidirectionalStreaming(
  256. request: ClientRequest.Stream(single: request),
  257. descriptor: descriptor,
  258. serializer: serializer,
  259. deserializer: deserializer
  260. ) { stream in
  261. let singleResponse = await ClientResponse.Single(stream: stream)
  262. return try await handler(singleResponse)
  263. }
  264. }
  265. /// Start a client-streaming RPC.
  266. ///
  267. /// - Parameters:
  268. /// - request: The request stream.
  269. /// - descriptor: The method descriptor for which to execute this request.
  270. /// - serializer: A request serializer.
  271. /// - deserializer: A response deserializer.
  272. /// - handler: A unary response handler.
  273. ///
  274. /// - Returns: The return value from the `handler`.
  275. public func clientStreaming<Request, Response, ReturnValue>(
  276. request: ClientRequest.Stream<Request>,
  277. descriptor: MethodDescriptor,
  278. serializer: some MessageSerializer<Request>,
  279. deserializer: some MessageDeserializer<Response>,
  280. handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
  281. ) async throws -> ReturnValue {
  282. try await self.bidirectionalStreaming(
  283. request: request,
  284. descriptor: descriptor,
  285. serializer: serializer,
  286. deserializer: deserializer
  287. ) { stream in
  288. let singleResponse = await ClientResponse.Single(stream: stream)
  289. return try await handler(singleResponse)
  290. }
  291. }
  292. /// Start a server-streaming RPC.
  293. ///
  294. /// - Parameters:
  295. /// - request: The unary request.
  296. /// - descriptor: The method descriptor for which to execute this request.
  297. /// - serializer: A request serializer.
  298. /// - deserializer: A response deserializer.
  299. /// - handler: A response stream handler.
  300. ///
  301. /// - Returns: The return value from the `handler`.
  302. public func serverStreaming<Request, Response, ReturnValue>(
  303. request: ClientRequest.Single<Request>,
  304. descriptor: MethodDescriptor,
  305. serializer: some MessageSerializer<Request>,
  306. deserializer: some MessageDeserializer<Response>,
  307. handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
  308. ) async throws -> ReturnValue {
  309. try await self.bidirectionalStreaming(
  310. request: ClientRequest.Stream(single: request),
  311. descriptor: descriptor,
  312. serializer: serializer,
  313. deserializer: deserializer,
  314. handler: handler
  315. )
  316. }
  317. /// Start a bidirectional streaming RPC.
  318. ///
  319. /// - Note: ``run()`` must have been called and still executing, and ``close()`` mustn't
  320. /// have been called.
  321. ///
  322. /// - Parameters:
  323. /// - request: The streaming request.
  324. /// - descriptor: The method descriptor for which to execute this request.
  325. /// - serializer: A request serializer.
  326. /// - deserializer: A response deserializer.
  327. /// - handler: A response stream handler.
  328. ///
  329. /// - Returns: The return value from the `handler`.
  330. public func bidirectionalStreaming<Request, Response, ReturnValue>(
  331. request: ClientRequest.Stream<Request>,
  332. descriptor: MethodDescriptor,
  333. serializer: some MessageSerializer<Request>,
  334. deserializer: some MessageDeserializer<Response>,
  335. handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
  336. ) async throws -> ReturnValue {
  337. switch self.state.load(ordering: .sequentiallyConsistent) {
  338. case .notStarted, .running:
  339. // Allow .notStarted as making a request can race with 'run()'. Transports should tolerate
  340. // queuing the request if not yet started.
  341. ()
  342. case .stopping, .stopped:
  343. throw ClientError(
  344. code: .clientIsStopped,
  345. message: "Client has been stopped. Can't make any more RPCs."
  346. )
  347. }
  348. return try await ClientRPCExecutor.execute(
  349. request: request,
  350. method: descriptor,
  351. configuration: self.resolveMethodConfiguration(for: descriptor),
  352. serializer: serializer,
  353. deserializer: deserializer,
  354. transport: self.transport,
  355. interceptors: self.interceptors,
  356. handler: handler
  357. )
  358. }
  359. private func resolveMethodConfiguration(for descriptor: MethodDescriptor) -> MethodConfiguration {
  360. if let configuration = self.configuration.method.overrides[descriptor] {
  361. return configuration
  362. }
  363. if let configuration = self.transport.executionConfiguration(forMethod: descriptor) {
  364. return configuration
  365. }
  366. if let configuration = self.configuration.method.defaults[descriptor] {
  367. return configuration
  368. }
  369. // No configuration found, return the "vanilla" configuration.
  370. return MethodConfiguration(executionPolicy: nil, timeout: nil)
  371. }
  372. }
  373. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  374. extension GRPCClient {
  375. public struct Configuration: Sendable {
  376. /// Configuration for how methods are executed.
  377. ///
  378. /// Method configuration determines how each RPC is executed by the client. Some services and
  379. /// transports provide this information to the client when the server name is resolved. However,
  380. /// you override this configuration and set default values should no override be set and the
  381. /// transport doesn't provide a value.
  382. public var method: Method
  383. /// Creates a new default configuration.
  384. public init() {
  385. self.method = Method()
  386. }
  387. }
  388. }
  389. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  390. extension GRPCClient.Configuration {
  391. /// Configuration for how methods should be executed.
  392. ///
  393. /// In most cases the client should defer to the configuration provided by the transport as this
  394. /// can be provided to the transport as part of name resolution when establishing a connection.
  395. ///
  396. /// The client first checks ``overrides`` for a configuration, followed by the transport, followed
  397. /// by ``defaults``.
  398. public struct Method: Sendable, Hashable {
  399. /// Configuration to use in precedence to that provided by the transport.
  400. public var overrides: MethodConfigurations
  401. /// Configuration to use only if there are no overrides and the transport doesn't specify
  402. /// any configuration.
  403. public var defaults: MethodConfigurations
  404. public init() {
  405. self.overrides = MethodConfigurations()
  406. self.defaults = MethodConfigurations()
  407. }
  408. }
  409. }