| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476 |
- /*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #if compiler(>=5.6)
- import SwiftProtobuf
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension GRPCClient {
- public func makeAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncUnaryCall<Request, Response> {
- return self.channel.makeAsyncUnaryCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- public func makeAsyncUnaryCall<Request: GRPCPayload & Sendable, Response: GRPCPayload & Sendable>(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncUnaryCall<Request, Response> {
- return self.channel.makeAsyncUnaryCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- public func makeAsyncServerStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable
- >(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncServerStreamingCall<Request, Response> {
- return self.channel.makeAsyncServerStreamingCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- public func makeAsyncServerStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable
- >(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncServerStreamingCall<Request, Response> {
- return self.channel.makeAsyncServerStreamingCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- public func makeAsyncClientStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable
- >(
- path: String,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncClientStreamingCall<Request, Response> {
- return self.channel.makeAsyncClientStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- public func makeAsyncClientStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable
- >(
- path: String,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncClientStreamingCall<Request, Response> {
- return self.channel.makeAsyncClientStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- public func makeAsyncBidirectionalStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable
- >(
- path: String,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
- return self.channel.makeAsyncBidirectionalStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- public func makeAsyncBidirectionalStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable
- >(
- path: String,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncBidirectionalStreamingCall<Request, Response> {
- return self.channel.makeAsyncBidirectionalStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- }
- }
- // MARK: - "Simple, but safe" wrappers.
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension GRPCClient {
- public func performAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) async throws -> Response {
- return try await self.channel.makeAsyncUnaryCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- ).response
- }
- public func performAsyncUnaryCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable
- >(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) async throws -> Response {
- return try await self.channel.makeAsyncUnaryCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- ).response
- }
- public func performAsyncServerStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable
- >(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncResponseStream<Response> {
- return self.channel.makeAsyncServerStreamingCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- ).responseStream
- }
- public func performAsyncServerStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable
- >(
- path: String,
- request: Request,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncResponseStream<Response> {
- return self.channel.makeAsyncServerStreamingCall(
- path: path,
- request: request,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- ).responseStream
- }
- public func performAsyncClientStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable,
- RequestStream: AsyncSequence & Sendable
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) async throws -> Response where RequestStream.Element == Request {
- let call = self.channel.makeAsyncClientStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return try await self.perform(call, with: requests)
- }
- public func performAsyncClientStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable,
- RequestStream: AsyncSequence & Sendable
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) async throws -> Response where RequestStream.Element == Request {
- let call = self.channel.makeAsyncClientStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return try await self.perform(call, with: requests)
- }
- public func performAsyncClientStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable,
- RequestStream: Sequence
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) async throws -> Response where RequestStream.Element == Request {
- let call = self.channel.makeAsyncClientStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return try await self.perform(call, with: AsyncStream(wrapping: requests))
- }
- public func performAsyncClientStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable,
- RequestStream: Sequence
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) async throws -> Response where RequestStream.Element == Request {
- let call = self.channel.makeAsyncClientStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return try await self.perform(call, with: AsyncStream(wrapping: requests))
- }
- public func performAsyncBidirectionalStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable,
- RequestStream: AsyncSequence
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncResponseStream<Response>
- where RequestStream.Element == Request {
- let call = self.channel.makeAsyncBidirectionalStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return self.perform(call, with: requests)
- }
- public func performAsyncBidirectionalStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable,
- RequestStream: AsyncSequence
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncResponseStream<Response>
- where RequestStream.Element == Request {
- let call = self.channel.makeAsyncBidirectionalStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return self.perform(call, with: requests)
- }
- public func performAsyncBidirectionalStreamingCall<
- Request: SwiftProtobuf.Message & Sendable,
- Response: SwiftProtobuf.Message & Sendable,
- RequestStream: Sequence
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
- let call = self.channel.makeAsyncBidirectionalStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return self.perform(call, with: AsyncStream(wrapping: requests))
- }
- public func performAsyncBidirectionalStreamingCall<
- Request: GRPCPayload & Sendable,
- Response: GRPCPayload & Sendable,
- RequestStream: Sequence
- >(
- path: String,
- requests: RequestStream,
- callOptions: CallOptions? = nil,
- interceptors: [ClientInterceptor<Request, Response>] = [],
- requestType: Request.Type = Request.self,
- responseType: Response.Type = Response.self
- ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
- let call = self.channel.makeAsyncBidirectionalStreamingCall(
- path: path,
- callOptions: callOptions ?? self.defaultCallOptions,
- interceptors: interceptors
- )
- return self.perform(call, with: AsyncStream(wrapping: requests))
- }
- }
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension GRPCClient {
- @inlinable
- internal func perform<
- Request: Sendable,
- Response: Sendable,
- RequestStream: AsyncSequence & Sendable
- >(
- _ call: GRPCAsyncClientStreamingCall<Request, Response>,
- with requests: RequestStream
- ) async throws -> Response where RequestStream.Element == Request {
- // We use a detached task because we use cancellation to signal early, but successful exit.
- let requestsTask = Task.detached {
- try Task.checkCancellation()
- for try await request in requests {
- try Task.checkCancellation()
- try await call.requestStream.send(request)
- }
- try Task.checkCancellation()
- try await call.requestStream.finish()
- try Task.checkCancellation()
- }
- return try await withTaskCancellationHandler {
- // Await the response, which may come before the request stream is exhausted.
- let response = try await call.response
- // If we have a response, we can stop sending requests.
- requestsTask.cancel()
- // Return the response.
- return response
- } onCancel: {
- requestsTask.cancel()
- // If this outer task is cancelled then we should also cancel the RPC.
- Task.detached {
- try await call.cancel()
- }
- }
- }
- @inlinable
- internal func perform<
- Request: Sendable,
- Response: Sendable,
- RequestStream: AsyncSequence & Sendable
- >(
- _ call: GRPCAsyncBidirectionalStreamingCall<Request, Response>,
- with requests: RequestStream
- ) -> GRPCAsyncResponseStream<Response> where RequestStream.Element == Request {
- Task {
- try await withTaskCancellationHandler {
- try Task.checkCancellation()
- for try await request in requests {
- try Task.checkCancellation()
- try await call.requestStream.send(request)
- }
- try Task.checkCancellation()
- try await call.requestStream.finish()
- } onCancel: {
- Task.detached {
- try await call.cancel()
- }
- }
- }
- return call.responseStream
- }
- }
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension AsyncStream {
- /// Create an `AsyncStream` from a regular (non-async) `Sequence`.
- ///
- /// - Note: This is just here to avoid duplicating the above two `perform(_:with:)` functions
- /// for `Sequence`.
- fileprivate init<T>(wrapping sequence: T) where T: Sequence, T.Element == Element {
- self.init { continuation in
- var iterator = sequence.makeIterator()
- while let value = iterator.next() {
- continuation.yield(value)
- }
- continuation.finish()
- }
- }
- }
- #endif
|