ConnectionManager.swift 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOConcurrencyHelpers
  18. import Logging
  19. import Foundation
  /// Manages the lifecycle of the `Channel` backing a client connection using an explicit state
  /// machine (see `State`).
  ///
  /// All reads and writes of `state` happen on `eventLoop`: the application-facing entry points
  /// (`getChannel()`, `getOptimisticChannel()` and `shutdown()`) hop onto it via `flatSubmit`,
  /// while the channel-handler callbacks (`channelActive(channel:)`, `channelInactive()`, `ready()`
  /// and `idle()`) require that the caller is already on it.
  ///
  /// The class is `final`: its only designated initializer is `private`, so it was never open to
  /// subclassing outside of this file.
  internal final class ConnectionManager {
    /// Data held in the `idle` state: only the configuration used to create the next channel.
    internal struct IdleState {
      var configuration: ClientConnection.Configuration
    }

    /// Whether, and when, to try again if the current connection attempt or candidate channel fails.
    internal enum Reconnect {
      /// Do not try again.
      case none
      /// Try again after the given delay, in seconds.
      case after(TimeInterval)
    }

    /// Data held in the `connecting` state.
    internal struct ConnectingState {
      var configuration: ClientConnection.Configuration
      /// Iterator providing the connect timeout and backoff for attempts; `nil` if no backoff is
      /// configured.
      var backoffIterator: ConnectionBackoffIterator?
      /// What to do should this attempt fail.
      var reconnect: Reconnect
      /// Succeeded with the channel when it becomes ready; its future is what `getChannel()` returns.
      var readyChannelPromise: EventLoopPromise<Channel>
      /// The channel being established by the current attempt; this is what
      /// `getOptimisticChannel()` returns.
      var candidate: EventLoopFuture<Channel>
    }

    /// Data held in the `active` state: as `ConnectingState`, but the candidate channel now exists.
    internal struct ConnectedState {
      var configuration: ClientConnection.Configuration
      var backoffIterator: ConnectionBackoffIterator?
      var reconnect: Reconnect
      var readyChannelPromise: EventLoopPromise<Channel>
      /// The channel which became active but is not yet known to be ready.
      var candidate: Channel

      init(from state: ConnectingState, candidate: Channel) {
        self.configuration = state.configuration
        self.backoffIterator = state.backoffIterator
        self.reconnect = state.reconnect
        self.readyChannelPromise = state.readyChannelPromise
        self.candidate = candidate
      }
    }

    /// Data held in the `ready` state: the configuration and the usable channel.
    internal struct ReadyState {
      var configuration: ClientConnection.Configuration
      var channel: Channel

      init(from state: ConnectedState) {
        self.configuration = state.configuration
        self.channel = state.candidate
      }
    }

    /// Data held in the `transientFailure` state, i.e. while waiting to start another attempt.
    internal struct TransientFailureState {
      var configuration: ClientConnection.Configuration
      var backoffIterator: ConnectionBackoffIterator?
      var readyChannelPromise: EventLoopPromise<Channel>
      /// The scheduled task which will start the next connection attempt.
      var scheduled: Scheduled<Void>

      /// A connection attempt failed: carry over the backoff iterator and the pending promise.
      init(from state: ConnectingState, scheduled: Scheduled<Void>) {
        self.configuration = state.configuration
        self.backoffIterator = state.backoffIterator
        self.readyChannelPromise = state.readyChannelPromise
        self.scheduled = scheduled
      }

      /// An active (but not ready) channel was lost: carry over the backoff iterator and the
      /// pending promise.
      init(from state: ConnectedState, scheduled: Scheduled<Void>) {
        self.configuration = state.configuration
        self.backoffIterator = state.backoffIterator
        self.readyChannelPromise = state.readyChannelPromise
        self.scheduled = scheduled
      }

      /// A ready channel was lost: the previous promise has already been succeeded, so start over
      /// with a fresh backoff iterator (if backoff is configured) and a new promise.
      init(from state: ReadyState, scheduled: Scheduled<Void>) {
        self.configuration = state.configuration
        self.backoffIterator = state.configuration.connectionBackoff?.makeIterator()
        self.readyChannelPromise = state.channel.eventLoop.makePromise()
        self.scheduled = scheduled
      }
    }

    /// Data held in the terminal `shutdown` state.
    internal struct ShutdownState {
      /// The future returned to callers of `shutdown()`.
      var closeFuture: EventLoopFuture<Void>
    }

    internal enum State {
      /// No `Channel` is required.
      ///
      /// Valid next states:
      /// - `connecting`
      /// - `shutdown`
      case idle(IdleState)

      /// We're actively trying to establish a connection.
      ///
      /// Valid next states:
      /// - `active`
      /// - `transientFailure` (if our attempt fails and we're going to try again)
      /// - `shutdown`
      case connecting(ConnectingState)

      /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
      /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
      ///
      /// Valid next states:
      /// - `ready`
      /// - `transientFailure` (if our handshake fails or another error happens and we can attempt
      ///   to re-establish the connection)
      /// - `shutdown`
      case active(ConnectedState)

      /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
      /// the channel for making RPCs.
      ///
      /// Valid next states:
      /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
      /// - `transientFailure` (we encountered an error and will re-establish the connection)
      /// - `shutdown`
      case ready(ReadyState)

      /// A `Channel` is desired, we'll attempt to create one in the future.
      ///
      /// Valid next states:
      /// - `connecting`
      /// - `shutdown`
      case transientFailure(TransientFailureState)

      /// We never want another `Channel`: this state is terminal.
      case shutdown(ShutdownState)
    }

    /// The current state. Every assignment is mirrored to the connectivity state monitor, with the
    /// exception of `active` which is not reported.
    private var state: State {
      didSet {
        switch self.state {
        case .idle:
          self.monitor.updateState(to: .idle, logger: self.logger)
          // Create a new id; it'll be used for the *next* channel we create.
          self.channelNumber &+= 1
          self.logger[metadataKey: MetadataKey.connectionID] = "\(self.connectionId)/\(self.channelNumber)"

        case .connecting:
          self.monitor.updateState(to: .connecting, logger: self.logger)

        case .active:
          // This is an internal state: the monitor is not told about it.
          ()

        case .ready:
          self.monitor.updateState(to: .ready, logger: self.logger)

        case .transientFailure:
          self.monitor.updateState(to: .transientFailure, logger: self.logger)

        case .shutdown:
          self.monitor.updateState(to: .shutdown, logger: self.logger)
        }
      }
    }

    /// The event loop on which all state is accessed and on which channels are created.
    internal let eventLoop: EventLoop
    /// Receives every externally visible state change (see the `state` observer).
    internal let monitor: ConnectivityStateMonitor
    /// Logger whose connection ID metadata is "<connectionId>/<channelNumber>".
    internal var logger: Logger

    /// Identifier of this manager; fixed for its lifetime.
    private let connectionId: String
    /// Incremented each time we enter `idle`, so that each channel is logged with a distinct ID.
    private var channelNumber: UInt64

    // Only used for testing.
    private var channelProvider: (() -> EventLoopFuture<Channel>)?

    /// Create a `ConnectionManager` which makes its own channels from `configuration`.
    internal convenience init(configuration: ClientConnection.Configuration, logger: Logger) {
      self.init(configuration: configuration, logger: logger, channelProvider: nil)
    }

    /// Create a `ConnectionManager` for testing: uses the given `channelProvider` to create channels.
    internal static func testingOnly(
      configuration: ClientConnection.Configuration,
      logger: Logger,
      channelProvider: @escaping () -> EventLoopFuture<Channel>
    ) -> ConnectionManager {
      return ConnectionManager(
        configuration: configuration,
        logger: logger,
        channelProvider: channelProvider
      )
    }

    /// Designated initializer: starts in the `idle` state on the next event loop of the configured
    /// group. Property observers do not run during initialization, so the monitor is not notified
    /// of this initial state.
    private init(
      configuration: ClientConnection.Configuration,
      logger: Logger,
      channelProvider: (() -> EventLoopFuture<Channel>)?
    ) {
      // Setup the logger.
      var logger = logger
      let connectionId = UUID().uuidString
      let channelNumber: UInt64 = 0
      logger[metadataKey: MetadataKey.connectionID] = "\(connectionId)/\(channelNumber)"

      let eventLoop = configuration.eventLoopGroup.next()
      self.eventLoop = eventLoop
      self.state = .idle(IdleState(configuration: configuration))
      self.monitor = ConnectivityStateMonitor(
        delegate: configuration.connectivityStateDelegate,
        queue: configuration.connectivityStateDelegateQueue
      )

      self.channelProvider = channelProvider

      self.connectionId = connectionId
      self.channelNumber = channelNumber
      self.logger = logger
    }

    /// Returns a future for a connected channel.
    ///
    /// If the state is 'idle' a connection attempt is started. The returned future completes when
    /// the channel becomes ready; while 'connecting', 'active' or in 'transientFailure' it is the
    /// future of the pending ready-channel promise. If the state is 'shutdown' a future failed with
    /// an `unavailable` status is returned.
    internal func getChannel() -> EventLoopFuture<Channel> {
      return self.eventLoop.flatSubmit {
        switch self.state {
        case .idle:
          self.startConnecting()
          // We started connecting so we must transition to the `connecting` state.
          guard case .connecting(let connecting) = self.state else {
            self.invalidState()
          }
          return connecting.readyChannelPromise.futureResult

        case .connecting(let state):
          return state.readyChannelPromise.futureResult

        case .active(let state):
          return state.readyChannelPromise.futureResult

        case .ready(let state):
          return state.channel.eventLoop.makeSucceededFuture(state.channel)

        case .transientFailure(let state):
          return state.readyChannelPromise.futureResult

        case .shutdown:
          return self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
        }
      }
    }

    /// Returns a future for the current channel, or future channel from the current connection
    /// attempt, or if the state is 'idle' returns the future for the next connection attempt.
    ///
    /// Unlike `getChannel()` the returned channel is not necessarily ready.
    ///
    /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
    internal func getOptimisticChannel() -> EventLoopFuture<Channel> {
      return self.eventLoop.flatSubmit {
        switch self.state {
        case .idle:
          self.startConnecting()
          // We started connecting so we must transition to the `connecting` state.
          guard case .connecting(let connecting) = self.state else {
            self.invalidState()
          }
          return connecting.candidate

        case .connecting(let state):
          return state.candidate

        case .active(let state):
          return state.candidate.eventLoop.makeSucceededFuture(state.candidate)

        case .ready(let state):
          return state.channel.eventLoop.makeSucceededFuture(state.channel)

        case .transientFailure:
          return self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)

        case .shutdown:
          return self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
        }
      }
    }

    /// Shutdown any connection which exists. This is a request from the application.
    ///
    /// - Returns: A future which is already succeeded unless a 'ready' channel existed, in which
    ///   case it is that channel's `closeFuture`. Calling this again once shut down returns the
    ///   future from the first shutdown.
    internal func shutdown() -> EventLoopFuture<Void> {
      return self.eventLoop.flatSubmit {
        let shutdown: ShutdownState

        switch self.state {
        // We don't have a channel and we don't want one, easy!
        case .idle:
          shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

        // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
        // the shutdown future and deal with any fallout from the connecting channel without the
        // application knowing.
        case .connecting(let state):
          shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

          // Fail the ready channel promise: we're shutting down so even if we manage to successfully
          // connect the application shouldn't have access to the channel.
          state.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))
          // In case we do successfully connect, close immediately.
          state.candidate.whenSuccess {
            $0.close(mode: .all, promise: nil)
          }

        // We have an active channel but the application doesn't know about it yet. We'll do the same
        // as for `.connecting`.
        case .active(let state):
          shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

          // Fail the ready channel promise: we're shutting down so the application shouldn't have
          // access to the channel even though it exists.
          state.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))
          // We have a channel, close it.
          state.candidate.close(mode: .all, promise: nil)

        // The channel is up and running: the application could be using it. We can close it and
        // return the `closeFuture`.
        case .ready(let state):
          shutdown = ShutdownState(closeFuture: state.channel.closeFuture)
          self.state = .shutdown(shutdown)

          // We have a channel, close it.
          state.channel.close(mode: .all, promise: nil)

        // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
        // do the same but also cancel any scheduled connection attempts and deal with any fallout
        // if we cancelled too late.
        case .transientFailure(let state):
          // Stop the creation of a new channel, if we can. If we can't then the task to
          // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
          state.scheduled.cancel()
          shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
          self.state = .shutdown(shutdown)

          // Fail the ready channel promise: we're shutting down so even if we manage to successfully
          // connect the application shouldn't have access to the channel.
          state.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))

        // We're already shutdown; nothing to do.
        case .shutdown(let state):
          shutdown = state
        }

        return shutdown.closeFuture
      }
    }

    // MARK: - State changes from the channel handler.

    /// The connecting channel became `active`. Must be called on the `EventLoop`.
    internal func channelActive(channel: Channel) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .connecting(let connecting):
        self.state = .active(ConnectedState(from: connecting, candidate: channel))

      // Application called shutdown before the channel became active; we should close it.
      case .shutdown:
        channel.close(mode: .all, promise: nil)

      case .idle, .active, .ready, .transientFailure:
        self.invalidState()
      }
    }

    /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
    /// Must be called on the `EventLoop`.
    internal func channelInactive() {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      // The channel is `active` but not `ready`. Should we try again?
      case .active(let active):
        switch active.reconnect {
        // No, shutdown instead.
        case .none:
          self.state = .shutdown(ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(())))
          active.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))

        // Yes, after some time.
        case .after(let delay):
          let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
        }

      // The channel was ready and working fine but something went wrong. Should we try to replace
      // the channel?
      case .ready(let ready):
        // No, no backoff is configured.
        if ready.configuration.connectionBackoff == nil {
          self.state = .shutdown(ShutdownState(closeFuture: ready.channel.closeFuture))
        } else {
          // Yes, start connecting now. We should go via `transientFailure`, however.
          let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
            self.startConnecting()
          }
          self.state = .transientFailure(TransientFailureState(from: ready, scheduled: scheduled))
        }

      // This is fine: we expect the channel to become inactive after becoming idle.
      case .idle:
        ()

      // We're already shutdown, that's fine.
      case .shutdown:
        ()

      case .connecting, .transientFailure:
        self.invalidState()
      }
    }

    /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
    /// called on the `EventLoop`.
    internal func ready() {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .active(let connected):
        // Transition first so that anyone waiting on the promise observes the `ready` state.
        self.state = .ready(ReadyState(from: connected))
        connected.readyChannelPromise.succeed(connected.candidate)

      case .shutdown:
        ()

      case .idle, .transientFailure, .connecting, .ready:
        self.invalidState()
      }
    }

    /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
    /// the `EventLoop`.
    ///
    /// Note: this only performs the state change to `idle`; the channel itself is not closed here.
    internal func idle() {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .ready(let state):
        self.state = .idle(IdleState(configuration: state.configuration))

      case .idle, .connecting, .transientFailure, .active, .shutdown:
        self.invalidState()
      }
    }
  }
  extension ConnectionManager {
    /// A connection attempt failed; we never established a connection. Must be called on the
    /// `EventLoop`.
    ///
    /// If the failed attempt allows a reconnect, the next attempt is scheduled after the backoff
    /// delay and we move to `transientFailure` (the pending ready-channel promise is carried over
    /// untouched). Otherwise we move to `shutdown` and fail the ready-channel promise with `error`.
    ///
    /// - Parameter error: The error which caused the connection attempt to fail.
    private func connectionFailed(withError error: Error) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .connecting(let connecting):
        // Should we reconnect?
        switch connecting.reconnect {
        // No, shutdown.
        case .none:
          // Change state *before* failing the promise, as every other transition in this type does:
          // callbacks on the promise may run synchronously when it belongs to the current event
          // loop and they should observe `shutdown`, not the stale `connecting` state.
          self.state = .shutdown(ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(())))
          connecting.readyChannelPromise.fail(error)

        // Yes, after a delay.
        case .after(let delay):
          let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
            self.startConnecting()
          }
          self.state = .transientFailure(TransientFailureState(from: connecting, scheduled: scheduled))
        }

      // The application must have called shutdown while we were trying to establish a connection
      // which was doomed to fail anyway. That's fine, we can ignore this.
      case .shutdown:
        ()

      // We can't fail to connect if we aren't trying.
      case .idle, .active, .ready, .transientFailure:
        self.invalidState()
      }
    }
  }
  extension ConnectionManager {
    /// Begins a connection attempt using whatever the current state provides. Only the `idle` and
    /// `transientFailure` states may start connecting; `shutdown` is silently ignored and any other
    /// state is a programmer error. Must be called on the `EventLoop`.
    private func startConnecting() {
      let configuration: ClientConnection.Configuration
      let backoff: ConnectionBackoffIterator?
      let promise: EventLoopPromise<Channel>

      switch self.state {
      // First attempt: fresh backoff iterator (if backoff is configured) and a fresh promise.
      case .idle(let idle):
        configuration = idle.configuration
        backoff = idle.configuration.connectionBackoff?.makeIterator()
        promise = self.eventLoop.makePromise()

      // Retry: continue with the iterator and promise carried over from the failed attempt.
      case .transientFailure(let failure):
        configuration = failure.configuration
        backoff = failure.backoffIterator
        promise = failure.readyChannelPromise

      // We shutdown before a scheduled connection attempt had started.
      case .shutdown:
        return

      case .connecting, .active, .ready:
        self.invalidState()
      }

      self.startConnecting(
        configuration: configuration,
        backoffIterator: backoff,
        channelPromise: promise
      )
    }

    /// Kicks off a single connection attempt and moves to the `connecting` state.
    ///
    /// - Parameters:
    ///   - configuration: The configuration to connect with.
    ///   - backoffIterator: Provides the connect timeout for this attempt and the backoff to apply
    ///     should it fail. If it is `nil` or exhausted there is no timeout and no reconnect.
    ///   - channelPromise: The promise to complete once the channel becomes ready.
    private func startConnecting(
      configuration: ClientConnection.Configuration,
      backoffIterator: ConnectionBackoffIterator?,
      channelPromise: EventLoopPromise<Channel>
    ) {
      let nextAttempt = backoffIterator?.next()

      // We are expected to already be on the event loop. The connect is submitted rather than run
      // inline so that it only starts once the state below has been changed to `.connecting`.
      self.eventLoop.assertInEventLoop()
      let pendingChannel: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
        let attempt = self.makeChannel(
          configuration: configuration,
          connectTimeout: nextAttempt?.timeout
        )
        attempt.whenFailure { self.connectionFailed(withError: $0) }
        return attempt
      }

      // Decide now whether a failure of this candidate should lead to another attempt.
      let onFailure: Reconnect
      if let nextAttempt = nextAttempt {
        onFailure = .after(nextAttempt.backoff)
      } else {
        onFailure = .none
      }

      self.state = .connecting(
        ConnectingState(
          configuration: configuration,
          backoffIterator: backoffIterator,
          reconnect: onFailure,
          readyChannelPromise: channelPromise,
          candidate: pendingChannel
        )
      )
    }
  }
  extension ConnectionManager {
    /// Traps because the state machine was asked to do something the current state does not allow.
    ///
    /// The defaulted parameters capture the call site, so the failure message names the function
    /// which detected the invalid state and the crash is attributed to its file and line.
    private func invalidState(function: StaticString = #function, file: StaticString = #file, line: UInt = #line) -> Never {
      preconditionFailure(
        "Invalid state \(self.state) for \(function)",
        file: file,
        line: line
      )
    }
  }
  extension ConnectionManager {
    /// Builds a client bootstrap on this manager's event loop which configures each new channel as
    /// a gRPC client channel reporting back to this manager.
    ///
    /// - Parameters:
    ///   - configuration: The connection configuration (TLS, target, window size, idle timeout and
    ///     error delegate are read from it).
    ///   - connectTimeout: Connect timeout in seconds, or `nil` to leave the bootstrap's timeout
    ///     untouched.
    private func makeBootstrap(
      configuration: ClientConnection.Configuration,
      connectTimeout: TimeInterval?
    ) -> ClientBootstrapProtocol {
      // The TLS server hostname is only set when TLS is configured and the name (the override if
      // there is one, the target's host otherwise) is not an IP address.
      let serverHostname: String?
      if let tls = configuration.tls {
        let preferred: String? = tls.hostnameOverride ?? configuration.target.host
        if let name = preferred, !name.isIPAddress {
          serverHostname = name
        } else {
          serverHostname = nil
        }
      } else {
        serverHostname = nil
      }

      let bootstrap = PlatformSupport.makeClientBootstrap(group: self.eventLoop, logger: self.logger)
        .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
        .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .channelInitializer { channel in
          // `self.logger` is read when the channel is initialized, not when the bootstrap is built.
          channel.configureGRPCClient(
            httpTargetWindowSize: configuration.httpTargetWindowSize,
            tlsConfiguration: configuration.tls?.configuration,
            tlsServerHostname: serverHostname,
            connectionManager: self,
            connectionIdleTimeout: configuration.connectionIdleTimeout,
            errorDelegate: configuration.errorDelegate,
            logger: self.logger
          )
        }

      guard let connectTimeout = connectTimeout else {
        return bootstrap
      }
      return bootstrap.connectTimeout(.seconds(timeInterval: connectTimeout))
    }

    /// Creates the channel for a connection attempt: from the testing-only `channelProvider` when
    /// one was supplied, otherwise by connecting a bootstrap to the configured target.
    private func makeChannel(
      configuration: ClientConnection.Configuration,
      connectTimeout: TimeInterval?
    ) -> EventLoopFuture<Channel> {
      guard let provider = self.channelProvider else {
        let bootstrap = self.makeBootstrap(
          configuration: configuration,
          connectTimeout: connectTimeout
        )
        return bootstrap.connect(to: configuration.target)
      }
      return provider()
    }
  }