/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. internal import Atomics
  17. internal import DequeModule
  18. package import GRPCCore
  19. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  20. package struct GRPCChannel: ClientTransport {
  21. private enum Input: Sendable {
  22. /// Close the channel, if possible.
  23. case close
  24. /// Handle the result of a name resolution.
  25. case handleResolutionResult(NameResolutionResult)
  26. /// Handle the event from the underlying connection object.
  27. case handleLoadBalancerEvent(LoadBalancerEvent, LoadBalancerID)
  28. }
  29. /// Events which can happen to the channel.
  30. private let _connectivityState:
  31. (
  32. stream: AsyncStream<ConnectivityState>,
  33. continuation: AsyncStream<ConnectivityState>.Continuation
  34. )
  35. /// Inputs which this channel should react to.
  36. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  37. /// A resolver providing resolved names to the channel.
  38. private let resolver: NameResolver
  39. /// The state of the channel.
  40. private let state: LockedValueBox<StateMachine>
  41. /// The maximum number of times to attempt to create a stream per RPC.
  42. ///
  43. /// This is the value used by other gRPC implementations.
  44. private static let maxStreamCreationAttempts = 5
  45. /// A factory for connections.
  46. private let connector: any HTTP2Connector
  47. /// The connection backoff configuration used by the subchannel when establishing a connection.
  48. private let backoff: ConnectionBackoff
  49. /// The default compression algorithm used for requests.
  50. private let defaultCompression: CompressionAlgorithm
  51. /// The set of enabled compression algorithms.
  52. private let enabledCompression: CompressionAlgorithmSet
  53. /// The default service config to use.
  54. ///
  55. /// Used when the resolver doesn't provide one.
  56. private let defaultServiceConfig: ServiceConfig
  57. // These are both read frequently and updated infrequently so may be a bottleneck.
  58. private let _methodConfig: LockedValueBox<MethodConfigs>
  59. private let _retryThrottle: LockedValueBox<RetryThrottle?>
  60. package init(
  61. resolver: NameResolver,
  62. connector: any HTTP2Connector,
  63. config: Config,
  64. defaultServiceConfig: ServiceConfig
  65. ) {
  66. self.resolver = resolver
  67. self.state = LockedValueBox(StateMachine())
  68. self._connectivityState = AsyncStream.makeStream()
  69. self.input = AsyncStream.makeStream()
  70. self.connector = connector
  71. self.backoff = ConnectionBackoff(
  72. initial: config.backoff.initial,
  73. max: config.backoff.max,
  74. multiplier: config.backoff.multiplier,
  75. jitter: config.backoff.jitter
  76. )
  77. self.defaultCompression = config.compression.algorithm
  78. self.enabledCompression = config.compression.enabledAlgorithms
  79. self.defaultServiceConfig = defaultServiceConfig
  80. let throttle = defaultServiceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
  81. self._retryThrottle = LockedValueBox(throttle)
  82. let methodConfig = MethodConfigs(serviceConfig: defaultServiceConfig)
  83. self._methodConfig = LockedValueBox(methodConfig)
  84. }
  85. /// The connectivity state of the channel.
  86. package var connectivityState: AsyncStream<ConnectivityState> {
  87. self._connectivityState.stream
  88. }
  89. /// Returns a throttle which gRPC uses to determine whether retries can be executed.
  90. package var retryThrottle: RetryThrottle? {
  91. self._retryThrottle.withLockedValue { $0 }
  92. }
  93. /// Returns the configuration for a given method.
  94. ///
  95. /// - Parameter descriptor: The method to lookup configuration for.
  96. /// - Returns: Configuration for the method, if it exists.
  97. package func configuration(forMethod descriptor: MethodDescriptor) -> MethodConfig? {
  98. self._methodConfig.withLockedValue { $0[descriptor] }
  99. }
  100. /// Establishes and maintains a connection to the remote destination.
  101. package func connect() async {
  102. self.state.withLockedValue { $0.start() }
  103. self._connectivityState.continuation.yield(.idle)
  104. await withDiscardingTaskGroup { group in
  105. var iterator: Optional<RPCAsyncSequence<NameResolutionResult, any Error>.AsyncIterator>
  106. // The resolver can either push or pull values. If it pushes values the channel should
  107. // listen for new results. Otherwise the channel will pull values as and when necessary.
  108. switch self.resolver.updateMode.value {
  109. case .push:
  110. iterator = nil
  111. let handle = group.addCancellableTask {
  112. do {
  113. for try await result in self.resolver.names {
  114. self.input.continuation.yield(.handleResolutionResult(result))
  115. }
  116. self.close()
  117. } catch {
  118. self.close()
  119. }
  120. }
  121. // When the channel is closed gracefully, the task group running the load balancer mustn't
  122. // be cancelled (otherwise in-flight RPCs would fail), but the push based resolver will
  123. // continue indefinitely. Store its handle and cancel it on close when closing the channel.
  124. self.state.withLockedValue { state in
  125. state.setNameResolverTaskHandle(handle)
  126. }
  127. case .pull:
  128. iterator = self.resolver.names.makeAsyncIterator()
  129. await self.resolve(iterator: &iterator, in: &group)
  130. }
  131. // Resolver is setup, start handling events.
  132. for await input in self.input.stream {
  133. switch input {
  134. case .close:
  135. self.handleClose(in: &group)
  136. case .handleResolutionResult(let result):
  137. self.handleNameResolutionResult(result, in: &group)
  138. case .handleLoadBalancerEvent(let event, let id):
  139. await self.handleLoadBalancerEvent(
  140. event,
  141. loadBalancerID: id,
  142. in: &group,
  143. iterator: &iterator
  144. )
  145. }
  146. }
  147. }
  148. if Task.isCancelled {
  149. self._connectivityState.continuation.finish()
  150. }
  151. }
  152. /// Signal to the transport that no new streams may be created and that connections should be
  153. /// closed when all streams are closed.
  154. package func close() {
  155. self.input.continuation.yield(.close)
  156. }
  157. /// Opens a stream using the transport, and uses it as input into a user-provided closure.
  158. package func withStream<T: Sendable>(
  159. descriptor: MethodDescriptor,
  160. options: CallOptions,
  161. _ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
  162. ) async throws -> T {
  163. // Merge options from the call with those from the service config.
  164. let methodConfig = self.configuration(forMethod: descriptor)
  165. var options = options
  166. options.formUnion(with: methodConfig)
  167. for attempt in 1 ... Self.maxStreamCreationAttempts {
  168. switch await self.makeStream(descriptor: descriptor, options: options) {
  169. case .created(let stream):
  170. return try await stream.execute { inbound, outbound in
  171. let rpcStream = RPCStream(
  172. descriptor: stream.descriptor,
  173. inbound: RPCAsyncSequence<RPCResponsePart, any Error>(wrapping: inbound),
  174. outbound: RPCWriter.Closable(wrapping: outbound)
  175. )
  176. return try await closure(rpcStream)
  177. }
  178. case .tryAgain(let error):
  179. if error is CancellationError || attempt == Self.maxStreamCreationAttempts {
  180. throw error
  181. } else {
  182. continue
  183. }
  184. case .stopTrying(let error):
  185. throw error
  186. }
  187. }
  188. fatalError("Internal inconsistency")
  189. }
  190. }
  191. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  192. extension GRPCChannel {
  193. package struct Config: Sendable {
  194. /// Configuration for HTTP/2 connections.
  195. package var http2: HTTP2ClientTransport.Config.HTTP2
  196. /// Configuration for backoff used when establishing a connection.
  197. package var backoff: HTTP2ClientTransport.Config.Backoff
  198. /// Configuration for connection management.
  199. package var connection: HTTP2ClientTransport.Config.Connection
  200. /// Compression configuration.
  201. package var compression: HTTP2ClientTransport.Config.Compression
  202. package init(
  203. http2: HTTP2ClientTransport.Config.HTTP2,
  204. backoff: HTTP2ClientTransport.Config.Backoff,
  205. connection: HTTP2ClientTransport.Config.Connection,
  206. compression: HTTP2ClientTransport.Config.Compression
  207. ) {
  208. self.http2 = http2
  209. self.backoff = backoff
  210. self.connection = connection
  211. self.compression = compression
  212. }
  213. }
  214. }
  215. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  216. extension GRPCChannel {
  /// The outcome of a single attempt to create a stream.
  enum MakeStreamResult {
    /// The attempt succeeded; the associated stream should be used.
    case created(Connection.Stream)
    /// The attempt failed with the associated error; another attempt may be made if possible.
    case tryAgain(any Error)
    /// The attempt failed with an unrecoverable error (e.g. the channel is closed); the RPC
    /// should be failed without retrying.
    case stopTrying(any Error)
  }
  /// Attempts to create a stream, waiting in the queue for a load balancer first if the state
  /// machine requires it.
  ///
  /// - Parameters:
  ///   - descriptor: The method the stream is for.
  ///   - options: Call options; `waitForReady` is treated as `true` when unset.
  /// - Returns: The outcome of the attempt.
  private func makeStream(
    descriptor: MethodDescriptor,
    options: CallOptions
  ) async -> MakeStreamResult {
    let waitForReady = options.waitForReady ?? true
    let decision = self.state.withLockedValue { $0.makeStream(waitForReady: waitForReady) }

    // Work out which load balancer to use, bailing out early if there can't be one.
    let loadBalancer: LoadBalancer
    switch decision {
    case .failRPC:
      return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))

    case .useLoadBalancer(let ready):
      loadBalancer = ready

    case .joinQueue:
      do {
        loadBalancer = try await self.enqueue(waitForReady: waitForReady)
      } catch {
        // All errors from enqueue are non-recoverable: either the channel is shutting down or
        // the request has been cancelled.
        return .stopTrying(error)
      }
    }

    return await self.makeStream(
      descriptor: descriptor,
      options: options,
      loadBalancer: loadBalancer
    )
  }
  /// Attempts to create a stream on a subchannel picked by the given load balancer.
  ///
  /// - Parameters:
  ///   - descriptor: The method the stream is for.
  ///   - options: Call options; merged with the method's configuration before use.
  ///   - loadBalancer: The load balancer to pick a subchannel from.
  /// - Returns: `.created` on success, otherwise `.tryAgain` carrying the failure. No
  ///   subchannel being available is reported as an `unavailable` error.
  private func makeStream(
    descriptor: MethodDescriptor,
    options: CallOptions,
    loadBalancer: LoadBalancer
  ) async -> MakeStreamResult {
    guard let subchannel = loadBalancer.pickSubchannel() else {
      return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
    }

    var effectiveOptions = options
    effectiveOptions.formUnion(with: self.configuration(forMethod: descriptor))

    do {
      return .created(
        try await subchannel.makeStream(descriptor: descriptor, options: effectiveOptions)
      )
    } catch {
      return .tryAgain(error)
    }
  }
  /// Suspends until the state machine resumes this request's queue entry with a load balancer.
  ///
  /// - Parameter waitForReady: Recorded with the queue entry in the state machine.
  /// - Returns: The load balancer the queued continuation was resumed with.
  /// - Throws: `CancellationError` if the task is cancelled, an `unavailable` `RPCError` if the
  ///   state machine refuses the entry, or whatever error the continuation is resumed with.
  private func enqueue(waitForReady: Bool) async throws -> LoadBalancer {
    let entryID = QueueEntryID()

    return try await withTaskCancellationHandler {
      try await withCheckedThrowingContinuation { continuation in
        guard !Task.isCancelled else {
          continuation.resume(throwing: CancellationError())
          return
        }

        let wasEnqueued = self.state.withLockedValue {
          $0.enqueue(continuation: continuation, waitForReady: waitForReady, id: entryID)
        }
        if wasEnqueued { return }

        // Not enqueued because the channel is shutdown or shutting down.
        continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is shutdown"))
      }
    } onCancel: {
      // Pull the entry back out of the queue (if it is still there) and fail it.
      let pending = self.state.withLockedValue { $0.dequeueContinuation(id: entryID) }
      pending?.resume(throwing: CancellationError())
    }
  }
  296. }
  297. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  298. extension GRPCChannel {
  /// Acts on a request to close the channel.
  ///
  /// The state machine decides what closing means:
  /// - `.close`: load balancers exist. The resolver task is cancelled, the current (and next,
  ///   if any) load balancer is closed, queued requests are failed and `.shutdown` is
  ///   published. The streams are finished later, once the state machine reports `finish`
  ///   from a load balancer state change.
  /// - `.cancelAll`: no load balancer was ever created. Queued requests are failed,
  ///   `.shutdown` is published, the task group is cancelled and both streams are finished.
  /// - `.none`: nothing to do.
  ///
  /// - Parameter group: The task group running the channel's child tasks.
  private func handleClose(in group: inout DiscardingTaskGroup) {
    switch self.state.withLockedValue({ $0.close() }) {
    case .close(let current, let next, let resolver, let continuations):
      resolver?.cancel()
      current.close()
      next?.close()
      for continuation in continuations {
        continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
      }
      self._connectivityState.continuation.yield(.shutdown)

    case .cancelAll(let continuations):
      for continuation in continuations {
        continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
      }
      self._connectivityState.continuation.yield(.shutdown)
      group.cancelAll()

      // No load balancer exists on this path, so no load balancer event will ever arrive to
      // report that the channel has finished. Cancelling the group only cancels child tasks,
      // not the loop consuming `input`, so finish the streams here; otherwise `connect()`
      // would wait on the input stream forever.
      self._connectivityState.continuation.finish()
      self.input.continuation.finish()

    case .none:
      ()
    }
  }
  /// Applies a name resolution result to the channel.
  ///
  /// Results with no endpoints are dropped. A result whose service config failed closes the
  /// channel. Otherwise the per-method configuration and retry throttle are replaced and the
  /// load balancer is updated with the new endpoints.
  ///
  /// - Parameters:
  ///   - result: The resolution result; the default service config is used if it has none.
  ///   - group: The task group in which any new load balancer is run.
  private func handleNameResolutionResult(
    _ result: NameResolutionResult,
    in group: inout DiscardingTaskGroup
  ) {
    // Ignore empty endpoint lists.
    guard !result.endpoints.isEmpty else { return }

    let serviceConfig: ServiceConfig
    switch result.serviceConfig ?? .success(self.defaultServiceConfig) {
    case .success(let resolved):
      serviceConfig = resolved
    case .failure:
      self.close()
      return
    }

    // Update per RPC configuration.
    let methodConfigs = MethodConfigs(serviceConfig: serviceConfig)
    self._methodConfig.withLockedValue { $0 = methodConfigs }

    let throttle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
    self._retryThrottle.withLockedValue { $0 = throttle }

    // Update the load balancer.
    self.updateLoadBalancer(serviceConfig: serviceConfig, endpoints: result.endpoints, in: &group)
  }
  /// The load-balancing policies this channel knows how to run.
  enum SupportedLoadBalancerConfig {
    case roundRobin
    case pickFirst(ServiceConfig.LoadBalancingConfig.PickFirst)

    /// Creates a supported config from a service config entry.
    ///
    /// Pick-first takes precedence if the entry carries both; returns `nil` if it carries
    /// neither a pick-first nor a round-robin config.
    init?(_ config: ServiceConfig.LoadBalancingConfig) {
      if let pickFirst = config.pickFirst {
        self = .pickFirst(pickFirst)
        return
      }

      guard config.roundRobin != nil else { return nil }
      self = .roundRobin
    }

    /// Returns whether the given load balancer is of the kind this config describes.
    func matches(loadBalancer: LoadBalancer) -> Bool {
      switch (self, loadBalancer) {
      case (.roundRobin, .roundRobin),
        (.pickFirst, .pickFirst):
        return true

      case (.roundRobin, .pickFirst),
        (.pickFirst, .roundRobin):
        return false
      }
    }
  }
  /// Selects a load-balancing policy from the service config and applies it along with the
  /// given endpoints.
  ///
  /// The first supported entry in the service config's load-balancing list is used; pick-first
  /// (without shuffling) is the fallback. The state machine then decides whether a new load
  /// balancer has to be created or an existing one can be updated.
  ///
  /// - Parameters:
  ///   - serviceConfig: The service config to read the load-balancing policy from.
  ///   - endpoints: The resolved endpoints; must be non-empty.
  ///   - group: The task group in which any new load balancer is run.
  private func updateLoadBalancer(
    serviceConfig: ServiceConfig,
    endpoints: [Endpoint],
    in group: inout DiscardingTaskGroup
  ) {
    assert(!endpoints.isEmpty, "endpoints must be non-empty")

    // Find the first supported config, falling back to pick-first if no other config applies.
    let firstSupported = serviceConfig.loadBalancingConfig
      .compactMap { SupportedLoadBalancerConfig($0) }
      .first
    let loadBalancerConfig = firstSupported ?? .pickFirst(.init(shuffleAddressList: false))

    // Decide how a load balancer of the chosen kind would be built; the state machine only
    // invokes this if it actually needs a new one.
    var endpoints = endpoints
    let makeLoadBalancer: () -> LoadBalancer

    switch loadBalancerConfig {
    case .pickFirst(let pickFirst):
      if pickFirst.shuffleAddressList {
        endpoints[0].addresses.shuffle()
      }
      makeLoadBalancer = {
        let balancer = PickFirstLoadBalancer(
          connector: self.connector,
          backoff: self.backoff,
          defaultCompression: self.defaultCompression,
          enabledCompression: self.enabledCompression
        )
        return .pickFirst(balancer)
      }

    case .roundRobin:
      makeLoadBalancer = {
        let balancer = RoundRobinLoadBalancer(
          connector: self.connector,
          backoff: self.backoff,
          defaultCompression: self.defaultCompression,
          enabledCompression: self.enabledCompression
        )
        return .roundRobin(balancer)
      }
    }

    let onUpdatePolicy = self.state.withLockedValue { state in
      state.changeLoadBalancerKind(to: loadBalancerConfig, makeLoadBalancer)
    }

    self.handleLoadBalancerChange(onUpdatePolicy, endpoints: endpoints, in: &group)
  }
  /// Carries out the state machine's decision about the load balancer.
  ///
  /// - `.runLoadBalancer`: closes the load balancer being stopped (if any), gives the new one
  ///   the endpoints, then adds two tasks to the group: one running the load balancer and one
  ///   forwarding its events to the channel's input stream.
  /// - `.updateLoadBalancer`: gives the existing load balancer the endpoints.
  /// - `.none`: does nothing.
  ///
  /// - Parameters:
  ///   - update: The action chosen by the state machine.
  ///   - endpoints: The resolved endpoints; must be non-empty.
  ///   - group: The task group to add load balancer tasks to.
  private func handleLoadBalancerChange(
    _ update: StateMachine.OnChangeLoadBalancer,
    endpoints: [Endpoint],
    in group: inout DiscardingTaskGroup
  ) {
    assert(!endpoints.isEmpty, "endpoints must be non-empty")

    // Round-robin receives every endpoint; pick-first only receives the first.
    func deliverEndpoints(to target: LoadBalancer) {
      switch target {
      case .pickFirst(let pickFirst):
        pickFirst.updateEndpoint(endpoints.first!)
      case .roundRobin(let roundRobin):
        roundRobin.updateAddresses(endpoints)
      }
    }

    switch update {
    case .none:
      ()

    case .updateLoadBalancer(let existing):
      deliverEndpoints(to: existing)

    case .runLoadBalancer(let new, let old):
      old?.close()
      deliverEndpoints(to: new)

      group.addTask {
        await new.run()
      }

      group.addTask {
        for await event in new.events {
          self.input.continuation.yield(.handleLoadBalancerEvent(event, new.id))
        }
      }
    }
  }
  /// Reacts to an event emitted by a load balancer.
  ///
  /// A connectivity state change is fed to the state machine and the resulting actions are
  /// carried out in order: publish the new state, close a load balancer, resume queued
  /// requests, and finish the channel's streams. A request for name resolution pulls the next
  /// result from the resolver.
  ///
  /// - Parameters:
  ///   - event: The event to handle.
  ///   - loadBalancerID: The ID of the load balancer which emitted the event.
  ///   - group: The task group running the channel's child tasks.
  ///   - iterator: The pull-based resolver's iterator, if there is one.
  private func handleLoadBalancerEvent(
    _ event: LoadBalancerEvent,
    loadBalancerID: LoadBalancerID,
    in group: inout DiscardingTaskGroup,
    iterator: inout RPCAsyncSequence<NameResolutionResult, any Error>.AsyncIterator?
  ) async {
    switch event {
    case .requiresNameResolution:
      await self.resolve(iterator: &iterator, in: &group)

    case .connectivityStateChanged(let reportedState):
      let actions = self.state.withLockedValue {
        $0.loadBalancerStateChanged(to: reportedState, id: loadBalancerID)
      }

      if let stateToPublish = actions.publishState {
        self._connectivityState.continuation.yield(stateToPublish)
      }

      if let loadBalancerToClose = actions.close {
        loadBalancerToClose.close()
      }

      if let resumable = actions.resumeContinuations {
        let result = resumable.result
        for continuation in resumable.continuations {
          continuation.resume(with: result)
        }
      }

      if actions.finish {
        // Fully closed.
        self._connectivityState.continuation.finish()
        self.input.continuation.finish()
      }
    }
  }
  /// Pulls the next result from a pull-based resolver and applies it.
  ///
  /// Does nothing when `iterator` is `nil`. If the resolver finishes or throws, the channel is
  /// asked to close.
  ///
  /// - Parameters:
  ///   - iterator: The resolver's iterator. It is advanced by one element and the advanced
  ///     iterator is stored back so that the next call continues from where this one stopped.
  ///   - group: The task group in which any new load balancer is run.
  private func resolve(
    iterator: inout RPCAsyncSequence<NameResolutionResult, any Error>.AsyncIterator?,
    in group: inout DiscardingTaskGroup
  ) async {
    guard var resolverIterator = iterator else { return }

    // `resolverIterator` is a copy of the caller's iterator. Calling `next()` mutates only the
    // copy, so write it back on every exit path; without this an iterator with value semantics
    // would never advance from the caller's point of view.
    defer { iterator = resolverIterator }

    do {
      if let result = try await resolverIterator.next() {
        self.handleNameResolutionResult(result, in: &group)
      } else {
        self.close()
      }
    } catch {
      self.close()
    }
  }
  491. }
  492. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  493. extension GRPCChannel {
  494. struct StateMachine {
  495. enum State {
  496. case notRunning(NotRunning)
  497. case running(Running)
  498. case stopping(Stopping)
  499. case stopped
  500. case _modifying
  501. struct NotRunning {
  502. /// Queue of requests waiting for a load-balancer.
  503. var queue: RequestQueue
  504. /// A handle to the name resolver task.
  505. var nameResolverHandle: CancellableTaskHandle?
  506. init() {
  507. self.queue = RequestQueue()
  508. }
  509. }
  510. struct Running {
  511. /// The connectivity state of the channel.
  512. var connectivityState: ConnectivityState
  513. /// The load-balancer currently in use.
  514. var current: LoadBalancer
  515. /// The next load-balancer to use. This will be promoted to `current` when it enters the
  516. /// ready state.
  517. var next: LoadBalancer?
  518. /// Previously created load-balancers which are in the process of shutting down.
  519. var past: [LoadBalancerID: LoadBalancer]
  520. /// Queue of requests wait for a load-balancer.
  521. var queue: RequestQueue
  522. /// A handle to the name resolver task.
  523. var nameResolverHandle: CancellableTaskHandle?
  524. init(
  525. from state: NotRunning,
  526. loadBalancer: LoadBalancer
  527. ) {
  528. self.connectivityState = .idle
  529. self.current = loadBalancer
  530. self.next = nil
  531. self.past = [:]
  532. self.queue = state.queue
  533. self.nameResolverHandle = state.nameResolverHandle
  534. }
  535. }
  536. struct Stopping {
  537. /// Previously created load-balancers which are in the process of shutting down.
  538. var past: [LoadBalancerID: LoadBalancer]
  539. init(from state: Running) {
  540. self.past = state.past
  541. }
  542. init(loadBalancers: [LoadBalancerID: LoadBalancer]) {
  543. self.past = loadBalancers
  544. }
  545. }
  546. }
  547. /// The current state.
  548. private var state: State
  549. /// Whether the channel is running.
  550. private var running: Bool
  551. init() {
  552. self.state = .notRunning(State.NotRunning())
  553. self.running = false
  554. }
  555. }
  556. }
  557. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  558. extension GRPCChannel.StateMachine {
  /// Records that the channel has been started.
  ///
  /// - Precondition: The channel hasn't been started before.
  mutating func start() {
    precondition(self.running == false, "channel must only be started once")
    self.running = true
  }
  /// Stores the handle of the name resolver task.
  ///
  /// Only valid while no load balancer has been created; calling it in any other state is a
  /// fatal error.
  ///
  /// - Parameter handle: The handle to store.
  mutating func setNameResolverTaskHandle(_ handle: CancellableTaskHandle) {
    guard case .notRunning(var notRunning) = self.state else {
      fatalError("Invalid state")
    }

    notRunning.nameResolverHandle = handle
    self.state = .notRunning(notRunning)
  }
  /// What the channel should do after a load-balancing policy has been requested.
  enum OnChangeLoadBalancer {
    /// Start running the first load balancer, stopping the second if there is one.
    case runLoadBalancer(LoadBalancer, stop: LoadBalancer?)
    /// Keep using the given load balancer; it only needs to be given the new endpoints.
    case updateLoadBalancer(LoadBalancer)
    /// Do nothing.
    case none
  }
  /// Moves the state machine towards using a load balancer of the requested kind.
  ///
  /// - Parameters:
  ///   - newLoadBalancerKind: The kind of load balancer which should be in use.
  ///   - makeLoadBalancer: Builds a load balancer of that kind. Only called when an existing
  ///     load balancer can't be reused.
  /// - Returns: The action for the channel to take. `.none` if the channel is stopping or has
  ///   stopped.
  mutating func changeLoadBalancerKind(
    to newLoadBalancerKind: GRPCChannel.SupportedLoadBalancerConfig,
    _ makeLoadBalancer: () -> LoadBalancer
  ) -> OnChangeLoadBalancer {
    switch self.state {
    case .notRunning(let notRunning):
      // First load balancer: the channel starts running with it as 'current'.
      let running = State.Running(from: notRunning, loadBalancer: makeLoadBalancer())
      self.state = .running(running)
      return .runLoadBalancer(running.current, stop: nil)

    case .running(var running):
      self.state = ._modifying
      let action: OnChangeLoadBalancer

      if let pending = running.next {
        if newLoadBalancerKind.matches(loadBalancer: pending) {
          action = .updateLoadBalancer(pending)
        } else {
          // The 'next' didn't become ready in time. Close it and replace it with a
          // load-balancer of the new kind.
          let replacement = makeLoadBalancer()
          running.past[pending.id] = pending
          running.next = replacement
          action = .runLoadBalancer(replacement, stop: pending)
        }
      } else if newLoadBalancerKind.matches(loadBalancer: running.current) {
        action = .updateLoadBalancer(running.current)
      } else {
        // Create the 'next' load-balancer, it'll replace 'current' when it becomes ready.
        let upcoming = makeLoadBalancer()
        running.next = upcoming
        action = .runLoadBalancer(upcoming, stop: nil)
      }

      self.state = .running(running)
      return action

    case .stopping, .stopped:
      return .none

    case ._modifying:
      fatalError("Invalid state")
    }
  }
  /// The actions for the channel to take after a load balancer changed connectivity state.
  /// Every action defaults to "nothing to do".
  struct ConnectivityStateChangeActions {
    /// A load balancer to close, if any.
    var close: LoadBalancer? = nil

    /// A connectivity state to publish, if any.
    var publishState: ConnectivityState? = nil

    /// Queued requests to resume, if any.
    var resumeContinuations: ResumableContinuations? = nil

    /// Whether the channel's streams should be finished.
    var finish: Bool = false

    /// A batch of queued requests and the single result to resume all of them with.
    struct ResumableContinuations {
      var continuations: [CheckedContinuation<LoadBalancer, any Error>]
      var result: Result<LoadBalancer, any Error>
    }
  }
  /// Updates the state machine after a load balancer reported a connectivity state.
  ///
  /// While running:
  /// - A change reported by the current load balancer is recorded and published. `.ready`
  ///   resumes every queued request with the current load balancer; `.transientFailure` and
  ///   `.shutdown` fail the fast-failing requests in the queue. A report equal to the recorded
  ///   state is ignored.
  /// - `.ready` reported by the next load balancer promotes it to current. The previous
  ///   current load balancer is moved to `past` and returned for closing, the channel's
  ///   recorded state becomes `.ready` (published if that is a change) and every queued
  ///   request is resumed with the promoted load balancer.
  /// - `.shutdown` reported by any other load balancer removes it from `past`.
  ///
  /// While stopping, `.shutdown` removes the load balancer from `past`; once `past` is empty
  /// the state machine stops and `finish` is set.
  ///
  /// - Parameters:
  ///   - connectivityState: The state reported by the load balancer.
  ///   - id: The ID of the load balancer reporting the state.
  /// - Returns: The actions for the channel to carry out.
  mutating func loadBalancerStateChanged(
    to connectivityState: ConnectivityState,
    id: LoadBalancerID
  ) -> ConnectivityStateChangeActions {
    var actions = ConnectivityStateChangeActions()

    switch self.state {
    case .running(var state):
      self.state = ._modifying

      if id == state.current.id {
        // Only act if the state actually changed.
        if state.connectivityState != connectivityState {
          state.connectivityState = connectivityState
          actions.publishState = connectivityState

          switch connectivityState {
          case .ready:
            // Current load-balancer became ready; resume all continuations in the queue.
            let continuations = state.queue.removeAll()
            actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
              continuations: continuations,
              result: .success(state.current)
            )

          case .transientFailure, .shutdown:  // shutdown includes shutting down
            // Current load-balancer failed. Remove all the 'fast-failing' continuations in the
            // queue, these are RPCs which set the 'wait for ready' option to false. The rest
            // of the entries in the queue will wait for a load-balancer to become ready.
            let continuations = state.queue.removeFastFailingEntries()
            actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
              continuations: continuations,
              result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
            )

          case .idle, .connecting:
            ()  // Ignore.
          }
        }
      } else if let next = state.next, next.id == id {
        // State change came from the next LB, if it's now ready promote it to be the current.
        switch connectivityState {
        case .ready:
          let previous = state.current
          state.past[previous.id] = previous
          state.current = next
          state.next = nil

          actions.close = previous

          if state.connectivityState != connectivityState {
            actions.publishState = connectivityState
            // Record the state being published. Without this the state machine would keep
            // the previous load balancer's state (e.g. transient failure) and `makeStream`
            // would keep queueing or failing RPCs even though 'current' is now ready.
            state.connectivityState = connectivityState
          }

          actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
            continuations: state.queue.removeAll(),
            result: .success(next)
          )

        case .idle, .connecting, .transientFailure, .shutdown:
          ()
        }
      } else if connectivityState == .shutdown {
        // Neither 'current' nor 'next', so this can only be a load balancer which was
        // replaced earlier. It has finished shutting down: forget it. If it stayed in 'past'
        // it would be carried into the stopping state on close, where it could never be
        // removed (it won't report shutdown again) and the channel would never finish.
        state.past.removeValue(forKey: id)
      }

      self.state = .running(state)

    case .stopping(var state):
      self.state = ._modifying

      // Remove the load balancer if it's now shutdown.
      switch connectivityState {
      case .shutdown:
        state.past.removeValue(forKey: id)
      case .idle, .connecting, .ready, .transientFailure:
        ()
      }

      // If that was the last load-balancer then finish the input streams so that the channel
      // eventually finishes.
      if state.past.isEmpty {
        actions.finish = true
        self.state = .stopped
      } else {
        self.state = .stopping(state)
      }

    case .notRunning, .stopped:
      ()

    case ._modifying:
      fatalError("Invalid state")
    }

    return actions
  }
  /// How a request to make a stream should proceed.
  enum OnMakeStream {
    /// Make the stream using the given load balancer.
    case useLoadBalancer(LoadBalancer)
    /// Wait in the queue until a load balancer becomes ready.
    case joinQueue
    /// The channel isn't in a suitable state; fail the request.
    case failRPC
  }
  /// Decides how a request to make a stream should proceed given the current state.
  ///
  /// - Parameter waitForReady: Whether the request should wait (rather than fail) while the
  ///   channel is in transient failure.
  /// - Returns: The current load balancer when the channel is ready; `.joinQueue` when not yet
  ///   running, idle or connecting (and in transient failure if `waitForReady` is set);
  ///   `.failRPC` otherwise.
  func makeStream(waitForReady: Bool) -> OnMakeStream {
    switch self.state {
    case .notRunning:
      return .joinQueue

    case .running(let running):
      switch running.connectivityState {
      case .ready:
        return .useLoadBalancer(running.current)
      case .idle, .connecting:
        return .joinQueue
      case .transientFailure:
        if waitForReady {
          return .joinQueue
        } else {
          return .failRPC
        }
      case .shutdown:
        return .failRPC
      }

    case .stopping, .stopped:
      return .failRPC

    case ._modifying:
      fatalError("Invalid state")
    }
  }
  /// Adds a request to the queue of requests waiting for a load balancer.
  ///
  /// - Parameters:
  ///   - continuation: The continuation to resume once the request can proceed or has failed.
  ///   - waitForReady: Stored with the entry.
  ///   - id: The ID of the entry; used to remove it again.
  /// - Returns: `true` if the request was queued, `false` if the channel is stopping or has
  ///   stopped (in which case the caller remains responsible for the continuation).
  mutating func enqueue(
    continuation: CheckedContinuation<LoadBalancer, any Error>,
    waitForReady: Bool,
    id: QueueEntryID
  ) -> Bool {
    switch self.state {
    case .running(var running):
      self.state = ._modifying
      running.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
      self.state = .running(running)
      return true

    case .notRunning(var notRunning):
      self.state = ._modifying
      notRunning.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
      self.state = .notRunning(notRunning)
      return true

    case .stopping, .stopped:
      return false

    case ._modifying:
      fatalError("Invalid state")
    }
  }
  /// Removes a request from the queue of requests waiting for a load balancer.
  ///
  /// - Parameter id: The ID the entry was enqueued with.
  /// - Returns: Whatever the queue returns for that ID, or `nil` if the channel is stopping
  ///   or has stopped.
  mutating func dequeueContinuation(
    id: QueueEntryID
  ) -> CheckedContinuation<LoadBalancer, any Error>? {
    switch self.state {
    case .running(var running):
      self.state = ._modifying
      let removed = running.queue.removeEntry(withID: id)
      self.state = .running(running)
      return removed

    case .notRunning(var notRunning):
      self.state = ._modifying
      let removed = notRunning.queue.removeEntry(withID: id)
      self.state = .notRunning(notRunning)
      return removed

    case .stopping, .stopped:
      return nil

    case ._modifying:
      fatalError("Invalid state")
    }
  }
  /// What the channel should do when asked to close.
  enum OnClose {
    /// Nothing to do.
    case none
    /// No load balancer exists: fail the given queued requests and cancel everything.
    case cancelAll([RequestQueue.Continuation])
    /// Close the current load balancer and the next one (if any), cancel the name resolver
    /// task (if any) and fail the given queued requests.
    case close(LoadBalancer, LoadBalancer?, CancellableTaskHandle?, [RequestQueue.Continuation])
  }
  /// Begins closing the channel.
  ///
  /// - When not running, the state machine stops immediately and the queue is drained.
  /// - When running, the queue is drained and the state machine starts stopping, tracking the
  ///   current, next and past load balancers until they shut down.
  /// - When already stopping or stopped, nothing changes.
  ///
  /// - Returns: The action for the channel to take.
  mutating func close() -> OnClose {
    switch self.state {
    case .notRunning(var notRunning):
      self.state = .stopped
      return .cancelAll(notRunning.queue.removeAll())

    case .running(var running):
      let continuations = running.queue.removeAll()

      // Every load balancer still alive has to shut down before the channel is done.
      var shuttingDown = running.past
      shuttingDown[running.current.id] = running.current
      if let next = running.next {
        shuttingDown[next.id] = next
      }

      self.state = .stopping(State.Stopping(loadBalancers: shuttingDown))
      return .close(running.current, running.next, running.nameResolverHandle, continuations)

    case .stopping, .stopped:
      return .none

    case ._modifying:
      fatalError("Invalid state")
    }
  }
  811. }