PickFirstLoadBalancerTests.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import GRPCCore
  17. import GRPCHTTP2Core
  18. import NIOHTTP2
  19. import NIOPosix
  20. import XCTest
  /// Tests for the pick-first load balancer.
  ///
  /// Every test runs through `LoadBalancerTest.pickFirst`: the first closure reacts to each
  /// load-balancer event as it arrives, and `verifyEvents` checks the complete list of events.
  ///
  /// Optional values are unwrapped with `XCTUnwrap` rather than `!` so that an unexpected `nil`
  /// is reported as a test failure instead of crashing the test process.
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  final class PickFirstLoadBalancerTests: XCTestCase {
    /// Checks that providing an endpoint moves the load balancer from idle through connecting
    /// to ready, and that closing it ends in shutdown.
    func testPickFirstConnectsToServer() async throws {
      try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let endpoint = Endpoint(addresses: context.servers.map { $0.address })
          try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)

        case .connectivityStateChanged(.ready):
          context.loadBalancer.close()

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Checks that no subchannel is returned while the load balancer is idle (no endpoint has
    /// been provided) or after it has shut down.
    func testPickSubchannelWhenNotReady() async throws {
      try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          XCTAssertNil(context.loadBalancer.pickSubchannel())
          context.loadBalancer.close()

        case .connectivityStateChanged(.shutdown):
          XCTAssertNil(context.loadBalancer.pickSubchannel())

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Checks that repeated picks on a ready load balancer always return a subchannel with the
    /// same ID.
    func testPickSubchannelReturnsSameSubchannel() async throws {
      try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let endpoint = Endpoint(addresses: context.servers.map { $0.address })
          try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)

        case .connectivityStateChanged(.ready):
          // Collect the IDs of 100 picks; a single distinct ID means the same subchannel was
          // returned every time.
          var ids = Set<SubchannelID>()
          for _ in 0 ..< 100 {
            let subchannel = try XCTUnwrap(context.loadBalancer.pickSubchannel())
            ids.insert(subchannel.id)
          }
          XCTAssertEqual(ids.count, 1)
          context.loadBalancer.close()

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Checks that replacing the endpoint while ready moves the connection from server-0 to
    /// server-1 without any additional connectivity state changes being observed.
    func testEndpointUpdateHandledGracefully() async throws {
      try await LoadBalancerTest.pickFirst(servers: 2, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let endpoint = Endpoint(addresses: [context.servers[0].address])
          try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)

        case .connectivityStateChanged(.ready):
          // Must be connected to server-0.
          try await XCTPoll(every: .milliseconds(10)) {
            context.servers[0].server.clients.count == 1
          }

          // Update the endpoint so that it contains server-1.
          let endpoint = Endpoint(addresses: [context.servers[1].address])
          try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)

          // Should remain in the ready state: wait until server-0 has no clients and server-1
          // has exactly one.
          try await XCTPoll(every: .milliseconds(10)) {
            context.servers[0].server.clients.isEmpty && context.servers[1].server.clients.count == 1
          }

          context.loadBalancer.close()

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Checks that providing the same endpoint again while ready leaves the existing
    /// connection to server-0 in place.
    func testSameEndpointUpdateIsIgnored() async throws {
      try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let endpoint = Endpoint(addresses: context.servers.map { $0.address })
          try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)

        case .connectivityStateChanged(.ready):
          // Must be connected to server-0.
          try await XCTPoll(every: .milliseconds(10)) {
            context.servers[0].server.clients.count == 1
          }

          // Update the endpoint. This should be a no-op, server should remain connected.
          let endpoint = Endpoint(addresses: context.servers.map { $0.address })
          try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)
          try await XCTPoll(every: .milliseconds(10)) {
            context.servers[0].server.clients.count == 1
          }

          context.loadBalancer.close()

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Checks that an update using the empty endpoint is ignored: only idle and shutdown are
    /// observed.
    func testEmptyEndpointUpdateIsIgnored() async throws {
      try await LoadBalancerTest.pickFirst(servers: 0, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let endpoint = Endpoint(addresses: [])
          // Should no-op.
          try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)
          context.loadBalancer.close()

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Tests that picking a subchannel when the load balancer is idle triggers a reconnect and
    /// becomes ready again. Uses a very short idle time to re-enter the idle state.
    func testPickOnIdleTriggersConnect() async throws {
      let idle = AtomicCounter()

      try await LoadBalancerTest.pickFirst(
        servers: 1,
        connector: .posix(maxIdleTime: .milliseconds(1))  // Aggressively idle the connection
      ) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let (_, idleCount) = idle.increment()

          switch idleCount {
          case 1:
            // The first idle happens when the load balancer is started, give it an endpoint
            // which it will connect to. Wait for it to be ready and then idle again.
            let endpoint = Endpoint(addresses: context.servers.map { $0.address })
            try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)

          case 2:
            // Load-balancer has the endpoints but all are idle. Picking will trigger a connect.
            XCTAssertNil(context.loadBalancer.pickSubchannel())

          case 3:
            // Connection idled again. Shut it down.
            context.loadBalancer.close()

          default:
            XCTFail("Became idle too many times")
          }

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Checks that when the load balancer's connection is unexpectedly dropped when there are
    /// no open streams that it returns to the idle state.
    func testPickFirstConnectionDropReturnsToIdle() async throws {
      let idleCount = AtomicCounter()

      try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let (_, newIdleCount) = idleCount.increment()

          switch newIdleCount {
          case 1:
            let endpoint = Endpoint(addresses: context.servers.map { $0.address })
            try XCTUnwrap(context.pickFirst).updateEndpoint(endpoint)

          case 2:
            context.loadBalancer.close()

          default:
            ()
          }

        case .connectivityStateChanged(.ready):
          // Drop the connection. Unwrapping `first` fails the test (rather than trapping on an
          // out-of-bounds index) if the server has no client recorded.
          let client = try XCTUnwrap(context.servers[0].server.clients.first)
          client.close(mode: .all, promise: nil)

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }

    /// Checks that after server-0 sends a GOAWAY frame the load balancer asks for name
    /// resolution, returns to idle, and can then connect to server-1.
    func testPickFirstReceivesGoAway() async throws {
      let idleCount = AtomicCounter()

      try await LoadBalancerTest.pickFirst(servers: 2, connector: .posix()) { context, event in
        switch event {
        case .connectivityStateChanged(.idle):
          let (_, newIdleCount) = idleCount.increment()

          switch newIdleCount {
          case 1:
            // Provide the address of the first server (server-0).
            try XCTUnwrap(context.pickFirst).updateEndpoint(Endpoint(context.servers[0].address))

          case 2:
            // Provide the address of the second server (server-1).
            try XCTUnwrap(context.pickFirst).updateEndpoint(Endpoint(context.servers[1].address))

          default:
            ()
          }

        case .connectivityStateChanged(.ready):
          // The number of idle events seen so far tells us which connection became ready.
          switch idleCount.value {
          case 1:
            // Must be connected to server-0, send a GOAWAY frame.
            let channel = try XCTUnwrap(context.servers[0].server.clients.first)
            let goAway = HTTP2Frame(
              streamID: .rootStream,
              payload: .goAway(lastStreamID: 0, errorCode: .noError, opaqueData: nil)
            )
            channel.writeAndFlush(goAway, promise: nil)

          case 2:
            // Must only be connected to server-1 now.
            XCTAssertEqual(context.servers[0].server.clients.count, 0)
            XCTAssertEqual(context.servers[1].server.clients.count, 1)
            context.loadBalancer.close()

          default:
            ()
          }

        default:
          ()
        }
      } verifyEvents: { events in
        let expected: [LoadBalancerEvent] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .requiresNameResolution,
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(events, expected)
      }
    }
  }