ServerInterceptors.swift 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. /// A base class for server interceptors.
  18. ///
  19. /// Interceptors allow request and response and response parts to be observed, mutated or dropped
  20. /// as necessary. The default behaviour for this base class is to forward any events to the next
  21. /// interceptor.
  22. ///
  23. /// Interceptors may observe two different types of event:
  24. /// - receiving request parts with `receive(_:context:)`,
  25. /// - sending response parts with `send(_:promise:context:)`.
  26. ///
  27. /// These events flow through a pipeline of interceptors for each RPC. Request parts will enter
  28. /// the head of the interceptor pipeline once the request router has determined that there is a
  29. /// service provider which is able to handle the request stream. Response parts from the service
  30. /// provider enter the tail of the interceptor pipeline and will be sent to the client after
  31. /// traversing the pipeline through to the head.
  32. ///
  33. /// Each of the interceptor functions is provided with a `context` which exposes analogous functions
  34. /// (`receive(_:)` and `send(_:promise:)`) which may be called to forward events to the next
  35. /// interceptor.
  36. ///
  37. /// ### Thread Safety
  38. ///
  39. /// Functions on `context` are not thread safe and **must** be called on the `EventLoop` found on
  40. /// the `context`. Since each interceptor is invoked on the same `EventLoop` this does not usually
  41. /// require any extra attention. However, if work is done on a `DispatchQueue` or _other_
  42. /// `EventLoop` then implementers should ensure that they use `context` from the correct
  43. /// `EventLoop`.
  44. open class ServerInterceptor<Request, Response> {
  45. public init() {}
  46. /// Called when the interceptor has received a request part to handle.
  47. /// - Parameters:
  48. /// - part: The request part which has been received from the client.
  49. /// - context: An interceptor context which may be used to forward the response part.
  50. open func receive(
  51. _ part: GRPCServerRequestPart<Request>,
  52. context: ServerInterceptorContext<Request, Response>
  53. ) {
  54. context.receive(part)
  55. }
  56. /// Called when the interceptor has received a response part to handle.
  57. /// - Parameters:
  58. /// - part: The request part which should be sent to the client.
  59. /// - promise: A promise which should be completed when the response part has been written.
  60. /// - context: An interceptor context which may be used to forward the request part.
  61. open func send(
  62. _ part: GRPCServerResponsePart<Response>,
  63. promise: EventLoopPromise<Void>?,
  64. context: ServerInterceptorContext<Request, Response>
  65. ) {
  66. context.send(part, promise: promise)
  67. }
  68. }
  69. // MARK: Head/Tail
  70. /// An interceptor which offloads requests to the service provider and forwards any response parts
  71. /// to the rest of the pipeline.
  72. @usableFromInline
  73. internal struct TailServerInterceptor<Request, Response> {
  74. /// Called when a request part has been received.
  75. @usableFromInline
  76. internal let _onRequestPart: (GRPCServerRequestPart<Request>) -> Void
  77. @inlinable
  78. init(
  79. _ onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void
  80. ) {
  81. self._onRequestPart = onRequestPart
  82. }
  83. @inlinable
  84. internal func receive(
  85. _ part: GRPCServerRequestPart<Request>,
  86. context: ServerInterceptorContext<Request, Response>
  87. ) {
  88. self._onRequestPart(part)
  89. }
  90. @inlinable
  91. internal func send(
  92. _ part: GRPCServerResponsePart<Response>,
  93. promise: EventLoopPromise<Void>?,
  94. context: ServerInterceptorContext<Request, Response>
  95. ) {
  96. context.send(part, promise: promise)
  97. }
  98. }
  99. @usableFromInline
  100. internal struct HeadServerInterceptor<Request, Response> {
  101. /// The pipeline this interceptor belongs to.
  102. @usableFromInline
  103. internal let _pipeline: ServerInterceptorPipeline<Request, Response>
  104. /// Called when a response part has been received.
  105. @usableFromInline
  106. internal let _onResponsePart: (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
  107. @inlinable
  108. internal init(
  109. for pipeline: ServerInterceptorPipeline<Request, Response>,
  110. _ onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
  111. ) {
  112. self._pipeline = pipeline
  113. self._onResponsePart = onResponsePart
  114. }
  115. @inlinable
  116. internal func receive(
  117. _ part: GRPCServerRequestPart<Request>,
  118. context: ServerInterceptorContext<Request, Response>
  119. ) {
  120. context.receive(part)
  121. }
  122. @inlinable
  123. internal func send(
  124. _ part: GRPCServerResponsePart<Response>,
  125. promise: EventLoopPromise<Void>?,
  126. context: ServerInterceptorContext<Request, Response>
  127. ) {
  128. // Close the pipeline on end.
  129. switch part {
  130. case .metadata, .message:
  131. ()
  132. case .end:
  133. self._pipeline.close()
  134. }
  135. self._onResponsePart(part, promise)
  136. }
  137. }
  138. // MARK: - Any Interceptor
  139. /// A wrapping interceptor which delegates to the implementation of an underlying interceptor.
  140. @usableFromInline
  141. internal struct AnyServerInterceptor<Request, Response> {
  142. @usableFromInline
  143. internal enum Implementation {
  144. case head(HeadServerInterceptor<Request, Response>)
  145. case tail(TailServerInterceptor<Request, Response>)
  146. case base(ServerInterceptor<Request, Response>)
  147. }
  148. /// The underlying interceptor implementation.
  149. @usableFromInline
  150. internal let _implementation: Implementation
  151. @inlinable
  152. internal static func head(
  153. for pipeline: ServerInterceptorPipeline<Request, Response>,
  154. _ onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
  155. ) -> AnyServerInterceptor<Request, Response> {
  156. return .init(.head(.init(for: pipeline, onResponsePart)))
  157. }
  158. @inlinable
  159. internal static func tail(
  160. _ onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void
  161. ) -> AnyServerInterceptor<Request, Response> {
  162. return .init(.tail(.init(onRequestPart)))
  163. }
  164. /// A user provided interceptor.
  165. /// - Parameter interceptor: The interceptor to wrap.
  166. /// - Returns: An `AnyServerInterceptor` which wraps `interceptor`.
  167. @inlinable
  168. internal static func userProvided(
  169. _ interceptor: ServerInterceptor<Request, Response>
  170. ) -> AnyServerInterceptor<Request, Response> {
  171. return .init(.base(interceptor))
  172. }
  173. @inlinable
  174. internal init(_ implementation: Implementation) {
  175. self._implementation = implementation
  176. }
  177. @inlinable
  178. internal func receive(
  179. _ part: GRPCServerRequestPart<Request>,
  180. context: ServerInterceptorContext<Request, Response>
  181. ) {
  182. switch self._implementation {
  183. case let .head(interceptor):
  184. interceptor.receive(part, context: context)
  185. case let .tail(interceptor):
  186. interceptor.receive(part, context: context)
  187. case let .base(interceptor):
  188. interceptor.receive(part, context: context)
  189. }
  190. }
  191. @inlinable
  192. internal func send(
  193. _ part: GRPCServerResponsePart<Response>,
  194. promise: EventLoopPromise<Void>?,
  195. context: ServerInterceptorContext<Request, Response>
  196. ) {
  197. switch self._implementation {
  198. case let .head(interceptor):
  199. interceptor.send(part, promise: promise, context: context)
  200. case let .tail(interceptor):
  201. interceptor.send(part, promise: promise, context: context)
  202. case let .base(interceptor):
  203. interceptor.send(part, promise: promise, context: context)
  204. }
  205. }
  206. }