ConnectionManager.swift 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOConcurrencyHelpers
  20. internal class ConnectionManager {
  21. internal struct IdleState {
  22. var configuration: ClientConnection.Configuration
  23. }
  24. internal enum Reconnect {
  25. case none
  26. case after(TimeInterval)
  27. }
  28. internal struct ConnectingState {
  29. var configuration: ClientConnection.Configuration
  30. var backoffIterator: ConnectionBackoffIterator?
  31. var reconnect: Reconnect
  32. var readyChannelPromise: EventLoopPromise<Channel>
  33. var candidate: EventLoopFuture<Channel>
  34. }
  35. internal struct ConnectedState {
  36. var configuration: ClientConnection.Configuration
  37. var backoffIterator: ConnectionBackoffIterator?
  38. var reconnect: Reconnect
  39. var readyChannelPromise: EventLoopPromise<Channel>
  40. var candidate: Channel
  41. var error: Error?
  42. init(from state: ConnectingState, candidate: Channel) {
  43. self.configuration = state.configuration
  44. self.backoffIterator = state.backoffIterator
  45. self.reconnect = state.reconnect
  46. self.readyChannelPromise = state.readyChannelPromise
  47. self.candidate = candidate
  48. }
  49. }
  50. internal struct ReadyState {
  51. var configuration: ClientConnection.Configuration
  52. var channel: Channel
  53. var error: Error?
  54. init(from state: ConnectedState) {
  55. self.configuration = state.configuration
  56. self.channel = state.candidate
  57. }
  58. }
  59. internal struct TransientFailureState {
  60. var configuration: ClientConnection.Configuration
  61. var backoffIterator: ConnectionBackoffIterator?
  62. var readyChannelPromise: EventLoopPromise<Channel>
  63. var scheduled: Scheduled<Void>
  64. var reason: Error?
  65. init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
  66. self.configuration = state.configuration
  67. self.backoffIterator = state.backoffIterator
  68. self.readyChannelPromise = state.readyChannelPromise
  69. self.scheduled = scheduled
  70. self.reason = reason
  71. }
  72. init(from state: ConnectedState, scheduled: Scheduled<Void>) {
  73. self.configuration = state.configuration
  74. self.backoffIterator = state.backoffIterator
  75. self.readyChannelPromise = state.readyChannelPromise
  76. self.scheduled = scheduled
  77. self.reason = state.error
  78. }
  79. init(from state: ReadyState, scheduled: Scheduled<Void>) {
  80. self.configuration = state.configuration
  81. self.backoffIterator = state.configuration.connectionBackoff?.makeIterator()
  82. self.readyChannelPromise = state.channel.eventLoop.makePromise()
  83. self.scheduled = scheduled
  84. self.reason = state.error
  85. }
  86. }
  87. internal struct ShutdownState {
  88. var closeFuture: EventLoopFuture<Void>
  89. /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
  90. /// this error.
  91. var reason: Error
  92. init(closeFuture: EventLoopFuture<Void>, reason: Error) {
  93. self.closeFuture = closeFuture
  94. self.reason = reason
  95. }
  96. static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
  97. return ShutdownState(
  98. closeFuture: closeFuture,
  99. reason: GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
  100. )
  101. }
  102. }
  103. internal enum State {
  104. /// No `Channel` is required.
  105. ///
  106. /// Valid next states:
  107. /// - `connecting`
  108. /// - `shutdown`
  109. case idle(IdleState)
  110. /// We're actively trying to establish a connection.
  111. ///
  112. /// Valid next states:
  113. /// - `active`
  114. /// - `transientFailure` (if our attempt fails and we're going to try again)
  115. /// - `shutdown`
  116. case connecting(ConnectingState)
  117. /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
  118. /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
  119. ///
  120. /// Valid next states:
  121. /// - `ready`
  122. /// - `transientFailure` (if we our handshake fails or other error happens and we can attempt
  123. /// to re-establish the connection)
  124. /// - `shutdown`
  125. case active(ConnectedState)
  126. /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
  127. /// the channel for making RPCs.
  128. ///
  129. /// Valid next states:
  130. /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
  131. /// - `transientFailure` (we encountered an error and will re-establish the connection)
  132. /// - `shutdown`
  133. case ready(ReadyState)
  134. /// A `Channel` is desired, we'll attempt to create one in the future.
  135. ///
  136. /// Valid next states:
  137. /// - `connecting`
  138. /// - `shutdown`
  139. case transientFailure(TransientFailureState)
  140. /// We never want another `Channel`: this state is terminal.
  141. case shutdown(ShutdownState)
  142. fileprivate var label: String {
  143. switch self {
  144. case .idle:
  145. return "idle"
  146. case .connecting:
  147. return "connecting"
  148. case .active:
  149. return "active"
  150. case .ready:
  151. return "ready"
  152. case .transientFailure:
  153. return "transientFailure"
  154. case .shutdown:
  155. return "shutdown"
  156. }
  157. }
  158. }
  159. private var state: State {
  160. didSet {
  161. switch self.state {
  162. case .idle:
  163. self.monitor.updateState(to: .idle, logger: self.logger)
  164. self.updateConnectionID()
  165. case .connecting:
  166. self.monitor.updateState(to: .connecting, logger: self.logger)
  167. // This is an internal state.
  168. case .active:
  169. ()
  170. case .ready:
  171. self.monitor.updateState(to: .ready, logger: self.logger)
  172. case .transientFailure:
  173. self.monitor.updateState(to: .transientFailure, logger: self.logger)
  174. self.updateConnectionID()
  175. case .shutdown:
  176. self.monitor.updateState(to: .shutdown, logger: self.logger)
  177. }
  178. }
  179. }
  180. internal let eventLoop: EventLoop
  181. internal let monitor: ConnectivityStateMonitor
  182. internal var logger: Logger
  183. private let connectionID: String
  184. private var channelNumber: UInt64
  185. private var channelNumberLock = Lock()
  186. private var _connectionIDAndNumber: String {
  187. return "\(self.connectionID)/\(self.channelNumber)"
  188. }
  189. private var connectionIDAndNumber: String {
  190. return self.channelNumberLock.withLock {
  191. return self._connectionIDAndNumber
  192. }
  193. }
  194. private func updateConnectionID() {
  195. self.channelNumberLock.withLockVoid {
  196. self.channelNumber &+= 1
  197. self.logger[metadataKey: MetadataKey.connectionID] = "\(self._connectionIDAndNumber)"
  198. }
  199. }
  200. internal func appendMetadata(to logger: inout Logger) {
  201. logger[metadataKey: MetadataKey.connectionID] = "\(self.connectionIDAndNumber)"
  202. }
  203. // Only used for testing.
  204. private var channelProvider: (() -> EventLoopFuture<Channel>)?
  205. internal convenience init(configuration: ClientConnection.Configuration, logger: Logger) {
  206. self.init(configuration: configuration, logger: logger, channelProvider: nil)
  207. }
  208. /// Create a `ConnectionManager` for testing: uses the given `channelProvider` to create channels.
  209. internal static func testingOnly(
  210. configuration: ClientConnection.Configuration,
  211. logger: Logger,
  212. channelProvider: @escaping () -> EventLoopFuture<Channel>
  213. ) -> ConnectionManager {
  214. return ConnectionManager(
  215. configuration: configuration,
  216. logger: logger,
  217. channelProvider: channelProvider
  218. )
  219. }
  220. private init(
  221. configuration: ClientConnection.Configuration,
  222. logger: Logger,
  223. channelProvider: (() -> EventLoopFuture<Channel>)?
  224. ) {
  225. // Setup the logger.
  226. var logger = logger
  227. let connectionID = UUID().uuidString
  228. let channelNumber: UInt64 = 0
  229. logger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)"
  230. let eventLoop = configuration.eventLoopGroup.next()
  231. self.eventLoop = eventLoop
  232. self.state = .idle(IdleState(configuration: configuration))
  233. self.monitor = ConnectivityStateMonitor(
  234. delegate: configuration.connectivityStateDelegate,
  235. queue: configuration.connectivityStateDelegateQueue
  236. )
  237. self.channelProvider = channelProvider
  238. self.connectionID = connectionID
  239. self.channelNumber = channelNumber
  240. self.logger = logger
  241. }
  242. /// Returns a future for a connected channel.
  243. internal func getChannel() -> EventLoopFuture<Channel> {
  244. return self.eventLoop.flatSubmit {
  245. let channel: EventLoopFuture<Channel>
  246. switch self.state {
  247. case .idle:
  248. self.startConnecting()
  249. // We started connecting so we must transition to the `connecting` state.
  250. guard case let .connecting(connecting) = self.state else {
  251. self.invalidState()
  252. }
  253. channel = connecting.readyChannelPromise.futureResult
  254. case let .connecting(state):
  255. channel = state.readyChannelPromise.futureResult
  256. case let .active(state):
  257. channel = state.readyChannelPromise.futureResult
  258. case let .ready(state):
  259. channel = state.channel.eventLoop.makeSucceededFuture(state.channel)
  260. case let .transientFailure(state):
  261. channel = state.readyChannelPromise.futureResult
  262. case let .shutdown(state):
  263. channel = self.eventLoop.makeFailedFuture(state.reason)
  264. }
  265. self.logger.debug("vending channel future", metadata: [
  266. "connectivity_state": "\(self.state.label)",
  267. ])
  268. return channel
  269. }
  270. }
  271. /// Returns a future for the current channel, or future channel from the current connection
  272. /// attempt, or if the state is 'idle' returns the future for the next connection attempt.
  273. ///
  274. /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
  275. internal func getOptimisticChannel() -> EventLoopFuture<Channel> {
  276. return self.eventLoop.flatSubmit {
  277. let channel: EventLoopFuture<Channel>
  278. switch self.state {
  279. case .idle:
  280. self.startConnecting()
  281. // We started connecting so we must transition to the `connecting` state.
  282. guard case let .connecting(connecting) = self.state else {
  283. self.invalidState()
  284. }
  285. channel = connecting.candidate
  286. case let .connecting(state):
  287. channel = state.candidate
  288. case let .active(state):
  289. channel = state.candidate.eventLoop.makeSucceededFuture(state.candidate)
  290. case let .ready(state):
  291. channel = state.channel.eventLoop.makeSucceededFuture(state.channel)
  292. case let .transientFailure(state):
  293. // Provide the reason we failed transiently, if we can.
  294. let error = state.reason ?? GRPCStatus(
  295. code: .unavailable,
  296. message: "Connection requested while backing off"
  297. )
  298. channel = self.eventLoop.makeFailedFuture(error)
  299. case let .shutdown(state):
  300. channel = self.eventLoop.makeFailedFuture(state.reason)
  301. }
  302. self.logger.debug("vending fast-failing channel future", metadata: [
  303. "connectivity_state": "\(self.state.label)",
  304. ])
  305. return channel
  306. }
  307. }
  308. /// Shutdown any connection which exists. This is a request from the application.
  309. internal func shutdown() -> EventLoopFuture<Void> {
  310. return self.eventLoop.flatSubmit {
  311. self.logger.debug("shutting down connection", metadata: [
  312. "connectivity_state": "\(self.state.label)",
  313. ])
  314. let shutdown: ShutdownState
  315. switch self.state {
  316. // We don't have a channel and we don't want one, easy!
  317. case .idle:
  318. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  319. self.state = .shutdown(shutdown)
  320. // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
  321. // the shutdown future and deal with any fallout from the connecting channel without the
  322. // application knowing.
  323. case let .connecting(state):
  324. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  325. self.state = .shutdown(shutdown)
  326. // Fail the ready channel promise: we're shutting down so even if we manage to successfully
  327. // connect the application shouldn't should have access to the channel.
  328. state.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))
  329. // In case we do successfully connect, close immediately.
  330. state.candidate.whenSuccess {
  331. $0.close(mode: .all, promise: nil)
  332. }
  333. // We have an active channel but the application doesn't know about it yet. We'll do the same
  334. // as for `.connecting`.
  335. case let .active(state):
  336. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  337. self.state = .shutdown(shutdown)
  338. // Fail the ready channel promise: we're shutting down so even if we manage to successfully
  339. // connect the application shouldn't should have access to the channel.
  340. state.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))
  341. // We have a channel, close it.
  342. state.candidate.close(mode: .all, promise: nil)
  343. // The channel is up and running: the application could be using it. We can close it and
  344. // return the `closeFuture`.
  345. case let .ready(state):
  346. shutdown = .shutdownByUser(closeFuture: state.channel.closeFuture)
  347. self.state = .shutdown(shutdown)
  348. // We have a channel, close it.
  349. state.channel.close(mode: .all, promise: nil)
  350. // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
  351. // do the same but also cancel any scheduled connection attempts and deal with any fallout
  352. // if we cancelled too late.
  353. case let .transientFailure(state):
  354. shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
  355. self.state = .shutdown(shutdown)
  356. // Stop the creation of a new channel, if we can. If we can't then the task to
  357. // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
  358. state.scheduled.cancel()
  359. // Fail the ready channel promise: we're shutting down so even if we manage to successfully
  360. // connect the application shouldn't should have access to the channel.
  361. state.readyChannelPromise.fail(shutdown.reason)
  362. // We're already shutdown; nothing to do.
  363. case let .shutdown(state):
  364. shutdown = state
  365. }
  366. return shutdown.closeFuture
  367. }
  368. }
  369. // MARK: - State changes from the channel handler.
  370. /// The channel caught an error. Hold on to it until the channel becomes inactive, it may provide
  371. /// some context.
  372. internal func channelError(_ error: Error) {
  373. self.eventLoop.preconditionInEventLoop()
  374. switch self.state {
  375. // These cases are purposefully separated: some crash reporting services provide stack traces
  376. // which don't include the precondition failure message (which contain the invalid state we were
  377. // in). Keeping the cases separate allows us work out the state from the line number.
  378. case .idle:
  379. self.invalidState()
  380. case .connecting:
  381. self.invalidState()
  382. case var .active(state):
  383. state.error = error
  384. self.state = .active(state)
  385. case var .ready(state):
  386. state.error = error
  387. self.state = .ready(state)
  388. // If we've already in one of these states, then additional errors aren't helpful to us.
  389. case .transientFailure, .shutdown:
  390. ()
  391. }
  392. }
  393. /// The connecting channel became `active`. Must be called on the `EventLoop`.
  394. internal func channelActive(channel: Channel) {
  395. self.eventLoop.preconditionInEventLoop()
  396. self.logger.debug("activating connection", metadata: [
  397. "connectivity_state": "\(self.state.label)",
  398. ])
  399. switch self.state {
  400. case let .connecting(connecting):
  401. self.state = .active(ConnectedState(from: connecting, candidate: channel))
  402. // Application called shutdown before the channel become active; we should close it.
  403. case .shutdown:
  404. channel.close(mode: .all, promise: nil)
  405. // These cases are purposefully separated: some crash reporting services provide stack traces
  406. // which don't include the precondition failure message (which contain the invalid state we were
  407. // in). Keeping the cases separate allows us work out the state from the line number.
  408. case .idle:
  409. self.invalidState()
  410. case .active:
  411. self.invalidState()
  412. case .ready:
  413. self.invalidState()
  414. case .transientFailure:
  415. self.invalidState()
  416. }
  417. }
  418. /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
  419. /// Must be called on the `EventLoop`.
  420. internal func channelInactive() {
  421. self.eventLoop.preconditionInEventLoop()
  422. self.logger.debug("deactivating connection", metadata: [
  423. "connectivity_state": "\(self.state.label)",
  424. ])
  425. switch self.state {
  426. // The channel is `active` but not `ready`. Should we try again?
  427. case let .active(active):
  428. switch active.reconnect {
  429. // No, shutdown instead.
  430. case .none:
  431. self.logger.debug("shutting down connection")
  432. let error = GRPCStatus(
  433. code: .unavailable,
  434. message: "The connection was dropped and connection re-establishment is disabled"
  435. )
  436. let shutdownState = ShutdownState(
  437. closeFuture: self.eventLoop.makeSucceededFuture(()),
  438. reason: error
  439. )
  440. self.state = .shutdown(shutdownState)
  441. active.readyChannelPromise.fail(error)
  442. // Yes, after some time.
  443. case let .after(delay):
  444. let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
  445. self.startConnecting()
  446. }
  447. self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
  448. self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
  449. }
  450. // The channel was ready and working fine but something went wrong. Should we try to replace
  451. // the channel?
  452. case let .ready(ready):
  453. // No, no backoff is configured.
  454. if ready.configuration.connectionBackoff == nil {
  455. self.logger.debug("shutting down connection, no reconnect configured/remaining")
  456. self.state = .shutdown(
  457. ShutdownState(
  458. closeFuture: ready.channel.closeFuture,
  459. reason: GRPCStatus(
  460. code: .unavailable,
  461. message: "The connection was dropped and a reconnect was not configured"
  462. )
  463. )
  464. )
  465. } else {
  466. // Yes, start connecting now. We should go via `transientFailure`, however.
  467. let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
  468. self.startConnecting()
  469. }
  470. self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
  471. self.state = .transientFailure(TransientFailureState(from: ready, scheduled: scheduled))
  472. }
  473. // This is fine: we expect the channel to become inactive after becoming idle.
  474. case .idle:
  475. ()
  476. // We're already shutdown, that's fine.
  477. case .shutdown:
  478. ()
  479. // These cases are purposefully separated: some crash reporting services provide stack traces
  480. // which don't include the precondition failure message (which contain the invalid state we were
  481. // in). Keeping the cases separate allows us work out the state from the line number.
  482. case .connecting:
  483. self.invalidState()
  484. case .transientFailure:
  485. self.invalidState()
  486. }
  487. }
  488. /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
  489. /// called on the `EventLoop`.
  490. internal func ready() {
  491. self.eventLoop.preconditionInEventLoop()
  492. self.logger.debug("connection ready", metadata: [
  493. "connectivity_state": "\(self.state.label)",
  494. ])
  495. switch self.state {
  496. case let .active(connected):
  497. self.state = .ready(ReadyState(from: connected))
  498. connected.readyChannelPromise.succeed(connected.candidate)
  499. case .shutdown:
  500. ()
  501. // These cases are purposefully separated: some crash reporting services provide stack traces
  502. // which don't include the precondition failure message (which contain the invalid state we were
  503. // in). Keeping the cases separate allows us work out the state from the line number.
  504. case .idle:
  505. self.invalidState()
  506. case .transientFailure:
  507. self.invalidState()
  508. case .connecting:
  509. self.invalidState()
  510. case .ready:
  511. self.invalidState()
  512. }
  513. }
  514. /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
  515. /// the `EventLoop`.
  516. internal func idle() {
  517. self.eventLoop.preconditionInEventLoop()
  518. self.logger.debug("idling connection", metadata: [
  519. "connectivity_state": "\(self.state.label)",
  520. ])
  521. switch self.state {
  522. case let .active(state):
  523. // This state is reachable if the keepalive timer fires before we reach the ready state.
  524. self.state = .idle(IdleState(configuration: state.configuration))
  525. state.readyChannelPromise
  526. .fail(GRPCStatus(code: .unavailable, message: "Idled before reaching ready state"))
  527. case let .ready(state):
  528. self.state = .idle(IdleState(configuration: state.configuration))
  529. case .shutdown:
  530. // This is expected when the connection is closed by the user: when the channel becomes
  531. // inactive and there are no outstanding RPCs, 'idle()' will be called instead of
  532. // 'channelInactive()'.
  533. ()
  534. // These cases are purposefully separated: some crash reporting services provide stack traces
  535. // which don't include the precondition failure message (which contain the invalid state we were
  536. // in). Keeping the cases separate allows us work out the state from the line number.
  537. case .idle:
  538. self.invalidState()
  539. case .connecting:
  540. self.invalidState()
  541. case .transientFailure:
  542. self.invalidState()
  543. }
  544. }
  545. }
  546. extension ConnectionManager {
  547. // A connection attempt failed; we never established a connection.
  548. private func connectionFailed(withError error: Error) {
  549. self.eventLoop.preconditionInEventLoop()
  550. switch self.state {
  551. case let .connecting(connecting):
  552. // Should we reconnect?
  553. switch connecting.reconnect {
  554. // No, shutdown.
  555. case .none:
  556. self.logger.debug("shutting down connection, no reconnect configured/remaining")
  557. self.state = .shutdown(
  558. ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: error)
  559. )
  560. connecting.readyChannelPromise.fail(error)
  561. // Yes, after a delay.
  562. case let .after(delay):
  563. self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
  564. let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
  565. self.startConnecting()
  566. }
  567. self.state = .transientFailure(
  568. TransientFailureState(from: connecting, scheduled: scheduled, reason: error)
  569. )
  570. }
  571. // The application must have called shutdown while we were trying to establish a connection
  572. // which was doomed to fail anyway. That's fine, we can ignore this.
  573. case .shutdown:
  574. ()
  575. // We can't fail to connect if we aren't trying.
  576. //
  577. // These cases are purposefully separated: some crash reporting services provide stack traces
  578. // which don't include the precondition failure message (which contain the invalid state we were
  579. // in). Keeping the cases separate allows us work out the state from the line number.
  580. case .idle:
  581. self.invalidState()
  582. case .active:
  583. self.invalidState()
  584. case .ready:
  585. self.invalidState()
  586. case .transientFailure:
  587. self.invalidState()
  588. }
  589. }
  590. }
  591. extension ConnectionManager {
  592. // Start establishing a connection: we can only do this from the `idle` and `transientFailure`
  593. // states. Must be called on the `EventLoop`.
  594. private func startConnecting() {
  595. switch self.state {
  596. case let .idle(state):
  597. let iterator = state.configuration.connectionBackoff?.makeIterator()
  598. self.startConnecting(
  599. configuration: state.configuration,
  600. backoffIterator: iterator,
  601. channelPromise: self.eventLoop.makePromise()
  602. )
  603. case let .transientFailure(pending):
  604. self.startConnecting(
  605. configuration: pending.configuration,
  606. backoffIterator: pending.backoffIterator,
  607. channelPromise: pending.readyChannelPromise
  608. )
  609. // We shutdown before a scheduled connection attempt had started.
  610. case .shutdown:
  611. ()
  612. // These cases are purposefully separated: some crash reporting services provide stack traces
  613. // which don't include the precondition failure message (which contain the invalid state we were
  614. // in). Keeping the cases separate allows us work out the state from the line number.
  615. case .connecting:
  616. self.invalidState()
  617. case .active:
  618. self.invalidState()
  619. case .ready:
  620. self.invalidState()
  621. }
  622. }
  623. private func startConnecting(
  624. configuration: ClientConnection.Configuration,
  625. backoffIterator: ConnectionBackoffIterator?,
  626. channelPromise: EventLoopPromise<Channel>
  627. ) {
  628. let timeoutAndBackoff = backoffIterator?.next()
  629. // We're already on the event loop: submit the connect so it starts after we've made the
  630. // state change to `.connecting`.
  631. self.eventLoop.assertInEventLoop()
  632. let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
  633. let channel = self.makeChannel(
  634. configuration: configuration,
  635. connectTimeout: timeoutAndBackoff?.timeout
  636. )
  637. channel.whenFailure { error in
  638. self.connectionFailed(withError: error)
  639. }
  640. return channel
  641. }
  642. // Should we reconnect if the candidate channel fails?
  643. let reconnect: Reconnect = timeoutAndBackoff.map { .after($0.backoff) } ?? .none
  644. let connecting = ConnectingState(
  645. configuration: configuration,
  646. backoffIterator: backoffIterator,
  647. reconnect: reconnect,
  648. readyChannelPromise: channelPromise,
  649. candidate: candidate
  650. )
  651. self.state = .connecting(connecting)
  652. }
  653. }
  654. extension ConnectionManager {
  655. private func invalidState(
  656. function: StaticString = #function,
  657. file: StaticString = #file,
  658. line: UInt = #line
  659. ) -> Never {
  660. preconditionFailure("Invalid state \(self.state) for \(function)", file: file, line: line)
  661. }
  662. }
  663. extension ConnectionManager {
  664. private func makeBootstrap(
  665. configuration: ClientConnection.Configuration,
  666. connectTimeout: TimeInterval?
  667. ) -> ClientBootstrapProtocol {
  668. let serverHostname: String? = configuration.tls.flatMap { tls -> String? in
  669. if let hostnameOverride = tls.hostnameOverride {
  670. return hostnameOverride
  671. } else {
  672. return configuration.target.host
  673. }
  674. }.flatMap { hostname in
  675. if hostname.isIPAddress {
  676. return nil
  677. } else {
  678. return hostname
  679. }
  680. }
  681. let bootstrap = PlatformSupport.makeClientBootstrap(group: self.eventLoop, logger: self.logger)
  682. .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
  683. .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
  684. .channelInitializer { channel in
  685. let initialized = channel.configureGRPCClient(
  686. httpTargetWindowSize: configuration.httpTargetWindowSize,
  687. tlsConfiguration: configuration.tls?.configuration,
  688. tlsServerHostname: serverHostname,
  689. connectionManager: self,
  690. connectionKeepalive: configuration.connectionKeepalive,
  691. connectionIdleTimeout: configuration.connectionIdleTimeout,
  692. errorDelegate: configuration.errorDelegate,
  693. requiresZeroLengthWriteWorkaround: PlatformSupport.requiresZeroLengthWriteWorkaround(
  694. group: self.eventLoop,
  695. hasTLS: configuration.tls != nil
  696. ),
  697. logger: self.logger
  698. )
  699. // Run the debug initializer, if there is one.
  700. if let debugInitializer = configuration.debugChannelInitializer {
  701. return initialized.flatMap {
  702. debugInitializer(channel)
  703. }
  704. } else {
  705. return initialized
  706. }
  707. }
  708. if let connectTimeout = connectTimeout {
  709. return bootstrap.connectTimeout(.seconds(timeInterval: connectTimeout))
  710. } else {
  711. return bootstrap
  712. }
  713. }
  714. private func makeChannel(
  715. configuration: ClientConnection.Configuration,
  716. connectTimeout: TimeInterval?
  717. ) -> EventLoopFuture<Channel> {
  718. if let provider = self.channelProvider {
  719. return provider()
  720. } else {
  721. let bootstrap = self.makeBootstrap(
  722. configuration: configuration,
  723. connectTimeout: connectTimeout
  724. )
  725. return bootstrap.connect(to: configuration.target)
  726. }
  727. }
  728. }