PickFirstLoadBalancer.swift 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package import GRPCCore
  17. private import Synchronization
  18. /// A load-balancer which has a single subchannel.
  19. ///
  20. /// This load-balancer starts in an 'idle' state and begins connecting when a set of addresses is
  21. /// provided to it with ``updateEndpoint(_:)``. Repeated calls to ``updateEndpoint(_:)`` will
  22. /// update the subchannel gracefully: RPCs will continue to use the old subchannel until the new
  23. /// subchannel becomes ready.
  24. ///
  25. /// You must call ``close()`` on the load-balancer when it's no longer required. This will move
  26. /// it to the ``ConnectivityState/shutdown`` state: existing RPCs may continue but all subsequent
  27. /// calls to ``makeStream(descriptor:options:)`` will fail.
  28. ///
  29. /// To use this load-balancer you must run it in a task:
  30. ///
  31. /// ```swift
  32. /// await withDiscardingTaskGroup { group in
  33. /// // Run the load-balancer
  34. /// group.addTask { await pickFirst.run() }
  35. ///
  36. /// // Update its endpoint.
  37. /// let endpoint = Endpoint(
  38. /// addresses: [
  39. /// .ipv4(host: "127.0.0.1", port: 1001),
  40. /// .ipv4(host: "127.0.0.1", port: 1002),
  41. /// .ipv4(host: "127.0.0.1", port: 1003)
  42. /// ]
  43. /// )
  44. /// pickFirst.updateEndpoint(endpoint)
  45. ///
  46. /// // Consume state update events
  47. /// for await event in pickFirst.events {
  48. /// switch event {
  49. /// case .connectivityStateChanged(.ready):
  50. /// // ...
  51. /// default:
  52. /// // ...
  53. /// }
  54. /// }
  55. /// }
  56. /// ```
  57. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  58. package final class PickFirstLoadBalancer: Sendable {
  59. enum Input: Sendable, Hashable {
  60. /// Update the addresses used by the load balancer to the following endpoints.
  61. case updateEndpoint(Endpoint)
  62. /// Close the load balancer.
  63. case close
  64. }
  65. /// Events which can happen to the load balancer.
  66. private let event:
  67. (
  68. stream: AsyncStream<LoadBalancerEvent>,
  69. continuation: AsyncStream<LoadBalancerEvent>.Continuation
  70. )
  71. /// Inputs which this load balancer should react to.
  72. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  73. /// A connector, capable of creating connections.
  74. private let connector: any HTTP2Connector
  75. /// Connection backoff configuration.
  76. private let backoff: ConnectionBackoff
  77. /// The default compression algorithm to use. Can be overridden on a per-call basis.
  78. private let defaultCompression: CompressionAlgorithm
  79. /// The set of enabled compression algorithms.
  80. private let enabledCompression: CompressionAlgorithmSet
  81. /// The state of the load-balancer.
  82. private let state: Mutex<State>
  83. /// The ID of this load balancer.
  84. internal let id: LoadBalancerID
  85. package init(
  86. connector: any HTTP2Connector,
  87. backoff: ConnectionBackoff,
  88. defaultCompression: CompressionAlgorithm,
  89. enabledCompression: CompressionAlgorithmSet
  90. ) {
  91. self.connector = connector
  92. self.backoff = backoff
  93. self.defaultCompression = defaultCompression
  94. self.enabledCompression = enabledCompression
  95. self.id = LoadBalancerID()
  96. self.state = Mutex(State())
  97. self.event = AsyncStream.makeStream(of: LoadBalancerEvent.self)
  98. self.input = AsyncStream.makeStream(of: Input.self)
  99. // The load balancer starts in the idle state.
  100. self.event.continuation.yield(.connectivityStateChanged(.idle))
  101. }
  102. /// A stream of events which can happen to the load balancer.
  103. package var events: AsyncStream<LoadBalancerEvent> {
  104. self.event.stream
  105. }
  106. /// Runs the load balancer, returning when it has closed.
  107. ///
  108. /// You can monitor events which happen on the load balancer with ``events``.
  109. package func run() async {
  110. await withDiscardingTaskGroup { group in
  111. for await input in self.input.stream {
  112. switch input {
  113. case .updateEndpoint(let endpoint):
  114. self.handleUpdateEndpoint(endpoint, in: &group)
  115. case .close:
  116. self.handleCloseInput()
  117. }
  118. }
  119. }
  120. if Task.isCancelled {
  121. // Finish the event stream as it's unlikely to have been finished by a regular code path.
  122. self.event.continuation.finish()
  123. }
  124. }
  125. /// Update the addresses used by the load balancer.
  126. ///
  127. /// This may result in new subchannels being created and some subchannels being removed.
  128. package func updateEndpoint(_ endpoint: Endpoint) {
  129. self.input.continuation.yield(.updateEndpoint(endpoint))
  130. }
  131. /// Close the load balancer, and all subchannels it manages.
  132. package func close() {
  133. self.input.continuation.yield(.close)
  134. }
  135. /// Pick a ready subchannel from the load balancer.
  136. ///
  137. /// - Returns: A subchannel, or `nil` if there aren't any ready subchannels.
  138. package func pickSubchannel() -> Subchannel? {
  139. let onPickSubchannel = self.state.withLock { $0.pickSubchannel() }
  140. switch onPickSubchannel {
  141. case .picked(let subchannel):
  142. return subchannel
  143. case .notAvailable(let subchannel):
  144. subchannel?.connect()
  145. return nil
  146. }
  147. }
  148. }
  149. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  150. extension PickFirstLoadBalancer {
  151. private func handleUpdateEndpoint(_ endpoint: Endpoint, in group: inout DiscardingTaskGroup) {
  152. if endpoint.addresses.isEmpty { return }
  153. let onUpdate = self.state.withLock { state in
  154. state.updateEndpoint(endpoint) { endpoint, id in
  155. Subchannel(
  156. endpoint: endpoint,
  157. id: id,
  158. connector: self.connector,
  159. backoff: self.backoff,
  160. defaultCompression: self.defaultCompression,
  161. enabledCompression: self.enabledCompression
  162. )
  163. }
  164. }
  165. switch onUpdate {
  166. case .connect(let newSubchannel, close: let oldSubchannel):
  167. self.runSubchannel(newSubchannel, in: &group)
  168. oldSubchannel?.shutDown()
  169. case .none:
  170. ()
  171. }
  172. }
  173. private func runSubchannel(
  174. _ subchannel: Subchannel,
  175. in group: inout DiscardingTaskGroup
  176. ) {
  177. // Start running it and tell it to connect.
  178. subchannel.connect()
  179. group.addTask {
  180. await subchannel.run()
  181. }
  182. group.addTask {
  183. for await event in subchannel.events {
  184. switch event {
  185. case .connectivityStateChanged(let state):
  186. self.handleSubchannelConnectivityStateChange(state, id: subchannel.id)
  187. case .goingAway:
  188. self.handleGoAway(id: subchannel.id)
  189. case .requiresNameResolution:
  190. self.event.continuation.yield(.requiresNameResolution)
  191. }
  192. }
  193. }
  194. }
  195. private func handleSubchannelConnectivityStateChange(
  196. _ connectivityState: ConnectivityState,
  197. id: SubchannelID
  198. ) {
  199. let onUpdateState = self.state.withLock {
  200. $0.updateSubchannelConnectivityState(connectivityState, id: id)
  201. }
  202. switch onUpdateState {
  203. case .close(let subchannel):
  204. subchannel.shutDown()
  205. case .closeAndPublishStateChange(let subchannel, let connectivityState):
  206. subchannel.shutDown()
  207. self.event.continuation.yield(.connectivityStateChanged(connectivityState))
  208. case .publishStateChange(let connectivityState):
  209. self.event.continuation.yield(.connectivityStateChanged(connectivityState))
  210. case .closed:
  211. self.event.continuation.finish()
  212. self.input.continuation.finish()
  213. case .none:
  214. ()
  215. }
  216. }
  217. private func handleGoAway(id: SubchannelID) {
  218. self.state.withLock { state in
  219. state.receivedGoAway(id: id)
  220. }
  221. }
  222. private func handleCloseInput() {
  223. let onClose = self.state.withLock { $0.close() }
  224. switch onClose {
  225. case .closeSubchannels(let subchannel1, let subchannel2):
  226. self.event.continuation.yield(.connectivityStateChanged(.shutdown))
  227. subchannel1.shutDown()
  228. subchannel2?.shutDown()
  229. case .closed:
  230. self.event.continuation.yield(.connectivityStateChanged(.shutdown))
  231. self.event.continuation.finish()
  232. self.input.continuation.finish()
  233. case .none:
  234. ()
  235. }
  236. }
  237. }
  238. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  239. extension PickFirstLoadBalancer {
  240. enum State: Sendable {
  241. case active(Active)
  242. case closing(Closing)
  243. case closed
  244. init() {
  245. self = .active(Active())
  246. }
  247. }
  248. }
  249. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  250. extension PickFirstLoadBalancer.State {
  251. struct Active: Sendable {
  252. var endpoint: Endpoint?
  253. var connectivityState: ConnectivityState
  254. var current: Subchannel?
  255. var next: Subchannel?
  256. var parked: [SubchannelID: Subchannel]
  257. var isCurrentGoingAway: Bool
  258. init() {
  259. self.endpoint = nil
  260. self.connectivityState = .idle
  261. self.current = nil
  262. self.next = nil
  263. self.parked = [:]
  264. self.isCurrentGoingAway = false
  265. }
  266. }
  267. struct Closing: Sendable {
  268. var parked: [SubchannelID: Subchannel]
  269. init(from state: Active) {
  270. self.parked = state.parked
  271. }
  272. }
  273. }
  274. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  275. extension PickFirstLoadBalancer.State.Active {
  276. mutating func updateEndpoint(
  277. _ endpoint: Endpoint,
  278. makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
  279. ) -> PickFirstLoadBalancer.State.OnUpdateEndpoint {
  280. if self.endpoint == endpoint { return .none }
  281. let onUpdateEndpoint: PickFirstLoadBalancer.State.OnUpdateEndpoint
  282. let id = SubchannelID()
  283. let newSubchannel = makeSubchannel(endpoint, id)
  284. switch (self.current, self.next) {
  285. case (.some(let current), .none):
  286. if self.connectivityState == .idle {
  287. // Current subchannel is idle and we have a new endpoint, move straight to the new
  288. // subchannel.
  289. self.current = newSubchannel
  290. self.parked[current.id] = current
  291. onUpdateEndpoint = .connect(newSubchannel, close: current)
  292. } else {
  293. // Current subchannel is in a non-idle state, set it as the next subchannel and promote
  294. // it when it becomes ready.
  295. self.next = newSubchannel
  296. onUpdateEndpoint = .connect(newSubchannel, close: nil)
  297. }
  298. case (.some, .some(let next)):
  299. // Current and next subchannel exist. Replace the next subchannel.
  300. self.next = newSubchannel
  301. self.parked[next.id] = next
  302. onUpdateEndpoint = .connect(newSubchannel, close: next)
  303. case (.none, .none):
  304. self.current = newSubchannel
  305. onUpdateEndpoint = .connect(newSubchannel, close: nil)
  306. case (.none, .some(let next)):
  307. self.current = newSubchannel
  308. self.next = nil
  309. self.parked[next.id] = next
  310. onUpdateEndpoint = .connect(newSubchannel, close: next)
  311. }
  312. return onUpdateEndpoint
  313. }
  314. mutating func updateSubchannelConnectivityState(
  315. _ connectivityState: ConnectivityState,
  316. id: SubchannelID
  317. ) -> (PickFirstLoadBalancer.State.OnConnectivityStateUpdate, PickFirstLoadBalancer.State) {
  318. let onUpdate: PickFirstLoadBalancer.State.OnConnectivityStateUpdate
  319. if let current = self.current, current.id == id {
  320. if connectivityState == self.connectivityState {
  321. onUpdate = .none
  322. } else {
  323. self.connectivityState = connectivityState
  324. onUpdate = .publishStateChange(connectivityState)
  325. }
  326. } else if let next = self.next, next.id == id {
  327. // if it becomes ready then promote it
  328. switch connectivityState {
  329. case .ready:
  330. if self.connectivityState != connectivityState {
  331. self.connectivityState = connectivityState
  332. if let current = self.current {
  333. onUpdate = .closeAndPublishStateChange(current, connectivityState)
  334. } else {
  335. onUpdate = .publishStateChange(connectivityState)
  336. }
  337. self.current = next
  338. self.isCurrentGoingAway = false
  339. } else {
  340. // No state change to publish, just roll over.
  341. onUpdate = self.current.map { .close($0) } ?? .none
  342. self.current = next
  343. self.isCurrentGoingAway = false
  344. }
  345. case .idle, .connecting, .transientFailure, .shutdown:
  346. onUpdate = .none
  347. }
  348. } else {
  349. switch connectivityState {
  350. case .idle:
  351. if let subchannel = self.parked[id] {
  352. onUpdate = .close(subchannel)
  353. } else {
  354. onUpdate = .none
  355. }
  356. case .shutdown:
  357. self.parked.removeValue(forKey: id)
  358. onUpdate = .none
  359. case .connecting, .ready, .transientFailure:
  360. onUpdate = .none
  361. }
  362. }
  363. return (onUpdate, .active(self))
  364. }
  365. mutating func receivedGoAway(id: SubchannelID) {
  366. if let current = self.current, current.id == id {
  367. // When receiving a GOAWAY the subchannel will ask for an address to be re-resolved and the
  368. // connection will eventually become idle. At this point we wait: the connection remains
  369. // in its current state.
  370. self.isCurrentGoingAway = true
  371. } else if let next = self.next, next.id == id {
  372. // The next connection is going away, park it.
  373. // connection.
  374. self.next = nil
  375. self.parked[next.id] = next
  376. }
  377. }
  378. mutating func close() -> (PickFirstLoadBalancer.State.OnClose, PickFirstLoadBalancer.State) {
  379. let onClose: PickFirstLoadBalancer.State.OnClose
  380. let nextState: PickFirstLoadBalancer.State
  381. if let current = self.current {
  382. self.parked[current.id] = current
  383. if let next = self.next {
  384. self.parked[next.id] = next
  385. onClose = .closeSubchannels(current, next)
  386. } else {
  387. onClose = .closeSubchannels(current, nil)
  388. }
  389. nextState = .closing(PickFirstLoadBalancer.State.Closing(from: self))
  390. } else {
  391. onClose = .closed
  392. nextState = .closed
  393. }
  394. return (onClose, nextState)
  395. }
  396. func pickSubchannel() -> PickFirstLoadBalancer.State.OnPickSubchannel {
  397. let onPick: PickFirstLoadBalancer.State.OnPickSubchannel
  398. if let current = self.current, !self.isCurrentGoingAway {
  399. switch self.connectivityState {
  400. case .idle:
  401. onPick = .notAvailable(current)
  402. case .ready:
  403. onPick = .picked(current)
  404. case .connecting, .transientFailure, .shutdown:
  405. onPick = .notAvailable(nil)
  406. }
  407. } else {
  408. onPick = .notAvailable(nil)
  409. }
  410. return onPick
  411. }
  412. }
  413. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  414. extension PickFirstLoadBalancer.State.Closing {
  415. mutating func updateSubchannelConnectivityState(
  416. _ connectivityState: ConnectivityState,
  417. id: SubchannelID
  418. ) -> (PickFirstLoadBalancer.State.OnConnectivityStateUpdate, PickFirstLoadBalancer.State) {
  419. let onUpdate: PickFirstLoadBalancer.State.OnConnectivityStateUpdate
  420. let nextState: PickFirstLoadBalancer.State
  421. switch connectivityState {
  422. case .idle:
  423. if let subchannel = self.parked[id] {
  424. onUpdate = .close(subchannel)
  425. } else {
  426. onUpdate = .none
  427. }
  428. nextState = .closing(self)
  429. case .shutdown:
  430. if self.parked.removeValue(forKey: id) != nil {
  431. if self.parked.isEmpty {
  432. onUpdate = .closed
  433. nextState = .closed
  434. } else {
  435. onUpdate = .none
  436. nextState = .closing(self)
  437. }
  438. } else {
  439. onUpdate = .none
  440. nextState = .closing(self)
  441. }
  442. case .connecting, .ready, .transientFailure:
  443. onUpdate = .none
  444. nextState = .closing(self)
  445. }
  446. return (onUpdate, nextState)
  447. }
  448. }
  449. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  450. extension PickFirstLoadBalancer.State {
  451. enum OnUpdateEndpoint {
  452. case connect(Subchannel, close: Subchannel?)
  453. case none
  454. }
  455. mutating func updateEndpoint(
  456. _ endpoint: Endpoint,
  457. makeSubchannel: (_ endpoint: Endpoint, _ id: SubchannelID) -> Subchannel
  458. ) -> OnUpdateEndpoint {
  459. let onUpdateEndpoint: OnUpdateEndpoint
  460. switch self {
  461. case .active(var state):
  462. onUpdateEndpoint = state.updateEndpoint(endpoint) { endpoint, id in
  463. makeSubchannel(endpoint, id)
  464. }
  465. self = .active(state)
  466. case .closing, .closed:
  467. onUpdateEndpoint = .none
  468. }
  469. return onUpdateEndpoint
  470. }
  471. enum OnConnectivityStateUpdate {
  472. case closeAndPublishStateChange(Subchannel, ConnectivityState)
  473. case publishStateChange(ConnectivityState)
  474. case close(Subchannel)
  475. case closed
  476. case none
  477. }
  478. mutating func updateSubchannelConnectivityState(
  479. _ connectivityState: ConnectivityState,
  480. id: SubchannelID
  481. ) -> OnConnectivityStateUpdate {
  482. let onUpdateState: OnConnectivityStateUpdate
  483. switch self {
  484. case .active(var state):
  485. (onUpdateState, self) = state.updateSubchannelConnectivityState(connectivityState, id: id)
  486. case .closing(var state):
  487. (onUpdateState, self) = state.updateSubchannelConnectivityState(connectivityState, id: id)
  488. case .closed:
  489. onUpdateState = .none
  490. }
  491. return onUpdateState
  492. }
  493. mutating func receivedGoAway(id: SubchannelID) {
  494. switch self {
  495. case .active(var state):
  496. state.receivedGoAway(id: id)
  497. self = .active(state)
  498. case .closing, .closed:
  499. ()
  500. }
  501. }
  502. enum OnClose {
  503. case closeSubchannels(Subchannel, Subchannel?)
  504. case closed
  505. case none
  506. }
  507. mutating func close() -> OnClose {
  508. let onClose: OnClose
  509. switch self {
  510. case .active(var state):
  511. (onClose, self) = state.close()
  512. case .closing, .closed:
  513. onClose = .none
  514. }
  515. return onClose
  516. }
  517. enum OnPickSubchannel {
  518. case picked(Subchannel)
  519. case notAvailable(Subchannel?)
  520. }
  521. func pickSubchannel() -> OnPickSubchannel {
  522. switch self {
  523. case .active(let state):
  524. return state.pickSubchannel()
  525. case .closing, .closed:
  526. return .notAvailable(nil)
  527. }
  528. }
  529. }