GRPCChannelTests.swift 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCHTTP2Core
  18. import NIOHTTP2
  19. import NIOPosix
  20. import XCTest
  21. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  22. final class GRPCChannelTests: XCTestCase {
  /// Checks that the service config passed at initialisation is what the channel reports when
  /// `connect()` has never been called: method config for `echoGet` only, plus the retry
  /// throttle values.
  func testDefaultServiceConfig() throws {
    let throttling = try ServiceConfig.RetryThrottling(maxTokens: 100, tokenRatio: 0.1)

    var defaultConfig = ServiceConfig()
    defaultConfig.loadBalancingConfig = [.roundRobin]
    defaultConfig.methodConfig = [MethodConfig(names: [MethodConfig.Name(.echoGet)])]
    defaultConfig.retryThrottling = throttling

    // No endpoints and a connector which never connects: only the default config is in play.
    let channel = GRPCChannel(
      resolver: .static(endpoints: []),
      connector: .never,
      config: .defaults,
      defaultServiceConfig: defaultConfig
    )

    // Only 'echoGet' was named in the method config.
    XCTAssertNotNil(channel.configuration(forMethod: .echoGet))
    XCTAssertNil(channel.configuration(forMethod: .echoUpdate))

    let retryThrottle = try XCTUnwrap(channel.retryThrottle)
    XCTAssertEqual(retryThrottle.maximumTokens, 100)
    XCTAssertEqual(retryThrottle.tokenRatio, 0.1)
  }
  /// Verifies that a service config provided by the resolver takes precedence over the default
  /// service config. This is observed indirectly via the method config and the retry throttle.
  func testServiceConfigFromResolver() async throws {
    // The service config which the resolver will hand to the channel.
    var resolverConfig = ServiceConfig()
    resolverConfig.loadBalancingConfig = [.roundRobin]
    resolverConfig.methodConfig = [MethodConfig(names: [MethodConfig.Name(.echoGet)])]
    resolverConfig.retryThrottling = try ServiceConfig.RetryThrottling(
      maxTokens: 100,
      tokenRatio: 0.1
    )

    // A server is required to connect to; no RPCs are made against it.
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address = try await server.bind()
    let endpoint = Endpoint(addresses: [address])

    let channel = GRPCChannel(
      resolver: .static(endpoints: [endpoint], serviceConfig: resolverConfig),
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: ServiceConfig()
    )

    // Nothing has been resolved yet, so the (empty) default service config applies.
    XCTAssertNil(channel.configuration(forMethod: .echoGet))
    XCTAssertNil(channel.configuration(forMethod: .echoUpdate))
    XCTAssertNil(channel.retryThrottle)

    try await withThrowingDiscardingTaskGroup { group in
      group.addTask { try await server.run(.never) }
      group.addTask { await channel.connect() }

      for await state in channel.connectivityState {
        // Only the 'ready' state is interesting; skip everything else.
        guard case .ready = state else { continue }

        // Once ready, the channel must be using the service config from the resolver.
        XCTAssertNotNil(channel.configuration(forMethod: .echoGet))
        XCTAssertNil(channel.configuration(forMethod: .echoUpdate))

        let retryThrottle = try XCTUnwrap(channel.retryThrottle)
        XCTAssertEqual(retryThrottle.maximumTokens, 100)
        XCTAssertEqual(retryThrottle.tokenRatio, 0.1)

        // Checks are done: close the channel.
        channel.close()
      }

      // Stop the server.
      group.cancelAll()
    }
  }
  /// Verifies that the channel uses the service config from the resolver and picks up the latest
  /// version the resolver provides. This is observed indirectly via the method config and the
  /// retry throttle.
  func testServiceConfigFromResolverAfterUpdate() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address = try await server.bind()

    let (resolver, updates) = NameResolver.dynamic(updateMode: .push)
    let channel = GRPCChannel(
      resolver: resolver,
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: ServiceConfig()
    )

    // Nothing has been resolved yet, so the (empty) default service config applies.
    XCTAssertNil(channel.configuration(forMethod: .echoGet))
    XCTAssertNil(channel.configuration(forMethod: .echoUpdate))
    XCTAssertNil(channel.retryThrottle)

    // First update: the server address and a config covering 'echoGet' with retry throttling.
    var resolverConfig = ServiceConfig()
    resolverConfig.loadBalancingConfig = [.roundRobin]
    resolverConfig.methodConfig = [MethodConfig(names: [MethodConfig.Name(.echoGet)])]
    resolverConfig.retryThrottling = try ServiceConfig.RetryThrottling(
      maxTokens: 100,
      tokenRatio: 0.1
    )

    let firstResult = NameResolutionResult(
      endpoints: [Endpoint(address)],
      serviceConfig: .success(resolverConfig)
    )
    updates.yield(firstResult)

    try await withThrowingDiscardingTaskGroup { group in
      group.addTask { try await server.run(.never) }
      group.addTask { await channel.connect() }

      for await state in channel.connectivityState {
        // Only the 'ready' state is interesting; skip everything else.
        guard case .ready = state else { continue }

        // When the channel is ready it must have the service config from the resolver.
        XCTAssertNotNil(channel.configuration(forMethod: .echoGet))
        XCTAssertNil(channel.configuration(forMethod: .echoUpdate))

        let retryThrottle = try XCTUnwrap(channel.retryThrottle)
        XCTAssertEqual(retryThrottle.maximumTokens, 100)
        XCTAssertEqual(retryThrottle.tokenRatio, 0.1)

        // Second update: same addresses, but the config now covers 'echoUpdate' and has no
        // retry throttling.
        resolverConfig.methodConfig = [MethodConfig(names: [MethodConfig.Name(.echoUpdate)])]
        resolverConfig.retryThrottling = nil

        var secondResult = firstResult
        secondResult.serviceConfig = .success(resolverConfig)
        updates.yield(secondResult)

        // The new config should be propagated quickly.
        try await XCTPoll(every: .milliseconds(10)) {
          let getIsUnconfigured = channel.configuration(forMethod: .echoGet) == nil
          let updateIsConfigured = channel.configuration(forMethod: .echoUpdate) != nil
          let throttleIsRemoved = channel.retryThrottle == nil
          return getIsUnconfigured && updateIsConfigured && throttleIsRemoved
        }

        channel.close()
      }

      // Stop the server.
      group.cancelAll()
    }
  }
  /// Verifies that the channel reacts to name resolution changes which are pushed into the
  /// resolver. Two servers are started and only one address at a time is made available via the
  /// resolver; each server identifies itself through the trailing metadata of the RPC.
  func testPushBasedResolutionUpdates() async throws {
    // Start the servers.
    let server1 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address1 = try await server1.bind()

    let server2 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address2 = try await server2.bind()

    // Create the resolver and push the first server's address into it.
    let (resolver, updates) = NameResolver.dynamic(updateMode: .push)
    updates.yield(NameResolutionResult(endpoints: [Endpoint(address1)], serviceConfig: nil))

    var roundRobinConfig = ServiceConfig()
    roundRobinConfig.loadBalancingConfig = [.roundRobin]

    let channel = GRPCChannel(
      resolver: resolver,
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: roundRobinConfig
    )

    try await withThrowingDiscardingTaskGroup { group in
      // Each server responds with its own address in the trailing metadata.
      for (server, address) in zip([server1, server2], [address1, address2]) {
        group.addTask {
          try await server.run { _, outbound in
            let trailers: Metadata = ["server-addr": "\(address)"]
            try await outbound.write(.status(Status(code: .ok, message: ""), trailers))
            outbound.finish()
          }
        }
      }

      group.addTask { await channel.connect() }

      // The stream will be queued until the channel is ready.
      let firstResponder = try await channel.serverAddress()
      XCTAssertEqual(firstResponder, "\(address1)")
      XCTAssertEqual(server1.clients.count, 1)
      XCTAssertEqual(server2.clients.count, 0)

      // Push the second address. The update is applied asynchronously so the next stream isn't
      // guaranteed to reach the new server; poll until the connections have moved over.
      updates.yield(NameResolutionResult(endpoints: [Endpoint(address2)], serviceConfig: nil))
      try await XCTPoll(every: .milliseconds(10)) {
        server1.clients.count == 0 && server2.clients.count == 1
      }

      let secondResponder = try await channel.serverAddress()
      XCTAssertEqual(secondResponder, "\(address2)")

      group.cancelAll()
    }
  }
  /// Verifies that the channel reacts to name resolution changes which are pulled because a
  /// subchannel asked the channel to re-resolve. Two servers are started and the one made
  /// available changes between resolution results; each server identifies itself through the
  /// trailing metadata of the RPC.
  func testPullBasedResolutionUpdates() async throws {
    // Start the servers.
    let server1 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address1 = try await server1.bind()

    let server2 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address2 = try await server2.bind()

    // Create a pull-mode resolver and queue up one resolution result per server, in order.
    let (resolver, updates) = NameResolver.dynamic(updateMode: .pull)
    for serverAddress in [address1, address2] {
      updates.yield(
        NameResolutionResult(endpoints: [Endpoint(serverAddress)], serviceConfig: nil)
      )
    }

    var roundRobinConfig = ServiceConfig()
    roundRobinConfig.loadBalancingConfig = [.roundRobin]

    let channel = GRPCChannel(
      resolver: resolver,
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: roundRobinConfig
    )

    try await withThrowingDiscardingTaskGroup { group in
      // Each server responds with its own address in the trailing metadata.
      for (server, address) in zip([server1, server2], [address1, address2]) {
        group.addTask {
          try await server.run { _, outbound in
            let trailers: Metadata = ["server-addr": "\(address)"]
            try await outbound.write(.status(Status(code: .ok, message: ""), trailers))
            outbound.finish()
          }
        }
      }

      group.addTask { await channel.connect() }

      // The stream will be queued until the channel is ready.
      let firstResponder = try await channel.serverAddress()
      XCTAssertEqual(firstResponder, "\(address1)")
      XCTAssertEqual(server1.clients.count, 1)
      XCTAssertEqual(server2.clients.count, 0)

      // Have the first server send GOAWAY. This will cause the subchannel to re-resolve.
      let connectionToClient = try XCTUnwrap(server1.clients.first)
      let goAwayFrame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      try await connectionToClient.writeAndFlush(goAwayFrame)

      // Poll until the first client has dropped, the addresses have been re-resolved, and a
      // connection has been established to server2.
      try await XCTPoll(every: .milliseconds(10)) {
        server1.clients.count == 0 && server2.clients.count == 1
      }

      let secondResponder = try await channel.serverAddress()
      XCTAssertEqual(secondResponder, "\(address2)")

      group.cancelAll()
    }
  }
  /// Verifies that closing the channel while an RPC is in progress still allows that RPC to
  /// finish gracefully.
  func testCloseWhenRPCsAreInProgress() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address = try await server.bind()

    try await withThrowingDiscardingTaskGroup { group in
      group.addTask { try await server.run(.echo) }

      var roundRobinConfig = ServiceConfig()
      roundRobinConfig.loadBalancingConfig = [.roundRobin]

      let channel = GRPCChannel(
        resolver: .static(endpoints: [Endpoint(address)]),
        connector: .posix(),
        config: .defaults,
        defaultServiceConfig: roundRobinConfig
      )

      group.addTask { await channel.connect() }

      try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
        try await stream.outbound.write(.metadata([:]))

        var responses = stream.inbound.makeAsyncIterator()
        let firstPart = try await responses.next()
        switch firstPart {
        case .metadata:
          // The RPC is now in progress: close the channel.
          channel.close()
        case .message, .status, .none:
          XCTFail("Expected metadata, got \(String(describing: firstPart))")
        }

        for await state in channel.connectivityState {
          // 'shutdown' happens when shutting-down has been initiated; ignore other states.
          guard case .shutdown = state else { continue }

          // Finish the RPC, it should complete with an 'ok' status.
          stream.outbound.finish()
          let lastPart = try await responses.next()
          switch lastPart {
          case .status(let status, _):
            XCTAssertEqual(status.code, .ok)
          case .metadata, .message, .none:
            XCTFail("Expected status, got \(String(describing: lastPart))")
          }
        }
      }

      // Stop the server.
      group.cancelAll()
    }
  }
  /// Verifies that requests are queued until the channel becomes ready.
  ///
  /// Creating streams races with the channel becoming ready, so many tasks are added to the
  /// group, each creating a stream, before the server address is made known to the channel via
  /// the resolver. This isn't perfect: resolution _could_ happen before all streams have been
  /// attempted, although this is unlikely.
  func testQueueRequestsWhileNotReady() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address = try await server.bind()

    let (resolver, updates) = NameResolver.dynamic(updateMode: .push)

    var roundRobinConfig = ServiceConfig()
    roundRobinConfig.loadBalancingConfig = [.roundRobin]

    let channel = GRPCChannel(
      resolver: resolver,
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: roundRobinConfig
    )

    // Distinguishes RPC tasks from the long-running server/channel tasks in the group.
    enum TaskKind { case rpc, other }
    let rpcCount = 100

    try await withThrowingTaskGroup(of: TaskKind.self) { group in
      // Run the server: echo back metadata and messages, then finish with an 'ok' status.
      group.addTask {
        try await server.run { inbound, outbound in
          for try await requestPart in inbound {
            switch requestPart {
            case .metadata:
              try await outbound.write(.metadata([:]))
            case .message(let payload):
              try await outbound.write(.message(payload))
            }
          }

          try await outbound.write(.status(Status(code: .ok, message: ""), [:]))
          outbound.finish()
        }
        return .other
      }

      group.addTask {
        await channel.connect()
        return .other
      }

      // Start a bunch of requests. They can't make progress until an address is yielded, but
      // they should be queued.
      for _ in 0 ..< rpcCount {
        group.addTask {
          try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
            try await stream.outbound.write(.metadata([:]))
            stream.outbound.finish()

            for try await responsePart in stream.inbound {
              // Metadata and messages are ignored; only the status is checked.
              if case .status(let status, _) = responsePart {
                XCTAssertEqual(status.code, .ok)
              }
            }
          }
          return .rpc
        }
      }

      // At least some of the RPCs should have been queued by now.
      updates.yield(NameResolutionResult(endpoints: [Endpoint(address)], serviceConfig: nil))

      var remainingRPCs = rpcCount
      for try await finished in group {
        guard case .rpc = finished else { continue }

        remainingRPCs -= 1
        // All RPCs done: close the channel and cancel the group to stop the server.
        if remainingRPCs == 0 {
          channel.close()
          group.cancelAll()
        }
      }
    }
  }
  /// Verifies that when 'waitForReady' is 'false', queued requests are failed when there is a
  /// transient failure. The transient failure is triggered by attempting to connect to a
  /// non-existent server.
  func testQueueRequestsFailFast() async throws {
    let (resolver, updates) = NameResolver.dynamic(updateMode: .push)

    var roundRobinConfig = ServiceConfig()
    roundRobinConfig.loadBalancingConfig = [.roundRobin]

    let channel = GRPCChannel(
      resolver: resolver,
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: roundRobinConfig
    )

    // Distinguishes RPC tasks from the long-running channel task in the group.
    enum TaskKind { case rpc, other }
    let rpcCount = 100

    try await withThrowingTaskGroup(of: TaskKind.self) { group in
      group.addTask {
        await channel.connect()
        return .other
      }

      for _ in 0 ..< rpcCount {
        group.addTask {
          var failFast = CallOptions.defaults
          failFast.waitForReady = false

          await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
            try await channel.withStream(descriptor: .echoGet, options: failFast) { _ in
              XCTFail("Unexpected stream")
            }
          } errorHandler: { error in
            XCTAssertEqual(error.code, .unavailable)
          }

          return .rpc
        }
      }

      // At least some of the RPCs should have been queued by now. Yield an address nothing is
      // listening on.
      let unreachable = Endpoint(.unixDomainSocket(path: "/test-queue-requests-fail-fast"))
      updates.yield(NameResolutionResult(endpoints: [unreachable], serviceConfig: nil))

      var remainingRPCs = rpcCount
      for try await finished in group {
        guard case .rpc = finished else { continue }

        remainingRPCs -= 1
        // All RPCs done: close the channel and cancel whatever is left in the group.
        if remainingRPCs == 0 {
          channel.close()
          group.cancelAll()
        }
      }
    }
  }
  /// Pushes two different configs through the resolver: first one using a round-robin load
  /// balancer, then one using a pick-first load balancer. The connections held by each server
  /// are used to check which endpoints the channel is using.
  func testLoadBalancerChangingFromRoundRobinToPickFirst() async throws {
    let (resolver, updates) = NameResolver.dynamic(updateMode: .push)
    let channel = GRPCChannel(
      resolver: resolver,
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: ServiceConfig()
    )

    // Start the servers.
    let server1 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address1 = try await server1.bind()

    let server2 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address2 = try await server2.bind()

    let server3 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address3 = try await server3.bind()

    try await withThrowingTaskGroup(of: Void.self) { group in
      // Run the servers; no RPCs will be run against them.
      for server in [server1, server2, server3] {
        group.addTask { try await server.run(.never) }
      }

      group.addTask { await channel.connect() }

      for await state in channel.connectivityState {
        switch state {
        case .idle:
          // Push the first resolution result: servers 1 and 2 behind round-robin. This will
          // result in the channel becoming ready.
          var roundRobinConfig = ServiceConfig()
          roundRobinConfig.loadBalancingConfig = [.roundRobin]

          let roundRobinResult = NameResolutionResult(
            endpoints: [Endpoint(addresses: [address1]), Endpoint(addresses: [address2])],
            serviceConfig: .success(roundRobinConfig)
          )
          updates.yield(roundRobinResult)

        case .ready:
          // The channel is ready; servers 1 and 2 should have clients shortly.
          try await XCTPoll(every: .milliseconds(10)) {
            server1.clients.count == 1 && server2.clients.count == 1 && server3.clients.count == 0
          }

          // Both subchannels are ready: switch to pick-first with server 3 as the only endpoint.
          var pickFirstConfig = ServiceConfig()
          pickFirstConfig.loadBalancingConfig = [.pickFirst(shuffleAddressList: false)]

          let pickFirstResult = NameResolutionResult(
            endpoints: [Endpoint(addresses: [address3])],
            serviceConfig: .success(pickFirstConfig)
          )
          updates.yield(pickFirstResult)

          // Only server 3 should have a connection.
          try await XCTPoll(every: .milliseconds(10)) {
            server1.clients.count == 0 && server2.clients.count == 0 && server3.clients.count == 1
          }

          channel.close()

        case .shutdown:
          // Stop the servers.
          group.cancelAll()

        default:
          ()
        }
      }
    }
  }
  /// Checks that the pick-first load balancer has its address list shuffled.
  ///
  /// This can't be asserted deterministically, so an experiment is run a number of times
  /// instead. All servers are offered to a pick-first load balancer as the addresses of a single
  /// endpoint; in each round a new channel connects to one of them and the server holding the
  /// connection is noted.
  func testPickFirstShufflingAddressList() async throws {
    let numberOfRounds = 100
    let numberOfServers = 2

    let servers = (0 ..< numberOfServers).map { _ in
      TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    }

    var addresses = [SocketAddress]()
    for server in servers {
      try await addresses.append(server.bind())
    }

    let endpoint = Endpoint(addresses: addresses)
    // Number of rounds in which each server (by index) held the connection.
    var hitsPerServer = [Int](repeating: 0, count: addresses.count)

    // The service config is supplied when the channel is created.
    var shufflingConfig = ServiceConfig()
    shufflingConfig.loadBalancingConfig = [.pickFirst(shuffleAddressList: true)]

    try await withThrowingDiscardingTaskGroup { group in
      // Run the servers.
      for server in servers {
        group.addTask { try await server.run(.never) }
      }

      // Run the experiment.
      for _ in 0 ..< numberOfRounds {
        let channel = GRPCChannel(
          resolver: .static(endpoints: [endpoint]),
          connector: .posix(),
          config: .defaults,
          defaultServiceConfig: shufflingConfig
        )

        group.addTask { await channel.connect() }

        for await state in channel.connectivityState {
          guard case .ready = state else { continue }

          // Credit the first server which has exactly one client, then end the round.
          if let connected = servers.firstIndex(where: { $0.clients.count == 1 }) {
            hitsPerServer[connected] += 1
          }
          channel.close()
        }
      }

      // Stop the servers.
      group.cancelAll()
    }

    // The address list is shuffled, so there's no guarantee how many times each server is hit.
    // Require each server to be hit in at least 10% of its expected (even) share of the rounds.
    let expectedHits = Double(numberOfRounds) / Double(numberOfServers)
    let minimumHits = expectedHits * 0.1
    XCTAssert(hitsPerServer.allSatisfy({ Double($0) >= minimumHits }), "\(hitsPerServer)")
  }
  /// Checks that a channel given an empty service config and one endpoint with two addresses
  /// ends up connected to the first server only.
  func testPickFirstIsFallbackPolicy() async throws {
    // Start the servers.
    let server1 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address1 = try await server1.bind()

    let server2 = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let address2 = try await server2.bind()

    // An empty service config: no load-balancing policy is specified.
    let emptyConfig = ServiceConfig()
    let channel = GRPCChannel(
      resolver: .static(endpoints: [Endpoint(address1, address2)]),
      connector: .posix(),
      config: .defaults,
      defaultServiceConfig: emptyConfig
    )

    try await withThrowingDiscardingTaskGroup { group in
      // Run the servers.
      for server in [server1, server2] {
        group.addTask { try await server.run(.never) }
      }

      group.addTask { await channel.connect() }

      for try await state in channel.connectivityState {
        guard case .ready = state else { continue }

        // Only server 1 should have a connection.
        try await XCTPoll(every: .milliseconds(10)) {
          server1.clients.count == 1 && server2.clients.count == 0
        }

        channel.close()
      }

      // Stop the servers.
      group.cancelAll()
    }
  }
  /// Verifies that a stream requested while the channel is in transient failure fails with
  /// `.unavailable` once the channel is closed.
  ///
  /// The channel is pointed at a Unix domain socket path which no server in this test binds, and
  /// the initial backoff is raised so that the channel stays in transient failure for long
  /// enough for the stream to be queued before the close.
  func testQueueRequestsThenClose() async throws {
    // Set a high backoff so the channel stays in transient failure for long enough.
    var config = GRPCChannel.Config.defaults
    config.backoff.initial = .seconds(120)

    let channel = GRPCChannel(
      resolver: .static(
        endpoints: [
          Endpoint(.unixDomainSocket(path: "/testQueueRequestsThenClose"))
        ]
      ),
      connector: .posix(),
      // Use the customised config: passing '.defaults' here would silently discard the raised
      // backoff configured above.
      config: config,
      defaultServiceConfig: ServiceConfig()
    )

    try await withThrowingDiscardingTaskGroup { group in
      group.addTask {
        await channel.connect()
      }

      for try await state in channel.connectivityState {
        switch state {
        case .transientFailure:
          group.addTask {
            // Sleep a little to increase the chances of the stream being queued before the channel
            // reacts to the close.
            try await Task.sleep(for: .milliseconds(10))
            channel.close()
          }

          // Try to open a new stream; it should fail rather than being handed to the closure.
          await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
            try await channel.withStream(descriptor: .echoGet, options: .defaults) { _ in
              XCTFail("Unexpected new stream")
            }
          } errorHandler: { error in
            XCTAssertEqual(error.code, .unavailable)
          }

        default:
          ()
        }
      }

      group.cancelAll()
    }
  }
  667. }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension GRPCChannel.Config {
    /// A channel configuration in which every component uses its own `.defaults` value.
    static var defaults: Self {
      return .init(
        http2: .defaults,
        backoff: .defaults,
        connection: .defaults,
        compression: .defaults
      )
    }
  }
  extension Endpoint {
    /// Convenience for tests: creates an endpoint from a variadic list of socket addresses by
    /// forwarding them to `init(addresses:)`.
    init(_ socketAddresses: SocketAddress...) {
      self.init(addresses: socketAddresses)
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension GRPCChannel {
    /// Runs an `echoGet` RPC which sends only metadata and returns the first string value stored
    /// under the "server-addr" key of the metadata accompanying the response status.
    ///
    /// Any response part other than a status is recorded as a test failure.
    ///
    /// - Returns: The first "server-addr" value, or `nil` if the stream ended without a status
    ///   or the status carried no such value.
    fileprivate func serverAddress() async throws -> String? {
      let addressValues: Metadata.StringValues? = try await self.withStream(
        descriptor: .echoGet,
        options: .defaults
      ) { stream in
        try await stream.outbound.write(.metadata([:]))
        stream.outbound.finish()

        for try await part in stream.inbound {
          guard case .status(_, let trailers) = part else {
            XCTFail("Unexpected part: \(part)")
            continue
          }
          return trailers[stringValues: "server-addr"]
        }

        // The response stream finished without a status.
        return nil
      }

      guard let addressValues else { return nil }
      // Take the first value, if there is one.
      return addressValues.first(where: { _ in true })
    }
  }