| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
7771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847 |
- /*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import GRPCCore
- import GRPCNIOTransportCore
- import GRPCNIOTransportHTTP2Posix
- import GRPCNIOTransportHTTP2TransportServices
- import NIOPosix
- import XCTest
- import protocol NIOCore.Channel
- @available(gRPCSwiftNIOTransport 1.0, *)
- final class HTTP2TransportTests: XCTestCase {
/// Pairs the transport kind used by the server with the one used by the client.
struct Transport: Sendable, CustomStringConvertible {
  /// The kind of transport the server runs on.
  var server: TransportKind
  /// The kind of transport the client connects with.
  var client: TransportKind

  /// Text form of the pair; the tests interpolate it into assertion messages.
  var description: String {
    let serverKind = self.server
    let clientKind = self.client
    return "server=\(serverKind) client=\(clientKind)"
  }
}
/// Runs `execute` once for every client/server transport combination in `transport`.
///
/// For each pair a server is started on `serverAddress`, a client is created for the address the
/// server reports it is listening on, and `execute` is called with a `ControlClient` wrapping
/// that client. Errors thrown by `execute` are recorded as test failures rather than rethrown.
/// The server and the client are both asked to shut down gracefully before the next pair runs.
///
/// - Parameters:
///   - transport: The transport pairs to iterate over.
///   - serverAddress: The address the server is asked to bind to.
///   - enableControlService: Whether the server registers the `ControlService`.
///   - clientCompression: The compression algorithm configured on the client.
///   - clientEnabledCompression: The compression algorithms enabled on the client.
///   - serverCompression: The compression algorithms enabled on the server.
///   - execute: The test body to run for each pair.
/// - Throws: Any error thrown while starting the server or creating the client.
func forEachTransportPair(
  _ transport: [Transport] = .supported,
  serverAddress: SocketAddress = .ipv4(host: "127.0.0.1", port: 0),
  enableControlService: Bool = true,
  clientCompression: CompressionAlgorithm = .none,
  clientEnabledCompression: CompressionAlgorithmSet = .none,
  serverCompression: CompressionAlgorithmSet = .none,
  _ execute: (
    ControlClient<NIOClientTransport>,
    GRPCServer<NIOServerTransport>,
    Transport
  ) async throws -> Void
) async throws {
  for pair in transport {
    try await withThrowingTaskGroup(of: Void.self) { group in
      let (server, address) = try await self.runServer(
        in: &group,
        address: serverAddress,
        kind: pair.server,
        enableControlService: enableControlService,
        compression: serverCompression
      )

      // Build the client target from the address the server reports.
      let target: any ResolvableTarget
      if let ipv4 = address.ipv4 {
        target = .ipv4(host: ipv4.host, port: ipv4.port)
      } else if let ipv6 = address.ipv6 {
        target = .ipv6(host: ipv6.host, port: ipv6.port)
      } else if let uds = address.unixDomainSocket {
        target = .unixDomainSocket(path: uds.path)
      } else {
        XCTFail("Unexpected address to connect to")
        // Returning normally makes the group wait for its child tasks. Stop the server first,
        // otherwise the task running 'serve()' never completes and the test hangs.
        server.beginGracefulShutdown()
        return
      }

      let client = try self.makeClient(
        kind: pair.client,
        target: target,
        compression: clientCompression,
        enabledCompression: clientEnabledCompression
      )

      group.addTask {
        try await client.runConnections()
      }

      do {
        let control = ControlClient(wrapping: client)
        try await execute(control, server, pair)
      } catch {
        // Record the failure (labelled with the pair) and carry on so shutdown still happens.
        XCTFail("Unexpected error: '\(error)' (\(pair))")
      }

      server.beginGracefulShutdown()
      client.beginGracefulShutdown()
    }
  }
}
/// Runs `execute` once per client transport kind against an `HTTP2StatusCodeServer`.
///
/// A fresh server and client are created for each kind. Errors thrown by `execute` are recorded
/// as test failures, after which the tasks running the server and the client are cancelled.
///
/// - Parameters:
///   - kind: The client transport kinds to iterate over.
///   - execute: The test body, called with a `ControlClient` and the client kind in use.
/// - Throws: Any error thrown while obtaining the server address or creating the client.
func forEachClientAndHTTPStatusCodeServer(
  _ kind: [TransportKind] = TransportKind.clients,
  _ execute: (ControlClient<NIOClientTransport>, TransportKind) async throws -> Void
) async throws {
  for clientKind in kind {
    try await withThrowingTaskGroup(of: Void.self) { tasks in
      let statusCodeServer = HTTP2StatusCodeServer()
      tasks.addTask { try await statusCodeServer.run() }

      let boundAddress = try await statusCodeServer.listeningAddress
      let grpcClient = try self.makeClient(
        kind: clientKind,
        target: .ipv4(host: boundAddress.host, port: boundAddress.port),
        compression: .none,
        enabledCompression: .none
      )
      tasks.addTask { try await grpcClient.runConnections() }

      do {
        try await execute(ControlClient(wrapping: grpcClient), clientKind)
      } catch {
        XCTFail("Unexpected error: '\(error)' (\(clientKind))")
      }

      // Tear down the server and client tasks so the group can finish.
      tasks.cancelAll()
    }
  }
}
/// Starts a plaintext gRPC server of the given kind as a child task of `group`.
///
/// - Parameters:
///   - group: The task group which runs `serve()` for the server.
///   - address: The address the server transport is configured with.
///   - kind: The transport kind to use. `.wrappedChannel` is not supported and traps.
///   - enableControlService: Whether the `ControlService` is registered with the server.
///   - compression: The compression algorithms enabled on the server.
/// - Returns: The server and the address it reports it is listening on.
/// - Throws: Any error from fetching the listening address, or an unwrap failure if the server
///   reports no listening address.
private func runServer(
  in group: inout ThrowingTaskGroup<Void, any Error>,
  address: SocketAddress,
  kind: TransportKind,
  enableControlService: Bool,
  compression: CompressionAlgorithmSet
) async throws -> (GRPCServer<NIOServerTransport>, GRPCNIOTransportCore.SocketAddress) {
  let services = enableControlService ? [ControlService()] : []

  // Only the transport differs between kinds; everything after the switch is shared.
  let transport: NIOServerTransport
  switch kind {
  case .posix:
    transport = NIOServerTransport(
      .http2NIOPosix(
        address: address,
        transportSecurity: .plaintext,
        config: .defaults {
          $0.compression.enabledAlgorithms = compression
          $0.rpc.maxRequestPayloadSize = .max
        }
      )
    )

  #if canImport(Network)
  case .transportServices:
    transport = NIOServerTransport(
      .http2NIOTS(
        address: address,
        transportSecurity: .plaintext,
        config: .defaults {
          $0.compression.enabledAlgorithms = compression
          $0.rpc.maxRequestPayloadSize = .max
        }
      )
    )
  #endif

  case .wrappedChannel:
    fatalError("Unsupported")
  }

  let server = GRPCServer(transport: transport, services: services)

  group.addTask {
    try await server.serve()
  }

  // Fail the test with a thrown error rather than crashing the whole test process if the
  // server has no listening address.
  let maybeListeningAddress = try await server.listeningAddress
  let listeningAddress = try XCTUnwrap(
    maybeListeningAddress,
    "Server (\(kind)) has no listening address"
  )
  return (server, listeningAddress)
}
/// Creates a plaintext gRPC client of the given transport kind for `target`.
///
/// The client uses round-robin load balancing, lifts the request and response message size
/// limits for all RPCs, and has a `PeerInfoClientInterceptor` installed.
///
/// - Parameters:
///   - kind: The client transport kind to create.
///   - target: The target to connect to. For `.wrappedChannel` only DNS, IPv4, IPv6 and Unix
///     domain socket targets are handled.
///   - compression: The compression algorithm the client is configured with.
///   - enabledCompression: The compression algorithms enabled on the client.
/// - Returns: A client wrapping the newly created transport.
/// - Throws: Any error from creating the transport or connecting the wrapped channel, or an
///   `RPCError` with code `.invalidArgument` if the wrapped channel can't handle `target`.
private func makeClient(
  kind: TransportKind,
  target: any ResolvableTarget,
  compression: CompressionAlgorithm,
  enabledCompression: CompressionAlgorithmSet
) throws -> GRPCClient<NIOClientTransport> {
  // Applies to all RPCs
  let allRPCs = MethodConfig(
    names: [MethodConfig.Name(service: "", method: "")],
    maxRequestMessageBytes: .max,
    maxResponseMessageBytes: .max
  )

  var serviceConfig = ServiceConfig()
  serviceConfig.loadBalancingConfig = [.roundRobin]
  serviceConfig.methodConfig = [allRPCs]

  let transport: NIOClientTransport
  switch kind {
  case .posix:
    transport = try NIOClientTransport(
      HTTP2ClientTransport.Posix(
        target: target,
        transportSecurity: .plaintext,
        config: .defaults {
          $0.compression.algorithm = compression
          $0.compression.enabledAlgorithms = enabledCompression
        },
        serviceConfig: serviceConfig
      )
    )

  #if canImport(Network)
  case .transportServices:
    transport = try NIOClientTransport(
      HTTP2ClientTransport.TransportServices(
        target: target,
        transportSecurity: .plaintext,
        config: .defaults {
          $0.compression.algorithm = compression
          $0.compression.enabledAlgorithms = enabledCompression
        },
        serviceConfig: serviceConfig
      )
    )
  #endif

  case .wrappedChannel:
    var config = HTTP2ClientTransport.WrappedChannel.Config.defaults
    config.compression.algorithm = compression
    config.compression.enabledAlgorithms = enabledCompression

    // Built lazily so the message is only rendered when the target really is unsupported.
    func unsupportedTarget() -> RPCError {
      RPCError(
        code: .invalidArgument,
        message: "Target '\(target)' isn't supported by the tunnel transport."
      )
    }

    // The wrapped transport takes an already connected channel, so connect it here and set
    // the HTTP/2 authority to match the target.
    let bootstrap = ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup)
    let channel: any Channel

    switch target {
    case let dns as ResolvableTargets.DNS:
      channel = try bootstrap.connect(host: dns.host, port: dns.port ?? 443).wait()
      config.http2.authority = dns.host

    case let ipv4 as ResolvableTargets.IPv4:
      guard let address = ipv4.addresses.first else { throw unsupportedTarget() }
      channel = try bootstrap.connect(host: address.host, port: address.port).wait()
      config.http2.authority = address.host

    case let ipv6 as ResolvableTargets.IPv6:
      guard let address = ipv6.addresses.first else { throw unsupportedTarget() }
      channel = try bootstrap.connect(host: address.host, port: address.port).wait()
      config.http2.authority = address.host

    case let uds as ResolvableTargets.UnixDomainSocket:
      channel = try bootstrap.connect(unixDomainSocketPath: uds.address.path).wait()
      config.http2.authority = uds.authority

    default:
      throw unsupportedTarget()
    }

    transport = NIOClientTransport(
      HTTP2ClientTransport.WrappedChannel(
        takingOwnershipOf: channel,
        config: config,
        serviceConfig: serviceConfig
      )
    )
  }

  return GRPCClient(transport: transport, interceptors: [PeerInfoClientInterceptor()])
}
/// Unary RPC, success path: one request message goes out and the response is expected to carry
/// the echoed metadata in both headers and trailers along with a single 1024-byte message.
func testUnaryOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let input = ControlInput.with { input in
      input.echoMetadataInHeaders = true
      input.echoMetadataInTrailers = true
      input.numberOfMessages = 1
      input.payloadParameters = .with { payload in
        payload.content = 0
        payload.size = 1024
      }
    }

    let request = ClientRequest(message: input, metadata: ["test-key": "test-value"])

    try await control.unary(request: request) { response in
      // The payload is 1024 bytes, each set to the requested content value.
      let expectedPayload = Data(repeating: 0, count: 1024)
      XCTAssertEqual(try response.message.payload, expectedPayload, "\(pair)")

      // The request metadata comes back prefixed with "echo-" in headers and trailers.
      let echoedInHeaders = Array(response.metadata["echo-test-key"])
      XCTAssertEqual(echoedInHeaders, ["test-value"], "\(pair)")

      let echoedInTrailers = Array(response.trailingMetadata["echo-test-key"])
      XCTAssertEqual(echoedInTrailers, ["test-value"], "\(pair)")
    }
  }
}
/// Unary RPC, failure path: the client sends one message and asks the server to finish with an
/// `aborted` status and echoed trailing metadata.
func testUnaryNotOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let input = ControlInput.with {
      $0.echoMetadataInTrailers = true
      $0.numberOfMessages = 1
      $0.payloadParameters = .with {
        $0.content = 0
        $0.size = 1024
      }
      $0.status = .with {
        $0.code = .aborted
        $0.message = "\(#function)"
      }
    }

    let metadata: Metadata = ["test-key": "test-value"]
    let request = ClientRequest(message: input, metadata: metadata)

    try await control.unary(request: request) { response in
      // Reading the message must surface the non-OK status as an RPCError.
      XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
        // Label every assertion with the pair so failures identify the transports in use.
        XCTAssertEqual(error.code, .aborted, "\(pair)")
        XCTAssertEqual(error.message, "\(#function)", "\(pair)")

        let trailing = error.metadata
        XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
      }

      // The same trailing metadata is also available directly on the response.
      let trailing = response.trailingMetadata
      XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
    }
  }
}
/// Unary RPC rejected with a trailers-only response: the client sends one message and the
/// server replies with a non-OK status and trailing metadata but no initial metadata.
func testUnaryRejected() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let request = ClientRequest<ControlInput>(
      message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
      metadata: metadata
    )

    try await control.unary(request: request) { response in
      XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
        XCTAssertEqual(error.code, .aborted, "\(pair)")
        XCTAssertEqual(error.message, "\(#function)", "\(pair)")
        let trailing = error.metadata
        XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
      }

      // No initial metadata for trailers-only. Labelled with the pair like the other
      // assertions so a failure identifies the transports in use.
      XCTAssertEqual(response.metadata, [:], "\(pair)")

      let trailing = response.trailingMetadata
      XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
    }
  }
}
/// Client-streaming RPC, success path: the client streams several control messages and expects
/// a single 1024-byte response message with metadata echoed in headers and trailers.
func testClientStreamingOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let request = StreamingClientRequest(
      of: ControlInput.self,
      metadata: ["test-key": "test-value"]
    ) { writer in
      try await writer.write(.echoMetadata)

      // Send a few messages which are ignored.
      for _ in 0 ..< 3 {
        try await writer.write(.noOp)
      }

      // Send a message, then the final status.
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
    }

    try await control.clientStream(request: request) { response in
      let expectedPayload = Data(repeating: 42, count: 1024)
      XCTAssertEqual(try response.message.payload, expectedPayload, "\(pair)")

      let echoedInHeaders = Array(response.metadata["echo-test-key"])
      XCTAssertEqual(echoedInHeaders, ["test-value"], "\(pair)")

      let echoedInTrailers = Array(response.trailingMetadata["echo-test-key"])
      XCTAssertEqual(echoedInTrailers, ["test-value"], "\(pair)")
    }
  }
}
/// Client-streaming RPC, failure path: the client streams several control messages ending with
/// an `aborted` status and expects that status, plus echoed metadata, on the response.
func testClientStreamingNotOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let request = StreamingClientRequest(
      of: ControlInput.self,
      metadata: ["test-key": "test-value"]
    ) { writer in
      try await writer.write(.echoMetadata)

      // Send a few messages which are ignored.
      for _ in 0 ..< 3 {
        try await writer.write(.noOp)
      }

      // Send a message, then the final (non-OK) status.
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
    }

    try await control.clientStream(request: request) { response in
      // Reading the message must surface the non-OK status as an RPCError.
      XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
        XCTAssertEqual(error.code, .aborted, "\(pair)")
        XCTAssertEqual(error.message, "\(#function)", "\(pair)")
        XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
      }

      let echoedInHeaders = Array(response.metadata["echo-test-key"])
      XCTAssertEqual(echoedInHeaders, ["test-value"], "\(pair)")

      let echoedInTrailers = Array(response.trailingMetadata["echo-test-key"])
      XCTAssertEqual(echoedInTrailers, ["test-value"], "\(pair)")
    }
  }
}
/// Client-streaming RPC rejected with a trailers-only response: the client sends one message
/// and the server replies with a non-OK status and trailing metadata but no initial metadata.
func testClientStreamingRejected() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let request = StreamingClientRequest(
      of: ControlInput.self,
      metadata: metadata
    ) { writer in
      let message: ControlInput = .trailersOnly(
        code: .aborted,
        message: "\(#function)",
        echoMetadata: true
      )
      try await writer.write(message)
    }

    try await control.clientStream(request: request) { response in
      XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
        XCTAssertEqual(error.code, .aborted, "\(pair)")
        XCTAssertEqual(error.message, "\(#function)", "\(pair)")
        let trailing = error.metadata
        XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
      }

      // No initial metadata for trailers-only. Labelled with the pair like the other
      // assertions so a failure identifies the transports in use.
      XCTAssertEqual(response.metadata, [:], "\(pair)")

      let trailing = response.trailingMetadata
      XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
    }
  }
}
/// Server-streaming RPC, success path: the client requests five 1024-byte messages and expects
/// to receive exactly five, with metadata echoed in headers and trailers.
func testServerStreamingOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let input = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.echoMetadataInTrailers = true
      $0.numberOfMessages = 5
      $0.payloadParameters = .with {
        $0.content = 42
        $0.size = 1024
      }
    }

    let request = ClientRequest(message: input, metadata: metadata)
    try await control.serverStream(request: request) { response in
      switch response.accepted {
      case .success(let contents):
        XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

        var messagesReceived = 0
        for try await part in contents.bodyParts {
          switch part {
          case .message(let message):
            messagesReceived += 1
            // Labelled with the pair so a failure identifies the transports in use.
            XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
          case .trailingMetadata(let metadata):
            XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
          }
        }
        XCTAssertEqual(messagesReceived, 5, "\(pair)")

      case .failure(let error):
        // Rethrow so the helper records it as an unexpected error for this pair.
        throw error
      }
    }
  }
}
/// Server-streaming RPC, success path with no messages: the client asks for metadata to be
/// echoed but requests no messages, so only trailing metadata is expected in the body.
func testServerStreamingEmptyOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    // Echo back metadata, but don't send any messages.
    let input = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.echoMetadataInTrailers = true
    }

    let request = ClientRequest(message: input, metadata: metadata)
    try await control.serverStream(request: request) { response in
      switch response.accepted {
      case .success(let contents):
        XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
        for try await part in contents.bodyParts {
          switch part {
          case .message:
            // Labelled with the pair so a failure identifies the transports in use.
            XCTFail("Unexpected message (\(pair))")
          case .trailingMetadata(let metadata):
            XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
          }
        }

      case .failure(let error):
        // Rethrow so the helper records it as an unexpected error for this pair.
        throw error
      }
    }
  }
}
/// Server-streaming RPC, failure path: the client requests five 1024-byte messages followed by
/// an `aborted` status. All five messages are expected before the error is thrown, and the
/// trailing metadata is expected on the error rather than as a body part.
func testServerStreamingNotOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let input = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.echoMetadataInTrailers = true
      $0.numberOfMessages = 5
      $0.payloadParameters = .with {
        $0.content = 42
        $0.size = 1024
      }
      $0.status = .with {
        $0.code = .aborted
        $0.message = "\(#function)"
      }
    }

    let request = ClientRequest(message: input, metadata: metadata)
    try await control.serverStream(request: request) { response in
      switch response.accepted {
      case .success(let contents):
        XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

        var messagesReceived = 0
        do {
          for try await part in contents.bodyParts {
            switch part {
            case .message(let message):
              messagesReceived += 1
              // Every assertion is labelled with the pair so failures identify the transports.
              XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
            case .trailingMetadata:
              XCTFail("Unexpected trailing metadata, should be provided in RPCError (\(pair))")
            }
          }
          XCTFail("Expected error to be thrown (\(pair))")
        } catch let error as RPCError {
          XCTAssertEqual(error.code, .aborted, "\(pair)")
          XCTAssertEqual(error.message, "\(#function)", "\(pair)")
          XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
        }
        XCTAssertEqual(messagesReceived, 5, "\(pair)")

      case .failure(let error):
        // Rethrow so the helper records it as an unexpected error for this pair.
        throw error
      }
    }
  }
}
/// Server-streaming RPC, failure path with no messages: the client requests an `aborted` status
/// without any messages, so iterating the body is expected to throw straight away.
func testServerStreamingEmptyNotOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let input = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.echoMetadataInTrailers = true
      $0.status = .with {
        $0.code = .aborted
        $0.message = "\(#function)"
      }
    }

    let request = ClientRequest(message: input, metadata: metadata)
    try await control.serverStream(request: request) { response in
      switch response.accepted {
      case .success(let contents):
        XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
        do {
          for try await _ in contents.bodyParts {
            XCTFail("Unexpected message, \(pair)")
          }
          // Every assertion is labelled with the pair so failures identify the transports.
          XCTFail("Expected error to be thrown (\(pair))")
        } catch let error as RPCError {
          XCTAssertEqual(error.code, .aborted, "\(pair)")
          XCTAssertEqual(error.message, "\(#function)", "\(pair)")
          XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
        }

      case .failure(let error):
        // Rethrow so the helper records it as an unexpected error for this pair.
        throw error
      }
    }
  }
}
/// Server-streaming RPC rejected with a trailers-only response: the RPC is expected not to be
/// accepted, and the failure is expected to carry the `aborted` status and echoed metadata.
func testServerStreamingRejected() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let request = ClientRequest<ControlInput>(
      message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
      metadata: ["test-key": "test-value"]
    )

    try await control.serverStream(request: request) { response in
      guard case .failure(let error) = response.accepted else {
        XCTFail("Expected RPC to be rejected \(pair)")
        return
      }

      XCTAssertEqual(error.code, .aborted, "\(pair)")
      XCTAssertEqual(error.message, "\(#function)", "\(pair)")

      let echoed = Array(error.metadata["echo-test-key"])
      XCTAssertEqual(echoed, ["test-value"], "\(pair)")
    }
  }
}
/// Bidirectional-streaming RPC, success path: the client sends three message requests followed
/// by an OK status and expects three 1024-byte messages plus echoed metadata in return.
func testBidiStreamingOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let request = StreamingClientRequest(
      of: ControlInput.self,
      metadata: metadata
    ) { writer in
      try await writer.write(.echoMetadata)
      // Send a few messages, each is echo'd back.
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      // Send the final status.
      try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
    }

    try await control.bidiStream(request: request) { response in
      switch response.accepted {
      case .success(let contents):
        XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

        var messagesReceived = 0
        for try await part in contents.bodyParts {
          switch part {
          case .message(let message):
            messagesReceived += 1
            // Labelled with the pair so a failure identifies the transports in use.
            XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
          case .trailingMetadata(let metadata):
            XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
          }
        }
        XCTAssertEqual(messagesReceived, 3, "\(pair)")

      case .failure(let error):
        // Rethrow so the helper records it as an unexpected error for this pair.
        throw error
      }
    }
  }
}
/// Bidirectional-streaming RPC where the client writes nothing: the response body is expected
/// to contain no messages and trailing metadata at most once.
func testBidiStreamingEmptyOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let request = StreamingClientRequest(of: ControlInput.self) { _ in }

    try await control.bidiStream(request: request) { response in
      // A rejected RPC is rethrown so the helper records it as an unexpected error.
      let contents = try response.accepted.get()

      // NOTE(review): this flag only guards against trailing metadata arriving twice; nothing
      // asserts that it was received at all. Confirm whether that is intended.
      var sawTrailingMetadata = false
      for try await part in contents.bodyParts {
        guard case .trailingMetadata = part else {
          XCTFail("Unexpected message \(pair)")
          continue
        }

        XCTAssertFalse(sawTrailingMetadata, "\(pair)")
        sawTrailingMetadata = true
      }
    }
  }
}
/// Bidirectional-streaming RPC, failure path: the client sends three message requests followed
/// by an `aborted` status. Three messages are expected before the error is thrown, and the
/// trailing metadata is expected on the error rather than as a body part.
func testBidiStreamingNotOK() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let request = StreamingClientRequest(
      of: ControlInput.self,
      metadata: metadata
    ) { writer in
      try await writer.write(.echoMetadata)
      // Send a few messages, each is echo'd back.
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      // Send the final status.
      try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
    }

    try await control.bidiStream(request: request) { response in
      switch response.accepted {
      case .success(let contents):
        XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

        var messagesReceived = 0
        do {
          for try await part in contents.bodyParts {
            switch part {
            case .message(let message):
              messagesReceived += 1
              // Every assertion is labelled with the pair so failures identify the transports.
              XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
            case .trailingMetadata:
              XCTFail("Trailing metadata should be provided by error (\(pair))")
            }
          }
          XCTFail("Should've thrown error \(pair)")
        } catch let error as RPCError {
          XCTAssertEqual(error.code, .aborted, "\(pair)")
          XCTAssertEqual(error.message, "\(#function)", "\(pair)")
          XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
        }
        XCTAssertEqual(messagesReceived, 3, "\(pair)")

      case .failure(let error):
        // Rethrow so the helper records it as an unexpected error for this pair.
        throw error
      }
    }
  }
}
/// Bidirectional-streaming RPC rejected with a trailers-only response: the RPC is expected not
/// to be accepted, and the failure is expected to carry the `aborted` status and echoed metadata.
func testBidiStreamingRejected() async throws {
  try await self.forEachTransportPair { control, _, pair in
    let metadata: Metadata = ["test-key": "test-value"]
    let request = StreamingClientRequest(
      of: ControlInput.self,
      metadata: metadata
    ) { writer in
      try await writer.write(
        .trailersOnly(
          code: .aborted,
          message: "\(#function)",
          echoMetadata: true
        )
      )
    }

    try await control.bidiStream(request: request) { response in
      switch response.accepted {
      case .success:
        XCTFail("Expected RPC to fail \(pair)")
      case .failure(let error):
        // Labelled with the pair so a failure identifies the transports in use.
        XCTAssertEqual(error.code, .aborted, "\(pair)")
        XCTAssertEqual(error.message, "\(#function)", "\(pair)")
        XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
      }
    }
  }
}
- // MARK: - Not Implemented
/// Unary RPC against a server without the control service registered: reading the response
/// message is expected to throw an `RPCError` with code `unimplemented`.
func testUnaryNotImplemented() async throws {
  try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
    let request = ClientRequest(message: ControlInput())
    try await control.unary(request: request) { response in
      XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
        // Labelled with the pair so a failure identifies the transports in use.
        XCTAssertEqual(error.code, .unimplemented, "\(pair)")
      }
    }
  }
}
/// Client-streaming RPC against a server without the control service registered: reading the
/// response message is expected to throw an `RPCError` with code `unimplemented`.
func testClientStreamingNotImplemented() async throws {
  try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
    let request = StreamingClientRequest(of: ControlInput.self) { _ in }
    try await control.clientStream(request: request) { response in
      XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
        // Labelled with the pair so a failure identifies the transports in use.
        XCTAssertEqual(error.code, .unimplemented, "\(pair)")
      }
    }
  }
}
/// Server-streaming RPC against a server without the control service registered: the RPC is
/// expected to be rejected with an `RPCError` with code `unimplemented`.
func testServerStreamingNotImplemented() async throws {
  try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
    let request = ClientRequest(message: ControlInput())
    try await control.serverStream(request: request) { response in
      XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
        // Labelled with the pair so a failure identifies the transports in use.
        XCTAssertEqual(error.code, .unimplemented, "\(pair)")
      }
    }
  }
}
/// Bidirectional-streaming RPC against a server without the control service registered: the
/// RPC is expected to be rejected with an `RPCError` with code `unimplemented`.
func testBidiStreamingNotImplemented() async throws {
  try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
    let request = StreamingClientRequest(of: ControlInput.self) { _ in }
    try await control.bidiStream(request: request) { response in
      XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
        // Labelled with the pair so a failure identifies the transports in use.
        XCTAssertEqual(error.code, .unimplemented, "\(pair)")
      }
    }
  }
}
- // MARK: - Compression tests
/// Makes a unary RPC with the given client compression and checks the encodings reported in
/// the response metadata, along with the 1024-byte response payload.
///
/// - Parameters:
///   - client: The compression algorithm set in the call options. For `deflate` and `gzip` its
///     name is expected in the "echo-grpc-encoding" response metadata.
///   - server: The compression algorithm the server is expected to use. For `deflate` and `gzip`
///     its name is expected in the "grpc-encoding" response metadata.
///   - control: The client used to make the RPC.
///   - pair: The transport pair in use; included in assertion messages.
/// - Throws: Any error thrown while making the RPC or reading the response message.
private func testUnaryCompression(
  client: CompressionAlgorithm,
  server: CompressionAlgorithm,
  control: ControlClient<NIOClientTransport>,
  pair: Transport
) async throws {
  let input = ControlInput.with {
    $0.echoMetadataInHeaders = true
    $0.numberOfMessages = 1
    $0.payloadParameters = .with {
      $0.content = 42
      $0.size = 1024
    }
  }

  var options = CallOptions.defaults
  options.compression = client

  try await control.unary(
    request: ClientRequest(message: input),
    options: options
  ) { response in
    // Check the client algorithm.
    switch client {
    case .deflate, .gzip:
      // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
      let encoding = Array(response.metadata["echo-grpc-encoding"])
      XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
    case .none:
      ()
    default:
      XCTFail("Unhandled compression '\(client)'")
    }

    // Check the server algorithm.
    switch server {
    case .deflate, .gzip:
      let encoding = Array(response.metadata["grpc-encoding"])
      XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
    case .none:
      ()
    default:
      // Report the server algorithm here; the client algorithm is checked above.
      XCTFail("Unhandled compression '\(server)'")
    }

    let message = try response.message
    XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
  }
}
/// Runs a client-streaming RPC with `client` compression set on the call options, then checks
/// the encodings reported in the response metadata and the returned payload.
///
/// - Parameters:
///   - client: Algorithm set on the call options. For `.deflate` and `.gzip` its name must be
///     the only "echo-grpc-encoding" metadata value; `.none` is not checked.
///   - server: Algorithm the response is expected to use. For `.deflate` and `.gzip` its name
///     must be the only "grpc-encoding" metadata value; `.none` is not checked.
///   - control: Client used to make the RPC.
///   - pair: Transport pair under test; only used to label assertion failures.
private func testClientStreamingCompression(
  client: CompressionAlgorithm,
  server: CompressionAlgorithm,
  control: ControlClient<NIOClientTransport>,
  pair: Transport
) async throws {
  let request = StreamingClientRequest(of: ControlInput.self) { writer in
    try await writer.write(.echoMetadata)
    try await writer.write(.noOp)
    try await writer.write(.noOp)
    try await writer.write(.messages(1, repeating: 42, count: 1024))
  }

  var options = CallOptions.defaults
  options.compression = client

  try await control.clientStream(request: request, options: options) { response in
    // Check the client algorithm.
    switch client {
    case .deflate, .gzip:
      // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
      let encoding = Array(response.metadata["echo-grpc-encoding"])
      XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
    case .none:
      break
    default:
      XCTFail("Unhandled compression '\(client)'")
    }

    // Check the server algorithm.
    switch server {
    case .deflate, .gzip:
      let encoding = Array(response.metadata["grpc-encoding"])
      XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
    case .none:
      break
    default:
      // Report the value actually being switched over (previously this printed `client`).
      XCTFail("Unhandled compression '\(server)'")
    }

    let reply = try response.message
    XCTAssertEqual(reply.payload, Data(repeating: 42, count: 1024), "\(pair)")
  }
}
/// Runs a server-streaming RPC with `client` compression set on the call options, then checks
/// the encodings reported in the response metadata and the payload of every streamed message.
///
/// - Parameters:
///   - client: Algorithm set on the call options. For `.deflate` and `.gzip` its name must be
///     the only "echo-grpc-encoding" metadata value; `.none` is not checked.
///   - server: Algorithm the response is expected to use. For `.deflate` and `.gzip` its name
///     must be the only "grpc-encoding" metadata value; `.none` is not checked.
///   - control: Client used to make the RPC.
///   - pair: Transport pair under test; only used to label assertion failures.
private func testServerStreamingCompression(
  client: CompressionAlgorithm,
  server: CompressionAlgorithm,
  control: ControlClient<NIOClientTransport>,
  pair: Transport
) async throws {
  let message = ControlInput.with {
    $0.echoMetadataInHeaders = true
    $0.numberOfMessages = 5
    $0.payloadParameters = .with {
      $0.content = 42
      $0.size = 1024
    }
  }

  var options = CallOptions.defaults
  options.compression = client

  try await control.serverStream(
    request: ClientRequest(message: message),
    options: options
  ) { response in
    // Check the client algorithm.
    switch client {
    case .deflate, .gzip:
      // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
      let encoding = Array(response.metadata["echo-grpc-encoding"])
      XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
    case .none:
      break
    default:
      XCTFail("Unhandled compression '\(client)'")
    }

    // Check the server algorithm.
    switch server {
    case .deflate, .gzip:
      let encoding = Array(response.metadata["grpc-encoding"])
      XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
    case .none:
      break
    default:
      // Report the value actually being switched over (previously this printed `client`).
      XCTFail("Unhandled compression '\(server)'")
    }

    for try await reply in response.messages {
      XCTAssertEqual(reply.payload, Data(repeating: 42, count: 1024), "\(pair)")
    }
  }
}
/// Runs a bidirectional-streaming RPC with `client` compression set on the call options, then
/// checks the encodings reported in the response metadata and the payload of every streamed
/// message.
///
/// - Parameters:
///   - client: Algorithm set on the call options. For `.deflate` and `.gzip` its name must be
///     the only "echo-grpc-encoding" metadata value; `.none` is not checked.
///   - server: Algorithm the response is expected to use. For `.deflate` and `.gzip` its name
///     must be the only "grpc-encoding" metadata value; `.none` is not checked.
///   - control: Client used to make the RPC.
///   - pair: Transport pair under test; only used to label assertion failures.
private func testBidiStreamingCompression(
  client: CompressionAlgorithm,
  server: CompressionAlgorithm,
  control: ControlClient<NIOClientTransport>,
  pair: Transport
) async throws {
  let request = StreamingClientRequest(of: ControlInput.self) { writer in
    try await writer.write(.echoMetadata)
    try await writer.write(.messages(1, repeating: 42, count: 1024))
    try await writer.write(.messages(1, repeating: 42, count: 1024))
    try await writer.write(.messages(1, repeating: 42, count: 1024))
  }

  var options = CallOptions.defaults
  options.compression = client

  try await control.bidiStream(request: request, options: options) { response in
    // Check the client algorithm.
    switch client {
    case .deflate, .gzip:
      // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
      let encoding = Array(response.metadata["echo-grpc-encoding"])
      XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
    case .none:
      break
    default:
      XCTFail("Unhandled compression '\(client)'")
    }

    // Check the server algorithm.
    switch server {
    case .deflate, .gzip:
      let encoding = Array(response.metadata["grpc-encoding"])
      XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
    case .none:
      break
    default:
      // Report the value actually being switched over (previously this printed `client`).
      XCTFail("Unhandled compression '\(server)'")
    }

    for try await reply in response.messages {
      XCTAssertEqual(reply.payload, Data(repeating: 42, count: 1024), "\(pair)")
    }
  }
}
/// Runs the unary compression checks with deflate configured for both client and server.
func testUnaryDeflateCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .deflate,
    clientEnabledCompression: .deflate,
    serverCompression: .deflate
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .deflate
    try await self.testUnaryCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Runs the unary compression checks with gzip configured for both client and server.
func testUnaryGzipCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .gzip,
    clientEnabledCompression: .gzip,
    serverCompression: .gzip
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .gzip
    try await self.testUnaryCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Runs the client-streaming compression checks with deflate configured for both client and
/// server.
func testClientStreamingDeflateCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .deflate,
    clientEnabledCompression: .deflate,
    serverCompression: .deflate
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .deflate
    try await self.testClientStreamingCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Runs the client-streaming compression checks with gzip configured for both client and
/// server.
func testClientStreamingGzipCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .gzip,
    clientEnabledCompression: .gzip,
    serverCompression: .gzip
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .gzip
    try await self.testClientStreamingCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Runs the server-streaming compression checks with deflate configured for both client and
/// server.
func testServerStreamingDeflateCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .deflate,
    clientEnabledCompression: .deflate,
    serverCompression: .deflate
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .deflate
    try await self.testServerStreamingCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Runs the server-streaming compression checks with gzip configured for both client and
/// server.
func testServerStreamingGzipCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .gzip,
    clientEnabledCompression: .gzip,
    serverCompression: .gzip
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .gzip
    try await self.testServerStreamingCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Runs the bidirectional-streaming compression checks with deflate configured for both client
/// and server.
func testBidiStreamingDeflateCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .deflate,
    clientEnabledCompression: .deflate,
    serverCompression: .deflate
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .deflate
    try await self.testBidiStreamingCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Runs the bidirectional-streaming compression checks with gzip configured for both client
/// and server.
func testBidiStreamingGzipCompression() async throws {
  try await forEachTransportPair(
    clientCompression: .gzip,
    clientEnabledCompression: .gzip,
    serverCompression: .gzip
  ) { controlClient, _, transportPair in
    let algorithm: CompressionAlgorithm = .gzip
    try await self.testBidiStreamingCompression(
      client: algorithm,
      server: algorithm,
      control: controlClient,
      pair: transportPair
    )
  }
}
/// Verifies that a unary RPC sent with deflate compression is rejected when the transport pair
/// is configured with `serverCompression: .gzip`, and that the rejection's
/// "grpc-accept-encoding" metadata contains "gzip" but not "deflate".
func testUnaryUnsupportedCompression() async throws {
  try await forEachTransportPair(
    clientEnabledCompression: .all,
    serverCompression: .gzip
  ) { control, _, _ in
    let input = ControlInput.with {
      $0.numberOfMessages = 1
      $0.payloadParameters = .with {
        $0.content = 42
        $0.size = 1024
      }
    }

    var callOptions = CallOptions.defaults
    callOptions.compression = .deflate

    try await control.unary(
      request: ClientRequest(message: input),
      options: callOptions
    ) { response in
      guard case .failure(let rejection) = response.accepted else {
        XCTFail("RPC should've been rejected")
        return
      }

      // "identity" may or may not be included, so only test for values which must be present.
      let advertised = Array(rejection.metadata["grpc-accept-encoding"])
      XCTAssertTrue(advertised.contains("gzip"))
      XCTAssertFalse(advertised.contains("deflate"))
    }
  }
}
/// Verifies that a client-streaming RPC sent with deflate compression is rejected when the
/// transport pair is configured with `serverCompression: .gzip`, and that the rejection's
/// "grpc-accept-encoding" metadata contains "gzip" but not "deflate".
func testClientStreamingUnsupportedCompression() async throws {
  try await forEachTransportPair(
    clientEnabledCompression: .all,
    serverCompression: .gzip
  ) { control, _, _ in
    let streamingRequest = StreamingClientRequest(of: ControlInput.self) { writer in
      try await writer.write(.noOp)
    }

    var callOptions = CallOptions.defaults
    callOptions.compression = .deflate

    try await control.clientStream(request: streamingRequest, options: callOptions) { response in
      guard case .failure(let rejection) = response.accepted else {
        XCTFail("RPC should've been rejected")
        return
      }

      // "identity" may or may not be included, so only test for values which must be present.
      let advertised = Array(rejection.metadata["grpc-accept-encoding"])
      XCTAssertTrue(advertised.contains("gzip"))
      XCTAssertFalse(advertised.contains("deflate"))
    }
  }
}
/// Verifies that a server-streaming RPC sent with deflate compression is rejected when the
/// transport pair is configured with `serverCompression: .gzip`, and that the rejection's
/// "grpc-accept-encoding" metadata contains "gzip" but not "deflate".
func testServerStreamingUnsupportedCompression() async throws {
  try await forEachTransportPair(
    clientEnabledCompression: .all,
    serverCompression: .gzip
  ) { control, _, _ in
    let input = ControlInput.with {
      $0.numberOfMessages = 1
      $0.payloadParameters = .with {
        $0.content = 42
        $0.size = 1024
      }
    }

    var callOptions = CallOptions.defaults
    callOptions.compression = .deflate

    try await control.serverStream(
      request: ClientRequest(message: input),
      options: callOptions
    ) { response in
      guard case .failure(let rejection) = response.accepted else {
        XCTFail("RPC should've been rejected")
        return
      }

      // "identity" may or may not be included, so only test for values which must be present.
      let advertised = Array(rejection.metadata["grpc-accept-encoding"])
      XCTAssertTrue(advertised.contains("gzip"))
      XCTAssertFalse(advertised.contains("deflate"))
    }
  }
}
/// Verifies that a bidirectional-streaming RPC sent with deflate compression is rejected when
/// the transport pair is configured with `serverCompression: .gzip`, and that the rejection's
/// "grpc-accept-encoding" metadata contains "gzip" but not "deflate".
func testBidiStreamingUnsupportedCompression() async throws {
  try await forEachTransportPair(
    clientEnabledCompression: .all,
    serverCompression: .gzip
  ) { control, _, _ in
    let streamingRequest = StreamingClientRequest(of: ControlInput.self) { writer in
      try await writer.write(.noOp)
    }

    var callOptions = CallOptions.defaults
    callOptions.compression = .deflate

    try await control.bidiStream(request: streamingRequest, options: callOptions) { response in
      guard case .failure(let rejection) = response.accepted else {
        XCTFail("RPC should've been rejected")
        return
      }

      // "identity" may or may not be included, so only test for values which must be present.
      let advertised = Array(rejection.metadata["grpc-accept-encoding"])
      XCTAssertTrue(advertised.contains("gzip"))
      XCTAssertFalse(advertised.contains("deflate"))
    }
  }
}
/// Makes a unary RPC with a 10 second timeout and expects exactly one "echo-grpc-timeout"
/// value in the response metadata.
func testUnaryTimeoutPropagatedToServer() async throws {
  try await forEachTransportPair { control, _, _ in
    var callOptions = CallOptions.defaults
    callOptions.timeout = .seconds(10)

    let input = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.numberOfMessages = 1
    }

    try await control.unary(
      request: ClientRequest(message: input),
      options: callOptions
    ) { response in
      let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
      XCTAssertEqual(echoedTimeouts.count, 1)
    }
  }
}
/// Makes a client-streaming RPC with a 10 second timeout and expects exactly one
/// "echo-grpc-timeout" value in the response metadata.
func testClientStreamingTimeoutPropagatedToServer() async throws {
  try await forEachTransportPair { control, _, _ in
    var callOptions = CallOptions.defaults
    callOptions.timeout = .seconds(10)

    let streamingRequest = StreamingClientRequest(of: ControlInput.self) { writer in
      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.numberOfMessages = 1
      }
      try await writer.write(input)
    }

    try await control.clientStream(request: streamingRequest, options: callOptions) { response in
      let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
      XCTAssertEqual(echoedTimeouts.count, 1)
    }
  }
}
/// Makes a server-streaming RPC with a 10 second timeout and expects exactly one
/// "echo-grpc-timeout" value in the response metadata.
func testServerStreamingTimeoutPropagatedToServer() async throws {
  try await forEachTransportPair { control, _, _ in
    var callOptions = CallOptions.defaults
    callOptions.timeout = .seconds(10)

    let input = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.numberOfMessages = 1
    }

    try await control.serverStream(
      request: ClientRequest(message: input),
      options: callOptions
    ) { response in
      let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
      XCTAssertEqual(echoedTimeouts.count, 1)
    }
  }
}
/// Makes a bidirectional-streaming RPC with a 10 second timeout and expects exactly one
/// "echo-grpc-timeout" value in the response metadata.
func testBidiStreamingTimeoutPropagatedToServer() async throws {
  try await forEachTransportPair { control, _, _ in
    var callOptions = CallOptions.defaults
    callOptions.timeout = .seconds(10)

    let streamingRequest = StreamingClientRequest(of: ControlInput.self) { writer in
      try await writer.write(.echoMetadata)
    }

    try await control.bidiStream(request: streamingRequest, options: callOptions) { response in
      let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
      XCTAssertEqual(echoedTimeouts.count, 1)
    }
  }
}
/// Pairs of HTTP response status code and the RPC error code a client is expected to report
/// for it. Each pair is listed once so the tests iterating this table make one RPC per status.
private static let httpToStatusCodePairs: [(Int, RPCError.Code)] = [
  // See https://github.com/grpc/grpc/blob/7f664c69b2a636386fbf95c16bc78c559734ce0f/doc/http-grpc-status-mapping.md
  (400, .internalError),
  (401, .unauthenticated),
  (403, .permissionDenied),
  (404, .unimplemented),
  (418, .unknown),
  (429, .unavailable),
  (502, .unavailable),
  (503, .unavailable),
  (504, .unavailable),
]
/// For every pair in `httpToStatusCodePairs`, makes a unary RPC whose "response-status"
/// metadata carries the HTTP code and expects the RPC to fail with the paired error code.
func testUnaryAgainstNonGRPCServer() async throws {
  try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
    for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
      // Tell the server what to respond with.
      let metadata: Metadata = ["response-status": "\(httpCode)"]
      try await control.unary(
        request: ClientRequest(message: .noOp, metadata: metadata)
      ) { response in
        switch response.accepted {
        case .success:
          XCTFail("RPC should have failed with '\(expectedStatus)' (HTTP status \(httpCode))")
        case .failure(let error):
          // Name the HTTP status so a failure identifies which table entry broke.
          XCTAssertEqual(error.code, expectedStatus, "HTTP status \(httpCode)")
        }
      }
    }
  }
}
/// For every pair in `httpToStatusCodePairs`, makes a client-streaming RPC whose
/// "response-status" metadata carries the HTTP code and expects the RPC to fail with the
/// paired error code.
func testClientStreamingAgainstNonGRPCServer() async throws {
  try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
    for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
      // Tell the server what to respond with.
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: ["response-status": "\(httpCode)"]
      ) { _ in
      }

      try await control.clientStream(request: request) { response in
        switch response.accepted {
        case .success:
          XCTFail("RPC should have failed with '\(expectedStatus)' (HTTP status \(httpCode))")
        case .failure(let error):
          // Name the HTTP status so a failure identifies which table entry broke.
          XCTAssertEqual(error.code, expectedStatus, "HTTP status \(httpCode)")
        }
      }
    }
  }
}
/// For every pair in `httpToStatusCodePairs`, makes a server-streaming RPC whose
/// "response-status" metadata carries the HTTP code and expects the RPC to fail with the
/// paired error code.
func testServerStreamingAgainstNonGRPCServer() async throws {
  try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
    for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
      // Tell the server what to respond with.
      let metadata: Metadata = ["response-status": "\(httpCode)"]
      try await control.serverStream(
        request: ClientRequest(message: .noOp, metadata: metadata)
      ) { response in
        switch response.accepted {
        case .success:
          XCTFail("RPC should have failed with '\(expectedStatus)' (HTTP status \(httpCode))")
        case .failure(let error):
          // Name the HTTP status so a failure identifies which table entry broke.
          XCTAssertEqual(error.code, expectedStatus, "HTTP status \(httpCode)")
        }
      }
    }
  }
}
/// For every pair in `httpToStatusCodePairs`, makes a bidirectional-streaming RPC whose
/// "response-status" metadata carries the HTTP code and expects the RPC to fail with the
/// paired error code.
func testBidiStreamingAgainstNonGRPCServer() async throws {
  try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
    for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
      // Tell the server what to respond with.
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: ["response-status": "\(httpCode)"]
      ) { _ in
      }

      try await control.bidiStream(request: request) { response in
        switch response.accepted {
        case .success:
          XCTFail("RPC should have failed with '\(expectedStatus)' (HTTP status \(httpCode))")
        case .failure(let error):
          // Name the HTTP status so a failure identifies which table entry broke.
          XCTAssertEqual(error.code, expectedStatus, "HTTP status \(httpCode)")
        }
      }
    }
  }
}
/// Makes a unary RPC with metadata echoing enabled and expects the "echo-scheme" response
/// metadata to be exactly "http".
func testUnaryScheme() async throws {
  try await forEachTransportPair { control, _, _ in
    let echoInput = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.numberOfMessages = 1
    }

    try await control.unary(request: ClientRequest(message: echoInput)) { response in
      let schemes = Array(response.metadata["echo-scheme"])
      XCTAssertEqual(schemes, ["http"])
    }
  }
}
/// Makes a server-streaming RPC with metadata echoing enabled and expects the "echo-scheme"
/// response metadata to be exactly "http".
func testServerStreamingScheme() async throws {
  try await forEachTransportPair { control, _, _ in
    let echoInput = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.numberOfMessages = 1
    }

    try await control.serverStream(request: ClientRequest(message: echoInput)) { response in
      let schemes = Array(response.metadata["echo-scheme"])
      XCTAssertEqual(schemes, ["http"])
    }
  }
}
/// Makes a client-streaming RPC with metadata echoing enabled and expects the "echo-scheme"
/// response metadata to be exactly "http".
func testClientStreamingScheme() async throws {
  try await forEachTransportPair { control, _, _ in
    let streamingRequest = StreamingClientRequest(of: ControlInput.self) { writer in
      let echoInput = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.numberOfMessages = 1
      }
      try await writer.write(echoInput)
    }

    try await control.clientStream(request: streamingRequest) { response in
      let schemes = Array(response.metadata["echo-scheme"])
      XCTAssertEqual(schemes, ["http"])
    }
  }
}
/// Makes a bidirectional-streaming RPC with metadata echoing enabled and expects the
/// "echo-scheme" response metadata to be exactly "http".
func testBidiStreamingScheme() async throws {
  try await forEachTransportPair { control, _, _ in
    let streamingRequest = StreamingClientRequest(of: ControlInput.self) { writer in
      let echoInput = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.numberOfMessages = 1
      }
      try await writer.write(echoInput)
    }

    try await control.bidiStream(request: streamingRequest) { response in
      let schemes = Array(response.metadata["echo-scheme"])
      XCTAssertEqual(schemes, ["http"])
    }
  }
}
/// For each cancellation kind, starts a `waitForCancellation` RPC, begins graceful shutdown of
/// the client and then the server, and expects the RPC to finish without yielding any messages.
func testServerCancellation() async throws {
  let cancellationKinds: [CancellationKind] = [.awaitCancelled, .withCancellationHandler]

  for kind in cancellationKinds {
    try await forEachTransportPair { control, server, _ in
      try await control.waitForCancellation(request: ClientRequest(message: kind)) { response in
        // Shutdown the client so that it doesn't attempt to reconnect when the server closes.
        control.client.beginGracefulShutdown()
        // Shutdown the server to cancel the RPC.
        server.beginGracefulShutdown()

        // The RPC should complete without any error or response.
        let received = try await response.messages.reduce(into: []) { collected, next in
          collected.append(next)
        }
        XCTAssertTrue(received.isEmpty)
      }
    }
  }
}
/// Sends request metadata under the key "UPPERCASE-KEY" and expects its value to be echoed
/// back under the lowercase key "echo-uppercase-key".
func testUppercaseClientMetadataKey() async throws {
  try await forEachTransportPair { control, _, _ in
    let echoInput = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.numberOfMessages = 1
    }
    let request = ClientRequest<ControlInput>(
      message: echoInput,
      metadata: ["UPPERCASE-KEY": "value"]
    )

    try await control.unary(request: request) { response in
      // Keys will be lowercase before being sent over the wire.
      let echoed = Array(response.metadata["echo-uppercase-key"])
      XCTAssertEqual(echoed, ["value"])
    }
  }
}
/// Asks the server to add "UPPERCASE-KEY" to its initial and trailing metadata and expects
/// both values to be readable under the lowercase key "uppercase-key".
func testUppercaseServerMetadataKey() async throws {
  try await forEachTransportPair { control, _, _ in
    let input = ControlInput.with {
      $0.initialMetadataToAdd["UPPERCASE-KEY"] = "initial"
      $0.trailingMetadataToAdd["UPPERCASE-KEY"] = "trailing"
      $0.numberOfMessages = 1
    }
    let request = ClientRequest<ControlInput>(message: input)

    try await control.unary(request: request) { response in
      let initialValues = Array(response.metadata["uppercase-key"])
      XCTAssertEqual(initialValues, ["initial"])

      let trailingValues = Array(response.trailingMetadata["uppercase-key"])
      XCTAssertEqual(trailingValues, ["trailing"])
    }
  }
}
/// Makes a unary RPC through `client` with metadata echoing enabled and asserts that the
/// "echo-authority" value in the initial response metadata equals `expected`.
///
/// - Parameters:
///   - client: The gRPC client to wrap in a `ControlClient`.
///   - expected: The authority string the response metadata must contain.
private func checkAuthority(
  client: GRPCClient<NIOClientTransport>,
  expected: String
) async throws {
  let echoInput = ControlInput.with {
    $0.echoMetadataInHeaders = true
    $0.echoMetadataInTrailers = true
    $0.numberOfMessages = 1
    $0.payloadParameters = .with {
      $0.content = 0
      $0.size = 1024
    }
  }
  let request = ClientRequest(message: echoInput)

  try await ControlClient(wrapping: client).unary(request: request) { response in
    let echoedAuthority = Array(response.metadata["echo-authority"])
    XCTAssertEqual(echoedAuthority, [.string(expected)])
  }
}
/// Starts a plaintext server with a `ControlService` on `serverAddress`, connects a plaintext
/// client to the target derived from the server's listening address, and checks the authority
/// echoed by the server.
///
/// - Parameters:
///   - serverAddress: Address the server is asked to bind to.
///   - override: Value assigned to the client's `http2.authority` configuration; `nil` by
///     default.
///   - clientTarget: Builds the client's target from the server's listening address.
///   - expectedAuthority: Builds the expected authority from the server's listening address.
private func testAuthority(
  serverAddress: SocketAddress,
  authorityOverride override: String? = nil,
  clientTarget: (SocketAddress) -> any ResolvableTarget,
  expectedAuthority: (SocketAddress) -> String
) async throws {
  try await withGRPCServer(
    transport: .http2NIOPosix(
      address: serverAddress,
      transportSecurity: .plaintext
    ),
    services: [ControlService()]
  ) { server in
    guard let boundAddress = try await server.listeningAddress else {
      XCTFail("No listening address")
      return
    }

    let clientTransport = try NIOClientTransport(
      .http2NIOPosix(
        target: clientTarget(boundAddress),
        transportSecurity: .plaintext,
        config: .defaults {
          $0.http2.authority = override
        }
      )
    )

    try await withGRPCClient(transport: clientTransport) { client in
      let expected = expectedAuthority(boundAddress)
      try await self.checkAuthority(client: client, expected: expected)
    }
  }
}
/// With a DNS target of "localhost", the authority is expected to be "localhost:<port>".
func testAuthorityDNS() async throws {
  try await testAuthority(serverAddress: .ipv4(host: "127.0.0.1", port: 0)) {
    .dns(host: "localhost", port: $0.ipv4!.port)
  } expectedAuthority: {
    "localhost:\($0.ipv4!.port)"
  }
}
/// With a DNS target and an authority override, the override is expected to be the authority.
func testOverrideAuthorityDNS() async throws {
  try await testAuthority(
    serverAddress: .ipv4(host: "127.0.0.1", port: 0),
    authorityOverride: "respect-my-authority"
  ) {
    .dns(host: "localhost", port: $0.ipv4!.port)
  } expectedAuthority: { _ in
    "respect-my-authority"
  }
}
/// With an IPv4 target of 127.0.0.1, the authority is expected to be "127.0.0.1:<port>".
func testAuthorityIPv4() async throws {
  try await testAuthority(serverAddress: .ipv4(host: "127.0.0.1", port: 0)) {
    .ipv4(host: "127.0.0.1", port: $0.ipv4!.port)
  } expectedAuthority: {
    "127.0.0.1:\($0.ipv4!.port)"
  }
}
/// With an IPv4 target and an authority override, the override is expected to be the
/// authority.
func testOverrideAuthorityIPv4() async throws {
  try await testAuthority(
    serverAddress: .ipv4(host: "127.0.0.1", port: 0),
    authorityOverride: "respect-my-authority"
  ) {
    .ipv4(host: "127.0.0.1", port: $0.ipv4!.port)
  } expectedAuthority: { _ in
    "respect-my-authority"
  }
}
/// With an IPv6 target of ::1, the authority is expected to be "[::1]:<port>".
func testAuthorityIPv6() async throws {
  try await testAuthority(serverAddress: .ipv6(host: "::1", port: 0)) {
    .ipv6(host: "::1", port: $0.ipv6!.port)
  } expectedAuthority: {
    "[::1]:\($0.ipv6!.port)"
  }
}
/// With an IPv6 target and an authority override, the override is expected to be the
/// authority.
func testOverrideAuthorityIPv6() async throws {
  try await testAuthority(
    serverAddress: .ipv6(host: "::1", port: 0),
    authorityOverride: "respect-my-authority"
  ) {
    .ipv6(host: "::1", port: $0.ipv6!.port)
  } expectedAuthority: { _ in
    "respect-my-authority"
  }
}
/// With a Unix domain socket target and no override, the socket path is expected to be the
/// authority.
func testAuthorityUDS() async throws {
  let socketPath = "test-authority-uds"
  try await testAuthority(serverAddress: .unixDomainSocket(path: socketPath)) { _ in
    .unixDomainSocket(path: socketPath)
  } expectedAuthority: { _ in
    socketPath
  }
}
/// With a Unix domain socket target that carries its own `authority`, that value is expected
/// to be the authority.
func testAuthorityLocalUDSOverride() async throws {
  let socketPath = "test-authority-local-uds-override"
  try await testAuthority(serverAddress: .unixDomainSocket(path: socketPath)) { _ in
    .unixDomainSocket(path: socketPath, authority: "respect-my-authority")
  } expectedAuthority: { _ in
    "respect-my-authority"
  }
}
/// With both a target-level `authority` and an authority override, the override is expected to
/// be the authority and the target-level value ignored.
func testOverrideAuthorityUDS() async throws {
  let socketPath = "test-override-authority-uds"
  try await testAuthority(
    serverAddress: .unixDomainSocket(path: socketPath),
    authorityOverride: "respect-my-authority"
  ) { _ in
    .unixDomainSocket(path: socketPath, authority: "should-be-ignored")
  } expectedAuthority: { _ in
    "respect-my-authority"
  }
}
/// Checks the peer info reported over an IPv4 loopback connection: all four peer strings must
/// have the form "ipv4:127.0.0.1:<port>", the client's remote port must equal the server's
/// local port, and the client's local port must equal the server's remote port.
func testPeerInfoIPv4() async throws {
  try await self.forEachTransportPair(
    serverAddress: .ipv4(host: "127.0.0.1", port: 0)
  ) { control, _, _ in
    let peerInfo = try await control.peerInfo()

    // One pattern for every peer string. The dots are escaped so they match only a literal
    // '.'; two of the original patterns left them unescaped and accepted any character.
    let loopbackPeer = /ipv4:127\.0\.0\.1:(\d+)/

    let serverRemotePeerMatches = peerInfo.server.remote.wholeMatch(of: loopbackPeer)
    let clientPort = try XCTUnwrap(serverRemotePeerMatches).1

    let serverLocalPeerMatches = peerInfo.server.local.wholeMatch(of: loopbackPeer)
    let serverPort = try XCTUnwrap(serverLocalPeerMatches).1

    let clientRemotePeerMatches = peerInfo.client.remote.wholeMatch(of: loopbackPeer)
    XCTAssertEqual(try XCTUnwrap(clientRemotePeerMatches).1, serverPort)

    let clientLocalPeerMatches = peerInfo.client.local.wholeMatch(of: loopbackPeer)
    XCTAssertEqual(try XCTUnwrap(clientLocalPeerMatches).1, clientPort)
  }
}
/// Checks the peer info reported over an IPv6 loopback connection: all four peer strings must
/// have the form "ipv6:[::1]:<port>", the client's remote port must equal the server's local
/// port, and the client's local port must equal the server's remote port.
func testPeerInfoIPv6() async throws {
  try await forEachTransportPair(
    serverAddress: .ipv6(host: "::1", port: 0)
  ) { control, _, _ in
    let info = try await control.peerInfo()

    // The same pattern applies to every peer string; the capture is the port.
    let loopbackPeer = /ipv6:\[::1\]:(\d+)/

    let serverRemote = info.server.remote.wholeMatch(of: loopbackPeer)
    let clientPort = try XCTUnwrap(serverRemote).1

    let serverLocal = info.server.local.wholeMatch(of: loopbackPeer)
    let serverPort = try XCTUnwrap(serverLocal).1

    let clientRemote = info.client.remote.wholeMatch(of: loopbackPeer)
    XCTAssertEqual(try XCTUnwrap(clientRemote).1, serverPort)

    let clientLocal = info.client.local.wholeMatch(of: loopbackPeer)
    XCTAssertEqual(try XCTUnwrap(clientLocal).1, clientPort)
  }
}
/// Checks that over a Unix domain socket at "peer-info-uds" all four peer strings are exactly
/// "unix:peer-info-uds".
func testPeerInfoUDS() async throws {
  let socketPath = "peer-info-uds"
  try await forEachTransportPair(
    serverAddress: .unixDomainSocket(path: socketPath)
  ) { control, _, _ in
    let info = try await control.peerInfo()

    // The same pattern applies to every peer string.
    let udsPeer = /unix:peer-info-uds/

    XCTAssertNotNil(info.server.remote.wholeMatch(of: udsPeer))
    XCTAssertNotNil(info.server.local.wholeMatch(of: udsPeer))
    XCTAssertNotNil(info.client.remote.wholeMatch(of: udsPeer))
    XCTAssertNotNil(info.client.local.wholeMatch(of: udsPeer))
  }
}
/// Error thrown on purpose by tests that need a failing request producer.
enum TestError: Error { case someError }
/// Starts a client-streaming RPC whose request producer throws immediately and expects the
/// call to fail with an `.unavailable` `RPCError` carrying a fixed message. Errors of any
/// other type are not caught here and propagate out of the test.
func testThrowingInClientStreamWriter() async throws {
  try await forEachTransportPair { client, _, _ in
    do {
      _ = try await client.clientStream(
        request: .init(producer: { _ in throw TestError.someError })
      )
      XCTFail("Test should have failed")
    } catch let rpcError as RPCError {
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "Stream unexpectedly closed with error.")
    }
  }
}
/// Requests a single 8 MiB zero-filled payload over a unary RPC and checks that the response
/// payload matches byte for byte.
func testLargeResponse() async throws {
  let mebibyte = 1024 * 1024
  let payloadSize = 8 * mebibyte

  try await forEachTransportPair { control, _, pair in
    let largeInput = ControlInput.with {
      $0.echoMetadataInHeaders = false
      $0.echoMetadataInTrailers = false
      $0.numberOfMessages = 1
      $0.payloadParameters = .with {
        $0.content = 0
        $0.size = payloadSize
      }
    }
    let request = ClientRequest(
      message: largeInput,
      metadata: ["test-key": "test-value"] as Metadata
    )

    try await control.unary(request: request) { response in
      let reply = try response.message
      XCTAssertEqual(reply.payload, Data(repeating: 0, count: payloadSize), "\(pair)")
    }
  }
}
- }
@available(gRPCSwiftNIOTransport 1.0, *)
extension [HTTP2TransportTests.Transport] {
  /// Every combination of a server kind from `TransportKind.servers` with a client kind from
  /// `TransportKind.clients`, ordered by server first and then by client.
  static let supported: [HTTP2TransportTests.Transport] = {
    var pairs: [HTTP2TransportTests.Transport] = []
    for server in TransportKind.servers {
      for client in TransportKind.clients {
        pairs.append(HTTP2TransportTests.Transport(server: server, client: client))
      }
    }
    return pairs
  }()
}
@available(gRPCSwiftNIOTransport 1.0, *)
extension ControlInput {
  /// An input which only enables `echoMetadataInHeaders`.
  fileprivate static let echoMetadata: Self = .with { input in
    input.echoMetadataInHeaders = true
  }

  /// A default-initialized input.
  fileprivate static let noOp = Self()

  /// An input requesting `numberOfMessages` messages whose payload parameters have `content`
  /// set to `repeating` and `size` set to `count`.
  fileprivate static func messages(
    _ numberOfMessages: Int,
    repeating: UInt8,
    count: Int
  ) -> Self {
    .with { input in
      input.numberOfMessages = numberOfMessages
      input.payloadParameters = .with { parameters in
        parameters.content = repeating
        parameters.size = count
      }
    }
  }

  /// An input carrying a status with the given `code` and `message`; `echoMetadata` sets
  /// `echoMetadataInTrailers`.
  fileprivate static func status(
    code: GRPCCore.Status.Code,
    message: String,
    echoMetadata: Bool
  ) -> Self {
    .with { input in
      input.echoMetadataInTrailers = echoMetadata
      input.status = .with { details in
        details.code = code
        details.message = message
      }
    }
  }

  /// Same as `status(code:message:echoMetadata:)` but with `isTrailersOnly` also set to `true`.
  fileprivate static func trailersOnly(
    code: GRPCCore.Status.Code,
    message: String,
    echoMetadata: Bool
  ) -> Self {
    .with { input in
      input.echoMetadataInTrailers = echoMetadata
      input.isTrailersOnly = true
      input.status = .with { details in
        details.code = code
        details.message = message
      }
    }
  }
}
@available(gRPCSwiftNIOTransport 1.0, *)
extension CompressionAlgorithm {
  /// The encoding name the tests compare against metadata values: "deflate", "gzip", or
  /// "identity" for `.none`. Any other algorithm yields an empty string.
  var name: String {
    let label: String
    switch self {
    case .deflate:
      label = "deflate"
    case .gzip:
      label = "gzip"
    case .none:
      label = "identity"
    default:
      label = ""
    }
    return label
  }
}
|