2
0

Call.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import protocol SwiftProtobuf.Message
  21. /// An object representing a single RPC from the perspective of a client. It allows the caller to
  22. /// send request parts, request a cancellation, and receive response parts in a provided callback.
  23. ///
  24. /// The call object sits atop an interceptor pipeline (see `ClientInterceptor`) which allows for
  25. /// request and response streams to be arbitrarily transformed or observed. Requests sent via this
  26. /// call will traverse the pipeline before reaching the network, and responses received will
  27. /// traverse the pipeline having been received from the network.
  28. ///
  29. /// This object is a lower-level API than the equivalent wrapped calls (such as `UnaryCall` and
  30. /// `BidirectionalStreamingCall`). The caller is therefore required to do more in order to use this
  31. /// object correctly. Callers must call `invoke(_:)` to start the call and ensure that the correct
  32. /// number of request parts are sent in the correct order (exactly one `metadata`, followed
  33. /// by at most one `message` for unary and server streaming calls, and any number of `message` parts
  34. /// for client streaming and bidirectional streaming calls. All call types must terminate their
  35. /// request stream by sending one `end` message.
  36. ///
  37. /// Callers are not able to create `Call` objects directly, rather they must be created via an
  38. /// object conforming to `GRPCChannel` such as `ClientConnection`.
  39. public final class Call<Request, Response> {
  40. @usableFromInline
  41. internal enum State {
  42. /// Idle, waiting to be invoked.
  43. case idle(ClientTransportFactory<Request, Response>)
  44. /// Invoked, we have a transport on which to send requests. The transport may be closed if the
  45. /// RPC has already completed.
  46. case invoked(ClientTransport<Request, Response>)
  47. }
  48. /// The current state of the call.
  49. @usableFromInline
  50. internal var _state: State
  51. /// User provided interceptors for the call.
  52. @usableFromInline
  53. internal let _interceptors: [ClientInterceptor<Request, Response>]
  54. /// Whether compression is enabled on the call.
  55. private var isCompressionEnabled: Bool {
  56. return self.options.messageEncoding.enabledForRequests
  57. }
  58. /// The `EventLoop` the call is being invoked on.
  59. public let eventLoop: EventLoop
  60. /// The path of the RPC, usually generated from a service definition, e.g. "/echo.Echo/Get".
  61. public let path: String
  62. /// The type of the RPC, e.g. unary, bidirectional streaming.
  63. public let type: GRPCCallType
  64. /// Options used to invoke the call.
  65. public let options: CallOptions
  66. /// A promise for the underlying `Channel`. We only allocate this if the user asks for
  67. /// the `Channel` and we haven't invoked the transport yet. It's a bit unfortunate.
  68. private var channelPromise: EventLoopPromise<Channel>?
  69. /// Returns a future for the underlying `Channel`.
  70. internal var channel: EventLoopFuture<Channel> {
  71. if self.eventLoop.inEventLoop {
  72. return self._channel()
  73. } else {
  74. return self.eventLoop.flatSubmit {
  75. return self._channel()
  76. }
  77. }
  78. }
  79. // Calls can't be constructed directly: users must make them using a `GRPCChannel`.
  80. @inlinable
  81. internal init(
  82. path: String,
  83. type: GRPCCallType,
  84. eventLoop: EventLoop,
  85. options: CallOptions,
  86. interceptors: [ClientInterceptor<Request, Response>],
  87. transportFactory: ClientTransportFactory<Request, Response>
  88. ) {
  89. self.path = path
  90. self.type = type
  91. self.options = options
  92. self._state = .idle(transportFactory)
  93. self.eventLoop = eventLoop
  94. self._interceptors = interceptors
  95. }
  96. /// Starts the call and provides a callback which is invoked on every response part received from
  97. /// the server.
  98. ///
  99. /// This must be called prior to `send(_:promise:)` or `cancel(promise:)`.
  100. ///
  101. /// - Parameters:
  102. /// - onError: A callback invoked when an error is received.
  103. /// - onResponsePart: A callback which is invoked on every response part.
  104. /// - Important: This function should only be called once. Subsequent calls will be ignored.
  105. @inlinable
  106. public func invoke(
  107. onError: @escaping (Error) -> Void,
  108. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  109. ) {
  110. self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")
  111. if self.eventLoop.inEventLoop {
  112. self._invoke(onError: onError, onResponsePart: onResponsePart)
  113. } else {
  114. self.eventLoop.execute {
  115. self._invoke(onError: onError, onResponsePart: onResponsePart)
  116. }
  117. }
  118. }
  119. /// Send a request part on the RPC.
  120. /// - Parameters:
  121. /// - part: The request part to send.
  122. /// - promise: A promise which will be completed when the request part has been handled.
  123. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  124. @inlinable
  125. public func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  126. if self.eventLoop.inEventLoop {
  127. self._send(part, promise: promise)
  128. } else {
  129. self.eventLoop.execute {
  130. self._send(part, promise: promise)
  131. }
  132. }
  133. }
  134. /// Attempt to cancel the RPC.
  135. /// - Parameter promise: A promise which will be completed once the cancellation request has been
  136. /// dealt with.
  137. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  138. public func cancel(promise: EventLoopPromise<Void>?) {
  139. if self.eventLoop.inEventLoop {
  140. self._cancel(promise: promise)
  141. } else {
  142. self.eventLoop.execute {
  143. self._cancel(promise: promise)
  144. }
  145. }
  146. }
  147. }
  148. extension Call {
  149. /// Send a request part on the RPC.
  150. /// - Parameter part: The request part to send.
  151. /// - Returns: A future which will be resolved when the request has been handled.
  152. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  153. @inlinable
  154. public func send(_ part: GRPCClientRequestPart<Request>) -> EventLoopFuture<Void> {
  155. let promise = self.eventLoop.makePromise(of: Void.self)
  156. self.send(part, promise: promise)
  157. return promise.futureResult
  158. }
  159. /// Attempt to cancel the RPC.
  160. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  161. /// - Returns: A future which will be resolved when the cancellation request has been cancelled.
  162. public func cancel() -> EventLoopFuture<Void> {
  163. let promise = self.eventLoop.makePromise(of: Void.self)
  164. self.cancel(promise: promise)
  165. return promise.futureResult
  166. }
  167. }
  168. extension Call {
  169. internal func compress(_ compression: Compression) -> Bool {
  170. return compression.isEnabled(callDefault: self.isCompressionEnabled)
  171. }
  172. internal func sendMessages<Messages>(
  173. _ messages: Messages,
  174. compression: Compression,
  175. promise: EventLoopPromise<Void>?
  176. ) where Messages: Sequence, Messages.Element == Request {
  177. if self.eventLoop.inEventLoop {
  178. if let promise = promise {
  179. self._sendMessages(messages, compression: compression, promise: promise)
  180. } else {
  181. self._sendMessages(messages, compression: compression)
  182. }
  183. } else {
  184. self.eventLoop.execute {
  185. if let promise = promise {
  186. self._sendMessages(messages, compression: compression, promise: promise)
  187. } else {
  188. self._sendMessages(messages, compression: compression)
  189. }
  190. }
  191. }
  192. }
  193. // Provide a few convenience methods we need from the wrapped call objects.
  194. private func _sendMessages<Messages>(
  195. _ messages: Messages,
  196. compression: Compression
  197. ) where Messages: Sequence, Messages.Element == Request {
  198. self.eventLoop.assertInEventLoop()
  199. let compress = self.compress(compression)
  200. var iterator = messages.makeIterator()
  201. var maybeNext = iterator.next()
  202. while let current = maybeNext {
  203. let next = iterator.next()
  204. // If there's no next message, then we'll flush.
  205. let flush = next == nil
  206. self._send(.message(current, .init(compress: compress, flush: flush)), promise: nil)
  207. maybeNext = next
  208. }
  209. }
  210. private func _sendMessages<Messages>(
  211. _ messages: Messages,
  212. compression: Compression,
  213. promise: EventLoopPromise<Void>
  214. ) where Messages: Sequence, Messages.Element == Request {
  215. self.eventLoop.assertInEventLoop()
  216. let compress = self.compress(compression)
  217. var iterator = messages.makeIterator()
  218. var maybeNext = iterator.next()
  219. while let current = maybeNext {
  220. let next = iterator.next()
  221. let isLast = next == nil
  222. // We're already on the event loop, use the `_` send.
  223. if isLast {
  224. // Only flush and attach the promise to the last message.
  225. self._send(.message(current, .init(compress: compress, flush: true)), promise: promise)
  226. } else {
  227. self._send(.message(current, .init(compress: compress, flush: false)), promise: nil)
  228. }
  229. maybeNext = next
  230. }
  231. }
  232. }
  233. extension Call {
  234. /// Invoke the RPC with this response part handler.
  235. /// - Important: This *must* to be called from the `eventLoop`.
  236. @usableFromInline
  237. internal func _invoke(
  238. onError: @escaping (Error) -> Void,
  239. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  240. ) {
  241. self.eventLoop.assertInEventLoop()
  242. switch self._state {
  243. case let .idle(factory):
  244. let transport = factory.makeConfiguredTransport(
  245. to: self.path,
  246. for: self.type,
  247. withOptions: self.options,
  248. onEventLoop: self.eventLoop,
  249. interceptedBy: self._interceptors,
  250. onError: onError,
  251. onResponsePart: onResponsePart
  252. )
  253. self._state = .invoked(transport)
  254. case .invoked:
  255. // We can't be invoked twice. Just ignore this.
  256. ()
  257. }
  258. }
  259. /// Send a request part on the transport.
  260. /// - Important: This *must* to be called from the `eventLoop`.
  261. @inlinable
  262. internal func _send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  263. self.eventLoop.assertInEventLoop()
  264. switch self._state {
  265. case .idle:
  266. promise?.fail(GRPCError.InvalidState("Call must be invoked before sending request parts"))
  267. case let .invoked(transport):
  268. transport.send(part, promise: promise)
  269. }
  270. }
  271. /// Attempt to cancel the call.
  272. /// - Important: This *must* to be called from the `eventLoop`.
  273. private func _cancel(promise: EventLoopPromise<Void>?) {
  274. self.eventLoop.assertInEventLoop()
  275. switch self._state {
  276. case .idle:
  277. // This is weird: does it make sense to cancel before invoking it?
  278. let error = GRPCError.InvalidState("Call must be invoked before cancelling it")
  279. promise?.fail(error)
  280. self.channelPromise?.fail(error)
  281. case let .invoked(transport):
  282. transport.cancel(promise: promise)
  283. }
  284. }
  285. /// Get the underlying `Channel` for this call.
  286. /// - Important: This *must* to be called from the `eventLoop`.
  287. private func _channel() -> EventLoopFuture<Channel> {
  288. self.eventLoop.assertInEventLoop()
  289. switch (self.channelPromise, self._state) {
  290. case let (.some(promise), .idle),
  291. let (.some(promise), .invoked):
  292. // We already have a promise, just use that.
  293. return promise.futureResult
  294. case (.none, .idle):
  295. // We need to allocate a promise and ask the transport for the channel later.
  296. let promise = self.eventLoop.makePromise(of: Channel.self)
  297. self.channelPromise = promise
  298. return promise.futureResult
  299. case let (.none, .invoked(transport)):
  300. // Just ask the transport.
  301. return transport.getChannel()
  302. }
  303. }
  304. }
  305. extension Call {
  306. // These helpers are for our wrapping call objects (`UnaryCall`, etc.).
  307. /// Invokes the call and sends a single request. Sends the metadata, request and closes the
  308. /// request stream.
  309. /// - Parameters:
  310. /// - request: The request to send.
  311. /// - onError: A callback invoked when an error is received.
  312. /// - onResponsePart: A callback invoked for each response part received.
  313. @inlinable
  314. internal func invokeUnaryRequest(
  315. _ request: Request,
  316. onError: @escaping (Error) -> Void,
  317. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  318. ) {
  319. if self.eventLoop.inEventLoop {
  320. self._invokeUnaryRequest(request: request, onError: onError, onResponsePart: onResponsePart)
  321. } else {
  322. self.eventLoop.execute {
  323. self._invokeUnaryRequest(request: request, onError: onError, onResponsePart: onResponsePart)
  324. }
  325. }
  326. }
  327. /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
  328. /// additional messages and end the stream by calling `send(_:promise:)`.
  329. /// - Parameters:
  330. /// - onError: A callback invoked when an error is received.
  331. /// - onResponsePart: A callback invoked for each response part received.
  332. @inlinable
  333. internal func invokeStreamingRequests(
  334. onError: @escaping (Error) -> Void,
  335. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  336. ) {
  337. if self.eventLoop.inEventLoop {
  338. self._invokeStreamingRequests(onError: onError, onResponsePart: onResponsePart)
  339. } else {
  340. self.eventLoop.execute {
  341. self._invokeStreamingRequests(onError: onError, onResponsePart: onResponsePart)
  342. }
  343. }
  344. }
  345. /// On-`EventLoop` implementation of `invokeUnaryRequest(request:_:)`.
  346. @usableFromInline
  347. internal func _invokeUnaryRequest(
  348. request: Request,
  349. onError: @escaping (Error) -> Void,
  350. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  351. ) {
  352. self.eventLoop.assertInEventLoop()
  353. assert(self.type == .unary || self.type == .serverStreaming)
  354. self._invoke(onError: onError, onResponsePart: onResponsePart)
  355. self._send(.metadata(self.options.customMetadata), promise: nil)
  356. self._send(
  357. .message(request, .init(compress: self.isCompressionEnabled, flush: false)),
  358. promise: nil
  359. )
  360. self._send(.end, promise: nil)
  361. }
  362. /// On-`EventLoop` implementation of `invokeStreamingRequests(_:)`.
  363. @usableFromInline
  364. internal func _invokeStreamingRequests(
  365. onError: @escaping (Error) -> Void,
  366. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  367. ) {
  368. self.eventLoop.assertInEventLoop()
  369. assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)
  370. self._invoke(onError: onError, onResponsePart: onResponsePart)
  371. self._send(.metadata(self.options.customMetadata), promise: nil)
  372. }
  373. }
  374. #if compiler(>=5.6)
  375. // @unchecked is ok: all mutable state is accessed/modified from the appropriate event loop.
  376. extension Call: @unchecked Sendable where Request: Sendable, Response: Sendable {}
  377. #endif