Call.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import protocol SwiftProtobuf.Message
  21. /// An object representing a single RPC from the perspective of a client. It allows the caller to
  22. /// send request parts, request a cancellation, and receive response parts in a provided callback.
  23. ///
  24. /// The call object sits atop an interceptor pipeline (see `ClientInterceptor`) which allows for
  25. /// request and response streams to be arbitrarily transformed or observed. Requests sent via this
  26. /// call will traverse the pipeline before reaching the network, and responses received will
  27. /// traverse the pipeline having been received from the network.
  28. ///
  29. /// This object is a lower-level API than the equivalent wrapped calls (such as `UnaryCall` and
  30. /// `BidirectionalStreamingCall`). The caller is therefore required to do more in order to use this
  31. /// object correctly. Callers must call `invoke(_:)` to start the call and ensure that the correct
  32. /// number of request parts are sent in the correct order (exactly one `metadata`, followed
  33. /// by at most one `message` for unary and server streaming calls, and any number of `message` parts
  34. /// for client streaming and bidirectional streaming calls. All call types must terminate their
  35. /// request stream by sending one `end` message.
  36. ///
  37. /// Callers are not able to create `Call` objects directly, rather they must be created via an
  38. /// object conforming to `GRPCChannel` such as `ClientConnection`.
  39. public class Call<Request, Response> {
  40. @usableFromInline
  41. internal enum State {
  42. /// Idle, waiting to be invoked.
  43. case idle(ClientTransportFactory<Request, Response>)
  44. /// Invoked, we have a transport on which to send requests. The transport may be closed if the
  45. /// RPC has already completed.
  46. case invoked(ClientTransport<Request, Response>)
  47. }
  48. /// The current state of the call.
  49. @usableFromInline
  50. internal var _state: State
  51. /// User provided interceptors for the call.
  52. private let interceptors: [ClientInterceptor<Request, Response>]
  53. /// Whether compression is enabled on the call.
  54. private var isCompressionEnabled: Bool {
  55. return self.options.messageEncoding.enabledForRequests
  56. }
  57. /// The `EventLoop` the call is being invoked on.
  58. public let eventLoop: EventLoop
  59. /// The path of the RPC, usually generated from a service definition, e.g. "/echo.Echo/Get".
  60. public let path: String
  61. /// The type of the RPC, e.g. unary, bidirectional streaming.
  62. public let type: GRPCCallType
  63. /// Options used to invoke the call.
  64. public let options: CallOptions
  65. /// A promise for the underlying `Channel`. We only allocate this if the user asks for
  66. /// the `Channel` and we haven't invoked the transport yet. It's a bit unfortunate.
  67. private var channelPromise: EventLoopPromise<Channel>?
  68. /// Returns a future for the underlying `Channel`.
  69. internal var channel: EventLoopFuture<Channel> {
  70. if self.eventLoop.inEventLoop {
  71. return self._channel()
  72. } else {
  73. return self.eventLoop.flatSubmit {
  74. return self._channel()
  75. }
  76. }
  77. }
  78. // Calls can't be constructed directly: users must make them using a `GRPCChannel`.
  79. internal init(
  80. path: String,
  81. type: GRPCCallType,
  82. eventLoop: EventLoop,
  83. options: CallOptions,
  84. interceptors: [ClientInterceptor<Request, Response>],
  85. transportFactory: ClientTransportFactory<Request, Response>
  86. ) {
  87. self.path = path
  88. self.type = type
  89. self.options = options
  90. self._state = .idle(transportFactory)
  91. self.eventLoop = eventLoop
  92. self.interceptors = interceptors
  93. }
  94. /// Starts the call and provides a callback which is invoked on every response part received from
  95. /// the server.
  96. ///
  97. /// This must be called prior to `send(_:promise:)` or `cancel(promise:)`.
  98. ///
  99. /// - Parameters:
  100. /// - onError: A callback invoked when an error is received.
  101. /// - onResponsePart: A callback which is invoked on every response part.
  102. /// - Important: This function should only be called once. Subsequent calls will be ignored.
  103. @inlinable
  104. public func invoke(
  105. onError: @escaping (Error) -> Void,
  106. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  107. ) {
  108. self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")
  109. if self.eventLoop.inEventLoop {
  110. self._invoke(onError: onError, onResponsePart: onResponsePart)
  111. } else {
  112. self.eventLoop.execute {
  113. self._invoke(onError: onError, onResponsePart: onResponsePart)
  114. }
  115. }
  116. }
  117. /// Send a request part on the RPC.
  118. /// - Parameters:
  119. /// - part: The request part to send.
  120. /// - promise: A promise which will be completed when the request part has been handled.
  121. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  122. @inlinable
  123. public func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  124. if self.eventLoop.inEventLoop {
  125. self._send(part, promise: promise)
  126. } else {
  127. self.eventLoop.execute {
  128. self._send(part, promise: promise)
  129. }
  130. }
  131. }
  132. /// Attempt to cancel the RPC.
  133. /// - Parameter promise: A promise which will be completed once the cancellation request has been
  134. /// dealt with.
  135. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  136. public func cancel(promise: EventLoopPromise<Void>?) {
  137. if self.eventLoop.inEventLoop {
  138. self._cancel(promise: promise)
  139. } else {
  140. self.eventLoop.execute {
  141. self._cancel(promise: promise)
  142. }
  143. }
  144. }
  145. }
  146. extension Call {
  147. /// Send a request part on the RPC.
  148. /// - Parameter part: The request part to send.
  149. /// - Returns: A future which will be resolved when the request has been handled.
  150. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  151. @inlinable
  152. public func send(_ part: GRPCClientRequestPart<Request>) -> EventLoopFuture<Void> {
  153. let promise = self.eventLoop.makePromise(of: Void.self)
  154. self.send(part, promise: promise)
  155. return promise.futureResult
  156. }
  157. /// Attempt to cancel the RPC.
  158. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  159. /// - Returns: A future which will be resolved when the cancellation request has been cancelled.
  160. public func cancel() -> EventLoopFuture<Void> {
  161. let promise = self.eventLoop.makePromise(of: Void.self)
  162. self.cancel(promise: promise)
  163. return promise.futureResult
  164. }
  165. }
  166. extension Call {
  167. internal func compress(_ compression: Compression) -> Bool {
  168. return compression.isEnabled(callDefault: self.isCompressionEnabled)
  169. }
  170. internal func sendMessages<Messages>(
  171. _ messages: Messages,
  172. compression: Compression,
  173. promise: EventLoopPromise<Void>?
  174. ) where Messages: Sequence, Messages.Element == Request {
  175. if self.eventLoop.inEventLoop {
  176. if let promise = promise {
  177. self._sendMessages(messages, compression: compression, promise: promise)
  178. } else {
  179. self._sendMessages(messages, compression: compression)
  180. }
  181. } else {
  182. self.eventLoop.execute {
  183. if let promise = promise {
  184. self._sendMessages(messages, compression: compression, promise: promise)
  185. } else {
  186. self._sendMessages(messages, compression: compression)
  187. }
  188. }
  189. }
  190. }
  191. // Provide a few convenience methods we need from the wrapped call objects.
  192. private func _sendMessages<Messages>(
  193. _ messages: Messages,
  194. compression: Compression
  195. ) where Messages: Sequence, Messages.Element == Request {
  196. self.eventLoop.assertInEventLoop()
  197. let compress = self.compress(compression)
  198. var iterator = messages.makeIterator()
  199. var maybeNext = iterator.next()
  200. while let current = maybeNext {
  201. let next = iterator.next()
  202. // If there's no next message, then we'll flush.
  203. let flush = next == nil
  204. self._send(.message(current, .init(compress: compress, flush: flush)), promise: nil)
  205. maybeNext = next
  206. }
  207. }
  208. private func _sendMessages<Messages>(
  209. _ messages: Messages,
  210. compression: Compression,
  211. promise: EventLoopPromise<Void>
  212. ) where Messages: Sequence, Messages.Element == Request {
  213. self.eventLoop.assertInEventLoop()
  214. let compress = self.compress(compression)
  215. var iterator = messages.makeIterator()
  216. var maybeNext = iterator.next()
  217. while let current = maybeNext {
  218. let next = iterator.next()
  219. let isLast = next == nil
  220. // We're already on the event loop, use the `_` send.
  221. if isLast {
  222. // Only flush and attach the promise to the last message.
  223. self._send(.message(current, .init(compress: compress, flush: true)), promise: promise)
  224. } else {
  225. self._send(.message(current, .init(compress: compress, flush: false)), promise: nil)
  226. }
  227. maybeNext = next
  228. }
  229. }
  230. }
  231. extension Call {
  232. /// Invoke the RPC with this response part handler.
  233. /// - Important: This *must* to be called from the `eventLoop`.
  234. @usableFromInline
  235. internal func _invoke(
  236. onError: @escaping (Error) -> Void,
  237. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  238. ) {
  239. self.eventLoop.assertInEventLoop()
  240. switch self._state {
  241. case let .idle(factory):
  242. let transport = factory.makeConfiguredTransport(
  243. to: self.path,
  244. for: self.type,
  245. withOptions: self.options,
  246. interceptedBy: self.interceptors,
  247. onError: onError,
  248. onResponsePart: onResponsePart
  249. )
  250. self._state = .invoked(transport)
  251. case .invoked:
  252. // We can't be invoked twice. Just ignore this.
  253. ()
  254. }
  255. }
  256. /// Send a request part on the transport.
  257. /// - Important: This *must* to be called from the `eventLoop`.
  258. @inlinable
  259. internal func _send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  260. self.eventLoop.assertInEventLoop()
  261. switch self._state {
  262. case .idle:
  263. promise?.fail(GRPCError.InvalidState("Call must be invoked before sending request parts"))
  264. case let .invoked(transport):
  265. transport.send(part, promise: promise)
  266. }
  267. }
  268. /// Attempt to cancel the call.
  269. /// - Important: This *must* to be called from the `eventLoop`.
  270. private func _cancel(promise: EventLoopPromise<Void>?) {
  271. self.eventLoop.assertInEventLoop()
  272. switch self._state {
  273. case .idle:
  274. // This is weird: does it make sense to cancel before invoking it?
  275. let error = GRPCError.InvalidState("Call must be invoked before cancelling it")
  276. promise?.fail(error)
  277. self.channelPromise?.fail(error)
  278. case let .invoked(transport):
  279. transport.cancel(promise: promise)
  280. }
  281. }
  282. /// Get the underlying `Channel` for this call.
  283. /// - Important: This *must* to be called from the `eventLoop`.
  284. private func _channel() -> EventLoopFuture<Channel> {
  285. self.eventLoop.assertInEventLoop()
  286. switch (self.channelPromise, self._state) {
  287. case let (.some(promise), .idle),
  288. let (.some(promise), .invoked):
  289. // We already have a promise, just use that.
  290. return promise.futureResult
  291. case (.none, .idle):
  292. // We need to allocate a promise and ask the transport for the channel later.
  293. let promise = self.eventLoop.makePromise(of: Channel.self)
  294. self.channelPromise = promise
  295. return promise.futureResult
  296. case let (.none, .invoked(transport)):
  297. // Just ask the transport.
  298. return transport.channel()
  299. }
  300. }
  301. }
  302. extension Call {
  303. // These helpers are for our wrapping call objects (`UnaryCall`, etc.).
  304. /// Invokes the call and sends a single request. Sends the metadata, request and closes the
  305. /// request stream.
  306. /// - Parameters:
  307. /// - request: The request to send.
  308. /// - onError: A callback invoked when an error is received.
  309. /// - onResponsePart: A callback invoked for each response part received.
  310. @inlinable
  311. internal func invokeUnaryRequest(
  312. _ request: Request,
  313. onError: @escaping (Error) -> Void,
  314. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  315. ) {
  316. if self.eventLoop.inEventLoop {
  317. self._invokeUnaryRequest(request: request, onError: onError, onResponsePart: onResponsePart)
  318. } else {
  319. self.eventLoop.execute {
  320. self._invokeUnaryRequest(request: request, onError: onError, onResponsePart: onResponsePart)
  321. }
  322. }
  323. }
  324. /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
  325. /// additional messages and end the stream by calling `send(_:promise:)`.
  326. /// - Parameters:
  327. /// - onError: A callback invoked when an error is received.
  328. /// - onResponsePart: A callback invoked for each response part received.
  329. @inlinable
  330. internal func invokeStreamingRequests(
  331. onError: @escaping (Error) -> Void,
  332. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  333. ) {
  334. if self.eventLoop.inEventLoop {
  335. self._invokeStreamingRequests(onError: onError, onResponsePart: onResponsePart)
  336. } else {
  337. self.eventLoop.execute {
  338. self._invokeStreamingRequests(onError: onError, onResponsePart: onResponsePart)
  339. }
  340. }
  341. }
  342. /// On-`EventLoop` implementation of `invokeUnaryRequest(request:_:)`.
  343. @usableFromInline
  344. internal func _invokeUnaryRequest(
  345. request: Request,
  346. onError: @escaping (Error) -> Void,
  347. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  348. ) {
  349. self.eventLoop.assertInEventLoop()
  350. assert(self.type == .unary || self.type == .serverStreaming)
  351. self._invoke(onError: onError, onResponsePart: onResponsePart)
  352. self._send(.metadata(self.options.customMetadata), promise: nil)
  353. self._send(
  354. .message(request, .init(compress: self.isCompressionEnabled, flush: false)),
  355. promise: nil
  356. )
  357. self._send(.end, promise: nil)
  358. }
  359. /// On-`EventLoop` implementation of `invokeStreamingRequests(_:)`.
  360. @usableFromInline
  361. internal func _invokeStreamingRequests(
  362. onError: @escaping (Error) -> Void,
  363. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  364. ) {
  365. self.eventLoop.assertInEventLoop()
  366. assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)
  367. self._invoke(onError: onError, onResponsePart: onResponsePart)
  368. self._send(.metadata(self.options.customMetadata), promise: nil)
  369. }
  370. }