ConnectionPool.swift

/*
 * Copyright 2021, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import Atomics
import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP2

@usableFromInline
internal final class ConnectionPool {
  /// The event loop all connections in this pool are running on.
  @usableFromInline
  internal let eventLoop: EventLoop

  @usableFromInline
  internal enum State {
    case active
    case shuttingDown(EventLoopFuture<Void>)
    case shutdown
  }
  /// The state of the connection pool.
  @usableFromInline
  internal var _state: State = .active

  /// The most recent connection error we have observed.
  ///
  /// This error is used to provide additional context to failed waiters. A waiter may, for example,
  /// time out because the pool is busy, or because no connection can be established because of an
  /// underlying connection error. In the latter case it's useful for the caller to know why the
  /// connection is failing at the RPC layer.
  ///
  /// This value is cleared when a connection becomes 'available', that is, when we receive an
  /// HTTP/2 SETTINGS frame.
  ///
  /// This value is set whenever an underlying connection transitions to the transient failure state
  /// or to the idle state with an associated error.
  @usableFromInline
  internal var _mostRecentError: Error? = nil
  /// Connection managers and their stream availability state, keyed by the ID of the connection
  /// manager.
  ///
  /// Connections are accessed by their ID for connection state changes (infrequent) and when
  /// streams are closed (frequent). However, choosing which connection to succeed a waiter
  /// with (frequent) requires the connections to be ordered by their availability. A dictionary
  /// might not be the most efficient data structure (a queue prioritised by stream availability may
  /// be a better choice given the number of connections is likely to be very low in practice).
  @usableFromInline
  internal var _connections: [ConnectionManagerID: PerConnectionState]
  /// The threshold which, if exceeded when creating a stream, determines whether the pool will
  /// start connecting an idle connection (if one exists).
  ///
  /// The 'load' is calculated as the ratio of the demand for streams (the sum of the number of
  /// waiters and the number of reserved streams) to the total number of streams which non-idle
  /// connections could support (this includes the streams that a connection in the connecting
  /// state could support).
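  ///
  /// For example (illustrative numbers only): with 2 waiters and 3 reserved streams the demand is
  /// 5; with a single non-idle connection assumed to support 100 concurrent streams the capacity
  /// is 100, giving a load of 0.05. A threshold of 0.9 would therefore not trigger a new
  /// connection in that case.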
  @usableFromInline
  internal let reservationLoadThreshold: Double

  /// The assumed value for the maximum number of concurrent streams a connection can support. We
  /// assume a connection will support this many streams until we know better.
  @usableFromInline
  internal let assumedMaxConcurrentStreams: Int

  /// A queue of waiters which may or may not get a stream in the future.
  @usableFromInline
  internal var waiters: CircularBuffer<Waiter>

  /// The maximum number of waiters allowed, the size of `waiters` must not exceed this value. If
  /// there are this many waiters in the queue then the next waiter will be failed immediately.
  @usableFromInline
  internal let maxWaiters: Int

  /// The number of connections in the pool that should always be kept open (i.e. they won't go idle).
  /// In other words, it's the number of connections for which we should ignore idle timers.
  @usableFromInline
  internal let minConnections: Int
  /// Configuration for backoff between subsequent connection attempts.
  @usableFromInline
  internal let connectionBackoff: ConnectionBackoff

  /// Provides a channel factory to the `ConnectionManager`.
  @usableFromInline
  internal let channelProvider: ConnectionManagerChannelProvider

  /// The object to notify about changes to stream reservations; in practice this is usually
  /// the `PoolManager`.
  @usableFromInline
  internal let streamLender: StreamLender

  @usableFromInline
  internal var delegate: GRPCConnectionPoolDelegate?

  /// A logger.
  @usableFromInline
  internal let logger: Logger

  /// Returns a `NIODeadline` representing 'now'. This is useful for testing.
  @usableFromInline
  internal let now: () -> NIODeadline

  /// The ID of this sub-pool.
  @usableFromInline
  internal let id: GRPCSubPoolID

  /// Logging metadata keys.
  @usableFromInline
  internal enum Metadata {
    /// The ID of this pool.
    @usableFromInline
    static let id = "pool.id"

    /// The number of stream reservations (i.e. number of open streams + number of waiters).
    @usableFromInline
    static let reservationsCount = "pool.reservations.count"

    /// The number of streams this pool can support with non-idle connections at this time.
    @usableFromInline
    static let reservationsCapacity = "pool.reservations.capacity"

    /// The current reservation load (i.e. reservation count / reservation capacity).
    @usableFromInline
    static let reservationsLoad = "pool.reservations.load"

    /// The reservation load threshold, above which a new connection will be created (if possible).
    @usableFromInline
    static let reservationsLoadThreshold = "pool.reservations.loadThreshold"

    /// The current number of waiters in the pool.
    @usableFromInline
    static let waitersCount = "pool.waiters.count"

    /// The maximum number of waiters the pool is configured to allow.
    @usableFromInline
    static let waitersMax = "pool.waiters.max"

    /// The number of waiters which were successfully serviced.
    @usableFromInline
    static let waitersServiced = "pool.waiters.serviced"

    /// The ID of a waiter.
    @usableFromInline
    static let waiterID = "pool.waiter.id"

    /// The maximum number of connections allowed in the pool.
    @usableFromInline
    static let connectionsMax = "pool.connections.max"

    /// The number of connections in the ready state.
    @usableFromInline
    static let connectionsReady = "pool.connections.ready"

    /// The number of connections in the connecting state.
    @usableFromInline
    static let connectionsConnecting = "pool.connections.connecting"

    /// The number of connections in the transient failure state.
    @usableFromInline
    static let connectionsTransientFailure = "pool.connections.transientFailure"
  }
  @usableFromInline
  init(
    eventLoop: EventLoop,
    maxWaiters: Int,
    minConnections: Int,
    reservationLoadThreshold: Double,
    assumedMaxConcurrentStreams: Int,
    connectionBackoff: ConnectionBackoff,
    channelProvider: ConnectionManagerChannelProvider,
    streamLender: StreamLender,
    delegate: GRPCConnectionPoolDelegate?,
    logger: Logger,
    now: @escaping () -> NIODeadline = NIODeadline.now
  ) {
    precondition(
      (0.0 ... 1.0).contains(reservationLoadThreshold),
      "reservationLoadThreshold must be within the range 0.0 ... 1.0"
    )

    self.reservationLoadThreshold = reservationLoadThreshold
    self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
    self._connections = [:]
    self.maxWaiters = maxWaiters
    self.minConnections = minConnections
    self.waiters = CircularBuffer(initialCapacity: 16)
    self.eventLoop = eventLoop
    self.connectionBackoff = connectionBackoff
    self.channelProvider = channelProvider
    self.streamLender = streamLender
    self.delegate = delegate
    self.now = now

    let id = GRPCSubPoolID.next()
    var logger = logger
    logger[metadataKey: Metadata.id] = "\(id)"

    self.id = id
    self.logger = logger
  }
  /// Initialize the connection pool.
  ///
  /// - Parameter connections: The number of connections to add to the pool.
  internal func initialize(connections: Int) {
    assert(self._connections.isEmpty)
    self.logger.debug(
      "initializing new sub-pool",
      metadata: [
        Metadata.waitersMax: .stringConvertible(self.maxWaiters),
        Metadata.connectionsMax: .stringConvertible(connections),
      ]
    )
    self._connections.reserveCapacity(connections)

    var numberOfKeepOpenConnections = self.minConnections
    while self._connections.count < connections {
      // If we have less than the minimum number of connections, don't let
      // the new connection close when idle.
      let idleBehavior =
        numberOfKeepOpenConnections > 0
        ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout
      numberOfKeepOpenConnections -= 1
      self.addConnectionToPool(idleBehavior: idleBehavior)
    }
  }
  /// Make and add a new connection to the pool.
  private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) {
    let manager = ConnectionManager(
      eventLoop: self.eventLoop,
      channelProvider: self.channelProvider,
      callStartBehavior: .waitsForConnectivity,
      idleBehavior: idleBehavior,
      connectionBackoff: self.connectionBackoff,
      connectivityDelegate: self,
      http2Delegate: self,
      logger: self.logger
    )
    let id = manager.id
    self._connections[id] = PerConnectionState(manager: manager)
    self.delegate?.connectionAdded(id: .init(id))

    // If it's one of the connections that should be kept open, then connect
    // straight away.
    switch idleBehavior {
    case .neverGoIdle:
      self.eventLoop.execute {
        if manager.sync.isIdle {
          manager.sync.startConnecting()
        }
      }
    case .closeWhenIdleTimeout:
      ()
    }
  }
  // MARK: - Called from the pool manager

  /// Make and initialize an HTTP/2 stream `Channel`.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the `promise` must have been resolved.
  ///   - promise: A promise for a `Channel`.
  ///   - logger: A request logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  @inlinable
  internal func makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: Logger,
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  ) {
    if self.eventLoop.inEventLoop {
      self._makeStream(
        deadline: deadline,
        promise: promise,
        logger: logger,
        initializer: initializer
      )
    } else {
      self.eventLoop.execute {
        self._makeStream(
          deadline: deadline,
          promise: promise,
          logger: logger,
          initializer: initializer
        )
      }
    }
  }

  /// See `makeStream(deadline:promise:logger:initializer:)`.
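  ///
  /// A minimal usage sketch (the `pool`, `requestLogger`, and deadline values below are
  /// illustrative, not part of this API):
  ///
  /// ```
  /// let channelFuture = pool.makeStream(deadline: .now() + .seconds(5), logger: requestLogger) { channel in
  ///   // Configure the stream channel's pipeline here.
  ///   channel.eventLoop.makeSucceededVoidFuture()
  /// }
  /// ```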
  @inlinable
  internal func makeStream(
    deadline: NIODeadline,
    logger: Logger,
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  ) -> EventLoopFuture<Channel> {
    let promise = self.eventLoop.makePromise(of: Channel.self)
    self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer)
    return promise.futureResult
  }
  /// Shutdown the connection pool.
  ///
  /// Existing waiters will be failed and all underlying connections will be shutdown. Subsequent
  /// calls to `makeStream` will be failed immediately.
  ///
  /// - Parameter mode: The mode to use when shutting down.
  /// - Returns: A future indicating when shutdown has been completed.
  internal func shutdown(mode: ConnectionManager.ShutdownMode) -> EventLoopFuture<Void> {
    let promise = self.eventLoop.makePromise(of: Void.self)

    if self.eventLoop.inEventLoop {
      self._shutdown(mode: mode, promise: promise)
    } else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
    }

    return promise.futureResult
  }
  /// See `makeStream(deadline:promise:logger:initializer:)`.
  ///
  /// - Important: Must be called on the pool's `EventLoop`.
  @inlinable
  internal func _makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: Logger,
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  ) {
    self.eventLoop.assertInEventLoop()

    guard case .active = self._state else {
      // Fail the promise right away if we're shutting down or already shut down.
      promise.fail(GRPCConnectionPoolError.shutdown)
      return
    }

    // Try to make a stream on an existing connection.
    let streamCreated = self._tryMakeStream(promise: promise, initializer: initializer)
    if !streamCreated {
      // No stream was created, wait for one.
      self._enqueueWaiter(
        deadline: deadline,
        promise: promise,
        logger: logger,
        initializer: initializer
      )
    }
  }
  /// Try to find an existing connection on which we can make a stream.
  ///
  /// - Parameters:
  ///   - promise: A promise to succeed if we can make a stream.
  ///   - initializer: A closure to initialize the stream with.
  /// - Returns: A boolean value indicating whether the stream was created or not.
  @inlinable
  internal func _tryMakeStream(
    promise: EventLoopPromise<Channel>,
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  ) -> Bool {
    // We shouldn't jump the queue.
    guard self.waiters.isEmpty else {
      return false
    }

    // Reserve a stream, if we can.
    guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
      return false
    }

    multiplexer.createStreamChannel(promise: promise, initializer)

    // Has reserving another stream tipped us over the limit for needing another connection?
    if self._shouldBringUpAnotherConnection() {
      self._startConnectingIdleConnection()
    }

    return true
  }
  /// Enqueue a waiter to be provided with a stream at some point in the future.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the promise should have been completed.
  ///   - promise: The promise to complete with the `Channel`.
  ///   - logger: A logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  @inlinable
  internal func _enqueueWaiter(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: Logger,
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  ) {
    // Don't overwhelm the pool with too many waiters.
    guard self.waiters.count < self.maxWaiters else {
      logger.trace(
        "connection pool has too many waiters",
        metadata: [
          Metadata.waitersMax: .stringConvertible(self.maxWaiters)
        ]
      )
      promise.fail(GRPCConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
      return
    }

    let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)
    // Fail the waiter and punt it from the queue when it times out. It's okay that we schedule the
    // timeout before appending it to the waiters: it won't run until the next event loop tick at
    // the earliest (even if the deadline has already passed).
    waiter.scheduleTimeout(on: self.eventLoop) {
      waiter.fail(GRPCConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))

      if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
        self.waiters.remove(at: index)
        logger.trace(
          "timed out waiting for a connection",
          metadata: [
            Metadata.waiterID: "\(waiter.id)",
            Metadata.waitersCount: .stringConvertible(self.waiters.count),
          ]
        )
      }
    }

    // request logger
    logger.debug(
      "waiting for a connection to become available",
      metadata: [
        Metadata.waiterID: "\(waiter.id)",
        Metadata.waitersCount: .stringConvertible(self.waiters.count),
      ]
    )

    self.waiters.append(waiter)

    // pool logger
    self.logger.trace(
      "enqueued connection waiter",
      metadata: [
        Metadata.waitersCount: .stringConvertible(self.waiters.count)
      ]
    )

    if self._shouldBringUpAnotherConnection() {
      self._startConnectingIdleConnection()
    }
  }
  /// Compute the current demand and capacity for streams.
  ///
  /// The 'demand' for streams is the number of reserved streams and the number of waiters. The
  /// capacity for streams is the product of max concurrent streams and the number of non-idle
  /// connections.
  ///
  /// - Returns: A tuple of the demand and capacity for streams.
  @usableFromInline
  internal func _computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) {
    let demand = self.sync.reservedStreams + self.sync.waiters

    // TODO: make this cheaper by storing and incrementally updating the number of idle connections
    let capacity = self._connections.values.reduce(0) { sum, state in
      if state.manager.sync.isIdle || state.isQuiescing {
        // Idle connection or quiescing (so the capacity should be ignored).
        return sum
      } else if let knownMaxAvailableStreams = state.maxAvailableStreams {
        // A known value of max concurrent streams, i.e. the connection is active.
        return sum + knownMaxAvailableStreams
      } else {
        // Not idle and no known value, the connection must be connecting so use our assumed value.
        return sum + self.assumedMaxConcurrentStreams
      }
    }

    return (demand, capacity)
  }
  /// Returns whether the pool should start connecting an idle connection (if one exists).
  @usableFromInline
  internal func _shouldBringUpAnotherConnection() -> Bool {
    let (demand, capacity) = self._computeStreamDemandAndCapacity()

    // Infinite -- i.e. all connections are idle or no connections exist -- is okay here as it
    // will always be greater than the threshold and a new connection will be spun up.
    let load = Double(demand) / Double(capacity)

    let loadExceedsThreshold = load >= self.reservationLoadThreshold
    if loadExceedsThreshold {
      self.logger.debug(
        "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available",
        metadata: [
          Metadata.reservationsCount: .stringConvertible(demand),
          Metadata.reservationsCapacity: .stringConvertible(capacity),
          Metadata.reservationsLoad: .stringConvertible(load),
          Metadata.reservationsLoadThreshold: .stringConvertible(self.reservationLoadThreshold),
        ]
      )
    }

    return loadExceedsThreshold
  }
  /// Starts connecting an idle connection, if one exists.
  @usableFromInline
  internal func _startConnectingIdleConnection() {
    if let index = self._connections.values.firstIndex(where: { $0.manager.sync.isIdle }) {
      self._connections.values[index].manager.sync.startConnecting()
    } else {
      let connecting = self._connections.values.count { $0.manager.sync.isConnecting }
      let ready = self._connections.values.count { $0.manager.sync.isReady }
      let transientFailure = self._connections.values.count { $0.manager.sync.isTransientFailure }

      self.logger.debug(
        "no idle connections in pool",
        metadata: [
          Metadata.connectionsConnecting: .stringConvertible(connecting),
          Metadata.connectionsReady: .stringConvertible(ready),
          Metadata.connectionsTransientFailure: .stringConvertible(transientFailure),
          Metadata.waitersCount: .stringConvertible(self.waiters.count),
        ]
      )
    }
  }
  /// Returns the index in `self._connections.values` of the connection with the most available
  /// streams. Returns `self._connections.endIndex` if no connection has at least one stream
  /// available.
  ///
  /// - Note: this is linear in the number of connections.
  @usableFromInline
  internal func _mostAvailableConnectionIndex()
    -> Dictionary<ConnectionManagerID, PerConnectionState>.Index
  {
    var index = self._connections.values.startIndex
    var selectedIndex = self._connections.values.endIndex
    var mostAvailableStreams = 0

    while index != self._connections.values.endIndex {
      let availableStreams = self._connections.values[index].availableStreams
      if availableStreams > mostAvailableStreams {
        mostAvailableStreams = availableStreams
        selectedIndex = index
      }
      self._connections.values.formIndex(after: &index)
    }

    return selectedIndex
  }
  /// Reserves a stream from the connection with the most available streams, if one exists.
  ///
  /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from,
  ///   or `nil` if no stream could be reserved.
  @usableFromInline
  internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? {
    let index = self._mostAvailableConnectionIndex()

    if index != self._connections.endIndex {
      // '!' is okay here; the most available connection must have at least one stream available
      // to reserve.
      return self._connections.values[index].reserveStream()!
    } else {
      return nil
    }
  }
  /// See `shutdown(mode:)`.
  ///
  /// - Parameter promise: A `promise` to complete when the pool has been shutdown.
  @usableFromInline
  internal func _shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
    self.eventLoop.assertInEventLoop()

    switch self._state {
    case .active:
      self.logger.debug("shutting down connection pool")

      // We're shutting down now and when that's done we'll be fully shutdown.
      self._state = .shuttingDown(promise.futureResult)
      promise.futureResult.whenComplete { _ in
        self._state = .shutdown
        self.delegate = nil
        self.logger.trace("finished shutting down connection pool")
      }

      // Shutdown all the connections and remove them from the pool.
      let connections = self._connections
      self._connections.removeAll()

      let allShutdown: [EventLoopFuture<Void>] = connections.values.map {
        let id = $0.manager.id
        let manager = $0.manager
        return manager.eventLoop.flatSubmit {
          // If the connection was idle/shutdown before calling shutdown then we shouldn't tell
          // the delegate the connection closed (because it either never connected or was already
          // informed about this).
          let connectionIsInactive = manager.sync.isIdle || manager.sync.isShutdown
          return manager.shutdown(mode: mode).always { _ in
            if !connectionIsInactive {
              self.delegate?.connectionClosed(id: .init(id), error: nil)
            }
            self.delegate?.connectionRemoved(id: .init(id))
          }
        }
      }

      // Fail the outstanding waiters.
      while let waiter = self.waiters.popFirst() {
        waiter.fail(GRPCConnectionPoolError.shutdown)
      }

      // Cascade the result of the shutdown into the promise.
      EventLoopFuture.andAllSucceed(allShutdown, promise: promise)

    case let .shuttingDown(future):
      // We're already shutting down, cascade the result.
      promise.completeWith(future)

    case .shutdown:
      // Already shutdown, fine.
      promise.succeed(())
    }
  }
  internal func stats() -> EventLoopFuture<GRPCSubPoolStats> {
    let promise = self.eventLoop.makePromise(of: GRPCSubPoolStats.self)

    if self.eventLoop.inEventLoop {
      self._stats(promise: promise)
    } else {
      self.eventLoop.execute {
        self._stats(promise: promise)
      }
    }

    return promise.futureResult
  }

  private func _stats(promise: EventLoopPromise<GRPCSubPoolStats>) {
    self.eventLoop.assertInEventLoop()

    var stats = GRPCSubPoolStats(id: self.id)

    for connection in self._connections.values {
      let sync = connection.manager.sync
      if sync.isIdle {
        stats.connectionStates.idle += 1
      } else if sync.isConnecting {
        stats.connectionStates.connecting += 1
      } else if sync.isReady {
        stats.connectionStates.ready += 1
      } else if sync.isTransientFailure {
        stats.connectionStates.transientFailure += 1
      }

      stats.streamsInUse += connection.reservedStreams
      stats.streamsFreeToUse += connection.availableStreams
    }

    stats.rpcsWaiting += self.waiters.count

    promise.succeed(stats)
  }
}
extension ConnectionPool: ConnectionManagerConnectivityDelegate {
  // We're interested in a few different situations here:
  //
  // 1. The connection was usable ('ready') and is no longer usable (either it became idle or
  //    encountered an error). If this happens we need to notify the pool manager of the change,
  //    as the connection may no longer be used for new RPCs.
  // 2. The connection was not usable but moved to a different unusable state. If this happens and
  //    we know the cause of the state transition (i.e. the error) then we need to update our most
  //    recent error with the error. This information is used when failing waiters to provide some
  //    context as to why they may be failing.
  func connectionStateDidChange(
    _ manager: ConnectionManager,
    from oldState: _ConnectivityState,
    to newState: _ConnectivityState
  ) {
    switch (oldState, newState) {
    case let (.ready, .transientFailure(error)),
      let (.ready, .idle(.some(error))):
      self.updateMostRecentError(error)
      self.connectionUnavailable(manager.id)

    case (.ready, .idle(.none)),
      (.ready, .shutdown):
      self.connectionUnavailable(manager.id)

    case let (_, .transientFailure(error)),
      let (_, .idle(.some(error))):
      self.updateMostRecentError(error)

    default:
      ()
    }

    guard let delegate = self.delegate else { return }

    switch (oldState, newState) {
    case (.idle, .connecting),
      (.transientFailure, .connecting):
      delegate.startedConnecting(id: .init(manager.id))

    case (.connecting, .ready):
      // The connection becoming ready is handled by 'receivedSettingsMaxConcurrentStreams'.
      ()

    case (.ready, .idle):
      delegate.connectionClosed(id: .init(manager.id), error: nil)

    case let (.ready, .transientFailure(error)):
      delegate.connectionClosed(id: .init(manager.id), error: error)

    case let (.connecting, .transientFailure(error)):
      delegate.connectFailed(id: .init(manager.id), error: error)

    default:
      ()
    }
  }
  func connectionIsQuiescing(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    // Find the relevant connection.
    guard let index = self._connections.index(forKey: manager.id) else {
      return
    }

    // Drop the connectivity delegate, we're no longer interested in its events now.
    manager.sync.connectivityDelegate = nil

    // Started quiescing; update our state and notify the pool delegate.
    self._connections.values[index].isQuiescing = true
    self.delegate?.connectionQuiescing(id: .init(manager.id))
    // As the connection is quiescing, we need to know when the current connection it's managing
    // has closed. When that happens drop the H2 delegate and update the pool delegate.
    manager.onCurrentConnectionClose { hadActiveConnection in
      assert(hadActiveConnection)

      if let removed = self._connections.removeValue(forKey: manager.id) {
        removed.manager.sync.http2Delegate = nil
        self.delegate?.connectionClosed(id: .init(removed.manager.id), error: nil)
        self.delegate?.connectionRemoved(id: .init(removed.manager.id))
      }
    }
    // Grab the number of reserved streams (before invalidating the index by adding a connection).
    let reservedStreams = self._connections.values[index].reservedStreams

    // Replace the connection with a new idle one. Keep the idle behavior, so that
    // if it's a connection that should be kept alive, we maintain it.
    self.addConnectionToPool(idleBehavior: manager.idleBehavior)

    // Since we're removing this connection from the pool (and no new streams can be created on
    // the connection), the pool manager can ignore any streams reserved against this connection.
    // We do still care about the number of reserved streams for the connection though.
    //
    // Note: we don't need to adjust the number of available streams as the effective number of
    // connections hasn't changed.
    self.streamLender.returnStreams(reservedStreams, to: self)
  }
  private func updateMostRecentError(_ error: Error) {
    self.eventLoop.assertInEventLoop()
    // Update the last known error if there is one. We will use it to provide some context to
    // waiters which may fail.
    self._mostRecentError = error
  }

  /// A connection has become unavailable.
  private func connectionUnavailable(_ id: ConnectionManagerID) {
    self.eventLoop.assertInEventLoop()
    // The connection is no longer available: any streams which haven't been closed will be counted
    // as a dropped reservation, we need to tell the pool manager about them.
    if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 {
      self.streamLender.returnStreams(droppedReservations, to: self)
    }
  }
}
extension ConnectionPool: ConnectionManagerHTTP2Delegate {
  internal func streamOpened(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()
    if let utilization = self._connections[manager.id]?.openedStream(),
      let delegate = self.delegate
    {
      delegate.connectionUtilizationChanged(
        id: .init(manager.id),
        streamsUsed: utilization.used,
        streamCapacity: utilization.capacity
      )
    }
  }

  internal func streamClosed(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()
    guard let index = self._connections.index(forKey: manager.id) else {
      return
    }
    // Return the stream to the connection and to the pool manager.
    if let utilization = self._connections.values[index].returnStream(),
      let delegate = self.delegate
    {
      delegate.connectionUtilizationChanged(
        id: .init(manager.id),
        streamsUsed: utilization.used,
        streamCapacity: utilization.capacity
      )
    }

    // Return the stream to the pool manager if the connection is available and not quiescing. For
    // quiescing connections streams were returned when the connection started quiescing.
    if self._connections.values[index].isAvailable, !self._connections.values[index].isQuiescing {
      self.streamLender.returnStreams(1, to: self)

      // A stream was returned: we may be able to service a waiter now.
      self.tryServiceWaiters()
    }
  }
  internal func receivedSettingsMaxConcurrentStreams(
    _ manager: ConnectionManager,
    maxConcurrentStreams: Int
  ) {
    self.eventLoop.assertInEventLoop()

    // Find the relevant connection.
    guard let index = self._connections.index(forKey: manager.id) else {
      return
    }

    // When the connection is quiescing, the pool manager is not interested in updates to the
    // connection, bail out early.
    if self._connections.values[index].isQuiescing {
      return
    }

    // If we received a SETTINGS update then a connection is okay: drop the last known error.
    self._mostRecentError = nil

    let previous = self._connections.values[index].updateMaxConcurrentStreams(maxConcurrentStreams)
    let delta: Int

    if let previousValue = previous {
      // There was a previous value of max concurrent streams, i.e. a change in value for an
      // existing connection.
      delta = maxConcurrentStreams - previousValue
    } else {
      // There was no previous value so this must be a new connection. We'll compare against our
      // assumed default.
      delta = maxConcurrentStreams - self.assumedMaxConcurrentStreams

      // Notify the delegate.
      self.delegate?.connectSucceeded(id: .init(manager.id), streamCapacity: maxConcurrentStreams)
    }

    if delta != 0 {
      self.streamLender.changeStreamCapacity(by: delta, for: self)
    }

    // We always check, even if `delta` isn't greater than zero as this might be a new connection.
    self.tryServiceWaiters()
  }
}
extension ConnectionPool {
  // MARK: - Waiters

  /// Try to service as many waiters as possible.
  ///
  /// This is an expensive operation; in the worst case it will be `O(W ⨉ N)` where `W` is the
  /// number of waiters and `N` is the number of connections.
  private func tryServiceWaiters() {
    if self.waiters.isEmpty { return }

    self.logger.trace(
      "servicing waiters",
      metadata: [
        Metadata.waitersCount: .stringConvertible(self.waiters.count)
      ]
    )

    let now = self.now()
    var serviced = 0

    while !self.waiters.isEmpty {
      if self.waiters.first!.deadlineIsAfter(now) {
        if let multiplexer = self._reserveStreamFromMostAvailableConnection() {
          // The waiter's deadline is in the future, and we have a suitable connection. Remove and
          // succeed the waiter.
          let waiter = self.waiters.removeFirst()
          serviced &+= 1
          waiter.succeed(with: multiplexer)
        } else {
          // There are waiters but no available connections, we're done.
          break
        }
      } else {
        // The waiter's deadline has already expired, there's no point completing it. Remove it and
        // let its scheduled timeout fail the promise.
        self.waiters.removeFirst()
      }
    }

    self.logger.trace(
      "done servicing waiters",
      metadata: [
        Metadata.waitersCount: .stringConvertible(self.waiters.count),
        Metadata.waitersServiced: .stringConvertible(serviced),
      ]
    )
  }
}
extension ConnectionPool {
  /// Synchronous operations for the pool, mostly used by tests.
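  ///
  /// A hypothetical test assertion (the `pool` name is illustrative) might look like:
  ///
  /// ```
  /// pool.eventLoop.execute {
  ///   XCTAssertEqual(pool.sync.connections, 1)
  ///   XCTAssertEqual(pool.sync.waiters, 0)
  /// }
  /// ```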
  internal struct Sync {
    private let pool: ConnectionPool

    fileprivate init(_ pool: ConnectionPool) {
      self.pool = pool
    }

    /// The number of outstanding connection waiters.
    internal var waiters: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool.waiters.count
    }
    /// The number of connections currently in the pool (in any state).
    internal var connections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.count
    }

    /// The number of idle connections in the pool.
    internal var idleConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) }
    }

    /// The number of active (i.e. connecting or ready) connections in the pool.
    internal var activeConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) {
        $0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0)
      }
    }

    /// The number of connections in the pool in transient failure state.
    internal var transientFailureConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) {
        $0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0)
      }
    }

    /// The number of streams currently available to reserve across all connections in the pool.
    internal var availableStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 + $1.availableStreams }
    }

    /// The number of streams which have been reserved across all connections in the pool.
    internal var reservedStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 + $1.reservedStreams }
    }

    /// Updates the most recent connection error.
    internal func updateMostRecentError(_ error: Error) {
      self.pool.eventLoop.assertInEventLoop()
      self.pool.updateMostRecentError(error)
    }
  }

  internal var sync: Sync {
    return Sync(self)
  }
}
/// An error thrown from the ``GRPCChannelPool``.
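///
/// Callers can inspect the error's ``code`` to work out why an RPC failed. For example (a sketch;
/// `error` here stands for whatever error the caller caught):
///
/// ```
/// if let poolError = error as? GRPCConnectionPoolError, poolError.code == .deadlineExceeded {
///   // Timed out waiting for a stream; any underlying connection error is available via
///   // 'poolError.underlyingError'.
/// }
/// ```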
public struct GRPCConnectionPoolError: Error, CustomStringConvertible {
  public struct Code: Hashable, Sendable, CustomStringConvertible {
    enum Code {
      case shutdown
      case tooManyWaiters
      case deadlineExceeded
    }

    fileprivate var code: Code

    private init(_ code: Code) {
      self.code = code
    }

    public var description: String {
      String(describing: self.code)
    }

    /// The pool is shutdown or shutting down.
    public static var shutdown: Self { Self(.shutdown) }

    /// There are too many waiters in the pool.
    public static var tooManyWaiters: Self { Self(.tooManyWaiters) }

    /// The deadline for creating a stream has passed.
    public static var deadlineExceeded: Self { Self(.deadlineExceeded) }
  }

  /// The error code.
  public var code: Code

  /// An underlying error which caused this error to be thrown.
  public var underlyingError: Error?

  public var description: String {
    if let underlyingError = self.underlyingError {
      return "\(self.code) (\(underlyingError))"
    } else {
      return String(describing: self.code)
    }
  }

  /// Create a new connection pool error with the given code and underlying error.
  ///
  /// - Parameters:
  ///   - code: The error code.
  ///   - underlyingError: The underlying error which led to this error being thrown.
  public init(code: Code, underlyingError: Error? = nil) {
    self.code = code
    self.underlyingError = underlyingError
  }
}
extension GRPCConnectionPoolError {
  @usableFromInline
  static let shutdown = Self(code: .shutdown)

  @inlinable
  static func tooManyWaiters(connectionError: Error?) -> Self {
    Self(code: .tooManyWaiters, underlyingError: connectionError)
  }

  @inlinable
  static func deadlineExceeded(connectionError: Error?) -> Self {
    Self(code: .deadlineExceeded, underlyingError: connectionError)
  }
}
extension GRPCConnectionPoolError: GRPCStatusTransformable {
  public func makeGRPCStatus() -> GRPCStatus {
    switch self.code.code {
    case .shutdown:
      return GRPCStatus(
        code: .unavailable,
        message: "The connection pool is shutdown",
        cause: self.underlyingError
      )

    case .tooManyWaiters:
      return GRPCStatus(
        code: .resourceExhausted,
        message: "The connection pool has no capacity for new RPCs or RPC waiters",
        cause: self.underlyingError
      )

    case .deadlineExceeded:
      return GRPCStatus(
        code: .deadlineExceeded,
        message: "Timed out waiting for an HTTP/2 stream from the connection pool",
        cause: self.underlyingError
      )
    }
  }
}
extension Sequence {
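  /// Returns the number of elements in the sequence which satisfy `predicate`.
  ///
  /// This is linear in the length of the sequence.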
  fileprivate func count(where predicate: (Element) -> Bool) -> Int {
    return self.reduce(0) { count, element in
      predicate(element) ? count + 1 : count
    }
  }
}