GRPCChannel.swift 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import DequeModule
  18. @_spi(Package) import GRPCCore
  19. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  20. @_spi(Package)
  21. public struct GRPCChannel: ClientTransport {
  22. private enum Input: Sendable {
  23. /// Close the channel, if possible.
  24. case close
  25. /// Handle the result of a name resolution.
  26. case handleResolutionResult(NameResolutionResult)
  27. /// Handle the event from the underlying connection object.
  28. case handleLoadBalancerEvent(LoadBalancerEvent, LoadBalancerID)
  29. }
  30. /// Events which can happen to the channel.
  31. private let _connectivityState:
  32. (
  33. stream: AsyncStream<ConnectivityState>,
  34. continuation: AsyncStream<ConnectivityState>.Continuation
  35. )
  36. /// Inputs which this channel should react to.
  37. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  38. /// A resolver providing resolved names to the channel.
  39. private let resolver: NameResolver
  40. /// The state of the channel.
  41. private let state: _LockedValueBox<StateMachine>
  42. /// The maximum number of times to attempt to create a stream per RPC.
  43. ///
  44. /// This is the value used by other gRPC implementations.
  45. private static let maxStreamCreationAttempts = 5
  46. /// A factory for connections.
  47. private let connector: any HTTP2Connector
  48. /// The connection backoff configuration used by the subchannel when establishing a connection.
  49. private let backoff: ConnectionBackoff
  50. /// The default compression algorithm used for requests.
  51. private let defaultCompression: CompressionAlgorithm
  52. /// The set of enabled compression algorithms.
  53. private let enabledCompression: CompressionAlgorithmSet
  54. /// The default service config to use.
  55. ///
  56. /// Used when the resolver doesn't provide one.
  57. private let defaultServiceConfig: ServiceConfig
  58. // These are both read frequently and updated infrequently so may be a bottleneck.
  59. private let _methodConfig: _LockedValueBox<_MethodConfigs>
  60. private let _retryThrottle: _LockedValueBox<RetryThrottle?>
  61. @_spi(Package)
  62. public init(
  63. resolver: NameResolver,
  64. connector: any HTTP2Connector,
  65. config: Config,
  66. defaultServiceConfig: ServiceConfig
  67. ) {
  68. self.resolver = resolver
  69. self.state = _LockedValueBox(StateMachine())
  70. self._connectivityState = AsyncStream.makeStream()
  71. self.input = AsyncStream.makeStream()
  72. self.connector = connector
  73. self.backoff = ConnectionBackoff(
  74. initial: config.backoff.initial,
  75. max: config.backoff.max,
  76. multiplier: config.backoff.multiplier,
  77. jitter: config.backoff.jitter
  78. )
  79. self.defaultCompression = config.compression.algorithm
  80. self.enabledCompression = config.compression.enabledAlgorithms
  81. self.defaultServiceConfig = defaultServiceConfig
  82. let throttle = defaultServiceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
  83. self._retryThrottle = _LockedValueBox(throttle)
  84. let methodConfig = _MethodConfigs(serviceConfig: defaultServiceConfig)
  85. self._methodConfig = _LockedValueBox(methodConfig)
  86. }
  87. /// The connectivity state of the channel.
  88. var connectivityState: AsyncStream<ConnectivityState> {
  89. self._connectivityState.stream
  90. }
  91. /// Returns a throttle which gRPC uses to determine whether retries can be executed.
  92. public var retryThrottle: RetryThrottle? {
  93. self._retryThrottle.withLockedValue { $0 }
  94. }
  95. /// Returns the configuration for a given method.
  96. ///
  97. /// - Parameter descriptor: The method to lookup configuration for.
  98. /// - Returns: Configuration for the method, if it exists.
  99. public func configuration(forMethod descriptor: MethodDescriptor) -> MethodConfig? {
  100. self._methodConfig.withLockedValue { $0[descriptor] }
  101. }
  102. /// Establishes and maintains a connection to the remote destination.
  103. public func connect() async {
  104. self.state.withLockedValue { $0.start() }
  105. self._connectivityState.continuation.yield(.idle)
  106. await withDiscardingTaskGroup { group in
  107. var iterator: Optional<RPCAsyncSequence<NameResolutionResult>.AsyncIterator>
  108. // The resolver can either push or pull values. If it pushes values the channel should
  109. // listen for new results. Otherwise the channel will pull values as and when necessary.
  110. switch self.resolver.updateMode.value {
  111. case .push:
  112. iterator = nil
  113. let handle = group.addCancellableTask {
  114. do {
  115. for try await result in self.resolver.names {
  116. self.input.continuation.yield(.handleResolutionResult(result))
  117. }
  118. self.close()
  119. } catch {
  120. self.close()
  121. }
  122. }
  123. // When the channel is closed gracefully, the task group running the load balancer mustn't
  124. // be cancelled (otherwise in-flight RPCs would fail), but the push based resolver will
  125. // continue indefinitely. Store its handle and cancel it on close when closing the channel.
  126. self.state.withLockedValue { state in
  127. state.setNameResolverTaskHandle(handle)
  128. }
  129. case .pull:
  130. iterator = self.resolver.names.makeAsyncIterator()
  131. await self.resolve(iterator: &iterator, in: &group)
  132. }
  133. // Resolver is setup, start handling events.
  134. for await input in self.input.stream {
  135. switch input {
  136. case .close:
  137. self.handleClose(in: &group)
  138. case .handleResolutionResult(let result):
  139. self.handleNameResolutionResult(result, in: &group)
  140. case .handleLoadBalancerEvent(let event, let id):
  141. await self.handleLoadBalancerEvent(
  142. event,
  143. loadBalancerID: id,
  144. in: &group,
  145. iterator: &iterator
  146. )
  147. }
  148. }
  149. }
  150. if Task.isCancelled {
  151. self._connectivityState.continuation.finish()
  152. }
  153. }
  154. /// Signal to the transport that no new streams may be created and that connections should be
  155. /// closed when all streams are closed.
  156. public func close() {
  157. self.input.continuation.yield(.close)
  158. }
  159. /// Opens a stream using the transport, and uses it as input into a user-provided closure.
  160. public func withStream<T>(
  161. descriptor: MethodDescriptor,
  162. options: CallOptions,
  163. _ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
  164. ) async throws -> T {
  165. // Merge options from the call with those from the service config.
  166. let methodConfig = self.configuration(forMethod: descriptor)
  167. var options = options
  168. options.formUnion(with: methodConfig)
  169. for attempt in 1 ... Self.maxStreamCreationAttempts {
  170. switch await self.makeStream(descriptor: descriptor, options: options) {
  171. case .created(let stream):
  172. return try await stream.execute { inbound, outbound in
  173. let rpcStream = RPCStream(
  174. descriptor: stream.descriptor,
  175. inbound: RPCAsyncSequence(wrapping: inbound),
  176. outbound: RPCWriter.Closable(wrapping: outbound)
  177. )
  178. return try await closure(rpcStream)
  179. }
  180. case .tryAgain(let error):
  181. if error is CancellationError || attempt == Self.maxStreamCreationAttempts {
  182. throw error
  183. } else {
  184. continue
  185. }
  186. case .stopTrying(let error):
  187. throw error
  188. }
  189. }
  190. fatalError("Internal inconsistency")
  191. }
  192. }
  193. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  194. extension GRPCChannel {
  195. @_spi(Package)
  196. public struct Config: Sendable {
  197. /// Configuration for HTTP/2 connections.
  198. var http2: HTTP2ClientTransport.Config.HTTP2
  199. /// Configuration for backoff used when establishing a connection.
  200. var backoff: HTTP2ClientTransport.Config.Backoff
  201. /// Configuration for dealing with idle connections.
  202. var idle: HTTP2ClientTransport.Config.Idle?
  203. /// Configuration for keepalive.
  204. var keepalive: HTTP2ClientTransport.Config.Keepalive?
  205. /// Compression configuration.
  206. var compression: HTTP2ClientTransport.Config.Compression
  207. @_spi(Package)
  208. public init(
  209. http2: HTTP2ClientTransport.Config.HTTP2,
  210. backoff: HTTP2ClientTransport.Config.Backoff,
  211. idle: HTTP2ClientTransport.Config.Idle?,
  212. keepalive: HTTP2ClientTransport.Config.Keepalive?,
  213. compression: HTTP2ClientTransport.Config.Compression
  214. ) {
  215. self.http2 = http2
  216. self.backoff = backoff
  217. self.idle = idle
  218. self.keepalive = keepalive
  219. self.compression = compression
  220. }
  221. }
  222. }
  223. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  224. extension GRPCChannel {
  225. enum MakeStreamResult {
  226. /// A stream was created, use it.
  227. case created(Connection.Stream)
  228. /// An error occurred while trying to create a stream, try again if possible.
  229. case tryAgain(Error)
  230. /// An unrecoverable error occurred (e.g. the channel is closed), fail the RPC and don't retry.
  231. case stopTrying(Error)
  232. }
  233. private func makeStream(
  234. descriptor: MethodDescriptor,
  235. options: CallOptions
  236. ) async -> MakeStreamResult {
  237. let waitForReady = options.waitForReady ?? true
  238. switch self.state.withLockedValue({ $0.makeStream(waitForReady: waitForReady) }) {
  239. case .useLoadBalancer(let loadBalancer):
  240. return await self.makeStream(
  241. descriptor: descriptor,
  242. options: options,
  243. loadBalancer: loadBalancer
  244. )
  245. case .joinQueue:
  246. do {
  247. let loadBalancer = try await self.enqueue(waitForReady: waitForReady)
  248. return await self.makeStream(
  249. descriptor: descriptor,
  250. options: options,
  251. loadBalancer: loadBalancer
  252. )
  253. } catch {
  254. // All errors from enqueue are non-recoverable: either the channel is shutting down or
  255. // the request has been cancelled.
  256. return .stopTrying(error)
  257. }
  258. case .failRPC:
  259. return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))
  260. }
  261. }
  262. private func makeStream(
  263. descriptor: MethodDescriptor,
  264. options: CallOptions,
  265. loadBalancer: LoadBalancer
  266. ) async -> MakeStreamResult {
  267. guard let subchannel = loadBalancer.pickSubchannel() else {
  268. return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
  269. }
  270. let methodConfig = self.configuration(forMethod: descriptor)
  271. var options = options
  272. options.formUnion(with: methodConfig)
  273. do {
  274. let stream = try await subchannel.makeStream(descriptor: descriptor, options: options)
  275. return .created(stream)
  276. } catch {
  277. return .tryAgain(error)
  278. }
  279. }
  280. private func enqueue(waitForReady: Bool) async throws -> LoadBalancer {
  281. let id = QueueEntryID()
  282. return try await withTaskCancellationHandler {
  283. try await withCheckedThrowingContinuation { continuation in
  284. if Task.isCancelled {
  285. continuation.resume(throwing: CancellationError())
  286. return
  287. }
  288. let enqueued = self.state.withLockedValue { state in
  289. state.enqueue(continuation: continuation, waitForReady: waitForReady, id: id)
  290. }
  291. // Not enqueued because the channel is shutdown or shutting down.
  292. if !enqueued {
  293. let error = RPCError(code: .unavailable, message: "channel is shutdown")
  294. continuation.resume(throwing: error)
  295. }
  296. }
  297. } onCancel: {
  298. let continuation = self.state.withLockedValue { state in
  299. state.dequeueContinuation(id: id)
  300. }
  301. continuation?.resume(throwing: CancellationError())
  302. }
  303. }
  304. }
  305. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  306. extension GRPCChannel {
  307. private func handleClose(in group: inout DiscardingTaskGroup) {
  308. switch self.state.withLockedValue({ $0.close() }) {
  309. case .close(let current, let next, let resolver):
  310. resolver?.cancel()
  311. current.close()
  312. next?.close()
  313. self._connectivityState.continuation.yield(.shutdown)
  314. case .cancelAll(let continuations):
  315. for continuation in continuations {
  316. continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
  317. }
  318. self._connectivityState.continuation.yield(.shutdown)
  319. group.cancelAll()
  320. case .none:
  321. ()
  322. }
  323. }
  324. private func handleNameResolutionResult(
  325. _ result: NameResolutionResult,
  326. in group: inout DiscardingTaskGroup
  327. ) {
  328. // Ignore empty endpoint lists.
  329. if result.endpoints.isEmpty { return }
  330. switch result.serviceConfig ?? .success(self.defaultServiceConfig) {
  331. case .success(let config):
  332. // Update per RPC configuration.
  333. let methodConfig = _MethodConfigs(serviceConfig: config)
  334. self._methodConfig.withLockedValue { $0 = methodConfig }
  335. let retryThrottle = config.retryThrottling.map { RetryThrottle(policy: $0) }
  336. self._retryThrottle.withLockedValue { $0 = retryThrottle }
  337. // Update the load balancer.
  338. self.updateLoadBalancer(serviceConfig: config, endpoints: result.endpoints, in: &group)
  339. case .failure:
  340. self.close()
  341. }
  342. }
  343. private func updateLoadBalancer(
  344. serviceConfig: ServiceConfig,
  345. endpoints: [Endpoint],
  346. in group: inout DiscardingTaskGroup
  347. ) {
  348. // Pick the first applicable policy, else fallback to pick-first.
  349. for policy in serviceConfig.loadBalancingConfig {
  350. let onUpdatePolicy: GRPCChannel.StateMachine.OnChangeLoadBalancer
  351. if policy.roundRobin != nil {
  352. onUpdatePolicy = self.state.withLockedValue { state in
  353. state.changeLoadBalancerKind(to: .roundRobin) {
  354. let loadBalancer = RoundRobinLoadBalancer(
  355. connector: self.connector,
  356. backoff: self.backoff,
  357. defaultCompression: self.defaultCompression,
  358. enabledCompression: self.enabledCompression
  359. )
  360. return .roundRobin(loadBalancer)
  361. }
  362. }
  363. } else if policy.pickFirst != nil {
  364. fatalError("TODO: use pick-first when supported")
  365. } else {
  366. // Policy isn't known, ignore it.
  367. continue
  368. }
  369. self.handleLoadBalancerChange(onUpdatePolicy, endpoints: endpoints, in: &group)
  370. return
  371. }
  372. // No suitable config was found, fallback to pick-first.
  373. fatalError("TODO: use pick-first when supported")
  374. }
  375. private func handleLoadBalancerChange(
  376. _ update: StateMachine.OnChangeLoadBalancer,
  377. endpoints: [Endpoint],
  378. in group: inout DiscardingTaskGroup
  379. ) {
  380. switch update {
  381. case .runLoadBalancer(let new, let old):
  382. old?.close()
  383. new.updateAddresses(endpoints)
  384. group.addTask {
  385. await new.run()
  386. }
  387. group.addTask {
  388. for await event in new.events {
  389. self.input.continuation.yield(.handleLoadBalancerEvent(event, new.id))
  390. }
  391. }
  392. case .updateLoadBalancer(let existing):
  393. existing.updateAddresses(endpoints)
  394. case .none:
  395. ()
  396. }
  397. }
  398. private func handleLoadBalancerEvent(
  399. _ event: LoadBalancerEvent,
  400. loadBalancerID: LoadBalancerID,
  401. in group: inout DiscardingTaskGroup,
  402. iterator: inout RPCAsyncSequence<NameResolutionResult>.AsyncIterator?
  403. ) async {
  404. switch event {
  405. case .connectivityStateChanged(let connectivityState):
  406. let actions = self.state.withLockedValue { state in
  407. state.loadBalancerStateChanged(to: connectivityState, id: loadBalancerID)
  408. }
  409. if let newState = actions.publishState {
  410. self._connectivityState.continuation.yield(newState)
  411. }
  412. if let subchannel = actions.close {
  413. subchannel.close()
  414. }
  415. if let resumable = actions.resumeContinuations {
  416. for continuation in resumable.continuations {
  417. continuation.resume(with: resumable.result)
  418. }
  419. }
  420. if actions.finish {
  421. // Fully closed.
  422. self._connectivityState.continuation.finish()
  423. self.input.continuation.finish()
  424. }
  425. case .requiresNameResolution:
  426. await self.resolve(iterator: &iterator, in: &group)
  427. }
  428. }
  429. private func resolve(
  430. iterator: inout RPCAsyncSequence<NameResolutionResult>.AsyncIterator?,
  431. in group: inout DiscardingTaskGroup
  432. ) async {
  433. guard var iterator = iterator else { return }
  434. do {
  435. if let result = try await iterator.next() {
  436. self.handleNameResolutionResult(result, in: &group)
  437. } else {
  438. self.close()
  439. }
  440. } catch {
  441. self.close()
  442. }
  443. }
  444. }
  445. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  446. extension GRPCChannel {
  447. struct StateMachine {
  448. enum State {
  449. case notRunning(NotRunning)
  450. case running(Running)
  451. case stopping(Stopping)
  452. case stopped
  453. case _modifying
  454. struct NotRunning {
  455. /// Queue of requests waiting for a load-balancer.
  456. var queue: RequestQueue
  457. /// A handle to the name resolver task.
  458. var nameResolverHandle: CancellableTaskHandle?
  459. init() {
  460. self.queue = RequestQueue()
  461. }
  462. }
  463. struct Running {
  464. /// The connectivity state of the channel.
  465. var connectivityState: ConnectivityState
  466. /// The load-balancer currently in use.
  467. var current: LoadBalancer
  468. /// The next load-balancer to use. This will be promoted to `current` when it enters the
  469. /// ready state.
  470. var next: LoadBalancer?
  471. /// Previously created load-balancers which are in the process of shutting down.
  472. var past: [LoadBalancerID: LoadBalancer]
  473. /// Queue of requests wait for a load-balancer.
  474. var queue: RequestQueue
  475. /// A handle to the name resolver task.
  476. var nameResolverHandle: CancellableTaskHandle?
  477. init(
  478. from state: NotRunning,
  479. loadBalancer: LoadBalancer
  480. ) {
  481. self.connectivityState = .idle
  482. self.current = loadBalancer
  483. self.next = nil
  484. self.past = [:]
  485. self.queue = state.queue
  486. self.nameResolverHandle = state.nameResolverHandle
  487. }
  488. }
  489. struct Stopping {
  490. /// Previously created load-balancers which are in the process of shutting down.
  491. var past: [LoadBalancerID: LoadBalancer]
  492. init(from state: Running) {
  493. self.past = state.past
  494. }
  495. init(loadBalancers: [LoadBalancerID: LoadBalancer]) {
  496. self.past = loadBalancers
  497. }
  498. }
  499. }
  500. /// The current state.
  501. private var state: State
  502. /// Whether the channel is running.
  503. private var running: Bool
  504. init() {
  505. self.state = .notRunning(State.NotRunning())
  506. self.running = false
  507. }
  508. }
  509. }
  510. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  511. extension GRPCChannel.StateMachine {
  512. mutating func start() {
  513. precondition(!self.running, "channel must only be started once")
  514. self.running = true
  515. }
  516. mutating func setNameResolverTaskHandle(_ handle: CancellableTaskHandle) {
  517. switch self.state {
  518. case .notRunning(var state):
  519. state.nameResolverHandle = handle
  520. self.state = .notRunning(state)
  521. case .running, .stopping, .stopped, ._modifying:
  522. fatalError("Invalid state")
  523. }
  524. }
  525. enum LoadBalancerKind {
  526. case roundRobin
  527. func matches(loadBalancer: LoadBalancer) -> Bool {
  528. switch (self, loadBalancer) {
  529. case (.roundRobin, .roundRobin):
  530. return true
  531. }
  532. }
  533. }
  534. enum OnChangeLoadBalancer {
  535. case runLoadBalancer(LoadBalancer, stop: LoadBalancer?)
  536. case updateLoadBalancer(LoadBalancer)
  537. case none
  538. }
  539. mutating func changeLoadBalancerKind(
  540. to newLoadBalancerKind: LoadBalancerKind,
  541. _ makeLoadBalancer: () -> LoadBalancer
  542. ) -> OnChangeLoadBalancer {
  543. let onChangeLoadBalancer: OnChangeLoadBalancer
  544. switch self.state {
  545. case .notRunning(let state):
  546. let loadBalancer = makeLoadBalancer()
  547. let state = State.Running(from: state, loadBalancer: loadBalancer)
  548. self.state = .running(state)
  549. onChangeLoadBalancer = .runLoadBalancer(state.current, stop: nil)
  550. case .running(var state):
  551. self.state = ._modifying
  552. if let next = state.next {
  553. if newLoadBalancerKind.matches(loadBalancer: next) {
  554. onChangeLoadBalancer = .updateLoadBalancer(next)
  555. } else {
  556. // The 'next' didn't become ready in time. Close it and replace it with a load-balancer
  557. // of the next kind.
  558. let nextNext = makeLoadBalancer()
  559. let previous = state.next
  560. state.next = nextNext
  561. state.past[next.id] = next
  562. onChangeLoadBalancer = .runLoadBalancer(nextNext, stop: previous)
  563. }
  564. } else {
  565. if newLoadBalancerKind.matches(loadBalancer: state.current) {
  566. onChangeLoadBalancer = .updateLoadBalancer(state.current)
  567. } else {
  568. // Create the 'next' load-balancer, it'll replace 'current' when it becomes ready.
  569. let next = makeLoadBalancer()
  570. state.next = next
  571. onChangeLoadBalancer = .runLoadBalancer(next, stop: nil)
  572. }
  573. }
  574. self.state = .running(state)
  575. case .stopping, .stopped:
  576. onChangeLoadBalancer = .none
  577. case ._modifying:
  578. fatalError("Invalid state")
  579. }
  580. return onChangeLoadBalancer
  581. }
  582. struct ConnectivityStateChangeActions {
  583. var close: LoadBalancer? = nil
  584. var publishState: ConnectivityState? = nil
  585. var resumeContinuations: ResumableContinuations? = nil
  586. var finish: Bool = false
  587. struct ResumableContinuations {
  588. var continuations: [CheckedContinuation<LoadBalancer, Error>]
  589. var result: Result<LoadBalancer, Error>
  590. }
  591. }
  592. mutating func loadBalancerStateChanged(
  593. to connectivityState: ConnectivityState,
  594. id: LoadBalancerID
  595. ) -> ConnectivityStateChangeActions {
  596. var actions = ConnectivityStateChangeActions()
  597. switch self.state {
  598. case .running(var state):
  599. self.state = ._modifying
  600. if id == state.current.id {
  601. // No change in state, ignore.
  602. if state.connectivityState == connectivityState {
  603. self.state = .running(state)
  604. break
  605. }
  606. state.connectivityState = connectivityState
  607. actions.publishState = connectivityState
  608. switch connectivityState {
  609. case .ready:
  610. // Current load-balancer became ready; resume all continuations in the queue.
  611. let continuations = state.queue.removeAll()
  612. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  613. continuations: continuations,
  614. result: .success(state.current)
  615. )
  616. case .transientFailure, .shutdown: // shutdown includes shutting down
  617. // Current load-balancer failed. Remove all the 'fast-failing' continuations in the
  618. // queue, these are RPCs which set the 'wait for ready' option to false. The rest of
  619. // the entries in the queue will wait for a load-balancer to become ready.
  620. let continuations = state.queue.removeFastFailingEntries()
  621. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  622. continuations: continuations,
  623. result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
  624. )
  625. case .idle, .connecting:
  626. () // Ignore.
  627. }
  628. } else if let next = state.next, next.id == id {
  629. // State change came from the next LB, if it's now ready promote it to be the current.
  630. switch connectivityState {
  631. case .ready:
  632. // Next load-balancer is ready, promote it to current.
  633. let previous = state.current
  634. state.past[previous.id] = previous
  635. state.current = next
  636. state.next = nil
  637. actions.close = previous
  638. if state.connectivityState != connectivityState {
  639. actions.publishState = connectivityState
  640. }
  641. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  642. continuations: state.queue.removeAll(),
  643. result: .success(next)
  644. )
  645. case .idle, .connecting, .transientFailure, .shutdown:
  646. ()
  647. }
  648. }
  649. self.state = .running(state)
  650. case .stopping(var state):
  651. self.state = ._modifying
  652. // Remove the load balancer if it's now shutdown.
  653. switch connectivityState {
  654. case .shutdown:
  655. state.past.removeValue(forKey: id)
  656. case .idle, .connecting, .ready, .transientFailure:
  657. ()
  658. }
  659. // If that was the last load-balancer then finish the input streams so that the channel
  660. // eventually finishes.
  661. if state.past.isEmpty {
  662. actions.finish = true
  663. self.state = .stopped
  664. } else {
  665. self.state = .stopping(state)
  666. }
  667. case .notRunning, .stopped:
  668. ()
  669. case ._modifying:
  670. fatalError("Invalid state")
  671. }
  672. return actions
  673. }
  674. enum OnMakeStream {
  675. /// Use the given load-balancer to make a stream.
  676. case useLoadBalancer(LoadBalancer)
  677. /// Join the queue and wait until a load-balancer becomes ready.
  678. case joinQueue
  679. /// Fail the stream request, the channel isn't in a suitable state.
  680. case failRPC
  681. }
  682. func makeStream(waitForReady: Bool) -> OnMakeStream {
  683. let onMakeStream: OnMakeStream
  684. switch self.state {
  685. case .notRunning:
  686. onMakeStream = .joinQueue
  687. case .running(let state):
  688. switch state.connectivityState {
  689. case .idle, .connecting:
  690. onMakeStream = .joinQueue
  691. case .ready:
  692. onMakeStream = .useLoadBalancer(state.current)
  693. case .transientFailure:
  694. onMakeStream = waitForReady ? .joinQueue : .failRPC
  695. case .shutdown:
  696. onMakeStream = .failRPC
  697. }
  698. case .stopping, .stopped:
  699. onMakeStream = .failRPC
  700. case ._modifying:
  701. fatalError("Invalid state")
  702. }
  703. return onMakeStream
  704. }
  705. mutating func enqueue(
  706. continuation: CheckedContinuation<LoadBalancer, Error>,
  707. waitForReady: Bool,
  708. id: QueueEntryID
  709. ) -> Bool {
  710. switch self.state {
  711. case .notRunning(var state):
  712. self.state = ._modifying
  713. state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
  714. self.state = .notRunning(state)
  715. return true
  716. case .running(var state):
  717. self.state = ._modifying
  718. state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
  719. self.state = .running(state)
  720. return true
  721. case .stopping, .stopped:
  722. return false
  723. case ._modifying:
  724. fatalError("Invalid state")
  725. }
  726. }
  727. mutating func dequeueContinuation(
  728. id: QueueEntryID
  729. ) -> CheckedContinuation<LoadBalancer, Error>? {
  730. switch self.state {
  731. case .notRunning(var state):
  732. self.state = ._modifying
  733. let continuation = state.queue.removeEntry(withID: id)
  734. self.state = .notRunning(state)
  735. return continuation
  736. case .running(var state):
  737. self.state = ._modifying
  738. let continuation = state.queue.removeEntry(withID: id)
  739. self.state = .running(state)
  740. return continuation
  741. case .stopping, .stopped:
  742. return nil
  743. case ._modifying:
  744. fatalError("Invalid state")
  745. }
  746. }
  747. enum OnClose {
  748. case none
  749. case cancelAll([RequestQueue.Continuation])
  750. case close(LoadBalancer, LoadBalancer?, CancellableTaskHandle?)
  751. }
  752. mutating func close() -> OnClose {
  753. let onClose: OnClose
  754. switch self.state {
  755. case .notRunning(var state):
  756. self.state = .stopped
  757. onClose = .cancelAll(state.queue.removeAll())
  758. case .running(var state):
  759. onClose = .close(state.current, state.next, state.nameResolverHandle)
  760. state.past[state.current.id] = state.current
  761. if let next = state.next {
  762. state.past[next.id] = next
  763. }
  764. self.state = .stopping(State.Stopping(loadBalancers: state.past))
  765. case .stopping, .stopped:
  766. onClose = .none
  767. case ._modifying:
  768. fatalError("Invalid state")
  769. }
  770. return onClose
  771. }
  772. }