ConnectionManager.swift 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  21. // Unchecked because mutable state is always accessed and modified on a particular event loop.
  22. // APIs which _may_ be called from different threads execute onto the correct event loop first.
  23. // APIs which _must_ be called from an exact event loop have preconditions checking that the correct
  24. // event loop is being used.
  25. @usableFromInline
  26. internal final class ConnectionManager: @unchecked Sendable {
  /// Controls whether a connection with no active streams may be torn down.
  ///
  /// - `closeWhenIdleTimeout`: the connection may go idle and is closed when the idle timer fires.
  /// - `neverGoIdle`: the connection stays open indefinitely, even with no active streams.
  internal enum IdleBehavior {
    case closeWhenIdleTimeout, neverGoIdle
  }
  /// Whether, and after how long, another connection attempt should be made.
  internal enum Reconnect {
    /// Never attempt to reconnect.
    case none
    /// Attempt to reconnect once the given interval (in seconds) has elapsed.
    case after(TimeInterval)
  }
  /// Data held while a connection attempt is in flight (the `connecting` state).
  ///
  /// Property order is significant: it defines the synthesized memberwise initializer.
  internal struct ConnectingState {
    /// Iterator used to compute the delay before the next attempt, if any.
    var backoffIterator: ConnectionBackoffIterator?
    /// Whether to try again if the channel goes away while connecting.
    var reconnect: Reconnect
    /// The first error reported by the channel during this attempt, if any.
    var connectError: Error?
    /// The channel being established.
    var candidate: EventLoopFuture<Channel>
    /// Completed once the connection is ready; handed to patient (`waitsForConnectivity`) callers.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// Completed as soon as the channel is active; handed to fast-failing callers.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  }
  /// Data held in the internal `active` state: the channel is up but not yet `ready`.
  internal struct ConnectedState {
    var backoffIterator: ConnectionBackoffIterator?
    var reconnect: Reconnect
    var candidate: Channel
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    var multiplexer: HTTP2StreamMultiplexer
    /// An error caught on the channel while in this state, if any.
    var error: Error?

    /// Builds the `active` state from a `connecting` one.
    ///
    /// The backoff iterator, reconnect policy and ready promise are carried over. `error` starts
    /// out as `nil`; any `connectError` recorded while connecting is not copied.
    init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
      self.candidate = candidate
      self.multiplexer = multiplexer
      self.backoffIterator = state.backoffIterator
      self.reconnect = state.reconnect
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
    }
  }
  /// Data held in the `ready` state: the channel has seen the initial HTTP/2 SETTINGS frame.
  internal struct ReadyState {
    var channel: Channel
    var multiplexer: HTTP2StreamMultiplexer
    /// An error caught on the channel while ready, if any.
    var error: Error?

    /// Promotes an `active` connection to `ready`.
    ///
    /// Any error recorded while `active` is not carried over.
    init(from state: ConnectedState) {
      self.multiplexer = state.multiplexer
      self.channel = state.candidate
    }
  }
  /// Data held in the `transientFailure` state: a connection is wanted and another attempt has been
  /// scheduled.
  internal struct TransientFailureState {
    var backoffIterator: ConnectionBackoffIterator?
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// The scheduled reconnection attempt; cancelled if the manager is shut down.
    var scheduled: Scheduled<Void>
    /// Why the connection failed. Fast-failing callers are failed with it, and it is reported as
    /// part of the external `transientFailure` state.
    var reason: Error

    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.scheduled = scheduled
      self.reason = TransientFailureState.reasonOrDefault(reason)
    }

    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.scheduled = scheduled
      self.reason = TransientFailureState.reasonOrDefault(state.error)
    }

    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.backoffIterator = backoffIterator
      // `ReadyState` has no pending ready promise, so a fresh one is created for the next attempt.
      self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
      self.scheduled = scheduled
      self.reason = TransientFailureState.reasonOrDefault(state.error)
    }

    /// Returns `error` when there is one, otherwise a generic `.unavailable` status describing an
    /// unexpected connection drop.
    ///
    /// All initializers share this so the fallback reason is defined in exactly one place.
    private static func reasonOrDefault(_ error: Error?) -> Error {
      return error ?? GRPCStatus(code: .unavailable, message: "Unexpected connection drop")
    }
  }
  /// Data held in the terminal `shutdown` state.
  internal struct ShutdownState {
    /// Completes once shutting down has finished.
    var closeFuture: EventLoopFuture<Void>
    /// Why we are shut down. Requests for a `Channel` made in this state are failed with this error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// The state entered when the user explicitly asks for the connection to be shut down.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let status = GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
      return Self(closeFuture: closeFuture, reason: status)
    }
  }
  /// The manager's internal connectivity state machine.
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states: `connecting`, `shutdown`.
    case idle(lastError: Error?)

    /// A connection attempt is in progress.
    ///
    /// Valid next states: `active`; `transientFailure` (the attempt failed and we will try again);
    /// `shutdown`.
    case connecting(ConnectingState)

    /// A `Channel` exists but may not be usable yet (e.g. the TLS handshake could still fail).
    /// Receiving the initial HTTP/2 SETTINGS frame is the signal to move to `ready`.
    ///
    /// Valid next states: `ready`; `transientFailure` (the handshake failed or another error
    /// occurred and the connection can be re-established); `shutdown`.
    case active(ConnectedState)

    /// The `Channel` has seen the initial HTTP/2 SETTINGS frame and can be used for RPCs.
    ///
    /// Valid next states: `idle` (no RPCs are running, so the connection can be dropped for now);
    /// `transientFailure` (an error occurred and the connection will be re-established);
    /// `shutdown`.
    case ready(ReadyState)

    /// A `Channel` is wanted; an attempt to create one will be made later.
    ///
    /// Valid next states: `connecting`, `shutdown`.
    case transientFailure(TransientFailureState)

    /// Terminal: no further `Channel` will ever be wanted.
    case shutdown(ShutdownState)

    /// A short name for the state, used in log metadata.
    fileprivate var label: String {
      switch self {
      case .idle: return "idle"
      case .connecting: return "connecting"
      case .active: return "active"
      case .ready: return "ready"
      case .transientFailure: return "transientFailure"
      case .shutdown: return "shutdown"
      }
    }
  }
  /// The most recent externally-visible connectivity state, which is a coarser view of `state`.
  private var externalState = _ConnectivityState.idle(nil)
  /// Moves the external state to `nextState`.
  ///
  /// If `nextState` differs from the current external state, the connectivity delegate (if any)
  /// is told about the transition. Otherwise this does nothing.
  private func updateExternalState(to nextState: _ConnectivityState) {
    let previous = self.externalState
    guard !previous.isSameState(as: nextState) else {
      return
    }
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(self, from: previous, to: nextState)
  }
  /// The current internal state.
  ///
  /// Every assignment publishes the matching external state; the internal-only `active` state
  /// leaves it untouched. Entering `idle` or `transientFailure` also bumps the connection number
  /// used in log metadata. The bump happens after the delegate has been notified.
  private var state: State {
    didSet {
      let external: _ConnectivityState?
      let bumpConnectionNumber: Bool

      switch self.state {
      case let .idle(lastError):
        external = .idle(lastError)
        bumpConnectionNumber = true
      case .connecting:
        external = .connecting
        bumpConnectionNumber = false
      case .active:
        // `active` is an internal state and does not change the external state.
        external = nil
        bumpConnectionNumber = false
      case .ready:
        external = .ready
        bumpConnectionNumber = false
      case let .transientFailure(failure):
        external = .transientFailure(failure.reason)
        bumpConnectionNumber = true
      case .shutdown:
        external = .shutdown
        bumpConnectionNumber = false
      }

      if let external = external {
        self.updateExternalState(to: external)
      }
      if bumpConnectionNumber {
        self.updateConnectionID()
      }
    }
  }
  /// `true` when the manager is in the `idle` state.
  ///
  /// Asserts that it is read on `eventLoop`.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()
    if case .idle = self.state {
      return true
    }
    return false
  }
  /// `true` when the manager is in the `connecting` state.
  ///
  /// Asserts that it is read on `eventLoop`.
  private var isConnecting: Bool {
    self.eventLoop.assertInEventLoop()
    if case .connecting = self.state {
      return true
    }
    return false
  }
  /// `true` when the manager is in the `ready` state.
  ///
  /// Asserts that it is read on `eventLoop`.
  private var isReady: Bool {
    self.eventLoop.assertInEventLoop()
    if case .ready = self.state {
      return true
    }
    return false
  }
  /// Returns whether the state is 'transientFailure'.
  ///
  /// Asserts that it is read on `eventLoop`.
  private var isTransientFailure: Bool {
    self.eventLoop.assertInEventLoop()
    switch self.state {
    case .transientFailure:
      return true
    case .idle, .connecting, .active, .ready, .shutdown:
      return false
    }
  }
  /// `true` when the manager is in the terminal `shutdown` state.
  ///
  /// Asserts that it is read on `eventLoop`.
  private var isShutdown: Bool {
    self.eventLoop.assertInEventLoop()
    if case .shutdown = self.state {
      return true
    }
    return false
  }
  /// The `HTTP2StreamMultiplexer` of a `ready` connection, or `nil` in every other state.
  ///
  /// Asserts that it is read on `eventLoop`.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(ready) = self.state else {
      return nil
    }
    return ready.multiplexer
  }
  /// The `EventLoop` that the managed connection will run on.
  internal let eventLoop: EventLoop

  /// A delegate for connectivity changes. Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// A delegate for HTTP/2 connection changes. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// An `EventLoopFuture<Channel>` provider.
  private let channelProvider: ConnectionManagerChannelProvider

  /// The behavior for starting a call, i.e. how patient the caller is when asking for a
  /// multiplexer.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// The configuration to use when backing off between connection attempts, if reconnection
  /// attempts should be made at all.
  private let connectionBackoff: ConnectionBackoff?

  /// Whether this connection should be allowed to go idle (and thus be closed when the idle timer
  /// fires).
  internal let idleBehavior: IdleBehavior

  /// A logger. Its connection-ID metadata is refreshed by `updateConnectionID()`.
  internal var logger: Logger

  /// Identifies this manager in log metadata.
  internal let id: ConnectionManagerID

  /// Counter appended to `id` in log metadata. After initialization it is only read or written
  /// while holding `channelNumberLock`.
  private var channelNumber: UInt64

  /// Guards `channelNumber`.
  ///
  /// The lock is never reassigned and `NIOLock.withLock` is non-mutating, so this is a `let`.
  private let channelNumberLock = NIOLock()
  /// `"<id>/<channelNumber>"`, built without taking the lock.
  ///
  /// Both users in this file call it from inside `channelNumberLock.withLock`.
  private var _connectionIDAndNumber: String {
    "\(self.id)/\(self.channelNumber)"
  }
  /// `"<id>/<channelNumber>"`, read while holding `channelNumberLock`.
  private var connectionIDAndNumber: String {
    self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }
  /// Advances `channelNumber` (wrapping on overflow) and re-stamps the connection-ID metadata on
  /// `self.logger`. Both steps happen while holding `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLock {
      self.channelNumber = self.channelNumber &+ 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
    }
  }
  /// Stamps `logger` with this manager's current `"<id>/<channelNumber>"` connection ID.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
  }
  /// Creates a manager from a `ClientConnection.Configuration`.
  ///
  /// The event loop (taken from the configuration's group), call start behavior and connection
  /// backoff all come from `configuration`. A `DefaultChannelProvider` built from `configuration`
  /// is used unless `channelProvider` is given. No HTTP/2 delegate is installed.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    idleBehavior: IdleBehavior,
    logger: Logger
  ) {
    let loop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: loop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      idleBehavior: idleBehavior,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates a manager in the `idle` state with a fresh `ConnectionManagerID`.
  ///
  /// The stored logger is a copy of `logger` tagged with `"<id>/0"` connection-ID metadata.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    idleBehavior: IdleBehavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    let connectionID = ConnectionManagerID()
    let initialChannelNumber: UInt64 = 0

    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(initialChannelNumber)"

    self.id = connectionID
    self.channelNumber = initialChannelNumber
    self.logger = taggedLogger
    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.idleBehavior = idleBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate
    self.state = .idle(lastError: nil)
  }
  /// Returns a future for the HTTP/2 multiplexer of the underlying channel handling gRPC calls.
  ///
  /// - With `.waitsForConnectivity`, the future completes once the connection is ready, and
  ///   reconnects are managed by this manager.
  /// - With `.fastFailure`, the caller gets one chance to connect; the future fails if the
  ///   connection is in `transientFailure` or `shutdown`.
  ///
  /// May be called from any thread; the work is moved onto `eventLoop` when needed.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        self.getHTTP2Multiplexer()
      }
    }

    switch self.callStartBehavior {
    case .waitsForConnectivity:
      return self.getHTTP2MultiplexerPatient()
    case .fastFailure:
      return self.getHTTP2MultiplexerOptimistic()
    }
  }
  /// Returns a future that succeeds with the multiplexer once the connection is `ready`.
  ///
  /// An `idle` manager starts connecting first. Reconnects are handled by the manager, so the
  /// future only fails immediately when the manager is `shutdown`.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    let future: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // `startConnecting()` must have moved us into `connecting`.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      future = connecting.readyChannelMuxPromise.futureResult

    case let .connecting(connecting):
      future = connecting.readyChannelMuxPromise.futureResult

    case let .active(active):
      future = active.readyChannelMuxPromise.futureResult

    case let .transientFailure(failure):
      future = failure.readyChannelMuxPromise.futureResult

    case let .ready(ready):
      future = self.eventLoop.makeSucceededFuture(ready.multiplexer)

    case let .shutdown(shutdown):
      future = self.eventLoop.makeFailedFuture(shutdown.reason)
    }

    self.logger.debug(
      "vending multiplexer future",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )
    return future
  }
  /// Returns a future for the multiplexer without waiting for the connection to become `ready`.
  ///
  /// - `active`/`ready`: the current multiplexer.
  /// - `connecting`: the multiplexer of the in-flight attempt.
  /// - `idle`: a new attempt is started and its future is returned.
  ///
  /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` already hops onto the event loop; check again to be safe.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>
    switch self.state {
    case .idle:
      self.startConnecting()
      // `startConnecting()` must have moved us into `connecting`.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      muxFuture = connecting.candidateMuxPromise.futureResult
    case let .connecting(connecting):
      muxFuture = connecting.candidateMuxPromise.futureResult
    case let .active(active):
      muxFuture = self.eventLoop.makeSucceededFuture(active.multiplexer)
    case let .ready(ready):
      muxFuture = self.eventLoop.makeSucceededFuture(ready.multiplexer)
    case let .transientFailure(failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)
    case let .shutdown(shutdown):
      muxFuture = self.eventLoop.makeFailedFuture(shutdown.reason)
    }

    self.logger.debug(
      "vending fast-failing multiplexer future",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )
    return muxFuture
  }
  /// How `shutdown(mode:)` should close the connection.
  @usableFromInline
  internal enum ShutdownMode {
    /// Close the underlying channel immediately, without waiting for running RPCs.
    case forceful
    /// Stop new streams from being created and let running RPCs finish. If the channel is still
    /// open at the deadline, it is closed forcibly.
    case graceful(NIODeadline)
  }
  /// Shuts down the underlying connection.
  ///
  /// May be called from any thread. Returns a future that completes once shutdown has finished.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let done: EventLoopPromise<Void> = self.eventLoop.makePromise()
    self.shutdown(mode: mode, promise: done)
    return done.futureResult
  }
  /// Shuts down the underlying connection, completing `promise` when shutdown has finished.
  ///
  /// May be called from any thread; the work always runs on `eventLoop`.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }
    self._shutdown(mode: mode, promise: promise)
  }
  /// Performs the shutdown on the event loop.
  ///
  /// Every non-shutdown state moves to `shutdown` with a "shutdown by the user" reason. Any pending
  /// multiplexer promises are failed with that same reason. `promise` completes once any channel
  /// owned by the manager has closed, or immediately if there is none.
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug(
      "shutting down connection",
      metadata: [
        "connectivity_state": "\(self.state.label)",
        "shutdown.mode": "\(mode)",
      ]
    )

    switch self.state {
    // We don't have a channel and we don't want one, easy!
    case .idle:
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      promise.succeed(())

    // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
    // the shutdown future and deal with any fallout from the connecting channel without the
    // application knowing.
    case let .connecting(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail both mux promises: we're shutting down, so even if we manage to connect, the
      // application shouldn't have access to the channel or multiplexer. Use the shutdown reason
      // (as the `transientFailure` branch does) so callers learn why they failed.
      state.readyChannelMuxPromise.fail(shutdown.reason)
      state.candidateMuxPromise.fail(shutdown.reason)

      // Complete the shutdown promise when the connection attempt has completed.
      state.candidate.whenComplete { result in
        switch result {
        case let .success(channel):
          // In case we do successfully connect, close on the next loop tick. When connecting a
          // channel NIO will complete the promise for the channel before firing channel active.
          // That means we may close and fire inactive before active which HTTP/2 will be unhappy
          // about.
          self.eventLoop.execute {
            channel.close(mode: .all, promise: nil)
            promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())
          }
        case .failure:
          // We failed to connect, that's fine: we still shut down successfully.
          promise.succeed(())
        }
      }

    // We have an active channel but the application doesn't know about it yet. We'll do the same
    // as for `.connecting`.
    case let .active(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail the ready channel mux promise: we're shutting down, so even if the connection becomes
      // ready the application shouldn't have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // We have a channel, close it. We only create streams in the ready state so there's no need
      // to quiesce here.
      state.candidate.close(mode: .all, promise: nil)
      promise.completeWith(state.candidate.closeFuture.recoveringFromUncleanShutdown())

    // The channel is up and running: the application could be using it. We can close it and
    // return the `closeFuture`.
    case let .ready(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      switch mode {
      case .forceful:
        // We have a channel, close it.
        state.channel.close(mode: .all, promise: nil)

      case let .graceful(deadline):
        // If we don't close by the deadline, forcibly close the channel.
        let scheduledForceClose = state.channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          state.channel.close(mode: .all, promise: nil)
        }

        // Cancel the force close if we close normally first.
        state.channel.closeFuture.whenComplete { _ in
          scheduledForceClose.cancel()
        }

        // Tell the channel to quiesce. It will be picked up by the idle handler, which will close
        // the channel when all streams have been closed.
        state.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }

      // Complete the promise when we eventually close.
      promise.completeWith(state.channel.closeFuture.recoveringFromUncleanShutdown())

    // Like `.connecting` and `.active`, the application does not have a `.ready` channel. We'll
    // do the same but also cancel any scheduled connection attempts and deal with any fallout
    // if we cancelled too late.
    case let .transientFailure(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Stop the creation of a new channel, if we can. If we can't, then the task to
      // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
      state.scheduled.cancel()

      // Fail the ready channel mux promise: we're shutting down, so even if we manage to connect
      // the application shouldn't have access to the channel.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // No active channel, so complete the shutdown promise now.
      promise.succeed(())

    // We're already shut down; there's nothing to do.
    case let .shutdown(state):
      promise.completeWith(state.closeFuture)
    }
  }
  /// Registers `onClose` to be told when the current connection closes.
  ///
  /// If a connection exists, `onClose(true)` runs once it has closed. Otherwise `onClose(false)`
  /// runs right away (on `eventLoop`). May be called from any thread.
  internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._onCurrentConnectionClose(onClose)
      }
      return
    }
    self._onCurrentConnectionClose(onClose)
  }
  /// Event-loop implementation of `onCurrentConnectionClose(_:)`.
  ///
  /// Only a `ready` channel counts as the "current connection".
  private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(ready) = self.state else {
      onClose(false)
      return
    }
    ready.channel.closeFuture.whenComplete { _ in
      onClose(true)
    }
  }
  // MARK: - State changes from the channel handler.

  /// The channel caught an error. Hold on to it until the channel becomes inactive; it may provide
  /// some context. Must be called on the `EventLoop`.
  ///
  /// In `connecting`, only the first error is kept. In `active` and `ready`, the most recent error
  /// is kept. In `idle` the error is logged, and in `transientFailure`/`shutdown` it is ignored.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    // Hitting an error in idle is a surprise, but not really something we do anything about.
    // Either the error is channel fatal, in which case we'll see channelInactive soon
    // (acceptable), or it's not, and future I/O will either fail fast or work. In either case,
    // all we do is log this and move on.
    case .idle:
      self.logger.warning(
        "ignoring unexpected error in idle",
        metadata: [
          MetadataKey.error: "\(error)"
        ]
      )

    case var .connecting(state):
      // Record the error. The channel promise will notify the manager of any error which occurs
      // while connecting; that error may be overridden by this one if it contains more relevant
      // information.
      if state.connectError == nil {
        state.connectError = error
        self.state = .connecting(state)

        // The pool is only notified of connection errors when the connection transitions to the
        // transient failure state. However, in some cases (e.g. with NIOTS), errors can be thrown
        // during the connect but before the connect times out.
        //
        // This opens up a period of time where you can start a call and have it fail with
        // deadline exceeded (because no connection was available within the configured max
        // wait time for the pool) but without any diagnostic information. The information is
        // available but it hasn't been made available to the pool at that point in time.
        //
        // The delegate can't easily be modified (it's public API) and a new API doesn't make all
        // that much sense, so we elect to check whether the delegate is the pool and call it
        // directly.
        if let pool = self.connectivityDelegate as? ConnectionPool {
          pool.sync.updateMostRecentError(error)
        }
      }

    case var .active(state):
      state.error = error
      self.state = .active(state)

    case var .ready(state):
      state.error = error
      self.state = .ready(state)

    // If we're already in one of these states, additional errors aren't helpful to us.
    case .transientFailure, .shutdown:
      ()
    }
  }
  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// From `connecting`, the manager moves to the internal `active` state and succeeds the
  /// fast-failure (candidate) multiplexer promise. If the manager was shut down in the meantime,
  /// the channel is closed. In every other state the event is ignored.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "activating connection",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )

    switch self.state {
    case let .connecting(connecting):
      let connected = ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
      self.state = .active(connected)
      // Optimistic (fast-failing) callers are happy with this level of setup.
      connecting.candidateMuxPromise.succeed(multiplexer)

    // The application called shutdown before the channel became active; we should close it.
    case .shutdown:
      channel.close(mode: .all, promise: nil)

    case .idle, .transientFailure:
      // Received a channelActive when not connecting. Can happen if channelActive and
      // channelInactive are reordered. Ignore.
      ()

    case .active, .ready:
      // Received a second 'channelActive'; already active, so ignore.
      ()
    }
  }
  /// Handles an established channel (i.e. `active` or `ready`) becoming inactive and decides
  /// whether to reconnect. Must be called on the `EventLoop`.
  ///
  /// - `connecting`: tolerated, since `channelInactive` may arrive before `channelActive`. With no
  ///   planned reconnect the manager shuts down and fails both multiplexer promises. Otherwise the
  ///   candidate promise fails and another attempt is scheduled after the backoff delay.
  /// - `active`: with no planned reconnect the manager shuts down and fails the ready-channel
  ///   promise. Otherwise another attempt is scheduled after the backoff delay.
  /// - `ready`: with no `connectionBackoff` the manager shuts down. Otherwise an immediate attempt
  ///   is scheduled via `transientFailure`, using a fresh backoff iterator.
  /// - `idle`, `shutdown`, `transientFailure`: ignored.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "deactivating connection",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )

    // The status reported when the connection drops and no further attempt is planned.
    func reestablishmentDisabled() -> GRPCStatus {
      return GRPCStatus(
        code: .unavailable,
        message: "The connection was dropped and connection re-establishment is disabled"
      )
    }

    switch self.state {
    case let .connecting(pending):
      switch pending.reconnect {
      case .none:
        self.logger.debug("shutting down connection")
        let status = reestablishmentDisabled()
        self.state = .shutdown(
          ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: status)
        )
        // Shutting down: no later attempt will complete either promise.
        pending.readyChannelMuxPromise.fail(status)
        pending.candidateMuxPromise.fail(status)

      case let .after(delay):
        // Only the candidate promise fails. `readyChannelMuxPromise` is kept because another
        // attempt follows.
        let status = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
        pending.candidateMuxPromise.fail(status)
        let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(.init(from: pending, scheduled: retry, reason: nil))
      }

    case let .active(activeState):
      // Active but not yet ready.
      switch activeState.reconnect {
      case .none:
        self.logger.debug("shutting down connection")
        let status = reestablishmentDisabled()
        self.state = .shutdown(
          ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: status)
        )
        activeState.readyChannelMuxPromise.fail(status)

      case let .after(delay):
        let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(TransientFailureState(from: activeState, scheduled: retry))
      }

    case let .ready(readyState):
      // The channel was ready and working, but something went wrong.
      if self.connectionBackoff != nil {
        // Replace the channel straight away, but go via `transientFailure`.
        let retry = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
        let freshIterator = self.connectionBackoff?.makeIterator()
        self.state = .transientFailure(
          TransientFailureState(
            from: readyState,
            scheduled: retry,
            backoffIterator: freshIterator
          )
        )
      } else {
        // No backoff is configured, so don't reconnect.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let status = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and a reconnect was not configured"
        )
        self.state = .shutdown(
          ShutdownState(closeFuture: readyState.channel.closeFuture, reason: status)
        )
      }

    case .idle, .shutdown, .transientFailure:
      // `idle`: the channel is expected to become inactive after idling. `shutdown`: already shut
      // down. `transientFailure`: `channelInactive` was received twice. Nothing to do.
      break
    }
  }
  /// Handles the channel becoming ready, i.e. having seen the initial HTTP/2 SETTINGS frame.
  /// Must be called on the `EventLoop`.
  ///
  /// An `active` manager moves to `ready` and succeeds the ready-channel multiplexer promise.
  /// When shut down the call is ignored. Any other state triggers an assertion failure in debug
  /// builds and is ignored in release builds.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "connection ready",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )

    switch self.state {
    case let .active(activeState):
      self.state = .ready(ReadyState(from: activeState))
      activeState.readyChannelMuxPromise.succeed(activeState.multiplexer)

    case .shutdown:
      break

    case .idle, .transientFailure, .connecting, .ready:
      // None of these states should see the initial SETTINGS frame:
      // - `idle`/`transientFailure`: no connection or attempt exists, so there's nothing to close
      //   and nowhere to fire an error to.
      // - `connecting`: no channel exists yet to receive the frame on.
      // - `ready`: the frame was already received.
      // Flag it in debug builds; ignore it in release builds.
      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
    }
  }
  /// Idles the connection because no RPCs are active on it. Must be called on the `EventLoop`.
  ///
  /// From `active` or `ready` the manager moves to `idle`, keeping the last recorded error. An
  /// `active` connection never reached `ready`, so its ready-channel promise is also failed.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "idling connection",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )

    switch self.state {
    case let .ready(readyState):
      self.state = .idle(lastError: readyState.error)

    case let .active(activeState):
      // Reachable if the keepalive timer fires before the ready state is reached.
      self.state = .idle(lastError: activeState.error)
      let status = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      activeState.readyChannelMuxPromise.fail(status)

    case .shutdown, .idle, .transientFailure:
      // `shutdown`: expected when the user closes the connection. Once the channel becomes
      // inactive with no outstanding RPCs, `idle()` is called instead of `channelInactive()`.
      // `idle`/`transientFailure`: there is no connection to idle.
      break

    case .connecting:
      // The idle watchdog is started once the connection is active, so this shouldn't happen.
      // Flag it in debug builds; ignore it in release builds.
      assertionFailure("tried to idle a connection in the \(self.state.label) state")
    }
  }
  /// Notifies the HTTP/2 delegate, if one is set, that a stream was opened.
  internal func streamOpened() {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.http2Delegate {
      delegate.streamOpened(self)
    }
  }
  /// Notifies the HTTP/2 delegate, if one is set, that a stream was closed.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.http2Delegate {
      delegate.streamClosed(self)
    }
  }
  /// Forwards a newly received SETTINGS_MAX_CONCURRENT_STREAMS value to the HTTP/2 delegate, if
  /// one is set.
  ///
  /// - Parameter maxConcurrentStreams: The advertised maximum number of concurrent streams.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.http2Delegate {
      delegate.receivedSettingsMaxConcurrentStreams(self, maxConcurrentStreams: maxConcurrentStreams)
    }
  }
  /// Notifies the connectivity delegate, if one is set, that the connection has started quiescing.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    if let delegate = self.connectivityDelegate {
      delegate.connectionIsQuiescing(self)
    }
  }
  859. }
  extension ConnectionManager {
    /// Handles a failed connection attempt, i.e. a connection was never established. Must be
    /// called on the `EventLoop`.
    ///
    /// In `connecting`, a connect timeout is reported as the earlier `connectError` if one was
    /// recorded. The manager then either:
    /// - shuts down and fails both multiplexer promises, when no reconnect is planned, or
    /// - schedules another attempt after the backoff delay and fails only the candidate promise.
    ///
    /// When shut down the failure is ignored. Any other state triggers an assertion failure in
    /// debug builds.
    ///
    /// - Parameter error: The error the connection attempt failed with.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case let .connecting(pending):
        // A more relevant error may have been caught before the timeout fired; prefer it.
        let reportedError: Error
        if case .some(.connectTimeout) = error as? ChannelError {
          reportedError = pending.connectError ?? error
        } else {
          reportedError = error
        }

        switch pending.reconnect {
        case .none:
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          self.state = .shutdown(
            ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: reportedError)
          )
          pending.readyChannelMuxPromise.fail(reportedError)
          pending.candidateMuxPromise.fail(reportedError)

        case let .after(delay):
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
          let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.state = .transientFailure(
            TransientFailureState(from: pending, scheduled: retry, reason: reportedError)
          )
          // Candidate multiplexer users are not willing to wait for another attempt.
          pending.candidateMuxPromise.fail(reportedError)
        }

      case .shutdown:
        // The application shut down while a (doomed) attempt was in flight; nothing to do.
        break

      case .idle, .active, .ready, .transientFailure:
        // No connection attempt is in progress. Ignore in release builds.
        assertionFailure("connect promise failed in \(self.state.label) state")
      }
    }
  }
  extension ConnectionManager {
    /// Starts establishing a connection. Must be called on the `EventLoop`.
    ///
    /// Valid only from `idle` and `transientFailure`. It is a no-op once shut down and a fatal
    /// error in any other state.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .idle:
        let iterator = self.connectionBackoff?.makeIterator()

        // The iterator yields the connect timeout together with the backoff for the next attempt.
        // When retries are disabled it yields nothing, yet a connect timeout is still needed, so
        // take the minimum connection timeout from the backoff configuration directly.
        var connectTimeoutOverride: TimeAmount?
        if let backoff = self.connectionBackoff, backoff.retries == .none {
          connectTimeoutOverride = .seconds(timeInterval: backoff.minimumConnectionTimeout)
        }

        self.startConnecting(
          backoffIterator: iterator,
          muxPromise: self.eventLoop.makePromise(),
          connectTimeoutOverride: connectTimeoutOverride
        )

      case let .transientFailure(pending):
        self.startConnecting(
          backoffIterator: pending.backoffIterator,
          muxPromise: pending.readyChannelMuxPromise
        )

      case .shutdown:
        // Shut down before a scheduled connection attempt started.
        break

      case .connecting, .active, .ready:
        // `startConnecting()` is only called when no connection exists, after checking the
        // current state, so none of these states is reachable.
        self.unreachableState()
      }
    }

    /// Moves to `connecting` and submits a new connection attempt to the event loop.
    ///
    /// - Parameters:
    ///   - backoffIterator: Supplies the connect timeout and the backoff before the next attempt.
    ///     If it is `nil` or returns `nil`, a failed attempt is not retried.
    ///   - muxPromise: Stored as the ready-channel multiplexer promise for this attempt.
    ///   - connectTimeoutOverride: Used in place of the iterator's connect timeout when non-`nil`.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>,
      connectTimeoutOverride: TimeAmount? = nil
    ) {
      let timeoutAndBackoff = backoffIterator?.next()

      // We're already on the event loop: the connect is submitted so that it starts after the
      // state has changed to `.connecting` below.
      self.eventLoop.assertInEventLoop()

      let connectTimeout: TimeAmount? = connectTimeoutOverride
        ?? timeoutAndBackoff.map { TimeAmount.seconds(timeInterval: $0.timeout) }

      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let channel: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: connectTimeout,
          logger: self.logger
        )
        channel.whenFailure { error in
          self.connectionFailed(withError: error)
        }
        return channel
      }

      // If the candidate fails, retry after the backoff only when the iterator supplied one.
      let reconnect: Reconnect
      if let next = timeoutAndBackoff {
        reconnect = .after(next.backoff)
      } else {
        reconnect = .none
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view of the connection manager. Each operation requires the caller to be
    /// executing on the connection manager's `EventLoop`.
    internal var sync: Sync {
      Sync(self)
    }

    internal struct Sync {
      private let manager: ConnectionManager

      fileprivate init(_ connectionManager: ConnectionManager) {
        self.manager = connectionManager
      }

      /// The delegate told about connectivity changes. Reading or writing it asserts that the
      /// caller is on the manager's `EventLoop`.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.connectivityDelegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.connectivityDelegate = delegate
        }
      }

      /// The delegate told about HTTP/2 connection changes. Reading or writing it asserts that the
      /// caller is on the manager's `EventLoop`.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.http2Delegate
        }
        nonmutating set(delegate) {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.http2Delegate = delegate
        }
      }

      /// Whether the connection is in the idle state.
      internal var isIdle: Bool { self.manager.isIdle }

      /// Whether the connection is in the connecting state.
      internal var isConnecting: Bool { self.manager.isConnecting }

      /// Whether the connection is in the ready state.
      internal var isReady: Bool { self.manager.isReady }

      /// Whether the connection is in the transient failure state.
      internal var isTransientFailure: Bool { self.manager.isTransientFailure }

      /// Whether the connection is in the shutdown state.
      internal var isShutdown: Bool { self.manager.isShutdown }

      /// The `multiplexer` of a connection in the `ready` state, or `nil` in any other state.
      internal var multiplexer: HTTP2StreamMultiplexer? { self.manager.multiplexer }

      /// Starts establishing a connection. Must only be called when `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }

      /// Shuts the manager down now, forcefully.
      internal func shutdownNow() {
        let loop = self.manager.eventLoop
        self.manager._shutdown(mode: .forceful, promise: loop.makePromise())
      }
    }
  }
  extension ConnectionManager {
    /// Traps with a message naming the current state and the calling function. Used for state
    /// machine transitions that should never happen.
    ///
    /// - Parameters:
    ///   - function: The calling function (defaults to the caller).
    ///   - file: The calling file (defaults to the caller).
    ///   - line: The calling line (defaults to the caller).
    private func unreachableState(
      function: StaticString = #function,
      file: StaticString = #fileID,
      line: UInt = #line
    ) -> Never {
      let message = "Invalid state \(self.state) for \(function)"
      fatalError(message, file: file, line: line)
    }
  }