ClientInterceptors.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  /// A base class for client interceptors.
  ///
  /// Interceptors allow request and response parts to be observed, mutated or dropped as
  /// necessary. Unless a function is overridden, this base class forwards every event to the
  /// next interceptor.
  ///
  /// Interceptors may observe three different types of event:
  /// - receiving response parts with `receive(_:context:)`,
  /// - sending request parts with `send(_:promise:context:)`, and
  /// - RPC cancellation with `cancel(promise:context:)`.
  ///
  /// These events flow through a pipeline of interceptors for each RPC. Request parts sent from the
  /// call object (such as `UnaryCall` and `BidirectionalStreamingCall`) will traverse the pipeline
  /// from its tail via `send(_:promise:context:)`, eventually reaching the head of the pipeline
  /// where they will be sent to the server.
  ///
  /// Response parts, or errors, received from the transport will be fired back through the
  /// interceptor pipeline via `receive(_:context:)`. Note that the `end` and `error` response parts
  /// are terminal: the pipeline will be torn down once these parts reach the tail of the
  /// pipeline.
  ///
  /// Each of the interceptor functions is provided with a `context` which exposes analogous
  /// functions (`receive(_:)`, `send(_:promise:)`, and `cancel(promise:)`) which may be called to
  /// forward events to the next interceptor.
  ///
  /// ### Thread Safety
  ///
  /// Functions on `context` are not thread safe and **must** be called on the `EventLoop` found on
  /// the `context`. Since each interceptor is invoked on the same `EventLoop` this does not usually
  /// require any extra attention. However, if work is done on a `DispatchQueue` or _other_
  /// `EventLoop` then implementers should ensure that they use `context` from the correct
  /// `EventLoop`.
  open class ClientInterceptor<Request, Response> {
    public init() {}

    /// Called when the interceptor has received a response part to handle.
    ///
    /// The default implementation passes `part` on unchanged via `context.receive(_:)`.
    ///
    /// - Parameters:
    ///   - part: The response part which has been received from the server.
    ///   - context: An interceptor context which may be used to forward the response part.
    open func receive(
      _ part: ClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.receive(part)
    }

    /// Called when the interceptor has received a request part to handle.
    ///
    /// The default implementation passes `part` and `promise` on unchanged via
    /// `context.send(_:promise:)`.
    ///
    /// - Parameters:
    ///   - part: The request part which should be sent to the server.
    ///   - promise: A promise which should be completed when the request part has been handled.
    ///   - context: An interceptor context which may be used to forward the request part.
    open func send(
      _ part: ClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.send(part, promise: promise)
    }

    /// Called when the interceptor has received a request to cancel the RPC.
    ///
    /// The default implementation passes the request on via `context.cancel(promise:)`.
    ///
    /// - Parameters:
    ///   - promise: A promise which should be completed when the cancellation request has been
    ///     handled.
    ///   - context: An interceptor context which may be used to forward the cancellation request.
    open func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.cancel(promise: promise)
    }
  }
  84. // MARK: - Head/Tail
  /// An interceptor which offloads requests to the transport and forwards any response parts to
  /// the rest of the pipeline.
  @usableFromInline
  internal struct HeadClientInterceptor<Request, Response>: ClientInterceptorProtocol {
    /// Called when a cancellation has been requested.
    private let onCancel: (EventLoopPromise<Void>?) -> Void

    /// Called when a request part has been written.
    @usableFromInline
    internal let _onRequestPart: (ClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void

    /// Creates a head interceptor from the two callbacks it hands events to.
    /// - Parameters:
    ///   - onCancel: Invoked with the caller's promise whenever `cancel(promise:context:)` is
    ///     called.
    ///   - onRequestPart: Invoked with each request part and its promise whenever
    ///     `send(_:promise:context:)` is called.
    init(
      onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
      onRequestPart: @escaping (ClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
    ) {
      self._onRequestPart = onRequestPart
      self.onCancel = onCancel
    }

    /// Hands the request part and its promise to the `onRequestPart` callback. The part is not
    /// forwarded through `context`.
    @inlinable
    internal func send(
      _ part: ClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      self._onRequestPart(part, promise)
    }

    /// Hands the cancellation promise to the `onCancel` callback. The request is not forwarded
    /// through `context`.
    internal func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      self.onCancel(promise)
    }

    /// Forwards the response part, unchanged, to the next interceptor via `context`.
    internal func receive(
      _ part: ClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.receive(part)
    }
  }
  /// An interceptor which offloads responses to a provided callback and forwards any request parts
  /// and cancellation requests to the rest of the pipeline.
  @usableFromInline
  internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProtocol {
    /// The pipeline this interceptor belongs to.
    private let pipeline: ClientInterceptorPipeline<Request, Response>

    /// A user-provided error delegate.
    private let errorDelegate: ClientErrorDelegate?

    /// A response part handler; typically this will complete some promises, for streaming responses
    /// it will also invoke a user-supplied handler. This closure may also be provided by the user.
    /// We need to be careful about re-entrancy.
    private let onResponsePart: (ClientResponsePart<Response>) -> Void

    /// Creates a tail interceptor.
    /// - Parameters:
    ///   - pipeline: The pipeline this interceptor belongs to; it is closed when a terminal
    ///     (`end` or `error`) response part is received.
    ///   - errorDelegate: An optional delegate which is told about errors before they are emitted.
    ///   - onResponsePart: A handler called with each response part received.
    internal init(
      for pipeline: ClientInterceptorPipeline<Request, Response>,
      errorDelegate: ClientErrorDelegate?,
      _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
    ) {
      self.pipeline = pipeline
      self.errorDelegate = errorDelegate
      self.onResponsePart = onResponsePart
    }

    /// Delivers a response part to `onResponsePart`.
    ///
    /// For the terminal parts (`end` and `error`) the pipeline is closed *before* any callback is
    /// invoked. Errors are reported to the error delegate (if there is one) and are unwrapped from
    /// `GRPCError.WithContext` before being emitted.
    ///
    /// - Parameters:
    ///   - part: The response part to deliver.
    ///   - context: The interceptor context; only its `logger` is used, when reporting errors.
    internal func receive(
      _ part: ClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch part {
      case .metadata, .message:
        self.onResponsePart(part)

      case .end:
        // We're about to complete, close the pipeline before calling out via `onResponsePart`.
        self.pipeline.close()
        self.onResponsePart(part)

      case let .error(error):
        // We're about to complete, close the pipeline before calling out via the error delegate
        // or `onResponsePart`.
        self.pipeline.close()

        // Assigned exactly once on each branch below, so a constant is sufficient.
        let unwrappedError: Error

        // Unwrap the error, if possible.
        if let errorContext = error as? GRPCError.WithContext {
          unwrappedError = errorContext.error
          self.errorDelegate?.didCatchError(
            errorContext.error,
            logger: context.logger,
            file: errorContext.file,
            line: errorContext.line
          )
        } else {
          unwrappedError = error
          self.errorDelegate?.didCatchErrorWithoutContext(
            error,
            logger: context.logger
          )
        }

        // Emit the unwrapped error.
        self.onResponsePart(.error(unwrappedError))
      }
    }

    /// Forwards the request part and its promise, unchanged, to the next interceptor via `context`.
    @inlinable
    internal func send(
      _ part: ClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.send(part, promise: promise)
    }

    /// Forwards the cancellation request to the next interceptor via `context`.
    internal func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      context.cancel(promise: promise)
    }
  }
  194. // MARK: - Any Interceptor
  /// A wrapping interceptor which delegates to the implementation of an underlying interceptor.
  @usableFromInline
  internal struct AnyClientInterceptor<Request, Response>: ClientInterceptorProtocol {
    /// The kinds of interceptor which may be wrapped.
    @usableFromInline
    internal enum Implementation {
      case head(HeadClientInterceptor<Request, Response>)
      case tail(TailClientInterceptor<Request, Response>)
      case base(ClientInterceptor<Request, Response>)
    }

    /// The underlying interceptor implementation.
    @usableFromInline
    internal let _implementation: Implementation

    /// Makes a head interceptor.
    /// - Parameters:
    ///   - onCancel: A handler called when a cancellation request reaches the head interceptor.
    ///   - onRequestPart: A handler called for each request part which reaches the head
    ///     interceptor.
    /// - Returns: An `AnyClientInterceptor` which wraps a `HeadClientInterceptor`.
    internal static func head(
      onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
      onRequestPart: @escaping (ClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
    ) -> AnyClientInterceptor<Request, Response> {
      let head = HeadClientInterceptor<Request, Response>(
        onCancel: onCancel,
        onRequestPart: onRequestPart
      )
      return AnyClientInterceptor(.head(head))
    }

    /// Makes a tail interceptor.
    /// - Parameters:
    ///   - pipeline: The pipeline the tail interceptor belongs to.
    ///   - errorDelegate: An error delegate.
    ///   - onResponsePart: A handler called for each response part received from the pipeline.
    /// - Returns: An `AnyClientInterceptor` which wraps a `TailClientInterceptor`.
    internal static func tail(
      for pipeline: ClientInterceptorPipeline<Request, Response>,
      errorDelegate: ClientErrorDelegate?,
      _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
    ) -> AnyClientInterceptor<Request, Response> {
      return AnyClientInterceptor(
        .tail(TailClientInterceptor(for: pipeline, errorDelegate: errorDelegate, onResponsePart))
      )
    }

    /// A user provided interceptor.
    /// - Parameter interceptor: The interceptor to wrap.
    /// - Returns: An `AnyClientInterceptor` which wraps `interceptor`.
    internal static func userProvided(
      _ interceptor: ClientInterceptor<Request, Response>
    ) -> AnyClientInterceptor<Request, Response> {
      return AnyClientInterceptor(.base(interceptor))
    }

    /// Wraps `implementation`; use the static factory functions to create instances.
    private init(_ implementation: Implementation) {
      self._implementation = implementation
    }

    /// Passes the response part to the wrapped interceptor's `receive(_:context:)`.
    internal func receive(
      _ part: ClientResponsePart<Response>,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch self._implementation {
      case .head(let wrapped):
        wrapped.receive(part, context: context)
      case .tail(let wrapped):
        wrapped.receive(part, context: context)
      case .base(let wrapped):
        wrapped.receive(part, context: context)
      }
    }

    /// Passes the request part and promise to the wrapped interceptor's
    /// `send(_:promise:context:)`.
    @inlinable
    internal func send(
      _ part: ClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch self._implementation {
      case .head(let wrapped):
        wrapped.send(part, promise: promise, context: context)
      case .tail(let wrapped):
        wrapped.send(part, promise: promise, context: context)
      case .base(let wrapped):
        wrapped.send(part, promise: promise, context: context)
      }
    }

    /// Passes the cancellation request to the wrapped interceptor's `cancel(promise:context:)`.
    internal func cancel(
      promise: EventLoopPromise<Void>?,
      context: ClientInterceptorContext<Request, Response>
    ) {
      switch self._implementation {
      case .head(let wrapped):
        wrapped.cancel(promise: promise, context: context)
      case .tail(let wrapped):
        wrapped.cancel(promise: promise, context: context)
      case .base(let wrapped):
        wrapped.cancel(promise: promise, context: context)
      }
    }
  }