| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878 |
- /*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import GRPCCore
- import GRPCNIOTransportCore
- import GRPCNIOTransportHTTP2Posix
- import GRPCNIOTransportHTTP2TransportServices
- import NIOPosix
- import XCTest
- import protocol NIOCore.Channel
- @available(gRPCSwiftNIOTransport 2.0, *)
- final class HTTP2TransportTests: XCTestCase {
- // A combination of client and server transport kinds.
- struct Transport: Sendable, CustomStringConvertible {
- var server: TransportKind
- var client: TransportKind
- var description: String {
- "server=\(self.server) client=\(self.client)"
- }
- }
- func forEachTransportPair(
- _ transport: [Transport] = .supported,
- serverAddress: SocketAddress = .ipv4(host: "127.0.0.1", port: 0),
- enableControlService: Bool = true,
- clientCompression: CompressionAlgorithm = .none,
- clientEnabledCompression: CompressionAlgorithmSet = .none,
- serverCompression: CompressionAlgorithmSet = .none,
- _ execute: (
- ControlClient<NIOClientTransport>,
- GRPCServer<NIOServerTransport>,
- Transport
- ) async throws -> Void
- ) async throws {
- for pair in transport {
- try await withThrowingTaskGroup(of: Void.self) { group in
- let (server, address) = try await self.runServer(
- in: &group,
- address: serverAddress,
- kind: pair.server,
- enableControlService: enableControlService,
- compression: serverCompression
- )
- let target: any ResolvableTarget
- if let ipv4 = address.ipv4 {
- target = .ipv4(address: ipv4.host, port: ipv4.port)
- } else if let ipv6 = address.ipv6 {
- target = .ipv6(address: ipv6.host, port: ipv6.port)
- } else if let uds = address.unixDomainSocket {
- target = .unixDomainSocket(path: uds.path)
- } else {
- XCTFail("Unexpected address to connect to")
- return
- }
- let client = try self.makeClient(
- kind: pair.client,
- target: target,
- compression: clientCompression,
- enabledCompression: clientEnabledCompression
- )
- group.addTask {
- try await client.runConnections()
- }
- do {
- let control = ControlClient(wrapping: client)
- try await execute(control, server, pair)
- } catch {
- XCTFail("Unexpected error: '\(error)' (\(pair))")
- }
- server.beginGracefulShutdown()
- client.beginGracefulShutdown()
- }
- }
- }
- func forEachClientAndHTTPStatusCodeServer(
- _ kind: [TransportKind] = TransportKind.clients,
- _ execute: (ControlClient<NIOClientTransport>, TransportKind) async throws -> Void
- ) async throws {
- for clientKind in kind {
- try await withThrowingTaskGroup(of: Void.self) { group in
- let server = HTTP2StatusCodeServer()
- group.addTask {
- try await server.run()
- }
- let address = try await server.listeningAddress
- let client = try self.makeClient(
- kind: clientKind,
- target: .ipv4(address: address.host, port: address.port),
- compression: .none,
- enabledCompression: .none
- )
- group.addTask {
- try await client.runConnections()
- }
- do {
- let control = ControlClient(wrapping: client)
- try await execute(control, clientKind)
- } catch {
- XCTFail("Unexpected error: '\(error)' (\(clientKind))")
- }
- group.cancelAll()
- }
- }
- }
- private func runServer(
- in group: inout ThrowingTaskGroup<Void, any Error>,
- address: SocketAddress,
- kind: TransportKind,
- enableControlService: Bool,
- compression: CompressionAlgorithmSet
- ) async throws -> (GRPCServer<NIOServerTransport>, GRPCNIOTransportCore.SocketAddress) {
- let services = enableControlService ? [ControlService()] : []
- switch kind {
- case .posix:
- let server = GRPCServer(
- transport: NIOServerTransport(
- .http2NIOPosix(
- address: address,
- transportSecurity: .plaintext,
- config: .defaults {
- $0.compression.enabledAlgorithms = compression
- $0.rpc.maxRequestPayloadSize = .max
- }
- )
- ),
- services: services
- )
- group.addTask {
- try await server.serve()
- }
- let address = try await server.listeningAddress!
- return (server, address)
- #if canImport(Network)
- case .transportServices:
- let server = GRPCServer(
- transport: NIOServerTransport(
- .http2NIOTS(
- address: address,
- transportSecurity: .plaintext,
- config: .defaults {
- $0.compression.enabledAlgorithms = compression
- $0.rpc.maxRequestPayloadSize = .max
- }
- )
- ),
- services: services
- )
- group.addTask {
- try await server.serve()
- }
- let address = try await server.listeningAddress!
- return (server, address)
- #endif
- case .wrappedChannel:
- fatalError("Unsupported")
- }
- }
- private func makeClient(
- kind: TransportKind,
- target: any ResolvableTarget,
- compression: CompressionAlgorithm,
- enabledCompression: CompressionAlgorithmSet
- ) throws -> GRPCClient<NIOClientTransport> {
- let transport: NIOClientTransport
- var serviceConfig = ServiceConfig()
- serviceConfig.loadBalancingConfig = [.roundRobin]
- serviceConfig.methodConfig = [
- MethodConfig(
- // Applies to all RPCs
- names: [MethodConfig.Name(service: "", method: "")],
- maxRequestMessageBytes: .max,
- maxResponseMessageBytes: .max
- )
- ]
- switch kind {
- case .posix:
- let posix = try HTTP2ClientTransport.Posix(
- target: target,
- transportSecurity: .plaintext,
- config: .defaults {
- $0.compression.algorithm = compression
- $0.compression.enabledAlgorithms = enabledCompression
- },
- serviceConfig: serviceConfig
- )
- transport = NIOClientTransport(posix)
- #if canImport(Network)
- case .transportServices:
- let transportServices = try HTTP2ClientTransport.TransportServices(
- target: target,
- transportSecurity: .plaintext,
- config: .defaults {
- $0.compression.algorithm = compression
- $0.compression.enabledAlgorithms = enabledCompression
- },
- serviceConfig: serviceConfig
- )
- transport = NIOClientTransport(transportServices)
- #endif
- case .wrappedChannel:
- let bootstrap = ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup)
- let channel: any Channel
- var config = HTTP2ClientTransport.WrappedChannel.Config.defaults
- config.compression.algorithm = compression
- config.compression.enabledAlgorithms = enabledCompression
- if let dns = target as? ResolvableTargets.DNS {
- channel = try bootstrap.connect(host: dns.host, port: dns.port ?? 443).wait()
- config.http2.authority = dns.host
- } else if let ipv4 = target as? ResolvableTargets.IPv4, let address = ipv4.addresses.first {
- channel = try bootstrap.connect(host: address.host, port: address.port).wait()
- config.http2.authority = address.host
- } else if let ipv6 = target as? ResolvableTargets.IPv6, let address = ipv6.addresses.first {
- channel = try bootstrap.connect(host: address.host, port: address.port).wait()
- config.http2.authority = address.host
- } else if let uds = target as? ResolvableTargets.UnixDomainSocket {
- channel = try bootstrap.connect(unixDomainSocketPath: uds.address.path).wait()
- config.http2.authority = uds.authority
- } else {
- throw RPCError(
- code: .invalidArgument,
- message: "Target '\(target)' isn't supported by the tunnel transport."
- )
- }
- let wrapped = HTTP2ClientTransport.WrappedChannel(
- takingOwnershipOf: channel,
- config: config,
- serviceConfig: serviceConfig
- )
- transport = NIOClientTransport(wrapped)
- }
- return GRPCClient(transport: transport, interceptors: [PeerInfoClientInterceptor()])
- }
- func testUnaryOK() async throws {
- // Client sends one message, server sends back metadata, a single message, and an ok status with
- // trailing metadata.
- try await self.forEachTransportPair { control, _, pair in
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.echoMetadataInTrailers = true
- $0.numberOfMessages = 1
- $0.payloadParameters = .with {
- $0.content = 0
- $0.size = 1024
- }
- }
- let metadata: Metadata = ["test-key": "test-value"]
- let request = ClientRequest(message: input, metadata: metadata)
- try await control.unary(request: request) { response in
- let message = try response.message
- XCTAssertEqual(message.payload, Data(repeating: 0, count: 1024), "\(pair)")
- let initial = response.metadata
- XCTAssertEqual(Array(initial["echo-test-key"]), ["test-value"], "\(pair)")
- let trailing = response.trailingMetadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- }
- func testUnaryNotOK() async throws {
- // Client sends one message, server sends back metadata, a single message, and a non-ok status
- // with trailing metadata.
- try await self.forEachTransportPair { control, _, pair in
- let input = ControlInput.with {
- $0.echoMetadataInTrailers = true
- $0.numberOfMessages = 1
- $0.payloadParameters = .with {
- $0.content = 0
- $0.size = 1024
- }
- $0.status = .with {
- $0.code = .aborted
- $0.message = "\(#function)"
- }
- }
- let metadata: Metadata = ["test-key": "test-value"]
- let request = ClientRequest(message: input, metadata: metadata)
- try await control.unary(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
- XCTAssertEqual(error.code, .aborted)
- XCTAssertEqual(error.message, "\(#function)")
- let trailing = error.metadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- let trailing = response.trailingMetadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- }
- func testUnaryRejected() async throws {
- // Client sends one message, server sends non-ok status with trailing metadata.
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = ClientRequest<ControlInput>(
- message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
- metadata: metadata
- )
- try await control.unary(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
- XCTAssertEqual(error.code, .aborted, "\(pair)")
- XCTAssertEqual(error.message, "\(#function)", "\(pair)")
- let trailing = error.metadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- // No initial metadata for trailers-only.
- XCTAssertEqual(response.metadata, [:])
- let trailing = response.trailingMetadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- }
- func testClientStreamingOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: metadata
- ) { writer in
- try await writer.write(.echoMetadata)
- // Send a few messages which are ignored.
- try await writer.write(.noOp)
- try await writer.write(.noOp)
- try await writer.write(.noOp)
- // Send a message.
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- // ... and the final status.
- try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
- }
- try await control.clientStream(request: request) { response in
- let message = try response.message
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
- let initial = response.metadata
- XCTAssertEqual(Array(initial["echo-test-key"]), ["test-value"], "\(pair)")
- let trailing = response.trailingMetadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- }
- func testClientStreamingNotOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: metadata
- ) { writer in
- try await writer.write(.echoMetadata)
- // Send a few messages which are ignored.
- try await writer.write(.noOp)
- try await writer.write(.noOp)
- try await writer.write(.noOp)
- // Send a message.
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- // Send the final status.
- try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
- }
- try await control.clientStream(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
- XCTAssertEqual(error.code, .aborted, "\(pair)")
- XCTAssertEqual(error.message, "\(#function)", "\(pair)")
- let trailing = error.metadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- let initial = response.metadata
- XCTAssertEqual(Array(initial["echo-test-key"]), ["test-value"], "\(pair)")
- let trailing = response.trailingMetadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- }
- func testClientStreamingRejected() async throws {
- // Client sends one message, server sends non-ok status with trailing metadata.
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: metadata
- ) { writer in
- let message: ControlInput = .trailersOnly(
- code: .aborted,
- message: "\(#function)",
- echoMetadata: true
- )
- try await writer.write(message)
- }
- try await control.clientStream(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
- XCTAssertEqual(error.code, .aborted, "\(pair)")
- XCTAssertEqual(error.message, "\(#function)", "\(pair)")
- let trailing = error.metadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- // No initial metadata for trailers-only.
- XCTAssertEqual(response.metadata, [:])
- let trailing = response.trailingMetadata
- XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- }
- func testServerStreamingOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.echoMetadataInTrailers = true
- $0.numberOfMessages = 5
- $0.payloadParameters = .with {
- $0.content = 42
- $0.size = 1024
- }
- }
- let request = ClientRequest(message: input, metadata: metadata)
- try await control.serverStream(request: request) { response in
- switch response.accepted {
- case .success(let contents):
- XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- var messagesReceived = 0
- for try await part in contents.bodyParts {
- switch part {
- case .message(let message):
- messagesReceived += 1
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
- case .trailingMetadata(let metadata):
- XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- XCTAssertEqual(messagesReceived, 5)
- case .failure(let error):
- throw error
- }
- }
- }
- }
- func testServerStreamingEmptyOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- // Echo back metadata, but don't send any messages.
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.echoMetadataInTrailers = true
- }
- let request = ClientRequest(message: input, metadata: metadata)
- try await control.serverStream(request: request) { response in
- switch response.accepted {
- case .success(let contents):
- XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- for try await part in contents.bodyParts {
- switch part {
- case .message:
- XCTFail("Unexpected message")
- case .trailingMetadata(let metadata):
- XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- case .failure(let error):
- throw error
- }
- }
- }
- }
- func testServerStreamingNotOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.echoMetadataInTrailers = true
- $0.numberOfMessages = 5
- $0.payloadParameters = .with {
- $0.content = 42
- $0.size = 1024
- }
- $0.status = .with {
- $0.code = .aborted
- $0.message = "\(#function)"
- }
- }
- let request = ClientRequest(message: input, metadata: metadata)
- try await control.serverStream(request: request) { response in
- switch response.accepted {
- case .success(let contents):
- XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- var messagesReceived = 0
- do {
- for try await part in contents.bodyParts {
- switch part {
- case .message(let message):
- messagesReceived += 1
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
- case .trailingMetadata:
- XCTFail("Unexpected trailing metadata, should be provided in RPCError")
- }
- }
- XCTFail("Expected error to be thrown")
- } catch let error as RPCError {
- XCTAssertEqual(error.code, .aborted)
- XCTAssertEqual(error.message, "\(#function)")
- XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- }
- XCTAssertEqual(messagesReceived, 5)
- case .failure(let error):
- throw error
- }
- }
- }
- }
- func testServerStreamingEmptyNotOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.echoMetadataInTrailers = true
- $0.status = .with {
- $0.code = .aborted
- $0.message = "\(#function)"
- }
- }
- let request = ClientRequest(message: input, metadata: metadata)
- try await control.serverStream(request: request) { response in
- switch response.accepted {
- case .success(let contents):
- XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- do {
- for try await _ in contents.bodyParts {
- XCTFail("Unexpected message, \(pair)")
- }
- XCTFail("Expected error to be thrown")
- } catch let error as RPCError {
- XCTAssertEqual(error.code, .aborted)
- XCTAssertEqual(error.message, "\(#function)")
- XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- }
- case .failure(let error):
- throw error
- }
- }
- }
- }
- func testServerStreamingRejected() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = ClientRequest<ControlInput>(
- message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
- metadata: metadata
- )
- try await control.serverStream(request: request) { response in
- switch response.accepted {
- case .success:
- XCTFail("Expected RPC to be rejected \(pair)")
- case .failure(let error):
- XCTAssertEqual(error.code, .aborted, "\(pair)")
- XCTAssertEqual(error.message, "\(#function)", "\(pair)")
- XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- }
- }
- func testBidiStreamingOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: metadata
- ) { writer in
- try await writer.write(.echoMetadata)
- // Send a few messages, each is echo'd back.
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- // Send the final status.
- try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
- }
- try await control.bidiStream(request: request) { response in
- switch response.accepted {
- case .success(let contents):
- XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- var messagesReceived = 0
- for try await part in contents.bodyParts {
- switch part {
- case .message(let message):
- messagesReceived += 1
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
- case .trailingMetadata(let metadata):
- XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
- }
- }
- XCTAssertEqual(messagesReceived, 3)
- case .failure(let error):
- throw error
- }
- }
- }
- }
- func testBidiStreamingEmptyOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { _ in }
- try await control.bidiStream(request: request) { response in
- switch response.accepted {
- case .success(let contents):
- var receivedTrailingMetadata = false
- for try await part in contents.bodyParts {
- switch part {
- case .message:
- XCTFail("Unexpected message \(pair)")
- case .trailingMetadata:
- XCTAssertFalse(receivedTrailingMetadata, "\(pair)")
- receivedTrailingMetadata = true
- }
- }
- case .failure(let error):
- throw error
- }
- }
- }
- }
- func testBidiStreamingNotOK() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: metadata
- ) { writer in
- try await writer.write(.echoMetadata)
- // Send a few messages, each is echo'd back.
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- // Send the final status.
- try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
- }
- try await control.bidiStream(request: request) { response in
- switch response.accepted {
- case .success(let contents):
- XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- var messagesReceived = 0
- do {
- for try await part in contents.bodyParts {
- switch part {
- case .message(let message):
- messagesReceived += 1
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
- case .trailingMetadata:
- XCTFail("Trailing metadata should be provided by error")
- }
- }
- XCTFail("Should've thrown error \(pair)")
- } catch let error as RPCError {
- XCTAssertEqual(error.code, .aborted)
- XCTAssertEqual(error.message, "\(#function)")
- XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
- }
- XCTAssertEqual(messagesReceived, 3)
- case .failure(let error):
- throw error
- }
- }
- }
- }
- func testBidiStreamingRejected() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let metadata: Metadata = ["test-key": "test-value"]
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: metadata
- ) { writer in
- try await writer.write(
- .trailersOnly(
- code: .aborted,
- message: "\(#function)",
- echoMetadata: true
- )
- )
- }
- try await control.bidiStream(request: request) { response in
- switch response.accepted {
- case .success:
- XCTFail("Expected RPC to fail \(pair)")
- case .failure(let error):
- XCTAssertEqual(error.code, .aborted)
- XCTAssertEqual(error.message, "\(#function)")
- XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"])
- }
- }
- }
- }
- // MARK: - Not Implemented
- func testUnaryNotImplemented() async throws {
- try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
- let request = ClientRequest(message: ControlInput())
- try await control.unary(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
- XCTAssertEqual(error.code, .unimplemented)
- }
- }
- }
- }
- func testClientStreamingNotImplemented() async throws {
- try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { _ in }
- try await control.clientStream(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
- XCTAssertEqual(error.code, .unimplemented)
- }
- }
- }
- }
- func testServerStreamingNotImplemented() async throws {
- try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
- let request = ClientRequest(message: ControlInput())
- try await control.serverStream(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
- XCTAssertEqual(error.code, .unimplemented)
- }
- }
- }
- }
- func testBidiStreamingNotImplemented() async throws {
- try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { _ in }
- try await control.bidiStream(request: request) { response in
- XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
- XCTAssertEqual(error.code, .unimplemented)
- }
- }
- }
- }
- // MARK: - Compression tests
- private func testUnaryCompression(
- client: CompressionAlgorithm,
- server: CompressionAlgorithm,
- control: ControlClient<NIOClientTransport>,
- pair: Transport
- ) async throws {
- let message = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- $0.payloadParameters = .with {
- $0.content = 42
- $0.size = 1024
- }
- }
- var options = CallOptions.defaults
- options.compression = client
- try await control.unary(
- request: ClientRequest(message: message),
- options: options
- ) { response in
- // Check the client algorithm.
- switch client {
- case .deflate, .gzip:
- // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
- let encoding = Array(response.metadata["echo-grpc-encoding"])
- XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- // Check the server algorithm.
- switch server {
- case .deflate, .gzip:
- let encoding = Array(response.metadata["grpc-encoding"])
- XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- let message = try response.message
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
- }
- }
- private func testClientStreamingCompression(
- client: CompressionAlgorithm,
- server: CompressionAlgorithm,
- control: ControlClient<NIOClientTransport>,
- pair: Transport
- ) async throws {
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- try await writer.write(.echoMetadata)
- try await writer.write(.noOp)
- try await writer.write(.noOp)
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- }
- var options = CallOptions.defaults
- options.compression = client
- try await control.clientStream(request: request, options: options) { response in
- // Check the client algorithm.
- switch client {
- case .deflate, .gzip:
- // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
- let encoding = Array(response.metadata["echo-grpc-encoding"])
- XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- // Check the server algorithm.
- switch server {
- case .deflate, .gzip:
- let encoding = Array(response.metadata["grpc-encoding"])
- XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- let message = try response.message
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
- }
- }
- private func testServerStreamingCompression(
- client: CompressionAlgorithm,
- server: CompressionAlgorithm,
- control: ControlClient<NIOClientTransport>,
- pair: Transport
- ) async throws {
- let message = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 5
- $0.payloadParameters = .with {
- $0.content = 42
- $0.size = 1024
- }
- }
- var options = CallOptions.defaults
- options.compression = client
- try await control.serverStream(
- request: ClientRequest(message: message),
- options: options
- ) { response in
- // Check the client algorithm.
- switch client {
- case .deflate, .gzip:
- // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
- let encoding = Array(response.metadata["echo-grpc-encoding"])
- XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- // Check the server algorithm.
- switch server {
- case .deflate, .gzip:
- let encoding = Array(response.metadata["grpc-encoding"])
- XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- for try await message in response.messages {
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
- }
- }
- }
- private func testBidiStreamingCompression(
- client: CompressionAlgorithm,
- server: CompressionAlgorithm,
- control: ControlClient<NIOClientTransport>,
- pair: Transport
- ) async throws {
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- try await writer.write(.echoMetadata)
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- try await writer.write(.messages(1, repeating: 42, count: 1024))
- }
- var options = CallOptions.defaults
- options.compression = client
- try await control.bidiStream(request: request, options: options) { response in
- // Check the client algorithm.
- switch client {
- case .deflate, .gzip:
- // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
- let encoding = Array(response.metadata["echo-grpc-encoding"])
- XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- // Check the server algorithm.
- switch server {
- case .deflate, .gzip:
- let encoding = Array(response.metadata["grpc-encoding"])
- XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
- case .none:
- ()
- default:
- XCTFail("Unhandled compression '\(client)'")
- }
- for try await message in response.messages {
- XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
- }
- }
- }
- func testUnaryDeflateCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .deflate,
- clientEnabledCompression: .deflate,
- serverCompression: .deflate
- ) { control, _, pair in
- try await self.testUnaryCompression(
- client: .deflate,
- server: .deflate,
- control: control,
- pair: pair
- )
- }
- }
- func testUnaryGzipCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .gzip,
- clientEnabledCompression: .gzip,
- serverCompression: .gzip
- ) { control, _, pair in
- try await self.testUnaryCompression(
- client: .gzip,
- server: .gzip,
- control: control,
- pair: pair
- )
- }
- }
- func testClientStreamingDeflateCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .deflate,
- clientEnabledCompression: .deflate,
- serverCompression: .deflate
- ) { control, _, pair in
- try await self.testClientStreamingCompression(
- client: .deflate,
- server: .deflate,
- control: control,
- pair: pair
- )
- }
- }
- func testClientStreamingGzipCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .gzip,
- clientEnabledCompression: .gzip,
- serverCompression: .gzip
- ) { control, _, pair in
- try await self.testClientStreamingCompression(
- client: .gzip,
- server: .gzip,
- control: control,
- pair: pair
- )
- }
- }
- func testServerStreamingDeflateCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .deflate,
- clientEnabledCompression: .deflate,
- serverCompression: .deflate
- ) { control, _, pair in
- try await self.testServerStreamingCompression(
- client: .deflate,
- server: .deflate,
- control: control,
- pair: pair
- )
- }
- }
- func testServerStreamingGzipCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .gzip,
- clientEnabledCompression: .gzip,
- serverCompression: .gzip
- ) { control, _, pair in
- try await self.testServerStreamingCompression(
- client: .gzip,
- server: .gzip,
- control: control,
- pair: pair
- )
- }
- }
- func testBidiStreamingDeflateCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .deflate,
- clientEnabledCompression: .deflate,
- serverCompression: .deflate
- ) { control, _, pair in
- try await self.testBidiStreamingCompression(
- client: .deflate,
- server: .deflate,
- control: control,
- pair: pair
- )
- }
- }
- func testBidiStreamingGzipCompression() async throws {
- try await self.forEachTransportPair(
- clientCompression: .gzip,
- clientEnabledCompression: .gzip,
- serverCompression: .gzip
- ) { control, _, pair in
- try await self.testBidiStreamingCompression(
- client: .gzip,
- server: .gzip,
- control: control,
- pair: pair
- )
- }
- }
- func testUnaryUnsupportedCompression() async throws {
- try await self.forEachTransportPair(
- clientEnabledCompression: .all,
- serverCompression: .gzip
- ) { control, _, pair in
- let message = ControlInput.with {
- $0.numberOfMessages = 1
- $0.payloadParameters = .with {
- $0.content = 42
- $0.size = 1024
- }
- }
- let request = ClientRequest(message: message)
- var options = CallOptions.defaults
- options.compression = .deflate
- try await control.unary(request: request, options: options) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should've been rejected")
- case .failure(let error):
- let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
- // "identity" may or may not be included, so only test for values which must be present.
- XCTAssertTrue(acceptEncoding.contains("gzip"))
- XCTAssertFalse(acceptEncoding.contains("deflate"))
- }
- }
- }
- }
- func testClientStreamingUnsupportedCompression() async throws {
- try await self.forEachTransportPair(
- clientEnabledCompression: .all,
- serverCompression: .gzip
- ) { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- try await writer.write(.noOp)
- }
- var options = CallOptions.defaults
- options.compression = .deflate
- try await control.clientStream(request: request, options: options) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should've been rejected")
- case .failure(let error):
- let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
- // "identity" may or may not be included, so only test for values which must be present.
- XCTAssertTrue(acceptEncoding.contains("gzip"))
- XCTAssertFalse(acceptEncoding.contains("deflate"))
- }
- }
- }
- }
- func testServerStreamingUnsupportedCompression() async throws {
- try await self.forEachTransportPair(
- clientEnabledCompression: .all,
- serverCompression: .gzip
- ) { control, _, pair in
- let message = ControlInput.with {
- $0.numberOfMessages = 1
- $0.payloadParameters = .with {
- $0.content = 42
- $0.size = 1024
- }
- }
- let request = ClientRequest(message: message)
- var options = CallOptions.defaults
- options.compression = .deflate
- try await control.serverStream(request: request, options: options) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should've been rejected")
- case .failure(let error):
- let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
- // "identity" may or may not be included, so only test for values which must be present.
- XCTAssertTrue(acceptEncoding.contains("gzip"))
- XCTAssertFalse(acceptEncoding.contains("deflate"))
- }
- }
- }
- }
- func testBidiStreamingUnsupportedCompression() async throws {
- try await self.forEachTransportPair(
- clientEnabledCompression: .all,
- serverCompression: .gzip
- ) { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- try await writer.write(.noOp)
- }
- var options = CallOptions.defaults
- options.compression = .deflate
- try await control.bidiStream(request: request, options: options) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should've been rejected")
- case .failure(let error):
- let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
- // "identity" may or may not be included, so only test for values which must be present.
- XCTAssertTrue(acceptEncoding.contains("gzip"))
- XCTAssertFalse(acceptEncoding.contains("deflate"))
- }
- }
- }
- }
- func testUnaryTimeoutPropagatedToServer() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let message = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- }
- let request = ClientRequest(message: message)
- var options = CallOptions.defaults
- options.timeout = .seconds(10)
- try await control.unary(request: request, options: options) { response in
- let timeout = Array(response.metadata["echo-grpc-timeout"])
- XCTAssertEqual(timeout.count, 1)
- }
- }
- }
- func testClientStreamingTimeoutPropagatedToServer() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- let message = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- }
- try await writer.write(message)
- }
- var options = CallOptions.defaults
- options.timeout = .seconds(10)
- try await control.clientStream(request: request, options: options) { response in
- let timeout = Array(response.metadata["echo-grpc-timeout"])
- XCTAssertEqual(timeout.count, 1)
- }
- }
- }
- func testServerStreamingTimeoutPropagatedToServer() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let message = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- }
- let request = ClientRequest(message: message)
- var options = CallOptions.defaults
- options.timeout = .seconds(10)
- try await control.serverStream(request: request, options: options) { response in
- let timeout = Array(response.metadata["echo-grpc-timeout"])
- XCTAssertEqual(timeout.count, 1)
- }
- }
- }
- func testBidiStreamingTimeoutPropagatedToServer() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- try await writer.write(.echoMetadata)
- }
- var options = CallOptions.defaults
- options.timeout = .seconds(10)
- try await control.bidiStream(request: request, options: options) { response in
- let timeout = Array(response.metadata["echo-grpc-timeout"])
- XCTAssertEqual(timeout.count, 1)
- }
- }
- }
- private static let httpToStatusCodePairs: [(Int, RPCError.Code)] = [
- // See https://github.com/grpc/grpc/blob/7f664c69b2a636386fbf95c16bc78c559734ce0f/doc/http-grpc-status-mapping.md
- (400, .internalError),
- (401, .unauthenticated),
- (403, .permissionDenied),
- (404, .unimplemented),
- (418, .unknown),
- (429, .unavailable),
- (502, .unavailable),
- (503, .unavailable),
- (504, .unavailable),
- (504, .unavailable),
- ]
- func testUnaryAgainstNonGRPCServer() async throws {
- try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
- for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
- // Tell the server what to respond with.
- let metadata: Metadata = ["response-status": "\(httpCode)"]
- try await control.unary(
- request: ClientRequest(message: .noOp, metadata: metadata)
- ) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should have failed with '\(expectedStatus)'")
- case .failure(let error):
- XCTAssertEqual(error.code, expectedStatus)
- }
- }
- }
- }
- }
- func testClientStreamingAgainstNonGRPCServer() async throws {
- try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
- for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
- // Tell the server what to respond with.
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: ["response-status": "\(httpCode)"]
- ) { _ in
- }
- try await control.clientStream(request: request) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should have failed with '\(expectedStatus)'")
- case .failure(let error):
- XCTAssertEqual(error.code, expectedStatus)
- }
- }
- }
- }
- }
- func testServerStreamingAgainstNonGRPCServer() async throws {
- try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
- for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
- // Tell the server what to respond with.
- let metadata: Metadata = ["response-status": "\(httpCode)"]
- try await control.serverStream(
- request: ClientRequest(message: .noOp, metadata: metadata)
- ) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should have failed with '\(expectedStatus)'")
- case .failure(let error):
- XCTAssertEqual(error.code, expectedStatus)
- }
- }
- }
- }
- }
- func testBidiStreamingAgainstNonGRPCServer() async throws {
- try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
- for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
- // Tell the server what to respond with.
- let request = StreamingClientRequest(
- of: ControlInput.self,
- metadata: ["response-status": "\(httpCode)"]
- ) { _ in
- }
- try await control.bidiStream(request: request) { response in
- switch response.accepted {
- case .success:
- XCTFail("RPC should have failed with '\(expectedStatus)'")
- case .failure(let error):
- XCTAssertEqual(error.code, expectedStatus)
- }
- }
- }
- }
- }
- func testUnaryScheme() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- }
- let request = ClientRequest(message: input)
- try await control.unary(request: request) { response in
- XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
- }
- }
- }
- func testServerStreamingScheme() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- }
- let request = ClientRequest(message: input)
- try await control.serverStream(request: request) { response in
- XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
- }
- }
- }
- func testClientStreamingScheme() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- }
- try await writer.write(input)
- }
- try await control.clientStream(request: request) { response in
- XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
- }
- }
- }
- func testBidiStreamingScheme() async throws {
- try await self.forEachTransportPair { control, _, pair in
- let request = StreamingClientRequest(of: ControlInput.self) { writer in
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- }
- try await writer.write(input)
- }
- try await control.bidiStream(request: request) { response in
- XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
- }
- }
- }
- func testServerCancellation() async throws {
- for kind in [CancellationKind.awaitCancelled, .withCancellationHandler] {
- try await self.forEachTransportPair { control, server, pair in
- let request = ClientRequest(message: kind)
- try await control.waitForCancellation(request: request) { response in
- // Shutdown the client so that it doesn't attempt to reconnect when the server closes.
- control.client.beginGracefulShutdown()
- // Shutdown the server to cancel the RPC.
- server.beginGracefulShutdown()
- // The RPC should complete without any error or response.
- let responses = try await response.messages.reduce(into: []) { $0.append($1) }
- XCTAssert(responses.isEmpty)
- }
- }
- }
- }
- func testCancellationIsPropagated() async throws {
- for kind in [CancellationKind.awaitCancelled, .withCancellationHandler] {
- try await self.forEachTransportPair { control, server, pair in
- let request = ClientRequest(message: kind)
- let signal = AsyncStream.makeStream(of: Void.self)
- let task = Task {
- try await control.waitForCancellation(request: request) { response in
- // Signal that the task should be cancelled.
- signal.continuation.finish()
- // The RPC should complete without any error or response.
- do {
- _ = try await response.messages.reduce(into: []) { $0.append($1) }
- XCTFail("Expected cancellation error")
- } catch is CancellationError {
- ()
- }
- }
- }
- // Wait for the signal to cancel.
- for await _ in signal.stream {}
- task.cancel()
- try await task.value
- }
- }
- }
- func testUppercaseClientMetadataKey() async throws {
- try await self.forEachTransportPair { control, _, _ in
- let request = ClientRequest<ControlInput>(
- message: .with {
- $0.echoMetadataInHeaders = true
- $0.numberOfMessages = 1
- },
- metadata: ["UPPERCASE-KEY": "value"]
- )
- try await control.unary(request: request) { response in
- // Keys will be lowercase before being sent over the wire.
- XCTAssertEqual(Array(response.metadata["echo-uppercase-key"]), ["value"])
- }
- }
- }
- func testUppercaseServerMetadataKey() async throws {
- try await self.forEachTransportPair { control, _, _ in
- let request = ClientRequest<ControlInput>(
- message: .with {
- $0.initialMetadataToAdd["UPPERCASE-KEY"] = "initial"
- $0.trailingMetadataToAdd["UPPERCASE-KEY"] = "trailing"
- $0.numberOfMessages = 1
- }
- )
- try await control.unary(request: request) { response in
- XCTAssertEqual(Array(response.metadata["uppercase-key"]), ["initial"])
- XCTAssertEqual(Array(response.trailingMetadata["uppercase-key"]), ["trailing"])
- }
- }
- }
- private func checkAuthority(
- client: GRPCClient<NIOClientTransport>,
- expected: String
- ) async throws {
- let control = ControlClient(wrapping: client)
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = true
- $0.echoMetadataInTrailers = true
- $0.numberOfMessages = 1
- $0.payloadParameters = .with {
- $0.content = 0
- $0.size = 1024
- }
- }
- try await control.unary(request: ClientRequest(message: input)) { response in
- let initial = response.metadata
- XCTAssertEqual(Array(initial["echo-authority"]), [.string(expected)])
- }
- }
- private func testAuthority(
- serverAddress: SocketAddress,
- authorityOverride override: String? = nil,
- clientTarget: (SocketAddress) -> any ResolvableTarget,
- expectedAuthority: (SocketAddress) -> String
- ) async throws {
- try await withGRPCServer(
- transport: .http2NIOPosix(
- address: serverAddress,
- transportSecurity: .plaintext
- ),
- services: [ControlService()]
- ) { server in
- guard let listeningAddress = try await server.listeningAddress else {
- XCTFail("No listening address")
- return
- }
- let target = clientTarget(listeningAddress)
- try await withGRPCClient(
- transport: NIOClientTransport(
- .http2NIOPosix(
- target: target,
- transportSecurity: .plaintext,
- config: .defaults {
- $0.http2.authority = override
- }
- )
- )
- ) { client in
- try await self.checkAuthority(client: client, expected: expectedAuthority(listeningAddress))
- }
- }
- }
- func testAuthorityDNS() async throws {
- try await self.testAuthority(serverAddress: .ipv4(host: "127.0.0.1", port: 0)) { address in
- return .dns(host: "localhost", port: address.ipv4!.port)
- } expectedAuthority: { address in
- return "localhost:\(address.ipv4!.port)"
- }
- }
- func testOverrideAuthorityDNS() async throws {
- try await self.testAuthority(
- serverAddress: .ipv4(host: "127.0.0.1", port: 0),
- authorityOverride: "respect-my-authority"
- ) { address in
- return .dns(host: "localhost", port: address.ipv4!.port)
- } expectedAuthority: { _ in
- return "respect-my-authority"
- }
- }
- func testAuthorityIPv4() async throws {
- try await self.testAuthority(serverAddress: .ipv4(host: "127.0.0.1", port: 0)) { address in
- return .ipv4(address: "127.0.0.1", port: address.ipv4!.port)
- } expectedAuthority: { address in
- return "127.0.0.1:\(address.ipv4!.port)"
- }
- }
- func testOverrideAuthorityIPv4() async throws {
- try await self.testAuthority(
- serverAddress: .ipv4(host: "127.0.0.1", port: 0),
- authorityOverride: "respect-my-authority"
- ) { address in
- return .ipv4(address: "127.0.0.1", port: address.ipv4!.port)
- } expectedAuthority: { _ in
- return "respect-my-authority"
- }
- }
- func testAuthorityIPv6() async throws {
- try await self.testAuthority(serverAddress: .ipv6(host: "::1", port: 0)) { address in
- return .ipv6(address: "::1", port: address.ipv6!.port)
- } expectedAuthority: { address in
- return "[::1]:\(address.ipv6!.port)"
- }
- }
- func testOverrideAuthorityIPv6() async throws {
- try await self.testAuthority(
- serverAddress: .ipv6(host: "::1", port: 0),
- authorityOverride: "respect-my-authority"
- ) { address in
- return .ipv6(address: "::1", port: address.ipv6!.port)
- } expectedAuthority: { _ in
- return "respect-my-authority"
- }
- }
- func testAuthorityUDS() async throws {
- let path = "test-authority-uds"
- try await self.testAuthority(serverAddress: .unixDomainSocket(path: path)) { address in
- return .unixDomainSocket(path: path)
- } expectedAuthority: { _ in
- return path
- }
- }
- func testAuthorityLocalUDSOverride() async throws {
- let path = "test-authority-local-uds-override"
- try await self.testAuthority(serverAddress: .unixDomainSocket(path: path)) { address in
- return .unixDomainSocket(path: path, authority: "respect-my-authority")
- } expectedAuthority: { _ in
- return "respect-my-authority"
- }
- }
- func testOverrideAuthorityUDS() async throws {
- let path = "test-override-authority-uds"
- try await self.testAuthority(
- serverAddress: .unixDomainSocket(path: path),
- authorityOverride: "respect-my-authority"
- ) { _ in
- return .unixDomainSocket(path: path, authority: "should-be-ignored")
- } expectedAuthority: { _ in
- return "respect-my-authority"
- }
- }
- func testPeerInfoIPv4() async throws {
- try await self.forEachTransportPair(
- serverAddress: .ipv4(host: "127.0.0.1", port: 0)
- ) { control, _, _ in
- let peerInfo = try await control.peerInfo()
- let serverRemotePeerMatches = peerInfo.server.remote.wholeMatch(of: /ipv4:127\.0\.0\.1:(\d+)/)
- let clientPort = try XCTUnwrap(serverRemotePeerMatches).1
- let serverLocalPeerMatches = peerInfo.server.local.wholeMatch(of: /ipv4:127.0.0.1:(\d+)/)
- let serverPort = try XCTUnwrap(serverLocalPeerMatches).1
- let clientRemotePeerMatches = peerInfo.client.remote.wholeMatch(of: /ipv4:127.0.0.1:(\d+)/)
- XCTAssertEqual(try XCTUnwrap(clientRemotePeerMatches).1, serverPort)
- let clientLocalPeerMatches = peerInfo.client.local.wholeMatch(of: /ipv4:127\.0\.0\.1:(\d+)/)
- XCTAssertEqual(try XCTUnwrap(clientLocalPeerMatches).1, clientPort)
- }
- }
- func testPeerInfoIPv6() async throws {
- try await self.forEachTransportPair(
- serverAddress: .ipv6(host: "::1", port: 0)
- ) { control, _, _ in
- let peerInfo = try await control.peerInfo()
- let serverRemotePeerMatches = peerInfo.server.remote.wholeMatch(of: /ipv6:\[::1\]:(\d+)/)
- let clientPort = try XCTUnwrap(serverRemotePeerMatches).1
- let serverLocalPeerMatches = peerInfo.server.local.wholeMatch(of: /ipv6:\[::1\]:(\d+)/)
- let serverPort = try XCTUnwrap(serverLocalPeerMatches).1
- let clientRemotePeerMatches = peerInfo.client.remote.wholeMatch(of: /ipv6:\[::1\]:(\d+)/)
- XCTAssertEqual(try XCTUnwrap(clientRemotePeerMatches).1, serverPort)
- let clientLocalPeerMatches = peerInfo.client.local.wholeMatch(of: /ipv6:\[::1\]:(\d+)/)
- XCTAssertEqual(try XCTUnwrap(clientLocalPeerMatches).1, clientPort)
- }
- }
- func testPeerInfoUDS() async throws {
- let path = "peer-info-uds"
- try await self.forEachTransportPair(
- serverAddress: .unixDomainSocket(path: path)
- ) { control, _, _ in
- let peerInfo = try await control.peerInfo()
- XCTAssertNotNil(peerInfo.server.remote.wholeMatch(of: /unix:peer-info-uds/))
- XCTAssertNotNil(peerInfo.server.local.wholeMatch(of: /unix:peer-info-uds/))
- XCTAssertNotNil(peerInfo.client.remote.wholeMatch(of: /unix:peer-info-uds/))
- XCTAssertNotNil(peerInfo.client.local.wholeMatch(of: /unix:peer-info-uds/))
- }
- }
- enum TestError: Error {
- case someError
- }
- func testThrowingInClientStreamWriter() async throws {
- try await forEachTransportPair { client, server, transport in
- do {
- _ = try await client.clientStream(
- request: .init(producer: { writer in
- throw TestError.someError
- })
- )
- XCTFail("Test should have failed")
- } catch let error as RPCError {
- XCTAssertEqual(error.code, .unavailable)
- XCTAssertEqual(error.message, "Stream unexpectedly closed with error.")
- }
- }
- }
- func testLargeResponse() async throws {
- let sizeInMiB = 8
- let sizeInBytes = sizeInMiB * 1024 * 1024
- try await self.forEachTransportPair { control, _, pair in
- let input = ControlInput.with {
- $0.echoMetadataInHeaders = false
- $0.echoMetadataInTrailers = false
- $0.numberOfMessages = 1
- $0.payloadParameters = .with {
- $0.content = 0
- $0.size = sizeInBytes
- }
- }
- let metadata: Metadata = ["test-key": "test-value"]
- let request = ClientRequest(message: input, metadata: metadata)
- try await control.unary(request: request) { response in
- let message = try response.message
- XCTAssertEqual(message.payload, Data(repeating: 0, count: sizeInBytes), "\(pair)")
- }
- }
- }
- }
- @available(gRPCSwiftNIOTransport 2.0, *)
- extension [HTTP2TransportTests.Transport] {
- static let supported: [HTTP2TransportTests.Transport] = TransportKind.servers.flatMap { server in
- TransportKind.clients.map { client in
- HTTP2TransportTests.Transport(server: server, client: client)
- }
- }
- }
- @available(gRPCSwiftNIOTransport 2.0, *)
- extension ControlInput {
- fileprivate static let echoMetadata = Self.with {
- $0.echoMetadataInHeaders = true
- }
- fileprivate static let noOp = Self()
- fileprivate static func messages(
- _ numberOfMessages: Int,
- repeating: UInt8,
- count: Int
- ) -> Self {
- return Self.with {
- $0.numberOfMessages = numberOfMessages
- $0.payloadParameters = .with {
- $0.content = repeating
- $0.size = count
- }
- }
- }
- fileprivate static func status(
- code: GRPCCore.Status.Code,
- message: String,
- echoMetadata: Bool
- ) -> Self {
- return Self.with {
- $0.echoMetadataInTrailers = echoMetadata
- $0.status = .with {
- $0.code = code
- $0.message = message
- }
- }
- }
- fileprivate static func trailersOnly(
- code: GRPCCore.Status.Code,
- message: String,
- echoMetadata: Bool
- ) -> Self {
- return Self.with {
- $0.echoMetadataInTrailers = echoMetadata
- $0.isTrailersOnly = true
- $0.status = .with {
- $0.code = code
- $0.message = message
- }
- }
- }
- }
- @available(gRPCSwiftNIOTransport 2.0, *)
- extension CompressionAlgorithm {
- var name: String {
- switch self {
- case .deflate:
- return "deflate"
- case .gzip:
- return "gzip"
- case .none:
- return "identity"
- default:
- return ""
- }
- }
- }
|