HTTP2TransportTests.swift 56 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCNIOTransportCore
  18. import GRPCNIOTransportHTTP2Posix
  19. import GRPCNIOTransportHTTP2TransportServices
  20. import XCTest
  21. final class HTTP2TransportTests: XCTestCase {
  /// Pairs the transport kind used by the server with the transport kind used by the client.
  struct Transport: Sendable, CustomStringConvertible {
    /// The transport implementations a peer can be built on.
    enum Kind: Sendable, CustomStringConvertible {
      case posix
      case niots

      /// A short, human-readable name for the transport implementation.
      var description: String {
        switch self {
        case .posix:
          return "NIOPosix"
        case .niots:
          return "NIOTS"
        }
      }
    }

    /// Transport kind the server runs on.
    var server: Kind
    /// Transport kind the client runs on.
    var client: Kind

    /// Renders both kinds, e.g. `server=NIOPosix client=NIOTS`.
    var description: String {
      let serverName = String(describing: self.server)
      let clientName = String(describing: self.client)
      return "server=" + serverName + " client=" + clientName
    }
  }
  /// Runs `execute` once for each client/server transport pair.
  ///
  /// For every pair a server is started via `runServer`, a client is created for the address the
  /// server reports it is listening on, and `execute` is called with a `ControlClient` wrapping
  /// that client. Errors thrown by `execute` are recorded with `XCTFail` instead of being
  /// rethrown. Afterwards both the server and the client are asked to shut down gracefully.
  ///
  /// - Parameters:
  ///   - transport: The transport pairs to run `execute` against.
  ///   - serverAddress: The address passed to the server transport.
  ///   - enableControlService: Whether the server registers a `ControlService`.
  ///   - clientCompression: Compression algorithm configured on the client transport.
  ///   - clientEnabledCompression: Compression algorithms enabled on the client transport.
  ///   - serverCompression: Compression algorithms enabled on the server transport.
  ///   - execute: The test body, given the control client, the server and the pair under test.
  /// - Throws: Errors from starting the server or creating the client.
  func forEachTransportPair(
    _ transport: [Transport] = [.init(server: .posix, client: .posix)],
    serverAddress: SocketAddress = .ipv4(host: "127.0.0.1", port: 0),
    enableControlService: Bool = true,
    clientCompression: CompressionAlgorithm = .none,
    clientEnabledCompression: CompressionAlgorithmSet = .none,
    serverCompression: CompressionAlgorithmSet = .none,
    _ execute: (ControlClient, GRPCServer, Transport) async throws -> Void
  ) async throws {
    for pair in transport {
      try await withThrowingTaskGroup(of: Void.self) { group in
        let (server, address) = try await self.runServer(
          in: &group,
          address: serverAddress,
          kind: pair.server,
          enableControlService: enableControlService,
          compression: serverCompression
        )

        let target: any ResolvableTarget
        if let ipv4 = address.ipv4 {
          target = .ipv4(host: ipv4.host, port: ipv4.port)
        } else if let ipv6 = address.ipv6 {
          target = .ipv6(host: ipv6.host, port: ipv6.port)
        } else if let uds = address.unixDomainSocket {
          target = .unixDomainSocket(path: uds.path)
        } else {
          XCTFail("Unexpected address to connect to")
          // The server's task was already added to the group by `runServer`. A task group waits
          // for its children before returning, so the server must be told to stop here or this
          // early exit would leave the group waiting on it.
          server.beginGracefulShutdown()
          return
        }

        let client = try self.makeClient(
          kind: pair.client,
          target: target,
          compression: clientCompression,
          enabledCompression: clientEnabledCompression
        )

        group.addTask {
          try await client.run()
        }

        do {
          let control = ControlClient(wrapping: client)
          try await execute(control, server, pair)
        } catch {
          XCTFail("Unexpected error: '\(error)' (\(pair))")
        }

        // Let both child tasks finish so the group can return.
        server.beginGracefulShutdown()
        client.beginGracefulShutdown()
      }
    }
  }
  /// Runs `execute` once per client transport kind against an `HTTP2StatusCodeServer`.
  ///
  /// A new server and client are started for each kind. Errors thrown by `execute` are recorded
  /// with `XCTFail`, and all tasks in the group are cancelled once `execute` has finished.
  ///
  /// - Parameters:
  ///   - kind: The client transport kinds to test.
  ///   - execute: The test body, given the control client and the client transport kind.
  /// - Throws: Errors from obtaining the server's listening address or creating the client.
  func forEachClientAndHTTPStatusCodeServer(
    _ kind: [Transport.Kind] = [.posix, .niots],
    _ execute: (ControlClient, Transport.Kind) async throws -> Void
  ) async throws {
    for clientKind in kind {
      try await withThrowingTaskGroup(of: Void.self) { tasks in
        let statusCodeServer = HTTP2StatusCodeServer()
        tasks.addTask { try await statusCodeServer.run() }

        let bound = try await statusCodeServer.listeningAddress
        let grpcClient = try self.makeClient(
          kind: clientKind,
          target: .ipv4(host: bound.host, port: bound.port),
          compression: .none,
          enabledCompression: .none
        )
        tasks.addTask { try await grpcClient.run() }

        do {
          try await execute(ControlClient(wrapping: grpcClient), clientKind)
        } catch {
          XCTFail("Unexpected error: '\(error)' (\(clientKind))")
        }

        // Stop the server and client tasks by cancelling the group.
        tasks.cancelAll()
      }
    }
  }
  /// Creates a plaintext gRPC server of the given transport kind and starts serving it as a
  /// child task of `group`.
  ///
  /// - Parameters:
  ///   - group: The task group that runs the server's `serve()` call.
  ///   - address: The address passed to the server transport.
  ///   - kind: The transport implementation to use.
  ///   - enableControlService: Whether a `ControlService` is registered with the server.
  ///   - compression: The compression algorithms enabled on the server transport.
  /// - Returns: The server and the address it reports it is listening on.
  /// - Throws: `XCTSkip` when `kind` is `.niots` and `Network` cannot be imported; an error when
  ///   the listening address cannot be obtained or is `nil`.
  private func runServer(
    in group: inout ThrowingTaskGroup<Void, any Error>,
    address: SocketAddress,
    kind: Transport.Kind,
    enableControlService: Bool,
    compression: CompressionAlgorithmSet
  ) async throws -> (GRPCServer, GRPCNIOTransportCore.SocketAddress) {
    let services = enableControlService ? [ControlService()] : []
    let server: GRPCServer

    // Only the transport differs between the kinds; serving and address lookup are shared below.
    switch kind {
    case .posix:
      server = GRPCServer(
        transport: .http2NIOPosix(
          address: address,
          transportSecurity: .plaintext,
          config: .defaults {
            $0.compression.enabledAlgorithms = compression
          }
        ),
        services: services
      )

    case .niots:
      #if canImport(Network)
      server = GRPCServer(
        transport: .http2NIOTS(
          address: address,
          transportSecurity: .plaintext,
          config: .defaults {
            $0.compression.enabledAlgorithms = compression
          }
        ),
        services: services
      )
      #else
      throw XCTSkip("Transport not supported on this platform")
      #endif
    }

    group.addTask {
      try await server.serve()
    }

    // Fail the test with a diagnostic instead of crashing the test process when the server has
    // no listening address.
    let listeningAddress = try await server.listeningAddress
    let boundAddress = try XCTUnwrap(listeningAddress, "Server has no listening address (\(kind))")
    return (server, boundAddress)
  }
  /// Builds a plaintext gRPC client of the given transport kind using round-robin load balancing.
  ///
  /// - Parameters:
  ///   - kind: The transport implementation to use.
  ///   - target: The target the client transport resolves and connects to.
  ///   - compression: The compression algorithm configured on the transport.
  ///   - enabledCompression: The compression algorithms enabled on the transport.
  /// - Returns: A client wrapping the newly created transport.
  /// - Throws: `XCTSkip` when `kind` is `.niots` and `Network` cannot be imported, or any error
  ///   thrown while creating the transport.
  private func makeClient(
    kind: Transport.Kind,
    target: any ResolvableTarget,
    compression: CompressionAlgorithm,
    enabledCompression: CompressionAlgorithmSet
  ) throws -> GRPCClient {
    // Both transports use the same service config.
    var roundRobinConfig = ServiceConfig()
    roundRobinConfig.loadBalancingConfig = [.roundRobin]

    let clientTransport: any ClientTransport
    switch kind {
    case .posix:
      clientTransport = try HTTP2ClientTransport.Posix(
        target: target,
        transportSecurity: .plaintext,
        config: .defaults { config in
          config.compression.algorithm = compression
          config.compression.enabledAlgorithms = enabledCompression
        },
        serviceConfig: roundRobinConfig
      )

    case .niots:
      #if canImport(Network)
      clientTransport = try HTTP2ClientTransport.TransportServices(
        target: target,
        transportSecurity: .plaintext,
        config: .defaults { config in
          config.compression.algorithm = compression
          config.compression.enabledAlgorithms = enabledCompression
        },
        serviceConfig: roundRobinConfig
      )
      #else
      throw XCTSkip("Transport not supported on this platform")
      #endif
    }

    return GRPCClient(transport: clientTransport)
  }
  /// Unary RPC: the client sends one message and the server replies with metadata, a single
  /// message, and an ok status with trailing metadata.
  func testUnaryOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      var input = ControlInput()
      input.echoMetadataInHeaders = true
      input.echoMetadataInTrailers = true
      input.numberOfMessages = 1
      input.payloadParameters = .with {
        $0.content = 0
        $0.size = 1024
      }

      let request = ClientRequest(message: input, metadata: ["test-key": "test-value"])
      try await control.unary(request: request) { response in
        let reply = try response.message
        let expectedPayload = Data(repeating: 0, count: 1024)
        XCTAssertEqual(reply.payload, expectedPayload, "\(pair)")

        // The request metadata is echoed back under an "echo-" prefixed key.
        let echoedInHeaders = Array(response.metadata["echo-test-key"])
        XCTAssertEqual(echoedInHeaders, ["test-value"], "\(pair)")

        let echoedInTrailers = Array(response.trailingMetadata["echo-test-key"])
        XCTAssertEqual(echoedInTrailers, ["test-value"], "\(pair)")
      }
    }
  }
  /// Unary RPC: the client sends one message and the server replies with metadata, a single
  /// message, and a non-ok status with trailing metadata.
  func testUnaryNotOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let input = ControlInput.with {
        $0.echoMetadataInTrailers = true
        $0.numberOfMessages = 1
        $0.payloadParameters = .with {
          $0.content = 0
          $0.size = 1024
        }
        $0.status = .with {
          $0.code = .aborted
          $0.message = "\(#function)"
        }
      }

      let metadata: Metadata = ["test-key": "test-value"]
      let request = ClientRequest(message: input, metadata: metadata)

      try await control.unary(request: request) { response in
        // Reading the message must surface the non-ok status as an `RPCError`.
        XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
          // Every assertion names the transport pair so a failure can be attributed.
          XCTAssertEqual(error.code, .aborted, "\(pair)")
          XCTAssertEqual(error.message, "\(#function)", "\(pair)")

          let trailing = error.metadata
          XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
        }

        let trailing = response.trailingMetadata
        XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
      }
    }
  }
  /// Unary RPC: the client sends one message and the server responds with only a non-ok status
  /// and trailing metadata.
  func testUnaryRejected() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let request = ClientRequest<ControlInput>(
        message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
        metadata: metadata
      )

      try await control.unary(request: request) { response in
        XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
          XCTAssertEqual(error.code, .aborted, "\(pair)")
          XCTAssertEqual(error.message, "\(#function)", "\(pair)")

          let trailing = error.metadata
          XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
        }

        // No initial metadata for trailers-only.
        XCTAssertEqual(response.metadata, [:], "\(pair)")

        let trailing = response.trailingMetadata
        XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
      }
    }
  }
  /// Client-streaming RPC that ends with an ok status: the single response message and the echoed
  /// initial and trailing metadata are checked.
  func testClientStreamingOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: ["test-key": "test-value"]
      ) { writer in
        try await writer.write(.echoMetadata)

        // Send a few messages which are ignored.
        for _ in 0 ..< 3 {
          try await writer.write(.noOp)
        }

        // Send a message.
        try await writer.write(.messages(1, repeating: 42, count: 1024))

        // ... and the final status.
        try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
      }

      try await control.clientStream(request: request) { response in
        let reply = try response.message
        let expectedPayload = Data(repeating: 42, count: 1024)
        XCTAssertEqual(reply.payload, expectedPayload, "\(pair)")

        let echoedInHeaders = Array(response.metadata["echo-test-key"])
        XCTAssertEqual(echoedInHeaders, ["test-value"], "\(pair)")

        let echoedInTrailers = Array(response.trailingMetadata["echo-test-key"])
        XCTAssertEqual(echoedInTrailers, ["test-value"], "\(pair)")
      }
    }
  }
  /// Client-streaming RPC that ends with a non-ok status: the error and the echoed initial and
  /// trailing metadata are checked.
  func testClientStreamingNotOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: ["test-key": "test-value"]
      ) { writer in
        try await writer.write(.echoMetadata)

        // Send a few messages which are ignored.
        for _ in 0 ..< 3 {
          try await writer.write(.noOp)
        }

        // Send a message.
        try await writer.write(.messages(1, repeating: 42, count: 1024))

        // Send the final status.
        try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
      }

      try await control.clientStream(request: request) { response in
        XCTAssertThrowsError(ofType: RPCError.self, try response.message) { rpcError in
          XCTAssertEqual(rpcError.code, .aborted, "\(pair)")
          XCTAssertEqual(rpcError.message, "\(#function)", "\(pair)")
          XCTAssertEqual(Array(rpcError.metadata["echo-test-key"]), ["test-value"], "\(pair)")
        }

        let echoedInHeaders = Array(response.metadata["echo-test-key"])
        XCTAssertEqual(echoedInHeaders, ["test-value"], "\(pair)")

        let echoedInTrailers = Array(response.trailingMetadata["echo-test-key"])
        XCTAssertEqual(echoedInTrailers, ["test-value"], "\(pair)")
      }
    }
  }
  /// Client-streaming RPC: the client sends one message and the server responds with only a
  /// non-ok status and trailing metadata.
  func testClientStreamingRejected() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: metadata
      ) { writer in
        let message: ControlInput = .trailersOnly(
          code: .aborted,
          message: "\(#function)",
          echoMetadata: true
        )
        try await writer.write(message)
      }

      try await control.clientStream(request: request) { response in
        XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
          XCTAssertEqual(error.code, .aborted, "\(pair)")
          XCTAssertEqual(error.message, "\(#function)", "\(pair)")

          let trailing = error.metadata
          XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
        }

        // No initial metadata for trailers-only.
        XCTAssertEqual(response.metadata, [:], "\(pair)")

        let trailing = response.trailingMetadata
        XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
      }
    }
  }
  /// Server-streaming RPC: the server is asked for five messages and an ok status, echoing the
  /// request metadata in both the initial and the trailing metadata.
  func testServerStreamingOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.echoMetadataInTrailers = true
        $0.numberOfMessages = 5
        $0.payloadParameters = .with {
          $0.content = 42
          $0.size = 1024
        }
      }

      let request = ClientRequest(message: input, metadata: metadata)
      try await control.serverStream(request: request) { response in
        switch response.accepted {
        case .success(let contents):
          XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

          var messagesReceived = 0
          for try await part in contents.bodyParts {
            switch part {
            case .message(let message):
              messagesReceived += 1
              // Every assertion names the transport pair so a failure can be attributed.
              XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
            case .trailingMetadata(let metadata):
              XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
            }
          }
          XCTAssertEqual(messagesReceived, 5, "\(pair)")

        case .failure(let error):
          throw error
        }
      }
    }
  }
  /// Server-streaming RPC where the server echoes metadata but sends no messages.
  func testServerStreamingEmptyOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      // Echo back metadata, but don't send any messages.
      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.echoMetadataInTrailers = true
      }

      let request = ClientRequest(message: input, metadata: metadata)
      try await control.serverStream(request: request) { response in
        switch response.accepted {
        case .success(let contents):
          XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

          for try await part in contents.bodyParts {
            switch part {
            case .message:
              // Name the transport pair so the failure can be attributed.
              XCTFail("Unexpected message \(pair)")
            case .trailingMetadata(let metadata):
              XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
            }
          }

        case .failure(let error):
          throw error
        }
      }
    }
  }
  /// Server-streaming RPC: the server is asked for five messages followed by a non-ok status.
  /// The status is expected to surface as an `RPCError` thrown while iterating the body parts.
  func testServerStreamingNotOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.echoMetadataInTrailers = true
        $0.numberOfMessages = 5
        $0.payloadParameters = .with {
          $0.content = 42
          $0.size = 1024
        }
        $0.status = .with {
          $0.code = .aborted
          $0.message = "\(#function)"
        }
      }

      let request = ClientRequest(message: input, metadata: metadata)
      try await control.serverStream(request: request) { response in
        switch response.accepted {
        case .success(let contents):
          XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

          var messagesReceived = 0
          do {
            for try await part in contents.bodyParts {
              switch part {
              case .message(let message):
                messagesReceived += 1
                XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
              case .trailingMetadata:
                XCTFail("Unexpected trailing metadata, should be provided in RPCError \(pair)")
              }
            }
            XCTFail("Expected error to be thrown \(pair)")
          } catch let error as RPCError {
            // Errors of any other type propagate out of the handler.
            XCTAssertEqual(error.code, .aborted, "\(pair)")
            XCTAssertEqual(error.message, "\(#function)", "\(pair)")
            XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
          }
          XCTAssertEqual(messagesReceived, 5, "\(pair)")

        case .failure(let error):
          throw error
        }
      }
    }
  }
  /// Server-streaming RPC where the server sends no messages and finishes with a non-ok status.
  /// The status is expected to surface as an `RPCError` thrown while iterating the body parts.
  func testServerStreamingEmptyNotOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.echoMetadataInTrailers = true
        $0.status = .with {
          $0.code = .aborted
          $0.message = "\(#function)"
        }
      }

      let request = ClientRequest(message: input, metadata: metadata)
      try await control.serverStream(request: request) { response in
        switch response.accepted {
        case .success(let contents):
          XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

          do {
            for try await _ in contents.bodyParts {
              XCTFail("Unexpected message, \(pair)")
            }
            XCTFail("Expected error to be thrown \(pair)")
          } catch let error as RPCError {
            // Errors of any other type propagate out of the handler.
            XCTAssertEqual(error.code, .aborted, "\(pair)")
            XCTAssertEqual(error.message, "\(#function)", "\(pair)")
            XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
          }

        case .failure(let error):
          throw error
        }
      }
    }
  }
  /// Server-streaming RPC that the server answers with a trailers-only, non-ok response.
  func testServerStreamingRejected() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let request = ClientRequest<ControlInput>(
        message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
        metadata: ["test-key": "test-value"]
      )

      try await control.serverStream(request: request) { response in
        // Anything other than a failure means the RPC was wrongly accepted.
        guard case .failure(let rejection) = response.accepted else {
          XCTFail("Expected RPC to be rejected \(pair)")
          return
        }

        XCTAssertEqual(rejection.code, .aborted, "\(pair)")
        XCTAssertEqual(rejection.message, "\(#function)", "\(pair)")
        XCTAssertEqual(Array(rejection.metadata["echo-test-key"]), ["test-value"], "\(pair)")
      }
    }
  }
  /// Bidirectional-streaming RPC: three messages are sent and echoed back, followed by an ok
  /// status with echoed metadata.
  func testBidiStreamingOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: metadata
      ) { writer in
        try await writer.write(.echoMetadata)
        // Send a few messages, each is echo'd back.
        try await writer.write(.messages(1, repeating: 42, count: 1024))
        try await writer.write(.messages(1, repeating: 42, count: 1024))
        try await writer.write(.messages(1, repeating: 42, count: 1024))
        // Send the final status.
        try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
      }

      try await control.bidiStream(request: request) { response in
        switch response.accepted {
        case .success(let contents):
          XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

          var messagesReceived = 0
          for try await part in contents.bodyParts {
            switch part {
            case .message(let message):
              messagesReceived += 1
              // Every assertion names the transport pair so a failure can be attributed.
              XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
            case .trailingMetadata(let metadata):
              XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
            }
          }
          XCTAssertEqual(messagesReceived, 3, "\(pair)")

        case .failure(let error):
          throw error
        }
      }
    }
  }
  /// Bidirectional-streaming RPC where the client writes nothing: the response must contain no
  /// messages and exactly one trailing-metadata part.
  func testBidiStreamingEmptyOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let request = StreamingClientRequest(of: ControlInput.self) { _ in }

      try await control.bidiStream(request: request) { response in
        switch response.accepted {
        case .success(let contents):
          var receivedTrailingMetadata = false
          for try await part in contents.bodyParts {
            switch part {
            case .message:
              XCTFail("Unexpected message \(pair)")
            case .trailingMetadata:
              // Guards against trailing metadata being delivered more than once.
              XCTAssertFalse(receivedTrailingMetadata, "\(pair)")
              receivedTrailingMetadata = true
            }
          }
          // The in-loop check only catches duplicates; also require that trailing metadata was
          // delivered at all, otherwise an empty body-part stream would pass unnoticed.
          XCTAssertTrue(receivedTrailingMetadata, "\(pair)")

        case .failure(let error):
          throw error
        }
      }
    }
  }
  /// Bidirectional-streaming RPC: three messages are sent and echoed back, followed by a non-ok
  /// status. The status is expected to surface as an `RPCError` thrown while iterating the body
  /// parts.
  func testBidiStreamingNotOK() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: metadata
      ) { writer in
        try await writer.write(.echoMetadata)
        // Send a few messages, each is echo'd back.
        try await writer.write(.messages(1, repeating: 42, count: 1024))
        try await writer.write(.messages(1, repeating: 42, count: 1024))
        try await writer.write(.messages(1, repeating: 42, count: 1024))
        // Send the final status.
        try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
      }

      try await control.bidiStream(request: request) { response in
        switch response.accepted {
        case .success(let contents):
          XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")

          var messagesReceived = 0
          do {
            for try await part in contents.bodyParts {
              switch part {
              case .message(let message):
                messagesReceived += 1
                XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
              case .trailingMetadata:
                XCTFail("Trailing metadata should be provided by error \(pair)")
              }
            }
            XCTFail("Should've thrown error \(pair)")
          } catch let error as RPCError {
            // Errors of any other type propagate out of the handler.
            XCTAssertEqual(error.code, .aborted, "\(pair)")
            XCTAssertEqual(error.message, "\(#function)", "\(pair)")
            XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
          }
          XCTAssertEqual(messagesReceived, 3, "\(pair)")

        case .failure(let error):
          throw error
        }
      }
    }
  }
  /// Bidirectional-streaming RPC that the server answers with a trailers-only, non-ok response.
  func testBidiStreamingRejected() async throws {
    try await self.forEachTransportPair { control, _, pair in
      let metadata: Metadata = ["test-key": "test-value"]
      let request = StreamingClientRequest(
        of: ControlInput.self,
        metadata: metadata
      ) { writer in
        try await writer.write(
          .trailersOnly(
            code: .aborted,
            message: "\(#function)",
            echoMetadata: true
          )
        )
      }

      try await control.bidiStream(request: request) { response in
        switch response.accepted {
        case .success:
          XCTFail("Expected RPC to fail \(pair)")
        case .failure(let error):
          // Every assertion names the transport pair so a failure can be attributed.
          XCTAssertEqual(error.code, .aborted, "\(pair)")
          XCTAssertEqual(error.message, "\(#function)", "\(pair)")
          XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
        }
      }
    }
  }
  656. // MARK: - Not Implemented
  /// A unary RPC against a server started without the control service must fail with
  /// `.unimplemented`.
  func testUnaryNotImplemented() async throws {
    try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
      let request = ClientRequest(message: ControlInput())
      try await control.unary(request: request) { response in
        XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
          // Name the transport pair so a failure can be attributed.
          XCTAssertEqual(error.code, .unimplemented, "\(pair)")
        }
      }
    }
  }
  /// A client-streaming RPC against a server started without the control service must fail with
  /// `.unimplemented`.
  func testClientStreamingNotImplemented() async throws {
    try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
      let request = StreamingClientRequest(of: ControlInput.self) { _ in }
      try await control.clientStream(request: request) { response in
        XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
          // Name the transport pair so a failure can be attributed.
          XCTAssertEqual(error.code, .unimplemented, "\(pair)")
        }
      }
    }
  }
  /// A server-streaming RPC against a server started without the control service must be
  /// rejected with `.unimplemented`.
  func testServerStreamingNotImplemented() async throws {
    try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
      let request = ClientRequest(message: ControlInput())
      try await control.serverStream(request: request) { response in
        XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
          // Name the transport pair so a failure can be attributed.
          XCTAssertEqual(error.code, .unimplemented, "\(pair)")
        }
      }
    }
  }
  /// A bidirectional-streaming RPC against a server started without the control service must be
  /// rejected with `.unimplemented`.
  func testBidiStreamingNotImplemented() async throws {
    try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
      let request = StreamingClientRequest(of: ControlInput.self) { _ in }
      try await control.bidiStream(request: request) { response in
        XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
          // Name the transport pair so a failure can be attributed.
          XCTAssertEqual(error.code, .unimplemented, "\(pair)")
        }
      }
    }
  }
  697. // MARK: - Compression tests
  /// Performs a unary RPC with the given client compression and checks the encodings reported in
  /// the response metadata as well as the response payload.
  ///
  /// - Parameters:
  ///   - client: The algorithm set in the call options; expected back under "echo-grpc-encoding".
  ///   - server: The algorithm expected in the response's "grpc-encoding" metadata.
  ///   - control: The client used to make the RPC.
  ///   - pair: The transport pair under test, included in assertion messages.
  private func testUnaryCompression(
    client: CompressionAlgorithm,
    server: CompressionAlgorithm,
    control: ControlClient,
    pair: Transport
  ) async throws {
    let message = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.numberOfMessages = 1
      $0.payloadParameters = .with {
        $0.content = 42
        $0.size = 1024
      }
    }

    var options = CallOptions.defaults
    options.compression = client

    try await control.unary(
      request: ClientRequest(message: message),
      options: options
    ) { response in
      // Check the client algorithm.
      switch client {
      case .deflate, .gzip:
        // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
        let encoding = Array(response.metadata["echo-grpc-encoding"])
        XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
      case .none:
        ()
      default:
        XCTFail("Unhandled compression '\(client)'")
      }

      // Check the server algorithm.
      switch server {
      case .deflate, .gzip:
        let encoding = Array(response.metadata["grpc-encoding"])
        XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
      case .none:
        ()
      default:
        // Report the server algorithm here, not the client's.
        XCTFail("Unhandled compression '\(server)'")
      }

      let message = try response.message
      XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
    }
  }
  /// Performs a client-streaming RPC with the given client compression and checks the encodings
  /// reported in the response metadata as well as the response payload.
  ///
  /// - Parameters:
  ///   - client: The algorithm set in the call options; expected back under "echo-grpc-encoding".
  ///   - server: The algorithm expected in the response's "grpc-encoding" metadata.
  ///   - control: The client used to make the RPC.
  ///   - pair: The transport pair under test, included in assertion messages.
  private func testClientStreamingCompression(
    client: CompressionAlgorithm,
    server: CompressionAlgorithm,
    control: ControlClient,
    pair: Transport
  ) async throws {
    let request = StreamingClientRequest(of: ControlInput.self) { writer in
      try await writer.write(.echoMetadata)
      try await writer.write(.noOp)
      try await writer.write(.noOp)
      try await writer.write(.messages(1, repeating: 42, count: 1024))
    }

    var options = CallOptions.defaults
    options.compression = client

    try await control.clientStream(request: request, options: options) { response in
      // Check the client algorithm.
      switch client {
      case .deflate, .gzip:
        // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
        let encoding = Array(response.metadata["echo-grpc-encoding"])
        XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
      case .none:
        ()
      default:
        XCTFail("Unhandled compression '\(client)'")
      }

      // Check the server algorithm.
      switch server {
      case .deflate, .gzip:
        let encoding = Array(response.metadata["grpc-encoding"])
        XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
      case .none:
        ()
      default:
        // Report the server algorithm here, not the client's.
        XCTFail("Unhandled compression '\(server)'")
      }

      let message = try response.message
      XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
    }
  }
  /// Performs a server-streaming RPC with the given client compression and checks the encodings
  /// reported in the response metadata as well as the payload of every received message.
  ///
  /// - Parameters:
  ///   - client: The algorithm set in the call options; expected back under "echo-grpc-encoding".
  ///   - server: The algorithm expected in the response's "grpc-encoding" metadata.
  ///   - control: The client used to make the RPC.
  ///   - pair: The transport pair under test, included in assertion messages.
  private func testServerStreamingCompression(
    client: CompressionAlgorithm,
    server: CompressionAlgorithm,
    control: ControlClient,
    pair: Transport
  ) async throws {
    let message = ControlInput.with {
      $0.echoMetadataInHeaders = true
      $0.numberOfMessages = 5
      $0.payloadParameters = .with {
        $0.content = 42
        $0.size = 1024
      }
    }

    var options = CallOptions.defaults
    options.compression = client

    try await control.serverStream(
      request: ClientRequest(message: message),
      options: options
    ) { response in
      // Check the client algorithm.
      switch client {
      case .deflate, .gzip:
        // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
        let encoding = Array(response.metadata["echo-grpc-encoding"])
        XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
      case .none:
        ()
      default:
        XCTFail("Unhandled compression '\(client)'")
      }

      // Check the server algorithm.
      switch server {
      case .deflate, .gzip:
        let encoding = Array(response.metadata["grpc-encoding"])
        XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
      case .none:
        ()
      default:
        // Report the server algorithm here, not the client's.
        XCTFail("Unhandled compression '\(server)'")
      }

      for try await message in response.messages {
        XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
      }
    }
  }
  /// Performs a bidirectional-streaming RPC with the given client compression and checks the
  /// encodings reported in the response metadata as well as the payload of every received message.
  ///
  /// - Parameters:
  ///   - client: The algorithm set in the call options; expected back under "echo-grpc-encoding".
  ///   - server: The algorithm expected in the response's "grpc-encoding" metadata.
  ///   - control: The client used to make the RPC.
  ///   - pair: The transport pair under test, included in assertion messages.
  private func testBidiStreamingCompression(
    client: CompressionAlgorithm,
    server: CompressionAlgorithm,
    control: ControlClient,
    pair: Transport
  ) async throws {
    let request = StreamingClientRequest(of: ControlInput.self) { writer in
      try await writer.write(.echoMetadata)
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.messages(1, repeating: 42, count: 1024))
      try await writer.write(.messages(1, repeating: 42, count: 1024))
    }

    var options = CallOptions.defaults
    options.compression = client

    try await control.bidiStream(request: request, options: options) { response in
      // Check the client algorithm.
      switch client {
      case .deflate, .gzip:
        // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
        let encoding = Array(response.metadata["echo-grpc-encoding"])
        XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
      case .none:
        ()
      default:
        XCTFail("Unhandled compression '\(client)'")
      }

      // Check the server algorithm.
      switch server {
      case .deflate, .gzip:
        let encoding = Array(response.metadata["grpc-encoding"])
        XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
      case .none:
        ()
      default:
        // Report the server algorithm here, not the client's.
        XCTFail("Unhandled compression '\(server)'")
      }

      for try await message in response.messages {
        XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
      }
    }
  }
  /// Unary RPC where deflate is enabled on, and used by, both the client and the server.
  func testUnaryDeflateCompression() async throws {
    try await self.forEachTransportPair(
      clientCompression: .deflate,
      clientEnabledCompression: .deflate,
      serverCompression: .deflate
    ) { controlClient, _, transportPair in
      // Both peers are expected to use deflate.
      try await self.testUnaryCompression(
        client: .deflate,
        server: .deflate,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// Unary RPC where gzip is enabled on, and used by, both the client and the server.
  func testUnaryGzipCompression() async throws {
    try await self.forEachTransportPair(
      clientCompression: .gzip,
      clientEnabledCompression: .gzip,
      serverCompression: .gzip
    ) { controlClient, _, transportPair in
      // Both peers are expected to use gzip.
      try await self.testUnaryCompression(
        client: .gzip,
        server: .gzip,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// Client-streaming RPC where deflate is enabled on, and used by, both the client and the server.
  func testClientStreamingDeflateCompression() async throws {
    try await self.forEachTransportPair(
      clientCompression: .deflate,
      clientEnabledCompression: .deflate,
      serverCompression: .deflate
    ) { controlClient, _, transportPair in
      // Both peers are expected to use deflate.
      try await self.testClientStreamingCompression(
        client: .deflate,
        server: .deflate,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// Client-streaming RPC where gzip is enabled on, and used by, both the client and the server.
  func testClientStreamingGzipCompression() async throws {
    try await self.forEachTransportPair(
      clientCompression: .gzip,
      clientEnabledCompression: .gzip,
      serverCompression: .gzip
    ) { controlClient, _, transportPair in
      // Both peers are expected to use gzip.
      try await self.testClientStreamingCompression(
        client: .gzip,
        server: .gzip,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// Server-streaming RPC where deflate is enabled on, and used by, both the client and the server.
  func testServerStreamingDeflateCompression() async throws {
    // The same algorithm is used for the call and expected from both peers.
    let algorithm = CompressionAlgorithm.deflate

    try await self.forEachTransportPair(
      clientCompression: algorithm,
      clientEnabledCompression: .deflate,
      serverCompression: .deflate
    ) { controlClient, _, transportPair in
      try await self.testServerStreamingCompression(
        client: algorithm,
        server: algorithm,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// Server-streaming RPC where gzip is enabled on, and used by, both the client and the server.
  func testServerStreamingGzipCompression() async throws {
    // The same algorithm is used for the call and expected from both peers.
    let algorithm = CompressionAlgorithm.gzip

    try await self.forEachTransportPair(
      clientCompression: algorithm,
      clientEnabledCompression: .gzip,
      serverCompression: .gzip
    ) { controlClient, _, transportPair in
      try await self.testServerStreamingCompression(
        client: algorithm,
        server: algorithm,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// Bidirectional-streaming RPC where deflate is enabled on, and used by, both peers.
  func testBidiStreamingDeflateCompression() async throws {
    // The same algorithm is used for the call and expected from both peers.
    let algorithm = CompressionAlgorithm.deflate

    try await self.forEachTransportPair(
      clientCompression: algorithm,
      clientEnabledCompression: .deflate,
      serverCompression: .deflate
    ) { controlClient, _, transportPair in
      try await self.testBidiStreamingCompression(
        client: algorithm,
        server: algorithm,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// Bidirectional-streaming RPC where gzip is enabled on, and used by, both peers.
  func testBidiStreamingGzipCompression() async throws {
    // The same algorithm is used for the call and expected from both peers.
    let algorithm = CompressionAlgorithm.gzip

    try await self.forEachTransportPair(
      clientCompression: algorithm,
      clientEnabledCompression: .gzip,
      serverCompression: .gzip
    ) { controlClient, _, transportPair in
      try await self.testBidiStreamingCompression(
        client: algorithm,
        server: algorithm,
        control: controlClient,
        pair: transportPair
      )
    }
  }
  /// The server is configured with gzip only, so a unary RPC sent with deflate must be rejected
  /// and the rejection must list gzip, but not deflate, under "grpc-accept-encoding".
  func testUnaryUnsupportedCompression() async throws {
    try await self.forEachTransportPair(
      clientEnabledCompression: .all,
      serverCompression: .gzip
    ) { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.compression = .deflate

      let input = ControlInput.with {
        $0.numberOfMessages = 1
        $0.payloadParameters = .with {
          $0.content = 42
          $0.size = 1024
        }
      }

      try await control.unary(
        request: ClientRequest(message: input),
        options: callOptions
      ) { response in
        guard case .failure(let error) = response.accepted else {
          XCTFail("RPC should've been rejected")
          return
        }

        // "identity" may or may not be included, so only test for values which must be present.
        let advertised = Array(error.metadata["grpc-accept-encoding"])
        XCTAssertTrue(advertised.contains("gzip"))
        XCTAssertFalse(advertised.contains("deflate"))
      }
    }
  }
  /// The server is configured with gzip only, so a client-streaming RPC sent with deflate must be
  /// rejected and the rejection must list gzip, but not deflate, under "grpc-accept-encoding".
  func testClientStreamingUnsupportedCompression() async throws {
    try await self.forEachTransportPair(
      clientEnabledCompression: .all,
      serverCompression: .gzip
    ) { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.compression = .deflate

      let request = StreamingClientRequest(of: ControlInput.self) { writer in
        try await writer.write(.noOp)
      }

      try await control.clientStream(request: request, options: callOptions) { response in
        if case .failure(let error) = response.accepted {
          // "identity" may or may not be included, so only test for values which must be present.
          let advertised = Array(error.metadata["grpc-accept-encoding"])
          XCTAssertTrue(advertised.contains("gzip"))
          XCTAssertFalse(advertised.contains("deflate"))
        } else {
          XCTFail("RPC should've been rejected")
        }
      }
    }
  }
  /// The server is configured with gzip only, so a server-streaming RPC sent with deflate must be
  /// rejected and the rejection must list gzip, but not deflate, under "grpc-accept-encoding".
  func testServerStreamingUnsupportedCompression() async throws {
    try await self.forEachTransportPair(
      clientEnabledCompression: .all,
      serverCompression: .gzip
    ) { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.compression = .deflate

      let input = ControlInput.with {
        $0.numberOfMessages = 1
        $0.payloadParameters = .with {
          $0.content = 42
          $0.size = 1024
        }
      }

      try await control.serverStream(
        request: ClientRequest(message: input),
        options: callOptions
      ) { response in
        guard case .failure(let error) = response.accepted else {
          XCTFail("RPC should've been rejected")
          return
        }

        // "identity" may or may not be included, so only test for values which must be present.
        let advertised = Array(error.metadata["grpc-accept-encoding"])
        XCTAssertTrue(advertised.contains("gzip"))
        XCTAssertFalse(advertised.contains("deflate"))
      }
    }
  }
  /// The server is configured with gzip only, so a bidirectional-streaming RPC sent with deflate
  /// must be rejected and the rejection must list gzip, but not deflate, under
  /// "grpc-accept-encoding".
  func testBidiStreamingUnsupportedCompression() async throws {
    try await self.forEachTransportPair(
      clientEnabledCompression: .all,
      serverCompression: .gzip
    ) { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.compression = .deflate

      let request = StreamingClientRequest(of: ControlInput.self) { writer in
        try await writer.write(.noOp)
      }

      try await control.bidiStream(request: request, options: callOptions) { response in
        if case .failure(let error) = response.accepted {
          // "identity" may or may not be included, so only test for values which must be present.
          let advertised = Array(error.metadata["grpc-accept-encoding"])
          XCTAssertTrue(advertised.contains("gzip"))
          XCTAssertFalse(advertised.contains("deflate"))
        } else {
          XCTFail("RPC should've been rejected")
        }
      }
    }
  }
  /// A timeout set on the call options of a unary RPC must be echoed back by the server exactly
  /// once under "echo-grpc-timeout".
  func testUnaryTimeoutPropagatedToServer() async throws {
    try await self.forEachTransportPair { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.timeout = .seconds(10)

      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.numberOfMessages = 1
      }

      try await control.unary(
        request: ClientRequest(message: input),
        options: callOptions
      ) { response in
        let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
        XCTAssertEqual(echoedTimeouts.count, 1)
      }
    }
  }
  /// A timeout set on the call options of a client-streaming RPC must be echoed back by the
  /// server exactly once under "echo-grpc-timeout".
  func testClientStreamingTimeoutPropagatedToServer() async throws {
    try await self.forEachTransportPair { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.timeout = .seconds(10)

      let request = StreamingClientRequest(of: ControlInput.self) { writer in
        try await writer.write(
          .with {
            $0.echoMetadataInHeaders = true
            $0.numberOfMessages = 1
          }
        )
      }

      try await control.clientStream(request: request, options: callOptions) { response in
        let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
        XCTAssertEqual(echoedTimeouts.count, 1)
      }
    }
  }
  /// A timeout set on the call options of a server-streaming RPC must be echoed back by the
  /// server exactly once under "echo-grpc-timeout".
  func testServerStreamingTimeoutPropagatedToServer() async throws {
    try await self.forEachTransportPair { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.timeout = .seconds(10)

      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.numberOfMessages = 1
      }

      try await control.serverStream(
        request: ClientRequest(message: input),
        options: callOptions
      ) { response in
        let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
        XCTAssertEqual(echoedTimeouts.count, 1)
      }
    }
  }
  /// A timeout set on the call options of a bidirectional-streaming RPC must be echoed back by
  /// the server exactly once under "echo-grpc-timeout".
  func testBidiStreamingTimeoutPropagatedToServer() async throws {
    try await self.forEachTransportPair { control, _, _ in
      var callOptions = CallOptions.defaults
      callOptions.timeout = .seconds(10)

      let request = StreamingClientRequest(of: ControlInput.self) { writer in
        try await writer.write(.echoMetadata)
      }

      try await control.bidiStream(request: request, options: callOptions) { response in
        let echoedTimeouts = Array(response.metadata["echo-grpc-timeout"])
        XCTAssertEqual(echoedTimeouts.count, 1)
      }
    }
  }
  /// HTTP response status codes paired with the RPC error code the client is expected to
  /// surface when a server responds with that status.
  ///
  /// Each pair is listed once so the tests iterating over this table make one RPC per status.
  private static let httpToStatusCodePairs: [(Int, RPCError.Code)] = [
    // See https://github.com/grpc/grpc/blob/7f664c69b2a636386fbf95c16bc78c559734ce0f/doc/http-grpc-status-mapping.md
    (400, .internalError),
    (401, .unauthenticated),
    (403, .permissionDenied),
    (404, .unimplemented),
    (418, .unknown),
    (429, .unavailable),
    (502, .unavailable),
    (503, .unavailable),
    (504, .unavailable),
  ]
  /// For every entry in `httpToStatusCodePairs`, makes a unary RPC asking the server to respond
  /// with that HTTP status and checks the RPC fails with the paired error code.
  func testUnaryAgainstNonGRPCServer() async throws {
    try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
      for (status, expectedCode) in Self.httpToStatusCodePairs {
        // Tell the server what to respond with.
        let request = ClientRequest(
          message: ControlInput.noOp,
          metadata: ["response-status": "\(status)"]
        )

        try await control.unary(request: request) { response in
          guard case .failure(let error) = response.accepted else {
            XCTFail("RPC should have failed with '\(expectedCode)'")
            return
          }
          XCTAssertEqual(error.code, expectedCode)
        }
      }
    }
  }
  /// For every entry in `httpToStatusCodePairs`, makes a client-streaming RPC asking the server
  /// to respond with that HTTP status and checks the RPC fails with the paired error code.
  func testClientStreamingAgainstNonGRPCServer() async throws {
    try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
      for (status, expectedCode) in Self.httpToStatusCodePairs {
        // Tell the server what to respond with; no messages are written.
        let request = StreamingClientRequest(
          of: ControlInput.self,
          metadata: ["response-status": "\(status)"]
        ) { _ in
        }

        try await control.clientStream(request: request) { response in
          if case .failure(let error) = response.accepted {
            XCTAssertEqual(error.code, expectedCode)
          } else {
            XCTFail("RPC should have failed with '\(expectedCode)'")
          }
        }
      }
    }
  }
  /// For every entry in `httpToStatusCodePairs`, makes a server-streaming RPC asking the server
  /// to respond with that HTTP status and checks the RPC fails with the paired error code.
  func testServerStreamingAgainstNonGRPCServer() async throws {
    try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
      for (status, expectedCode) in Self.httpToStatusCodePairs {
        // Tell the server what to respond with.
        let request = ClientRequest(
          message: ControlInput.noOp,
          metadata: ["response-status": "\(status)"]
        )

        try await control.serverStream(request: request) { response in
          guard case .failure(let error) = response.accepted else {
            XCTFail("RPC should have failed with '\(expectedCode)'")
            return
          }
          XCTAssertEqual(error.code, expectedCode)
        }
      }
    }
  }
  /// For every entry in `httpToStatusCodePairs`, makes a bidirectional-streaming RPC asking the
  /// server to respond with that HTTP status and checks the RPC fails with the paired error code.
  func testBidiStreamingAgainstNonGRPCServer() async throws {
    try await self.forEachClientAndHTTPStatusCodeServer { control, _ in
      for (status, expectedCode) in Self.httpToStatusCodePairs {
        // Tell the server what to respond with; no messages are written.
        let request = StreamingClientRequest(
          of: ControlInput.self,
          metadata: ["response-status": "\(status)"]
        ) { _ in
        }

        try await control.bidiStream(request: request) { response in
          if case .failure(let error) = response.accepted {
            XCTAssertEqual(error.code, expectedCode)
          } else {
            XCTFail("RPC should have failed with '\(expectedCode)'")
          }
        }
      }
    }
  }
  /// The scheme echoed back by the server for a unary RPC must be "http".
  func testUnaryScheme() async throws {
    try await self.forEachTransportPair { control, _, _ in
      let request = ClientRequest(
        message: ControlInput.with {
          $0.echoMetadataInHeaders = true
          $0.numberOfMessages = 1
        }
      )

      try await control.unary(request: request) { response in
        let echoedScheme = Array(response.metadata["echo-scheme"])
        XCTAssertEqual(echoedScheme, ["http"])
      }
    }
  }
  /// The scheme echoed back by the server for a server-streaming RPC must be "http".
  func testServerStreamingScheme() async throws {
    try await self.forEachTransportPair { control, _, _ in
      let request = ClientRequest(
        message: ControlInput.with {
          $0.echoMetadataInHeaders = true
          $0.numberOfMessages = 1
        }
      )

      try await control.serverStream(request: request) { response in
        let echoedScheme = Array(response.metadata["echo-scheme"])
        XCTAssertEqual(echoedScheme, ["http"])

        // Workaround https://github.com/grpc/grpc-swift-nio-transport/issues/43
        for try await _ in response.messages {}
      }
    }
  }
  /// The scheme echoed back by the server for a client-streaming RPC must be "http".
  func testClientStreamingScheme() async throws {
    try await self.forEachTransportPair { control, _, _ in
      let request = StreamingClientRequest(of: ControlInput.self) { writer in
        try await writer.write(
          .with {
            $0.echoMetadataInHeaders = true
            $0.numberOfMessages = 1
          }
        )
      }

      try await control.clientStream(request: request) { response in
        let echoedScheme = Array(response.metadata["echo-scheme"])
        XCTAssertEqual(echoedScheme, ["http"])
      }
    }
  }
  /// The scheme echoed back by the server for a bidirectional-streaming RPC must be "http".
  func testBidiStreamingScheme() async throws {
    try await self.forEachTransportPair { control, _, _ in
      let request = StreamingClientRequest(of: ControlInput.self) { writer in
        try await writer.write(
          .with {
            $0.echoMetadataInHeaders = true
            $0.numberOfMessages = 1
          }
        )
      }

      try await control.bidiStream(request: request) { response in
        let echoedScheme = Array(response.metadata["echo-scheme"])
        XCTAssertEqual(echoedScheme, ["http"])

        // Workaround https://github.com/grpc/grpc-swift-nio-transport/issues/43
        for try await _ in response.messages {}
      }
    }
  }
  /// For each cancellation kind, starts a `waitForCancellation` RPC, shuts down the client and
  /// then the server, and checks the RPC finishes without yielding any response messages.
  func testServerCancellation() async throws {
    let kinds: [CancellationKind] = [.awaitCancelled, .withCancellationHandler]

    for kind in kinds {
      try await self.forEachTransportPair { control, server, _ in
        try await control.waitForCancellation(request: ClientRequest(message: kind)) { response in
          // Shutdown the client so that it doesn't attempt to reconnect when the server closes.
          control.client.beginGracefulShutdown()
          // Shutdown the server to cancel the RPC.
          server.beginGracefulShutdown()

          // The RPC should complete without any error or response.
          let received = try await response.messages.reduce(into: []) { collected, next in
            collected.append(next)
          }
          XCTAssert(received.isEmpty)
        }
      }
    }
  }
  /// Request metadata sent with an uppercase key must be echoed back by the server under the
  /// lowercased key.
  func testUppercaseClientMetadataKey() async throws {
    try await self.forEachTransportPair { control, _, _ in
      let input = ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.numberOfMessages = 1
      }
      let request = ClientRequest(message: input, metadata: ["UPPERCASE-KEY": "value"])

      try await control.unary(request: request) { response in
        // Keys will be lowercase before being sent over the wire.
        let echoed = Array(response.metadata["echo-uppercase-key"])
        XCTAssertEqual(echoed, ["value"])
      }
    }
  }
  /// Initial and trailing metadata which the server is asked to add under an uppercase key must
  /// be readable by the client under the lowercased key.
  func testUppercaseServerMetadataKey() async throws {
    try await self.forEachTransportPair { control, _, _ in
      let input = ControlInput.with {
        $0.initialMetadataToAdd["UPPERCASE-KEY"] = "initial"
        $0.trailingMetadataToAdd["UPPERCASE-KEY"] = "trailing"
        $0.numberOfMessages = 1
      }

      try await control.unary(request: ClientRequest(message: input)) { response in
        let initial = Array(response.metadata["uppercase-key"])
        XCTAssertEqual(initial, ["initial"])

        let trailing = Array(response.trailingMetadata["uppercase-key"])
        XCTAssertEqual(trailing, ["trailing"])
      }
    }
  }
  /// Makes a unary RPC with `client` and asserts that the authority echoed back by the server in
  /// the initial metadata ("echo-authority") is exactly `expected`.
  ///
  /// - Parameters:
  ///   - client: The client to wrap in a `ControlClient` and make the RPC with.
  ///   - expected: The authority value the server is expected to echo.
  private func checkAuthority(client: GRPCClient, expected: String) async throws {
    let request = ClientRequest(
      message: ControlInput.with {
        $0.echoMetadataInHeaders = true
        $0.echoMetadataInTrailers = true
        $0.numberOfMessages = 1
        $0.payloadParameters = .with {
          $0.content = 0
          $0.size = 1024
        }
      }
    )

    let control = ControlClient(wrapping: client)
    try await control.unary(request: request) { response in
      let echoedAuthority = Array(response.metadata["echo-authority"])
      XCTAssertEqual(echoedAuthority, [.string(expected)])
    }
  }
  /// Starts a plaintext NIOPosix server with the control service on `serverAddress`, connects a
  /// plaintext NIOPosix client to it and checks the authority the server observes.
  ///
  /// - Parameters:
  ///   - serverAddress: The address the server should listen on.
  ///   - override: Value assigned to the client's `http2.authority` config; `nil` by default.
  ///   - clientTarget: Builds the client's target from the server's listening address.
  ///   - expectedAuthority: Builds the expected authority from the server's listening address.
  private func testAuthority(
    serverAddress: SocketAddress,
    authorityOverride override: String? = nil,
    clientTarget: (SocketAddress) -> any ResolvableTarget,
    expectedAuthority: (SocketAddress) -> String
  ) async throws {
    try await withGRPCServer(
      transport: .http2NIOPosix(
        address: serverAddress,
        transportSecurity: .plaintext
      ),
      services: [ControlService()]
    ) { runningServer in
      guard let boundAddress = try await runningServer.listeningAddress else {
        XCTFail("No listening address")
        return
      }

      // Derive both the target and the expectation from the address actually bound.
      let target = clientTarget(boundAddress)
      let expected = expectedAuthority(boundAddress)

      try await withGRPCClient(
        transport: .http2NIOPosix(
          target: target,
          transportSecurity: .plaintext,
          config: .defaults { config in
            config.http2.authority = override
          }
        )
      ) { connectedClient in
        try await self.checkAuthority(client: connectedClient, expected: expected)
      }
    }
  }
  /// With a DNS target and no override, the authority is the DNS host and port.
  func testAuthorityDNS() async throws {
    try await self.testAuthority(
      serverAddress: .ipv4(host: "127.0.0.1", port: 0),
      clientTarget: { bound in
        .dns(host: "localhost", port: bound.ipv4!.port)
      },
      expectedAuthority: { bound in
        "localhost:\(bound.ipv4!.port)"
      }
    )
  }
  /// With a DNS target, an explicit authority override replaces the host-and-port authority.
  func testOverrideAuthorityDNS() async throws {
    try await self.testAuthority(
      serverAddress: .ipv4(host: "127.0.0.1", port: 0),
      authorityOverride: "respect-my-authority",
      clientTarget: { bound in
        .dns(host: "localhost", port: bound.ipv4!.port)
      },
      expectedAuthority: { _ in
        "respect-my-authority"
      }
    )
  }
  /// With an IPv4 target and no override, the authority is the IPv4 host and port.
  func testAuthorityIPv4() async throws {
    try await self.testAuthority(
      serverAddress: .ipv4(host: "127.0.0.1", port: 0),
      clientTarget: { bound in
        .ipv4(host: "127.0.0.1", port: bound.ipv4!.port)
      },
      expectedAuthority: { bound in
        "127.0.0.1:\(bound.ipv4!.port)"
      }
    )
  }
  /// With an IPv4 target, an explicit authority override replaces the host-and-port authority.
  func testOverrideAuthorityIPv4() async throws {
    try await self.testAuthority(
      serverAddress: .ipv4(host: "127.0.0.1", port: 0),
      authorityOverride: "respect-my-authority",
      clientTarget: { bound in
        .ipv4(host: "127.0.0.1", port: bound.ipv4!.port)
      },
      expectedAuthority: { _ in
        "respect-my-authority"
      }
    )
  }
  /// With an IPv6 target and no override, the authority is the bracketed IPv6 host and port.
  func testAuthorityIPv6() async throws {
    try await self.testAuthority(
      serverAddress: .ipv6(host: "::1", port: 0),
      clientTarget: { bound in
        .ipv6(host: "::1", port: bound.ipv6!.port)
      },
      expectedAuthority: { bound in
        "[::1]:\(bound.ipv6!.port)"
      }
    )
  }
  /// With an IPv6 target, an explicit authority override replaces the host-and-port authority.
  func testOverrideAuthorityIPv6() async throws {
    try await self.testAuthority(
      serverAddress: .ipv6(host: "::1", port: 0),
      authorityOverride: "respect-my-authority",
      clientTarget: { bound in
        .ipv6(host: "::1", port: bound.ipv6!.port)
      },
      expectedAuthority: { _ in
        "respect-my-authority"
      }
    )
  }
  /// With a Unix domain socket target and no override, the authority is the socket path.
  func testAuthorityUDS() async throws {
    let socketPath = "test-authority-uds"
    try await self.testAuthority(
      serverAddress: .unixDomainSocket(path: socketPath),
      clientTarget: { _ in
        .unixDomainSocket(path: socketPath)
      },
      expectedAuthority: { _ in
        socketPath
      }
    )
  }
  /// An authority given on the Unix domain socket target itself is used as the authority.
  func testAuthorityLocalUDSOverride() async throws {
    let socketPath = "test-authority-local-uds-override"
    try await self.testAuthority(
      serverAddress: .unixDomainSocket(path: socketPath),
      clientTarget: { _ in
        .unixDomainSocket(path: socketPath, authority: "respect-my-authority")
      },
      expectedAuthority: { _ in
        "respect-my-authority"
      }
    )
  }
  /// The authority override from the client config takes precedence over an authority given on
  /// the Unix domain socket target.
  func testOverrideAuthorityUDS() async throws {
    let socketPath = "test-override-authority-uds"
    try await self.testAuthority(
      serverAddress: .unixDomainSocket(path: socketPath),
      authorityOverride: "respect-my-authority",
      clientTarget: { _ in
        .unixDomainSocket(path: socketPath, authority: "should-be-ignored")
      },
      expectedAuthority: { _ in
        "respect-my-authority"
      }
    )
  }
  /// Checks that the peer info returned by the control service for an IPv4 connection contains
  /// "ipv4:127.0.0.1:" followed by a port number.
  func testPeerInfoIPv4() async throws {
    try await self.forEachTransportPair(
      serverAddress: .ipv4(host: "127.0.0.1", port: 0)
    ) { control, _, _ in
      let peerInfo = try await control.peerInfo()
      // `matches(of:)` returns a (possibly empty) collection which is never nil, so asserting on
      // it can't fail. `firstMatch(of:)` is nil when there is no match. The dots are escaped so
      // they only match a literal '.'.
      let match = peerInfo.firstMatch(of: /ipv4:127\.0\.0\.1:\d+/)
      XCTAssertNotNil(match, "Unexpected peer info '\(peerInfo)'")
    }
  }
  /// Checks that the peer info returned by the control service for an IPv6 connection contains
  /// "ipv6:[::1]:" followed by a port number.
  func testPeerInfoIPv6() async throws {
    try await self.forEachTransportPair(
      serverAddress: .ipv6(host: "::1", port: 0)
    ) { control, _, _ in
      let peerInfo = try await control.peerInfo()
      // `matches(of:)` returns a (possibly empty) collection which is never nil, so asserting on
      // it can't fail. `firstMatch(of:)` is nil when there is no match. The brackets are escaped
      // so they match literally instead of opening a character class.
      // NOTE(review): assumes the peer is formatted as "ipv6:[<host>]:<port>" — confirm against
      // the transport's peer description.
      let match = peerInfo.firstMatch(of: /ipv6:\[::1\]:\d+/)
      XCTAssertNotNil(match, "Unexpected peer info '\(peerInfo)'")
    }
  }
  /// Checks that the peer info returned by the control service for a Unix domain socket
  /// connection is "unix:" followed by the socket path.
  func testPeerInfoUDS() async throws {
    let socketPath = "peer-info-uds"

    try await self.forEachTransportPair(
      serverAddress: .unixDomainSocket(path: socketPath)
    ) { control, _, _ in
      let reportedPeer = try await control.peerInfo()
      XCTAssertEqual(reportedPeer, "unix:peer-info-uds")
    }
  }
  1487. }
  extension [HTTP2TransportTests.Transport] {
    /// Every combination of the POSIX and NIOTS transport kinds for the server and the client.
    static let supported: [HTTP2TransportTests.Transport] = [
      .init(server: .posix, client: .posix),
      .init(server: .niots, client: .niots),
      .init(server: .niots, client: .posix),
      .init(server: .posix, client: .niots),
    ]
  }
  extension ControlInput {
    /// An input which only sets `echoMetadataInHeaders`.
    fileprivate static let echoMetadata = Self.with { input in
      input.echoMetadataInHeaders = true
    }

    /// An input with no fields set.
    fileprivate static let noOp = Self()

    /// An input requesting `numberOfMessages` messages whose payload parameters have the given
    /// content byte and size.
    fileprivate static func messages(
      _ numberOfMessages: Int,
      repeating: UInt8,
      count: Int
    ) -> Self {
      return Self.with { input in
        input.numberOfMessages = numberOfMessages
        input.payloadParameters = .with { payload in
          payload.content = repeating
          payload.size = count
        }
      }
    }

    /// An input carrying the given status code and message, optionally asking for metadata to be
    /// echoed in the trailers.
    fileprivate static func status(
      code: GRPCCore.Status.Code,
      message: String,
      echoMetadata: Bool
    ) -> Self {
      return Self.with { input in
        input.echoMetadataInTrailers = echoMetadata
        input.status = .with { status in
          status.code = code
          status.message = message
        }
      }
    }

    /// Like `status(code:message:echoMetadata:)` but also sets `isTrailersOnly`.
    fileprivate static func trailersOnly(
      code: GRPCCore.Status.Code,
      message: String,
      echoMetadata: Bool
    ) -> Self {
      return Self.with { input in
        input.echoMetadataInTrailers = echoMetadata
        input.isTrailersOnly = true
        input.status = .with { status in
          status.code = code
          status.message = message
        }
      }
    }
  }
  extension CompressionAlgorithm {
    /// The name the tests compare against "grpc-encoding" metadata values: "deflate", "gzip",
    /// "identity" for no compression, or an empty string for any other algorithm.
    var name: String {
      let encodingName: String

      switch self {
      case .gzip:
        encodingName = "gzip"
      case .deflate:
        encodingName = "deflate"
      case .none:
        encodingName = "identity"
      default:
        encodingName = ""
      }

      return encodingName
    }
  }