// ConnectionManager.swift
  /*
   * Copyright 2020, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOConcurrencyHelpers
  20. import NIOHTTP2
  /// Tracks the connectivity state of a client connection and vends futures for the HTTP/2 stream
  /// multiplexer of its channel.
  ///
  /// State transitions are driven by requests for a multiplexer (`getHTTP2Multiplexer()`), by the
  /// application (`shutdown()`), and by callbacks describing the channel's lifecycle
  /// (`channelActive`, `ready`, `idle`, `channelInactive`, `channelError`). Externally visible
  /// transitions are forwarded to `monitor`.
  internal class ConnectionManager {
    /// Whether another connection attempt should follow a failed or dropped one.
    internal enum Reconnect {
      /// Do not try again.
      case none
      /// Try again after the given delay, in seconds.
      case after(TimeInterval)
    }

    /// Associated data for the `connecting` state.
    internal struct ConnectingState {
      /// Supplies the timeout and backoff for subsequent attempts, if backoff is configured.
      var backoffIterator: ConnectionBackoffIterator?
      /// What to do if this attempt fails.
      var reconnect: Reconnect
      /// The channel we are currently trying to establish.
      var candidate: EventLoopFuture<Channel>
      /// Succeeded with the multiplexer once the connection becomes `ready`. It is carried across
      /// reconnect attempts so callers which wait for connectivity keep the same future.
      var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
      /// Succeeded with the multiplexer as soon as the channel becomes `active`, and failed when
      /// this particular attempt fails. Used by fast-failing callers.
      var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    }

    /// Associated data for the `active` state: a channel exists but is not yet `ready`.
    internal struct ConnectedState {
      var backoffIterator: ConnectionBackoffIterator?
      var reconnect: Reconnect
      var candidate: Channel
      var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
      var multiplexer: HTTP2StreamMultiplexer
      /// The most recent error reported via `channelError(_:)` while in this state, if any.
      var error: Error?

      init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
        self.backoffIterator = state.backoffIterator
        self.reconnect = state.reconnect
        self.candidate = candidate
        self.readyChannelMuxPromise = state.readyChannelMuxPromise
        self.multiplexer = multiplexer
      }
    }

    /// Associated data for the `ready` state.
    internal struct ReadyState {
      var channel: Channel
      var multiplexer: HTTP2StreamMultiplexer
      /// The most recent error reported via `channelError(_:)` while in this state, if any.
      var error: Error?

      init(from state: ConnectedState) {
        self.channel = state.candidate
        self.multiplexer = state.multiplexer
      }
    }

    /// Associated data for the `transientFailure` state: we are waiting to try connecting again.
    internal struct TransientFailureState {
      var backoffIterator: ConnectionBackoffIterator?
      var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
      /// The scheduled task which will start the next connection attempt.
      var scheduled: Scheduled<Void>
      /// Why we entered this state, if known.
      var reason: Error?

      init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
        self.backoffIterator = state.backoffIterator
        self.readyChannelMuxPromise = state.readyChannelMuxPromise
        self.scheduled = scheduled
        self.reason = reason
      }

      init(from state: ConnectedState, scheduled: Scheduled<Void>) {
        self.backoffIterator = state.backoffIterator
        self.readyChannelMuxPromise = state.readyChannelMuxPromise
        self.scheduled = scheduled
        self.reason = state.error
      }

      init(
        from state: ReadyState,
        scheduled: Scheduled<Void>,
        backoffIterator: ConnectionBackoffIterator?
      ) {
        self.backoffIterator = backoffIterator
        // A `ReadyState` carries no promise (it was already completed), so make a fresh one for
        // the next connection.
        self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
        self.scheduled = scheduled
        self.reason = state.error
      }
    }

    /// Associated data for the terminal `shutdown` state.
    internal struct ShutdownState {
      var closeFuture: EventLoopFuture<Void>

      /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
      /// this error.
      var reason: Error

      init(closeFuture: EventLoopFuture<Void>, reason: Error) {
        self.closeFuture = closeFuture
        self.reason = reason
      }

      /// Makes the state used when the application requested the shutdown.
      static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
        return ShutdownState(
          closeFuture: closeFuture,
          reason: GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
        )
      }
    }

    internal enum State {
      /// No `Channel` is required.
      ///
      /// Valid next states:
      /// - `connecting`
      /// - `shutdown`
      case idle

      /// We're actively trying to establish a connection.
      ///
      /// Valid next states:
      /// - `active`
      /// - `transientFailure` (if our attempt fails and we're going to try again)
      /// - `shutdown`
      case connecting(ConnectingState)

      /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
      /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
      ///
      /// Valid next states:
      /// - `ready`
      /// - `transientFailure` (if our handshake fails or another error happens and we can attempt
      ///   to re-establish the connection)
      /// - `shutdown`
      case active(ConnectedState)

      /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
      /// the channel for making RPCs.
      ///
      /// Valid next states:
      /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
      /// - `transientFailure` (we encountered an error and will re-establish the connection)
      /// - `shutdown`
      case ready(ReadyState)

      /// A `Channel` is desired, we'll attempt to create one in the future.
      ///
      /// Valid next states:
      /// - `connecting`
      /// - `shutdown`
      case transientFailure(TransientFailureState)

      /// We never want another `Channel`: this state is terminal.
      case shutdown(ShutdownState)

      /// A short name for the state, used as logging metadata.
      fileprivate var label: String {
        switch self {
        case .idle:
          return "idle"
        case .connecting:
          return "connecting"
        case .active:
          return "active"
        case .ready:
          return "ready"
        case .transientFailure:
          return "transientFailure"
        case .shutdown:
          return "shutdown"
        }
      }
    }

    /// The current state. Every assignment notifies `monitor` (except for the internal `active`
    /// state); entering `idle` or `transientFailure` also bumps the channel number used in the
    /// logger's connection ID.
    private var state: State {
      didSet {
        switch self.state {
        case .idle:
          self.monitor.updateState(to: .idle, logger: self.logger)
          self.updateConnectionID()

        case .connecting:
          self.monitor.updateState(to: .connecting, logger: self.logger)

        // This is an internal state.
        case .active:
          ()

        case .ready:
          self.monitor.updateState(to: .ready, logger: self.logger)

        case .transientFailure:
          self.monitor.updateState(to: .transientFailure, logger: self.logger)
          self.updateConnectionID()

        case .shutdown:
          self.monitor.updateState(to: .shutdown, logger: self.logger)
        }
      }
    }

    internal let eventLoop: EventLoop
    internal let monitor: ConnectivityStateMonitor
    internal var logger: Logger

    private let configuration: ClientConnection.Configuration
    private let connectionID: String
    private var channelNumber: UInt64
    /// Guards `channelNumber`.
    private var channelNumberLock = Lock()

    /// "<connectionID>/<channelNumber>". The caller must hold `channelNumberLock`.
    private var _connectionIDAndNumber: String {
      return "\(self.connectionID)/\(self.channelNumber)"
    }

    /// "<connectionID>/<channelNumber>", read while holding `channelNumberLock`.
    private var connectionIDAndNumber: String {
      return self.channelNumberLock.withLock {
        return self._connectionIDAndNumber
      }
    }

    /// Increments the channel number (wrapping on overflow) and refreshes the connection ID stored
    /// in the logger's metadata.
    private func updateConnectionID() {
      self.channelNumberLock.withLockVoid {
        self.channelNumber &+= 1
        self.logger[metadataKey: MetadataKey.connectionID] = "\(self._connectionIDAndNumber)"
      }
    }

    /// Stores the current connection ID in the metadata of the given logger.
    internal func appendMetadata(to logger: inout Logger) {
      logger[metadataKey: MetadataKey.connectionID] = "\(self.connectionIDAndNumber)"
    }

    // Only used for testing.
    private var channelProvider: (() -> EventLoopFuture<Channel>)?

    internal convenience init(configuration: ClientConnection.Configuration, logger: Logger) {
      self.init(configuration: configuration, logger: logger, channelProvider: nil)
    }

    /// Create a `ConnectionManager` for testing: uses the given `channelProvider` to create channels.
    internal static func testingOnly(
      configuration: ClientConnection.Configuration,
      logger: Logger,
      channelProvider: @escaping () -> EventLoopFuture<Channel>
    ) -> ConnectionManager {
      return ConnectionManager(
        configuration: configuration,
        logger: logger,
        channelProvider: channelProvider
      )
    }

    private init(
      configuration: ClientConnection.Configuration,
      logger: Logger,
      channelProvider: (() -> EventLoopFuture<Channel>)?
    ) {
      // Setup the logger.
      var logger = logger
      let connectionID = UUID().uuidString
      let channelNumber: UInt64 = 0
      logger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)"

      let eventLoop = configuration.eventLoopGroup.next()
      self.eventLoop = eventLoop
      self.state = .idle
      self.monitor = ConnectivityStateMonitor(
        delegate: configuration.connectivityStateDelegate,
        queue: configuration.connectivityStateDelegateQueue
      )

      self.configuration = configuration
      self.channelProvider = channelProvider

      self.connectionID = connectionID
      self.channelNumber = channelNumber
      self.logger = logger
    }

    /// Get the multiplexer from the underlying channel handling gRPC calls.
    /// If the `ConnectionManager` was configured to be `fastFailure` this will have
    /// one chance to connect - if not, reconnections are managed here.
    ///
    /// May be called from any thread: the work is hopped onto the `EventLoop` if necessary.
    internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
      func getHTTP2Multiplexer0() -> EventLoopFuture<HTTP2StreamMultiplexer> {
        switch self.configuration.callStartBehavior.wrapped {
        case .waitsForConnectivity:
          return self.getHTTP2MultiplexerPatient()
        case .fastFailure:
          return self.getHTTP2MultiplexerOptimistic()
        }
      }

      if self.eventLoop.inEventLoop {
        return getHTTP2Multiplexer0()
      } else {
        return self.eventLoop.flatSubmit {
          getHTTP2Multiplexer0()
        }
      }
    }

    /// Returns a future for the multiplexer which is succeeded when the channel is ready.
    /// Reconnects are handled if necessary.
    private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
      // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure: we read
      // and may mutate `state` below.
      self.eventLoop.preconditionInEventLoop()

      let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

      switch self.state {
      case .idle:
        self.startConnecting()
        // We started connecting so we must transition to the `connecting` state.
        guard case let .connecting(connecting) = self.state else {
          self.invalidState()
        }
        multiplexer = connecting.readyChannelMuxPromise.futureResult

      case let .connecting(state):
        multiplexer = state.readyChannelMuxPromise.futureResult

      case let .active(state):
        multiplexer = state.readyChannelMuxPromise.futureResult

      case let .ready(state):
        multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)

      case let .transientFailure(state):
        multiplexer = state.readyChannelMuxPromise.futureResult

      case let .shutdown(state):
        multiplexer = self.eventLoop.makeFailedFuture(state.reason)
      }

      self.logger.debug("vending multiplexer future", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      return multiplexer
    }

    /// Returns a future for the current HTTP/2 stream multiplexer, or future HTTP/2 stream
    /// multiplexer from the current connection attempt, or if the state is 'idle' returns the
    /// future for the next connection attempt.
    ///
    /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
    private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
      // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
      self.eventLoop.preconditionInEventLoop()

      let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = { () in
        switch self.state {
        case .idle:
          self.startConnecting()
          // We started connecting so we must transition to the `connecting` state.
          guard case let .connecting(connecting) = self.state else {
            self.invalidState()
          }
          return connecting.candidateMuxPromise.futureResult

        case let .connecting(state):
          return state.candidateMuxPromise.futureResult

        case let .active(active):
          return self.eventLoop.makeSucceededFuture(active.multiplexer)

        case let .ready(ready):
          return self.eventLoop.makeSucceededFuture(ready.multiplexer)

        case let .transientFailure(state):
          // Provide the reason we failed transiently, if we can.
          let error = state.reason ?? GRPCStatus(
            code: .unavailable,
            message: "Connection multiplexer requested while backing off"
          )
          return self.eventLoop.makeFailedFuture(error)

        case let .shutdown(state):
          return self.eventLoop.makeFailedFuture(state.reason)
        }
      }()

      self.logger.debug("vending fast-failing multiplexer future", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      return muxFuture
    }

    /// Shutdown any connection which exists. This is a request from the application.
    ///
    /// - Returns: A future completed when the connection is closed. It is already succeeded unless
    ///   a `ready` channel had to be closed, in which case it is that channel's `closeFuture`.
    internal func shutdown() -> EventLoopFuture<Void> {
      return self.eventLoop.flatSubmit {
        self.logger.debug("shutting down connection", metadata: [
          "connectivity_state": "\(self.state.label)",
        ])
        let shutdown: ShutdownState

        switch self.state {
        // We don't have a channel and we don't want one, easy!
        case .idle:
          shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

        // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
        // the shutdown future and deal with any fallout from the connecting channel without the
        // application knowing.
        case let .connecting(state):
          shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

          // Fail the mux promises: we're shutting down so even if we manage to successfully
          // connect the application shouldn't have access to the channel or multiplexer. Use the
          // shutdown reason so waiters see the same error as requests made after shutdown.
          state.readyChannelMuxPromise.fail(shutdown.reason)
          state.candidateMuxPromise.fail(shutdown.reason)
          // In case we do successfully connect, close immediately.
          state.candidate.whenSuccess {
            $0.close(mode: .all, promise: nil)
          }

        // We have an active channel but the application doesn't know about it yet. We'll do the same
        // as for `.connecting`.
        case let .active(state):
          shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

          // Fail the ready channel mux promise: the channel never became ready and the
          // application shouldn't have access to the channel or multiplexer.
          state.readyChannelMuxPromise.fail(shutdown.reason)
          // We have a channel, close it.
          state.candidate.close(mode: .all, promise: nil)

        // The channel is up and running: the application could be using it. We can close it and
        // return the `closeFuture`.
        case let .ready(state):
          shutdown = .shutdownByUser(closeFuture: state.channel.closeFuture)
          self.state = .shutdown(shutdown)

          // We have a channel, close it.
          state.channel.close(mode: .all, promise: nil)

        // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
        // do the same but also cancel any scheduled connection attempts and deal with any fallout
        // if we cancelled too late.
        case let .transientFailure(state):
          shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

          // Stop the creation of a new channel, if we can. If we can't then the task to
          // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
          state.scheduled.cancel()
          // Fail the ready channel mux promise: we're shutting down so even if we manage to
          // successfully connect the application shouldn't have access to the channel.
          state.readyChannelMuxPromise.fail(shutdown.reason)

        // We're already shutdown; nothing to do.
        case let .shutdown(state):
          shutdown = state
        }

        return shutdown.closeFuture
      }
    }

    // MARK: - State changes from the channel handler.

    /// The channel caught an error. Hold on to it until the channel becomes inactive, it may provide
    /// some context. Must be called on the `EventLoop`.
    internal func channelError(_ error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      // Hitting an error in idle is a surprise, but not really something we do anything about. Either the
      // error is channel fatal, in which case we'll see channelInactive soon (acceptable), or it's not,
      // and future I/O will either fail fast or work. In either case, all we do is log this and move on.
      case .idle:
        self.logger.warning("ignoring unexpected error in idle", metadata: [
          MetadataKey.error: "\(error)",
        ])

      case .connecting:
        self.invalidState()

      case var .active(state):
        state.error = error
        self.state = .active(state)

      case var .ready(state):
        state.error = error
        self.state = .ready(state)

      // If we're already in one of these states, then additional errors aren't helpful to us.
      case .transientFailure, .shutdown:
        ()
      }
    }

    /// The connecting channel became `active`. Must be called on the `EventLoop`.
    internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
      self.eventLoop.preconditionInEventLoop()
      self.logger.debug("activating connection", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      switch self.state {
      case let .connecting(connecting):
        let connected = ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
        self.state = .active(connected)
        // Optimistic connections are happy with this level of setup.
        connecting.candidateMuxPromise.succeed(multiplexer)

      // Application called shutdown before the channel became active; we should close it.
      case .shutdown:
        channel.close(mode: .all, promise: nil)

      // These cases are purposefully separated: some crash reporting services provide stack traces
      // which don't include the precondition failure message (which contain the invalid state we were
      // in). Keeping the cases separate allows us to work out the state from the line number.
      case .idle:
        self.invalidState()

      case .active:
        self.invalidState()

      case .ready:
        self.invalidState()

      case .transientFailure:
        self.invalidState()
      }
    }

    /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
    /// Must be called on the `EventLoop`.
    internal func channelInactive() {
      self.eventLoop.preconditionInEventLoop()
      self.logger.debug("deactivating connection", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      switch self.state {
      // The channel is `active` but not `ready`. Should we try again?
      case let .active(active):
        switch active.reconnect {
        // No, shutdown instead.
        case .none:
          self.logger.debug("shutting down connection")

          let error = GRPCStatus(
            code: .unavailable,
            message: "The connection was dropped and connection re-establishment is disabled"
          )
          let shutdownState = ShutdownState(
            closeFuture: self.eventLoop.makeSucceededFuture(()),
            reason: error
          )

          self.state = .shutdown(shutdownState)
          active.readyChannelMuxPromise.fail(error)

        // Yes, after some time.
        case let .after(delay):
          let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
          self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
        }

      // The channel was ready and working fine but something went wrong. Should we try to replace
      // the channel?
      case let .ready(ready):
        if let connectionBackoff = self.configuration.connectionBackoff {
          // Yes, start connecting now. We should go via `transientFailure`, however.
          let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
            self.startConnecting()
          }
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
          // The previous connection succeeded, so backoff starts afresh.
          let backoffIterator = connectionBackoff.makeIterator()
          self.state = .transientFailure(TransientFailureState(
            from: ready,
            scheduled: scheduled,
            backoffIterator: backoffIterator
          ))
        } else {
          // No, no backoff is configured.
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          self.state = .shutdown(
            ShutdownState(
              closeFuture: ready.channel.closeFuture,
              reason: GRPCStatus(
                code: .unavailable,
                message: "The connection was dropped and a reconnect was not configured"
              )
            )
          )
        }

      // This is fine: we expect the channel to become inactive after becoming idle.
      case .idle:
        ()

      // We're already shutdown, that's fine.
      case .shutdown:
        ()

      // These cases are purposefully separated: some crash reporting services provide stack traces
      // which don't include the precondition failure message (which contain the invalid state we were
      // in). Keeping the cases separate allows us to work out the state from the line number.
      case .connecting:
        self.invalidState()

      case .transientFailure:
        self.invalidState()
      }
    }

    /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
    /// called on the `EventLoop`.
    internal func ready() {
      self.eventLoop.preconditionInEventLoop()
      self.logger.debug("connection ready", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      switch self.state {
      case let .active(connected):
        self.state = .ready(ReadyState(from: connected))
        connected.readyChannelMuxPromise.succeed(connected.multiplexer)

      case .shutdown:
        ()

      // These cases are purposefully separated: some crash reporting services provide stack traces
      // which don't include the precondition failure message (which contain the invalid state we were
      // in). Keeping the cases separate allows us to work out the state from the line number.
      case .idle:
        self.invalidState()

      case .transientFailure:
        self.invalidState()

      case .connecting:
        self.invalidState()

      case .ready:
        self.invalidState()
      }
    }

    /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
    /// the `EventLoop`.
    internal func idle() {
      self.eventLoop.preconditionInEventLoop()
      self.logger.debug("idling connection", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      switch self.state {
      case let .active(state):
        // This state is reachable if the keepalive timer fires before we reach the ready state.
        self.state = .idle
        state.readyChannelMuxPromise
          .fail(GRPCStatus(code: .unavailable, message: "Idled before reaching ready state"))

      case .ready:
        self.state = .idle

      case .shutdown:
        // This is expected when the connection is closed by the user: when the channel becomes
        // inactive and there are no outstanding RPCs, 'idle()' will be called instead of
        // 'channelInactive()'.
        ()

      // These cases are purposefully separated: some crash reporting services provide stack traces
      // which don't include the precondition failure message (which contain the invalid state we were
      // in). Keeping the cases separate allows us to work out the state from the line number.
      case .idle:
        self.invalidState()

      case .connecting:
        self.invalidState()

      case .transientFailure:
        self.invalidState()
      }
    }

    /// The connection has started quiescing: notify the connectivity monitor of this.
    internal func beginQuiescing() {
      self.monitor.beginQuiescing()
    }
  }
  extension ConnectionManager {
    /// Handles a connection attempt which failed before a channel was ever established.
    ///
    /// Depending on the attempt's `reconnect` policy this either shuts the manager down (failing
    /// both multiplexer promises) or schedules another attempt and moves to `transientFailure`
    /// (failing only the candidate promise). Must be called on the `EventLoop`.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case let .connecting(attempt):
        // Should we reconnect?
        switch attempt.reconnect {
        case .none:
          // No: this failure is final.
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          let alreadyClosed = self.eventLoop.makeSucceededFuture(())
          let terminal = ShutdownState(closeFuture: alreadyClosed, reason: error)
          self.state = .shutdown(terminal)
          attempt.readyChannelMuxPromise.fail(error)
          attempt.candidateMuxPromise.fail(error)

        case let .after(backoff):
          // Yes: try again once the backoff has elapsed.
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(backoff)"])
          let retry = self.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
            self.startConnecting()
          }
          let failure = TransientFailureState(from: attempt, scheduled: retry, reason: error)
          self.state = .transientFailure(failure)
          // Callers of the candidate multiplexer are not willing to wait for the retry.
          attempt.candidateMuxPromise.fail(error)
        }

      // The application shut us down while a doomed connection attempt was still in flight;
      // there is nothing left to do.
      case .shutdown:
        ()

      // We can't fail to connect if we aren't trying.
      //
      // Each invalid state gets its own case on purpose: some crash reporters drop the
      // precondition failure message, so the line number is the only way to tell which state
      // we were in.
      case .idle:
        self.invalidState()

      case .active:
        self.invalidState()

      case .ready:
        self.invalidState()

      case .transientFailure:
        self.invalidState()
      }
    }
  }
  extension ConnectionManager {
    /// Begins a connection attempt. Only valid from `idle` (a fresh backoff iterator and a fresh
    /// ready-multiplexer promise are created) and `transientFailure` (the pending iterator and
    /// promise are reused); a no-op when already `shutdown`. Must be called on the `EventLoop`.
    private func startConnecting() {
      switch self.state {
      case .idle:
        let freshPromise: EventLoopPromise<HTTP2StreamMultiplexer> = self.eventLoop.makePromise()
        self.startConnecting(
          backoffIterator: self.configuration.connectionBackoff?.makeIterator(),
          muxPromise: freshPromise
        )

      case let .transientFailure(failure):
        self.startConnecting(
          backoffIterator: failure.backoffIterator,
          muxPromise: failure.readyChannelMuxPromise
        )

      // The scheduled attempt fired after the manager had already been shut down.
      case .shutdown:
        ()

      // Each invalid state gets its own case on purpose: some crash reporters drop the
      // precondition failure message, so the line number is the only way to tell which state
      // we were in.
      case .connecting:
        self.invalidState()

      case .active:
        self.invalidState()

      case .ready:
        self.invalidState()
      }
    }

    /// Submits the creation of a candidate channel and transitions to `connecting`.
    ///
    /// - Parameters:
    ///   - backoffIterator: Source of the connect timeout and the backoff to apply should this
    ///     attempt fail. When it is `nil` or exhausted no reconnect will be attempted.
    ///   - muxPromise: The promise to complete once the connection becomes ready.
    private func startConnecting(
      backoffIterator: ConnectionBackoffIterator?,
      muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
    ) {
      let nextAttempt = backoffIterator?.next()

      // We're already on the event loop, so the submitted work only runs after the state change
      // to `.connecting` below has been made.
      self.eventLoop.assertInEventLoop()

      let channelFuture: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let connecting = self.makeChannel(connectTimeout: nextAttempt?.timeout)
        connecting.whenFailure { self.connectionFailed(withError: $0) }
        return connecting
      }

      // Decide now whether a failure of the candidate channel leads to another attempt.
      let policy: Reconnect
      if let attempt = nextAttempt {
        policy = .after(attempt.backoff)
      } else {
        policy = .none
      }

      self.state = .connecting(
        ConnectingState(
          backoffIterator: backoffIterator,
          reconnect: policy,
          candidate: channelFuture,
          readyChannelMuxPromise: muxPromise,
          candidateMuxPromise: self.eventLoop.makePromise()
        )
      )
    }
  }
  extension ConnectionManager {
    /// Crashes with a message naming the current state and the function which found it invalid.
    ///
    /// The defaulted parameters capture the caller's location so the failure is attributed to
    /// the call site.
    private func invalidState(
      function: StaticString = #function,
      file: StaticString = #file,
      line: UInt = #line
    ) -> Never {
      let description = "Invalid state \(self.state) for \(function)"
      preconditionFailure(description, file: file, line: line)
    }
  }
  extension ConnectionManager {
    /// Builds a client bootstrap which configures each new channel as a gRPC client.
    ///
    /// - Parameter connectTimeout: Connect timeout in seconds; when `nil` no timeout is set on
    ///   the bootstrap.
    private func makeBootstrap(
      connectTimeout: TimeInterval?
    ) -> ClientBootstrapProtocol {
      // The hostname handed to TLS: the configured override if there is one, otherwise the
      // target's host. No hostname is passed when TLS is off or the name is an IP address.
      let serverHostname: String? = self.configuration.tls
        .flatMap { tls -> String? in
          tls.hostnameOverride ?? self.configuration.target.host
        }
        .flatMap { name -> String? in
          name.isIPAddress ? nil : name
        }

      let bootstrap = PlatformSupport.makeClientBootstrap(group: self.eventLoop, logger: self.logger)
        .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
        .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .channelInitializer { channel in
          let configured = channel.configureGRPCClient(
            httpTargetWindowSize: self.configuration.httpTargetWindowSize,
            tlsConfiguration: self.configuration.tls?.configuration,
            tlsServerHostname: serverHostname,
            connectionManager: self,
            connectionKeepalive: self.configuration.connectionKeepalive,
            connectionIdleTimeout: self.configuration.connectionIdleTimeout,
            errorDelegate: self.configuration.errorDelegate,
            requiresZeroLengthWriteWorkaround: PlatformSupport.requiresZeroLengthWriteWorkaround(
              group: self.eventLoop,
              hasTLS: self.configuration.tls != nil
            ),
            logger: self.logger,
            customVerificationCallback: self.configuration.tls?.customVerificationCallback
          )

          // Chain the debug initializer, if one was configured, after the gRPC setup.
          guard let debugInitializer = self.configuration.debugChannelInitializer else {
            return configured
          }
          return configured.flatMap { debugInitializer(channel) }
        }

      guard let timeout = connectTimeout else {
        return bootstrap
      }
      return bootstrap.connectTimeout(.seconds(timeInterval: timeout))
    }

    /// Creates the candidate channel: from the testing-only `channelProvider` when one was
    /// supplied, otherwise by connecting a freshly made bootstrap to the configured target.
    private func makeChannel(
      connectTimeout: TimeInterval?
    ) -> EventLoopFuture<Channel> {
      guard let provider = self.channelProvider else {
        return self.makeBootstrap(connectTimeout: connectTimeout)
          .connect(to: self.configuration.target)
      }
      return provider()
    }
  }