GRPCChannel.swift 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import DequeModule
  18. @_spi(Package) import GRPCCore
  19. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  20. @_spi(Package)
  21. public struct GRPCChannel: ClientTransport {
  /// The inputs processed by the channel's event loop in `connect()`.
  private enum Input: Sendable {
    /// A request to close the channel, if it can be closed.
    case close

    /// A name resolution result is available and should be applied.
    case handleResolutionResult(NameResolutionResult)

    /// A load balancer, identified by its ID, emitted an event which should be handled.
    case handleLoadBalancerEvent(LoadBalancerEvent, LoadBalancerID)
  }
  /// A stream of the channel's connectivity states and the continuation used to publish them.
  31. private let _connectivityState:
  32. (
  33. stream: AsyncStream<ConnectivityState>,
  34. continuation: AsyncStream<ConnectivityState>.Continuation
  35. )
  36. /// Inputs which this channel should react to.
  37. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  38. /// A resolver providing resolved names to the channel.
  39. private let resolver: NameResolver
  40. /// The state of the channel.
  41. private let state: _LockedValueBox<StateMachine>
  42. /// The maximum number of times to attempt to create a stream per RPC.
  43. ///
  44. /// This is the value used by other gRPC implementations.
  45. private static let maxStreamCreationAttempts = 5
  46. /// A factory for connections.
  47. private let connector: any HTTP2Connector
  48. /// The connection backoff configuration used by the subchannel when establishing a connection.
  49. private let backoff: ConnectionBackoff
  50. /// The default compression algorithm used for requests.
  51. private let defaultCompression: CompressionAlgorithm
  52. /// The set of enabled compression algorithms.
  53. private let enabledCompression: CompressionAlgorithmSet
  54. /// The default service config to use.
  55. ///
  56. /// Used when the resolver doesn't provide one.
  57. private let defaultServiceConfig: ServiceConfig
  58. // These are both read frequently and updated infrequently so may be a bottleneck.
  59. private let _methodConfig: _LockedValueBox<_MethodConfigs>
  60. private let _retryThrottle: _LockedValueBox<RetryThrottle?>
  /// Creates a channel.
  ///
  /// - Parameters:
  ///   - resolver: The resolver providing resolved names to the channel.
  ///   - connector: A factory for connections.
  ///   - config: Channel configuration; the backoff and compression settings are read from it.
  ///   - defaultServiceConfig: The service config to use when the resolver doesn't provide one.
  ///       It also seeds the initial per-method configuration and retry throttle.
  @_spi(Package)
  public init(
    resolver: NameResolver,
    connector: any HTTP2Connector,
    config: Config,
    defaultServiceConfig: ServiceConfig
  ) {
    // Collaborators supplied by the caller.
    self.resolver = resolver
    self.connector = connector

    // Internal state and the streams used to drive and observe the channel.
    self.state = _LockedValueBox(StateMachine())
    self.input = AsyncStream.makeStream()
    self._connectivityState = AsyncStream.makeStream()

    // Connection level configuration.
    let backoffConfig = config.backoff
    self.backoff = ConnectionBackoff(
      initial: backoffConfig.initial,
      max: backoffConfig.max,
      multiplier: backoffConfig.multiplier,
      jitter: backoffConfig.jitter
    )
    let compressionConfig = config.compression
    self.defaultCompression = compressionConfig.algorithm
    self.enabledCompression = compressionConfig.enabledAlgorithms

    // Per-RPC configuration, derived from the default service config until the resolver
    // provides a replacement.
    self.defaultServiceConfig = defaultServiceConfig
    let initialMethodConfig = _MethodConfigs(serviceConfig: defaultServiceConfig)
    self._methodConfig = _LockedValueBox(initialMethodConfig)
    let initialThrottle = defaultServiceConfig.retryThrottling.map { policy in
      RetryThrottle(policy: policy)
    }
    self._retryThrottle = _LockedValueBox(initialThrottle)
  }
  /// A stream of the connectivity states published by the channel.
  var connectivityState: AsyncStream<ConnectivityState> {
    let (stream, _) = self._connectivityState
    return stream
  }
  /// The throttle gRPC uses to determine whether retries can be executed, if there is one.
  ///
  /// The value is read under a lock as it is replaced whenever a new service config is applied.
  public var retryThrottle: RetryThrottle? {
    return self._retryThrottle.withLockedValue { throttle in throttle }
  }
  /// Looks up the configuration for a method.
  ///
  /// - Parameter descriptor: The method to look up configuration for.
  /// - Returns: The configuration for the method, or `nil` if there isn't any.
  public func configuration(forMethod descriptor: MethodDescriptor) -> MethodConfig? {
    return self._methodConfig.withLockedValue { configs in configs[descriptor] }
  }
  /// Establishes and maintains a connection to the remote destination.
  ///
  /// Sets up name resolution and then processes channel inputs (close requests, name resolution
  /// results and load balancer events) until the input stream finishes. The channel must only be
  /// started once; starting it again trips a precondition in the state machine.
  public func connect() async {
    self.state.withLockedValue { $0.start() }
    self._connectivityState.continuation.yield(.idle)

    await withDiscardingTaskGroup { group in
      // Only set for pull-based resolvers, where the channel asks for results as and when it
      // needs them. Push-based resolvers deliver their results via the input stream instead.
      var iterator: RPCAsyncSequence<NameResolutionResult>.AsyncIterator? = nil

      switch self.resolver.updateMode.value {
      case .pull:
        iterator = self.resolver.names.makeAsyncIterator()
        await self.resolve(iterator: &iterator, in: &group)

      case .push:
        let resolverTask = group.addCancellableTask {
          do {
            for try await result in self.resolver.names {
              self.input.continuation.yield(.handleResolutionResult(result))
            }
          } catch {
            // A failing resolver is handled in the same way as one which finished: the channel
            // is closed below.
          }
          self.close()
        }

        // When the channel is closed gracefully, the task group running the load balancer
        // mustn't be cancelled (otherwise in-flight RPCs would fail), but a push based resolver
        // would continue indefinitely. Store its handle so it can be cancelled on close.
        self.state.withLockedValue { $0.setNameResolverTaskHandle(resolverTask) }
      }

      // The resolver is set up, start handling events.
      for await event in self.input.stream {
        switch event {
        case .close:
          self.handleClose(in: &group)

        case .handleResolutionResult(let result):
          self.handleNameResolutionResult(result, in: &group)

        case .handleLoadBalancerEvent(let loadBalancerEvent, let id):
          await self.handleLoadBalancerEvent(
            loadBalancerEvent,
            loadBalancerID: id,
            in: &group,
            iterator: &iterator
          )
        }
      }
    }

    guard Task.isCancelled else { return }
    self._connectivityState.continuation.finish()
  }
  /// Signal to the transport that no new streams may be created and that connections should be
  /// closed when all streams are closed.
  ///
  /// This only enqueues a close request on the channel's input stream; the request is acted on
  /// by the event loop running in `connect()`.
  public func close() {
    let inputs = self.input.continuation
    inputs.yield(.close)
  }
  /// Opens a stream using the transport, and uses it as input into a user-provided closure.
  ///
  /// Stream creation is attempted up to `maxStreamCreationAttempts` times. Errors marked as
  /// non-retryable, cancellation errors, and the error from the final attempt are thrown.
  ///
  /// - Parameters:
  ///   - descriptor: The method the stream is for.
  ///   - options: Call options; these are merged with any configuration for the method.
  ///   - closure: The closure to run with the created stream.
  /// - Returns: The value returned by `closure`.
  public func withStream<T>(
    descriptor: MethodDescriptor,
    options: CallOptions,
    _ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
  ) async throws -> T {
    // Merge options from the call with those from the service config.
    var mergedOptions = options
    mergedOptions.formUnion(with: self.configuration(forMethod: descriptor))

    for attempt in 1 ... Self.maxStreamCreationAttempts {
      let result = await self.makeStream(descriptor: descriptor, options: mergedOptions)

      switch result {
      case .created(let stream):
        return try await stream.execute { inbound, outbound in
          let wrapped = RPCStream(
            descriptor: stream.descriptor,
            inbound: RPCAsyncSequence(wrapping: inbound),
            outbound: RPCWriter.Closable(wrapping: outbound)
          )
          return try await closure(wrapped)
        }

      case .stopTrying(let error):
        throw error

      case .tryAgain(let error):
        let isFinalAttempt = attempt == Self.maxStreamCreationAttempts
        if isFinalAttempt || error is CancellationError {
          throw error
        }
        // Otherwise fall through to the next attempt.
      }
    }

    // Every iteration returns, throws, or moves on to the next attempt, and the final attempt
    // can't move on.
    fatalError("Internal inconsistency")
  }
  192. }
  193. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  194. extension GRPCChannel {
  195. @_spi(Package)
  196. public struct Config: Sendable {
  197. /// Configuration for HTTP/2 connections.
  198. var http2: HTTP2ClientTransport.Config.HTTP2
  199. /// Configuration for backoff used when establishing a connection.
  200. var backoff: HTTP2ClientTransport.Config.Backoff
  201. /// Configuration for connection management.
  202. var connection: HTTP2ClientTransport.Config.Connection
  203. /// Compression configuration.
  204. var compression: HTTP2ClientTransport.Config.Compression
  205. @_spi(Package)
  206. public init(
  207. http2: HTTP2ClientTransport.Config.HTTP2,
  208. backoff: HTTP2ClientTransport.Config.Backoff,
  209. connection: HTTP2ClientTransport.Config.Connection,
  210. compression: HTTP2ClientTransport.Config.Compression
  211. ) {
  212. self.http2 = http2
  213. self.backoff = backoff
  214. self.connection = connection
  215. self.compression = compression
  216. }
  217. }
  218. }
  219. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  220. extension GRPCChannel {
  /// The outcome of a single attempt to create a stream.
  enum MakeStreamResult {
    /// The stream was created and can be used.
    case created(Connection.Stream)

    /// Creating the stream failed with the given error; the caller may try again.
    case tryAgain(Error)

    /// An unrecoverable error occurred (e.g. the channel is closed); the RPC should be failed
    /// without retrying.
    case stopTrying(Error)
  }
  /// Makes a single attempt to create a stream, waiting in the queue for a load balancer if the
  /// state machine asks for that.
  ///
  /// - Parameters:
  ///   - descriptor: The method the stream is for.
  ///   - options: The call options; `waitForReady` is treated as `true` when unset.
  /// - Returns: The result of the attempt.
  private func makeStream(
    descriptor: MethodDescriptor,
    options: CallOptions
  ) async -> MakeStreamResult {
    let waitForReady = options.waitForReady ?? true
    let decision = self.state.withLockedValue { $0.makeStream(waitForReady: waitForReady) }

    let loadBalancer: LoadBalancer
    switch decision {
    case .failRPC:
      return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))

    case .useLoadBalancer(let ready):
      loadBalancer = ready

    case .joinQueue:
      do {
        loadBalancer = try await self.enqueue(waitForReady: waitForReady)
      } catch {
        // All errors from enqueue are non-recoverable: either the channel is shutting down or
        // the request has been cancelled.
        return .stopTrying(error)
      }
    }

    return await self.makeStream(
      descriptor: descriptor,
      options: options,
      loadBalancer: loadBalancer
    )
  }
  /// Attempts to create a stream on a subchannel picked by the given load balancer.
  ///
  /// - Parameters:
  ///   - descriptor: The method the stream is for.
  ///   - options: The call options; these are merged with any configuration for the method.
  ///   - loadBalancer: The load balancer to pick a subchannel from.
  /// - Returns: `.created` on success, otherwise `.tryAgain` with the reason for the failure.
  private func makeStream(
    descriptor: MethodDescriptor,
    options: CallOptions,
    loadBalancer: LoadBalancer
  ) async -> MakeStreamResult {
    guard let picked = loadBalancer.pickSubchannel() else {
      return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
    }

    var mergedOptions = options
    mergedOptions.formUnion(with: self.configuration(forMethod: descriptor))

    let result: MakeStreamResult
    do {
      let stream = try await picked.makeStream(descriptor: descriptor, options: mergedOptions)
      result = .created(stream)
    } catch {
      result = .tryAgain(error)
    }
    return result
  }
  /// Waits in the channel's queue until a load balancer is provided.
  ///
  /// - Parameter waitForReady: Whether the RPC should wait for the channel to become ready.
  /// - Returns: The load balancer the queued continuation was resumed with.
  /// - Throws: `CancellationError` if the task is cancelled, an `RPCError` if the channel is
  ///     shutdown or shutting down, or whatever error the queued continuation is resumed with.
  private func enqueue(waitForReady: Bool) async throws -> LoadBalancer {
    let entryID = QueueEntryID()

    return try await withTaskCancellationHandler {
      try await withCheckedThrowingContinuation { continuation in
        guard !Task.isCancelled else {
          continuation.resume(throwing: CancellationError())
          return
        }

        let wasEnqueued = self.state.withLockedValue {
          $0.enqueue(continuation: continuation, waitForReady: waitForReady, id: entryID)
        }

        if wasEnqueued { return }

        // Not enqueued because the channel is shutdown or shutting down.
        continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is shutdown"))
      }
    } onCancel: {
      // Only resume the continuation if it's still in the queue, otherwise it has been (or will
      // be) resumed by whoever removed it.
      let pending = self.state.withLockedValue { $0.dequeueContinuation(id: entryID) }
      pending?.resume(throwing: CancellationError())
    }
  }
  300. }
  301. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  302. extension GRPCChannel {
  /// Handles a request to close the channel.
  ///
  /// - Parameter group: The task group running the channel's child tasks.
  private func handleClose(in group: inout DiscardingTaskGroup) {
    switch self.state.withLockedValue({ $0.close() }) {
    case .close(let current, let next, let resolver):
      // The channel was running: stop the resolver and close the load balancers. The streams
      // are finished later, once the load balancers have reported that they have shut down.
      resolver?.cancel()
      current.close()
      next?.close()
      self._connectivityState.continuation.yield(.shutdown)

    case .cancelAll(let continuations):
      // The channel never had a load balancer so it is now fully stopped.
      for continuation in continuations {
        continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
      }
      self._connectivityState.continuation.yield(.shutdown)
      group.cancelAll()

      // There are no load balancers to wait for, so nothing else will finish these streams.
      // Cancelling the group only cancels its child tasks; without finishing the input stream
      // the event loop in `connect()` would wait for input indefinitely.
      self._connectivityState.continuation.finish()
      self.input.continuation.finish()

    case .none:
      // Already closing or closed.
      ()
    }
  }
  /// Applies a name resolution result to the channel.
  ///
  /// Results without endpoints are ignored. A result carrying a failed service config closes
  /// the channel. If the result has no service config then the default one is used.
  ///
  /// - Parameters:
  ///   - result: The result of name resolution.
  ///   - group: The task group to run any new load balancer in.
  private func handleNameResolutionResult(
    _ result: NameResolutionResult,
    in group: inout DiscardingTaskGroup
  ) {
    // Ignore empty endpoint lists.
    guard !result.endpoints.isEmpty else { return }

    let serviceConfig: ServiceConfig
    switch result.serviceConfig ?? .success(self.defaultServiceConfig) {
    case .failure:
      self.close()
      return
    case .success(let config):
      serviceConfig = config
    }

    // Update per RPC configuration.
    let methodConfigs = _MethodConfigs(serviceConfig: serviceConfig)
    self._methodConfig.withLockedValue { current in current = methodConfigs }

    let throttle = serviceConfig.retryThrottling.map { policy in RetryThrottle(policy: policy) }
    self._retryThrottle.withLockedValue { current in current = throttle }

    // Update the load balancer.
    self.updateLoadBalancer(
      serviceConfig: serviceConfig,
      endpoints: result.endpoints,
      in: &group
    )
  }
  /// Updates the load balancer to use the first applicable policy from the service config.
  ///
  /// Only round-robin is currently supported: a pick-first policy, or having no applicable
  /// policy at all, traps.
  ///
  /// - Parameters:
  ///   - serviceConfig: The service config containing the load balancing policies.
  ///   - endpoints: The endpoints to provide to the load balancer.
  ///   - group: The task group to run any new load balancer in.
  private func updateLoadBalancer(
    serviceConfig: ServiceConfig,
    endpoints: [Endpoint],
    in group: inout DiscardingTaskGroup
  ) {
    // Pick the first applicable policy, else fallback to pick-first.
    for policy in serviceConfig.loadBalancingConfig {
      guard policy.roundRobin != nil else {
        if policy.pickFirst != nil {
          fatalError("TODO: use pick-first when supported")
        }
        // Policy isn't known, ignore it.
        continue
      }

      let change: GRPCChannel.StateMachine.OnChangeLoadBalancer
      change = self.state.withLockedValue { state in
        state.changeLoadBalancerKind(to: .roundRobin) {
          let roundRobin = RoundRobinLoadBalancer(
            connector: self.connector,
            backoff: self.backoff,
            defaultCompression: self.defaultCompression,
            enabledCompression: self.enabledCompression
          )
          return .roundRobin(roundRobin)
        }
      }

      self.handleLoadBalancerChange(change, endpoints: endpoints, in: &group)
      return
    }

    // No suitable config was found, fallback to pick-first.
    fatalError("TODO: use pick-first when supported")
  }
  /// Acts on a change of load balancer decided by the state machine.
  ///
  /// - Parameters:
  ///   - update: The action to take.
  ///   - endpoints: The endpoints to provide to the load balancer. Must not be empty.
  ///   - group: The task group to run a new load balancer and its event forwarding in.
  private func handleLoadBalancerChange(
    _ update: StateMachine.OnChangeLoadBalancer,
    endpoints: [Endpoint],
    in group: inout DiscardingTaskGroup
  ) {
    assert(!endpoints.isEmpty, "endpoints must be non-empty")

    // Provides the endpoints to a load balancer: round-robin takes them all, pick-first only
    // takes the first.
    func provideEndpoints(to loadBalancer: LoadBalancer) {
      switch loadBalancer {
      case .roundRobin(let roundRobin):
        roundRobin.updateAddresses(endpoints)
      case .pickFirst(let pickFirst):
        pickFirst.updateEndpoint(endpoints.first!)
      }
    }

    switch update {
    case .none:
      ()

    case .updateLoadBalancer(let existing):
      provideEndpoints(to: existing)

    case .runLoadBalancer(let new, let old):
      old?.close()
      provideEndpoints(to: new)

      // Run the load balancer and forward its events to the channel's input stream.
      group.addTask {
        await new.run()
      }

      group.addTask {
        for await event in new.events {
          self.input.continuation.yield(.handleLoadBalancerEvent(event, new.id))
        }
      }
    }
  }
  /// Handles an event emitted by a load balancer.
  ///
  /// - Parameters:
  ///   - event: The event to handle.
  ///   - loadBalancerID: The ID of the load balancer which emitted the event.
  ///   - group: The task group to run any new load balancer in.
  ///   - iterator: The iterator to pull a name resolution result from, if resolution is
  ///       requested. It is `nil` when the channel doesn't pull results from the resolver.
  private func handleLoadBalancerEvent(
    _ event: LoadBalancerEvent,
    loadBalancerID: LoadBalancerID,
    in group: inout DiscardingTaskGroup,
    iterator: inout RPCAsyncSequence<NameResolutionResult>.AsyncIterator?
  ) async {
    switch event {
    case .requiresNameResolution:
      await self.resolve(iterator: &iterator, in: &group)

    case .connectivityStateChanged(let connectivityState):
      let actions = self.state.withLockedValue {
        $0.loadBalancerStateChanged(to: connectivityState, id: loadBalancerID)
      }

      if let published = actions.publishState {
        self._connectivityState.continuation.yield(published)
      }

      if let toClose = actions.close {
        toClose.close()
      }

      if let resumable = actions.resumeContinuations {
        let result = resumable.result
        for continuation in resumable.continuations {
          continuation.resume(with: result)
        }
      }

      if actions.finish {
        // Fully closed: finishing the input stream ends the event loop in `connect()`.
        self._connectivityState.continuation.finish()
        self.input.continuation.finish()
      }
    }
  }
  /// Pulls the next name resolution result from the iterator, if there is one, and applies it.
  ///
  /// The channel is closed if the iterator finishes or throws. Nothing happens when `iterator`
  /// is `nil`.
  ///
  /// - Parameters:
  ///   - iterator: The iterator to pull the next result from. It is advanced in place.
  ///   - group: The task group to run any new load balancer in.
  private func resolve(
    iterator: inout RPCAsyncSequence<NameResolutionResult>.AsyncIterator?,
    in group: inout DiscardingTaskGroup
  ) async {
    guard var resolverIterator = iterator else { return }

    // 'next()' mutates the unwrapped copy. Store it back so that the caller's iterator is
    // advanced too, otherwise an iterator with value semantics would resume from a stale
    // position the next time name resolution is required.
    defer { iterator = resolverIterator }

    do {
      if let result = try await resolverIterator.next() {
        self.handleNameResolutionResult(result, in: &group)
      } else {
        self.close()
      }
    } catch {
      self.close()
    }
  }
  451. }
  452. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  453. extension GRPCChannel {
  454. struct StateMachine {
  455. enum State {
  456. case notRunning(NotRunning)
  457. case running(Running)
  458. case stopping(Stopping)
  459. case stopped
  460. case _modifying
  461. struct NotRunning {
  462. /// Queue of requests waiting for a load-balancer.
  463. var queue: RequestQueue
  464. /// A handle to the name resolver task.
  465. var nameResolverHandle: CancellableTaskHandle?
  466. init() {
  467. self.queue = RequestQueue()
  468. }
  469. }
  470. struct Running {
  471. /// The connectivity state of the channel.
  472. var connectivityState: ConnectivityState
  473. /// The load-balancer currently in use.
  474. var current: LoadBalancer
  475. /// The next load-balancer to use. This will be promoted to `current` when it enters the
  476. /// ready state.
  477. var next: LoadBalancer?
  478. /// Previously created load-balancers which are in the process of shutting down.
  479. var past: [LoadBalancerID: LoadBalancer]
  480. /// Queue of requests wait for a load-balancer.
  481. var queue: RequestQueue
  482. /// A handle to the name resolver task.
  483. var nameResolverHandle: CancellableTaskHandle?
  484. init(
  485. from state: NotRunning,
  486. loadBalancer: LoadBalancer
  487. ) {
  488. self.connectivityState = .idle
  489. self.current = loadBalancer
  490. self.next = nil
  491. self.past = [:]
  492. self.queue = state.queue
  493. self.nameResolverHandle = state.nameResolverHandle
  494. }
  495. }
  496. struct Stopping {
  497. /// Previously created load-balancers which are in the process of shutting down.
  498. var past: [LoadBalancerID: LoadBalancer]
  499. init(from state: Running) {
  500. self.past = state.past
  501. }
  502. init(loadBalancers: [LoadBalancerID: LoadBalancer]) {
  503. self.past = loadBalancers
  504. }
  505. }
  506. }
  507. /// The current state.
  508. private var state: State
  509. /// Whether the channel is running.
  510. private var running: Bool
  511. init() {
  512. self.state = .notRunning(State.NotRunning())
  513. self.running = false
  514. }
  515. }
  516. }
  517. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  518. extension GRPCChannel.StateMachine {
  /// Marks the channel as running.
  ///
  /// - Precondition: The channel must not have been started already.
  mutating func start() {
    let alreadyRunning = self.running
    precondition(!alreadyRunning, "channel must only be started once")
    self.running = true
  }
  /// Stores the handle of the task running the name resolver.
  ///
  /// This traps unless the state machine is in the not-running state.
  ///
  /// - Parameter handle: A handle to the name resolver task.
  mutating func setNameResolverTaskHandle(_ handle: CancellableTaskHandle) {
    guard case .notRunning(var notRunning) = self.state else {
      fatalError("Invalid state")
    }

    notRunning.nameResolverHandle = handle
    self.state = .notRunning(notRunning)
  }
  /// The kinds of load balancer the channel can use.
  enum LoadBalancerKind {
    case roundRobin
    case pickFirst

    /// Returns whether the given load balancer is of this kind.
    func matches(loadBalancer: LoadBalancer) -> Bool {
      switch loadBalancer {
      case .roundRobin:
        return self == .roundRobin
      case .pickFirst:
        return self == .pickFirst
      }
    }
  }
  /// The action to take after a request to change the kind of load balancer.
  enum OnChangeLoadBalancer {
    /// Start running the first load balancer and stop the second, if there is one.
    case runLoadBalancer(LoadBalancer, stop: LoadBalancer?)
    /// The given load balancer is already of the requested kind, update it in place.
    case updateLoadBalancer(LoadBalancer)
    /// Do nothing.
    case none
  }
  /// Requests that the channel uses a load balancer of the given kind.
  ///
  /// - Parameters:
  ///   - newLoadBalancerKind: The kind of load balancer to use.
  ///   - makeLoadBalancer: A closure which creates a load balancer of that kind. It is only
  ///       called if a new load balancer is required.
  /// - Returns: The action the caller should take.
  mutating func changeLoadBalancerKind(
    to newLoadBalancerKind: LoadBalancerKind,
    _ makeLoadBalancer: () -> LoadBalancer
  ) -> OnChangeLoadBalancer {
    switch self.state {
    case ._modifying:
      fatalError("Invalid state")

    case .stopping, .stopped:
      return .none

    case .notRunning(let notRunning):
      // This is the first load balancer: it becomes 'current' immediately.
      let first = makeLoadBalancer()
      self.state = .running(State.Running(from: notRunning, loadBalancer: first))
      return .runLoadBalancer(first, stop: nil)

    case .running(var running):
      self.state = ._modifying
      defer { self.state = .running(running) }

      // The most recently requested load balancer is the one the new kind is compared with:
      // that's 'next' if there is one, and 'current' otherwise.
      let latest = running.next ?? running.current
      if newLoadBalancerKind.matches(loadBalancer: latest) {
        return .updateLoadBalancer(latest)
      }

      // A load balancer of the new kind is required; it will replace 'current' when it
      // becomes ready. If there was already a 'next' then it didn't become ready in time:
      // it is replaced and should be stopped by the caller.
      let replacement = makeLoadBalancer()
      let superseded = running.next
      running.next = replacement
      if let superseded {
        running.past[superseded.id] = superseded
      }
      return .runLoadBalancer(replacement, stop: superseded)
    }
  }
  /// The actions to take after the connectivity state of a load balancer changed.
  struct ConnectivityStateChangeActions {
    /// A load balancer to close, if any.
    var close: LoadBalancer? = nil
    /// A connectivity state to publish, if any.
    var publishState: ConnectivityState? = nil
    /// Continuations to resume along with the result to resume them with, if any.
    var resumeContinuations: ResumableContinuations? = nil
    /// Whether the channel's streams should be finished.
    var finish: Bool = false

    /// A group of continuations and the result they should all be resumed with.
    struct ResumableContinuations {
      var continuations: [CheckedContinuation<LoadBalancer, Error>]
      var result: Result<LoadBalancer, Error>
    }
  }
  /// Updates the state machine following a change in connectivity state of a load balancer.
  ///
  /// - Parameters:
  ///   - connectivityState: The new connectivity state of the load balancer.
  ///   - id: The ID of the load balancer whose state changed.
  /// - Returns: The actions the caller should take.
  mutating func loadBalancerStateChanged(
    to connectivityState: ConnectivityState,
    id: LoadBalancerID
  ) -> ConnectivityStateChangeActions {
    var actions = ConnectivityStateChangeActions()

    switch self.state {
    case .running(var state):
      self.state = ._modifying

      if id == state.current.id {
        // Only act if the state actually changed.
        if state.connectivityState != connectivityState {
          state.connectivityState = connectivityState
          actions.publishState = connectivityState

          switch connectivityState {
          case .ready:
            // Current load-balancer became ready; resume all continuations in the queue.
            let continuations = state.queue.removeAll()
            actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
              continuations: continuations,
              result: .success(state.current)
            )

          case .transientFailure, .shutdown:  // shutdown includes shutting down
            // Current load-balancer failed. Remove all the 'fast-failing' continuations in the
            // queue, these are RPCs which set the 'wait for ready' option to false. The rest of
            // the entries in the queue will wait for a load-balancer to become ready.
            let continuations = state.queue.removeFastFailingEntries()
            actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
              continuations: continuations,
              result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
            )

          case .idle, .connecting:
            ()  // Ignore.
          }
        }
      } else if let next = state.next, next.id == id {
        // State change came from the next LB, if it's now ready promote it to be the current.
        switch connectivityState {
        case .ready:
          // Next load-balancer is ready, promote it to current.
          let previous = state.current
          state.past[previous.id] = previous
          state.current = next
          state.next = nil

          actions.close = previous

          if state.connectivityState != connectivityState {
            actions.publishState = connectivityState
          }
          // The stored state must track the state of the new 'current' load-balancer. It's
          // already ready so won't report 'ready' again, and stream creation is decided using
          // the stored state.
          state.connectivityState = connectivityState

          actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
            continuations: state.queue.removeAll(),
            result: .success(next)
          )

        case .idle, .connecting, .transientFailure, .shutdown:
          ()
        }
      } else if connectivityState == .shutdown {
        // The event didn't come from the current or next load-balancer. If it's a past
        // load-balancer which has now shut down then stop tracking it: it won't report being
        // shut down again so must not be waited for when the channel is closed.
        state.past.removeValue(forKey: id)
      }

      self.state = .running(state)

    case .stopping(var state):
      self.state = ._modifying

      // Remove the load balancer if it's now shutdown.
      switch connectivityState {
      case .shutdown:
        state.past.removeValue(forKey: id)
      case .idle, .connecting, .ready, .transientFailure:
        ()
      }

      // If that was the last load-balancer then finish the input streams so that the channel
      // eventually finishes.
      if state.past.isEmpty {
        actions.finish = true
        self.state = .stopped
      } else {
        self.state = .stopping(state)
      }

    case .notRunning, .stopped:
      ()

    case ._modifying:
      fatalError("Invalid state")
    }

    return actions
  }
  /// The action to take when a stream has been requested.
  enum OnMakeStream {
    /// Make the stream using the given load-balancer.
    case useLoadBalancer(LoadBalancer)

    /// Wait in the queue until a load-balancer becomes ready.
    case joinQueue

    /// The channel isn't in a suitable state, fail the request.
    case failRPC
  }
  /// Determines how a request for a new stream should be handled.
  ///
  /// - Parameter waitForReady: Whether the RPC should wait for the channel to become ready
  ///     rather than failing when the channel is in the transient failure state.
  /// - Returns: The action the caller should take.
  func makeStream(waitForReady: Bool) -> OnMakeStream {
    switch self.state {
    case ._modifying:
      fatalError("Invalid state")

    case .stopping, .stopped:
      return .failRPC

    case .notRunning:
      return .joinQueue

    case .running(let running):
      switch running.connectivityState {
      case .ready:
        return .useLoadBalancer(running.current)
      case .idle, .connecting:
        return .joinQueue
      case .shutdown:
        return .failRPC
      case .transientFailure:
        if waitForReady {
          return .joinQueue
        } else {
          return .failRPC
        }
      }
    }
  }
  /// Adds a continuation to the queue of requests waiting for a load-balancer.
  ///
  /// - Parameters:
  ///   - continuation: The continuation to enqueue.
  ///   - waitForReady: Whether the request waits for the channel to become ready.
  ///   - id: An identifier for the entry, used to remove it from the queue later.
  /// - Returns: `true` if the continuation was enqueued, or `false` if the channel is stopping
  ///     or stopped, in which case the caller remains responsible for resuming the continuation.
  mutating func enqueue(
    continuation: CheckedContinuation<LoadBalancer, Error>,
    waitForReady: Bool,
    id: QueueEntryID
  ) -> Bool {
    switch self.state {
    case ._modifying:
      fatalError("Invalid state")

    case .stopping, .stopped:
      return false

    case .notRunning(var notRunning):
      self.state = ._modifying
      notRunning.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
      self.state = .notRunning(notRunning)

    case .running(var running):
      self.state = ._modifying
      running.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
      self.state = .running(running)
    }

    return true
  }
  /// Removes a continuation from the queue of requests waiting for a load-balancer.
  ///
  /// - Parameter id: The identifier the continuation was enqueued with.
  /// - Returns: Whatever the queue returns for the removed entry, or `nil` if the channel is
  ///     stopping or stopped.
  mutating func dequeueContinuation(
    id: QueueEntryID
  ) -> CheckedContinuation<LoadBalancer, Error>? {
    let removed: CheckedContinuation<LoadBalancer, Error>?

    switch self.state {
    case ._modifying:
      fatalError("Invalid state")

    case .stopping, .stopped:
      removed = nil

    case .notRunning(var notRunning):
      self.state = ._modifying
      removed = notRunning.queue.removeEntry(withID: id)
      self.state = .notRunning(notRunning)

    case .running(var running):
      self.state = ._modifying
      removed = running.queue.removeEntry(withID: id)
      self.state = .running(running)
    }

    return removed
  }
  /// The action to take when the channel is asked to close.
  enum OnClose {
    /// Do nothing.
    case none
    /// Resume the given continuations; the channel had no load-balancer to close.
    case cancelAll([RequestQueue.Continuation])
    /// Close the current load-balancer and the next one, if there is one, and cancel the
    /// name resolver task, if there is one.
    case close(LoadBalancer, LoadBalancer?, CancellableTaskHandle?)
  }
  /// Closes the channel.
  ///
  /// - Returns: The action the caller should take.
  mutating func close() -> OnClose {
    let onClose: OnClose

    switch self.state {
    case .notRunning(var state):
      self.state = .stopped
      onClose = .cancelAll(state.queue.removeAll())

    case .running(var state):
      // Requests still waiting for a load-balancer can never be served once the channel starts
      // stopping, and they can't be dequeued on cancellation either as the queue isn't carried
      // over to the stopping state. Fail them now rather than leaving them suspended forever.
      // Resuming a continuation only schedules the waiting task so it is safe to do here, even
      // if the caller holds a lock.
      for continuation in state.queue.removeAll() {
        continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
      }

      onClose = .close(state.current, state.next, state.nameResolverHandle)

      // Every load-balancer must shut down before the channel is fully stopped.
      state.past[state.current.id] = state.current
      if let next = state.next {
        state.past[next.id] = next
      }

      self.state = .stopping(State.Stopping(loadBalancers: state.past))

    case .stopping, .stopped:
      onClose = .none

    case ._modifying:
      fatalError("Invalid state")
    }

    return onClose
  }
  785. }