// GRPCClient+AsyncAwaitSupport.swift
  /*
   * Copyright 2021, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import SwiftProtobuf
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCClient {
    /// Creates a unary call for SwiftProtobuf messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncUnaryCall<Request, Response> {
      self.channel.makeAsyncUnaryCall(
        path: path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }

    /// Creates a unary call for `GRPCPayload` messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncUnaryCall<Request: GRPCPayload & Sendable, Response: GRPCPayload & Sendable>(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncUnaryCall<Request, Response> {
      self.channel.makeAsyncUnaryCall(
        path: path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }

    /// Creates a server-streaming call for SwiftProtobuf messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncServerStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable
    >(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncServerStreamingCall<Request, Response> {
      self.channel.makeAsyncServerStreamingCall(
        path: path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }

    /// Creates a server-streaming call for `GRPCPayload` messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncServerStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable
    >(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncServerStreamingCall<Request, Response> {
      self.channel.makeAsyncServerStreamingCall(
        path: path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }

    /// Creates a client-streaming call for SwiftProtobuf messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncClientStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable
    >(
      path: String,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncClientStreamingCall<Request, Response> {
      self.channel.makeAsyncClientStreamingCall(
        path: path,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }

    /// Creates a client-streaming call for `GRPCPayload` messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncClientStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable
    >(
      path: String,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncClientStreamingCall<Request, Response> {
      self.channel.makeAsyncClientStreamingCall(
        path: path,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }

    /// Creates a bidirectional-streaming call for SwiftProtobuf messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncBidirectionalStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable
    >(
      path: String,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
      self.channel.makeAsyncBidirectionalStreamingCall(
        path: path,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }

    /// Creates a bidirectional-streaming call for `GRPCPayload` messages on this client's channel.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The call object created by the channel.
    public func makeAsyncBidirectionalStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable
    >(
      path: String,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
      self.channel.makeAsyncBidirectionalStreamingCall(
        path: path,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )
    }
  }
  // MARK: - "Simple, but safe" wrappers.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCClient {
    /// Performs a unary call for SwiftProtobuf messages and awaits its response.
    ///
    /// The call is cancelled if the surrounding task is cancelled while waiting.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The value of the call's `response`.
    /// - Throws: Whatever awaiting the call's `response` throws.
    public func performAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) async throws -> Response {
      let rpc: GRPCAsyncUnaryCall<Request, Response> = self.channel.makeAsyncUnaryCall(
        path: path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )

      return try await withTaskCancellationHandler(
        operation: { try await rpc.response },
        onCancel: { rpc.cancel() }
      )
    }

    /// Performs a unary call for `GRPCPayload` messages and awaits its response.
    ///
    /// The call is cancelled if the surrounding task is cancelled while waiting.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The value of the call's `response`.
    /// - Throws: Whatever awaiting the call's `response` throws.
    public func performAsyncUnaryCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable
    >(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) async throws -> Response {
      let rpc: GRPCAsyncUnaryCall<Request, Response> = self.channel.makeAsyncUnaryCall(
        path: path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
      )

      return try await withTaskCancellationHandler(
        operation: { try await rpc.response },
        onCancel: { rpc.cancel() }
      )
    }

    /// Starts a server-streaming call for SwiftProtobuf messages and returns its response stream.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The `responseStream` of the newly created call.
    public func performAsyncServerStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable
    >(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncResponseStream<Response> {
      let rpc: GRPCAsyncServerStreamingCall<Request, Response> =
        self.channel.makeAsyncServerStreamingCall(
          path: path,
          request: request,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      return rpc.responseStream
    }

    /// Starts a server-streaming call for `GRPCPayload` messages and returns its response stream.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - request: The single request to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The `responseStream` of the newly created call.
    public func performAsyncServerStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable
    >(
      path: String,
      request: Request,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncResponseStream<Response> {
      let rpc: GRPCAsyncServerStreamingCall<Request, Response> =
        self.channel.makeAsyncServerStreamingCall(
          path: path,
          request: request,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      return rpc.responseStream
    }

    /// Performs a client-streaming call for SwiftProtobuf messages, taking the requests from an
    /// `AsyncSequence`, and awaits the response.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send; handed to `perform(_:with:)`.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response produced by `perform(_:with:)`.
    /// - Throws: Whatever `perform(_:with:)` throws.
    public func performAsyncClientStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable,
      RequestStream: AsyncSequence & Sendable
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) async throws -> Response where RequestStream.Element == Request {
      let rpc: GRPCAsyncClientStreamingCall<Request, Response> =
        self.channel.makeAsyncClientStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      return try await self.perform(rpc, with: requests)
    }

    /// Performs a client-streaming call for `GRPCPayload` messages, taking the requests from an
    /// `AsyncSequence`, and awaits the response.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send; handed to `perform(_:with:)`.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response produced by `perform(_:with:)`.
    /// - Throws: Whatever `perform(_:with:)` throws.
    public func performAsyncClientStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable,
      RequestStream: AsyncSequence & Sendable
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) async throws -> Response where RequestStream.Element == Request {
      let rpc: GRPCAsyncClientStreamingCall<Request, Response> =
        self.channel.makeAsyncClientStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      return try await self.perform(rpc, with: requests)
    }

    /// Performs a client-streaming call for SwiftProtobuf messages, taking the requests from a
    /// regular `Sequence`, and awaits the response.
    ///
    /// The sequence is wrapped in an `AsyncStream` (after the call has been created) so the
    /// `AsyncSequence`-based `perform(_:with:)` can be reused.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response produced by `perform(_:with:)`.
    /// - Throws: Whatever `perform(_:with:)` throws.
    public func performAsyncClientStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable,
      RequestStream: Sequence
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) async throws -> Response where RequestStream.Element == Request {
      let rpc: GRPCAsyncClientStreamingCall<Request, Response> =
        self.channel.makeAsyncClientStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      let stream = AsyncStream<Request>(wrapping: requests)
      return try await self.perform(rpc, with: stream)
    }

    /// Performs a client-streaming call for `GRPCPayload` messages, taking the requests from a
    /// regular `Sequence`, and awaits the response.
    ///
    /// The sequence is wrapped in an `AsyncStream` (after the call has been created) so the
    /// `AsyncSequence`-based `perform(_:with:)` can be reused.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response produced by `perform(_:with:)`.
    /// - Throws: Whatever `perform(_:with:)` throws.
    public func performAsyncClientStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable,
      RequestStream: Sequence
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) async throws -> Response where RequestStream.Element == Request {
      let rpc: GRPCAsyncClientStreamingCall<Request, Response> =
        self.channel.makeAsyncClientStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      let stream = AsyncStream<Request>(wrapping: requests)
      return try await self.perform(rpc, with: stream)
    }

    /// Starts a bidirectional-streaming call for SwiftProtobuf messages, taking the requests from
    /// an `AsyncSequence`, and returns the response stream.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send; handed to `perform(_:with:)`.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response stream produced by `perform(_:with:)`.
    public func performAsyncBidirectionalStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable,
      RequestStream: AsyncSequence & Sendable
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncResponseStream<Response>
    where RequestStream.Element == Request {
      let rpc: GRPCAsyncBidirectionalStreamingCall<Request, Response> =
        self.channel.makeAsyncBidirectionalStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      return self.perform(rpc, with: requests)
    }

    /// Starts a bidirectional-streaming call for `GRPCPayload` messages, taking the requests from
    /// an `AsyncSequence`, and returns the response stream.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send; handed to `perform(_:with:)`.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response stream produced by `perform(_:with:)`.
    public func performAsyncBidirectionalStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable,
      RequestStream: AsyncSequence & Sendable
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncResponseStream<Response>
    where RequestStream.Element == Request {
      let rpc: GRPCAsyncBidirectionalStreamingCall<Request, Response> =
        self.channel.makeAsyncBidirectionalStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      return self.perform(rpc, with: requests)
    }

    /// Starts a bidirectional-streaming call for SwiftProtobuf messages, taking the requests from
    /// a regular `Sequence`, and returns the response stream.
    ///
    /// The sequence is wrapped in an `AsyncStream` (after the call has been created) so the
    /// `AsyncSequence`-based `perform(_:with:)` can be reused.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response stream produced by `perform(_:with:)`.
    public func performAsyncBidirectionalStreamingCall<
      Request: SwiftProtobuf.Message & Sendable,
      Response: SwiftProtobuf.Message & Sendable,
      RequestStream: Sequence
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
      let rpc: GRPCAsyncBidirectionalStreamingCall<Request, Response> =
        self.channel.makeAsyncBidirectionalStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      let stream = AsyncStream<Request>(wrapping: requests)
      return self.perform(rpc, with: stream)
    }

    /// Starts a bidirectional-streaming call for `GRPCPayload` messages, taking the requests from
    /// a regular `Sequence`, and returns the response stream.
    ///
    /// The sequence is wrapped in an `AsyncStream` (after the call has been created) so the
    /// `AsyncSequence`-based `perform(_:with:)` can be reused.
    ///
    /// - Parameters:
    ///   - path: Forwarded unchanged to the channel.
    ///   - requests: The requests to send.
    ///   - callOptions: Options for the call; `defaultCallOptions` is used when `nil`.
    ///   - interceptors: Forwarded unchanged to the channel.
    ///   - requestType: Not read by the body; only lets the caller spell out `Request`.
    ///   - responseType: Not read by the body; only lets the caller spell out `Response`.
    /// - Returns: The response stream produced by `perform(_:with:)`.
    public func performAsyncBidirectionalStreamingCall<
      Request: GRPCPayload & Sendable,
      Response: GRPCPayload & Sendable,
      RequestStream: Sequence
    >(
      path: String,
      requests: RequestStream,
      callOptions: CallOptions? = nil,
      interceptors: [ClientInterceptor<Request, Response>] = [],
      requestType: Request.Type = Request.self,
      responseType: Response.Type = Response.self
    ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
      let rpc: GRPCAsyncBidirectionalStreamingCall<Request, Response> =
        self.channel.makeAsyncBidirectionalStreamingCall(
          path: path,
          callOptions: callOptions ?? self.defaultCallOptions,
          interceptors: interceptors
        )
      let stream = AsyncStream<Request>(wrapping: requests)
      return self.perform(rpc, with: stream)
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCClient {
    /// Drives a client-streaming call: sends every element of `requests` on the call's request
    /// stream, finishes the request stream, and returns the call's response.
    ///
    /// Requests are sent from a separate task so the response can be awaited concurrently. That
    /// task is cancelled when the calling task is cancelled and once the response has resolved,
    /// so iteration of `requests` does not outlive this function's interest in the call.
    ///
    /// - Parameters:
    ///   - call: The client-streaming call to drive.
    ///   - requests: The requests to send, in iteration order.
    /// - Returns: The value of `call.response`.
    /// - Throws: Whatever awaiting `call.response` throws. Errors thrown while iterating or
    ///   sending requests are not rethrown directly; they cancel the call instead.
    @inlinable
    internal func perform<
      Request: Sendable,
      Response: Sendable,
      RequestStream: AsyncSequence & Sendable
    >(
      _ call: GRPCAsyncClientStreamingCall<Request, Response>,
      with requests: RequestStream
    ) async throws -> Response where RequestStream.Element == Request {
      // An unstructured `Task` does not inherit cancellation from the task that created it, so
      // hold on to the handle and cancel it explicitly below. `AsyncSequence`s are encouraged to
      // co-operatively check for cancellation, which lets the loop stop promptly.
      let requestTask = Task {
        do {
          for try await request in requests {
            try await call.requestStream.send(request)
          }
          call.requestStream.finish()
        } catch {
          // If we throw then cancel the call. We will rely on the response throwing an appropriate
          // error below.
          call.cancel()
        }
      }

      return try await withTaskCancellationHandler {
        // Once the response has resolved (successfully or not) there is nothing left to send.
        defer { requestTask.cancel() }
        return try await call.response
      } onCancel: {
        requestTask.cancel()
        call.cancel()
      }
    }

    /// Drives a bidirectional-streaming call: starts a task which sends every element of
    /// `requests` on the call's request stream and then finishes it, and immediately returns the
    /// call's response stream.
    ///
    /// - Parameters:
    ///   - call: The bidirectional-streaming call to drive.
    ///   - requests: The requests to send, in iteration order.
    /// - Returns: `call.responseStream`.
    @inlinable
    internal func perform<
      Request: Sendable,
      Response: Sendable,
      RequestStream: AsyncSequence & Sendable
    >(
      _ call: GRPCAsyncBidirectionalStreamingCall<Request, Response>,
      with requests: RequestStream
    ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
      // The handle of this unstructured task is discarded, so nothing in this function cancels
      // it; the task ends when `requests` is exhausted or when iterating/sending throws.
      Task {
        do {
          try await withTaskCancellationHandler {
            // `AsyncSequence`s are encouraged to co-operatively check for cancellation, and we will
            // cancel the call `onCancel` anyway, so there's no need to check here too.
            for try await request in requests {
              try await call.requestStream.send(request)
            }
            call.requestStream.finish()
          } onCancel: {
            call.cancel()
          }
        } catch {
          // Failing to produce or send a request cancels the whole call.
          call.cancel()
        }
      }
      return call.responseStream
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension AsyncStream {
    /// Builds an `AsyncStream` which yields each element of a regular (non-async) `Sequence`, in
    /// iteration order, and then finishes.
    ///
    /// All elements are yielded from inside the stream's build closure, so the whole sequence is
    /// consumed there.
    ///
    /// - Note: This exists so the `AsyncSequence`-based `perform(_:with:)` functions in this file
    ///   do not need to be duplicated for `Sequence`.
    /// - Parameter sequence: The sequence whose elements the stream should yield.
    fileprivate init<T>(wrapping sequence: T) where T: Sequence, T.Element == Element {
      self.init { continuation in
        for element in sequence {
          continuation.yield(element)
        }
        continuation.finish()
      }
    }
  }