| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005 |
- /*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import GRPCCore
- import NIOCore
- import NIOEmbedded
- import NIOHPACK
- import NIOHTTP1
- import NIOHTTP2
- import XCTest
- @testable import GRPCNIOTransportCore
- @available(gRPCSwiftNIOTransport 2.0, *)
- final class GRPCClientStreamHandlerTests: XCTestCase {
/// Feeds the handler a series of HTTP/2 frame payloads it is expected to ignore and checks
/// that each one is accepted without throwing and leaves nothing to read inbound.
func testH2FramesAreIgnored() throws {
  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1
    )
  )

  let priorityData = HTTP2Frame.StreamPriorityData(
    exclusive: false,
    dependency: .rootStream,
    weight: 4
  )
  let ignoredPayloads: [HTTP2Frame.FramePayload] = [
    .ping(.init(), ack: false),
    .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
    .priority(priorityData),
    .settings(.ack),
    .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
    .windowUpdate(windowSizeIncrement: 4),
    .alternativeService(origin: nil, field: nil),
    .origin([]),
  ]

  for payload in ignoredPayloads {
    // Writing the frame must not throw, and nothing may be forwarded up the pipeline.
    XCTAssertNoThrow(try channel.writeInbound(payload))
    XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  }
}
/// Checks that server headers lacking `:status` are surfaced as an `unknown` status
/// whose metadata is built from the received headers.
func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // The client opens the RPC by writing its (empty) initial metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // The server replies with headers that carry a content-type but no :status.
  let headersWithoutStatus: HPACKHeaders = [
    GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: headersWithoutStatus))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))

  let expectedPart: ResponsePart = .status(
    Status(code: .unknown, message: "HTTP Status Code is missing."),
    Metadata(headers: headersWithoutStatus)
  )
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)
}
/// Checks that a headers frame with a 1xx `:status` produces no response part, and that a
/// following headers frame with status 200 is then read as the server's initial metadata.
func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // Server side: first a headers frame whose :status is in the 1xx range.
  let informationalHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "104",
    GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  ]
  let informationalFrame = HTTP2Frame.FramePayload.headers(
    .init(headers: informationalHeaders)
  )
  XCTAssertNoThrow(try channel.writeInbound(informationalFrame))
  XCTAssertNil(try channel.readInbound(as: ResponsePart.self))

  // The real headers are still expected after a 1xx response, so receiving metadata a
  // second time must not be treated as a failure.
  let finalHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    "some-custom-header": "some-custom-value",
  ]
  let finalFrame = HTTP2Frame.FramePayload.headers(.init(headers: finalHeaders))
  XCTAssertNoThrow(try channel.writeInbound(finalFrame))

  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.metadata(Metadata(headers: finalHeaders))
  )
}
/// Checks that a non-200, non-1xx `:status` (here "too many requests") finishes the RPC with
/// an `unavailable` status, and that a data frame received afterwards is silently dropped.
func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // Server side: headers whose :status is neither 200 nor in the 1xx range.
  let rejectedHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
    GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: rejectedHeaders))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))

  let expectedPart: ResponsePart = .status(
    Status(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
    Metadata(headers: rejectedHeaders)
  )
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)

  // A further message from the server must not throw: the server side is now closed,
  // so the frame is expected to be dropped.
  var lateMessage = ByteBuffer()
  lateMessage.writeInteger(UInt8(0))  // not compressed
  lateMessage.writeInteger(UInt32(42))  // message length
  lateMessage.writeRepeatingByte(0, count: 42)  // message
  let lateFrame = HTTP2Frame.FramePayload.data(
    .init(data: .byteBuffer(lateMessage), endStream: true)
  )
  XCTAssertNoThrow(try channel.writeInbound(lateFrame))
  XCTAssertNil(try channel.readInbound(as: ResponsePart.self))
}
/// Checks that server headers with status 200 but no content-type finish the RPC with an
/// `internalError` status whose metadata is built from the received headers.
func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // Server side: headers with a :status but without a content-type entry.
  let headersWithoutContentType: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200"
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(
    .init(headers: headersWithoutContentType)
  )
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))

  let expectedPart: ResponsePart = .status(
    Status(code: .internalError, message: "Missing content-type header"),
    Metadata(headers: headersWithoutContentType)
  )
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)
}
/// Checks that when the handler only accepts `deflate` and the server's headers announce
/// `gzip`, the RPC is finished with an `internalError` status naming the algorithm.
func testNotAcceptedEncodingResultsInFinishedRPC() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .deflate,
      acceptedEncodings: [.deflate],
      maxPayloadSize: 1
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // The request headers written to the channel must advertise the configured encodings.
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
    GRPCHTTP2Keys.encoding.rawValue: "deflate",
    GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

  // Server side: initial metadata selecting an encoding the client did not list.
  let gzipHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    GRPCHTTP2Keys.encoding.rawValue: "gzip",
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: gzipHeaders))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))

  let expectedStatus = Status(
    code: .internalError,
    message: "The server picked a compression algorithm ('gzip') the client does not know about."
  )
  let expectedPart: ResponsePart = .status(expectedStatus, Metadata(headers: gzipHeaders))
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)
}
/// Checks that a server message larger than `maxPayloadSize` (1 byte here) results in an
/// `internalError` status and that the channel's close future completes afterwards.
func testOverMaximumPayloadSize() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // The request headers written to the channel must be the expected ones.
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

  // Server side: valid initial metadata.
  let responseHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))
  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.metadata(Metadata(headers: responseHeaders))
  )

  // Server side: a 42-byte message, which exceeds the configured payload limit.
  var oversizedMessage = ByteBuffer()
  oversizedMessage.writeInteger(UInt8(0))  // not compressed
  oversizedMessage.writeInteger(UInt32(42))  // message length
  oversizedMessage.writeRepeatingByte(0, count: 42)  // message
  let serverDataFrame = HTTP2Frame.FramePayload.data(
    .init(data: .byteBuffer(oversizedMessage), endStream: false)
  )

  // The invalid payload should result in an error status and the stream being closed.
  try channel.writeInbound(serverDataFrame)
  let receivedPart = try channel.readInbound(as: ResponsePart.self)
  let expectedPart: ResponsePart = .status(
    Status(code: .internalError, message: "Failed to decode message"),
    [:]
  )
  XCTAssertEqual(receivedPart, expectedPart)

  // Run any scheduled work, then wait for the channel to report closure.
  channel.embeddedEventLoop.run()
  try channel.closeFuture.wait()
}
/// Checks that a server data frame carrying the end-of-stream flag is reported as an
/// `internalError` status, since the server may only close by sending status and trailers.
func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // The request headers written to the channel must be the expected ones.
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

  // Server side: valid initial metadata.
  let responseHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))
  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.metadata(Metadata(headers: responseHeaders))
  )

  // Server side: a message frame that also sets end-of-stream.
  var message = ByteBuffer()
  message.writeInteger(UInt8(0))  // not compressed
  message.writeInteger(UInt32(42))  // message length
  message.writeRepeatingByte(0, count: 42)  // message
  let serverDataFrame = HTTP2Frame.FramePayload.data(
    .init(data: .byteBuffer(message), endStream: true)
  )
  XCTAssertNoThrow(try channel.writeInbound(serverDataFrame))

  // The handler must surface a status (with empty metadata) describing the violation.
  let expectedStatus = Status(
    code: .internalError,
    message:
      "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
  )
  let expectedPart: ResponsePart = .status(expectedStatus, [:])
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)
}
/// Checks that a headers frame with end-of-stream set (a trailers-only response) is read as
/// an `ok` status, and that later data or headers frames from the server do not throw.
func testServerEndsStream() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial metadata and check the headers that were emitted.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

  // Server side: headers carrying a grpc-status with end-of-stream set.
  let trailersOnlyHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.grpcStatus.rawValue: "0",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  ]
  let trailersOnlyFrame = HTTP2Frame.FramePayload.headers(
    .init(headers: trailersOnlyHeaders, endStream: true)
  )
  XCTAssertNoThrow(try channel.writeInbound(trailersOnlyFrame))

  // The status is read with the remaining headers as its metadata.
  let expectedPart: ResponsePart = .status(
    Status(code: .ok, message: ""),
    [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
  )
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)

  // A further message from the server must not throw: the server side is now closed,
  // so the frame is expected to be dropped.
  var lateMessage = ByteBuffer()
  lateMessage.writeInteger(UInt8(0))  // not compressed
  lateMessage.writeInteger(UInt32(42))  // message length
  lateMessage.writeRepeatingByte(0, count: 42)  // message
  let lateDataFrame = HTTP2Frame.FramePayload.data(
    .init(data: .byteBuffer(lateMessage), endStream: true)
  )
  XCTAssertNoThrow(try channel.writeInbound(lateDataFrame))
  XCTAssertNil(try channel.readInbound(as: ResponsePart.self))

  // Receiving the trailers a second time must not throw either.
  let repeatedTrailersFrame = HTTP2Frame.FramePayload.headers(
    .init(headers: trailersOnlyHeaders, endStream: true)
  )
  XCTAssertNoThrow(try channel.writeInbound(repeatedTrailersFrame))
}
/// Walks through a complete RPC: request metadata, server metadata, one request message,
/// client half-close, one response message and the final status with trailing metadata.
///
/// Also checks that writing after the client has half-closed fails with an
/// `internalError` "Invalid state" `RPCError`.
func testNormalFlow() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // The matching headers frame must have been written, and nothing read back yet.
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
  XCTAssertNil(try channel.readInbound(as: ResponsePart.self))

  // Server side: initial metadata, including a custom header.
  let responseHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    "some-custom-header": "some-custom-value",
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))
  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.metadata(Metadata(headers: responseHeaders))
  )

  // Client side: send one 42-byte message.
  XCTAssertNoThrow(
    try channel.writeOutbound(
      RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
    )
  )

  // The message must have been framed with the five-byte length prefix.
  let requestData = try channel.assertReadDataOutbound()
  var expectedRequestBytes = ByteBuffer()
  expectedRequestBytes.writeInteger(UInt8(0))  // not compressed
  expectedRequestBytes.writeInteger(UInt32(42))  // message length
  expectedRequestBytes.writeRepeatingByte(1, count: 42)  // message
  XCTAssertEqual(requestData.data, .byteBuffer(expectedRequestBytes))

  // Half-close the outbound end: this would be triggered by finishing the client's writer.
  // `close(mode:promise:)` cannot throw, so it is called directly; the end-of-stream frame
  // checked below is the observable effect.
  channel.close(mode: .output, promise: nil)

  // An empty data frame with end-of-stream set must have been sent.
  let endOfStreamFrame = try channel.assertReadDataOutbound()
  XCTAssertEqual(endOfStreamFrame.data, .byteBuffer(.init()))
  XCTAssertTrue(endOfStreamFrame.endStream)

  // Flushing again must not write anything else: the outbound side is closed by now.
  channel.flush()
  XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

  // Further writes must fail because the client has closed.
  XCTAssertThrowsError(
    ofType: RPCError.self,
    try channel.writeOutbound(
      RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
    )
  ) { rpcError in
    XCTAssertEqual(rpcError.code, .internalError)
    XCTAssertEqual(rpcError.message, "Invalid state")
  }

  // This is needed to clear the EmbeddedChannel's stored error, otherwise
  // it will be thrown when writing inbound.
  try? channel.throwIfErrorCaught()

  // Server side: one response message.
  var responseBytes = ByteBuffer()
  responseBytes.writeInteger(UInt8(0))  // not compressed
  responseBytes.writeInteger(UInt32(42))  // message length
  responseBytes.writeRepeatingByte(0, count: 42)  // message
  let responseDataFrame = HTTP2Frame.FramePayload.data(
    .init(data: .byteBuffer(responseBytes))
  )
  XCTAssertNoThrow(try channel.writeInbound(responseDataFrame))

  // The message must be read back without its framing.
  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.message(GRPCNIOTransportBytes(repeating: 0, count: 42))
  )

  // Server side: trailers carrying the status that ends the RPC.
  let trailersFrame = HTTP2Frame.FramePayload.headers(
    .init(headers: [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ])
  )
  XCTAssertNoThrow(try channel.writeInbound(trailersFrame))

  let expectedFinalPart: ResponsePart = .status(
    Status(code: .dataLoss, message: "Test data loss"),
    ["custom-header": "custom-value"]
  )
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedFinalPart)
}
/// Checks that a single server message delivered across five separate data frames
/// (compression flag, length prefix and three body chunks) is only surfaced once the
/// last chunk arrives, and that it is reassembled in order.
func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // The matching headers frame must have been written, and nothing read back yet.
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
  XCTAssertNil(try channel.readInbound(as: ResponsePart.self))

  // Server side: initial metadata, including a custom header.
  let responseHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    "some-custom-header": "some-custom-value",
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))
  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.metadata(Metadata(headers: responseHeaders))
  )

  // Client side: send one 42-byte message and check how it was framed.
  XCTAssertNoThrow(
    try channel.writeOutbound(
      RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
    )
  )
  let requestData = try channel.assertReadDataOutbound()
  var expectedRequestBytes = ByteBuffer()
  expectedRequestBytes.writeInteger(UInt8(0))  // not compressed
  expectedRequestBytes.writeInteger(UInt32(42))  // message length
  expectedRequestBytes.writeRepeatingByte(1, count: 42)  // message
  XCTAssertEqual(requestData.data, .byteBuffer(expectedRequestBytes))

  // Server side: one 30-byte message, split into five pieces.
  var compressionFlag = ByteBuffer()
  compressionFlag.writeInteger(UInt8(0))  // not compressed
  var lengthPrefix = ByteBuffer()
  lengthPrefix.writeInteger(UInt32(30))  // message length
  var firstChunk = ByteBuffer()
  firstChunk.writeRepeatingByte(0, count: 10)  // first part of the message
  var secondChunk = ByteBuffer()
  secondChunk.writeRepeatingByte(1, count: 10)  // second part of the message
  var thirdChunk = ByteBuffer()
  thirdChunk.writeRepeatingByte(2, count: 10)  // third part of the message

  // Until the message is complete, every piece is accepted but nothing is readable.
  for incompletePiece in [compressionFlag, lengthPrefix, firstChunk, secondChunk] {
    let pieceFrame = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(incompletePiece)))
    XCTAssertNoThrow(try channel.writeInbound(pieceFrame))
    XCTAssertNil(try channel.readInbound(as: ResponsePart.self))
  }

  // The final piece completes the message.
  let finalFrame = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(thirdChunk)))
  XCTAssertNoThrow(try channel.writeInbound(finalFrame))

  var reassembledBody = ByteBuffer()
  reassembledBody.writeRepeatingByte(0, count: 10)
  reassembledBody.writeRepeatingByte(1, count: 10)
  reassembledBody.writeRepeatingByte(2, count: 10)

  // The message must be read back as the three chunks concatenated.
  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.message(GRPCNIOTransportBytes(reassembledBody))
  )
}
/// Checks that two request messages written without an intermediate flush are held back
/// until the channel is flushed, and are then emitted together in one data frame.
func testSendMultipleMessagesInSingleBuffer() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial (empty) request metadata.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )

  // The matching headers frame must have been written, and nothing read back yet.
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
  XCTAssertNil(try channel.readInbound(as: ResponsePart.self))

  // Server side: initial metadata, including a custom header.
  let responseHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.status.rawValue: "200",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    "some-custom-header": "some-custom-value",
  ]
  let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
  XCTAssertNoThrow(try channel.writeInbound(headersFrame))
  XCTAssertEqual(
    try channel.readInbound(as: ResponsePart.self),
    ResponsePart.metadata(Metadata(headers: responseHeaders))
  )

  // This is where this test actually begins. We want to write two messages
  // without flushing, and make sure that no messages are sent down the pipeline
  // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.

  // `write` does not throw; it returns a future, which is deliberately discarded here.
  // The meaningful check is that nothing becomes readable outbound before the flush.
  _ = channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
  XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

  // Same for the second message: still nothing written to the channel.
  _ = channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
  XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

  // Now flush and check we *do* write the data.
  channel.flush()
  let flushedData = try channel.assertReadDataOutbound()

  // Both messages must have been framed back to back in a single buffer.
  let expectedBytes = ByteBuffer(bytes: [
    // First message
    0,  // Compression disabled
    0, 0, 0, 4,  // Message length
    1, 1, 1, 1,  // First message data
    // Second message
    0,  // Compression disabled
    0, 0, 0, 4,  // Message length
    2, 2, 2, 2,  // Second message data
  ])
  XCTAssertEqual(flushedData.data, .byteBuffer(expectedBytes))

  // Nothing else may have been written.
  XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
}
/// Checks that an error fired through the pipeline after the RPC has started is reported
/// as an `unavailable` status, and that later writes fail with "Invalid state".
func testUnexpectedStreamClose_ErrorFired() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial metadata and check the headers that were emitted.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

  // Fire an error through the pipeline.
  let pipelineError = ChannelError.connectTimeout(.milliseconds(100))
  channel.pipeline.fireErrorCaught(pipelineError)

  // The client receives a status explaining the stream was closed because of the error.
  let expectedPart: ResponsePart = .status(
    Status(code: .unavailable, message: "Stream unexpectedly closed with error."),
    [:]
  )
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)

  // The stream is closed now, so further writes must be rejected.
  XCTAssertThrowsError(
    ofType: RPCError.self,
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  ) { rpcError in
    XCTAssertEqual(rpcError.code, .internalError)
    XCTAssertEqual(rpcError.message, "Invalid state")
  }
}
/// Checks that the channel becoming inactive after the RPC has started is reported as an
/// `unavailable` status, and that later writes fail with "Invalid state".
func testUnexpectedStreamClose_ChannelInactive() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial metadata and check the headers that were emitted.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

  // Signal that the channel has become inactive.
  channel.pipeline.fireChannelInactive()

  // The client receives a status explaining the stream was closed.
  let expectedPart: ResponsePart = .status(
    Status(code: .unavailable, message: "Stream unexpectedly closed."),
    [:]
  )
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)

  // The stream is closed now, so further writes must be rejected.
  XCTAssertThrowsError(
    ofType: RPCError.self,
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  ) { rpcError in
    XCTAssertEqual(rpcError.code, .internalError)
    XCTAssertEqual(rpcError.message, "Invalid state")
  }
}
/// Checks that an inbound RST_STREAM frame after the RPC has started is reported as an
/// `unavailable` status naming the error code, and that later writes fail with
/// "Invalid state".
func testUnexpectedStreamClose_ResetStreamFrame() throws {
  typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

  let channel = EmbeddedChannel(
    handler: GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
  )

  // Client side: write the initial metadata and check the headers that were emitted.
  XCTAssertNoThrow(
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  )
  let expectedRequestHeaders: HPACKHeaders = [
    GRPCHTTP2Keys.path.rawValue: "/test/test",
    GRPCHTTP2Keys.scheme.rawValue: "http",
    GRPCHTTP2Keys.method.rawValue: "POST",
    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    GRPCHTTP2Keys.te.rawValue: "trailers",
  ]
  let requestHeaders = try channel.assertReadHeadersOutbound()
  XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

  // Server side: an RST_STREAM frame with the internal-error code.
  let resetFrame = HTTP2Frame.FramePayload.rstStream(.internalError)
  XCTAssertNoThrow(try channel.writeInbound(resetFrame))

  // The client receives a status explaining that RST_STREAM was received.
  let expectedStatus = Status(
    code: .unavailable,
    message: "Stream unexpectedly closed: received RST_STREAM frame (0x2: internal error)."
  )
  let expectedPart: ResponsePart = .status(expectedStatus, [:])
  XCTAssertEqual(try channel.readInbound(as: ResponsePart.self), expectedPart)

  // The stream is closed now, so further writes must be rejected.
  XCTAssertThrowsError(
    ofType: RPCError.self,
    try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  ) { rpcError in
    XCTAssertEqual(rpcError.code, .internalError)
    XCTAssertEqual(rpcError.message, "Invalid state")
  }
}
- }
extension EmbeddedChannel {
  /// Reads the next outbound HTTP/2 frame payload and returns it as a headers payload.
  ///
  /// - Returns: The headers payload that was written outbound.
  /// - Throws: The `XCTUnwrap` failure when there is nothing to read outbound, or
  ///   `TestError.assertionFailure` when the payload is not a `.headers` frame.
  fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
    let payload = try XCTUnwrap(
      try self.readOutbound(as: HTTP2Frame.FramePayload.self)
    )
    if case .headers(let headersPayload) = payload {
      return headersPayload
    }
    throw TestError.assertionFailure("Expected to write headers")
  }

  /// Reads the next outbound HTTP/2 frame payload and returns it as a data payload.
  ///
  /// - Returns: The data payload that was written outbound.
  /// - Throws: The `XCTUnwrap` failure when there is nothing to read outbound, or
  ///   `TestError.assertionFailure` when the payload is not a `.data` frame.
  fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
    let payload = try XCTUnwrap(
      try self.readOutbound(as: HTTP2Frame.FramePayload.self)
    )
    if case .data(let dataPayload) = payload {
      return dataPayload
    }
    throw TestError.assertionFailure("Expected to write data")
  }
}
/// Errors thrown by this file's test helpers.
private enum TestError: Error {
  /// A helper's expectation was not met; the associated string describes what was expected.
  case assertionFailure(String)
}
@available(gRPCSwiftNIOTransport 2.0, *)
extension MethodDescriptor {
  /// Descriptor for method "test" of service "test", shared by the tests in this file.
  static let testTest = MethodDescriptor(
    fullyQualifiedService: "test",
    method: "test"
  )
}
|