Call.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import protocol SwiftProtobuf.Message
  21. /// An object representing a single RPC from the perspective of a client. It allows the caller to
  22. /// send request parts, request a cancellation, and receive response parts in a provided callback.
  23. ///
  24. /// The call object sits atop an interceptor pipeline (see `ClientInterceptor`) which allows for
  25. /// request and response streams to be arbitrarily transformed or observed. Requests sent via this
  26. /// call will traverse the pipeline before reaching the network, and responses received will
  27. /// traverse the pipeline having been received from the network.
  28. ///
  29. /// This object is a lower-level API than the equivalent wrapped calls (such as `UnaryCall` and
  30. /// `BidirectionalStreamingCall`). The caller is therefore required to do more in order to use this
  31. /// object correctly. Callers must call `invoke(_:)` to start the call and ensure that the correct
  32. /// number of request parts are sent in the correct order (exactly one `metadata`, followed
  33. /// by at most one `message` for unary and server streaming calls, and any number of `message` parts
  34. /// for client streaming and bidirectional streaming calls. All call types must terminate their
  35. /// request stream by sending one `end` message.
  36. ///
  37. /// Callers are not able to create `Call` objects directly, rather they must be created via an
  38. /// object conforming to `GRPCChannel` such as `ClientConnection`.
  39. public class Call<Request, Response> {
  40. @usableFromInline
  41. internal enum State {
  42. /// Idle, waiting to be invoked.
  43. case idle(ClientTransportFactory<Request, Response>)
  44. /// Invoked, we have a transport on which to send requests. The transport may be closed if the
  45. /// RPC has already completed.
  46. case invoked(ClientTransport<Request, Response>)
  47. }
  48. /// The current state of the call.
  49. @usableFromInline
  50. internal var _state: State
  51. /// User provided interceptors for the call.
  52. private let interceptors: [ClientInterceptor<Request, Response>]
  53. /// Whether compression is enabled on the call.
  54. private var isCompressionEnabled: Bool {
  55. return self.options.messageEncoding.enabledForRequests
  56. }
  57. /// The `EventLoop` the call is being invoked on.
  58. public let eventLoop: EventLoop
  59. /// The path of the RPC, usually generated from a service definition, e.g. "/echo.Echo/Get".
  60. public let path: String
  61. /// The type of the RPC, e.g. unary, bidirectional streaming.
  62. public let type: GRPCCallType
  63. /// Options used to invoke the call.
  64. public let options: CallOptions
  65. /// A promise for the underlying `Channel`. We only allocate this if the user asks for
  66. /// the `Channel` and we haven't invoked the transport yet. It's a bit unfortunate.
  67. private var channelPromise: EventLoopPromise<Channel>?
  68. /// Returns a future for the underlying `Channel`.
  69. internal var channel: EventLoopFuture<Channel> {
  70. if self.eventLoop.inEventLoop {
  71. return self._channel()
  72. } else {
  73. return self.eventLoop.flatSubmit {
  74. return self._channel()
  75. }
  76. }
  77. }
  78. // Calls can't be constructed directly: users must make them using a `GRPCChannel`.
  79. internal init(
  80. path: String,
  81. type: GRPCCallType,
  82. eventLoop: EventLoop,
  83. options: CallOptions,
  84. interceptors: [ClientInterceptor<Request, Response>],
  85. transportFactory: ClientTransportFactory<Request, Response>
  86. ) {
  87. self.path = path
  88. self.type = type
  89. self.options = options
  90. self._state = .idle(transportFactory)
  91. self.eventLoop = eventLoop
  92. self.interceptors = interceptors
  93. }
  94. /// Starts the call and provides a callback which is invoked on every response part received from
  95. /// the server.
  96. ///
  97. /// This must be called prior to `send(_:promise:)` or `cancel(promise:)`.
  98. ///
  99. /// - Parameter onResponsePart: A callback which is invoked on every response part.
  100. /// - Important: This function should only be called once. Subsequent calls will be ignored.
  101. @inlinable
  102. public func invoke(_ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void) {
  103. self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")
  104. if self.eventLoop.inEventLoop {
  105. self._invoke(onResponsePart)
  106. } else {
  107. self.eventLoop.execute {
  108. self._invoke(onResponsePart)
  109. }
  110. }
  111. }
  112. /// Send a request part on the RPC.
  113. /// - Parameters:
  114. /// - part: The request part to send.
  115. /// - promise: A promise which will be completed when the request part has been handled.
  116. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  117. @inlinable
  118. public func send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  119. if self.eventLoop.inEventLoop {
  120. self._send(part, promise: promise)
  121. } else {
  122. self.eventLoop.execute {
  123. self._send(part, promise: promise)
  124. }
  125. }
  126. }
  127. /// Attempt to cancel the RPC.
  128. /// - Parameter promise: A promise which will be completed once the cancellation request has been
  129. /// dealt with.
  130. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  131. public func cancel(promise: EventLoopPromise<Void>?) {
  132. if self.eventLoop.inEventLoop {
  133. self._cancel(promise: promise)
  134. } else {
  135. self.eventLoop.execute {
  136. self._cancel(promise: promise)
  137. }
  138. }
  139. }
  140. }
  141. extension Call {
  142. /// Send a request part on the RPC.
  143. /// - Parameter part: The request part to send.
  144. /// - Returns: A future which will be resolved when the request has been handled.
  145. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  146. @inlinable
  147. public func send(_ part: ClientRequestPart<Request>) -> EventLoopFuture<Void> {
  148. let promise = self.eventLoop.makePromise(of: Void.self)
  149. self.send(part, promise: promise)
  150. return promise.futureResult
  151. }
  152. /// Attempt to cancel the RPC.
  153. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  154. /// - Returns: A future which will be resolved when the cancellation request has been cancelled.
  155. public func cancel() -> EventLoopFuture<Void> {
  156. let promise = self.eventLoop.makePromise(of: Void.self)
  157. self.cancel(promise: promise)
  158. return promise.futureResult
  159. }
  160. }
  161. extension Call {
  162. internal func compress(_ compression: Compression) -> Bool {
  163. return compression.isEnabled(callDefault: self.isCompressionEnabled)
  164. }
  165. internal func sendMessages<Messages>(
  166. _ messages: Messages,
  167. compression: Compression,
  168. promise: EventLoopPromise<Void>?
  169. ) where Messages: Sequence, Messages.Element == Request {
  170. if self.eventLoop.inEventLoop {
  171. if let promise = promise {
  172. self._sendMessages(messages, compression: compression, promise: promise)
  173. } else {
  174. self._sendMessages(messages, compression: compression)
  175. }
  176. } else {
  177. self.eventLoop.execute {
  178. if let promise = promise {
  179. self._sendMessages(messages, compression: compression, promise: promise)
  180. } else {
  181. self._sendMessages(messages, compression: compression)
  182. }
  183. }
  184. }
  185. }
  186. // Provide a few convenience methods we need from the wrapped call objects.
  187. private func _sendMessages<Messages>(
  188. _ messages: Messages,
  189. compression: Compression
  190. ) where Messages: Sequence, Messages.Element == Request {
  191. self.eventLoop.assertInEventLoop()
  192. let compress = self.compress(compression)
  193. var iterator = messages.makeIterator()
  194. var maybeNext = iterator.next()
  195. while let current = maybeNext {
  196. let next = iterator.next()
  197. // If there's no next message, then we'll flush.
  198. let flush = next == nil
  199. self._send(.message(current, .init(compress: compress, flush: flush)), promise: nil)
  200. maybeNext = next
  201. }
  202. }
  203. private func _sendMessages<Messages>(
  204. _ messages: Messages,
  205. compression: Compression,
  206. promise: EventLoopPromise<Void>
  207. ) where Messages: Sequence, Messages.Element == Request {
  208. self.eventLoop.assertInEventLoop()
  209. let compress = self.compress(compression)
  210. var iterator = messages.makeIterator()
  211. var maybeNext = iterator.next()
  212. while let current = maybeNext {
  213. let next = iterator.next()
  214. let isLast = next == nil
  215. // We're already on the event loop, use the `_` send.
  216. if isLast {
  217. // Only flush and attach the promise to the last message.
  218. self._send(.message(current, .init(compress: compress, flush: true)), promise: promise)
  219. } else {
  220. self._send(.message(current, .init(compress: compress, flush: false)), promise: nil)
  221. }
  222. maybeNext = next
  223. }
  224. }
  225. }
  226. extension Call {
  227. /// Invoke the RPC with this response part handler.
  228. /// - Important: This *must* to be called from the `eventLoop`.
  229. @usableFromInline
  230. internal func _invoke(
  231. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  232. ) {
  233. self.eventLoop.assertInEventLoop()
  234. switch self._state {
  235. case let .idle(factory):
  236. let transport = factory.makeConfiguredTransport(
  237. to: self.path,
  238. for: self.type,
  239. withOptions: self.options,
  240. interceptedBy: self.interceptors,
  241. onResponsePart
  242. )
  243. self._state = .invoked(transport)
  244. case .invoked:
  245. // We can't be invoked twice. Just ignore this.
  246. ()
  247. }
  248. }
  249. /// Send a request part on the transport.
  250. /// - Important: This *must* to be called from the `eventLoop`.
  251. @inlinable
  252. internal func _send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  253. self.eventLoop.assertInEventLoop()
  254. switch self._state {
  255. case .idle:
  256. promise?.fail(GRPCError.InvalidState("Call must be invoked before sending request parts"))
  257. case let .invoked(transport):
  258. transport.send(part, promise: promise)
  259. }
  260. }
  261. /// Attempt to cancel the call.
  262. /// - Important: This *must* to be called from the `eventLoop`.
  263. private func _cancel(promise: EventLoopPromise<Void>?) {
  264. self.eventLoop.assertInEventLoop()
  265. switch self._state {
  266. case .idle:
  267. // This is weird: does it make sense to cancel before invoking it?
  268. let error = GRPCError.InvalidState("Call must be invoked before cancelling it")
  269. promise?.fail(error)
  270. self.channelPromise?.fail(error)
  271. case let .invoked(transport):
  272. transport.cancel(promise: promise)
  273. }
  274. }
  275. /// Get the underlying `Channel` for this call.
  276. /// - Important: This *must* to be called from the `eventLoop`.
  277. private func _channel() -> EventLoopFuture<Channel> {
  278. self.eventLoop.assertInEventLoop()
  279. switch (self.channelPromise, self._state) {
  280. case let (.some(promise), .idle),
  281. let (.some(promise), .invoked):
  282. // We already have a promise, just use that.
  283. return promise.futureResult
  284. case (.none, .idle):
  285. // We need to allocate a promise and ask the transport for the channel later.
  286. let promise = self.eventLoop.makePromise(of: Channel.self)
  287. self.channelPromise = promise
  288. return promise.futureResult
  289. case let (.none, .invoked(transport)):
  290. // Just ask the transport.
  291. return transport.channel()
  292. }
  293. }
  294. }
  295. extension Call {
  296. // These helpers are for our wrapping call objects (`UnaryCall`, etc.).
  297. /// Invokes the call and sends a single request. Sends the metadata, request and closes the
  298. /// request stream.
  299. /// - Parameters:
  300. /// - request: The request to send.
  301. /// - onResponsePart: A callback invoked for each response part received.
  302. @inlinable
  303. internal func invokeUnaryRequest(
  304. _ request: Request,
  305. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  306. ) {
  307. if self.eventLoop.inEventLoop {
  308. self._invokeUnaryRequest(request: request, onResponsePart)
  309. } else {
  310. self.eventLoop.execute {
  311. self._invokeUnaryRequest(request: request, onResponsePart)
  312. }
  313. }
  314. }
  315. /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
  316. /// additional messages and end the stream by calling `send(_:promise:)`.
  317. /// - Parameters:
  318. /// - onResponsePart: A callback invoked for each response part received.
  319. @inlinable
  320. internal func invokeStreamingRequests(
  321. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  322. ) {
  323. if self.eventLoop.inEventLoop {
  324. self._invokeStreamingRequests(onResponsePart)
  325. } else {
  326. self.eventLoop.execute {
  327. self._invokeStreamingRequests(onResponsePart)
  328. }
  329. }
  330. }
  331. /// On-`EventLoop` implementation of `invokeUnaryRequest(request:_:)`.
  332. @usableFromInline
  333. internal func _invokeUnaryRequest(
  334. request: Request,
  335. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  336. ) {
  337. self.eventLoop.assertInEventLoop()
  338. assert(self.type == .unary || self.type == .serverStreaming)
  339. self._invoke(onResponsePart)
  340. self._send(.metadata(self.options.customMetadata), promise: nil)
  341. self._send(
  342. .message(request, .init(compress: self.isCompressionEnabled, flush: false)),
  343. promise: nil
  344. )
  345. self._send(.end, promise: nil)
  346. }
  347. /// On-`EventLoop` implementation of `invokeStreamingRequests(_:)`.
  348. @usableFromInline
  349. internal func _invokeStreamingRequests(
  350. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  351. ) {
  352. self.eventLoop.assertInEventLoop()
  353. assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)
  354. self._invoke(onResponsePart)
  355. self._send(.metadata(self.options.customMetadata), promise: nil)
  356. }
  357. }