ConnectionPool.swift 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. import NIOHTTP2
  /// A pool of HTTP/2 connections from which streams are lent to callers.
  ///
  /// Every connection in the pool runs on `eventLoop`. `makeStream` and `shutdown` may be called
  /// from any thread; they hop onto `eventLoop` before touching the pool's state.
  @usableFromInline
  internal final class ConnectionPool {
    /// The event loop all connections in this pool are running on.
    @usableFromInline
    internal let eventLoop: EventLoop

    /// The lifecycle state of a pool.
    @usableFromInline
    internal enum State {
      /// The pool accepts requests for new streams.
      case active
      /// Shutdown has started; the associated future completes once it has finished.
      case shuttingDown(EventLoopFuture<Void>)
      /// The pool has been shut down.
      case shutdown
    }

    /// The state of the connection pool.
    @usableFromInline
    internal var _state: State = .active

    /// The most recent connection error we have observed.
    ///
    /// This error is used to provide additional context to failed waiters. A waiter may, for
    /// example, time out because the pool is busy, or because no connection can be established
    /// because of an underlying connection error. In the latter case it's useful for the caller to
    /// know why the connection is failing at the RPC layer.
    ///
    /// This value is cleared when a connection becomes 'available'. That is, when we receive an
    /// HTTP/2 SETTINGS frame.
    ///
    /// This value is set whenever an underlying connection transitions to the transient failure
    /// state or to the idle state and has an associated error.
    @usableFromInline
    internal var _mostRecentError: Error? = nil

    /// Connection managers and their stream availability state keyed by the ID of the connection
    /// manager.
    ///
    /// Connections are accessed by their ID for connection state changes (infrequent) and when
    /// streams are closed (frequent). However, choosing which connection to succeed a waiter with
    /// (frequent) requires the connections to be ordered by their availability. A dictionary
    /// might not be the most efficient data structure (a queue prioritised by stream availability
    /// may be a better choice given the number of connections is likely to be very low in
    /// practice).
    @usableFromInline
    internal var _connections: [ConnectionManagerID: PerConnectionState]

    /// The threshold which, if met or exceeded when creating a stream, determines whether the pool
    /// will start connecting an idle connection (if one exists).
    ///
    /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of
    /// waiters and the number of reserved streams) and the total number of streams which non-idle
    /// connections could support (this includes the streams that a connection in the connecting
    /// state could support).
    @usableFromInline
    internal let reservationLoadThreshold: Double

    /// The assumed value for the maximum number of concurrent streams a connection can support. We
    /// assume a connection will support this many streams until we know better.
    @usableFromInline
    internal let assumedMaxConcurrentStreams: Int

    /// A queue of waiters which may or may not get a stream in the future.
    @usableFromInline
    internal var waiters: CircularBuffer<Waiter>

    /// The maximum number of waiters allowed; the size of `waiters` must not exceed this value. If
    /// there are this many waiters in the queue then the next waiter will be failed immediately.
    @usableFromInline
    internal let maxWaiters: Int

    /// Configuration for backoff between subsequent connection attempts.
    @usableFromInline
    internal let connectionBackoff: ConnectionBackoff

    /// Provides a channel factory to the `ConnectionManager`.
    @usableFromInline
    internal let channelProvider: ConnectionManagerChannelProvider

    /// The object to notify about changes to stream reservations; in practice this is usually
    /// the `PoolManager`.
    @usableFromInline
    internal let streamLender: StreamLender

    /// Notified about connection lifecycle and utilization events. Set to `nil` once the pool has
    /// finished shutting down.
    @usableFromInline
    internal var delegate: GRPCConnectionPoolDelegate?

    /// A logger which always sets "GRPC" as its source.
    @usableFromInline
    internal let logger: GRPCLogger

    /// Returns `NIODeadline` representing 'now'. This is useful for testing.
    @usableFromInline
    internal let now: () -> NIODeadline

    /// Logging metadata keys.
    @usableFromInline
    internal enum Metadata {
      /// The ID of this pool.
      @usableFromInline
      static let id = "pool.id"

      /// The number of stream reservations (i.e. number of open streams + number of waiters).
      @usableFromInline
      static let reservationsCount = "pool.reservations.count"

      /// The number of streams this pool can support with non-idle connections at this time.
      @usableFromInline
      static let reservationsCapacity = "pool.reservations.capacity"

      /// The current reservation load (i.e. reservation count / reservation capacity).
      @usableFromInline
      static let reservationsLoad = "pool.reservations.load"

      /// The reservation load threshold, at or above which a new connection will be created (if
      /// possible).
      @usableFromInline
      static let reservationsLoadThreshold = "pool.reservations.loadThreshold"

      /// The current number of waiters in the pool.
      @usableFromInline
      static let waitersCount = "pool.waiters.count"

      /// The maximum number of waiters the pool is configured to allow.
      @usableFromInline
      static let waitersMax = "pool.waiters.max"

      /// The number of waiters which were successfully serviced.
      @usableFromInline
      static let waitersServiced = "pool.waiters.serviced"

      /// The ID of a waiter.
      @usableFromInline
      static let waiterID = "pool.waiter.id"
    }

    /// Creates a pool with no connections; call `initialize(connections:)` to add them.
    ///
    /// - Parameters:
    ///   - eventLoop: The event loop the pool and all of its connections run on.
    ///   - maxWaiters: The maximum number of queued waiters before new waiters fail immediately.
    ///   - reservationLoadThreshold: The load at or above which an idle connection is started.
    ///   - assumedMaxConcurrentStreams: Per-connection stream capacity assumed until the peer's
    ///     SETTINGS are received.
    ///   - connectionBackoff: Backoff configuration passed to each `ConnectionManager`.
    ///   - channelProvider: Channel factory passed to each `ConnectionManager`.
    ///   - streamLender: Notified about changes to stream reservations and capacity.
    ///   - delegate: An optional observer of connection events.
    ///   - logger: The pool's logger.
    ///   - now: The clock used to determine whether waiters have expired.
    /// - Precondition: `reservationLoadThreshold` is within `0.0 ... 1.0`.
    @usableFromInline
    init(
      eventLoop: EventLoop,
      maxWaiters: Int,
      reservationLoadThreshold: Double,
      assumedMaxConcurrentStreams: Int,
      connectionBackoff: ConnectionBackoff,
      channelProvider: ConnectionManagerChannelProvider,
      streamLender: StreamLender,
      delegate: GRPCConnectionPoolDelegate?,
      logger: GRPCLogger,
      now: @escaping () -> NIODeadline = NIODeadline.now
    ) {
      precondition(
        (0.0 ... 1.0).contains(reservationLoadThreshold),
        "reservationLoadThreshold must be within the range 0.0 ... 1.0"
      )
      self.reservationLoadThreshold = reservationLoadThreshold
      self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams

      self._connections = [:]
      self.maxWaiters = maxWaiters
      self.waiters = CircularBuffer(initialCapacity: 16)

      self.eventLoop = eventLoop
      self.connectionBackoff = connectionBackoff
      self.channelProvider = channelProvider
      self.streamLender = streamLender
      self.delegate = delegate
      self.logger = logger
      self.now = now
    }

    /// Initialize the connection pool.
    ///
    /// - Parameter connections: The number of connections to add to the pool. Must only be called
    ///   while the pool has no connections.
    internal func initialize(connections: Int) {
      assert(self._connections.isEmpty)
      self._connections.reserveCapacity(connections)
      while self._connections.count < connections {
        self.addConnectionToPool()
      }
    }

    /// Make and add a new (idle) connection to the pool, and tell the delegate about it.
    private func addConnectionToPool() {
      let manager = ConnectionManager(
        eventLoop: self.eventLoop,
        channelProvider: self.channelProvider,
        callStartBehavior: .waitsForConnectivity,
        connectionBackoff: self.connectionBackoff,
        connectivityDelegate: self,
        http2Delegate: self,
        logger: self.logger.unwrapped
      )
      let id = manager.id
      self._connections[id] = PerConnectionState(manager: manager)
      self.delegate?.connectionAdded(id: .init(id))
    }

    // MARK: - Called from the pool manager

    /// Make and initialize an HTTP/2 stream `Channel`.
    ///
    /// May be called from any thread; the work is performed on `eventLoop`.
    ///
    /// - Parameters:
    ///   - deadline: The point in time by which the `promise` must have been resolved.
    ///   - promise: A promise for a `Channel`.
    ///   - logger: A request logger.
    ///   - initializer: A closure to initialize the `Channel` with.
    @inlinable
    internal func makeStream(
      deadline: NIODeadline,
      promise: EventLoopPromise<Channel>,
      logger: GRPCLogger,
      initializer: @escaping (Channel) -> EventLoopFuture<Void>
    ) {
      if self.eventLoop.inEventLoop {
        self._makeStream(
          deadline: deadline,
          promise: promise,
          logger: logger,
          initializer: initializer
        )
      } else {
        self.eventLoop.execute {
          self._makeStream(
            deadline: deadline,
            promise: promise,
            logger: logger,
            initializer: initializer
          )
        }
      }
    }

    /// See `makeStream(deadline:promise:logger:initializer:)`.
    ///
    /// - Returns: A future for the initialized stream `Channel`.
    @inlinable
    internal func makeStream(
      deadline: NIODeadline,
      logger: GRPCLogger,
      initializer: @escaping (Channel) -> EventLoopFuture<Void>
    ) -> EventLoopFuture<Channel> {
      let promise = self.eventLoop.makePromise(of: Channel.self)
      self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer)
      return promise.futureResult
    }

    /// Shutdown the connection pool.
    ///
    /// Existing waiters will be failed and all underlying connections will be shutdown. Subsequent
    /// calls to `makeStream` will be failed immediately.
    ///
    /// - Parameter mode: The mode to use when shutting down.
    /// - Returns: A future indicating when shutdown has been completed.
    internal func shutdown(mode: ConnectionManager.ShutdownMode) -> EventLoopFuture<Void> {
      let promise = self.eventLoop.makePromise(of: Void.self)

      if self.eventLoop.inEventLoop {
        self._shutdown(mode: mode, promise: promise)
      } else {
        self.eventLoop.execute {
          self._shutdown(mode: mode, promise: promise)
        }
      }

      return promise.futureResult
    }

    /// See `makeStream(deadline:promise:logger:initializer:)`.
    ///
    /// - Important: Must be called on the pool's `EventLoop`.
    @inlinable
    internal func _makeStream(
      deadline: NIODeadline,
      promise: EventLoopPromise<Channel>,
      logger: GRPCLogger,
      initializer: @escaping (Channel) -> EventLoopFuture<Void>
    ) {
      self.eventLoop.assertInEventLoop()

      guard case .active = self._state else {
        // Fail the promise right away if we're shutting down or already shut down.
        promise.fail(GRPCConnectionPoolError.shutdown)
        return
      }

      // Try to make a stream on an existing connection.
      let streamCreated = self._tryMakeStream(promise: promise, initializer: initializer)

      if !streamCreated {
        // No stream was created, wait for one.
        self._enqueueWaiter(
          deadline: deadline,
          promise: promise,
          logger: logger,
          initializer: initializer
        )
      }
    }

    /// Try to find an existing connection on which we can make a stream.
    ///
    /// Never succeeds while other waiters are queued, so that requests are served in order.
    ///
    /// - Parameters:
    ///   - promise: A promise to succeed if we can make a stream.
    ///   - initializer: A closure to initialize the stream with.
    /// - Returns: A boolean value indicating whether the stream was created or not.
    @inlinable
    internal func _tryMakeStream(
      promise: EventLoopPromise<Channel>,
      initializer: @escaping (Channel) -> EventLoopFuture<Void>
    ) -> Bool {
      // We shouldn't jump the queue.
      guard self.waiters.isEmpty else {
        return false
      }

      // Reserve a stream, if we can.
      guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
        return false
      }

      multiplexer.createStreamChannel(promise: promise, initializer)

      // Has reserving another stream tipped us over the limit for needing another connection?
      if self._shouldBringUpAnotherConnection() {
        self._startConnectingIdleConnection()
      }

      return true
    }

    /// Enqueue a waiter to be provided with a stream at some point in the future.
    ///
    /// If the queue already holds `maxWaiters` waiters, the promise is failed immediately with
    /// `tooManyWaiters`. Otherwise the waiter is failed with `deadlineExceeded` if it has not been
    /// serviced by `deadline`.
    ///
    /// - Parameters:
    ///   - deadline: The point in time by which the promise should have been completed.
    ///   - promise: The promise to complete with the `Channel`.
    ///   - logger: A logger.
    ///   - initializer: A closure to initialize the `Channel` with.
    @inlinable
    internal func _enqueueWaiter(
      deadline: NIODeadline,
      promise: EventLoopPromise<Channel>,
      logger: GRPCLogger,
      initializer: @escaping (Channel) -> EventLoopFuture<Void>
    ) {
      // Don't overwhelm the pool with too many waiters.
      guard self.waiters.count < self.maxWaiters else {
        logger.trace(
          "connection pool has too many waiters",
          metadata: [
            Metadata.waitersMax: "\(self.maxWaiters)"
          ]
        )
        promise.fail(GRPCConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
        return
      }

      let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)

      // Fail the waiter and punt it from the queue when it times out. It's okay that we schedule the
      // timeout before appending it to the waiters, it won't run until the next event loop tick at
      // the earliest (even if the deadline has already passed).
      waiter.scheduleTimeout(on: self.eventLoop) {
        waiter.fail(GRPCConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))

        if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
          self.waiters.remove(at: index)

          logger.trace(
            "timed out waiting for a connection",
            metadata: [
              Metadata.waiterID: "\(waiter.id)",
              Metadata.waitersCount: "\(self.waiters.count)",
            ]
          )
        }
      }

      // Logged on the request logger; the count is the number of waiters ahead of this one.
      logger.debug(
        "waiting for a connection to become available",
        metadata: [
          Metadata.waiterID: "\(waiter.id)",
          Metadata.waitersCount: "\(self.waiters.count)",
        ]
      )

      self.waiters.append(waiter)

      // Logged on the pool logger.
      self.logger.trace(
        "enqueued connection waiter",
        metadata: [
          Metadata.waitersCount: "\(self.waiters.count)"
        ]
      )

      if self._shouldBringUpAnotherConnection() {
        self._startConnectingIdleConnection()
      }
    }

    /// Compute the current demand and capacity for streams.
    ///
    /// The 'demand' for streams is the number of reserved streams plus the number of waiters. The
    /// capacity for streams is the sum, over every connection which is neither idle nor quiescing,
    /// of that connection's maximum concurrent streams (or `assumedMaxConcurrentStreams` if the
    /// connection hasn't told us its limit yet).
    ///
    /// - Returns: A tuple of the demand and capacity for streams.
    @usableFromInline
    internal func _computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) {
      let demand = self.sync.reservedStreams + self.sync.waiters

      // TODO: make this cheaper by storing and incrementally updating the number of idle connections
      let capacity = self._connections.values.reduce(0) { sum, state in
        if state.manager.sync.isIdle || state.isQuiescing {
          // Idle connection or quiescing (so the capacity should be ignored).
          return sum
        } else if let knownMaxAvailableStreams = state.maxAvailableStreams {
          // A known value of max concurrent streams, i.e. the connection is active.
          return sum + knownMaxAvailableStreams
        } else {
          // Not idle and no known value, the connection must be connecting so use our assumed value.
          return sum + self.assumedMaxConcurrentStreams
        }
      }

      return (demand, capacity)
    }

    /// Returns whether the pool should start connecting an idle connection (if one exists).
    ///
    /// This is the case when demand / capacity is greater than or equal to
    /// `reservationLoadThreshold`.
    @usableFromInline
    internal func _shouldBringUpAnotherConnection() -> Bool {
      let (demand, capacity) = self._computeStreamDemandAndCapacity()

      // Infinite -- i.e. all connections are idle or no connections exist -- is okay here as it
      // will always be greater than the threshold and a new connection will be spun up. If there is
      // no demand and no capacity the load is NaN, which never compares as exceeding the threshold.
      let load = Double(demand) / Double(capacity)
      let loadExceedsThreshold = load >= self.reservationLoadThreshold

      if loadExceedsThreshold {
        self.logger.debug(
          "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available",
          metadata: [
            Metadata.reservationsCount: "\(demand)",
            Metadata.reservationsCapacity: "\(capacity)",
            Metadata.reservationsLoad: "\(load)",
            Metadata.reservationsLoadThreshold: "\(self.reservationLoadThreshold)",
          ]
        )
      }

      return loadExceedsThreshold
    }

    /// Starts connecting the first idle connection found, if one exists.
    @usableFromInline
    internal func _startConnectingIdleConnection() {
      if let index = self._connections.values.firstIndex(where: { $0.manager.sync.isIdle }) {
        self._connections.values[index].manager.sync.startConnecting()
      }
    }

    /// Returns the index in `self._connections.values` of the connection with the most available
    /// streams, or `self._connections.endIndex` if no connection has at least one stream available.
    ///
    /// When several connections share the maximum, the first one in iteration order is chosen.
    ///
    /// - Note: this is linear in the number of connections.
    @usableFromInline
    internal func _mostAvailableConnectionIndex()
      -> [ConnectionManagerID: PerConnectionState].Index
    {
      var selectedIndex = self._connections.values.endIndex
      var mostAvailableStreams = 0

      for index in self._connections.values.indices {
        let availableStreams = self._connections.values[index].availableStreams
        // Strictly greater: connections without available streams are never selected.
        if availableStreams > mostAvailableStreams {
          mostAvailableStreams = availableStreams
          selectedIndex = index
        }
      }

      return selectedIndex
    }

    /// Reserves a stream from the connection with the most available streams, if one exists.
    ///
    /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from,
    ///   or `nil` if no stream could be reserved.
    @usableFromInline
    internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? {
      let index = self._mostAvailableConnectionIndex()

      guard index != self._connections.endIndex else {
        // No connection has a stream available.
        return nil
      }

      // The selected connection has at least one available stream, so reserving one is expected
      // to succeed; trap with a diagnostic if that invariant is ever broken.
      guard let multiplexer = self._connections.values[index].reserveStream() else {
        preconditionFailure("the most available connection had no stream available to reserve")
      }

      return multiplexer
    }

    /// See `shutdown(mode:)`.
    ///
    /// Repeated calls are safe: a call made while shutting down completes with the in-flight
    /// shutdown, and a call made after shutdown succeeds immediately.
    ///
    /// - Parameters:
    ///   - mode: The mode to use when shutting down each connection.
    ///   - promise: A `promise` to complete when the pool has been shutdown.
    @usableFromInline
    internal func _shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
      self.eventLoop.assertInEventLoop()

      switch self._state {
      case .active:
        self.logger.debug("shutting down connection pool")

        // We're shutting down now and when that's done we'll be fully shutdown.
        self._state = .shuttingDown(promise.futureResult)
        promise.futureResult.whenComplete { _ in
          self._state = .shutdown
          self.delegate = nil
          self.logger.trace("finished shutting down connection pool")
        }

        // Shutdown all the connections and remove them from the pool.
        let connections = self._connections
        self._connections.removeAll()

        let allShutdown: [EventLoopFuture<Void>] = connections.values.map {
          let id = $0.manager.id
          let manager = $0.manager

          return manager.eventLoop.flatSubmit {
            // If the connection was idle/shutdown before calling shutdown then we shouldn't tell
            // the delegate the connection closed (because it either never connected or was already
            // informed about this).
            let connectionIsInactive = manager.sync.isIdle || manager.sync.isShutdown
            return manager.shutdown(mode: mode).always { _ in
              if !connectionIsInactive {
                self.delegate?.connectionClosed(id: .init(id), error: nil)
              }
              self.delegate?.connectionRemoved(id: .init(id))
            }
          }
        }

        // Fail the outstanding waiters.
        while let waiter = self.waiters.popFirst() {
          waiter.fail(GRPCConnectionPoolError.shutdown)
        }

        // Cascade the result of the shutdown into the promise.
        EventLoopFuture.andAllSucceed(allShutdown, promise: promise)

      case let .shuttingDown(future):
        // We're already shutting down, cascade the result.
        promise.completeWith(future)

      case .shutdown:
        // Already shutdown, fine.
        promise.succeed(())
      }
    }
  }
  extension ConnectionPool: ConnectionManagerConnectivityDelegate {
    // We're interested in a few different situations here:
    //
    // 1. The connection was usable ('ready') and is no longer usable (either it became idle or
    //    encountered an error). If this happens we need to hand any reservations still held
    //    against the connection back to the stream lender, as it may no longer be used for new
    //    RPCs.
    // 2. The connection was not usable but moved to a different unusable state. If this happens and
    //    we know the cause of the state transition (i.e. the error) then we need to update our most
    //    recent error with the error. This information is used when failing waiters to provide some
    //    context as to why they may be failing.

    /// Reacts to a connectivity state change of one of the pool's connections.
    ///
    /// The pool's own bookkeeping (most recent error, dropped reservations) is updated first; the
    /// pool delegate, if any, is then told about connection lifecycle events.
    func connectionStateDidChange(
      _ manager: ConnectionManager,
      from oldState: _ConnectivityState,
      to newState: _ConnectivityState
    ) {
      switch (oldState, newState) {
      case let (.ready, .transientFailure(error)),
        let (.ready, .idle(.some(error))):
        self.updateMostRecentError(error)
        self.connectionUnavailable(manager.id)

      case (.ready, .idle(.none)),
        (.ready, .shutdown):
        self.connectionUnavailable(manager.id)

      case let (_, .transientFailure(error)),
        let (_, .idle(.some(error))):
        self.updateMostRecentError(error)

      default:
        ()
      }

      guard let delegate = self.delegate else { return }

      switch (oldState, newState) {
      case (.idle, .connecting),
        (.transientFailure, .connecting):
        delegate.startedConnecting(id: .init(manager.id))

      case (.connecting, .ready):
        // The connection becoming ready is handled by 'receivedSettingsMaxConcurrentStreams'.
        ()

      case (.ready, .idle):
        delegate.connectionClosed(id: .init(manager.id), error: nil)

      case let (.ready, .transientFailure(error)):
        delegate.connectionClosed(id: .init(manager.id), error: error)

      case let (.connecting, .transientFailure(error)):
        delegate.connectFailed(id: .init(manager.id), error: error)

      default:
        ()
      }
    }

    /// Called when a connection starts quiescing (draining).
    ///
    /// The connection is marked as quiescing, a fresh idle connection is added in its place, the
    /// streams reserved against it are returned to the stream lender, and it is removed from the
    /// pool once its current underlying connection closes.
    func connectionIsQuiescing(_ manager: ConnectionManager) {
      self.eventLoop.assertInEventLoop()

      // Find the relevant connection.
      guard let index = self._connections.index(forKey: manager.id) else {
        return
      }

      // Drop the connectivity delegate, we're no longer interested in its events now.
      manager.sync.connectivityDelegate = nil

      // Started quiescing; update our state and notify the pool delegate.
      self._connections.values[index].isQuiescing = true
      self.delegate?.connectionQuiescing(id: .init(manager.id))

      // Grab the number of reserved streams now, while `index` is still valid. The close callback
      // registered below removes this entry from `_connections` (which would invalidate `index` if
      // the callback ran synchronously), and adding a replacement connection invalidates it too.
      let reservedStreams = self._connections.values[index].reservedStreams

      // As the connection is quiescing, we need to know when the current connection it's managing
      // has closed. When that happens drop the H2 delegate and update the pool delegate.
      manager.onCurrentConnectionClose { hadActiveConnection in
        assert(hadActiveConnection)
        if let removed = self._connections.removeValue(forKey: manager.id) {
          removed.manager.sync.http2Delegate = nil
          self.delegate?.connectionClosed(id: .init(removed.manager.id), error: nil)
          self.delegate?.connectionRemoved(id: .init(removed.manager.id))
        }
      }

      // Replace the connection with a new idle one.
      self.addConnectionToPool()

      // Since we're removing this connection from the pool (and no new streams can be created on
      // the connection), the pool manager can ignore any streams reserved against this connection,
      // so hand them all back now.
      //
      // Note: we don't need to adjust the number of available streams as the effective number of
      // connections hasn't changed.
      self.streamLender.returnStreams(reservedStreams, to: self)
    }

    /// Records `error` as the most recent connection error; it is used to give failed waiters
    /// some context.
    private func updateMostRecentError(_ error: Error) {
      self.eventLoop.assertInEventLoop()
      // Update the last known error if there is one. We will use it to provide some context to
      // waiters which may fail.
      self._mostRecentError = error
    }

    /// A connection has become unavailable.
    ///
    /// - Parameter id: The ID of the connection manager whose connection became unavailable.
    private func connectionUnavailable(_ id: ConnectionManagerID) {
      self.eventLoop.assertInEventLoop()
      // The connection is no longer available: any streams which haven't been closed will be counted
      // as a dropped reservation, we need to tell the pool manager about them.
      if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 {
        self.streamLender.returnStreams(droppedReservations, to: self)
      }
    }
  }
  extension ConnectionPool: ConnectionManagerHTTP2Delegate {
    /// Records that a stream was opened on `manager`'s connection, then reports the connection's
    /// new utilization to the pool delegate (if there is one).
    internal func streamOpened(_ manager: ConnectionManager) {
      self.eventLoop.assertInEventLoop()

      // The bookkeeping update always happens; the delegate is only consulted afterwards.
      guard let usage = self._connections[manager.id]?.openedStream() else {
        return
      }

      self.delegate?.connectionUtilizationChanged(
        id: .init(manager.id),
        streamsUsed: usage.used,
        streamCapacity: usage.capacity
      )
    }

    /// Records that a stream closed on `manager`'s connection.
    ///
    /// The stream goes back to the connection (and the new utilization is reported to the pool
    /// delegate). If the connection is available and not quiescing, the stream is also lent back
    /// to the pool manager and queued waiters are given a chance to use it.
    internal func streamClosed(_ manager: ConnectionManager) {
      self.eventLoop.assertInEventLoop()

      guard let index = self._connections.index(forKey: manager.id) else { return }

      // Return the stream to the connection first.
      if let usage = self._connections.values[index].returnStream() {
        self.delegate?.connectionUtilizationChanged(
          id: .init(manager.id),
          streamsUsed: usage.used,
          streamCapacity: usage.capacity
        )
      }

      // A quiescing connection's reservations were already handed back when it started quiescing,
      // so only available, non-quiescing connections return the stream to the pool manager.
      let connection = self._connections.values[index]
      guard connection.isAvailable, !connection.isQuiescing else { return }

      self.streamLender.returnStreams(1, to: self)
      // Capacity was freed: a queued waiter may be serviceable now.
      self.tryServiceWaiters()
    }

    /// Handles a SETTINGS_MAX_CONCURRENT_STREAMS value received on `manager`'s connection.
    ///
    /// Unknown and quiescing connections are ignored. Otherwise the most recent error is cleared,
    /// the change in stream capacity is forwarded to the pool manager, the delegate is told about
    /// newly established connections, and queued waiters are serviced where possible.
    internal func receivedSettingsMaxConcurrentStreams(
      _ manager: ConnectionManager,
      maxConcurrentStreams: Int
    ) {
      self.eventLoop.assertInEventLoop()

      // The pool manager doesn't care about updates from quiescing connections.
      guard let index = self._connections.index(forKey: manager.id),
        !self._connections.values[index].isQuiescing
      else {
        return
      }

      // Receiving SETTINGS means the connection is healthy: forget the last observed error.
      self._mostRecentError = nil

      let capacityChange: Int
      switch self._connections.values[index].updateMaxConcurrentStreams(maxConcurrentStreams) {
      case let .some(previousLimit):
        // An already-established connection changed its limit.
        capacityChange = maxConcurrentStreams - previousLimit

      case .none:
        // First limit seen for this connection: until now its capacity was the assumed default.
        capacityChange = maxConcurrentStreams - self.assumedMaxConcurrentStreams
        self.delegate?.connectSucceeded(id: .init(manager.id), streamCapacity: maxConcurrentStreams)
      }

      if capacityChange != 0 {
        self.streamLender.changeStreamCapacity(by: capacityChange, for: self)
      }

      // Check even when capacity didn't grow: this may be a brand new connection.
      self.tryServiceWaiters()
    }
  }
  extension ConnectionPool {
    // MARK: - Waiters

    /// Services queued waiters from the front of the queue while streams can be reserved.
    ///
    /// A waiter at the front whose deadline is not after the current time is removed without
    /// being completed; its own scheduled timeout is left to fail its promise. Servicing stops
    /// when the queue is empty, or when no connection can provide a stream.
    ///
    /// This can be expensive: in the worst case it is `O(W ⨉ N)`, where `W` is the number of
    /// waiters and `N` is the number of connections.
    private func tryServiceWaiters() {
      guard !self.waiters.isEmpty else { return }

      self.logger.trace(
        "servicing waiters",
        metadata: [
          Metadata.waitersCount: "\(self.waiters.count)"
        ]
      )

      let now = self.now()
      var servicedCount = 0

      while let head = self.waiters.first {
        guard head.deadlineIsAfter(now) else {
          // Already expired: completing it is pointless. Drop it and let its scheduled timeout
          // fail the promise.
          self.waiters.removeFirst()
          continue
        }

        guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
          // Waiters remain, but no connection has a stream to spare.
          break
        }

        let waiter = self.waiters.removeFirst()
        servicedCount &+= 1
        waiter.succeed(with: multiplexer)
      }

      self.logger.trace(
        "done servicing waiters",
        metadata: [
          Metadata.waitersCount: "\(self.waiters.count)",
          Metadata.waitersServiced: "\(servicedCount)",
        ]
      )
    }
  }
  extension ConnectionPool {
    /// A synchronous view of the pool's state, mostly used by tests.
    ///
    /// Each property asserts that it is read on the pool's event loop.
    internal struct Sync {
      private let pool: ConnectionPool

      fileprivate init(_ pool: ConnectionPool) {
        self.pool = pool
      }

      /// The number of outstanding connection waiters.
      internal var waiters: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool.waiters.count
      }

      /// The number of connections currently in the pool, in any state.
      internal var connections: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool._connections.count
      }

      /// The number of idle connections in the pool.
      internal var idleConnections: Int {
        self.pool.eventLoop.assertInEventLoop()
        var idleCount = 0
        for connection in self.pool._connections.values where connection.manager.sync.isIdle {
          idleCount &+= 1
        }
        return idleCount
      }

      /// The number of streams currently available to reserve, summed over all connections.
      internal var availableStreams: Int {
        self.pool.eventLoop.assertInEventLoop()
        var total = 0
        for connection in self.pool._connections.values {
          total += connection.availableStreams
        }
        return total
      }

      /// The number of streams which have been reserved, summed over all connections.
      internal var reservedStreams: Int {
        self.pool.eventLoop.assertInEventLoop()
        var total = 0
        for connection in self.pool._connections.values {
          total += connection.reservedStreams
        }
        return total
      }
    }

    /// A ``Sync`` view of this pool.
    internal var sync: Sync {
      Sync(self)
    }
  }
  /// An error thrown from the ``GRPCChannelPool``.
  public struct GRPCConnectionPoolError: Error, CustomStringConvertible {
    /// Identifies which kind of pool error occurred.
    public struct Code: Hashable, Sendable, CustomStringConvertible {
      enum Code {
        case shutdown
        case tooManyWaiters
        case deadlineExceeded
      }

      fileprivate var code: Code

      private init(_ code: Code) {
        self.code = code
      }

      /// The name of the underlying case, e.g. `"shutdown"`.
      public var description: String {
        "\(self.code)"
      }

      /// The pool is shutdown or shutting down.
      public static var shutdown: Self {
        Self(.shutdown)
      }

      /// There are too many waiters in the pool.
      public static var tooManyWaiters: Self {
        Self(.tooManyWaiters)
      }

      /// The deadline for creating a stream has passed.
      public static var deadlineExceeded: Self {
        Self(.deadlineExceeded)
      }
    }

    /// The error code.
    public var code: Code

    /// An underlying error which caused this error to be thrown, if any.
    public var underlyingError: Error?

    /// The code's description, followed by the underlying error in parentheses when one is set.
    public var description: String {
      guard let underlyingError = self.underlyingError else {
        return String(describing: self.code)
      }
      return "\(self.code) (\(underlyingError))"
    }

    /// Creates a new connection pool error with the given code and underlying error.
    ///
    /// - Parameters:
    ///   - code: The error code.
    ///   - underlyingError: The underlying error which led to this error being thrown.
    public init(code: Code, underlyingError: Error? = nil) {
      self.underlyingError = underlyingError
      self.code = code
    }
  }
  extension GRPCConnectionPoolError {
    /// A shutdown error with no underlying error.
    @usableFromInline
    static let shutdown = GRPCConnectionPoolError(code: .shutdown)

    /// Makes a `.tooManyWaiters` error.
    ///
    /// - Parameter connectionError: An error which contributed to this one, if any.
    @inlinable
    static func tooManyWaiters(connectionError: Error?) -> Self {
      return .init(code: .tooManyWaiters, underlyingError: connectionError)
    }

    /// Makes a `.deadlineExceeded` error.
    ///
    /// - Parameter connectionError: An error which contributed to this one, if any.
    @inlinable
    static func deadlineExceeded(connectionError: Error?) -> Self {
      return .init(code: .deadlineExceeded, underlyingError: connectionError)
    }
  }
  extension GRPCConnectionPoolError: GRPCStatusTransformable {
    /// Converts this error into a ``GRPCStatus`` whose cause is `underlyingError`.
    ///
    /// - `shutdown` becomes `unavailable`.
    /// - `tooManyWaiters` becomes `resourceExhausted`.
    /// - `deadlineExceeded` becomes `deadlineExceeded`.
    public func makeGRPCStatus() -> GRPCStatus {
      let statusCode: GRPCStatus.Code
      let message: String

      switch self.code.code {
      case .shutdown:
        statusCode = .unavailable
        message = "The connection pool is shutdown"
      case .tooManyWaiters:
        statusCode = .resourceExhausted
        message = "The connection pool has no capacity for new RPCs or RPC waiters"
      case .deadlineExceeded:
        statusCode = .deadlineExceeded
        message = "Timed out waiting for an HTTP/2 stream from the connection pool"
      }

      return GRPCStatus(code: statusCode, message: message, cause: self.underlyingError)
    }
  }