RoundRobinLoadBalancer.swift 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. /// A load-balancer which maintains to a set of subchannels and uses round-robin to pick a
  18. /// subchannel when picking a subchannel to use.
  19. ///
  20. /// This load-balancer starts in an 'idle' state and begins connecting when a set of addresses is
  21. /// provided to it with ``updateAddresses(_:)``. Repeated calls to ``updateAddresses(_:)`` will
  22. /// update the subchannels gracefully: new subchannels will be added for new addresses and existing
  23. /// subchannels will be removed if their addresses are no longer present.
  24. ///
  25. /// The state of the load-balancer is aggregated across the state of its subchannels, changes in
  26. /// the aggregate state are reported up via ``events``.
  27. ///
  28. /// You must call ``close()`` on the load-balancer when it's no longer required. This will move
  29. /// it to the ``ConnectivityState/shutdown`` state: existing RPCs may continue but all subsequent
  30. /// calls to ``makeStream(descriptor:options:)`` will fail.
  31. ///
  32. /// To use this load-balancer you must run it in a task:
  33. ///
  34. /// ```swift
  35. /// await withDiscardingTaskGroup { group in
  36. /// // Run the load-balancer
  37. /// group.addTask { await roundRobin.run() }
  38. ///
  39. /// // Update its address list
  40. /// let endpoints: [Endpoint] = [
  41. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1001)]),
  42. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1002)]),
  43. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1003)])
  44. /// ]
  45. /// roundRobin.updateAddresses(endpoints)
  46. ///
  47. /// // Consume state update events
  48. /// for await event in roundRobin.events {
  49. /// switch event {
  50. /// case .connectivityStateChanged(.ready):
  51. /// // ...
  52. /// default:
  53. /// // ...
  54. /// }
  55. /// }
  56. /// }
  57. /// ```
  58. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  59. struct RoundRobinLoadBalancer {
  60. enum Input: Sendable, Hashable {
  61. /// Update the addresses used by the load balancer to the following endpoints.
  62. case updateAddresses([Endpoint])
  63. /// Close the load balancer.
  64. case close
  65. }
  66. /// A key for an endpoint which identifies it uniquely, regardless of the ordering of addresses.
  67. private struct EndpointKey: Hashable, Sendable, CustomStringConvertible {
  68. /// Opaque data.
  69. private let opaque: [String]
  70. /// The endpoint this key is for.
  71. let endpoint: Endpoint
  72. init(_ endpoint: Endpoint) {
  73. self.endpoint = endpoint
  74. self.opaque = endpoint.addresses.map { String(describing: $0) }.sorted()
  75. }
  76. var description: String {
  77. String(describing: self.endpoint.addresses)
  78. }
  79. func hash(into hasher: inout Hasher) {
  80. hasher.combine(self.opaque)
  81. }
  82. static func == (lhs: Self, rhs: Self) -> Bool {
  83. lhs.opaque == rhs.opaque
  84. }
  85. }
  86. /// Events which can happen to the load balancer.
  87. private let event:
  88. (
  89. stream: AsyncStream<LoadBalancerEvent>,
  90. continuation: AsyncStream<LoadBalancerEvent>.Continuation
  91. )
  92. /// Inputs which this load balancer should react to.
  93. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  94. /// The state of the load balancer.
  95. private let state: _LockedValueBox<State>
  96. /// A connector, capable of creating connections.
  97. private let connector: any HTTP2Connector
  98. /// Connection backoff configuration.
  99. private let backoff: ConnectionBackoff
  100. /// The default compression algorithm to use. Can be overridden on a per-call basis.
  101. private let defaultCompression: CompressionAlgorithm
  102. /// The set of enabled compression algorithms.
  103. private let enabledCompression: CompressionAlgorithmSet
  104. init(
  105. connector: any HTTP2Connector,
  106. backoff: ConnectionBackoff,
  107. defaultCompression: CompressionAlgorithm,
  108. enabledCompression: CompressionAlgorithmSet
  109. ) {
  110. self.connector = connector
  111. self.backoff = backoff
  112. self.defaultCompression = defaultCompression
  113. self.enabledCompression = enabledCompression
  114. self.event = AsyncStream.makeStream(of: LoadBalancerEvent.self)
  115. self.input = AsyncStream.makeStream(of: Input.self)
  116. self.state = _LockedValueBox(.active(State.Active()))
  117. // The load balancer starts in the idle state.
  118. self.event.continuation.yield(.connectivityStateChanged(.idle))
  119. }
  120. /// A stream of events which can happen to the load balancer.
  121. var events: AsyncStream<LoadBalancerEvent> {
  122. self.event.stream
  123. }
  124. /// Runs the load balancer, returning when it has closed.
  125. ///
  126. /// You can monitor events which happen on the load balancer with ``events``.
  127. func run() async {
  128. await withDiscardingTaskGroup { group in
  129. for await input in self.input.stream {
  130. switch input {
  131. case .updateAddresses(let addresses):
  132. self.handleUpdateAddresses(addresses, in: &group)
  133. case .close:
  134. self.handleCloseInput()
  135. }
  136. }
  137. }
  138. if Task.isCancelled {
  139. // Finish the event stream as it's unlikely to have been finished by a regular code path.
  140. self.event.continuation.finish()
  141. }
  142. }
  143. /// Update the addresses used by the load balancer.
  144. ///
  145. /// This may result in new subchannels being created and some subchannels being removed.
  146. func updateAddresses(_ endpoints: [Endpoint]) {
  147. self.input.continuation.yield(.updateAddresses(endpoints))
  148. }
  149. /// Close the load balancer, and all subchannels it manages.
  150. func close() {
  151. self.input.continuation.yield(.close)
  152. }
  153. /// Pick a ready subchannel from the load balancer.
  154. ///
  155. /// - Returns: A subchannel, or `nil` if there aren't any ready subchannels.
  156. func pickSubchannel() -> Subchannel? {
  157. switch self.state.withLockedValue({ $0.pickSubchannel() }) {
  158. case .picked(let subchannel):
  159. return subchannel
  160. case .notAvailable(let subchannels):
  161. // Tell the subchannels to start connecting.
  162. for subchannel in subchannels {
  163. subchannel.connect()
  164. }
  165. return nil
  166. }
  167. }
  168. }
  169. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  170. extension RoundRobinLoadBalancer {
  171. /// Handles an update in endpoints.
  172. ///
  173. /// The load-balancer will diff the set of endpoints with the existing set of endpoints:
  174. /// - endpoints which are new will have subchannels created for them,
  175. /// - endpoints which existed previously but are not present in `endpoints` are closed,
  176. /// - endpoints which existed previously and are still present in `endpoints` are untouched.
  177. ///
  178. /// This process is gradual: the load-balancer won't remove an old endpoint until a subchannel
  179. /// for a corresponding new subchannel becomes ready.
  180. ///
  181. /// - Parameters:
  182. /// - endpoints: Endpoints which should have subchannels. Must not be empty.
  183. /// - group: The group which should manage and run new subchannels.
  184. private func handleUpdateAddresses(_ endpoints: [Endpoint], in group: inout DiscardingTaskGroup) {
  185. if endpoints.isEmpty { return }
  186. // Compute the keys for each endpoint.
  187. let newEndpoints = Set(endpoints.map { EndpointKey($0) })
  188. let (added, removed, newState) = self.state.withLockedValue { state in
  189. state.updateSubchannels(newEndpoints: newEndpoints) { endpoint, id in
  190. Subchannel(
  191. endpoint: endpoint,
  192. id: id,
  193. connector: self.connector,
  194. backoff: self.backoff,
  195. defaultCompression: self.defaultCompression,
  196. enabledCompression: self.enabledCompression
  197. )
  198. }
  199. }
  200. // Publish the new connectivity state.
  201. if let newState = newState {
  202. self.event.continuation.yield(.connectivityStateChanged(newState))
  203. }
  204. // Run each of the new subchannels.
  205. for subchannel in added {
  206. let key = EndpointKey(subchannel.endpoint)
  207. self.runSubchannel(subchannel, forKey: key, in: &group)
  208. }
  209. // Old subchannels are removed when new subchannels become ready. Excess subchannels are only
  210. // present if there are more to remove than to add. These are the excess subchannels which
  211. // are closed now.
  212. for subchannel in removed {
  213. subchannel.close()
  214. }
  215. }
  216. private func runSubchannel(
  217. _ subchannel: Subchannel,
  218. forKey key: EndpointKey,
  219. in group: inout DiscardingTaskGroup
  220. ) {
  221. // Start running it and tell it to connect.
  222. subchannel.connect()
  223. group.addTask {
  224. await subchannel.run()
  225. }
  226. group.addTask {
  227. for await event in subchannel.events {
  228. switch event {
  229. case .connectivityStateChanged(let state):
  230. self.handleSubchannelConnectivityStateChange(state, key: key)
  231. case .goingAway:
  232. self.handleSubchannelGoingAway(key: key)
  233. case .requiresNameResolution:
  234. self.event.continuation.yield(.requiresNameResolution)
  235. }
  236. }
  237. }
  238. }
  239. private func handleSubchannelConnectivityStateChange(
  240. _ connectivityState: ConnectivityState,
  241. key: EndpointKey
  242. ) {
  243. let onChange = self.state.withLockedValue { state in
  244. state.updateSubchannelConnectivityState(connectivityState, key: key)
  245. }
  246. switch onChange {
  247. case .publishStateChange(let aggregateState):
  248. self.event.continuation.yield(.connectivityStateChanged(aggregateState))
  249. case .closeAndPublishStateChange(let subchannel, let aggregateState):
  250. self.event.continuation.yield(.connectivityStateChanged(aggregateState))
  251. subchannel.close()
  252. case .close(let subchannel):
  253. subchannel.close()
  254. case .closed:
  255. // All subchannels are closed; finish the streams so the run loop exits.
  256. self.event.continuation.finish()
  257. self.input.continuation.finish()
  258. case .none:
  259. ()
  260. }
  261. }
  262. private func handleSubchannelGoingAway(key: EndpointKey) {
  263. switch self.state.withLockedValue({ $0.parkSubchannel(withKey: key) }) {
  264. case .closeAndUpdateState(_, let connectivityState):
  265. // No need to close the subchannel, it's already going away and will close itself.
  266. if let connectivityState = connectivityState {
  267. self.event.continuation.yield(.connectivityStateChanged(connectivityState))
  268. }
  269. case .none:
  270. ()
  271. }
  272. }
  273. private func handleCloseInput() {
  274. switch self.state.withLockedValue({ $0.close() }) {
  275. case .closeSubchannels(let subchannels):
  276. // Publish a new shutdown state, this LB is no longer usable for new RPCs.
  277. self.event.continuation.yield(.connectivityStateChanged(.shutdown))
  278. // Close the subchannels.
  279. for subchannel in subchannels {
  280. subchannel.close()
  281. }
  282. case .closed:
  283. // No subchannels to close.
  284. self.event.continuation.yield(.connectivityStateChanged(.shutdown))
  285. self.event.continuation.finish()
  286. self.input.continuation.finish()
  287. case .none:
  288. ()
  289. }
  290. }
  291. }
  292. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  293. extension RoundRobinLoadBalancer {
  294. private enum State {
  295. case active(Active)
  296. case closing(Closing)
  297. case closed
  298. struct Active {
  299. private(set) var aggregateConnectivityState: ConnectivityState
  300. private var picker: Picker?
  301. var endpoints: [Endpoint]
  302. var subchannels: [EndpointKey: SubchannelState]
  303. var parkedSubchannels: [EndpointKey: Subchannel]
  304. init() {
  305. self.endpoints = []
  306. self.subchannels = [:]
  307. self.parkedSubchannels = [:]
  308. self.aggregateConnectivityState = .idle
  309. self.picker = nil
  310. }
  311. mutating func updateConnectivityState(
  312. _ state: ConnectivityState,
  313. key: EndpointKey
  314. ) -> OnSubchannelConnectivityStateUpdate {
  315. if let changed = self.subchannels[key]?.updateState(state) {
  316. guard changed else { return .none }
  317. let subchannelToClose: Subchannel?
  318. switch state {
  319. case .ready:
  320. if let index = self.subchannels.firstIndex(where: { $0.value.markedForRemoval }) {
  321. let (key, subchannelState) = self.subchannels.remove(at: index)
  322. self.parkedSubchannels[key] = subchannelState.subchannel
  323. subchannelToClose = subchannelState.subchannel
  324. } else {
  325. subchannelToClose = nil
  326. }
  327. case .idle, .connecting, .transientFailure, .shutdown:
  328. subchannelToClose = nil
  329. }
  330. let aggregateState = self.refreshPickerAndAggregateState()
  331. switch (subchannelToClose, aggregateState) {
  332. case (.some(let subchannel), .some(let state)):
  333. return .closeAndPublishStateChange(subchannel, state)
  334. case (.some(let subchannel), .none):
  335. return .close(subchannel)
  336. case (.none, .some(let state)):
  337. return .publishStateChange(state)
  338. case (.none, .none):
  339. return .none
  340. }
  341. } else {
  342. switch state {
  343. case .idle, .connecting, .ready, .transientFailure:
  344. ()
  345. case .shutdown:
  346. self.parkedSubchannels.removeValue(forKey: key)
  347. }
  348. return .none
  349. }
  350. }
  351. mutating func refreshPickerAndAggregateState() -> ConnectivityState? {
  352. let ready = self.subchannels.values.compactMap { $0.state == .ready ? $0.subchannel : nil }
  353. self.picker = Picker(subchannels: ready)
  354. let aggregate = ConnectivityState.aggregate(self.subchannels.values.map { $0.state })
  355. if aggregate == self.aggregateConnectivityState {
  356. return nil
  357. } else {
  358. self.aggregateConnectivityState = aggregate
  359. return aggregate
  360. }
  361. }
  362. mutating func pick() -> Subchannel? {
  363. self.picker?.pick()
  364. }
  365. mutating func markForRemoval(
  366. _ keys: some Sequence<EndpointKey>,
  367. numberToRemoveNow: Int
  368. ) -> [Subchannel] {
  369. var numberToRemoveNow = numberToRemoveNow
  370. var keyIterator = keys.makeIterator()
  371. var subchannelsToClose = [Subchannel]()
  372. while numberToRemoveNow > 0, let key = keyIterator.next() {
  373. if let subchannelState = self.subchannels.removeValue(forKey: key) {
  374. numberToRemoveNow -= 1
  375. self.parkedSubchannels[key] = subchannelState.subchannel
  376. subchannelsToClose.append(subchannelState.subchannel)
  377. }
  378. }
  379. while let key = keyIterator.next() {
  380. self.subchannels[key]?.markForRemoval()
  381. }
  382. return subchannelsToClose
  383. }
  384. mutating func registerSubchannels(
  385. withKeys keys: some Sequence<EndpointKey>,
  386. _ makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
  387. ) -> [Subchannel] {
  388. var subchannels = [Subchannel]()
  389. for key in keys {
  390. let subchannel = makeSubchannel(key.endpoint, SubchannelID())
  391. subchannels.append(subchannel)
  392. self.subchannels[key] = SubchannelState(subchannel: subchannel)
  393. }
  394. return subchannels
  395. }
  396. }
  397. struct Closing {
  398. enum Reason: Sendable, Hashable {
  399. case goAway
  400. case user
  401. }
  402. var reason: Reason
  403. var parkedSubchannels: [EndpointKey: Subchannel]
  404. mutating func updateConnectivityState(_ state: ConnectivityState, key: EndpointKey) -> Bool {
  405. switch state {
  406. case .idle, .connecting, .ready, .transientFailure:
  407. ()
  408. case .shutdown:
  409. self.parkedSubchannels.removeValue(forKey: key)
  410. }
  411. return self.parkedSubchannels.isEmpty
  412. }
  413. }
  414. struct SubchannelState {
  415. var subchannel: Subchannel
  416. var state: ConnectivityState
  417. var markedForRemoval: Bool
  418. init(subchannel: Subchannel) {
  419. self.subchannel = subchannel
  420. self.state = .idle
  421. self.markedForRemoval = false
  422. }
  423. mutating func updateState(_ newState: ConnectivityState) -> Bool {
  424. // The transition from transient failure to connecting is ignored.
  425. //
  426. // See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
  427. if self.state == .transientFailure, newState == .connecting {
  428. return false
  429. }
  430. let oldState = self.state
  431. self.state = newState
  432. return oldState != newState
  433. }
  434. mutating func markForRemoval() {
  435. self.markedForRemoval = true
  436. }
  437. }
  438. struct Picker {
  439. private var subchannels: [Subchannel]
  440. private var index: Int
  441. init?(subchannels: [Subchannel]) {
  442. if subchannels.isEmpty { return nil }
  443. self.subchannels = subchannels
  444. self.index = (0 ..< subchannels.count).randomElement()!
  445. }
  446. mutating func pick() -> Subchannel {
  447. defer {
  448. self.index = (self.index + 1) % self.subchannels.count
  449. }
  450. return self.subchannels[self.index]
  451. }
  452. }
  453. mutating func updateSubchannels(
  454. newEndpoints: Set<EndpointKey>,
  455. makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
  456. ) -> (run: [Subchannel], close: [Subchannel], newState: ConnectivityState?) {
  457. switch self {
  458. case .active(var state):
  459. let existingEndpoints = Set(state.subchannels.keys)
  460. let keysToAdd = newEndpoints.subtracting(existingEndpoints)
  461. let keysToRemove = existingEndpoints.subtracting(newEndpoints)
  462. if keysToRemove.isEmpty && keysToAdd.isEmpty {
  463. // Nothing to do.
  464. return (run: [], close: [], newState: nil)
  465. }
  466. // The load balancer should keep subchannels to remove in service until new subchannels
  467. // can replace each of them so that requests can continue to be served.
  468. //
  469. // If there are more keys to remove than to add, remove some now.
  470. let numberToRemoveNow = max(keysToRemove.count - keysToAdd.count, 0)
  471. let removed = state.markForRemoval(keysToRemove, numberToRemoveNow: numberToRemoveNow)
  472. let added = state.registerSubchannels(withKeys: keysToAdd, makeSubchannel)
  473. let newState = state.refreshPickerAndAggregateState()
  474. self = .active(state)
  475. return (run: added, close: removed, newState: newState)
  476. case .closing, .closed:
  477. // Nothing to do.
  478. return (run: [], close: [], newState: nil)
  479. }
  480. }
  481. enum OnParkChannel {
  482. case closeAndUpdateState(Subchannel, ConnectivityState?)
  483. case none
  484. }
  485. mutating func parkSubchannel(withKey key: EndpointKey) -> OnParkChannel {
  486. switch self {
  487. case .active(var state):
  488. guard let subchannelState = state.subchannels.removeValue(forKey: key) else {
  489. return .none
  490. }
  491. // Parking the subchannel may invalidate the picker and the aggregate state, refresh both.
  492. state.parkedSubchannels[key] = subchannelState.subchannel
  493. let newState = state.refreshPickerAndAggregateState()
  494. self = .active(state)
  495. return .closeAndUpdateState(subchannelState.subchannel, newState)
  496. case .closing, .closed:
  497. return .none
  498. }
  499. }
  500. mutating func registerSubchannels(
  501. withKeys keys: some Sequence<EndpointKey>,
  502. _ makeSubchannel: (Endpoint) -> Subchannel
  503. ) -> [Subchannel] {
  504. switch self {
  505. case .active(var state):
  506. var subchannels = [Subchannel]()
  507. for key in keys {
  508. let subchannel = makeSubchannel(key.endpoint)
  509. subchannels.append(subchannel)
  510. state.subchannels[key] = SubchannelState(subchannel: subchannel)
  511. }
  512. self = .active(state)
  513. return subchannels
  514. case .closing, .closed:
  515. return []
  516. }
  517. }
  518. enum OnSubchannelConnectivityStateUpdate {
  519. case closeAndPublishStateChange(Subchannel, ConnectivityState)
  520. case publishStateChange(ConnectivityState)
  521. case close(Subchannel)
  522. case closed
  523. case none
  524. }
  525. mutating func updateSubchannelConnectivityState(
  526. _ connectivityState: ConnectivityState,
  527. key: EndpointKey
  528. ) -> OnSubchannelConnectivityStateUpdate {
  529. switch self {
  530. case .active(var state):
  531. let result = state.updateConnectivityState(connectivityState, key: key)
  532. self = .active(state)
  533. return result
  534. case .closing(var state):
  535. if state.updateConnectivityState(connectivityState, key: key) {
  536. self = .closed
  537. return .closed
  538. } else {
  539. self = .closing(state)
  540. return .none
  541. }
  542. case .closed:
  543. return .none
  544. }
  545. }
  546. enum OnClose {
  547. case closeSubchannels([Subchannel])
  548. case closed
  549. case none
  550. }
  551. mutating func close() -> OnClose {
  552. switch self {
  553. case .active(var active):
  554. var subchannelsToClose = [Subchannel]()
  555. for (id, subchannelState) in active.subchannels {
  556. subchannelsToClose.append(subchannelState.subchannel)
  557. active.parkedSubchannels[id] = subchannelState.subchannel
  558. }
  559. if subchannelsToClose.isEmpty {
  560. self = .closed
  561. return .closed
  562. } else {
  563. self = .closing(Closing(reason: .user, parkedSubchannels: active.parkedSubchannels))
  564. return .closeSubchannels(subchannelsToClose)
  565. }
  566. case .closing, .closed:
  567. return .none
  568. }
  569. }
  570. enum OnPickSubchannel {
  571. case picked(Subchannel)
  572. case notAvailable([Subchannel])
  573. }
  574. mutating func pickSubchannel() -> OnPickSubchannel {
  575. let onMakeStream: OnPickSubchannel
  576. switch self {
  577. case .active(var active):
  578. if let subchannel = active.pick() {
  579. onMakeStream = .picked(subchannel)
  580. } else {
  581. switch active.aggregateConnectivityState {
  582. case .idle:
  583. onMakeStream = .notAvailable(active.subchannels.values.map { $0.subchannel })
  584. case .connecting, .ready, .transientFailure, .shutdown:
  585. onMakeStream = .notAvailable([])
  586. }
  587. }
  588. self = .active(active)
  589. case .closing, .closed:
  590. onMakeStream = .notAvailable([])
  591. }
  592. return onMakeStream
  593. }
  594. }
  595. }
  596. extension ConnectivityState {
  597. static func aggregate(_ states: some Collection<ConnectivityState>) -> ConnectivityState {
  598. // See https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
  599. // If any one subchannel is in READY state, the channel's state is READY.
  600. if states.contains(where: { $0 == .ready }) {
  601. return .ready
  602. }
  603. // Otherwise, if there is any subchannel in state CONNECTING, the channel's state is CONNECTING.
  604. if states.contains(where: { $0 == .connecting }) {
  605. return .connecting
  606. }
  607. // Otherwise, if there is any subchannel in state IDLE, the channel's state is IDLE.
  608. if states.contains(where: { $0 == .idle }) {
  609. return .idle
  610. }
  611. // Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state
  612. // is TRANSIENT_FAILURE.
  613. if states.allSatisfy({ $0 == .transientFailure }) {
  614. return .transientFailure
  615. }
  616. return .shutdown
  617. }
  618. }