RoundRobinLoadBalancer.swift 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. /// A load-balancer which maintains a set of subchannels and uses round-robin to pick which
  18. /// subchannel to use.
  19. ///
  20. /// This load-balancer starts in an 'idle' state and begins connecting when a set of addresses is
  21. /// provided to it with ``updateAddresses(_:)``. Repeated calls to ``updateAddresses(_:)`` will
  22. /// update the subchannels gracefully: new subchannels will be added for new addresses and existing
  23. /// subchannels will be removed if their addresses are no longer present.
  24. ///
  25. /// The state of the load-balancer is aggregated across the state of its subchannels, changes in
  26. /// the aggregate state are reported up via ``events``.
  27. ///
  28. /// You must call ``close()`` on the load-balancer when it's no longer required. This will move
  29. /// it to the ``ConnectivityState/shutdown`` state: existing RPCs may continue but all subsequent
  30. /// calls to ``makeStream(descriptor:options:)`` will fail.
  31. ///
  32. /// To use this load-balancer you must run it in a task:
  33. ///
  34. /// ```swift
  35. /// await withDiscardingTaskGroup { group in
  36. /// // Run the load-balancer
  37. /// group.addTask { await roundRobin.run() }
  38. ///
  39. /// // Update its address list
  40. /// let endpoints: [Endpoint] = [
  41. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1001)]),
  42. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1002)]),
  43. /// Endpoint(addresses: [.ipv4(host: "127.0.0.1", port: 1003)])
  44. /// ]
  45. /// roundRobin.updateAddresses(endpoints)
  46. ///
  47. /// // Consume state update events
  48. /// for await event in roundRobin.events {
  49. /// switch event {
  50. /// case .connectivityStateChanged(.ready):
  51. /// // ...
  52. /// default:
  53. /// // ...
  54. /// }
  55. /// }
  56. /// }
  57. /// ```
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  struct RoundRobinLoadBalancer {
    /// Commands consumed by ``run()``.
    enum Input: Sendable, Hashable {
      /// Replace the set of endpoints the load balancer keeps subchannels for.
      case updateAddresses([Endpoint])
      /// Shut the load balancer down.
      case close
    }

    /// Identifies an endpoint by its addresses, ignoring the order in which they are listed.
    private struct EndpointKey: Hashable, Sendable, CustomStringConvertible {
      /// Sorted string descriptions of the endpoint's addresses. This is the only input to
      /// hashing and equality.
      private let opaque: [String]

      /// The endpoint this key was derived from.
      let endpoint: Endpoint

      init(_ endpoint: Endpoint) {
        // Sorting makes two endpoints with the same addresses in a different order compare equal.
        var descriptions = endpoint.addresses.map { String(describing: $0) }
        descriptions.sort()
        self.opaque = descriptions
        self.endpoint = endpoint
      }

      var description: String {
        return String(describing: self.endpoint.addresses)
      }

      static func == (lhs: Self, rhs: Self) -> Bool {
        return lhs.opaque == rhs.opaque
      }

      func hash(into hasher: inout Hasher) {
        hasher.combine(self.opaque)
      }
    }

    /// The stream of load balancer events and the continuation used to publish them.
    private let event:
      (
        stream: AsyncStream<LoadBalancerEvent>,
        continuation: AsyncStream<LoadBalancerEvent>.Continuation
      )

    /// The stream of inputs processed by ``run()`` and the continuation used to enqueue them.
    private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)

    /// Lock-protected state machine of the load balancer.
    private let state: _LockedValueBox<State>

    /// Creates connections; handed to every subchannel this load balancer makes.
    private let connector: any HTTP2Connector

    /// Connection backoff configuration handed to every subchannel.
    private let backoff: ConnectionBackoff

    /// The compression algorithm used by default. Can be overridden on a per-call basis.
    private let defaultCompression: CompressionAlgorithm

    /// The compression algorithms which are enabled.
    private let enabledCompression: CompressionAlgorithmSet

    /// The identifier of this load balancer.
    internal let id: LoadBalancerID

    /// Creates a load balancer in the active state with no subchannels.
    ///
    /// An initial `.connectivityStateChanged(.idle)` event is buffered on ``events``.
    init(
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff,
      defaultCompression: CompressionAlgorithm,
      enabledCompression: CompressionAlgorithmSet
    ) {
      self.id = LoadBalancerID()
      self.connector = connector
      self.backoff = backoff
      self.defaultCompression = defaultCompression
      self.enabledCompression = enabledCompression

      self.state = _LockedValueBox(.active(State.Active()))
      self.input = AsyncStream.makeStream(of: Input.self)
      self.event = AsyncStream.makeStream(of: LoadBalancerEvent.self)

      // Announce the starting state: the load balancer begins idle.
      self.event.continuation.yield(.connectivityStateChanged(.idle))
    }

    /// Events published by the load balancer, such as changes to its connectivity state.
    var events: AsyncStream<LoadBalancerEvent> {
      return self.event.stream
    }

    /// Processes inputs until the input stream ends, then returns.
    ///
    /// Subchannels are run as child tasks of the task group created here. Use ``events`` to
    /// observe what happens to the load balancer while it runs.
    func run() async {
      await withDiscardingTaskGroup { group in
        for await command in self.input.stream {
          switch command {
          case .close:
            self.handleCloseInput()
          case .updateAddresses(let endpoints):
            self.handleUpdateAddresses(endpoints, in: &group)
          }
        }
      }

      // On cancellation the regular code paths are unlikely to have finished the event stream,
      // so finish it here.
      guard Task.isCancelled else { return }
      self.event.continuation.finish()
    }

    /// Asks the load balancer to switch to a new set of endpoints.
    ///
    /// The request is enqueued for ``run()``. Subchannels may be created or removed as a result.
    func updateAddresses(_ endpoints: [Endpoint]) {
      self.input.continuation.yield(.updateAddresses(endpoints))
    }

    /// Asks the load balancer to close, along with the subchannels it manages.
    ///
    /// The request is enqueued for ``run()``.
    func close() {
      self.input.continuation.yield(.close)
    }

    /// Picks a ready subchannel.
    ///
    /// - Returns: A subchannel, or `nil` if there aren't any ready subchannels. When `nil` is
    ///   returned, any subchannels reported by the state machine are told to connect.
    func pickSubchannel() -> Subchannel? {
      let outcome = self.state.withLockedValue { $0.pickSubchannel() }

      switch outcome {
      case .picked(let subchannel):
        return subchannel

      case .notAvailable(let subchannelsToConnect):
        // Nothing is ready: tell the reported subchannels to start connecting.
        subchannelsToConnect.forEach { $0.connect() }
        return nil
      }
    }
  }
  172. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  173. extension RoundRobinLoadBalancer {
  /// Applies a new list of endpoints to the load balancer.
  ///
  /// The new endpoints are diffed against the endpoints which currently have subchannels:
  /// - endpoints which are new get a subchannel created (and run) for them,
  /// - endpoints which are no longer listed have their subchannels removed,
  /// - endpoints present in both sets are left untouched.
  ///
  /// Removal is gradual: a subchannel for an old endpoint is kept in service until a subchannel
  /// for a new endpoint becomes ready to replace it. Only when more endpoints are removed than
  /// added is the surplus shut down immediately.
  ///
  /// - Parameters:
  ///   - endpoints: Endpoints which should have subchannels. An empty list is ignored.
  ///   - group: The group which should manage and run new subchannels.
  private func handleUpdateAddresses(_ endpoints: [Endpoint], in group: inout DiscardingTaskGroup) {
    guard !endpoints.isEmpty else { return }

    // Key each endpoint so that endpoints differing only in address order are treated as equal.
    let endpointKeys = Set(endpoints.map { EndpointKey($0) })

    let update = self.state.withLockedValue { state in
      state.updateSubchannels(newEndpoints: endpointKeys) { endpoint, id in
        Subchannel(
          endpoint: endpoint,
          id: id,
          connector: self.connector,
          backoff: self.backoff,
          defaultCompression: self.defaultCompression,
          enabledCompression: self.enabledCompression
        )
      }
    }

    // Publish the aggregate connectivity state if the update changed it.
    if let aggregateState = update.newState {
      self.event.continuation.yield(.connectivityStateChanged(aggregateState))
    }

    // Start every newly created subchannel.
    for subchannel in update.run {
      self.runSubchannel(subchannel, forKey: EndpointKey(subchannel.endpoint), in: &group)
    }

    // Shut down the surplus subchannels, i.e. those with no new subchannel to wait for.
    for subchannel in update.close {
      subchannel.shutDown()
    }
  }
  /// Tells a subchannel to connect and runs it in `group`.
  ///
  /// Two child tasks are added to the group: one runs the subchannel, the other consumes the
  /// subchannel's events and forwards them to the load balancer.
  ///
  /// - Parameters:
  ///   - subchannel: The subchannel to run.
  ///   - key: The key identifying the subchannel's endpoint.
  ///   - group: The group which should run the subchannel.
  private func runSubchannel(
    _ subchannel: Subchannel,
    forKey key: EndpointKey,
    in group: inout DiscardingTaskGroup
  ) {
    // Ask for a connection before the subchannel starts running.
    subchannel.connect()

    group.addTask {
      await subchannel.run()
    }

    group.addTask {
      for await subchannelEvent in subchannel.events {
        switch subchannelEvent {
        case .requiresNameResolution:
          // Forwarded as-is to whoever consumes the load balancer's events.
          self.event.continuation.yield(.requiresNameResolution)

        case .goingAway:
          self.handleSubchannelGoingAway(key: key)

        case .connectivityStateChanged(let connectivityState):
          self.handleSubchannelConnectivityStateChange(connectivityState, key: key)
        }
      }
    }
  }
  /// Feeds a subchannel's connectivity state change into the state machine and performs the
  /// action the state machine asks for.
  ///
  /// - Parameters:
  ///   - connectivityState: The new connectivity state of the subchannel.
  ///   - key: The key identifying the subchannel's endpoint.
  private func handleSubchannelConnectivityStateChange(
    _ connectivityState: ConnectivityState,
    key: EndpointKey
  ) {
    let action = self.state.withLockedValue {
      $0.updateSubchannelConnectivityState(connectivityState, key: key)
    }

    switch action {
    case .none:
      ()

    case .closed:
      // All subchannels are closed; finish the streams so the run loop exits.
      self.event.continuation.finish()
      self.input.continuation.finish()

    case .close(let subchannel):
      subchannel.shutDown()

    case .publishStateChange(let aggregateState):
      self.event.continuation.yield(.connectivityStateChanged(aggregateState))

    case .closeAndPublishStateChange(let subchannel, let aggregateState):
      // Publish first, then shut the subchannel down.
      self.event.continuation.yield(.connectivityStateChanged(aggregateState))
      subchannel.shutDown()
    }
  }
  /// Handles a subchannel reporting that it is going away.
  ///
  /// The state machine is asked to park the subchannel. If it does, the subchannel is shut down
  /// and any resulting change to the aggregate connectivity state is published.
  ///
  /// - Parameter key: The key identifying the subchannel's endpoint.
  private func handleSubchannelGoingAway(key: EndpointKey) {
    let outcome = self.state.withLockedValue { $0.parkSubchannel(withKey: key) }

    guard case .closeAndUpdateState(let subchannel, let aggregateState) = outcome else {
      // The state machine had nothing to park.
      return
    }

    subchannel.shutDown()

    if let aggregateState {
      self.event.continuation.yield(.connectivityStateChanged(aggregateState))
    }
  }
  /// Handles a request to close the load balancer.
  ///
  /// Publishes the `.shutdown` state and either shuts down the remaining subchannels or, if
  /// there are none, finishes the event and input streams straight away. Does nothing if the
  /// state machine reports that there is nothing to do.
  private func handleCloseInput() {
    let outcome = self.state.withLockedValue { $0.close() }

    switch outcome {
    case .none:
      ()

    case .closed:
      // There are no subchannels to wait for: publish shutdown and finish both streams.
      self.event.continuation.yield(.connectivityStateChanged(.shutdown))
      self.event.continuation.finish()
      self.input.continuation.finish()

    case .closeSubchannels(let subchannels):
      // This load balancer is no longer usable for new RPCs; say so before closing subchannels.
      self.event.continuation.yield(.connectivityStateChanged(.shutdown))
      subchannels.forEach { $0.shutDown() }
    }
  }
  294. }
  295. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  296. extension RoundRobinLoadBalancer {
  297. private enum State {
  298. case active(Active)
  299. case closing(Closing)
  300. case closed
  /// State held while the load balancer is active, i.e. before it has been asked to close.
  struct Active {
    /// The connectivity state aggregated across `subchannels`, as of the last refresh.
    private(set) var aggregateConnectivityState: ConnectivityState
    /// Picks among the ready subchannels; `nil` when no subchannel is ready.
    private var picker: Picker?
    /// Initialised empty; not otherwise used by the code visible here.
    var endpoints: [Endpoint]
    /// Subchannels in service, keyed by endpoint.
    var subchannels: [EndpointKey: SubchannelState]
    /// Subchannels taken out of service which have not yet reported `.shutdown`.
    var parkedSubchannels: [EndpointKey: Subchannel]

    init() {
      self.aggregateConnectivityState = .idle
      self.picker = nil
      self.endpoints = []
      self.subchannels = [:]
      self.parkedSubchannels = [:]
    }

    /// Records a connectivity state change for the subchannel identified by `key`.
    ///
    /// - Parameters:
    ///   - state: The subchannel's new connectivity state.
    ///   - key: The key identifying the subchannel's endpoint.
    /// - Returns: The action the caller should take as a result.
    mutating func updateConnectivityState(
      _ state: ConnectivityState,
      key: EndpointKey
    ) -> OnSubchannelConnectivityStateUpdate {
      guard let didChange = self.subchannels[key]?.updateState(state) else {
        // The subchannel isn't in service, so it can only be a parked one (or unknown).
        switch state {
        case .idle:
          // The subchannel can be parked before it's shutdown. If there are no active RPCs then
          // it will enter the idle state instead. If that happens, close it.
          guard let parked = self.parkedSubchannels[key] else { return .none }
          return .close(parked)

        case .shutdown:
          self.parkedSubchannels[key] = nil
          return .none

        case .connecting, .ready, .transientFailure:
          return .none
        }
      }

      guard didChange else { return .none }

      // A subchannel becoming ready allows one subchannel marked for removal to be retired.
      var retired: Subchannel? = nil
      if case .ready = state,
        let index = self.subchannels.firstIndex(where: { $0.value.markedForRemoval })
      {
        let (retiredKey, retiredState) = self.subchannels.remove(at: index)
        self.parkedSubchannels[retiredKey] = retiredState.subchannel
        retired = retiredState.subchannel
      }

      let newAggregate = self.refreshPickerAndAggregateState()

      if let retired {
        if let newAggregate {
          return .closeAndPublishStateChange(retired, newAggregate)
        }
        return .close(retired)
      }

      if let newAggregate {
        return .publishStateChange(newAggregate)
      }

      return .none
    }

    /// Rebuilds the picker from the ready subchannels and recomputes the aggregate state.
    ///
    /// - Returns: The new aggregate connectivity state, or `nil` if it didn't change.
    mutating func refreshPickerAndAggregateState() -> ConnectivityState? {
      let inService = self.subchannels.values

      let readySubchannels = inService.filter { $0.state == .ready }.map { $0.subchannel }
      self.picker = Picker(subchannels: readySubchannels)

      let newAggregate = ConnectivityState.aggregate(inService.map { $0.state })
      guard newAggregate != self.aggregateConnectivityState else { return nil }

      self.aggregateConnectivityState = newAggregate
      return newAggregate
    }

    /// Picks the next ready subchannel, or `nil` if there is no picker.
    mutating func pick() -> Subchannel? {
      return self.picker?.pick()
    }

    /// Takes up to `numberToRemoveNow` of the given subchannels out of service immediately and
    /// marks the remaining ones for removal later.
    ///
    /// - Parameters:
    ///   - keys: Keys of the subchannels to remove.
    ///   - numberToRemoveNow: How many subchannels to take out of service straight away.
    /// - Returns: The subchannels taken out of service now; they have been parked.
    mutating func markForRemoval(
      _ keys: some Sequence<EndpointKey>,
      numberToRemoveNow: Int
    ) -> [Subchannel] {
      var immediateRemovalsLeft = numberToRemoveNow
      var remainingKeys = keys.makeIterator()
      var removedNow: [Subchannel] = []

      // The budget is checked before a key is consumed, so once it is spent every remaining key
      // is left for the second loop.
      while immediateRemovalsLeft > 0, let key = remainingKeys.next() {
        guard let removedState = self.subchannels.removeValue(forKey: key) else { continue }
        immediateRemovalsLeft -= 1
        self.parkedSubchannels[key] = removedState.subchannel
        removedNow.append(removedState.subchannel)
      }

      while let key = remainingKeys.next() {
        self.subchannels[key]?.markForRemoval()
      }

      return removedNow
    }

    /// Creates and registers a subchannel for each key.
    ///
    /// - Parameters:
    ///   - keys: Keys of the endpoints to create subchannels for.
    ///   - makeSubchannel: Makes a subchannel for an endpoint with the given ID.
    /// - Returns: The newly created subchannels, in the order of `keys`.
    mutating func registerSubchannels(
      withKeys keys: some Sequence<EndpointKey>,
      _ makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
    ) -> [Subchannel] {
      return keys.map { key in
        let subchannel = makeSubchannel(key.endpoint, SubchannelID())
        self.subchannels[key] = SubchannelState(subchannel: subchannel)
        return subchannel
      }
    }
  }
  /// State held while the load balancer waits for its parked subchannels to shut down.
  struct Closing {
    /// Why the load balancer is closing.
    enum Reason: Sendable, Hashable {
      case goAway
      case user
    }

    /// The reason this state was entered.
    var reason: Reason
    /// Subchannels which have not yet reported `.shutdown`.
    var parkedSubchannels: [EndpointKey: Subchannel]

    /// Records a connectivity state change for the subchannel identified by `key`.
    ///
    /// - Parameters:
    ///   - state: The subchannel's new connectivity state.
    ///   - key: The key identifying the subchannel's endpoint.
    /// - Returns: The action the caller should take and the state to move to. The next state is
    ///   `.closed` once the last parked subchannel has shut down.
    mutating func updateConnectivityState(
      _ state: ConnectivityState,
      key: EndpointKey
    ) -> (OnSubchannelConnectivityStateUpdate, RoundRobinLoadBalancer.State) {
      switch state {
      case .connecting, .ready, .transientFailure:
        return (.none, .closing(self))

      case .idle:
        // A parked subchannel which goes idle still needs to be closed.
        let action: OnSubchannelConnectivityStateUpdate
        if let parked = self.parkedSubchannels[key] {
          action = .close(parked)
        } else {
          action = .none
        }
        return (action, .closing(self))

      case .shutdown:
        self.parkedSubchannels.removeValue(forKey: key)
        if self.parkedSubchannels.isEmpty {
          return (.closed, .closed)
        }
        return (.none, .closing(self))
      }
    }
  }
  /// A subchannel together with the bookkeeping the load balancer keeps for it.
  struct SubchannelState {
    /// The subchannel being tracked.
    var subchannel: Subchannel
    /// The last connectivity state recorded for the subchannel. Starts as `.idle`.
    var state: ConnectivityState = .idle
    /// Whether the subchannel should be removed once a replacement becomes ready.
    var markedForRemoval: Bool = false

    init(subchannel: Subchannel) {
      self.subchannel = subchannel
    }

    /// Records a new connectivity state.
    ///
    /// - Parameter newState: The state reported by the subchannel.
    /// - Returns: Whether the recorded state changed.
    mutating func updateState(_ newState: ConnectivityState) -> Bool {
      // The transition from transient failure to connecting is ignored.
      //
      // See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
      let isIgnoredTransition = self.state == .transientFailure && newState == .connecting
      guard !isIgnoredTransition else { return false }

      let didChange = self.state != newState
      self.state = newState
      return didChange
    }

    /// Flags the subchannel for removal.
    mutating func markForRemoval() {
      self.markedForRemoval = true
    }
  }
  /// Cycles through a fixed, non-empty list of subchannels in round-robin order.
  struct Picker {
    /// The subchannels to cycle through. Never empty.
    private var subchannels: [Subchannel]
    /// The index of the subchannel returned by the next call to ``pick()``.
    private var index: Int

    /// Creates a picker which starts at a random position in `subchannels`.
    ///
    /// - Parameter subchannels: The subchannels to pick from.
    /// - Returns: `nil` if `subchannels` is empty.
    init?(subchannels: [Subchannel]) {
      guard !subchannels.isEmpty else { return nil }
      self.subchannels = subchannels
      // The range is non-empty because of the guard above, so no force-unwrap is needed.
      self.index = Int.random(in: 0 ..< subchannels.count)
    }

    /// Returns the current subchannel and advances to the next one, wrapping around at the end.
    mutating func pick() -> Subchannel {
      let picked = self.subchannels[self.index]
      self.index = (self.index + 1) % self.subchannels.count
      return picked
    }
  }
  /// Reconciles the subchannels in service with a new set of endpoints.
  ///
  /// - Parameters:
  ///   - newEndpoints: Keys of the endpoints which should have subchannels.
  ///   - makeSubchannel: Makes a subchannel for an endpoint with the given ID.
  /// - Returns: The subchannels to run, the subchannels to close now, and the new aggregate
  ///   connectivity state if it changed. All are empty/`nil` when there is nothing to do or
  ///   the load balancer isn't active.
  mutating func updateSubchannels(
    newEndpoints: Set<EndpointKey>,
    makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
  ) -> (run: [Subchannel], close: [Subchannel], newState: ConnectivityState?) {
    guard case .active(var active) = self else {
      // Closing or closed: nothing to do.
      return (run: [], close: [], newState: nil)
    }

    let currentKeys = Set(active.subchannels.keys)
    let additions = newEndpoints.subtracting(currentKeys)
    let removals = currentKeys.subtracting(newEndpoints)

    if additions.isEmpty, removals.isEmpty {
      // The endpoints are unchanged.
      return (run: [], close: [], newState: nil)
    }

    // Subchannels to remove are kept in service until new subchannels can replace each of them
    // so that requests can continue to be served. Only the surplus (more removals than
    // additions) is removed now.
    let surplus = max(removals.count - additions.count, 0)
    let closedNow = active.markForRemoval(removals, numberToRemoveNow: surplus)
    let created = active.registerSubchannels(withKeys: additions, makeSubchannel)
    let aggregate = active.refreshPickerAndAggregateState()

    self = .active(active)
    return (run: created, close: closedNow, newState: aggregate)
  }
  /// The action to take after attempting to park a subchannel.
  enum OnParkChannel {
    /// Close the subchannel and, if the state is non-`nil`, publish it as the new aggregate
    /// connectivity state.
    case closeAndUpdateState(Subchannel, ConnectivityState?)
    /// Do nothing.
    case none
  }

  /// Takes the subchannel identified by `key` out of service and parks it.
  ///
  /// - Parameter key: The key identifying the subchannel's endpoint.
  /// - Returns: `.none` if the load balancer isn't active or has no subchannel in service for
  ///   `key`; otherwise the parked subchannel and the new aggregate state, if it changed.
  mutating func parkSubchannel(withKey key: EndpointKey) -> OnParkChannel {
    guard case .active(var active) = self else {
      return .none
    }

    guard let removed = active.subchannels.removeValue(forKey: key) else {
      return .none
    }

    active.parkedSubchannels[key] = removed.subchannel
    // Removing a subchannel may invalidate both the picker and the aggregate state.
    let aggregate = active.refreshPickerAndAggregateState()

    self = .active(active)
    return .closeAndUpdateState(removed.subchannel, aggregate)
  }
  /// Creates and registers a subchannel for each key while the load balancer is active.
  ///
  /// - Parameters:
  ///   - keys: Keys of the endpoints to create subchannels for.
  ///   - makeSubchannel: Makes a subchannel for an endpoint.
  /// - Returns: The newly created subchannels, or an empty array if the load balancer is
  ///   closing or closed.
  mutating func registerSubchannels(
    withKeys keys: some Sequence<EndpointKey>,
    _ makeSubchannel: (Endpoint) -> Subchannel
  ) -> [Subchannel] {
    guard case .active(var active) = self else {
      return []
    }

    var created: [Subchannel] = []
    for key in keys {
      let subchannel = makeSubchannel(key.endpoint)
      active.subchannels[key] = SubchannelState(subchannel: subchannel)
      created.append(subchannel)
    }

    self = .active(active)
    return created
  }
  /// The action to take after a subchannel's connectivity state has been recorded.
  enum OnSubchannelConnectivityStateUpdate {
    /// Close the subchannel and publish the new aggregate connectivity state.
    case closeAndPublishStateChange(Subchannel, ConnectivityState)
    /// Publish the new aggregate connectivity state.
    case publishStateChange(ConnectivityState)
    /// Close the subchannel.
    case close(Subchannel)
    /// The load balancer has finished closing.
    case closed
    /// Do nothing.
    case none
  }

  /// Records a connectivity state change for the subchannel identified by `key`, delegating to
  /// whichever state the load balancer is currently in.
  ///
  /// - Parameters:
  ///   - connectivityState: The subchannel's new connectivity state.
  ///   - key: The key identifying the subchannel's endpoint.
  /// - Returns: The action the caller should take. Always `.none` once closed.
  mutating func updateSubchannelConnectivityState(
    _ connectivityState: ConnectivityState,
    key: EndpointKey
  ) -> OnSubchannelConnectivityStateUpdate {
    switch self {
    case .closed:
      return .none

    case .closing(var closing):
      // The closing state decides which state comes next (it may be `.closed`).
      let (action, next) = closing.updateConnectivityState(connectivityState, key: key)
      self = next
      return action

    case .active(var active):
      let action = active.updateConnectivityState(connectivityState, key: key)
      self = .active(active)
      return action
    }
  }
  /// The action to take after asking the state machine to close.
  enum OnClose {
    /// Close the given subchannels; the load balancer is now closing.
    case closeSubchannels([Subchannel])
    /// There were no subchannels in service; the load balancer is now closed.
    case closed
    /// Do nothing; the load balancer was already closing or closed.
    case none
  }

  /// Starts closing the load balancer.
  ///
  /// Every subchannel in service is parked. If there were none the state becomes `.closed`,
  /// otherwise it becomes `.closing` until the parked subchannels have shut down.
  ///
  /// - Returns: The action the caller should take.
  mutating func close() -> OnClose {
    guard case .active(var active) = self else {
      return .none
    }

    let inService = active.subchannels.values.map { $0.subchannel }
    for (key, subchannelState) in active.subchannels {
      active.parkedSubchannels[key] = subchannelState.subchannel
    }

    guard !inService.isEmpty else {
      self = .closed
      return .closed
    }

    self = .closing(Closing(reason: .user, parkedSubchannels: active.parkedSubchannels))
    return .closeSubchannels(inService)
  }
  /// The outcome of trying to pick a subchannel.
  enum OnPickSubchannel {
    /// A subchannel was picked.
    case picked(Subchannel)
    /// No subchannel was available. The associated subchannels are handed back to the caller,
    /// which tells them to connect.
    case notAvailable([Subchannel])
  }

  /// Picks a subchannel if the load balancer is active and one is available.
  ///
  /// - Returns: The picked subchannel. If none is available and the aggregate state is
  ///   `.idle`, all subchannels in service are returned via `.notAvailable`; in every other
  ///   case `.notAvailable` carries an empty array.
  mutating func pickSubchannel() -> OnPickSubchannel {
    guard case .active(var active) = self else {
      return .notAvailable([])
    }

    let outcome: OnPickSubchannel
    if let subchannel = active.pick() {
      outcome = .picked(subchannel)
    } else if case .idle = active.aggregateConnectivityState {
      outcome = .notAvailable(active.subchannels.values.map { $0.subchannel })
    } else {
      outcome = .notAvailable([])
    }

    // Picking advances the picker, so the updated state must be stored back.
    self = .active(active)
    return outcome
  }
  621. }
  622. }
  extension ConnectivityState {
    /// Aggregates the connectivity states of a set of subchannels into a single state.
    ///
    /// The rules follow https://github.com/grpc/grpc/blob/master/doc/load-balancing.md:
    /// READY wins if any subchannel is ready, then CONNECTING, then IDLE. Failing that, the
    /// result is TRANSIENT_FAILURE if every subchannel is in that state, otherwise SHUTDOWN.
    ///
    /// - Parameter states: The states to aggregate.
    /// - Returns: The aggregate state. An empty collection yields `.transientFailure` because
    ///   `allSatisfy` is true for an empty collection.
    static func aggregate(_ states: some Collection<ConnectivityState>) -> ConnectivityState {
      // Listed from highest to lowest precedence: the first state found in `states` wins.
      let precedence: [ConnectivityState] = [.ready, .connecting, .idle]
      for candidate in precedence where states.contains(where: { $0 == candidate }) {
        return candidate
      }

      let allFailing = states.allSatisfy({ $0 == .transientFailure })
      return allFailing ? .transientFailure : .shutdown
    }
  }