| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import EchoImplementation
- @testable import GRPC
- import NIOCore
- import NIOEmbedded
- import NIOHPACK
- import NIOHTTP2
- import NIOPosix
- import XCTest
- class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
- typealias StateMachine = HTTP2ToRawGRPCStateMachine
- typealias State = StateMachine.State
- // An event loop gets passed to any service handler that's created, we don't actually use it here.
- private var eventLoop: EventLoop {
- return EmbeddedEventLoop()
- }
- /// An allocator, just here for convenience.
- private let allocator = ByteBufferAllocator()
- private func makeHeaders(
- path: String = "/echo.Echo/Get",
- contentType: String?,
- encoding: String? = nil,
- acceptEncoding: [String]? = nil
- ) -> HPACKHeaders {
- var headers = HPACKHeaders()
- headers.add(name: ":path", value: path)
- if let contentType = contentType {
- headers.add(name: GRPCHeaderName.contentType, value: contentType)
- }
- if let encoding = encoding {
- headers.add(name: GRPCHeaderName.encoding, value: encoding)
- }
- if let acceptEncoding = acceptEncoding {
- headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
- }
- return headers
- }
- private func makeHeaders(
- path: String = "/echo.Echo/Get",
- contentType: ContentType? = .protobuf,
- encoding: CompressionAlgorithm? = nil,
- acceptEncoding: [CompressionAlgorithm]? = nil
- ) -> HPACKHeaders {
- return self.makeHeaders(
- path: path,
- contentType: contentType?.canonicalValue,
- encoding: encoding?.name,
- acceptEncoding: acceptEncoding?.map { $0.name }
- )
- }
- /// A minimum set of viable request headers for the service providers we register by default.
- private var viableHeaders: HPACKHeaders {
- return self.makeHeaders(
- path: "/echo.Echo/Get",
- contentType: "application/grpc"
- )
- }
- /// Just the echo service.
- private var services: [Substring: CallHandlerProvider] {
- let provider = EchoProvider()
- return [provider.serviceName: provider]
- }
- private enum DesiredState {
- case requestOpenResponseIdle(pipelineConfigured: Bool)
- case requestOpenResponseOpen
- case requestClosedResponseIdle(pipelineConfigured: Bool)
- case requestClosedResponseOpen
- }
- /// Makes a state machine in the desired state.
- private func makeStateMachine(
- services: [Substring: CallHandlerProvider]? = nil,
- encoding: ServerMessageEncoding = .disabled,
- state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
- ) -> StateMachine {
- var machine = StateMachine()
- let receiveHeadersAction = machine.receive(
- headers: self.viableHeaders,
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: services ?? self.services,
- encoding: encoding,
- normalizeHeaders: true
- )
- assertThat(receiveHeadersAction, .is(.configure()))
- switch state {
- case .requestOpenResponseIdle(pipelineConfigured: false):
- ()
- case .requestOpenResponseIdle(pipelineConfigured: true):
- let configuredAction = machine.pipelineConfigured()
- assertThat(configuredAction, .is(.forwardHeaders()))
- case .requestOpenResponseOpen:
- let configuredAction = machine.pipelineConfigured()
- assertThat(configuredAction, .is(.forwardHeaders()))
- let sendHeadersAction = machine.send(headers: [:])
- assertThat(sendHeadersAction, .is(.success()))
- case .requestClosedResponseIdle(pipelineConfigured: false):
- var emptyBuffer = ByteBuffer()
- let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
- assertThat(receiveEnd, .is(.nothing))
- case .requestClosedResponseIdle(pipelineConfigured: true):
- let configuredAction = machine.pipelineConfigured()
- assertThat(configuredAction, .is(.forwardHeaders()))
- var emptyBuffer = ByteBuffer()
- let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
- assertThat(receiveEnd, .is(.tryReading))
- case .requestClosedResponseOpen:
- let configuredAction = machine.pipelineConfigured()
- assertThat(configuredAction, .is(.forwardHeaders()))
- var emptyBuffer = ByteBuffer()
- let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
- assertThat(receiveEndAction, .is(.tryReading))
- let readAction = machine.readNextRequest()
- assertThat(readAction, .is(.forwardEnd()))
- let sendHeadersAction = machine.send(headers: [:])
- assertThat(sendHeadersAction, .is(.success()))
- }
- return machine
- }
- /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UIn32), the
- /// message bytes (UInt8 ⨉ message length).
- private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
- var buffer = ByteBuffer()
- buffer.reserveCapacity(count + 5)
- buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
- buffer.writeInteger(UInt32(count))
- buffer.writeRepeatingByte(0, count: count)
- return buffer
- }
- // MARK: Receive Headers Tests
- func testReceiveValidHeaders() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.viableHeaders,
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.configure()))
- }
- func testReceiveInvalidContentType() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.makeHeaders(contentType: "application/json"),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.rejectRPC(.contains(":status", ["415"]))))
- }
- func testReceiveValidHeadersForUnknownService() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.makeHeaders(path: "/foo.Foo/Get"),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
- }
- func testReceiveValidHeadersForUnknownMethod() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.makeHeaders(path: "/echo.Echo/Foo"),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
- }
- func testReceiveValidHeadersForInvalidPath() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.makeHeaders(path: "nope"),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
- }
- func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.makeHeaders(encoding: .gzip),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
- }
- func testReceiveHeadersWithMultipleEncodings() {
- var machine = StateMachine()
- // We can't have multiple encodings.
- let action = machine.receive(
- headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.rejectRPC(.trailersOnly(code: .invalidArgument))))
- }
- func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .enabled(.deflate, .identity),
- normalizeHeaders: false
- )
- assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
- assertThat(
- action,
- .is(.rejectRPC(.contains("grpc-accept-encoding", ["deflate", "identity"])))
- )
- }
- func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
- var machine = StateMachine()
- // We didn't advertise gzip, but we do support it.
- let action = machine.receive(
- headers: self.makeHeaders(encoding: .gzip),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .enabled(.deflate, .identity),
- normalizeHeaders: false
- )
- // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
- // metadata. Send back headers to test this.
- assertThat(action, .is(.configure()))
- let sendAction = machine.send(headers: [:])
- assertThat(sendAction, .success(.contains(
- "grpc-accept-encoding",
- ["deflate", "identity", "gzip"]
- )))
- }
- func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
- var machine = StateMachine()
- // Identity is always supported, even if compression is disabled.
- let action = machine.receive(
- headers: self.makeHeaders(encoding: .identity),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .disabled,
- normalizeHeaders: false
- )
- assertThat(action, .is(.configure()))
- }
- func testReceiveHeadersNegotiatesResponseEncoding() {
- var machine = StateMachine()
- let action = machine.receive(
- headers: self.makeHeaders(acceptEncoding: [.deflate]),
- eventLoop: self.eventLoop,
- errorDelegate: nil,
- remoteAddress: nil,
- logger: self.logger,
- allocator: ByteBufferAllocator(),
- responseWriter: NoOpResponseWriter(),
- closeFuture: self.eventLoop.makeSucceededVoidFuture(),
- services: self.services,
- encoding: .enabled(.gzip, .deflate),
- normalizeHeaders: false
- )
- // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
- assertThat(action, .is(.configure()))
- let sendAction = machine.send(headers: [:])
- assertThat(sendAction, .success(.contains("grpc-encoding", ["deflate"])))
- }
- // MARK: Receive Data Tests
- func testReceiveDataBeforePipelineIsConfigured() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
- let buffer = self.makeLengthPrefixedBytes(1024)
- // Receive a request. The pipeline isn't configured so no action.
- var buffer1 = buffer
- let action1 = machine.receive(buffer: &buffer1, endStream: false)
- assertThat(action1, .is(.nothing))
- // Receive another request, still not configured so no action.
- var buffer2 = buffer
- let action2 = machine.receive(buffer: &buffer2, endStream: false)
- assertThat(action2, .is(.nothing))
- // Configure the pipeline. We'll have headers to forward and messages to read.
- let action3 = machine.pipelineConfigured()
- assertThat(action3, .is(.forwardHeadersThenRead()))
- // Do the first read.
- let action4 = machine.readNextRequest()
- assertThat(action4, .is(.forwardMessageThenRead()))
- // Do the second and final read.
- let action5 = machine.readNextRequest()
- assertThat(action5, .is(.forwardMessage()))
- // Receive an empty buffer with end stream. Since we're configured we'll always try to read
- // after receiving.
- var emptyBuffer = ByteBuffer()
- let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
- assertThat(action6, .is(.tryReading))
- // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
- let action7 = machine.readNextRequest()
- assertThat(action7, .is(.forwardEnd()))
- }
- func testReceiveDataWhenPipelineIsConfigured() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
- let buffer = self.makeLengthPrefixedBytes(1024)
- // Receive a request. The pipeline is configured, so we should try reading.
- var buffer1 = buffer
- let action1 = machine.receive(buffer: &buffer1, endStream: false)
- assertThat(action1, .is(.tryReading))
- // Read the message, consuming all bytes.
- let action2 = machine.readNextRequest()
- assertThat(action2, .is(.forwardMessage()))
- // Receive another request, we'll split buffer into two parts.
- var buffer3 = buffer
- var buffer2 = buffer3.readSlice(length: 20)!
- // Not enough bytes to form a message, so read won't result in anything.
- let action4 = machine.receive(buffer: &buffer2, endStream: false)
- assertThat(action4, .is(.tryReading))
- let action5 = machine.readNextRequest()
- assertThat(action5, .is(.none()))
- // Now the rest of the message.
- let action6 = machine.receive(buffer: &buffer3, endStream: false)
- assertThat(action6, .is(.tryReading))
- let action7 = machine.readNextRequest()
- assertThat(action7, .is(.forwardMessage()))
- // Receive an empty buffer with end stream. Since we're configured we'll always try to read
- // after receiving.
- var emptyBuffer = ByteBuffer()
- let action8 = machine.receive(buffer: &emptyBuffer, endStream: true)
- assertThat(action8, .is(.tryReading))
- // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
- let action9 = machine.readNextRequest()
- assertThat(action9, .is(.forwardEnd()))
- }
- func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
- let buffer = self.makeLengthPrefixedBytes(1024)
- // No action: the pipeline isn't configured.
- var buffer1 = buffer
- let action1 = machine.receive(buffer: &buffer1, endStream: false)
- assertThat(action1, .is(.nothing))
- // Still no action.
- var buffer2 = buffer
- let action2 = machine.receive(buffer: &buffer2, endStream: true)
- assertThat(action2, .is(.nothing))
- // Configure the pipeline. We have headers to forward and messages to read.
- let action3 = machine.pipelineConfigured()
- assertThat(action3, .is(.forwardHeadersThenRead()))
- // Read the first message.
- let action4 = machine.readNextRequest()
- assertThat(action4, .is(.forwardMessageThenRead()))
- // Read the second and final message.
- let action5 = machine.readNextRequest()
- assertThat(action5, .is(.forwardMessageThenRead()))
- let action6 = machine.readNextRequest()
- assertThat(action6, .is(.forwardEnd()))
- }
- func testReceiveDataAfterPipelineIsConfigured() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
- let buffer = self.makeLengthPrefixedBytes(1024)
- // Pipeline is configured, we should be able to read then forward the message.
- var buffer1 = buffer
- let action1 = machine.receive(buffer: &buffer1, endStream: false)
- assertThat(action1, .is(.tryReading))
- let action2 = machine.readNextRequest()
- assertThat(action2, .is(.forwardMessage()))
- // Receive another message with end stream set.
- // Still no action.
- var buffer2 = buffer
- let action3 = machine.receive(buffer: &buffer2, endStream: true)
- assertThat(action3, .is(.tryReading))
- let action4 = machine.readNextRequest()
- assertThat(action4, .is(.forwardMessageThenRead()))
- let action5 = machine.readNextRequest()
- assertThat(action5, .is(.forwardEnd()))
- }
- func testReceiveDataWhenResponseStreamIsOpen() {
- var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
- let buffer = self.makeLengthPrefixedBytes(1024)
- // Receive a message. We should read and forward it.
- var buffer1 = buffer
- let action1 = machine.receive(buffer: &buffer1, endStream: false)
- assertThat(action1, .is(.tryReading))
- let action2 = machine.readNextRequest()
- assertThat(action2, .is(.forwardMessage()))
- // Receive a message and end stream. We should read it then forward message and end.
- var buffer2 = buffer
- let action3 = machine.receive(buffer: &buffer2, endStream: true)
- assertThat(action3, .is(.tryReading))
- let action4 = machine.readNextRequest()
- assertThat(action4, .is(.forwardMessageThenRead()))
- let action5 = machine.readNextRequest()
- assertThat(action5, .is(.forwardEnd()))
- }
- func testReceiveCompressedMessageWhenCompressionIsDisabled() {
- var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
- var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)
- let action1 = machine.receive(buffer: &buffer, endStream: false)
- assertThat(action1, .is(.tryReading))
- let action2 = machine.readNextRequest()
- assertThat(action2, .is(.errorCaught()))
- }
- func testReceiveDataWhenClosed() {
- var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
- // Close while the request stream is still open.
- let action1 = machine.send(
- status: GRPCStatus(code: .ok, message: "ok"),
- trailers: [:]
- )
- assertThat(action1, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))
- // Now receive end of request stream: tear down the handler, we're closed
- var emptyBuffer = ByteBuffer()
- let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
- assertThat(action2, .is(.finishHandler))
- }
- // MARK: Send Metadata Tests
- func testSendMetadataRequestStreamOpen() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
- // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
- // We'll just validate more 'normal' things here.
- let action1 = machine.send(headers: [:])
- assertThat(action1, .is(.success(.contains(":status", ["200"]))))
- let action2 = machine.send(headers: [:])
- assertThat(action2, .is(.failure()))
- }
- func testSendMetadataRequestStreamClosed() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
- var buffer = ByteBuffer()
- let action1 = machine.receive(buffer: &buffer, endStream: true)
- assertThat(action1, .is(.tryReading))
- let action2 = machine.readNextRequest()
- assertThat(action2, .is(.forwardEnd()))
- // Write some headers back.
- let action3 = machine.send(headers: [:])
- assertThat(action3, .is(.success(.contains(":status", ["200"]))))
- }
- func testSendMetadataWhenOpen() {
- var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
- // Response stream is already open.
- let action = machine.send(headers: [:])
- assertThat(action, .is(.failure()))
- }
- func testSendMetadataNormalizesUserProvidedMetadata() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
- let action = machine.send(headers: ["FOO": "bar"])
- assertThat(action, .success(.contains(caseSensitive: "foo")))
- }
- // MARK: Send Data Tests
- func testSendData() {
- for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
- var machine = self.makeStateMachine(state: startingState)
- let buffer = ByteBuffer(repeating: 0, count: 1024)
- // We should be able to do this multiple times.
- for _ in 0 ..< 5 {
- let action = machine.send(
- buffer: buffer,
- compress: false,
- promise: nil
- )
- assertThat(action, .is(.success()))
- }
- // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
- // write as normal.
- let action = machine.send(
- buffer: buffer,
- compress: true,
- promise: nil
- )
- assertThat(action, .is(.success()))
- }
- }
- func testSendDataAfterClose() {
- var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
- let action1 = machine.send(status: .ok, trailers: [:])
- assertThat(action1, .is(.sendTrailersAndFinish(.contains("grpc-status", ["0"]))))
- // We're already closed, this should fail.
- let buffer = ByteBuffer(repeating: 0, count: 1024)
- let action2 = machine.send(
- buffer: buffer,
- compress: false,
- promise: nil
- )
- assertThat(action2, .is(.failure()))
- }
- func testSendDataBeforeMetadata() {
- var machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))
- // Response stream is still idle, so this should fail.
- let buffer = ByteBuffer(repeating: 0, count: 1024)
- let action2 = machine.send(
- buffer: buffer,
- compress: false,
- promise: nil
- )
- assertThat(action2, .is(.failure()))
- }
- // MARK: Next Response
- func testNextResponseBeforeMetadata() {
- var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
- XCTAssertNil(machine.nextResponse())
- }
- func testNextResponseWhenOpen() throws {
- for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
- var machine = self.makeStateMachine(state: startingState)
- // No response buffered yet.
- XCTAssertNil(machine.nextResponse())
- let buffer = ByteBuffer(repeating: 0, count: 1024)
- machine.send(buffer: buffer, compress: false, promise: nil).assertSuccess()
- let (framedBuffer, promise) = try XCTUnwrap(machine.nextResponse())
- XCTAssertNil(promise) // Didn't provide a promise.
- framedBuffer.assertSuccess()
- // No more responses.
- XCTAssertNil(machine.nextResponse())
- }
- }
- func testNextResponseWhenClosed() throws {
- var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
- let action = machine.send(status: .ok, trailers: [:])
- switch action {
- case .sendTrailersAndFinish:
- ()
- default:
- XCTFail("Expected 'sendTrailersAndFinish' but got \(action)")
- }
- XCTAssertNil(machine.nextResponse())
- }
- // MARK: Send End
- func testSendEndWhenResponseStreamIsIdle() {
- for (state, closed) in zip([
- DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
- DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
- ], [false, true]) {
- var machine = self.makeStateMachine(state: state)
- let action1 = machine.send(status: .ok, trailers: [:])
- // This'll be a trailers-only response.
- if closed {
- assertThat(action1, .is(.sendTrailersAndFinish(.trailersOnly(code: .ok))))
- } else {
- assertThat(action1, .is(.sendTrailers(.trailersOnly(code: .ok))))
- }
- // Already closed.
- let action2 = machine.send(status: .ok, trailers: [:])
- assertThat(action2, .is(.failure()))
- }
- }
- func testSendEndWhenResponseStreamIsOpen() {
- for (state, closed) in zip([
- DesiredState.requestOpenResponseOpen,
- DesiredState.requestClosedResponseOpen,
- ], [false, true]) {
- var machine = self.makeStateMachine(state: state)
- let action = machine.send(
- status: GRPCStatus(code: .ok, message: "ok"),
- trailers: [:]
- )
- if closed {
- assertThat(action, .is(.sendTrailersAndFinish(.trailers(code: .ok, message: "ok"))))
- } else {
- assertThat(action, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))
- }
- // Already closed.
- let action2 = machine.send(status: .ok, trailers: [:])
- assertThat(action2, .is(.failure()))
- }
- }
- }
- extension ServerMessageEncoding {
- fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
- return .enabled(.init(enabledAlgorithms: algorithms, decompressionLimit: .absolute(.max)))
- }
- }
- class NoOpResponseWriter: GRPCServerResponseWriter {
- func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
- promise?.succeed(())
- }
- func sendMessage(
- _ bytes: ByteBuffer,
- metadata: MessageMetadata,
- promise: EventLoopPromise<Void>?
- ) {
- promise?.succeed(())
- }
- func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
- promise?.succeed(())
- }
- }
- extension HTTP2ToRawGRPCStateMachine {
- fileprivate mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
- return self.readNextRequest(maxLength: .max)
- }
- }
|