ConnectionManager.swift 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  21. @usableFromInline
  22. internal final class ConnectionManager {
  /// Describes whether another connection attempt should be made if the current one fails.
  internal enum Reconnect {
    /// Do not try again.
    case none

    /// Try again once the given number of seconds has passed.
    case after(TimeInterval)
  }
  /// The data held while a connection attempt is in flight.
  internal struct ConnectingState {
    /// The source of delays between connection attempts, if any. Carried over into later states.
    var backoffIterator: ConnectionBackoffIterator?

    /// What to do should this connection attempt fail.
    var reconnect: Reconnect

    /// The channel we are waiting on; it is only a candidate until it has become ready.
    var candidate: EventLoopFuture<Channel>

    /// Completed with the multiplexer once the channel has become ready.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// Completed with the multiplexer as soon as the candidate channel becomes active, or failed
    /// as soon as the connection attempt fails.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  }
  /// The data held once a channel is active but has not yet been marked as ready.
  internal struct ConnectedState {
    /// The source of delays between connection attempts, if any.
    var backoffIterator: ConnectionBackoffIterator?

    /// What to do should the channel become inactive before it becomes ready.
    var reconnect: Reconnect

    /// The active channel; it is still only a candidate until it becomes ready.
    var candidate: Channel

    /// Completed with `multiplexer` once the channel has become ready.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The multiplexer of the candidate channel.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error caught by the channel, if any.
    var error: Error?

    /// Creates the connected state from the connecting state it replaces.
    ///
    /// - Parameters:
    ///   - state: The connecting state being left.
    ///   - candidate: The channel which became active.
    ///   - multiplexer: The multiplexer of `candidate`.
    init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
      // The channel and its multiplexer are new ...
      self.candidate = candidate
      self.multiplexer = multiplexer

      // ... everything else is inherited from the connecting state.
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.reconnect = state.reconnect
      self.backoffIterator = state.backoffIterator
    }
  }
  /// The data held while the channel is ready to be used for RPCs.
  internal struct ReadyState {
    /// The ready channel.
    var channel: Channel

    /// The multiplexer of the ready channel.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error caught by the channel, if any.
    var error: Error?

    /// Creates the ready state from the connected state it replaces. Any error recorded on the
    /// connected state is not carried over.
    init(from state: ConnectedState) {
      self.multiplexer = state.multiplexer
      self.channel = state.candidate
    }
  }
  /// The data held while we are waiting to make another connection attempt.
  internal struct TransientFailureState {
    /// The source of delays between connection attempts, if any.
    var backoffIterator: ConnectionBackoffIterator?

    /// Completed with a multiplexer if a later connection attempt results in a ready channel.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The scheduled task which starts the next connection attempt.
    var scheduled: Scheduled<Void>

    /// Why we are in this state, if known.
    var reason: Error?

    /// The single place in which the stored properties are set; each of the state-specific
    /// initializers below delegates to it.
    private init(
      backoffIterator: ConnectionBackoffIterator?,
      readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>,
      scheduled: Scheduled<Void>,
      reason: Error?
    ) {
      self.backoffIterator = backoffIterator
      self.readyChannelMuxPromise = readyChannelMuxPromise
      self.scheduled = scheduled
      self.reason = reason
    }

    /// A connection attempt failed with `reason`; the iterator and promise are inherited.
    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
      self.init(
        backoffIterator: state.backoffIterator,
        readyChannelMuxPromise: state.readyChannelMuxPromise,
        scheduled: scheduled,
        reason: reason
      )
    }

    /// An active (but not ready) channel became inactive; the reason is whatever error the
    /// connected state recorded.
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.init(
        backoffIterator: state.backoffIterator,
        readyChannelMuxPromise: state.readyChannelMuxPromise,
        scheduled: scheduled,
        reason: state.error
      )
    }

    /// A ready channel became inactive. The ready state holds no promise so a new one is made on
    /// the channel's event loop.
    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.init(
        backoffIterator: backoffIterator,
        readyChannelMuxPromise: state.channel.eventLoop.makePromise(),
        scheduled: scheduled,
        reason: state.error
      )
    }
  }
  /// The data held once the manager has been shut down.
  internal struct ShutdownState {
    /// The future returned to callers of `shutdown()`.
    var closeFuture: EventLoopFuture<Void>

    /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
    /// this error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// Makes the shutdown state used when the application asked for the shutdown.
    ///
    /// - Parameter closeFuture: The future to hand back to callers of `shutdown()`.
    /// - Returns: A state whose reason is an 'unavailable' status.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let reason = GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
      return ShutdownState(closeFuture: closeFuture, reason: reason)
    }
  }
  /// The states of the connection manager's internal state machine.
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case idle

    /// We're actively trying to establish a connection.
    ///
    /// Valid next states:
    /// - `active`
    /// - `transientFailure` (if our attempt fails and we're going to try again)
    /// - `shutdown`
    case connecting(ConnectingState)

    /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
    /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
    ///
    /// Valid next states:
    /// - `ready`
    /// - `idle` (if the connection is idled before it becomes ready)
    /// - `transientFailure` (if our handshake fails or another error happens and we can attempt
    ///   to re-establish the connection)
    /// - `shutdown`
    case active(ConnectedState)

    /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
    /// the channel for making RPCs.
    ///
    /// Valid next states:
    /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
    /// - `transientFailure` (we encountered an error and will re-establish the connection)
    /// - `shutdown`
    case ready(ReadyState)

    /// A `Channel` is desired, we'll attempt to create one in the future.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)

    /// We never want another `Channel`: this state is terminal.
    case shutdown(ShutdownState)

    /// The name of the state, used as the value of the "connectivity_state" logging metadata.
    fileprivate var label: String {
      let name: String
      switch self {
      case .idle:
        name = "idle"
      case .connecting:
        name = "connecting"
      case .active:
        name = "active"
      case .ready:
        name = "ready"
      case .transientFailure:
        name = "transientFailure"
      case .shutdown:
        name = "shutdown"
      }
      return name
    }
  }
  /// The most recent state reported to the connectivity delegate. External states are a subset
  /// of the internal states.
  private var externalState: ConnectivityState = .idle

  /// Records `nextState` as the external state and tells the connectivity delegate about the
  /// transition. Does nothing if the external state is already `nextState`.
  private func updateExternalState(to nextState: ConnectivityState) {
    guard self.externalState != nextState else {
      return
    }

    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(
      self,
      from: previousState,
      to: nextState
    )
  }
  /// Our current (internal) state. Each change is mirrored to the external state; entering
  /// `idle` or `transientFailure` also moves on to the next connection ID.
  private var state: State {
    didSet {
      switch self.state {
      case .active:
        // 'active' is internal only: there is no external state to report.
        break

      case .idle:
        self.updateExternalState(to: .idle)
        self.updateConnectionID()

      case .transientFailure:
        self.updateExternalState(to: .transientFailure)
        self.updateConnectionID()

      case .connecting:
        self.updateExternalState(to: .connecting)

      case .ready:
        self.updateExternalState(to: .ready)

      case .shutdown:
        self.updateExternalState(to: .shutdown)
      }
    }
  }
  /// Whether the current state is 'idle'. Expected to be read on the `EventLoop`.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()

    let idle: Bool
    switch self.state {
    case .idle:
      idle = true
    case .connecting, .transientFailure, .active, .ready, .shutdown:
      idle = false
    }
    return idle
  }
  /// The `HTTP2StreamMultiplexer` of the 'ready' state, or `nil` in any other state. Expected to
  /// be read on the `EventLoop`.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()

    let readyMultiplexer: HTTP2StreamMultiplexer?
    switch self.state {
    case let .ready(ready):
      readyMultiplexer = ready.multiplexer
    case .idle, .connecting, .transientFailure, .active, .shutdown:
      readyMultiplexer = nil
    }
    return readyMultiplexer
  }
  /// The `EventLoop` that the managed connection will run on.
  internal let eventLoop: EventLoop

  /// Told about connectivity state changes and about the connection starting to quiesce.
  /// Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// Told about HTTP/2 events: streams closing and changes to the maximum number of concurrent
  /// streams. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// An `EventLoopFuture<Channel>` provider.
  private let channelProvider: ConnectionManagerChannelProvider

  /// The behavior for starting a call, i.e. how patient is the caller when asking for a
  /// multiplexer.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// The configuration to use when backing off between connection attempts, if reconnection
  /// attempts should be made at all.
  private let connectionBackoff: ConnectionBackoff?

  /// A logger. Its connection ID metadata is kept up to date by `updateConnectionID()`.
  internal var logger: Logger

  /// The fixed part of the connection ID logging metadata.
  private let connectionID: String

  /// The varying part of the connection ID logging metadata; incremented by
  /// `updateConnectionID()`.
  private var channelNumber: UInt64

  /// Held while reading or incrementing `channelNumber`.
  private var channelNumberLock = Lock()
  /// The connection ID and channel number joined by a "/". Callers must hold
  /// `channelNumberLock`.
  private var _connectionIDAndNumber: String {
    return self.connectionID + "/" + String(self.channelNumber)
  }

  /// The connection ID and channel number joined by a "/", read while holding
  /// `channelNumberLock`.
  private var connectionIDAndNumber: String {
    return self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }
  /// Moves on to the next channel number (wrapping on overflow) and refreshes the connection ID
  /// metadata of our logger, all while holding `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLockVoid {
      self.channelNumber &+= 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = .string(identifier)
    }
  }
  /// Sets the connection ID metadata of the given logger to our current connection ID and
  /// channel number.
  ///
  /// - Parameter logger: The logger to annotate.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = .string(identifier)
  }
  /// Creates a connection manager from a client connection configuration. No HTTP/2 delegate is
  /// set.
  ///
  /// - Parameters:
  ///   - configuration: Provides the event loop group, call start behavior and connection
  ///     backoff.
  ///   - channelProvider: The source of channels; when `nil` a `DefaultChannelProvider` is made
  ///     from `configuration`.
  ///   - connectivityDelegate: The delegate to tell about connectivity changes.
  ///   - logger: The logger to use.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    let eventLoop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: eventLoop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates a connection manager in the 'idle' state.
  ///
  /// - Parameters:
  ///   - eventLoop: The event loop the managed connection will run on.
  ///   - channelProvider: The source of channels.
  ///   - callStartBehavior: How patient callers are when asking for a multiplexer.
  ///   - connectionBackoff: The backoff configuration, or `nil` for no reconnection attempts.
  ///   - connectivityDelegate: The delegate to tell about connectivity changes.
  ///   - http2Delegate: The delegate to tell about HTTP/2 events.
  ///   - logger: The logger to use; a connection ID is added to its metadata.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    // The connection ID is a new UUID; channel numbers start from zero.
    let connectionID = UUID().uuidString
    let channelNumber: UInt64 = 0

    // Tag our copy of the logger with the initial connection ID.
    var annotatedLogger = logger
    annotatedLogger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)"

    self.connectionID = connectionID
    self.channelNumber = channelNumber
    self.logger = annotatedLogger

    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate

    self.state = .idle
  }
  /// Get the multiplexer from the underlying channel handling gRPC calls.
  ///
  /// If the `ConnectionManager` was configured with the `fastFailure` call start behavior the
  /// optimistic path is taken, otherwise the patient path is taken. May be called from any
  /// thread: the work is hopped over to the `EventLoop` if necessary.
  ///
  /// - Returns: A future for the multiplexer.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    let vendMultiplexer: () -> EventLoopFuture<HTTP2StreamMultiplexer> = {
      switch self.callStartBehavior {
      case .fastFailure:
        return self.getHTTP2MultiplexerOptimistic()
      case .waitsForConnectivity:
        return self.getHTTP2MultiplexerPatient()
      }
    }

    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        vendMultiplexer()
      }
    }

    return vendMultiplexer()
  }
  /// Returns a future for the multiplexer which is succeeded once the channel is ready. A
  /// connection attempt is started if the state is 'idle'.
  ///
  /// Note: if the state is 'shutdown' then a failed future will be returned.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = { () in
      switch self.state {
      case .idle:
        self.startConnecting()
        // Starting to connect from 'idle' must leave us in the `connecting` state.
        guard case let .connecting(connecting) = self.state else {
          self.invalidState()
        }
        return connecting.readyChannelMuxPromise.futureResult

      case let .connecting(connecting):
        return connecting.readyChannelMuxPromise.futureResult

      case let .active(connected):
        return connected.readyChannelMuxPromise.futureResult

      case let .transientFailure(backingOff):
        return backingOff.readyChannelMuxPromise.futureResult

      case let .ready(ready):
        return self.eventLoop.makeSucceededFuture(ready.multiplexer)

      case let .shutdown(shutdown):
        return self.eventLoop.makeFailedFuture(shutdown.reason)
      }
    }()

    // Logged after the switch so that the state reflects any connection attempt we started.
    self.logger.debug("vending multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return muxFuture
  }
  /// Returns a future for the current HTTP/2 stream multiplexer, or for the multiplexer of the
  /// connection attempt in progress. If the state is 'idle' a connection attempt is started and
  /// the future for that attempt is returned.
  ///
  /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // Starting to connect from 'idle' must leave us in the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.invalidState()
      }
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .connecting(connecting):
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .active(connected):
      muxFuture = self.eventLoop.makeSucceededFuture(connected.multiplexer)

    case let .ready(ready):
      muxFuture = self.eventLoop.makeSucceededFuture(ready.multiplexer)

    case let .transientFailure(backingOff):
      // Provide the reason we failed transiently, if we can.
      muxFuture = self.eventLoop.makeFailedFuture(
        backingOff.reason ?? GRPCStatus(
          code: .unavailable,
          message: "Connection multiplexer requested while backing off"
        )
      )

    case let .shutdown(shutdown):
      muxFuture = self.eventLoop.makeFailedFuture(shutdown.reason)
    }

    // Logged after the switch so that the state reflects any connection attempt we started.
    self.logger.debug("vending fast-failing multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return muxFuture
  }
  /// Shutdown any connection which exists. This is a request from the application and may be
  /// made from any thread: the work is hopped over to the `EventLoop` if necessary.
  ///
  /// - Returns: The close future of the resulting shutdown state.
  internal func shutdown() -> EventLoopFuture<Void> {
    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        self._shutdown()
      }
    }

    return self._shutdown()
  }
  /// Moves to the `shutdown` state, tearing down whatever the current state holds. `shutdown()`
  /// only calls this from the `EventLoop`.
  ///
  /// Anyone waiting on a multiplexer promise is failed with the reason stored in the shutdown
  /// state, i.e. the same error that requests made after the shutdown are failed with.
  ///
  /// - Returns: The close future of the shutdown state: the channel's `closeFuture` if we were
  ///   'ready', the future of the earlier shutdown if we were already shutdown, and an already
  ///   succeeded future otherwise.
  private func _shutdown() -> EventLoopFuture<Void> {
    self.logger.debug("shutting down connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    let shutdown: ShutdownState

    switch self.state {
    // We don't have a channel and we don't want one, easy!
    case .idle:
      shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
      self.state = .shutdown(shutdown)

    // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
    // the shutdown future and deal with any fallout from the connecting channel without the
    // application knowing.
    case let .connecting(state):
      shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
      self.state = .shutdown(shutdown)

      // Fail the mux promises: we're shutting down so even if we manage to successfully
      // connect the application shouldn't have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(shutdown.reason)
      state.candidateMuxPromise.fail(shutdown.reason)

      // In case we do successfully connect, close immediately.
      state.candidate.whenSuccess {
        $0.close(mode: .all, promise: nil)
      }

    // We have an active channel but the application doesn't know about it yet. We'll do the same
    // as for `.connecting`.
    case let .active(state):
      shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
      self.state = .shutdown(shutdown)

      // Fail the ready channel mux promise: we're shutting down so the application shouldn't
      // have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // We have a channel, close it.
      state.candidate.close(mode: .all, promise: nil)

    // The channel is up and running: the application could be using it. We can close it and
    // return the `closeFuture`.
    case let .ready(state):
      shutdown = .shutdownByUser(closeFuture: state.channel.closeFuture)
      self.state = .shutdown(shutdown)

      // We have a channel, close it.
      state.channel.close(mode: .all, promise: nil)

    // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
    // do the same but also cancel any scheduled connection attempts and deal with any fallout
    // if we cancelled too late.
    case let .transientFailure(state):
      shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
      self.state = .shutdown(shutdown)

      // Stop the creation of a new channel, if we can. If we can't then the task to
      // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
      state.scheduled.cancel()

      // Fail the ready channel mux promise: we're shutting down so even if we manage to
      // successfully connect the application shouldn't have access to the channel.
      state.readyChannelMuxPromise.fail(shutdown.reason)

    // We're already shutdown; nothing to do.
    case let .shutdown(state):
      shutdown = state
    }

    return shutdown.closeFuture
  }
  // MARK: - State changes from the channel handler.

  /// The channel caught an error. It is recorded on the 'active' or 'ready' state so that it can
  /// provide some context when the channel later becomes inactive. Must be called on the
  /// `EventLoop`.
  ///
  /// - Parameter error: The error caught by the channel.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case let .active(connected):
      var updated = connected
      updated.error = error
      self.state = .active(updated)

    case let .ready(ready):
      var updated = ready
      updated.error = error
      self.state = .ready(updated)

    // Hitting an error in idle is a surprise, but not really something we do anything about.
    // Either the error is channel fatal, in which case we'll see channelInactive soon
    // (acceptable), or it's not, and future I/O will either fail fast or work. In either case,
    // all we do is log this and move on.
    case .idle:
      self.logger.warning("ignoring unexpected error in idle", metadata: [
        MetadataKey.error: "\(error)",
      ])

    case .connecting:
      self.invalidState()

    // If we're already in one of these states, then additional errors aren't helpful to us.
    case .transientFailure, .shutdown:
      break
    }
  }
  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// - Parameters:
  ///   - channel: The channel which became active.
  ///   - multiplexer: The multiplexer of `channel`.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("activating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    // The application called shutdown before the channel became active; we should close it.
    case .shutdown:
      channel.close(mode: .all, promise: nil)

    case let .connecting(connecting):
      self.state = .active(
        ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
      )
      // Optimistic connections are happy with this level of setup.
      connecting.candidateMuxPromise.succeed(multiplexer)

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contains the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
  /// Must be called on the `EventLoop`.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("deactivating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    // The channel is `active` but not `ready`. Should we try again?
    case let .active(active):
      switch active.reconnect {
      // Yes, after some time.
      case let .after(delay):
        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        let backingOff = TransientFailureState(from: active, scheduled: scheduled)
        self.state = .transientFailure(backingOff)

      // No, shutdown instead.
      case .none:
        let error = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        self.logger.debug("shutting down connection")
        self.state = .shutdown(
          ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: error)
        )
        active.readyChannelMuxPromise.fail(error)
      }

    // The channel was ready and working fine but something went wrong. Should we try to replace
    // the channel?
    case let .ready(ready):
      // No, no backoff is configured.
      guard let backoff = self.connectionBackoff else {
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let reason = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and a reconnect was not configured"
        )
        self.state = .shutdown(
          ShutdownState(closeFuture: ready.channel.closeFuture, reason: reason)
        )
        return
      }

      // Yes, start connecting now. We should go via `transientFailure`, however.
      let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
        self.startConnecting()
      }
      self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
      let backingOff = TransientFailureState(
        from: ready,
        scheduled: scheduled,
        backoffIterator: backoff.makeIterator()
      )
      self.state = .transientFailure(backingOff)

    // This is fine: we expect the channel to become inactive after becoming idle.
    case .idle:
      break

    // We're already shutdown, that's fine.
    case .shutdown:
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contains the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
  /// called on the `EventLoop`.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("connection ready", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    // Nothing to do if we have been shut down in the meantime.
    case .shutdown:
      break

    case let .active(connected):
      let readyState = ReadyState(from: connected)
      self.state = .ready(readyState)
      // Patient callers have been waiting for this moment.
      connected.readyChannelMuxPromise.succeed(connected.multiplexer)

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contains the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .transientFailure:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .ready:
      self.invalidState()
    }
  }
  /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
  /// the `EventLoop`.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("idling connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case .ready:
      self.state = .idle

    case let .active(connected):
      // This state is reachable if the keepalive timer fires before we reach the ready state.
      self.state = .idle
      let status = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      connected.readyChannelMuxPromise.fail(status)

    case .shutdown:
      // This is expected when the connection is closed by the user: when the channel becomes
      // inactive and there are no outstanding RPCs, 'idle()' will be called instead of
      // 'channelInactive()'.
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contains the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// Tells the HTTP/2 delegate, if there is one, that a stream was closed. Expected to be called
  /// on the `EventLoop`.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()

    if let delegate = self.http2Delegate {
      delegate.streamClosed(self)
    }
  }
  /// Tells the HTTP/2 delegate, if there is one, about a new value for the maximum number of
  /// concurrent streams. Expected to be called on the `EventLoop`.
  ///
  /// - Parameter maxConcurrentStreams: The new maximum number of concurrent streams.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()

    if let delegate = self.http2Delegate {
      delegate.receivedSettingsMaxConcurrentStreams(
        self,
        maxConcurrentStreams: maxConcurrentStreams
      )
    }
  }
  /// The connection has started quiescing: tell the connectivity delegate, if there is one.
  /// Expected to be called on the `EventLoop`.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()

    if let delegate = self.connectivityDelegate {
      delegate.connectionIsQuiescing(self)
    }
  }
  643. }
  644. extension ConnectionManager {
  // A connection attempt failed; we never established a connection. Depending on the reconnect
  // policy of the attempt we either shut down or schedule another attempt. Must be called on the
  // `EventLoop`.
  private func connectionFailed(withError error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    // The application must have called shutdown while we were trying to establish a connection
    // which was doomed to fail anyway. That's fine, we can ignore this.
    case .shutdown:
      break

    case let .connecting(connecting):
      // Should we reconnect?
      switch connecting.reconnect {
      // Yes, after a delay.
      case let .after(delay):
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        let backingOff = TransientFailureState(
          from: connecting,
          scheduled: scheduled,
          reason: error
        )
        self.state = .transientFailure(backingOff)
        // Candidate mux users are not willing to wait.
        connecting.candidateMuxPromise.fail(error)

      // No, shutdown.
      case .none:
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let shutdown = ShutdownState(
          closeFuture: self.eventLoop.makeSucceededFuture(()),
          reason: error
        )
        self.state = .shutdown(shutdown)
        connecting.readyChannelMuxPromise.fail(error)
        connecting.candidateMuxPromise.fail(error)
      }

    // We can't fail to connect if we aren't trying.
    //
    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contains the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  691. }
  extension ConnectionManager {
    /// Begins establishing a connection.
    ///
    /// This is only valid from the `idle` and `transientFailure` states, and must be called on
    /// the `EventLoop`. From `idle` a new backoff iterator and multiplexer promise are created;
    /// from `transientFailure` the pending iterator and promise are carried over. If the manager
    /// has already shut down the call is a no-op. Any other state terminates the process via
    /// `invalidState()`.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .idle:
        let backoff = self.connectionBackoff?.makeIterator()
        let readyPromise: EventLoopPromise<HTTP2StreamMultiplexer> = self.eventLoop.makePromise()
        self.startConnecting(backoffIterator: backoff, muxPromise: readyPromise)

      case .transientFailure(let pending):
        self.startConnecting(
          backoffIterator: pending.backoffIterator,
          muxPromise: pending.readyChannelMuxPromise
        )

      case .shutdown:
        // We shut down before a scheduled connection attempt had started.
        ()

      // The cases below are deliberately NOT merged: some crash reporting services provide stack
      // traces without the precondition failure message (which contains the invalid state we
      // were in). Keeping one case per line lets us work out the state from the line number.
      case .connecting:
        self.invalidState()

      case .active:
        self.invalidState()

      case .ready:
        self.invalidState()
      }
    }

    /// Submits a connection attempt to the event loop and moves the manager to `.connecting`.
    ///
    /// - Parameters:
    ///   - backoffIterator: Source of the next timeout/backoff pair; when it is `nil` or yields
    ///     nothing, no connect timeout is passed and no reconnect is recorded for this attempt.
    ///   - muxPromise: Stored in the connecting state as the ready-channel multiplexer promise.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      let timeoutAndBackoff = backoffIterator?.next()

      // We're already on the event loop: submit the connect so it starts after we've made the
      // state change to `.connecting`.
      self.eventLoop.assertInEventLoop()
      let candidate = self.eventLoop.flatSubmit { () -> EventLoopFuture<Channel> in
        let channelFuture: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: timeoutAndBackoff.map { .seconds(timeInterval: $0.timeout) },
          logger: self.logger
        )

        channelFuture.whenFailure { error in
          self.connectionFailed(withError: error)
        }

        return channelFuture
      }

      // Should we reconnect if the candidate channel fails?
      let reconnect: Reconnect
      if let timeoutAndBackoff = timeoutAndBackoff {
        reconnect = .after(timeoutAndBackoff.backoff)
      } else {
        reconnect = .none
      }

      let candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer> =
        self.eventLoop.makePromise()

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: candidateMuxPromise
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view over this connection manager.
    ///
    /// Every operation on the returned value requires the caller to be executing on the same
    /// `EventLoop` as the connection manager.
    internal var sync: Sync {
      return Sync(self)
    }

    /// Wraps a `ConnectionManager` and exposes its state synchronously to callers that are already
    /// on the manager's `EventLoop`.
    internal struct Sync {
      /// The manager all accesses are forwarded to.
      private let manager: ConnectionManager

      fileprivate init(_ owner: ConnectionManager) {
        self.manager = owner
      }

      /// A delegate for connectivity changes.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          return manager.connectivityDelegate
        }
        nonmutating set {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          manager.connectivityDelegate = newValue
        }
      }

      /// A delegate for HTTP/2 connection changes.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          return manager.http2Delegate
        }
        nonmutating set {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          manager.http2Delegate = newValue
        }
      }

      /// `true` if the connection is in the idle state.
      internal var isIdle: Bool {
        let manager = self.manager
        return manager.isIdle
      }

      /// The `multiplexer` of a connection in the `ready` state, or `nil` if the connection is in
      /// any other state.
      internal var multiplexer: HTTP2StreamMultiplexer? {
        let manager = self.manager
        return manager.multiplexer
      }

      /// Starts establishing a connection. Must only be called when `isIdle` is `true`.
      internal func startConnecting() {
        let manager = self.manager
        manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Stops the program because the manager was in a state that is not valid for the caller.
    ///
    /// The failure message names the current state and the calling function; the failure is
    /// attributed to the caller's file and line via the defaulted parameters.
    ///
    /// - Parameters:
    ///   - function: Name of the calling function; defaults to the call site.
    ///   - file: File of the call site.
    ///   - line: Line of the call site.
    private func invalidState(
      function: StaticString = #function,
      file: StaticString = #file,
      line: UInt = #line
    ) -> Never {
      preconditionFailure(
        "Invalid state \(self.state) for \(function)",
        file: file,
        line: line
      )
    }
  }