2
0

ConnectionManager.swift 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  #if compiler(>=5.6)
  // `@unchecked` because the compiler cannot verify the safety argument, which is:
  // - mutable state is only ever read and written on one particular event loop;
  // - APIs which _may_ be called from other threads hop onto that event loop first;
  // - APIs which _must_ be called on that event loop check it with a precondition.
  extension ConnectionManager: @unchecked Sendable {}
  #endif // compiler(>=5.6)
  28. @usableFromInline
  29. internal final class ConnectionManager {
  /// Whether another connection attempt should follow when the current one is lost.
  internal enum Reconnect {
    /// Do not reconnect.
    case none

    /// Reconnect once the given interval (in seconds) has elapsed.
    case after(TimeInterval)
  }
  /// Data held while a connection attempt is in flight.
  internal struct ConnectingState {
    /// The backoff iterator, if any; copied into whichever state follows this one.
    var backoffIterator: ConnectionBackoffIterator?

    /// Whether (and after how long) to retry should the channel become inactive.
    var reconnect: Reconnect

    /// The future `Channel` for the attempt in progress.
    var candidate: EventLoopFuture<Channel>

    /// The promise whose future is vended to callers which wait for connectivity.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The promise whose future is vended to fast-failing callers; it is completed as soon as the
    /// candidate channel becomes active or the attempt is abandoned.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  }
  /// Data held once a channel is active but has not yet been declared ready.
  internal struct ConnectedState {
    /// The backoff iterator carried over from the connecting state.
    var backoffIterator: ConnectionBackoffIterator?

    /// Whether (and after how long) to retry should the channel become inactive.
    var reconnect: Reconnect

    /// The channel which became active.
    var candidate: Channel

    /// The promise whose future is vended to callers which wait for connectivity.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The stream multiplexer belonging to `candidate`.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error reported for the channel, if any. Starts out as `nil`.
    var error: Error?

    /// Builds the connected state from the connecting state it replaces.
    ///
    /// - Parameters:
    ///   - connecting: The state being left; its backoff iterator, reconnect policy and
    ///     ready-multiplexer promise are carried over.
    ///   - candidate: The channel which became active.
    ///   - multiplexer: The multiplexer for `candidate`.
    init(
      from connecting: ConnectingState,
      candidate: Channel,
      multiplexer: HTTP2StreamMultiplexer
    ) {
      self.candidate = candidate
      self.multiplexer = multiplexer
      self.backoffIterator = connecting.backoffIterator
      self.reconnect = connecting.reconnect
      self.readyChannelMuxPromise = connecting.readyChannelMuxPromise
    }
  }
  /// Data held while the connection is ready for RPCs.
  internal struct ReadyState {
    /// The ready channel.
    var channel: Channel

    /// The stream multiplexer belonging to `channel`.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error reported for the channel, if any. Starts out as `nil`; an error
    /// recorded on the connected state is not carried over.
    var error: Error?

    /// Builds the ready state from the connected state it replaces.
    init(from connected: ConnectedState) {
      self.multiplexer = connected.multiplexer
      self.channel = connected.candidate
    }
  }
  /// Data held while waiting for a scheduled reconnection attempt.
  internal struct TransientFailureState {
    /// The backoff iterator to resume from, if any.
    var backoffIterator: ConnectionBackoffIterator?

    /// The promise whose future is vended to callers which wait for connectivity.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The scheduled task which will start the next connection attempt.
    var scheduled: Scheduled<Void>

    /// Why the previous connection was lost.
    var reason: Error

    /// Shared initializer: stores every field and substitutes a generic 'unavailable' status when
    /// no specific reason is known.
    private init(
      backoffIterator: ConnectionBackoffIterator?,
      readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>,
      scheduled: Scheduled<Void>,
      knownReason: Error?
    ) {
      self.backoffIterator = backoffIterator
      self.readyChannelMuxPromise = readyChannelMuxPromise
      self.scheduled = scheduled
      self.reason = knownReason ?? GRPCStatus(
        code: .unavailable,
        message: "Unexpected connection drop"
      )
    }

    /// Enters transient failure from the connecting state, keeping its backoff iterator and
    /// ready-multiplexer promise.
    ///
    /// - Parameters:
    ///   - state: The connecting state being left.
    ///   - scheduled: The task which will start the next attempt.
    ///   - reason: The cause of the failure, or `nil` to use the generic reason.
    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
      self.init(
        backoffIterator: state.backoffIterator,
        readyChannelMuxPromise: state.readyChannelMuxPromise,
        scheduled: scheduled,
        knownReason: reason
      )
    }

    /// Enters transient failure from the connected state, keeping its backoff iterator and
    /// ready-multiplexer promise. The reason is the error recorded on `state`, if there is one.
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.init(
        backoffIterator: state.backoffIterator,
        readyChannelMuxPromise: state.readyChannelMuxPromise,
        scheduled: scheduled,
        knownReason: state.error
      )
    }

    /// Enters transient failure from the ready state. A new ready-multiplexer promise is made on
    /// the channel's event loop. The reason is the error recorded on `state`, if there is one.
    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.init(
        backoffIterator: backoffIterator,
        readyChannelMuxPromise: state.channel.eventLoop.makePromise(),
        scheduled: scheduled,
        knownReason: state.error
      )
    }
  }
  /// Data held once the manager has shut down.
  internal struct ShutdownState {
    /// A future associated with the close of the connection.
    var closeFuture: EventLoopFuture<Void>

    /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
    /// this error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// Makes the shutdown state used when the user asked for the shutdown: the reason is an
    /// 'unavailable' status saying so.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let userRequested = GRPCStatus(
        code: .unavailable,
        message: "Connection was shutdown by the user"
      )
      return ShutdownState(closeFuture: closeFuture, reason: userRequested)
    }
  }
  /// The internal connectivity state of the manager.
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case idle(lastError: Error?)

    /// We're actively trying to establish a connection.
    ///
    /// Valid next states:
    /// - `active`
    /// - `transientFailure` (if our attempt fails and we're going to try again)
    /// - `shutdown`
    case connecting(ConnectingState)

    /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
    /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
    ///
    /// Valid next states:
    /// - `ready`
    /// - `transientFailure` (if our handshake fails or another error happens and we can attempt
    ///   to re-establish the connection)
    /// - `shutdown`
    case active(ConnectedState)

    /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
    /// the channel for making RPCs.
    ///
    /// Valid next states:
    /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
    /// - `transientFailure` (we encountered an error and will re-establish the connection)
    /// - `shutdown`
    case ready(ReadyState)

    /// A `Channel` is desired, we'll attempt to create one in the future.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)

    /// We never want another `Channel`: this state is terminal.
    case shutdown(ShutdownState)

    /// The name of the state, used as the "connectivity_state" value in log metadata.
    fileprivate var label: String {
      switch self {
      case .idle: return "idle"
      case .connecting: return "connecting"
      case .active: return "active"
      case .ready: return "ready"
      case .transientFailure: return "transientFailure"
      case .shutdown: return "shutdown"
      }
    }
  }
  /// The most recently published 'external' state; a subset of the internal state.
  private var externalState: _ConnectivityState = .idle(nil)

  /// Publishes `nextState` as the external state. The connectivity delegate (if there is one) is
  /// told about the transition, but only when `nextState` is not the same state as the current one.
  private func updateExternalState(to nextState: _ConnectivityState) {
    guard !self.externalState.isSameState(as: nextState) else {
      return
    }

    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(
      self,
      from: previousState,
      to: nextState
    )
  }
  /// Our current state. Every assignment republishes the matching external state; entering
  /// `idle` or `transientFailure` additionally advances the connection ID.
  private var state: State {
    didSet {
      switch self.state {
      case .active:
        // 'active' is an internal state: there is nothing to publish.
        break

      case .connecting:
        self.updateExternalState(to: .connecting)

      case .ready:
        self.updateExternalState(to: .ready)

      case .shutdown:
        self.updateExternalState(to: .shutdown)

      case let .idle(lastError):
        self.updateExternalState(to: .idle(lastError))
        self.updateConnectionID()

      case let .transientFailure(failure):
        self.updateExternalState(to: .transientFailure(failure.reason))
        self.updateConnectionID()
      }
    }
  }
  /// Whether the current state is 'idle'. Asserts that it is read on the event loop.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()
    if case .idle = self.state {
      return true
    }
    return false
  }
  /// Whether the current state is 'shutdown'. Asserts that it is read on the event loop.
  private var isShutdown: Bool {
    self.eventLoop.assertInEventLoop()
    if case .shutdown = self.state {
      return true
    }
    return false
  }
  /// The `HTTP2StreamMultiplexer` of the 'ready' state, or `nil` in any other state. Asserts that
  /// it is read on the event loop.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(ready) = self.state else {
      return nil
    }
    return ready.multiplexer
  }
  /// The `EventLoop` the managed connection runs on; state is read and changed on this loop.
  internal let eventLoop: EventLoop

  /// Receives connectivity state changes. Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// Receives HTTP/2 connection changes. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// Provides an `EventLoopFuture<Channel>` for each connection attempt.
  private let channelProvider: ConnectionManagerChannelProvider

  /// How patient a caller is when asking for a multiplexer: wait for connectivity or fail fast.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// The backoff to apply between connection attempts, or `nil` if no reconnection attempts
  /// should be made.
  private let connectionBackoff: ConnectionBackoff?

  /// A logger. Its connection ID metadata is kept up to date by the manager.
  internal var logger: Logger
  /// An identifier for this manager, fixed for its lifetime.
  private let connectionID: String

  /// A counter which is advanced by `updateConnectionID()`. Guarded by `channelNumberLock`.
  private var channelNumber: UInt64

  /// Guards `channelNumber`. The lock itself is never replaced, so it is a constant.
  private let channelNumberLock = NIOLock()

  /// "<connectionID>/<channelNumber>". Does not take the lock: callers must already hold
  /// `channelNumberLock`.
  private var _connectionIDAndNumber: String {
    return "\(self.connectionID)/\(self.channelNumber)"
  }

  /// "<connectionID>/<channelNumber>", read while holding `channelNumberLock`.
  private var connectionIDAndNumber: String {
    return self.channelNumberLock.withLock {
      self._connectionIDAndNumber
    }
  }
  /// Advances the channel number (wrapping on overflow) and stores the resulting
  /// "<connectionID>/<channelNumber>" in the logger's connection ID metadata. Both steps happen
  /// while holding the channel number lock.
  private func updateConnectionID() {
    self.channelNumberLock.withLock {
      self.channelNumber = self.channelNumber &+ 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
    }
  }
  /// Sets the connection ID metadata of `logger` to the current "<connectionID>/<channelNumber>".
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
  }
  /// Creates a manager from a client connection configuration.
  ///
  /// - Parameters:
  ///   - configuration: Supplies the event loop (the next one from its group), the call start
  ///     behavior and the connection backoff.
  ///   - channelProvider: The channel provider to use; when `nil` a `DefaultChannelProvider` is
  ///     built from `configuration`.
  ///   - connectivityDelegate: A delegate for connectivity changes, if any.
  ///   - logger: The logger to use.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    let eventLoop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: eventLoop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates a manager in the 'idle' state.
  ///
  /// A fresh UUID is used as the connection ID and the channel number starts at zero; the stored
  /// logger is a copy of `logger` tagged with "<connectionID>/0".
  ///
  /// - Parameters:
  ///   - eventLoop: The event loop the managed connection will run on.
  ///   - channelProvider: Provides channels for connection attempts.
  ///   - callStartBehavior: How patient callers are when asking for a multiplexer.
  ///   - connectionBackoff: The backoff between connection attempts, or `nil` for no reconnects.
  ///   - connectivityDelegate: A delegate for connectivity changes, if any.
  ///   - http2Delegate: A delegate for HTTP/2 connection changes, if any.
  ///   - logger: The logger to copy and tag.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    // Identify this manager and tag the logger accordingly.
    let identifier = UUID().uuidString
    let initialChannelNumber: UInt64 = 0
    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] = "\(identifier)/\(initialChannelNumber)"

    self.connectionID = identifier
    self.channelNumber = initialChannelNumber
    self.logger = taggedLogger

    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate

    self.state = .idle(lastError: nil)
  }
  /// Get the multiplexer from the underlying channel handling gRPC calls.
  ///
  /// If the `ConnectionManager` was configured with `fastFailure` the caller gets one chance to
  /// connect; otherwise reconnections are managed here. May be called from any thread: the work
  /// is done on the manager's event loop.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // Hop onto the event loop first, then run the same logic from there.
    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        self.getHTTP2Multiplexer()
      }
    }

    switch self.callStartBehavior {
    case .fastFailure:
      return self.getHTTP2MultiplexerOptimistic()
    case .waitsForConnectivity:
      return self.getHTTP2MultiplexerPatient()
    }
  }
  /// Returns a future for the multiplexer which succeeds when the channel is ready.
  /// Reconnects are handled if necessary.
  ///
  /// - In 'idle' a connection attempt is started first.
  /// - In 'connecting', 'active' and 'transientFailure' the future of the pending
  ///   ready-multiplexer promise is returned.
  /// - In 'ready' the future has already succeeded with the current multiplexer.
  /// - In 'shutdown' the future has already failed with the shutdown reason.
  ///
  /// Must be called on the `EventLoop`.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop; check it here too, as the
    // fast-failing variant does, because `state` is read and may be changed below.
    self.eventLoop.preconditionInEventLoop()

    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must transition to the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      multiplexer = connecting.readyChannelMuxPromise.futureResult

    case let .connecting(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .active(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .ready(state):
      multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)

    case let .transientFailure(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .shutdown(state):
      multiplexer = self.eventLoop.makeFailedFuture(state.reason)
    }

    self.logger.debug("vending multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return multiplexer
  }
  /// Returns a future for the current HTTP/2 stream multiplexer, or the future multiplexer of the
  /// connection attempt in progress; in the 'idle' state an attempt is started and its future is
  /// returned.
  ///
  /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must transition to the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .connecting(connecting):
      muxFuture = connecting.candidateMuxPromise.futureResult

    case let .active(active):
      muxFuture = self.eventLoop.makeSucceededFuture(active.multiplexer)

    case let .ready(ready):
      muxFuture = self.eventLoop.makeSucceededFuture(ready.multiplexer)

    case let .transientFailure(failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)

    case let .shutdown(shutdown):
      muxFuture = self.eventLoop.makeFailedFuture(shutdown.reason)
    }

    self.logger.debug("vending fast-failing multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return muxFuture
  }
  /// How the underlying channel should be closed on shutdown.
  @usableFromInline
  internal enum ShutdownMode {
    /// Closes the underlying channel without waiting for existing RPCs to complete.
    case forceful

    /// Allows running RPCs to run their course before closing the underlying channel. No new
    /// streams may be created. The channel is forcibly closed if it has not closed by the deadline.
    case graceful(NIODeadline)
  }
  /// Shutdown the underlying connection.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  /// - Parameter mode: How the underlying channel should be closed.
  /// - Returns: A future completed when the shutdown has completed.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let closePromise: EventLoopPromise<Void> = self.eventLoop.makePromise()
    self.shutdown(mode: mode, promise: closePromise)
    return closePromise.futureResult
  }
  /// Shutdown the underlying connection, completing `promise` when done. May be called from any
  /// thread: the work is done on the manager's event loop.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }

    self._shutdown(mode: mode, promise: promise)
  }
  /// Moves to the 'shutdown' state (unless already there) and tears down whatever the previous
  /// state owned. `promise` is completed when the shutdown has finished.
  ///
  /// `mode` only matters in the 'ready' state; in every other state there are no streams to wait
  /// for.
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug("shutting down connection", metadata: [
      "connectivity_state": "\(self.state.label)",
      "shutdown.mode": "\(mode)",
    ])

    // Transitions to 'shutdown' (requested by the user) and hands back the new state's data.
    @discardableResult
    func enterShutdown() -> ShutdownState {
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      return shutdown
    }

    switch self.state {
    case .idle:
      // We don't have a channel and we don't want one, easy!
      enterShutdown()
      promise.succeed(())

    case let .connecting(connecting):
      // We're mid-connection: the application doesn't have any 'ready' channels so we deal with
      // any fallout from the connecting channel without the application knowing.
      enterShutdown()

      // Fail both mux promises: we're shutting down so even if we manage to successfully connect
      // the application shouldn't have access to the channel or multiplexer.
      connecting.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      connecting.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // Complete the shutdown promise when the connection attempt has completed.
      connecting.candidate.whenComplete { result in
        switch result {
        case .failure:
          // We failed to connect, that's fine we still shutdown successfully.
          promise.succeed(())

        case let .success(channel):
          // In case we do successfully connect, close on the next loop tick. When connecting a
          // channel NIO will complete the promise for the channel before firing channel active.
          // That means we may close and fire inactive before active which HTTP/2 will be unhappy
          // about.
          self.eventLoop.execute {
            channel.close(mode: .all, promise: nil)
            promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())
          }
        }
      }

    case let .active(active):
      // We have an active channel but the application doesn't know about it yet. We'll do the
      // same as for `.connecting`.
      enterShutdown()

      // Fail the ready channel mux promise: the application shouldn't have access to the channel
      // or multiplexer.
      active.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // We have a channel, close it. We only create streams in the ready state so there's no need
      // to quiesce here.
      active.candidate.close(mode: .all, promise: nil)
      promise.completeWith(active.candidate.closeFuture.recoveringFromUncleanShutdown())

    case let .ready(ready):
      // The channel is up and running: the application could be using it.
      enterShutdown()
      let channel = ready.channel

      switch mode {
      case .forceful:
        // We have a channel, close it.
        channel.close(mode: .all, promise: nil)

      case let .graceful(deadline):
        // If we don't close by the deadline forcibly close the channel.
        let forcedClose = channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          channel.close(mode: .all, promise: nil)
        }

        // Cancel the force close if we close normally first.
        channel.closeFuture.whenComplete { _ in
          forcedClose.cancel()
        }

        // Tell the channel to quiesce. It will be picked up by the idle handler which will close
        // the channel when all streams have been closed.
        channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }

      // Complete the promise when we eventually close.
      promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())

    case let .transientFailure(failure):
      // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
      // do the same but also cancel any scheduled connection attempts and deal with any fallout
      // if we cancelled too late.
      let shutdown = enterShutdown()

      // Stop the creation of a new channel, if we can. If we can't then the task to
      // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
      failure.scheduled.cancel()

      // Fail the ready channel mux promise: we're shutting down so even if we manage to
      // successfully connect the application shouldn't have access to the channel.
      failure.readyChannelMuxPromise.fail(shutdown.reason)

      // No active channel, so complete the shutdown promise now.
      promise.succeed(())

    case let .shutdown(existing):
      // We're already shutdown; there's nothing to do.
      promise.completeWith(existing.closeFuture)
    }
  }
  /// Registers a callback which fires when the current active connection is closed.
  ///
  /// If there is a connection, the callback will be invoked with `true` when the connection is
  /// closed. Otherwise the callback is invoked with `false`. May be called from any thread: the
  /// registration is done on the manager's event loop.
  internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._onCurrentConnectionClose(onClose)
      }
      return
    }

    self._onCurrentConnectionClose(onClose)
  }
  /// Event-loop half of `onCurrentConnectionClose(_:)`. Only the 'ready' state counts as having a
  /// connection; in any other state `onClose` is called with `false` straight away.
  private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    self.eventLoop.assertInEventLoop()

    guard case let .ready(ready) = self.state else {
      onClose(false)
      return
    }

    ready.channel.closeFuture.whenComplete { _ in
      onClose(true)
    }
  }
  // MARK: - State changes from the channel handler.

  /// The channel caught an error. Hold on to it until the channel becomes inactive, it may provide
  /// some context. Must be called on the `EventLoop`.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case .transientFailure, .shutdown:
      // If we're already in one of these states then additional errors aren't helpful to us.
      break

    case .connecting:
      self.connectionFailed(withError: error)

    case let .active(active):
      var updated = active
      updated.error = error
      self.state = .active(updated)

    case let .ready(ready):
      var updated = ready
      updated.error = error
      self.state = .ready(updated)

    case .idle:
      // Hitting an error in idle is a surprise, but not really something we do anything about.
      // Either the error is channel fatal, in which case we'll see channelInactive soon
      // (acceptable), or it's not, and future I/O will either fail fast or work. In either case,
      // all we do is log this and move on.
      self.logger.warning("ignoring unexpected error in idle", metadata: [
        MetadataKey.error: "\(error)",
      ])
    }
  }
  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// - Parameters:
  ///   - channel: The channel which became active.
  ///   - multiplexer: The stream multiplexer for `channel`.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()

    self.logger.debug("activating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case .active, .ready:
      // Received a second 'channelActive', already active so ignore.
      break

    case .idle, .transientFailure:
      // Received a channelActive when not connecting. Can happen if channelActive and
      // channelInactive are reordered. Ignore.
      break

    case .shutdown:
      // Application called shutdown before the channel became active; we should close it.
      channel.close(mode: .all, promise: nil)

    case let .connecting(connecting):
      self.state = .active(
        ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
      )
      // Optimistic connections are happy with this level of setup.
      connecting.candidateMuxPromise.succeed(multiplexer)
    }
  }
  /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
  /// Must be called on the `EventLoop`.
  ///
  /// Depending on the state and the reconnect policy this either moves to 'shutdown' or schedules
  /// another connection attempt and moves to 'transientFailure'. Nothing happens in 'idle',
  /// 'shutdown' or 'transientFailure'.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()

    self.logger.debug("deactivating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    // Moves to 'shutdown' because re-establishment is disabled. Returns the error so the caller
    // can fail whichever promises are outstanding.
    func enterShutdownReconnectDisabled() -> GRPCStatus {
      let error = GRPCStatus(
        code: .unavailable,
        message: "The connection was dropped and connection re-establishment is disabled"
      )
      self.state = .shutdown(
        ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: error)
      )
      return error
    }

    // Schedules the next connection attempt after `delay` seconds.
    func scheduleConnectionAttempt(after delay: TimeInterval) -> Scheduled<Void> {
      return self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
        self.startConnecting()
      }
    }

    switch self.state {
    case .idle:
      // This is fine: we expect the channel to become inactive after becoming idle.
      break

    case .shutdown:
      // We're already shutdown, that's fine.
      break

    case .transientFailure:
      // Received 'channelInactive' twice; fine, ignore.
      break

    case let .connecting(connecting):
      // We can hit inactive in connecting if we see channelInactive before channelActive; that's
      // not common but we should tolerate it. Should we try connecting again?
      switch connecting.reconnect {
      case .none:
        // No, shutdown instead.
        self.logger.debug("shutting down connection")
        let error = enterShutdownReconnectDisabled()
        // Shutting down, so fail the outstanding promises.
        connecting.readyChannelMuxPromise.fail(error)
        connecting.candidateMuxPromise.fail(error)

      case let .after(delay):
        // Yes, after some time.
        let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
        // Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
        connecting.candidateMuxPromise.fail(error)

        let scheduled = scheduleConnectionAttempt(after: delay)
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(.init(from: connecting, scheduled: scheduled, reason: nil))
      }

    case let .active(active):
      // The channel is `active` but not `ready`. Should we try again?
      switch active.reconnect {
      case .none:
        // No, shutdown instead.
        self.logger.debug("shutting down connection")
        let error = enterShutdownReconnectDisabled()
        active.readyChannelMuxPromise.fail(error)

      case let .after(delay):
        // Yes, after some time.
        let scheduled = scheduleConnectionAttempt(after: delay)
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
      }

    case let .ready(ready):
      // The channel was ready and working fine but something went wrong. Should we try to replace
      // the channel?
      if let backoff = self.connectionBackoff {
        // Yes, start connecting now. We should go via `transientFailure`, however.
        let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])

        let backoffIterator = backoff.makeIterator()
        self.state = .transientFailure(
          TransientFailureState(
            from: ready,
            scheduled: scheduled,
            backoffIterator: backoffIterator
          )
        )
      } else {
        // No, no backoff is configured.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let reason = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and a reconnect was not configured"
        )
        self.state = .shutdown(
          ShutdownState(closeFuture: ready.channel.closeFuture, reason: reason)
        )
      }
    }
  }
  /// Marks the connection as ready, that is, the initial HTTP/2 SETTINGS frame has been seen on
  /// the channel. Must be called on the `EventLoop`.
  ///
  /// Only the `active` state transitions to `ready`; the multiplexer is then handed to whoever
  /// is waiting on the ready-channel promise.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("connection ready", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .active(active):
      // Record the new state first, then release the multiplexer to anyone waiting for it.
      self.state = .ready(ReadyState(from: active))
      active.readyChannelMuxPromise.succeed(active.multiplexer)

    case .shutdown:
      // Already shut down: there is nothing to update.
      break

    case .idle, .transientFailure, .connecting, .ready:
      // The SETTINGS frame was reported in a state where it is not expected: either there is no
      // connection or connection attempt (`idle`, `transientFailure`), no channel exists yet to
      // receive it on (`connecting`), or it has already been received (`ready`). There is nothing
      // to close and nowhere to fire an error to, so this only traps in debug builds.
      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
    }
  }
  /// Idles the connection because no RPCs are active on the 'ready' channel. Must be called on
  /// the `EventLoop`.
  ///
  /// The `ready` and `active` states move to `idle`, carrying over their last error. Other
  /// states are left untouched.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("idling connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .ready(ready):
      self.state = .idle(lastError: ready.error)

    case let .active(active):
      // Reachable if the keepalive timer fires before the connection becomes ready. Waiters on
      // the ready-channel promise are still pending in this state, so they must be failed.
      self.state = .idle(lastError: active.error)
      let status = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      active.readyChannelMuxPromise.fail(status)

    case .shutdown, .idle, .transientFailure:
      // `shutdown` is expected when the user closes the connection: when the channel becomes
      // inactive with no outstanding RPCs, 'idle()' is called instead of 'channelInactive()'.
      // In `idle` and `transientFailure` there is no connection to idle. Ignore all of these.
      break

    case .connecting:
      // The idle watchdog is only started once the connection is active, so this should not
      // happen while connecting. Trap in debug builds; ignore in release builds.
      assertionFailure("tried to idle a connection in the \(self.state.label) state")
    }
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream was opened on this connection.
  /// Asserts that it is called on the `EventLoop`.
  internal func streamOpened() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.streamOpened(self)
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream was closed on this connection.
  /// Asserts that it is called on the `EventLoop`.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.streamClosed(self)
  }
  /// Forwards a new value of the maximum number of concurrent streams to the HTTP/2 delegate, if
  /// one is set. Asserts that it is called on the `EventLoop`.
  ///
  /// - Parameter maxConcurrentStreams: The value passed on to the delegate.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.receivedSettingsMaxConcurrentStreams(self, maxConcurrentStreams: maxConcurrentStreams)
  }
  /// Notifies the connectivity delegate, if one is set, that the connection has started
  /// quiescing. Asserts that it is called on the `EventLoop`.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.connectivityDelegate else {
      return
    }
    delegate.connectionIsQuiescing(self)
  }
  765. }
  extension ConnectionManager {
    /// Handles the failure of a connection attempt, i.e. one for which a connection was never
    /// established. Must be called on the `EventLoop`.
    ///
    /// - Parameter error: The error the connection attempt failed with.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      // Only the `connecting` state holds an attempt which can fail; deal with the rest up front.
      let attempt: ConnectingState
      switch self.state {
      case let .connecting(connecting):
        attempt = connecting

      case .shutdown:
        // The application must have shut down while an attempt which was doomed to fail anyway
        // was still in progress. Nothing to do.
        return

      case .idle, .active, .ready, .transientFailure:
        // A connection attempt failed but none is in progress. Trap in debug builds; there is
        // nothing to do other than ignore it in release builds.
        assertionFailure("connect promise failed in \(self.state.label) state")
        return
      }

      // Decide whether another attempt should be made.
      switch attempt.reconnect {
      case .none:
        // No further attempts: shut down and fail everyone who is waiting.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let alreadyClosed = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: alreadyClosed, reason: error))
        attempt.readyChannelMuxPromise.fail(error)
        attempt.candidateMuxPromise.fail(error)

      case let .after(delay):
        // Try again after `delay`, waiting in `transientFailure` until then.
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
        let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        let failure = TransientFailureState(from: attempt, scheduled: retry, reason: error)
        self.state = .transientFailure(failure)
        // Users of the candidate mux are not willing to wait for the next attempt.
        attempt.candidateMuxPromise.fail(error)
      }
    }
  }
  extension ConnectionManager {
    /// Begins establishing a connection. This is only possible from the `idle` and
    /// `transientFailure` states. Must be called on the `EventLoop`.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .idle:
        // First attempt: start with a fresh backoff iterator (if backoff is configured) and a
        // new promise for the ready channel's multiplexer.
        self.startConnecting(
          backoffIterator: self.connectionBackoff?.makeIterator(),
          muxPromise: self.eventLoop.makePromise()
        )

      case let .transientFailure(failure):
        // Retry: carry on with the iterator and the promise of the failed attempt.
        self.startConnecting(
          backoffIterator: failure.backoffIterator,
          muxPromise: failure.readyChannelMuxPromise
        )

      case .shutdown:
        // Shut down before a scheduled connection attempt had started.
        break

      case .connecting, .active, .ready:
        // 'startConnecting()' is only called when no connection exists and after checking the
        // current state, so none of these states should be reachable.
        self.unreachableState()
      }
    }

    /// Submits a connection attempt and moves to the `connecting` state.
    ///
    /// - Parameters:
    ///   - backoffIterator: Source of the connect timeout for this attempt and the backoff before
    ///     the next one, or `nil`.
    ///   - muxPromise: Promise stored in the new state as its `readyChannelMuxPromise`.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      let timeoutAndBackoff = backoffIterator?.next()

      // Should another attempt be made if the candidate channel fails? Only if the iterator
      // produced a value.
      let reconnect: Reconnect
      if let timeoutAndBackoff = timeoutAndBackoff {
        reconnect = .after(timeoutAndBackoff.backoff)
      } else {
        reconnect = .none
      }

      // We're already on the event loop: submit the connect so that it starts after the state
      // has been changed to `.connecting`.
      self.eventLoop.assertInEventLoop()
      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let channelFuture: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: timeoutAndBackoff.map { .seconds(timeInterval: $0.timeout) },
          logger: self.logger
        )
        channelFuture.whenFailure { error in
          self.connectionFailed(withError: error)
        }
        return channelFuture
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view of the connection manager. Each operation on the view requires the
    /// caller to be executing on the same `EventLoop` as the connection manager.
    internal var sync: Sync {
      Sync(self)
    }

    internal struct Sync {
      private let manager: ConnectionManager

      fileprivate init(_ connectionManager: ConnectionManager) {
        self.manager = connectionManager
      }

      /// The delegate for connectivity changes. Asserts that it is accessed on the `EventLoop`.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.connectivityDelegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.connectivityDelegate = delegate
        }
      }

      /// The delegate for HTTP/2 connection changes. Asserts that it is accessed on the
      /// `EventLoop`.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.http2Delegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.http2Delegate = delegate
        }
      }

      /// Returns `true` if the connection is in the idle state.
      internal var isIdle: Bool {
        self.manager.isIdle
      }

      /// Returns `true` if the connection is in the shutdown state.
      internal var isShutdown: Bool {
        self.manager.isShutdown
      }

      /// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is in
      /// any other state.
      internal var multiplexer: HTTP2StreamMultiplexer? {
        self.manager.multiplexer
      }

      /// Starts establishing a connection. Must only be called when `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Traps with a message naming the current state and the function in which that state was
    /// found to be invalid.
    ///
    /// - Parameters:
    ///   - function: Function reported in the message; defaults to the caller.
    ///   - file: File reported with the trap; defaults to the caller's file ID.
    ///   - line: Line reported with the trap; defaults to the caller's line.
    private func unreachableState(
      function: StaticString = #function,
      file: StaticString = #fileID,
      line: UInt = #line
    ) -> Never {
      let message = "Invalid state \(self.state) for \(function)"
      fatalError(message, file: file, line: line)
    }
  }