ConnectionManager.swift 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  21. // Unchecked because mutable state is always accessed and modified on a particular event loop.
  22. // APIs which _may_ be called from different threads execute onto the correct event loop first.
  23. // APIs which _must_ be called from an exact event loop have preconditions checking that the correct
  24. // event loop is being used.
  25. @usableFromInline
  26. internal final class ConnectionManager: @unchecked Sendable {
  /// Whether, and after how long, a dropped connection is re-established.
  internal enum Reconnect {
    /// Do not reconnect; the manager shuts down instead.
    case none

    /// Reconnect once the associated interval (in seconds) has elapsed.
    case after(TimeInterval)
  }
  /// Bookkeeping for a connection attempt that is still in flight.
  ///
  /// Field order is significant: this type relies on the synthesized memberwise initializer.
  internal struct ConnectingState {
    /// Iterator over backoff delays carried across attempts, if any.
    var backoffIterator: ConnectionBackoffIterator?
    /// What to do if the candidate channel goes inactive.
    var reconnect: Reconnect
    /// The channel being established.
    var candidate: EventLoopFuture<Channel>
    /// Vended to patient (`waitsForConnectivity`) callers; survives retries.
    var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
    /// Vended to fast-failing callers; succeeded as soon as the candidate becomes active.
    var candidateMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
  }
  /// A channel that is active but has not yet seen the initial HTTP/2 SETTINGS frame.
  internal struct ConnectedState {
    var backoffIterator: ConnectionBackoffIterator?
    var reconnect: Reconnect
    var candidate: Channel
    var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
    var multiplexer: NIOHTTP2Handler.StreamMultiplexer
    /// The most recent error reported by the channel while in this state, if any.
    var error: Error?

    /// Promotes a connection attempt whose candidate channel has just become active.
    init(
      from state: ConnectingState,
      candidate: Channel,
      multiplexer: NIOHTTP2Handler.StreamMultiplexer
    ) {
      // Carried over unchanged from the connection attempt.
      self.backoffIterator = state.backoffIterator
      self.reconnect = state.reconnect
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      // Supplied by the now-active channel.
      self.candidate = candidate
      self.multiplexer = multiplexer
      // Nothing has gone wrong on the new channel yet.
      self.error = nil
    }
  }
  /// A channel that has seen the initial SETTINGS frame and can carry RPCs.
  internal struct ReadyState {
    var channel: Channel
    var multiplexer: NIOHTTP2Handler.StreamMultiplexer
    /// The most recent error reported by the channel while ready, if any.
    var error: Error?

    /// Promotes an active connection to ready. Only the channel and multiplexer carry over; an
    /// error recorded while `active` is not propagated.
    init(from state: ConnectedState) {
      self.multiplexer = state.multiplexer
      self.channel = state.candidate
    }
  }
  /// Waiting for a scheduled reconnection attempt after the connection failed or dropped.
  internal struct TransientFailureState {
    var backoffIterator: ConnectionBackoffIterator?
    var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
    var scheduled: Scheduled<Void>
    var reason: Error

    /// The reason recorded when no more specific error is available.
    private static func unexpectedDropReason() -> Error {
      return GRPCStatus(
        code: .unavailable,
        message: "Unexpected connection drop"
      )
    }

    /// Entered from `connecting`; the pending ready-mux promise is kept for the next attempt.
    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.scheduled = scheduled
      self.reason = reason ?? TransientFailureState.unexpectedDropReason()
    }

    /// Entered from `active`; any error recorded on the channel becomes the reason.
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.scheduled = scheduled
      self.reason = state.error ?? TransientFailureState.unexpectedDropReason()
    }

    /// Entered from `ready`; a fresh ready-mux promise is created on the channel's event loop.
    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.backoffIterator = backoffIterator
      self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
      self.scheduled = scheduled
      self.reason = state.error ?? TransientFailureState.unexpectedDropReason()
    }
  }
  /// Data for the terminal `shutdown` state.
  internal struct ShutdownState {
    /// Completes when shutdown has finished.
    var closeFuture: EventLoopFuture<Void>

    /// Why the manager is shut down. Any request for a multiplexer in this state fails with this
    /// error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// The state entered when the user explicitly asks for a shutdown.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let reason = GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
      return .init(closeFuture: closeFuture, reason: reason)
    }
  }
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states: `connecting`, `shutdown`.
    case idle(lastError: Error?)

    /// A connection attempt is in progress.
    ///
    /// Valid next states: `active`; `transientFailure` (the attempt failed and will be retried);
    /// `shutdown`.
    case connecting(ConnectingState)

    /// A `Channel` exists but may not be usable yet (e.g. the TLS handshake may still fail). The
    /// signal that it is 'ready' is the initial HTTP/2 SETTINGS frame.
    ///
    /// Valid next states: `ready`; `transientFailure` (the handshake failed or another error
    /// occurred and the connection can be re-established); `shutdown`.
    case active(ConnectedState)

    /// The `Channel` has seen the initial HTTP/2 SETTINGS frame and can be used for RPCs.
    ///
    /// Valid next states: `idle` (no RPCs are being served, so the connection can be dropped for
    /// now); `transientFailure` (an error occurred and the connection will be re-established);
    /// `shutdown`.
    case ready(ReadyState)

    /// A `Channel` is wanted; a connection attempt will be made in the future.
    ///
    /// Valid next states: `connecting`, `shutdown`.
    case transientFailure(TransientFailureState)

    /// No further `Channel` will ever be created. This state is terminal.
    case shutdown(ShutdownState)

    /// Short name of the current case, used in log metadata.
    fileprivate var label: String {
      let name: String
      switch self {
      case .idle: name = "idle"
      case .connecting: name = "connecting"
      case .active: name = "active"
      case .ready: name = "ready"
      case .transientFailure: name = "transientFailure"
      case .shutdown: name = "shutdown"
      }
      return name
    }
  }
  /// The connectivity state most recently exposed outside the manager; a coarser view of `state`.
  private var externalState: _ConnectivityState = .idle(nil)

  /// Moves the external state to `nextState`, notifying the connectivity delegate only when the
  /// state actually changes.
  private func updateExternalState(to nextState: _ConnectivityState) {
    guard !self.externalState.isSameState(as: nextState) else {
      return
    }
    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(self, from: previousState, to: nextState)
  }
  /// The internal state machine.
  ///
  /// After each assignment the external state is updated to match, except for the internal-only
  /// `active` state. Entering `idle` or `transientFailure` also bumps the channel number in the
  /// connection ID.
  private var state: State {
    didSet {
      switch self.state {
      case let .idle(lastError):
        self.updateExternalState(to: .idle(lastError))
        self.updateConnectionID()

      case .connecting:
        self.updateExternalState(to: .connecting)

      case .active:
        // Internal only: from the outside the connection still looks like it is connecting.
        break

      case .ready:
        self.updateExternalState(to: .ready)

      case let .transientFailure(failure):
        self.updateExternalState(to: .transientFailure(failure.reason))
        self.updateConnectionID()

      case .shutdown:
        self.updateExternalState(to: .shutdown)
      }
    }
  }
  /// `true` when the manager is `idle`. Must be read on the event loop.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()
    if case .idle = self.state {
      return true
    }
    return false
  }
  /// `true` when the manager has reached the terminal `shutdown` state. Must be read on the event
  /// loop.
  private var isShutdown: Bool {
    self.eventLoop.assertInEventLoop()
    if case .shutdown = self.state {
      return true
    }
    return false
  }
  /// The stream multiplexer of the `ready` connection, or `nil` in every other state. Must be read
  /// on the event loop.
  private var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(ready) = self.state else {
      return nil
    }
    return ready.multiplexer
  }
  /// The event loop the managed connection runs on; mutable state is confined to it.
  internal let eventLoop: EventLoop

  /// Told about connectivity state changes. Executed on `eventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// Told about HTTP/2 connection changes. Executed on `eventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// Source of the `EventLoopFuture<Channel>` used for connection attempts.
  private let channelProvider: ConnectionManagerChannelProvider

  /// How patient callers are when asking for a multiplexer: wait for connectivity or fail fast.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// Backoff between connection attempts; `nil` means a dropped `ready` connection is not
  /// re-established.
  private let connectionBackoff: ConnectionBackoff?

  /// Logger whose metadata carries this manager's connection ID.
  internal var logger: Logger
  /// A random identifier (a UUID string) for this manager, included in log metadata.
  private let connectionID: String
  /// Bumped each time the manager enters `idle` or `transientFailure`. Guarded by
  /// `channelNumberLock` after initialization.
  private var channelNumber: UInt64
  /// Guards `channelNumber`. The lock itself is never reassigned and `NIOLock`'s locking API is
  /// non-mutating, so it is a constant.
  private let channelNumberLock = NIOLock()
  /// `"<connectionID>/<channelNumber>"`. Callers must already hold `channelNumberLock`.
  private var _connectionIDAndNumber: String {
    "\(self.connectionID)/\(self.channelNumber)"
  }
  /// `"<connectionID>/<channelNumber>"`, read while holding `channelNumberLock`.
  private var connectionIDAndNumber: String {
    self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }
  /// Advances the channel number (wrapping on overflow) and refreshes the connection ID in the
  /// logger's metadata, all under `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLock {
      self.channelNumber = self.channelNumber &+ 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
    }
  }
  /// Stamps `logger` with this manager's current connection ID.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
  }
  /// Creates a manager from a client connection configuration.
  ///
  /// The manager runs on the next event loop of the configured group and has no HTTP/2 delegate.
  ///
  /// - Parameters:
  ///   - configuration: Supplies the event loop group, call start behavior and backoff.
  ///   - channelProvider: Overrides channel creation; defaults to a `DefaultChannelProvider` built
  ///     from `configuration`.
  ///   - connectivityDelegate: Told about connectivity state changes.
  ///   - logger: Base logger; the connection ID is added to its metadata.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    let loop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: loop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates an `idle` manager.
  ///
  /// - Parameters:
  ///   - eventLoop: The loop the connection runs on and all state is mutated on.
  ///   - channelProvider: Source of channels for connection attempts.
  ///   - callStartBehavior: Whether callers wait for connectivity or fail fast.
  ///   - connectionBackoff: Backoff between attempts, or `nil` for no reconnection.
  ///   - connectivityDelegate: Told about connectivity state changes.
  ///   - http2Delegate: Told about HTTP/2 connection changes.
  ///   - logger: Base logger; the connection ID is added to its metadata.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    // A random ID per manager plus a channel number that starts at zero.
    let connectionID = UUID().uuidString
    let initialChannelNumber: UInt64 = 0

    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(initialChannelNumber)"

    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate
    self.connectionID = connectionID
    self.channelNumber = initialChannelNumber
    self.logger = taggedLogger
    // Property observers do not run for assignments in the type's own initializer.
    self.state = .idle(lastError: nil)
  }
  /// Returns a future HTTP/2 stream multiplexer for starting a call.
  ///
  /// With `fastFailure` the caller gets a single connection attempt. With `waitsForConnectivity`
  /// the future outlives reconnection attempts managed here. Safe to call from any thread; the
  /// work hops onto `eventLoop` when needed.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
    // Picks the vending strategy for the configured call start behavior.
    func vendMultiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
      switch self.callStartBehavior {
      case .fastFailure:
        return self.getHTTP2MultiplexerOptimistic()
      case .waitsForConnectivity:
        return self.getHTTP2MultiplexerPatient()
      }
    }

    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        vendMultiplexer()
      }
    }
    return vendMultiplexer()
  }
  /// Returns a future multiplexer for a caller willing to wait for connectivity.
  ///
  /// - In `ready`, the current multiplexer is returned immediately.
  /// - In `shutdown`, a future failed with the shutdown reason is returned.
  /// - Otherwise the caller gets the pending `readyChannelMuxPromise`. That promise survives
  ///   reconnection attempts, so reconnects are handled here transparently.
  /// - An `idle` manager starts connecting first.
  ///
  /// Must be called on `eventLoop`.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
    // `state` may only be touched on the event loop; check it like the other state accessors do.
    self.eventLoop.assertInEventLoop()

    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer>
    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must transition to the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      multiplexer = connecting.readyChannelMuxPromise.futureResult
    case let .connecting(state):
      multiplexer = state.readyChannelMuxPromise.futureResult
    case let .active(state):
      multiplexer = state.readyChannelMuxPromise.futureResult
    case let .ready(state):
      multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)
    case let .transientFailure(state):
      multiplexer = state.readyChannelMuxPromise.futureResult
    case let .shutdown(state):
      multiplexer = self.eventLoop.makeFailedFuture(state.reason)
    }

    self.logger.debug("vending multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])
    return multiplexer
  }
  /// Returns a future multiplexer for a fast-failing caller.
  ///
  /// - In `active` or `ready`, the current multiplexer is returned immediately.
  /// - In `connecting`, the caller gets the current attempt's candidate multiplexer.
  /// - In `idle`, a new attempt is started and its candidate multiplexer is returned.
  /// - In `transientFailure` or `shutdown`, the future fails with the recorded reason.
  private func getHTTP2MultiplexerOptimistic()
    -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
    // `getHTTP2Multiplexer` already hops onto the event loop; double-check anyway.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer>
    switch self.state {
    case .idle:
      self.startConnecting()
      // Starting a connection must have moved us into `connecting`.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      muxFuture = connecting.candidateMuxPromise.futureResult
    case let .connecting(connecting):
      muxFuture = connecting.candidateMuxPromise.futureResult
    case let .active(active):
      muxFuture = self.eventLoop.makeSucceededFuture(active.multiplexer)
    case let .ready(ready):
      muxFuture = self.eventLoop.makeSucceededFuture(ready.multiplexer)
    case let .transientFailure(failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)
    case let .shutdown(shutdownState):
      muxFuture = self.eventLoop.makeFailedFuture(shutdownState.reason)
    }

    self.logger.debug("vending fast-failing multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])
    return muxFuture
  }
  /// How a shutdown treats a connection that may be in use.
  @usableFromInline
  internal enum ShutdownMode {
    /// Close the underlying channel right away, without waiting for existing RPCs.
    case forceful

    /// Let running RPCs finish while no new streams may be created. The channel is force-closed
    /// if it is still open at the given deadline.
    case graceful(NIODeadline)
  }
  /// Shuts down the underlying connection and returns a future that completes once shutdown has
  /// finished.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let promise: EventLoopPromise<Void> = self.eventLoop.makePromise()
    self.shutdown(mode: mode, promise: promise)
    return promise.futureResult
  }
  /// Shuts down the underlying connection, completing `promise` once shutdown has finished.
  /// Callable from any thread; the work itself runs on `eventLoop`.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }
    self._shutdown(mode: mode, promise: promise)
  }
  /// Performs a shutdown on the event loop (see `shutdown(mode:promise:)`).
  ///
  /// Unless already shut down, the state is switched to `shutdown` before any promise is failed or
  /// completed. `promise` is completed once the underlying channel, if there is one, has closed.
  /// `mode` only matters in the `ready` state; no other state has streams to drain.
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug("shutting down connection", metadata: [
      "connectivity_state": "\(self.state.label)",
      "shutdown.mode": "\(mode)",
    ])

    switch self.state {
    // We don't have a channel and we don't want one, easy!
    case .idle:
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      promise.succeed(())

    // We're mid-connection: the application doesn't have any 'ready' channels, so we complete
    // the shutdown future once the connection attempt resolves and deal with any fallout from the
    // connecting channel without the application knowing.
    case let .connecting(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
      // connect the application shouldn't have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      state.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // Complete the shutdown promise when the connection attempt has completed.
      state.candidate.whenComplete {
        switch $0 {
        case let .success(channel):
          // In case we do successfully connect, close on the next loop tick. When connecting a
          // channel NIO will complete the promise for the channel before firing channel active.
          // That means we may close and fire inactive before active which HTTP/2 will be unhappy
          // about.
          self.eventLoop.execute {
            channel.close(mode: .all, promise: nil)
            promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())
          }

        case .failure:
          // We failed to connect; that's fine, the shutdown still succeeds.
          promise.succeed(())
        }
      }

    // We have an active channel but the application doesn't know about it yet. We'll do the same
    // as for `.connecting`.
    case let .active(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
      // connect the application shouldn't have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // We have a channel, close it. We only create streams in the ready state so there's no need
      // to quiesce here.
      state.candidate.close(mode: .all, promise: nil)
      promise.completeWith(state.candidate.closeFuture.recoveringFromUncleanShutdown())

    // The channel is up and running: the application could be using it. We close it (forcefully,
    // or by quiescing with a deadline) and complete the promise from its `closeFuture`.
    case let .ready(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      switch mode {
      case .forceful:
        // We have a channel, close it.
        state.channel.close(mode: .all, promise: nil)

      case let .graceful(deadline):
        // If we don't close by the deadline forcibly close the channel.
        let scheduledForceClose = state.channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          state.channel.close(mode: .all, promise: nil)
        }

        // Cancel the force close if we close normally first.
        state.channel.closeFuture.whenComplete { _ in
          scheduledForceClose.cancel()
        }

        // Tell the channel to quiesce. It will be picked up by the idle handler which will close
        // the channel when all streams have been closed.
        state.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }

      // Complete the promise when we eventually close.
      promise.completeWith(state.channel.closeFuture.recoveringFromUncleanShutdown())

    // Like `.connecting` and `.active`, the application does not have a `.ready` channel. We'll
    // do the same but also cancel any scheduled connection attempts and deal with any fallout
    // if we cancelled too late.
    case let .transientFailure(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Stop the creation of a new channel, if we can. If we can't then the task to
      // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
      state.scheduled.cancel()

      // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
      // connect the application shouldn't have access to the channel.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // No active channel, so complete the shutdown promise now.
      promise.succeed(())

    // We're already shut down; nothing new to do, so just follow the existing close future.
    case let .shutdown(state):
      promise.completeWith(state.closeFuture)
    }
  }
  /// Registers a callback that fires when the current connection closes.
  ///
  /// If a `ready` connection exists, `onClose(true)` is called when it closes. Otherwise
  /// `onClose(false)` is called right away. Callable from any thread; the registration runs on
  /// `eventLoop`.
  internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._onCurrentConnectionClose(onClose)
      }
      return
    }
    self._onCurrentConnectionClose(onClose)
  }
  /// Event-loop implementation of `onCurrentConnectionClose(_:)`.
  private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(ready) = self.state else {
      // No usable connection to watch: report that immediately.
      onClose(false)
      return
    }
    ready.channel.closeFuture.whenComplete { _ in
      onClose(true)
    }
  }
  540. // MARK: - State changes from the channel handler.
  /// The channel caught an error. Must be called on the `EventLoop`.
  ///
  /// While connecting, the error fails the attempt. In `active` or `ready`, it is recorded and
  /// later becomes the transient-failure reason if the channel goes inactive. In other states it
  /// is ignored (and logged when `idle`).
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case .connecting:
      self.connectionFailed(withError: error)

    case var .active(active):
      active.error = error
      self.state = .active(active)

    case var .ready(ready):
      ready.error = error
      self.state = .ready(ready)

    case .idle:
      // Surprising but harmless: a channel-fatal error will be followed by channelInactive, and
      // otherwise future I/O will either fail fast or work. Just log it.
      self.logger.warning("ignoring unexpected error in idle", metadata: [
        MetadataKey.error: "\(error)",
      ])

    case .transientFailure, .shutdown:
      // We're already failing or finished; additional errors aren't helpful.
      break
    }
  }
  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// While connecting, this moves the manager to `active` and succeeds the fast-failing callers'
  /// candidate promise. After a shutdown, the channel is closed. In any other state, it is ignored.
  internal func channelActive(channel: Channel, multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("activating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .connecting(connecting):
      self.state = .active(
        ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
      )
      // Optimistic (fast-failing) callers are happy with this level of setup.
      connecting.candidateMuxPromise.succeed(multiplexer)

    case .shutdown:
      // The application shut down before the channel became active; close it.
      channel.close(mode: .all, promise: nil)

    case .idle, .transientFailure, .active, .ready:
      // idle/transientFailure: channelActive and channelInactive may have been reordered.
      // active/ready: a repeated 'channelActive'. Either way, ignore it.
      break
    }
  }
  /// The channel has become inactive: decide whether to reconnect or shut down. Must be called on
  /// the `EventLoop`.
  ///
  /// This normally follows an established (`active` or `ready`) channel. It is also tolerated while
  /// `connecting`, if `channelInactive` is seen before `channelActive`, and ignored in `idle`,
  /// `transientFailure` and `shutdown`.
  ///
  /// Wherever a promise is failed, the state transition happens first. Callbacks on that promise
  /// may run synchronously and re-enter the manager (e.g. call `shutdown`), so they must observe
  /// the new state, and this method must not overwrite whatever they change afterwards.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("deactivating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    // We can hit inactive in connecting if we see channelInactive before channelActive; that's not
    // common but we should tolerate it.
    case let .connecting(connecting):
      // Should we try connecting again?
      switch connecting.reconnect {
      // No, shutdown instead.
      case .none:
        self.logger.debug("shutting down connection")

        let error = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        let shutdownState = ShutdownState(
          closeFuture: self.eventLoop.makeSucceededFuture(()),
          reason: error
        )
        self.state = .shutdown(shutdownState)

        // Shutting down, so fail the outstanding promises.
        connecting.readyChannelMuxPromise.fail(error)
        connecting.candidateMuxPromise.fail(error)

      // Yes, after some time.
      case let .after(delay):
        let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")

        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(.init(from: connecting, scheduled: scheduled, reason: nil))

        // Fail the candidate mux promise only after the transition, so its callbacks see
        // `transientFailure` rather than a stale `connecting` state. Keep the
        // 'readyChannelMuxPromise' as we'll try again.
        connecting.candidateMuxPromise.fail(error)
      }

    // The channel is `active` but not `ready`. Should we try again?
    case let .active(active):
      switch active.reconnect {
      // No, shutdown instead.
      case .none:
        self.logger.debug("shutting down connection")

        let error = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        let shutdownState = ShutdownState(
          closeFuture: self.eventLoop.makeSucceededFuture(()),
          reason: error
        )
        self.state = .shutdown(shutdownState)
        active.readyChannelMuxPromise.fail(error)

      // Yes, after some time.
      case let .after(delay):
        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
      }

    // The channel was ready and working fine but something went wrong. Should we try to replace
    // the channel?
    case let .ready(ready):
      // No, no backoff is configured.
      if self.connectionBackoff == nil {
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        self.state = .shutdown(
          ShutdownState(
            closeFuture: ready.channel.closeFuture,
            reason: GRPCStatus(
              code: .unavailable,
              message: "The connection was dropped and a reconnect was not configured"
            )
          )
        )
      } else {
        // Yes, start connecting now. We should go via `transientFailure`, however.
        let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
        // Begin a fresh backoff sequence for the new round of attempts.
        let backoffIterator = self.connectionBackoff?.makeIterator()
        self.state = .transientFailure(TransientFailureState(
          from: ready,
          scheduled: scheduled,
          backoffIterator: backoffIterator
        ))
      }

    // This is fine: we expect the channel to become inactive after becoming idle.
    case .idle:
      ()

    // We're already shutdown, that's fine.
    case .shutdown:
      ()

    // Received 'channelInactive' twice; fine, ignore.
    case .transientFailure:
      ()
    }
  }
  /// Marks the connection as ready: the channel has seen the initial HTTP/2 SETTINGS frame.
  ///
  /// Only a connection in the `active` state can become `ready`. That transition completes the
  /// pending ready-channel multiplexer promise with the active state's multiplexer. The signal is
  /// silently ignored once the connection is shut down. In every other state it is unexpected:
  /// it trips an assertion in debug builds and is ignored in release builds.
  ///
  /// Must be called on the `EventLoop`.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("connection ready", metadata: ["connectivity_state": "\(self.state.label)"])

    switch self.state {
    case let .active(activeState):
      // The state is switched before the promise is completed (presumably so that anything run
      // by completing the promise already observes the `ready` state).
      self.state = .ready(ReadyState(from: activeState))
      activeState.readyChannelMuxPromise.succeed(activeState.multiplexer)

    case .shutdown:
      // Already shut down; nothing to do.
      break

    case .idle, .transientFailure, .connecting, .ready:
      // - idle / transientFailure: no connection or connection attempt exists to become ready.
      // - connecting: no channel exists yet on which the SETTINGS frame could have arrived.
      // - ready: the initial SETTINGS frame has already been received.
      // There is nothing to close and nowhere to report an error, so release builds ignore it.
      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
    }
  }
  /// Idles the connection because no RPCs are active on it.
  ///
  /// From `ready` or `active` the connection moves to `idle`, carrying over that state's last
  /// error. When it was still `active`, the promise waiting for a ready multiplexer is also failed
  /// with an `unavailable` status. The `shutdown`, `idle` and `transientFailure` states are left
  /// untouched. `connecting` is unexpected: it asserts in debug builds and is ignored in release.
  ///
  /// Must be called on the `EventLoop`.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("idling connection", metadata: ["connectivity_state": "\(self.state.label)"])

    switch self.state {
    case let .ready(readyState):
      self.state = .idle(lastError: readyState.error)

    case let .active(activeState):
      // Reachable when the keepalive timer fires before the connection becomes ready.
      self.state = .idle(lastError: activeState.error)
      let status = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      activeState.readyChannelMuxPromise.fail(status)

    case .shutdown:
      // Expected when the user closes the connection: if the channel goes inactive with no
      // outstanding RPCs, `idle()` is called rather than `channelInactive()`.
      break

    case .idle, .transientFailure:
      // No connection exists, so there is nothing to idle.
      break

    case .connecting:
      // The idle watchdog is only started once the connection is active, so this should not
      // happen; release builds ignore it.
      assertionFailure("tried to idle a connection in the \(self.state.label) state")
    }
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream was opened on this connection.
  /// Expected to be called on the `EventLoop` (checked via `assertInEventLoop`).
  internal func streamOpened() {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.http2Delegate {
      delegate.streamOpened(self)
    }
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream on this connection was closed.
  /// Expected to be called on the `EventLoop` (checked via `assertInEventLoop`).
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.http2Delegate {
      delegate.streamClosed(self)
    }
  }
  /// Forwards an updated max-concurrent-streams value to the HTTP/2 delegate, if one is set.
  ///
  /// - Parameter maxConcurrentStreams: The new maximum number of concurrent streams.
  ///
  /// Expected to be called on the `EventLoop` (checked via `assertInEventLoop`).
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else { return }
    delegate.receivedSettingsMaxConcurrentStreams(self, maxConcurrentStreams: maxConcurrentStreams)
  }
  /// Notifies the connectivity delegate, if one is set, that this connection has begun quiescing.
  /// Expected to be called on the `EventLoop` (checked via `assertInEventLoop`).
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.connectivityDelegate else { return }
    delegate.connectionIsQuiescing(self)
  }
  767. }
  extension ConnectionManager {
    /// Handles the failure of a connection attempt, i.e. a connection was never established.
    ///
    /// - Parameter error: The error the connection attempt failed with.
    ///
    /// Only a `connecting` state has an attempt for this failure to belong to. Its `reconnect`
    /// policy decides what happens next:
    /// - `.after(delay)`: another attempt is scheduled `delay` seconds from now and the manager
    ///   moves to `transientFailure` with `error` as the reason. Only the candidate multiplexer
    ///   promise is failed; the ready-channel promise is carried forward for the retry.
    /// - `.none`: the manager shuts down with `error` as the reason, and both the ready-channel
    ///   and candidate multiplexer promises are failed with `error`.
    ///
    /// A failure arriving after shutdown is ignored. In any other state it asserts in debug
    /// builds and is ignored in release builds. Must be called on the `EventLoop`.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      let connecting: ConnectingState
      switch self.state {
      case let .connecting(attempt):
        connecting = attempt
      case .shutdown:
        // The application shut down while a connection attempt, which was doomed to fail
        // anyway, was still in flight. Nothing to do.
        return
      case .idle, .active, .ready, .transientFailure:
        // There is no attempt in progress for this failure to belong to; release builds ignore it.
        assertionFailure("connect promise failed in \(self.state.label) state")
        return
      }

      switch connecting.reconnect {
      case let .after(delay):
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.state = .transientFailure(
          TransientFailureState(from: connecting, scheduled: scheduled, reason: error)
        )
        // Users of the candidate multiplexer do not wait for the retry.
        connecting.candidateMuxPromise.fail(error)

      case .none:
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        self.state = .shutdown(
          ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: error)
        )
        connecting.readyChannelMuxPromise.fail(error)
        connecting.candidateMuxPromise.fail(error)
      }
    }
  }
  extension ConnectionManager {
    /// Begins establishing a connection.
    ///
    /// Valid only from `idle`, which starts a fresh backoff sequence and a new multiplexer
    /// promise, or from `transientFailure`, which continues the pending backoff sequence and keeps
    /// the already-handed-out multiplexer promise. A scheduled attempt that fires after shutdown is
    /// a no-op. Any other state means a connection already exists, which is a programmer error.
    ///
    /// Must be called on the `EventLoop`.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .idle:
        self.startConnecting(
          backoffIterator: self.connectionBackoff?.makeIterator(),
          muxPromise: self.eventLoop.makePromise()
        )

      case let .transientFailure(pending):
        self.startConnecting(
          backoffIterator: pending.backoffIterator,
          muxPromise: pending.readyChannelMuxPromise
        )

      case .shutdown:
        // Shut down before the scheduled connection attempt started; nothing to do.
        break

      case .connecting, .active, .ready:
        // `startConnecting()` is only invoked when no connection exists, after checking the
        // current state, so none of these states should be reachable.
        self.unreachableState()
      }
    }

    /// Creates a candidate channel and moves the manager to the `connecting` state.
    ///
    /// - Parameters:
    ///   - backoffIterator: Supplies the connect timeout and reconnect backoff for this attempt.
    ///     If it is `nil` or yields no further value, no connect timeout is passed to the channel
    ///     provider and the attempt's reconnect policy is `.none`.
    ///   - muxPromise: The promise to complete with the multiplexer once a channel becomes ready.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
    ) {
      self.eventLoop.assertInEventLoop()
      let timeoutAndBackoff = backoffIterator?.next()

      // Submitted rather than run inline: the connect must only start after the state below has
      // been switched to `.connecting`.
      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let channelFuture: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: timeoutAndBackoff.map { .seconds(timeInterval: $0.timeout) },
          logger: self.logger
        )
        channelFuture.whenFailure { error in
          self.connectionFailed(withError: error)
        }
        return channelFuture
      }

      // Retry after the backoff only if the iterator produced one for this attempt.
      let reconnect: Reconnect
      if let backoff = timeoutAndBackoff?.backoff {
        reconnect = .after(backoff)
      } else {
        reconnect = .none
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view of the connection manager. Every operation on the view must be performed
    /// on the connection manager's `EventLoop`.
    internal var sync: Sync {
      Sync(self)
    }

    internal struct Sync {
      /// The connection manager this view wraps.
      private let manager: ConnectionManager

      fileprivate init(_ manager: ConnectionManager) {
        self.manager = manager
      }

      /// The delegate notified about connectivity changes. Access is asserted to be on the
      /// manager's `EventLoop`.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.connectivityDelegate
        }
        nonmutating set {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.connectivityDelegate = newValue
        }
      }

      /// The delegate notified about HTTP/2 connection changes. Access is asserted to be on the
      /// manager's `EventLoop`.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.http2Delegate
        }
        nonmutating set {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.http2Delegate = newValue
        }
      }

      /// Whether the connection is in the `idle` state.
      internal var isIdle: Bool {
        self.manager.isIdle
      }

      /// Whether the connection is in the `shutdown` state.
      internal var isShutdown: Bool {
        self.manager.isShutdown
      }

      /// The multiplexer of a `ready` connection, or `nil` in any other state.
      internal var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
        self.manager.multiplexer
      }

      /// Starts establishing a connection. Only call this while `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Traps with a message naming the current state and the calling function.
    ///
    /// Intended for states that should be impossible to reach. The caller's function, file and
    /// line are captured through the default arguments and passed to `fatalError`.
    private func unreachableState(
      function: StaticString = #function,
      file: StaticString = #fileID,
      line: UInt = #line
    ) -> Never {
      let message = "Invalid state \(self.state) for \(function)"
      fatalError(message, file: file, line: line)
    }
  }