2
0

GRPCClient.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. internal import Atomics
  17. /// A gRPC client.
  18. ///
  19. /// A ``GRPCClient`` communicates to a server via a ``ClientTransport``.
  20. ///
  21. /// You can start RPCs to the server by calling the corresponding method:
  22. /// - ``unary(request:descriptor:serializer:deserializer:options:handler:)``
  23. /// - ``clientStreaming(request:descriptor:serializer:deserializer:options:handler:)``
  24. /// - ``serverStreaming(request:descriptor:serializer:deserializer:options:handler:)``
  25. /// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``
  26. ///
  27. /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
  28. ///
  29. /// You can set ``ServiceConfig``s on this client to override whatever configurations have been
  30. /// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting
  31. /// logic which apply to all RPCs. Example uses of interceptors include authentication and logging.
  32. ///
  33. /// ## Creating and configuring a client
  34. ///
  35. /// The following example demonstrates how to create and configure a client.
  36. ///
  37. /// ```swift
  38. /// // Create a configuration object for the client and override the timeout for the 'Get' method on
  39. /// // the 'echo.Echo' service. This configuration takes precedence over any set by the transport.
  40. /// var configuration = GRPCClient.Configuration()
  41. /// configuration.service.override = ServiceConfig(
  42. /// methodConfig: [
  43. /// MethodConfig(
  44. /// names: [
  45. /// MethodConfig.Name(service: "echo.Echo", method: "Get")
  46. /// ],
  47. /// timeout: .seconds(5)
  48. /// )
  49. /// ]
  50. /// )
  51. ///
  52. /// // Configure a fallback timeout for all RPCs (indicated by an empty service and method name) if
  53. /// // no configuration is provided in the overrides or by the transport.
  54. /// configuration.service.defaults = ServiceConfig(
  55. /// methodConfig: [
  56. /// MethodConfig(
  57. /// names: [
  58. /// MethodConfig.Name(service: "", method: "")
  59. /// ],
  60. /// timeout: .seconds(10)
  61. /// )
  62. /// ]
  63. /// )
  64. ///
  65. /// // Finally create a transport and instantiate the client, adding an interceptor.
  66. /// let inProcessServerTransport = InProcessServerTransport()
  67. /// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport)
  68. ///
  69. /// let client = GRPCClient(
  70. /// transport: inProcessClientTransport,
  71. /// interceptors: [StatsRecordingClientInterceptor()],
  72. /// configuration: configuration
  73. /// )
  74. /// ```
  75. ///
  76. /// ## Starting and stopping the client
  77. ///
  78. /// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the
  79. /// transport to start connecting to the server.
  80. ///
  81. /// ```swift
  82. /// // Start running the client. 'run()' must be running while RPCs are executing, so it's run in
  83. /// // a task group.
  84. /// try await withThrowingTaskGroup(of: Void.self) { group in
  85. /// group.addTask {
  86. /// try await client.run()
  87. /// }
  88. ///
  89. /// // Execute a request against the "echo.Echo" service.
  90. /// try await client.unary(
  91. /// request: ClientRequest.Single<[UInt8]>(message: [72, 101, 108, 108, 111, 33]),
  92. /// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"),
  93. /// serializer: IdentitySerializer(),
  94. /// deserializer: IdentityDeserializer(),
  95. /// ) { response in
  96. /// print(response.message)
  97. /// }
  98. ///
  99. /// // The RPC has completed, close the client.
  100. /// client.close()
  101. /// }
  102. /// ```
  103. ///
  104. /// The ``run()`` method won't return until the client has finished handling all requests. You can
  105. /// signal to the client that it should stop creating new request streams by calling ``close()``.
  106. /// This gives the client enough time to drain any requests already in flight. To stop the client
  107. /// more abruptly you can cancel the task running your client. If your application requires
  108. /// additional resources that need their lifecycles managed you should consider using [Swift Service
  109. /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
  110. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  111. public struct GRPCClient: Sendable {
  /// The transport providing the bidirectional communication channel to the server.
  private let transport: any ClientTransport

  /// Interceptors providing cross-cutting functionality to each RPC.
  ///
  /// Interceptors are called in the order in which they were added: the first interceptor added
  /// is the first to intercept each request and the last interceptor added is the final one to
  /// intercept each request before the appropriate handler is called.
  private let interceptors: [any ClientInterceptor]

  /// The lifecycle state of the client, shared atomically between callers.
  private let state: ManagedAtomic<State>

  /// The lifecycle states a client moves through.
  private enum State: UInt8, AtomicValue {
    /// ``run()`` hasn't been called yet. May move to `running` (via ``run()``) or directly to
    /// `stopped` (via ``close()``).
    case notStarted = 0

    /// The client is running and RPCs may be started. May move to `stopping` (via ``close()``)
    /// or to `stopped` (when ``run()`` exits).
    case running = 1

    /// ``close()`` has been called: no new RPCs will be started but existing RPCs may run to
    /// completion. May move to `stopped`.
    case stopping = 2

    /// The client has stopped and no more RPCs will be accepted. This state is terminal.
    case stopped = 3
  }
  /// Creates a new client with the given transport and interceptors.
  ///
  /// The client is created in the not-started state; call ``run()`` to start it.
  ///
  /// - Parameters:
  ///   - transport: The transport used to establish a communication channel with a server.
  ///   - interceptors: Interceptors providing cross-cutting functionality to each RPC. They are
  ///       called in the order in which they appear: the first element is the first to intercept
  ///       each request and the last element is the final one to intercept each request before
  ///       the appropriate handler is called. Defaults to no interceptors.
  public init(
    transport: some ClientTransport,
    interceptors: [any ClientInterceptor] = []
  ) {
    self.state = ManagedAtomic<State>(.notStarted)
    self.interceptors = interceptors
    self.transport = transport
  }
  /// Starts the client by connecting its transport.
  ///
  /// This function suspends for as long as the transport's `connect()` call is running. If you
  /// need to abruptly stop all work you should cancel the task executing this method.
  ///
  /// The client, and by extension this function, can only be run once.
  ///
  /// - Throws: A ``RuntimeError`` with code `clientIsAlreadyRunning` if the client is already
  ///   running, `clientIsStopped` if the client is stopping or has stopped, or `transportError`
  ///   (wrapping the underlying error as its cause) if the transport throws.
  public func run() async throws {
    // Atomically claim the one and only 'notStarted' -> 'running' transition.
    let (didStart, previousState) = self.state.compareExchange(
      expected: .notStarted,
      desired: .running,
      ordering: .sequentiallyConsistent
    )

    if !didStart {
      switch previousState {
      case .running:
        throw RuntimeError(
          code: .clientIsAlreadyRunning,
          message: "The client is already running and can only be started once."
        )

      case .stopping, .stopped:
        throw RuntimeError(
          code: .clientIsStopped,
          message: "The client has stopped and can only be started once."
        )

      case .notStarted:
        // The exchange failed, so the value observed can't have been 'notStarted'.
        fatalError()
      }
    }

    // However this function is exited from here on, the client ends up stopped.
    defer {
      self.state.store(.stopped, ordering: .sequentiallyConsistent)
    }

    do {
      try await self.transport.connect()
    } catch let transportFailure {
      // Surface transport failures uniformly, keeping the original error as the cause.
      throw RuntimeError(
        code: .transportError,
        message: "The transport threw an error while connected.",
        cause: transportFailure
      )
    }
  }
  /// Closes the client.
  ///
  /// If the client is running its transport is asked to close and no new RPCs will be accepted
  /// by the client. If the client hasn't been started yet it moves straight to the stopped state
  /// without touching the transport. Calling this on a client which is already stopping or
  /// stopped has no effect.
  ///
  /// You can cancel the task executing ``run()`` if you want to abruptly stop in-flight RPCs.
  public func close() {
    // Loop because 'run()' may move the state from 'notStarted' to 'running' between the two
    // exchanges below; in that case the first exchange has to be attempted again.
    while true {
      let (didBeginStopping, observedState) = self.state.compareExchange(
        expected: .running,
        desired: .stopping,
        ordering: .sequentiallyConsistent
      )

      if didBeginStopping {
        // This call won the 'running' -> 'stopping' transition so it closes the transport.
        self.transport.close()
        return
      }

      // The state wasn't 'running': either the client hasn't been started yet or it is already
      // stopping or stopped.
      switch observedState {
      case .stopping, .stopped:
        // Already on the way down (or down): nothing left to do.
        return

      case .notStarted:
        // Never started: try to skip directly to 'stopped'.
        let (didStop, _) = self.state.compareExchange(
          expected: .notStarted,
          desired: .stopped,
          ordering: .sequentiallyConsistent
        )

        if didStop {
          // The client was never started so there's no transport to close.
          return
        }
        // Otherwise the state changed underneath us, so fall out of the switch and go around
        // the loop again.

      case .running:
        // Unreachable: 'running' was the expected value so the first exchange would've succeeded.
        fatalError()
      }
    }
  }
  /// Executes a unary RPC: a single request message answered by a single response.
  ///
  /// The request is wrapped in a request stream and executed via
  /// ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``; the
  /// response stream is converted to a single response before being passed to `handler`.
  ///
  /// - Parameters:
  ///   - request: The unary request.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A unary response handler.
  ///
  /// - Returns: The return value from the `handler`.
  public func unary<Request, Response, ReturnValue>(
    request: ClientRequest.Single<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    let streamingRequest = ClientRequest.Stream<Request>(single: request)

    return try await self.bidirectionalStreaming(
      request: streamingRequest,
      descriptor: descriptor,
      serializer: serializer,
      deserializer: deserializer,
      options: options,
      handler: { responseStream in
        let response = await ClientResponse.Single(stream: responseStream)
        return try await handler(response)
      }
    )
  }
  /// Starts a client-streaming RPC: a stream of request messages answered by a single response.
  ///
  /// The request is executed via
  /// ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``; the
  /// response stream is converted to a single response before being passed to `handler`.
  ///
  /// - Parameters:
  ///   - request: The request stream.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A unary response handler.
  ///
  /// - Returns: The return value from the `handler`.
  public func clientStreaming<Request, Response, ReturnValue>(
    request: ClientRequest.Stream<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    return try await self.bidirectionalStreaming(
      request: request,
      descriptor: descriptor,
      serializer: serializer,
      deserializer: deserializer,
      options: options,
      handler: { responseStream in
        let response = await ClientResponse.Single(stream: responseStream)
        return try await handler(response)
      }
    )
  }
  /// Starts a server-streaming RPC: a single request message answered by a stream of responses.
  ///
  /// The request is wrapped in a request stream and executed via
  /// ``bidirectionalStreaming(request:descriptor:serializer:deserializer:options:handler:)``, with
  /// `handler` receiving the response stream directly.
  ///
  /// - Parameters:
  ///   - request: The unary request.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A response stream handler.
  ///
  /// - Returns: The return value from the `handler`.
  public func serverStreaming<Request, Response, ReturnValue>(
    request: ClientRequest.Single<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    let streamingRequest = ClientRequest.Stream<Request>(single: request)

    return try await self.bidirectionalStreaming(
      request: streamingRequest,
      descriptor: descriptor,
      serializer: serializer,
      deserializer: deserializer,
      options: options,
      handler: handler
    )
  }
  /// Starts a bidirectional streaming RPC.
  ///
  /// The call options are combined with the transport's configuration for the method before the
  /// RPC is executed through the client's interceptors and transport.
  ///
  /// - Note: ``run()`` must have been called and still executing, and ``close()`` mustn't
  ///   have been called.
  ///
  /// - Parameters:
  ///   - request: The streaming request.
  ///   - descriptor: The method descriptor for which to execute this request.
  ///   - serializer: A request serializer.
  ///   - deserializer: A response deserializer.
  ///   - options: Call specific options.
  ///   - handler: A response stream handler.
  ///
  /// - Returns: The return value from the `handler`.
  /// - Throws: A ``RuntimeError`` with code `clientIsStopped` if the client is stopping or has
  ///   stopped, or any error thrown while executing the RPC.
  public func bidirectionalStreaming<Request, Response, ReturnValue>(
    request: ClientRequest.Stream<Request>,
    descriptor: MethodDescriptor,
    serializer: some MessageSerializer<Request>,
    deserializer: some MessageDeserializer<Response>,
    options: CallOptions,
    handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
  ) async throws -> ReturnValue {
    let currentState = self.state.load(ordering: .sequentiallyConsistent)

    switch currentState {
    case .stopping, .stopped:
      throw RuntimeError(
        code: .clientIsStopped,
        message: "Client has been stopped. Can't make any more RPCs."
      )

    case .notStarted, .running:
      // 'notStarted' is allowed because making a request can race with 'run()'. Transports
      // should tolerate queuing the request if not yet started.
      break
    }

    // Fold the transport's configuration for this method into the caller's options.
    var resolvedOptions = options
    resolvedOptions.formUnion(with: self.transport.configuration(forMethod: descriptor))

    return try await ClientRPCExecutor.execute(
      request: request,
      method: descriptor,
      options: resolvedOptions,
      serializer: serializer,
      deserializer: deserializer,
      transport: self.transport,
      interceptors: self.interceptors,
      handler: handler
    )
  }
  379. }