ConnectionPool.swift

/*
 * Copyright 2021, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP2

@usableFromInline
internal final class ConnectionPool {
  /// The event loop all connections in this pool are running on.
  @usableFromInline
  internal let eventLoop: EventLoop

  @usableFromInline
  internal enum State {
    case active
    case shuttingDown(EventLoopFuture<Void>)
    case shutdown
  }

  /// The state of the connection pool.
  @usableFromInline
  internal var _state: State = .active

  /// Connection managers and their stream availability state, keyed by the ID of the connection
  /// manager.
  ///
  /// Connections are accessed by their ID for connection state changes (infrequent) and when
  /// streams are closed (frequent). However, choosing which connection to succeed a waiter with
  /// (frequent) requires the connections to be ordered by their availability. A dictionary might
  /// not be the most efficient data structure (a queue prioritised by stream availability may be
  /// a better choice given the number of connections is likely to be very low in practice).
  @usableFromInline
  internal var _connections: [ConnectionManagerID: PerConnectionState]

  /// The threshold which, if exceeded when creating a stream, determines whether the pool will
  /// start connecting an idle connection (if one exists).
  ///
  /// The 'load' is calculated as the ratio of the demand for streams (the sum of the number of
  /// waiters and the number of reserved streams) to the total number of streams which non-idle
  /// connections could support (this includes the streams that a connection in the connecting
  /// state could support).
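  ///
  /// For example (illustrative numbers, not defaults): 4 waiters and 6 reserved streams give a
  /// demand of 10; if the only non-idle connection is assumed to support 100 concurrent streams
  /// the capacity is 100 and the load is 0.1, so a threshold of 0.9 would not trigger another
  /// connection while a threshold of 0.05 would.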
  @usableFromInline
  internal let reservationLoadThreshold: Double

  /// The assumed value for the maximum number of concurrent streams a connection can support. We
  /// assume a connection will support this many streams until we know better.
  @usableFromInline
  internal let assumedMaxConcurrentStreams: Int

  /// A queue of waiters which may or may not get a stream in the future.
  @usableFromInline
  internal var waiters: CircularBuffer<Waiter>

  /// The maximum number of waiters allowed; the size of `waiters` must not exceed this value. If
  /// there are this many waiters in the queue then the next waiter will be failed immediately.
  @usableFromInline
  internal let maxWaiters: Int

  /// Configuration for backoff between subsequent connection attempts.
  @usableFromInline
  internal let connectionBackoff: ConnectionBackoff

  /// Provides a channel factory to the `ConnectionManager`.
  @usableFromInline
  internal let channelProvider: ConnectionManagerChannelProvider

  /// The object to notify about changes to stream reservations; in practice this is usually
  /// the `PoolManager`.
  @usableFromInline
  internal let streamLender: StreamLender

  /// A logger which always sets "GRPC" as its source.
  @usableFromInline
  internal let logger: GRPCLogger

  /// Returns a `NIODeadline` representing 'now'. This is useful for testing.
  @usableFromInline
  internal let now: () -> NIODeadline

  /// Logging metadata keys.
  @usableFromInline
  internal enum Metadata {
    /// The ID of this pool.
    @usableFromInline
    static let id = "pool.id"

    /// The number of stream reservations (i.e. number of open streams + number of waiters).
    @usableFromInline
    static let reservationsCount = "pool.reservations.count"

    /// The number of streams this pool can support with non-idle connections at this time.
    @usableFromInline
    static let reservationsCapacity = "pool.reservations.capacity"

    /// The current reservation load (i.e. reservation count / reservation capacity).
    @usableFromInline
    static let reservationsLoad = "pool.reservations.load"

    /// The reservation load threshold, above which a new connection will be created (if possible).
    @usableFromInline
    static let reservationsLoadThreshold = "pool.reservations.loadThreshold"

    /// The current number of waiters in the pool.
    @usableFromInline
    static let waitersCount = "pool.waiters.count"

    /// The maximum number of waiters the pool is configured to allow.
    @usableFromInline
    static let waitersMax = "pool.waiters.max"

    /// The number of waiters which were successfully serviced.
    @usableFromInline
    static let waitersServiced = "pool.waiters.serviced"

    /// The ID of the waiter.
    @usableFromInline
    static let waiterID = "pool.waiter.id"
  }

  @usableFromInline
  init(
    eventLoop: EventLoop,
    maxWaiters: Int,
    reservationLoadThreshold: Double,
    assumedMaxConcurrentStreams: Int,
    connectionBackoff: ConnectionBackoff,
    channelProvider: ConnectionManagerChannelProvider,
    streamLender: StreamLender,
    logger: GRPCLogger,
    now: @escaping () -> NIODeadline = NIODeadline.now
  ) {
    precondition(
      (0.0 ... 1.0).contains(reservationLoadThreshold),
      "reservationLoadThreshold must be within the range 0.0 ... 1.0"
    )
    self.reservationLoadThreshold = reservationLoadThreshold
    self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
    self._connections = [:]
    self.maxWaiters = maxWaiters
    self.waiters = CircularBuffer(initialCapacity: 16)
    self.eventLoop = eventLoop
    self.connectionBackoff = connectionBackoff
    self.channelProvider = channelProvider
    self.streamLender = streamLender
    self.logger = logger
    self.now = now
  }

  /// Initialize the connection pool.
  ///
  /// - Parameter connections: The number of connections to add to the pool.
  internal func initialize(connections: Int) {
    assert(self._connections.isEmpty)
    self._connections.reserveCapacity(connections)
    while self._connections.count < connections {
      self.addConnectionToPool()
    }
  }

  /// Make and add a new connection to the pool.
  private func addConnectionToPool() {
    let manager = ConnectionManager(
      eventLoop: self.eventLoop,
      channelProvider: self.channelProvider,
      callStartBehavior: .waitsForConnectivity,
      connectionBackoff: self.connectionBackoff,
      connectivityDelegate: self,
      http2Delegate: self,
      logger: self.logger.unwrapped
    )
    self._connections[manager.id] = PerConnectionState(manager: manager)
  }

  // MARK: - Called from the pool manager

  /// Make and initialize an HTTP/2 stream `Channel`.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the `promise` must have been resolved.
  ///   - promise: A promise for a `Channel`.
  ///   - logger: A request logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  @inlinable
  internal func makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    if self.eventLoop.inEventLoop {
      self._makeStream(
        deadline: deadline,
        promise: promise,
        logger: logger,
        initializer: initializer
      )
    } else {
      self.eventLoop.execute {
        self._makeStream(
          deadline: deadline,
          promise: promise,
          logger: logger,
          initializer: initializer
        )
      }
    }
  }

  /// See `makeStream(deadline:promise:logger:initializer:)`.
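  ///
  /// A minimal usage sketch (illustrative only; `pool`, `requestLogger`, and `MyCallHandler` are
  /// assumed names, not part of this file):
  ///
  ///     let channelFuture = pool.makeStream(
  ///       deadline: .now() + .seconds(5),
  ///       logger: requestLogger
  ///     ) { channel in
  ///       // Configure the freshly created HTTP/2 stream channel before it is handed back.
  ///       channel.pipeline.addHandler(MyCallHandler())
  ///     }
  ///     channelFuture.whenSuccess { channel in
  ///       // Start the RPC on `channel`.
  ///     }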
  @inlinable
  internal func makeStream(
    deadline: NIODeadline,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) -> EventLoopFuture<Channel> {
    let promise = self.eventLoop.makePromise(of: Channel.self)
    self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer)
    return promise.futureResult
  }

  /// Shut down the connection pool.
  ///
  /// Existing waiters will be failed and all underlying connections will be shut down. Subsequent
  /// calls to `makeStream` will be failed immediately.
  internal func shutdown() -> EventLoopFuture<Void> {
    let promise = self.eventLoop.makePromise(of: Void.self)

    if self.eventLoop.inEventLoop {
      self._shutdown(promise: promise)
    } else {
      self.eventLoop.execute {
        self._shutdown(promise: promise)
      }
    }

    return promise.futureResult
  }

  /// See `makeStream(deadline:promise:logger:initializer:)`.
  ///
  /// - Important: Must be called on the pool's `EventLoop`.
  @inlinable
  internal func _makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    self.eventLoop.assertInEventLoop()

    guard case .active = self._state else {
      // Fail the promise right away if we're shutting down or already shut down.
      promise.fail(ConnectionPoolError.shutdown)
      return
    }

    // Try to make a stream on an existing connection.
    let streamCreated = self._tryMakeStream(promise: promise, initializer: initializer)

    if !streamCreated {
      // No stream was created, wait for one.
      self._enqueueWaiter(
        deadline: deadline,
        promise: promise,
        logger: logger,
        initializer: initializer
      )
    }
  }

  /// Try to find an existing connection on which we can make a stream.
  ///
  /// - Parameters:
  ///   - promise: A promise to succeed if we can make a stream.
  ///   - initializer: A closure to initialize the stream with.
  /// - Returns: A boolean value indicating whether the stream was created or not.
  @inlinable
  internal func _tryMakeStream(
    promise: EventLoopPromise<Channel>,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) -> Bool {
    // We shouldn't jump the queue.
    guard self.waiters.isEmpty else {
      return false
    }

    // Reserve a stream, if we can.
    guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
      return false
    }

    multiplexer.createStreamChannel(promise: promise, initializer)

    // Has reserving another stream tipped us over the limit for needing another connection?
    if self._shouldBringUpAnotherConnection() {
      self._startConnectingIdleConnection()
    }

    return true
  }

  /// Enqueue a waiter to be provided with a stream at some point in the future.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the promise should have been completed.
  ///   - promise: The promise to complete with the `Channel`.
  ///   - logger: A logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  @inlinable
  internal func _enqueueWaiter(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    // Don't overwhelm the pool with too many waiters.
    guard self.waiters.count < self.maxWaiters else {
      logger.trace("connection pool has too many waiters", metadata: [
        Metadata.waitersMax: "\(self.maxWaiters)",
      ])
      promise.fail(ConnectionPoolError.tooManyWaiters)
      return
    }

    let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)

    // Fail the waiter and punt it from the queue when it times out. It's okay that we schedule the
    // timeout before appending it to the waiters: it won't run until the next event loop tick at
    // the earliest (even if the deadline has already passed).
    waiter.scheduleTimeout(on: self.eventLoop) {
      waiter.fail(ConnectionPoolError.deadlineExceeded)

      if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
        self.waiters.remove(at: index)
        logger.trace("timed out waiting for a connection", metadata: [
          Metadata.waiterID: "\(waiter.id)",
          Metadata.waitersCount: "\(self.waiters.count)",
        ])
      }
    }

    // Log via the request logger.
    logger.debug("waiting for a connection to become available", metadata: [
      Metadata.waiterID: "\(waiter.id)",
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    self.waiters.append(waiter)

    // Log via the pool logger.
    self.logger.trace("enqueued connection waiter", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    if self._shouldBringUpAnotherConnection() {
      self._startConnectingIdleConnection()
    }
  }

  /// Compute the current demand and capacity for streams.
  ///
  /// The 'demand' for streams is the sum of the number of reserved streams and the number of
  /// waiters. The capacity for streams is the sum of the max concurrent streams supported by each
  /// non-idle connection.
  ///
  /// - Returns: A tuple of the demand and capacity for streams.
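  ///
  /// For example (illustrative values): a pool holding one idle connection (contributing 0), one
  /// ready connection which advertised 128 max concurrent streams (contributing 128), and one
  /// connecting connection (contributing `assumedMaxConcurrentStreams`, say 100) has a capacity
  /// of 228; with 3 waiters and 40 reserved streams the demand is 43.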
  @usableFromInline
  internal func _computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) {
    let demand = self.sync.reservedStreams + self.sync.waiters

    // TODO: make this cheaper by storing and incrementally updating the number of idle connections
    let capacity = self._connections.values.reduce(0) { sum, state in
      if state.manager.sync.isIdle {
        // Idle connection, no capacity.
        return sum
      } else if let knownMaxAvailableStreams = state.maxAvailableStreams {
        // A known value of max concurrent streams, i.e. the connection is active.
        return sum + knownMaxAvailableStreams
      } else {
        // Not idle and no known value; the connection must be connecting so use our assumed value.
        return sum + self.assumedMaxConcurrentStreams
      }
    }

    return (demand, capacity)
  }

  /// Returns whether the pool should start connecting an idle connection (if one exists).
  @usableFromInline
  internal func _shouldBringUpAnotherConnection() -> Bool {
    let (demand, capacity) = self._computeStreamDemandAndCapacity()

    // Infinite -- i.e. all connections are idle or no connections exist -- is okay here as it
    // will always be greater than the threshold and a new connection will be spun up.
    let load = Double(demand) / Double(capacity)

    let loadExceedsThreshold = load >= self.reservationLoadThreshold
    if loadExceedsThreshold {
      self.logger.debug(
        "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available",
        metadata: [
          Metadata.reservationsCount: "\(demand)",
          Metadata.reservationsCapacity: "\(capacity)",
          Metadata.reservationsLoad: "\(load)",
          Metadata.reservationsLoadThreshold: "\(self.reservationLoadThreshold)",
        ]
      )
    }

    return loadExceedsThreshold
  }

  /// Starts connecting an idle connection, if one exists.
  @usableFromInline
  internal func _startConnectingIdleConnection() {
    if let index = self._connections.values.firstIndex(where: { $0.manager.sync.isIdle }) {
      self._connections.values[index].manager.sync.startConnecting()
    }
  }

  /// Returns the index in `self._connections.values` of the connection with the most available
  /// streams. Returns `self._connections.values.endIndex` if no connection has at least one
  /// stream available.
  ///
  /// - Note: this is linear in the number of connections.
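  ///
  /// For example (illustrative): given connections with 0, 2 and 7 available streams, the index of
  /// the connection with 7 available streams is returned; if every connection has 0 available
  /// streams, the `endIndex` is returned instead.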
  @usableFromInline
  internal func _mostAvailableConnectionIndex() -> Dictionary<ConnectionManagerID, PerConnectionState>.Index {
    var index = self._connections.values.startIndex
    var selectedIndex = self._connections.values.endIndex
    var mostAvailableStreams = 0

    while index != self._connections.values.endIndex {
      let availableStreams = self._connections.values[index].availableStreams
      if availableStreams > mostAvailableStreams {
        mostAvailableStreams = availableStreams
        selectedIndex = index
      }
      self._connections.values.formIndex(after: &index)
    }

    return selectedIndex
  }

  /// Reserves a stream from the connection with the most available streams, if one exists.
  ///
  /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from,
  ///   or `nil` if no stream could be reserved.
  @usableFromInline
  internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? {
    let index = self._mostAvailableConnectionIndex()

    if index != self._connections.endIndex {
      // '!' is okay here; the most available connection must have at least one stream available
      // to reserve.
      return self._connections.values[index].reserveStream()!
    } else {
      return nil
    }
  }

  /// See `shutdown()`.
  ///
  /// - Parameter promise: A `promise` to complete when the pool has been shut down.
  @usableFromInline
  internal func _shutdown(promise: EventLoopPromise<Void>) {
    self.eventLoop.assertInEventLoop()

    switch self._state {
    case .active:
      self.logger.debug("shutting down connection pool")

      // We're shutting down now and when that's done we'll be fully shut down.
      self._state = .shuttingDown(promise.futureResult)
      promise.futureResult.whenComplete { _ in
        self._state = .shutdown
        self.logger.trace("finished shutting down connection pool")
      }

      // Shut down all the connections and remove them from the pool.
      let allShutdown: [EventLoopFuture<Void>] = self._connections.values.map {
        return $0.manager.shutdown()
      }
      self._connections.removeAll()

      // Fail the outstanding waiters.
      while let waiter = self.waiters.popFirst() {
        waiter.fail(ConnectionPoolError.shutdown)
      }

      // Cascade the result of the shutdown into the promise.
      EventLoopFuture.andAllSucceed(allShutdown, promise: promise)

    case let .shuttingDown(future):
      // We're already shutting down, cascade the result.
      promise.completeWith(future)

    case .shutdown:
      // Already shutdown, fine.
      promise.succeed(())
    }
  }
}

extension ConnectionPool: ConnectionManagerConnectivityDelegate {
  func connectionStateDidChange(
    _ manager: ConnectionManager,
    from oldState: ConnectivityState,
    to newState: ConnectivityState
  ) {
    switch (oldState, newState) {
    case (.ready, .transientFailure),
         (.ready, .idle),
         (.ready, .shutdown):
      // The connection is no longer available.
      self.connectionUnavailable(manager.id)

    default:
      // We're only interested in connection drops.
      ()
    }
  }

  func connectionIsQuiescing(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    guard let removed = self._connections.removeValue(forKey: manager.id) else {
      return
    }

    // Drop any delegates. We're no longer interested in these events.
    removed.manager.sync.connectivityDelegate = nil
    removed.manager.sync.http2Delegate = nil

    // Replace the connection with a new idle one.
    self.addConnectionToPool()

    // Since we're removing this connection from the pool, the pool manager can ignore any streams
    // reserved against this connection.
    //
    // Note: we don't need to adjust the number of available streams as the number of connections
    // hasn't changed.
    self.streamLender.returnStreams(removed.reservedStreams, to: self)
  }

  /// A connection has become unavailable.
  private func connectionUnavailable(_ id: ConnectionManagerID) {
    self.eventLoop.assertInEventLoop()

    // The connection is no longer available: any streams which haven't been closed will be counted
    // as dropped reservations, so we need to tell the pool manager about them.
    if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 {
      self.streamLender.returnStreams(droppedReservations, to: self)
    }
  }
}

extension ConnectionPool: ConnectionManagerHTTP2Delegate {
  internal func streamClosed(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    // Return the stream to the connection and to the pool manager.
    self._connections[manager.id]?.returnStream()
    self.streamLender.returnStreams(1, to: self)

    // A stream was returned: we may be able to service a waiter now.
    self.tryServiceWaiters()
  }

  internal func receivedSettingsMaxConcurrentStreams(
    _ manager: ConnectionManager,
    maxConcurrentStreams: Int
  ) {
    self.eventLoop.assertInEventLoop()

    let previous = self._connections[manager.id]?.updateMaxConcurrentStreams(maxConcurrentStreams)

    let delta: Int
    if let previousValue = previous {
      // There was a previous value of max concurrent streams, i.e. a change in value for an
      // existing connection.
      delta = maxConcurrentStreams - previousValue
    } else {
      // There was no previous value so this must be a new connection. We'll compare against our
      // assumed default.
      delta = maxConcurrentStreams - self.assumedMaxConcurrentStreams
    }
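
    // Illustrative example: if this is a new connection and the server advertises 128 while our
    // assumed value is 100, `delta` is +28 and the lender gains 28 streams of capacity; if an
    // existing connection later advertises 64 after 128, `delta` is -64 and capacity shrinks.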
    if delta != 0 {
      self.streamLender.changeStreamCapacity(by: delta, for: self)
    }

    // We always check, even if `delta` isn't greater than zero, as this might be a new connection.
    self.tryServiceWaiters()
  }
}

extension ConnectionPool {
  // MARK: - Waiters

  /// Try to service as many waiters as possible.
  ///
  /// This is an expensive operation; in the worst case it will be `O(W ⨉ N)` where `W` is the
  /// number of waiters and `N` is the number of connections.
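  ///
  /// For example (illustrative): with three waiters queued but only two streams available across
  /// all connections, the first two waiters whose deadlines have not passed are succeeded and the
  /// third remains in the queue.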
  private func tryServiceWaiters() {
    if self.waiters.isEmpty { return }

    self.logger.trace("servicing waiters", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    let now = self.now()
    var serviced = 0

    while !self.waiters.isEmpty {
      if self.waiters.first!.deadlineIsAfter(now) {
        if let multiplexer = self._reserveStreamFromMostAvailableConnection() {
          // The waiter's deadline is in the future, and we have a suitable connection. Remove and
          // succeed the waiter.
          let waiter = self.waiters.removeFirst()
          serviced &+= 1
          waiter.succeed(with: multiplexer)
        } else {
          // There are waiters but no available connections; we're done.
          break
        }
      } else {
        // The waiter's deadline has already expired, there's no point completing it. Remove it and
        // let its scheduled timeout fail the promise.
        self.waiters.removeFirst()
      }
    }

    self.logger.trace("done servicing waiters", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
      Metadata.waitersServiced: "\(serviced)",
    ])
  }
}

extension ConnectionPool {
  /// Synchronous operations for the pool, mostly used by tests.
  internal struct Sync {
    private let pool: ConnectionPool

    fileprivate init(_ pool: ConnectionPool) {
      self.pool = pool
    }

    /// The number of outstanding connection waiters.
    internal var waiters: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool.waiters.count
    }

    /// The number of connections currently in the pool (in any state).
    internal var connections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.count
    }

    /// The number of idle connections in the pool.
    internal var idleConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) }
    }

    /// The number of streams currently available to reserve across all connections in the pool.
    internal var availableStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 + $1.availableStreams }
    }

    /// The number of streams which have been reserved across all connections in the pool.
    internal var reservedStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 + $1.reservedStreams }
    }
  }

  internal var sync: Sync {
    return Sync(self)
  }
}

@usableFromInline
internal enum ConnectionPoolError: Error {
  /// The pool is shut down or shutting down.
  case shutdown

  /// There are too many waiters in the pool.
  case tooManyWaiters

  /// The deadline for creating a stream has passed.
  case deadlineExceeded
}
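
// For illustration only (`pool`, `deadline`, `requestLogger`, and `initializer` are assumed to be
// in scope): callers observe these errors through a failed stream future rather than a thrown
// error, and can map them to a `GRPCStatus`:
//
//   pool.makeStream(deadline: deadline, logger: requestLogger, initializer: initializer)
//     .whenFailure { error in
//       if let poolError = error as? ConnectionPoolError {
//         _ = poolError.makeGRPCStatus()  // .unavailable, .resourceExhausted, or .deadlineExceeded
//       }
//     }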

extension ConnectionPoolError: GRPCStatusTransformable {
  @usableFromInline
  internal func makeGRPCStatus() -> GRPCStatus {
    switch self {
    case .shutdown:
      return GRPCStatus(
        code: .unavailable,
        message: "The connection pool is shutdown"
      )

    case .tooManyWaiters:
      return GRPCStatus(
        code: .resourceExhausted,
        message: "The connection pool has no capacity for new RPCs or RPC waiters"
      )

    case .deadlineExceeded:
      return GRPCStatus(
        code: .deadlineExceeded,
        message: "Timed out waiting for an HTTP/2 stream from the connection pool"
      )
    }
  }
}