GRPCClient.swift 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. /// A gRPC client.
  18. ///
  19. /// A ``GRPCClient`` communicates to a server via a ``ClientTransport``.
  20. ///
  21. /// You can start RPCs to the server by calling the corresponding method:
  22. /// - ``unary(request:descriptor:serializer:deserializer:handler:)``
  23. /// - ``clientStreaming(request:descriptor:serializer:deserializer:handler:)``
  24. /// - ``serverStreaming(request:descriptor:serializer:deserializer:handler:)``
  25. /// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:handler:)``
  26. ///
  27. /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
  28. ///
  29. /// You can set ``MethodConfiguration``s on this client to override whatever configurations have been
  30. /// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting
  31. /// logic which apply to all RPCs. Example uses of interceptors include authentication and logging.
  32. ///
  33. /// ## Creating and configuring a client
  34. ///
  35. /// The following example demonstrates how to create and configure a client.
  36. ///
  37. /// ```swift
  38. /// // Create a configuration object for the client.
  39. /// var configuration = GRPCClient.Configuration()
  40. ///
  41. /// // Create and add an interceptor.
  42. /// configuration.interceptors.add(StatsRecordingClientInterceptor())
  43. ///
  44. /// // Override the timeout for the 'Get' method on the 'echo.Echo' service. This configuration
  45. /// // takes precedence over any set by the transport.
  46. /// let echoGet = MethodDescriptor(service: "echo.Echo", method: "Get")
  47. /// configuration.method.overrides[echoGet] = MethodConfiguration(
  48. /// executionPolicy: nil,
  49. /// timeout: .seconds(5)
  50. /// )
  51. ///
  52. /// // Configure a fallback timeout for all RPCs if no configuration is provided in the overrides
  53. /// // or by the transport.
  54. /// let defaultMethodConfiguration = MethodConfiguration(executionPolicy: nil, timeout: .seconds(10))
  55. /// configuration.method.defaults.setDefaultConfiguration(defaultMethodConfiguration)
  56. ///
  57. /// // Finally create a transport and instantiate the client.
  58. /// let inProcessServerTransport = InProcessServerTransport()
  59. /// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport)
  60. /// let client = GRPCClient(transport: inProcessClientTransport, configuration: configuration)
  61. /// ```
  62. ///
  63. /// ## Starting and stopping the client
  64. ///
  65. /// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the
  66. /// transport to start connecting to the server.
  67. ///
  68. /// ```swift
  69. /// // Start running the client. 'run()' must be running while RPCs are executing so it's executed in
  70. /// // a task group.
  71. /// try await withThrowingTaskGroup(of: Void.self) { group in
  72. /// group.addTask {
  73. /// try await client.run()
  74. /// }
  75. ///
  76. /// // Execute a request against the "echo.Echo" service.
  77. /// try await client.unary(
  78. /// request: ClientRequest.Single<[UInt8]>(message: [72, 101, 108, 108, 111, 33]),
  79. /// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"),
  80. /// serializer: IdentitySerializer(),
  81. /// deserializer: IdentityDeserializer()
  82. /// ) { response in
  83. /// print(response.message)
  84. /// }
  85. ///
  86. /// // The RPC has completed, close the client.
  87. /// client.close()
  88. /// }
  89. /// ```
  90. ///
  91. /// The ``run()`` method won't return until the client has finished handling all requests. You can
  92. /// signal to the client that it should stop creating new request streams by calling ``close()``.
  93. /// This gives the client enough time to drain any requests already in flight. To stop the client
  94. /// more abruptly you can cancel the task running your client. If your application requires
  95. /// additional resources that need their lifecycles managed you should consider using [Swift Service
  96. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  public struct GRPCClient: Sendable {
    /// The transport which provides a bidirectional communication channel with the server.
    private let transport: any ClientTransport

    /// The configuration used by the client.
    public let configuration: Configuration

    /// The current state of the client.
    private let state: ManagedAtomic<State>

    /// The lifecycle state of the client.
    private enum State: UInt8, AtomicValue {
      /// The client hasn't been started yet. Can transition to `running` (when `run()` is called)
      /// or to `stopped` (when `close()` is called before `run()`).
      case notStarted
      /// The client is running and can send RPCs. Can transition to `stopping` (when `close()` is
      /// called) or directly to `stopped` (when `run()` exits).
      case running
      /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to
      /// completion. May transition to `stopped`.
      case stopping
      /// The client has stopped, no RPCs are in flight and no more will be accepted. This state
      /// is terminal.
      case stopped
    }

    /// Creates a new client with the given transport and configuration.
    ///
    /// - Parameters:
    ///   - transport: The transport used to establish a communication channel with a server.
    ///   - configuration: Configuration for the client.
    public init(transport: some ClientTransport, configuration: Configuration = Configuration()) {
      self.transport = transport
      self.configuration = configuration
      self.state = ManagedAtomic(.notStarted)
    }

    /// Start the client.
    ///
    /// This returns once ``close()`` has been called and all in-flight RPCs have finished executing.
    /// If you need to abruptly stop all work you should cancel the task executing this method.
    ///
    /// The client, and by extension this function, can only be run once. If the client is already
    /// running or has already been closed then a ``ClientError`` is thrown.
    ///
    /// - Throws: A ``ClientError`` if the client is already running, has already been stopped, or
    ///   if the transport throws (the transport's error is attached as the cause).
    public func run() async throws {
      let (wasNotStarted, original) = self.state.compareExchange(
        expected: .notStarted,
        desired: .running,
        ordering: .sequentiallyConsistent
      )

      guard wasNotStarted else {
        switch original {
        case .notStarted:
          // The value wasn't exchanged so the original value can't be 'notStarted'.
          fatalError("Invalid state: exchange from 'notStarted' failed but observed 'notStarted'")

        case .running:
          throw ClientError(
            code: .clientIsAlreadyRunning,
            message: "The client is already running and can only be started once."
          )

        case .stopping, .stopped:
          throw ClientError(
            code: .clientIsStopped,
            message: "The client has stopped and can only be started once."
          )
        }
      }

      // When we exit this function we must have stopped, whether the transport returned normally
      // or threw.
      defer {
        self.state.store(.stopped, ordering: .sequentiallyConsistent)
      }

      do {
        try await self.transport.connect(lazily: false)
      } catch {
        // Wrap the transport's error so callers always see a 'ClientError'.
        throw ClientError(
          code: .transportError,
          message: "The transport threw an error while connected.",
          cause: error
        )
      }
    }

    /// Close the client.
    ///
    /// The transport will be closed: this means that it will be given enough time to wait for
    /// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
    /// executing ``run()`` if you want to abruptly stop in-flight RPCs.
    ///
    /// Calling this before ``run()`` moves the client straight to its terminal state without
    /// touching the transport. Calling it when the client is already stopping or stopped has no
    /// effect.
    public func close() {
      while true {
        let (wasRunning, actualState) = self.state.compareExchange(
          expected: .running,
          desired: .stopping,
          ordering: .sequentiallyConsistent
        )

        // Transition from running to stopping: close the transport.
        if wasRunning {
          self.transport.close()
          return
        }

        // The expected state wasn't 'running'. There are two options:
        // 1. The client isn't running yet.
        // 2. The client is already stopping or stopped.
        switch actualState {
        case .notStarted:
          // Not started: try going straight to stopped.
          let (wasNotStarted, _) = self.state.compareExchange(
            expected: .notStarted,
            desired: .stopped,
            ordering: .sequentiallyConsistent
          )

          // If the exchange happened then just return: the client wasn't started so there's no
          // transport to close.
          //
          // If the exchange didn't happen then continue looping: the state must've been changed
          // by another thread in the meantime.
          if wasNotStarted {
            return
          } else {
            continue
          }

        case .running:
          // Unreachable: had the state been 'running' the exchange above would have succeeded and
          // we would already have returned.
          fatalError("Invalid state: exchange from 'running' failed but observed 'running'")

        case .stopping, .stopped:
          // No exchange happened but the client is already stopping or stopped.
          return
        }
      }
    }

    /// Executes a unary RPC.
    ///
    /// - Parameters:
    ///   - request: The unary request.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - handler: A unary response handler.
    ///
    /// - Returns: The return value from the `handler`.
    public func unary<Request, Response, ReturnValue>(
      request: ClientRequest.Single<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      // Implemented on top of the bidirectional streaming call: the single request is wrapped in a
      // stream and the response stream is converted to a single response before calling 'handler'.
      try await bidirectionalStreaming(
        request: ClientRequest.Stream(single: request),
        descriptor: descriptor,
        serializer: serializer,
        deserializer: deserializer
      ) { stream in
        let singleResponse = await ClientResponse.Single(stream: stream)
        return try await handler(singleResponse)
      }
    }

    /// Start a client-streaming RPC.
    ///
    /// - Parameters:
    ///   - request: The request stream.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - handler: A unary response handler.
    ///
    /// - Returns: The return value from the `handler`.
    public func clientStreaming<Request, Response, ReturnValue>(
      request: ClientRequest.Stream<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      // The response stream is converted to a single response before calling 'handler'.
      try await bidirectionalStreaming(
        request: request,
        descriptor: descriptor,
        serializer: serializer,
        deserializer: deserializer
      ) { stream in
        let singleResponse = await ClientResponse.Single(stream: stream)
        return try await handler(singleResponse)
      }
    }

    /// Start a server-streaming RPC.
    ///
    /// - Parameters:
    ///   - request: The unary request.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - handler: A response stream handler.
    ///
    /// - Returns: The return value from the `handler`.
    public func serverStreaming<Request, Response, ReturnValue>(
      request: ClientRequest.Single<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      // The single request is wrapped in a stream; the response stream is passed through as-is.
      try await bidirectionalStreaming(
        request: ClientRequest.Stream(single: request),
        descriptor: descriptor,
        serializer: serializer,
        deserializer: deserializer,
        handler: handler
      )
    }

    /// Start a bidirectional streaming RPC.
    ///
    /// - Note: ``run()`` must have been called and still executing, and ``close()`` mustn't
    ///   have been called.
    ///
    /// - Parameters:
    ///   - request: The streaming request.
    ///   - descriptor: The method descriptor for which to execute this request.
    ///   - serializer: A request serializer.
    ///   - deserializer: A response deserializer.
    ///   - handler: A response stream handler.
    ///
    /// - Returns: The return value from the `handler`.
    /// - Throws: A ``ClientError`` if the client hasn't been started or has been stopped, or any
    ///   error thrown while executing the RPC.
    public func bidirectionalStreaming<Request, Response, ReturnValue>(
      request: ClientRequest.Stream<Request>,
      descriptor: MethodDescriptor,
      serializer: some MessageSerializer<Request>,
      deserializer: some MessageDeserializer<Response>,
      handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
    ) async throws -> ReturnValue {
      // RPCs may only be started while the client is in the 'running' state.
      switch self.state.load(ordering: .sequentiallyConsistent) {
      case .running:
        ()

      case .notStarted:
        throw ClientError(
          code: .clientIsNotRunning,
          message: "Client must be running to make an RPC: call run() first."
        )

      case .stopping, .stopped:
        throw ClientError(
          code: .clientIsStopped,
          message: "Client has been stopped. Can't make any more RPCs."
        )
      }

      return try await ClientRPCExecutor.execute(
        request: request,
        method: descriptor,
        configuration: self.resolveMethodConfiguration(for: descriptor),
        serializer: serializer,
        deserializer: deserializer,
        transport: self.transport,
        interceptors: self.configuration.interceptors.values,
        handler: handler
      )
    }

    /// Resolves the configuration to use for the given method.
    ///
    /// Sources are consulted in order of precedence: the client's overrides, then the transport,
    /// then the client's defaults. If none provides a configuration, one with no execution policy
    /// and no timeout is returned.
    ///
    /// - Parameter descriptor: The method to resolve the configuration for.
    /// - Returns: The configuration to execute the method with.
    private func resolveMethodConfiguration(for descriptor: MethodDescriptor) -> MethodConfiguration {
      if let configuration = self.configuration.method.overrides[descriptor] {
        return configuration
      }

      if let configuration = self.transport.executionConfiguration(forMethod: descriptor) {
        return configuration
      }

      if let configuration = self.configuration.method.defaults[descriptor] {
        return configuration
      }

      // No configuration found, return the "vanilla" configuration.
      return MethodConfiguration(executionPolicy: nil, timeout: nil)
    }
  }
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension GRPCClient {
    /// Configuration for a ``GRPCClient``: the interceptors applied to each RPC and the
    /// configuration used to execute methods.
    public struct Configuration: Sendable {
      /// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
      ///
      /// Interceptors are called in the order in which they were added: the first interceptor added
      /// is the first to intercept each request, and the last interceptor added is the final one to
      /// intercept each request before the appropriate handler is called.
      public var interceptors: Interceptors = Interceptors()

      /// Configuration for how methods are executed.
      ///
      /// Method configuration determines how each RPC is executed by the client. Some services and
      /// transports provide this information to the client when the server name is resolved.
      /// However, you can override this configuration, and you can set default values to be used
      /// when no override is set and the transport doesn't provide a value.
      public var method: Method = Method()

      /// Creates a new default configuration, with no interceptors and empty method configuration.
      public init() {}
    }
  }
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension GRPCClient.Configuration {
    /// A collection of ``ClientInterceptor`` implementations which are applied to all accepted
    /// RPCs.
    ///
    /// RPCs are intercepted in the order that interceptors are added. That is, a request sent from
    /// the client to the server will first be intercepted by the first added interceptor followed
    /// by the second, and so on. For responses from the server, they'll be applied in the opposite
    /// order.
    public struct Interceptors: Sendable {
      /// The interceptors, stored in the order in which they were added.
      private(set) var values: [any ClientInterceptor] = []

      /// Add an interceptor to the client.
      ///
      /// The order in which interceptors are added reflects the order in which they are called. The
      /// first interceptor added will be the first interceptor to intercept each request. The last
      /// interceptor added will be the final interceptor to intercept each request before calling
      /// the appropriate handler.
      ///
      /// - Parameter interceptor: The interceptor to add.
      public mutating func add(_ interceptor: some ClientInterceptor) {
        self.values.append(interceptor)
      }

      /// Adds a sequence of interceptors to the client.
      ///
      /// The order in which interceptors are added reflects the order in which they are called. The
      /// first interceptor added will be the first interceptor to intercept each request. The last
      /// interceptor added will be the final interceptor to intercept each request before calling
      /// the appropriate handler.
      ///
      /// - Parameter interceptors: The interceptors to add.
      public mutating func add(contentsOf interceptors: some Sequence<any ClientInterceptor>) {
        // The element type is spelled with an explicit 'any' to match the storage type of 'values'.
        self.values.append(contentsOf: interceptors)
      }
    }

    /// Configuration for how methods should be executed.
    ///
    /// In most cases the client should defer to the configuration provided by the transport as this
    /// can be provided to the transport as part of name resolution when establishing a connection.
    ///
    /// The client first checks ``overrides`` for a configuration, followed by the transport, followed
    /// by ``defaults``.
    public struct Method: Sendable, Hashable {
      /// Configuration to use in precedence to that provided by the transport.
      public var overrides: MethodConfigurations

      /// Configuration to use only if there are no overrides and the transport doesn't specify
      /// any configuration.
      public var defaults: MethodConfigurations

      /// Creates a method configuration whose overrides and defaults are both newly created
      /// ``MethodConfigurations`` values.
      public init() {
        self.overrides = MethodConfigurations()
        self.defaults = MethodConfigurations()
      }
    }
  }
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension GRPCClient.Configuration.Interceptors: CustomStringConvertible {
    /// A textual description listing the type name of each interceptor, in the order in which the
    /// interceptors are stored.
    public var description: String {
      let typeNames = self.values.map { interceptor in
        String(describing: type(of: interceptor))
      }
      return String(describing: typeNames)
    }
  }
  438. }