ClientInterceptors.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  /// The base class from which client interceptors are derived.
  ///
  /// An interceptor may observe, mutate or drop request and response parts as they pass through
  /// it. Unless a function is overridden, this base class forwards every event to the next
  /// interceptor.
  ///
  /// The events an interceptor may observe are:
  /// - response parts being received, via `receive(_:context:)`,
  /// - errors being received, via `errorCaught(_:context:)`,
  /// - request parts being sent, via `send(_:promise:context:)`, and
  /// - cancellation of the RPC, via `cancel(promise:context:)`.
  ///
  /// Every RPC has its own pipeline of interceptors through which these events flow. Request
  /// parts sent from the call object (e.g. `UnaryCall`, `BidirectionalStreamingCall`) travel in
  /// the outbound direction, starting at the tail of the pipeline, via
  /// `send(_:promise:context:)` until they reach the head of the pipeline, where they are sent
  /// to the server.
  ///
  /// Response parts and errors received from the transport are fired in the inbound direction,
  /// back through the pipeline, via `receive(_:context:)` and `errorCaught(_:context:)`
  /// respectively. The `end` response part and any received error are terminal: once either
  /// reaches the tail the pipeline is torn down, so an interceptor should treat them as a signal
  /// to release any resources it is holding.
  ///
  /// Each interceptor function is handed a `context` exposing analogous functions
  /// (`receive(_:)`, `errorCaught(_:)`, `send(_:promise:)`, and `cancel(promise:)`) which
  /// forward the event to the next interceptor in the appropriate direction.
  ///
  /// ### Thread Safety
  ///
  /// The functions on `context` are not thread safe and **must** be called on the `EventLoop`
  /// found on the `context`. Every interceptor is invoked on that same `EventLoop`, so this
  /// rarely needs special care. Implementers who move work onto a `DispatchQueue` or onto a
  /// _different_ `EventLoop` must make sure they return to the correct `EventLoop` before using
  /// `context`.
  open class ClientInterceptor<Request, Response> {
    public init() {}

    /// Handles a response part which has arrived at this interceptor.
    ///
    /// The default implementation forwards the part to the next interceptor.
    ///
    /// - Parameters:
    ///   - part: The response part received from the server.
    ///   - context: The interceptor context, which may be used to forward the response part.
    open func receive(
      _ part: GRPCClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.receive(part)
    }

    /// Handles an error which has arrived at this interceptor.
    ///
    /// The default implementation forwards the error to the next interceptor.
    ///
    /// - Parameters:
    ///   - error: The error which was received.
    ///   - context: The interceptor context, which may be used to forward the error.
    open func errorCaught(
      _ error: Error,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.errorCaught(error)
    }

    /// Handles a request part which is on its way to the server.
    ///
    /// The default implementation forwards the part, and its promise, to the next interceptor.
    ///
    /// - Parameters:
    ///   - part: The request part which should be sent to the server.
    ///   - promise: A promise which should be completed when the request part has been handled.
    ///   - context: The interceptor context, which may be used to forward the request part.
    open func send(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.send(part, promise: promise)
    }

    /// Handles a request to cancel the RPC.
    ///
    /// The default implementation forwards the cancellation request to the next interceptor.
    ///
    /// - Parameters:
    ///   - promise: A promise which should be completed when the cancellation request has been
    ///     handled.
    ///   - context: The interceptor context, which may be used to forward the cancellation
    ///     request.
    open func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.cancel(promise: promise)
    }
  }
  95. // MARK: - Head/Tail
  /// The interceptor at the head of the pipeline.
  ///
  /// Request parts and cancellation requests are handed to the callbacks supplied at
  /// initialization (offloading them to the transport); response parts and errors are forwarded
  /// to the rest of the pipeline.
  @usableFromInline
  internal struct HeadClientInterceptor<Request, Response>: ClientInterceptorProtocol {
    /// Invoked when cancellation of the RPC has been requested.
    private let onCancel: (EventLoopPromise<Void>?) -> Void

    /// Invoked for every request part which is written.
    @usableFromInline
    internal let _onRequestPart: (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void

    /// Creates a head interceptor.
    /// - Parameters:
    ///   - onCancel: The callback to invoke when a cancellation request reaches the head.
    ///   - onRequestPart: The callback to invoke when a request part reaches the head.
    init(
      onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
      onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
    ) {
      self._onRequestPart = onRequestPart
      self.onCancel = onCancel
    }

    /// Passes the request part and its promise to the request part callback. The `context` is
    /// not used: nothing is forwarded further along the pipeline.
    @inlinable
    internal func send(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      _onRequestPart(part, promise)
    }

    /// Passes the promise to the cancellation callback. The `context` is not used.
    internal func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      onCancel(promise)
    }

    /// Forwards the response part to the next interceptor.
    internal func receive(
      _ part: GRPCClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.receive(part)
    }

    /// Forwards the error to the next interceptor.
    internal func errorCaught(
      _ error: Error,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.errorCaught(error)
    }
  }
  /// An interceptor which offloads response parts and errors to the provided callbacks and
  /// forwards any request parts and cancellation requests to the rest of the pipeline.
  @usableFromInline
  internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProtocol {
    /// The pipeline this interceptor belongs to.
    private let pipeline: ClientInterceptorPipeline<Request, Response>

    /// A user-provided error delegate.
    private let errorDelegate: ClientErrorDelegate?

    /// A callback invoked when an error is received.
    private let onErrorCaught: (Error) -> Void

    /// A response part handler; typically this will complete some promises, for streaming
    /// responses it will also invoke a user-supplied handler. This closure may also be provided
    /// by the user. We need to be careful about re-entrancy.
    private let onResponsePart: (GRPCClientResponsePart<Response>) -> Void

    /// Creates a tail interceptor.
    /// - Parameters:
    ///   - pipeline: The pipeline this interceptor belongs to; it is closed when an `end`
    ///     response part or an error reaches the tail.
    ///   - errorDelegate: An optional delegate which is told about any error reaching the tail.
    ///   - onErrorCaught: A callback invoked with the (unwrapped) error reaching the tail.
    ///   - onResponsePart: A callback invoked with each response part reaching the tail.
    internal init(
      for pipeline: ClientInterceptorPipeline<Request, Response>,
      errorDelegate: ClientErrorDelegate?,
      _ onErrorCaught: @escaping (Error) -> Void,
      _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) {
      self.pipeline = pipeline
      self.errorDelegate = errorDelegate
      self.onErrorCaught = onErrorCaught
      self.onResponsePart = onResponsePart
    }

    /// Hands the response part to `onResponsePart`, closing the pipeline first if the part is
    /// `end`. The `context` is not used: the tail is the last stop for inbound events.
    internal func receive(
      _ part: GRPCClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch part {
      case .metadata, .message:
        // Non-terminal parts: nothing to do before calling out.
        break

      case .end:
        // We're about to complete, close the pipeline before calling out via `onResponsePart`.
        self.pipeline.close()
      }

      self.onResponsePart(part)
    }

    /// Closes the pipeline, informs the error delegate (if there is one) and then hands the
    /// unwrapped error to `onErrorCaught`.
    ///
    /// If `error` is a `GRPCError.WithContext` the wrapped error is extracted and reported to the
    /// delegate along with the file and line carried by the wrapper; otherwise `error` is
    /// reported as-is, without that context.
    internal func errorCaught(
      _ error: Error,
      context: ClientInterceptorContext<Request, Response>
    ) {
      // We're about to complete, close the pipeline before calling out via the error delegate
      // or `onErrorCaught`.
      self.pipeline.close()

      // Assigned exactly once on each branch below, so a deferred-initialized constant suffices.
      let unwrappedError: Error

      // Unwrap the error, if possible.
      if let errorContext = error as? GRPCError.WithContext {
        unwrappedError = errorContext.error
        self.errorDelegate?.didCatchError(
          errorContext.error,
          logger: context.logger,
          file: errorContext.file,
          line: errorContext.line
        )
      } else {
        unwrappedError = error
        self.errorDelegate?.didCatchErrorWithoutContext(
          error,
          logger: context.logger
        )
      }

      // Emit the unwrapped error.
      self.onErrorCaught(unwrappedError)
    }

    /// Forwards the request part, and its promise, to the next interceptor.
    @inlinable
    internal func send(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.send(part, promise: promise)
    }

    /// Forwards the cancellation request to the next interceptor.
    internal func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.cancel(promise: promise)
    }
  }
  219. // MARK: - Any Interceptor
  /// A wrapper around one of the concrete interceptor kinds (head, tail or user-provided) which
  /// delegates every event to the interceptor it wraps.
  @usableFromInline
  internal struct AnyClientInterceptor<Request, Response>: ClientInterceptorProtocol {
    /// The kinds of interceptor which may be wrapped.
    @usableFromInline
    internal enum Implementation {
      case head(HeadClientInterceptor<Request, Response>)
      case tail(TailClientInterceptor<Request, Response>)
      case base(ClientInterceptor<Request, Response>)
    }

    /// The wrapped interceptor to which all events are delegated.
    @usableFromInline
    internal let _implementation: Implementation

    /// Makes a head interceptor.
    /// - Parameters:
    ///   - onCancel: A callback invoked when a cancellation request reaches the head.
    ///   - onRequestPart: A callback invoked when a request part reaches the head.
    /// - Returns: An `AnyClientInterceptor` which wraps a `HeadClientInterceptor`.
    internal static func head(
      onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
      onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
    ) -> AnyClientInterceptor<Request, Response> {
      let head = HeadClientInterceptor<Request, Response>(
        onCancel: onCancel,
        onRequestPart: onRequestPart
      )
      return AnyClientInterceptor(.head(head))
    }

    /// Makes a tail interceptor.
    /// - Parameters:
    ///   - pipeline: The pipeline the tail interceptor belongs to.
    ///   - errorDelegate: An error delegate.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A handler called for each response part received from the pipeline.
    /// - Returns: An `AnyClientInterceptor` which wraps a `TailClientInterceptor`.
    internal static func tail(
      for pipeline: ClientInterceptorPipeline<Request, Response>,
      errorDelegate: ClientErrorDelegate?,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> AnyClientInterceptor<Request, Response> {
      return AnyClientInterceptor(
        .tail(
          TailClientInterceptor(
            for: pipeline,
            errorDelegate: errorDelegate,
            onError,
            onResponsePart
          )
        )
      )
    }

    /// Wraps an interceptor provided by the user.
    /// - Parameter interceptor: The interceptor to wrap.
    /// - Returns: An `AnyClientInterceptor` which wraps `interceptor`.
    internal static func userProvided(
      _ interceptor: ClientInterceptor<Request, Response>
    ) -> AnyClientInterceptor<Request, Response> {
      return AnyClientInterceptor(.base(interceptor))
    }

    /// Wraps the given implementation; instances are created via the static factory functions.
    private init(_ implementation: Implementation) {
      self._implementation = implementation
    }

    /// Delegates the received response part to the wrapped interceptor.
    internal func receive(
      _ part: GRPCClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch self._implementation {
      case .head(let interceptor):
        interceptor.receive(part, context: context)
      case .tail(let interceptor):
        interceptor.receive(part, context: context)
      case .base(let interceptor):
        interceptor.receive(part, context: context)
      }
    }

    /// Delegates the received error to the wrapped interceptor.
    internal func errorCaught(
      _ error: Error,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch self._implementation {
      case .head(let interceptor):
        interceptor.errorCaught(error, context: context)
      case .tail(let interceptor):
        interceptor.errorCaught(error, context: context)
      case .base(let interceptor):
        interceptor.errorCaught(error, context: context)
      }
    }

    /// Delegates the request part, and its promise, to the wrapped interceptor.
    @inlinable
    internal func send(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch self._implementation {
      case .head(let interceptor):
        interceptor.send(part, promise: promise, context: context)
      case .tail(let interceptor):
        interceptor.send(part, promise: promise, context: context)
      case .base(let interceptor):
        interceptor.send(part, promise: promise, context: context)
      }
    }

    /// Delegates the cancellation request to the wrapped interceptor.
    internal func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch self._implementation {
      case .head(let interceptor):
        interceptor.cancel(promise: promise, context: context)
      case .tail(let interceptor):
        interceptor.cancel(promise: promise, context: context)
      case .base(let interceptor):
        interceptor.cancel(promise: promise, context: context)
      }
    }
  }