RoundRobinLoadBalancerTests.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import GRPCCore
  18. @_spi(Package) @testable import GRPCHTTP2Core
  19. import NIOHTTP2
  20. import NIOPosix
  21. import XCTest
  22. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  23. final class RoundRobinLoadBalancerTests: XCTestCase {
  /// Gives the load balancer one endpoint per test server and checks that every server ends up
  /// with exactly one connected client.
  func testMultipleConnectionsAreEstablished() async throws {
    try await RoundRobinLoadBalancerTest.run(servers: 3, connector: .posix()) { context, event in
      if case .connectivityStateChanged(.idle) = event {
        // Supplying one endpoint per server makes the load balancer create a subchannel for
        // each of them.
        let perServerEndpoints = context.servers.map { entry in
          Endpoint(addresses: [entry.address])
        }
        context.loadBalancer.updateAddresses(perServerEndpoints)
      } else if case .connectivityStateChanged(.ready) = event {
        // Wait until every server reports exactly one connected client.
        try await XCTPoll(every: .milliseconds(10)) {
          context.servers.allSatisfy { $0.server.clients.count == 1 }
        }

        // Closing the load balancer ends the test.
        context.loadBalancer.close()
      }
    } verifyEvents: { observed in
      let lifecycle: [LoadBalancerEvent] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, lifecycle)
    }
  }
  /// Checks that once all three subchannels are ready, successive picks are spread evenly
  /// across them.
  func testSubchannelsArePickedEvenly() async throws {
    try await RoundRobinLoadBalancerTest.run(servers: 3, connector: .posix()) { context, event in
      switch event {
      case .connectivityStateChanged(.idle):
        // Supplying one endpoint per server makes the load balancer create a subchannel for
        // each of them.
        let perServerEndpoints = context.servers.map { entry in
          Endpoint(addresses: [entry.address])
        }
        context.loadBalancer.updateAddresses(perServerEndpoints)

      case .connectivityStateChanged(.ready):
        // 'ready' is reported when any subchannel becomes ready, so keep polling until three
        // consecutive picks yield three distinct subchannels.
        try await XCTPoll(every: .milliseconds(10)) {
          let pickedIDs = try (0 ..< 3).map { _ in
            try XCTUnwrap(context.loadBalancer.pickSubchannel()).id
          }
          return Set(pickedIDs).count == 3
        }

        // All subchannels are ready now: after each round of three picks every subchannel
        // must have been picked exactly 'round' times.
        var picksPerSubchannel: [SubchannelID: Int] = [:]
        for round in 1 ... 10 {
          for _ in 1 ... 3 {
            guard let picked = context.loadBalancer.pickSubchannel() else {
              XCTFail("Didn't pick subchannel from ready load balancer")
              continue
            }
            picksPerSubchannel[picked.id, default: 0] += 1
          }

          XCTAssertEqual(picksPerSubchannel.count, 3, "\(picksPerSubchannel)")
          XCTAssert(
            picksPerSubchannel.values.allSatisfy { $0 == round },
            "\(picksPerSubchannel)"
          )
        }

        // Closing the load balancer finishes the test.
        context.loadBalancer.close()

      default:
        break
      }
    } verifyEvents: { observed in
      let lifecycle: [LoadBalancerEvent] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, lifecycle)
    }
  }
  /// Checks that address updates (adding an endpoint, then replacing every endpoint) are
  /// applied without the load balancer leaving the ready state.
  func testAddressUpdatesAreHandledGracefully() async throws {
    try await RoundRobinLoadBalancerTest.run(servers: 3, connector: .posix()) { context, event in
      switch event {
      case .connectivityStateChanged(.idle):
        // Do the first connect, to the first server only.
        let endpoints = [Endpoint(addresses: [context.servers[0].address])]
        context.loadBalancer.updateAddresses(endpoints)

      case .connectivityStateChanged(.ready):
        // Now the first connection should be established.
        try await XCTPoll(every: .milliseconds(10)) {
          context.servers[0].server.clients.count == 1
        }

        // First connection is okay, add a second.
        do {
          let endpoints = [
            Endpoint(addresses: [context.servers[0].address]),
            Endpoint(addresses: [context.servers[1].address]),
          ]
          context.loadBalancer.updateAddresses(endpoints)

          try await XCTPoll(every: .milliseconds(10)) {
            context.servers.prefix(2).allSatisfy { $0.server.clients.count == 1 }
          }
        }

        // Remove those two endpoints and add a third.
        do {
          let endpoints = [Endpoint(addresses: [context.servers[2].address])]
          context.loadBalancer.updateAddresses(endpoints)

          try await XCTPoll(every: .milliseconds(10)) {
            let disconnected = context.servers.prefix(2).allSatisfy {
              $0.server.clients.isEmpty
            }
            // Optional chaining instead of a force unwrap: if there were no last server the
            // comparison is simply false and the poll keeps waiting rather than trapping.
            let connected = context.servers.last?.server.clients.count == 1
            return disconnected && connected
          }
        }

        // Closing the load balancer ends the test.
        context.loadBalancer.close()

      default:
        break
      }
    } verifyEvents: { events in
      // Transitioning to new addresses should be graceful, i.e. a complete change shouldn't
      // result in dropping away from the ready state.
      let expected: [LoadBalancerEvent] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(events, expected)
    }
  }
  /// Checks that re-sending the addresses the load balancer already has leaves the three
  /// existing connections in place and produces no extra connectivity events.
  func testSameAddressUpdatesAreIgnored() async throws {
    try await RoundRobinLoadBalancerTest.run(servers: 3, connector: .posix()) { context, event in
      // Both the initial update and the repeated one use the same endpoint list: one endpoint
      // per server.
      func makeEndpoints() -> [Endpoint] {
        context.servers.map { entry in Endpoint(addresses: [entry.address]) }
      }

      if case .connectivityStateChanged(.idle) = event {
        context.loadBalancer.updateAddresses(makeEndpoints())
      } else if case .connectivityStateChanged(.ready) = event {
        // Update with the same addresses, these should be ignored.
        context.loadBalancer.updateAddresses(makeEndpoints())

        // We should still have three connections.
        try await XCTPoll(every: .milliseconds(10)) {
          context.servers.allSatisfy { entry in entry.server.clients.count == 1 }
        }

        context.loadBalancer.close()
      }
    } verifyEvents: { observed in
      let lifecycle: [LoadBalancerEvent] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, lifecycle)
    }
  }
  /// Checks that an update containing no endpoints leaves the three existing connections in
  /// place and produces no extra connectivity events.
  func testEmptyAddressUpdatesAreIgnored() async throws {
    try await RoundRobinLoadBalancerTest.run(servers: 3, connector: .posix()) { context, event in
      if case .connectivityStateChanged(.idle) = event {
        // Connect to every server.
        let perServerEndpoints = context.servers.map { entry in
          Endpoint(addresses: [entry.address])
        }
        context.loadBalancer.updateAddresses(perServerEndpoints)
      } else if case .connectivityStateChanged(.ready) = event {
        // Update with no-addresses, should be ignored so a subchannel can still be picked.
        let noEndpoints: [Endpoint] = []
        context.loadBalancer.updateAddresses(noEndpoints)

        // We should still have three connections.
        try await XCTPoll(every: .milliseconds(10)) {
          context.servers.allSatisfy { entry in entry.server.clients.count == 1 }
        }

        context.loadBalancer.close()
      }
    } verifyEvents: { observed in
      let lifecycle: [LoadBalancerEvent] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, lifecycle)
    }
  }
  /// Checks that when one server sends GOAWAY to its client, the load balancer requests name
  /// resolution and subsequent picks alternate between the two remaining subchannels.
  func testSubchannelReceivesGoAway() async throws {
    try await RoundRobinLoadBalancerTest.run(servers: 3, connector: .posix()) { context, event in
      switch event {
      case .connectivityStateChanged(.idle):
        // Trigger the connect.
        let perServerEndpoints = context.servers.map { entry in
          Endpoint(addresses: [entry.address])
        }
        context.loadBalancer.updateAddresses(perServerEndpoints)

      case .connectivityStateChanged(.ready):
        // Wait until every server has a connected client.
        try await XCTPoll(every: .milliseconds(10)) {
          context.servers.allSatisfy { entry in entry.server.clients.count == 1 }
        }

        // A connected client per server doesn't imply that all three subchannels are ready,
        // which this test relies on. Keep picking (accumulating IDs across polls) until three
        // distinct subchannel IDs have been seen.
        var seenIDs = Set<SubchannelID>()
        try await XCTPoll(every: .milliseconds(10)) {
          for _ in 1 ... 3 {
            guard let picked = context.loadBalancer.pickSubchannel() else { continue }
            seenIDs.insert(picked.id)
          }
          return seenIDs.count == 3
        }

        // Have the first server send a GOAWAY to its client. This should eventually close the
        // subchannel and trigger a name resolution.
        let firstServerClient = context.servers[0].server.clients[0]
        let goAwayFrame = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(lastStreamID: 0, errorCode: .cancel, opaqueData: nil)
        )
        firstServerClient.writeAndFlush(goAwayFrame, promise: nil)

      case .requiresNameResolution:
        // One subchannel should've been taken out, so three picks cycle over the remaining
        // two: the first and second differ, the first and third match.
        let pickedIDs = try (0 ..< 3).map { _ in
          try XCTUnwrap(context.loadBalancer.pickSubchannel()?.id)
        }
        XCTAssertNotEqual(pickedIDs[0], pickedIDs[1])
        XCTAssertEqual(pickedIDs[0], pickedIDs[2])

        // End the test.
        context.loadBalancer.close()

      default:
        break
      }
    } verifyEvents: { observed in
      let lifecycle: [LoadBalancerEvent] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .requiresNameResolution,
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, lifecycle)
    }
  }
  /// A load balancer which has never been run or given addresses has nothing to pick.
  func testPickSubchannelWhenNotReady() {
    let balancer = RoundRobinLoadBalancer(
      connector: .never,
      backoff: .defaults,
      defaultCompression: .none,
      enabledCompression: .none
    )

    let picked = balancer.pickSubchannel()
    XCTAssertNil(picked)
  }
  /// A load balancer which was closed and then run to completion has nothing to pick.
  func testPickSubchannelWhenClosed() async {
    let balancer = RoundRobinLoadBalancer(
      connector: .never,
      backoff: .defaults,
      defaultCompression: .none,
      enabledCompression: .none
    )

    // Close first, then run the load balancer until 'run()' returns.
    balancer.close()
    await balancer.run()

    let picked = balancer.pickSubchannel()
    XCTAssertNil(picked)
  }
  /// Checks that picking from a load balancer whose connection has idled returns nothing but
  /// causes it to reconnect.
  func testPickOnIdleLoadBalancerTriggersConnect() async throws {
    // Counters for how many times the 'idle' and 'ready' states have been observed.
    let idleTransitions = ManagedAtomic(0)
    let readyTransitions = ManagedAtomic(0)

    try await RoundRobinLoadBalancerTest.run(
      servers: 1,
      connector: .posix(maxIdleTime: .milliseconds(25))  // Aggressively idle the connection
    ) { context, event in
      switch event {
      case .connectivityStateChanged(.idle):
        let timesIdle = idleTransitions.wrappingIncrementThenLoad(
          ordering: .sequentiallyConsistent
        )

        if timesIdle == 1 {
          // The first idle happens when the load balancer is started, give it a set of
          // addresses which it will connect to. Wait for it to be ready and then idle again.
          let serverAddress = context.servers[0].address
          context.loadBalancer.updateAddresses([Endpoint(addresses: [serverAddress])])
        } else if timesIdle == 2 {
          // Load-balancer has the endpoints but all are idle. Picking will trigger a connect.
          XCTAssertNil(context.loadBalancer.pickSubchannel())
        } else if timesIdle == 3 {
          // Connection idled again. Shut it down.
          context.loadBalancer.close()
        } else {
          XCTFail("Became idle too many times")
        }

      case .connectivityStateChanged(.ready):
        let timesReady = readyTransitions.wrappingIncrementThenLoad(
          ordering: .sequentiallyConsistent
        )
        // Only the second 'ready' (the one following the pick made while idle) is checked.
        guard timesReady == 2 else { return }
        XCTAssertNotNil(context.loadBalancer.pickSubchannel())

      default:
        break
      }
    } verifyEvents: { observed in
      let lifecycle: [LoadBalancerEvent] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, lifecycle)
    }
  }
  328. }
  /// Harness shared by the round-robin load balancer tests: starts test servers and a load
  /// balancer, feeds every load balancer event to a handler and enforces an overall timeout.
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  enum RoundRobinLoadBalancerTest {
    /// The state made available to event handlers.
    struct Context {
      /// The bound test servers paired with the address each one was bound to.
      let servers: [(server: TestServer, address: GRPCHTTP2Core.SocketAddress)]
      /// The load balancer under test.
      let loadBalancer: RoundRobinLoadBalancer
    }

    /// Runs a load balancer test, recording an XCTest failure if it doesn't finish in time.
    ///
    /// - Parameters:
    ///   - serverCount: The number of test servers to start. Must not be negative.
    ///   - connector: The connector given to the load balancer.
    ///   - backoff: The connection backoff given to the load balancer.
    ///   - timeout: How long the test may run before it is failed and cancelled.
    ///   - function: The name reported in the timeout failure message.
    ///   - handleEvent: Called for each event published by the load balancer.
    ///   - verifyEvents: Called with every event observed, once the event stream has finished.
    /// - Throws: Any error thrown while running the test, e.g. from `handleEvent` or from
    ///   binding a server.
    static func run(
      servers serverCount: Int,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff = .defaults,
      timeout: Duration = .seconds(10),
      function: String = #function,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void = { _ in }
    ) async throws {
      enum TestEvent {
        case timedOut
        case completed(Result<Void, Error>)
      }

      try await withThrowingTaskGroup(of: TestEvent.self) { group in
        // Timeout task. Cancellation of the sleep is deliberately ignored: it happens when the
        // test body finishes first, at which point this result is no longer consumed.
        group.addTask {
          try? await Task.sleep(for: timeout)
          return .timedOut
        }

        // Test body. Errors are captured in the result so that they are rethrown from this
        // function rather than from the child task.
        group.addTask {
          do {
            try await Self._run(
              servers: serverCount,
              connector: connector,
              backoff: backoff,
              handleEvent: handleEvent,
              verifyEvents: verifyEvents
            )
            return .completed(.success(()))
          } catch {
            return .completed(.failure(error))
          }
        }

        // Two tasks were added above so 'next()' can't return 'nil' here.
        let result = try await group.next()!
        // Whichever task finished first decides the outcome; cancel the other one.
        group.cancelAll()

        switch result {
        case .timedOut:
          XCTFail("'\(function)' timed out after \(timeout)")
        case .completed(let result):
          try result.get()
        }
      }
    }

    /// Starts the servers and the load balancer, then forwards load balancer events to
    /// `handleEvent` until the event stream finishes.
    private static func _run(
      servers serverCount: Int,
      connector: some HTTP2Connector,
      backoff: ConnectionBackoff,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void
    ) async throws {
      precondition(serverCount >= 0, "The number of servers must not be negative")

      try await withThrowingTaskGroup(of: Void.self) { group in
        // Create the test servers. A half-open range is used so that a count of zero starts no
        // servers; the closed range '1 ... 0' is invalid and would trap.
        var servers = [(server: TestServer, address: GRPCHTTP2Core.SocketAddress)]()
        for _ in 0 ..< serverCount {
          let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
          let address = try await server.bind()
          servers.append((server, address))

          group.addTask {
            // These tests never start RPCs, so any stream reaching a server is a failure.
            try await server.run { _, _ in
              XCTFail("Unexpected stream")
            }
          }
        }

        // Create the load balancer.
        let loadBalancer = RoundRobinLoadBalancer(
          connector: connector,
          backoff: backoff,
          defaultCompression: .none,
          enabledCompression: .none
        )

        group.addTask {
          await loadBalancer.run()
        }

        let context = Context(servers: servers, loadBalancer: loadBalancer)

        // Record each event and hand it to the test's handler until the stream finishes.
        var events = [LoadBalancerEvent]()
        for await event in loadBalancer.events {
          events.append(event)
          try await handleEvent(context, event)
        }

        verifyEvents(events)
        // Stop the tasks still running in the group.
        group.cancelAll()
      }
    }
  }