2
0

ConnectionManager.swift 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOConcurrencyHelpers
  20. internal class ConnectionManager {
  /// State held while no `Channel` exists.
  internal struct IdleState {
    /// The configuration used to create a connection when one is next requested.
    var configuration: ClientConnection.Configuration
  }
  /// Whether another connection attempt should be made if the current attempt (or the channel it
  /// produced) fails before becoming ready.
  internal enum Reconnect {
    /// Do not try again.
    case none

    /// Try again once the given interval, in seconds, has passed.
    case after(TimeInterval)
  }
  /// State held while a connection attempt is in flight.
  ///
  /// Note: the order of the stored properties determines the order of the parameters of the
  /// synthesized memberwise initializer, so it must not be changed.
  internal struct ConnectingState {
    /// The configuration used for this connection attempt.
    var configuration: ClientConnection.Configuration

    /// The iterator providing the timeout and backoff for subsequent attempts, if there is one.
    var backoffIterator: ConnectionBackoffIterator?

    /// Whether we should try again if this attempt fails.
    var reconnect: Reconnect

    /// A promise for a channel which has become ready; its future is vended by `getChannel()`.
    var readyChannelPromise: EventLoopPromise<Channel>

    /// The future for the channel currently being established.
    var candidate: EventLoopFuture<Channel>
  }
  /// State held once a `Channel` has become active but has not yet been marked as ready.
  internal struct ConnectedState {
    /// The configuration the channel was created with.
    var configuration: ClientConnection.Configuration

    /// The iterator providing the timeout and backoff for subsequent attempts, if there is one.
    var backoffIterator: ConnectionBackoffIterator?

    /// Whether we should try again if the candidate channel goes away before becoming ready.
    var reconnect: Reconnect

    /// A promise for a channel which has become ready; its future is vended by `getChannel()`.
    var readyChannelPromise: EventLoopPromise<Channel>

    /// The active, but not yet ready, channel.
    var candidate: Channel

    /// Carries everything over from the connecting `state` and records the now-active channel.
    init(from state: ConnectingState, candidate: Channel) {
      self.candidate = candidate
      self.readyChannelPromise = state.readyChannelPromise
      self.reconnect = state.reconnect
      self.backoffIterator = state.backoffIterator
      self.configuration = state.configuration
    }
  }
  /// State held while we have a channel which has been marked as ready.
  internal struct ReadyState {
    /// The configuration the channel was created with.
    var configuration: ClientConnection.Configuration

    /// The ready channel.
    var channel: Channel

    /// Promotes the candidate channel of the connected `state` to the ready channel.
    init(from state: ConnectedState) {
      self.channel = state.candidate
      self.configuration = state.configuration
    }
  }
  /// State held while we have no channel but a task to start another connection attempt has been
  /// scheduled.
  internal struct TransientFailureState {
    /// The configuration to use for the next connection attempt.
    var configuration: ClientConnection.Configuration

    /// The iterator providing the timeout and backoff for subsequent attempts, if there is one.
    var backoffIterator: ConnectionBackoffIterator?

    /// A promise for a channel which has become ready; its future is vended by `getChannel()`.
    var readyChannelPromise: EventLoopPromise<Channel>

    /// The scheduled task which will start the next connection attempt.
    var scheduled: Scheduled<Void>

    /// Memberwise initializer shared by the state-specific initializers below.
    private init(
      configuration: ClientConnection.Configuration,
      backoffIterator: ConnectionBackoffIterator?,
      readyChannelPromise: EventLoopPromise<Channel>,
      scheduled: Scheduled<Void>
    ) {
      self.configuration = configuration
      self.backoffIterator = backoffIterator
      self.readyChannelPromise = readyChannelPromise
      self.scheduled = scheduled
    }

    /// A connection attempt failed: keep using its backoff iterator and ready-channel promise.
    init(from state: ConnectingState, scheduled: Scheduled<Void>) {
      self.init(
        configuration: state.configuration,
        backoffIterator: state.backoffIterator,
        readyChannelPromise: state.readyChannelPromise,
        scheduled: scheduled
      )
    }

    /// An active (but not ready) channel was lost: keep using its backoff iterator and
    /// ready-channel promise.
    init(from state: ConnectedState, scheduled: Scheduled<Void>) {
      self.init(
        configuration: state.configuration,
        backoffIterator: state.backoffIterator,
        readyChannelPromise: state.readyChannelPromise,
        scheduled: scheduled
      )
    }

    /// A ready channel was lost: the ready state carries neither a backoff iterator nor a promise
    /// so a new iterator is made from the configured backoff (if any) and a new promise is made
    /// on the event loop of the lost channel.
    init(from state: ReadyState, scheduled: Scheduled<Void>) {
      self.init(
        configuration: state.configuration,
        backoffIterator: state.configuration.connectionBackoff?.makeIterator(),
        readyChannelPromise: state.channel.eventLoop.makePromise(),
        scheduled: scheduled
      )
    }
  }
  /// State held once the manager has been shut down.
  internal struct ShutdownState {
    /// The future returned to callers of `shutdown()`.
    var closeFuture: EventLoopFuture<Void>
  }
  /// The states of the connection state machine.
  internal enum State {
    /// No `Channel` is required.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case idle(IdleState)

    /// We're actively trying to establish a connection.
    ///
    /// Valid next states:
    /// - `active`
    /// - `transientFailure` (if our attempt fails and we're going to try again)
    /// - `shutdown`
    case connecting(ConnectingState)

    /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
    /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
    ///
    /// Valid next states:
    /// - `ready`
    /// - `transientFailure` (if our handshake fails or another error happens and we can attempt
    ///   to re-establish the connection)
    /// - `shutdown`
    case active(ConnectedState)

    /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
    /// the channel for making RPCs.
    ///
    /// Valid next states:
    /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
    /// - `transientFailure` (we encountered an error and will re-establish the connection)
    /// - `shutdown`
    case ready(ReadyState)

    /// A `Channel` is desired, we'll attempt to create one in the future.
    ///
    /// Valid next states:
    /// - `connecting`
    /// - `shutdown`
    case transientFailure(TransientFailureState)

    /// We never want another `Channel`: this state is terminal.
    case shutdown(ShutdownState)

    /// The name of the case, without its associated value; used as log metadata.
    fileprivate var label: String {
      switch self {
      case .idle: return "idle"
      case .connecting: return "connecting"
      case .active: return "active"
      case .ready: return "ready"
      case .transientFailure: return "transientFailure"
      case .shutdown: return "shutdown"
      }
    }
  }
  /// The current state of the connection.
  ///
  /// Every assignment forwards the new state to the connectivity state monitor, with the
  /// exception of `active` which is not reported. Becoming `idle` or `transientFailure` also
  /// bumps the channel number used in the connection ID; this happens after the monitor has been
  /// told about the change.
  private var state: State {
    didSet {
      switch state {
      case .idle:
        monitor.updateState(to: .idle, logger: logger)
        updateConnectionID()

      case .connecting:
        monitor.updateState(to: .connecting, logger: logger)

      case .active:
        // This is an internal state: the monitor isn't told about it.
        break

      case .ready:
        monitor.updateState(to: .ready, logger: logger)

      case .transientFailure:
        monitor.updateState(to: .transientFailure, logger: logger)
        updateConnectionID()

      case .shutdown:
        monitor.updateState(to: .shutdown, logger: logger)
      }
    }
  }
  /// The event loop the manager submits its work to and checks its callers against.
  internal let eventLoop: EventLoop

  /// The monitor which is told about connectivity state changes.
  internal let monitor: ConnectivityStateMonitor

  /// The logger; its connection ID metadata is refreshed by `updateConnectionID()`.
  internal var logger: Logger

  /// The identifier given to this manager when it was created.
  private let connectionID: String

  /// A counter incremented by `updateConnectionID()`; accessed while holding
  /// `channelNumberLock`.
  private var channelNumber: UInt64
  private var channelNumberLock = Lock()

  /// "<connectionID>/<channelNumber>", read without taking the lock: `connectionIDAndNumber` and
  /// `updateConnectionID()` acquire `channelNumberLock` before using this.
  private var _connectionIDAndNumber: String {
    return "\(connectionID)/\(channelNumber)"
  }

  /// "<connectionID>/<channelNumber>", read while holding `channelNumberLock`.
  private var connectionIDAndNumber: String {
    return self.channelNumberLock.withLock { self._connectionIDAndNumber }
  }

  /// Increments the channel number (wrapping on overflow) and stores the resulting connection ID
  /// in the metadata of our logger.
  private func updateConnectionID() {
    self.channelNumberLock.withLockVoid {
      self.channelNumber &+= 1
      let identifier = self._connectionIDAndNumber
      self.logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
    }
  }

  /// Stores the current connection ID in the metadata of the given logger.
  ///
  /// - Parameter logger: The logger to add the connection ID to.
  internal func appendMetadata(to logger: inout Logger) {
    let identifier = self.connectionIDAndNumber
    logger[metadataKey: MetadataKey.connectionID] = "\(identifier)"
  }
  // Only used for testing: when set, channels come from this closure rather than a bootstrap.
  private var channelProvider: (() -> EventLoopFuture<Channel>)?

  /// Creates a `ConnectionManager` without a channel provider.
  ///
  /// - Parameters:
  ///   - configuration: The configuration for the connection.
  ///   - logger: The logger to use; connection ID metadata is added to a copy of it.
  internal convenience init(configuration: ClientConnection.Configuration, logger: Logger) {
    self.init(
      configuration: configuration,
      logger: logger,
      channelProvider: nil
    )
  }
  /// Create a `ConnectionManager` for testing: uses the given `channelProvider` to create channels.
  ///
  /// - Parameters:
  ///   - configuration: The configuration for the connection.
  ///   - logger: The logger to use; connection ID metadata is added to a copy of it.
  ///   - channelProvider: Called for each connection attempt to provide the channel future.
  /// - Returns: A manager which gets its channels from `channelProvider`.
  internal static func testingOnly(
    configuration: ClientConnection.Configuration,
    logger: Logger,
    channelProvider: @escaping () -> EventLoopFuture<Channel>
  ) -> ConnectionManager {
    let manager = ConnectionManager(
      configuration: configuration,
      logger: logger,
      channelProvider: channelProvider
    )
    return manager
  }
  /// The designated initializer: the manager starts in the `idle` state.
  ///
  /// - Parameters:
  ///   - configuration: The configuration for the connection; it also provides the event loop
  ///     group and the connectivity state delegate and its queue.
  ///   - logger: The logger to use; connection ID metadata is added to a copy of it.
  ///   - channelProvider: If non-`nil`, used instead of a bootstrap to create channels.
  private init(
    configuration: ClientConnection.Configuration,
    logger: Logger,
    channelProvider: (() -> EventLoopFuture<Channel>)?
  ) {
    // Identify the connection: a fresh UUID plus a channel number which starts at zero.
    let connectionID = UUID().uuidString
    let initialChannelNumber: UInt64 = 0

    // Setup the logger.
    var taggedLogger = logger
    taggedLogger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(initialChannelNumber)"

    self.connectionID = connectionID
    self.channelNumber = initialChannelNumber
    self.logger = taggedLogger
    self.channelProvider = channelProvider
    self.eventLoop = configuration.eventLoopGroup.next()
    self.monitor = ConnectivityStateMonitor(
      delegate: configuration.connectivityStateDelegate,
      queue: configuration.connectivityStateDelegateQueue
    )
    self.state = .idle(IdleState(configuration: configuration))
  }
  /// Returns a future for a connected channel.
  ///
  /// The work is submitted to the event loop. If we're idle a connection attempt is started. In
  /// the 'connecting', 'active' and 'transientFailure' states the future of the ready-channel
  /// promise is returned, in the 'ready' state the future is already succeeded with the channel,
  /// and in the 'shutdown' state the future is failed with an 'unavailable' status.
  internal func getChannel() -> EventLoopFuture<Channel> {
    return self.eventLoop.flatSubmit {
      let vended: EventLoopFuture<Channel>

      switch self.state {
      case .idle:
        self.startConnecting()
        // We started connecting so we must transition to the `connecting` state.
        guard case let .connecting(attempt) = self.state else {
          self.invalidState()
        }
        vended = attempt.readyChannelPromise.futureResult

      case let .connecting(attempt):
        vended = attempt.readyChannelPromise.futureResult

      case let .active(connected):
        vended = connected.readyChannelPromise.futureResult

      case let .ready(ready):
        vended = ready.channel.eventLoop.makeSucceededFuture(ready.channel)

      case let .transientFailure(pending):
        vended = pending.readyChannelPromise.futureResult

      case .shutdown:
        vended = self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
      }

      // Logged after the switch: the label is that of the state we ended up in.
      self.logger.debug("vending channel future", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      return vended
    }
  }
  /// Returns a future for the current channel, or future channel from the current connection
  /// attempt, or if the state is 'idle' returns the future for the next connection attempt.
  ///
  /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
  internal func getOptimisticChannel() -> EventLoopFuture<Channel> {
    return self.eventLoop.flatSubmit {
      let vended: EventLoopFuture<Channel>

      switch self.state {
      case .idle:
        self.startConnecting()
        // We started connecting so we must transition to the `connecting` state.
        guard case let .connecting(attempt) = self.state else {
          self.invalidState()
        }
        vended = attempt.candidate

      case let .connecting(attempt):
        vended = attempt.candidate

      case let .active(connected):
        // The candidate exists already, it just hasn't been marked as ready yet.
        vended = connected.candidate.eventLoop.makeSucceededFuture(connected.candidate)

      case let .ready(ready):
        vended = ready.channel.eventLoop.makeSucceededFuture(ready.channel)

      case .transientFailure:
        vended = self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)

      case .shutdown:
        vended = self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
      }

      // Logged after the switch: the label is that of the state we ended up in.
      self.logger.debug("vending fast-failing channel future", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      return vended
    }
  }
  /// Shutdown any connection which exists. This is a request from the application.
  ///
  /// - Returns: The close future of the channel if we were 'ready', the future recorded by the
  ///   earlier shutdown if we were already shut down, and an already succeeded future otherwise.
  internal func shutdown() -> EventLoopFuture<Void> {
    return self.eventLoop.flatSubmit {
      self.logger.debug("shutting down connection", metadata: [
        "connectivity_state": "\(self.state.label)",
      ])

      let closeFuture: EventLoopFuture<Void>

      switch self.state {
      // We don't have a channel and we don't want one, easy!
      case .idle:
        closeFuture = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: closeFuture))

      // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
      // the shutdown future and deal with any fallout from the connecting channel without the
      // application knowing.
      case let .connecting(connecting):
        closeFuture = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: closeFuture))

        // Fail the ready channel promise: we're shutting down so even if we manage to
        // successfully connect the application shouldn't have access to the channel.
        connecting.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))

        // In case we do successfully connect, close immediately.
        connecting.candidate.whenSuccess { channel in
          channel.close(mode: .all, promise: nil)
        }

      // We have an active channel but the application doesn't know about it yet. We'll do the
      // same as for `.connecting`.
      case let .active(connected):
        closeFuture = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: closeFuture))

        // Fail the ready channel promise: the application shouldn't have access to the channel.
        connected.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))

        // We have a channel, close it.
        connected.candidate.close(mode: .all, promise: nil)

      // The channel is up and running: the application could be using it. We can close it and
      // return the `closeFuture`.
      case let .ready(ready):
        closeFuture = ready.channel.closeFuture
        self.state = .shutdown(ShutdownState(closeFuture: closeFuture))

        // We have a channel, close it.
        ready.channel.close(mode: .all, promise: nil)

      // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
      // do the same but also cancel any scheduled connection attempts and deal with any fallout
      // if we cancelled too late.
      case let .transientFailure(pending):
        // Stop the creation of a new channel, if we can. If we can't then the task to
        // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
        pending.scheduled.cancel()

        closeFuture = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: closeFuture))

        // Fail the ready channel promise: the application shouldn't have access to any channel
        // we might have gone on to create.
        pending.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // We're already shutdown; nothing to do.
      case let .shutdown(existing):
        closeFuture = existing.closeFuture
      }

      return closeFuture
    }
  }
  // MARK: - State changes from the channel handler.

  /// The connecting channel became `active`. Must be called on the `EventLoop`.
  ///
  /// - Parameter channel: The channel which became active. It is closed if we have already been
  ///   shut down.
  internal func channelActive(channel: Channel) {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("activating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .connecting(connecting):
      let connected = ConnectedState(from: connecting, candidate: channel)
      self.state = .active(connected)

    // Application called shutdown before the channel became active; we should close it.
    case .shutdown:
      channel.close(mode: .all, promise: nil)

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .active:
      self.invalidState()

    case .ready:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
  /// Must be called on the `EventLoop`.
  internal func channelInactive() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("deactivating connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    // The channel is `active` but not `ready`. Should we try again?
    case let .active(connected):
      switch connected.reconnect {
      // No, shutdown instead.
      case .none:
        self.logger.debug("shutting down connection")
        let alreadyClosed: EventLoopFuture<Void> = self.eventLoop.makeSucceededFuture(())
        self.state = .shutdown(ShutdownState(closeFuture: alreadyClosed))
        connected.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))

      // Yes, after some time.
      case let .after(delay):
        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
        let pending = TransientFailureState(from: connected, scheduled: scheduled)
        self.state = .transientFailure(pending)
      }

    // The channel was ready and working fine but something went wrong. Should we try to replace
    // the channel?
    case let .ready(ready):
      if ready.configuration.connectionBackoff != nil {
        // Yes, start connecting now. We should go via `transientFailure`, however.
        let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
          self.startConnecting()
        }
        self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
        let pending = TransientFailureState(from: ready, scheduled: scheduled)
        self.state = .transientFailure(pending)
      } else {
        // No, no backoff is configured.
        self.logger.debug("shutting down connection, no reconnect configured/remaining")
        self.state = .shutdown(ShutdownState(closeFuture: ready.channel.closeFuture))
      }

    // This is fine: we expect the channel to become inactive after becoming idle.
    case .idle:
      break

    // We're already shutdown, that's fine.
    case .shutdown:
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
  /// called on the `EventLoop`.
  ///
  /// The state is changed to `ready` before the ready-channel promise is succeeded with the
  /// channel.
  internal func ready() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("connection ready", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .active(connected):
      let readyState = ReadyState(from: connected)
      self.state = .ready(readyState)
      connected.readyChannelPromise.succeed(connected.candidate)

    // We've been shut down already: there's nothing to do.
    case .shutdown:
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .transientFailure:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .ready:
      self.invalidState()
    }
  }
  /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
  /// the `EventLoop`.
  internal func idle() {
    self.eventLoop.preconditionInEventLoop()
    self.logger.debug("idling connection", metadata: [
      "connectivity_state": "\(self.state.label)",
    ])

    switch self.state {
    case let .active(connected):
      // This state is reachable if the keepalive timer fires before we reach the ready state.
      // Nobody will get the channel they were promised, so the promise is failed.
      self.state = .idle(IdleState(configuration: connected.configuration))
      let status = GRPCStatus(code: .unavailable, message: "Idled before reaching ready state")
      connected.readyChannelPromise.fail(status)

    case let .ready(ready):
      self.state = .idle(IdleState(configuration: ready.configuration))

    case .shutdown:
      // This is expected when the connection is closed by the user: when the channel becomes
      // inactive and there are no outstanding RPCs, 'idle()' will be called instead of
      // 'channelInactive()'.
      break

    // These cases are purposefully separated: some crash reporting services provide stack traces
    // which don't include the precondition failure message (which contain the invalid state we
    // were in). Keeping the cases separate allows us to work out the state from the line number.
    case .idle:
      self.invalidState()

    case .connecting:
      self.invalidState()

    case .transientFailure:
      self.invalidState()
    }
  }
  482. }
  extension ConnectionManager {
    /// A connection attempt failed; we never established a connection. Must be called on the
    /// `EventLoop`.
    ///
    /// If the attempt allows for a reconnect then another attempt is scheduled and we move to
    /// `transientFailure`; the ready-channel promise is left pending and handed on to the next
    /// attempt. Otherwise we move to `shutdown` and fail the ready-channel promise with `error`.
    ///
    /// - Parameter error: The error which caused the connection attempt to fail.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case let .connecting(connecting):
        // Should we reconnect?
        switch connecting.reconnect {
        // No, shutdown.
        case .none:
          self.logger.debug("shutting down connection, no reconnect configured/remaining")
          // Change state *before* failing the promise, as every other transition in this file
          // does: callbacks attached to the promise's future may run as soon as it is failed and
          // anything they do with the manager should see `shutdown` rather than a stale
          // `connecting` state.
          self.state = .shutdown(ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(())))
          connecting.readyChannelPromise.fail(error)

        // Yes, after a delay.
        case let .after(delay):
          self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
          let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.state = .transientFailure(
            TransientFailureState(from: connecting, scheduled: scheduled)
          )
        }

      // The application must have called shutdown while we were trying to establish a connection
      // which was doomed to fail anyway. That's fine, we can ignore this.
      case .shutdown:
        ()

      // We can't fail to connect if we aren't trying.
      //
      // These cases are purposefully separated: some crash reporting services provide stack
      // traces which don't include the precondition failure message (which contain the invalid
      // state we were in). Keeping the cases separate allows us to work out the state from the
      // line number.
      case .idle:
        self.invalidState()

      case .active:
        self.invalidState()

      case .ready:
        self.invalidState()

      case .transientFailure:
        self.invalidState()
      }
    }
  }
  extension ConnectionManager {
    // Start establishing a connection: we can only do this from the `idle` and `transientFailure`
    // states. Must be called on the `EventLoop`.
    //
    // From `idle` a new backoff iterator and a new ready-channel promise are made; from
    // `transientFailure` the existing ones are reused. Nothing happens if we've been shut down.
    private func startConnecting() {
      switch self.state {
      case let .idle(idle):
        self.startConnecting(
          configuration: idle.configuration,
          backoffIterator: idle.configuration.connectionBackoff?.makeIterator(),
          channelPromise: self.eventLoop.makePromise()
        )

      case let .transientFailure(pending):
        self.startConnecting(
          configuration: pending.configuration,
          backoffIterator: pending.backoffIterator,
          channelPromise: pending.readyChannelPromise
        )

      // We shutdown before a scheduled connection attempt had started.
      case .shutdown:
        break

      // These cases are purposefully separated: some crash reporting services provide stack
      // traces which don't include the precondition failure message (which contain the invalid
      // state we were in). Keeping the cases separate allows us to work out the state from the
      // line number.
      case .connecting:
        self.invalidState()

      case .active:
        self.invalidState()

      case .ready:
        self.invalidState()
      }
    }

    // Starts a connection attempt and moves to the `connecting` state.
    //
    // The next value from `backoffIterator` (if there is one) provides the connect timeout for
    // this attempt and the delay before the following attempt; when there is no such value a
    // failure of this attempt will not be followed by a reconnect.
    private func startConnecting(
      configuration: ClientConnection.Configuration,
      backoffIterator: ConnectionBackoffIterator?,
      channelPromise: EventLoopPromise<Channel>
    ) {
      self.eventLoop.assertInEventLoop()

      let timeoutAndBackoff = backoffIterator?.next()

      // Should we reconnect if the candidate channel fails?
      let reconnect: Reconnect
      if let next = timeoutAndBackoff {
        reconnect = .after(next.backoff)
      } else {
        reconnect = .none
      }

      // We're already on the event loop: submit the connect so it starts after we've made the
      // state change to `.connecting`.
      let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let attempt = self.makeChannel(
          configuration: configuration,
          connectTimeout: timeoutAndBackoff?.timeout
        )
        attempt.whenFailure { self.connectionFailed(withError: $0) }
        return attempt
      }

      self.state = .connecting(
        ConnectingState(
          configuration: configuration,
          backoffIterator: backoffIterator,
          reconnect: reconnect,
          readyChannelPromise: channelPromise,
          candidate: candidate
        )
      )
    }
  }
  extension ConnectionManager {
    /// Stops the program with a message containing the current state and the name of the calling
    /// function.
    ///
    /// The defaulted parameters are evaluated at the call site so the failure is attributed to
    /// the file and line of the caller rather than to this helper.
    private func invalidState(
      function: StaticString = #function,
      file: StaticString = #file,
      line: UInt = #line
    ) -> Never {
      preconditionFailure(
        "Invalid state \(self.state) for \(function)",
        file: file,
        line: line
      )
    }
  }
  extension ConnectionManager {
    /// Makes a bootstrap whose channels are configured as gRPC clients managed by `self`.
    ///
    /// - Parameters:
    ///   - configuration: The configuration for the connection.
    ///   - connectTimeout: The connect timeout in seconds, or `nil` to not set one.
    /// - Returns: The configured bootstrap.
    private func makeBootstrap(
      configuration: ClientConnection.Configuration,
      connectTimeout: TimeInterval?
    ) -> ClientBootstrapProtocol {
      // Only relevant when TLS is configured: prefer the hostname override to the host of the
      // target, and pass no hostname at all if what we end up with is an IP address.
      let serverHostname: String? = configuration.tls
        .flatMap { tls -> String? in tls.hostnameOverride ?? configuration.target.host }
        .flatMap { hostname -> String? in hostname.isIPAddress ? nil : hostname }

      let bootstrap = PlatformSupport.makeClientBootstrap(group: self.eventLoop, logger: self.logger)
        .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
        .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .channelInitializer { channel in
          let initialized = channel.configureGRPCClient(
            httpTargetWindowSize: configuration.httpTargetWindowSize,
            tlsConfiguration: configuration.tls?.configuration,
            tlsServerHostname: serverHostname,
            connectionManager: self,
            connectionKeepalive: configuration.connectionKeepalive,
            connectionIdleTimeout: configuration.connectionIdleTimeout,
            errorDelegate: configuration.errorDelegate,
            requiresZeroLengthWriteWorkaround: PlatformSupport.requiresZeroLengthWriteWorkaround(
              group: self.eventLoop,
              hasTLS: configuration.tls != nil
            ),
            logger: self.logger
          )

          // Run the debug initializer, if there is one, once the channel has been configured.
          guard let debugInitializer = configuration.debugChannelInitializer else {
            return initialized
          }
          return initialized.flatMap { debugInitializer(channel) }
        }

      guard let connectTimeout = connectTimeout else {
        return bootstrap
      }
      return bootstrap.connectTimeout(.seconds(timeInterval: connectTimeout))
    }

    /// Makes a future channel: from the channel provider if there is one (testing only),
    /// otherwise by connecting a bootstrap to the target in the configuration.
    ///
    /// - Parameters:
    ///   - configuration: The configuration for the connection.
    ///   - connectTimeout: The connect timeout in seconds, or `nil` to not set one. It is not
    ///     used when a channel provider is set.
    /// - Returns: A future for the channel.
    private func makeChannel(
      configuration: ClientConnection.Configuration,
      connectTimeout: TimeInterval?
    ) -> EventLoopFuture<Channel> {
      guard let provider = self.channelProvider else {
        return self.makeBootstrap(
          configuration: configuration,
          connectTimeout: connectTimeout
        ).connect(to: configuration.target)
      }
      return provider()
    }
  }