RoundRobinLoadBalancer.swift 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. /// A load-balancer which maintains a set of subchannels and uses round-robin to choose which
  18. /// subchannel to use.
  19. ///
  20. /// This load-balancer starts in an 'idle' state and begins connecting when a set of addresses is
  21. /// provided to it with ``updateAddresses(_:)``. Repeated calls to ``updateAddresses(_:)`` will
  22. /// update the subchannels gracefully: new subchannels will be added for new addresses and existing
  23. /// subchannels will be removed if their addresses are no longer present.
  24. ///
  25. /// The state of the load-balancer is aggregated across the state of its subchannels, changes in
  26. /// the aggregate state are reported up via ``events``.
  27. ///
  28. /// You must call ``close()`` on the load-balancer when it's no longer required. This will move
  29. /// it to the ``ConnectivityState/shutdown`` state: existing RPCs may continue but all subsequent
  30. /// calls to ``makeStream(descriptor:options:)`` will fail.
  31. ///
  32. /// To use this load-balancer you must run it in a task:
  33. ///
  34. /// ```swift
  35. /// await withDiscardingTaskGroup { group in
  36. /// // Run the load-balancer
  37. /// group.addTask { await roundRobin.run() }
  38. ///
  39. /// // Update its address list
  40. /// let endpoints: [Endpoint] = [
  41. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1001)]),
  42. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1002)]),
  43. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1003)])
  44. /// ]
  45. /// roundRobin.updateAddresses(endpoints)
  46. ///
  47. /// // Consume state update events
  48. /// for await event in roundRobin.events {
  49. /// switch event {
  50. /// case .connectivityStateChanged(.ready):
  51. /// // ...
  52. /// default:
  53. /// // ...
  54. /// }
  55. /// }
  56. /// }
  57. /// ```
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  struct RoundRobinLoadBalancer {
    /// Commands consumed by ``run()``.
    enum Input: Sendable, Hashable {
      /// Replace the endpoints the load balancer maintains subchannels for.
      case updateAddresses([Endpoint])
      /// Shut the load balancer down.
      case close
    }

    /// Identifies an endpoint by its addresses, independently of the order in which the
    /// addresses are listed.
    private struct EndpointKey: Hashable, Sendable, CustomStringConvertible {
      /// The endpoint this key was created from.
      let endpoint: Endpoint

      /// Sorted textual descriptions of the endpoint's addresses. This is the only input to
      /// hashing and equality, which is what makes the key insensitive to address order.
      private let canonicalAddresses: [String]

      init(_ endpoint: Endpoint) {
        var descriptions = endpoint.addresses.map { String(describing: $0) }
        descriptions.sort()
        self.canonicalAddresses = descriptions
        self.endpoint = endpoint
      }

      var description: String {
        String(describing: self.endpoint.addresses)
      }

      static func == (lhs: Self, rhs: Self) -> Bool {
        lhs.canonicalAddresses == rhs.canonicalAddresses
      }

      func hash(into hasher: inout Hasher) {
        hasher.combine(self.canonicalAddresses)
      }
    }

    /// The stream of events published by the load balancer, and the continuation feeding it.
    private let event:
      (
        stream: AsyncStream<LoadBalancerEvent>,
        continuation: AsyncStream<LoadBalancerEvent>.Continuation
      )

    /// The stream of inputs consumed by ``run()``, and the continuation feeding it.
    private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)

    /// The lock-protected state of the load balancer.
    private let state: _LockedValueBox<State>

    /// Creates connections on behalf of subchannels.
    private let connector: any HTTP2Connector

    /// Backoff configuration handed to each subchannel.
    private let backoff: ConnectionBackoff

    /// The compression algorithm used by default; may be overridden on a per-call basis.
    private let defaultCompression: CompressionAlgorithm

    /// The compression algorithms which are enabled.
    private let enabledCompression: CompressionAlgorithmSet

    /// The ID of this load balancer.
    let id: LoadBalancerID

    /// Creates a load balancer with no subchannels.
    ///
    /// A `.connectivityStateChanged(.idle)` event is published immediately so that consumers of
    /// ``events`` observe the initial state.
    ///
    /// - Parameters:
    ///   - connector: Used by subchannels to create connections.
    ///   - backoff: Backoff configuration passed to each subchannel.
    ///   - defaultCompression: Compression algorithm passed to each subchannel as its default.
    ///   - enabledCompression: Compression algorithms passed to each subchannel as enabled.
    init(
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff,
      defaultCompression: CompressionAlgorithm,
      enabledCompression: CompressionAlgorithmSet
    ) {
      self.id = LoadBalancerID()
      self.connector = connector
      self.backoff = backoff
      self.defaultCompression = defaultCompression
      self.enabledCompression = enabledCompression

      self.state = _LockedValueBox(.active(State.Active()))
      self.input = AsyncStream.makeStream(of: Input.self)
      self.event = AsyncStream.makeStream(of: LoadBalancerEvent.self)

      // Announce the initial state: the load balancer always starts out idle.
      self.event.continuation.yield(.connectivityStateChanged(.idle))
    }

    /// The events published by the load balancer.
    var events: AsyncStream<LoadBalancerEvent> {
      self.event.stream
    }

    /// Runs the load balancer until its input stream ends.
    ///
    /// Inputs enqueued by ``updateAddresses(_:)`` and ``close()`` are processed one at a time, in
    /// order. Subchannels are run as child tasks of this function. Observe the effects via
    /// ``events``.
    func run() async {
      await withDiscardingTaskGroup { group in
        for await input in self.input.stream {
          switch input {
          case .close:
            self.handleCloseInput()
          case .updateAddresses(let endpoints):
            self.handleUpdateAddresses(endpoints, in: &group)
          }
        }
      }

      // On cancellation the regular shutdown path is unlikely to have finished the event stream,
      // so finish it here.
      guard Task.isCancelled else { return }
      self.event.continuation.finish()
    }

    /// Asks the load balancer to use the given endpoints.
    ///
    /// The request is enqueued and handled by ``run()``; it may cause subchannels to be created
    /// and others to be removed.
    func updateAddresses(_ endpoints: [Endpoint]) {
      self.input.continuation.yield(.updateAddresses(endpoints))
    }

    /// Asks the load balancer to close along with every subchannel it manages.
    ///
    /// The request is enqueued and handled by ``run()``.
    func close() {
      self.input.continuation.yield(.close)
    }

    /// Picks a ready subchannel.
    ///
    /// When none is available, any subchannels reported by the state are told to connect and
    /// `nil` is returned.
    ///
    /// - Returns: A subchannel, or `nil` if there aren't any ready subchannels.
    func pickSubchannel() -> Subchannel? {
      let outcome = self.state.withLockedValue { state in state.pickSubchannel() }

      switch outcome {
      case .notAvailable(let toConnect):
        // Kick the subchannels after the lock has been released.
        toConnect.forEach { $0.connect() }
        return nil

      case .picked(let subchannel):
        return subchannel
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension RoundRobinLoadBalancer {
    /// Applies a new list of endpoints.
    ///
    /// The new endpoints are diffed against the endpoints which already have subchannels:
    /// - subchannels are created for endpoints which are new,
    /// - subchannels for endpoints which are no longer listed are closed,
    /// - subchannels for endpoints which are still listed are left alone.
    ///
    /// Removal is gradual: an old subchannel stays in service until one of the new subchannels
    /// becomes ready to replace it. Only when more endpoints are removed than added are the
    /// surplus subchannels closed straight away.
    ///
    /// - Parameters:
    ///   - endpoints: Endpoints which should have subchannels. An empty list is ignored.
    ///   - group: The task group in which new subchannels are run.
    private func handleUpdateAddresses(_ endpoints: [Endpoint], in group: inout DiscardingTaskGroup) {
      guard !endpoints.isEmpty else { return }

      // De-duplicate the endpoints by key.
      let keys = Set(endpoints.map { endpoint in EndpointKey(endpoint) })

      let update = self.state.withLockedValue { state in
        state.updateSubchannels(newEndpoints: keys) { endpoint, id in
          Subchannel(
            endpoint: endpoint,
            id: id,
            connector: self.connector,
            backoff: self.backoff,
            defaultCompression: self.defaultCompression,
            enabledCompression: self.enabledCompression
          )
        }
      }

      // First, publish the aggregate connectivity state if it changed.
      if let aggregateState = update.newState {
        self.event.continuation.yield(.connectivityStateChanged(aggregateState))
      }

      // Next, start the subchannels which were just created.
      for subchannel in update.run {
        self.runSubchannel(subchannel, forKey: EndpointKey(subchannel.endpoint), in: &group)
      }

      // Finally, close the surplus subchannels. The remaining old subchannels are closed later,
      // as new subchannels become ready.
      for subchannel in update.close {
        subchannel.close()
      }
    }

    /// Starts a subchannel and forwards its events to the load balancer.
    ///
    /// - Parameters:
    ///   - subchannel: The subchannel to connect and run.
    ///   - key: The key under which the subchannel is tracked in the state.
    ///   - group: The task group which runs the subchannel and consumes its events.
    private func runSubchannel(
      _ subchannel: Subchannel,
      forKey key: EndpointKey,
      in group: inout DiscardingTaskGroup
    ) {
      // Request a connection, then run the subchannel in its own task.
      subchannel.connect()
      group.addTask { await subchannel.run() }

      // A second task consumes the subchannel's events.
      group.addTask {
        for await event in subchannel.events {
          switch event {
          case .requiresNameResolution:
            self.event.continuation.yield(.requiresNameResolution)
          case .goingAway:
            self.handleSubchannelGoingAway(key: key)
          case .connectivityStateChanged(let state):
            self.handleSubchannelConnectivityStateChange(state, key: key)
          }
        }
      }
    }

    /// Records a subchannel's new connectivity state and acts on the outcome.
    ///
    /// - Parameters:
    ///   - connectivityState: The state the subchannel moved to.
    ///   - key: The key of the subchannel whose state changed.
    private func handleSubchannelConnectivityStateChange(
      _ connectivityState: ConnectivityState,
      key: EndpointKey
    ) {
      let action = self.state.withLockedValue { state in
        state.updateSubchannelConnectivityState(connectivityState, key: key)
      }

      switch action {
      case .none:
        break

      case .closed:
        // All subchannels are closed; finish the streams so the run loop exits.
        self.event.continuation.finish()
        self.input.continuation.finish()

      case .close(let subchannel):
        subchannel.close()

      case .publishStateChange(let aggregateState):
        self.event.continuation.yield(.connectivityStateChanged(aggregateState))

      case .closeAndPublishStateChange(let subchannel, let aggregateState):
        self.event.continuation.yield(.connectivityStateChanged(aggregateState))
        subchannel.close()
      }
    }

    /// Takes a subchannel which is going away out of service.
    ///
    /// - Parameter key: The key of the subchannel which is going away.
    private func handleSubchannelGoingAway(key: EndpointKey) {
      let outcome = self.state.withLockedValue { $0.parkSubchannel(withKey: key) }

      // The subchannel isn't closed here: it's already going away and will close itself. Only a
      // change in aggregate state needs to be published.
      guard case .closeAndUpdateState(_, let connectivityState?) = outcome else { return }
      self.event.continuation.yield(.connectivityStateChanged(connectivityState))
    }

    /// Handles a request to close the load balancer.
    private func handleCloseInput() {
      let action = self.state.withLockedValue { $0.close() }

      switch action {
      case .none:
        break

      case .closed:
        // There are no subchannels to close: publish the shutdown and finish both streams.
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        self.event.continuation.finish()
        self.input.continuation.finish()

      case .closeSubchannels(let subchannels):
        // Publish the shutdown first: this load balancer is no longer usable for new RPCs.
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        subchannels.forEach { $0.close() }
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension RoundRobinLoadBalancer {
    /// The state machine backing the load balancer.
    ///
    /// The state moves from `active` to `closing` (when closed while it still has subchannels)
    /// and finally to `closed`. Closing an `active` state with no subchannels moves directly to
    /// `closed`.
    private enum State {
      /// Subchannels are being managed and may be picked from.
      case active(Active)
      /// Closed by the user; waiting for the remaining subchannels to shut down.
      case closing(Closing)
      /// Fully closed.
      case closed

      /// The state used while the load balancer is active.
      struct Active {
        /// The connectivity state aggregated across `subchannels` as of the last refresh.
        private(set) var aggregateConnectivityState: ConnectivityState
        /// Picks between ready subchannels; `nil` when no subchannel is ready.
        private var picker: Picker?
        var endpoints: [Endpoint]
        /// Subchannels which are in service, including those marked for removal.
        var subchannels: [EndpointKey: SubchannelState]
        /// Subchannels which have been taken out of service but haven't yet reported `shutdown`.
        var parkedSubchannels: [EndpointKey: Subchannel]

        init() {
          self.endpoints = []
          self.subchannels = [:]
          self.parkedSubchannels = [:]
          self.aggregateConnectivityState = .idle
          self.picker = nil
        }

        /// Records the new connectivity state of the subchannel identified by `key`.
        ///
        /// If the key belongs to an in-service subchannel whose state actually changed, the
        /// picker and aggregate state are refreshed. Additionally, if the subchannel became ready
        /// and another subchannel is marked for removal, one marked subchannel is parked and
        /// returned to the caller for closing.
        ///
        /// If the key doesn't belong to an in-service subchannel, the only effect is to drop the
        /// matching parked subchannel when the new state is `shutdown`.
        ///
        /// - Returns: The action the caller should take.
        mutating func updateConnectivityState(
          _ state: ConnectivityState,
          key: EndpointKey
        ) -> OnSubchannelConnectivityStateUpdate {
          if let changed = self.subchannels[key]?.updateState(state) {
            guard changed else { return .none }

            let subchannelToClose: Subchannel?

            switch state {
            case .ready:
              // A subchannel became ready: it can replace one which is waiting to be removed.
              if let index = self.subchannels.firstIndex(where: { $0.value.markedForRemoval }) {
                let (key, subchannelState) = self.subchannels.remove(at: index)
                self.parkedSubchannels[key] = subchannelState.subchannel
                subchannelToClose = subchannelState.subchannel
              } else {
                subchannelToClose = nil
              }

            case .idle, .connecting, .transientFailure, .shutdown:
              subchannelToClose = nil
            }

            let aggregateState = self.refreshPickerAndAggregateState()

            switch (subchannelToClose, aggregateState) {
            case (.some(let subchannel), .some(let state)):
              return .closeAndPublishStateChange(subchannel, state)
            case (.some(let subchannel), .none):
              return .close(subchannel)
            case (.none, .some(let state)):
              return .publishStateChange(state)
            case (.none, .none):
              return .none
            }
          } else {
            // Not an in-service subchannel. If it was parked and has now shut down then there's
            // no need to track it any longer.
            switch state {
            case .idle, .connecting, .ready, .transientFailure:
              ()
            case .shutdown:
              self.parkedSubchannels.removeValue(forKey: key)
            }

            return .none
          }
        }

        /// Rebuilds the picker from the ready subchannels and recomputes the aggregate state.
        ///
        /// - Returns: The new aggregate state if it differs from the previous one, otherwise
        ///   `nil`.
        mutating func refreshPickerAndAggregateState() -> ConnectivityState? {
          let ready = self.subchannels.values.compactMap { $0.state == .ready ? $0.subchannel : nil }
          self.picker = Picker(subchannels: ready)

          let aggregate = ConnectivityState.aggregate(self.subchannels.values.map { $0.state })
          if aggregate == self.aggregateConnectivityState {
            return nil
          } else {
            self.aggregateConnectivityState = aggregate
            return aggregate
          }
        }

        /// Picks the next ready subchannel, or returns `nil` if there is no picker.
        mutating func pick() -> Subchannel? {
          self.picker?.pick()
        }

        /// Takes subchannels out of service, either now or once a replacement is ready.
        ///
        /// Up to `numberToRemoveNow` of the subchannels identified by `keys` are parked
        /// immediately; the subchannels for the remaining keys are marked for removal.
        ///
        /// - Parameters:
        ///   - keys: Keys of the subchannels to remove.
        ///   - numberToRemoveNow: The maximum number of subchannels to park immediately.
        /// - Returns: The subchannels which were parked and should be closed by the caller.
        mutating func markForRemoval(
          _ keys: some Sequence<EndpointKey>,
          numberToRemoveNow: Int
        ) -> [Subchannel] {
          var numberToRemoveNow = numberToRemoveNow
          var keyIterator = keys.makeIterator()
          var subchannelsToClose = [Subchannel]()

          // The count is checked before the iterator is advanced, so no key is consumed (and
          // thereby skipped by the second loop) once the quota has been reached.
          while numberToRemoveNow > 0, let key = keyIterator.next() {
            if let subchannelState = self.subchannels.removeValue(forKey: key) {
              numberToRemoveNow -= 1
              self.parkedSubchannels[key] = subchannelState.subchannel
              subchannelsToClose.append(subchannelState.subchannel)
            }
          }

          // Everything else stays in service until a replacement becomes ready.
          while let key = keyIterator.next() {
            self.subchannels[key]?.markForRemoval()
          }

          return subchannelsToClose
        }

        /// Creates a subchannel for each key and tracks it as in service.
        ///
        /// - Parameters:
        ///   - keys: Keys of the endpoints to create subchannels for.
        ///   - makeSubchannel: Creates a subchannel for an endpoint and a freshly made ID.
        /// - Returns: The subchannels which were created.
        mutating func registerSubchannels(
          withKeys keys: some Sequence<EndpointKey>,
          _ makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
        ) -> [Subchannel] {
          var subchannels = [Subchannel]()

          for key in keys {
            let subchannel = makeSubchannel(key.endpoint, SubchannelID())
            subchannels.append(subchannel)
            self.subchannels[key] = SubchannelState(subchannel: subchannel)
          }

          return subchannels
        }
      }

      /// The state used while waiting for subchannels to shut down.
      struct Closing {
        enum Reason: Sendable, Hashable {
          case goAway
          case user
        }

        var reason: Reason
        /// Subchannels which haven't yet reported `shutdown`.
        var parkedSubchannels: [EndpointKey: Subchannel]

        /// Records the new connectivity state of the subchannel identified by `key`.
        ///
        /// Only `shutdown` has an effect: the subchannel is no longer waited for.
        ///
        /// - Returns: `true` if there are no more subchannels to wait for.
        mutating func updateConnectivityState(_ state: ConnectivityState, key: EndpointKey) -> Bool {
          switch state {
          case .idle, .connecting, .ready, .transientFailure:
            ()
          case .shutdown:
            self.parkedSubchannels.removeValue(forKey: key)
          }

          return self.parkedSubchannels.isEmpty
        }
      }

      /// An in-service subchannel along with the state tracked for it.
      struct SubchannelState {
        var subchannel: Subchannel
        /// The last recorded connectivity state; starts as `idle`.
        var state: ConnectivityState
        /// Whether the subchannel should be removed once a replacement becomes ready.
        var markedForRemoval: Bool

        init(subchannel: Subchannel) {
          self.subchannel = subchannel
          self.state = .idle
          self.markedForRemoval = false
        }

        /// Records a new connectivity state.
        ///
        /// - Returns: `true` if the recorded state changed, `false` otherwise.
        mutating func updateState(_ newState: ConnectivityState) -> Bool {
          // The transition from transient failure to connecting is ignored.
          //
          // See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
          if self.state == .transientFailure, newState == .connecting {
            return false
          }

          let oldState = self.state
          self.state = newState
          return oldState != newState
        }

        mutating func markForRemoval() {
          self.markedForRemoval = true
        }
      }

      /// Cycles through a fixed, non-empty list of subchannels.
      struct Picker {
        private var subchannels: [Subchannel]
        /// Index of the subchannel returned by the next call to `pick()`.
        private var index: Int

        /// Creates a picker which starts at a random position.
        ///
        /// - Returns: `nil` if `subchannels` is empty.
        init?(subchannels: [Subchannel]) {
          guard !subchannels.isEmpty else { return nil }
          self.subchannels = subchannels
          // The guard above makes the range non-empty, which `Int.random(in:)` requires.
          self.index = Int.random(in: subchannels.indices)
        }

        /// Returns the current subchannel and advances to the next, wrapping at the end.
        mutating func pick() -> Subchannel {
          defer {
            self.index = (self.index + 1) % self.subchannels.count
          }
          return self.subchannels[self.index]
        }
      }

      /// Reconciles the in-service subchannels with a new set of endpoints.
      ///
      /// Has no effect unless the state is `active`.
      ///
      /// - Parameters:
      ///   - newEndpoints: Keys of the endpoints which should have subchannels.
      ///   - makeSubchannel: Creates a subchannel for an endpoint and ID.
      /// - Returns: The subchannels to run, the subchannels to close, and the new aggregate
      ///   connectivity state if it changed.
      mutating func updateSubchannels(
        newEndpoints: Set<EndpointKey>,
        makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
      ) -> (run: [Subchannel], close: [Subchannel], newState: ConnectivityState?) {
        switch self {
        case .active(var state):
          let existingEndpoints = Set(state.subchannels.keys)
          let keysToAdd = newEndpoints.subtracting(existingEndpoints)
          let keysToRemove = existingEndpoints.subtracting(newEndpoints)

          // An earlier update may have marked a subchannel for removal. If its endpoint is wanted
          // again then the mark must be cleared, otherwise the subchannel would still be closed
          // when some other subchannel next becomes ready.
          for key in existingEndpoints.intersection(newEndpoints) {
            state.subchannels[key]?.markedForRemoval = false
          }

          if keysToRemove.isEmpty && keysToAdd.isEmpty {
            // Nothing to add or remove, but marks may have been cleared: store the state.
            self = .active(state)
            return (run: [], close: [], newState: nil)
          }

          // The load balancer should keep subchannels to remove in service until new subchannels
          // can replace each of them so that requests can continue to be served.
          //
          // If there are more keys to remove than to add, remove some now.
          let numberToRemoveNow = max(keysToRemove.count - keysToAdd.count, 0)

          let removed = state.markForRemoval(keysToRemove, numberToRemoveNow: numberToRemoveNow)
          let added = state.registerSubchannels(withKeys: keysToAdd, makeSubchannel)

          let newState = state.refreshPickerAndAggregateState()
          self = .active(state)
          return (run: added, close: removed, newState: newState)

        case .closing, .closed:
          // Nothing to do.
          return (run: [], close: [], newState: nil)
        }
      }

      enum OnParkChannel {
        /// The subchannel was parked; the aggregate state is non-`nil` if it changed.
        case closeAndUpdateState(Subchannel, ConnectivityState?)
        case none
      }

      /// Moves the in-service subchannel identified by `key` to the parked subchannels.
      ///
      /// Has no effect unless the state is `active` and the key is in service.
      mutating func parkSubchannel(withKey key: EndpointKey) -> OnParkChannel {
        switch self {
        case .active(var state):
          guard let subchannelState = state.subchannels.removeValue(forKey: key) else {
            return .none
          }

          // Parking the subchannel may invalidate the picker and the aggregate state, refresh both.
          state.parkedSubchannels[key] = subchannelState.subchannel
          let newState = state.refreshPickerAndAggregateState()
          self = .active(state)
          return .closeAndUpdateState(subchannelState.subchannel, newState)

        case .closing, .closed:
          return .none
        }
      }

      /// Creates a subchannel for each key and tracks it as in service.
      ///
      /// - Returns: The subchannels which were created; empty unless the state is `active`.
      mutating func registerSubchannels(
        withKeys keys: some Sequence<EndpointKey>,
        _ makeSubchannel: (Endpoint) -> Subchannel
      ) -> [Subchannel] {
        switch self {
        case .active(var state):
          var subchannels = [Subchannel]()

          for key in keys {
            let subchannel = makeSubchannel(key.endpoint)
            subchannels.append(subchannel)
            state.subchannels[key] = SubchannelState(subchannel: subchannel)
          }

          self = .active(state)
          return subchannels

        case .closing, .closed:
          return []
        }
      }

      enum OnSubchannelConnectivityStateUpdate {
        /// Publish the aggregate state, then close the subchannel.
        case closeAndPublishStateChange(Subchannel, ConnectivityState)
        /// Publish the aggregate state.
        case publishStateChange(ConnectivityState)
        /// Close the subchannel.
        case close(Subchannel)
        /// The last subchannel shut down while closing; the state is now `closed`.
        case closed
        case none
      }

      /// Records the new connectivity state of the subchannel identified by `key`.
      ///
      /// - Returns: The action the caller should take.
      mutating func updateSubchannelConnectivityState(
        _ connectivityState: ConnectivityState,
        key: EndpointKey
      ) -> OnSubchannelConnectivityStateUpdate {
        switch self {
        case .active(var state):
          let result = state.updateConnectivityState(connectivityState, key: key)
          self = .active(state)
          return result

        case .closing(var state):
          if state.updateConnectivityState(connectivityState, key: key) {
            // Every subchannel has shut down.
            self = .closed
            return .closed
          } else {
            self = .closing(state)
            return .none
          }

        case .closed:
          return .none
        }
      }

      enum OnClose {
        /// Close the given subchannels; the state is now `closing`.
        case closeSubchannels([Subchannel])
        /// There was nothing to close; the state is now `closed`.
        case closed
        case none
      }

      /// Begins closing.
      ///
      /// In-service subchannels are parked and returned for the caller to close. Has no effect
      /// unless the state is `active`.
      mutating func close() -> OnClose {
        switch self {
        case .active(var active):
          var subchannelsToClose = [Subchannel]()

          for (id, subchannelState) in active.subchannels {
            subchannelsToClose.append(subchannelState.subchannel)
            active.parkedSubchannels[id] = subchannelState.subchannel
          }

          if subchannelsToClose.isEmpty {
            self = .closed
            return .closed
          } else {
            self = .closing(Closing(reason: .user, parkedSubchannels: active.parkedSubchannels))
            return .closeSubchannels(subchannelsToClose)
          }

        case .closing, .closed:
          return .none
        }
      }

      enum OnPickSubchannel {
        /// A ready subchannel was picked.
        case picked(Subchannel)
        /// No subchannel is ready; the given subchannels should be told to connect.
        case notAvailable([Subchannel])
      }

      /// Picks a ready subchannel if one is available.
      ///
      /// If none is ready and the aggregate state is `idle`, all in-service subchannels are
      /// returned so that the caller can ask them to connect.
      mutating func pickSubchannel() -> OnPickSubchannel {
        let onMakeStream: OnPickSubchannel

        switch self {
        case .active(var active):
          if let subchannel = active.pick() {
            onMakeStream = .picked(subchannel)
          } else {
            switch active.aggregateConnectivityState {
            case .idle:
              onMakeStream = .notAvailable(active.subchannels.values.map { $0.subchannel })
            case .connecting, .ready, .transientFailure, .shutdown:
              onMakeStream = .notAvailable([])
            }
          }
          // Picking advances the picker, so the updated state must be stored.
          self = .active(active)

        case .closing, .closed:
          onMakeStream = .notAvailable([])
        }

        return onMakeStream
      }
    }
  }
  extension ConnectivityState {
    /// Aggregates the connectivity states of a set of subchannels into a single state.
    ///
    /// The rules follow https://github.com/grpc/grpc/blob/master/doc/load-balancing.md and are
    /// applied in order:
    /// 1. READY if any subchannel is READY,
    /// 2. otherwise CONNECTING if any subchannel is CONNECTING,
    /// 3. otherwise IDLE if any subchannel is IDLE,
    /// 4. otherwise TRANSIENT_FAILURE if all subchannels are in TRANSIENT_FAILURE (this is also
    ///    the result for an empty collection),
    /// 5. otherwise SHUTDOWN.
    ///
    /// - Parameter states: The connectivity states to aggregate.
    /// - Returns: The aggregate connectivity state.
    static func aggregate(_ states: some Collection<ConnectivityState>) -> ConnectivityState {
      // The first of these states found in `states`, in this order of precedence, wins.
      let precedence: [ConnectivityState] = [.ready, .connecting, .idle]
      for candidate in precedence where states.contains(where: { $0 == candidate }) {
        return candidate
      }

      // Only transient failure and shutdown can remain.
      let allFailing = states.allSatisfy({ $0 == .transientFailure })
      return allFailing ? .transientFailure : .shutdown
    }
  }