Call.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import protocol SwiftProtobuf.Message
  21. /// An object representing a single RPC from the perspective of a client. It allows the caller to
  22. /// send request parts, request a cancellation, and receive response parts in a provided callback.
  23. ///
  24. /// The call object sits atop an interceptor pipeline (see ``ClientInterceptor``) which allows for
  25. /// request and response streams to be arbitrarily transformed or observed. Requests sent via this
  26. /// call will traverse the pipeline before reaching the network, and responses received will
  27. /// traverse the pipeline having been received from the network.
  28. ///
  29. /// This object is a lower-level API than the equivalent wrapped calls (such as ``UnaryCall`` and
  30. /// ``BidirectionalStreamingCall``). The caller is therefore required to do more in order to use this
  31. /// object correctly. Callers must call ``invoke(onError:onResponsePart:)`` to start the call and ensure that the correct
  32. /// number of request parts are sent in the correct order (exactly one `metadata`, followed
  33. /// by at most one `message` for unary and server streaming calls, and any number of `message` parts
  34. /// for client streaming and bidirectional streaming calls. All call types must terminate their
  35. /// request stream by sending one `end` message.
  36. ///
  37. /// Callers are not able to create ``Call`` objects directly, rather they must be created via an
  38. /// object conforming to ``GRPCChannel`` such as ``ClientConnection``.
  39. public final class Call<Request, Response> {
  40. @usableFromInline
  41. internal enum State {
  42. /// Idle, waiting to be invoked.
  43. case idle(ClientTransportFactory<Request, Response>)
  44. /// Invoked, we have a transport on which to send requests. The transport may be closed if the
  45. /// RPC has already completed.
  46. case invoked(ClientTransport<Request, Response>)
  47. }
  48. /// The current state of the call.
  49. @usableFromInline
  50. internal var _state: State
  51. /// User provided interceptors for the call.
  52. @usableFromInline
  53. internal let _interceptors: [ClientInterceptor<Request, Response>]
  54. /// Whether compression is enabled on the call.
  55. private var isCompressionEnabled: Bool {
  56. return self.options.messageEncoding.enabledForRequests
  57. }
  58. /// The `EventLoop` the call is being invoked on.
  59. public let eventLoop: EventLoop
  60. /// The path of the RPC, usually generated from a service definition, e.g. "/echo.Echo/Get".
  61. public let path: String
  62. /// The type of the RPC, e.g. unary, bidirectional streaming.
  63. public let type: GRPCCallType
  64. /// Options used to invoke the call.
  65. public let options: CallOptions
  66. /// A promise for the underlying `Channel`. We only allocate this if the user asks for
  67. /// the `Channel` and we haven't invoked the transport yet. It's a bit unfortunate.
  68. private var channelPromise: EventLoopPromise<Channel>?
  69. /// Returns a future for the underlying `Channel`.
  70. internal var channel: EventLoopFuture<Channel> {
  71. if self.eventLoop.inEventLoop {
  72. return self._channel()
  73. } else {
  74. return self.eventLoop.flatSubmit {
  75. return self._channel()
  76. }
  77. }
  78. }
  79. // Calls can't be constructed directly: users must make them using a `GRPCChannel`.
  80. @inlinable
  81. internal init(
  82. path: String,
  83. type: GRPCCallType,
  84. eventLoop: EventLoop,
  85. options: CallOptions,
  86. interceptors: [ClientInterceptor<Request, Response>],
  87. transportFactory: ClientTransportFactory<Request, Response>
  88. ) {
  89. self.path = path
  90. self.type = type
  91. self.options = options
  92. self._state = .idle(transportFactory)
  93. self.eventLoop = eventLoop
  94. self._interceptors = interceptors
  95. }
  96. /// Starts the call and provides a callback which is invoked on every response part received from
  97. /// the server.
  98. ///
  99. /// This must be called prior to ``send(_:)`` or ``cancel()``.
  100. ///
  101. /// - Parameters:
  102. /// - onError: A callback invoked when an error is received.
  103. /// - onResponsePart: A callback which is invoked on every response part.
  104. /// - Important: This function should only be called once. Subsequent calls will be ignored.
  105. @inlinable
  106. public func invoke(
  107. onError: @escaping (Error) -> Void,
  108. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  109. ) {
  110. self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")
  111. if self.eventLoop.inEventLoop {
  112. self._invoke(onStart: {}, onError: onError, onResponsePart: onResponsePart)
  113. } else {
  114. self.eventLoop.execute {
  115. self._invoke(onStart: {}, onError: onError, onResponsePart: onResponsePart)
  116. }
  117. }
  118. }
  119. /// Send a request part on the RPC.
  120. /// - Parameters:
  121. /// - part: The request part to send.
  122. /// - promise: A promise which will be completed when the request part has been handled.
  123. /// - Note: Sending will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
  124. @inlinable
  125. public func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  126. if self.eventLoop.inEventLoop {
  127. self._send(part, promise: promise)
  128. } else {
  129. self.eventLoop.execute {
  130. self._send(part, promise: promise)
  131. }
  132. }
  133. }
  134. /// Attempt to cancel the RPC.
  135. /// - Parameter promise: A promise which will be completed once the cancellation request has been
  136. /// dealt with.
  137. /// - Note: Cancellation will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
  138. public func cancel(promise: EventLoopPromise<Void>?) {
  139. if self.eventLoop.inEventLoop {
  140. self._cancel(promise: promise)
  141. } else {
  142. self.eventLoop.execute {
  143. self._cancel(promise: promise)
  144. }
  145. }
  146. }
  147. }
  148. extension Call {
  149. /// Send a request part on the RPC.
  150. /// - Parameter part: The request part to send.
  151. /// - Returns: A future which will be resolved when the request has been handled.
  152. /// - Note: Sending will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
  153. @inlinable
  154. public func send(_ part: GRPCClientRequestPart<Request>) -> EventLoopFuture<Void> {
  155. let promise = self.eventLoop.makePromise(of: Void.self)
  156. self.send(part, promise: promise)
  157. return promise.futureResult
  158. }
  159. /// Attempt to cancel the RPC.
  160. /// - Note: Cancellation will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
  161. /// - Returns: A future which will be resolved when the cancellation request has been cancelled.
  162. public func cancel() -> EventLoopFuture<Void> {
  163. let promise = self.eventLoop.makePromise(of: Void.self)
  164. self.cancel(promise: promise)
  165. return promise.futureResult
  166. }
  167. }
  168. extension Call {
  169. internal func compress(_ compression: Compression) -> Bool {
  170. return compression.isEnabled(callDefault: self.isCompressionEnabled)
  171. }
  172. internal func sendMessages<Messages>(
  173. _ messages: Messages,
  174. compression: Compression,
  175. promise: EventLoopPromise<Void>?
  176. ) where Messages: Sequence, Messages.Element == Request {
  177. if self.eventLoop.inEventLoop {
  178. if let promise = promise {
  179. self._sendMessages(messages, compression: compression, promise: promise)
  180. } else {
  181. self._sendMessages(messages, compression: compression)
  182. }
  183. } else {
  184. self.eventLoop.execute {
  185. if let promise = promise {
  186. self._sendMessages(messages, compression: compression, promise: promise)
  187. } else {
  188. self._sendMessages(messages, compression: compression)
  189. }
  190. }
  191. }
  192. }
  193. // Provide a few convenience methods we need from the wrapped call objects.
  194. private func _sendMessages<Messages>(
  195. _ messages: Messages,
  196. compression: Compression
  197. ) where Messages: Sequence, Messages.Element == Request {
  198. self.eventLoop.assertInEventLoop()
  199. let compress = self.compress(compression)
  200. var iterator = messages.makeIterator()
  201. var maybeNext = iterator.next()
  202. while let current = maybeNext {
  203. let next = iterator.next()
  204. // If there's no next message, then we'll flush.
  205. let flush = next == nil
  206. self._send(.message(current, .init(compress: compress, flush: flush)), promise: nil)
  207. maybeNext = next
  208. }
  209. }
  210. private func _sendMessages<Messages>(
  211. _ messages: Messages,
  212. compression: Compression,
  213. promise: EventLoopPromise<Void>
  214. ) where Messages: Sequence, Messages.Element == Request {
  215. self.eventLoop.assertInEventLoop()
  216. let compress = self.compress(compression)
  217. var iterator = messages.makeIterator()
  218. var maybeNext = iterator.next()
  219. while let current = maybeNext {
  220. let next = iterator.next()
  221. let isLast = next == nil
  222. // We're already on the event loop, use the `_` send.
  223. if isLast {
  224. // Only flush and attach the promise to the last message.
  225. self._send(.message(current, .init(compress: compress, flush: true)), promise: promise)
  226. } else {
  227. self._send(.message(current, .init(compress: compress, flush: false)), promise: nil)
  228. }
  229. maybeNext = next
  230. }
  231. }
  232. }
  233. extension Call {
  234. /// Invoke the RPC with this response part handler.
  235. /// - Important: This *must* to be called from the `eventLoop`.
  236. @usableFromInline
  237. internal func _invoke(
  238. onStart: @escaping () -> Void,
  239. onError: @escaping (Error) -> Void,
  240. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  241. ) {
  242. self.eventLoop.assertInEventLoop()
  243. switch self._state {
  244. case let .idle(factory):
  245. let transport = factory.makeConfiguredTransport(
  246. to: self.path,
  247. for: self.type,
  248. withOptions: self.options,
  249. onEventLoop: self.eventLoop,
  250. interceptedBy: self._interceptors,
  251. onStart: onStart,
  252. onError: onError,
  253. onResponsePart: onResponsePart
  254. )
  255. self._state = .invoked(transport)
  256. case .invoked:
  257. // We can't be invoked twice. Just ignore this.
  258. ()
  259. }
  260. }
  261. /// Send a request part on the transport.
  262. /// - Important: This *must* to be called from the `eventLoop`.
  263. @inlinable
  264. internal func _send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  265. self.eventLoop.assertInEventLoop()
  266. switch self._state {
  267. case .idle:
  268. promise?.fail(GRPCError.InvalidState("Call must be invoked before sending request parts"))
  269. case let .invoked(transport):
  270. transport.send(part, promise: promise)
  271. }
  272. }
  273. /// Attempt to cancel the call.
  274. /// - Important: This *must* to be called from the `eventLoop`.
  275. private func _cancel(promise: EventLoopPromise<Void>?) {
  276. self.eventLoop.assertInEventLoop()
  277. switch self._state {
  278. case .idle:
  279. promise?.succeed(())
  280. self.channelPromise?.fail(GRPCStatus(code: .cancelled))
  281. case let .invoked(transport):
  282. transport.cancel(promise: promise)
  283. }
  284. }
  285. /// Get the underlying `Channel` for this call.
  286. /// - Important: This *must* to be called from the `eventLoop`.
  287. private func _channel() -> EventLoopFuture<Channel> {
  288. self.eventLoop.assertInEventLoop()
  289. switch (self.channelPromise, self._state) {
  290. case let (.some(promise), .idle),
  291. let (.some(promise), .invoked):
  292. // We already have a promise, just use that.
  293. return promise.futureResult
  294. case (.none, .idle):
  295. // We need to allocate a promise and ask the transport for the channel later.
  296. let promise = self.eventLoop.makePromise(of: Channel.self)
  297. self.channelPromise = promise
  298. return promise.futureResult
  299. case let (.none, .invoked(transport)):
  300. // Just ask the transport.
  301. return transport.getChannel()
  302. }
  303. }
  304. }
  305. extension Call {
  306. // These helpers are for our wrapping call objects (`UnaryCall`, etc.).
  307. /// Invokes the call and sends a single request. Sends the metadata, request and closes the
  308. /// request stream.
  309. /// - Parameters:
  310. /// - request: The request to send.
  311. /// - onError: A callback invoked when an error is received.
  312. /// - onResponsePart: A callback invoked for each response part received.
  313. @inlinable
  314. internal func invokeUnaryRequest(
  315. _ request: Request,
  316. onStart: @escaping () -> Void,
  317. onError: @escaping (Error) -> Void,
  318. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  319. ) {
  320. if self.eventLoop.inEventLoop {
  321. self._invokeUnaryRequest(
  322. request: request,
  323. onStart: onStart,
  324. onError: onError,
  325. onResponsePart: onResponsePart
  326. )
  327. } else {
  328. self.eventLoop.execute {
  329. self._invokeUnaryRequest(
  330. request: request,
  331. onStart: onStart,
  332. onError: onError,
  333. onResponsePart: onResponsePart
  334. )
  335. }
  336. }
  337. }
  338. /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
  339. /// additional messages and end the stream by calling `send(_:promise:)`.
  340. /// - Parameters:
  341. /// - onError: A callback invoked when an error is received.
  342. /// - onResponsePart: A callback invoked for each response part received.
  343. @inlinable
  344. internal func invokeStreamingRequests(
  345. onStart: @escaping () -> Void,
  346. onError: @escaping (Error) -> Void,
  347. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  348. ) {
  349. if self.eventLoop.inEventLoop {
  350. self._invokeStreamingRequests(
  351. onStart: onStart,
  352. onError: onError,
  353. onResponsePart: onResponsePart
  354. )
  355. } else {
  356. self.eventLoop.execute {
  357. self._invokeStreamingRequests(
  358. onStart: onStart,
  359. onError: onError,
  360. onResponsePart: onResponsePart
  361. )
  362. }
  363. }
  364. }
  365. /// On-`EventLoop` implementation of `invokeUnaryRequest(request:_:)`.
  366. @usableFromInline
  367. internal func _invokeUnaryRequest(
  368. request: Request,
  369. onStart: @escaping () -> Void,
  370. onError: @escaping (Error) -> Void,
  371. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  372. ) {
  373. self.eventLoop.assertInEventLoop()
  374. assert(self.type == .unary || self.type == .serverStreaming)
  375. self._invoke(onStart: onStart, onError: onError, onResponsePart: onResponsePart)
  376. self._send(.metadata(self.options.customMetadata), promise: nil)
  377. self._send(
  378. .message(request, .init(compress: self.isCompressionEnabled, flush: false)),
  379. promise: nil
  380. )
  381. self._send(.end, promise: nil)
  382. }
  383. /// On-`EventLoop` implementation of `invokeStreamingRequests(_:)`.
  384. @usableFromInline
  385. internal func _invokeStreamingRequests(
  386. onStart: @escaping () -> Void,
  387. onError: @escaping (Error) -> Void,
  388. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  389. ) {
  390. self.eventLoop.assertInEventLoop()
  391. assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)
  392. self._invoke(onStart: onStart, onError: onError, onResponsePart: onResponsePart)
  393. self._send(.metadata(self.options.customMetadata), promise: nil)
  394. }
  395. }
  396. // @unchecked is ok: all mutable state is accessed/modified from the appropriate event loop.
  397. extension Call: @unchecked Sendable where Request: Sendable, Response: Sendable {}