ServerInterceptorTests.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. @testable import GRPC
  19. import HelloWorldModel
  20. import NIO
  21. import NIOHTTP1
  22. import SwiftProtobuf
  23. import XCTest
  24. class ServerInterceptorTests: GRPCTestCase {
  25. private var channel: EmbeddedChannel!
  26. override func setUp() {
  27. super.setUp()
  28. self.channel = EmbeddedChannel()
  29. }
  30. private func makeRecorder() -> RecordingServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
  31. return .init()
  32. }
  33. private func echoProvider(
  34. interceptedBy interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>
  35. ) -> EchoProvider {
  36. return EchoProvider(interceptors: EchoInterceptorFactory(interceptor: interceptor))
  37. }
  38. private func makeHandlerContext(for path: String) -> CallHandlerContext {
  39. return CallHandlerContext(
  40. errorDelegate: nil,
  41. logger: self.serverLogger,
  42. encoding: .disabled,
  43. eventLoop: self.channel.eventLoop,
  44. path: path,
  45. responseWriter: NoOpResponseWriter(),
  46. allocator: ByteBufferAllocator()
  47. )
  48. }
  49. // This is only useful for the type inference.
  50. private func request(
  51. _ request: GRPCServerRequestPart<Echo_EchoRequest>
  52. ) -> GRPCServerRequestPart<Echo_EchoRequest> {
  53. return request
  54. }
  55. private func handleMethod(
  56. _ method: Substring,
  57. using provider: CallHandlerProvider
  58. ) -> GRPCCallHandler? {
  59. let path = "/\(provider.serviceName)/\(method)"
  60. let context = self.makeHandlerContext(for: path)
  61. return provider.handleMethod(method, callHandlerContext: context)
  62. }
  63. fileprivate typealias ResponsePart = GRPCServerResponsePart<Echo_EchoResponse>
  64. func testPassThroughInterceptor() throws {
  65. let recorder = self.makeRecorder()
  66. let provider = self.echoProvider(interceptedBy: recorder)
  67. let handler = try assertNotNil(self.handleMethod("Get", using: provider))
  68. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  69. // Send requests.
  70. assertThat(try self.channel.writeInbound(self.request(.metadata([:]))), .doesNotThrow())
  71. assertThat(
  72. try self.channel.writeInbound(self.request(.message(.with { $0.text = "" }))),
  73. .doesNotThrow()
  74. )
  75. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  76. // Expect responses.
  77. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.metadata()))
  78. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.message()))
  79. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  80. // We expect 2 request parts: the provider responds before it sees end, that's fine.
  81. assertThat(recorder.requestParts, .hasCount(2))
  82. assertThat(recorder.requestParts[0], .is(.metadata()))
  83. assertThat(recorder.requestParts[1], .is(.message()))
  84. assertThat(recorder.responseParts, .hasCount(3))
  85. assertThat(recorder.responseParts[0], .is(.metadata()))
  86. assertThat(recorder.responseParts[1], .is(.message()))
  87. assertThat(recorder.responseParts[2], .is(.end(status: .is(.ok))))
  88. }
  89. func _testExtraRequestPartsAreIgnored(
  90. part: ExtraRequestPartEmitter.Part,
  91. callType: GRPCCallType
  92. ) throws {
  93. let interceptor = ExtraRequestPartEmitter(repeat: part, times: 3)
  94. let provider = self.echoProvider(interceptedBy: interceptor)
  95. let method: Substring
  96. switch callType {
  97. case .unary:
  98. method = "Get"
  99. case .clientStreaming:
  100. method = "Collect"
  101. case .serverStreaming:
  102. method = "Expand"
  103. case .bidirectionalStreaming:
  104. method = "Update"
  105. }
  106. let handler = try assertNotNil(self.handleMethod(method, using: provider))
  107. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  108. // Send the requests.
  109. assertThat(try self.channel.writeInbound(self.request(.metadata([:]))), .doesNotThrow())
  110. assertThat(try self.channel.writeInbound(self.request(.message(.init()))), .doesNotThrow())
  111. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  112. // Expect the responses.
  113. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.metadata()))
  114. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.message()))
  115. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  116. // No more response parts.
  117. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .is(.nil()))
  118. }
  119. func testExtraRequestMetadataIsIgnoredForUnary() throws {
  120. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .unary)
  121. }
  122. func testExtraRequestMessageIsIgnoredForUnary() throws {
  123. try self._testExtraRequestPartsAreIgnored(part: .message, callType: .unary)
  124. }
  125. func testExtraRequestEndIsIgnoredForUnary() throws {
  126. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .unary)
  127. }
  128. func testExtraRequestMetadataIsIgnoredForClientStreaming() throws {
  129. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .clientStreaming)
  130. }
  131. func testExtraRequestEndIsIgnoredForClientStreaming() throws {
  132. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .clientStreaming)
  133. }
  134. func testExtraRequestMetadataIsIgnoredForServerStreaming() throws {
  135. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .serverStreaming)
  136. }
  137. func testExtraRequestMessageIsIgnoredForServerStreaming() throws {
  138. try self._testExtraRequestPartsAreIgnored(part: .message, callType: .serverStreaming)
  139. }
  140. func testExtraRequestEndIsIgnoredForServerStreaming() throws {
  141. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .serverStreaming)
  142. }
  143. func testExtraRequestMetadataIsIgnoredForBidirectionalStreaming() throws {
  144. try self._testExtraRequestPartsAreIgnored(part: .metadata, callType: .bidirectionalStreaming)
  145. }
  146. func testExtraRequestEndIsIgnoredForBidirectionalStreaming() throws {
  147. try self._testExtraRequestPartsAreIgnored(part: .end, callType: .bidirectionalStreaming)
  148. }
  149. func testUnaryFromInterceptor() throws {
  150. let provider = EchoFromInterceptor()
  151. let handler = try assertNotNil(self.handleMethod("Get", using: provider))
  152. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  153. // Send the requests.
  154. assertThat(try self.channel.writeInbound(self.request(.metadata([:]))), .doesNotThrow())
  155. assertThat(
  156. try self.channel.writeInbound(self.request(.message(.init(text: "foo")))),
  157. .doesNotThrow()
  158. )
  159. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  160. // Get the responses.
  161. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.metadata()))
  162. assertThat(
  163. try self.channel.readOutbound(as: ResponsePart.self),
  164. .notNil(.message(.equalTo(.with { $0.text = "echo: foo" })))
  165. )
  166. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  167. }
  168. func testClientStreamingFromInterceptor() throws {
  169. let provider = EchoFromInterceptor()
  170. let handler = try assertNotNil(self.handleMethod("Collect", using: provider))
  171. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  172. // Send the requests.
  173. assertThat(try self.channel.writeInbound(self.request(.metadata([:]))), .doesNotThrow())
  174. for text in ["a", "b", "c"] {
  175. let message = self.request(.message(.init(text: text)))
  176. assertThat(try self.channel.writeInbound(message), .doesNotThrow())
  177. }
  178. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  179. // Receive responses.
  180. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.metadata()))
  181. assertThat(
  182. try self.channel.readOutbound(as: ResponsePart.self),
  183. .notNil(.message(.equalTo(.with { $0.text = "echo: a b c" })))
  184. )
  185. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  186. }
  187. func testServerStreamingFromInterceptor() throws {
  188. let provider = EchoFromInterceptor()
  189. let handler = try assertNotNil(self.handleMethod("Expand", using: provider))
  190. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  191. // Send the requests.
  192. assertThat(try self.channel.writeInbound(self.request(.metadata([:]))), .doesNotThrow())
  193. assertThat(
  194. try self.channel.writeInbound(self.request(.message(.with { $0.text = "a b c" }))),
  195. .doesNotThrow()
  196. )
  197. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  198. // Receive responses.
  199. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.metadata()))
  200. for text in ["a", "b", "c"] {
  201. let expected = Echo_EchoResponse(text: "echo: " + text)
  202. assertThat(
  203. try self.channel.readOutbound(as: ResponsePart.self),
  204. .notNil(.message(.equalTo(expected)))
  205. )
  206. }
  207. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  208. }
  209. func testBidirectionalStreamingFromInterceptor() throws {
  210. let provider = EchoFromInterceptor()
  211. let handler = try assertNotNil(self.handleMethod("Update", using: provider))
  212. assertThat(try self.channel.pipeline.addHandlers([Codec(), handler]).wait(), .doesNotThrow())
  213. // Send the requests.
  214. assertThat(try self.channel.writeInbound(self.request(.metadata([:]))), .doesNotThrow())
  215. for text in ["a", "b", "c"] {
  216. assertThat(
  217. try self.channel.writeInbound(self.request(.message(.init(text: text)))),
  218. .doesNotThrow()
  219. )
  220. }
  221. assertThat(try self.channel.writeInbound(self.request(.end)), .doesNotThrow())
  222. // Receive responses.
  223. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.metadata()))
  224. for text in ["a", "b", "c"] {
  225. let expected = Echo_EchoResponse(text: "echo: " + text)
  226. assertThat(
  227. try self.channel.readOutbound(as: ResponsePart.self),
  228. .notNil(.message(.equalTo(expected)))
  229. )
  230. }
  231. assertThat(try self.channel.readOutbound(as: ResponsePart.self), .notNil(.end()))
  232. }
  233. }
  234. class EchoInterceptorFactory: Echo_EchoServerInterceptorFactoryProtocol {
  235. private let interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>
  236. init(interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>) {
  237. self.interceptor = interceptor
  238. }
  239. func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  240. return [self.interceptor]
  241. }
  242. func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  243. return [self.interceptor]
  244. }
  245. func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  246. return [self.interceptor]
  247. }
  248. func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  249. return [self.interceptor]
  250. }
  251. }
  252. class ExtraRequestPartEmitter: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
  253. enum Part {
  254. case metadata
  255. case message
  256. case end
  257. }
  258. private let part: Part
  259. private let count: Int
  260. init(repeat part: Part, times count: Int) {
  261. self.part = part
  262. self.count = count
  263. }
  264. override func receive(
  265. _ part: GRPCServerRequestPart<Echo_EchoRequest>,
  266. context: ServerInterceptorContext<Echo_EchoRequest, Echo_EchoResponse>
  267. ) {
  268. let count: Int
  269. switch (self.part, part) {
  270. case (.metadata, .metadata),
  271. (.message, .message),
  272. (.end, .end):
  273. count = self.count
  274. default:
  275. count = 1
  276. }
  277. for _ in 0 ..< count {
  278. context.receive(part)
  279. }
  280. }
  281. }
  282. class EchoFromInterceptor: Echo_EchoProvider {
  283. var interceptors: Echo_EchoServerInterceptorFactoryProtocol? = Interceptors()
  284. func get(
  285. request: Echo_EchoRequest,
  286. context: StatusOnlyCallContext
  287. ) -> EventLoopFuture<Echo_EchoResponse> {
  288. XCTFail("Unexpected call to \(#function)")
  289. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  290. }
  291. func expand(
  292. request: Echo_EchoRequest,
  293. context: StreamingResponseCallContext<Echo_EchoResponse>
  294. ) -> EventLoopFuture<GRPCStatus> {
  295. XCTFail("Unexpected call to \(#function)")
  296. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  297. }
  298. func collect(
  299. context: UnaryResponseCallContext<Echo_EchoResponse>
  300. ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
  301. XCTFail("Unexpected call to \(#function)")
  302. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  303. }
  304. func update(
  305. context: StreamingResponseCallContext<Echo_EchoResponse>
  306. ) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
  307. XCTFail("Unexpected call to \(#function)")
  308. return context.eventLoop.makeFailedFuture(GRPCStatus.processingError)
  309. }
  310. class Interceptors: Echo_EchoServerInterceptorFactoryProtocol {
  311. func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  312. return [Interceptor()]
  313. }
  314. func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  315. return [Interceptor()]
  316. }
  317. func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  318. return [Interceptor()]
  319. }
  320. func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
  321. return [Interceptor()]
  322. }
  323. }
  324. // Since all methods use the same request/response types, we can use a single interceptor to
  325. // respond to all of them.
  326. class Interceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
  327. private var collectedRequests: [Echo_EchoRequest] = []
  328. override func receive(
  329. _ part: GRPCServerRequestPart<Echo_EchoRequest>,
  330. context: ServerInterceptorContext<Echo_EchoRequest, Echo_EchoResponse>
  331. ) {
  332. switch part {
  333. case .metadata:
  334. context.send(.metadata([:]), promise: nil)
  335. case let .message(request):
  336. if context.path.hasSuffix("Get") {
  337. // Unary, just reply.
  338. let response = Echo_EchoResponse.with {
  339. $0.text = "echo: \(request.text)"
  340. }
  341. context.send(.message(response, .init(compress: false, flush: false)), promise: nil)
  342. } else if context.path.hasSuffix("Expand") {
  343. // Server streaming.
  344. let parts = request.text.split(separator: " ")
  345. let metadata = MessageMetadata(compress: false, flush: false)
  346. for part in parts {
  347. context.send(.message(.with { $0.text = "echo: \(part)" }, metadata), promise: nil)
  348. }
  349. } else if context.path.hasSuffix("Collect") {
  350. // Client streaming, store the requests, reply on '.end'
  351. self.collectedRequests.append(request)
  352. } else if context.path.hasSuffix("Update") {
  353. // Bidirectional streaming.
  354. let response = Echo_EchoResponse.with {
  355. $0.text = "echo: \(request.text)"
  356. }
  357. let metadata = MessageMetadata(compress: false, flush: true)
  358. context.send(.message(response, metadata), promise: nil)
  359. } else {
  360. XCTFail("Unexpected path '\(context.path)'")
  361. }
  362. case .end:
  363. if !self.collectedRequests.isEmpty {
  364. let response = Echo_EchoResponse.with {
  365. $0.text = "echo: " + self.collectedRequests.map { $0.text }.joined(separator: " ")
  366. }
  367. context.send(.message(response, .init(compress: false, flush: false)), promise: nil)
  368. }
  369. context.send(.end(.ok, [:]), promise: nil)
  370. }
  371. }
  372. }
  373. }
  374. // Avoid having to serialize/deserialize messages in test cases.
  375. private class Codec: ChannelDuplexHandler {
  376. typealias InboundIn = GRPCServerRequestPart<Echo_EchoRequest>
  377. typealias InboundOut = GRPCServerRequestPart<ByteBuffer>
  378. typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>
  379. typealias OutboundOut = GRPCServerResponsePart<Echo_EchoResponse>
  380. private let serializer = ProtobufSerializer<Echo_EchoRequest>()
  381. private let deserializer = ProtobufDeserializer<Echo_EchoResponse>()
  382. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  383. switch self.unwrapInboundIn(data) {
  384. case let .metadata(headers):
  385. context.fireChannelRead(self.wrapInboundOut(.metadata(headers)))
  386. case let .message(message):
  387. let serialized = try! self.serializer.serialize(message, allocator: context.channel.allocator)
  388. context.fireChannelRead(self.wrapInboundOut(.message(serialized)))
  389. case .end:
  390. context.fireChannelRead(self.wrapInboundOut(.end))
  391. }
  392. }
  393. func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  394. switch self.unwrapOutboundIn(data) {
  395. case let .metadata(headers):
  396. context.write(self.wrapOutboundOut(.metadata(headers)), promise: promise)
  397. case let .message(message, metadata):
  398. let deserialzed = try! self.deserializer.deserialize(byteBuffer: message)
  399. context.write(self.wrapOutboundOut(.message(deserialzed, metadata)), promise: promise)
  400. case let .end(status, trailers):
  401. context.write(self.wrapOutboundOut(.end(status, trailers)), promise: promise)
  402. }
  403. }
  404. }