GRPCClient+AsyncAwaitSupport.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import SwiftProtobuf
  18. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  19. extension GRPCClient {
  20. public func makeAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
  21. path: String,
  22. request: Request,
  23. callOptions: CallOptions? = nil,
  24. interceptors: [ClientInterceptor<Request, Response>] = [],
  25. responseType: Response.Type = Response.self
  26. ) -> GRPCAsyncUnaryCall<Request, Response> {
  27. return self.channel.makeAsyncUnaryCall(
  28. path: path,
  29. request: request,
  30. callOptions: callOptions ?? self.defaultCallOptions,
  31. interceptors: interceptors
  32. )
  33. }
  34. public func makeAsyncUnaryCall<Request: GRPCPayload & Sendable, Response: GRPCPayload & Sendable>(
  35. path: String,
  36. request: Request,
  37. callOptions: CallOptions? = nil,
  38. interceptors: [ClientInterceptor<Request, Response>] = [],
  39. responseType: Response.Type = Response.self
  40. ) -> GRPCAsyncUnaryCall<Request, Response> {
  41. return self.channel.makeAsyncUnaryCall(
  42. path: path,
  43. request: request,
  44. callOptions: callOptions ?? self.defaultCallOptions,
  45. interceptors: interceptors
  46. )
  47. }
  48. public func makeAsyncServerStreamingCall<
  49. Request: SwiftProtobuf.Message & Sendable,
  50. Response: SwiftProtobuf.Message & Sendable
  51. >(
  52. path: String,
  53. request: Request,
  54. callOptions: CallOptions? = nil,
  55. interceptors: [ClientInterceptor<Request, Response>] = [],
  56. responseType: Response.Type = Response.self
  57. ) -> GRPCAsyncServerStreamingCall<Request, Response> {
  58. return self.channel.makeAsyncServerStreamingCall(
  59. path: path,
  60. request: request,
  61. callOptions: callOptions ?? self.defaultCallOptions,
  62. interceptors: interceptors
  63. )
  64. }
  65. public func makeAsyncServerStreamingCall<
  66. Request: GRPCPayload & Sendable,
  67. Response: GRPCPayload & Sendable
  68. >(
  69. path: String,
  70. request: Request,
  71. callOptions: CallOptions? = nil,
  72. interceptors: [ClientInterceptor<Request, Response>] = [],
  73. responseType: Response.Type = Response.self
  74. ) -> GRPCAsyncServerStreamingCall<Request, Response> {
  75. return self.channel.makeAsyncServerStreamingCall(
  76. path: path,
  77. request: request,
  78. callOptions: callOptions ?? self.defaultCallOptions,
  79. interceptors: interceptors
  80. )
  81. }
  82. public func makeAsyncClientStreamingCall<
  83. Request: SwiftProtobuf.Message & Sendable,
  84. Response: SwiftProtobuf.Message & Sendable
  85. >(
  86. path: String,
  87. callOptions: CallOptions? = nil,
  88. interceptors: [ClientInterceptor<Request, Response>] = [],
  89. requestType: Request.Type = Request.self,
  90. responseType: Response.Type = Response.self
  91. ) -> GRPCAsyncClientStreamingCall<Request, Response> {
  92. return self.channel.makeAsyncClientStreamingCall(
  93. path: path,
  94. callOptions: callOptions ?? self.defaultCallOptions,
  95. interceptors: interceptors
  96. )
  97. }
  98. public func makeAsyncClientStreamingCall<
  99. Request: GRPCPayload & Sendable,
  100. Response: GRPCPayload & Sendable
  101. >(
  102. path: String,
  103. callOptions: CallOptions? = nil,
  104. interceptors: [ClientInterceptor<Request, Response>] = [],
  105. requestType: Request.Type = Request.self,
  106. responseType: Response.Type = Response.self
  107. ) -> GRPCAsyncClientStreamingCall<Request, Response> {
  108. return self.channel.makeAsyncClientStreamingCall(
  109. path: path,
  110. callOptions: callOptions ?? self.defaultCallOptions,
  111. interceptors: interceptors
  112. )
  113. }
  114. public func makeAsyncBidirectionalStreamingCall<
  115. Request: SwiftProtobuf.Message & Sendable,
  116. Response: SwiftProtobuf.Message & Sendable
  117. >(
  118. path: String,
  119. callOptions: CallOptions? = nil,
  120. interceptors: [ClientInterceptor<Request, Response>] = [],
  121. requestType: Request.Type = Request.self,
  122. responseType: Response.Type = Response.self
  123. ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
  124. return self.channel.makeAsyncBidirectionalStreamingCall(
  125. path: path,
  126. callOptions: callOptions ?? self.defaultCallOptions,
  127. interceptors: interceptors
  128. )
  129. }
  130. public func makeAsyncBidirectionalStreamingCall<
  131. Request: GRPCPayload & Sendable,
  132. Response: GRPCPayload & Sendable
  133. >(
  134. path: String,
  135. callOptions: CallOptions? = nil,
  136. interceptors: [ClientInterceptor<Request, Response>] = [],
  137. requestType: Request.Type = Request.self,
  138. responseType: Response.Type = Response.self
  139. ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
  140. return self.channel.makeAsyncBidirectionalStreamingCall(
  141. path: path,
  142. callOptions: callOptions ?? self.defaultCallOptions,
  143. interceptors: interceptors
  144. )
  145. }
  146. }
  147. // MARK: - "Simple, but safe" wrappers.
  148. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  149. extension GRPCClient {
  150. public func performAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
  151. path: String,
  152. request: Request,
  153. callOptions: CallOptions? = nil,
  154. interceptors: [ClientInterceptor<Request, Response>] = [],
  155. responseType: Response.Type = Response.self
  156. ) async throws -> Response {
  157. return try await self.channel.makeAsyncUnaryCall(
  158. path: path,
  159. request: request,
  160. callOptions: callOptions ?? self.defaultCallOptions,
  161. interceptors: interceptors
  162. ).response
  163. }
  164. public func performAsyncUnaryCall<
  165. Request: GRPCPayload & Sendable,
  166. Response: GRPCPayload & Sendable
  167. >(
  168. path: String,
  169. request: Request,
  170. callOptions: CallOptions? = nil,
  171. interceptors: [ClientInterceptor<Request, Response>] = [],
  172. responseType: Response.Type = Response.self
  173. ) async throws -> Response {
  174. return try await self.channel.makeAsyncUnaryCall(
  175. path: path,
  176. request: request,
  177. callOptions: callOptions ?? self.defaultCallOptions,
  178. interceptors: interceptors
  179. ).response
  180. }
  181. public func performAsyncServerStreamingCall<
  182. Request: SwiftProtobuf.Message & Sendable,
  183. Response: SwiftProtobuf.Message & Sendable
  184. >(
  185. path: String,
  186. request: Request,
  187. callOptions: CallOptions? = nil,
  188. interceptors: [ClientInterceptor<Request, Response>] = [],
  189. responseType: Response.Type = Response.self
  190. ) -> GRPCAsyncResponseStream<Response> {
  191. return self.channel.makeAsyncServerStreamingCall(
  192. path: path,
  193. request: request,
  194. callOptions: callOptions ?? self.defaultCallOptions,
  195. interceptors: interceptors
  196. ).responseStream
  197. }
  198. public func performAsyncServerStreamingCall<
  199. Request: GRPCPayload & Sendable,
  200. Response: GRPCPayload & Sendable
  201. >(
  202. path: String,
  203. request: Request,
  204. callOptions: CallOptions? = nil,
  205. interceptors: [ClientInterceptor<Request, Response>] = [],
  206. responseType: Response.Type = Response.self
  207. ) -> GRPCAsyncResponseStream<Response> {
  208. return self.channel.makeAsyncServerStreamingCall(
  209. path: path,
  210. request: request,
  211. callOptions: callOptions ?? self.defaultCallOptions,
  212. interceptors: interceptors
  213. ).responseStream
  214. }
  215. public func performAsyncClientStreamingCall<
  216. Request: SwiftProtobuf.Message & Sendable,
  217. Response: SwiftProtobuf.Message & Sendable,
  218. RequestStream: AsyncSequence & Sendable
  219. >(
  220. path: String,
  221. requests: RequestStream,
  222. callOptions: CallOptions? = nil,
  223. interceptors: [ClientInterceptor<Request, Response>] = [],
  224. requestType: Request.Type = Request.self,
  225. responseType: Response.Type = Response.self
  226. ) async throws -> Response where RequestStream.Element == Request {
  227. let call = self.channel.makeAsyncClientStreamingCall(
  228. path: path,
  229. callOptions: callOptions ?? self.defaultCallOptions,
  230. interceptors: interceptors
  231. )
  232. return try await self.perform(call, with: requests)
  233. }
  234. public func performAsyncClientStreamingCall<
  235. Request: GRPCPayload & Sendable,
  236. Response: GRPCPayload & Sendable,
  237. RequestStream: AsyncSequence & Sendable
  238. >(
  239. path: String,
  240. requests: RequestStream,
  241. callOptions: CallOptions? = nil,
  242. interceptors: [ClientInterceptor<Request, Response>] = [],
  243. requestType: Request.Type = Request.self,
  244. responseType: Response.Type = Response.self
  245. ) async throws -> Response where RequestStream.Element == Request {
  246. let call = self.channel.makeAsyncClientStreamingCall(
  247. path: path,
  248. callOptions: callOptions ?? self.defaultCallOptions,
  249. interceptors: interceptors
  250. )
  251. return try await self.perform(call, with: requests)
  252. }
  253. public func performAsyncClientStreamingCall<
  254. Request: SwiftProtobuf.Message & Sendable,
  255. Response: SwiftProtobuf.Message & Sendable,
  256. RequestStream: Sequence
  257. >(
  258. path: String,
  259. requests: RequestStream,
  260. callOptions: CallOptions? = nil,
  261. interceptors: [ClientInterceptor<Request, Response>] = [],
  262. requestType: Request.Type = Request.self,
  263. responseType: Response.Type = Response.self
  264. ) async throws -> Response where RequestStream.Element == Request {
  265. let call = self.channel.makeAsyncClientStreamingCall(
  266. path: path,
  267. callOptions: callOptions ?? self.defaultCallOptions,
  268. interceptors: interceptors
  269. )
  270. return try await self.perform(call, with: AsyncStream(wrapping: requests))
  271. }
  272. public func performAsyncClientStreamingCall<
  273. Request: GRPCPayload & Sendable,
  274. Response: GRPCPayload & Sendable,
  275. RequestStream: Sequence
  276. >(
  277. path: String,
  278. requests: RequestStream,
  279. callOptions: CallOptions? = nil,
  280. interceptors: [ClientInterceptor<Request, Response>] = [],
  281. requestType: Request.Type = Request.self,
  282. responseType: Response.Type = Response.self
  283. ) async throws -> Response where RequestStream.Element == Request {
  284. let call = self.channel.makeAsyncClientStreamingCall(
  285. path: path,
  286. callOptions: callOptions ?? self.defaultCallOptions,
  287. interceptors: interceptors
  288. )
  289. return try await self.perform(call, with: AsyncStream(wrapping: requests))
  290. }
  291. public func performAsyncBidirectionalStreamingCall<
  292. Request: SwiftProtobuf.Message & Sendable,
  293. Response: SwiftProtobuf.Message & Sendable,
  294. RequestStream: AsyncSequence
  295. >(
  296. path: String,
  297. requests: RequestStream,
  298. callOptions: CallOptions? = nil,
  299. interceptors: [ClientInterceptor<Request, Response>] = [],
  300. requestType: Request.Type = Request.self,
  301. responseType: Response.Type = Response.self
  302. ) -> GRPCAsyncResponseStream<Response>
  303. where RequestStream.Element == Request {
  304. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  305. path: path,
  306. callOptions: callOptions ?? self.defaultCallOptions,
  307. interceptors: interceptors
  308. )
  309. return self.perform(call, with: requests)
  310. }
  311. public func performAsyncBidirectionalStreamingCall<
  312. Request: GRPCPayload & Sendable,
  313. Response: GRPCPayload & Sendable,
  314. RequestStream: AsyncSequence
  315. >(
  316. path: String,
  317. requests: RequestStream,
  318. callOptions: CallOptions? = nil,
  319. interceptors: [ClientInterceptor<Request, Response>] = [],
  320. requestType: Request.Type = Request.self,
  321. responseType: Response.Type = Response.self
  322. ) -> GRPCAsyncResponseStream<Response>
  323. where RequestStream.Element == Request {
  324. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  325. path: path,
  326. callOptions: callOptions ?? self.defaultCallOptions,
  327. interceptors: interceptors
  328. )
  329. return self.perform(call, with: requests)
  330. }
  331. public func performAsyncBidirectionalStreamingCall<
  332. Request: SwiftProtobuf.Message & Sendable,
  333. Response: SwiftProtobuf.Message & Sendable,
  334. RequestStream: Sequence
  335. >(
  336. path: String,
  337. requests: RequestStream,
  338. callOptions: CallOptions? = nil,
  339. interceptors: [ClientInterceptor<Request, Response>] = [],
  340. requestType: Request.Type = Request.self,
  341. responseType: Response.Type = Response.self
  342. ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
  343. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  344. path: path,
  345. callOptions: callOptions ?? self.defaultCallOptions,
  346. interceptors: interceptors
  347. )
  348. return self.perform(call, with: AsyncStream(wrapping: requests))
  349. }
  350. public func performAsyncBidirectionalStreamingCall<
  351. Request: GRPCPayload & Sendable,
  352. Response: GRPCPayload & Sendable,
  353. RequestStream: Sequence
  354. >(
  355. path: String,
  356. requests: RequestStream,
  357. callOptions: CallOptions? = nil,
  358. interceptors: [ClientInterceptor<Request, Response>] = [],
  359. requestType: Request.Type = Request.self,
  360. responseType: Response.Type = Response.self
  361. ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
  362. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  363. path: path,
  364. callOptions: callOptions ?? self.defaultCallOptions,
  365. interceptors: interceptors
  366. )
  367. return self.perform(call, with: AsyncStream(wrapping: requests))
  368. }
  369. }
  370. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  371. extension GRPCClient {
  372. @inlinable
  373. internal func perform<
  374. Request: Sendable,
  375. Response: Sendable,
  376. RequestStream: AsyncSequence & Sendable
  377. >(
  378. _ call: GRPCAsyncClientStreamingCall<Request, Response>,
  379. with requests: RequestStream
  380. ) async throws -> Response where RequestStream.Element == Request {
  381. // We use a detached task because we use cancellation to signal early, but successful exit.
  382. let requestsTask = Task.detached {
  383. try Task.checkCancellation()
  384. for try await request in requests {
  385. try Task.checkCancellation()
  386. try await call.requestStream.send(request)
  387. }
  388. try Task.checkCancellation()
  389. try await call.requestStream.finish()
  390. try Task.checkCancellation()
  391. }
  392. return try await withTaskCancellationHandler {
  393. // Await the response, which may come before the request stream is exhausted.
  394. let response = try await call.response
  395. // If we have a response, we can stop sending requests.
  396. requestsTask.cancel()
  397. // Return the response.
  398. return response
  399. } onCancel: {
  400. requestsTask.cancel()
  401. // If this outer task is cancelled then we should also cancel the RPC.
  402. Task.detached {
  403. try await call.cancel()
  404. }
  405. }
  406. }
  407. @inlinable
  408. internal func perform<
  409. Request: Sendable,
  410. Response: Sendable,
  411. RequestStream: AsyncSequence & Sendable
  412. >(
  413. _ call: GRPCAsyncBidirectionalStreamingCall<Request, Response>,
  414. with requests: RequestStream
  415. ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
  416. Task {
  417. try await withTaskCancellationHandler {
  418. try Task.checkCancellation()
  419. for try await request in requests {
  420. try Task.checkCancellation()
  421. try await call.requestStream.send(request)
  422. }
  423. try Task.checkCancellation()
  424. try await call.requestStream.finish()
  425. } onCancel: {
  426. Task.detached {
  427. try await call.cancel()
  428. }
  429. }
  430. }
  431. return call.responseStream
  432. }
  433. }
  434. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  435. extension AsyncStream {
  436. /// Create an `AsyncStream` from a regular (non-async) `Sequence`.
  437. ///
  438. /// - Note: This is just here to avoid duplicating the above two `perform(_:with:)` functions
  439. /// for `Sequence`.
  440. fileprivate init<T>(wrapping sequence: T) where T: Sequence, T.Element == Element {
  441. self.init { continuation in
  442. var iterator = sequence.makeIterator()
  443. while let value = iterator.next() {
  444. continuation.yield(value)
  445. }
  446. continuation.finish()
  447. }
  448. }
  449. }
  450. #endif