PickFirstLoadBalancerTests.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import Atomics
  17. import GRPCCore
  18. import GRPCHTTP2Core
  19. import NIOHTTP2
  20. import NIOPosix
  21. import XCTest
  22. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  23. final class PickFirstLoadBalancerTests: XCTestCase {
  24. func testPickFirstConnectsToServer() async throws {
  25. try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
  26. switch event {
  27. case .connectivityStateChanged(.idle):
  28. let endpoint = Endpoint(addresses: context.servers.map { $0.address })
  29. context.pickFirst!.updateEndpoint(endpoint)
  30. case .connectivityStateChanged(.ready):
  31. context.loadBalancer.close()
  32. default:
  33. ()
  34. }
  35. } verifyEvents: { events in
  36. let expected: [LoadBalancerEvent] = [
  37. .connectivityStateChanged(.idle),
  38. .connectivityStateChanged(.connecting),
  39. .connectivityStateChanged(.ready),
  40. .connectivityStateChanged(.shutdown),
  41. ]
  42. XCTAssertEqual(events, expected)
  43. }
  44. }
  45. func testPickSubchannelWhenNotReady() async throws {
  46. try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
  47. switch event {
  48. case .connectivityStateChanged(.idle):
  49. XCTAssertNil(context.loadBalancer.pickSubchannel())
  50. context.loadBalancer.close()
  51. case .connectivityStateChanged(.shutdown):
  52. XCTAssertNil(context.loadBalancer.pickSubchannel())
  53. default:
  54. ()
  55. }
  56. } verifyEvents: { events in
  57. let expected: [LoadBalancerEvent] = [
  58. .connectivityStateChanged(.idle),
  59. .connectivityStateChanged(.shutdown),
  60. ]
  61. XCTAssertEqual(events, expected)
  62. }
  63. }
  64. func testPickSubchannelReturnsSameSubchannel() async throws {
  65. try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
  66. switch event {
  67. case .connectivityStateChanged(.idle):
  68. let endpoint = Endpoint(addresses: context.servers.map { $0.address })
  69. context.pickFirst!.updateEndpoint(endpoint)
  70. case .connectivityStateChanged(.ready):
  71. var ids = Set<SubchannelID>()
  72. for _ in 0 ..< 100 {
  73. let subchannel = try XCTUnwrap(context.loadBalancer.pickSubchannel())
  74. ids.insert(subchannel.id)
  75. }
  76. XCTAssertEqual(ids.count, 1)
  77. context.loadBalancer.close()
  78. default:
  79. ()
  80. }
  81. } verifyEvents: { events in
  82. let expected: [LoadBalancerEvent] = [
  83. .connectivityStateChanged(.idle),
  84. .connectivityStateChanged(.connecting),
  85. .connectivityStateChanged(.ready),
  86. .connectivityStateChanged(.shutdown),
  87. ]
  88. XCTAssertEqual(events, expected)
  89. }
  90. }
  91. func testEndpointUpdateHandledGracefully() async throws {
  92. try await LoadBalancerTest.pickFirst(servers: 2, connector: .posix()) { context, event in
  93. switch event {
  94. case .connectivityStateChanged(.idle):
  95. let endpoint = Endpoint(addresses: [context.servers[0].address])
  96. context.pickFirst!.updateEndpoint(endpoint)
  97. case .connectivityStateChanged(.ready):
  98. // Must be connected to server-0.
  99. try await XCTPoll(every: .milliseconds(10)) {
  100. context.servers[0].server.clients.count == 1
  101. }
  102. // Update the endpoint so that it contains server-1.
  103. let endpoint = Endpoint(addresses: [context.servers[1].address])
  104. context.pickFirst!.updateEndpoint(endpoint)
  105. // Should remain in the ready state
  106. try await XCTPoll(every: .milliseconds(10)) {
  107. context.servers[0].server.clients.isEmpty && context.servers[1].server.clients.count == 1
  108. }
  109. context.loadBalancer.close()
  110. default:
  111. ()
  112. }
  113. } verifyEvents: { events in
  114. let expected: [LoadBalancerEvent] = [
  115. .connectivityStateChanged(.idle),
  116. .connectivityStateChanged(.connecting),
  117. .connectivityStateChanged(.ready),
  118. .connectivityStateChanged(.shutdown),
  119. ]
  120. XCTAssertEqual(events, expected)
  121. }
  122. }
  123. func testSameEndpointUpdateIsIgnored() async throws {
  124. try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
  125. switch event {
  126. case .connectivityStateChanged(.idle):
  127. let endpoint = Endpoint(addresses: context.servers.map { $0.address })
  128. context.pickFirst!.updateEndpoint(endpoint)
  129. case .connectivityStateChanged(.ready):
  130. // Must be connected to server-0.
  131. try await XCTPoll(every: .milliseconds(10)) {
  132. context.servers[0].server.clients.count == 1
  133. }
  134. // Update the endpoint. This should be a no-op, server should remain connected.
  135. let endpoint = Endpoint(addresses: context.servers.map { $0.address })
  136. context.pickFirst!.updateEndpoint(endpoint)
  137. try await XCTPoll(every: .milliseconds(10)) {
  138. context.servers[0].server.clients.count == 1
  139. }
  140. context.loadBalancer.close()
  141. default:
  142. ()
  143. }
  144. } verifyEvents: { events in
  145. let expected: [LoadBalancerEvent] = [
  146. .connectivityStateChanged(.idle),
  147. .connectivityStateChanged(.connecting),
  148. .connectivityStateChanged(.ready),
  149. .connectivityStateChanged(.shutdown),
  150. ]
  151. XCTAssertEqual(events, expected)
  152. }
  153. }
  154. func testEmptyEndpointUpdateIsIgnored() async throws {
  155. // Checks that an update using the empty endpoint is ignored.
  156. try await LoadBalancerTest.pickFirst(servers: 0, connector: .posix()) { context, event in
  157. switch event {
  158. case .connectivityStateChanged(.idle):
  159. let endpoint = Endpoint(addresses: [])
  160. // Should no-op.
  161. context.pickFirst!.updateEndpoint(endpoint)
  162. context.loadBalancer.close()
  163. default:
  164. ()
  165. }
  166. } verifyEvents: { events in
  167. let expected: [LoadBalancerEvent] = [
  168. .connectivityStateChanged(.idle),
  169. .connectivityStateChanged(.shutdown),
  170. ]
  171. XCTAssertEqual(events, expected)
  172. }
  173. }
  174. func testPickOnIdleTriggersConnect() async throws {
  175. // Tests that picking a subchannel when the load balancer is idle triggers a reconnect and
  176. // becomes ready again. Uses a very short idle time to re-enter the idle state.
  177. let idle = ManagedAtomic(0)
  178. try await LoadBalancerTest.pickFirst(
  179. servers: 1,
  180. connector: .posix(maxIdleTime: .milliseconds(1)) // Aggressively idle the connection
  181. ) { context, event in
  182. switch event {
  183. case .connectivityStateChanged(.idle):
  184. let idleCount = idle.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent)
  185. switch idleCount {
  186. case 1:
  187. // The first idle happens when the load balancer in started, give it an endpoint
  188. // which it will connect to. Wait for it to be ready and then idle again.
  189. let endpoint = Endpoint(addresses: context.servers.map { $0.address })
  190. context.pickFirst!.updateEndpoint(endpoint)
  191. case 2:
  192. // Load-balancer has the endpoints but all are idle. Picking will trigger a connect.
  193. XCTAssertNil(context.loadBalancer.pickSubchannel())
  194. case 3:
  195. // Connection idled again. Shut it down.
  196. context.loadBalancer.close()
  197. default:
  198. XCTFail("Became idle too many times")
  199. }
  200. default:
  201. ()
  202. }
  203. } verifyEvents: { events in
  204. let expected: [LoadBalancerEvent] = [
  205. .connectivityStateChanged(.idle),
  206. .connectivityStateChanged(.connecting),
  207. .connectivityStateChanged(.ready),
  208. .connectivityStateChanged(.idle),
  209. .connectivityStateChanged(.connecting),
  210. .connectivityStateChanged(.ready),
  211. .connectivityStateChanged(.idle),
  212. .connectivityStateChanged(.shutdown),
  213. ]
  214. XCTAssertEqual(events, expected)
  215. }
  216. }
  217. func testPickFirstConnectionDropReturnsToIdle() async throws {
  218. // Checks that when the load balancers connection is unexpectedly dropped when there are no
  219. // open streams that it returns to the idle state.
  220. let idleCount = ManagedAtomic(0)
  221. try await LoadBalancerTest.pickFirst(servers: 1, connector: .posix()) { context, event in
  222. switch event {
  223. case .connectivityStateChanged(.idle):
  224. switch idleCount.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) {
  225. case 1:
  226. let endpoint = Endpoint(addresses: context.servers.map { $0.address })
  227. context.pickFirst!.updateEndpoint(endpoint)
  228. case 2:
  229. context.loadBalancer.close()
  230. default:
  231. ()
  232. }
  233. case .connectivityStateChanged(.ready):
  234. // Drop the connection.
  235. context.servers[0].server.clients[0].close(mode: .all, promise: nil)
  236. default:
  237. ()
  238. }
  239. } verifyEvents: { events in
  240. let expected: [LoadBalancerEvent] = [
  241. .connectivityStateChanged(.idle),
  242. .connectivityStateChanged(.connecting),
  243. .connectivityStateChanged(.ready),
  244. .connectivityStateChanged(.idle),
  245. .connectivityStateChanged(.shutdown),
  246. ]
  247. XCTAssertEqual(events, expected)
  248. }
  249. }
  250. func testPickFirstReceivesGoAway() async throws {
  251. let idleCount = ManagedAtomic(0)
  252. try await LoadBalancerTest.pickFirst(servers: 2, connector: .posix()) { context, event in
  253. switch event {
  254. case .connectivityStateChanged(.idle):
  255. switch idleCount.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) {
  256. case 1:
  257. // Provide the address of the first server.
  258. context.pickFirst!.updateEndpoint(Endpoint(context.servers[0].address))
  259. case 2:
  260. // Provide the address of the second server.
  261. context.pickFirst!.updateEndpoint(Endpoint(context.servers[1].address))
  262. default:
  263. ()
  264. }
  265. case .connectivityStateChanged(.ready):
  266. switch idleCount.load(ordering: .sequentiallyConsistent) {
  267. case 1:
  268. // Must be connected to server 1, send a GOAWAY frame.
  269. let channel = context.servers[0].server.clients.first!
  270. let goAway = HTTP2Frame(
  271. streamID: .rootStream,
  272. payload: .goAway(lastStreamID: 0, errorCode: .noError, opaqueData: nil)
  273. )
  274. channel.writeAndFlush(goAway, promise: nil)
  275. case 2:
  276. // Must only be connected to server 2 now.
  277. XCTAssertEqual(context.servers[0].server.clients.count, 0)
  278. XCTAssertEqual(context.servers[1].server.clients.count, 1)
  279. context.loadBalancer.close()
  280. default:
  281. ()
  282. }
  283. default:
  284. ()
  285. }
  286. } verifyEvents: { events in
  287. let expected: [LoadBalancerEvent] = [
  288. .connectivityStateChanged(.idle),
  289. .connectivityStateChanged(.connecting),
  290. .connectivityStateChanged(.ready),
  291. .requiresNameResolution,
  292. .connectivityStateChanged(.idle),
  293. .connectivityStateChanged(.connecting),
  294. .connectivityStateChanged(.ready),
  295. .connectivityStateChanged(.shutdown),
  296. ]
  297. XCTAssertEqual(events, expected)
  298. }
  299. }
  300. }