ConnectionManager.swift 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  #if compiler(>=5.6)
  // The conformance is `@unchecked` for these reasons:
  // - mutable state is only ever read and written on one particular event loop;
  // - APIs which _may_ be called from other threads hop onto that event loop before doing any work;
  // - APIs which _must_ be called on that event loop check so with a precondition.
  extension ConnectionManager: @unchecked Sendable {}
  #endif // compiler(>=5.6)
  28. @usableFromInline
  29. internal final class ConnectionManager {
  /// Whether a connection which drops before becoming ready should be re-established.
  internal enum Reconnect {
    /// Do not reconnect; the manager shuts down instead.
    case none

    /// Reconnect once the given number of seconds has elapsed.
    case after(TimeInterval)
  }
  /// Data carried while a connection attempt is in flight.
  ///
  /// The order of the stored properties is significant: the synthesized memberwise initializer
  /// is used to create values of this type.
  internal struct ConnectingState {
    /// The backoff iterator for this series of connection attempts, if there is one.
    var backoffIterator: ConnectionBackoffIterator?

    /// What to do if the channel drops before it becomes ready.
    var reconnect: Reconnect

    /// The channel we are trying to establish.
    var candidate: EventLoopFuture<Channel>

    /// Completed with the multiplexer once the channel is 'ready'.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// Completed with the multiplexer as soon as the channel is 'active'.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  }
  /// Data carried while we hold a channel which is 'active' but not yet 'ready'.
  internal struct ConnectedState {
    /// The backoff iterator inherited from the connection attempt, if there was one.
    var backoffIterator: ConnectionBackoffIterator?

    /// What to do if the channel drops before it becomes ready.
    var reconnect: Reconnect

    /// The channel which became active.
    var candidate: Channel

    /// Completed with the multiplexer once the channel is 'ready'.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The multiplexer of `candidate`.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error caught on the channel; `nil` until one is recorded.
    var error: Error?

    /// Promotes a connecting state now that `candidate` is active.
    init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
      // The newly established channel and its multiplexer.
      self.candidate = candidate
      self.multiplexer = multiplexer

      // Everything else carries over from the connection attempt.
      self.backoffIterator = state.backoffIterator
      self.reconnect = state.reconnect
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
    }
  }
  /// Data carried while the channel is 'ready' and may be used for RPCs.
  internal struct ReadyState {
    /// The ready channel.
    var channel: Channel

    /// The multiplexer of `channel`.
    var multiplexer: HTTP2StreamMultiplexer

    /// The most recent error caught on the channel. Starts as `nil`: an error recorded while
    /// the channel was only 'active' is not carried over.
    var error: Error?

    /// Promotes a connected state whose channel has become ready.
    init(from state: ConnectedState) {
      self.multiplexer = state.multiplexer
      self.channel = state.candidate
    }
  }
  /// Data carried while we wait to make another connection attempt.
  internal struct TransientFailureState {
    /// The backoff iterator to use for subsequent attempts, if there is one.
    var backoffIterator: ConnectionBackoffIterator?

    /// Completed with the multiplexer once a channel eventually becomes 'ready'.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    /// The scheduled task which will start the next connection attempt.
    var scheduled: Scheduled<Void>

    /// Why we are in this state.
    var reason: Error

    /// The reason for a dropped connection: the recorded error when there is one, otherwise a
    /// generic 'unavailable' status.
    private static func dropReason(recorded error: Error?) -> Error {
      if let error = error {
        return error
      }
      return GRPCStatus(
        code: .unavailable,
        message: "Unexpected connection drop"
      )
    }

    /// A connection attempt failed with `reason`.
    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
      self.reason = reason
      self.scheduled = scheduled
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.backoffIterator = state.backoffIterator
    }

    /// An 'active' channel was dropped before it became ready.
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.reason = Self.dropReason(recorded: state.error)
      self.scheduled = scheduled
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.backoffIterator = state.backoffIterator
    }

    /// A 'ready' channel was dropped. A fresh promise is created on the channel's event loop
    /// for callers waiting on the next ready channel.
    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.reason = Self.dropReason(recorded: state.error)
      self.scheduled = scheduled
      self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
      self.backoffIterator = backoffIterator
    }
  }
  /// Data carried once the manager has shut down.
  internal struct ShutdownState {
    /// Completes when the shutdown has finished.
    var closeFuture: EventLoopFuture<Void>

    /// Why we shut down. Requests for a `Channel` made in this state are failed with this
    /// error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// Makes the state for a shutdown which was requested by the user.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let userRequested = GRPCStatus(
        code: .unavailable,
        message: "Connection was shutdown by the user"
      )
      return ShutdownState(closeFuture: closeFuture, reason: userRequested)
    }
  }
  /// The internal connectivity state machine.
  internal enum State {
    /// We have no `Channel` and do not currently need one.
    ///
    /// May move to:
    /// - `connecting`
    /// - `shutdown`
    case idle(lastError: Error?)

    /// A connection attempt is in progress.
    ///
    /// May move to:
    /// - `active`
    /// - `transientFailure` (the attempt failed and we are going to try again)
    /// - `shutdown`
    case connecting(ConnectingState)

    /// A `Channel` has been established but may yet prove unsuitable (the TLS handshake may
    /// fail, etc.). The initial HTTP/2 SETTINGS frame is our signal that it is 'ready'.
    ///
    /// May move to:
    /// - `ready`
    /// - `idle` (the connection was idled before it became ready)
    /// - `transientFailure` (the handshake failed or another error happened and we can attempt
    ///   to re-establish the connection)
    /// - `shutdown`
    case active(ConnectedState)

    /// The `Channel` has seen the initial HTTP/2 SETTINGS frame and can be used to make RPCs.
    ///
    /// May move to:
    /// - `idle` (no RPCs are being served so the connection can be dropped for now)
    /// - `transientFailure` (an error was encountered and the connection will be re-established)
    /// - `shutdown`
    case ready(ReadyState)

    /// A `Channel` is wanted and an attempt to create one will be made in the future.
    ///
    /// May move to:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)

    /// Terminal: we never want another `Channel`.
    case shutdown(ShutdownState)

    /// The name of the state, used as logging metadata.
    fileprivate var label: String {
      let name: String
      switch self {
      case .idle:
        name = "idle"
      case .connecting:
        name = "connecting"
      case .active:
        name = "active"
      case .ready:
        name = "ready"
      case .transientFailure:
        name = "transientFailure"
      case .shutdown:
        name = "shutdown"
      }
      return name
    }
  }
  /// The most recent externally visible state; external states are a subset of the internal
  /// ones.
  private var externalState: _ConnectivityState = .idle(nil)

  /// Moves to a new external state and tells the connectivity delegate about the change.
  /// Nothing happens when `nextState` is the same state as the current one.
  private func updateExternalState(to nextState: _ConnectivityState) {
    guard !self.externalState.isSameState(as: nextState) else {
      return
    }

    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(
      self,
      from: previousState,
      to: nextState
    )
  }
  /// The current internal state. Every change is mirrored to the external state and, on
  /// entering `idle` or `transientFailure`, the connection ID is advanced.
  private var state: State {
    didSet {
      // The external state to publish, if any, and whether the connection ID must be bumped.
      let external: _ConnectivityState?
      let bumpsConnectionID: Bool

      switch self.state {
      case let .idle(lastError):
        external = _ConnectivityState.idle(lastError)
        bumpsConnectionID = true

      case .connecting:
        external = _ConnectivityState.connecting
        bumpsConnectionID = false

      case .active:
        // 'active' is an internal state: there is nothing to publish.
        external = nil
        bumpsConnectionID = false

      case .ready:
        external = _ConnectivityState.ready
        bumpsConnectionID = false

      case let .transientFailure(failure):
        external = _ConnectivityState.transientFailure(failure.reason)
        bumpsConnectionID = true

      case .shutdown:
        external = _ConnectivityState.shutdown
        bumpsConnectionID = false
      }

      if let external = external {
        self.updateExternalState(to: external)
      }
      if bumpsConnectionID {
        self.updateConnectionID()
      }
    }
  }
  /// `true` only when the current state is 'idle'. Asserts that we are on the event loop.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()

    guard case .idle = self.state else {
      // .connecting, .transientFailure, .active, .ready or .shutdown.
      return false
    }
    return true
  }
  /// The `HTTP2StreamMultiplexer` of the 'ready' state, or `nil` in any other state. Asserts
  /// that we are on the event loop.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()

    guard case let .ready(readyState) = self.state else {
      // .idle, .connecting, .transientFailure, .active or .shutdown.
      return nil
    }
    return readyState.multiplexer
  }
  /// The `EventLoop` that the managed connection will run on.
  internal let eventLoop: EventLoop

  /// A delegate for connectivity changes. Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// A delegate for HTTP/2 connection changes. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// An `EventLoopFuture<Channel>` provider.
  private let channelProvider: ConnectionManagerChannelProvider

  /// The behavior for starting a call, i.e. how patient is the caller when asking for a
  /// multiplexer.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// The configuration to use when backing off between connection attempts, if reconnection
  /// attempts should be made at all.
  private let connectionBackoff: ConnectionBackoff?

  /// A logger. Its connection ID metadata is refreshed by `updateConnectionID()`.
  internal var logger: Logger

  /// An identifier for this manager (a UUID string), fixed for the manager's lifetime.
  private let connectionID: String

  /// A counter appended to `connectionID` in logging metadata. It starts at zero and is
  /// incremented (wrapping on overflow) by `updateConnectionID()`. Protected by
  /// `channelNumberLock`.
  private var channelNumber: UInt64

  /// Protects `channelNumber`. Declared `let` because the lock itself must never be replaced:
  /// swapping it would silently drop the mutual exclusion it provides.
  private let channelNumberLock = NIOLock()
  /// The connection ID and channel number joined by "/". Reads `channelNumber` without taking
  /// `channelNumberLock`; callers are expected to hold it.
  private var _connectionIDAndNumber: String {
    let number = String(self.channelNumber)
    return self.connectionID + "/" + number
  }
  /// The connection ID and channel number joined by "/", read while holding
  /// `channelNumberLock`.
  private var connectionIDAndNumber: String {
    self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }
  /// Advances the channel number (wrapping on overflow) and stores the resulting connection ID
  /// in the logger's metadata, all while holding `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLock {
      self.channelNumber = self.channelNumber &+ 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = .string(identifier)
    }
  }
  /// Stores the current connection ID (including the channel number) in the metadata of
  /// `logger`.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = .string(identifier)
  }
  /// Creates a manager from a client connection configuration.
  ///
  /// The event loop is the next one from the configuration's group. When `channelProvider` is
  /// `nil` a `DefaultChannelProvider` built from `configuration` is used. No HTTP/2 delegate is
  /// set.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    let eventLoop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: eventLoop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates a manager in the 'idle' state with no last error.
  ///
  /// A new UUID is generated as the connection ID and the channel number starts at zero; both
  /// are recorded in the logger's metadata.
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    // Identity of this manager and of its first channel.
    let newConnectionID = UUID().uuidString
    let initialChannelNumber: UInt64 = 0
    self.connectionID = newConnectionID
    self.channelNumber = initialChannelNumber

    // Tag a copy of the caller's logger with that identity.
    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] =
      "\(newConnectionID)/\(initialChannelNumber)"
    self.logger = taggedLogger

    // Collaborators and configuration.
    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate

    // We start without a channel.
    self.state = .idle(lastError: nil)
  }
  /// Returns the multiplexer of the underlying channel which handles gRPC calls.
  ///
  /// With the `fastFailure` call start behavior the 'optimistic' path is used; with
  /// `waitsForConnectivity` the 'patient' path is used. Safe to call from any thread: the work
  /// is submitted to the event loop when the caller is not already on it.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // Must run on the event loop.
    func vendMultiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
      switch self.callStartBehavior {
      case .fastFailure:
        return self.getHTTP2MultiplexerOptimistic()
      case .waitsForConnectivity:
        return self.getHTTP2MultiplexerPatient()
      }
    }

    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        vendMultiplexer()
      }
    }
    return vendMultiplexer()
  }
  /// Returns a future for the multiplexer which succeeds once the channel is 'ready'.
  ///
  /// - 'idle': a connection attempt is started and the future of that attempt's ready channel
  ///   is returned.
  /// - 'connecting', 'active' and 'transientFailure': the future of the pending ready channel
  ///   is returned.
  /// - 'ready': an already succeeded future is returned.
  /// - 'shutdown': a future failed with the shutdown reason is returned.
  ///
  /// Must be called on the `EventLoop`.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop, but this method reads and may
    // mutate `state`, so verify it just as the 'optimistic' variant does.
    self.eventLoop.preconditionInEventLoop()

    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must transition to the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.invalidState()
      }
      multiplexer = connecting.readyChannelMuxPromise.futureResult

    case let .connecting(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .active(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .ready(state):
      multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)

    case let .transientFailure(state):
      multiplexer = state.readyChannelMuxPromise.futureResult

    case let .shutdown(state):
      multiplexer = self.eventLoop.makeFailedFuture(state.reason)
    }

    // Logged after the switch so that the state reflects any transition made above.
    self.logger.debug("vending multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return multiplexer
  }
  /// Returns a future for the multiplexer without waiting for the channel to become 'ready'.
  ///
  /// - 'idle': a connection attempt is started and the future of that attempt's candidate is
  ///   returned.
  /// - 'connecting': the future of the current attempt's candidate is returned.
  /// - 'active' and 'ready': an already succeeded future is returned.
  /// - 'transientFailure' and 'shutdown': a failed future is returned.
  ///
  /// Must be called on the `EventLoop`.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>

    switch self.state {
    case .idle:
      self.startConnecting()
      // Starting to connect must have moved us to the `connecting` state.
      guard case let .connecting(attempt) = self.state else {
        self.invalidState()
      }
      muxFuture = attempt.candidateMuxPromise.futureResult

    case let .connecting(attempt):
      muxFuture = attempt.candidateMuxPromise.futureResult

    case let .active(connected):
      muxFuture = self.eventLoop.makeSucceededFuture(connected.multiplexer)

    case let .ready(readyState):
      muxFuture = self.eventLoop.makeSucceededFuture(readyState.multiplexer)

    case let .transientFailure(failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)

    case let .shutdown(shutdown):
      muxFuture = self.eventLoop.makeFailedFuture(shutdown.reason)
    }

    self.logger.debug("vending fast-failing multiplexer future", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    return muxFuture
  }
  /// How the underlying connection should be shut down.
  @usableFromInline
  internal enum ShutdownMode {
    /// Close the underlying channel straight away; existing RPCs are not waited for.
    case forceful

    /// Let running RPCs finish before the underlying channel is closed. No new streams may be
    /// created. The channel is closed forcibly if it has not closed by the deadline.
    case graceful(NIODeadline)
  }
  /// Shuts down the underlying connection.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  /// - Returns: A future which completes when the shutdown has finished.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let shutdownPromise = self.eventLoop.makePromise(of: Void.self)
    let shutdownFuture = shutdownPromise.futureResult
    self.shutdown(mode: mode, promise: shutdownPromise)
    return shutdownFuture
  }
  /// Shuts down the underlying connection, completing `promise` when the shutdown has finished.
  /// May be called from any thread: the work runs on the event loop.
  ///
  /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }

    self._shutdown(mode: mode, promise: promise)
  }
  /// Performs the shutdown for the current state. Runs on the `EventLoop` (see
  /// `shutdown(mode:promise:)`).
  ///
  /// In every state other than 'shutdown' the manager first moves to 'shutdown' with the
  /// "shutdown by the user" reason. Callers still waiting for a multiplexer are failed with
  /// that same reason, so requests pending at shutdown and requests made after it see the same
  /// error.
  ///
  /// - Parameters:
  ///   - mode: How to close a 'ready' channel; other states never hold streams, so their
  ///     channels are always closed immediately.
  ///   - promise: Completed when the shutdown has finished.
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug("shutting down connection", metadata: [
      "connectivity_state": "\(self.state.label)",
      "shutdown.mode": "\(mode)",
    ])

    switch self.state {
    // We don't have a channel and we don't want one, easy!
    case .idle:
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)
      promise.succeed(())

    // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
    // the shutdown future and deal with any fallout from the connecting channel without the
    // application knowing.
    case let .connecting(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail both mux promises: we're shutting down so even if we manage to successfully
      // connect the application shouldn't have access to the channel or multiplexer.
      state.readyChannelMuxPromise.fail(shutdown.reason)
      state.candidateMuxPromise.fail(shutdown.reason)

      // Complete the shutdown promise when the connection attempt has completed.
      state.candidate.whenComplete {
        switch $0 {
        case let .success(channel):
          // In case we do successfully connect, close immediately.
          channel.close(mode: .all, promise: nil)
          promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())

        case .failure:
          // We failed to connect, that's fine we still shutdown successfully.
          promise.succeed(())
        }
      }

    // We have an active channel but the application doesn't know about it yet. We'll do the same
    // as for `.connecting`.
    case let .active(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Fail the ready channel mux promise: we're shutting down so the application shouldn't
      // have access to the channel or multiplexer even if it does become ready.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // We have a channel, close it. We only create streams in the ready state so there's no need
      // to quiesce here.
      state.candidate.close(mode: .all, promise: nil)
      promise.completeWith(state.candidate.closeFuture.recoveringFromUncleanShutdown())

    // The channel is up and running: the application could be using it. We can close it and
    // return the `closeFuture`.
    case let .ready(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      switch mode {
      case .forceful:
        // We have a channel, close it.
        state.channel.close(mode: .all, promise: nil)

      case let .graceful(deadline):
        // If we don't close by the deadline forcibly close the channel.
        let scheduledForceClose = state.channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          state.channel.close(mode: .all, promise: nil)
        }

        // Cancel the force close if we close normally first.
        state.channel.closeFuture.whenComplete { _ in
          scheduledForceClose.cancel()
        }

        // Tell the channel to quiesce. It will be picked up by the idle handler which will close
        // the channel when all streams have been closed.
        state.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }

      // Complete the promise when we eventually close.
      promise.completeWith(state.channel.closeFuture.recoveringFromUncleanShutdown())

    // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
    // do the same but also cancel any scheduled connection attempts and deal with any fallout
    // if we cancelled too late.
    case let .transientFailure(state):
      let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
      self.state = .shutdown(shutdown)

      // Stop the creation of a new channel, if we can. If we can't then the task to
      // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
      state.scheduled.cancel()

      // Fail the ready channel mux promise: we're shutting down so even if we manage to
      // successfully connect the application shouldn't have access to the channel.
      state.readyChannelMuxPromise.fail(shutdown.reason)

      // No active channel, so complete the shutdown promise now.
      promise.succeed(())

    // We're already shutdown; there's nothing to do.
    case let .shutdown(state):
      promise.completeWith(state.closeFuture)
    }
  }
  // MARK: - State changes from the channel handler.

  /// The channel caught an error. While the channel is 'active' or 'ready' the error is kept
  /// with the state until the channel becomes inactive, since it may provide some context.
  /// Must be called on the `EventLoop`.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case .idle:
      // An error while idle is a surprise but there is nothing for us to act on. Either the
      // error is fatal to the channel, in which case channelInactive will follow soon
      // (acceptable), or it isn't, and future I/O will either fail fast or work. Either way we
      // only log it and move on.
      self.logger.warning("ignoring unexpected error in idle", metadata: [
        MetadataKey.error: "\(error)",
      ])

    case .connecting:
      self.connectionFailed(withError: error)

    case let .active(connected):
      var updated = connected
      updated.error = error
      self.state = .active(updated)

    case let .ready(readyState):
      var updated = readyState
      updated.error = error
      self.state = .ready(updated)

    case .transientFailure, .shutdown:
      // If we're already in one of these states then further errors aren't helpful to us.
      ()
    }
  }
  /// The channel we were connecting became `active`. Must be called on the `EventLoop`.
  ///
  /// - Parameters:
  ///   - channel: The channel which became active.
  ///   - multiplexer: The HTTP/2 stream multiplexer of `channel`.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("activating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .connecting(attempt):
      let activeState = ConnectedState(from: attempt, candidate: channel, multiplexer: multiplexer)
      self.state = .active(activeState)
      // Optimistic callers are happy with this level of setup.
      attempt.candidateMuxPromise.succeed(multiplexer)

    case .shutdown:
      // The application called shutdown before the channel became active; we should close it.
      channel.close(mode: .all, promise: nil)

    // The remaining cases are deliberately kept apart: some crash reporting services provide
    // stack traces without the precondition failure message (which contains the invalid state
    // we were in). Separate cases let us work out the state from the line number.
    case .idle:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// An established channel (i.e. `active` or `ready`) became inactive; decide whether to
  /// reconnect. Must be called on the `EventLoop`.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("deactivating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    // The channel was `active` but never became `ready`: follow the reconnect policy.
    case let .active(connected):
      switch connected.reconnect {
      case let .after(delay):
        // Try again after some time.
        let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        let failure = TransientFailureState(from: connected, scheduled: retry)
        self.state = .transientFailure(failure)

      case .none:
        // Don't try again: shut down instead.
        self.logger.debug("shutting down connection")
        let dropped = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        let terminal = ShutdownState(
          closeFuture: self.eventLoop.makeSucceededFuture(()),
          reason: dropped
        )
        self.state = .shutdown(terminal)
        connected.readyChannelMuxPromise.fail(dropped)
      }

    // The channel was ready and working but something went wrong: replace it if a backoff is
    // configured.
    case let .ready(readyState):
      if let backoff = self.connectionBackoff {
        // Start connecting now. We should go via `transientFailure`, however.
        let retry = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
        let failure = TransientFailureState(
          from: readyState,
          scheduled: retry,
          backoffIterator: backoff.makeIterator()
        )
        self.state = .transientFailure(failure)
      } else {
        // No backoff is configured, so there is no reconnecting.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let dropped = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and a reconnect was not configured"
        )
        let terminal = ShutdownState(closeFuture: readyState.channel.closeFuture, reason: dropped)
        self.state = .shutdown(terminal)
      }

    case .idle:
      // Expected: the channel becomes inactive after we have idled it.
      ()

    case .shutdown:
      // We're already shut down, that's fine.
      ()

    // The remaining cases are deliberately kept apart: some crash reporting services provide
    // stack traces without the precondition failure message (which contains the invalid state
    // we were in). Separate cases let us work out the state from the line number.
    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// The channel became ready, i.e. it has seen the initial HTTP/2 SETTINGS frame. Must be
  /// called on the `EventLoop`.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("connection ready", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .active(connected):
      // Move to 'ready' first, then release the callers waiting for a ready channel.
      let readyState = ReadyState(from: connected)
      self.state = .ready(readyState)
      connected.readyChannelMuxPromise.succeed(connected.multiplexer)

    case .shutdown:
      // Nothing to do: we have already shut down.
      ()

    // The remaining cases are deliberately kept apart: some crash reporting services provide
    // stack traces without the precondition failure message (which contains the invalid state
    // we were in). Separate cases let us work out the state from the line number.
    case .idle:
      self.invalidState()

    case .transientFailure:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .ready:
      self.invalidState()
    }
  }
  /// There are no active RPCs on the 'ready' channel, so drop the channel for now. Must be
  /// called on the `EventLoop`.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("idling connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .active(connected):
      // We can get here if the keepalive timer fires before we reach the ready state. Callers
      // waiting for a ready channel are failed after the state change.
      self.state = .idle(lastError: connected.error)
      let idledEarly = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      connected.readyChannelMuxPromise.fail(idledEarly)

    case let .ready(readyState):
      self.state = .idle(lastError: readyState.error)

    case .shutdown:
      // Expected when the user closes the connection: when the channel becomes inactive and
      // there are no outstanding RPCs, 'idle()' is called instead of 'channelInactive()'.
      ()

    // The remaining cases are deliberately kept apart: some crash reporting services provide
    // stack traces without the precondition failure message (which contains the invalid state
    // we were in). Separate cases let us work out the state from the line number.
    case .idle:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream on this connection has closed.
  /// Expected to be called on the `EventLoop`.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.streamClosed(self)
  }
  /// Passes a new 'max concurrent streams' value on to the HTTP/2 delegate, if one is set.
  /// Expected to be called on the `EventLoop`.
  ///
  /// - Parameter maxConcurrentStreams: The value to forward to the delegate.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else {
      return
    }
    delegate.receivedSettingsMaxConcurrentStreams(self, maxConcurrentStreams: maxConcurrentStreams)
  }
  /// The connection has begun quiescing: let the connectivity delegate, if one is set, know.
  /// Expected to be called on the `EventLoop`.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.connectivityDelegate else {
      return
    }
    delegate.connectionIsQuiescing(self)
  }
  699. }
  700. extension ConnectionManager {
  /// Handles a failed connection attempt, that is, one where no connection was ever established.
  ///
  /// Depending on the reconnect policy of the in-flight attempt this either shuts the manager down
  /// or schedules another attempt after a delay. Must be called on the `EventLoop`.
  ///
  /// - Parameter error: The error which caused the connection attempt to fail.
  private func connectionFailed(withError error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case .connecting(let attempt):
      // Decide whether another attempt should be made.
      switch attempt.reconnect {
      case .none:
        // No more attempts: shut down and fail everyone waiting on this connection.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let alreadyClosed: EventLoopFuture<Void> = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: alreadyClosed, reason: error))
        attempt.readyChannelMuxPromise.fail(error)
        attempt.candidateMuxPromise.fail(error)

      case .after(let delay):
        // Try again, but only once the delay has elapsed.
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
        let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        let failure = TransientFailureState(from: attempt, scheduled: retry, reason: error)
        self.state = .transientFailure(failure)
        // Users of the candidate mux are not willing to wait for the next attempt.
        attempt.candidateMuxPromise.fail(error)
      }

    case .shutdown:
      // The application must have called shutdown while we were trying to establish a connection
      // which was doomed to fail anyway; this can safely be ignored.
      break

    // We can't fail to connect if we aren't trying to connect.
    //
    // Every invalid state deliberately gets its own case: some crash reporting services provide
    // stack traces without the precondition failure message (which contains the invalid state we
    // were in). Separate cases allow us to work out the state from the line number alone.
    case .idle:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  747. }
  748. extension ConnectionManager {
  /// Begins establishing a connection. This is only valid from the `idle` and `transientFailure`
  /// states; when already shut down it does nothing. Must be called on the `EventLoop`.
  private func startConnecting() {
    self.eventLoop.assertInEventLoop()

    switch self.state {
    case .idle:
      // A fresh attempt: start with a new backoff iterator (if backoff is configured) and a new
      // promise for the ready channel's multiplexer.
      self.startConnecting(
        backoffIterator: self.connectionBackoff?.makeIterator(),
        muxPromise: self.eventLoop.makePromise()
      )

    case .transientFailure(let failed):
      // A retry: carry on with the backoff iterator and promise from the failed attempt.
      self.startConnecting(
        backoffIterator: failed.backoffIterator,
        muxPromise: failed.readyChannelMuxPromise
      )

    case .shutdown:
      // We were shut down before a scheduled connection attempt had started.
      break

    // Every invalid state deliberately gets its own case: some crash reporting services provide
    // stack traces without the precondition failure message (which contains the invalid state we
    // were in). Separate cases allow us to work out the state from the line number alone.
    case .connecting:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()
    }
  }
  /// Starts a single connection attempt and moves the manager into the `connecting` state.
  ///
  /// - Parameters:
  ///   - backoffIterator: Source of the connect timeout and the backoff to use should this attempt
  ///     fail. If it is `nil` or exhausted, no timeout is applied and no reconnect is configured.
  ///   - muxPromise: The promise stored in the connecting state as the ready channel's
  ///     multiplexer promise.
  private func startConnecting(
    backoffIterator: ConnectionBackoffIterator?,
    muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
  ) {
    // We must already be on the event loop: the connect is *submitted* below so that it only
    // starts once the state change to `.connecting` has been made.
    self.eventLoop.assertInEventLoop()

    let timeoutAndBackoff = backoffIterator?.next()

    let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
      let attempt: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
        managedBy: self,
        onEventLoop: self.eventLoop,
        connectTimeout: timeoutAndBackoff.map { .seconds(timeInterval: $0.timeout) },
        logger: self.logger
      )
      attempt.whenFailure { failure in
        self.connectionFailed(withError: failure)
      }
      return attempt
    }

    // Decide up front whether to reconnect should the candidate channel fail.
    let reconnect: Reconnect
    if let next = timeoutAndBackoff {
      reconnect = .after(next.backoff)
    } else {
      reconnect = .none
    }

    self.state = .connecting(
      ConnectingState(
        backoffIterator: backoffIterator,
        reconnect: reconnect,
        candidate: candidate,
        readyChannelMuxPromise: muxPromise,
        candidateMuxPromise: self.eventLoop.makePromise()
      )
    )
  }
  810. }
  811. extension ConnectionManager {
  /// A synchronous view over the connection manager. Every operation on the view requires the
  /// caller to be executing on the same `EventLoop` as the connection manager.
  internal var sync: Sync {
    return .init(self)
  }
  /// A thin wrapper around a `ConnectionManager` which forwards each operation to the manager.
  internal struct Sync {
    /// The connection manager every operation is forwarded to.
    private let manager: ConnectionManager

    fileprivate init(_ connectionManager: ConnectionManager) {
      self.manager = connectionManager
    }

    /// The delegate notified about connectivity changes.
    internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
      get {
        self.manager.eventLoop.assertInEventLoop()
        return self.manager.connectivityDelegate
      }
      nonmutating set(delegate) {
        self.manager.eventLoop.assertInEventLoop()
        self.manager.connectivityDelegate = delegate
      }
    }

    /// The delegate notified about HTTP/2 connection changes.
    internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
      get {
        self.manager.eventLoop.assertInEventLoop()
        return self.manager.http2Delegate
      }
      nonmutating set(delegate) {
        self.manager.eventLoop.assertInEventLoop()
        self.manager.http2Delegate = delegate
      }
    }

    /// Whether the connection is currently in the idle state.
    internal var isIdle: Bool {
      return self.manager.isIdle
    }

    /// The `multiplexer` of a connection in the `ready` state, or `nil` if the connection is in
    /// any other state.
    internal var multiplexer: HTTP2StreamMultiplexer? {
      return self.manager.multiplexer
    }

    /// Starts establishing a connection. Must only be called when `isIdle` is `true`.
    internal func startConnecting() {
      self.manager.startConnecting()
    }
  }
  858. }
  859. extension ConnectionManager {
  /// Stops the program with a precondition failure naming the current state and the function
  /// which was called while in it.
  ///
  /// - Parameters:
  ///   - function: The calling function; defaults to the caller's `#function`.
  ///   - file: The file reported with the failure; defaults to the caller's `#file`.
  ///   - line: The line reported with the failure; defaults to the caller's `#line`.
  private func invalidState(
    function: StaticString = #function,
    file: StaticString = #file,
    line: UInt = #line
  ) -> Never {
    let message = "Invalid state \(self.state) for \(function)"
    preconditionFailure(message, file: file, line: line)
  }
  867. }