HTTP2TransportTests.swift 56 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCNIOTransportCore
  18. import GRPCNIOTransportHTTP2Posix
  19. import GRPCNIOTransportHTTP2TransportServices
  20. import XCTest
  21. final class HTTP2TransportTests: XCTestCase {
  22. // A combination of client and server transport kinds.
  23. struct Transport: Sendable, CustomStringConvertible {
  24. var server: Kind
  25. var client: Kind
  26. enum Kind: Sendable, CustomStringConvertible {
  27. case posix
  28. case niots
  29. var description: String {
  30. switch self {
  31. case .posix:
  32. return "NIOPosix"
  33. case .niots:
  34. return "NIOTS"
  35. }
  36. }
  37. }
  38. var description: String {
  39. "server=\(self.server) client=\(self.client)"
  40. }
  41. }
  42. func forEachTransportPair(
  43. _ transport: [Transport] = [.init(server: .posix, client: .posix)],
  44. serverAddress: SocketAddress = .ipv4(host: "127.0.0.1", port: 0),
  45. enableControlService: Bool = true,
  46. clientCompression: CompressionAlgorithm = .none,
  47. clientEnabledCompression: CompressionAlgorithmSet = .none,
  48. serverCompression: CompressionAlgorithmSet = .none,
  49. _ execute: (ControlClient, GRPCServer, Transport) async throws -> Void
  50. ) async throws {
  51. for pair in transport {
  52. try await withThrowingTaskGroup(of: Void.self) { group in
  53. let (server, address) = try await self.runServer(
  54. in: &group,
  55. address: serverAddress,
  56. kind: pair.server,
  57. enableControlService: enableControlService,
  58. compression: serverCompression
  59. )
  60. let target: any ResolvableTarget
  61. if let ipv4 = address.ipv4 {
  62. target = .ipv4(host: ipv4.host, port: ipv4.port)
  63. } else if let ipv6 = address.ipv6 {
  64. target = .ipv6(host: ipv6.host, port: ipv6.port)
  65. } else if let uds = address.unixDomainSocket {
  66. target = .unixDomainSocket(path: uds.path)
  67. } else {
  68. XCTFail("Unexpected address to connect to")
  69. return
  70. }
  71. let client = try self.makeClient(
  72. kind: pair.client,
  73. target: target,
  74. compression: clientCompression,
  75. enabledCompression: clientEnabledCompression
  76. )
  77. group.addTask {
  78. try await client.run()
  79. }
  80. do {
  81. let control = ControlClient(wrapping: client)
  82. try await execute(control, server, pair)
  83. } catch {
  84. XCTFail("Unexpected error: '\(error)' (\(pair))")
  85. }
  86. server.beginGracefulShutdown()
  87. client.beginGracefulShutdown()
  88. }
  89. }
  90. }
  91. func forEachClientAndHTTPStatusCodeServer(
  92. _ kind: [Transport.Kind] = [.posix, .niots],
  93. _ execute: (ControlClient, Transport.Kind) async throws -> Void
  94. ) async throws {
  95. for clientKind in kind {
  96. try await withThrowingTaskGroup(of: Void.self) { group in
  97. let server = HTTP2StatusCodeServer()
  98. group.addTask {
  99. try await server.run()
  100. }
  101. let address = try await server.listeningAddress
  102. let client = try self.makeClient(
  103. kind: clientKind,
  104. target: .ipv4(host: address.host, port: address.port),
  105. compression: .none,
  106. enabledCompression: .none
  107. )
  108. group.addTask {
  109. try await client.run()
  110. }
  111. do {
  112. let control = ControlClient(wrapping: client)
  113. try await execute(control, clientKind)
  114. } catch {
  115. XCTFail("Unexpected error: '\(error)' (\(clientKind))")
  116. }
  117. group.cancelAll()
  118. }
  119. }
  120. }
  121. private func runServer(
  122. in group: inout ThrowingTaskGroup<Void, any Error>,
  123. address: SocketAddress,
  124. kind: Transport.Kind,
  125. enableControlService: Bool,
  126. compression: CompressionAlgorithmSet
  127. ) async throws -> (GRPCServer, GRPCNIOTransportCore.SocketAddress) {
  128. let services = enableControlService ? [ControlService()] : []
  129. switch kind {
  130. case .posix:
  131. let server = GRPCServer(
  132. transport: .http2NIOPosix(
  133. address: address,
  134. config: .defaults(transportSecurity: .plaintext) {
  135. $0.compression.enabledAlgorithms = compression
  136. }
  137. ),
  138. services: services
  139. )
  140. group.addTask {
  141. try await server.serve()
  142. }
  143. let address = try await server.listeningAddress!
  144. return (server, address)
  145. case .niots:
  146. #if canImport(Network)
  147. let server = GRPCServer(
  148. transport: .http2NIOTS(
  149. address: address,
  150. config: .defaults(transportSecurity: .plaintext) {
  151. $0.compression.enabledAlgorithms = compression
  152. }
  153. ),
  154. services: services
  155. )
  156. group.addTask {
  157. try await server.serve()
  158. }
  159. let address = try await server.listeningAddress!
  160. return (server, address)
  161. #else
  162. throw XCTSkip("Transport not supported on this platform")
  163. #endif
  164. }
  165. }
  166. private func makeClient(
  167. kind: Transport.Kind,
  168. target: any ResolvableTarget,
  169. compression: CompressionAlgorithm,
  170. enabledCompression: CompressionAlgorithmSet
  171. ) throws -> GRPCClient {
  172. let transport: any ClientTransport
  173. switch kind {
  174. case .posix:
  175. var serviceConfig = ServiceConfig()
  176. serviceConfig.loadBalancingConfig = [.roundRobin]
  177. transport = try HTTP2ClientTransport.Posix(
  178. target: target,
  179. config: .defaults(transportSecurity: .plaintext) {
  180. $0.compression.algorithm = compression
  181. $0.compression.enabledAlgorithms = enabledCompression
  182. },
  183. serviceConfig: serviceConfig
  184. )
  185. case .niots:
  186. #if canImport(Network)
  187. var serviceConfig = ServiceConfig()
  188. serviceConfig.loadBalancingConfig = [.roundRobin]
  189. transport = try HTTP2ClientTransport.TransportServices(
  190. target: target,
  191. config: .defaults(transportSecurity: .plaintext) {
  192. $0.compression.algorithm = compression
  193. $0.compression.enabledAlgorithms = enabledCompression
  194. },
  195. serviceConfig: serviceConfig
  196. )
  197. #else
  198. throw XCTSkip("Transport not supported on this platform")
  199. #endif
  200. }
  201. return GRPCClient(transport: transport)
  202. }
  203. func testUnaryOK() async throws {
  204. // Client sends one message, server sends back metadata, a single message, and an ok status with
  205. // trailing metadata.
  206. try await self.forEachTransportPair { control, _, pair in
  207. let input = ControlInput.with {
  208. $0.echoMetadataInHeaders = true
  209. $0.echoMetadataInTrailers = true
  210. $0.numberOfMessages = 1
  211. $0.payloadParameters = .with {
  212. $0.content = 0
  213. $0.size = 1024
  214. }
  215. }
  216. let metadata: Metadata = ["test-key": "test-value"]
  217. let request = ClientRequest(message: input, metadata: metadata)
  218. try await control.unary(request: request) { response in
  219. let message = try response.message
  220. XCTAssertEqual(message.payload, Data(repeating: 0, count: 1024), "\(pair)")
  221. let initial = response.metadata
  222. XCTAssertEqual(Array(initial["echo-test-key"]), ["test-value"], "\(pair)")
  223. let trailing = response.trailingMetadata
  224. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  225. }
  226. }
  227. }
  228. func testUnaryNotOK() async throws {
  229. // Client sends one message, server sends back metadata, a single message, and a non-ok status
  230. // with trailing metadata.
  231. try await self.forEachTransportPair { control, _, pair in
  232. let input = ControlInput.with {
  233. $0.echoMetadataInTrailers = true
  234. $0.numberOfMessages = 1
  235. $0.payloadParameters = .with {
  236. $0.content = 0
  237. $0.size = 1024
  238. }
  239. $0.status = .with {
  240. $0.code = .aborted
  241. $0.message = "\(#function)"
  242. }
  243. }
  244. let metadata: Metadata = ["test-key": "test-value"]
  245. let request = ClientRequest(message: input, metadata: metadata)
  246. try await control.unary(request: request) { response in
  247. XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
  248. XCTAssertEqual(error.code, .aborted)
  249. XCTAssertEqual(error.message, "\(#function)")
  250. let trailing = error.metadata
  251. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  252. }
  253. let trailing = response.trailingMetadata
  254. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  255. }
  256. }
  257. }
  258. func testUnaryRejected() async throws {
  259. // Client sends one message, server sends non-ok status with trailing metadata.
  260. try await self.forEachTransportPair { control, _, pair in
  261. let metadata: Metadata = ["test-key": "test-value"]
  262. let request = ClientRequest<ControlInput>(
  263. message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
  264. metadata: metadata
  265. )
  266. try await control.unary(request: request) { response in
  267. XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
  268. XCTAssertEqual(error.code, .aborted, "\(pair)")
  269. XCTAssertEqual(error.message, "\(#function)", "\(pair)")
  270. let trailing = error.metadata
  271. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  272. }
  273. // No initial metadata for trailers-only.
  274. XCTAssertEqual(response.metadata, [:])
  275. let trailing = response.trailingMetadata
  276. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  277. }
  278. }
  279. }
  280. func testClientStreamingOK() async throws {
  281. try await self.forEachTransportPair { control, _, pair in
  282. let metadata: Metadata = ["test-key": "test-value"]
  283. let request = StreamingClientRequest(
  284. of: ControlInput.self,
  285. metadata: metadata
  286. ) { writer in
  287. try await writer.write(.echoMetadata)
  288. // Send a few messages which are ignored.
  289. try await writer.write(.noOp)
  290. try await writer.write(.noOp)
  291. try await writer.write(.noOp)
  292. // Send a message.
  293. try await writer.write(.messages(1, repeating: 42, count: 1024))
  294. // ... and the final status.
  295. try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
  296. }
  297. try await control.clientStream(request: request) { response in
  298. let message = try response.message
  299. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
  300. let initial = response.metadata
  301. XCTAssertEqual(Array(initial["echo-test-key"]), ["test-value"], "\(pair)")
  302. let trailing = response.trailingMetadata
  303. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  304. }
  305. }
  306. }
  307. func testClientStreamingNotOK() async throws {
  308. try await self.forEachTransportPair { control, _, pair in
  309. let metadata: Metadata = ["test-key": "test-value"]
  310. let request = StreamingClientRequest(
  311. of: ControlInput.self,
  312. metadata: metadata
  313. ) { writer in
  314. try await writer.write(.echoMetadata)
  315. // Send a few messages which are ignored.
  316. try await writer.write(.noOp)
  317. try await writer.write(.noOp)
  318. try await writer.write(.noOp)
  319. // Send a message.
  320. try await writer.write(.messages(1, repeating: 42, count: 1024))
  321. // Send the final status.
  322. try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
  323. }
  324. try await control.clientStream(request: request) { response in
  325. XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
  326. XCTAssertEqual(error.code, .aborted, "\(pair)")
  327. XCTAssertEqual(error.message, "\(#function)", "\(pair)")
  328. let trailing = error.metadata
  329. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  330. }
  331. let initial = response.metadata
  332. XCTAssertEqual(Array(initial["echo-test-key"]), ["test-value"], "\(pair)")
  333. let trailing = response.trailingMetadata
  334. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  335. }
  336. }
  337. }
  338. func testClientStreamingRejected() async throws {
  339. // Client sends one message, server sends non-ok status with trailing metadata.
  340. try await self.forEachTransportPair { control, _, pair in
  341. let metadata: Metadata = ["test-key": "test-value"]
  342. let request = StreamingClientRequest(
  343. of: ControlInput.self,
  344. metadata: metadata
  345. ) { writer in
  346. let message: ControlInput = .trailersOnly(
  347. code: .aborted,
  348. message: "\(#function)",
  349. echoMetadata: true
  350. )
  351. try await writer.write(message)
  352. }
  353. try await control.clientStream(request: request) { response in
  354. XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
  355. XCTAssertEqual(error.code, .aborted, "\(pair)")
  356. XCTAssertEqual(error.message, "\(#function)", "\(pair)")
  357. let trailing = error.metadata
  358. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  359. }
  360. // No initial metadata for trailers-only.
  361. XCTAssertEqual(response.metadata, [:])
  362. let trailing = response.trailingMetadata
  363. XCTAssertEqual(Array(trailing["echo-test-key"]), ["test-value"], "\(pair)")
  364. }
  365. }
  366. }
  367. func testServerStreamingOK() async throws {
  368. try await self.forEachTransportPair { control, _, pair in
  369. let metadata: Metadata = ["test-key": "test-value"]
  370. let input = ControlInput.with {
  371. $0.echoMetadataInHeaders = true
  372. $0.echoMetadataInTrailers = true
  373. $0.numberOfMessages = 5
  374. $0.payloadParameters = .with {
  375. $0.content = 42
  376. $0.size = 1024
  377. }
  378. }
  379. let request = ClientRequest(message: input, metadata: metadata)
  380. try await control.serverStream(request: request) { response in
  381. switch response.accepted {
  382. case .success(let contents):
  383. XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  384. var messagesReceived = 0
  385. for try await part in contents.bodyParts {
  386. switch part {
  387. case .message(let message):
  388. messagesReceived += 1
  389. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
  390. case .trailingMetadata(let metadata):
  391. XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
  392. }
  393. }
  394. XCTAssertEqual(messagesReceived, 5)
  395. case .failure(let error):
  396. throw error
  397. }
  398. }
  399. }
  400. }
  401. func testServerStreamingEmptyOK() async throws {
  402. try await self.forEachTransportPair { control, _, pair in
  403. let metadata: Metadata = ["test-key": "test-value"]
  404. // Echo back metadata, but don't send any messages.
  405. let input = ControlInput.with {
  406. $0.echoMetadataInHeaders = true
  407. $0.echoMetadataInTrailers = true
  408. }
  409. let request = ClientRequest(message: input, metadata: metadata)
  410. try await control.serverStream(request: request) { response in
  411. switch response.accepted {
  412. case .success(let contents):
  413. XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  414. for try await part in contents.bodyParts {
  415. switch part {
  416. case .message:
  417. XCTFail("Unexpected message")
  418. case .trailingMetadata(let metadata):
  419. XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
  420. }
  421. }
  422. case .failure(let error):
  423. throw error
  424. }
  425. }
  426. }
  427. }
  428. func testServerStreamingNotOK() async throws {
  429. try await self.forEachTransportPair { control, _, pair in
  430. let metadata: Metadata = ["test-key": "test-value"]
  431. let input = ControlInput.with {
  432. $0.echoMetadataInHeaders = true
  433. $0.echoMetadataInTrailers = true
  434. $0.numberOfMessages = 5
  435. $0.payloadParameters = .with {
  436. $0.content = 42
  437. $0.size = 1024
  438. }
  439. $0.status = .with {
  440. $0.code = .aborted
  441. $0.message = "\(#function)"
  442. }
  443. }
  444. let request = ClientRequest(message: input, metadata: metadata)
  445. try await control.serverStream(request: request) { response in
  446. switch response.accepted {
  447. case .success(let contents):
  448. XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  449. var messagesReceived = 0
  450. do {
  451. for try await part in contents.bodyParts {
  452. switch part {
  453. case .message(let message):
  454. messagesReceived += 1
  455. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
  456. case .trailingMetadata:
  457. XCTFail("Unexpected trailing metadata, should be provided in RPCError")
  458. }
  459. }
  460. XCTFail("Expected error to be thrown")
  461. } catch let error as RPCError {
  462. XCTAssertEqual(error.code, .aborted)
  463. XCTAssertEqual(error.message, "\(#function)")
  464. XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  465. }
  466. XCTAssertEqual(messagesReceived, 5)
  467. case .failure(let error):
  468. throw error
  469. }
  470. }
  471. }
  472. }
  473. func testServerStreamingEmptyNotOK() async throws {
  474. try await self.forEachTransportPair { control, _, pair in
  475. let metadata: Metadata = ["test-key": "test-value"]
  476. let input = ControlInput.with {
  477. $0.echoMetadataInHeaders = true
  478. $0.echoMetadataInTrailers = true
  479. $0.status = .with {
  480. $0.code = .aborted
  481. $0.message = "\(#function)"
  482. }
  483. }
  484. let request = ClientRequest(message: input, metadata: metadata)
  485. try await control.serverStream(request: request) { response in
  486. switch response.accepted {
  487. case .success(let contents):
  488. XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  489. do {
  490. for try await _ in contents.bodyParts {
  491. XCTFail("Unexpected message, \(pair)")
  492. }
  493. XCTFail("Expected error to be thrown")
  494. } catch let error as RPCError {
  495. XCTAssertEqual(error.code, .aborted)
  496. XCTAssertEqual(error.message, "\(#function)")
  497. XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  498. }
  499. case .failure(let error):
  500. throw error
  501. }
  502. }
  503. }
  504. }
  505. func testServerStreamingRejected() async throws {
  506. try await self.forEachTransportPair { control, _, pair in
  507. let metadata: Metadata = ["test-key": "test-value"]
  508. let request = ClientRequest<ControlInput>(
  509. message: .trailersOnly(code: .aborted, message: "\(#function)", echoMetadata: true),
  510. metadata: metadata
  511. )
  512. try await control.serverStream(request: request) { response in
  513. switch response.accepted {
  514. case .success:
  515. XCTFail("Expected RPC to be rejected \(pair)")
  516. case .failure(let error):
  517. XCTAssertEqual(error.code, .aborted, "\(pair)")
  518. XCTAssertEqual(error.message, "\(#function)", "\(pair)")
  519. XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  520. }
  521. }
  522. }
  523. }
  524. func testBidiStreamingOK() async throws {
  525. try await self.forEachTransportPair { control, _, pair in
  526. let metadata: Metadata = ["test-key": "test-value"]
  527. let request = StreamingClientRequest(
  528. of: ControlInput.self,
  529. metadata: metadata
  530. ) { writer in
  531. try await writer.write(.echoMetadata)
  532. // Send a few messages, each is echo'd back.
  533. try await writer.write(.messages(1, repeating: 42, count: 1024))
  534. try await writer.write(.messages(1, repeating: 42, count: 1024))
  535. try await writer.write(.messages(1, repeating: 42, count: 1024))
  536. // Send the final status.
  537. try await writer.write(.status(code: .ok, message: "", echoMetadata: true))
  538. }
  539. try await control.bidiStream(request: request) { response in
  540. switch response.accepted {
  541. case .success(let contents):
  542. XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  543. var messagesReceived = 0
  544. for try await part in contents.bodyParts {
  545. switch part {
  546. case .message(let message):
  547. messagesReceived += 1
  548. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
  549. case .trailingMetadata(let metadata):
  550. XCTAssertEqual(Array(metadata["echo-test-key"]), ["test-value"], "\(pair)")
  551. }
  552. }
  553. XCTAssertEqual(messagesReceived, 3)
  554. case .failure(let error):
  555. throw error
  556. }
  557. }
  558. }
  559. }
  560. func testBidiStreamingEmptyOK() async throws {
  561. try await self.forEachTransportPair { control, _, pair in
  562. let request = StreamingClientRequest(of: ControlInput.self) { _ in }
  563. try await control.bidiStream(request: request) { response in
  564. switch response.accepted {
  565. case .success(let contents):
  566. var receivedTrailingMetadata = false
  567. for try await part in contents.bodyParts {
  568. switch part {
  569. case .message:
  570. XCTFail("Unexpected message \(pair)")
  571. case .trailingMetadata:
  572. XCTAssertFalse(receivedTrailingMetadata, "\(pair)")
  573. receivedTrailingMetadata = true
  574. }
  575. }
  576. case .failure(let error):
  577. throw error
  578. }
  579. }
  580. }
  581. }
  582. func testBidiStreamingNotOK() async throws {
  583. try await self.forEachTransportPair { control, _, pair in
  584. let metadata: Metadata = ["test-key": "test-value"]
  585. let request = StreamingClientRequest(
  586. of: ControlInput.self,
  587. metadata: metadata
  588. ) { writer in
  589. try await writer.write(.echoMetadata)
  590. // Send a few messages, each is echo'd back.
  591. try await writer.write(.messages(1, repeating: 42, count: 1024))
  592. try await writer.write(.messages(1, repeating: 42, count: 1024))
  593. try await writer.write(.messages(1, repeating: 42, count: 1024))
  594. // Send the final status.
  595. try await writer.write(.status(code: .aborted, message: "\(#function)", echoMetadata: true))
  596. }
  597. try await control.bidiStream(request: request) { response in
  598. switch response.accepted {
  599. case .success(let contents):
  600. XCTAssertEqual(Array(contents.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  601. var messagesReceived = 0
  602. do {
  603. for try await part in contents.bodyParts {
  604. switch part {
  605. case .message(let message):
  606. messagesReceived += 1
  607. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024))
  608. case .trailingMetadata:
  609. XCTFail("Trailing metadata should be provided by error")
  610. }
  611. }
  612. XCTFail("Should've thrown error \(pair)")
  613. } catch let error as RPCError {
  614. XCTAssertEqual(error.code, .aborted)
  615. XCTAssertEqual(error.message, "\(#function)")
  616. XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"], "\(pair)")
  617. }
  618. XCTAssertEqual(messagesReceived, 3)
  619. case .failure(let error):
  620. throw error
  621. }
  622. }
  623. }
  624. }
  625. func testBidiStreamingRejected() async throws {
  626. try await self.forEachTransportPair { control, _, pair in
  627. let metadata: Metadata = ["test-key": "test-value"]
  628. let request = StreamingClientRequest(
  629. of: ControlInput.self,
  630. metadata: metadata
  631. ) { writer in
  632. try await writer.write(
  633. .trailersOnly(
  634. code: .aborted,
  635. message: "\(#function)",
  636. echoMetadata: true
  637. )
  638. )
  639. }
  640. try await control.bidiStream(request: request) { response in
  641. switch response.accepted {
  642. case .success:
  643. XCTFail("Expected RPC to fail \(pair)")
  644. case .failure(let error):
  645. XCTAssertEqual(error.code, .aborted)
  646. XCTAssertEqual(error.message, "\(#function)")
  647. XCTAssertEqual(Array(error.metadata["echo-test-key"]), ["test-value"])
  648. }
  649. }
  650. }
  651. }
  652. // MARK: - Not Implemented
  653. func testUnaryNotImplemented() async throws {
  654. try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
  655. let request = ClientRequest(message: ControlInput())
  656. try await control.unary(request: request) { response in
  657. XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
  658. XCTAssertEqual(error.code, .unimplemented)
  659. }
  660. }
  661. }
  662. }
  663. func testClientStreamingNotImplemented() async throws {
  664. try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
  665. let request = StreamingClientRequest(of: ControlInput.self) { _ in }
  666. try await control.clientStream(request: request) { response in
  667. XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
  668. XCTAssertEqual(error.code, .unimplemented)
  669. }
  670. }
  671. }
  672. }
  673. func testServerStreamingNotImplemented() async throws {
  674. try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
  675. let request = ClientRequest(message: ControlInput())
  676. try await control.serverStream(request: request) { response in
  677. XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
  678. XCTAssertEqual(error.code, .unimplemented)
  679. }
  680. }
  681. }
  682. }
  683. func testBidiStreamingNotImplemented() async throws {
  684. try await self.forEachTransportPair(enableControlService: false) { control, _, pair in
  685. let request = StreamingClientRequest(of: ControlInput.self) { _ in }
  686. try await control.bidiStream(request: request) { response in
  687. XCTAssertThrowsError(ofType: RPCError.self, try response.accepted.get()) { error in
  688. XCTAssertEqual(error.code, .unimplemented)
  689. }
  690. }
  691. }
  692. }
  693. // MARK: - Compression tests
  694. private func testUnaryCompression(
  695. client: CompressionAlgorithm,
  696. server: CompressionAlgorithm,
  697. control: ControlClient,
  698. pair: Transport
  699. ) async throws {
  700. let message = ControlInput.with {
  701. $0.echoMetadataInHeaders = true
  702. $0.numberOfMessages = 1
  703. $0.payloadParameters = .with {
  704. $0.content = 42
  705. $0.size = 1024
  706. }
  707. }
  708. var options = CallOptions.defaults
  709. options.compression = client
  710. try await control.unary(
  711. request: ClientRequest(message: message),
  712. options: options
  713. ) { response in
  714. // Check the client algorithm.
  715. switch client {
  716. case .deflate, .gzip:
  717. // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
  718. let encoding = Array(response.metadata["echo-grpc-encoding"])
  719. XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
  720. case .none:
  721. ()
  722. default:
  723. XCTFail("Unhandled compression '\(client)'")
  724. }
  725. // Check the server algorithm.
  726. switch server {
  727. case .deflate, .gzip:
  728. let encoding = Array(response.metadata["grpc-encoding"])
  729. XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
  730. case .none:
  731. ()
  732. default:
  733. XCTFail("Unhandled compression '\(client)'")
  734. }
  735. let message = try response.message
  736. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
  737. }
  738. }
  739. private func testClientStreamingCompression(
  740. client: CompressionAlgorithm,
  741. server: CompressionAlgorithm,
  742. control: ControlClient,
  743. pair: Transport
  744. ) async throws {
  745. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  746. try await writer.write(.echoMetadata)
  747. try await writer.write(.noOp)
  748. try await writer.write(.noOp)
  749. try await writer.write(.messages(1, repeating: 42, count: 1024))
  750. }
  751. var options = CallOptions.defaults
  752. options.compression = client
  753. try await control.clientStream(request: request, options: options) { response in
  754. // Check the client algorithm.
  755. switch client {
  756. case .deflate, .gzip:
  757. // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
  758. let encoding = Array(response.metadata["echo-grpc-encoding"])
  759. XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
  760. case .none:
  761. ()
  762. default:
  763. XCTFail("Unhandled compression '\(client)'")
  764. }
  765. // Check the server algorithm.
  766. switch server {
  767. case .deflate, .gzip:
  768. let encoding = Array(response.metadata["grpc-encoding"])
  769. XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
  770. case .none:
  771. ()
  772. default:
  773. XCTFail("Unhandled compression '\(client)'")
  774. }
  775. let message = try response.message
  776. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
  777. }
  778. }
  779. private func testServerStreamingCompression(
  780. client: CompressionAlgorithm,
  781. server: CompressionAlgorithm,
  782. control: ControlClient,
  783. pair: Transport
  784. ) async throws {
  785. let message = ControlInput.with {
  786. $0.echoMetadataInHeaders = true
  787. $0.numberOfMessages = 5
  788. $0.payloadParameters = .with {
  789. $0.content = 42
  790. $0.size = 1024
  791. }
  792. }
  793. var options = CallOptions.defaults
  794. options.compression = client
  795. try await control.serverStream(
  796. request: ClientRequest(message: message),
  797. options: options
  798. ) { response in
  799. // Check the client algorithm.
  800. switch client {
  801. case .deflate, .gzip:
  802. // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
  803. let encoding = Array(response.metadata["echo-grpc-encoding"])
  804. XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
  805. case .none:
  806. ()
  807. default:
  808. XCTFail("Unhandled compression '\(client)'")
  809. }
  810. // Check the server algorithm.
  811. switch server {
  812. case .deflate, .gzip:
  813. let encoding = Array(response.metadata["grpc-encoding"])
  814. XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
  815. case .none:
  816. ()
  817. default:
  818. XCTFail("Unhandled compression '\(client)'")
  819. }
  820. for try await message in response.messages {
  821. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
  822. }
  823. }
  824. }
  825. private func testBidiStreamingCompression(
  826. client: CompressionAlgorithm,
  827. server: CompressionAlgorithm,
  828. control: ControlClient,
  829. pair: Transport
  830. ) async throws {
  831. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  832. try await writer.write(.echoMetadata)
  833. try await writer.write(.messages(1, repeating: 42, count: 1024))
  834. try await writer.write(.messages(1, repeating: 42, count: 1024))
  835. try await writer.write(.messages(1, repeating: 42, count: 1024))
  836. }
  837. var options = CallOptions.defaults
  838. options.compression = client
  839. try await control.bidiStream(request: request, options: options) { response in
  840. // Check the client algorithm.
  841. switch client {
  842. case .deflate, .gzip:
  843. // "echo-grpc-encoding" is the value of "grpc-encoding" sent from the client to the server.
  844. let encoding = Array(response.metadata["echo-grpc-encoding"])
  845. XCTAssertEqual(encoding, ["\(client.name)"], "\(pair)")
  846. case .none:
  847. ()
  848. default:
  849. XCTFail("Unhandled compression '\(client)'")
  850. }
  851. // Check the server algorithm.
  852. switch server {
  853. case .deflate, .gzip:
  854. let encoding = Array(response.metadata["grpc-encoding"])
  855. XCTAssertEqual(encoding, ["\(server.name)"], "\(pair)")
  856. case .none:
  857. ()
  858. default:
  859. XCTFail("Unhandled compression '\(client)'")
  860. }
  861. for try await message in response.messages {
  862. XCTAssertEqual(message.payload, Data(repeating: 42, count: 1024), "\(pair)")
  863. }
  864. }
  865. }
  866. func testUnaryDeflateCompression() async throws {
  867. try await self.forEachTransportPair(
  868. clientCompression: .deflate,
  869. clientEnabledCompression: .deflate,
  870. serverCompression: .deflate
  871. ) { control, _, pair in
  872. try await self.testUnaryCompression(
  873. client: .deflate,
  874. server: .deflate,
  875. control: control,
  876. pair: pair
  877. )
  878. }
  879. }
  880. func testUnaryGzipCompression() async throws {
  881. try await self.forEachTransportPair(
  882. clientCompression: .gzip,
  883. clientEnabledCompression: .gzip,
  884. serverCompression: .gzip
  885. ) { control, _, pair in
  886. try await self.testUnaryCompression(
  887. client: .gzip,
  888. server: .gzip,
  889. control: control,
  890. pair: pair
  891. )
  892. }
  893. }
  894. func testClientStreamingDeflateCompression() async throws {
  895. try await self.forEachTransportPair(
  896. clientCompression: .deflate,
  897. clientEnabledCompression: .deflate,
  898. serverCompression: .deflate
  899. ) { control, _, pair in
  900. try await self.testClientStreamingCompression(
  901. client: .deflate,
  902. server: .deflate,
  903. control: control,
  904. pair: pair
  905. )
  906. }
  907. }
  908. func testClientStreamingGzipCompression() async throws {
  909. try await self.forEachTransportPair(
  910. clientCompression: .gzip,
  911. clientEnabledCompression: .gzip,
  912. serverCompression: .gzip
  913. ) { control, _, pair in
  914. try await self.testClientStreamingCompression(
  915. client: .gzip,
  916. server: .gzip,
  917. control: control,
  918. pair: pair
  919. )
  920. }
  921. }
  922. func testServerStreamingDeflateCompression() async throws {
  923. try await self.forEachTransportPair(
  924. clientCompression: .deflate,
  925. clientEnabledCompression: .deflate,
  926. serverCompression: .deflate
  927. ) { control, _, pair in
  928. try await self.testServerStreamingCompression(
  929. client: .deflate,
  930. server: .deflate,
  931. control: control,
  932. pair: pair
  933. )
  934. }
  935. }
  936. func testServerStreamingGzipCompression() async throws {
  937. try await self.forEachTransportPair(
  938. clientCompression: .gzip,
  939. clientEnabledCompression: .gzip,
  940. serverCompression: .gzip
  941. ) { control, _, pair in
  942. try await self.testServerStreamingCompression(
  943. client: .gzip,
  944. server: .gzip,
  945. control: control,
  946. pair: pair
  947. )
  948. }
  949. }
  950. func testBidiStreamingDeflateCompression() async throws {
  951. try await self.forEachTransportPair(
  952. clientCompression: .deflate,
  953. clientEnabledCompression: .deflate,
  954. serverCompression: .deflate
  955. ) { control, _, pair in
  956. try await self.testBidiStreamingCompression(
  957. client: .deflate,
  958. server: .deflate,
  959. control: control,
  960. pair: pair
  961. )
  962. }
  963. }
  964. func testBidiStreamingGzipCompression() async throws {
  965. try await self.forEachTransportPair(
  966. clientCompression: .gzip,
  967. clientEnabledCompression: .gzip,
  968. serverCompression: .gzip
  969. ) { control, _, pair in
  970. try await self.testBidiStreamingCompression(
  971. client: .gzip,
  972. server: .gzip,
  973. control: control,
  974. pair: pair
  975. )
  976. }
  977. }
  978. func testUnaryUnsupportedCompression() async throws {
  979. try await self.forEachTransportPair(
  980. clientEnabledCompression: .all,
  981. serverCompression: .gzip
  982. ) { control, _, pair in
  983. let message = ControlInput.with {
  984. $0.numberOfMessages = 1
  985. $0.payloadParameters = .with {
  986. $0.content = 42
  987. $0.size = 1024
  988. }
  989. }
  990. let request = ClientRequest(message: message)
  991. var options = CallOptions.defaults
  992. options.compression = .deflate
  993. try await control.unary(request: request, options: options) { response in
  994. switch response.accepted {
  995. case .success:
  996. XCTFail("RPC should've been rejected")
  997. case .failure(let error):
  998. let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
  999. // "identity" may or may not be included, so only test for values which must be present.
  1000. XCTAssertTrue(acceptEncoding.contains("gzip"))
  1001. XCTAssertFalse(acceptEncoding.contains("deflate"))
  1002. }
  1003. }
  1004. }
  1005. }
  1006. func testClientStreamingUnsupportedCompression() async throws {
  1007. try await self.forEachTransportPair(
  1008. clientEnabledCompression: .all,
  1009. serverCompression: .gzip
  1010. ) { control, _, pair in
  1011. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  1012. try await writer.write(.noOp)
  1013. }
  1014. var options = CallOptions.defaults
  1015. options.compression = .deflate
  1016. try await control.clientStream(request: request, options: options) { response in
  1017. switch response.accepted {
  1018. case .success:
  1019. XCTFail("RPC should've been rejected")
  1020. case .failure(let error):
  1021. let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
  1022. // "identity" may or may not be included, so only test for values which must be present.
  1023. XCTAssertTrue(acceptEncoding.contains("gzip"))
  1024. XCTAssertFalse(acceptEncoding.contains("deflate"))
  1025. }
  1026. }
  1027. }
  1028. }
  1029. func testServerStreamingUnsupportedCompression() async throws {
  1030. try await self.forEachTransportPair(
  1031. clientEnabledCompression: .all,
  1032. serverCompression: .gzip
  1033. ) { control, _, pair in
  1034. let message = ControlInput.with {
  1035. $0.numberOfMessages = 1
  1036. $0.payloadParameters = .with {
  1037. $0.content = 42
  1038. $0.size = 1024
  1039. }
  1040. }
  1041. let request = ClientRequest(message: message)
  1042. var options = CallOptions.defaults
  1043. options.compression = .deflate
  1044. try await control.serverStream(request: request, options: options) { response in
  1045. switch response.accepted {
  1046. case .success:
  1047. XCTFail("RPC should've been rejected")
  1048. case .failure(let error):
  1049. let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
  1050. // "identity" may or may not be included, so only test for values which must be present.
  1051. XCTAssertTrue(acceptEncoding.contains("gzip"))
  1052. XCTAssertFalse(acceptEncoding.contains("deflate"))
  1053. }
  1054. }
  1055. }
  1056. }
  1057. func testBidiStreamingUnsupportedCompression() async throws {
  1058. try await self.forEachTransportPair(
  1059. clientEnabledCompression: .all,
  1060. serverCompression: .gzip
  1061. ) { control, _, pair in
  1062. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  1063. try await writer.write(.noOp)
  1064. }
  1065. var options = CallOptions.defaults
  1066. options.compression = .deflate
  1067. try await control.bidiStream(request: request, options: options) { response in
  1068. switch response.accepted {
  1069. case .success:
  1070. XCTFail("RPC should've been rejected")
  1071. case .failure(let error):
  1072. let acceptEncoding = Array(error.metadata["grpc-accept-encoding"])
  1073. // "identity" may or may not be included, so only test for values which must be present.
  1074. XCTAssertTrue(acceptEncoding.contains("gzip"))
  1075. XCTAssertFalse(acceptEncoding.contains("deflate"))
  1076. }
  1077. }
  1078. }
  1079. }
  1080. func testUnaryTimeoutPropagatedToServer() async throws {
  1081. try await self.forEachTransportPair { control, _, pair in
  1082. let message = ControlInput.with {
  1083. $0.echoMetadataInHeaders = true
  1084. $0.numberOfMessages = 1
  1085. }
  1086. let request = ClientRequest(message: message)
  1087. var options = CallOptions.defaults
  1088. options.timeout = .seconds(10)
  1089. try await control.unary(request: request, options: options) { response in
  1090. let timeout = Array(response.metadata["echo-grpc-timeout"])
  1091. XCTAssertEqual(timeout.count, 1)
  1092. }
  1093. }
  1094. }
  1095. func testClientStreamingTimeoutPropagatedToServer() async throws {
  1096. try await self.forEachTransportPair { control, _, pair in
  1097. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  1098. let message = ControlInput.with {
  1099. $0.echoMetadataInHeaders = true
  1100. $0.numberOfMessages = 1
  1101. }
  1102. try await writer.write(message)
  1103. }
  1104. var options = CallOptions.defaults
  1105. options.timeout = .seconds(10)
  1106. try await control.clientStream(request: request, options: options) { response in
  1107. let timeout = Array(response.metadata["echo-grpc-timeout"])
  1108. XCTAssertEqual(timeout.count, 1)
  1109. }
  1110. }
  1111. }
  1112. func testServerStreamingTimeoutPropagatedToServer() async throws {
  1113. try await self.forEachTransportPair { control, _, pair in
  1114. let message = ControlInput.with {
  1115. $0.echoMetadataInHeaders = true
  1116. $0.numberOfMessages = 1
  1117. }
  1118. let request = ClientRequest(message: message)
  1119. var options = CallOptions.defaults
  1120. options.timeout = .seconds(10)
  1121. try await control.serverStream(request: request, options: options) { response in
  1122. let timeout = Array(response.metadata["echo-grpc-timeout"])
  1123. XCTAssertEqual(timeout.count, 1)
  1124. }
  1125. }
  1126. }
  1127. func testBidiStreamingTimeoutPropagatedToServer() async throws {
  1128. try await self.forEachTransportPair { control, _, pair in
  1129. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  1130. try await writer.write(.echoMetadata)
  1131. }
  1132. var options = CallOptions.defaults
  1133. options.timeout = .seconds(10)
  1134. try await control.bidiStream(request: request, options: options) { response in
  1135. let timeout = Array(response.metadata["echo-grpc-timeout"])
  1136. XCTAssertEqual(timeout.count, 1)
  1137. }
  1138. }
  1139. }
  1140. private static let httpToStatusCodePairs: [(Int, RPCError.Code)] = [
  1141. // See https://github.com/grpc/grpc/blob/7f664c69b2a636386fbf95c16bc78c559734ce0f/doc/http-grpc-status-mapping.md
  1142. (400, .internalError),
  1143. (401, .unauthenticated),
  1144. (403, .permissionDenied),
  1145. (404, .unimplemented),
  1146. (418, .unknown),
  1147. (429, .unavailable),
  1148. (502, .unavailable),
  1149. (503, .unavailable),
  1150. (504, .unavailable),
  1151. (504, .unavailable),
  1152. ]
  1153. func testUnaryAgainstNonGRPCServer() async throws {
  1154. try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
  1155. for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
  1156. // Tell the server what to respond with.
  1157. let metadata: Metadata = ["response-status": "\(httpCode)"]
  1158. try await control.unary(
  1159. request: ClientRequest(message: .noOp, metadata: metadata)
  1160. ) { response in
  1161. switch response.accepted {
  1162. case .success:
  1163. XCTFail("RPC should have failed with '\(expectedStatus)'")
  1164. case .failure(let error):
  1165. XCTAssertEqual(error.code, expectedStatus)
  1166. }
  1167. }
  1168. }
  1169. }
  1170. }
  1171. func testClientStreamingAgainstNonGRPCServer() async throws {
  1172. try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
  1173. for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
  1174. // Tell the server what to respond with.
  1175. let request = StreamingClientRequest(
  1176. of: ControlInput.self,
  1177. metadata: ["response-status": "\(httpCode)"]
  1178. ) { _ in
  1179. }
  1180. try await control.clientStream(request: request) { response in
  1181. switch response.accepted {
  1182. case .success:
  1183. XCTFail("RPC should have failed with '\(expectedStatus)'")
  1184. case .failure(let error):
  1185. XCTAssertEqual(error.code, expectedStatus)
  1186. }
  1187. }
  1188. }
  1189. }
  1190. }
  1191. func testServerStreamingAgainstNonGRPCServer() async throws {
  1192. try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
  1193. for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
  1194. // Tell the server what to respond with.
  1195. let metadata: Metadata = ["response-status": "\(httpCode)"]
  1196. try await control.serverStream(
  1197. request: ClientRequest(message: .noOp, metadata: metadata)
  1198. ) { response in
  1199. switch response.accepted {
  1200. case .success:
  1201. XCTFail("RPC should have failed with '\(expectedStatus)'")
  1202. case .failure(let error):
  1203. XCTAssertEqual(error.code, expectedStatus)
  1204. }
  1205. }
  1206. }
  1207. }
  1208. }
  1209. func testBidiStreamingAgainstNonGRPCServer() async throws {
  1210. try await self.forEachClientAndHTTPStatusCodeServer { control, kind in
  1211. for (httpCode, expectedStatus) in Self.httpToStatusCodePairs {
  1212. // Tell the server what to respond with.
  1213. let request = StreamingClientRequest(
  1214. of: ControlInput.self,
  1215. metadata: ["response-status": "\(httpCode)"]
  1216. ) { _ in
  1217. }
  1218. try await control.bidiStream(request: request) { response in
  1219. switch response.accepted {
  1220. case .success:
  1221. XCTFail("RPC should have failed with '\(expectedStatus)'")
  1222. case .failure(let error):
  1223. XCTAssertEqual(error.code, expectedStatus)
  1224. }
  1225. }
  1226. }
  1227. }
  1228. }
  1229. func testUnaryScheme() async throws {
  1230. try await self.forEachTransportPair { control, _, pair in
  1231. let input = ControlInput.with {
  1232. $0.echoMetadataInHeaders = true
  1233. $0.numberOfMessages = 1
  1234. }
  1235. let request = ClientRequest(message: input)
  1236. try await control.unary(request: request) { response in
  1237. XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
  1238. }
  1239. }
  1240. }
  1241. func testServerStreamingScheme() async throws {
  1242. try await self.forEachTransportPair { control, _, pair in
  1243. let input = ControlInput.with {
  1244. $0.echoMetadataInHeaders = true
  1245. $0.numberOfMessages = 1
  1246. }
  1247. let request = ClientRequest(message: input)
  1248. try await control.serverStream(request: request) { response in
  1249. XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
  1250. }
  1251. }
  1252. }
  1253. func testClientStreamingScheme() async throws {
  1254. try await self.forEachTransportPair { control, _, pair in
  1255. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  1256. let input = ControlInput.with {
  1257. $0.echoMetadataInHeaders = true
  1258. $0.numberOfMessages = 1
  1259. }
  1260. try await writer.write(input)
  1261. }
  1262. try await control.clientStream(request: request) { response in
  1263. XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
  1264. }
  1265. }
  1266. }
  1267. func testBidiStreamingScheme() async throws {
  1268. try await self.forEachTransportPair { control, _, pair in
  1269. let request = StreamingClientRequest(of: ControlInput.self) { writer in
  1270. let input = ControlInput.with {
  1271. $0.echoMetadataInHeaders = true
  1272. $0.numberOfMessages = 1
  1273. }
  1274. try await writer.write(input)
  1275. }
  1276. try await control.bidiStream(request: request) { response in
  1277. XCTAssertEqual(Array(response.metadata["echo-scheme"]), ["http"])
  1278. }
  1279. }
  1280. }
  1281. func testServerCancellation() async throws {
  1282. for kind in [CancellationKind.awaitCancelled, .withCancellationHandler] {
  1283. try await self.forEachTransportPair { control, server, pair in
  1284. let request = ClientRequest(message: kind)
  1285. try await control.waitForCancellation(request: request) { response in
  1286. // Shutdown the client so that it doesn't attempt to reconnect when the server closes.
  1287. control.client.beginGracefulShutdown()
  1288. // Shutdown the server to cancel the RPC.
  1289. server.beginGracefulShutdown()
  1290. // The RPC should complete without any error or response.
  1291. let responses = try await response.messages.reduce(into: []) { $0.append($1) }
  1292. XCTAssert(responses.isEmpty)
  1293. }
  1294. }
  1295. }
  1296. }
  1297. func testUppercaseClientMetadataKey() async throws {
  1298. try await self.forEachTransportPair { control, _, _ in
  1299. let request = ClientRequest<ControlInput>(
  1300. message: .with {
  1301. $0.echoMetadataInHeaders = true
  1302. $0.numberOfMessages = 1
  1303. },
  1304. metadata: ["UPPERCASE-KEY": "value"]
  1305. )
  1306. try await control.unary(request: request) { response in
  1307. // Keys will be lowercase before being sent over the wire.
  1308. XCTAssertEqual(Array(response.metadata["echo-uppercase-key"]), ["value"])
  1309. }
  1310. }
  1311. }
  1312. func testUppercaseServerMetadataKey() async throws {
  1313. try await self.forEachTransportPair { control, _, _ in
  1314. let request = ClientRequest<ControlInput>(
  1315. message: .with {
  1316. $0.initialMetadataToAdd["UPPERCASE-KEY"] = "initial"
  1317. $0.trailingMetadataToAdd["UPPERCASE-KEY"] = "trailing"
  1318. $0.numberOfMessages = 1
  1319. }
  1320. )
  1321. try await control.unary(request: request) { response in
  1322. XCTAssertEqual(Array(response.metadata["uppercase-key"]), ["initial"])
  1323. XCTAssertEqual(Array(response.trailingMetadata["uppercase-key"]), ["trailing"])
  1324. }
  1325. }
  1326. }
  1327. private func checkAuthority(client: GRPCClient, expected: String) async throws {
  1328. let control = ControlClient(wrapping: client)
  1329. let input = ControlInput.with {
  1330. $0.echoMetadataInHeaders = true
  1331. $0.echoMetadataInTrailers = true
  1332. $0.numberOfMessages = 1
  1333. $0.payloadParameters = .with {
  1334. $0.content = 0
  1335. $0.size = 1024
  1336. }
  1337. }
  1338. try await control.unary(request: ClientRequest(message: input)) { response in
  1339. let initial = response.metadata
  1340. XCTAssertEqual(Array(initial["echo-authority"]), [.string(expected)])
  1341. }
  1342. }
  1343. private func testAuthority(
  1344. serverAddress: SocketAddress,
  1345. authorityOverride override: String? = nil,
  1346. clientTarget: (SocketAddress) -> any ResolvableTarget,
  1347. expectedAuthority: (SocketAddress) -> String
  1348. ) async throws {
  1349. try await withGRPCServer(
  1350. transport: .http2NIOPosix(
  1351. address: serverAddress,
  1352. config: .defaults(transportSecurity: .plaintext)
  1353. ),
  1354. services: [ControlService()]
  1355. ) { server in
  1356. guard let listeningAddress = try await server.listeningAddress else {
  1357. XCTFail("No listening address")
  1358. return
  1359. }
  1360. let target = clientTarget(listeningAddress)
  1361. try await withGRPCClient(
  1362. transport: .http2NIOPosix(
  1363. target: target,
  1364. config: .defaults(transportSecurity: .plaintext) {
  1365. $0.http2.authority = override
  1366. }
  1367. )
  1368. ) { client in
  1369. try await self.checkAuthority(client: client, expected: expectedAuthority(listeningAddress))
  1370. }
  1371. }
  1372. }
  1373. func testAuthorityDNS() async throws {
  1374. try await self.testAuthority(serverAddress: .ipv4(host: "127.0.0.1", port: 0)) { address in
  1375. return .dns(host: "localhost", port: address.ipv4!.port)
  1376. } expectedAuthority: { address in
  1377. return "localhost:\(address.ipv4!.port)"
  1378. }
  1379. }
  1380. func testOverrideAuthorityDNS() async throws {
  1381. try await self.testAuthority(
  1382. serverAddress: .ipv4(host: "127.0.0.1", port: 0),
  1383. authorityOverride: "respect-my-authority"
  1384. ) { address in
  1385. return .dns(host: "localhost", port: address.ipv4!.port)
  1386. } expectedAuthority: { _ in
  1387. return "respect-my-authority"
  1388. }
  1389. }
  1390. func testAuthorityIPv4() async throws {
  1391. try await self.testAuthority(serverAddress: .ipv4(host: "127.0.0.1", port: 0)) { address in
  1392. return .ipv4(host: "127.0.0.1", port: address.ipv4!.port)
  1393. } expectedAuthority: { address in
  1394. return "127.0.0.1:\(address.ipv4!.port)"
  1395. }
  1396. }
  1397. func testOverrideAuthorityIPv4() async throws {
  1398. try await self.testAuthority(
  1399. serverAddress: .ipv4(host: "127.0.0.1", port: 0),
  1400. authorityOverride: "respect-my-authority"
  1401. ) { address in
  1402. return .ipv4(host: "127.0.0.1", port: address.ipv4!.port)
  1403. } expectedAuthority: { _ in
  1404. return "respect-my-authority"
  1405. }
  1406. }
  1407. func testAuthorityIPv6() async throws {
  1408. try await self.testAuthority(serverAddress: .ipv6(host: "::1", port: 0)) { address in
  1409. return .ipv6(host: "::1", port: address.ipv6!.port)
  1410. } expectedAuthority: { address in
  1411. return "[::1]:\(address.ipv6!.port)"
  1412. }
  1413. }
  1414. func testOverrideAuthorityIPv6() async throws {
  1415. try await self.testAuthority(
  1416. serverAddress: .ipv6(host: "::1", port: 0),
  1417. authorityOverride: "respect-my-authority"
  1418. ) { address in
  1419. return .ipv6(host: "::1", port: address.ipv6!.port)
  1420. } expectedAuthority: { _ in
  1421. return "respect-my-authority"
  1422. }
  1423. }
  1424. func testAuthorityUDS() async throws {
  1425. let path = "test-authority-uds"
  1426. try await self.testAuthority(serverAddress: .unixDomainSocket(path: path)) { address in
  1427. return .unixDomainSocket(path: path)
  1428. } expectedAuthority: { _ in
  1429. return path
  1430. }
  1431. }
  1432. func testAuthorityLocalUDSOverride() async throws {
  1433. let path = "test-authority-local-uds-override"
  1434. try await self.testAuthority(serverAddress: .unixDomainSocket(path: path)) { address in
  1435. return .unixDomainSocket(path: path, authority: "respect-my-authority")
  1436. } expectedAuthority: { _ in
  1437. return "respect-my-authority"
  1438. }
  1439. }
  1440. func testOverrideAuthorityUDS() async throws {
  1441. let path = "test-override-authority-uds"
  1442. try await self.testAuthority(
  1443. serverAddress: .unixDomainSocket(path: path),
  1444. authorityOverride: "respect-my-authority"
  1445. ) { _ in
  1446. return .unixDomainSocket(path: path, authority: "should-be-ignored")
  1447. } expectedAuthority: { _ in
  1448. return "respect-my-authority"
  1449. }
  1450. }
  1451. func testPeerInfoIPv4() async throws {
  1452. try await self.forEachTransportPair(
  1453. serverAddress: .ipv4(host: "127.0.0.1", port: 0)
  1454. ) { control, _, _ in
  1455. let peerInfo = try await control.peerInfo()
  1456. let matches = peerInfo.matches(of: /ipv4:127.0.0.1:\d+/)
  1457. XCTAssertNotNil(matches)
  1458. }
  1459. }
  1460. func testPeerInfoIPv6() async throws {
  1461. try await self.forEachTransportPair(
  1462. serverAddress: .ipv6(host: "::1", port: 0)
  1463. ) { control, _, _ in
  1464. let peerInfo = try await control.peerInfo()
  1465. let matches = peerInfo.matches(of: /ipv6:[::1]:\d+/)
  1466. XCTAssertNotNil(matches)
  1467. }
  1468. }
  1469. func testPeerInfoUDS() async throws {
  1470. let path = "peer-info-uds"
  1471. try await self.forEachTransportPair(
  1472. serverAddress: .unixDomainSocket(path: path)
  1473. ) { control, _, _ in
  1474. let peerInfo = try await control.peerInfo()
  1475. XCTAssertEqual(peerInfo, "unix:peer-info-uds")
  1476. }
  1477. }
  1478. }
  1479. extension [HTTP2TransportTests.Transport] {
  1480. static let supported = [
  1481. HTTP2TransportTests.Transport(server: .posix, client: .posix),
  1482. HTTP2TransportTests.Transport(server: .niots, client: .niots),
  1483. HTTP2TransportTests.Transport(server: .niots, client: .posix),
  1484. HTTP2TransportTests.Transport(server: .posix, client: .niots),
  1485. ]
  1486. }
  1487. extension ControlInput {
  1488. fileprivate static let echoMetadata = Self.with {
  1489. $0.echoMetadataInHeaders = true
  1490. }
  1491. fileprivate static let noOp = Self()
  1492. fileprivate static func messages(
  1493. _ numberOfMessages: Int,
  1494. repeating: UInt8,
  1495. count: Int
  1496. ) -> Self {
  1497. return Self.with {
  1498. $0.numberOfMessages = numberOfMessages
  1499. $0.payloadParameters = .with {
  1500. $0.content = repeating
  1501. $0.size = count
  1502. }
  1503. }
  1504. }
  1505. fileprivate static func status(
  1506. code: GRPCCore.Status.Code,
  1507. message: String,
  1508. echoMetadata: Bool
  1509. ) -> Self {
  1510. return Self.with {
  1511. $0.echoMetadataInTrailers = echoMetadata
  1512. $0.status = .with {
  1513. $0.code = code
  1514. $0.message = message
  1515. }
  1516. }
  1517. }
  1518. fileprivate static func trailersOnly(
  1519. code: GRPCCore.Status.Code,
  1520. message: String,
  1521. echoMetadata: Bool
  1522. ) -> Self {
  1523. return Self.with {
  1524. $0.echoMetadataInTrailers = echoMetadata
  1525. $0.isTrailersOnly = true
  1526. $0.status = .with {
  1527. $0.code = code
  1528. $0.message = message
  1529. }
  1530. }
  1531. }
  1532. }
  1533. extension CompressionAlgorithm {
  1534. var name: String {
  1535. switch self {
  1536. case .deflate:
  1537. return "deflate"
  1538. case .gzip:
  1539. return "gzip"
  1540. case .none:
  1541. return "identity"
  1542. default:
  1543. return ""
  1544. }
  1545. }
  1546. }