Subchannel.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  /// A ``Subchannel`` manages communication with exactly one ``Endpoint``.
  ///
  /// A newly created ``Subchannel`` is 'idle' and makes no attempt to reach its endpoint until
  /// ``connect()`` is called. Connectivity state changes (and other notable happenings) are
  /// published on the ``events`` sequence.
  ///
  /// Call ``close()`` once the ``Subchannel`` is no longer needed. This moves it to the
  /// ``ConnectivityState/shutdown`` state: RPCs already in flight may carry on, but every later
  /// call to ``makeStream(descriptor:options:)`` fails.
  ///
  /// The ``Subchannel`` must be run in a task for it to do any work:
  ///
  /// ```swift
  /// await withTaskGroup(of: Void.self) { group in
  ///   group.addTask { await subchannel.run() }
  ///
  ///   for await event in subchannel.events {
  ///     switch event {
  ///     case .connectivityStateChanged(.ready):
  ///       // ...
  ///     default:
  ///       // ...
  ///     }
  ///   }
  /// }
  /// ```
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  struct Subchannel {
    /// Notifications published by the subchannel.
    enum Event: Sendable, Hashable {
      /// The underlying connection received a GOAWAY and will close soon; no new streams should
      /// be opened on it.
      case goingAway
      /// The subchannel moved to a new connectivity state.
      case connectivityStateChanged(ConnectivityState)
      /// The subchannel is asking the load balancer to re-resolve names.
      case requiresNameResolution
    }

    /// Work items consumed, one at a time, by the subchannel's run loop.
    private enum Input: Sendable {
      /// Asks the subchannel to begin connecting.
      case connect
      /// A backoff period has elapsed.
      case backedOff
      /// Asks the subchannel to close, where possible.
      case close
      /// Wraps an event raised by the underlying connection.
      case handleConnectionEvent(Connection.Event)
    }

    /// The endpoint which this subchannel connects to.
    let endpoint: Endpoint
    /// Identifier of this subchannel.
    let id: SubchannelID

    /// Creates the HTTP/2 connections used by the subchannel.
    private let connector: any HTTP2Connector
    /// Backoff configuration applied between rounds of connection attempts.
    private let backoff: ConnectionBackoff
    /// The compression algorithm requests use by default.
    private let defaultCompression: CompressionAlgorithm
    /// The compression algorithms which are enabled.
    private let enabledCompression: CompressionAlgorithmSet

    /// Lock-protected state machine of the subchannel.
    private let state: NIOLockedValueBox<State>
    /// Stream, and its continuation, of events the subchannel publishes.
    private let event: (stream: AsyncStream<Event>, continuation: AsyncStream<Event>.Continuation)
    /// Stream, and its continuation, of inputs the subchannel reacts to.
    private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)

    init(
      endpoint: Endpoint,
      id: SubchannelID,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff,
      defaultCompression: CompressionAlgorithm,
      enabledCompression: CompressionAlgorithmSet
    ) {
      assert(!endpoint.addresses.isEmpty, "endpoint.addresses mustn't be empty")

      self.endpoint = endpoint
      self.id = id
      self.connector = connector
      self.backoff = backoff
      self.defaultCompression = defaultCompression
      self.enabledCompression = enabledCompression

      self.input = AsyncStream.makeStream(of: Input.self)
      self.event = AsyncStream.makeStream(of: Event.self)
      self.state = NIOLockedValueBox(.notConnected)

      // Every subchannel begins in the idle state; publish it so observers see where it starts.
      self.event.continuation.yield(.connectivityStateChanged(.idle))
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The sequence of events published by this subchannel.
    var events: AsyncStream<Event> {
      return self.event.stream
    }

    /// Runs the subchannel, processing inputs until the input stream finishes.
    ///
    /// While running, the subchannel tries to keep a connection to its endpoint. The connection
    /// may become idle at times and is re-established on demand. When connection attempts keep
    /// failing, the subchannel waits out progressively longer backoff periods, during which it
    /// reports a transient failure.
    ///
    /// Observe state changes and other events through ``events``.
    func run() async {
      await withDiscardingTaskGroup { group in
        for await next in self.input.stream {
          switch next {
          case .connect:
            self.handleConnectInput(in: &group)
          case .backedOff:
            self.handleBackedOffInput(in: &group)
          case .close:
            self.handleCloseInput(in: &group)
          case .handleConnectionEvent(let connectionEvent):
            self.handleConnectionEvent(connectionEvent, in: &group)
          }
        }
      }

      // The shutdown paths normally finish the event stream. Cancellation bypasses them, so in
      // that case finish it here to let consumers of `events` see the sequence end.
      guard Task.isCancelled else { return }
      self.event.continuation.finish()
    }

    /// Requests a connection attempt.
    ///
    /// The request is handled by ``run()``. It has no effect unless the subchannel is currently
    /// not connected.
    func connect() {
      self.input.continuation.yield(.connect)
    }

    /// Requests a graceful shutdown of the subchannel.
    ///
    /// The request is handled by ``run()``. Existing RPCs may continue, but new streams can no
    /// longer be created once the subchannel has left the ready state.
    func close() {
      self.input.continuation.yield(.close)
    }

    /// Opens a stream on the subchannel's connection if the subchannel is ready.
    ///
    /// - Parameters:
    ///   - descriptor: Describes the method to open the stream for.
    ///   - options: Options for the call.
    /// - Returns: The newly opened stream.
    /// - Throws: An `RPCError` with code `.unavailable` when the subchannel has no established
    ///   connection, or any error thrown by the connection while opening the stream.
    func makeStream(
      descriptor: MethodDescriptor,
      options: CallOptions
    ) async throws -> Connection.Stream {
      // Only a connected subchannel can hand out streams; every other state yields nil.
      let readyConnection: Connection? = self.state.withLockedValue { state in
        guard case .connected(let connected) = state else { return nil }
        return connected.connection
      }

      guard let readyConnection else {
        throw RPCError(code: .unavailable, message: "subchannel isn't ready")
      }

      return try await readyConnection.makeStream(descriptor: descriptor, options: options)
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// Handles a `.connect` input by starting a connection if the state machine permits it.
    private func handleConnectInput(in group: inout DiscardingTaskGroup) {
      let maybeConnection = self.state.withLockedValue { state in
        state.makeConnection(
          to: self.endpoint.addresses,
          using: self.connector,
          backoff: self.backoff,
          defaultCompression: self.defaultCompression,
          enabledCompression: self.enabledCompression
        )
      }

      // No connection means the subchannel isn't in a state from which it can start connecting.
      if let newConnection = maybeConnection {
        // Announce the attempt before kicking it off.
        self.event.continuation.yield(.connectivityStateChanged(.connecting))
        self.runConnection(newConnection, in: &group)
      }
    }

    /// Handles the end of a backoff period.
    private func handleBackedOffInput(in group: inout DiscardingTaskGroup) {
      let onBackedOff = self.state.withLockedValue { $0.backedOff() }
      switch onBackedOff {
      case .connect(let pending):
        // The wait is over: announce and start the next connection attempt.
        self.event.continuation.yield(.connectivityStateChanged(.connecting))
        self.runConnection(pending, in: &group)
      case .shutdown:
        self.finishWithShutdown()
      case .none:
        break
      }
    }

    /// Handles a `.close` input.
    private func handleCloseInput(in group: inout DiscardingTaskGroup) {
      let onClose = self.state.withLockedValue { $0.close() }
      switch onClose {
      case .close(let open):
        // Close the established connection. Presumably the connection then reports a
        // `.closed(.initiatedLocally)` event, which completes the shutdown.
        open.close()
      case .shutdown:
        // Nothing to tear down: shut down right away.
        self.finishWithShutdown()
      case .none:
        break
      }
    }

    /// Routes an event from the underlying connection to its dedicated handler.
    private func handleConnectionEvent(
      _ event: Connection.Event,
      in group: inout DiscardingTaskGroup
    ) {
      switch event {
      case .closed(let reason):
        self.handleConnectionClosedEvent(reason, in: &group)
      case .goingAway:
        self.handleGoingAwayEvent()
      case .connectFailed:
        self.handleConnectFailedEvent(in: &group)
      case .connectSucceeded:
        self.handleConnectSucceededEvent()
      }
    }

    /// Handles a successful connection attempt.
    private func handleConnectSucceededEvent() {
      let onSucceeded = self.state.withLockedValue { $0.connectSucceeded() }
      switch onSucceeded {
      case .none:
        break
      case .updateState:
        // The load balancer can start using this subchannel now.
        self.event.continuation.yield(.connectivityStateChanged(.ready))
      case .close(let established):
        // A close was requested while connecting; close the connection now that it's up.
        established.close()
      }
    }

    /// Handles a failed connection attempt: try another address, back off, or shut down.
    private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) {
      let onFailed = self.state.withLockedValue { $0.connectFailed(connector: self.connector) }
      switch onFailed {
      case .connect(let nextAttempt):
        // Untried addresses remain; move straight on to the next one.
        self.runConnection(nextAttempt, in: &group)

      case .backoff(let delay):
        // Every address failed: report a transient failure and wait before trying again.
        self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
        group.addTask {
          do {
            try await Task.sleep(for: delay)
          } catch {
            // Only cancellation can interrupt the sleep; if cancelled, stop making attempts.
            return
          }
          self.input.continuation.yield(.backedOff)
        }

      case .shutdown:
        self.finishWithShutdown()

      case .none:
        break
      }
    }

    /// Handles a GOAWAY received by the connection.
    private func handleGoingAwayEvent() {
      let shouldNotify = self.state.withLockedValue { $0.goingAway() }
      if shouldNotify {
        // Stop the load balancer from picking this subchannel for new streams.
        self.event.continuation.yield(.goingAway)
        // The set of available servers may have changed, so ask for names to be re-resolved.
        self.event.continuation.yield(.requiresNameResolution)
      }
    }

    /// Handles the underlying connection closing.
    private func handleConnectionClosedEvent(
      _ reason: Connection.CloseReason,
      in group: inout DiscardingTaskGroup
    ) {
      let didClose = self.state.withLockedValue { $0.closed(reason: reason) }
      guard didClose else { return }

      switch reason {
      case .initiatedLocally:
        // The load balancer asked for the close; this subchannel is finished.
        self.finishWithShutdown()

      case .idleTimeout, .remote, .error(_, wasIdle: true):
        // A clean or idle close: tell the load balancer the subchannel is idle again.
        self.event.continuation.yield(.connectivityStateChanged(.idle))

      case .keepaliveTimeout, .error(_, wasIdle: false):
        // An unclean close: report a transient failure, ask for re-resolution, then reconnect.
        self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
        self.event.continuation.yield(.requiresNameResolution)
        self.handleConnectInput(in: &group)
      }
    }

    /// Runs `connection` and forwards each of its events to the subchannel's input stream.
    private func runConnection(_ connection: Connection, in group: inout DiscardingTaskGroup) {
      group.addTask {
        await connection.run()
      }
      group.addTask {
        for await connectionEvent in connection.events {
          self.input.continuation.yield(.handleConnectionEvent(connectionEvent))
        }
      }
    }

    /// Publishes the final `.shutdown` state, then finishes the event and input streams.
    ///
    /// Finishing the input stream ends the `for await` loop in `run()`.
    private func finishWithShutdown() {
      self.event.continuation.yield(.connectivityStateChanged(.shutdown))
      self.event.continuation.finish()
      self.input.continuation.finish()
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension Subchannel {
    /// The state machine backing a ``Subchannel``.
    ///
    /// Each mutating method performs one transition and returns a value describing the follow-up
    /// work (emitting events, starting or closing a connection) for the caller to carry out.
    private enum State {
      /// Not connected and not actively connecting.
      case notConnected
      /// A connection attempt is in progress, possibly waiting out a backoff period.
      case connecting(Connecting)
      /// A connection has been established.
      case connected(Connected)
      /// The subchannel is closing.
      case closing(Closing)
      /// The subchannel is closed.
      case closed

      struct Connecting {
        /// The connection currently being attempted, or to be attempted after backing off.
        var connection: Connection
        /// Every address of the endpoint, in the order they are attempted.
        let addresses: [SocketAddress]
        /// Iterator yielding the next address to try from `addresses`.
        var addressIterator: Array<SocketAddress>.Iterator
        /// Produces the delay to wait once every address has failed.
        var backoff: ConnectionBackoff.Iterator
        /// Default compression algorithm for every connection created while connecting.
        let defaultCompression: CompressionAlgorithm
        /// Enabled compression algorithms for every connection created while connecting.
        let enabledCompression: CompressionAlgorithmSet

        /// Creates a connection to `address` using this attempt's compression configuration.
        ///
        /// Retry connections must use the same configuration as the first attempt made by
        /// `State.makeConnection`, rather than library defaults.
        func makeConnection(
          to address: SocketAddress,
          using connector: any HTTP2Connector
        ) -> Connection {
          return Connection(
            address: address,
            http2Connector: connector,
            defaultCompression: self.defaultCompression,
            enabledCompression: self.enabledCompression
          )
        }
      }

      struct Connected {
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }
      }

      struct Closing {
        var connection: Connection

        init(from state: Connecting) {
          self.connection = state.connection
        }

        init(from state: Connected) {
          self.connection = state.connection
        }
      }

      /// Starts connecting to the first of `addresses` if the subchannel is not connected.
      ///
      /// - Returns: The connection to run, or `nil` if the subchannel isn't in a state from which
      ///   it can start connecting.
      mutating func makeConnection(
        to addresses: [SocketAddress],
        using connector: any HTTP2Connector,
        backoff: ConnectionBackoff,
        defaultCompression: CompressionAlgorithm,
        enabledCompression: CompressionAlgorithmSet
      ) -> Connection? {
        switch self {
        case .notConnected:
          var iterator = addresses.makeIterator()
          // `Subchannel.init` asserts that the endpoint has at least one address.
          let address = iterator.next()!
          let connection = Connection(
            address: address,
            http2Connector: connector,
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )
          let connecting = State.Connecting(
            connection: connection,
            addresses: addresses,
            addressIterator: iterator,
            backoff: backoff.makeIterator(),
            defaultCompression: defaultCompression,
            enabledCompression: enabledCompression
          )
          self = .connecting(connecting)
          return connection

        case .connecting, .connected, .closing, .closed:
          return nil
        }
      }

      enum OnClose {
        /// Nothing to do.
        case none
        /// Emit the shutdown state and finish the subchannel's streams.
        case shutdown
        /// Close the given connection.
        case close(Connection)
      }

      /// Handles a request to close the subchannel.
      mutating func close() -> OnClose {
        switch self {
        case .notConnected:
          // Nothing to tear down, so shut down immediately. Record the terminal state so it
          // agrees with the `.shutdown` event the caller emits.
          self = .closed
          return .shutdown

        case .connecting(let state):
          // The connection can't be closed until the attempt completes. `connectSucceeded()`,
          // `connectFailed(connector:)` and `backedOff()` finish the shutdown from `.closing`.
          self = .closing(Closing(from: state))
          return .none

        case .connected(let state):
          self = .closing(Closing(from: state))
          return .close(state.connection)

        case .closing, .closed:
          return .none
        }
      }

      enum OnConnectSucceeded {
        /// The subchannel became ready.
        case updateState
        /// A close was requested while connecting: close the given connection.
        case close(Connection)
        /// Nothing to do.
        case none
      }

      /// Handles a successful connection attempt.
      mutating func connectSucceeded() -> OnConnectSucceeded {
        switch self {
        case .connecting(let state):
          self = .connected(Connected(from: state))
          return .updateState

        case .closing(let state):
          // Close was requested mid-connect; close the now-established connection. The state
          // stays `.closing` until the connection reports that it has closed.
          return .close(state.connection)

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      enum OnConnectFailed {
        /// Nothing to do.
        case none
        /// Run the given connection to the next address.
        case connect(Connection)
        /// Every address failed: wait for the given duration before trying again.
        case backoff(Duration)
        /// Emit the shutdown state and finish the subchannel's streams.
        case shutdown
      }

      /// Handles a failed connection attempt.
      ///
      /// - Parameter connector: Creates the connection for the next attempt.
      mutating func connectFailed(connector: any HTTP2Connector) -> OnConnectFailed {
        switch self {
        case .connecting(var connecting):
          if let address = connecting.addressIterator.next() {
            // More addresses remain: try the next one straight away.
            connecting.connection = connecting.makeConnection(to: address, using: connector)
            self = .connecting(connecting)
            return .connect(connecting.connection)
          }

          // Every address has been tried. Rewind to the first address and create its connection
          // now; `backedOff()` hands it out once the backoff period has elapsed.
          connecting.addressIterator = connecting.addresses.makeIterator()
          // `addresses` comes from the endpoint, which `Subchannel.init` asserts is non-empty.
          let firstAddress = connecting.addressIterator.next()!
          connecting.connection = connecting.makeConnection(to: firstAddress, using: connector)
          let delay = connecting.backoff.next()
          self = .connecting(connecting)
          return .backoff(delay)

        case .closing:
          // A close was requested while connecting and the attempt failed: nothing is left to
          // tear down.
          self = .closed
          return .shutdown

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      enum OnBackedOff {
        /// Nothing to do.
        case none
        /// Run the given connection.
        case connect(Connection)
        /// Emit the shutdown state and finish the subchannel's streams.
        case shutdown
      }

      /// Handles the end of a backoff period.
      mutating func backedOff() -> OnBackedOff {
        switch self {
        case .connecting(let state):
          // The connection for the next attempt was prepared by `connectFailed(connector:)`.
          return .connect(state.connection)

        case .closing:
          // A close was requested during the backoff period.
          self = .closed
          return .shutdown

        case .notConnected, .connected, .closed:
          return .none
        }
      }

      /// Handles a GOAWAY on the connection.
      ///
      /// - Returns: `true` if the subchannel was connected and is now closing, in which case the
      ///   caller should notify the load balancer; `false` otherwise.
      mutating func goingAway() -> Bool {
        switch self {
        case .connected(let state):
          self = .closing(Closing(from: state))
          return true

        case .notConnected, .closing, .connecting, .closed:
          return false
        }
      }

      /// Handles the connection closing.
      ///
      /// - Parameter reason: Why the connection closed.
      /// - Returns: `true` if the subchannel was connected or closing and the close was applied;
      ///   `false` otherwise.
      mutating func closed(reason: Connection.CloseReason) -> Bool {
        switch self {
        case .connected, .closing:
          switch reason {
          case .idleTimeout, .keepaliveTimeout, .error, .remote:
            // The subchannel may reconnect later.
            self = .notConnected
          case .initiatedLocally:
            // A locally initiated close (e.g. following `close()`) ends the subchannel.
            self = .closed
          }
          return true

        case .notConnected, .connecting, .closed:
          return false
        }
      }
    }
  }