Call.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import protocol SwiftProtobuf.Message
  21. /// An object representing a single RPC from the perspective of a client. It allows the caller to
  22. /// send request parts, request a cancellation, and receive response parts in a provided callback.
  23. ///
  24. /// The call object sits atop an interceptor pipeline (see `ClientInterceptor`) which allows for
  25. /// request and response streams to be arbitrarily transformed or observed. Requests sent via this
  26. /// call will traverse the pipeline before reaching the network, and responses received will
  27. /// traverse the pipeline having been received from the network.
  28. ///
  29. /// This object is a lower-level API than the equivalent wrapped calls (such as `UnaryCall` and
  30. /// `BidirectionalStreamingCall`). The caller is therefore required to do more in order to use this
  31. /// object correctly. Callers must call `invoke(_:)` to start the call and ensure that the correct
  32. /// number of request parts are sent in the correct order (exactly one `metadata`, followed
  33. /// by at most one `message` for unary and server streaming calls, and any number of `message` parts
  34. /// for client streaming and bidirectional streaming calls. All call types must terminate their
  35. /// request stream by sending one `end` message.
  36. ///
  37. /// Callers are not able to create `Call` objects directly, rather they must be created via an
  38. /// object conforming to `GRPCChannel` such as `ClientConnection`.
  39. public class Call<Request, Response> {
  40. @usableFromInline
  41. internal enum State {
  42. /// Idle, waiting to be invoked.
  43. case idle(ClientTransportFactory<Request, Response>)
  44. /// Invoked, we have a transport on which to send requests. The transport may be closed if the
  45. /// RPC has already completed.
  46. case invoked(ClientTransport<Request, Response>)
  47. }
  48. /// The current state of the call.
  49. @usableFromInline
  50. internal var _state: State
  51. /// User provided interceptors for the call.
  52. private let interceptors: [ClientInterceptor<Request, Response>]
  53. /// Whether compression is enabled on the call.
  54. private var isCompressionEnabled: Bool {
  55. return self.options.messageEncoding.enabledForRequests
  56. }
  57. /// The `EventLoop` the call is being invoked on.
  58. public let eventLoop: EventLoop
  59. /// The path of the RPC, usually generated from a service definition, e.g. "/echo.Echo/Get".
  60. public let path: String
  61. /// The type of the RPC, e.g. unary, bidirectional streaming.
  62. public let type: GRPCCallType
  63. /// Options used to invoke the call.
  64. public let options: CallOptions
  65. // Calls can't be constructed directly: users must make them using a `GRPCChannel`.
  66. internal init(
  67. path: String,
  68. type: GRPCCallType,
  69. eventLoop: EventLoop,
  70. options: CallOptions,
  71. interceptors: [ClientInterceptor<Request, Response>],
  72. transportFactory: ClientTransportFactory<Request, Response>
  73. ) {
  74. self.path = path
  75. self.type = type
  76. self.options = options
  77. self._state = .idle(transportFactory)
  78. self.eventLoop = eventLoop
  79. self.interceptors = interceptors
  80. }
  81. /// Starts the call and provides a callback which is invoked on every response part received from
  82. /// the server.
  83. ///
  84. /// This must be called prior to `send(_:promise:)` or `cancel(promise:)`.
  85. ///
  86. /// - Parameter onResponsePart: A callback which is invoked on every response part.
  87. /// - Important: This function should only be called once. Subsequent calls will be ignored.
  88. @inlinable
  89. public func invoke(_ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void) {
  90. self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")
  91. if self.eventLoop.inEventLoop {
  92. self._invoke(onResponsePart)
  93. } else {
  94. self.eventLoop.execute {
  95. self._invoke(onResponsePart)
  96. }
  97. }
  98. }
  99. /// Send a request part on the RPC.
  100. /// - Parameters:
  101. /// - part: The request part to send.
  102. /// - promise: A promise which will be completed when the request part has been handled.
  103. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  104. @inlinable
  105. public func send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  106. if self.eventLoop.inEventLoop {
  107. self._send(part, promise: promise)
  108. } else {
  109. self.eventLoop.execute {
  110. self._send(part, promise: promise)
  111. }
  112. }
  113. }
  114. /// Attempt to cancel the RPC.
  115. /// - Parameter promise: A promise which will be completed once the cancellation request has been
  116. /// dealt with.
  117. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  118. public func cancel(promise: EventLoopPromise<Void>?) {
  119. if self.eventLoop.inEventLoop {
  120. self._cancel(promise: promise)
  121. } else {
  122. self.eventLoop.execute {
  123. self._cancel(promise: promise)
  124. }
  125. }
  126. }
  127. }
  128. extension Call {
  129. /// Send a request part on the RPC.
  130. /// - Parameter part: The request part to send.
  131. /// - Returns: A future which will be resolved when the request has been handled.
  132. /// - Note: Sending will always fail if `invoke(_:)` has not been called.
  133. @inlinable
  134. public func send(_ part: ClientRequestPart<Request>) -> EventLoopFuture<Void> {
  135. let promise = self.eventLoop.makePromise(of: Void.self)
  136. self.send(part, promise: promise)
  137. return promise.futureResult
  138. }
  139. /// Attempt to cancel the RPC.
  140. /// - Note: Cancellation will always fail if `invoke(_:)` has not been called.
  141. /// - Returns: A future which will be resolved when the cancellation request has been cancelled.
  142. public func cancel() -> EventLoopFuture<Void> {
  143. let promise = self.eventLoop.makePromise(of: Void.self)
  144. self.cancel(promise: promise)
  145. return promise.futureResult
  146. }
  147. }
  148. extension Call {
  149. /// Invoke the RPC with this response part handler.
  150. /// - Important: This *must* to be called from the `eventLoop`.
  151. @usableFromInline
  152. internal func _invoke(
  153. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  154. ) {
  155. self.eventLoop.assertInEventLoop()
  156. switch self._state {
  157. case let .idle(factory):
  158. let transport = factory.makeConfiguredTransport(
  159. to: self.path,
  160. for: self.type,
  161. withOptions: self.options,
  162. interceptedBy: self.interceptors,
  163. onResponsePart
  164. )
  165. self._state = .invoked(transport)
  166. case .invoked:
  167. // We can't be invoked twice. Just ignore this.
  168. ()
  169. }
  170. }
  171. /// Send a request part on the transport.
  172. /// - Important: This *must* to be called from the `eventLoop`.
  173. @inlinable
  174. internal func _send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  175. self.eventLoop.assertInEventLoop()
  176. switch self._state {
  177. case .idle:
  178. promise?.fail(GRPCError.InvalidState("Call must be invoked before sending request parts"))
  179. case let .invoked(transport):
  180. transport.send(part, promise: promise)
  181. }
  182. }
  183. /// Attempt to cancel the call.
  184. /// - Important: This *must* to be called from the `eventLoop`.
  185. private func _cancel(promise: EventLoopPromise<Void>?) {
  186. self.eventLoop.assertInEventLoop()
  187. switch self._state {
  188. case .idle:
  189. // This is weird: does it make sense to cancel before invoking it?
  190. promise?.fail(GRPCError.InvalidState("Call must be invoked before cancelling it"))
  191. case let .invoked(transport):
  192. transport.cancel(promise: promise)
  193. }
  194. }
  195. }
  196. extension Call {
  197. // These helpers are for our wrapping call objects (`UnaryCall`, etc.).
  198. /// Invokes the call and sends a single request. Sends the metadata, request and closes the
  199. /// request stream.
  200. /// - Parameters:
  201. /// - request: The request to send.
  202. /// - onResponsePart: A callback invoked for each response part received.
  203. @inlinable
  204. internal func invokeUnaryRequest(
  205. _ request: Request,
  206. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  207. ) {
  208. if self.eventLoop.inEventLoop {
  209. self._invokeUnaryRequest(request: request, onResponsePart)
  210. } else {
  211. self.eventLoop.execute {
  212. self._invokeUnaryRequest(request: request, onResponsePart)
  213. }
  214. }
  215. }
  216. /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
  217. /// additional messages and end the stream by calling `send(_:promise:)`.
  218. /// - Parameters:
  219. /// - onResponsePart: A callback invoked for each response part received.
  220. @inlinable
  221. internal func invokeStreamingRequests(
  222. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  223. ) {
  224. if self.eventLoop.inEventLoop {
  225. self._invokeStreamingRequests(onResponsePart)
  226. } else {
  227. self.eventLoop.execute {
  228. self._invokeStreamingRequests(onResponsePart)
  229. }
  230. }
  231. }
  232. /// On-`EventLoop` implementation of `invokeUnaryRequest(request:_:)`.
  233. @usableFromInline
  234. internal func _invokeUnaryRequest(
  235. request: Request,
  236. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  237. ) {
  238. self.eventLoop.assertInEventLoop()
  239. assert(self.type == .unary || self.type == .serverStreaming)
  240. self._invoke(onResponsePart)
  241. self._send(.metadata(self.options.customMetadata), promise: nil)
  242. self._send(
  243. .message(request, .init(compress: self.isCompressionEnabled, flush: false)),
  244. promise: nil
  245. )
  246. self._send(.end, promise: nil)
  247. }
  248. /// On-`EventLoop` implementation of `invokeStreamingRequests(_:)`.
  249. @usableFromInline
  250. internal func _invokeStreamingRequests(
  251. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  252. ) {
  253. self.eventLoop.assertInEventLoop()
  254. assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)
  255. self._invoke(onResponsePart)
  256. self._send(.metadata(self.options.customMetadata), promise: nil)
  257. }
  258. }