ConnectionManager.swift 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  21. @usableFromInline
  22. internal final class ConnectionManager {
  /// Whether another connection attempt should be made when the current attempt or connection
  /// is lost, and if so after how long.
  internal enum Reconnect {
    /// Do not reconnect; `channelInactive()` moves to `shutdown` instead.
    case none
    /// Reconnect after the given delay, interpreted in seconds when the attempt is scheduled.
    case after(TimeInterval)
  }
  /// Data held while a connection attempt is in flight (the `connecting` state).
  internal struct ConnectingState {
    /// Carried forward into later states. Presumably yields the delays between connection
    /// attempts — confirm against `ConnectionBackoffIterator`.
    var backoffIterator: ConnectionBackoffIterator?
    /// What to do if the channel is lost before it becomes ready.
    var reconnect: Reconnect
    /// The future `Channel` of the in-flight connection attempt.
    var candidate: EventLoopFuture<Channel>
    /// Succeeded with the multiplexer by `ready()`; vended to callers which wait for
    /// connectivity.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// Succeeded with the multiplexer by `channelActive(channel:multiplexer:)`; vended to
    /// fast-failing callers.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  }
  /// Data held once a `Channel` has been established but is not yet ready (the `active` state).
  internal struct ConnectedState {
    /// Carried over from the `ConnectingState` and passed on to `TransientFailureState`.
    var backoffIterator: ConnectionBackoffIterator?
    /// Consulted by `channelInactive()` to decide whether to reconnect if the channel drops.
    var reconnect: Reconnect
    /// The established channel; becomes `ReadyState.channel` once the connection is ready.
    var candidate: Channel
    /// Succeeded with `multiplexer` by `ready()`.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// The multiplexer of `candidate`.
    var multiplexer: HTTP2StreamMultiplexer
    /// The most recent error recorded by `channelError(_:)`, if any.
    var error: Error?
    /// Carries the backoff iterator, reconnect policy and ready promise over from `state`.
    /// `error` starts out as `nil`.
    init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
      self.backoffIterator = state.backoffIterator
      self.reconnect = state.reconnect
      self.candidate = candidate
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.multiplexer = multiplexer
    }
  }
  /// Data held while the connection is usable for RPCs (the `ready` state).
  internal struct ReadyState {
    /// The channel backing the connection.
    var channel: Channel
    /// The multiplexer vended to callers asking for one in this state.
    var multiplexer: HTTP2StreamMultiplexer
    /// The most recent error recorded by `channelError(_:)`, if any.
    var error: Error?
    /// Takes the channel and multiplexer from `state`. `error` starts out as `nil`; the error
    /// held by `state` is not carried over.
    init(from state: ConnectedState) {
      self.channel = state.candidate
      self.multiplexer = state.multiplexer
    }
  }
  /// Data held while we wait to make another connection attempt (the `transientFailure` state).
  internal struct TransientFailureState {
    /// Carried over from the previous state, or supplied afresh when leaving `ready`.
    var backoffIterator: ConnectionBackoffIterator?
    /// The promise vended to callers which wait for connectivity.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// The scheduled task which will start the next connection attempt.
    var scheduled: Scheduled<Void>
    /// Why the previous connection or connection attempt failed.
    var reason: Error

    /// A connection attempt failed with `reason` before a channel was established.
    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
      self.reason = reason
      self.scheduled = scheduled
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.backoffIterator = state.backoffIterator
    }

    /// An established, but not yet ready, channel was dropped.
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.reason = Self.dropReason(from: state.error)
      self.scheduled = scheduled
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.backoffIterator = state.backoffIterator
    }

    /// A ready channel was dropped. A new promise is created since the ready state holds none.
    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.reason = Self.dropReason(from: state.error)
      self.scheduled = scheduled
      self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
      self.backoffIterator = backoffIterator
    }

    /// Returns the recorded channel error, falling back to a generic 'unavailable' status.
    private static func dropReason(from channelError: Error?) -> Error {
      if let channelError = channelError {
        return channelError
      }
      return GRPCStatus(
        code: .unavailable,
        message: "Unexpected connection drop"
      )
    }
  }
  /// Data held once the manager has shut down (the terminal `shutdown` state).
  internal struct ShutdownState {
    /// Completes when the shutdown has finished; handed to later shutdown requests.
    var closeFuture: EventLoopFuture<Void>

    /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
    /// this error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// Makes the state used when the shutdown was requested by the user.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let userRequested = GRPCStatus(
        code: .unavailable,
        message: "Connection was shutdown by the user"
      )
      return ShutdownState(closeFuture: closeFuture, reason: userRequested)
    }
  }
  /// The internal connectivity state of the manager. Each case documents the states it may
  /// move to next.
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case idle(lastError: Error?)
    /// We're actively trying to establish a connection.
    ///
    /// Valid next states:
    /// - `active`
    /// - `transientFailure` (if our attempt fails and we're going to try again)
    /// - `shutdown`
    case connecting(ConnectingState)
    /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
    /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
    ///
    /// Valid next states:
    /// - `ready`
    /// - `idle` (if `idle()` is called before we become ready, e.g. the keepalive timer fired)
    /// - `transientFailure` (if our handshake fails or another error happens and we can attempt
    ///   to re-establish the connection)
    /// - `shutdown`
    case active(ConnectedState)
    /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
    /// the channel for making RPCs.
    ///
    /// Valid next states:
    /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
    /// - `transientFailure` (we encountered an error and will re-establish the connection)
    /// - `shutdown`
    case ready(ReadyState)
    /// A `Channel` is desired, we'll attempt to create one in the future.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)
    /// We never want another `Channel`: this state is terminal.
    case shutdown(ShutdownState)
    /// The name of the state, used as the `connectivity_state` value in log metadata.
    fileprivate var label: String {
      switch self {
      case .idle:
        return "idle"
      case .connecting:
        return "connecting"
      case .active:
        return "active"
      case .ready:
        return "ready"
      case .transientFailure:
        return "transientFailure"
      case .shutdown:
        return "shutdown"
      }
    }
  }
  /// The last 'external' state we are in, a subset of the internal state. Only changed via
  /// `updateExternalState(to:)` so that the connectivity delegate hears about every change.
  private var externalState: _ConnectivityState = .idle(nil)
  /// Records `nextState` as the external state and tells the connectivity delegate about the
  /// transition. Nothing happens if `nextState` is the same state as the current one.
  private func updateExternalState(to nextState: _ConnectivityState) {
    guard !self.externalState.isSameState(as: nextState) else {
      return
    }

    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(
      self,
      from: previousState,
      to: nextState
    )
  }
  /// Our current state. Every change is mirrored to the external state; entering `idle` or
  /// `transientFailure` additionally bumps the connection ID used in log metadata.
  private var state: State {
    didSet {
      switch self.state {
      case let .idle(lastError):
        self.updateExternalState(to: .idle(lastError))
        self.updateConnectionID()

      case let .transientFailure(failure):
        self.updateExternalState(to: .transientFailure(failure.reason))
        self.updateConnectionID()

      case .connecting:
        self.updateExternalState(to: .connecting)

      case .ready:
        self.updateExternalState(to: .ready)

      case .shutdown:
        self.updateExternalState(to: .shutdown)

      case .active:
        // This is an internal state: there is no external counterpart to report.
        break
      }
    }
  }
  /// `true` if, and only if, the current state is 'idle'. Asserts that we are on the event loop.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()

    if case .idle = self.state {
      return true
    } else {
      return false
    }
  }
  /// The `HTTP2StreamMultiplexer` of the 'ready' state; `nil` in every other state. Asserts that
  /// we are on the event loop.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()

    guard case let .ready(readyState) = self.state else {
      return nil
    }
    return readyState.multiplexer
  }
  /// The `EventLoop` that the managed connection will run on.
  internal let eventLoop: EventLoop
  /// A delegate for connectivity changes. Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?
  /// A delegate for HTTP/2 connection changes. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?
  /// An `EventLoopFuture<Channel>` provider.
  private let channelProvider: ConnectionManagerChannelProvider
  /// The behavior for starting a call, i.e. how patient is the caller when asking for a
  /// multiplexer.
  private let callStartBehavior: CallStartBehavior.Behavior
  /// The configuration to use when backing off between connection attempts, if reconnection
  /// attempts should be made at all.
  private let connectionBackoff: ConnectionBackoff?
  /// A logger. Its connection ID metadata is refreshed by `updateConnectionID()`.
  internal var logger: Logger
  /// A UUID string generated once when the manager is initialized.
  private let connectionID: String
  /// Starts at zero and is incremented by `updateConnectionID()`, i.e. each time the state
  /// becomes `idle` or `transientFailure`.
  private var channelNumber: UInt64
  /// Held while `channelNumber` is read by `connectionIDAndNumber` and while it is incremented
  /// by `updateConnectionID()`.
  private var channelNumberLock = Lock()
  /// The connection ID and channel number joined by a '/'. Takes no lock itself: the callers in
  /// this class access it while holding `channelNumberLock`.
  private var _connectionIDAndNumber: String {
    let id = self.connectionID
    let number = self.channelNumber
    return "\(id)/\(number)"
  }
  /// The connection ID and channel number joined by a '/', read while holding
  /// `channelNumberLock`.
  private var connectionIDAndNumber: String {
    return self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }
  /// Increments the channel number (wrapping on overflow) and stores the resulting ID in the
  /// logger's metadata, all while holding `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLockVoid {
      self.channelNumber = self.channelNumber &+ 1
      let refreshedID = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = "\(refreshedID)"
    }
  }
  /// Sets the connection ID metadata of `logger` to the current connection ID and channel
  /// number.
  internal func appendMetadata(to logger: inout Logger) {
    let currentID = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = "\(currentID)"
  }
  /// Creates a manager from a client connection configuration.
  ///
  /// - Parameters:
  ///   - configuration: Supplies the event loop group (the next loop is used), the call start
  ///     behavior and the connection backoff.
  ///   - channelProvider: The channel provider to use. When `nil` a `DefaultChannelProvider` is
  ///     created from `configuration`.
  ///   - connectivityDelegate: A delegate to notify about connectivity changes.
  ///   - logger: The logger to use; connection ID metadata is added to it.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    let eventLoop = configuration.eventLoopGroup.next()

    let provider: ConnectionManagerChannelProvider
    if let channelProvider = channelProvider {
      provider = channelProvider
    } else {
      provider = DefaultChannelProvider(configuration: configuration)
    }

    self.init(
      eventLoop: eventLoop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates a manager in the `idle` state.
  ///
  /// A fresh UUID is generated as the connection ID and, together with a channel number of
  /// zero, stored in the metadata of the manager's copy of `logger`.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    // Identify the connection, and tag a copy of the logger with that identity.
    let generatedID = UUID().uuidString
    let initialChannelNumber: UInt64 = 0
    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] = "\(generatedID)/\(initialChannelNumber)"

    self.eventLoop = eventLoop
    self.state = .idle(lastError: nil)

    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff

    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate

    self.connectionID = generatedID
    self.channelNumber = initialChannelNumber
    self.logger = taggedLogger
  }
  /// Get the multiplexer from the underlying channel handling gRPC calls.
  ///
  /// If the `ConnectionManager` was configured with `fastFailure` there is a single chance to
  /// connect; otherwise reconnections are managed here. May be called from any thread: the work
  /// is hopped onto the `EventLoop` when necessary.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    let vendMultiplexer: () -> EventLoopFuture<HTTP2StreamMultiplexer> = {
      switch self.callStartBehavior {
      case .fastFailure:
        return self.getHTTP2MultiplexerOptimistic()
      case .waitsForConnectivity:
        return self.getHTTP2MultiplexerPatient()
      }
    }

    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        vendMultiplexer()
      }
    }

    return vendMultiplexer()
  }
  /// Returns a future for the multiplexer which succeeds when the channel is ready.
  /// Reconnects are handled if necessary.
  ///
  /// If the state is 'idle' a connection attempt is started. If the state is 'shutdown' the
  /// returned future is failed with the shutdown reason.
  ///
  /// - Precondition: Must be called on the `EventLoop`.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but, as in
    // `getHTTP2MultiplexerOptimistic`, verify it since we read and mutate `state` below.
    self.eventLoop.preconditionInEventLoop()

    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must transition to the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.invalidState()
      }
      multiplexer = connecting.readyChannelMuxPromise.futureResult

    case let .connecting(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .active(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .ready(state):
      multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)

    case let .transientFailure(state):
      // Patient callers wait for the next connection attempt rather than failing now.
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .shutdown(state):
      multiplexer = self.eventLoop.makeFailedFuture(state.reason)
    }

    self.logger.debug("vending multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return multiplexer
  }
  /// Returns a future for the multiplexer without waiting for the connection to become ready.
  ///
  /// - 'idle': starts connecting and returns the future of that attempt.
  /// - 'connecting': returns the future of the current attempt.
  /// - 'active' and 'ready': returns the current multiplexer.
  /// - 'transientFailure' and 'shutdown': returns a failed future.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must transition to the `connecting` state.
      guard case let .connecting(attempt) = self.state else {
        self.invalidState()
      }
      muxFuture = attempt.candidateMuxPromise.futureResult

    case let .connecting(attempt):
      muxFuture = attempt.candidateMuxPromise.futureResult

    case let .active(connected):
      muxFuture = self.eventLoop.makeSucceededFuture(connected.multiplexer)

    case let .ready(readyState):
      muxFuture = self.eventLoop.makeSucceededFuture(readyState.multiplexer)

    case let .transientFailure(failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)

    case let .shutdown(shutdownState):
      muxFuture = self.eventLoop.makeFailedFuture(shutdownState.reason)
    }

    self.logger.debug("vending fast-failing multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return muxFuture
  }
  /// How the underlying connection should be closed when shutting down.
  @usableFromInline
  internal enum ShutdownMode {
    /// Closes the underlying channel without waiting for existing RPCs to complete.
    case forceful
    /// Allows running RPCs to run their course before closing the underlying channel. No new
    /// streams may be created. If the channel has not closed by the given deadline it is closed
    /// forcibly.
    case graceful(NIODeadline)
  }
  /// Shutdown the underlying connection.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  /// - Parameter mode: How the connection should be closed.
  /// - Returns: A future which completes when the shutdown has completed.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let shutdownPromise: EventLoopPromise<Void> = self.eventLoop.makePromise(of: Void.self)
    let shutdownFuture = shutdownPromise.futureResult
    self.shutdown(mode: mode, promise: shutdownPromise)
    return shutdownFuture
  }
  /// Shutdown the underlying connection, hopping onto the `EventLoop` first if necessary.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  /// - Parameters:
  ///   - mode: How the connection should be closed.
  ///   - promise: A promise to complete when the shutdown has completed.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }

    self._shutdown(mode: mode, promise: promise)
  }
  /// Moves to the `shutdown` state and tears down whatever the current state holds.
  /// `shutdown(mode:promise:)` always invokes this on the `EventLoop`.
  ///
  /// Pending multiplexer promises are failed with the shutdown reason so that callers waiting
  /// for a connection learn that the user shut the connection down.
  ///
  /// - Parameters:
  ///   - mode: How a 'ready' channel should be closed; every other state closes immediately.
  ///   - promise: Completed once the connection, if there is one, has closed.
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug("shutting down connection", metadata: [
      "connectivity_state": "\(self.state.label)",
      "shutdown.mode": "\(mode)",
    ])

    switch self.state {
    // We don't have a channel and we don't want one, easy!
    case .idle:
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      promise.succeed(())

    // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
    // the shutdown future and deal with any fallout from the connecting channel without the
    // application knowing.
    case let .connecting(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail the mux promises: we're shutting down so even if we manage to successfully connect
      // the application shouldn't have access to the channel or multiplexer. Use the shutdown
      // reason, as the `transientFailure` case does, so waiters see why they failed.
      state.readyChannelMuxPromise.fail(shutdown.reason)
      state.candidateMuxPromise.fail(shutdown.reason)

      // Complete the shutdown promise when the connection attempt has completed.
      state.candidate.whenComplete {
        switch $0 {
        case let .success(channel):
          // In case we do successfully connect, close immediately.
          channel.close(mode: .all, promise: nil)
          promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())

        case .failure:
          // We failed to connect, that's fine we still shutdown successfully.
          promise.succeed(())
        }
      }

    // We have an active channel but the application doesn't know about it yet. We'll do the same
    // as for `.connecting`.
    case let .active(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail the ready channel mux promise: we're shutting down so the application shouldn't
      // have access to the channel or multiplexer. As above, fail with the shutdown reason.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // We have a channel, close it. We only create streams in the ready state so there's no need
      // to quiesce here.
      state.candidate.close(mode: .all, promise: nil)
      promise.completeWith(state.candidate.closeFuture.recoveringFromUncleanShutdown())

    // The channel is up and running: the application could be using it. We can close it and
    // return the `closeFuture`.
    case let .ready(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      switch mode {
      case .forceful:
        // We have a channel, close it.
        state.channel.close(mode: .all, promise: nil)

      case let .graceful(deadline):
        // If we don't close by the deadline forcibly close the channel.
        let scheduledForceClose = state.channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          state.channel.close(mode: .all, promise: nil)
        }

        // Cancel the force close if we close normally first.
        state.channel.closeFuture.whenComplete { _ in
          scheduledForceClose.cancel()
        }

        // Tell the channel to quiesce. It will be picked up by the idle handler which will close
        // the channel when all streams have been closed.
        state.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }

      // Complete the promise when we eventually close.
      promise.completeWith(state.channel.closeFuture.recoveringFromUncleanShutdown())

    // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
    // do the same but also cancel any scheduled connection attempts and deal with any fallout
    // if we cancelled too late.
    case let .transientFailure(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Stop the creation of a new channel, if we can. If we can't then the task to
      // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
      state.scheduled.cancel()

      // Fail the ready channel mux promise: we're shutting down so even if we manage to
      // successfully connect the application shouldn't have access to the channel.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // No active channel, so complete the shutdown promise now.
      promise.succeed(())

    // We're already shutdown; there's nothing to do.
    case let .shutdown(state):
      promise.completeWith(state.closeFuture)
    }
  }
  // MARK: - State changes from the channel handler.

  /// The channel caught an error. In the `active` and `ready` states the error is held on to
  /// until the channel becomes inactive, since it may provide some context. While `connecting`
  /// the connection attempt is failed with the error.
  ///
  /// - Precondition: Must be called on the `EventLoop`.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case let .active(connected):
      var updated = connected
      updated.error = error
      self.state = .active(updated)

    case let .ready(readyState):
      var updated = readyState
      updated.error = error
      self.state = .ready(updated)

    case .connecting:
      self.connectionFailed(withError: error)

    // Hitting an error in idle is a surprise, but not really something we do anything about.
    // Either the error is channel fatal, in which case we'll see channelInactive soon
    // (acceptable), or it's not, and future I/O will either fail fast or work. In either case,
    // all we do is log this and move on.
    case .idle:
      self.logger.warning("ignoring unexpected error in idle", metadata: [
        MetadataKey.error: "\(error)",
      ])

    // If we're already in one of these states, then additional errors aren't helpful to us.
    case .transientFailure, .shutdown:
      break
    }
  }
  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// Moves from `connecting` to `active` and hands the multiplexer to fast-failing callers. If
  /// the manager was shut down in the meantime the channel is closed instead.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("activating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .connecting(attempt):
      let established = ConnectedState(
        from: attempt,
        candidate: channel,
        multiplexer: multiplexer
      )
      self.state = .active(established)
      // Optimistic connections are happy with this level of setup.
      attempt.candidateMuxPromise.succeed(multiplexer)

    // Application called shutdown before the channel became active; we should close it.
    case .shutdown:
      channel.close(mode: .all, promise: nil)

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
  /// Must be called on the `EventLoop`.
  ///
  /// Depending on the reconnect policy (`active`) or the configured backoff (`ready`) this
  /// either schedules a new connection attempt via `transientFailure` or shuts down.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("deactivating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    // The channel is `active` but not `ready`. Should we try again?
    case let .active(connected):
      switch connected.reconnect {
      // Yes, after some time.
      case let .after(delay):
        let reconnectTask = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        let failure = TransientFailureState(from: connected, scheduled: reconnectTask)
        self.state = .transientFailure(failure)

      // No, shutdown instead.
      case .none:
        self.logger.debug("shutting down connection")
        let dropped = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        let closed = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: closed, reason: dropped))
        connected.readyChannelMuxPromise.fail(dropped)
      }

    // The channel was ready and working fine but something went wrong. Should we try to replace
    // the channel?
    case let .ready(readyState):
      if let backoff = self.connectionBackoff {
        // Yes, start connecting now. We should go via `transientFailure`, however.
        let reconnectTask = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
        let failure = TransientFailureState(
          from: readyState,
          scheduled: reconnectTask,
          backoffIterator: backoff.makeIterator()
        )
        self.state = .transientFailure(failure)
      } else {
        // No, no backoff is configured.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let dropped = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and a reconnect was not configured"
        )
        let shutdownState = ShutdownState(
          closeFuture: readyState.channel.closeFuture,
          reason: dropped
        )
        self.state = .shutdown(shutdownState)
      }

    // This is fine: we expect the channel to become inactive after becoming idle.
    case .idle:
      break

    // We're already shutdown, that's fine.
    case .shutdown:
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
  /// called on the `EventLoop`.
  ///
  /// Moves from `active` to `ready` and hands the multiplexer to callers waiting for a ready
  /// connection. Nothing happens if the manager has already shut down.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("connection ready", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .active(established):
      let readyState = ReadyState(from: established)
      self.state = .ready(readyState)
      established.readyChannelMuxPromise.succeed(established.multiplexer)

    case .shutdown:
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .transientFailure:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .ready:
      self.invalidState()
    }
  }
  /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
  /// the `EventLoop`.
  ///
  /// Moves to `idle`, remembering the last channel error. If the channel had not yet become
  /// ready, callers waiting for a ready connection are failed.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("idling connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .ready(readyState):
      self.state = .idle(lastError: readyState.error)

    case let .active(established):
      // This state is reachable if the keepalive timer fires before we reach the ready state.
      self.state = .idle(lastError: established.error)
      let idledEarly = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      established.readyChannelMuxPromise.fail(idledEarly)

    case .shutdown:
      // This is expected when the connection is closed by the user: when the channel becomes
      // inactive and there are no outstanding RPCs, 'idle()' will be called instead of
      // 'channelInactive()'.
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// Tells the HTTP/2 delegate, if there is one, that a stream on this connection has closed.
  /// Asserts that the caller is on the `EventLoop`.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.streamClosed(self)
  }
  /// Forwards a new 'max concurrent streams' value to the HTTP/2 delegate, if there is one.
  /// Asserts that the caller is on the `EventLoop`.
  ///
  /// - Parameter maxConcurrentStreams: The value passed on to the delegate.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.http2Delegate {
      delegate.receivedSettingsMaxConcurrentStreams(
        self,
        maxConcurrentStreams: maxConcurrentStreams
      )
    }
  }
  /// The connection has started quiescing: let the connectivity delegate, if any, know about it.
  /// Asserts that the caller is on the `EventLoop`.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.connectivityDelegate else {
      return
    }
    delegate.connectionIsQuiescing(self)
  }
  692. }
  extension ConnectionManager {
    /// Handles a failed connection attempt, that is, one where a connection was never established.
    ///
    /// When in the `connecting` state the attempt's `reconnect` value decides what happens next:
    /// either the manager shuts down, or another attempt is scheduled and the manager moves to the
    /// `transientFailure` state. A failure observed after shutdown is ignored; any other state is
    /// a programmer error.
    ///
    /// - Parameter error: The error which caused the connection attempt to fail.
    /// - Important: Must be called on the `EventLoop`.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .connecting(let attempt):
        // Should we try again?
        switch attempt.reconnect {
        case .none:
          // No: shut down and fail everyone waiting on this connection.
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          let closeFuture = self.eventLoop.makeSucceededFuture(())
          self.state = .shutdown(ShutdownState(closeFuture: closeFuture, reason: error))
          attempt.readyChannelMuxPromise.fail(error)
          attempt.candidateMuxPromise.fail(error)

        case .after(let delay):
          // Yes: schedule the next attempt to start once the delay has passed.
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
          let retryTask = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          let failure = TransientFailureState(from: attempt, scheduled: retryTask, reason: error)
          self.state = .transientFailure(failure)
          // Users of the candidate multiplexer are not willing to wait for another attempt.
          attempt.candidateMuxPromise.fail(error)
        }

      case .shutdown:
        // The application must have called shutdown while we were trying to establish a
        // connection which was doomed to fail anyway. That's fine: there is nothing to do.
        break

      // We can't fail to connect if we aren't trying to connect.
      //
      // Every invalid state deliberately gets its own case. Some crash reporting services provide
      // stack traces without the precondition failure message (which contains the invalid state
      // we were in); keeping the cases separate allows us to work out the state from the line
      // number.
      case .idle:
        self.invalidState()
      case .transientFailure:
        self.invalidState()
      case .active:
        self.invalidState()
      case .ready:
        self.invalidState()
      }
    }
  }
  extension ConnectionManager {
    /// Starts establishing a connection.
    ///
    /// This is only valid from the `idle` and `transientFailure` states. It is a no-op if the
    /// manager has already been shut down, and a programmer error in any other state.
    ///
    /// - Important: Must be called on the `EventLoop`.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .idle:
        // A fresh attempt: use a new backoff iterator (if backoff is configured) and a new
        // promise for the ready channel's multiplexer.
        self.startConnecting(
          backoffIterator: self.connectionBackoff?.makeIterator(),
          muxPromise: self.eventLoop.makePromise()
        )

      case .transientFailure(let pending):
        // A retry: carry on with the iterator and promise held by the failed attempt.
        self.startConnecting(
          backoffIterator: pending.backoffIterator,
          muxPromise: pending.readyChannelMuxPromise
        )

      case .shutdown:
        // We were shut down before a scheduled connection attempt had started.
        break

      // Every invalid state deliberately gets its own case. Some crash reporting services provide
      // stack traces without the precondition failure message (which contains the invalid state
      // we were in); keeping the cases separate allows us to work out the state from the line
      // number.
      case .connecting:
        self.invalidState()
      case .active:
        self.invalidState()
      case .ready:
        self.invalidState()
      }
    }

    /// Kicks off a connection attempt and moves the manager to the `connecting` state.
    ///
    /// - Parameters:
    ///   - backoffIterator: Source of the connect timeout and backoff for this attempt. If it is
    ///     `nil`, or has no more values, no timeout is passed to the channel provider and a
    ///     failure of this attempt will not be retried.
    ///   - muxPromise: Promise stored in the new state as its `readyChannelMuxPromise`.
    /// - Important: Must be called on the `EventLoop`.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      let nextAttempt = backoffIterator?.next()

      // We are already on the event loop, so submitting the connect (rather than running it
      // inline) means it starts only after the state has been changed to `.connecting` below.
      self.eventLoop.assertInEventLoop()
      let candidateChannel: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let channelFuture: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: nextAttempt.map { .seconds(timeInterval: $0.timeout) },
          logger: self.logger
        )

        channelFuture.whenFailure { failure in
          self.connectionFailed(withError: failure)
        }

        return channelFuture
      }

      // Should we reconnect if the candidate channel fails? Only if we were given a backoff.
      let reconnect: Reconnect
      if let attempt = nextAttempt {
        reconnect = .after(attempt.backoff)
      } else {
        reconnect = .none
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidateChannel,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view over the connection manager. Callers of any of its operations must be
    /// executing on the same `EventLoop` as the connection manager.
    internal var sync: Sync {
      Sync(self)
    }

    /// Synchronous accessors for a `ConnectionManager`; obtained via `ConnectionManager.sync`.
    internal struct Sync {
      /// The connection manager being viewed.
      private let manager: ConnectionManager

      fileprivate init(_ connectionManager: ConnectionManager) {
        self.manager = connectionManager
      }

      /// The delegate notified of connectivity changes.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.connectivityDelegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.connectivityDelegate = delegate
        }
      }

      /// The delegate notified of HTTP/2 connection changes.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.http2Delegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.http2Delegate = delegate
        }
      }

      /// Whether the connection is currently in the idle state.
      internal var isIdle: Bool {
        self.manager.isIdle
      }

      /// The `multiplexer` of a connection in the `ready` state, or `nil` if the connection is in
      /// any other state.
      internal var multiplexer: HTTP2StreamMultiplexer? {
        self.manager.multiplexer
      }

      /// Starts establishing a connection. Must only be called when `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Traps because the manager is in a state which is not valid for the calling function.
    ///
    /// The defaulted arguments resolve at the call site, so the reported function, file and line
    /// identify where the invalid state was detected.
    ///
    /// - Parameters:
    ///   - function: Name of the calling function; included in the failure message.
    ///   - file: File reported for the failure.
    ///   - line: Line reported for the failure.
    private func invalidState(
      function: StaticString = #function,
      file: StaticString = #file,
      line: UInt = #line
    ) -> Never {
      preconditionFailure(
        "Invalid state \(self.state) for \(function)",
        file: file,
        line: line
      )
    }
  }