ConnectionManager.swift 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  21. // Unchecked because mutable state is always accessed and modified on a particular event loop.
  22. // APIs which _may_ be called from different threads execute onto the correct event loop first.
  23. // APIs which _must_ be called from an exact event loop have preconditions checking that the correct
  24. // event loop is being used.
  25. @usableFromInline
  26. internal final class ConnectionManager: @unchecked Sendable {
  /// What to do if the channel for a connection attempt goes inactive before becoming `ready`.
  internal enum Reconnect {
    /// Don't retry: move to `shutdown` instead.
    case none
    /// Retry after the given delay, in seconds.
    case after(TimeInterval)
  }
  /// Bookkeeping for an in-flight connection attempt.
  internal struct ConnectingState {
    /// Carried into later states for subsequent attempts. `nil` presumably means no backoff is
    /// configured — TODO confirm against `startConnecting()`.
    var backoffIterator: ConnectionBackoffIterator?
    /// Whether to retry if this attempt's channel goes inactive before becoming `ready`.
    var reconnect: Reconnect
    /// The channel being connected.
    var candidate: EventLoopFuture<Channel>
    /// Vended to patient (`waitsForConnectivity`) callers. When `channelInactive` retries the
    /// attempt, this promise is carried into `transientFailure` rather than failed.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// Vended to fast-failure callers; succeeded as soon as the channel becomes active.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  }
  /// A channel has been established (it is `active`) but has not yet seen the initial HTTP/2
  /// SETTINGS frame, so it is not yet `ready` for RPCs.
  internal struct ConnectedState {
    /// Carried over from the `connecting` state.
    var backoffIterator: ConnectionBackoffIterator?
    /// Whether to retry if this channel goes inactive before becoming `ready`.
    var reconnect: Reconnect
    /// The established channel.
    var candidate: Channel
    /// Vended to patient callers; carried over from the `connecting` state.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// The stream multiplexer belonging to `candidate`.
    var multiplexer: HTTP2StreamMultiplexer
    /// The most recent error reported by the channel, if any.
    var error: Error?

    /// Moves the attempt's bookkeeping out of `connecting` now that `candidate` is up.
    ///
    /// No error has been seen yet on the new channel, so `error` starts out as `nil`.
    init(from connecting: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
      self.candidate = candidate
      self.multiplexer = multiplexer
      self.reconnect = connecting.reconnect
      self.backoffIterator = connecting.backoffIterator
      self.readyChannelMuxPromise = connecting.readyChannelMuxPromise
      self.error = nil
    }
  }
  /// The channel has seen the initial HTTP/2 SETTINGS frame and may be used for RPCs.
  internal struct ReadyState {
    /// The usable channel.
    var channel: Channel
    /// The stream multiplexer belonging to `channel`.
    var multiplexer: HTTP2StreamMultiplexer
    /// The most recent error reported by the channel while ready, if any.
    var error: Error?

    /// Promotes an `active` connection to `ready`.
    ///
    /// Any error recorded while `active` is deliberately not carried over: `error` starts as `nil`.
    init(from connected: ConnectedState) {
      self.multiplexer = connected.multiplexer
      self.channel = connected.candidate
      self.error = nil
    }
  }
  /// A connection is wanted, and another attempt has been scheduled for the future.
  internal struct TransientFailureState {
    /// Carried into the next connection attempt.
    var backoffIterator: ConnectionBackoffIterator?
    /// Vended to patient callers while we wait to reconnect.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// The scheduled reconnection attempt; cancelled if we shut down first.
    var scheduled: Scheduled<Void>
    /// Why we are in this state; also reported to the connectivity delegate.
    var reason: Error

    /// The reason used when no more specific error was recorded.
    private static func unexpectedConnectionDrop() -> Error {
      return GRPCStatus(code: .unavailable, message: "Unexpected connection drop")
    }

    /// Entered when a connection attempt fails before its channel became active.
    init(from connecting: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
      self.scheduled = scheduled
      self.backoffIterator = connecting.backoffIterator
      self.readyChannelMuxPromise = connecting.readyChannelMuxPromise
      self.reason = reason ?? Self.unexpectedConnectionDrop()
    }

    /// Entered when an `active` (but not yet `ready`) channel drops. The last error the channel
    /// reported, if any, becomes the reason.
    init(from connected: ConnectedState, scheduled: Scheduled<Void>) {
      self.scheduled = scheduled
      self.backoffIterator = connected.backoffIterator
      self.readyChannelMuxPromise = connected.readyChannelMuxPromise
      self.reason = connected.error ?? Self.unexpectedConnectionDrop()
    }

    /// Entered when a `ready` channel drops. There is no pending promise to carry over, so a fresh
    /// one is made on the channel's event loop.
    init(
      from ready: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.scheduled = scheduled
      self.backoffIterator = backoffIterator
      self.readyChannelMuxPromise = ready.channel.eventLoop.makePromise()
      self.reason = ready.error ?? Self.unexpectedConnectionDrop()
    }
  }
  /// The terminal state.
  internal struct ShutdownState {
    /// Completes when the shutdown has finished (for example, once any underlying channel has
    /// closed).
    var closeFuture: EventLoopFuture<Void>
    /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
    /// this error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// The state entered when the user explicitly shuts the connection down.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let userRequested = GRPCStatus(
        code: .unavailable,
        message: "Connection was shutdown by the user"
      )
      return Self(closeFuture: closeFuture, reason: userRequested)
    }
  }
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case idle(lastError: Error?)

    /// We're actively trying to establish a connection.
    ///
    /// Valid next states:
    /// - `active`
    /// - `transientFailure` (if our attempt fails and we're going to try again)
    /// - `shutdown`
    case connecting(ConnectingState)

    /// We've established a `Channel`, but it might not be suitable (the TLS handshake may fail,
    /// etc.). Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
    ///
    /// Valid next states:
    /// - `ready`
    /// - `transientFailure` (if our handshake fails or another error happens and we can attempt
    ///   to re-establish the connection)
    /// - `shutdown`
    case active(ConnectedState)

    /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
    /// the channel for making RPCs.
    ///
    /// Valid next states:
    /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
    /// - `transientFailure` (we encountered an error and will re-establish the connection)
    /// - `shutdown`
    case ready(ReadyState)

    /// A `Channel` is desired; we'll attempt to create one in the future.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)

    /// We never want another `Channel`: this state is terminal.
    case shutdown(ShutdownState)

    /// A short name for the state, used as `connectivity_state` log metadata.
    fileprivate var label: String {
      let name: String
      switch self {
      case .idle: name = "idle"
      case .connecting: name = "connecting"
      case .active: name = "active"
      case .ready: name = "ready"
      case .transientFailure: name = "transientFailure"
      case .shutdown: name = "shutdown"
      }
      return name
    }
  }
  /// The most recently published connectivity state. It is a coarser view of `state`: the
  /// internal `active` state is never published.
  private var externalState = _ConnectivityState.idle(nil)
  /// Records `nextState` as the external state and tells the connectivity delegate about the
  /// transition. This happens only when `nextState` differs from the current external state.
  private func updateExternalState(to nextState: _ConnectivityState) {
    guard !self.externalState.isSameState(as: nextState) else {
      return
    }
    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(self, from: previousState, to: nextState)
  }
  /// Our current state.
  ///
  /// Every assignment publishes the matching external state. Entering `idle` or
  /// `transientFailure` also bumps the channel number that appears in the connection ID.
  private var state: State {
    didSet {
      switch self.state {
      case .active:
        // Internal-only state: nothing is published.
        break
      case .connecting:
        self.updateExternalState(to: .connecting)
      case .ready:
        self.updateExternalState(to: .ready)
      case .shutdown:
        self.updateExternalState(to: .shutdown)
      case let .idle(lastError):
        self.updateExternalState(to: .idle(lastError))
        self.updateConnectionID()
      case let .transientFailure(failure):
        self.updateExternalState(to: .transientFailure(failure.reason))
        self.updateConnectionID()
      }
    }
  }
  /// Whether the current state is `idle`. Expected to be read on the `EventLoop`.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()
    if case .idle = self.state {
      return true
    }
    return false
  }
  /// Whether the current state is `shutdown`. Expected to be read on the `EventLoop`.
  private var isShutdown: Bool {
    self.eventLoop.assertInEventLoop()
    if case .shutdown = self.state {
      return true
    }
    return false
  }
  /// The multiplexer of the `ready` channel. In every other state it is `nil`.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(ready) = self.state else {
      return nil
    }
    return ready.multiplexer
  }
  /// The `EventLoop` that the managed connection will run on.
  internal let eventLoop: EventLoop

  /// A delegate for connectivity changes. Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// A delegate for HTTP/2 connection changes. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// An `EventLoopFuture<Channel>` provider.
  private let channelProvider: ConnectionManagerChannelProvider

  /// The behavior for starting a call, i.e. how patient is the caller when asking for a
  /// multiplexer.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// The configuration to use when backing off between connection attempts, if reconnection
  /// attempts should be made at all.
  private let connectionBackoff: ConnectionBackoff?

  /// A logger. Its connection-ID metadata is refreshed whenever the channel number changes.
  internal var logger: Logger

  /// A unique identifier for this manager, generated once at initialization.
  private let connectionID: String

  /// Incremented (wrapping) on entering `idle` or `transientFailure`. Together with
  /// `connectionID` it distinguishes successive connections in logs. Accessed under
  /// `channelNumberLock` after initialization.
  private var channelNumber: UInt64

  /// Guards `channelNumber` and the logger metadata derived from it. The lock itself is never
  /// replaced, and `NIOLock.withLock` is non-mutating, so it is a `let`.
  private let channelNumberLock = NIOLock()
  /// `"<connectionID>/<channelNumber>"`, read without locking. The visible callers only read it
  /// while holding `channelNumberLock`.
  private var _connectionIDAndNumber: String {
    return self.connectionID + "/" + String(self.channelNumber)
  }
  /// `"<connectionID>/<channelNumber>"`, read under `channelNumberLock` so it may be called from
  /// any thread.
  private var connectionIDAndNumber: String {
    self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }
  /// Bumps the channel number (wrapping on overflow) and rewrites the logger's connection-ID
  /// metadata. Both happen in one critical section on `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLock {
      self.channelNumber = self.channelNumber &+ 1
      self.logger[metadataKey: MetadataKey.connectionID] = .string(self._connectionIDAndNumber)
    }
  }
  /// Stamps `logger` with this manager's current `"<connectionID>/<channelNumber>"`.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = .string(identifier)
  }
  /// Creates a manager from a client connection configuration.
  ///
  /// The event loop is taken from `configuration.eventLoopGroup`. When no `channelProvider` is
  /// given, a `DefaultChannelProvider` built from `configuration` is used. No HTTP/2 delegate is
  /// installed.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    // Evaluated in the same order as the designated initializer's arguments.
    let eventLoop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: eventLoop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates an `idle` manager.
  ///
  /// A fresh UUID becomes the connection ID. The channel number starts at 0, and the logger is
  /// tagged with `"<connectionID>/0"`.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    let initialConnectionID = UUID().uuidString
    let initialChannelNumber: UInt64 = 0

    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] =
      "\(initialConnectionID)/\(initialChannelNumber)"

    self.connectionID = initialConnectionID
    self.channelNumber = initialChannelNumber
    self.logger = taggedLogger

    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate

    // `didSet` does not run during initialization, so no delegate is notified here.
    self.state = .idle(lastError: nil)
  }
  /// Get the multiplexer from the underlying channel handling gRPC calls.
  ///
  /// With `fastFailure` the caller gets a single chance to connect. With `waitsForConnectivity`,
  /// reconnections are managed here until a connection is ready or the manager shuts down.
  /// Safe to call from any thread: the work hops onto the `EventLoop` if necessary.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        self.getHTTP2Multiplexer()
      }
    }

    switch self.callStartBehavior {
    case .fastFailure:
      return self.getHTTP2MultiplexerOptimistic()
    case .waitsForConnectivity:
      return self.getHTTP2MultiplexerPatient()
    }
  }
  /// Returns a multiplexer future for callers willing to wait for a `ready` connection.
  ///
  /// - `idle`: starts connecting and returns the new attempt's `readyChannelMuxPromise` future.
  /// - `connecting`, `active`, `transientFailure`: returns the pending `readyChannelMuxPromise`
  ///   future.
  /// - `ready`: returns an already-succeeded future.
  /// - `shutdown`: returns a future failed with the shutdown reason.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    let vended: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case let .ready(ready):
      vended = self.eventLoop.makeSucceededFuture(ready.multiplexer)

    case let .shutdown(shutdownState):
      vended = self.eventLoop.makeFailedFuture(shutdownState.reason)

    case let .connecting(connecting):
      vended = connecting.readyChannelMuxPromise.futureResult

    case let .active(connected):
      vended = connected.readyChannelMuxPromise.futureResult

    case let .transientFailure(failure):
      vended = failure.readyChannelMuxPromise.futureResult

    case .idle:
      self.startConnecting()
      // Starting to connect must have moved us into `connecting`.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      vended = connecting.readyChannelMuxPromise.futureResult
    }

    self.logger.debug(
      "vending multiplexer future",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )
    return vended
  }
  /// Returns a future for the current HTTP/2 stream multiplexer, or for the multiplexer of the
  /// current connection attempt. If the state is 'idle', a connection attempt is started and the
  /// future for that attempt is returned.
  ///
  /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>
    switch self.state {
    case .idle:
      self.startConnecting()
      // Starting to connect must have moved us into `connecting`.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .connecting(connecting):
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .active(connected):
      muxFuture = self.eventLoop.makeSucceededFuture(connected.multiplexer)

    case let .ready(ready):
      muxFuture = self.eventLoop.makeSucceededFuture(ready.multiplexer)

    case let .transientFailure(failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)

    case let .shutdown(shutdownState):
      muxFuture = self.eventLoop.makeFailedFuture(shutdownState.reason)
    }

    self.logger.debug(
      "vending fast-failing multiplexer future",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )
    return muxFuture
  }
  /// How `shutdown(mode:)` treats a connection that is `ready`. In other states the mode only
  /// appears in log metadata.
  @usableFromInline
  internal enum ShutdownMode {
    /// Closes the underlying channel without waiting for existing RPCs to complete.
    case forceful
    /// Allows running RPCs to run their course before closing the underlying channel. No new
    /// streams may be created. If the channel has not closed by the deadline, it is closed
    /// forcibly.
    case graceful(NIODeadline)
  }
  /// Shutdown the underlying connection, returning a future which completes once the shutdown has
  /// finished.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let shutdownComplete: EventLoopPromise<Void> = self.eventLoop.makePromise()
    self.shutdown(mode: mode, promise: shutdownComplete)
    return shutdownComplete.futureResult
  }
  /// Shutdown the underlying connection, completing `promise` once the shutdown has finished.
  /// Safe to call from any thread: the work runs on the `EventLoop`.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }
    self._shutdown(mode: mode, promise: promise)
  }
  /// Performs the shutdown on the `EventLoop`.
  ///
  /// In every state other than `shutdown`, this moves to `shutdown` (reason: shut down by the
  /// user) *before* failing any pending multiplexer promises, so their callbacks observe
  /// `shutdown`. `promise` is completed:
  /// - `idle`, `transientFailure`: immediately, as there is no channel.
  /// - `connecting`: once the connection attempt completes. If the attempt succeeded, the channel
  ///   is closed first.
  /// - `active`: once the candidate channel has closed.
  /// - `ready`: once the channel has closed. `mode` decides whether the channel is closed now
  ///   (`forceful`), or is asked to quiesce and is force-closed at the deadline (`graceful`).
  /// - `shutdown`: with the close future of the earlier shutdown, so `mode` has no effect.
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug(
      "shutting down connection",
      metadata: [
        "connectivity_state": "\(self.state.label)",
        "shutdown.mode": "\(mode)",
      ]
    )
    switch self.state {
    // We don't have a channel and we don't want one, easy!
    case .idle:
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      promise.succeed(())
    // We're mid-connection: the application doesn't have any 'ready' channels, so we'll succeed
    // the shutdown future and deal with any fallout from the connecting channel without the
    // application knowing.
    case let .connecting(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      // Fail the ready channel mux promise: we're shutting down, so even if we manage to
      // successfully connect the application shouldn't have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      state.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      // Complete the shutdown promise when the connection attempt has completed.
      state.candidate.whenComplete {
        switch $0 {
        case let .success(channel):
          // In case we do successfully connect, close on the next loop tick. When connecting a
          // channel NIO will complete the promise for the channel before firing channel active.
          // That means we may close and fire inactive before active, which HTTP/2 will be unhappy
          // about.
          self.eventLoop.execute {
            channel.close(mode: .all, promise: nil)
            promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())
          }
        case .failure:
          // We failed to connect; that's fine, we still shut down successfully.
          promise.succeed(())
        }
      }
    // We have an active channel but the application doesn't know about it yet. We'll do the same
    // as for `.connecting`.
    case let .active(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      // Fail the ready channel mux promise: we're shutting down, so even if we manage to
      // successfully connect the application shouldn't have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      // We have a channel, close it. We only create streams in the ready state, so there's no need
      // to quiesce here.
      state.candidate.close(mode: .all, promise: nil)
      promise.completeWith(state.candidate.closeFuture.recoveringFromUncleanShutdown())
    // The channel is up and running: the application could be using it. We can close it and
    // return the `closeFuture`.
    case let .ready(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      switch mode {
      case .forceful:
        // We have a channel, close it.
        state.channel.close(mode: .all, promise: nil)
      case let .graceful(deadline):
        // If we don't close by the deadline, forcibly close the channel.
        let scheduledForceClose = state.channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          state.channel.close(mode: .all, promise: nil)
        }
        // Cancel the force close if we close normally first.
        state.channel.closeFuture.whenComplete { _ in
          scheduledForceClose.cancel()
        }
        // Tell the channel to quiesce. It will be picked up by the idle handler, which will close
        // the channel when all streams have been closed.
        state.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }
      // Complete the promise when we eventually close.
      promise.completeWith(state.channel.closeFuture.recoveringFromUncleanShutdown())
    // Like `.connecting` and `.active`, the application does not have a `.ready` channel. We'll
    // do the same but also cancel any scheduled connection attempts, and deal with any fallout
    // if we cancelled too late.
    case let .transientFailure(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      // Stop the creation of a new channel, if we can. If we can't, then the task to
      // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
      state.scheduled.cancel()
      // Fail the ready channel mux promise: we're shutting down, so even if we manage to
      // successfully connect the application shouldn't have access to the channel.
      state.readyChannelMuxPromise.fail(shutdown.reason)
      // No active channel, so complete the shutdown promise now.
      promise.succeed(())
    // We're already shutdown; there's nothing to do other than wait for the earlier shutdown.
    case let .shutdown(state):
      promise.completeWith(state.closeFuture)
    }
  }
  /// Registers a callback which fires when the current `ready` connection is closed.
  ///
  /// If there is a `ready` connection, `onClose` is invoked with `true` once it closes. Otherwise
  /// `onClose` is invoked with `false` straight away on the `EventLoop`. Safe to call from any
  /// thread.
  internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._onCurrentConnectionClose(onClose)
      }
      return
    }
    self._onCurrentConnectionClose(onClose)
  }
  /// Event-loop half of `onCurrentConnectionClose(_:)`.
  private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(ready) = self.state else {
      // No usable connection to watch.
      onClose(false)
      return
    }
    ready.channel.closeFuture.whenComplete { _ in
      onClose(true)
    }
  }
  // MARK: - State changes from the channel handler.

  /// The channel caught an error. Hold on to it until the channel becomes inactive, as it may
  /// provide some context. Must be called on the `EventLoop`.
  ///
  /// - `connecting`: treated as a failed connection attempt.
  /// - `active` / `ready`: stored in the state, replacing any earlier error.
  /// - `idle`: logged and otherwise ignored.
  /// - `transientFailure` / `shutdown`: ignored.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case .connecting:
      self.connectionFailed(withError: error)

    case .active(var connected):
      connected.error = error
      self.state = .active(connected)

    case .ready(var ready):
      ready.error = error
      self.state = .ready(ready)

    case .idle:
      // Hitting an error in idle is a surprise, but not really something we do anything about.
      // Either the error is channel fatal, in which case we'll see channelInactive soon
      // (acceptable), or it's not, and future I/O will either fail fast or work. In either case,
      // all we do is log this and move on.
      self.logger.warning(
        "ignoring unexpected error in idle",
        metadata: [
          MetadataKey.error: "\(error)"
        ]
      )

    case .transientFailure, .shutdown:
      // We're already in one of these states, so additional errors aren't helpful to us.
      break
    }
  }
  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// From `connecting`, this moves to `active` and then succeeds the fast-failure candidate
  /// promise. From `shutdown`, the channel is closed. Anything else is ignored.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "activating connection",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )

    switch self.state {
    case let .connecting(connecting):
      self.state = .active(
        ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
      )
      // Optimistic connections are happy with this level of setup.
      connecting.candidateMuxPromise.succeed(multiplexer)

    case .shutdown:
      // The application called shutdown before the channel became active; we should close it.
      channel.close(mode: .all, promise: nil)

    case .idle, .transientFailure, .active, .ready:
      // `idle` / `transientFailure`: channelActive and channelInactive can be reordered.
      // `active` / `ready`: a second channelActive. Either way, ignore it.
      break
    }
  }
  /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
  /// Must be called on the `EventLoop`.
  ///
  /// - `connecting` / `active`: follows the attempt's `reconnect` policy. Either shut down
  ///   (failing outstanding multiplexer promises), or schedule a new attempt after the configured
  ///   delay and move to `transientFailure`.
  /// - `ready`: shut down if no `connectionBackoff` is configured. Otherwise schedule an immediate
  ///   new attempt with a fresh backoff iterator, going via `transientFailure`.
  /// - `idle`, `shutdown`, `transientFailure`: nothing to do.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "deactivating connection",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )

    // Moves to `shutdown` because re-establishment is disabled for the current attempt, and
    // returns the error to fail outstanding promises with. The state changes before the caller
    // fails those promises, so their callbacks already observe `shutdown`.
    func shutdownWithReconnectDisabled() -> GRPCStatus {
      self.logger.debug("shutting down connection")
      let error = GRPCStatus(
        code: .unavailable,
        message: "The connection was dropped and connection re-establishment is disabled"
      )
      let shutdownState = ShutdownState(
        closeFuture: self.eventLoop.makeSucceededFuture(()),
        reason: error
      )
      self.state = .shutdown(shutdownState)
      return error
    }

    switch self.state {
    // We can hit inactive in connecting if we see channelInactive before channelActive; that's not
    // common but we should tolerate it.
    case let .connecting(connecting):
      // Should we try connecting again?
      switch connecting.reconnect {
      // No, shutdown instead, failing the outstanding promises.
      case .none:
        let error = shutdownWithReconnectDisabled()
        connecting.readyChannelMuxPromise.fail(error)
        connecting.candidateMuxPromise.fail(error)

      // Yes, after some time.
      case let .after(delay):
        let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
        // Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
        connecting.candidateMuxPromise.fail(error)
        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(.init(from: connecting, scheduled: scheduled, reason: nil))
      }

    // The channel is `active` but not `ready`. Should we try again?
    case let .active(active):
      switch active.reconnect {
      // No, shutdown instead, failing the outstanding promise.
      case .none:
        let error = shutdownWithReconnectDisabled()
        active.readyChannelMuxPromise.fail(error)

      // Yes, after some time.
      case let .after(delay):
        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
      }

    // The channel was ready and working fine but something went wrong. Should we try to replace
    // the channel?
    case let .ready(ready):
      if let connectionBackoff = self.connectionBackoff {
        // Yes, start connecting now. We should go via `transientFailure`, however.
        let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        // Same metadata key as the other reconnection paths, so logs filter consistently.
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "0"])
        self.state = .transientFailure(
          TransientFailureState(
            from: ready,
            scheduled: scheduled,
            backoffIterator: connectionBackoff.makeIterator()
          )
        )
      } else {
        // No, no backoff is configured.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        self.state = .shutdown(
          ShutdownState(
            closeFuture: ready.channel.closeFuture,
            reason: GRPCStatus(
              code: .unavailable,
              message: "The connection was dropped and a reconnect was not configured"
            )
          )
        )
      }

    // This is fine: we expect the channel to become inactive after becoming idle.
    case .idle:
      break

    // We're already shutdown, that's fine.
    case .shutdown:
      break

    // Received 'channelInactive' twice; fine, ignore.
    case .transientFailure:
      break
    }
  }
  /// Marks the connection as ready, i.e. the initial HTTP/2 SETTINGS frame has been seen on the
  /// channel. Must be called on the `EventLoop`.
  ///
  /// Only an `active` connection transitions to `ready`. The state is updated first, and then the
  /// pending ready-multiplexer promise is succeeded with the channel's multiplexer. A `shutdown`
  /// connection ignores the event. Every other state is unexpected: it asserts in debug builds and
  /// is ignored in release builds.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("connection ready", metadata: ["connectivity_state": "\(self.state.label)"])

    switch self.state {
    case let .active(activeState):
      self.state = .ready(ReadyState(from: activeState))
      activeState.readyChannelMuxPromise.succeed(activeState.multiplexer)

    case .shutdown:
      // Already shut down; there is nothing to transition.
      ()

    case .idle, .transientFailure, .connecting, .ready:
      // Either no channel exists that could have delivered the initial SETTINGS frame (idle,
      // transientFailure, connecting), or the frame was already received (ready). There is nothing
      // to close and nowhere to send an error, so assert in debug builds and ignore in release
      // builds.
      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
    }
  }
  /// Idles the connection because no RPCs are active on it. Must be called on the `EventLoop`.
  ///
  /// A `ready` or `active` connection moves to `idle`, keeping its last error. An `active`
  /// connection also fails its pending ready-multiplexer promise, because it never became ready.
  /// Calls in `connecting` are unexpected and assert in debug builds.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("idling connection", metadata: ["connectivity_state": "\(self.state.label)"])

    switch self.state {
    case let .ready(readyState):
      self.state = .idle(lastError: readyState.error)

    case let .active(activeState):
      // Reachable if the keepalive timer fires before the connection reaches the ready state.
      self.state = .idle(lastError: activeState.error)
      let status = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      activeState.readyChannelMuxPromise.fail(status)

    case .idle, .transientFailure, .shutdown:
      // No connection to idle. `shutdown` is expected when the user closes the connection: if the
      // channel goes inactive with no outstanding RPCs, 'idle()' is called instead of
      // 'channelInactive()'.
      ()

    case .connecting:
      // The idle watchdog only starts once the connection is active, so this is unexpected.
      // Ignore it in release builds.
      assertionFailure("tried to idle a connection in the \(self.state.label) state")
    }
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream was opened. Expects to be called on
  /// the `EventLoop`.
  internal func streamOpened() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else { return }
    delegate.streamOpened(self)
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream was closed. Expects to be called on
  /// the `EventLoop`.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else { return }
    delegate.streamClosed(self)
  }
  /// Forwards a new SETTINGS_MAX_CONCURRENT_STREAMS value to the HTTP/2 delegate, if one is set.
  /// Expects to be called on the `EventLoop`.
  ///
  /// - Parameter maxConcurrentStreams: The newly received limit.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else { return }
    delegate.receivedSettingsMaxConcurrentStreams(self, maxConcurrentStreams: maxConcurrentStreams)
  }
  /// Tells the connectivity delegate, if one is set, that the connection has started quiescing.
  /// Expects to be called on the `EventLoop`.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.connectivityDelegate {
      delegate.connectionIsQuiescing(self)
    }
  }
  795. }
  extension ConnectionManager {
    /// Handles a failed connection attempt, i.e. one where a channel was never established.
    /// Must be called on the `EventLoop`.
    ///
    /// - `connecting` with no reconnect configured/remaining: moves to `shutdown` and fails both
    ///   the ready and candidate multiplexer promises with `error`.
    /// - `connecting` with a reconnect delay: schedules another attempt after that many seconds,
    ///   moves to `transientFailure` (keeping `error` as the reason), and fails only the candidate
    ///   multiplexer promise.
    /// - `shutdown`: ignored.
    /// - Any other state is unexpected: asserts in debug builds and is ignored in release builds.
    ///
    /// - Parameter error: The error the connection attempt failed with.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case let .connecting(connecting):
        // Should we reconnect?
        switch connecting.reconnect {
        // No, shut down.
        case .none:
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          self.state = .shutdown(
            ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: error)
          )
          connecting.readyChannelMuxPromise.fail(error)
          connecting.candidateMuxPromise.fail(error)

        // Yes, after a delay.
        case let .after(delay):
          // Log the delay (in seconds) under the same "delay_secs" key that `channelInactive()`
          // uses for its reconnect-after-delay paths, so the value can be queried consistently.
          self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
          let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.state = .transientFailure(
            TransientFailureState(from: connecting, scheduled: scheduled, reason: error)
          )
          // Candidate mux users are not willing to wait. The ready-mux promise is carried into
          // `transientFailure` for the next attempt.
          connecting.candidateMuxPromise.fail(error)
        }

      // The application called shutdown while this attempt was in flight; the failure is moot.
      case .shutdown:
        ()

      // Connection attempt failed, but no connection attempt is in progress.
      case .idle, .active, .ready, .transientFailure:
        // Nothing we can do other than ignore in release mode.
        assertionFailure("connect promise failed in \(self.state.label) state")
      }
    }
  }
  extension ConnectionManager {
    /// Starts establishing a connection. This is only valid from the `idle` and
    /// `transientFailure` states. A scheduled attempt that fires after `shutdown` is dropped.
    /// Expects to be called on the `EventLoop`.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .idle:
        // A brand-new attempt: start a new backoff sequence (when backoff is configured) and a
        // new promise for the ready multiplexer.
        let freshIterator = self.connectionBackoff?.makeIterator()
        let freshPromise: EventLoopPromise<HTTP2StreamMultiplexer> = self.eventLoop.makePromise()
        self.startConnecting(backoffIterator: freshIterator, muxPromise: freshPromise)

      case let .transientFailure(failure):
        // A retry: continue the existing backoff sequence and reuse the promise that callers
        // are already waiting on.
        self.startConnecting(
          backoffIterator: failure.backoffIterator,
          muxPromise: failure.readyChannelMuxPromise
        )

      case .shutdown:
        // Shut down before a scheduled connection attempt started; nothing to do.
        ()

      case .connecting, .active, .ready:
        // A connection or connection attempt already exists. Callers check the state before
        // calling, so reaching this is a programmer error.
        self.unreachableState()
      }
    }

    /// Kicks off a connection attempt and moves to the `connecting` state.
    ///
    /// - Parameters:
    ///   - backoffIterator: Supplies the connect timeout and the delay before a retry. If no
    ///     timeout/backoff pair is available (`nil` iterator, or `next()` returns `nil`), the
    ///     channel is created without a connect timeout and a failed attempt is not retried.
    ///   - muxPromise: The ready-multiplexer promise, stored as the `readyChannelMuxPromise` of
    ///     the new `connecting` state.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      let nextStep = backoffIterator?.next()
      self.eventLoop.assertInEventLoop()

      // Decide now whether a failed candidate channel should be retried, and after how long.
      let reconnect: Reconnect
      if let step = nextStep {
        reconnect = .after(step.backoff)
      } else {
        reconnect = .none
      }

      // Submit the connect instead of running it inline. We are already on the event loop, so the
      // submitted work only runs after the state change to `.connecting` below.
      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let attempt: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: nextStep.map { .seconds(timeInterval: $0.timeout) },
          logger: self.logger
        )
        attempt.whenFailure { failure in
          self.connectionFailed(withError: failure)
        }
        return attempt
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view of this connection manager. Every operation on it expects the caller to
    /// already be running on the connection manager's `EventLoop`.
    internal var sync: Sync {
      Sync(self)
    }

    internal struct Sync {
      private let manager: ConnectionManager

      fileprivate init(_ manager: ConnectionManager) {
        self.manager = manager
      }

      /// The delegate notified about connectivity changes.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.connectivityDelegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.connectivityDelegate = delegate
        }
      }

      /// The delegate notified about HTTP/2 connection changes.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.http2Delegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.http2Delegate = delegate
        }
      }

      /// Whether the connection is in the idle state.
      internal var isIdle: Bool { self.manager.isIdle }

      /// Whether the connection is in the shutdown state.
      internal var isShutdown: Bool { self.manager.isShutdown }

      /// The `multiplexer` of a connection in the `ready` state; `nil` in any other state.
      internal var multiplexer: HTTP2StreamMultiplexer? { self.manager.multiplexer }

      /// Starts establishing a connection. Must only be called while `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Traps unconditionally, reporting the current state and the calling function. Used at call
    /// sites where the state machine should never be in the current state.
    private func unreachableState(
      function: StaticString = #function,
      file: StaticString = #fileID,
      line: UInt = #line
    ) -> Never {
      let message = "Invalid state \(self.state) for \(function)"
      fatalError(message, file: file, line: line)
    }
  }