| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038 |
/*
 * Copyright 2021, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
- @testable import GRPC
- import NIO
- import NIOHPACK
- import XCTest
// MARK: - Utils
/// A `GRPCServerResponseWriter` which stores every response part it is handed so that tests can
/// inspect what was written. Every promise passed in is succeeded.
final class ResponseRecorder: GRPCServerResponseWriter {
  /// The response headers; `nil` until `sendMetadata` has been called.
  var metadata: HPACKHeaders?
  /// The response message bytes, in the order they were written.
  var messages: [ByteBuffer] = []
  /// The metadata accompanying each entry of `messages`, in the same order.
  var messageMetadata: [MessageMetadata] = []
  /// The final status; `nil` until `sendEnd` has been called.
  var status: GRPCStatus?
  /// The trailers written with the status; `nil` until `sendEnd` has been called.
  var trailers: HPACKHeaders?

  /// Records the response headers, asserting that none were recorded before.
  ///
  /// - Parameters:
  ///   - headers: The headers to record.
  ///   - flush: Not recorded.
  ///   - promise: Succeeded once the headers have been stored.
  func sendMetadata(_ headers: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
    defer { promise?.succeed(()) }
    XCTAssertNil(self.metadata)
    self.metadata = headers
  }

  /// Appends a response message and its metadata to the recorded lists.
  ///
  /// - Parameters:
  ///   - payload: The message bytes to record.
  ///   - info: The metadata written alongside the message.
  ///   - promise: Succeeded once the message has been stored.
  func sendMessage(
    _ payload: ByteBuffer,
    metadata info: MessageMetadata,
    promise: EventLoopPromise<Void>?
  ) {
    defer { promise?.succeed(()) }
    self.messages.append(payload)
    self.messageMetadata.append(info)
  }

  /// Records the final status and trailers, asserting that neither was recorded before.
  ///
  /// - Parameters:
  ///   - finalStatus: The status to record.
  ///   - finalTrailers: The trailers to record.
  ///   - promise: Succeeded once both have been stored.
  func sendEnd(
    status finalStatus: GRPCStatus,
    trailers finalTrailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    defer { promise?.succeed(()) }
    XCTAssertNil(self.status)
    XCTAssertNil(self.trailers)
    self.status = finalStatus
    self.trailers = finalTrailers
  }
}
/// The fixtures a server handler test case has to provide so that the shared
/// `makeCallHandlerContext(encoding:)` helper can build a `CallHandlerContext` for it.
protocol ServerHandlerTestCase: GRPCTestCase {
  /// The recorder passed to the handler as its response writer.
  var recorder: ResponseRecorder { get }
  /// The allocator passed to the handler context.
  var allocator: ByteBufferAllocator { get }
  /// The embedded event loop the handler context is built on.
  var eventLoop: EmbeddedEventLoop { get }
}
extension ServerHandlerTestCase {
  /// Builds a `CallHandlerContext` wired up to this test case's fixtures.
  ///
  /// The context has no error delegate and no remote address, uses the path "/ignored", writes
  /// responses to `self.recorder`, and is given an already-succeeded close future.
  ///
  /// - Parameter encoding: The server message encoding to configure; defaults to `.disabled`.
  /// - Returns: The newly created context.
  func makeCallHandlerContext(encoding: ServerMessageEncoding = .disabled) -> CallHandlerContext {
    let alreadyClosed = self.eventLoop.makeSucceededVoidFuture()
    let handlerContext = CallHandlerContext(
      errorDelegate: nil,
      logger: self.logger,
      encoding: encoding,
      eventLoop: self.eventLoop,
      path: "/ignored",
      remoteAddress: nil,
      responseWriter: self.recorder,
      allocator: self.allocator,
      closeFuture: alreadyClosed
    )
    return handlerContext
  }
}
// MARK: - Unary
/// Exercises `UnaryServerHandler` by feeding it request parts directly and checking what it
/// writes to the `ResponseRecorder`.
class UnaryServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  let eventLoop = EmbeddedEventLoop()
  let allocator = ByteBufferAllocator()
  let recorder = ResponseRecorder()

  // MARK: Fixtures

  /// An encoding with compression enabled and an absolute decompression limit of `.max`.
  private var compressing: ServerMessageEncoding {
    return .enabled(.init(decompressionLimit: .absolute(.max)))
  }

  /// Builds a handler which uses the non-throwing string (de)serializers and no interceptors.
  ///
  /// - Parameters:
  ///   - encoding: The encoding used for the handler's context; defaults to `.disabled`.
  ///   - function: The user function the handler is constructed with.
  private func makeHandler(
    encoding: ServerMessageEncoding = .disabled,
    function: @escaping (String, StatusOnlyCallContext) -> EventLoopFuture<String>
  ) -> UnaryServerHandler<StringSerializer, StringDeserializer> {
    let callContext = self.makeCallHandlerContext(encoding: encoding)
    return UnaryServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      userFunction: function
    )
  }

  /// A user function which responds with the request it was given.
  private func echo(_ request: String, context: StatusOnlyCallContext) -> EventLoopFuture<String> {
    let loop = context.eventLoop
    return loop.makeSucceededFuture(request)
  }

  /// A user function whose response is scheduled for `.distantFuture`.
  private func neverComplete(
    _ request: String,
    context: StatusOnlyCallContext
  ) -> EventLoopFuture<String> {
    return context.eventLoop.scheduleTask(deadline: .distantFuture) {
      request
    }.futureResult
  }

  /// A user function which fails the test if it is ever invoked.
  private func neverCalled(
    _ request: String,
    context: StatusOnlyCallContext
  ) -> EventLoopFuture<String> {
    XCTFail("Unexpected function invocation")
    let error = GRPCError.InvalidState("")
    return context.eventLoop.makeFailedFuture(error)
  }

  // MARK: Happy paths

  /// A complete exchange: the request is echoed back uncompressed with an OK status.
  func testHappyPath() {
    let unary = self.makeHandler(function: self.echo(_:context:))
    let request = ByteBuffer(string: "hello")

    unary.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    unary.receiveMessage(request)
    unary.receiveEnd()
    unary.finish()

    assertThat(self.recorder.messages.first, .is(request))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// With compression enabled on the context the response is marked for compression.
  func testHappyPathWithCompressionEnabled() {
    let unary = self.makeHandler(encoding: self.compressing, function: self.echo(_:context:))
    let request = ByteBuffer(string: "hello")

    unary.receiveMetadata([:])
    unary.receiveMessage(request)

    assertThat(self.recorder.messages.first, .is(request))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  }

  /// The user function switches compression off, so the response is not marked for compression.
  func testHappyPathWithCompressionEnabledButDisabledByCaller() {
    let unary = self.makeHandler(
      encoding: self.compressing,
      function: { text, callContext in
        callContext.compressionEnabled = false
        return self.echo(text, context: callContext)
      }
    )
    let request = ByteBuffer(string: "hello")

    unary.receiveMetadata([:])
    unary.receiveMessage(request)

    assertThat(self.recorder.messages.first, .is(request))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  }

  // MARK: Failures

  /// With a throwing deserializer no message is written and the status is `.internalError`.
  func testThrowingDeserializer() {
    let callContext = self.makeCallHandlerContext()
    let unary = UnaryServerHandler(
      context: callContext,
      requestDeserializer: ThrowingStringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      userFunction: self.neverCalled(_:context:)
    )

    unary.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    unary.receiveMessage(ByteBuffer(string: "hello"))

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// With a throwing serializer no message is written and the status is `.internalError`.
  func testThrowingSerializer() {
    let callContext = self.makeCallHandlerContext()
    let unary = UnaryServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: ThrowingStringSerializer(),
      interceptors: [],
      userFunction: self.echo(_:context:)
    )

    unary.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    unary.receiveMessage(ByteBuffer(string: "hello"))
    unary.receiveEnd()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// The status the user function fails with is recorded, including its message.
  func testUserFunctionReturnsFailedFuture() {
    let unary = self.makeHandler { _, callContext in
      callContext.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
    }

    unary.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    unary.receiveMessage(ByteBuffer(string: "hello"))

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.status?.message, .is(":("))
  }

  // MARK: Out-of-order request parts

  /// A message arriving before any headers is expected to end the call with `.internalError`.
  func testReceiveMessageBeforeHeaders() {
    let unary = self.makeHandler(function: self.neverCalled(_:context:))

    unary.receiveMessage(ByteBuffer(string: "foo"))

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// A second set of headers is expected to end the call with `.internalError`.
  func testReceiveMultipleHeaders() {
    let unary = self.makeHandler(function: self.neverCalled(_:context:))

    unary.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    unary.receiveMetadata([:])

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// A second request message is expected to end the call with `.internalError`.
  func testReceiveMultipleMessages() {
    let unary = self.makeHandler(function: self.neverComplete(_:context:))
    let request = ByteBuffer(string: "hello")

    unary.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    unary.receiveMessage(request)
    unary.receiveEnd()
    // The user function has not completed yet: deliver the same message again.
    unary.receiveMessage(request)

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  // MARK: Finishing early

  /// Finishing a handler which received nothing is expected to write nothing at all.
  func testFinishBeforeStarting() {
    let unary = self.makeHandler(function: self.neverCalled(_:context:))

    unary.finish()

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .is(.nil()))
    assertThat(self.recorder.trailers, .is(.nil()))
  }

  /// Finishing after only headers is expected to end with `.unavailable` and empty trailers.
  func testFinishAfterHeaders() {
    let unary = self.makeHandler(function: self.neverCalled(_:context:))

    unary.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    unary.finish()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// Finishing while the user function is still pending is expected to end with `.unavailable`.
  func testFinishAfterMessage() {
    let unary = self.makeHandler(function: self.neverComplete(_:context:))

    unary.receiveMetadata([:])
    unary.receiveMessage(ByteBuffer(string: "hello"))
    unary.finish()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }
}
// MARK: - Client Streaming
/// Exercises `ClientStreamingServerHandler` by feeding it request parts directly and checking
/// what it writes to the `ResponseRecorder`.
class ClientStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  let eventLoop = EmbeddedEventLoop()
  let allocator = ByteBufferAllocator()
  let recorder = ResponseRecorder()

  // MARK: Fixtures

  /// An encoding with compression enabled and an absolute decompression limit of `.max`.
  private var compressing: ServerMessageEncoding {
    return .enabled(.init(decompressionLimit: .absolute(.max)))
  }

  /// Builds a handler which uses the non-throwing string (de)serializers and no interceptors.
  ///
  /// - Parameters:
  ///   - encoding: The encoding used for the handler's context; defaults to `.disabled`.
  ///   - observerFactory: The factory the handler is constructed with.
  private func makeHandler(
    encoding: ServerMessageEncoding = .disabled,
    observerFactory: @escaping (UnaryResponseCallContext<String>)
      -> EventLoopFuture<(StreamEvent<String>) -> Void>
  ) -> ClientStreamingServerHandler<StringSerializer, StringDeserializer> {
    let callContext = self.makeCallHandlerContext(encoding: encoding)
    return ClientStreamingServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      observerFactory: observerFactory
    )
  }

  /// An observer factory whose observer collects every message and, on `.end`, succeeds the
  /// response promise with the messages joined by single spaces.
  private func joinWithSpaces(
    context: UnaryResponseCallContext<String>
  ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
    var received: [String] = []
    let observer: (StreamEvent<String>) -> Void = { event in
      switch event {
      case let .message(text):
        received.append(text)
      case .end:
        context.responsePromise.succeed(received.joined(separator: " "))
      }
    }
    return context.eventLoop.makeSucceededFuture(observer)
  }

  /// An observer factory whose observer fails the test on any message and responds with an
  /// empty string on `.end`.
  private func neverReceivesMessage(
    context: UnaryResponseCallContext<String>
  ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
    let observer: (StreamEvent<String>) -> Void = { event in
      switch event {
      case let .message(message):
        XCTFail("Unexpected message: '\(message)'")
      case .end:
        context.responsePromise.succeed("")
      }
    }
    return context.eventLoop.makeSucceededFuture(observer)
  }

  /// An observer factory which fails the test if it is ever invoked.
  private func neverCalled(
    context: UnaryResponseCallContext<String>
  ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
    XCTFail("This observer factory should never be called")
    let aborted = GRPCStatus(code: .aborted, message: nil)
    return context.eventLoop.makeFailedFuture(aborted)
  }

  // MARK: Happy paths

  /// Three requests are joined into a single uncompressed response with an OK status.
  func testHappyPath() {
    let streaming = self.makeHandler(observerFactory: self.joinWithSpaces(context:))

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    for text in ["1", "2", "3"] {
      streaming.receiveMessage(ByteBuffer(string: text))
    }
    streaming.receiveEnd()
    streaming.finish()

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// With compression enabled on the context the response is marked for compression.
  func testHappyPathWithCompressionEnabled() {
    let streaming = self.makeHandler(
      encoding: self.compressing,
      observerFactory: self.joinWithSpaces(context:)
    )

    streaming.receiveMetadata([:])
    for text in ["1", "2", "3"] {
      streaming.receiveMessage(ByteBuffer(string: text))
    }
    streaming.receiveEnd()

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  }

  /// The factory switches compression off, so the response is not marked for compression.
  func testHappyPathWithCompressionEnabledButDisabledByCaller() {
    let streaming = self.makeHandler(
      encoding: self.compressing,
      observerFactory: { callContext in
        callContext.compressionEnabled = false
        return self.joinWithSpaces(context: callContext)
      }
    )

    streaming.receiveMetadata([:])
    for text in ["1", "2", "3"] {
      streaming.receiveMessage(ByteBuffer(string: text))
    }
    streaming.receiveEnd()

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  }

  // MARK: Failures

  /// With a throwing deserializer no message is written and the status is `.internalError`.
  func testThrowingDeserializer() {
    let callContext = self.makeCallHandlerContext()
    let streaming = ClientStreamingServerHandler(
      context: callContext,
      requestDeserializer: ThrowingStringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      observerFactory: self.neverReceivesMessage(context:)
    )

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMessage(ByteBuffer(string: "hello"))

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// With a throwing serializer no message is written and the status is `.internalError`.
  func testThrowingSerializer() {
    let callContext = self.makeCallHandlerContext()
    let streaming = ClientStreamingServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: ThrowingStringSerializer(),
      interceptors: [],
      observerFactory: self.joinWithSpaces(context:)
    )

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMessage(ByteBuffer(string: "hello"))
    streaming.receiveEnd()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// The status the observer factory fails with is recorded, including its message.
  func testObserverFactoryReturnsFailedFuture() {
    let streaming = self.makeHandler { callContext in
      return callContext.eventLoop.makeFailedFuture(
        GRPCStatus(code: .unavailable, message: ":(")
      )
    }

    streaming.receiveMetadata([:])

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.status?.message, .is(":("))
  }

  // MARK: Delayed observer

  /// Messages received both before and after the observer becomes available are all expected
  /// to appear, in order, in the response.
  func testDelayedObserverFactory() {
    let gate = self.eventLoop.makePromise(of: Void.self)
    let streaming = self.makeHandler { callContext in
      gate.futureResult.flatMap {
        self.joinWithSpaces(context: callContext)
      }
    }

    streaming.receiveMetadata([:])

    // These arrive while the observer future is still pending.
    for text in ["1", "2", "3"] {
      streaming.receiveMessage(ByteBuffer(string: text))
    }

    // Let the observer future complete.
    gate.succeed(())

    // These arrive after the observer future has completed.
    for text in ["4", "5"] {
      streaming.receiveMessage(ByteBuffer(string: text))
    }
    streaming.receiveEnd()

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3 4 5")))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  }

  /// The whole request stream, including end, arrives before the observer becomes available.
  func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
    let gate = self.eventLoop.makePromise(of: Void.self)
    let streaming = self.makeHandler { callContext in
      gate.futureResult.flatMap {
        self.joinWithSpaces(context: callContext)
      }
    }

    streaming.receiveMetadata([:])

    // Everything arrives while the observer future is still pending.
    for text in ["1", "2", "3"] {
      streaming.receiveMessage(ByteBuffer(string: text))
    }
    streaming.receiveEnd()

    // Let the observer future complete.
    gate.succeed(())

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  }

  // MARK: Out-of-order request parts

  /// A message arriving before any headers is expected to end the call with `.internalError`.
  func testReceiveMessageBeforeHeaders() {
    let streaming = self.makeHandler(observerFactory: self.neverCalled(context:))

    streaming.receiveMessage(ByteBuffer(string: "foo"))

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// A second set of headers is expected to end the call with `.internalError`.
  func testReceiveMultipleHeaders() {
    let streaming = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMetadata([:])

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  // MARK: Finishing early

  /// Finishing a handler which received nothing is expected to write nothing at all.
  func testFinishBeforeStarting() {
    let streaming = self.makeHandler(observerFactory: self.neverCalled(context:))

    streaming.finish()

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .is(.nil()))
    assertThat(self.recorder.trailers, .is(.nil()))
  }

  /// Finishing after only headers is expected to end with `.unavailable` and empty trailers.
  func testFinishAfterHeaders() {
    let streaming = self.makeHandler(observerFactory: self.joinWithSpaces(context:))

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.finish()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// Finishing before the request stream ended is expected to end with `.unavailable`.
  func testFinishAfterMessage() {
    let streaming = self.makeHandler(observerFactory: self.joinWithSpaces(context:))

    streaming.receiveMetadata([:])
    streaming.receiveMessage(ByteBuffer(string: "hello"))
    streaming.finish()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }
}
// MARK: - Server Streaming

/// Exercises `ServerStreamingServerHandler` by feeding it request parts directly and checking
/// what it writes to the `ResponseRecorder`.
class ServerStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  let eventLoop = EmbeddedEventLoop()
  let allocator = ByteBufferAllocator()
  let recorder = ResponseRecorder()

  // MARK: Fixtures

  /// An encoding with compression enabled and an absolute decompression limit of `.max`.
  private var compressing: ServerMessageEncoding {
    return .enabled(.init(decompressionLimit: .absolute(.max)))
  }

  /// Builds a handler which uses the non-throwing string (de)serializers and no interceptors.
  ///
  /// - Parameters:
  ///   - encoding: The encoding used for the handler's context; defaults to `.disabled`.
  ///   - userFunction: The user function the handler is constructed with.
  private func makeHandler(
    encoding: ServerMessageEncoding = .disabled,
    userFunction: @escaping (String, StreamingResponseCallContext<String>)
      -> EventLoopFuture<GRPCStatus>
  ) -> ServerStreamingServerHandler<StringSerializer, StringDeserializer> {
    let callContext = self.makeCallHandlerContext(encoding: encoding)
    return ServerStreamingServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      userFunction: userFunction
    )
  }

  /// A user function which sends one response per space-separated component of the request and
  /// then completes with `.ok`.
  private func breakOnSpaces(
    _ request: String,
    context: StreamingResponseCallContext<String>
  ) -> EventLoopFuture<GRPCStatus> {
    context.sendResponses(request.components(separatedBy: " "), promise: nil)
    let loop = context.eventLoop
    return loop.makeSucceededFuture(.ok)
  }

  /// A user function which fails the test if it is ever invoked.
  private func neverCalled(
    _ request: String,
    context: StreamingResponseCallContext<String>
  ) -> EventLoopFuture<GRPCStatus> {
    XCTFail("Unexpected invocation")
    let loop = context.eventLoop
    return loop.makeSucceededFuture(.processingError)
  }

  /// A user function whose status is scheduled for `.distantFuture`.
  private func neverComplete(
    _ request: String,
    context: StreamingResponseCallContext<String>
  ) -> EventLoopFuture<GRPCStatus> {
    let scheduled = context.eventLoop.scheduleTask(deadline: .distantFuture) {
      return GRPCStatus.processingError
    }
    return scheduled.futureResult
  }

  // MARK: Happy paths

  /// The request "a b" produces the two uncompressed responses "a" and "b" and an OK status.
  func testHappyPath() {
    let streaming = self.makeHandler(userFunction: self.breakOnSpaces(_:context:))

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMessage(ByteBuffer(string: "a b"))
    streaming.receiveEnd()
    streaming.finish()

    let expected: [ByteBuffer] = [ByteBuffer(string: "a"), ByteBuffer(string: "b")]
    assertThat(self.recorder.messages, .is(expected))
    assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false]))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// With compression enabled on the context the response is marked for compression.
  func testHappyPathWithCompressionEnabled() {
    let streaming = self.makeHandler(
      encoding: self.compressing,
      userFunction: self.breakOnSpaces(_:context:)
    )

    streaming.receiveMetadata([:])
    streaming.receiveMessage(ByteBuffer(string: "a"))
    streaming.receiveEnd()

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  }

  /// The user function switches compression off, so the response is not marked for compression.
  func testHappyPathWithCompressionEnabledButDisabledByCaller() {
    let streaming = self.makeHandler(
      encoding: self.compressing,
      userFunction: { text, callContext in
        callContext.compressionEnabled = false
        return self.breakOnSpaces(text, context: callContext)
      }
    )

    streaming.receiveMetadata([:])
    streaming.receiveMessage(ByteBuffer(string: "a"))
    streaming.receiveEnd()

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
    assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  }

  // MARK: Failures

  /// With a throwing deserializer no message is written and the status is `.internalError`.
  func testThrowingDeserializer() {
    let callContext = self.makeCallHandlerContext()
    let streaming = ServerStreamingServerHandler(
      context: callContext,
      requestDeserializer: ThrowingStringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      userFunction: self.neverCalled(_:context:)
    )

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMessage(ByteBuffer(string: "hello"))

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// With a throwing serializer no message is written and the status is `.internalError`.
  func testThrowingSerializer() {
    let callContext = self.makeCallHandlerContext()
    let streaming = ServerStreamingServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: ThrowingStringSerializer(),
      interceptors: [],
      userFunction: self.breakOnSpaces(_:context:)
    )

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMessage(ByteBuffer(string: "1 2 3"))
    streaming.receiveEnd()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// The status the user function fails with is recorded, including its message.
  func testUserFunctionReturnsFailedFuture() {
    let streaming = self.makeHandler { _, callContext in
      callContext.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
    }

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMessage(ByteBuffer(string: "hello"))

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.status?.message, .is(":("))
  }

  // MARK: Out-of-order request parts

  /// A message arriving before any headers is expected to end the call with `.internalError`.
  func testReceiveMessageBeforeHeaders() {
    let streaming = self.makeHandler(userFunction: self.neverCalled(_:context:))

    streaming.receiveMessage(ByteBuffer(string: "foo"))

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// A second set of headers is expected to end the call with `.internalError`.
  func testReceiveMultipleHeaders() {
    let streaming = self.makeHandler(userFunction: self.neverCalled(_:context:))

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMetadata([:])

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// A second request message is expected to end the call with `.internalError`.
  func testReceiveMultipleMessages() {
    let streaming = self.makeHandler(userFunction: self.neverComplete(_:context:))
    let request = ByteBuffer(string: "hello")

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.receiveMessage(request)
    streaming.receiveEnd()
    // The user function has not completed yet: deliver the same message again.
    streaming.receiveMessage(request)

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  // MARK: Finishing early

  /// Finishing a handler which received nothing is expected to write nothing at all.
  func testFinishBeforeStarting() {
    let streaming = self.makeHandler(userFunction: self.neverCalled(_:context:))

    streaming.finish()

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .is(.nil()))
    assertThat(self.recorder.trailers, .is(.nil()))
  }

  /// Finishing after only headers is expected to end with `.unavailable` and empty trailers.
  func testFinishAfterHeaders() {
    let streaming = self.makeHandler(userFunction: self.neverCalled(_:context:))

    streaming.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    streaming.finish()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// Finishing while the user function is still pending is expected to end with `.unavailable`.
  func testFinishAfterMessage() {
    let streaming = self.makeHandler(userFunction: self.neverComplete(_:context:))

    streaming.receiveMetadata([:])
    streaming.receiveMessage(ByteBuffer(string: "hello"))
    streaming.finish()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }
}
// MARK: - Bidirectional Streaming
/// Exercises `BidirectionalStreamingServerHandler` by feeding it request parts directly and
/// checking what it writes to the `ResponseRecorder`.
class BidirectionalStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  let eventLoop = EmbeddedEventLoop()
  let allocator = ByteBufferAllocator()
  let recorder = ResponseRecorder()

  // MARK: Fixtures

  /// An encoding with compression enabled and an absolute decompression limit of `.max`.
  private var compressing: ServerMessageEncoding {
    return .enabled(.init(decompressionLimit: .absolute(.max)))
  }

  /// Builds a handler which uses the non-throwing string (de)serializers and no interceptors.
  ///
  /// - Parameters:
  ///   - encoding: The encoding used for the handler's context; defaults to `.disabled`.
  ///   - observerFactory: The factory the handler is constructed with.
  private func makeHandler(
    encoding: ServerMessageEncoding = .disabled,
    observerFactory: @escaping (StreamingResponseCallContext<String>)
      -> EventLoopFuture<(StreamEvent<String>) -> Void>
  ) -> BidirectionalStreamingServerHandler<StringSerializer, StringDeserializer> {
    let callContext = self.makeCallHandlerContext(encoding: encoding)
    return BidirectionalStreamingServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      observerFactory: observerFactory
    )
  }

  /// An observer factory whose observer sends every message straight back and succeeds the
  /// status promise with `.ok` on `.end`.
  private func echo(
    context: StreamingResponseCallContext<String>
  ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
    let observer: (StreamEvent<String>) -> Void = { event in
      switch event {
      case let .message(text):
        context.sendResponse(text, promise: nil)
      case .end:
        context.statusPromise.succeed(.ok)
      }
    }
    return context.eventLoop.makeSucceededFuture(observer)
  }

  /// An observer factory whose observer fails the test on any message and succeeds the status
  /// promise with `.ok` on `.end`.
  private func neverReceivesMessage(
    context: StreamingResponseCallContext<String>
  ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
    let observer: (StreamEvent<String>) -> Void = { event in
      switch event {
      case let .message(message):
        XCTFail("Unexpected message: '\(message)'")
      case .end:
        context.statusPromise.succeed(.ok)
      }
    }
    return context.eventLoop.makeSucceededFuture(observer)
  }

  /// An observer factory which fails the test if it is ever invoked.
  private func neverCalled(
    context: StreamingResponseCallContext<String>
  ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
    XCTFail("This observer factory should never be called")
    let aborted = GRPCStatus(code: .aborted, message: nil)
    return context.eventLoop.makeFailedFuture(aborted)
  }

  // MARK: Happy paths

  /// Three requests are echoed back, in order and uncompressed, with an OK status.
  func testHappyPath() {
    let bidi = self.makeHandler(observerFactory: self.echo(context:))

    bidi.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    for text in ["1", "2", "3"] {
      bidi.receiveMessage(ByteBuffer(string: text))
    }
    bidi.receiveEnd()
    bidi.finish()

    let expected: [ByteBuffer] = [
      ByteBuffer(string: "1"),
      ByteBuffer(string: "2"),
      ByteBuffer(string: "3"),
    ]
    assertThat(self.recorder.messages, .is(expected))
    assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// With compression enabled on the context every response is marked for compression.
  func testHappyPathWithCompressionEnabled() {
    let bidi = self.makeHandler(
      encoding: self.compressing,
      observerFactory: self.echo(context:)
    )

    bidi.receiveMetadata([:])
    for text in ["1", "2", "3"] {
      bidi.receiveMessage(ByteBuffer(string: text))
    }
    bidi.receiveEnd()

    let expected: [ByteBuffer] = [
      ByteBuffer(string: "1"),
      ByteBuffer(string: "2"),
      ByteBuffer(string: "3"),
    ]
    assertThat(self.recorder.messages, .is(expected))
    assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([true, true, true]))
  }

  /// The factory switches compression off, so no response is marked for compression.
  func testHappyPathWithCompressionEnabledButDisabledByCaller() {
    let bidi = self.makeHandler(
      encoding: self.compressing,
      observerFactory: { callContext in
        callContext.compressionEnabled = false
        return self.echo(context: callContext)
      }
    )

    bidi.receiveMetadata([:])
    for text in ["1", "2", "3"] {
      bidi.receiveMessage(ByteBuffer(string: text))
    }
    bidi.receiveEnd()

    let expected: [ByteBuffer] = [
      ByteBuffer(string: "1"),
      ByteBuffer(string: "2"),
      ByteBuffer(string: "3"),
    ]
    assertThat(self.recorder.messages, .is(expected))
    assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
  }

  // MARK: Failures

  /// With a throwing deserializer no message is written and the status is `.internalError`.
  func testThrowingDeserializer() {
    let callContext = self.makeCallHandlerContext()
    let bidi = BidirectionalStreamingServerHandler(
      context: callContext,
      requestDeserializer: ThrowingStringDeserializer(),
      responseSerializer: StringSerializer(),
      interceptors: [],
      observerFactory: self.neverReceivesMessage(context:)
    )

    bidi.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    bidi.receiveMessage(ByteBuffer(string: "hello"))

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// With a throwing serializer no message is written and the status is `.internalError`.
  func testThrowingSerializer() {
    let callContext = self.makeCallHandlerContext()
    let bidi = BidirectionalStreamingServerHandler(
      context: callContext,
      requestDeserializer: StringDeserializer(),
      responseSerializer: ThrowingStringSerializer(),
      interceptors: [],
      observerFactory: self.echo(context:)
    )

    bidi.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    bidi.receiveMessage(ByteBuffer(string: "hello"))
    bidi.receiveEnd()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// The status the observer factory fails with is recorded, including its message.
  func testObserverFactoryReturnsFailedFuture() {
    let bidi = self.makeHandler { callContext in
      return callContext.eventLoop.makeFailedFuture(
        GRPCStatus(code: .unavailable, message: ":(")
      )
    }

    bidi.receiveMetadata([:])

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.status?.message, .is(":("))
  }

  // MARK: Delayed observer

  /// One message arrives before the observer becomes available and one after; both are
  /// expected to be echoed, in order.
  func testDelayedObserverFactory() {
    let gate = self.eventLoop.makePromise(of: Void.self)
    let bidi = self.makeHandler { callContext in
      gate.futureResult.flatMap {
        self.echo(context: callContext)
      }
    }

    bidi.receiveMetadata([:])

    // This arrives while the observer future is still pending.
    bidi.receiveMessage(ByteBuffer(string: "1"))

    // Let the observer future complete.
    gate.succeed(())

    // This arrives after the observer future has completed.
    bidi.receiveMessage(ByteBuffer(string: "2"))
    bidi.receiveEnd()

    let expected: [ByteBuffer] = [ByteBuffer(string: "1"), ByteBuffer(string: "2")]
    assertThat(self.recorder.messages, .is(expected))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  }

  /// The whole request stream, including end, arrives before the observer becomes available.
  func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
    let gate = self.eventLoop.makePromise(of: Void.self)
    let bidi = self.makeHandler { callContext in
      gate.futureResult.flatMap {
        self.echo(context: callContext)
      }
    }

    bidi.receiveMetadata([:])

    // Everything arrives while the observer future is still pending.
    for text in ["1", "2"] {
      bidi.receiveMessage(ByteBuffer(string: text))
    }
    bidi.receiveEnd()

    // Let the observer future complete.
    gate.succeed(())

    let expected: [ByteBuffer] = [ByteBuffer(string: "1"), ByteBuffer(string: "2")]
    assertThat(self.recorder.messages, .is(expected))
    assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  }

  // MARK: Out-of-order request parts

  /// A message arriving before any headers is expected to end the call with `.internalError`.
  func testReceiveMessageBeforeHeaders() {
    let bidi = self.makeHandler(observerFactory: self.neverCalled(context:))

    bidi.receiveMessage(ByteBuffer(string: "foo"))

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  /// A second set of headers is expected to end the call with `.internalError`.
  func testReceiveMultipleHeaders() {
    let bidi = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))

    bidi.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    bidi.receiveMetadata([:])

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  }

  // MARK: Finishing early

  /// Finishing a handler which received nothing is expected to write nothing at all.
  func testFinishBeforeStarting() {
    let bidi = self.makeHandler(observerFactory: self.neverCalled(context:))

    bidi.finish()

    assertThat(self.recorder.metadata, .is(.nil()))
    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .is(.nil()))
    assertThat(self.recorder.trailers, .is(.nil()))
  }

  /// Finishing after only headers is expected to end with `.unavailable` and empty trailers.
  func testFinishAfterHeaders() {
    let bidi = self.makeHandler(observerFactory: self.echo(context:))

    bidi.receiveMetadata([:])
    assertThat(self.recorder.metadata, .is([:]))

    bidi.finish()

    assertThat(self.recorder.messages, .isEmpty())
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }

  /// The echoed message is expected to have been written before finishing ends the call with
  /// `.unavailable`.
  func testFinishAfterMessage() {
    let bidi = self.makeHandler(observerFactory: self.echo(context:))

    bidi.receiveMetadata([:])
    bidi.receiveMessage(ByteBuffer(string: "hello"))
    bidi.finish()

    assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "hello")))
    assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
    assertThat(self.recorder.trailers, .is([:]))
  }
}
|