ServerInterceptorTests.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. @testable import GRPC
  19. import HelloWorldModel
  20. import NIO
  21. import NIOHTTP1
  22. import SwiftProtobuf
  23. import XCTest
  24. class ServerInterceptorTests: GRPCTestCase {
  25. private var channel: EmbeddedChannel!
  26. override func setUp() {
  27. super.setUp()
  28. self.channel = EmbeddedChannel()
  29. }
  30. private func makeRecorder() -> RecordingServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
  31. return .init()
  32. }
  33. private func echoProvider(
  34. interceptedBy interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>
  35. ) -> EchoProvider {
  36. return EchoProvider(interceptors: EchoInterceptorFactory(interceptor: interceptor))
  37. }
  38. private func makeHandlerContext(for path: String) -> CallHandlerContext {
  39. return CallHandlerContext(
  40. errorDelegate: nil,
  41. logger: self.serverLogger,
  42. encoding: .disabled,
  43. eventLoop: self.channel.eventLoop,
  44. path: path
  45. )
  46. }
  47. // This is only useful for the type inference.
  48. private func request(
  49. _ request: _GRPCServerRequestPart<Echo_EchoRequest>
  50. ) -> _GRPCServerRequestPart<Echo_EchoRequest> {
  51. return request
  52. }
  53. private func handleMethod(
  54. _ method: Substring,
  55. using provider: CallHandlerProvider
  56. ) -> GRPCCallHandler? {
  57. let path = "/\(provider.serviceName)/\(method)"
  58. let context = self.makeHandlerContext(for: path)
  59. return provider.handleMethod(method, callHandlerContext: context)
  60. }
  61. fileprivate typealias ResponsePart = _GRPCServerResponsePart<Echo_EchoResponse>
  62. func testPassThroughInterceptor() throws {
  63. let recorder = self.makeRecorder()
  64. let provider = self.echoProvider(interceptedBy: recorder)
  65. let handler = try assertNotNil(self.handleMethod("Get", using: provider))
  66. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  67. // Send requests.
  68. assertThat(try self.channel.writeInbound(self.request(.headers([:]))), .doesNotThrow())
  69. assertThat(
  70. try self.channel.writeInbound(self.request(.message(.with { $0.text = "" }))),
  71. .doesNotThrow()
  72. )
  73. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  74. // Expect responses.
  75. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.headers()))
  76. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.message()))
  77. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  78. // We expect 2 request parts: the provider responds before it sees end, that's fine.
  79. assertThat(recorder.requestParts, .hasCount(2))
  80. assertThat(recorder.requestParts[0], .is(.metadata()))
  81. assertThat(recorder.requestParts[1], .is(.message()))
  82. assertThat(recorder.responseParts, .hasCount(3))
  83. assertThat(recorder.responseParts[0], .is(.metadata()))
  84. assertThat(recorder.responseParts[1], .is(.message()))
  85. assertThat(recorder.responseParts[2], .is(.end(status: .is(.ok))))
  86. }
  87. func _testExtraRequestPartsAreIgnored(
  88. part: ExtraRequestPartEmitter.Part,
  89. callType: GRPCCallType
  90. ) throws {
  91. let interceptor = ExtraRequestPartEmitter(repeat: part, times: 3)
  92. let provider = self.echoProvider(interceptedBy: interceptor)
  93. let method: Substring
  94. switch callType {
  95. case .unary:
  96. method = "Get"
  97. case .clientStreaming:
  98. method = "Collect"
  99. case .serverStreaming:
  100. method = "Expand"
  101. case .bidirectionalStreaming:
  102. method = "Update"
  103. }
  104. let handler = try assertNotNil(self.handleMethod(method, using: provider))
  105. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  106. // Send the requests.
  107. assertThat(try self.channel.writeInbound(self.request(.headers([:]))), .doesNotThrow())
  108. assertThat(try self.channel.writeInbound(self.request(.message(.init()))), .doesNotThrow())
  109. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  110. // Expect the responses.
  111. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.headers()))
  112. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.message()))
  113. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  114. // No more response parts.
  115. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .is(.nil()))
  116. }
  117. func testExtraRequestMetadataIsIgnoredForUnary() throws {
  118. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .unary)
  119. }
  120. func testExtraRequestMessageIsIgnoredForUnary() throws {
  121. try self._testExtraRequestPartsAreIgnored(part: .message, callType: .unary)
  122. }
  123. func testExtraRequestEndIsIgnoredForUnary() throws {
  124. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .unary)
  125. }
  126. func testExtraRequestMetadataIsIgnoredForClientStreaming() throws {
  127. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .clientStreaming)
  128. }
  129. func testExtraRequestEndIsIgnoredForClientStreaming() throws {
  130. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .clientStreaming)
  131. }
  132. func testExtraRequestMetadataIsIgnoredForServerStreaming() throws {
  133. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .serverStreaming)
  134. }
  135. func testExtraRequestMessageIsIgnoredForServerStreaming() throws {
  136. try self._testExtraRequestPartsAreIgnored(part: .message, callType: .serverStreaming)
  137. }
  138. func testExtraRequestEndIsIgnoredForServerStreaming() throws {
  139. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .serverStreaming)
  140. }
  141. func testExtraRequestMetadataIsIgnoredForBidirectionalStreaming() throws {
  142. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .bidirectionalStreaming)
  143. }
  144. func testExtraRequestEndIsIgnoredForBidirectionalStreaming() throws {
  145. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .bidirectionalStreaming)
  146. }
  147. func testUnaryFromInterceptor() throws {
  148. let provider = EchoFromInterceptor()
  149. let handler = try assertNotNil(self.handleMethod("Get", using: provider))
  150. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  151. // Send the requests.
  152. assertThat(try self.channel.writeInbound(self.request(.headers([:]))), .doesNotThrow())
  153. assertThat(
  154. try self.channel.writeInbound(self.request(.message(.init(text: "foo")))),
  155. .doesNotThrow()
  156. )
  157. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  158. // Get the responses.
  159. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.headers()))
  160. assertThat(
  161. try self.channel.readOutbound(as: ResponsePart.self),
  162. .notNil(.message(.equalTo(.with { $0.text = "echo: foo" })))
  163. )
  164. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  165. }
  166. func testClientStreamingFromInterceptor() throws {
  167. let provider = EchoFromInterceptor()
  168. let handler = try assertNotNil(self.handleMethod("Collect", using: provider))
  169. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  170. // Send the requests.
  171. assertThat(try self.channel.writeInbound(self.request(.headers([:]))), .doesNotThrow())
  172. for text in ["a", "b", "c"] {
  173. let message = self.request(.message(.init(text: text)))
  174. assertThat(try self.channel.writeInbound(message), .doesNotThrow())
  175. }
  176. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  177. // Receive responses.
  178. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.headers()))
  179. assertThat(
  180. try self.channel.readOutbound(as: ResponsePart.self),
  181. .notNil(.message(.equalTo(.with { $0.text = "echo: a b c" })))
  182. )
  183. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  184. }
  185. func testServerStreamingFromInterceptor() throws {
  186. let provider = EchoFromInterceptor()
  187. let handler = try assertNotNil(self.handleMethod("Expand", using: provider))
  188. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  189. // Send the requests.
  190. assertThat(try self.channel.writeInbound(self.request(.headers([:]))), .doesNotThrow())
  191. assertThat(
  192. try self.channel.writeInbound(self.request(.message(.with { $0.text = "a b c" }))),
  193. .doesNotThrow()
  194. )
  195. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  196. // Receive responses.
  197. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.headers()))
  198. for text in ["a", "b", "c"] {
  199. let expected = Echo_EchoResponse(text: "echo: " + text)
  200. assertThat(
  201. try self.channel.readOutbound(as: ResponsePart.self),
  202. .notNil(.message(.equalTo(expected)))
  203. )
  204. }
  205. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  206. }
  207. func testBidirectionalStreamingFromInterceptor() throws {
  208. let provider = EchoFromInterceptor()
  209. let handler = try assertNotNil(self.handleMethod("Update", using: provider))
  210. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  211. // Send the requests.
  212. assertThat(try self.channel.writeInbound(self.request(.headers([:]))), .doesNotThrow())
  213. for text in ["a", "b", "c"] {
  214. assertThat(
  215. try self.channel.writeInbound(self.request(.message(.init(text: text)))),
  216. .doesNotThrow()
  217. )
  218. }
  219. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  220. // Receive responses.
  221. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.headers()))
  222. for text in ["a", "b", "c"] {
  223. let expected = Echo_EchoResponse(text: "echo: " + text)
  224. assertThat(
  225. try self.channel.readOutbound(as: ResponsePart.self),
  226. .notNil(.message(.equalTo(expected)))
  227. )
  228. }
  229. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  230. }
  231. }
  232. class EchoInterceptorFactory: Echo_EchoServerInterceptorFactoryProtocol {
  233. private let interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>
  234. init(interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>) {
  235. self.interceptor = interceptor
  236. }
  237. func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  238. return [self.interceptor]
  239. }
  240. func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  241. return [self.interceptor]
  242. }
  243. func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  244. return [self.interceptor]
  245. }
  246. func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  247. return [self.interceptor]
  248. }
  249. }
  250. class ExtraRequestPartEmitter: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
  251. enum Part {
  252. case metadata
  253. case message
  254. case end
  255. }
  256. private let part: Part
  257. private let count: Int
  258. init(repeat part: Part, times count: Int) {
  259. self.part = part
  260. self.count = count
  261. }
  262. override func receive(
  263. _ part: GRPCServerRequestPart<Echo_EchoRequest>,
  264. context: ServerInterceptorContext<Echo_EchoRequest, Echo_EchoResponse>
  265. ) {
  266. let count: Int
  267. switch (self.part, part) {
  268. case (.metadata, .metadata),
  269. (.message, .message),
  270. (.end, .end):
  271. count = self.count
  272. default:
  273. count = 1
  274. }
  275. for _ in 0 ..< count {
  276. context.receive(part)
  277. }
  278. }
  279. }
  280. class EchoFromInterceptor: Echo_EchoProvider {
  281. var interceptors: Echo_EchoServerInterceptorFactoryProtocol? = Interceptors()
  282. func get(
  283. request: Echo_EchoRequest,
  284. context: StatusOnlyCallContext
  285. ) -> EventLoopFuture<Echo_EchoResponse> {
  286. XCTFail("Unexpected call to \(#function)")
  287. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  288. }
  289. func expand(
  290. request: Echo_EchoRequest,
  291. context: StreamingResponseCallContext<Echo_EchoResponse>
  292. ) -> EventLoopFuture<GRPCStatus> {
  293. XCTFail("Unexpected call to \(#function)")
  294. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  295. }
  296. func collect(
  297. context: UnaryResponseCallContext<Echo_EchoResponse>
  298. ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
  299. XCTFail("Unexpected call to \(#function)")
  300. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  301. }
  302. func update(
  303. context: StreamingResponseCallContext<Echo_EchoResponse>
  304. ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
  305. XCTFail("Unexpected call to \(#function)")
  306. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  307. }
  308. class Interceptors: Echo_EchoServerInterceptorFactoryProtocol {
  309. func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  310. return [Interceptor()]
  311. }
  312. func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  313. return [Interceptor()]
  314. }
  315. func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  316. return [Interceptor()]
  317. }
  318. func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  319. return [Interceptor()]
  320. }
  321. }
  322. // Since all methods use the same request/response types, we can use a single interceptor to
  323. // respond to all of them.
  324. class Interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
  325. private var collectedRequests: [Echo_EchoRequest] = []
  326. override func receive(
  327. _ part: GRPCServerRequestPart<Echo_EchoRequest>,
  328. context: ServerInterceptorContext<Echo_EchoRequest, Echo_EchoResponse>
  329. ) {
  330. switch part {
  331. case .metadata:
  332. context.send(.metadata([:]), promise: nil)
  333. case let .message(request):
  334. if context.path.hasSuffix("Get") {
  335. // Unary, just reply.
  336. let response = Echo_EchoResponse.with {
  337. $0.text = "echo: \(request.text)"
  338. }
  339. context.send(.message(response, .init(compress: false, flush: false)), promise: nil)
  340. } else if context.path.hasSuffix("Expand") {
  341. // Server streaming.
  342. let parts = request.text.split(separator: " ")
  343. let metadata = MessageMetadata(compress: false, flush: false)
  344. for part in parts {
  345. context.send(.message(.with { $0.text = "echo: \(part)" }, metadata), promise: nil)
  346. }
  347. } else if context.path.hasSuffix("Collect") {
  348. // Client streaming, store the requests, reply on '.end'
  349. self.collectedRequests.append(request)
  350. } else if context.path.hasSuffix("Update") {
  351. // Bidirectional streaming.
  352. let response = Echo_EchoResponse.with {
  353. $0.text = "echo: \(request.text)"
  354. }
  355. let metadata = MessageMetadata(compress: false, flush: true)
  356. context.send(.message(response, metadata), promise: nil)
  357. } else {
  358. XCTFail("Unexpected path '\(context.path)'")
  359. }
  360. case .end:
  361. if !self.collectedRequests.isEmpty {
  362. let response = Echo_EchoResponse.with {
  363. $0.text = "echo: " + self.collectedRequests.map { $0.text }.joined(separator: " ")
  364. }
  365. context.send(.message(response, .init(compress: false, flush: false)), promise: nil)
  366. }
  367. context.send(.end(.ok, [:]), promise: nil)
  368. }
  369. }
  370. }
  371. }
  372. // Avoid having to serialize/deserialize messages in test cases.
  373. private class Codec: ChannelDuplexHandler {
  374. typealias InboundIn = _GRPCServerRequestPart<Echo_EchoRequest>
  375. typealias InboundOut = _RawGRPCServerRequestPart
  376. typealias OutboundIn = _RawGRPCServerResponsePart
  377. typealias OutboundOut = _GRPCServerResponsePart<Echo_EchoResponse>
  378. private let serializer = ProtobufSerializer<Echo_EchoRequest>()
  379. private let deserializer = ProtobufDeserializer<Echo_EchoResponse>()
  380. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  381. switch self.unwrapInboundIn(data) {
  382. case let .headers(headers):
  383. context.fireChannelRead(self.wrapInboundOut(.headers(headers)))
  384. case let .message(message):
  385. let serialized = try! self.serializer.serialize(message, allocator: context.channel.allocator)
  386. context.fireChannelRead(self.wrapInboundOut(.message(serialized)))
  387. case .end:
  388. context.fireChannelRead(self.wrapInboundOut(.end))
  389. }
  390. }
  391. func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  392. switch self.unwrapOutboundIn(data) {
  393. case let .headers(headers):
  394. context.write(self.wrapOutboundOut(.headers(headers)), promise: promise)
  395. case let .message(message):
  396. let deserialzed = try! self.deserializer.deserialize(byteBuffer: message.message)
  397. context.write(
  398. self.wrapOutboundOut(.message(.init(deserialzed, compressed: message.compressed))),
  399. promise: promise
  400. )
  401. case let .statusAndTrailers(status, trailers):
  402. context.write(self.wrapOutboundOut(.statusAndTrailers(status, trailers)), promise: promise)
  403. }
  404. }
  405. }