GRPCChannel.swift 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import DequeModule
  18. @_spi(Package) import GRPCCore
  19. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  20. @_spi(Package)
  21. public struct GRPCChannel: ClientTransport {
  22. private enum Input: Sendable {
  23. /// Close the channel, if possible.
  24. case close
  25. /// Handle the result of a name resolution.
  26. case handleResolutionResult(NameResolutionResult)
  27. /// Handle the event from the underlying connection object.
  28. case handleLoadBalancerEvent(LoadBalancerEvent, LoadBalancerID)
  29. }
  30. /// Events which can happen to the channel.
  31. private let _connectivityState:
  32. (
  33. stream: AsyncStream<ConnectivityState>,
  34. continuation: AsyncStream<ConnectivityState>.Continuation
  35. )
  36. /// Inputs which this channel should react to.
  37. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  38. /// A resolver providing resolved names to the channel.
  39. private let resolver: NameResolver
  40. /// The state of the channel.
  41. private let state: _LockedValueBox<StateMachine>
  42. /// The maximum number of times to attempt to create a stream per RPC.
  43. ///
  44. /// This is the value used by other gRPC implementations.
  45. private static let maxStreamCreationAttempts = 5
  46. /// A factory for connections.
  47. private let connector: any HTTP2Connector
  48. /// The connection backoff configuration used by the subchannel when establishing a connection.
  49. private let backoff: ConnectionBackoff
  50. /// The default compression algorithm used for requests.
  51. private let defaultCompression: CompressionAlgorithm
  52. /// The set of enabled compression algorithms.
  53. private let enabledCompression: CompressionAlgorithmSet
  54. /// The default service config to use.
  55. ///
  56. /// Used when the resolver doesn't provide one.
  57. private let defaultServiceConfig: ServiceConfig
  58. // These are both read frequently and updated infrequently so may be a bottleneck.
  59. private let _methodConfig: _LockedValueBox<_MethodConfigs>
  60. private let _retryThrottle: _LockedValueBox<RetryThrottle?>
  61. @_spi(Package)
  62. public init(
  63. resolver: NameResolver,
  64. connector: any HTTP2Connector,
  65. config: Config,
  66. defaultServiceConfig: ServiceConfig
  67. ) {
  68. self.resolver = resolver
  69. self.state = _LockedValueBox(StateMachine())
  70. self._connectivityState = AsyncStream.makeStream()
  71. self.input = AsyncStream.makeStream()
  72. self.connector = connector
  73. self.backoff = ConnectionBackoff(
  74. initial: config.backoff.initial,
  75. max: config.backoff.max,
  76. multiplier: config.backoff.multiplier,
  77. jitter: config.backoff.jitter
  78. )
  79. self.defaultCompression = config.compression.algorithm
  80. self.enabledCompression = config.compression.enabledAlgorithms
  81. self.defaultServiceConfig = defaultServiceConfig
  82. let throttle = defaultServiceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
  83. self._retryThrottle = _LockedValueBox(throttle)
  84. let methodConfig = _MethodConfigs(serviceConfig: defaultServiceConfig)
  85. self._methodConfig = _LockedValueBox(methodConfig)
  86. }
  87. /// The connectivity state of the channel.
  88. var connectivityState: AsyncStream<ConnectivityState> {
  89. self._connectivityState.stream
  90. }
  91. /// Returns a throttle which gRPC uses to determine whether retries can be executed.
  92. public var retryThrottle: RetryThrottle? {
  93. self._retryThrottle.withLockedValue { $0 }
  94. }
  95. /// Returns the configuration for a given method.
  96. ///
  97. /// - Parameter descriptor: The method to lookup configuration for.
  98. /// - Returns: Configuration for the method, if it exists.
  99. public func configuration(forMethod descriptor: MethodDescriptor) -> MethodConfig? {
  100. self._methodConfig.withLockedValue { $0[descriptor] }
  101. }
  102. /// Establishes and maintains a connection to the remote destination.
  103. public func connect() async {
  104. self.state.withLockedValue { $0.start() }
  105. self._connectivityState.continuation.yield(.idle)
  106. await withDiscardingTaskGroup { group in
  107. var iterator: Optional<RPCAsyncSequence<NameResolutionResult>.AsyncIterator>
  108. // The resolver can either push or pull values. If it pushes values the channel should
  109. // listen for new results. Otherwise the channel will pull values as and when necessary.
  110. switch self.resolver.updateMode.value {
  111. case .push:
  112. iterator = nil
  113. let handle = group.addCancellableTask {
  114. do {
  115. for try await result in self.resolver.names {
  116. self.input.continuation.yield(.handleResolutionResult(result))
  117. }
  118. self.close()
  119. } catch {
  120. self.close()
  121. }
  122. }
  123. // When the channel is closed gracefully, the task group running the load balancer mustn't
  124. // be cancelled (otherwise in-flight RPCs would fail), but the push based resolver will
  125. // continue indefinitely. Store its handle and cancel it on close when closing the channel.
  126. self.state.withLockedValue { state in
  127. state.setNameResolverTaskHandle(handle)
  128. }
  129. case .pull:
  130. iterator = self.resolver.names.makeAsyncIterator()
  131. await self.resolve(iterator: &iterator, in: &group)
  132. }
  133. // Resolver is setup, start handling events.
  134. for await input in self.input.stream {
  135. switch input {
  136. case .close:
  137. self.handleClose(in: &group)
  138. case .handleResolutionResult(let result):
  139. self.handleNameResolutionResult(result, in: &group)
  140. case .handleLoadBalancerEvent(let event, let id):
  141. await self.handleLoadBalancerEvent(
  142. event,
  143. loadBalancerID: id,
  144. in: &group,
  145. iterator: &iterator
  146. )
  147. }
  148. }
  149. }
  150. if Task.isCancelled {
  151. self._connectivityState.continuation.finish()
  152. }
  153. }
  154. /// Signal to the transport that no new streams may be created and that connections should be
  155. /// closed when all streams are closed.
  156. public func close() {
  157. self.input.continuation.yield(.close)
  158. }
  159. /// Opens a stream using the transport, and uses it as input into a user-provided closure.
  160. public func withStream<T>(
  161. descriptor: MethodDescriptor,
  162. options: CallOptions,
  163. _ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
  164. ) async throws -> T {
  165. // Merge options from the call with those from the service config.
  166. let methodConfig = self.configuration(forMethod: descriptor)
  167. var options = options
  168. options.formUnion(with: methodConfig)
  169. for attempt in 1 ... Self.maxStreamCreationAttempts {
  170. switch await self.makeStream(descriptor: descriptor, options: options) {
  171. case .created(let stream):
  172. return try await stream.execute { inbound, outbound in
  173. let rpcStream = RPCStream(
  174. descriptor: stream.descriptor,
  175. inbound: RPCAsyncSequence(wrapping: inbound),
  176. outbound: RPCWriter.Closable(wrapping: outbound)
  177. )
  178. return try await closure(rpcStream)
  179. }
  180. case .tryAgain(let error):
  181. if error is CancellationError || attempt == Self.maxStreamCreationAttempts {
  182. throw error
  183. } else {
  184. continue
  185. }
  186. case .stopTrying(let error):
  187. throw error
  188. }
  189. }
  190. fatalError("Internal inconsistency")
  191. }
  192. }
  193. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  194. extension GRPCChannel {
  195. @_spi(Package)
  196. public struct Config: Sendable {
  197. /// Configuration for HTTP/2 connections.
  198. var http2: HTTP2ClientTransport.Config.HTTP2
  199. /// Configuration for backoff used when establishing a connection.
  200. var backoff: HTTP2ClientTransport.Config.Backoff
  201. /// Configuration for connection management.
  202. var connection: HTTP2ClientTransport.Config.Connection
  203. /// Compression configuration.
  204. var compression: HTTP2ClientTransport.Config.Compression
  205. @_spi(Package)
  206. public init(
  207. http2: HTTP2ClientTransport.Config.HTTP2,
  208. backoff: HTTP2ClientTransport.Config.Backoff,
  209. connection: HTTP2ClientTransport.Config.Connection,
  210. compression: HTTP2ClientTransport.Config.Compression
  211. ) {
  212. self.http2 = http2
  213. self.backoff = backoff
  214. self.connection = connection
  215. self.compression = compression
  216. }
  217. }
  218. }
  219. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  220. extension GRPCChannel {
  221. enum MakeStreamResult {
  222. /// A stream was created, use it.
  223. case created(Connection.Stream)
  224. /// An error occurred while trying to create a stream, try again if possible.
  225. case tryAgain(Error)
  226. /// An unrecoverable error occurred (e.g. the channel is closed), fail the RPC and don't retry.
  227. case stopTrying(Error)
  228. }
  229. private func makeStream(
  230. descriptor: MethodDescriptor,
  231. options: CallOptions
  232. ) async -> MakeStreamResult {
  233. let waitForReady = options.waitForReady ?? true
  234. switch self.state.withLockedValue({ $0.makeStream(waitForReady: waitForReady) }) {
  235. case .useLoadBalancer(let loadBalancer):
  236. return await self.makeStream(
  237. descriptor: descriptor,
  238. options: options,
  239. loadBalancer: loadBalancer
  240. )
  241. case .joinQueue:
  242. do {
  243. let loadBalancer = try await self.enqueue(waitForReady: waitForReady)
  244. return await self.makeStream(
  245. descriptor: descriptor,
  246. options: options,
  247. loadBalancer: loadBalancer
  248. )
  249. } catch {
  250. // All errors from enqueue are non-recoverable: either the channel is shutting down or
  251. // the request has been cancelled.
  252. return .stopTrying(error)
  253. }
  254. case .failRPC:
  255. return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))
  256. }
  257. }
  258. private func makeStream(
  259. descriptor: MethodDescriptor,
  260. options: CallOptions,
  261. loadBalancer: LoadBalancer
  262. ) async -> MakeStreamResult {
  263. guard let subchannel = loadBalancer.pickSubchannel() else {
  264. return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
  265. }
  266. let methodConfig = self.configuration(forMethod: descriptor)
  267. var options = options
  268. options.formUnion(with: methodConfig)
  269. do {
  270. let stream = try await subchannel.makeStream(descriptor: descriptor, options: options)
  271. return .created(stream)
  272. } catch {
  273. return .tryAgain(error)
  274. }
  275. }
  276. private func enqueue(waitForReady: Bool) async throws -> LoadBalancer {
  277. let id = QueueEntryID()
  278. return try await withTaskCancellationHandler {
  279. try await withCheckedThrowingContinuation { continuation in
  280. if Task.isCancelled {
  281. continuation.resume(throwing: CancellationError())
  282. return
  283. }
  284. let enqueued = self.state.withLockedValue { state in
  285. state.enqueue(continuation: continuation, waitForReady: waitForReady, id: id)
  286. }
  287. // Not enqueued because the channel is shutdown or shutting down.
  288. if !enqueued {
  289. let error = RPCError(code: .unavailable, message: "channel is shutdown")
  290. continuation.resume(throwing: error)
  291. }
  292. }
  293. } onCancel: {
  294. let continuation = self.state.withLockedValue { state in
  295. state.dequeueContinuation(id: id)
  296. }
  297. continuation?.resume(throwing: CancellationError())
  298. }
  299. }
  300. }
  301. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  302. extension GRPCChannel {
  303. private func handleClose(in group: inout DiscardingTaskGroup) {
  304. switch self.state.withLockedValue({ $0.close() }) {
  305. case .close(let current, let next, let resolver, let continuations):
  306. resolver?.cancel()
  307. current.close()
  308. next?.close()
  309. for continuation in continuations {
  310. continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
  311. }
  312. self._connectivityState.continuation.yield(.shutdown)
  313. case .cancelAll(let continuations):
  314. for continuation in continuations {
  315. continuation.resume(throwing: RPCError(code: .unavailable, message: "channel is closed"))
  316. }
  317. self._connectivityState.continuation.yield(.shutdown)
  318. group.cancelAll()
  319. case .none:
  320. ()
  321. }
  322. }
  323. private func handleNameResolutionResult(
  324. _ result: NameResolutionResult,
  325. in group: inout DiscardingTaskGroup
  326. ) {
  327. // Ignore empty endpoint lists.
  328. if result.endpoints.isEmpty { return }
  329. switch result.serviceConfig ?? .success(self.defaultServiceConfig) {
  330. case .success(let config):
  331. // Update per RPC configuration.
  332. let methodConfig = _MethodConfigs(serviceConfig: config)
  333. self._methodConfig.withLockedValue { $0 = methodConfig }
  334. let retryThrottle = config.retryThrottling.map { RetryThrottle(policy: $0) }
  335. self._retryThrottle.withLockedValue { $0 = retryThrottle }
  336. // Update the load balancer.
  337. self.updateLoadBalancer(serviceConfig: config, endpoints: result.endpoints, in: &group)
  338. case .failure:
  339. self.close()
  340. }
  341. }
  342. enum SupportedLoadBalancerConfig {
  343. case roundRobin
  344. case pickFirst(ServiceConfig.LoadBalancingConfig.PickFirst)
  345. init?(_ config: ServiceConfig.LoadBalancingConfig) {
  346. if let pickFirst = config.pickFirst {
  347. self = .pickFirst(pickFirst)
  348. } else if config.roundRobin != nil {
  349. self = .roundRobin
  350. } else {
  351. return nil
  352. }
  353. }
  354. func matches(loadBalancer: LoadBalancer) -> Bool {
  355. switch (self, loadBalancer) {
  356. case (.roundRobin, .roundRobin):
  357. return true
  358. case (.pickFirst, .pickFirst):
  359. return true
  360. case (.roundRobin, .pickFirst),
  361. (.pickFirst, .roundRobin):
  362. return false
  363. }
  364. }
  365. }
  366. private func updateLoadBalancer(
  367. serviceConfig: ServiceConfig,
  368. endpoints: [Endpoint],
  369. in group: inout DiscardingTaskGroup
  370. ) {
  371. assert(!endpoints.isEmpty, "endpoints must be non-empty")
  372. // Find the first supported config.
  373. var configFromServiceConfig: SupportedLoadBalancerConfig?
  374. for config in serviceConfig.loadBalancingConfig {
  375. if let config = SupportedLoadBalancerConfig(config) {
  376. configFromServiceConfig = config
  377. break
  378. }
  379. }
  380. let onUpdatePolicy: GRPCChannel.StateMachine.OnChangeLoadBalancer
  381. var endpoints = endpoints
  382. // Fallback to pick-first if no other config applies.
  383. let loadBalancerConfig = configFromServiceConfig ?? .pickFirst(.init(shuffleAddressList: false))
  384. switch loadBalancerConfig {
  385. case .roundRobin:
  386. onUpdatePolicy = self.state.withLockedValue { state in
  387. state.changeLoadBalancerKind(to: loadBalancerConfig) {
  388. let loadBalancer = RoundRobinLoadBalancer(
  389. connector: self.connector,
  390. backoff: self.backoff,
  391. defaultCompression: self.defaultCompression,
  392. enabledCompression: self.enabledCompression
  393. )
  394. return .roundRobin(loadBalancer)
  395. }
  396. }
  397. case .pickFirst(let pickFirst):
  398. if pickFirst.shuffleAddressList {
  399. endpoints[0].addresses.shuffle()
  400. }
  401. onUpdatePolicy = self.state.withLockedValue { state in
  402. state.changeLoadBalancerKind(to: loadBalancerConfig) {
  403. let loadBalancer = PickFirstLoadBalancer(
  404. connector: self.connector,
  405. backoff: self.backoff,
  406. defaultCompression: self.defaultCompression,
  407. enabledCompression: self.enabledCompression
  408. )
  409. return .pickFirst(loadBalancer)
  410. }
  411. }
  412. }
  413. self.handleLoadBalancerChange(onUpdatePolicy, endpoints: endpoints, in: &group)
  414. }
  415. private func handleLoadBalancerChange(
  416. _ update: StateMachine.OnChangeLoadBalancer,
  417. endpoints: [Endpoint],
  418. in group: inout DiscardingTaskGroup
  419. ) {
  420. assert(!endpoints.isEmpty, "endpoints must be non-empty")
  421. switch update {
  422. case .runLoadBalancer(let new, let old):
  423. old?.close()
  424. switch new {
  425. case .roundRobin(let loadBalancer):
  426. loadBalancer.updateAddresses(endpoints)
  427. case .pickFirst(let loadBalancer):
  428. loadBalancer.updateEndpoint(endpoints.first!)
  429. }
  430. group.addTask {
  431. await new.run()
  432. }
  433. group.addTask {
  434. for await event in new.events {
  435. self.input.continuation.yield(.handleLoadBalancerEvent(event, new.id))
  436. }
  437. }
  438. case .updateLoadBalancer(let existing):
  439. switch existing {
  440. case .roundRobin(let loadBalancer):
  441. loadBalancer.updateAddresses(endpoints)
  442. case .pickFirst(let loadBalancer):
  443. loadBalancer.updateEndpoint(endpoints.first!)
  444. }
  445. case .none:
  446. ()
  447. }
  448. }
  449. private func handleLoadBalancerEvent(
  450. _ event: LoadBalancerEvent,
  451. loadBalancerID: LoadBalancerID,
  452. in group: inout DiscardingTaskGroup,
  453. iterator: inout RPCAsyncSequence<NameResolutionResult>.AsyncIterator?
  454. ) async {
  455. switch event {
  456. case .connectivityStateChanged(let connectivityState):
  457. let actions = self.state.withLockedValue { state in
  458. state.loadBalancerStateChanged(to: connectivityState, id: loadBalancerID)
  459. }
  460. if let newState = actions.publishState {
  461. self._connectivityState.continuation.yield(newState)
  462. }
  463. if let subchannel = actions.close {
  464. subchannel.close()
  465. }
  466. if let resumable = actions.resumeContinuations {
  467. for continuation in resumable.continuations {
  468. continuation.resume(with: resumable.result)
  469. }
  470. }
  471. if actions.finish {
  472. // Fully closed.
  473. self._connectivityState.continuation.finish()
  474. self.input.continuation.finish()
  475. }
  476. case .requiresNameResolution:
  477. await self.resolve(iterator: &iterator, in: &group)
  478. }
  479. }
  480. private func resolve(
  481. iterator: inout RPCAsyncSequence<NameResolutionResult>.AsyncIterator?,
  482. in group: inout DiscardingTaskGroup
  483. ) async {
  484. guard var iterator = iterator else { return }
  485. do {
  486. if let result = try await iterator.next() {
  487. self.handleNameResolutionResult(result, in: &group)
  488. } else {
  489. self.close()
  490. }
  491. } catch {
  492. self.close()
  493. }
  494. }
  495. }
  496. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  497. extension GRPCChannel {
  498. struct StateMachine {
  499. enum State {
  500. case notRunning(NotRunning)
  501. case running(Running)
  502. case stopping(Stopping)
  503. case stopped
  504. case _modifying
  505. struct NotRunning {
  506. /// Queue of requests waiting for a load-balancer.
  507. var queue: RequestQueue
  508. /// A handle to the name resolver task.
  509. var nameResolverHandle: CancellableTaskHandle?
  510. init() {
  511. self.queue = RequestQueue()
  512. }
  513. }
  514. struct Running {
  515. /// The connectivity state of the channel.
  516. var connectivityState: ConnectivityState
  517. /// The load-balancer currently in use.
  518. var current: LoadBalancer
  519. /// The next load-balancer to use. This will be promoted to `current` when it enters the
  520. /// ready state.
  521. var next: LoadBalancer?
  522. /// Previously created load-balancers which are in the process of shutting down.
  523. var past: [LoadBalancerID: LoadBalancer]
  524. /// Queue of requests wait for a load-balancer.
  525. var queue: RequestQueue
  526. /// A handle to the name resolver task.
  527. var nameResolverHandle: CancellableTaskHandle?
  528. init(
  529. from state: NotRunning,
  530. loadBalancer: LoadBalancer
  531. ) {
  532. self.connectivityState = .idle
  533. self.current = loadBalancer
  534. self.next = nil
  535. self.past = [:]
  536. self.queue = state.queue
  537. self.nameResolverHandle = state.nameResolverHandle
  538. }
  539. }
  540. struct Stopping {
  541. /// Previously created load-balancers which are in the process of shutting down.
  542. var past: [LoadBalancerID: LoadBalancer]
  543. init(from state: Running) {
  544. self.past = state.past
  545. }
  546. init(loadBalancers: [LoadBalancerID: LoadBalancer]) {
  547. self.past = loadBalancers
  548. }
  549. }
  550. }
  551. /// The current state.
  552. private var state: State
  553. /// Whether the channel is running.
  554. private var running: Bool
  555. init() {
  556. self.state = .notRunning(State.NotRunning())
  557. self.running = false
  558. }
  559. }
  560. }
  561. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  562. extension GRPCChannel.StateMachine {
  563. mutating func start() {
  564. precondition(!self.running, "channel must only be started once")
  565. self.running = true
  566. }
  567. mutating func setNameResolverTaskHandle(_ handle: CancellableTaskHandle) {
  568. switch self.state {
  569. case .notRunning(var state):
  570. state.nameResolverHandle = handle
  571. self.state = .notRunning(state)
  572. case .running, .stopping, .stopped, ._modifying:
  573. fatalError("Invalid state")
  574. }
  575. }
  576. enum OnChangeLoadBalancer {
  577. case runLoadBalancer(LoadBalancer, stop: LoadBalancer?)
  578. case updateLoadBalancer(LoadBalancer)
  579. case none
  580. }
  581. mutating func changeLoadBalancerKind(
  582. to newLoadBalancerKind: GRPCChannel.SupportedLoadBalancerConfig,
  583. _ makeLoadBalancer: () -> LoadBalancer
  584. ) -> OnChangeLoadBalancer {
  585. let onChangeLoadBalancer: OnChangeLoadBalancer
  586. switch self.state {
  587. case .notRunning(let state):
  588. let loadBalancer = makeLoadBalancer()
  589. let state = State.Running(from: state, loadBalancer: loadBalancer)
  590. self.state = .running(state)
  591. onChangeLoadBalancer = .runLoadBalancer(state.current, stop: nil)
  592. case .running(var state):
  593. self.state = ._modifying
  594. if let next = state.next {
  595. if newLoadBalancerKind.matches(loadBalancer: next) {
  596. onChangeLoadBalancer = .updateLoadBalancer(next)
  597. } else {
  598. // The 'next' didn't become ready in time. Close it and replace it with a load-balancer
  599. // of the next kind.
  600. let nextNext = makeLoadBalancer()
  601. let previous = state.next
  602. state.next = nextNext
  603. state.past[next.id] = next
  604. onChangeLoadBalancer = .runLoadBalancer(nextNext, stop: previous)
  605. }
  606. } else {
  607. if newLoadBalancerKind.matches(loadBalancer: state.current) {
  608. onChangeLoadBalancer = .updateLoadBalancer(state.current)
  609. } else {
  610. // Create the 'next' load-balancer, it'll replace 'current' when it becomes ready.
  611. let next = makeLoadBalancer()
  612. state.next = next
  613. onChangeLoadBalancer = .runLoadBalancer(next, stop: nil)
  614. }
  615. }
  616. self.state = .running(state)
  617. case .stopping, .stopped:
  618. onChangeLoadBalancer = .none
  619. case ._modifying:
  620. fatalError("Invalid state")
  621. }
  622. return onChangeLoadBalancer
  623. }
  624. struct ConnectivityStateChangeActions {
  625. var close: LoadBalancer? = nil
  626. var publishState: ConnectivityState? = nil
  627. var resumeContinuations: ResumableContinuations? = nil
  628. var finish: Bool = false
  629. struct ResumableContinuations {
  630. var continuations: [CheckedContinuation<LoadBalancer, Error>]
  631. var result: Result<LoadBalancer, Error>
  632. }
  633. }
  634. mutating func loadBalancerStateChanged(
  635. to connectivityState: ConnectivityState,
  636. id: LoadBalancerID
  637. ) -> ConnectivityStateChangeActions {
  638. var actions = ConnectivityStateChangeActions()
  639. switch self.state {
  640. case .running(var state):
  641. self.state = ._modifying
  642. if id == state.current.id {
  643. // No change in state, ignore.
  644. if state.connectivityState == connectivityState {
  645. self.state = .running(state)
  646. break
  647. }
  648. state.connectivityState = connectivityState
  649. actions.publishState = connectivityState
  650. switch connectivityState {
  651. case .ready:
  652. // Current load-balancer became ready; resume all continuations in the queue.
  653. let continuations = state.queue.removeAll()
  654. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  655. continuations: continuations,
  656. result: .success(state.current)
  657. )
  658. case .transientFailure, .shutdown: // shutdown includes shutting down
  659. // Current load-balancer failed. Remove all the 'fast-failing' continuations in the
  660. // queue, these are RPCs which set the 'wait for ready' option to false. The rest of
  661. // the entries in the queue will wait for a load-balancer to become ready.
  662. let continuations = state.queue.removeFastFailingEntries()
  663. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  664. continuations: continuations,
  665. result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
  666. )
  667. case .idle, .connecting:
  668. () // Ignore.
  669. }
  670. } else if let next = state.next, next.id == id {
  671. // State change came from the next LB, if it's now ready promote it to be the current.
  672. switch connectivityState {
  673. case .ready:
  674. // Next load-balancer is ready, promote it to current.
  675. let previous = state.current
  676. state.past[previous.id] = previous
  677. state.current = next
  678. state.next = nil
  679. actions.close = previous
  680. if state.connectivityState != connectivityState {
  681. actions.publishState = connectivityState
  682. }
  683. actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
  684. continuations: state.queue.removeAll(),
  685. result: .success(next)
  686. )
  687. case .idle, .connecting, .transientFailure, .shutdown:
  688. ()
  689. }
  690. }
  691. self.state = .running(state)
  692. case .stopping(var state):
  693. self.state = ._modifying
  694. // Remove the load balancer if it's now shutdown.
  695. switch connectivityState {
  696. case .shutdown:
  697. state.past.removeValue(forKey: id)
  698. case .idle, .connecting, .ready, .transientFailure:
  699. ()
  700. }
  701. // If that was the last load-balancer then finish the input streams so that the channel
  702. // eventually finishes.
  703. if state.past.isEmpty {
  704. actions.finish = true
  705. self.state = .stopped
  706. } else {
  707. self.state = .stopping(state)
  708. }
  709. case .notRunning, .stopped:
  710. ()
  711. case ._modifying:
  712. fatalError("Invalid state")
  713. }
  714. return actions
  715. }
  716. enum OnMakeStream {
  717. /// Use the given load-balancer to make a stream.
  718. case useLoadBalancer(LoadBalancer)
  719. /// Join the queue and wait until a load-balancer becomes ready.
  720. case joinQueue
  721. /// Fail the stream request, the channel isn't in a suitable state.
  722. case failRPC
  723. }
  724. func makeStream(waitForReady: Bool) -> OnMakeStream {
  725. let onMakeStream: OnMakeStream
  726. switch self.state {
  727. case .notRunning:
  728. onMakeStream = .joinQueue
  729. case .running(let state):
  730. switch state.connectivityState {
  731. case .idle, .connecting:
  732. onMakeStream = .joinQueue
  733. case .ready:
  734. onMakeStream = .useLoadBalancer(state.current)
  735. case .transientFailure:
  736. onMakeStream = waitForReady ? .joinQueue : .failRPC
  737. case .shutdown:
  738. onMakeStream = .failRPC
  739. }
  740. case .stopping, .stopped:
  741. onMakeStream = .failRPC
  742. case ._modifying:
  743. fatalError("Invalid state")
  744. }
  745. return onMakeStream
  746. }
  747. mutating func enqueue(
  748. continuation: CheckedContinuation<LoadBalancer, Error>,
  749. waitForReady: Bool,
  750. id: QueueEntryID
  751. ) -> Bool {
  752. switch self.state {
  753. case .notRunning(var state):
  754. self.state = ._modifying
  755. state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
  756. self.state = .notRunning(state)
  757. return true
  758. case .running(var state):
  759. self.state = ._modifying
  760. state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
  761. self.state = .running(state)
  762. return true
  763. case .stopping, .stopped:
  764. return false
  765. case ._modifying:
  766. fatalError("Invalid state")
  767. }
  768. }
  769. mutating func dequeueContinuation(
  770. id: QueueEntryID
  771. ) -> CheckedContinuation<LoadBalancer, Error>? {
  772. switch self.state {
  773. case .notRunning(var state):
  774. self.state = ._modifying
  775. let continuation = state.queue.removeEntry(withID: id)
  776. self.state = .notRunning(state)
  777. return continuation
  778. case .running(var state):
  779. self.state = ._modifying
  780. let continuation = state.queue.removeEntry(withID: id)
  781. self.state = .running(state)
  782. return continuation
  783. case .stopping, .stopped:
  784. return nil
  785. case ._modifying:
  786. fatalError("Invalid state")
  787. }
  788. }
  789. enum OnClose {
  790. case none
  791. case cancelAll([RequestQueue.Continuation])
  792. case close(LoadBalancer, LoadBalancer?, CancellableTaskHandle?, [RequestQueue.Continuation])
  793. }
  794. mutating func close() -> OnClose {
  795. let onClose: OnClose
  796. switch self.state {
  797. case .notRunning(var state):
  798. self.state = .stopped
  799. onClose = .cancelAll(state.queue.removeAll())
  800. case .running(var state):
  801. let continuations = state.queue.removeAll()
  802. onClose = .close(state.current, state.next, state.nameResolverHandle, continuations)
  803. state.past[state.current.id] = state.current
  804. if let next = state.next {
  805. state.past[next.id] = next
  806. }
  807. self.state = .stopping(State.Stopping(loadBalancers: state.past))
  808. case .stopping, .stopped:
  809. onClose = .none
  810. case ._modifying:
  811. fatalError("Invalid state")
  812. }
  813. return onClose
  814. }
  815. }