ConnectionManager.swift 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOConcurrencyHelpers
  19. import NIOCore
  20. import NIOHTTP2
  21. // Unchecked because mutable state is always accessed and modified on a particular event loop.
  22. // APIs which _may_ be called from different threads execute onto the correct event loop first.
  23. // APIs which _must_ be called from an exact event loop have preconditions checking that the correct
  24. // event loop is being used.
  25. @usableFromInline
  26. internal final class ConnectionManager: @unchecked Sendable {
  /// Whether a lost connection should be re-established: `none` never reconnects, while
  /// `after(delay)` schedules the next attempt once `delay` seconds have elapsed.
  internal enum Reconnect {
    case none, after(TimeInterval)
  }
  /// State held while a connection attempt is in flight.
  internal struct ConnectingState {
    var backoffIterator: ConnectionBackoffIterator?
    var reconnect: Reconnect
    /// The first error reported by the channel while connecting, if any.
    var connectError: Error?
    /// The channel being connected.
    var candidate: EventLoopFuture<Channel>
    /// The promise whose future is vended to patient (`waitsForConnectivity`) callers.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// The promise whose future is vended to optimistic (`fastFailure`) callers; it is succeeded
    /// as soon as the channel becomes active.
    var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>

    // Spelled out to mirror the synthesized memberwise initializer exactly: same labels, same
    // order, and `nil` defaults for the optional properties.
    init(
      backoffIterator: ConnectionBackoffIterator? = nil,
      reconnect: Reconnect,
      connectError: Error? = nil,
      candidate: EventLoopFuture<Channel>,
      readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>,
      candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      self.backoffIterator = backoffIterator
      self.reconnect = reconnect
      self.connectError = connectError
      self.candidate = candidate
      self.readyChannelMuxPromise = readyChannelMuxPromise
      self.candidateMuxPromise = candidateMuxPromise
    }
  }
  /// State held once a candidate `Channel` is active but has not yet seen the initial HTTP/2
  /// SETTINGS frame.
  internal struct ConnectedState {
    var backoffIterator: ConnectionBackoffIterator?
    var reconnect: Reconnect
    /// The channel which became active.
    var candidate: Channel
    /// The promise whose future is vended to patient callers.
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    var multiplexer: HTTP2StreamMultiplexer
    /// The most recent error caught on the channel, if any.
    var error: Error?

    /// Carries the backoff iterator, reconnect policy and ready promise over from the
    /// `connecting` state. Any `connectError` recorded while connecting is not carried over.
    init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
      self.candidate = candidate
      self.multiplexer = multiplexer
      self.error = nil
      self.backoffIterator = state.backoffIterator
      self.reconnect = state.reconnect
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
    }
  }
  /// State held while the connection can carry RPCs.
  internal struct ReadyState {
    var channel: Channel
    var multiplexer: HTTP2StreamMultiplexer
    /// The most recent error caught on the channel, if any.
    var error: Error?

    /// Promotes an `active` connection to `ready`. An error recorded while `active` is not
    /// carried over.
    init(from state: ConnectedState) {
      self.multiplexer = state.multiplexer
      self.channel = state.candidate
      self.error = nil
    }
  }
  /// State held between a failed or dropped connection and the next scheduled connection attempt.
  internal struct TransientFailureState {
    var backoffIterator: ConnectionBackoffIterator?
    var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    /// The scheduled task for the next connection attempt.
    var scheduled: Scheduled<Void>
    /// Why we are in transient failure; reported with the external `transientFailure` state.
    var reason: Error

    /// The reason used when no more specific error is known.
    private static func unexpectedConnectionDrop() -> GRPCStatus {
      return GRPCStatus(code: .unavailable, message: "Unexpected connection drop")
    }

    /// Enters transient failure from `connecting`, keeping the ready promise for the next attempt.
    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
      self.scheduled = scheduled
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.reason = reason ?? Self.unexpectedConnectionDrop()
    }

    /// Enters transient failure from `active`, using the channel's recorded error (if any).
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.scheduled = scheduled
      self.backoffIterator = state.backoffIterator
      self.readyChannelMuxPromise = state.readyChannelMuxPromise
      self.reason = state.error ?? Self.unexpectedConnectionDrop()
    }

    /// Enters transient failure from `ready`. A fresh ready promise is created on the channel's
    /// event loop, since the previous one has already been handed out.
    init(
      from state: ReadyState,
      scheduled: Scheduled<Void>,
      backoffIterator: ConnectionBackoffIterator?
    ) {
      self.scheduled = scheduled
      self.backoffIterator = backoffIterator
      self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
      self.reason = state.error ?? Self.unexpectedConnectionDrop()
    }
  }
  /// State held once the manager is shut down; no further channels will be created.
  internal struct ShutdownState {
    /// Completes once the shutdown has finished.
    var closeFuture: EventLoopFuture<Void>
    /// Why we are shut down. Requests for a multiplexer in this state are failed with this error.
    var reason: Error

    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
      self.reason = reason
      self.closeFuture = closeFuture
    }

    /// A shutdown requested by the user through `shutdown(mode:)`.
    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
      let userReason = GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
      return ShutdownState(closeFuture: closeFuture, reason: userReason)
    }
  }
  internal enum State {
    /// No `Channel` is wanted.
    ///
    /// Can move to:
    /// - `connecting`
    /// - `shutdown`
    case idle(lastError: Error?)

    /// A connection attempt is in progress.
    ///
    /// Can move to:
    /// - `active`
    /// - `transientFailure` (the attempt failed and another one will be made)
    /// - `shutdown`
    case connecting(ConnectingState)

    /// A `Channel` has been established but may not be usable yet (e.g. the TLS handshake may
    /// still fail). Receiving the initial HTTP/2 SETTINGS frame is the signal that it is 'ready'.
    ///
    /// Can move to:
    /// - `ready`
    /// - `transientFailure` (the handshake failed or another error occurred, and the connection
    ///   can be re-established)
    /// - `shutdown`
    case active(ConnectedState)

    /// The `Channel` is active and has seen the initial HTTP/2 SETTINGS frame, so it can be used
    /// for RPCs.
    ///
    /// Can move to:
    /// - `idle` (no RPCs are being served, so the connection can be dropped for now)
    /// - `transientFailure` (an error occurred and the connection will be re-established)
    /// - `shutdown`
    case ready(ReadyState)

    /// A `Channel` is wanted and another connection attempt will be made later.
    ///
    /// Can move to:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)

    /// Terminal: no `Channel` will ever be wanted again.
    case shutdown(ShutdownState)

    /// A short name for the state, used in log metadata.
    fileprivate var label: String {
      let name: String
      switch self {
      case .idle: name = "idle"
      case .connecting: name = "connecting"
      case .active: name = "active"
      case .ready: name = "ready"
      case .transientFailure: name = "transientFailure"
      case .shutdown: name = "shutdown"
      }
      return name
    }
  }
  /// The most recently reported external connectivity state, a coarser view of `state`.
  private var externalState: _ConnectivityState = .idle(nil)

  /// Records `nextState` as the external state and tells the connectivity delegate about the
  /// transition. Nothing happens if `nextState` is the same state as the one already recorded.
  private func updateExternalState(to nextState: _ConnectivityState) {
    guard !self.externalState.isSameState(as: nextState) else {
      return
    }
    let previousState = self.externalState
    self.externalState = nextState
    self.connectivityDelegate?.connectionStateDidChange(self, from: previousState, to: nextState)
  }
  /// The current internal state.
  ///
  /// After each assignment the matching external connectivity state is reported; `active` has no
  /// external counterpart. Entering `idle` or `transientFailure` also advances the channel number
  /// used in the logger's connection ID metadata.
  private var state: State {
    didSet {
      switch self.state {
      case .idle(let lastError):
        self.updateExternalState(to: .idle(lastError))
        self.updateConnectionID()
      case .transientFailure(let failure):
        self.updateExternalState(to: .transientFailure(failure.reason))
        self.updateConnectionID()
      case .connecting:
        self.updateExternalState(to: .connecting)
      case .ready:
        self.updateExternalState(to: .ready)
      case .shutdown:
        self.updateExternalState(to: .shutdown)
      case .active:
        // Internal-only state: nothing to report.
        break
      }
    }
  }
  /// Whether the current state is `idle`. Asserts that it is accessed on `eventLoop`.
  private var isIdle: Bool {
    self.eventLoop.assertInEventLoop()
    switch self.state {
    case .idle:
      return true
    case .connecting, .transientFailure, .active, .ready, .shutdown:
      return false
    }
  }

  /// Whether the current state is `connecting`. Asserts that it is accessed on `eventLoop`.
  private var isConnecting: Bool {
    self.eventLoop.assertInEventLoop()
    switch self.state {
    case .connecting:
      return true
    case .idle, .transientFailure, .active, .ready, .shutdown:
      return false
    }
  }

  /// Whether the current state is `ready`. Asserts that it is accessed on `eventLoop`.
  private var isReady: Bool {
    self.eventLoop.assertInEventLoop()
    switch self.state {
    case .ready:
      return true
    case .idle, .active, .connecting, .transientFailure, .shutdown:
      return false
    }
  }

  /// Whether the current state is `transientFailure`. Asserts that it is accessed on `eventLoop`.
  private var isTransientFailure: Bool {
    self.eventLoop.assertInEventLoop()
    switch self.state {
    case .transientFailure:
      return true
    case .idle, .connecting, .active, .ready, .shutdown:
      return false
    }
  }

  /// Whether the current state is `shutdown`. Asserts that it is accessed on `eventLoop`.
  private var isShutdown: Bool {
    self.eventLoop.assertInEventLoop()
    switch self.state {
    case .shutdown:
      return true
    case .idle, .connecting, .transientFailure, .active, .ready:
      return false
    }
  }
  /// The multiplexer of the `ready` connection; `nil` in every other state. Asserts that it is
  /// accessed on `eventLoop`.
  private var multiplexer: HTTP2StreamMultiplexer? {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(readyState) = self.state else {
      return nil
    }
    return readyState.multiplexer
  }
  /// The `EventLoop` that the managed connection will run on.
  internal let eventLoop: EventLoop

  /// A delegate for connectivity changes. Executed on the `EventLoop`.
  private var connectivityDelegate: ConnectionManagerConnectivityDelegate?

  /// A delegate for HTTP/2 connection changes. Executed on the `EventLoop`.
  private var http2Delegate: ConnectionManagerHTTP2Delegate?

  /// An `EventLoopFuture<Channel>` provider.
  private let channelProvider: ConnectionManagerChannelProvider

  /// The behavior for starting a call, i.e. how patient is the caller when asking for a
  /// multiplexer.
  private let callStartBehavior: CallStartBehavior.Behavior

  /// The configuration to use when backing off between connection attempts, if reconnection
  /// attempts should be made at all.
  private let connectionBackoff: ConnectionBackoff?

  /// A logger. Its connection ID metadata is rewritten by `updateConnectionID()`.
  internal var logger: Logger

  /// A UUID string identifying this manager in log metadata.
  private let connectionID: String

  /// Combined with `connectionID` in log metadata. It is advanced under `channelNumberLock`.
  private var channelNumber: UInt64

  /// Guards `channelNumber` and the logger metadata derived from it.
  ///
  /// This is a `let`: replacing the lock while another thread holds it would silently break
  /// mutual exclusion. `withLock` is non-mutating, so nothing needs it to be a `var`.
  private let channelNumberLock = NIOLock()
  /// "<connectionID>/<channelNumber>". Only read this while holding `channelNumberLock`.
  private var _connectionIDAndNumber: String {
    return "\(self.connectionID)/\(self.channelNumber)"
  }

  /// "<connectionID>/<channelNumber>", read while holding `channelNumberLock`.
  private var connectionIDAndNumber: String {
    return self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }

  /// Advances the channel number (wrapping on overflow) and refreshes the connection ID metadata
  /// on `logger`, all under `channelNumberLock`.
  private func updateConnectionID() {
    self.channelNumberLock.withLock {
      self.channelNumber &+= 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
    }
  }

  /// Writes the current connection ID and channel number into `logger`'s metadata.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
  }
  /// Creates a manager from a `ClientConnection.Configuration`, running on the next event loop
  /// taken from the configuration's event loop group.
  ///
  /// - Parameters:
  ///   - configuration: Supplies the event loop group, call start behavior and connection backoff.
  ///   - channelProvider: Creates channels. When `nil`, a `DefaultChannelProvider` built from
  ///     `configuration` is used.
  ///   - connectivityDelegate: Notified of connectivity state changes.
  ///   - logger: The base logger; connection ID metadata is added to it.
  internal convenience init(
    configuration: ClientConnection.Configuration,
    channelProvider: ConnectionManagerChannelProvider? = nil,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    logger: Logger
  ) {
    let loop = configuration.eventLoopGroup.next()
    let provider: ConnectionManagerChannelProvider =
      channelProvider ?? DefaultChannelProvider(configuration: configuration)

    self.init(
      eventLoop: loop,
      channelProvider: provider,
      callStartBehavior: configuration.callStartBehavior.wrapped,
      connectionBackoff: configuration.connectionBackoff,
      connectivityDelegate: connectivityDelegate,
      http2Delegate: nil,
      logger: logger
    )
  }
  /// Creates an `idle` manager whose connection will run on `eventLoop`.
  ///
  /// A fresh UUID becomes the connection ID and the channel number starts at zero. Both are
  /// written into `logger`'s metadata as "<connectionID>/<channelNumber>".
  internal init(
    eventLoop: EventLoop,
    channelProvider: ConnectionManagerChannelProvider,
    callStartBehavior: CallStartBehavior.Behavior,
    connectionBackoff: ConnectionBackoff?,
    connectivityDelegate: ConnectionManagerConnectivityDelegate?,
    http2Delegate: ConnectionManagerHTTP2Delegate?,
    logger: Logger
  ) {
    let connectionID = UUID().uuidString
    let initialChannelNumber: UInt64 = 0

    var connectionLogger = logger
    connectionLogger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(initialChannelNumber)"

    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.callStartBehavior = callStartBehavior
    self.connectionBackoff = connectionBackoff
    self.connectivityDelegate = connectivityDelegate
    self.http2Delegate = http2Delegate
    self.connectionID = connectionID
    self.channelNumber = initialChannelNumber
    self.logger = connectionLogger
    // Property observers do not run during init, so this does not notify the delegate.
    self.state = .idle(lastError: nil)
  }
  /// Returns a future for the HTTP/2 multiplexer of the underlying connection.
  ///
  /// With the `fastFailure` call start behavior the caller gets a single chance to connect. With
  /// `waitsForConnectivity`, reconnections are managed here while the caller waits. Safe to call
  /// from any thread: the work hops onto `eventLoop` when needed.
  internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        self.getHTTP2Multiplexer()
      }
    }

    switch self.callStartBehavior {
    case .waitsForConnectivity:
      return self.getHTTP2MultiplexerPatient()
    case .fastFailure:
      return self.getHTTP2MultiplexerOptimistic()
    }
  }
  /// Returns a future for the multiplexer which succeeds once the connection is ready. If the
  /// manager is `idle`, a connection attempt is started first.
  ///
  /// Reconnects are handled if necessary. If the manager is already `shutdown`, the returned
  /// future is failed immediately with the shutdown reason.
  private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure: this reads
    // and mutates `state`, exactly like `getHTTP2MultiplexerOptimistic`.
    self.eventLoop.preconditionInEventLoop()

    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
    switch self.state {
    case .idle:
      self.startConnecting()
      // We started connecting so we must transition to the `connecting` state.
      guard case let .connecting(connecting) = self.state else {
        self.unreachableState()
      }
      multiplexer = connecting.readyChannelMuxPromise.futureResult
    case let .connecting(state):
      multiplexer = state.readyChannelMuxPromise.futureResult
    case let .active(state):
      multiplexer = state.readyChannelMuxPromise.futureResult
    case let .ready(state):
      multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)
    case let .transientFailure(state):
      multiplexer = state.readyChannelMuxPromise.futureResult
    case let .shutdown(state):
      multiplexer = self.eventLoop.makeFailedFuture(state.reason)
    }

    self.logger.debug(
      "vending multiplexer future",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )
    return multiplexer
  }
  /// Returns a future multiplexer without waiting for the connection to become ready.
  ///
  /// - `idle`: starts connecting and returns the future for that attempt's multiplexer.
  /// - `connecting`: returns the future for the in-flight attempt's multiplexer.
  /// - `active` / `ready`: returns the existing multiplexer.
  /// - `transientFailure` / `shutdown`: returns a future failed with the state's reason.
  private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
    self.eventLoop.preconditionInEventLoop()

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer>
    switch self.state {
    case .idle:
      self.startConnecting()
      // Starting to connect must have moved us into `connecting`.
      guard case let .connecting(attempt) = self.state else {
        self.unreachableState()
      }
      muxFuture = attempt.candidateMuxPromise.futureResult
    case .connecting(let attempt):
      muxFuture = attempt.candidateMuxPromise.futureResult
    case .active(let active):
      muxFuture = self.eventLoop.makeSucceededFuture(active.multiplexer)
    case .ready(let ready):
      muxFuture = self.eventLoop.makeSucceededFuture(ready.multiplexer)
    case .transientFailure(let failure):
      muxFuture = self.eventLoop.makeFailedFuture(failure.reason)
    case .shutdown(let shutdown):
      muxFuture = self.eventLoop.makeFailedFuture(shutdown.reason)
    }

    self.logger.debug(
      "vending fast-failing multiplexer future",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )
    return muxFuture
  }
  /// How a shutdown treats a connection that may be in use.
  @usableFromInline
  internal enum ShutdownMode {
    /// Close the underlying channel straight away, without waiting for RPCs in flight.
    case forceful
    /// Let running RPCs finish, with no new streams created, and close the underlying channel
    /// forcibly if it is still open at the given deadline.
    case graceful(NIODeadline)
  }

  /// Shuts down the underlying connection and returns a future completed when the shutdown is done.
  ///
  /// - Note: A `forceful` shutdown requested after a `graceful` one has no effect.
  internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
    let shutdownComplete = self.eventLoop.makePromise(of: Void.self)
    self.shutdown(mode: mode, promise: shutdownComplete)
    return shutdownComplete.futureResult
  }

  /// Shuts down the underlying connection, completing `promise` when the shutdown is done. Safe
  /// to call from any thread: the work runs on `eventLoop`.
  ///
  /// - Note: A `forceful` shutdown requested after a `graceful` one has no effect.
  internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
      return
    }
    self._shutdown(mode: mode, promise: promise)
  }
  /// Event-loop implementation of `shutdown(mode:promise:)`.
  ///
  /// Moves to the terminal `shutdown` state unless we are already there, fails any outstanding
  /// multiplexer promises, and completes `promise` once any channel we own has closed.
  private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
    self.logger.debug(
      "shutting down connection",
      metadata: [
        "connectivity_state": "\(self.state.label)",
        "shutdown.mode": "\(mode)",
      ]
    )

    // The state entered from every case except `.shutdown`. Each case enters it *before* failing
    // promises or closing channels, so any callbacks that run observe `shutdown`.
    let userShutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)

    switch self.state {
    case .idle:
      // No channel, and we don't want one.
      self.state = .shutdown(userShutdown)
      promise.succeed(())

    case .connecting(let attempt):
      // Mid-connection. The application has no 'ready' channel, so any fallout from the
      // connecting channel is handled here without the application knowing.
      self.state = .shutdown(userShutdown)
      // Even if the connect succeeds, neither patient nor optimistic callers may use it.
      attempt.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      attempt.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      // Finish shutting down once the connection attempt resolves.
      attempt.candidate.whenComplete { result in
        switch result {
        case .success(let channel):
          // NIO completes the channel future before firing channelActive. Closing right away
          // could fire inactive before active, which HTTP/2 dislikes, so close on the next tick.
          self.eventLoop.execute {
            channel.close(mode: .all, promise: nil)
            promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())
          }
        case .failure:
          // Failing to connect still counts as a successful shutdown.
          promise.succeed(())
        }
      }

    case .active(let connected):
      // The channel is up but the application doesn't know about it yet: treat it like
      // `.connecting`.
      self.state = .shutdown(userShutdown)
      connected.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
      // Streams are only created once `ready`, so there is nothing to quiesce: just close.
      connected.candidate.close(mode: .all, promise: nil)
      promise.completeWith(connected.candidate.closeFuture.recoveringFromUncleanShutdown())

    case .ready(let ready):
      // The application may be using this channel. Close it and finish when it has closed.
      self.state = .shutdown(userShutdown)
      switch mode {
      case .forceful:
        ready.channel.close(mode: .all, promise: nil)
      case .graceful(let deadline):
        // Fall back to a forceful close if the channel is still open at the deadline...
        let forceClose = ready.channel.eventLoop.scheduleTask(deadline: deadline) {
          self.logger.info("shutdown timer expired, forcibly closing connection")
          ready.channel.close(mode: .all, promise: nil)
        }
        // ...and drop that fallback if the channel closes first.
        ready.channel.closeFuture.whenComplete { _ in
          forceClose.cancel()
        }
        // Ask the channel to quiesce. The idle handler is expected to pick this up and close the
        // channel once all streams have closed.
        ready.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
      }
      promise.completeWith(ready.channel.closeFuture.recoveringFromUncleanShutdown())

    case .transientFailure(let failure):
      // No channel is handed out. Cancel the pending reconnect if we can; if it is too late,
      // `startConnecting()` sees the new `shutdown` state and ignores the request.
      self.state = .shutdown(userShutdown)
      failure.scheduled.cancel()
      failure.readyChannelMuxPromise.fail(userShutdown.reason)
      // There is no channel to wait for.
      promise.succeed(())

    case .shutdown(let existing):
      // Already shut down (or shutting down): follow the existing shutdown.
      promise.completeWith(existing.closeFuture)
    }
  }
  /// Registers `onClose` to run when the current connection closes.
  ///
  /// When a `ready` connection exists, `onClose(true)` runs once that channel has closed. In any
  /// other state, `onClose(false)` runs immediately on `eventLoop`. Safe to call from any thread.
  internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._onCurrentConnectionClose(onClose)
      }
      return
    }
    self._onCurrentConnectionClose(onClose)
  }
  /// Event-loop implementation of `onCurrentConnectionClose(_:)`.
  private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
    self.eventLoop.assertInEventLoop()
    guard case let .ready(readyState) = self.state else {
      onClose(false)
      return
    }
    readyState.channel.closeFuture.whenComplete { _ in
      onClose(true)
    }
  }
  581. // MARK: - State changes from the channel handler.
  /// Called when the channel catches an error. The error is kept so it can give context when the
  /// channel later becomes inactive. Must be called on the `EventLoop`.
  internal func channelError(_ error: Error) {
    self.eventLoop.preconditionInEventLoop()

    switch self.state {
    case .idle:
      // Surprising, but nothing to act on. A channel-fatal error will be followed by
      // channelInactive; otherwise future I/O will either work or fail fast. Just log it.
      self.logger.warning(
        "ignoring unexpected error in idle",
        metadata: [
          MetadataKey.error: "\(error)"
        ]
      )

    case .connecting(var attempt):
      // Only the first error seen while connecting is recorded. The channel promise also reports
      // connection failures; this error may be used instead when it carries more useful detail.
      if attempt.connectError == nil {
        attempt.connectError = error
        self.state = .connecting(attempt)

        // The pool normally learns about connection errors only when the connection moves to
        // `transientFailure`. Some transports (e.g. NIOTS) raise errors during the connect,
        // before it times out. That leaves a window where a call can fail with "deadline
        // exceeded" (no connection within the pool's max wait time) with no diagnostic
        // information, even though the information exists.
        //
        // The delegate is public API and a new API doesn't make much sense, so call the pool
        // directly when it is the delegate.
        if let pool = self.connectivityDelegate as? ConnectionPool {
          pool.sync.updateMostRecentError(error)
        }
      }

    case .active(var connected):
      connected.error = error
      self.state = .active(connected)

    case .ready(var ready):
      ready.error = error
      self.state = .ready(ready)

    case .transientFailure, .shutdown:
      // We're already in one of these states, so further errors aren't helpful.
      break
    }
  }
  /// Called when the connecting channel becomes `active`. Must be called on the `EventLoop`.
  internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "activating connection",
      metadata: [
        "connectivity_state": "\(self.state.label)"
      ]
    )

    switch self.state {
    case .connecting(let attempt):
      self.state = .active(
        ConnectedState(from: attempt, candidate: channel, multiplexer: multiplexer)
      )
      // Optimistic (fast-failure) callers are satisfied with this level of setup.
      attempt.candidateMuxPromise.succeed(multiplexer)

    case .shutdown:
      // The application shut down before the channel became active, so close it.
      channel.close(mode: .all, promise: nil)

    case .idle, .transientFailure, .active, .ready:
      // `idle` / `transientFailure`: channelActive and channelInactive can arrive reordered, so an
      // activation outside `connecting` is ignored. `active` / `ready`: a second channelActive is
      // ignored.
      break
    }
  }
  /// Called when the channel becomes inactive. Depending on the current state and the reconnect
  /// policy, this either schedules another connection attempt or shuts the connection down.
  /// Must be called on the `EventLoop`.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "deactivating connection",
      metadata: ["connectivity_state": "\(self.state.label)"]
    )

    switch self.state {
    case let .connecting(attempt):
      // Seeing `channelInactive` before `channelActive` is uncommon, but it must be tolerated.
      switch attempt.reconnect {
      case let .after(delay):
        // Only the candidate promise is failed: the ready promise is carried over to the next
        // attempt.
        let closedWhileConnecting = GRPCStatus(
          code: .unavailable,
          message: "Connection closed while connecting"
        )
        attempt.candidateMuxPromise.fail(closedWhileConnecting)

        let nextAttempt = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(
          TransientFailureState(from: attempt, scheduled: nextAttempt, reason: nil)
        )

      case .none:
        self.logger.debug("shutting down connection")
        let dropped = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        self.state = .shutdown(
          ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: dropped)
        )
        // The state has been updated; now fail both outstanding promises.
        attempt.readyChannelMuxPromise.fail(dropped)
        attempt.candidateMuxPromise.fail(dropped)
      }

    case let .active(activeState):
      // The channel was `active` but never became `ready`.
      switch activeState.reconnect {
      case let .after(delay):
        let nextAttempt = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        self.state = .transientFailure(
          TransientFailureState(from: activeState, scheduled: nextAttempt)
        )

      case .none:
        self.logger.debug("shutting down connection")
        let dropped = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and connection re-establishment is disabled"
        )
        self.state = .shutdown(
          ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: dropped)
        )
        activeState.readyChannelMuxPromise.fail(dropped)
      }

    case let .ready(readyState):
      // The channel was ready and working; it is only replaced if a backoff is configured.
      if self.connectionBackoff != nil {
        // Reconnect immediately, but still pass through `transientFailure`.
        let nextAttempt = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
        let freshIterator = self.connectionBackoff?.makeIterator()
        self.state = .transientFailure(
          TransientFailureState(
            from: readyState,
            scheduled: nextAttempt,
            backoffIterator: freshIterator
          )
        )
      } else {
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        let notConfigured = GRPCStatus(
          code: .unavailable,
          message: "The connection was dropped and a reconnect was not configured"
        )
        self.state = .shutdown(
          ShutdownState(closeFuture: readyState.channel.closeFuture, reason: notConfigured)
        )
      }

    case .idle, .shutdown, .transientFailure:
      // `.idle`: expected, since the channel becomes inactive after idling. `.shutdown`: already
      // shut down. `.transientFailure`: a duplicate `channelInactive`. Nothing to do.
      break
    }
  }
  /// Called once the channel has seen the initial HTTP/2 SETTINGS frame; moves an `active`
  /// connection to `ready` and succeeds the ready promise. Must be called on the `EventLoop`.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "connection ready",
      metadata: ["connectivity_state": "\(self.state.label)"]
    )

    switch self.state {
    case let .active(activeState):
      self.state = .ready(ReadyState(from: activeState))
      activeState.readyChannelMuxPromise.succeed(activeState.multiplexer)

    case .shutdown:
      break

    case .idle, .transientFailure, .connecting, .ready:
      // None of these states should see the initial SETTINGS frame:
      // - `.idle` / `.transientFailure` have no connection or connection attempt.
      // - `.connecting` has no channel to receive the frame on.
      // - `.ready` has already received it.
      // Assert in debug builds. Ignore in release builds: there is nothing to close and nowhere to
      // report an error to.
      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
    }
  }
  /// Called when there are no active RPCs on the channel: moves an `active` or `ready`
  /// connection to the `idle` state, keeping its last error. Must be called on the `EventLoop`.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug(
      "idling connection",
      metadata: ["connectivity_state": "\(self.state.label)"]
    )

    switch self.state {
    case let .ready(readyState):
      self.state = .idle(lastError: readyState.error)

    case let .active(activeState):
      // Reachable if the keepalive timer fires before the connection becomes ready; the ready
      // promise can no longer be completed by this channel.
      self.state = .idle(lastError: activeState.error)
      let idledEarly = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      activeState.readyChannelMuxPromise.fail(idledEarly)

    case .shutdown, .idle, .transientFailure:
      // `.shutdown` is expected when the user closes the connection: if the channel then becomes
      // inactive with no outstanding RPCs, 'idle()' is called instead of 'channelInactive()'.
      // `.idle` / `.transientFailure` have no connection to idle.
      break

    case .connecting:
      // The idle watchdog only starts once the connection is active, so this is unexpected.
      // Assert in debug builds; ignore in release builds.
      assertionFailure("tried to idle a connection in the \(self.state.label) state")
    }
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream was opened on this connection.
  internal func streamOpened() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else { return }
    delegate.streamOpened(self)
  }
  /// Tells the HTTP/2 delegate, if one is set, that a stream was closed on this connection.
  internal func streamClosed() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else { return }
    delegate.streamClosed(self)
  }
  /// Forwards a new maximum-concurrent-streams value to the HTTP/2 delegate, if one is set.
  ///
  /// - Parameter maxConcurrentStreams: The updated maximum number of concurrent streams.
  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.http2Delegate else { return }
    delegate.receivedSettingsMaxConcurrentStreams(self, maxConcurrentStreams: maxConcurrentStreams)
  }
  /// The connection has started quiescing; tells the connectivity delegate, if one is set.
  internal func beginQuiescing() {
    self.eventLoop.assertInEventLoop()
    guard let delegate = self.connectivityDelegate else { return }
    delegate.connectionIsQuiescing(self)
  }
  847. }
  extension ConnectionManager {
    /// Handles a connection attempt that failed without ever establishing a connection.
    ///
    /// In the `connecting` state this either schedules another attempt, when the attempt's
    /// reconnect policy allows it, or shuts the connection down. Must be called on the `EventLoop`.
    ///
    /// - Parameter error: The error the connection attempt failed with.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .shutdown:
        // The application called shutdown while the (doomed) attempt was in flight; nothing to do.
        break

      case .idle, .active, .ready, .transientFailure:
        // No connection attempt is in progress. Assert in debug builds; ignore in release builds.
        assertionFailure("connect promise failed in \(self.state.label) state")

      case let .connecting(attempt):
        // A connect timeout is rarely the most informative error: prefer an error caught earlier
        // during the attempt, if there is one.
        let reportedError: Error
        if case .some(.connectTimeout) = error as? ChannelError {
          reportedError = attempt.connectError ?? error
        } else {
          reportedError = error
        }

        switch attempt.reconnect {
        case let .after(delay):
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
          let nextAttempt = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.state = .transientFailure(
            TransientFailureState(from: attempt, scheduled: nextAttempt, reason: reportedError)
          )
          // Candidate mux users are not willing to wait for the retry.
          attempt.candidateMuxPromise.fail(reportedError)

        case .none:
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          self.state = .shutdown(
            ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: reportedError)
          )
          attempt.readyChannelMuxPromise.fail(reportedError)
          attempt.candidateMuxPromise.fail(reportedError)
        }
      }
    }
  }
  extension ConnectionManager {
    /// Starts establishing a connection. Only meaningful from the `idle` and `transientFailure`
    /// states, and a no-op once `shutdown`. Must be called on the `EventLoop`.
    private func startConnecting() {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .idle:
        let configuredBackoff = self.connectionBackoff
        let iterator = configuredBackoff?.makeIterator()

        // The iterator yields the connect timeout together with the backoff for the next attempt.
        // When retries is `.none` it yields nothing, yet we still want a connect timeout. To work
        // around this, take the minimum connection timeout from the configuration and use it as
        // an override.
        let connectTimeoutOverride: TimeAmount?
        if let backoff = configuredBackoff, backoff.retries == .none {
          connectTimeoutOverride = .seconds(timeInterval: backoff.minimumConnectionTimeout)
        } else {
          connectTimeoutOverride = nil
        }

        self.startConnecting(
          backoffIterator: iterator,
          muxPromise: self.eventLoop.makePromise(),
          connectTimeoutOverride: connectTimeoutOverride
        )

      case let .transientFailure(pending):
        // Continue with the existing iterator and keep the ready promise from the earlier attempt.
        self.startConnecting(
          backoffIterator: pending.backoffIterator,
          muxPromise: pending.readyChannelMuxPromise
        )

      case .shutdown:
        // Shut down before a scheduled connection attempt started.
        break

      case .connecting, .active, .ready:
        // startConnecting() is only called when no connection exists, after checking the current
        // state, so these states should be unreachable.
        self.unreachableState()
      }
    }

    /// Moves to the `connecting` state and submits a connection attempt.
    ///
    /// - Parameters:
    ///   - backoffIterator: Supplies the connect timeout and the backoff to use should this
    ///     attempt fail; when absent or exhausted, the attempt will not be retried.
    ///   - muxPromise: Promise for the multiplexer once the connection is ready.
    ///   - connectTimeoutOverride: When set, used as the connect timeout in place of the one
    ///     produced by `backoffIterator`.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>,
      connectTimeoutOverride: TimeAmount? = nil
    ) {
      let nextTimeoutAndBackoff = backoffIterator?.next()

      let connectTimeout: TimeAmount?
      if let forcedTimeout = connectTimeoutOverride {
        connectTimeout = forcedTimeout
      } else {
        connectTimeout = nextTimeoutAndBackoff.map { TimeAmount.seconds(timeInterval: $0.timeout) }
      }

      // We're already on the event loop: submitting the connect defers it until after the state
      // below has been changed to `.connecting`.
      self.eventLoop.assertInEventLoop()
      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let channel: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
          managedBy: self,
          onEventLoop: self.eventLoop,
          connectTimeout: connectTimeout,
          logger: self.logger
        )
        channel.whenFailure { error in
          self.connectionFailed(withError: error)
        }
        return channel
      }

      // Retry after the backoff if the iterator produced one; otherwise don't reconnect.
      let reconnect: Reconnect
      if let next = nextTimeoutAndBackoff {
        reconnect = .after(next.backoff)
      } else {
        reconnect = .none
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          candidate: candidate,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// A synchronous view of the connection manager. Each operation requires the caller to be
    /// running on the connection manager's `EventLoop`.
    internal var sync: Sync {
      Sync(self)
    }

    internal struct Sync {
      private let manager: ConnectionManager

      fileprivate init(_ manager: ConnectionManager) {
        self.manager = manager
      }

      /// The delegate notified of connectivity changes; both accessors assert that the caller is
      /// on the manager's `EventLoop`.
      internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.connectivityDelegate
        }
        nonmutating set {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.connectivityDelegate = newValue
        }
      }

      /// The delegate notified of HTTP/2 connection changes; both accessors assert that the caller
      /// is on the manager's `EventLoop`.
      internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
        get {
          self.manager.eventLoop.assertInEventLoop()
          return self.manager.http2Delegate
        }
        nonmutating set {
          self.manager.eventLoop.assertInEventLoop()
          self.manager.http2Delegate = newValue
        }
      }

      /// Whether the connection is in the idle state.
      internal var isIdle: Bool { self.manager.isIdle }

      /// Whether the connection is in the connecting state.
      internal var isConnecting: Bool { self.manager.isConnecting }

      /// Whether the connection is in the ready state.
      internal var isReady: Bool { self.manager.isReady }

      /// Whether the connection is in the transient failure state.
      internal var isTransientFailure: Bool { self.manager.isTransientFailure }

      /// Whether the connection is in the shutdown state.
      internal var isShutdown: Bool { self.manager.isShutdown }

      /// The `multiplexer` of a connection in the `ready` state, or `nil` in any other state.
      internal var multiplexer: HTTP2StreamMultiplexer? { self.manager.multiplexer }

      /// Starts establishing a connection. Must only be called while `isIdle` is `true`.
      internal func startConnecting() {
        self.manager.startConnecting()
      }
    }
  }
  extension ConnectionManager {
    /// Traps, reporting the current state and the calling function. Used for states that the
    /// state machine should never be in at the call site.
    private func unreachableState(
      function: StaticString = #function,
      file: StaticString = #fileID,
      line: UInt = #line
    ) -> Never {
      let message = "Invalid state \(self.state) for \(function)"
      fatalError(message, file: file, line: line)
    }
  }
  1048. }