Subchannel.swift 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  /// A ``Subchannel`` manages communication with exactly one ``Endpoint``.
  ///
  /// A newly created ``Subchannel`` is 'idle': it makes no attempt to reach the endpoint until
  /// ``connect()`` is called. Changes in connectivity are published on the ``events`` sequence.
  ///
  /// Once the ``Subchannel`` is no longer needed you must call ``close()``. This moves it to the
  /// ``ConnectivityState/shutdown`` state: RPCs that are already running may carry on, but any
  /// later call to ``makeStream(descriptor:options:)`` fails.
  ///
  /// The ``Subchannel`` only does work while ``run()`` is executing in a task:
  ///
  /// ```swift
  /// await withTaskGroup(of: Void.self) { group in
  ///   group.addTask { await subchannel.run() }
  ///
  ///   for await event in subchannel.events {
  ///     switch event {
  ///     case .connectivityStateChanged(.ready):
  ///       // ...
  ///     default:
  ///       // ...
  ///     }
  ///   }
  /// }
  /// ```
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  struct Subchannel {
    /// Something observable that happened to the subchannel.
    enum Event: Sendable, Hashable {
      /// A GOAWAY arrived: the connection will close soon and must not be used for new streams.
      case goingAway
      /// The subchannel moved to a different connectivity state.
      case connectivityStateChanged(ConnectivityState)
      /// The subchannel is asking the load balancer to re-resolve names.
      case requiresNameResolution
    }

    /// Work items consumed by ``run()``.
    private enum Input: Sendable {
      /// Begin connecting.
      case connect
      /// A backoff period has elapsed.
      case backedOff
      /// Close the connection where possible.
      case close
      /// An event reported by the underlying connection.
      case handleConnectionEvent(Connection.Event)
    }

    /// The endpoint the subchannel talks to.
    let endpoint: Endpoint
    /// Identifies this subchannel.
    let id: SubchannelID

    /// Stream (and its continuation) on which subchannel events are published.
    private let event: (stream: AsyncStream<Event>, continuation: AsyncStream<Event>.Continuation)
    /// Stream (and its continuation) of inputs processed by ``run()``.
    private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
    /// Lock-protected connection state machine.
    private let state: NIOLockedValueBox<State>

    /// Creates the connections this subchannel uses.
    private let connector: any HTTP2Connector
    /// Backoff configuration applied between rounds of failed connection attempts.
    private let backoff: ConnectionBackoff
    /// Compression algorithm applied to requests by default.
    private let defaultCompression: CompressionAlgorithm
    /// Compression algorithms the connections may use.
    private let enabledCompression: CompressionAlgorithmSet

    init(
      endpoint: Endpoint,
      id: SubchannelID,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff,
      defaultCompression: CompressionAlgorithm,
      enabledCompression: CompressionAlgorithmSet
    ) {
      assert(!endpoint.addresses.isEmpty, "endpoint.addresses mustn't be empty")

      self.endpoint = endpoint
      self.id = id
      self.connector = connector
      self.backoff = backoff
      self.defaultCompression = defaultCompression
      self.enabledCompression = enabledCompression

      self.state = NIOLockedValueBox(State.notConnected)
      self.event = AsyncStream<Event>.makeStream()
      self.input = AsyncStream<Input>.makeStream()

      // Every subchannel begins life idle; make that the first event observers see.
      self.event.continuation.yield(.connectivityStateChanged(.idle))
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The sequence of events emitted by the subchannel.
    var events: AsyncStream<Event> {
      self.event.stream
    }

    /// Runs the subchannel.
    ///
    /// While running, the subchannel tries to keep a connection to its endpoint. The connection
    /// may sit idle at times and is re-established when a stream is requested. Repeated connection
    /// failures can cause the subchannel to spend progressively longer in a transient failure
    /// state.
    ///
    /// Observe what happens via the ``events`` stream.
    func run() async {
      await withDiscardingTaskGroup { group in
        for await nextInput in self.input.stream {
          switch nextInput {
          case .connect:
            self.handleConnectInput(in: &group)
          case .backedOff:
            self.handleBackedOffInput(in: &group)
          case .close:
            self.handleCloseInput(in: &group)
          case .handleConnectionEvent(let connectionEvent):
            self.handleConnectionEvent(connectionEvent, in: &group)
          }
        }
      }

      // The event stream is finished by the shutdown paths during normal operation. Cancellation
      // skips those paths, so the stream has to be finished here instead.
      guard Task.isCancelled else { return }
      self.event.continuation.finish()
    }

    /// Requests a connection attempt; it only takes effect if the subchannel can start one.
    func connect() {
      self.input.continuation.yield(.connect)
    }

    /// Requests a graceful shutdown; it only takes effect if the subchannel can begin one.
    func close() {
      self.input.continuation.yield(.close)
    }

    /// Opens a stream on the subchannel's connection, provided the subchannel is connected.
    ///
    /// - Parameters:
    ///   - descriptor: Describes the method the stream is opened for.
    ///   - options: Call options passed through to the connection.
    /// - Returns: The newly opened stream.
    /// - Throws: An `RPCError` with code `unavailable` when the subchannel isn't connected, or any
    ///   error thrown by the connection while opening the stream.
    func makeStream(
      descriptor: MethodDescriptor,
      options: CallOptions
    ) async throws -> Connection.Stream {
      let readyConnection = self.state.withLockedValue { state -> Connection? in
        switch state {
        case .connected(let connected):
          return connected.connection
        case .notConnected, .connecting, .closing, .closed:
          return nil
        }
      }

      if let readyConnection {
        return try await readyConnection.makeStream(descriptor: descriptor, options: options)
      } else {
        throw RPCError(code: .unavailable, message: "subchannel isn't ready")
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// Starts a new connection attempt if the state machine allows one.
    private func handleConnectInput(in group: inout DiscardingTaskGroup) {
      let newConnection = self.state.withLockedValue { state in
        state.makeConnection(
          to: self.endpoint.addresses,
          using: self.connector,
          backoff: self.backoff,
          defaultCompression: self.defaultCompression,
          enabledCompression: self.enabledCompression
        )
      }

      // A `nil` connection means the subchannel isn't in a state to start connecting.
      if let newConnection {
        self.event.continuation.yield(.connectivityStateChanged(.connecting))
        self.runConnection(newConnection, in: &group)
      }
    }

    /// Reacts to the end of a backoff period.
    private func handleBackedOffInput(in group: inout DiscardingTaskGroup) {
      let action = self.state.withLockedValue { $0.backedOff() }

      switch action {
      case .connect(let connection):
        // A fresh attempt is about to begin.
        self.event.continuation.yield(.connectivityStateChanged(.connecting))
        self.runConnection(connection, in: &group)
      case .shutdown:
        self.finishWithShutdown()
      case .none:
        break
      }
    }

    /// Reacts to a request to close the subchannel.
    private func handleCloseInput(in group: inout DiscardingTaskGroup) {
      let action = self.state.withLockedValue { $0.close() }

      switch action {
      case .close(let connection):
        connection.close()
      case .shutdown:
        // The load balancer requested the close, so report the shutdown back to it.
        self.finishWithShutdown()
      case .none:
        break
      }
    }

    /// Routes an event from the underlying connection to its handler.
    private func handleConnectionEvent(
      _ event: Connection.Event,
      in group: inout DiscardingTaskGroup
    ) {
      switch event {
      case .connectSucceeded:
        self.handleConnectSucceededEvent()
      case .connectFailed:
        self.handleConnectFailedEvent(in: &group)
      case .goingAway:
        self.handleGoingAwayEvent()
      case .closed(let reason):
        self.handleConnectionClosedEvent(reason, in: &group)
      }
    }

    /// Reacts to the connection becoming established.
    private func handleConnectSucceededEvent() {
      let action = self.state.withLockedValue { $0.connectSucceeded() }

      switch action {
      case .updateState:
        // The load balancer may now route RPCs through this subchannel.
        self.event.continuation.yield(.connectivityStateChanged(.ready))
      case .close(let connection):
        connection.close()
      case .none:
        break
      }
    }

    /// Reacts to a failed connection attempt: try the next address, back off, or shut down.
    private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) {
      let action = self.state.withLockedValue { $0.connectFailed(connector: self.connector) }

      switch action {
      case .connect(let nextConnection):
        // Move on to the next address.
        self.runConnection(nextConnection, in: &group)

      case .backoff(let duration):
        // Every address has failed; wait before trying again.
        self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
        group.addTask {
          do {
            try await Task.sleep(for: duration)
          } catch {
            // Only cancellation can end the sleep early. Swallow it: no further connection
            // attempts will be made.
            return
          }
          self.input.continuation.yield(.backedOff)
        }

      case .shutdown:
        self.finishWithShutdown()

      case .none:
        break
      }
    }

    /// Reacts to the connection receiving a GOAWAY.
    private func handleGoingAwayEvent() {
      guard self.state.withLockedValue({ $0.goingAway() }) else { return }

      // Stop the load balancer from picking this subchannel...
      self.event.continuation.yield(.goingAway)
      // ...and ask it to re-resolve, since the set of available servers may have changed.
      self.event.continuation.yield(.requiresNameResolution)
    }

    /// Reacts to the connection closing, according to why it closed.
    private func handleConnectionClosedEvent(
      _ reason: Connection.CloseReason,
      in group: inout DiscardingTaskGroup
    ) {
      let isRelevant = self.state.withLockedValue { $0.closed(reason: reason) }
      if !isRelevant { return }

      switch reason {
      case .idleTimeout:
        // The connection idled out; let the load balancer know.
        self.event.continuation.yield(.connectivityStateChanged(.idle))

      case .keepaliveTimeout, .error:
        // An unclean close: report a transient failure, request re-resolution, then reconnect.
        self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
        self.event.continuation.yield(.requiresNameResolution)
        self.handleConnectInput(in: &group)

      case .initiatedLocally, .remote:
        // Closed at the request of the load balancer or the peer. For 'remote' (a GOAWAY) the
        // load balancer has already seen a separate 'goingAway' event.
        self.finishWithShutdown()
      }
    }

    /// Publishes the terminal `shutdown` state, then finishes the event and input streams.
    ///
    /// No further events are produced after this, so both streams can end.
    private func finishWithShutdown() {
      self.event.continuation.yield(.connectivityStateChanged(.shutdown))
      self.event.continuation.finish()
      self.input.continuation.finish()
    }

    /// Runs `connection` in `group` and forwards each of its events back into the input stream.
    private func runConnection(_ connection: Connection, in group: inout DiscardingTaskGroup) {
      group.addTask {
        await connection.run()
      }

      group.addTask {
        for await connectionEvent in connection.events {
          self.input.continuation.yield(.handleConnectionEvent(connectionEvent))
        }
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The connection lifecycle state machine for a subchannel.
    ///
    /// Each mutating method applies a state transition and returns a value telling the caller
    /// what to do next (for example, which connection to run or close).
    private enum State {
      /// Not connected and not actively connecting.
      case notConnected
      /// A connection attempt is in progress, or the subchannel is backing off between attempts.
      case connecting(Connecting)
      /// A connection has been established.
      case connected(Connected)
      /// The subchannel is closing.
      case closing(Closing)
      /// The subchannel is closed.
      case closed

      struct Connecting {
        /// The connection currently being attempted, or the one to attempt once backoff ends.
        var connection: Connection
        /// All addresses of the endpoint, in the order they are tried.
        let addresses: [SocketAddress]
        /// Position of the next address to try within `addresses`.
        var addressIterator: Array<SocketAddress>.Iterator
        /// Yields the delay to wait each time every address has failed.
        var backoff: ConnectionBackoff.Iterator
        /// Default compression algorithm for every connection created while connecting.
        let defaultCompression: CompressionAlgorithm
        /// Compression algorithms enabled on every connection created while connecting.
        let enabledCompression: CompressionAlgorithmSet

        /// Creates a connection to `address` that uses the subchannel's compression settings.
        ///
        /// Routing every (re)connection through here ensures retries honour the configured
        /// compression rather than falling back to hard-coded defaults.
        func makeConnection(
          to address: SocketAddress,
          using connector: any HTTP2Connector
        ) -> Connection {
          Connection(
            address: address,
            http2Connector: connector,
            defaultCompression: self.defaultCompression,
            enabledCompression: self.enabledCompression
          )
        }
      }

      struct Connected {
        /// The established connection.
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }
      }

      struct Closing {
        /// The connection that is being (or will be) closed.
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }

        init(from state: Connected) {
          self.connection = state.connection
        }
      }

      /// Moves from `notConnected` to `connecting` and creates a connection to the first address.
      ///
      /// - Parameters:
      ///   - addresses: The endpoint's addresses. Must not be empty.
      ///   - connector: Used to create each connection.
      ///   - backoff: Backoff configuration used once every address has failed.
      ///   - defaultCompression: Default compression for every connection created while connecting.
      ///   - enabledCompression: Enabled compression for every connection created while connecting.
      /// - Returns: The connection to run, or `nil` if the state isn't `notConnected`.
      mutating func makeConnection(
        to addresses: [SocketAddress],
        using connector: any HTTP2Connector,
        backoff: ConnectionBackoff,
        defaultCompression: CompressionAlgorithm,
        enabledCompression: CompressionAlgorithmSet
      ) -> Connection? {
        switch self {
        case .notConnected:
          var iterator = addresses.makeIterator()
          let address = iterator.next()!  // addresses must not be empty.
          let connection = Connection(
            address: address,
            http2Connector: connector,
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )
          let connecting = State.Connecting(
            connection: connection,
            addresses: addresses,
            addressIterator: iterator,
            backoff: backoff.makeIterator(),
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )
          self = .connecting(connecting)
          return connection

        case .connecting, .connected, .closing, .closed:
          return nil
        }
      }

      /// What the caller must do after a close request.
      enum OnClose {
        /// Nothing to do right now.
        case none
        /// Shut down immediately; the subchannel is now closed.
        case shutdown
        /// Close the given connection.
        case close(Connection)
      }

      /// Handles a request to close the subchannel.
      mutating func close() -> OnClose {
        switch self {
        case .notConnected:
          // Nothing to tear down: the subchannel is closed as of now.
          self = .closed
          return .shutdown

        case .connecting(let state):
          // The connection isn't established so it can't be closed yet. It is handled when the
          // attempt succeeds or fails, or when the current backoff period ends.
          self = .closing(Closing(from: state))
          return .none

        case .connected(let state):
          self = .closing(Closing(from: state))
          return .close(state.connection)

        case .closing, .closed:
          return .none
        }
      }

      /// What the caller must do after a connection attempt succeeds.
      enum OnConnectSucceeded {
        /// Report that the subchannel is ready.
        case updateState
        /// A close was requested while connecting; close the given connection.
        case close(Connection)
        /// Nothing to do.
        case none
      }

      /// Handles a successful connection attempt.
      mutating func connectSucceeded() -> OnConnectSucceeded {
        switch self {
        case .connecting(let state):
          self = .connected(Connected(from: state))
          return .updateState

        case .closing(let state):
          // Stay in `closing`; the newly established connection must be closed.
          return .close(state.connection)

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      /// What the caller must do after a connection attempt fails.
      enum OnConnectFailed {
        /// Nothing to do.
        case none
        /// Run the given connection to the next address.
        case connect(Connection)
        /// Every address failed; wait this long before retrying from the first address.
        case backoff(Duration)
        /// Shut down; the subchannel is now closed.
        case shutdown
      }

      /// Handles a failed connection attempt by advancing to the next address or backing off.
      ///
      /// Every connection created here reuses the compression configuration captured when
      /// connecting began.
      mutating func connectFailed(connector: any HTTP2Connector) -> OnConnectFailed {
        switch self {
        case .connecting(var connecting):
          let onConnectFailed: OnConnectFailed

          if let address = connecting.addressIterator.next() {
            connecting.connection = connecting.makeConnection(to: address, using: connector)
            onConnectFailed = .connect(connecting.connection)
          } else {
            // All addresses have been tried: start over from the first one after backing off.
            connecting.addressIterator = connecting.addresses.makeIterator()
            let address = connecting.addressIterator.next()!  // addresses must not be empty.
            connecting.connection = connecting.makeConnection(to: address, using: connector)
            onConnectFailed = .backoff(connecting.backoff.next())
          }

          self = .connecting(connecting)
          return onConnectFailed

        case .closing:
          self = .closed
          return .shutdown

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      /// What the caller must do when a backoff period ends.
      enum OnBackedOff {
        /// Nothing to do.
        case none
        /// Run the given connection.
        case connect(Connection)
        /// Shut down; the subchannel is now closed.
        case shutdown
      }

      /// Handles the end of a backoff period.
      mutating func backedOff() -> OnBackedOff {
        switch self {
        case .connecting(let state):
          return .connect(state.connection)

        case .closing:
          self = .closed
          return .shutdown

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      /// Handles a GOAWAY on the current connection.
      ///
      /// - Returns: `true` if the subchannel was connected and has moved to `closing`.
      mutating func goingAway() -> Bool {
        switch self {
        case .connected(let state):
          self = .closing(Closing(from: state))
          return true

        case .notConnected, .closing, .connecting, .closed:
          return false
        }
      }

      /// Handles the current connection closing.
      ///
      /// Idle timeouts, keepalive timeouts and errors return the subchannel to `notConnected`.
      /// Local or remote closes move it to `closed`.
      ///
      /// NOTE(review): this applies in `closing` too, so a subchannel that was asked to close but
      /// whose connection then ends with `.idleTimeout`, `.keepaliveTimeout` or `.error` becomes
      /// `notConnected` rather than `closed`. Confirm this is intended.
      ///
      /// - Returns: `true` if the close was relevant (the state was `connected` or `closing`).
      mutating func closed(reason: Connection.CloseReason) -> Bool {
        switch self {
        case .connected, .closing:
          switch reason {
          case .idleTimeout, .keepaliveTimeout, .error:
            self = .notConnected
          case .initiatedLocally, .remote:
            self = .closed
          }
          return true

        case .notConnected, .connecting, .closed:
          return false
        }
      }
    }
  }