GRPCChannel.swift 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. internal import Atomics
  17. internal import DequeModule
  18. package import GRPCCore
  19. private import Synchronization
  20. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  21. package final class GRPCChannel: ClientTransport {
  22. private enum Input: Sendable {
  23. /// Close the channel, if possible.
  24. case close
  25. /// Handle the result of a name resolution.
  26. case handleResolutionResult(NameResolutionResult)
  27. /// Handle the event from the underlying connection object.
  28. case handleLoadBalancerEvent(LoadBalancerEvent, LoadBalancerID)
  29. }
  30. /// Events which can happen to the channel.
  31. private let _connectivityState:
  32. (
  33. stream: AsyncStream<ConnectivityState>,
  34. continuation: AsyncStream<ConnectivityState>.Continuation
  35. )
  36. /// Inputs which this channel should react to.
  37. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  38. /// A resolver providing resolved names to the channel.
  39. private let resolver: NameResolver
  40. /// The state of the channel.
  41. private let state: Mutex<StateMachine>
  42. /// The maximum number of times to attempt to create a stream per RPC.
  43. ///
  44. /// This is the value used by other gRPC implementations.
  45. private static let maxStreamCreationAttempts = 5
  46. /// A factory for connections.
  47. private let connector: any HTTP2Connector
  48. /// The connection backoff configuration used by the subchannel when establishing a connection.
  49. private let backoff: ConnectionBackoff
  50. /// The default compression algorithm used for requests.
  51. private let defaultCompression: CompressionAlgorithm
  52. /// The set of enabled compression algorithms.
  53. private let enabledCompression: CompressionAlgorithmSet
  54. /// The default service config to use.
  55. ///
  56. /// Used when the resolver doesn't provide one.
  57. private let defaultServiceConfig: ServiceConfig
  58. // These are both read frequently and updated infrequently so may be a bottleneck.
  59. private let _methodConfig: Mutex<MethodConfigs>
  60. private let _retryThrottle: Mutex<RetryThrottle?>
  61. package init(
  62. resolver: NameResolver,
  63. connector: any HTTP2Connector,
  64. config: Config,
  65. defaultServiceConfig: ServiceConfig
  66. ) {
  67. self.resolver = resolver
  68. self.state = Mutex(StateMachine())
  69. self._connectivityState = AsyncStream.makeStream()
  70. self.input = AsyncStream.makeStream()
  71. self.connector = connector
  72. self.backoff = ConnectionBackoff(
  73. initial: config.backoff.initial,
  74. max: config.backoff.max,
  75. multiplier: config.backoff.multiplier,
  76. jitter: config.backoff.jitter
  77. )
  78. self.defaultCompression = config.compression.algorithm
  79. self.enabledCompression = config.compression.enabledAlgorithms
  80. self.defaultServiceConfig = defaultServiceConfig
  81. let throttle = defaultServiceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
  82. self._retryThrottle = Mutex(throttle)
  83. let methodConfig = MethodConfigs(serviceConfig: defaultServiceConfig)
  84. self._methodConfig = Mutex(methodConfig)
  85. }
  86. /// The connectivity state of the channel.
  87. package var connectivityState: AsyncStream<ConnectivityState> {
  88. self._connectivityState.stream
  89. }
  90. /// Returns a throttle which gRPC uses to determine whether retries can be executed.
  91. package var retryThrottle: RetryThrottle? {
  92. self._retryThrottle.withLock { $0 }
  93. }
  94. /// Returns the configuration for a given method.
  95. ///
  96. /// - Parameter descriptor: The method to lookup configuration for.
  97. /// - Returns: Configuration for the method, if it exists.
  98. package func configuration(forMethod descriptor: MethodDescriptor) -> MethodConfig? {
  99. self._methodConfig.withLock { $0[descriptor] }
  100. }
  101. /// Establishes and maintains a connection to the remote destination.
  102. package func connect() async {
  103. self.state.withLock { $0.start() }
  104. self._connectivityState.continuation.yield(.idle)
  105. await withDiscardingTaskGroup { group in
  106. var iterator: Optional<RPCAsyncSequence<NameResolutionResult, any Error>.AsyncIterator>
  107. // The resolver can either push or pull values. If it pushes values the channel should
  108. // listen for new results. Otherwise the channel will pull values as and when necessary.
  109. switch self.resolver.updateMode.value {
  110. case .push:
  111. iterator = nil
  112. let handle = group.addCancellableTask {
  113. do {
  114. for try await result in self.resolver.names {
  115. self.input.continuation.yield(.handleResolutionResult(result))
  116. }
  117. self.beginGracefulShutdown()
  118. } catch {
  119. self.beginGracefulShutdown()
  120. }
  121. }
  122. // When the channel is closed gracefully, the task group running the load balancer mustn't
  123. // be cancelled (otherwise in-flight RPCs would fail), but the push based resolver will
  124. // continue indefinitely. Store its handle and cancel it on close when closing the channel.
  125. self.state.withLock { state in
  126. state.setNameResolverTaskHandle(handle)
  127. }
  128. case .pull:
  129. iterator = self.resolver.names.makeAsyncIterator()
  130. await self.resolve(iterator: &iterator, in: &group)
  131. }
  132. // Resolver is setup, start handling events.
  133. for await input in self.input.stream {
  134. switch input {
  135. case .close:
  136. self.handleClose(in: &group)
  137. case .handleResolutionResult(let result):
  138. self.handleNameResolutionResult(result, in: &group)
  139. case .handleLoadBalancerEvent(let event, let id):
  140. await self.handleLoadBalancerEvent(
  141. event,
  142. loadBalancerID: id,
  143. in: &group,
  144. iterator: &iterator
  145. )
  146. }
  147. }
  148. }
  149. if Task.isCancelled {
  150. self._connectivityState.continuation.finish()
  151. }
  152. }
  153. /// Signal to the transport that no new streams may be created and that connections should be
  154. /// closed when all streams are closed.
  155. package func beginGracefulShutdown() {
  156. self.input.continuation.yield(.close)
  157. }
  158. /// Opens a stream using the transport, and uses it as input into a user-provided closure.
  159. package func withStream<T: Sendable>(
  160. descriptor: MethodDescriptor,
  161. options: CallOptions,
  162. _ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
  163. ) async throws -> T {
  164. // Merge options from the call with those from the service config.
  165. let methodConfig = self.configuration(forMethod: descriptor)
  166. var options = options
  167. options.formUnion(with: methodConfig)
  168. for attempt in 1 ... Self.maxStreamCreationAttempts {
  169. switch await self.makeStream(descriptor: descriptor, options: options) {
  170. case .created(let stream):
  171. return try await stream.execute { inbound, outbound in
  172. let rpcStream = RPCStream(
  173. descriptor: stream.descriptor,
  174. inbound: RPCAsyncSequence<RPCResponsePart, any Error>(wrapping: inbound),
  175. outbound: RPCWriter.Closable(wrapping: outbound)
  176. )
  177. return try await closure(rpcStream)
  178. }
  179. case .tryAgain(let error):
  180. if error is CancellationError || attempt == Self.maxStreamCreationAttempts {
  181. throw error
  182. } else {
  183. continue
  184. }
  185. case .stopTrying(let error):
  186. throw error
  187. }
  188. }
  189. fatalError("Internal inconsistency")
  190. }
  191. }
  192. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  193. extension GRPCChannel {
  194. package struct Config: Sendable {
  195. /// Configuration for HTTP/2 connections.
  196. package var http2: HTTP2ClientTransport.Config.HTTP2
  197. /// Configuration for backoff used when establishing a connection.
  198. package var backoff: HTTP2ClientTransport.Config.Backoff
  199. /// Configuration for connection management.
  200. package var connection: HTTP2ClientTransport.Config.Connection
  201. /// Compression configuration.
  202. package var compression: HTTP2ClientTransport.Config.Compression
  203. package init(
  204. http2: HTTP2ClientTransport.Config.HTTP2,
  205. backoff: HTTP2ClientTransport.Config.Backoff,
  206. connection: HTTP2ClientTransport.Config.Connection,
  207. compression: HTTP2ClientTransport.Config.Compression
  208. ) {
  209. self.http2 = http2
  210. self.backoff = backoff
  211. self.connection = connection
  212. self.compression = compression
  213. }
  214. }
  215. }
  216. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  217. extension GRPCChannel {
  218. enum MakeStreamResult {
  219. /// A stream was created, use it.
  220. case created(Connection.Stream)
  221. /// An error occurred while trying to create a stream, try again if possible.
  222. case tryAgain(any Error)
  223. /// An unrecoverable error occurred (e.g. the channel is closed), fail the RPC and don't retry.
  224. case stopTrying(any Error)
  225. }
  226. private func makeStream(
  227. descriptor: MethodDescriptor,
  228. options: CallOptions
  229. ) async -> MakeStreamResult {
  230. let waitForReady = options.waitForReady ?? true
  231. switch self.state.withLock({ $0.makeStream(waitForReady: waitForReady) }) {
  232. case .useLoadBalancer(let loadBalancer):
  233. return await self.makeStream(
  234. descriptor: descriptor,
  235. options: options,
  236. loadBalancer: loadBalancer
  237. )
  238. case .joinQueue:
  239. do {
  240. let loadBalancer = try await self.enqueue(waitForReady: waitForReady)
  241. return await self.makeStream(
  242. descriptor: descriptor,
  243. options: options,
  244. loadBalancer: loadBalancer
  245. )
  246. } catch {
  247. // All errors from enqueue are non-recoverable: either the channel is shutting down or
  248. // the request has been cancelled.
  249. return .stopTrying(error)
  250. }
  251. case .failRPC:
  252. return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))
  253. }
  254. }
  255. private func makeStream(
  256. descriptor: MethodDescriptor,
  257. options: CallOptions,
  258. loadBalancer: LoadBalancer
  259. ) async -> MakeStreamResult {
  260. guard let subchannel = loadBalancer.pickSubchannel() else {
  261. return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
  262. }
  263. let methodConfig = self.configuration(forMethod: descriptor)
  264. var options = options
  265. options.formUnion(with: methodConfig)
  266. do {
  267. let stream = try await subchannel.makeStream(descriptor: descriptor, options: options)
  268. return .created(stream)
  269. } catch {
  270. return .tryAgain(error)
  271. }
  272. }
  273. private func enqueue(waitForReady: Bool) async throws -> LoadBalancer {
  274. let id = QueueEntryID()
  275. return try await withTaskCancellationHandler {
  276. try await withCheckedThrowingContinuation { continuation in
  277. if Task.isCancelled {
  278. continuation.resume(throwing: CancellationError())
  279. return
  280. }
  281. let enqueued = self.state.withLock { state in
  282. state.enqueue(continuation: continuation, waitForReady: waitForReady, id: id)
  283. }
  284. // Not enqueued because the channel is shutdown or shutting down.
  285. if !enqueued {
  286. let error = RPCError(code: .unavailable, message: "channel is shutdown")
  287. continuation.resume(throwing: error)
  288. }
  289. }
  290. } onCancel: {
  291. let continuation = self.state.withLock { state in
  292. state.dequeueContinuation(id: id)
  293. }
  294. continuation?.resume(throwing: CancellationError())
  295. }
  296. }
  297. }
  298. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  299. extension GRPCChannel {
  300. private func handleClose(in group: inout DiscardingTaskGroup) {
  301. switch self.state.withLock({ $0.close() }) {
  302. case .close(let current, let next, let resolver, let continuations):
  303. resolver?.cancel()
  304. current.close()
  305. next?.close()
  306. for continuation in continuations {
  307. continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
  308. }
  309. self._connectivityState.continuation.yield(.shutdown)
  310. case .cancelAll(let continuations):
  311. for continuation in continuations {
  312. continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
  313. }
  314. self._connectivityState.continuation.yield(.shutdown)
  315. group.cancelAll()
  316. case .none:
  317. ()
  318. }
  319. }
  320. private func handleNameResolutionResult(
  321. _ result: NameResolutionResult,
  322. in group: inout DiscardingTaskGroup
  323. ) {
  324. // Ignore empty endpoint lists.
  325. if result.endpoints.isEmpty { return }
  326. switch result.serviceConfig ?? .success(self.defaultServiceConfig) {
  327. case .success(let config):
  328. // Update per RPC configuration.
  329. let methodConfig = MethodConfigs(serviceConfig: config)
  330. self._methodConfig.withLock { $0 = methodConfig }
  331. let retryThrottle = config.retryThrottling.map { RetryThrottle(policy: $0) }
  332. self._retryThrottle.withLock { $0 = retryThrottle }
  333. // Update the load balancer.
  334. self.updateLoadBalancer(serviceConfig: config, endpoints: result.endpoints, in: &group)
  335. case .failure:
  336. self.beginGracefulShutdown()
  337. }
  338. }
  339. enum SupportedLoadBalancerConfig {
  340. case roundRobin
  341. case pickFirst(ServiceConfig.LoadBalancingConfig.PickFirst)
  342. init?(_ config: ServiceConfig.LoadBalancingConfig) {
  343. if let pickFirst = config.pickFirst {
  344. self = .pickFirst(pickFirst)
  345. } else if config.roundRobin != nil {
  346. self = .roundRobin
  347. } else {
  348. return nil
  349. }
  350. }
  351. func matches(loadBalancer: LoadBalancer) -> Bool {
  352. switch (self, loadBalancer) {
  353. case (.roundRobin, .roundRobin):
  354. return true
  355. case (.pickFirst, .pickFirst):
  356. return true
  357. case (.roundRobin, .pickFirst),
  358. (.pickFirst, .roundRobin):
  359. return false
  360. }
  361. }
  362. }
  363. private func updateLoadBalancer(
  364. serviceConfig: ServiceConfig,
  365. endpoints: [Endpoint],
  366. in group: inout DiscardingTaskGroup
  367. ) {
  368. assert(!endpoints.isEmpty, "endpoints must be non-empty")
  369. // Find the first supported config.
  370. var configFromServiceConfig: SupportedLoadBalancerConfig?
  371. for config in serviceConfig.loadBalancingConfig {
  372. if let config = SupportedLoadBalancerConfig(config) {
  373. configFromServiceConfig = config
  374. break
  375. }
  376. }
  377. let onUpdatePolicy: GRPCChannel.StateMachine.OnChangeLoadBalancer
  378. var endpoints = endpoints
  379. // Fallback to pick-first if no other config applies.
  380. let loadBalancerConfig = configFromServiceConfig ?? .pickFirst(.init(shuffleAddressList: false))
  381. switch loadBalancerConfig {
  382. case .roundRobin:
  383. onUpdatePolicy = self.state.withLock { state in
  384. state.changeLoadBalancerKind(to: loadBalancerConfig) {
  385. let loadBalancer = RoundRobinLoadBalancer(
  386. connector: self.connector,
  387. backoff: self.backoff,
  388. defaultCompression: self.defaultCompression,
  389. enabledCompression: self.enabledCompression
  390. )
  391. return .roundRobin(loadBalancer)
  392. }
  393. }
  394. case .pickFirst(let pickFirst):
  395. if pickFirst.shuffleAddressList {
  396. endpoints[0].addresses.shuffle()
  397. }
  398. onUpdatePolicy = self.state.withLock { state in
  399. state.changeLoadBalancerKind(to: loadBalancerConfig) {
  400. let loadBalancer = PickFirstLoadBalancer(
  401. connector: self.connector,
  402. backoff: self.backoff,
  403. defaultCompression: self.defaultCompression,
  404. enabledCompression: self.enabledCompression
  405. )
  406. return .pickFirst(loadBalancer)
  407. }
  408. }
  409. }
  410. self.handleLoadBalancerChange(onUpdatePolicy, endpoints: endpoints, in: &group)
  411. }
  412. private func handleLoadBalancerChange(
  413. _ update: StateMachine.OnChangeLoadBalancer,
  414. endpoints: [Endpoint],
  415. in group: inout DiscardingTaskGroup
  416. ) {
  417. assert(!endpoints.isEmpty, "endpoints must be non-empty")
  418. switch update {
  419. case .runLoadBalancer(let new, let old):
  420. old?.close()
  421. switch new {
  422. case .roundRobin(let loadBalancer):
  423. loadBalancer.updateAddresses(endpoints)
  424. case .pickFirst(let loadBalancer):
  425. loadBalancer.updateEndpoint(endpoints.first!)
  426. }
  427. group.addTask {
  428. await new.run()
  429. }
  430. group.addTask {
  431. for await event in new.events {
  432. self.input.continuation.yield(.handleLoadBalancerEvent(event, new.id))
  433. }
  434. }
  435. case .updateLoadBalancer(let existing):
  436. switch existing {
  437. case .roundRobin(let loadBalancer):
  438. loadBalancer.updateAddresses(endpoints)
  439. case .pickFirst(let loadBalancer):
  440. loadBalancer.updateEndpoint(endpoints.first!)
  441. }
  442. case .none:
  443. ()
  444. }
  445. }
  446. private func handleLoadBalancerEvent(
  447. _ event: LoadBalancerEvent,
  448. loadBalancerID: LoadBalancerID,
  449. in group: inout DiscardingTaskGroup,
  450. iterator: inout RPCAsyncSequence<NameResolutionResult, any Error>.AsyncIterator?
  451. ) async {
  452. switch event {
  453. case .connectivityStateChanged(let connectivityState):
  454. let actions = self.state.withLock { state in
  455. state.loadBalancerStateChanged(to: connectivityState, id: loadBalancerID)
  456. }
  457. if let newState = actions.publishState {
  458. self._connectivityState.continuation.yield(newState)
  459. }
  460. if let subchannel = actions.close {
  461. subchannel.close()
  462. }
  463. if let resumable = actions.resumeContinuations {
  464. for continuation in resumable.continuations {
  465. continuation.resume(with: resumable.result)
  466. }
  467. }
  468. if actions.finish {
  469. // Fully closed.
  470. self._connectivityState.continuation.finish()
  471. self.input.continuation.finish()
  472. }
  473. case .requiresNameResolution:
  474. await self.resolve(iterator: &iterator, in: &group)
  475. }
  476. }
  477. private func resolve(
  478. iterator: inout RPCAsyncSequence<NameResolutionResult, any Error>.AsyncIterator?,
  479. in group: inout DiscardingTaskGroup
  480. ) async {
  481. guard var iterator = iterator else { return }
  482. do {
  483. if let result = try await iterator.next() {
  484. self.handleNameResolutionResult(result, in: &group)
  485. } else {
  486. self.beginGracefulShutdown()
  487. }
  488. } catch {
  489. self.beginGracefulShutdown()
  490. }
  491. }
  492. }
  493. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  494. extension GRPCChannel {
  495. struct StateMachine {
  496. enum State {
  497. case notRunning(NotRunning)
  498. case running(Running)
  499. case stopping(Stopping)
  500. case stopped
  501. case _modifying
  502. struct NotRunning {
  503. /// Queue of requests waiting for a load-balancer.
  504. var queue: RequestQueue
  505. /// A handle to the name resolver task.
  506. var nameResolverHandle: CancellableTaskHandle?
  507. init() {
  508. self.queue = RequestQueue()
  509. }
  510. }
  511. struct Running {
  512. /// The connectivity state of the channel.
  513. var connectivityState: ConnectivityState
  514. /// The load-balancer currently in use.
  515. var current: LoadBalancer
  516. /// The next load-balancer to use. This will be promoted to `current` when it enters the
  517. /// ready state.
  518. var next: LoadBalancer?
  519. /// Previously created load-balancers which are in the process of shutting down.
  520. var past: [LoadBalancerID: LoadBalancer]
  521. /// Queue of requests wait for a load-balancer.
  522. var queue: RequestQueue
  523. /// A handle to the name resolver task.
  524. var nameResolverHandle: CancellableTaskHandle?
  525. init(
  526. from state: NotRunning,
  527. loadBalancer: LoadBalancer
  528. ) {
  529. self.connectivityState = .idle
  530. self.current = loadBalancer
  531. self.next = nil
  532. self.past = [:]
  533. self.queue = state.queue
  534. self.nameResolverHandle = state.nameResolverHandle
  535. }
  536. }
  537. struct Stopping {
  538. /// Previously created load-balancers which are in the process of shutting down.
  539. var past: [LoadBalancerID: LoadBalancer]
  540. init(from state: Running) {
  541. self.past = state.past
  542. }
  543. init(loadBalancers: [LoadBalancerID: LoadBalancer]) {
  544. self.past = loadBalancers
  545. }
  546. }
  547. }
  548. /// The current state.
  549. private var state: State
  550. /// Whether the channel is running.
  551. private var running: Bool
  552. init() {
  553. self.state = .notRunning(State.NotRunning())
  554. self.running = false
  555. }
  556. }
  557. }
  558. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  559. extension GRPCChannel.StateMachine {
  560. mutating func start() {
  561. precondition(!self.running, "channel must only be started once")
  562. self.running = true
  563. }
  564. mutating func setNameResolverTaskHandle(_ handle: CancellableTaskHandle) {
  565. switch self.state {
  566. case .notRunning(var state):
  567. state.nameResolverHandle = handle
  568. self.state = .notRunning(state)
  569. case .running, .stopping, .stopped, ._modifying:
  570. fatalError("Invalid state")
  571. }
  572. }
  573. enum OnChangeLoadBalancer {
  574. case runLoadBalancer(LoadBalancer, stop: LoadBalancer?)
  575. case updateLoadBalancer(LoadBalancer)
  576. case none
  577. }
  578. mutating func changeLoadBalancerKind(
  579. to newLoadBalancerKind: GRPCChannel.SupportedLoadBalancerConfig,
  580. _ makeLoadBalancer: () -> LoadBalancer
  581. ) -> OnChangeLoadBalancer {
  582. let onChangeLoadBalancer: OnChangeLoadBalancer
  583. switch self.state {
  584. case .notRunning(let state):
  585. let loadBalancer = makeLoadBalancer()
  586. let state = State.Running(from: state, loadBalancer: loadBalancer)
  587. self.state = .running(state)
  588. onChangeLoadBalancer = .runLoadBalancer(state.current, stop: nil)
  589. case .running(var state):
  590. self.state = ._modifying
  591. if let next = state.next {
  592. if newLoadBalancerKind.matches(loadBalancer: next) {
  593. onChangeLoadBalancer = .updateLoadBalancer(next)
  594. } else {
  595. // The 'next' didn't become ready in time. Close it and replace it with a load-balancer
  596. // of the next kind.
  597. let nextNext = makeLoadBalancer()
  598. let previous = state.next
  599. state.next = nextNext
  600. state.past[next.id] = next
  601. onChangeLoadBalancer = .runLoadBalancer(nextNext, stop: previous)
  602. }
  603. } else {
  604. if newLoadBalancerKind.matches(loadBalancer: state.current) {
  605. onChangeLoadBalancer = .updateLoadBalancer(state.current)
  606. } else {
  607. // Create the 'next' load-balancer, it'll replace 'current' when it becomes ready.
  608. let next = makeLoadBalancer()
  609. state.next = next
  610. onChangeLoadBalancer = .runLoadBalancer(next, stop: nil)
  611. }
  612. }
  613. self.state = .running(state)
  614. case .stopping, .stopped:
  615. onChangeLoadBalancer = .none
  616. case ._modifying:
  617. fatalError("Invalid state")
  618. }
  619. return onChangeLoadBalancer
  620. }
  621. struct ConnectivityStateChangeActions {
  622. var close: LoadBalancer? = nil
  623. var publishState: ConnectivityState? = nil
  624. var resumeContinuations: ResumableContinuations? = nil
  625. var finish: Bool = false
  626. struct ResumableContinuations {
  627. var continuations: [CheckedContinuation<LoadBalancer, any Error>]
  628. var result: Result<LoadBalancer, any Error>
  629. }
  630. }
  631. mutating func loadBalancerStateChanged(
  632. to connectivityState: ConnectivityState,
  633. id: LoadBalancerID
  634. ) -> ConnectivityStateChangeActions {
  635. var actions = ConnectivityStateChangeActions()
  636. switch self.state {
  637. case .running(var state):
  638. self.state = ._modifying
  639. if id == state.current.id {
  640. // No change in state, ignore.
  641. if state.connectivityState == connectivityState {
  642. self.state = .running(state)
  643. break
  644. }
  645. state.connectivityState = connectivityState
  646. actions.publishState = connectivityState
  647. switch connectivityState {
  648. case .ready:
  649. // Current load-balancer became ready; resume all continuations in the queue.
  650. let continuations = state.queue.removeAll()
  651. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  652. continuations: continuations,
  653. result: .success(state.current)
  654. )
  655. case .transientFailure, .shutdown: // shutdown includes shutting down
  656. // Current load-balancer failed. Remove all the 'fast-failing' continuations in the
  657. // queue, these are RPCs which set the 'wait for ready' option to false. The rest of
  658. // the entries in the queue will wait for a load-balancer to become ready.
  659. let continuations = state.queue.removeFastFailingEntries()
  660. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  661. continuations: continuations,
  662. result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
  663. )
  664. case .idle, .connecting:
  665. () // Ignore.
  666. }
  667. } else if let next = state.next, next.id == id {
  668. // State change came from the next LB, if it's now ready promote it to be the current.
  669. switch connectivityState {
  670. case .ready:
  671. // Next load-balancer is ready, promote it to current.
  672. let previous = state.current
  673. state.past[previous.id] = previous
  674. state.current = next
  675. state.next = nil
  676. actions.close = previous
  677. if state.connectivityState != connectivityState {
  678. actions.publishState = connectivityState
  679. }
  680. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  681. continuations: state.queue.removeAll(),
  682. result: .success(next)
  683. )
  684. case .idle, .connecting, .transientFailure, .shutdown:
  685. ()
  686. }
  687. }
  688. self.state = .running(state)
  689. case .stopping(var state):
  690. self.state = ._modifying
  691. // Remove the load balancer if it's now shutdown.
  692. switch connectivityState {
  693. case .shutdown:
  694. state.past.removeValue(forKey: id)
  695. case .idle, .connecting, .ready, .transientFailure:
  696. ()
  697. }
  698. // If that was the last load-balancer then finish the input streams so that the channel
  699. // eventually finishes.
  700. if state.past.isEmpty {
  701. actions.finish = true
  702. self.state = .stopped
  703. } else {
  704. self.state = .stopping(state)
  705. }
  706. case .notRunning, .stopped:
  707. ()
  708. case ._modifying:
  709. fatalError("Invalid state")
  710. }
  711. return actions
  712. }
  713. enum OnMakeStream {
  714. /// Use the given load-balancer to make a stream.
  715. case useLoadBalancer(LoadBalancer)
  716. /// Join the queue and wait until a load-balancer becomes ready.
  717. case joinQueue
  718. /// Fail the stream request, the channel isn't in a suitable state.
  719. case failRPC
  720. }
  721. func makeStream(waitForReady: Bool) -> OnMakeStream {
  722. let onMakeStream: OnMakeStream
  723. switch self.state {
  724. case .notRunning:
  725. onMakeStream = .joinQueue
  726. case .running(let state):
  727. switch state.connectivityState {
  728. case .idle, .connecting:
  729. onMakeStream = .joinQueue
  730. case .ready:
  731. onMakeStream = .useLoadBalancer(state.current)
  732. case .transientFailure:
  733. onMakeStream = waitForReady ? .joinQueue : .failRPC
  734. case .shutdown:
  735. onMakeStream = .failRPC
  736. }
  737. case .stopping, .stopped:
  738. onMakeStream = .failRPC
  739. case ._modifying:
  740. fatalError("Invalid state")
  741. }
  742. return onMakeStream
  743. }
  744. mutating func enqueue(
  745. continuation: CheckedContinuation<LoadBalancer, any Error>,
  746. waitForReady: Bool,
  747. id: QueueEntryID
  748. ) -> Bool {
  749. switch self.state {
  750. case .notRunning(var state):
  751. self.state = ._modifying
  752. state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
  753. self.state = .notRunning(state)
  754. return true
  755. case .running(var state):
  756. self.state = ._modifying
  757. state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
  758. self.state = .running(state)
  759. return true
  760. case .stopping, .stopped:
  761. return false
  762. case ._modifying:
  763. fatalError("Invalid state")
  764. }
  765. }
  766. mutating func dequeueContinuation(
  767. id: QueueEntryID
  768. ) -> CheckedContinuation<LoadBalancer, any Error>? {
  769. switch self.state {
  770. case .notRunning(var state):
  771. self.state = ._modifying
  772. let continuation = state.queue.removeEntry(withID: id)
  773. self.state = .notRunning(state)
  774. return continuation
  775. case .running(var state):
  776. self.state = ._modifying
  777. let continuation = state.queue.removeEntry(withID: id)
  778. self.state = .running(state)
  779. return continuation
  780. case .stopping, .stopped:
  781. return nil
  782. case ._modifying:
  783. fatalError("Invalid state")
  784. }
  785. }
  786. enum OnClose {
  787. case none
  788. case cancelAll([RequestQueue.Continuation])
  789. case close(LoadBalancer, LoadBalancer?, CancellableTaskHandle?, [RequestQueue.Continuation])
  790. }
  791. mutating func close() -> OnClose {
  792. let onClose: OnClose
  793. switch self.state {
  794. case .notRunning(var state):
  795. self.state = .stopped
  796. onClose = .cancelAll(state.queue.removeAll())
  797. case .running(var state):
  798. let continuations = state.queue.removeAll()
  799. onClose = .close(state.current, state.next, state.nameResolverHandle, continuations)
  800. state.past[state.current.id] = state.current
  801. if let next = state.next {
  802. state.past[next.id] = next
  803. }
  804. self.state = .stopping(State.Stopping(loadBalancers: state.past))
  805. case .stopping, .stopped:
  806. onClose = .none
  807. case ._modifying:
  808. fatalError("Invalid state")
  809. }
  810. return onClose
  811. }
  812. }