GRPCClient+AsyncAwaitSupport.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import SwiftProtobuf
  18. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  19. extension GRPCClient {
  20. public func makeAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
  21. path: String,
  22. request: Request,
  23. callOptions: CallOptions? = nil,
  24. interceptors: [ClientInterceptor<Request, Response>] = [],
  25. responseType: Response.Type = Response.self
  26. ) -> GRPCAsyncUnaryCall<Request, Response> {
  27. return self.channel.makeAsyncUnaryCall(
  28. path: path,
  29. request: request,
  30. callOptions: callOptions ?? self.defaultCallOptions,
  31. interceptors: interceptors
  32. )
  33. }
  34. public func makeAsyncUnaryCall<Request: GRPCPayload & Sendable, Response: GRPCPayload & Sendable>(
  35. path: String,
  36. request: Request,
  37. callOptions: CallOptions? = nil,
  38. interceptors: [ClientInterceptor<Request, Response>] = [],
  39. responseType: Response.Type = Response.self
  40. ) -> GRPCAsyncUnaryCall<Request, Response> {
  41. return self.channel.makeAsyncUnaryCall(
  42. path: path,
  43. request: request,
  44. callOptions: callOptions ?? self.defaultCallOptions,
  45. interceptors: interceptors
  46. )
  47. }
  48. public func makeAsyncServerStreamingCall<
  49. Request: SwiftProtobuf.Message & Sendable,
  50. Response: SwiftProtobuf.Message & Sendable
  51. >(
  52. path: String,
  53. request: Request,
  54. callOptions: CallOptions? = nil,
  55. interceptors: [ClientInterceptor<Request, Response>] = [],
  56. responseType: Response.Type = Response.self
  57. ) -> GRPCAsyncServerStreamingCall<Request, Response> {
  58. return self.channel.makeAsyncServerStreamingCall(
  59. path: path,
  60. request: request,
  61. callOptions: callOptions ?? self.defaultCallOptions,
  62. interceptors: interceptors
  63. )
  64. }
  65. public func makeAsyncServerStreamingCall<
  66. Request: GRPCPayload & Sendable,
  67. Response: GRPCPayload & Sendable
  68. >(
  69. path: String,
  70. request: Request,
  71. callOptions: CallOptions? = nil,
  72. interceptors: [ClientInterceptor<Request, Response>] = [],
  73. responseType: Response.Type = Response.self
  74. ) -> GRPCAsyncServerStreamingCall<Request, Response> {
  75. return self.channel.makeAsyncServerStreamingCall(
  76. path: path,
  77. request: request,
  78. callOptions: callOptions ?? self.defaultCallOptions,
  79. interceptors: interceptors
  80. )
  81. }
  82. public func makeAsyncClientStreamingCall<
  83. Request: SwiftProtobuf.Message & Sendable,
  84. Response: SwiftProtobuf.Message & Sendable
  85. >(
  86. path: String,
  87. callOptions: CallOptions? = nil,
  88. interceptors: [ClientInterceptor<Request, Response>] = [],
  89. requestType: Request.Type = Request.self,
  90. responseType: Response.Type = Response.self
  91. ) -> GRPCAsyncClientStreamingCall<Request, Response> {
  92. return self.channel.makeAsyncClientStreamingCall(
  93. path: path,
  94. callOptions: callOptions ?? self.defaultCallOptions,
  95. interceptors: interceptors
  96. )
  97. }
  98. public func makeAsyncClientStreamingCall<
  99. Request: GRPCPayload & Sendable,
  100. Response: GRPCPayload & Sendable
  101. >(
  102. path: String,
  103. callOptions: CallOptions? = nil,
  104. interceptors: [ClientInterceptor<Request, Response>] = [],
  105. requestType: Request.Type = Request.self,
  106. responseType: Response.Type = Response.self
  107. ) -> GRPCAsyncClientStreamingCall<Request, Response> {
  108. return self.channel.makeAsyncClientStreamingCall(
  109. path: path,
  110. callOptions: callOptions ?? self.defaultCallOptions,
  111. interceptors: interceptors
  112. )
  113. }
  114. public func makeAsyncBidirectionalStreamingCall<
  115. Request: SwiftProtobuf.Message & Sendable,
  116. Response: SwiftProtobuf.Message & Sendable
  117. >(
  118. path: String,
  119. callOptions: CallOptions? = nil,
  120. interceptors: [ClientInterceptor<Request, Response>] = [],
  121. requestType: Request.Type = Request.self,
  122. responseType: Response.Type = Response.self
  123. ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
  124. return self.channel.makeAsyncBidirectionalStreamingCall(
  125. path: path,
  126. callOptions: callOptions ?? self.defaultCallOptions,
  127. interceptors: interceptors
  128. )
  129. }
  130. public func makeAsyncBidirectionalStreamingCall<
  131. Request: GRPCPayload & Sendable,
  132. Response: GRPCPayload & Sendable
  133. >(
  134. path: String,
  135. callOptions: CallOptions? = nil,
  136. interceptors: [ClientInterceptor<Request, Response>] = [],
  137. requestType: Request.Type = Request.self,
  138. responseType: Response.Type = Response.self
  139. ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
  140. return self.channel.makeAsyncBidirectionalStreamingCall(
  141. path: path,
  142. callOptions: callOptions ?? self.defaultCallOptions,
  143. interceptors: interceptors
  144. )
  145. }
  146. }
  147. // MARK: - "Simple, but safe" wrappers.
  148. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  149. extension GRPCClient {
  150. public func performAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
  151. path: String,
  152. request: Request,
  153. callOptions: CallOptions? = nil,
  154. interceptors: [ClientInterceptor<Request, Response>] = [],
  155. responseType: Response.Type = Response.self
  156. ) async throws -> Response {
  157. let call = self.channel.makeAsyncUnaryCall(
  158. path: path,
  159. request: request,
  160. callOptions: callOptions ?? self.defaultCallOptions,
  161. interceptors: interceptors
  162. )
  163. return try await withTaskCancellationHandler {
  164. try await call.response
  165. } onCancel: {
  166. call.cancel()
  167. }
  168. }
  169. public func performAsyncUnaryCall<
  170. Request: GRPCPayload & Sendable,
  171. Response: GRPCPayload & Sendable
  172. >(
  173. path: String,
  174. request: Request,
  175. callOptions: CallOptions? = nil,
  176. interceptors: [ClientInterceptor<Request, Response>] = [],
  177. responseType: Response.Type = Response.self
  178. ) async throws -> Response {
  179. let call = self.channel.makeAsyncUnaryCall(
  180. path: path,
  181. request: request,
  182. callOptions: callOptions ?? self.defaultCallOptions,
  183. interceptors: interceptors
  184. )
  185. return try await withTaskCancellationHandler {
  186. try await call.response
  187. } onCancel: {
  188. call.cancel()
  189. }
  190. }
  191. public func performAsyncServerStreamingCall<
  192. Request: SwiftProtobuf.Message & Sendable,
  193. Response: SwiftProtobuf.Message & Sendable
  194. >(
  195. path: String,
  196. request: Request,
  197. callOptions: CallOptions? = nil,
  198. interceptors: [ClientInterceptor<Request, Response>] = [],
  199. responseType: Response.Type = Response.self
  200. ) -> GRPCAsyncResponseStream<Response> {
  201. return self.channel.makeAsyncServerStreamingCall(
  202. path: path,
  203. request: request,
  204. callOptions: callOptions ?? self.defaultCallOptions,
  205. interceptors: interceptors
  206. ).responseStream
  207. }
  208. public func performAsyncServerStreamingCall<
  209. Request: GRPCPayload & Sendable,
  210. Response: GRPCPayload & Sendable
  211. >(
  212. path: String,
  213. request: Request,
  214. callOptions: CallOptions? = nil,
  215. interceptors: [ClientInterceptor<Request, Response>] = [],
  216. responseType: Response.Type = Response.self
  217. ) -> GRPCAsyncResponseStream<Response> {
  218. return self.channel.makeAsyncServerStreamingCall(
  219. path: path,
  220. request: request,
  221. callOptions: callOptions ?? self.defaultCallOptions,
  222. interceptors: interceptors
  223. ).responseStream
  224. }
  225. public func performAsyncClientStreamingCall<
  226. Request: SwiftProtobuf.Message & Sendable,
  227. Response: SwiftProtobuf.Message & Sendable,
  228. RequestStream: AsyncSequence & Sendable
  229. >(
  230. path: String,
  231. requests: RequestStream,
  232. callOptions: CallOptions? = nil,
  233. interceptors: [ClientInterceptor<Request, Response>] = [],
  234. requestType: Request.Type = Request.self,
  235. responseType: Response.Type = Response.self
  236. ) async throws -> Response where RequestStream.Element == Request {
  237. let call = self.channel.makeAsyncClientStreamingCall(
  238. path: path,
  239. callOptions: callOptions ?? self.defaultCallOptions,
  240. interceptors: interceptors
  241. )
  242. return try await self.perform(call, with: requests)
  243. }
  244. public func performAsyncClientStreamingCall<
  245. Request: GRPCPayload & Sendable,
  246. Response: GRPCPayload & Sendable,
  247. RequestStream: AsyncSequence & Sendable
  248. >(
  249. path: String,
  250. requests: RequestStream,
  251. callOptions: CallOptions? = nil,
  252. interceptors: [ClientInterceptor<Request, Response>] = [],
  253. requestType: Request.Type = Request.self,
  254. responseType: Response.Type = Response.self
  255. ) async throws -> Response where RequestStream.Element == Request {
  256. let call = self.channel.makeAsyncClientStreamingCall(
  257. path: path,
  258. callOptions: callOptions ?? self.defaultCallOptions,
  259. interceptors: interceptors
  260. )
  261. return try await self.perform(call, with: requests)
  262. }
  263. public func performAsyncClientStreamingCall<
  264. Request: SwiftProtobuf.Message & Sendable,
  265. Response: SwiftProtobuf.Message & Sendable,
  266. RequestStream: Sequence
  267. >(
  268. path: String,
  269. requests: RequestStream,
  270. callOptions: CallOptions? = nil,
  271. interceptors: [ClientInterceptor<Request, Response>] = [],
  272. requestType: Request.Type = Request.self,
  273. responseType: Response.Type = Response.self
  274. ) async throws -> Response where RequestStream.Element == Request {
  275. let call = self.channel.makeAsyncClientStreamingCall(
  276. path: path,
  277. callOptions: callOptions ?? self.defaultCallOptions,
  278. interceptors: interceptors
  279. )
  280. return try await self.perform(call, with: AsyncStream(wrapping: requests))
  281. }
  282. public func performAsyncClientStreamingCall<
  283. Request: GRPCPayload & Sendable,
  284. Response: GRPCPayload & Sendable,
  285. RequestStream: Sequence
  286. >(
  287. path: String,
  288. requests: RequestStream,
  289. callOptions: CallOptions? = nil,
  290. interceptors: [ClientInterceptor<Request, Response>] = [],
  291. requestType: Request.Type = Request.self,
  292. responseType: Response.Type = Response.self
  293. ) async throws -> Response where RequestStream.Element == Request {
  294. let call = self.channel.makeAsyncClientStreamingCall(
  295. path: path,
  296. callOptions: callOptions ?? self.defaultCallOptions,
  297. interceptors: interceptors
  298. )
  299. return try await self.perform(call, with: AsyncStream(wrapping: requests))
  300. }
  301. public func performAsyncBidirectionalStreamingCall<
  302. Request: SwiftProtobuf.Message & Sendable,
  303. Response: SwiftProtobuf.Message & Sendable,
  304. RequestStream: AsyncSequence & Sendable
  305. >(
  306. path: String,
  307. requests: RequestStream,
  308. callOptions: CallOptions? = nil,
  309. interceptors: [ClientInterceptor<Request, Response>] = [],
  310. requestType: Request.Type = Request.self,
  311. responseType: Response.Type = Response.self
  312. ) -> GRPCAsyncResponseStream<Response>
  313. where RequestStream.Element == Request {
  314. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  315. path: path,
  316. callOptions: callOptions ?? self.defaultCallOptions,
  317. interceptors: interceptors
  318. )
  319. return self.perform(call, with: requests)
  320. }
  321. public func performAsyncBidirectionalStreamingCall<
  322. Request: GRPCPayload & Sendable,
  323. Response: GRPCPayload & Sendable,
  324. RequestStream: AsyncSequence & Sendable
  325. >(
  326. path: String,
  327. requests: RequestStream,
  328. callOptions: CallOptions? = nil,
  329. interceptors: [ClientInterceptor<Request, Response>] = [],
  330. requestType: Request.Type = Request.self,
  331. responseType: Response.Type = Response.self
  332. ) -> GRPCAsyncResponseStream<Response>
  333. where RequestStream.Element == Request {
  334. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  335. path: path,
  336. callOptions: callOptions ?? self.defaultCallOptions,
  337. interceptors: interceptors
  338. )
  339. return self.perform(call, with: requests)
  340. }
  341. public func performAsyncBidirectionalStreamingCall<
  342. Request: SwiftProtobuf.Message & Sendable,
  343. Response: SwiftProtobuf.Message & Sendable,
  344. RequestStream: Sequence
  345. >(
  346. path: String,
  347. requests: RequestStream,
  348. callOptions: CallOptions? = nil,
  349. interceptors: [ClientInterceptor<Request, Response>] = [],
  350. requestType: Request.Type = Request.self,
  351. responseType: Response.Type = Response.self
  352. ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
  353. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  354. path: path,
  355. callOptions: callOptions ?? self.defaultCallOptions,
  356. interceptors: interceptors
  357. )
  358. return self.perform(call, with: AsyncStream(wrapping: requests))
  359. }
  360. public func performAsyncBidirectionalStreamingCall<
  361. Request: GRPCPayload & Sendable,
  362. Response: GRPCPayload & Sendable,
  363. RequestStream: Sequence
  364. >(
  365. path: String,
  366. requests: RequestStream,
  367. callOptions: CallOptions? = nil,
  368. interceptors: [ClientInterceptor<Request, Response>] = [],
  369. requestType: Request.Type = Request.self,
  370. responseType: Response.Type = Response.self
  371. ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
  372. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  373. path: path,
  374. callOptions: callOptions ?? self.defaultCallOptions,
  375. interceptors: interceptors
  376. )
  377. return self.perform(call, with: AsyncStream(wrapping: requests))
  378. }
  379. }
  380. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  381. extension GRPCClient {
  382. @inlinable
  383. internal func perform<
  384. Request: Sendable,
  385. Response: Sendable,
  386. RequestStream: AsyncSequence & Sendable
  387. >(
  388. _ call: GRPCAsyncClientStreamingCall<Request, Response>,
  389. with requests: RequestStream
  390. ) async throws -> Response where RequestStream.Element == Request {
  391. return try await withTaskCancellationHandler {
  392. Task {
  393. do {
  394. // `AsyncSequence`s are encouraged to co-operatively check for cancellation, and we will
  395. // cancel the call `onCancel` anyway, so there's no need to check here too.
  396. for try await request in requests {
  397. try await call.requestStream.send(request)
  398. }
  399. try await call.requestStream.finish()
  400. } catch {
  401. // If we throw then cancel the call. We will rely on the response throwing an appropriate
  402. // error below.
  403. call.cancel()
  404. }
  405. }
  406. return try await call.response
  407. } onCancel: {
  408. call.cancel()
  409. }
  410. }
  411. @inlinable
  412. internal func perform<
  413. Request: Sendable,
  414. Response: Sendable,
  415. RequestStream: AsyncSequence & Sendable
  416. >(
  417. _ call: GRPCAsyncBidirectionalStreamingCall<Request, Response>,
  418. with requests: RequestStream
  419. ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
  420. Task {
  421. do {
  422. try await withTaskCancellationHandler {
  423. // `AsyncSequence`s are encouraged to co-operatively check for cancellation, and we will
  424. // cancel the call `onCancel` anyway, so there's no need to check here too.
  425. for try await request in requests {
  426. try await call.requestStream.send(request)
  427. }
  428. try await call.requestStream.finish()
  429. } onCancel: {
  430. call.cancel()
  431. }
  432. } catch {
  433. call.cancel()
  434. }
  435. }
  436. return call.responseStream
  437. }
  438. }
  439. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  440. extension AsyncStream {
  441. /// Create an `AsyncStream` from a regular (non-async) `Sequence`.
  442. ///
  443. /// - Note: This is just here to avoid duplicating the above two `perform(_:with:)` functions
  444. /// for `Sequence`.
  445. fileprivate init<T>(wrapping sequence: T) where T: Sequence, T.Element == Element {
  446. self.init { continuation in
  447. var iterator = sequence.makeIterator()
  448. while let value = iterator.next() {
  449. continuation.yield(value)
  450. }
  451. continuation.finish()
  452. }
  453. }
  454. }
  455. #endif