// ConnectionPool.swift
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOConcurrencyHelpers
  19. import NIOHTTP2
  20. internal final class ConnectionPool {
  21. /// The event loop all connections in this pool are running on.
  22. internal let eventLoop: EventLoop
  /// The lifecycle of the pool: streams can only be made while `active`; a shutdown request moves
  /// the pool to `shuttingDown` and, once that completes, to `shutdown`.
  private enum State {
    /// The pool is running and will try to provide streams.
    case active

    /// A shutdown has been started; the associated future is the result of that shutdown.
    case shuttingDown(EventLoopFuture<Void>)

    /// The shutdown has completed.
    case shutdown
  }
  28. /// The state of the connection pool.
  29. private var state: State = .active
  30. /// Connection managers and their stream availability state keyed by the ID of the connection
  31. /// manager.
  32. ///
  /// Connections are accessed by their ID for connection state changes (infrequent) and when
  /// streams are closed (frequent). However, choosing which connection to succeed a waiter
  /// with (frequent) requires the connections to be ordered by their availability. A dictionary
  36. /// might not be the most efficient data structure (a queue prioritised by stream availability may
  37. /// be a better choice given the number of connections is likely to be very low in practice).
  38. private var connections: [ConnectionManagerID: PerConnectionState]
  39. /// The threshold which if exceeded when creating a stream determines whether the pool will
  40. /// start connecting an idle connection (if one exists).
  41. ///
  42. /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of waiters
  43. /// and the number of reserved streams) and the total number of streams which non-idle connections
  44. /// could support (this includes the streams that a connection in the connecting state could
  45. /// support).
  46. private let reservationLoadThreshold: Double
  47. /// The assumed value for the maximum number of concurrent streams a connection can support. We
  48. /// assume a connection will support this many streams until we know better.
  49. private let assumedMaxConcurrentStreams: Int
  50. /// A queue of waiters which may or may not get a stream in the future.
  51. private var waiters: CircularBuffer<Waiter>
  52. /// The maximum number of waiters allowed, the size of `waiters` must not exceed this value. If
  53. /// there are this many waiters in the queue then the next waiter will be failed immediately.
  54. private let maxWaiters: Int
  55. /// Provides a channel factory to the `ConnectionManager`.
  56. private let channelProvider: ConnectionManagerChannelProvider
  57. /// The object to notify about changes to stream reservations; in practice this is usually
  58. /// the `PoolManager`.
  59. private let streamLender: StreamLender
  60. /// A logger which always sets "GRPC" as its source.
  61. private let logger: GRPCLogger
  62. /// Returns `NIODeadline` representing 'now'. This is useful for testing.
  63. private let now: () -> NIODeadline
  /// Keys for the metadata attached to the pool's log messages.
  private enum Metadata {
    // Pool identity.

    /// Key for the ID of this pool.
    static let id: String = "pool.id"

    // Stream reservations.

    /// Key for the number of stream reservations, i.e. open streams plus waiters.
    static let reservationsCount: String = "pool.reservations.count"

    /// Key for the number of streams the non-idle connections can support right now.
    static let reservationsCapacity: String = "pool.reservations.capacity"

    /// Key for the current reservation load, i.e. reservation count over reservation capacity.
    static let reservationsLoad: String = "pool.reservations.load"

    /// Key for the load threshold above which a new connection is created (if possible).
    static let reservationsLoadThreshold: String = "pool.reservations.loadThreshold"

    // Waiters.

    /// Key for the number of waiters currently queued in the pool.
    static let waitersCount: String = "pool.waiters.count"

    /// Key for the configured maximum number of waiters.
    static let waitersMax: String = "pool.waiters.max"

    /// Key for the number of waiters which were successfully serviced.
    static let waitersServiced: String = "pool.waiters.serviced"

    /// Key for the ID of a single waiter.
    static let waiterID: String = "pool.waiter.id"
  }
  /// Creates a pool which holds no connections and has no waiters.
  ///
  /// - Parameters:
  ///   - eventLoop: The event loop the pool and its connections run on.
  ///   - maxWaiters: The maximum number of waiters allowed to queue for a stream.
  ///   - reservationLoadThreshold: The load at or above which the pool starts connecting an idle
  ///       connection. Must be within `0.0 ... 1.0`.
  ///   - assumedMaxConcurrentStreams: The number of concurrent streams a connection is assumed
  ///       to support until a real value is known.
  ///   - channelProvider: The channel factory handed to each `ConnectionManager`.
  ///   - streamLender: The object to notify about changes to stream reservations.
  ///   - logger: The logger used by the pool.
  ///   - now: A closure returning the current time; defaults to `NIODeadline.now`.
  /// - Precondition: `reservationLoadThreshold` is within `0.0 ... 1.0`.
  init(
    eventLoop: EventLoop,
    maxWaiters: Int,
    reservationLoadThreshold: Double,
    assumedMaxConcurrentStreams: Int,
    channelProvider: ConnectionManagerChannelProvider,
    streamLender: StreamLender,
    logger: GRPCLogger,
    now: @escaping () -> NIODeadline = NIODeadline.now
  ) {
    precondition(
      reservationLoadThreshold >= 0.0 && reservationLoadThreshold <= 1.0,
      "reservationLoadThreshold must be within the range 0.0 ... 1.0"
    )

    // Collaborators.
    self.eventLoop = eventLoop
    self.channelProvider = channelProvider
    self.streamLender = streamLender
    self.logger = logger
    self.now = now

    // Configuration.
    self.maxWaiters = maxWaiters
    self.reservationLoadThreshold = reservationLoadThreshold
    self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams

    // Mutable state: start with no connections and an empty waiter queue.
    self.connections = [:]
    self.waiters = CircularBuffer(initialCapacity: 16)
  }
  /// Populates the pool with connections.
  ///
  /// The pool is expected to be empty when this is called (checked with an assertion).
  ///
  /// - Parameter connections: The number of connections the pool should hold afterwards.
  internal func initialize(connections: Int) {
    assert(self.connections.isEmpty)

    let target = connections
    self.connections.reserveCapacity(target)

    // Keep adding connections until the pool holds `target` of them.
    while target > self.connections.count {
      self.addConnectionToPool()
    }
  }
  /// Creates a `ConnectionManager` which reports connectivity and HTTP/2 events back to this pool
  /// and stores its state in `connections`, keyed by the manager's ID.
  private func addConnectionToPool() {
    let state = PerConnectionState(
      manager: ConnectionManager(
        eventLoop: self.eventLoop,
        channelProvider: self.channelProvider,
        callStartBehavior: .waitsForConnectivity,
        connectionBackoff: ConnectionBackoff(),
        connectivityDelegate: self,
        http2Delegate: self,
        logger: self.logger.unwrapped
      )
    )
    self.connections[state.manager.id] = state
  }
  133. // MARK: - Called from the pool manager
  /// Requests an HTTP/2 stream `Channel` from the pool.
  ///
  /// May be called from any thread: the work is done immediately when already on the pool's
  /// event loop and is otherwise submitted to it.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the `promise` must have been resolved.
  ///   - promise: A promise for a `Channel`.
  ///   - logger: A request logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  internal func makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    guard self.eventLoop.inEventLoop else {
      // Not on the pool's event loop: hop over to it.
      self.eventLoop.execute {
        self._makeStream(
          deadline: deadline,
          promise: promise,
          logger: logger,
          initializer: initializer
        )
      }
      return
    }

    self._makeStream(
      deadline: deadline,
      promise: promise,
      logger: logger,
      initializer: initializer
    )
  }
  /// Requests an HTTP/2 stream `Channel` from the pool, returning a future for it.
  ///
  /// This creates a promise on the pool's event loop and forwards to
  /// `makeStream(deadline:promise:logger:initializer:)`.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the returned future must have been resolved.
  ///   - logger: A request logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  /// - Returns: A future for the stream `Channel`.
  internal func makeStream(
    deadline: NIODeadline,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) -> EventLoopFuture<Channel> {
    let streamPromise = self.eventLoop.makePromise(of: Channel.self)
    self.makeStream(
      deadline: deadline,
      promise: streamPromise,
      logger: logger,
      initializer: initializer
    )
    return streamPromise.futureResult
  }
  /// Shuts down the connection pool.
  ///
  /// Waiters still in the queue are failed and every connection in the pool is shut down. Calls
  /// to `makeStream` made after the shutdown has started are failed immediately.
  ///
  /// - Returns: A future completed with the result of the shutdown.
  internal func shutdown() -> EventLoopFuture<Void> {
    let shutdownPromise = self.eventLoop.makePromise(of: Void.self)

    guard self.eventLoop.inEventLoop else {
      // Not on the pool's event loop: hop over to it.
      self.eventLoop.execute {
        self._shutdown(promise: shutdownPromise)
      }
      return shutdownPromise.futureResult
    }

    self._shutdown(promise: shutdownPromise)
    return shutdownPromise.futureResult
  }
  /// The event-loop-bound implementation of `makeStream(deadline:promise:logger:initializer:)`.
  ///
  /// The promise is failed with `ConnectionPoolError.shutdown` unless the pool is active.
  /// Otherwise a stream is made right away if possible, or a waiter is enqueued for one.
  ///
  /// - Important: Must be called on the pool's `EventLoop`.
  private func _makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    self.eventLoop.assertInEventLoop()

    switch self.state {
    case .active:
      break
    case .shuttingDown, .shutdown:
      // No new streams once a shutdown has been started.
      promise.fail(ConnectionPoolError.shutdown)
      return
    }

    // Prefer an existing connection with a spare stream.
    if self.tryMakeStream(promise: promise, initializer: initializer) {
      return
    }

    // Nothing available right now: join the queue.
    self.enqueueWaiter(
      deadline: deadline,
      promise: promise,
      logger: logger,
      initializer: initializer
    )
  }
  /// Attempts to make a stream straight away on a connection which has one available.
  ///
  /// No attempt is made while waiters are queued, so new requests cannot overtake them.
  ///
  /// - Parameters:
  ///   - promise: A promise to succeed if we can make a stream.
  ///   - initializer: A closure to initialize the stream with.
  /// - Returns: `true` if a stream was reserved and its creation started, `false` otherwise.
  private func tryMakeStream(
    promise: EventLoopPromise<Channel>,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) -> Bool {
    // Either someone is already waiting (don't jump the queue) or no connection has a stream to
    // spare. The reservation is only attempted when the queue is empty.
    guard self.waiters.isEmpty,
          let multiplexer = self.reserveStreamFromMostAvailableConnection() else {
      return false
    }

    multiplexer.createStreamChannel(promise: promise, initializer)

    // The extra reservation may have pushed the load over the threshold.
    if self.shouldBringUpAnotherConnection() {
      self.startConnectingIdleConnection()
    }

    return true
  }
  /// Queues a waiter which will be given a stream once one becomes available.
  ///
  /// The promise is failed immediately with `ConnectionPoolError.tooManyWaiters` if the queue
  /// already holds `maxWaiters` waiters. A queued waiter is failed with
  /// `ConnectionPoolError.deadlineExceeded` when its timeout fires.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the promise should have been completed.
  ///   - promise: The promise to complete with the `Channel`.
  ///   - logger: A logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  private func enqueueWaiter(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    // The queue is full: reject rather than overwhelm the pool.
    if self.waiters.count >= self.maxWaiters {
      logger.trace("connection pool has too many waiters", metadata: [
        Metadata.waitersMax: "\(self.maxWaiters)",
      ])
      promise.fail(ConnectionPoolError.tooManyWaiters)
      return
    }

    let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)

    // On timeout, fail the waiter and drop it from the queue. Scheduling this before the waiter
    // is appended is fine: the callback won't run before the next event loop tick, even when the
    // deadline has already passed.
    waiter.scheduleTimeout(on: self.eventLoop) {
      waiter.fail(ConnectionPoolError.deadlineExceeded)

      guard let position = self.waiters.firstIndex(where: { $0.id == waiter.id }) else {
        return
      }

      self.waiters.remove(at: position)
      logger.trace("timed out waiting for a connection", metadata: [
        Metadata.waiterID: "\(waiter.id)",
        Metadata.waitersCount: "\(self.waiters.count)",
      ])
    }

    // Logged on the request logger, before the waiter joins the queue.
    logger.debug("waiting for a connection to become available", metadata: [
      Metadata.waiterID: "\(waiter.id)",
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    self.waiters.append(waiter)

    // Logged on the pool logger, after the waiter has joined the queue.
    self.logger.trace("enqueued connection waiter", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    // The new waiter adds to the demand; bring up another connection if that tips the load.
    if self.shouldBringUpAnotherConnection() {
      self.startConnectingIdleConnection()
    }
  }
  /// Works out how many streams are wanted and how many could be provided.
  ///
  /// Demand is the number of reserved streams plus the number of waiters. Capacity is summed over
  /// the non-idle connections: the known number of streams for a connection where there is one,
  /// or `assumedMaxConcurrentStreams` where there isn't.
  ///
  /// - Returns: A tuple of the demand and capacity for streams.
  private func computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) {
    let view = self.sync
    let demand = view.reservedStreams + view.waiters

    // TODO: make this cheaper by storing and incrementally updating the number of idle connections
    var capacity = 0
    for connection in self.connections.values {
      // Idle connections contribute no capacity.
      if connection.manager.sync.isIdle {
        continue
      }

      // A known value means the connection is active; without one the connection must be
      // connecting, so fall back to the assumed value.
      capacity += connection.maxAvailableStreams ?? self.assumedMaxConcurrentStreams
    }

    return (demand, capacity)
  }
  /// Decides whether the current load warrants connecting an idle connection (if one exists).
  ///
  /// - Returns: `true` if demand divided by capacity is at least `reservationLoadThreshold`.
  private func shouldBringUpAnotherConnection() -> Bool {
    let streams = self.computeStreamDemandAndCapacity()

    // A capacity of zero (every connection idle, or no connections) makes the load infinite when
    // there is any demand. That's fine: infinity is above the threshold, so a connection is
    // brought up.
    let load = Double(streams.demand) / Double(streams.capacity)

    guard load >= self.reservationLoadThreshold else {
      return false
    }

    self.logger.debug(
      "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available",
      metadata: [
        Metadata.reservationsCount: "\(streams.demand)",
        Metadata.reservationsCapacity: "\(streams.capacity)",
        Metadata.reservationsLoad: "\(load)",
        Metadata.reservationsLoadThreshold: "\(self.reservationLoadThreshold)",
      ]
    )
    return true
  }
  /// Asks the first idle connection found in the pool to start connecting; does nothing when no
  /// connection is idle.
  private func startConnectingIdleConnection() {
    let idleIndex = self.connections.values.firstIndex { state in
      state.manager.sync.isIdle
    }

    guard let index = idleIndex else {
      return
    }

    self.connections.values[index].manager.sync.startConnecting()
  }
  /// Finds the connection with the most available streams.
  ///
  /// When several connections are tied, the first one encountered wins.
  ///
  /// - Note: this is linear in the number of connections.
  /// - Returns: The index in `self.connections.values` of the selected connection, or
  ///   `self.connections.values.endIndex` if no connection has at least one stream available.
  private func mostAvailableConnectionIndex(
  ) -> Dictionary<ConnectionManagerID, PerConnectionState>.Index {
    let states = self.connections.values

    // Start from "nothing selected"; a candidate must offer strictly more than the best so far,
    // which also rules out connections with no streams available.
    var best = (index: states.endIndex, streams: 0)

    for candidate in states.indices {
      let streams = states[candidate].availableStreams
      if streams > best.streams {
        best = (index: candidate, streams: streams)
      }
    }

    return best.index
  }
  /// Reserves a stream on whichever connection currently has the most streams available.
  ///
  /// - Returns: The `HTTP2StreamMultiplexer` of the connection the stream was reserved from, or
  ///   `nil` if no connection has a stream available.
  private func reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? {
    let candidate = self.mostAvailableConnectionIndex()

    guard candidate != self.connections.endIndex else {
      // No connection has a stream to spare.
      return nil
    }

    // '!' is okay here; the selected connection has at least one stream available to reserve.
    return self.connections.values[candidate].reserveStream()!
  }
  /// The event-loop-bound implementation of `shutdown()`.
  ///
  /// An active pool shuts down its connections, removes them and fails all waiters. A pool which
  /// is already shutting down cascades the result of that shutdown, and a pool which has
  /// finished shutting down succeeds the promise immediately.
  ///
  /// - Parameter promise: A `promise` to complete when the pool has been shutdown.
  private func _shutdown(promise: EventLoopPromise<Void>) {
    self.eventLoop.assertInEventLoop()

    switch self.state {
    case .shutdown:
      // Nothing left to do.
      promise.succeed(())

    case .shuttingDown(let inProgress):
      // Tie this request to the shutdown which is already running.
      promise.completeWith(inProgress)

    case .active:
      self.logger.debug("shutting down connection pool")

      // Enter the shutting down state; completing the promise moves us on to fully shutdown.
      let shutdownFuture = promise.futureResult
      self.state = .shuttingDown(shutdownFuture)
      shutdownFuture.whenComplete { _ in
        self.state = .shutdown
        self.logger.trace("finished shutting down connection pool")
      }

      // Start shutting down every connection, then drop them all from the pool.
      var managerShutdowns: [EventLoopFuture<Void>] = []
      managerShutdowns.reserveCapacity(self.connections.count)
      for connection in self.connections.values {
        managerShutdowns.append(connection.manager.shutdown())
      }
      self.connections.removeAll()

      // Nobody still in the queue will get a stream now.
      while !self.waiters.isEmpty {
        self.waiters.removeFirst().fail(ConnectionPoolError.shutdown)
      }

      // The pool's shutdown completes with the combined result of the connection shutdowns.
      EventLoopFuture.andAllSucceed(managerShutdowns, promise: promise)
    }
  }
  408. }
  409. extension ConnectionPool: ConnectionManagerConnectivityDelegate {
  /// Reacts to a connectivity state change of one of the pool's connections.
  ///
  /// Only a connection leaving `.ready` for `.transientFailure`, `.idle` or `.shutdown` is acted
  /// on: that connection is marked unavailable. Every other transition is ignored.
  func connectionStateDidChange(
    _ manager: ConnectionManager,
    from oldState: ConnectivityState,
    to newState: ConnectivityState
  ) {
    // We're only interested in connection drops, and only a ready connection can drop.
    guard case .ready = oldState else {
      return
    }

    switch newState {
    case .transientFailure, .idle, .shutdown:
      // The connection is no longer available.
      self.connectionUnavailable(manager.id)
    default:
      break
    }
  }
  /// Swaps a quiescing connection for a new one.
  ///
  /// The connection is removed from the pool and detached from it, a new connection is added in
  /// its place, and the streams reserved against the removed connection are returned to the
  /// stream lender. Does nothing if the manager is not in the pool.
  func connectionIsQuiescing(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    guard let quiescing = self.connections.removeValue(forKey: manager.id) else {
      return
    }

    // Events from the removed connection no longer matter to the pool.
    quiescing.manager.sync.connectivityDelegate = nil
    quiescing.manager.sync.http2Delegate = nil

    // Add a replacement so the number of connections in the pool is unchanged.
    self.addConnectionToPool()

    // The connection has left the pool, so the pool manager can ignore the streams reserved
    // against it.
    //
    // Note: the number of available streams needs no adjustment because the number of
    // connections hasn't changed.
    self.streamLender.returnStreams(quiescing.reservedStreams, to: self)
  }
  /// Marks a connection as unavailable and returns its dropped reservations to the stream lender.
  ///
  /// - Parameter id: The ID of the connection manager whose connection became unavailable.
  private func connectionUnavailable(_ id: ConnectionManagerID) {
    self.eventLoop.assertInEventLoop()

    // Streams which hadn't been closed when the connection dropped count as dropped
    // reservations. There is nothing to report for an unknown connection or when none dropped.
    guard let dropped = self.connections[id]?.unavailable(), dropped > 0 else {
      return
    }

    // Tell the pool manager about them.
    self.streamLender.returnStreams(dropped, to: self)
  }
  452. }
  453. extension ConnectionPool: ConnectionManagerHTTP2Delegate {
  /// Handles a stream closing on one of the pool's connections.
  ///
  /// The stream is returned to its connection (if the pool still holds it) and to the stream
  /// lender, after which the pool tries to service any waiters.
  internal func streamClosed(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    // Hand the stream back to the connection it was reserved from...
    self.connections[manager.id]?.returnStream()
    // ...and to the pool manager.
    self.streamLender.returnStreams(1, to: self)

    // The freed stream may be enough to service a waiter.
    self.tryServiceWaiters()
  }
  /// Handles a new value of max concurrent streams for one of the pool's connections.
  ///
  /// The connection's state is updated and the stream lender is told how the pool's capacity
  /// changed. The change is measured against the value `updateMaxConcurrentStreams` reports for
  /// the connection or, when it reports none, against `assumedMaxConcurrentStreams`. Waiters are
  /// then serviced if possible.
  ///
  /// Settings for a manager the pool does not hold are ignored. `_shutdown` removes connections
  /// without detaching their delegates, so such a manager can still call in. Treating it as a new
  /// connection would shift the lender's capacity for a connection which isn't in the pool.
  ///
  /// - Parameters:
  ///   - manager: The connection manager the settings were received on.
  ///   - maxConcurrentStreams: The new maximum number of concurrent streams.
  internal func receivedSettingsMaxConcurrentStreams(
    _ manager: ConnectionManager,
    maxConcurrentStreams: Int
  ) {
    self.eventLoop.assertInEventLoop()

    guard let index = self.connections.index(forKey: manager.id) else {
      // Not one of ours (any more): there is no capacity to adjust.
      return
    }

    let previous: Int? = self.connections.values[index]
      .updateMaxConcurrentStreams(maxConcurrentStreams)

    // With no previous value this must be a new connection, whose capacity has been accounted
    // for using the assumed default so far.
    let baseline = previous ?? self.assumedMaxConcurrentStreams
    let delta = maxConcurrentStreams - baseline

    if delta != 0 {
      self.streamLender.changeStreamCapacity(by: delta, for: self)
    }

    // We always check, even if `delta` isn't greater than zero as this might be a new connection.
    self.tryServiceWaiters()
  }
  484. }
  485. extension ConnectionPool {
  486. // MARK: - Waiters
  /// Services waiters from the front of the queue for as long as streams can be reserved.
  ///
  /// This is an expensive operation, in the worst case it will be `O(W ⨉ N)` where `W` is the
  /// number of waiters and `N` is the number of connections.
  private func tryServiceWaiters() {
    guard !self.waiters.isEmpty else {
      return
    }

    self.logger.trace("servicing waiters", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    let currentTime = self.now()
    var serviced = 0

    while let head = self.waiters.first {
      guard head.deadlineIsAfter(currentTime) else {
        // The deadline has already expired so there's no point giving this waiter a stream.
        // Drop it from the queue; its scheduled timeout is left to fail the promise.
        self.waiters.removeFirst()
        continue
      }

      guard let multiplexer = self.reserveStreamFromMostAvailableConnection() else {
        // Waiters remain but no connection has a stream available: we're done.
        break
      }

      // The deadline is in the future and a stream has been reserved: remove and succeed the
      // waiter.
      let waiter = self.waiters.removeFirst()
      serviced &+= 1
      waiter.succeed(with: multiplexer)
    }

    self.logger.trace("done servicing waiters", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
      Metadata.waitersServiced: "\(serviced)",
    ])
  }
  521. }
  522. extension ConnectionPool {
  /// A view of the pool offering synchronous accessors, mostly used by tests.
  ///
  /// Every accessor asserts that it is called on the pool's event loop.
  internal struct Sync {
    private let pool: ConnectionPool

    fileprivate init(_ pool: ConnectionPool) {
      self.pool = pool
    }

    /// The number of waiters currently queued for a stream.
    internal var waiters: Int {
      self.pool.eventLoop.assertInEventLoop()
      let queued = self.pool.waiters
      return queued.count
    }

    /// The number of connections held by the pool, whatever their state.
    internal var connections: Int {
      self.pool.eventLoop.assertInEventLoop()
      let held = self.pool.connections
      return held.count
    }

    /// The number of connections in the pool which are idle.
    internal var idleConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool.connections.values.filter { $0.manager.sync.isIdle }.count
    }

    /// The total number of streams available to reserve, summed over every connection.
    internal var availableStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      var total = 0
      for connection in self.pool.connections.values {
        total += connection.availableStreams
      }
      return total
    }

    /// The total number of streams which have been reserved, summed over every connection.
    internal var reservedStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      var total = 0
      for connection in self.pool.connections.values {
        total += connection.reservedStreams
      }
      return total
    }
  }
  /// A `Sync` view of this pool.
  internal var sync: Sync {
    let view = Sync(self)
    return view
  }
  558. }
  /// The errors a `ConnectionPool` fails stream requests with.
  internal enum ConnectionPoolError: Error {
    /// The request was made to, or was still waiting on, a pool which is shutting down or has
    /// already shut down.
    case shutdown

    /// The pool already had its maximum number of waiters queued.
    case tooManyWaiters

    /// No stream became available before the request's deadline.
    case deadlineExceeded
  }