ConnectionManager.swift 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOConcurrencyHelpers
  20. import NIOHTTP2
  21. internal final class ConnectionManager {
  22. internal enum Reconnect {
  23. case none
  24. case after(TimeInterval)
  25. }
  26. internal struct ConnectingState {
  27. var backoffIterator: ConnectionBackoffIterator?
  28. var reconnect: Reconnect
  29. var candidate: EventLoopFuture<Channel>
  30. var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  31. var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  32. }
  33. internal struct ConnectedState {
  34. var backoffIterator: ConnectionBackoffIterator?
  35. var reconnect: Reconnect
  36. var candidate: Channel
  37. var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  38. var multiplexer: HTTP2StreamMultiplexer
  39. var error: Error?
  40. init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
  41. self.backoffIterator = state.backoffIterator
  42. self.reconnect = state.reconnect
  43. self.candidate = candidate
  44. self.readyChannelMuxPromise = state.readyChannelMuxPromise
  45. self.multiplexer = multiplexer
  46. }
  47. }
  48. internal struct ReadyState {
  49. var channel: Channel
  50. var multiplexer: HTTP2StreamMultiplexer
  51. var error: Error?
  52. init(from state: ConnectedState) {
  53. self.channel = state.candidate
  54. self.multiplexer = state.multiplexer
  55. }
  56. }
  57. internal struct TransientFailureState {
  58. var backoffIterator: ConnectionBackoffIterator?
  59. var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  60. var scheduled: Scheduled<Void>
  61. var reason: Error?
  62. init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
  63. self.backoffIterator = state.backoffIterator
  64. self.readyChannelMuxPromise = state.readyChannelMuxPromise
  65. self.scheduled = scheduled
  66. self.reason = reason
  67. }
  68. init(from state: ConnectedState, scheduled: Scheduled<Void>) {
  69. self.backoffIterator = state.backoffIterator
  70. self.readyChannelMuxPromise = state.readyChannelMuxPromise
  71. self.scheduled = scheduled
  72. self.reason = state.error
  73. }
  74. init(
  75. from state: ReadyState,
  76. scheduled: Scheduled<Void>,
  77. backoffIterator: ConnectionBackoffIterator?
  78. ) {
  79. self.backoffIterator = backoffIterator
  80. self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
  81. self.scheduled = scheduled
  82. self.reason = state.error
  83. }
  84. }
  85. internal struct ShutdownState {
  86. var closeFuture: EventLoopFuture<Void>
  87. /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
  88. /// this error.
  89. var reason: Error
  90. init(closeFuture: EventLoopFuture<Void>, reason: Error) {
  91. self.closeFuture = closeFuture
  92. self.reason = reason
  93. }
  94. static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
  95. return ShutdownState(
  96. closeFuture: closeFuture,
  97. reason: GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
  98. )
  99. }
  100. }
  101. internal enum State {
  102. /// No `Channel` is required.
  103. ///
  104. /// Valid next states:
  105. /// - `connecting`
  106. /// - `shutdown`
  107. case idle
  108. /// We're actively trying to establish a connection.
  109. ///
  110. /// Valid next states:
  111. /// - `active`
  112. /// - `transientFailure` (if our attempt fails and we're going to try again)
  113. /// - `shutdown`
  114. case connecting(ConnectingState)
  115. /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
  116. /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
  117. ///
  118. /// Valid next states:
  119. /// - `ready`
  120. /// - `transientFailure` (if we our handshake fails or other error happens and we can attempt
  121. /// to re-establish the connection)
  122. /// - `shutdown`
  123. case active(ConnectedState)
  124. /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
  125. /// the channel for making RPCs.
  126. ///
  127. /// Valid next states:
  128. /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
  129. /// - `transientFailure` (we encountered an error and will re-establish the connection)
  130. /// - `shutdown`
  131. case ready(ReadyState)
  132. /// A `Channel` is desired, we'll attempt to create one in the future.
  133. ///
  134. /// Valid next states:
  135. /// - `connecting`
  136. /// - `shutdown`
  137. case transientFailure(TransientFailureState)
  138. /// We never want another `Channel`: this state is terminal.
  139. case shutdown(ShutdownState)
  140. fileprivate var label: String {
  141. switch self {
  142. case .idle:
  143. return "idle"
  144. case .connecting:
  145. return "connecting"
  146. case .active:
  147. return "active"
  148. case .ready:
  149. return "ready"
  150. case .transientFailure:
  151. return "transientFailure"
  152. case .shutdown:
  153. return "shutdown"
  154. }
  155. }
  156. }
  157. /// The last 'external' state we are in, a subset of the internal state.
  158. private var externalState: ConnectivityState = .idle
  159. /// Update the external state, potentially notifying a delegate about the change.
  160. private func updateExternalState(to nextState: ConnectivityState) {
  161. if self.externalState != nextState {
  162. let oldState = self.externalState
  163. self.externalState = nextState
  164. self.connectivityDelegate?.connectionStateDidChange(self, from: oldState, to: nextState)
  165. }
  166. }
  167. /// Our current state.
  168. private var state: State {
  169. didSet {
  170. switch self.state {
  171. case .idle:
  172. self.updateExternalState(to: .idle)
  173. self.updateConnectionID()
  174. case .connecting:
  175. self.updateExternalState(to: .connecting)
  176. // This is an internal state.
  177. case .active:
  178. ()
  179. case .ready:
  180. self.updateExternalState(to: .ready)
  181. case .transientFailure:
  182. self.updateExternalState(to: .transientFailure)
  183. self.updateConnectionID()
  184. case .shutdown:
  185. self.updateExternalState(to: .shutdown)
  186. }
  187. }
  188. }
  189. /// Returns whether the state is 'idle'.
  190. private var isIdle: Bool {
  191. self.eventLoop.assertInEventLoop()
  192. switch self.state {
  193. case .idle:
  194. return true
  195. case .connecting, .transientFailure, .active, .ready, .shutdown:
  196. return false
  197. }
  198. }
  199. /// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
  200. private var multiplexer: HTTP2StreamMultiplexer? {
  201. self.eventLoop.assertInEventLoop()
  202. switch self.state {
  203. case let .ready(state):
  204. return state.multiplexer
  205. case .idle, .connecting, .transientFailure, .active, .shutdown:
  206. return nil
  207. }
  208. }
  209. /// The `EventLoop` that the managed connection will run on.
  210. internal let eventLoop: EventLoop
  211. /// A delegate for connectivity changes. Executed on the `EventLoop`.
  212. private var connectivityDelegate: ConnectionManagerConnectivityDelegate?
  213. /// A delegate for HTTP/2 connection changes. Executed on the `EventLoop`.
  214. private var http2Delegate: ConnectionManagerHTTP2Delegate?
  215. /// An `EventLoopFuture<Channel>` provider.
  216. private let channelProvider: ConnectionManagerChannelProvider
  217. /// The behavior for starting a call, i.e. how patient is the caller when asking for a
  218. /// multiplexer.
  219. private let callStartBehavior: CallStartBehavior.Behavior
  220. /// The configuration to use when backing off between connection attempts, if reconnection
  221. /// attempts should be made at all.
  222. private let connectionBackoff: ConnectionBackoff?
  223. /// A logger.
  224. internal var logger: Logger
  225. private let connectionID: String
  226. private var channelNumber: UInt64
  227. private var channelNumberLock = Lock()
  228. private var _connectionIDAndNumber: String {
  229. return "\(self.connectionID)/\(self.channelNumber)"
  230. }
  231. private var connectionIDAndNumber: String {
  232. return self.channelNumberLock.withLock {
  233. return self._connectionIDAndNumber
  234. }
  235. }
  236. private func updateConnectionID() {
  237. self.channelNumberLock.withLockVoid {
  238. self.channelNumber &+= 1
  239. self.logger[metadataKey: MetadataKey.connectionID] = "\(self._connectionIDAndNumber)"
  240. }
  241. }
  242. internal func appendMetadata(to logger: inout Logger) {
  243. logger[metadataKey: MetadataKey.connectionID] = "\(self.connectionIDAndNumber)"
  244. }
  245. internal convenience init(
  246. configuration: ClientConnection.Configuration,
  247. channelProvider: ConnectionManagerChannelProvider? = nil,
  248. connectivityDelegate: ConnectionManagerConnectivityDelegate?,
  249. logger: Logger
  250. ) {
  251. self.init(
  252. eventLoop: configuration.eventLoopGroup.next(),
  253. channelProvider: channelProvider ?? DefaultChannelProvider(configuration: configuration),
  254. callStartBehavior: configuration.callStartBehavior.wrapped,
  255. connectionBackoff: configuration.connectionBackoff,
  256. connectivityDelegate: connectivityDelegate,
  257. http2Delegate: nil,
  258. logger: logger
  259. )
  260. }
  261. internal init(
  262. eventLoop: EventLoop,
  263. channelProvider: ConnectionManagerChannelProvider,
  264. callStartBehavior: CallStartBehavior.Behavior,
  265. connectionBackoff: ConnectionBackoff?,
  266. connectivityDelegate: ConnectionManagerConnectivityDelegate?,
  267. http2Delegate: ConnectionManagerHTTP2Delegate?,
  268. logger: Logger
  269. ) {
  270. // Setup the logger.
  271. var logger = logger
  272. let connectionID = UUID().uuidString
  273. let channelNumber: UInt64 = 0
  274. logger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)"
  275. self.eventLoop = eventLoop
  276. self.state = .idle
  277. self.channelProvider = channelProvider
  278. self.callStartBehavior = callStartBehavior
  279. self.connectionBackoff = connectionBackoff
  280. self.connectivityDelegate = connectivityDelegate
  281. self.http2Delegate = http2Delegate
  282. self.connectionID = connectionID
  283. self.channelNumber = channelNumber
  284. self.logger = logger
  285. }
  286. /// Get the multiplexer from the underlying channel handling gRPC calls.
  287. /// if the `ConnectionManager` was configured to be `fastFailure` this will have
  288. /// one chance to connect - if not reconnections are managed here.
  289. internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
  290. func getHTTP2Multiplexer0() -> EventLoopFuture<HTTP2StreamMultiplexer> {
  291. switch self.callStartBehavior {
  292. case .waitsForConnectivity:
  293. return self.getHTTP2MultiplexerPatient()
  294. case .fastFailure:
  295. return self.getHTTP2MultiplexerOptimistic()
  296. }
  297. }
  298. if self.eventLoop.inEventLoop {
  299. return getHTTP2Multiplexer0()
  300. } else {
  301. return self.eventLoop.flatSubmit {
  302. getHTTP2Multiplexer0()
  303. }
  304. }
  305. }
  306. /// Returns a future for the multiplexer which succeeded when the channel is connected.
  307. /// Reconnects are handled if necessary.
  308. private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
  309. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  310. switch self.state {
  311. case .idle:
  312. self.startConnecting()
  313. // We started connecting so we must transition to the `connecting` state.
  314. guard case let .connecting(connecting) = self.state else {
  315. self.invalidState()
  316. }
  317. multiplexer = connecting.readyChannelMuxPromise.futureResult
  318. case let .connecting(state):
  319. multiplexer = state.readyChannelMuxPromise.futureResult
  320. case let .active(state):
  321. multiplexer = state.readyChannelMuxPromise.futureResult
  322. case let .ready(state):
  323. multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)
  324. case let .transientFailure(state):
  325. multiplexer = state.readyChannelMuxPromise.futureResult
  326. case let .shutdown(state):
  327. multiplexer = self.eventLoop.makeFailedFuture(state.reason)
  328. }
  329. self.logger.debug("vending multiplexer future", metadata: [
  330. "connectivity_state": "\(self.state.label)",
  331. ])
  332. return multiplexer
  333. }
  334. /// Returns a future for the current HTTP/2 stream multiplexer, or future HTTP/2 stream multiplexer from the current connection
  335. /// attempt, or if the state is 'idle' returns the future for the next connection attempt.
  336. ///
  337. /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
  338. private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
  339. // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
  340. self.eventLoop.preconditionInEventLoop()
  341. let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = { () in
  342. switch self.state {
  343. case .idle:
  344. self.startConnecting()
  345. // We started connecting so we must transition to the `connecting` state.
  346. guard case let .connecting(connecting) = self.state else {
  347. self.invalidState()
  348. }
  349. return connecting.candidateMuxPromise.futureResult
  350. case let .connecting(state):
  351. return state.candidateMuxPromise.futureResult
  352. case let .active(active):
  353. return self.eventLoop.makeSucceededFuture(active.multiplexer)
  354. case let .ready(ready):
  355. return self.eventLoop.makeSucceededFuture(ready.multiplexer)
  356. case let .transientFailure(state):
  357. // Provide the reason we failed transiently, if we can.
  358. let error = state.reason ?? GRPCStatus(
  359. code: .unavailable,
  360. message: "Connection multiplexer requested while backing off"
  361. )
  362. return self.eventLoop.makeFailedFuture(error)
  363. case let .shutdown(state):
  364. return self.eventLoop.makeFailedFuture(state.reason)
  365. }
  366. }()
  367. self.logger.debug("vending fast-failing multiplexer future", metadata: [
  368. "connectivity_state": "\(self.state.label)",
  369. ])
  370. return muxFuture
  371. }
  372. /// Shutdown any connection which exists. This is a request from the application.
  373. internal func shutdown() -> EventLoopFuture<Void> {
  374. if self.eventLoop.inEventLoop {
  375. return self._shutdown()
  376. } else {
  377. return self.eventLoop.flatSubmit {
  378. return self._shutdown()
  379. }
  380. }
  381. }
  382. private func _shutdown() -> EventLoopFuture<Void> {
  383. self.logger.debug("shutting down connection", metadata: [
  384. "connectivity_state": "\(self.state.label)",
  385. ])
  386. let shutdown: ShutdownState
  387. switch self.state {
  388. // We don't have a channel and we don't want one, easy!
  389. case .idle:
  390. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  391. self.state = .shutdown(shutdown)
  392. // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
  393. // the shutdown future and deal with any fallout from the connecting channel without the
  394. // application knowing.
  395. case let .connecting(state):
  396. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  397. self.state = .shutdown(shutdown)
  398. // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
  399. // connect the application shouldn't have access to the channel or multiplexer.
  400. state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
  401. state.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
  402. // In case we do successfully connect, close immediately.
  403. state.candidate.whenSuccess {
  404. $0.close(mode: .all, promise: nil)
  405. }
  406. // We have an active channel but the application doesn't know about it yet. We'll do the same
  407. // as for `.connecting`.
  408. case let .active(state):
  409. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  410. self.state = .shutdown(shutdown)
  411. // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
  412. // connect the application shouldn't have access to the channel or multiplexer.
  413. state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
  414. // We have a channel, close it.
  415. state.candidate.close(mode: .all, promise: nil)
  416. // The channel is up and running: the application could be using it. We can close it and
  417. // return the `closeFuture`.
  418. case let .ready(state):
  419. shutdown = .shutdownByUser(closeFuture: state.channel.closeFuture)
  420. self.state = .shutdown(shutdown)
  421. // We have a channel, close it.
  422. state.channel.close(mode: .all, promise: nil)
  423. // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
  424. // do the same but also cancel any scheduled connection attempts and deal with any fallout
  425. // if we cancelled too late.
  426. case let .transientFailure(state):
  427. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  428. self.state = .shutdown(shutdown)
  429. // Stop the creation of a new channel, if we can. If we can't then the task to
  430. // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
  431. state.scheduled.cancel()
  432. // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
  433. // connect the application shouldn't should have access to the channel.
  434. state.readyChannelMuxPromise.fail(shutdown.reason)
  435. // We're already shutdown; nothing to do.
  436. case let .shutdown(state):
  437. shutdown = state
  438. }
  439. return shutdown.closeFuture
  440. }
  441. // MARK: - State changes from the channel handler.
  442. /// The channel caught an error. Hold on to it until the channel becomes inactive, it may provide
  443. /// some context.
  444. internal func channelError(_ error: Error) {
  445. self.eventLoop.preconditionInEventLoop()
  446. switch self.state {
  447. // Hitting an error in idle is a surprise, but not really something we do anything about. Either the
  448. // error is channel fatal, in which case we'll see channelInactive soon (acceptable), or it's not,
  449. // and future I/O will either fail fast or work. In either case, all we do is log this and move on.
  450. case .idle:
  451. self.logger.warning("ignoring unexpected error in idle", metadata: [
  452. MetadataKey.error: "\(error)",
  453. ])
  454. case .connecting:
  455. self.invalidState()
  456. case var .active(state):
  457. state.error = error
  458. self.state = .active(state)
  459. case var .ready(state):
  460. state.error = error
  461. self.state = .ready(state)
  462. // If we've already in one of these states, then additional errors aren't helpful to us.
  463. case .transientFailure, .shutdown:
  464. ()
  465. }
  466. }
  467. /// The connecting channel became `active`. Must be called on the `EventLoop`.
  468. internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
  469. self.eventLoop.preconditionInEventLoop()
  470. self.logger.debug("activating connection", metadata: [
  471. "connectivity_state": "\(self.state.label)",
  472. ])
  473. switch self.state {
  474. case let .connecting(connecting):
  475. let connected = ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
  476. self.state = .active(connected)
  477. // Optimistic connections are happy this this level of setup.
  478. connecting.candidateMuxPromise.succeed(multiplexer)
  479. // Application called shutdown before the channel become active; we should close it.
  480. case .shutdown:
  481. channel.close(mode: .all, promise: nil)
  482. // These cases are purposefully separated: some crash reporting services provide stack traces
  483. // which don't include the precondition failure message (which contain the invalid state we were
  484. // in). Keeping the cases separate allows us work out the state from the line number.
  485. case .idle:
  486. self.invalidState()
  487. case .active:
  488. self.invalidState()
  489. case .ready:
  490. self.invalidState()
  491. case .transientFailure:
  492. self.invalidState()
  493. }
  494. }
  495. /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
  496. /// Must be called on the `EventLoop`.
  497. internal func channelInactive() {
  498. self.eventLoop.preconditionInEventLoop()
  499. self.logger.debug("deactivating connection", metadata: [
  500. "connectivity_state": "\(self.state.label)",
  501. ])
  502. switch self.state {
  503. // The channel is `active` but not `ready`. Should we try again?
  504. case let .active(active):
  505. let error = GRPCStatus(
  506. code: .unavailable,
  507. message: "The connection was dropped and connection re-establishment is disabled"
  508. )
  509. switch active.reconnect {
  510. // No, shutdown instead.
  511. case .none:
  512. self.logger.debug("shutting down connection")
  513. let shutdownState = ShutdownState(
  514. closeFuture: self.eventLoop.makeSucceededFuture(()),
  515. reason: error
  516. )
  517. self.state = .shutdown(shutdownState)
  518. active.readyChannelMuxPromise.fail(error)
  519. // Yes, after some time.
  520. case let .after(delay):
  521. let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
  522. self.startConnecting()
  523. }
  524. self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
  525. self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
  526. }
  527. // The channel was ready and working fine but something went wrong. Should we try to replace
  528. // the channel?
  529. case let .ready(ready):
  530. // No, no backoff is configured.
  531. if self.connectionBackoff == nil {
  532. self.logger.debug("shutting down connection, no reconnect configured/remaining")
  533. self.state = .shutdown(
  534. ShutdownState(
  535. closeFuture: ready.channel.closeFuture,
  536. reason: GRPCStatus(
  537. code: .unavailable,
  538. message: "The connection was dropped and a reconnect was not configured"
  539. )
  540. )
  541. )
  542. } else {
  543. // Yes, start connecting now. We should go via `transientFailure`, however.
  544. let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
  545. self.startConnecting()
  546. }
  547. self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
  548. let backoffIterator = self.connectionBackoff?.makeIterator()
  549. self.state = .transientFailure(TransientFailureState(
  550. from: ready,
  551. scheduled: scheduled,
  552. backoffIterator: backoffIterator
  553. ))
  554. }
  555. // This is fine: we expect the channel to become inactive after becoming idle.
  556. case .idle:
  557. ()
  558. // We're already shutdown, that's fine.
  559. case .shutdown:
  560. ()
  561. // These cases are purposefully separated: some crash reporting services provide stack traces
  562. // which don't include the precondition failure message (which contain the invalid state we were
  563. // in). Keeping the cases separate allows us work out the state from the line number.
  564. case .connecting:
  565. self.invalidState()
  566. case .transientFailure:
  567. self.invalidState()
  568. }
  569. }
  570. /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
  571. /// called on the `EventLoop`.
  572. internal func ready() {
  573. self.eventLoop.preconditionInEventLoop()
  574. self.logger.debug("connection ready", metadata: [
  575. "connectivity_state": "\(self.state.label)",
  576. ])
  577. switch self.state {
  578. case let .active(connected):
  579. self.state = .ready(ReadyState(from: connected))
  580. connected.readyChannelMuxPromise.succeed(connected.multiplexer)
  581. case .shutdown:
  582. ()
  583. // These cases are purposefully separated: some crash reporting services provide stack traces
  584. // which don't include the precondition failure message (which contain the invalid state we were
  585. // in). Keeping the cases separate allows us work out the state from the line number.
  586. case .idle:
  587. self.invalidState()
  588. case .transientFailure:
  589. self.invalidState()
  590. case .connecting:
  591. self.invalidState()
  592. case .ready:
  593. self.invalidState()
  594. }
  595. }
  596. /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
  597. /// the `EventLoop`.
  598. internal func idle() {
  599. self.eventLoop.preconditionInEventLoop()
  600. self.logger.debug("idling connection", metadata: [
  601. "connectivity_state": "\(self.state.label)",
  602. ])
  603. switch self.state {
  604. case let .active(state):
  605. // This state is reachable if the keepalive timer fires before we reach the ready state.
  606. self.state = .idle
  607. state.readyChannelMuxPromise
  608. .fail(GRPCStatus(code: .unavailable, message: "Idled before reaching ready state"))
  609. case .ready:
  610. self.state = .idle
  611. case .shutdown:
  612. // This is expected when the connection is closed by the user: when the channel becomes
  613. // inactive and there are no outstanding RPCs, 'idle()' will be called instead of
  614. // 'channelInactive()'.
  615. ()
  616. // These cases are purposefully separated: some crash reporting services provide stack traces
  617. // which don't include the precondition failure message (which contain the invalid state we were
  618. // in). Keeping the cases separate allows us work out the state from the line number.
  619. case .idle:
  620. self.invalidState()
  621. case .connecting:
  622. self.invalidState()
  623. case .transientFailure:
  624. self.invalidState()
  625. }
  626. }
  627. internal func streamClosed() {
  628. self.eventLoop.assertInEventLoop()
  629. self.http2Delegate?.streamClosed(self)
  630. }
  631. internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
  632. self.eventLoop.assertInEventLoop()
  633. self.http2Delegate?.receivedSettingsMaxConcurrentStreams(
  634. self, maxConcurrentStreams: maxConcurrentStreams
  635. )
  636. }
  637. /// The connection has started quiescing: notify the connectivity monitor of this.
  638. internal func beginQuiescing() {
  639. self.eventLoop.assertInEventLoop()
  640. self.connectivityDelegate?.connectionIsQuiescing(self)
  641. }
  642. }
  extension ConnectionManager {
    /// Handles the failure of a connection attempt that never established a connection.
    ///
    /// Only meaningful while `.connecting`. In that state, the attempt's `reconnect` policy decides
    /// what happens next:
    /// - `.none`: move to `.shutdown` (with an already-succeeded close future and `error` as the
    ///   reason), then fail both the ready-channel and candidate mux promises.
    /// - `.after(delay)`: schedule `startConnecting()` to run after `delay` seconds, move to
    ///   `.transientFailure`, then fail only the candidate mux promise. `readyChannelMuxPromise` is
    ///   not failed here; presumably `TransientFailureState(from:)` carries it over to the next
    ///   attempt.
    ///
    /// A failure reported after the manager has shut down is ignored. Any other state is a
    /// programmer error and traps via `invalidState()`.
    ///
    /// Must be called on the `EventLoop`.
    ///
    /// - Parameter error: Why the attempt failed; stored as the reason of the new state and used to
    ///   fail the relevant promises.
    // A connection attempt failed; we never established a connection.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()
      switch self.state {
      case let .connecting(connecting):
        // Should we reconnect?
        switch connecting.reconnect {
        // No, shutdown.
        case .none:
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          // NOTE(review): the state is updated *before* the promises are failed, presumably so
          // that any callbacks run by failing them already observe `.shutdown`. Keep this order.
          self.state = .shutdown(
            ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: error)
          )
          connecting.readyChannelMuxPromise.fail(error)
          connecting.candidateMuxPromise.fail(error)
        // Yes, after a delay.
        case let .after(delay):
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
          // The scheduled task is stored in the transient-failure state below.
          let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.state = .transientFailure(
            TransientFailureState(from: connecting, scheduled: scheduled, reason: error)
          )
          // Candidate mux users are not willing to wait.
          connecting.candidateMuxPromise.fail(error)
        }
      // The application must have called shutdown while we were trying to establish a connection
      // which was doomed to fail anyway. That's fine, we can ignore this.
      case .shutdown:
        ()
      // We can't fail to connect if we aren't trying.
      //
      // These cases are purposefully separated: some crash reporting services provide stack traces
      // which don't include the precondition failure message (which contains the invalid state we
      // were in). Keeping the cases separate allows us to work out the state from the line number.
      case .idle:
        self.invalidState()
      case .active:
        self.invalidState()
      case .ready:
        self.invalidState()
      case .transientFailure:
        self.invalidState()
      }
    }
  }
  extension ConnectionManager {
    /// Begins a connection attempt.
    ///
    /// Valid only from `.idle` (which starts a fresh backoff iterator and a new ready-mux promise)
    /// or `.transientFailure` (which reuses the failed attempt's iterator and ready-mux promise).
    /// A call arriving after shutdown, such as a scheduled attempt that had not yet started, does
    /// nothing. Any other state is a programmer error and traps.
    ///
    /// Must be called on the `EventLoop`.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      let iterator: ConnectionBackoffIterator?
      let readyMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

      switch self.state {
      case .idle:
        iterator = self.connectionBackoff?.makeIterator()
        readyMuxPromise = self.eventLoop.makePromise()

      case let .transientFailure(failed):
        iterator = failed.backoffIterator
        readyMuxPromise = failed.readyChannelMuxPromise

      // Shutdown happened before a scheduled connection attempt started: nothing to do.
      case .shutdown:
        return

      // One case per line on purpose: when a crash report omits the precondition message, the
      // line number alone still identifies which invalid state we were in.
      case .connecting:
        self.invalidState()
      case .active:
        self.invalidState()
      case .ready:
        self.invalidState()
      }

      self.startConnecting(backoffIterator: iterator, muxPromise: readyMuxPromise)
    }

    /// Launches a single connection attempt and moves the manager to `.connecting`.
    ///
    /// - Parameters:
    ///   - backoffIterator: Supplies this attempt's connect timeout and the delay before a retry.
    ///     With no iterator, or an exhausted one, no connect timeout is passed to the channel
    ///     provider and `reconnect` is `.none`.
    ///   - muxPromise: Stored in the new connecting state as its `readyChannelMuxPromise`.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      let timeoutAndBackoff = backoffIterator?.next()
      // Decide up front whether a failed candidate should be retried, and after how long.
      let reconnect: Reconnect = timeoutAndBackoff.map { .after($0.backoff) } ?? .none

      self.eventLoop.assertInEventLoop()

      // We are already on the event loop, but `flatSubmit` defers the connect so that it only
      // begins after the state has been switched to `.connecting` below.
      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let channelFuture: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: timeoutAndBackoff.map { .seconds(timeInterval: $0.timeout) },
          logger: self.logger
        )
        channelFuture.whenFailure { self.connectionFailed(withError: $0) }
        return channelFuture
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view of this connection manager.
    ///
    /// Every operation on the returned value must be performed on the manager's `EventLoop`.
    internal var sync: Sync {
      Sync(self)
    }

    /// Event-loop-bound accessors for a `ConnectionManager`.
    internal struct Sync {
      private let manager: ConnectionManager

      fileprivate init(_ connectionManager: ConnectionManager) {
        self.manager = connectionManager
      }

      /// The delegate notified about connectivity changes.
      ///
      /// Both reading and writing assert that the caller is on the manager's event loop.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.connectivityDelegate
        }
        nonmutating set {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.connectivityDelegate = newValue
        }
      }

      /// The delegate notified about HTTP/2 connection changes.
      ///
      /// Both reading and writing assert that the caller is on the manager's event loop.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.http2Delegate
        }
        nonmutating set {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.http2Delegate = newValue
        }
      }

      /// `true` while the manager is in the idle state.
      internal var isIdle: Bool {
        self.manager.isIdle
      }

      /// The multiplexer of a connection in the `ready` state, or `nil` in every other state.
      internal var multiplexer: HTTP2StreamMultiplexer? {
        self.manager.multiplexer
      }

      /// Starts establishing a connection. Call this only while `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Traps because the manager is in a state the calling function cannot handle.
    ///
    /// The failure message names both the current state and the calling function. The default
    /// arguments capture the caller's function, file and line, so the trap is reported at the
    /// offending call site.
    private func invalidState(
      function: StaticString = #function,
      file: StaticString = #file,
      line: UInt = #line
    ) -> Never {
      preconditionFailure(
        "Invalid state \(self.state) for \(function)",
        file: file,
        line: line
      )
    }
  }