| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import NIOCore
- import NIOEmbedded
- import NIOHPACK
- import XCTest
- @testable import GRPC
/// Tests for `ServerInterceptorPipeline`, driven synchronously on an `EmbeddedEventLoop`.
class ServerInterceptorPipelineTests: GRPCTestCase {
  override func setUp() {
    super.setUp()
    self.embeddedEventLoop = EmbeddedEventLoop()
  }

  /// Event loop handed to every pipeline built by `makePipeline`; replaced in `setUp()`.
  private var embeddedEventLoop: EmbeddedEventLoop!

  /// Builds a `ServerInterceptorPipeline` wired to the test's logger and embedded event loop.
  ///
  /// - Parameters:
  ///   - requests: The request type; only used to pin the generic parameter.
  ///   - responses: The response type; only used to pin the generic parameter.
  ///   - path: The RPC path passed to the pipeline, `"/foo/bar"` by default.
  ///   - callType: The call type passed to the pipeline, `.unary` by default.
  ///   - interceptors: Interceptors to install in the pipeline, none by default.
  ///   - onRequestPart: Callback passed to the pipeline as `onRequestPart`.
  ///   - onResponsePart: Callback passed to the pipeline as `onResponsePart`.
  /// - Returns: A pipeline with no remote address, fresh `UserInfo` and an already-succeeded
  ///   close future.
  private func makePipeline<Request, Response>(
    requests: Request.Type = Request.self,
    responses: Response.Type = Response.self,
    path: String = "/foo/bar",
    callType: GRPCCallType = .unary,
    interceptors: [ServerInterceptor<Request, Response>] = [],
    onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
    onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
  ) -> ServerInterceptorPipeline<Request, Response> {
    return ServerInterceptorPipeline(
      logger: self.logger,
      eventLoop: self.embeddedEventLoop,
      path: path,
      callType: callType,
      remoteAddress: nil,
      userInfoRef: Ref(UserInfo()),
      closeFuture: self.embeddedEventLoop.makeSucceededVoidFuture(),
      interceptors: interceptors,
      onRequestPart: onRequestPart,
      onResponsePart: onResponsePart
    )
  }

  /// A pipeline without interceptors should hand every part to the callbacks unchanged and
  /// refuse further traffic once the response stream has ended.
  func testEmptyPipeline() {
    var requestParts: [GRPCServerRequestPart<String>] = []
    var responseParts: [GRPCServerResponsePart<String>] = []

    let pipeline = self.makePipeline(
      requests: String.self,
      responses: String.self,
      onRequestPart: { requestParts.append($0) },
      onResponsePart: { part, promise in
        responseParts.append(part)
        assertThat(promise, .is(.none()))
      }
    )

    pipeline.receive(.metadata([:]))
    pipeline.receive(.message("foo"))
    pipeline.receive(.end)

    assertThat(requestParts, .hasCount(3))
    assertThat(requestParts[0].metadata, .is([:]))
    assertThat(requestParts[1].message, .is("foo"))
    assertThat(requestParts[2].isEnd, .is(true))

    pipeline.send(.metadata([:]), promise: nil)
    pipeline.send(.message("bar", .init(compress: false, flush: false)), promise: nil)
    pipeline.send(.end(.ok, [:]), promise: nil)

    assertThat(responseParts, .hasCount(3))
    assertThat(responseParts[0].metadata, .is([:]))
    assertThat(responseParts[1].message, .is("bar"))
    assertThat(responseParts[2].end, .is(.some()))

    // Pipelines should now be closed. We can't send or receive.
    let p = self.embeddedEventLoop.makePromise(of: Void.self)
    pipeline.send(.metadata([:]), promise: p)
    assertThat(try p.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))

    // Received parts surface through `onRequestPart`, so `requestParts` is the collection that
    // proves the part was dropped; `responseParts` is checked too so nothing leaks outbound.
    requestParts.removeAll()
    responseParts.removeAll()
    pipeline.receive(.end)
    assertThat(requestParts, .isEmpty())
    assertThat(responseParts, .isEmpty())
  }

  /// An installed interceptor should observe every request and response part in order.
  func testRecordingPipeline() {
    let recorder = RecordingServerInterceptor<String, String>()
    let pipeline = self.makePipeline(
      interceptors: [recorder],
      onRequestPart: { _ in },
      onResponsePart: { _, _ in }
    )

    pipeline.receive(.metadata([:]))
    pipeline.receive(.message("foo"))
    pipeline.receive(.end)

    pipeline.send(.metadata([:]), promise: nil)
    pipeline.send(.message("bar", .init(compress: false, flush: false)), promise: nil)
    pipeline.send(.end(.ok, [:]), promise: nil)

    // Check the request parts are there.
    assertThat(recorder.requestParts, .hasCount(3))
    assertThat(recorder.requestParts[0].metadata, .is(.some()))
    assertThat(recorder.requestParts[1].message, .is(.some()))
    assertThat(recorder.requestParts[2].isEnd, .is(true))

    // Check the response parts are there.
    assertThat(recorder.responseParts, .hasCount(3))
    assertThat(recorder.responseParts[0].metadata, .is(.some()))
    assertThat(recorder.responseParts[1].message, .is(.some()))
    assertThat(recorder.responseParts[2].end, .is(.some()))
  }
}
/// A pass-through interceptor that keeps a copy of every part it sees before forwarding it.
internal class RecordingServerInterceptor<Request, Response>:
  ServerInterceptor<Request, Response>
{
  /// Request parts in the order they were received.
  var requestParts: [GRPCServerRequestPart<Request>] = []

  /// Response parts in the order they were sent.
  var responseParts: [GRPCServerResponsePart<Response>] = []

  /// Records the inbound `part`, then forwards it via `context`.
  override func receive(
    _ part: GRPCServerRequestPart<Request>,
    context: ServerInterceptorContext<Request, Response>
  ) {
    requestParts.append(part)
    context.receive(part)
  }

  /// Records the outbound `part`, then forwards it and its `promise` via `context`.
  override func send(
    _ part: GRPCServerResponsePart<Response>,
    promise: EventLoopPromise<Void>?,
    context: ServerInterceptorContext<Request, Response>
  ) {
    responseParts.append(part)
    context.send(part, promise: promise)
  }
}
/// Test-only accessors for picking the payload out of a request part.
extension GRPCServerRequestPart {
  /// The headers if this part is `.metadata`, otherwise `nil`.
  var metadata: HPACKHeaders? {
    guard case let .metadata(headers) = self else {
      return nil
    }
    return headers
  }

  /// The request if this part is `.message`, otherwise `nil`.
  var message: Request? {
    guard case let .message(request) = self else {
      return nil
    }
    return request
  }

  /// Whether this part is `.end`.
  var isEnd: Bool {
    if case .end = self {
      return true
    }
    return false
  }
}
/// Test-only accessors for picking the payload out of a response part.
extension GRPCServerResponsePart {
  /// The headers if this part is `.metadata`, otherwise `nil`.
  var metadata: HPACKHeaders? {
    guard case let .metadata(headers) = self else {
      return nil
    }
    return headers
  }

  /// The response if this part is `.message`, otherwise `nil`. The second associated value of
  /// the case is ignored.
  var message: Response? {
    guard case let .message(response, _) = self else {
      return nil
    }
    return response
  }

  /// The status and trailers if this part is `.end`, otherwise `nil`.
  var end: (GRPCStatus, HPACKHeaders)? {
    guard case let .end(status, trailers) = self else {
      return nil
    }
    return (status, trailers)
  }
}
|