Subchannel.swift 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  /// Manages communication with exactly one ``Endpoint``.
  ///
  /// A newly created ``Subchannel`` is 'idle': it makes no attempt to reach its endpoint until
  /// ``connect()`` is called. Connectivity changes are published on the ``events`` sequence.
  ///
  /// When the ``Subchannel`` is no longer needed you must call ``shutDown()``. Doing so moves it
  /// to the ``ConnectivityState/shutdown`` state: existing RPCs may continue, but every later
  /// call to ``makeStream(descriptor:options:)`` fails.
  ///
  /// The ``Subchannel`` only makes progress while ``run()`` is executing, so run it in a task:
  ///
  /// ```swift
  /// await withTaskGroup(of: Void.self) { group in
  ///   group.addTask { await subchannel.run() }
  ///
  ///   for await event in subchannel.events {
  ///     switch event {
  ///     case .connectivityStateChanged(.ready):
  ///       // ...
  ///     default:
  ///       // ...
  ///     }
  ///   }
  /// }
  /// ```
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  struct Subchannel {
    /// Notifications published by the subchannel on its ``events`` sequence.
    enum Event: Sendable, Hashable {
      /// A GOAWAY was received on the connection, which will close soon. Don't open new streams
      /// on this connection.
      case goingAway

      /// The subchannel moved to a different connectivity state.
      case connectivityStateChanged(ConnectivityState)

      /// The subchannel is asking the load balancer to re-resolve names.
      case requiresNameResolution
    }

    /// Work items consumed by ``run()``.
    private enum Input: Sendable {
      /// Start connecting.
      case connect

      /// The backoff period has elapsed.
      case backedOff

      /// Shut the connection down, if possible.
      case shutDown

      /// An event from the underlying connection object which needs handling.
      case handleConnectionEvent(Connection.Event)
    }

    /// The stream of subchannel events and the continuation used to publish them.
    private let event: (stream: AsyncStream<Event>, continuation: AsyncStream<Event>.Continuation)

    /// The stream of inputs the subchannel reacts to and the continuation used to enqueue them.
    private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)

    /// The current state of the subchannel, protected by a lock.
    private let state: NIOLockedValueBox<State>

    /// The endpoint targeted by this subchannel.
    let endpoint: Endpoint

    /// The identifier of this subchannel.
    let id: SubchannelID

    /// Creates the connections used by this subchannel.
    private let connector: any HTTP2Connector

    /// Backoff configuration applied between connection attempts.
    private let backoff: ConnectionBackoff

    /// The compression algorithm applied to requests by default.
    private let defaultCompression: CompressionAlgorithm

    /// The compression algorithms which are enabled.
    private let enabledCompression: CompressionAlgorithmSet

    /// Creates a subchannel in the idle state.
    ///
    /// - Parameters:
    ///   - endpoint: The endpoint to connect to; its `addresses` must not be empty.
    ///   - id: The ID of the subchannel.
    ///   - connector: A factory for connections.
    ///   - backoff: The backoff configuration to use when establishing a connection.
    ///   - defaultCompression: The default compression algorithm for requests.
    ///   - enabledCompression: The set of enabled compression algorithms.
    init(
      endpoint: Endpoint,
      id: SubchannelID,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff,
      defaultCompression: CompressionAlgorithm,
      enabledCompression: CompressionAlgorithmSet
    ) {
      assert(!endpoint.addresses.isEmpty, "endpoint.addresses mustn't be empty")

      let events = AsyncStream.makeStream(of: Event.self)
      let inputs = AsyncStream.makeStream(of: Input.self)

      self.endpoint = endpoint
      self.id = id
      self.connector = connector
      self.backoff = backoff
      self.defaultCompression = defaultCompression
      self.enabledCompression = enabledCompression
      self.state = NIOLockedValueBox(.notConnected(.initial))
      self.event = events
      self.input = inputs

      // The first event observers see is always 'idle': that's where every subchannel starts.
      events.continuation.yield(.connectivityStateChanged(.idle))
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The events published by the subchannel, such as connectivity state changes.
    var events: AsyncStream<Event> {
      return self.event.stream
    }

    /// Drives the subchannel until its input stream finishes or the calling task is cancelled.
    ///
    /// While running, the subchannel tries to maintain a connection to the remote endpoint. The
    /// connection may be idle at times but will reconnect on-demand when a stream is requested.
    /// If connect attempts fail then the subchannel may progressively spend longer in a transient
    /// failure state.
    ///
    /// Use the ``events`` stream to observe events and state changes.
    func run() async {
      await withDiscardingTaskGroup { group in
        // Inputs are processed strictly one at a time, in the order they were enqueued.
        for await next in self.input.stream {
          switch next {
          case .handleConnectionEvent(let connectionEvent):
            self.handleConnectionEvent(connectionEvent, in: &group)

          case .connect:
            self.handleConnectInput(in: &group)

          case .backedOff:
            self.handleBackedOffInput(in: &group)

          case .shutDown:
            self.handleShutDownInput(in: &group)
          }
        }
      }

      // The event stream must be finished once the task group has completed. The handlers take
      // care of that in normal operation, so only cancellation needs handling here.
      guard Task.isCancelled else { return }
      self.event.continuation.finish()
    }

    /// Asks the subchannel to start a connection attempt, if it is able to.
    func connect() {
      self.input.continuation.yield(.connect)
    }

    /// Asks the subchannel to begin shutting down gracefully, if it is able to.
    func shutDown() {
      self.input.continuation.yield(.shutDown)
    }

    /// Opens a stream on the subchannel's connection, provided the subchannel is ready.
    ///
    /// - Parameters:
    ///   - descriptor: A descriptor of the method to create a stream for.
    ///   - options: The call options forwarded to the connection when creating the stream.
    /// - Returns: The open stream.
    /// - Throws: An ``RPCError`` with code `unavailable` if the subchannel has no established
    ///   connection, or any error thrown by the connection while creating the stream.
    func makeStream(
      descriptor: MethodDescriptor,
      options: CallOptions
    ) async throws -> Connection.Stream {
      // Only hold the lock long enough to grab the connection; the stream is made outside of it.
      let readyConnection = self.state.withLockedValue { current -> Connection? in
        switch current {
        case .connected(let connected):
          return connected.connection
        case .notConnected, .connecting, .goingAway, .shuttingDown, .shutDown:
          return nil
        }
      }

      guard let readyConnection else {
        throw RPCError(code: .unavailable, message: "subchannel isn't ready")
      }

      return try await readyConnection.makeStream(descriptor: descriptor, options: options)
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// Finishes the event stream followed by the input stream; nothing further will be emitted
    /// or processed afterwards.
    private func closeStreams() {
      self.event.continuation.finish()
      self.input.continuation.finish()
    }

    /// Handles a request to connect by starting a new connection if the state allows it.
    private func handleConnectInput(in group: inout DiscardingTaskGroup) {
      let newConnection = self.state.withLockedValue { state in
        state.makeConnection(
          to: self.endpoint.addresses,
          using: self.connector,
          backoff: self.backoff,
          defaultCompression: self.defaultCompression,
          enabledCompression: self.enabledCompression
        )
      }

      // A `nil` connection means the subchannel isn't in a state to start connecting.
      if let newConnection {
        // A new connection is about to start connecting: tell observers first.
        self.event.continuation.yield(.connectivityStateChanged(.connecting))
        self.runConnection(newConnection, in: &group)
      }
    }

    /// Handles the end of a backoff period.
    private func handleBackedOffInput(in group: inout DiscardingTaskGroup) {
      let onBackedOff = self.state.withLockedValue { $0.backedOff() }

      switch onBackedOff {
      case .connect(let connection):
        // Connecting is about to start: tell observers first.
        self.event.continuation.yield(.connectivityStateChanged(.connecting))
        self.runConnection(connection, in: &group)

      case .finish:
        self.closeStreams()

      case .none:
        ()
      }
    }

    /// Handles a request to shut down.
    ///
    /// Whenever the state machine asks for a shutdown to be emitted it's because the load
    /// balancer requested the shutdown, so the load balancer is notified.
    private func handleShutDownInput(in group: inout DiscardingTaskGroup) {
      let onShutDown = self.state.withLockedValue { $0.shutDown() }

      switch onShutDown {
      case .emitShutdownAndFinish:
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        // No more events can follow: close both streams.
        self.closeStreams()

      case .emitShutdownAndClose(let connection):
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        connection.close()

      case .emitShutdown:
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))

      case .none:
        ()
      }
    }

    /// Routes an event from the underlying connection to the matching handler.
    private func handleConnectionEvent(
      _ event: Connection.Event,
      in group: inout DiscardingTaskGroup
    ) {
      switch event {
      case .closed(let reason):
        self.handleConnectionClosedEvent(reason, in: &group)
      case .goingAway:
        self.handleGoingAwayEvent()
      case .connectFailed:
        self.handleConnectFailedEvent(in: &group)
      case .connectSucceeded:
        self.handleConnectSucceededEvent()
      }
    }

    /// Handles a successful connection attempt.
    private func handleConnectSucceededEvent() {
      let onConnectSucceeded = self.state.withLockedValue { $0.connectSucceeded() }

      switch onConnectSucceeded {
      case .none:
        ()

      case .finishAndClose(let connection):
        self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        self.closeStreams()
        connection.close()

      case .updateStateToReady:
        // The load balancer may use this subchannel from now on.
        self.event.continuation.yield(.connectivityStateChanged(.ready))
      }
    }

    /// Handles a failed connection attempt by trying the next address or backing off.
    private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) {
      let connector = self.connector
      let action = self.state.withLockedValue { $0.connectFailed(connector: connector) }

      switch action {
      case .none:
        ()

      case .finish:
        self.closeStreams()

      case .backoff(let duration):
        // Every address has been tried; wait for a while before trying again.
        self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
        group.addTask {
          do {
            try await Task.sleep(for: duration)
          } catch {
            // The only possible error is a cancellation error: swallow it, there'll be no further
            // connection attempts.
            return
          }
          self.input.continuation.yield(.backedOff)
        }

      case .connect(let connection):
        // Move on to the next address.
        self.runConnection(connection, in: &group)
      }
    }

    /// Handles a GOAWAY received by the underlying connection.
    private func handleGoingAwayEvent() {
      if self.state.withLockedValue({ $0.goingAway() }) {
        // Stop the load balancer from using the subchannel by telling it that it's going away.
        self.event.continuation.yield(.goingAway)
        // The set of available servers may have changed, so the load balancer should also
        // re-resolve.
        self.event.continuation.yield(.requiresNameResolution)
      }
    }

    /// Handles the underlying connection closing.
    private func handleConnectionClosedEvent(
      _ reason: Connection.CloseReason,
      in group: inout DiscardingTaskGroup
    ) {
      let onClosed = self.state.withLockedValue { $0.closed(reason: reason) }

      switch onClosed {
      case .finish(let emitShutdown):
        if emitShutdown {
          self.event.continuation.yield(.connectivityStateChanged(.shutdown))
        }
        // No more events can follow: close both streams.
        self.closeStreams()

      case .emitTransientFailureAndReconnect:
        // An unclean close results in a transient failure state change and a name resolution
        // request ...
        self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
        self.event.continuation.yield(.requiresNameResolution)
        // ... followed by an attempt to reconnect.
        self.handleConnectInput(in: &group)

      case .emitIdle:
        self.event.continuation.yield(.connectivityStateChanged(.idle))

      case .nothing:
        ()
      }
    }

    /// Runs `connection` in `group` and feeds each of its events back in as a subchannel input.
    private func runConnection(_ connection: Connection, in group: inout DiscardingTaskGroup) {
      group.addTask {
        await connection.run()
      }

      group.addTask {
        for await connectionEvent in connection.events {
          self.input.continuation.yield(.handleConnectionEvent(connectionEvent))
        }
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The connection state machine backing a ``Subchannel``.
    ///
    /// The transitions implemented by the methods below are (`source --input--> destination`):
    ///
    /// ```
    /// notConnected --connect----------------------> connecting
    /// notConnected --shutDown---------------------> shutDown
    /// connecting   --connectSucceeded-------------> connected
    /// connecting   --connectFailed / backedOff----> connecting
    /// connecting   --shutDown---------------------> shuttingDown
    /// connected    --goingAway--------------------> goingAway
    /// connected    --closed-----------------------> notConnected
    /// connected    --shutDown---------------------> shuttingDown
    /// goingAway    --closed-----------------------> notConnected
    /// goingAway    --shutDown---------------------> shuttingDown
    /// shuttingDown --closed / connectSucceeded /
    ///                connectFailed / backedOff----> shutDown
    /// ```
    ///
    /// Any input not listed for a state leaves the state unchanged.
    private enum State {
      /// Not connected and not actively connecting.
      case notConnected(NotConnected)
      /// A connection attempt is in-progress.
      case connecting(Connecting)
      /// A connection has been established.
      case connected(Connected)
      /// The subchannel is going away. It may return to the 'notConnected' state when the
      /// underlying connection has closed.
      case goingAway(GoingAway)
      /// The subchannel is shutting down, it will enter the 'shutDown' state when closed, it may
      /// not enter any other state.
      case shuttingDown(ShuttingDown)
      /// The subchannel is shutdown, this is a terminal state.
      case shutDown(ShutDown)

      /// Payload of the 'notConnected' state; it carries no data.
      struct NotConnected {
        private init() {}
        /// The state every subchannel starts in.
        static let initial = NotConnected()
        init(from state: Connected) {}
        init(from state: GoingAway) {}
      }

      /// Payload of the 'connecting' state.
      struct Connecting {
        /// The connection currently being attempted (or queued to be attempted after backoff).
        var connection: Connection
        /// All addresses of the endpoint, kept so that iteration can restart after a backoff.
        let addresses: [SocketAddress]
        /// The addresses which haven't been tried yet in the current round.
        var addressIterator: [SocketAddress].Iterator
        /// The source of backoff durations used between rounds of connection attempts.
        var backoff: ConnectionBackoff.Iterator
        /// The default compression algorithm to configure on every connection attempt.
        let defaultCompression: CompressionAlgorithm
        /// The enabled compression algorithms to configure on every connection attempt.
        let enabledCompression: CompressionAlgorithmSet
      }

      /// Payload of the 'connected' state.
      struct Connected {
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }
      }

      /// Payload of the 'goingAway' state.
      struct GoingAway {
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }

        init(from state: Connected) {
          self.connection = state.connection
        }
      }

      /// Payload of the 'shuttingDown' state.
      struct ShuttingDown {
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }

        init(from state: Connected) {
          self.connection = state.connection
        }

        init(from state: GoingAway) {
          self.connection = state.connection
        }
      }

      /// Payload of the terminal 'shutDown' state; it carries no data.
      struct ShutDown {
        init(from state: ShuttingDown) {}
        init(from state: GoingAway) {}
        init(from state: NotConnected) {}
      }

      /// Starts connecting to the first address if the subchannel is currently not connected.
      ///
      /// - Parameters:
      ///   - addresses: The addresses to try, in order. Must not be empty.
      ///   - connector: The factory used to create the connection.
      ///   - backoff: The backoff configuration used if every address fails.
      ///   - defaultCompression: The default compression algorithm for the connection. It is also
      ///     remembered and applied to any later attempt made by ``connectFailed(connector:)``.
      ///   - enabledCompression: The enabled compression algorithms for the connection. It is
      ///     also remembered and applied to any later attempt made by
      ///     ``connectFailed(connector:)``.
      /// - Returns: The connection to run, or `nil` if the state wasn't 'notConnected'.
      mutating func makeConnection(
        to addresses: [SocketAddress],
        using connector: any HTTP2Connector,
        backoff: ConnectionBackoff,
        defaultCompression: CompressionAlgorithm,
        enabledCompression: CompressionAlgorithmSet
      ) -> Connection? {
        switch self {
        case .notConnected:
          var iterator = addresses.makeIterator()
          let address = iterator.next()!  // addresses must not be empty.

          let connection = Connection(
            address: address,
            http2Connector: connector,
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )

          let connecting = State.Connecting(
            connection: connection,
            addresses: addresses,
            addressIterator: iterator,
            backoff: backoff.makeIterator(),
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )

          self = .connecting(connecting)
          return connection

        case .connecting, .connected, .goingAway, .shuttingDown, .shutDown:
          return nil
        }
      }

      /// The action to take after a shutdown request.
      enum OnClose {
        case none
        case emitShutdownAndFinish
        case emitShutdownAndClose(Connection)
        case emitShutdown
      }

      /// Handles a shutdown request.
      ///
      /// - Returns: The action the caller must perform for the transition which was made.
      mutating func shutDown() -> OnClose {
        let onShutDown: OnClose

        switch self {
        case .notConnected(let state):
          // Nothing is in flight, go straight to the terminal state.
          self = .shutDown(ShutDown(from: state))
          onShutDown = .emitShutdownAndFinish

        case .connecting(let state):
          // Only emit the shutdown; there's no connection to close yet.
          self = .shuttingDown(ShuttingDown(from: state))
          onShutDown = .emitShutdown

        case .connected(let state):
          self = .shuttingDown(ShuttingDown(from: state))
          onShutDown = .emitShutdownAndClose(state.connection)

        case .goingAway(let state):
          self = .shuttingDown(ShuttingDown(from: state))
          onShutDown = .emitShutdown

        case .shuttingDown, .shutDown:
          onShutDown = .none
        }

        return onShutDown
      }

      /// The action to take after a connection attempt succeeded.
      enum OnConnectSucceeded {
        case updateStateToReady
        case finishAndClose(Connection)
        case none
      }

      /// Handles a connection attempt succeeding.
      ///
      /// - Returns: The action the caller must perform for the transition which was made.
      mutating func connectSucceeded() -> OnConnectSucceeded {
        switch self {
        case .connecting(let state):
          self = .connected(Connected(from: state))
          return .updateStateToReady

        case .shuttingDown(let state):
          // Shutdown was requested while connecting: the new connection must be closed.
          self = .shutDown(ShutDown(from: state))
          return .finishAndClose(state.connection)

        case .notConnected, .connected, .goingAway, .shutDown:
          return .none
        }
      }

      /// The action to take after a connection attempt failed.
      enum OnConnectFailed {
        case none
        case finish
        case connect(Connection)
        case backoff(Duration)
      }

      /// Handles a connection attempt failing.
      ///
      /// If there are untried addresses left, a connection to the next one is returned to be run
      /// immediately. Otherwise iteration restarts from the first address and the caller is told
      /// to back off; the connection to run afterwards is stored in the state and handed out by
      /// ``backedOff()``.
      ///
      /// Replacement connections use the compression settings captured by
      /// ``makeConnection(to:using:backoff:defaultCompression:enabledCompression:)`` so that
      /// every attempt is configured the same way as the first one.
      ///
      /// - Parameter connector: The factory used to create the replacement connection.
      /// - Returns: The action the caller must perform for the transition which was made.
      mutating func connectFailed(connector: any HTTP2Connector) -> OnConnectFailed {
        let onConnectFailed: OnConnectFailed

        switch self {
        case .connecting(var state):
          if let address = state.addressIterator.next() {
            state.connection = Connection(
              address: address,
              http2Connector: connector,
              defaultCompression: state.defaultCompression,
              enabledCompression: state.enabledCompression
            )
            self = .connecting(state)
            onConnectFailed = .connect(state.connection)
          } else {
            // Every address has been tried: start over from the first one after backing off.
            state.addressIterator = state.addresses.makeIterator()
            let address = state.addressIterator.next()!  // addresses must not be empty.
            state.connection = Connection(
              address: address,
              http2Connector: connector,
              defaultCompression: state.defaultCompression,
              enabledCompression: state.enabledCompression
            )
            let backoff = state.backoff.next()
            self = .connecting(state)
            onConnectFailed = .backoff(backoff)
          }

        case .shuttingDown(let state):
          self = .shutDown(ShutDown(from: state))
          onConnectFailed = .finish

        case .notConnected, .connected, .goingAway, .shutDown:
          onConnectFailed = .none
        }

        return onConnectFailed
      }

      /// The action to take after a backoff period ended.
      enum OnBackedOff {
        case none
        case connect(Connection)
        case finish
      }

      /// Handles a backoff period ending.
      ///
      /// - Returns: The action the caller must perform for the transition which was made.
      mutating func backedOff() -> OnBackedOff {
        switch self {
        case .connecting(let state):
          // The connection to try next was already stored by 'connectFailed(connector:)'; the
          // state itself doesn't change.
          return .connect(state.connection)

        case .shuttingDown(let state):
          self = .shutDown(ShutDown(from: state))
          return .finish

        case .notConnected, .connected, .goingAway, .shutDown:
          return .none
        }
      }

      /// Handles the connection going away.
      ///
      /// - Returns: `true` if the state changed to 'goingAway', which only happens from the
      ///   'connected' state; `false` otherwise.
      mutating func goingAway() -> Bool {
        switch self {
        case .connected(let state):
          self = .goingAway(GoingAway(from: state))
          return true
        case .notConnected, .goingAway, .connecting, .shuttingDown, .shutDown:
          return false
        }
      }

      /// The action to take after the connection closed.
      enum OnClosed {
        case nothing
        case emitIdle
        case emitTransientFailureAndReconnect
        case finish(emitShutdown: Bool)
      }

      /// Handles the connection closing.
      ///
      /// - Parameter reason: Why the connection was closed.
      /// - Returns: The action the caller must perform for the transition which was made.
      mutating func closed(reason: Connection.CloseReason) -> OnClosed {
        let onClosed: OnClosed

        switch self {
        case .connected(let state):
          switch reason {
          case .idleTimeout, .remote, .error(_, wasIdle: true):
            self = .notConnected(NotConnected(from: state))
            onClosed = .emitIdle

          case .keepaliveTimeout, .error(_, wasIdle: false):
            self = .notConnected(NotConnected(from: state))
            onClosed = .emitTransientFailureAndReconnect

          case .initiatedLocally:
            // Should be in the 'shuttingDown' state.
            assertionFailure("Invalid state")
            let shuttingDown = State.ShuttingDown(from: state)
            self = .shutDown(ShutDown(from: shuttingDown))
            onClosed = .finish(emitShutdown: true)
          }

        case .goingAway(let state):
          self = .notConnected(NotConnected(from: state))
          onClosed = .emitIdle

        case .shuttingDown(let state):
          // The shutdown event was already emitted when entering 'shuttingDown'.
          self = .shutDown(ShutDown(from: state))
          onClosed = .finish(emitShutdown: false)

        case .notConnected, .connecting, .shutDown:
          onClosed = .nothing
        }

        return onClosed
      }
    }
  }