RoundRobinLoadBalancer.swift 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package import GRPCCore
  17. private import NIOConcurrencyHelpers
  18. /// A load-balancer which maintains a set of subchannels and uses round-robin to choose which
  19. /// subchannel to use.
  20. ///
  21. /// This load-balancer starts in an 'idle' state and begins connecting when a set of addresses is
  22. /// provided to it with ``updateAddresses(_:)``. Repeated calls to ``updateAddresses(_:)`` will
  23. /// update the subchannels gracefully: new subchannels will be added for new addresses and existing
  24. /// subchannels will be removed if their addresses are no longer present.
  25. ///
  26. /// The state of the load-balancer is aggregated across the state of its subchannels. Changes in
  27. /// the aggregate state are reported up via ``events``.
  28. ///
  29. /// You must call ``close()`` on the load-balancer when it's no longer required. This will move
  30. /// it to the ``ConnectivityState/shutdown`` state: existing RPCs may continue but all subsequent
  31. /// calls to ``makeStream(descriptor:options:)`` will fail.
  32. ///
  33. /// To use this load-balancer you must run it in a task:
  34. ///
  35. /// ```swift
  36. /// await withDiscardingTaskGroup { group in
  37. /// // Run the load-balancer
  38. /// group.addTask { await roundRobin.run() }
  39. ///
  40. /// // Update its address list
  41. /// let endpoints: [Endpoint] = [
  42. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1001)]),
  43. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1002)]),
  44. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1003)])
  45. /// ]
  46. /// roundRobin.updateAddresses(endpoints)
  47. ///
  48. /// // Consume state update events
  49. /// for await event in roundRobin.events {
  50. /// switch event {
  51. /// case .connectivityStateChanged(.ready):
  52. /// // ...
  53. /// default:
  54. /// // ...
  55. /// }
  56. /// }
  57. /// }
  58. /// ```
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  package final class RoundRobinLoadBalancer: Sendable {
    /// Commands which are queued by the public API and processed by ``run()``.
    enum Input: Sendable, Hashable {
      /// Replace the endpoints used by the load balancer with the given list.
      case updateAddresses([Endpoint])
      /// Shut the load balancer down.
      case close
    }

    /// Identifies an endpoint by its addresses, irrespective of the order they are listed in.
    private struct EndpointKey: Hashable, Sendable, CustomStringConvertible {
      /// The sorted string descriptions of the endpoint's addresses. Equality and hashing are
      /// based on this value only.
      private let opaque: [String]

      /// The endpoint the key was created from.
      let endpoint: Endpoint

      init(_ endpoint: Endpoint) {
        // Sorting makes two endpoints with the same addresses in a different order compare equal.
        var descriptions = endpoint.addresses.map { String(describing: $0) }
        descriptions.sort()
        self.opaque = descriptions
        self.endpoint = endpoint
      }

      var description: String {
        String(describing: self.endpoint.addresses)
      }

      static func == (lhs: Self, rhs: Self) -> Bool {
        lhs.opaque == rhs.opaque
      }

      func hash(into hasher: inout Hasher) {
        hasher.combine(self.opaque)
      }
    }

    /// The stream of events published by the load balancer, and the continuation feeding it.
    private let event: (
      stream: AsyncStream<LoadBalancerEvent>,
      continuation: AsyncStream<LoadBalancerEvent>.Continuation
    )

    /// The stream of inputs consumed by ``run()``, and the continuation feeding it.
    private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)

    // Uses NIOLockedValueBox to workaround: https://github.com/swiftlang/swift/issues/76007
    /// The lock-protected state machine of the load balancer.
    private let state: NIOLockedValueBox<State>

    /// The connector handed to every subchannel created by this load balancer.
    private let connector: any HTTP2Connector

    /// The connection backoff configuration handed to every subchannel.
    private let backoff: ConnectionBackoff

    /// The default compression algorithm to use. Can be overridden on a per-call basis.
    private let defaultCompression: CompressionAlgorithm

    /// The set of enabled compression algorithms.
    private let enabledCompression: CompressionAlgorithmSet

    /// The ID of this load balancer.
    internal let id: LoadBalancerID

    /// Creates a load balancer with no subchannels.
    ///
    /// An initial `.connectivityStateChanged(.idle)` event is queued on ``events`` during
    /// initialization.
    ///
    /// - Parameters:
    ///   - connector: Passed to each subchannel when it is created.
    ///   - backoff: Passed to each subchannel when it is created.
    ///   - defaultCompression: Passed to each subchannel when it is created.
    ///   - enabledCompression: Passed to each subchannel when it is created.
    package init(
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff,
      defaultCompression: CompressionAlgorithm,
      enabledCompression: CompressionAlgorithmSet
    ) {
      self.id = LoadBalancerID()
      self.connector = connector
      self.backoff = backoff
      self.defaultCompression = defaultCompression
      self.enabledCompression = enabledCompression

      self.state = NIOLockedValueBox(State.active(State.Active()))
      self.input = AsyncStream<Input>.makeStream()
      self.event = AsyncStream<LoadBalancerEvent>.makeStream()

      // Every load balancer begins life idle; make that the first event observers receive.
      self.event.continuation.yield(.connectivityStateChanged(.idle))
    }

    /// A stream of events which can happen to the load balancer.
    package var events: AsyncStream<LoadBalancerEvent> {
      self.event.stream
    }

    /// Runs the load balancer, returning when it has closed.
    ///
    /// Inputs queued by ``updateAddresses(_:)`` and ``close()`` are processed one at a time, in
    /// the order they were queued. Subchannels are run as child tasks of this function.
    ///
    /// You can monitor events which happen on the load balancer with ``events``.
    package func run() async {
      await withDiscardingTaskGroup { group in
        for await command in self.input.stream {
          switch command {
          case .close:
            self.handleCloseInput()
          case .updateAddresses(let endpoints):
            self.handleUpdateAddresses(endpoints, in: &group)
          }
        }
      }

      // When cancelled the regular shutdown path is unlikely to have finished the event stream,
      // so finish it here.
      guard Task.isCancelled else { return }
      self.event.continuation.finish()
    }

    /// Update the addresses used by the load balancer.
    ///
    /// The update is queued and applied by ``run()``. This may result in new subchannels being
    /// created and some subchannels being removed.
    package func updateAddresses(_ endpoints: [Endpoint]) {
      self.input.continuation.yield(Input.updateAddresses(endpoints))
    }

    /// Close the load balancer, and all subchannels it manages.
    ///
    /// The request is queued and applied by ``run()``.
    package func close() {
      self.input.continuation.yield(Input.close)
    }

    /// Pick a ready subchannel from the load balancer.
    ///
    /// If no subchannel can be picked, any subchannels returned by the state machine alongside
    /// that result are asked to connect before `nil` is returned.
    ///
    /// - Returns: A subchannel, or `nil` if there aren't any ready subchannels.
    package func pickSubchannel() -> Subchannel? {
      let outcome = self.state.withLockedValue { $0.pickSubchannel() }

      switch outcome {
      case .picked(let subchannel):
        return subchannel

      case .notAvailable(let toConnect):
        // Nudge these subchannels into connecting so a later pick may succeed.
        toConnect.forEach { $0.connect() }
        return nil
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension RoundRobinLoadBalancer {
    /// Applies a new list of endpoints to the load balancer.
    ///
    /// The new endpoints are compared with the endpoints which currently have subchannels:
    /// - a subchannel is created and run for each endpoint which is new,
    /// - subchannels for endpoints which are absent from `endpoints` are removed,
    /// - subchannels for endpoints which are still present are left alone.
    ///
    /// Removal is gradual. Only the subchannels returned by the state machine as surplus are shut
    /// down here; the remaining old subchannels are closed later, as subchannels become ready.
    ///
    /// - Parameters:
    ///   - endpoints: Endpoints which should have subchannels. An empty list is ignored.
    ///   - group: The group which should manage and run new subchannels.
    private func handleUpdateAddresses(_ endpoints: [Endpoint], in group: inout DiscardingTaskGroup) {
      guard !endpoints.isEmpty else { return }

      // Key each endpoint so that the ordering of its addresses doesn't matter.
      let keys = Set(endpoints.map(EndpointKey.init))

      let update = self.state.withLockedValue { state in
        state.updateSubchannels(newEndpoints: keys) { endpoint, id in
          Subchannel(
            endpoint: endpoint,
            id: id,
            connector: self.connector,
            backoff: self.backoff,
            defaultCompression: self.defaultCompression,
            enabledCompression: self.enabledCompression
          )
        }
      }

      // Tell observers about the aggregate state, if it changed.
      if let aggregate = update.newState {
        self.event.continuation.yield(.connectivityStateChanged(aggregate))
      }

      // Start every subchannel which was just created.
      for subchannel in update.run {
        self.runSubchannel(subchannel, forKey: EndpointKey(subchannel.endpoint), in: &group)
      }

      // Old subchannels are removed when new subchannels become ready. Surplus subchannels only
      // exist if there are more to remove than to add; those are the ones closed now.
      update.close.forEach { $0.shutDown() }
    }

    /// Asks a subchannel to connect, then runs it and forwards its events in `group`.
    ///
    /// - Parameters:
    ///   - subchannel: The subchannel to run.
    ///   - key: The key the subchannel is registered under in the state machine.
    ///   - group: The group in which to add the two child tasks.
    private func runSubchannel(
      _ subchannel: Subchannel,
      forKey key: EndpointKey,
      in group: inout DiscardingTaskGroup
    ) {
      // Request a connection up front, then drive the subchannel in a child task.
      subchannel.connect()
      group.addTask { await subchannel.run() }

      // A second child task translates the subchannel's events into load balancer actions.
      group.addTask {
        for await subchannelEvent in subchannel.events {
          switch subchannelEvent {
          case .requiresNameResolution:
            self.event.continuation.yield(.requiresNameResolution)
          case .goingAway:
            self.handleSubchannelGoingAway(key: key)
          case .connectivityStateChanged(let connectivityState):
            self.handleSubchannelConnectivityStateChange(connectivityState, key: key)
          }
        }
      }
    }

    /// Feeds a subchannel's new connectivity state into the state machine and performs the
    /// action it returns.
    private func handleSubchannelConnectivityStateChange(
      _ connectivityState: ConnectivityState,
      key: EndpointKey
    ) {
      let action = self.state.withLockedValue { state in
        state.updateSubchannelConnectivityState(connectivityState, key: key)
      }

      switch action {
      case .none:
        ()

      case .publishStateChange(let aggregate):
        self.event.continuation.yield(.connectivityStateChanged(aggregate))

      case .close(let subchannel):
        subchannel.shutDown()

      case .closeAndPublishStateChange(let subchannel, let aggregate):
        self.event.continuation.yield(.connectivityStateChanged(aggregate))
        subchannel.shutDown()

      case .closed:
        // Nothing is left to close: finishing the input stream lets the run loop exit.
        self.event.continuation.finish()
        self.input.continuation.finish()
      }
    }

    /// Parks the subchannel for `key` after it reported that it is going away, shuts it down and
    /// publishes the aggregate state if parking changed it.
    private func handleSubchannelGoingAway(key: EndpointKey) {
      let action = self.state.withLockedValue { $0.parkSubchannel(withKey: key) }

      switch action {
      case .none:
        ()

      case .closeAndUpdateState(let subchannel, let aggregate):
        subchannel.shutDown()
        if let aggregate {
          self.event.continuation.yield(.connectivityStateChanged(aggregate))
        }
      }
    }

    /// Handles a request to close the load balancer.
    ///
    /// A `.shutdown` state change is published the first time this is handled. If there are no
    /// subchannels to close the event and input streams are finished as well.
    private func handleCloseInput() {
      let action = self.state.withLockedValue { $0.close() }

      switch action {
      case .none:
        ()

      case .closed:
        // There are no subchannels to wait for, everything can be finished now.
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        self.event.continuation.finish()
        self.input.continuation.finish()

      case .closeSubchannels(let subchannels):
        // Publish a new shutdown state, this LB is no longer usable for new RPCs.
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        subchannels.forEach { $0.shutDown() }
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension RoundRobinLoadBalancer {
    /// The state machine of the load balancer.
    ///
    /// The machine only moves forwards: `active` to `closing` to `closed`, or straight from
    /// `active` to `closed` when there are no subchannels to close.
    private enum State {
      case active(Active)
      case closing(Closing)
      case closed

      /// State held while the load balancer is usable.
      struct Active {
        /// The most recently computed aggregate of the subchannels' states.
        private(set) var aggregateConnectivityState: ConnectivityState
        /// Picks among the ready subchannels; `nil` when none are ready.
        private var picker: Picker?
        var endpoints: [Endpoint]
        /// Subchannels which contribute to the aggregate state and may be picked.
        var subchannels: [EndpointKey: SubchannelState]
        /// Subchannels which were taken out of service and are being closed.
        var parkedSubchannels: [EndpointKey: Subchannel]

        init() {
          self.aggregateConnectivityState = .idle
          self.picker = nil
          self.endpoints = []
          self.subchannels = [:]
          self.parkedSubchannels = [:]
        }

        /// Records a new connectivity state for the subchannel identified by `key`.
        ///
        /// - Parameters:
        ///   - state: The state the subchannel moved to.
        ///   - key: The key of the subchannel.
        /// - Returns: The action the load balancer should take.
        mutating func updateConnectivityState(
          _ state: ConnectivityState,
          key: EndpointKey
        ) -> OnSubchannelConnectivityStateUpdate {
          guard let changed = self.subchannels[key]?.updateState(state) else {
            // Not an in-service subchannel, it may be a parked one.
            switch state {
            case .idle:
              // The subchannel can be parked before it's shutdown. If there are no active RPCs
              // then it will enter the idle state instead. If that happens, close it.
              guard let parked = self.parkedSubchannels[key] else { return .none }
              return .close(parked)

            case .shutdown:
              self.parkedSubchannels.removeValue(forKey: key)
              return .none

            case .connecting, .ready, .transientFailure:
              return .none
            }
          }

          guard changed else { return .none }

          // A subchannel becoming ready allows one subchannel which is marked for removal to be
          // taken out of service.
          //
          // NOTE(review): the subchannel found here may be the one which just became ready, if it
          // is itself marked for removal. Confirm whether that is intended.
          var replaced: Subchannel? = nil
          if state == .ready,
            let index = self.subchannels.firstIndex(where: { $0.value.markedForRemoval })
          {
            let (removedKey, removedState) = self.subchannels.remove(at: index)
            self.parkedSubchannels[removedKey] = removedState.subchannel
            replaced = removedState.subchannel
          }

          let aggregate = self.refreshPickerAndAggregateState()

          if let replaced {
            if let aggregate {
              return .closeAndPublishStateChange(replaced, aggregate)
            }
            return .close(replaced)
          }

          if let aggregate {
            return .publishStateChange(aggregate)
          }

          return .none
        }

        /// Rebuilds the picker from the ready subchannels and recomputes the aggregate state.
        ///
        /// - Returns: The new aggregate state, or `nil` if it is unchanged.
        mutating func refreshPickerAndAggregateState() -> ConnectivityState? {
          var ready: [Subchannel] = []
          var states: [ConnectivityState] = []

          for entry in self.subchannels.values {
            states.append(entry.state)
            if entry.state == .ready {
              ready.append(entry.subchannel)
            }
          }

          self.picker = Picker(subchannels: ready)

          let aggregate = ConnectivityState.aggregate(states)
          guard aggregate != self.aggregateConnectivityState else { return nil }
          self.aggregateConnectivityState = aggregate
          return aggregate
        }

        /// Returns the next subchannel from the picker, or `nil` if there is no picker.
        mutating func pick() -> Subchannel? {
          self.picker?.pick()
        }

        /// Takes up to `numberToRemoveNow` of the given subchannels out of service immediately
        /// and marks the others for removal later.
        ///
        /// - Parameters:
        ///   - keys: Keys of the subchannels to remove.
        ///   - numberToRemoveNow: How many subchannels to park straight away.
        /// - Returns: The subchannels which were parked and should be closed now.
        mutating func markForRemoval(
          _ keys: some Sequence<EndpointKey>,
          numberToRemoveNow: Int
        ) -> [Subchannel] {
          var remaining = numberToRemoveNow
          var parkedNow: [Subchannel] = []

          for key in keys {
            if remaining > 0 {
              // Keys without a subchannel don't count towards the quota.
              if let entry = self.subchannels.removeValue(forKey: key) {
                remaining -= 1
                self.parkedSubchannels[key] = entry.subchannel
                parkedNow.append(entry.subchannel)
              }
            } else {
              self.subchannels[key]?.markForRemoval()
            }
          }

          return parkedNow
        }

        /// Creates and stores a subchannel, with a new ID, for each key.
        ///
        /// - Parameters:
        ///   - keys: Keys to create subchannels for.
        ///   - makeSubchannel: Builds a subchannel from an endpoint and an ID.
        /// - Returns: The created subchannels, in the order of `keys`.
        mutating func registerSubchannels(
          withKeys keys: some Sequence<EndpointKey>,
          _ makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
        ) -> [Subchannel] {
          var created: [Subchannel] = []

          for key in keys {
            let subchannel = makeSubchannel(key.endpoint, SubchannelID())
            self.subchannels[key] = SubchannelState(subchannel: subchannel)
            created.append(subchannel)
          }

          return created
        }
      }

      /// State held while waiting for the parked subchannels to shut down.
      struct Closing {
        enum Reason: Sendable, Hashable {
          case goAway
          case user
        }

        var reason: Reason
        /// Subchannels which have not yet reported `.shutdown`.
        var parkedSubchannels: [EndpointKey: Subchannel]

        /// Records a new connectivity state for a subchannel while closing.
        ///
        /// - Parameters:
        ///   - state: The state the subchannel moved to.
        ///   - key: The key of the subchannel.
        /// - Returns: The action to take and the state the state machine should move to, which is
        ///   `.closed` once the last parked subchannel has shut down.
        mutating func updateConnectivityState(
          _ state: ConnectivityState,
          key: EndpointKey
        ) -> (OnSubchannelConnectivityStateUpdate, RoundRobinLoadBalancer.State) {
          switch state {
          case .connecting, .ready, .transientFailure:
            return (.none, .closing(self))

          case .idle:
            // A parked subchannel which went idle rather than shutting down is closed.
            guard let parked = self.parkedSubchannels[key] else {
              return (.none, .closing(self))
            }
            return (.close(parked), .closing(self))

          case .shutdown:
            self.parkedSubchannels.removeValue(forKey: key)
            guard self.parkedSubchannels.isEmpty else {
              return (.none, .closing(self))
            }
            return (.closed, .closed)
          }
        }
      }

      /// A subchannel along with the bookkeeping the load balancer keeps for it.
      struct SubchannelState {
        var subchannel: Subchannel
        /// The last connectivity state recorded for the subchannel.
        var state: ConnectivityState
        /// Whether the subchannel should be removed once a replacement is ready.
        var markedForRemoval: Bool

        init(subchannel: Subchannel) {
          self.subchannel = subchannel
          self.markedForRemoval = false
          self.state = .idle
        }

        /// Records `newState` as the state of the subchannel.
        ///
        /// - Returns: `true` if the recorded state changed, `false` otherwise.
        mutating func updateState(_ newState: ConnectivityState) -> Bool {
          // The transition from transient failure to connecting is ignored.
          //
          // See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
          if self.state == .transientFailure && newState == .connecting {
            return false
          }

          let didChange = self.state != newState
          self.state = newState
          return didChange
        }

        mutating func markForRemoval() {
          self.markedForRemoval = true
        }
      }

      /// Cycles through a fixed, non-empty list of subchannels, starting at a random position.
      struct Picker {
        private var subchannels: [Subchannel]
        private var index: Int

        /// Creates a picker, or returns `nil` if `subchannels` is empty.
        init?(subchannels: [Subchannel]) {
          guard !subchannels.isEmpty else { return nil }
          self.subchannels = subchannels
          self.index = Int.random(in: subchannels.indices)
        }

        /// Returns the current subchannel and advances to the next, wrapping at the end.
        mutating func pick() -> Subchannel {
          let picked = self.subchannels[self.index]
          self.index = (self.index + 1) % self.subchannels.count
          return picked
        }
      }

      /// Reconciles the in-service subchannels with a new set of endpoints.
      ///
      /// Has no effect unless the state is `active`.
      ///
      /// - Parameters:
      ///   - newEndpoints: Keys of the endpoints which should have subchannels.
      ///   - makeSubchannel: Builds a subchannel from an endpoint and an ID.
      /// - Returns: The subchannels to run, the subchannels to close now, and the new aggregate
      ///   state if it changed.
      mutating func updateSubchannels(
        newEndpoints: Set<EndpointKey>,
        makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
      ) -> (run: [Subchannel], close: [Subchannel], newState: ConnectivityState?) {
        guard case .active(var active) = self else {
          // Closing or closed: nothing to do.
          return (run: [], close: [], newState: nil)
        }

        let current = Set(active.subchannels.keys)
        let toAdd = newEndpoints.subtracting(current)
        let toRemove = current.subtracting(newEndpoints)

        if toAdd.isEmpty, toRemove.isEmpty {
          // The endpoints are unchanged.
          return (run: [], close: [], newState: nil)
        }

        // The load balancer should keep subchannels to remove in service until new subchannels
        // can replace each of them so that requests can continue to be served.
        //
        // Only the surplus, when more are removed than added, is taken out of service now.
        let surplus = max(toRemove.count - toAdd.count, 0)
        let closeNow = active.markForRemoval(toRemove, numberToRemoveNow: surplus)
        let runNow = active.registerSubchannels(withKeys: toAdd, makeSubchannel)
        let aggregate = active.refreshPickerAndAggregateState()

        self = .active(active)
        return (run: runNow, close: closeNow, newState: aggregate)
      }

      enum OnParkChannel {
        case closeAndUpdateState(Subchannel, ConnectivityState?)
        case none
      }

      /// Takes the subchannel for `key` out of service and parks it.
      ///
      /// - Returns: The parked subchannel and the new aggregate state (if it changed), or `.none`
      ///   if the state isn't `active` or there is no in-service subchannel for `key`.
      mutating func parkSubchannel(withKey key: EndpointKey) -> OnParkChannel {
        guard case .active(var active) = self else { return .none }
        guard let entry = active.subchannels.removeValue(forKey: key) else { return .none }

        // Parking the subchannel may invalidate the picker and the aggregate state, refresh both.
        active.parkedSubchannels[key] = entry.subchannel
        let aggregate = active.refreshPickerAndAggregateState()

        self = .active(active)
        return .closeAndUpdateState(entry.subchannel, aggregate)
      }

      /// Creates and stores a subchannel for each key while the state is `active`.
      ///
      /// - Returns: The created subchannels, or an empty array if the state isn't `active`.
      mutating func registerSubchannels(
        withKeys keys: some Sequence<EndpointKey>,
        _ makeSubchannel: (Endpoint) -> Subchannel
      ) -> [Subchannel] {
        guard case .active(var active) = self else { return [] }

        var created: [Subchannel] = []
        for key in keys {
          let subchannel = makeSubchannel(key.endpoint)
          active.subchannels[key] = SubchannelState(subchannel: subchannel)
          created.append(subchannel)
        }

        self = .active(active)
        return created
      }

      enum OnSubchannelConnectivityStateUpdate {
        case closeAndPublishStateChange(Subchannel, ConnectivityState)
        case publishStateChange(ConnectivityState)
        case close(Subchannel)
        case closed
        case none
      }

      /// Routes a subchannel's connectivity state change to the handler for the current state.
      mutating func updateSubchannelConnectivityState(
        _ connectivityState: ConnectivityState,
        key: EndpointKey
      ) -> OnSubchannelConnectivityStateUpdate {
        switch self {
        case .closed:
          return .none

        case .active(var active):
          let action = active.updateConnectivityState(connectivityState, key: key)
          self = .active(active)
          return action

        case .closing(var closing):
          let (action, next) = closing.updateConnectivityState(connectivityState, key: key)
          self = next
          return action
        }
      }

      enum OnClose {
        case closeSubchannels([Subchannel])
        case closed
        case none
      }

      /// Begins closing the load balancer.
      ///
      /// - Returns: The in-service subchannels to close, `.closed` if there were none, or `.none`
      ///   if the state was already `closing` or `closed`.
      mutating func close() -> OnClose {
        guard case .active(var active) = self else { return .none }

        // Every in-service subchannel is parked so its shutdown can be tracked.
        var toClose: [Subchannel] = []
        for (key, entry) in active.subchannels {
          active.parkedSubchannels[key] = entry.subchannel
          toClose.append(entry.subchannel)
        }

        guard !toClose.isEmpty else {
          self = .closed
          return .closed
        }

        self = .closing(Closing(reason: .user, parkedSubchannels: active.parkedSubchannels))
        return .closeSubchannels(toClose)
      }

      enum OnPickSubchannel {
        case picked(Subchannel)
        case notAvailable([Subchannel])
      }

      /// Picks a subchannel.
      ///
      /// - Returns: The picked subchannel, or `.notAvailable` with the subchannels which should
      ///   be asked to connect. That list is only non-empty when the aggregate state is idle.
      mutating func pickSubchannel() -> OnPickSubchannel {
        guard case .active(var active) = self else {
          return .notAvailable([])
        }

        let outcome: OnPickSubchannel
        if let subchannel = active.pick() {
          outcome = .picked(subchannel)
        } else if active.aggregateConnectivityState == .idle {
          outcome = .notAvailable(active.subchannels.values.map(\.subchannel))
        } else {
          outcome = .notAvailable([])
        }

        // Picking advances the picker, so the updated value must be stored back.
        self = .active(active)
        return outcome
      }
    }
  }
  extension ConnectivityState {
    /// Combines the connectivity states of a set of subchannels into a single state.
    ///
    /// The result is the first of `.ready`, `.connecting` and `.idle` which appears in `states`.
    /// If none appear, the result is `.transientFailure` when every state is
    /// `.transientFailure` (which includes an empty collection), and `.shutdown` otherwise.
    ///
    /// - Parameter states: The states to aggregate.
    /// - Returns: The aggregate state.
    static func aggregate(_ states: some Collection<ConnectivityState>) -> ConnectivityState {
      // See https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
      //
      // READY wins over CONNECTING, which wins over IDLE: a single subchannel in one of these
      // states is enough to decide the state of the channel.
      let precedence: [ConnectivityState] = [.ready, .connecting, .idle]
      if let winner = precedence.first(where: { states.contains($0) }) {
        return winner
      }

      // Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state
      // is TRANSIENT_FAILURE.
      let allFailing = states.allSatisfy { $0 == .transientFailure }
      return allFailing ? .transientFailure : .shutdown
    }
  }