Subchannel.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  /// A ``Subchannel`` manages communication with exactly one ``Endpoint``.
  ///
  /// A freshly created ``Subchannel`` is 'idle': it makes no attempt to connect to its endpoint
  /// until ``connect()`` is called. Connectivity state changes (and other notifications) are
  /// published on the ``events`` sequence.
  ///
  /// Call ``close()`` once the ``Subchannel`` is no longer needed. Doing so moves it to the
  /// ``ConnectivityState/shutdown`` state: existing RPCs may continue but every later call to
  /// ``makeStream(descriptor:options:)`` will fail.
  ///
  /// The ``Subchannel`` only makes progress while it is being run in a task:
  ///
  /// ```swift
  /// await withTaskGroup(of: Void.self) { group in
  ///   group.addTask { await subchannel.run() }
  ///
  ///   for await event in subchannel.events {
  ///     switch event {
  ///     case .connectivityStateChanged(.ready):
  ///       // ...
  ///     default:
  ///       // ...
  ///     }
  ///   }
  /// }
  /// ```
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  struct Subchannel {
    /// Notifications published by the subchannel on its ``events`` stream.
    enum Event: Sendable, Hashable {
      /// A GOAWAY was received so the connection will close soon. New streams
      /// shouldn't be opened on this connection.
      case goingAway

      /// The subchannel moved to a different connectivity state.
      case connectivityStateChanged(ConnectivityState)

      /// The subchannel is asking the load balancer to re-resolve names.
      case requiresNameResolution
    }

    /// Work items consumed by ``run()``.
    private enum Input: Sendable {
      /// Start connecting.
      case connect

      /// A backoff period has elapsed.
      case backedOff

      /// Close the connection, if possible.
      case close

      /// An event was produced by the underlying connection object and must be handled.
      case handleConnectionEvent(Connection.Event)
    }

    /// The stream of events published by the subchannel, plus the continuation feeding it.
    private let event: (stream: AsyncStream<Event>, continuation: AsyncStream<Event>.Continuation)

    /// The stream of inputs the subchannel reacts to, plus the continuation feeding it.
    private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)

    /// The current state of the subchannel, guarded by a lock.
    private let state: NIOLockedValueBox<State>

    /// The endpoint targeted by this subchannel.
    let endpoint: Endpoint

    /// The identifier of this subchannel.
    let id: SubchannelID

    /// The factory used to create connections.
    private let connector: any HTTP2Connector

    /// The backoff configuration applied when establishing a connection.
    private let backoff: ConnectionBackoff

    /// The compression algorithm used for requests by default.
    private let defaultCompression: CompressionAlgorithm

    /// The compression algorithms which are enabled.
    private let enabledCompression: CompressionAlgorithmSet

    /// Creates a subchannel which isn't connected.
    ///
    /// - Parameters:
    ///   - endpoint: The endpoint to target; its list of addresses must not be empty.
    ///   - id: The ID of the subchannel.
    ///   - connector: A factory for connections.
    ///   - backoff: The backoff configuration to use when establishing a connection.
    ///   - defaultCompression: The default compression algorithm for requests.
    ///   - enabledCompression: The set of enabled compression algorithms.
    init(
      endpoint: Endpoint,
      id: SubchannelID,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff,
      defaultCompression: CompressionAlgorithm,
      enabledCompression: CompressionAlgorithmSet
    ) {
      assert(!endpoint.addresses.isEmpty, "endpoint.addresses mustn't be empty")

      let eventStream = AsyncStream.makeStream(of: Event.self)
      let inputStream = AsyncStream.makeStream(of: Input.self)

      // The first event is always 'idle' as that's the state every subchannel starts in.
      eventStream.continuation.yield(.connectivityStateChanged(.idle))

      self.state = NIOLockedValueBox(.notConnected)
      self.endpoint = endpoint
      self.id = id
      self.connector = connector
      self.backoff = backoff
      self.defaultCompression = defaultCompression
      self.enabledCompression = enabledCompression
      self.event = eventStream
      self.input = inputStream
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The events emitted by the subchannel.
    var events: AsyncStream<Event> {
      return self.event.stream
    }

    /// Run the subchannel.
    ///
    /// While running, the subchannel tries to maintain a connection to a remote endpoint. The
    /// connection may be idle at times but it will reconnect on-demand when a stream is requested.
    /// Repeated connect failures may cause the subchannel to spend progressively longer in a
    /// transient failure state.
    ///
    /// Observe the ``events`` stream to learn about events and state changes.
    func run() async {
      await withDiscardingTaskGroup { group in
        // Inputs are processed one at a time, in the order they were yielded.
        for await next in self.input.stream {
          self.dispatch(next, in: &group)
        }
      }

      // The task group has completed so the event stream must be finished as well. Other code
      // paths take care of that in normal operation; cancellation has to do it explicitly.
      guard Task.isCancelled else { return }
      self.event.continuation.finish()
    }

    /// Routes a single input to the handler responsible for it.
    private func dispatch(_ input: Input, in group: inout DiscardingTaskGroup) {
      switch input {
      case .handleConnectionEvent(let connectionEvent):
        self.handleConnectionEvent(connectionEvent, in: &group)
      case .connect:
        self.handleConnectInput(in: &group)
      case .backedOff:
        self.handleBackedOffInput(in: &group)
      case .close:
        self.handleCloseInput(in: &group)
      }
    }

    /// Initiate a connection attempt, if possible.
    ///
    /// The request is queued as an input and is acted upon by ``run()``.
    func connect() {
      self.input.continuation.yield(.connect)
    }

    /// Initiates graceful shutdown, if possible.
    ///
    /// The request is queued as an input and is acted upon by ``run()``.
    func close() {
      self.input.continuation.yield(.close)
    }

    /// Make a stream using the subchannel if it's ready.
    ///
    /// - Parameters:
    ///   - descriptor: A descriptor of the method to create a stream for.
    ///   - options: The call options, forwarded to the connection when creating the stream.
    /// - Returns: The open stream.
    /// - Throws: An ``RPCError`` with code `unavailable` if the subchannel has no established
    ///   connection, or whatever error the connection throws when creating the stream.
    func makeStream(
      descriptor: MethodDescriptor,
      options: CallOptions
    ) async throws -> Connection.Stream {
      // Only the connected state holds a connection which streams may be opened on.
      let established: Connection? = self.state.withLockedValue { current in
        switch current {
        case .connected(let connected):
          return connected.connection
        case .notConnected, .connecting, .closing, .closed:
          return nil
        }
      }

      guard let established else {
        throw RPCError(code: .unavailable, message: "subchannel isn't ready")
      }

      return try await established.makeStream(descriptor: descriptor, options: options)
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// Publishes a connectivity state change on the event stream.
    private func publishStateChange(to connectivityState: ConnectivityState) {
      self.event.continuation.yield(.connectivityStateChanged(connectivityState))
    }

    /// Finishes the event stream and then the input stream; no events follow this.
    private func finishStreams() {
      self.event.continuation.finish()
      self.input.continuation.finish()
    }

    /// Handles a request to connect by creating and running a new connection, if the current
    /// state allows it.
    private func handleConnectInput(in group: inout DiscardingTaskGroup) {
      let created = self.state.withLockedValue {
        $0.makeConnection(
          to: self.endpoint.addresses,
          using: self.connector,
          backoff: self.backoff,
          defaultCompression: self.defaultCompression,
          enabledCompression: self.enabledCompression
        )
      }

      // A nil connection means the subchannel isn't in a state where it can start connecting.
      if let created {
        // A new connection is about to start connecting: tell observers first.
        self.publishStateChange(to: .connecting)
        self.runConnection(created, in: &group)
      }
    }

    /// Handles the end of a backoff period.
    private func handleBackedOffInput(in group: inout DiscardingTaskGroup) {
      let action = self.state.withLockedValue { $0.backedOff() }

      switch action {
      case .connect(let connection):
        // A connect attempt is about to start: tell observers first.
        self.publishStateChange(to: .connecting)
        self.runConnection(connection, in: &group)

      case .close(let connection):
        connection.close()

      case .none:
        break
      }
    }

    /// Handles a request to close the subchannel.
    private func handleCloseInput(in group: inout DiscardingTaskGroup) {
      let action = self.state.withLockedValue { $0.close() }

      switch action {
      case .emitShutdownAndFinish:
        // The load balancer asked for the close, so let it know that it has happened.
        self.publishStateChange(to: .shutdown)
        // Nothing else will be emitted so the streams can be closed.
        self.finishStreams()

      case .emitShutdownAndClose(let connection):
        // The load balancer asked for the close, so let it know that it has happened.
        self.publishStateChange(to: .shutdown)
        connection.close()

      case .none:
        break
      }
    }

    /// Forwards an event from the underlying connection to the appropriate handler.
    private func handleConnectionEvent(
      _ event: Connection.Event,
      in group: inout DiscardingTaskGroup
    ) {
      switch event {
      case .closed(let reason):
        self.handleConnectionClosedEvent(reason, in: &group)
      case .goingAway:
        self.handleGoingAwayEvent()
      case .connectFailed:
        self.handleConnectFailedEvent(in: &group)
      case .connectSucceeded:
        self.handleConnectSucceededEvent()
      }
    }

    /// Handles a connection being established.
    private func handleConnectSucceededEvent() {
      let action = self.state.withLockedValue { $0.connectSucceeded() }

      switch action {
      case .none:
        break

      case .closeAndEmitShutdown(let connection):
        self.publishStateChange(to: .shutdown)
        connection.close()

      case .updateStateToReady:
        // The subchannel is usable by the load balancer from now on.
        self.publishStateChange(to: .ready)
      }
    }

    /// Handles a failed connect attempt by trying the next address, backing off, or shutting down.
    private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) {
      let action = self.state.withLockedValue { $0.connectFailed(connector: self.connector) }

      switch action {
      case .none:
        break

      case .closeAndEmitShutdownEvent(let connection):
        self.publishStateChange(to: .shutdown)
        connection.close()

      case .backoff(let duration):
        // Every address has been attempted: wait a while before trying again.
        self.publishStateChange(to: .transientFailure)
        group.addTask {
          do {
            try await Task.sleep(for: duration)
            self.input.continuation.yield(.backedOff)
          } catch {
            // Sleeping only throws on cancellation. Ignore the error: there won't be any further
            // connection attempts.
          }
        }

      case .connect(let connection):
        // Move on to the next address.
        self.runConnection(connection, in: &group)
      }
    }

    /// Handles the connection receiving a GOAWAY.
    private func handleGoingAwayEvent() {
      let isGoingAway = self.state.withLockedValue { $0.goingAway() }

      if isGoingAway {
        // Tell the load balancer the subchannel is going away so that it stops using it.
        self.event.continuation.yield(.goingAway)
        // The set of available servers may have changed when a GOAWAY is received, so the load
        // balancer should also re-resolve.
        self.event.continuation.yield(.requiresNameResolution)
      }
    }

    /// Handles the underlying connection having closed.
    private func handleConnectionClosedEvent(
      _ reason: Connection.CloseReason,
      in group: inout DiscardingTaskGroup
    ) {
      let action = self.state.withLockedValue { $0.closed(reason: reason) }

      switch action {
      case .finish(let emitShutdown):
        if emitShutdown {
          self.publishStateChange(to: .shutdown)
        }
        // Nothing else will be emitted so the streams can be closed.
        self.finishStreams()

      case .emitTransientFailureAndReconnect:
        // An unclean close results in a transient failure state change and name resolution.
        self.publishStateChange(to: .transientFailure)
        self.event.continuation.yield(.requiresNameResolution)
        // Try to reconnect.
        self.handleConnectInput(in: &group)

      case .emitIdle:
        self.publishStateChange(to: .idle)

      case .nothing:
        break
      }
    }

    /// Runs the connection in the task group and feeds its events back to the subchannel as
    /// inputs.
    private func runConnection(_ connection: Connection, in group: inout DiscardingTaskGroup) {
      group.addTask { await connection.run() }

      group.addTask {
        for await connectionEvent in connection.events {
          self.input.continuation.yield(.handleConnectionEvent(connectionEvent))
        }
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The state machine backing the subchannel.
    ///
    /// Each mutating function applies one input to the state and returns a value describing what
    /// the caller should do as a result.
    private enum State {
      /// Not connected and not actively connecting.
      case notConnected
      /// A connection attempt is in-progress.
      case connecting(Connecting)
      /// A connection has been established.
      case connected(Connected)
      /// The subchannel is closing.
      case closing(Closing)
      /// The subchannel is closed.
      case closed

      struct Connecting {
        /// The connection for the current (or, while backing off, the next) connect attempt.
        var connection: Connection
        /// All addresses of the endpoint; used to restart iteration once each has been tried.
        let addresses: [SocketAddress]
        /// Iterator over the addresses which haven't been tried yet in this round of attempts.
        var addressIterator: Array<SocketAddress>.Iterator
        /// Source of backoff durations used once every address has failed.
        var backoff: ConnectionBackoff.Iterator
        /// The default compression algorithm to configure each connection attempt with.
        let defaultCompression: CompressionAlgorithm
        /// The enabled compression algorithms to configure each connection attempt with.
        let enabledCompression: CompressionAlgorithmSet
        /// Whether a close was requested while the connect attempt was outstanding.
        var shutdownRequested: Bool = false

        /// Creates a connection to `address` configured with the compression settings held by
        /// this state, so that retries are configured the same way as the first attempt.
        func makeConnection(
          to address: SocketAddress,
          using connector: any HTTP2Connector
        ) -> Connection {
          Connection(
            address: address,
            http2Connector: connector,
            defaultCompression: self.defaultCompression,
            enabledCompression: self.enabledCompression
          )
        }
      }

      struct Connected {
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }
      }

      struct Closing {
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }

        init(from state: Connected) {
          self.connection = state.connection
        }
      }

      /// Creates a connection to the first address and moves to the connecting state.
      ///
      /// - Parameters:
      ///   - addresses: The addresses to try in order; must not be empty.
      ///   - connector: The factory used to create the connection.
      ///   - backoff: The configuration from which a backoff iterator is made.
      ///   - defaultCompression: The default compression algorithm for the connection. It is
      ///     remembered and reused for later attempts made via ``connectFailed(connector:)``.
      ///   - enabledCompression: The enabled compression algorithms for the connection. It is
      ///     remembered and reused for later attempts made via ``connectFailed(connector:)``.
      /// - Returns: The connection to run, or `nil` if the state isn't `notConnected`.
      mutating func makeConnection(
        to addresses: [SocketAddress],
        using connector: any HTTP2Connector,
        backoff: ConnectionBackoff,
        defaultCompression: CompressionAlgorithm,
        enabledCompression: CompressionAlgorithmSet
      ) -> Connection? {
        switch self {
        case .notConnected:
          var iterator = addresses.makeIterator()
          guard let address = iterator.next() else {
            preconditionFailure("addresses mustn't be empty")
          }

          let connection = Connection(
            address: address,
            http2Connector: connector,
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )

          let connecting = State.Connecting(
            connection: connection,
            addresses: addresses,
            addressIterator: iterator,
            backoff: backoff.makeIterator(),
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )

          self = .connecting(connecting)
          return connection

        case .connecting, .connected, .closing, .closed:
          return nil
        }
      }

      enum OnClose {
        case none
        case emitShutdownAndFinish
        case emitShutdownAndClose(Connection)
      }

      /// Applies a close request.
      ///
      /// While connecting, the request is only recorded (`shutdownRequested`) and is acted on by
      /// whichever of `connectSucceeded()`, `connectFailed(connector:)` or `backedOff()` is
      /// applied next.
      mutating func close() -> OnClose {
        let onClose: OnClose

        switch self {
        case .notConnected:
          onClose = .emitShutdownAndFinish

        case .connecting(var state):
          state.shutdownRequested = true
          self = .connecting(state)
          // Do nothing; the connection hasn't been established yet so can't be closed.
          onClose = .none

        case .connected(let state):
          self = .closing(Closing(from: state))
          onClose = .emitShutdownAndClose(state.connection)

        case .closing, .closed:
          onClose = .none
        }

        return onClose
      }

      enum OnConnectSucceeded {
        case updateStateToReady
        case closeAndEmitShutdown(Connection)
        case none
      }

      /// Applies a successful connect attempt.
      ///
      /// Moves to `connected`, unless a shutdown was requested while connecting in which case
      /// the state becomes `closing` and the caller is told to close the connection.
      mutating func connectSucceeded() -> OnConnectSucceeded {
        switch self {
        case .connecting(let state):
          if state.shutdownRequested {
            self = .closing(Closing(from: state))
            return .closeAndEmitShutdown(state.connection)
          } else {
            self = .connected(Connected(from: state))
            return .updateStateToReady
          }

        case .closing(let state):
          // Shouldn't happen via the connecting state.
          assertionFailure("Invalid state")
          return .closeAndEmitShutdown(state.connection)

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      enum OnConnectFailed {
        case none
        case connect(Connection)
        case closeAndEmitShutdownEvent(Connection)
        case backoff(Duration)
      }

      /// Applies a failed connect attempt.
      ///
      /// If a shutdown was requested the state becomes `closing`. Otherwise a connection to the
      /// next untried address is created and returned; when every address has been tried the
      /// iteration restarts from the first address and the caller is told to back off first.
      ///
      /// Replacement connections use the compression settings captured when connecting started.
      ///
      /// - Parameter connector: The factory used to create the replacement connection.
      mutating func connectFailed(connector: any HTTP2Connector) -> OnConnectFailed {
        let onConnectFailed: OnConnectFailed

        switch self {
        case .connecting(var state):
          if state.shutdownRequested {
            // Subchannel has been asked to shutdown, do so now.
            self = .closing(Closing(from: state))
            onConnectFailed = .closeAndEmitShutdownEvent(state.connection)
          } else if let address = state.addressIterator.next() {
            // Try the next address straight away.
            state.connection = state.makeConnection(to: address, using: connector)
            self = .connecting(state)
            onConnectFailed = .connect(state.connection)
          } else {
            // Every address failed: start again from the first address after backing off. The
            // connection is created now and handed out later by `backedOff()`.
            state.addressIterator = state.addresses.makeIterator()
            guard let address = state.addressIterator.next() else {
              preconditionFailure("addresses mustn't be empty")
            }
            state.connection = state.makeConnection(to: address, using: connector)
            let backoff = state.backoff.next()
            self = .connecting(state)
            onConnectFailed = .backoff(backoff)
          }

        case .closing:
          // Should be handled via connection.closeRequested
          assertionFailure("Invalid state")
          onConnectFailed = .none

        case .notConnected, .connected, .closed:
          onConnectFailed = .none
        }

        return onConnectFailed
      }

      enum OnBackedOff {
        case none
        case connect(Connection)
        case close(Connection)
      }

      /// Applies the end of a backoff period.
      ///
      /// Returns the connection stored by `connectFailed(connector:)` so that the caller can
      /// either start it or, if a shutdown was requested in the meantime, close it.
      ///
      /// NOTE(review): in the shutdown case the returned connection appears never to have been
      /// run by the subchannel — confirm that closing it still results in a `closed` event,
      /// otherwise the state would remain `closing`.
      mutating func backedOff() -> OnBackedOff {
        switch self {
        case .connecting(let state):
          if state.shutdownRequested {
            self = .closing(Closing(from: state))
            return .close(state.connection)
          } else {
            self = .connecting(state)
            return .connect(state.connection)
          }

        case .closing:
          // Shouldn't happen via the connecting state.
          assertionFailure("Invalid state")
          return .none

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      /// Applies a GOAWAY from the connection.
      ///
      /// - Returns: `true` if the state was `connected` (it becomes `closing`), `false` otherwise.
      mutating func goingAway() -> Bool {
        switch self {
        case .connected(let state):
          self = .closing(Closing(from: state))
          return true
        case .notConnected, .closing, .connecting, .closed:
          return false
        }
      }

      enum OnClosed {
        case nothing
        case emitIdle
        case emitTransientFailureAndReconnect
        case finish(emitShutdown: Bool)
      }

      /// Applies the connection having closed.
      ///
      /// From `connected` or `closing`: idle timeouts, remote closes and errors on an idle
      /// connection return to `notConnected` and emit idle; keepalive timeouts and errors on a
      /// non-idle connection return to `notConnected` and request a reconnect; locally initiated
      /// closes move to `closed`. The shutdown event is only requested when closing from
      /// `connected`. Other states are left unchanged.
      ///
      /// - Parameter reason: Why the connection closed.
      mutating func closed(reason: Connection.CloseReason) -> OnClosed {
        let onClosed: OnClosed

        switch self {
        case .connected:
          switch reason {
          case .idleTimeout, .remote, .error(_, wasIdle: true):
            self = .notConnected
            onClosed = .emitIdle

          case .keepaliveTimeout, .error(_, wasIdle: false):
            self = .notConnected
            onClosed = .emitTransientFailureAndReconnect

          case .initiatedLocally:
            self = .closed
            onClosed = .finish(emitShutdown: true)
          }

        case .closing:
          switch reason {
          case .idleTimeout, .remote, .error(_, wasIdle: true):
            self = .notConnected
            onClosed = .emitIdle

          case .keepaliveTimeout, .error(_, wasIdle: false):
            self = .notConnected
            onClosed = .emitTransientFailureAndReconnect

          case .initiatedLocally:
            self = .closed
            onClosed = .finish(emitShutdown: false)
          }

        case .notConnected, .connecting, .closed:
          onClosed = .nothing
        }

        return onClosed
      }
    }
  }