ConnectionManager.swift 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  #if compiler(>=5.6)
  // The conformance is unchecked: the compiler cannot verify it. Mutable state is only ever read
  // and written on one particular event loop. Entry points that may be called from any thread
  // first hop onto that event loop, and entry points that must already be on it verify this with
  // a precondition.
  extension ConnectionManager: @unchecked Sendable {}
  #endif // compiler(>=5.6)
  28. @usableFromInline
  29. internal final class ConnectionManager {
  /// Describes whether another connection attempt should follow when an established (but not
  /// yet ready) channel becomes inactive.
  internal enum Reconnect {
    /// Do not reconnect.
    case none

    /// Reconnect once the given delay, in seconds, has passed.
    case after(TimeInterval)
  }
  /// The data held while a connection attempt is in flight.
  internal struct ConnectingState {
    /// The backoff iterator carried along with this connection attempt, if any.
    var backoffIterator: ConnectionBackoffIterator?

    /// What to do about reconnecting should this attempt not work out.
    var reconnect: Reconnect

    /// The future `Channel` for the attempt in progress.
    var candidate: EventLoopFuture<Channel>

    /// Vended to 'patient' callers; completed with the multiplexer once the connection is ready.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// Vended to 'optimistic' callers; completed with the multiplexer as soon as the channel
    /// becomes active.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  }
  /// The data held once a `Channel` has been established but has not yet become ready.
  internal struct ConnectedState {
    /// The backoff iterator inherited from the connecting state, if any.
    var backoffIterator: ConnectionBackoffIterator?

    /// What to do about reconnecting should this channel become inactive.
    var reconnect: Reconnect

    /// The established channel.
    var candidate: Channel

    /// Vended to 'patient' callers; completed with the multiplexer once the connection is ready.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The multiplexer belonging to `candidate`.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error caught on the channel, if any.
    var error: Error?

    /// Creates the state from the connecting state it replaces.
    ///
    /// - Parameters:
    ///   - state: The connecting state; its backoff iterator, reconnect policy and ready promise
    ///     are carried over.
    ///   - candidate: The channel which was established.
    ///   - multiplexer: The multiplexer for `candidate`.
    init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
      // New for this state.
      self.candidate = candidate
      self.multiplexer = multiplexer

      // Inherited from the connecting state.
      self.backoffIterator = state.backoffIterator
      self.reconnect = state.reconnect
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
    }
  }
  /// The data held while the connection is ready for use.
  internal struct ReadyState {
    /// The ready channel.
    var channel: Channel

    /// The multiplexer belonging to `channel`.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error caught on the channel, if any. Starts out as `nil`: an error
    /// recorded in the connected state is not carried over.
    var error: Error?

    /// Creates the state from the connected state it replaces, taking over its channel and
    /// multiplexer.
    init(from state: ConnectedState) {
      self.multiplexer = state.multiplexer
      self.channel = state.candidate
    }
  }
  /// The data held while waiting for a scheduled connection attempt after a failure.
  internal struct TransientFailureState {
    /// The backoff iterator to use for subsequent attempts, if any.
    var backoffIterator: ConnectionBackoffIterator?

    /// Vended to 'patient' callers; completed with the multiplexer once a connection is ready.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The scheduled task which starts the next connection attempt.
    var scheduled: Scheduled<Void>

    /// Why we are in this state.
    var reason: Error

    /// Creates the state after a connection attempt failed with a known `reason`.
    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
      self.scheduled = scheduled
      self.reason = reason
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
    }

    /// Creates the state after an established (not yet ready) channel was lost. The reason is
    /// the error recorded on `state`, or a generic 'unavailable' status if there was none.
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.scheduled = scheduled
      self.reason = TransientFailureState.failureReason(from: state.error)
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
    }

    /// Creates the state after a ready channel was lost. A fresh ready promise is made on the
    /// channel's event loop. The reason is the error recorded on `state`, or a generic
    /// 'unavailable' status if there was none.
    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.backoffIterator = backoffIterator
      self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
      self.scheduled = scheduled
      self.reason = TransientFailureState.failureReason(from: state.error)
    }

    /// Returns `error` if there is one, otherwise the status used for unexplained drops.
    private static func failureReason(from error: Error?) -> Error {
      if let error = error {
        return error
      }
      return GRPCStatus(
        code: .unavailable,
        message: "Unexpected connection drop"
      )
    }
  }
  /// The data held once the manager has shut down.
  internal struct ShutdownState {
    /// The future associated with the close of the connection.
    var closeFuture: EventLoopFuture<Void>

    /// Why we shut down. Requests for a multiplexer made in this state are failed with this
    /// error.
    var reason: Error

    /// Creates a shutdown state from a close future and the reason for shutting down.
    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// Creates the shutdown state used when the user asked for the shutdown: the reason is an
    /// 'unavailable' status saying so.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let userRequested = GRPCStatus(
        code: .unavailable,
        message: "Connection was shutdown by the user"
      )
      return ShutdownState(closeFuture: closeFuture, reason: userRequested)
    }
  }
  /// The internal connection state machine.
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case idle(lastError: Error?)

    /// A connection attempt is in progress.
    ///
    /// Valid next states:
    /// - `active`
    /// - `transientFailure` (if the attempt fails and we are going to try again)
    /// - `shutdown`
    case connecting(ConnectingState)

    /// A `Channel` has been established, although it might not be suitable (the TLS handshake
    /// may fail, etc.). The signal for being 'ready' is the initial HTTP/2 SETTINGS frame.
    ///
    /// Valid next states:
    /// - `ready`
    /// - `transientFailure` (if the handshake fails or another error happens and we can attempt
    ///   to re-establish the connection)
    /// - `shutdown`
    case active(ConnectedState)

    /// The active `Channel` has seen the initial HTTP/2 SETTINGS frame and can be used for
    /// making RPCs.
    ///
    /// Valid next states:
    /// - `idle` (no RPCs are being served, the connection can be dropped for now)
    /// - `transientFailure` (an error was encountered and the connection will be re-established)
    /// - `shutdown`
    case ready(ReadyState)

    /// A `Channel` is desired and an attempt to create one will be made in the future.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)

    /// Another `Channel` is never wanted: this state is terminal.
    case shutdown(ShutdownState)

    /// The name of the state, as used in log metadata.
    fileprivate var label: String {
      switch self {
      case .idle: return "idle"
      case .connecting: return "connecting"
      case .active: return "active"
      case .ready: return "ready"
      case .transientFailure: return "transientFailure"
      case .shutdown: return "shutdown"
      }
    }
  }
  /// The most recent 'external' state; external states are a subset of the internal ones.
  private var externalState: _ConnectivityState = .idle(nil)

  /// Moves the external state to `nextState` and tells the connectivity delegate, if there is
  /// one, about the transition. Nothing happens when `nextState` is the same state as the
  /// current external state.
  private func updateExternalState(to nextState: _ConnectivityState) {
    guard !self.externalState.isSameState(as: nextState) else {
      return
    }

    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(
      self,
      from: previousState,
      to: nextState
    )
  }
  /// The current internal state. Every assignment is reflected onto the external state, and
  /// entering `idle` or `transientFailure` additionally bumps the connection ID.
  private var state: State {
    didSet {
      switch self.state {
      case let .idle(lastError):
        self.updateExternalState(to: .idle(lastError))
        self.updateConnectionID()

      case .connecting:
        self.updateExternalState(to: .connecting)

      case .active:
        // 'active' is purely internal: there is nothing to report externally.
        break

      case .ready:
        self.updateExternalState(to: .ready)

      case let .transientFailure(failure):
        self.updateExternalState(to: .transientFailure(failure.reason))
        self.updateConnectionID()

      case .shutdown:
        self.updateExternalState(to: .shutdown)
      }
    }
  }
  /// Whether the current state is 'idle'. Asserts that it is read on the event loop.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()

    guard case .idle = self.state else {
      return false
    }
    return true
  }
  /// Whether the current state is 'shutdown'. Asserts that it is read on the event loop.
  private var isShutdown: Bool {
    self.eventLoop.assertInEventLoop()

    guard case .shutdown = self.state else {
      return false
    }
    return true
  }
  /// The `HTTP2StreamMultiplexer` of the 'ready' state, or `nil` in any other state. Asserts
  /// that it is read on the event loop.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()

    guard case let .ready(ready) = self.state else {
      return nil
    }
    return ready.multiplexer
  }
  /// The `EventLoop` the managed connection runs on.
  internal let eventLoop: EventLoop

  /// Receives connectivity state changes. Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// Receives HTTP/2 connection changes. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// Provides an `EventLoopFuture<Channel>`.
  private let channelProvider: ConnectionManagerChannelProvider

  /// How calls are started, i.e. how patient a caller is when asking for a multiplexer.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// The backoff configuration to use between connection attempts; `nil` if no reconnection
  /// attempts should be made.
  private let connectionBackoff: ConnectionBackoff?

  /// The logger; its connection ID metadata is kept in step with `channelNumber`.
  internal var logger: Logger

  /// The fixed part of the connection identifier.
  private let connectionID: String

  /// The changing part of the connection identifier. Guarded by `channelNumberLock`.
  private var channelNumber: UInt64

  /// Guards `channelNumber`.
  private var channelNumberLock = NIOLock()
  /// The identifier in the form "<connectionID>/<channelNumber>". Does not take
  /// `channelNumberLock` itself, so callers are expected to hold it.
  private var _connectionIDAndNumber: String {
    return "\(self.connectionID)/\(self.channelNumber)"
  }

  /// The identifier in the form "<connectionID>/<channelNumber>", read while holding
  /// `channelNumberLock`.
  private var connectionIDAndNumber: String {
    return self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }
  /// Increments the channel number (wrapping on overflow) and stores the resulting identifier
  /// in the logger's metadata, all while holding `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLock {
      self.channelNumber &+= 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
    }
  }
  /// Stores the current connection identifier in the metadata of `logger`.
  ///
  /// - Parameter logger: The logger to modify in place.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
  }
  /// Creates a manager from a client connection configuration.
  ///
  /// - Parameters:
  ///   - configuration: Supplies the event loop (the next one from its group), the call start
  ///     behavior and the connection backoff.
  ///   - channelProvider: The channel provider to use. When `nil`, a `DefaultChannelProvider`
  ///     built from `configuration` is used.
  ///   - connectivityDelegate: The delegate to inform about connectivity changes, if any.
  ///   - logger: The logger to use.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    let eventLoop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: eventLoop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates a manager in the 'idle' state.
  ///
  /// A new UUID string becomes the connection ID and the channel number starts at zero; the
  /// two, joined by "/", are stored in the logger's metadata.
  ///
  /// - Parameters:
  ///   - eventLoop: The event loop the managed connection runs on.
  ///   - channelProvider: Provides the channels.
  ///   - callStartBehavior: How patient callers are when asking for a multiplexer.
  ///   - connectionBackoff: The backoff between connection attempts, or `nil` for no reconnects.
  ///   - connectivityDelegate: The delegate to inform about connectivity changes, if any.
  ///   - http2Delegate: The delegate to inform about HTTP/2 connection changes, if any.
  ///   - logger: The logger to use; a copy with the connection ID attached is stored.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    // Identify this connection and tag the logger with it.
    let identifier = UUID().uuidString
    let initialChannelNumber: UInt64 = 0
    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] = "\(identifier)/\(initialChannelNumber)"

    self.connectionID = identifier
    self.channelNumber = initialChannelNumber
    self.logger = taggedLogger

    self.eventLoop = eventLoop
    self.state = .idle(lastError: nil)
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate
  }
  /// Returns a future for the multiplexer of the underlying channel handling gRPC calls.
  ///
  /// May be called from any thread: the work is done on the manager's event loop, hopping
  /// onto it first if necessary. With the `fastFailure` call start behavior the fast-failing
  /// ('optimistic') path is taken; with `waitsForConnectivity` the patient path is taken.
  ///
  /// - Returns: A future for the HTTP/2 stream multiplexer.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // Picks the path matching the configured call start behavior.
    func vendMultiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
      switch self.callStartBehavior {
      case .fastFailure:
        return self.getHTTP2MultiplexerOptimistic()
      case .waitsForConnectivity:
        return self.getHTTP2MultiplexerPatient()
      }
    }

    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        vendMultiplexer()
      }
    }
    return vendMultiplexer()
  }
  /// Returns a future for the multiplexer which is succeeded once the connection is ready.
  ///
  /// - 'idle': starts connecting and returns the ready future of the new attempt.
  /// - 'connecting', 'active' and 'transientFailure': returns the ready future held by that
  ///   state.
  /// - 'ready': returns an already succeeded future.
  /// - 'shutdown': returns a future failed with the shutdown reason.
  ///
  /// Must be called on the `EventLoop`.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but, since the state is read and
    // may be changed below, let's just be sure (as the optimistic variant does).
    self.eventLoop.preconditionInEventLoop()

    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must have transitioned to the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.invalidState()
      }
      multiplexer = connecting.readyChannelMuxPromise.futureResult

    case let .connecting(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .active(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .ready(state):
      multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)

    case let .transientFailure(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .shutdown(state):
      multiplexer = self.eventLoop.makeFailedFuture(state.reason)
    }

    // Logged after the switch so that the label reflects any transition made above.
    self.logger.debug("vending multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return multiplexer
  }
  /// Returns a future for the HTTP/2 stream multiplexer without waiting for the connection to
  /// become ready.
  ///
  /// - 'idle': starts connecting and returns the candidate future of the new attempt.
  /// - 'connecting': returns the candidate future of the attempt in progress.
  /// - 'active' and 'ready': returns an already succeeded future.
  /// - 'transientFailure' and 'shutdown': returns a future failed with that state's reason.
  ///
  /// Must be called on the `EventLoop`.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` already hops onto the event loop; double check regardless.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // Having started to connect, the state must now be `connecting`.
      guard case let .connecting(connecting) = self.state else {
        self.invalidState()
      }
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .connecting(connecting):
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .active(connected):
      muxFuture = self.eventLoop.makeSucceededFuture(connected.multiplexer)

    case let .ready(ready):
      muxFuture = self.eventLoop.makeSucceededFuture(ready.multiplexer)

    case let .transientFailure(failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)

    case let .shutdown(shutdown):
      muxFuture = self.eventLoop.makeFailedFuture(shutdown.reason)
    }

    self.logger.debug("vending fast-failing multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return muxFuture
  }
  /// How a ready connection should be shut down.
  @usableFromInline
  internal enum ShutdownMode {
    /// Close the underlying channel straight away, without waiting for existing RPCs to
    /// complete.
    case forceful

    /// Let running RPCs finish before the underlying channel is closed; no new streams may be
    /// created. If the channel has not closed by the deadline it is closed forcibly.
    case graceful(NIODeadline)
  }
  /// Shuts down the underlying connection.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  /// - Parameter mode: How to shut down.
  /// - Returns: The future of a promise, made on the manager's event loop, which is completed
  ///   by the shutdown.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let shutdownPromise = self.eventLoop.makePromise(of: Void.self)
    self.shutdown(mode: mode, promise: shutdownPromise)
    return shutdownPromise.futureResult
  }
  /// Shuts down the underlying connection, hopping onto the event loop first if necessary.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  /// - Parameters:
  ///   - mode: How to shut down.
  ///   - promise: The promise to complete as part of the shutdown.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }
    self._shutdown(mode: mode, promise: promise)
  }
  /// Moves to the 'shutdown' state (unless already there) and completes `promise` in a manner
  /// which depends on the state we were in.
  ///
  /// - Parameters:
  ///   - mode: How to shut down; only consulted when the connection is ready.
  ///   - promise: Completed immediately when there is no channel to close, otherwise when the
  ///     channel closes (or when the pending connection attempt resolves).
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug("shutting down connection", metadata: [
      "connectivity_state": "\(self.state.label)",
      "shutdown.mode": "\(mode)",
    ])

    switch self.state {
    case .idle:
      // There is no channel and none is wanted: nothing to tear down.
      self.state = .shutdown(.shutdownByUser(closeFuture: promise.futureResult))
      promise.succeed(())

    case let .connecting(connecting):
      // A connection attempt is in flight. The application has no 'ready' channel, so any
      // fallout from the connecting channel is dealt with without the application knowing.
      self.state = .shutdown(.shutdownByUser(closeFuture: promise.futureResult))

      // Even if the attempt succeeds the application must not get hold of the channel or its
      // multiplexer, so fail both multiplexer promises.
      connecting.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      connecting.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // The shutdown completes once the connection attempt has resolved.
      connecting.candidate.whenComplete { result in
        switch result {
        case let .success(channel):
          // We connected after all: close straight away.
          channel.close(mode: .all, promise: nil)
          promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())

        case .failure:
          // Failing to connect is fine: the shutdown is still a success.
          promise.succeed(())
        }
      }

    case let .active(connected):
      // There is a channel but the application does not know about it yet: treat it much like
      // the `.connecting` case.
      self.state = .shutdown(.shutdownByUser(closeFuture: promise.futureResult))

      // The application must not get hold of the channel or its multiplexer.
      connected.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // Streams are only created in the ready state, so the channel can be closed without
      // quiescing.
      connected.candidate.close(mode: .all, promise: nil)
      promise.completeWith(connected.candidate.closeFuture.recoveringFromUncleanShutdown())

    case let .ready(ready):
      // The channel is up and running and the application could be using it.
      self.state = .shutdown(.shutdownByUser(closeFuture: promise.futureResult))
      let channel = ready.channel

      switch mode {
      case .forceful:
        // Close the channel right away.
        channel.close(mode: .all, promise: nil)

      case let .graceful(deadline):
        // Forcibly close the channel if it has not closed by the deadline.
        let forceClose = channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          channel.close(mode: .all, promise: nil)
        }

        // If the channel closes normally first, the forced close is no longer needed.
        channel.closeFuture.whenComplete { _ in
          forceClose.cancel()
        }

        // Ask the channel to quiesce. The idle handler picks this up and closes the channel
        // once all streams have been closed.
        channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }

      // The shutdown completes when the channel eventually closes.
      promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())

    case let .transientFailure(failure):
      // As with `.connecting` and `.active` the application has no `.ready` channel. In
      // addition any scheduled connection attempt is cancelled.
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Stop a new channel from being created, if we can. If the cancellation comes too late
      // then the task calling `startConnecting()` will see the `shutdown` state and ignore the
      // request to connect.
      failure.scheduled.cancel()

      // Even if we managed to connect, the application shouldn't have access to the channel.
      failure.readyChannelMuxPromise.fail(shutdown.reason)

      // There is no active channel, so the shutdown is complete.
      promise.succeed(())

    case let .shutdown(alreadyShutdown):
      // Already shut down: just tie the promise to the existing close future.
      promise.completeWith(alreadyShutdown.closeFuture)
    }
  }
  /// Registers a callback which fires when the current connection is closed. May be called
  /// from any thread; the registration happens on the event loop.
  ///
  /// - Parameter onClose: Invoked with `true` when the connection closes, provided there is a
  ///   ready connection at the time of registration. Otherwise invoked with `false`.
  internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._onCurrentConnectionClose(onClose)
      }
      return
    }
    self._onCurrentConnectionClose(onClose)
  }
  /// Event-loop half of `onCurrentConnectionClose(_:)`: in the 'ready' state `onClose(true)` is
  /// deferred until the channel's close future completes; in every other state `onClose(false)`
  /// is called immediately.
  private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    self.eventLoop.assertInEventLoop()

    guard case let .ready(ready) = self.state else {
      onClose(false)
      return
    }

    ready.channel.closeFuture.whenComplete { _ in
      onClose(true)
    }
  }
  530. // MARK: - State changes from the channel handler.
  /// The channel caught an error. It is held on to until the channel becomes inactive, as it
  /// may provide some context. Must be called on the `EventLoop`.
  ///
  /// - Parameter error: The error which was caught.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case .idle:
      // An error while idle is a surprise but there is nothing to do about it. Either the error
      // is fatal to the channel, in which case channelInactive follows soon (acceptable), or it
      // is not, and future I/O will either fail fast or work. Either way: log it and move on.
      self.logger.warning("ignoring unexpected error in idle", metadata: [
        MetadataKey.error: "\(error)",
      ])

    case .connecting:
      self.connectionFailed(withError: error)

    case var .active(connected):
      connected.error = error
      self.state = .active(connected)

    case var .ready(ready):
      ready.error = error
      self.state = .ready(ready)

    case .transientFailure, .shutdown:
      // If we're already in one of these states then additional errors aren't helpful to us.
      break
    }
  }
  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// - Parameters:
  ///   - channel: The channel which became active.
  ///   - multiplexer: The multiplexer for `channel`.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("activating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .connecting(connecting):
      self.state = .active(
        ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
      )
      // Optimistic connections are happy with this level of setup.
      connecting.candidateMuxPromise.succeed(multiplexer)

    case .shutdown:
      // The application called shutdown before the channel became active; close it.
      channel.close(mode: .all, promise: nil)

    // The invalid cases below are deliberately kept apart: some crash reporting services give
    // stack traces without the precondition failure message (which names the invalid state),
    // and separate cases let the state be worked out from the line number instead.
    case .idle:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// An established channel (i.e. `active` or `ready`) has become inactive; decides whether to
  /// reconnect or to shut down. Must be called on the `EventLoop`.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("deactivating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .active(connected):
      // The channel was `active` but never became `ready`; the reconnect policy decides.
      switch connected.reconnect {
      case .none:
        // No further attempts: shut down.
        self.logger.debug("shutting down connection")
        let status = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        let shutdown = ShutdownState(
          closeFuture: self.eventLoop.makeSucceededFuture(()),
          reason: status
        )
        self.state = .shutdown(shutdown)
        connected.readyChannelMuxPromise.fail(status)

      case let .after(delay):
        // Try again once the delay has passed.
        let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(TransientFailureState(from: connected, scheduled: retry))
      }

    case let .ready(ready):
      // The channel was ready and working but something went wrong; replace it only if a
      // backoff is configured.
      if let backoff = self.connectionBackoff {
        // Start connecting right away, but still pass through `transientFailure`.
        let retry = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
        let failure = TransientFailureState(
          from: ready,
          scheduled: retry,
          backoffIterator: backoff.makeIterator()
        )
        self.state = .transientFailure(failure)
      } else {
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let status = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and a reconnect was not configured"
        )
        self.state = .shutdown(
          ShutdownState(closeFuture: ready.channel.closeFuture, reason: status)
        )
      }

    case .idle:
      // Expected: the channel becomes inactive after we have become idle.
      break

    case .shutdown:
      // We are already shut down; that's fine.
      break

    // The invalid cases below are deliberately kept apart: some crash reporting services give
    // stack traces without the precondition failure message (which names the invalid state),
    // and separate cases let the state be worked out from the line number instead.
    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// Records that the channel is ready, that is, it has seen the initial HTTP/2 SETTINGS frame.
  ///
  /// An `active` connection moves to `ready` and its ready-channel multiplexer promise is
  /// succeeded with the connection's multiplexer. Nothing happens if the manager is already
  /// shut down; every other state is treated as invalid.
  ///
  /// - Precondition: Must be called on the `EventLoop`.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "connection ready",
      metadata: ["connectivity_state": "\(self.state.label)"]
    )

    switch self.state {
    case .shutdown:
      // Already shut down: there is nothing to update.
      ()

    case let .active(activeState):
      // The state is switched to `ready` first; the promise is completed afterwards.
      let readyState = ReadyState(from: activeState)
      self.state = .ready(readyState)
      activeState.readyChannelMuxPromise.succeed(activeState.multiplexer)

    // Each invalid state deliberately has its own case: some crash reporting services provide
    // stack traces without the precondition failure message (which names the invalid state), so
    // the line number is the only way to work out which state we were in.
    case .ready:
      self.invalidState()
    case .connecting:
      self.invalidState()
    case .transientFailure:
      self.invalidState()
    case .idle:
      self.invalidState()
    }
  }
  /// Moves the connection to the idle state. Called when no RPCs are active on a 'ready' channel
  /// so that the channel can be closed for now.
  ///
  /// - Precondition: Must be called on the `EventLoop`.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "idling connection",
      metadata: ["connectivity_state": "\(self.state.label)"]
    )

    switch self.state {
    case let .ready(readyState):
      self.state = .idle(lastError: readyState.error)

    case let .active(activeState):
      // Reachable if the keepalive timer fires before we reach the ready state. Anyone waiting
      // on the ready-channel multiplexer is told that the connection idled first.
      self.state = .idle(lastError: activeState.error)
      let status = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      activeState.readyChannelMuxPromise.fail(status)

    case .shutdown:
      // Expected when the user closes the connection: when the channel becomes inactive and
      // there are no outstanding RPCs, 'idle()' is called instead of 'channelInactive()'.
      ()

    // Each invalid state deliberately has its own case: some crash reporting services provide
    // stack traces without the precondition failure message (which names the invalid state), so
    // the line number is the only way to work out which state we were in.
    case .transientFailure:
      self.invalidState()
    case .connecting:
      self.invalidState()
    case .idle:
      self.invalidState()
    }
  }
  /// Forwards a "stream opened" notification to the HTTP/2 delegate, if one is set.
  /// Asserts that the caller is on the `EventLoop`.
  internal func streamOpened() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.streamOpened(self)
  }
  /// Forwards a "stream closed" notification to the HTTP/2 delegate, if one is set.
  /// Asserts that the caller is on the `EventLoop`.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.streamClosed(self)
  }
  /// Passes a new maximum-concurrent-streams value on to the HTTP/2 delegate, if one is set.
  /// Asserts that the caller is on the `EventLoop`.
  ///
  /// - Parameter maxConcurrentStreams: The value handed to the delegate unchanged.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.receivedSettingsMaxConcurrentStreams(self, maxConcurrentStreams: maxConcurrentStreams)
  }
  /// The connection has started quiescing: tell the connectivity delegate, if one is set.
  /// Asserts that the caller is on the `EventLoop`.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.connectivityDelegate {
      delegate.connectionIsQuiescing(self)
    }
  }
  735. }
  extension ConnectionManager {
    /// Handles a failed connection attempt, that is, a connection which was never established.
    ///
    /// While `connecting`, the manager either shuts down (no reconnect configured or remaining)
    /// or enters `transientFailure` with another attempt scheduled after the reconnect delay.
    /// A failure reported after shutdown is ignored; every other state is treated as invalid.
    ///
    /// - Parameter error: The error the attempt failed with; used to fail the pending promises
    ///   and recorded as the reason on the new state.
    /// - Precondition: Must be called on the `EventLoop`.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case let .connecting(attempt):
        // Should we reconnect?
        switch attempt.reconnect {
        // Yes, after a delay.
        case let .after(delay):
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
          let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          let failure = TransientFailureState(from: attempt, scheduled: retry, reason: error)
          self.state = .transientFailure(failure)
          // Candidate mux users are not willing to wait.
          attempt.candidateMuxPromise.fail(error)

        // No, shutdown.
        case .none:
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          let shutdown = ShutdownState(
            closeFuture: self.eventLoop.makeSucceededFuture(()),
            reason: error
          )
          self.state = .shutdown(shutdown)
          attempt.readyChannelMuxPromise.fail(error)
          attempt.candidateMuxPromise.fail(error)
        }

      // The application must have called shutdown while we were trying to establish a connection
      // which was doomed to fail anyway. That's fine, we can ignore this.
      case .shutdown:
        ()

      // We can't fail to connect if we aren't trying.
      //
      // Each invalid state deliberately has its own case: some crash reporting services provide
      // stack traces without the precondition failure message (which names the invalid state),
      // so the line number is the only way to work out which state we were in.
      case .transientFailure:
        self.invalidState()
      case .ready:
        self.invalidState()
      case .active:
        self.invalidState()
      case .idle:
        self.invalidState()
      }
    }
  }
  extension ConnectionManager {
    /// Starts establishing a connection. This is only valid from the `idle` and
    /// `transientFailure` states; it does nothing if the manager has already shut down.
    ///
    /// Must be called on the `EventLoop` (checked with an assertion).
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case let .transientFailure(pending):
        // Carry on with the backoff iterator and ready-channel promise of the failed attempt.
        self.startConnecting(
          backoffIterator: pending.backoffIterator,
          muxPromise: pending.readyChannelMuxPromise
        )

      case .idle:
        // First attempt from idle: new backoff iterator (if backoff is configured) and a new
        // promise.
        self.startConnecting(
          backoffIterator: self.connectionBackoff?.makeIterator(),
          muxPromise: self.eventLoop.makePromise()
        )

      // We shutdown before a scheduled connection attempt had started.
      case .shutdown:
        ()

      // Each invalid state deliberately has its own case: some crash reporting services provide
      // stack traces without the precondition failure message (which names the invalid state),
      // so the line number is the only way to work out which state we were in.
      case .ready:
        self.invalidState()
      case .active:
        self.invalidState()
      case .connecting:
        self.invalidState()
      }
    }

    /// Submits a connection attempt to the event loop and moves to the `connecting` state.
    ///
    /// - Parameters:
    ///   - backoffIterator: Supplies the connect timeout and the backoff for this attempt. If it
    ///     is `nil` or yields no value, no connect timeout is passed to the channel provider and
    ///     a failure of this attempt will not be retried.
    ///   - muxPromise: Stored as the ready-channel multiplexer promise of the new state.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      let timeoutAndBackoff = backoffIterator?.next()

      // We're already on the event loop: submit the connect so it starts after we've made the
      // state change to `.connecting`.
      self.eventLoop.assertInEventLoop()
      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let pendingChannel: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: timeoutAndBackoff.map { .seconds(timeInterval: $0.timeout) },
          logger: self.logger
        )
        pendingChannel.whenFailure { error in
          self.connectionFailed(withError: error)
        }
        return pendingChannel
      }

      // Should we reconnect if the candidate channel fails?
      let reconnect: Reconnect
      if let next = timeoutAndBackoff {
        reconnect = .after(next.backoff)
      } else {
        reconnect = .none
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view of the connection manager. Every operation requires the caller to be
    /// executing on the same `EventLoop` as the connection manager.
    internal var sync: Sync {
      Sync(self)
    }

    /// Thin wrapper exposing synchronous accessors of a `ConnectionManager`.
    internal struct Sync {
      private let manager: ConnectionManager

      fileprivate init(_ manager: ConnectionManager) {
        self.manager = manager
      }

      /// A delegate for connectivity changes. Reads and writes assert that the caller is on the
      /// manager's `EventLoop`.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          return manager.connectivityDelegate
        }
        nonmutating set(delegate) {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          manager.connectivityDelegate = delegate
        }
      }

      /// A delegate for HTTP/2 connection changes. Reads and writes assert that the caller is on
      /// the manager's `EventLoop`.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          return manager.http2Delegate
        }
        nonmutating set(delegate) {
          let manager = self.manager
          manager.eventLoop.assertInEventLoop()
          manager.http2Delegate = delegate
        }
      }

      /// Returns `true` if the connection is in the idle state.
      internal var isIdle: Bool {
        self.manager.isIdle
      }

      /// Returns `true` if the connection is in the shutdown state.
      internal var isShutdown: Bool {
        self.manager.isShutdown
      }

      /// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is in
      /// any other state.
      internal var multiplexer: HTTP2StreamMultiplexer? {
        self.manager.multiplexer
      }

      /// Start establishing a connection. Must only be called when `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Stops the program with a precondition failure naming the current state and the function
    /// which was called while in it.
    ///
    /// - Parameters:
    ///   - function: The function reported in the message; defaults to the caller's `#function`.
    ///   - file: The file reported with the failure; defaults to the caller's `#fileID`.
    ///   - line: The line reported with the failure; defaults to the caller's `#line`.
    private func invalidState(
      function: StaticString = #function,
      file: StaticString = #fileID,
      line: UInt = #line
    ) -> Never {
      // Wrapped in a closure so the message is only formatted if `preconditionFailure` asks for it.
      let describe: () -> String = {
        "Invalid state \(self.state) for \(function)"
      }
      preconditionFailure(describe(), file: file, line: line)
    }
  }