GRPCClient+AsyncAwaitSupport.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.5)
  17. import SwiftProtobuf
  18. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  19. extension GRPCClient {
  20. public func makeAsyncUnaryCall<Request: Message, Response: Message>(
  21. path: String,
  22. request: Request,
  23. callOptions: CallOptions? = nil,
  24. interceptors: [ClientInterceptor<Request, Response>] = [],
  25. responseType: Response.Type = Response.self
  26. ) -> GRPCAsyncUnaryCall<Request, Response> {
  27. return self.channel.makeAsyncUnaryCall(
  28. path: path,
  29. request: request,
  30. callOptions: callOptions ?? self.defaultCallOptions,
  31. interceptors: interceptors
  32. )
  33. }
  34. public func makeAsyncUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
  35. path: String,
  36. request: Request,
  37. callOptions: CallOptions? = nil,
  38. interceptors: [ClientInterceptor<Request, Response>] = [],
  39. responseType: Response.Type = Response.self
  40. ) -> GRPCAsyncUnaryCall<Request, Response> {
  41. return self.channel.makeAsyncUnaryCall(
  42. path: path,
  43. request: request,
  44. callOptions: callOptions ?? self.defaultCallOptions,
  45. interceptors: interceptors
  46. )
  47. }
  48. public func makeAsyncServerStreamingCall<
  49. Request: SwiftProtobuf.Message,
  50. Response: SwiftProtobuf.Message
  51. >(
  52. path: String,
  53. request: Request,
  54. callOptions: CallOptions? = nil,
  55. interceptors: [ClientInterceptor<Request, Response>] = [],
  56. responseType: Response.Type = Response.self
  57. ) -> GRPCAsyncServerStreamingCall<Request, Response> {
  58. return self.channel.makeAsyncServerStreamingCall(
  59. path: path,
  60. request: request,
  61. callOptions: callOptions ?? self.defaultCallOptions,
  62. interceptors: interceptors
  63. )
  64. }
  65. public func makeAsyncServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  66. path: String,
  67. request: Request,
  68. callOptions: CallOptions? = nil,
  69. interceptors: [ClientInterceptor<Request, Response>] = [],
  70. responseType: Response.Type = Response.self
  71. ) -> GRPCAsyncServerStreamingCall<Request, Response> {
  72. return self.channel.makeAsyncServerStreamingCall(
  73. path: path,
  74. request: request,
  75. callOptions: callOptions ?? self.defaultCallOptions,
  76. interceptors: interceptors
  77. )
  78. }
  79. public func makeAsyncClientStreamingCall<
  80. Request: SwiftProtobuf.Message,
  81. Response: SwiftProtobuf.Message
  82. >(
  83. path: String,
  84. callOptions: CallOptions? = nil,
  85. interceptors: [ClientInterceptor<Request, Response>] = [],
  86. requestType: Request.Type = Request.self,
  87. responseType: Response.Type = Response.self
  88. ) -> GRPCAsyncClientStreamingCall<Request, Response> {
  89. return self.channel.makeAsyncClientStreamingCall(
  90. path: path,
  91. callOptions: callOptions ?? self.defaultCallOptions,
  92. interceptors: interceptors
  93. )
  94. }
  95. public func makeAsyncClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  96. path: String,
  97. callOptions: CallOptions? = nil,
  98. interceptors: [ClientInterceptor<Request, Response>] = [],
  99. requestType: Request.Type = Request.self,
  100. responseType: Response.Type = Response.self
  101. ) -> GRPCAsyncClientStreamingCall<Request, Response> {
  102. return self.channel.makeAsyncClientStreamingCall(
  103. path: path,
  104. callOptions: callOptions ?? self.defaultCallOptions,
  105. interceptors: interceptors
  106. )
  107. }
  108. public func makeAsyncBidirectionalStreamingCall<
  109. Request: SwiftProtobuf.Message,
  110. Response: SwiftProtobuf.Message
  111. >(
  112. path: String,
  113. callOptions: CallOptions? = nil,
  114. interceptors: [ClientInterceptor<Request, Response>] = [],
  115. requestType: Request.Type = Request.self,
  116. responseType: Response.Type = Response.self
  117. ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
  118. return self.channel.makeAsyncBidirectionalStreamingCall(
  119. path: path,
  120. callOptions: callOptions ?? self.defaultCallOptions,
  121. interceptors: interceptors
  122. )
  123. }
  124. public func makeAsyncBidirectionalStreamingCall<
  125. Request: GRPCPayload,
  126. Response: GRPCPayload
  127. >(
  128. path: String,
  129. callOptions: CallOptions? = nil,
  130. interceptors: [ClientInterceptor<Request, Response>] = [],
  131. requestType: Request.Type = Request.self,
  132. responseType: Response.Type = Response.self
  133. ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
  134. return self.channel.makeAsyncBidirectionalStreamingCall(
  135. path: path,
  136. callOptions: callOptions ?? self.defaultCallOptions,
  137. interceptors: interceptors
  138. )
  139. }
  140. }
  141. // MARK: - "Simple, but safe" wrappers.
  142. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  143. extension GRPCClient {
  144. public func performAsyncUnaryCall<Request: Message, Response: Message>(
  145. path: String,
  146. request: Request,
  147. callOptions: CallOptions? = nil,
  148. interceptors: [ClientInterceptor<Request, Response>] = [],
  149. responseType: Response.Type = Response.self
  150. ) async throws -> Response {
  151. return try await self.channel.makeAsyncUnaryCall(
  152. path: path,
  153. request: request,
  154. callOptions: callOptions ?? self.defaultCallOptions,
  155. interceptors: interceptors
  156. ).response
  157. }
  158. public func performAsyncUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
  159. path: String,
  160. request: Request,
  161. callOptions: CallOptions? = nil,
  162. interceptors: [ClientInterceptor<Request, Response>] = [],
  163. responseType: Response.Type = Response.self
  164. ) async throws -> Response {
  165. return try await self.channel.makeAsyncUnaryCall(
  166. path: path,
  167. request: request,
  168. callOptions: callOptions ?? self.defaultCallOptions,
  169. interceptors: interceptors
  170. ).response
  171. }
  172. public func performAsyncServerStreamingCall<
  173. Request: SwiftProtobuf.Message,
  174. Response: SwiftProtobuf.Message
  175. >(
  176. path: String,
  177. request: Request,
  178. callOptions: CallOptions? = nil,
  179. interceptors: [ClientInterceptor<Request, Response>] = [],
  180. responseType: Response.Type = Response.self
  181. ) -> GRPCAsyncResponseStream<Response> {
  182. return self.channel.makeAsyncServerStreamingCall(
  183. path: path,
  184. request: request,
  185. callOptions: callOptions ?? self.defaultCallOptions,
  186. interceptors: interceptors
  187. ).responses
  188. }
  189. public func performAsyncServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  190. path: String,
  191. request: Request,
  192. callOptions: CallOptions? = nil,
  193. interceptors: [ClientInterceptor<Request, Response>] = [],
  194. responseType: Response.Type = Response.self
  195. ) -> GRPCAsyncResponseStream<Response> {
  196. return self.channel.makeAsyncServerStreamingCall(
  197. path: path,
  198. request: request,
  199. callOptions: callOptions ?? self.defaultCallOptions,
  200. interceptors: interceptors
  201. ).responses
  202. }
  203. public func performAsyncClientStreamingCall<
  204. Request: SwiftProtobuf.Message,
  205. Response: SwiftProtobuf.Message,
  206. RequestStream
  207. >(
  208. path: String,
  209. requests: RequestStream,
  210. callOptions: CallOptions? = nil,
  211. interceptors: [ClientInterceptor<Request, Response>] = [],
  212. requestType: Request.Type = Request.self,
  213. responseType: Response.Type = Response.self
  214. ) async throws -> Response
  215. where RequestStream: AsyncSequence, RequestStream.Element == Request {
  216. let call = self.channel.makeAsyncClientStreamingCall(
  217. path: path,
  218. callOptions: callOptions ?? self.defaultCallOptions,
  219. interceptors: interceptors
  220. )
  221. return try await self.perform(call, with: requests)
  222. }
  223. public func performAsyncClientStreamingCall<
  224. Request: GRPCPayload,
  225. Response: GRPCPayload,
  226. RequestStream
  227. >(
  228. path: String,
  229. requests: RequestStream,
  230. callOptions: CallOptions? = nil,
  231. interceptors: [ClientInterceptor<Request, Response>] = [],
  232. requestType: Request.Type = Request.self,
  233. responseType: Response.Type = Response.self
  234. ) async throws -> Response
  235. where RequestStream: AsyncSequence, RequestStream.Element == Request {
  236. let call = self.channel.makeAsyncClientStreamingCall(
  237. path: path,
  238. callOptions: callOptions ?? self.defaultCallOptions,
  239. interceptors: interceptors
  240. )
  241. return try await self.perform(call, with: requests)
  242. }
  243. public func performAsyncClientStreamingCall<
  244. Request: SwiftProtobuf.Message,
  245. Response: SwiftProtobuf.Message,
  246. RequestStream
  247. >(
  248. path: String,
  249. requests: RequestStream,
  250. callOptions: CallOptions? = nil,
  251. interceptors: [ClientInterceptor<Request, Response>] = [],
  252. requestType: Request.Type = Request.self,
  253. responseType: Response.Type = Response.self
  254. ) async throws -> Response
  255. where RequestStream: Sequence, RequestStream.Element == Request {
  256. let call = self.channel.makeAsyncClientStreamingCall(
  257. path: path,
  258. callOptions: callOptions ?? self.defaultCallOptions,
  259. interceptors: interceptors
  260. )
  261. return try await self.perform(call, with: AsyncStream(wrapping: requests))
  262. }
  263. public func performAsyncClientStreamingCall<
  264. Request: GRPCPayload,
  265. Response: GRPCPayload,
  266. RequestStream
  267. >(
  268. path: String,
  269. requests: RequestStream,
  270. callOptions: CallOptions? = nil,
  271. interceptors: [ClientInterceptor<Request, Response>] = [],
  272. requestType: Request.Type = Request.self,
  273. responseType: Response.Type = Response.self
  274. ) async throws -> Response
  275. where RequestStream: Sequence, RequestStream.Element == Request {
  276. let call = self.channel.makeAsyncClientStreamingCall(
  277. path: path,
  278. callOptions: callOptions ?? self.defaultCallOptions,
  279. interceptors: interceptors
  280. )
  281. return try await self.perform(call, with: AsyncStream(wrapping: requests))
  282. }
  283. public func performAsyncBidirectionalStreamingCall<
  284. Request: SwiftProtobuf.Message,
  285. Response: SwiftProtobuf.Message,
  286. RequestStream: AsyncSequence
  287. >(
  288. path: String,
  289. requests: RequestStream,
  290. callOptions: CallOptions? = nil,
  291. interceptors: [ClientInterceptor<Request, Response>] = [],
  292. requestType: Request.Type = Request.self,
  293. responseType: Response.Type = Response.self
  294. ) -> GRPCAsyncResponseStream<Response>
  295. where RequestStream.Element == Request {
  296. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  297. path: path,
  298. callOptions: callOptions ?? self.defaultCallOptions,
  299. interceptors: interceptors
  300. )
  301. return self.perform(call, with: requests)
  302. }
  303. public func performAsyncBidirectionalStreamingCall<
  304. Request: GRPCPayload,
  305. Response: GRPCPayload,
  306. RequestStream: AsyncSequence
  307. >(
  308. path: String,
  309. requests: RequestStream,
  310. callOptions: CallOptions? = nil,
  311. interceptors: [ClientInterceptor<Request, Response>] = [],
  312. requestType: Request.Type = Request.self,
  313. responseType: Response.Type = Response.self
  314. ) -> GRPCAsyncResponseStream<Response>
  315. where RequestStream.Element == Request {
  316. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  317. path: path,
  318. callOptions: callOptions ?? self.defaultCallOptions,
  319. interceptors: interceptors
  320. )
  321. return self.perform(call, with: requests)
  322. }
  323. public func performAsyncBidirectionalStreamingCall<
  324. Request: SwiftProtobuf.Message,
  325. Response: SwiftProtobuf.Message,
  326. RequestStream: Sequence
  327. >(
  328. path: String,
  329. requests: RequestStream,
  330. callOptions: CallOptions? = nil,
  331. interceptors: [ClientInterceptor<Request, Response>] = [],
  332. requestType: Request.Type = Request.self,
  333. responseType: Response.Type = Response.self
  334. ) -> GRPCAsyncResponseStream<Response>
  335. where RequestStream.Element == Request {
  336. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  337. path: path,
  338. callOptions: callOptions ?? self.defaultCallOptions,
  339. interceptors: interceptors
  340. )
  341. return self.perform(call, with: AsyncStream(wrapping: requests))
  342. }
  343. public func performAsyncBidirectionalStreamingCall<
  344. Request: GRPCPayload,
  345. Response: GRPCPayload,
  346. RequestStream: Sequence
  347. >(
  348. path: String,
  349. requests: RequestStream,
  350. callOptions: CallOptions? = nil,
  351. interceptors: [ClientInterceptor<Request, Response>] = [],
  352. requestType: Request.Type = Request.self,
  353. responseType: Response.Type = Response.self
  354. ) -> GRPCAsyncResponseStream<Response>
  355. where RequestStream.Element == Request {
  356. let call = self.channel.makeAsyncBidirectionalStreamingCall(
  357. path: path,
  358. callOptions: callOptions ?? self.defaultCallOptions,
  359. interceptors: interceptors
  360. )
  361. return self.perform(call, with: AsyncStream(wrapping: requests))
  362. }
  363. }
  364. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  365. extension GRPCClient {
  366. @inlinable
  367. internal func perform<Request, Response, RequestStream>(
  368. _ call: GRPCAsyncClientStreamingCall<Request, Response>,
  369. with requests: RequestStream
  370. ) async throws -> Response
  371. where RequestStream: AsyncSequence, RequestStream.Element == Request {
  372. // We use a detached task because we use cancellation to signal early, but successful exit.
  373. let requestsTask = Task.detached {
  374. try Task.checkCancellation()
  375. for try await request in requests {
  376. try Task.checkCancellation()
  377. try await call.requestStream.send(request)
  378. }
  379. try Task.checkCancellation()
  380. try await call.requestStream.finish()
  381. try Task.checkCancellation()
  382. }
  383. return try await withTaskCancellationHandler {
  384. // Await the response, which may come before the request stream is exhausted.
  385. let response = try await call.response
  386. // If we have a response, we can stop sending requests.
  387. requestsTask.cancel()
  388. // Return the response.
  389. return response
  390. } onCancel: {
  391. requestsTask.cancel()
  392. // If this outer task is cancelled then we should also cancel the RPC.
  393. Task.detached {
  394. try await call.cancel()
  395. }
  396. }
  397. }
  398. @inlinable
  399. internal func perform<Request, Response, RequestStream>(
  400. _ call: GRPCAsyncBidirectionalStreamingCall<Request, Response>,
  401. with requests: RequestStream
  402. )
  403. -> GRPCAsyncResponseStream<Response>
  404. where RequestStream: AsyncSequence, RequestStream.Element == Request {
  405. Task {
  406. try await withTaskCancellationHandler {
  407. try Task.checkCancellation()
  408. for try await request in requests {
  409. try Task.checkCancellation()
  410. try await call.requestStream.send(request)
  411. }
  412. try Task.checkCancellation()
  413. try await call.requestStream.finish()
  414. } onCancel: {
  415. Task.detached {
  416. try await call.cancel()
  417. }
  418. }
  419. }
  420. return call.responses
  421. }
  422. }
  423. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  424. extension AsyncStream {
  425. /// Create an `AsyncStream` from a regular (non-async) `Sequence`.
  426. ///
  427. /// - Note: This is just here to avoid duplicating the above two `perform(_:with:)` functions
  428. /// for `Sequence`.
  429. fileprivate init<T>(wrapping sequence: T) where T: Sequence, T.Element == Element {
  430. self.init { continuation in
  431. var iterator = sequence.makeIterator()
  432. while let value = iterator.next() {
  433. continuation.yield(value)
  434. }
  435. continuation.finish()
  436. }
  437. }
  438. }
  439. #endif