ConnectionPool.swift

/*
 * Copyright 2021, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP2

@usableFromInline
internal final class ConnectionPool {
  /// The event loop all connections in this pool are running on.
  @usableFromInline
  internal let eventLoop: EventLoop

  @usableFromInline
  internal enum State {
    case active
    case shuttingDown(EventLoopFuture<Void>)
    case shutdown
  }

  /// The state of the connection pool.
  @usableFromInline
  internal var _state: State = .active

  /// The most recent connection error we have observed.
  ///
  /// This error is used to provide additional context to failed waiters. A waiter may, for example,
  /// time out because the pool is busy, or because no connection can be established because of an
  /// underlying connection error. In the latter case it's useful for the caller to know why the
  /// connection is failing at the RPC layer.
  ///
  /// This value is cleared when a connection becomes 'available', that is, when we receive an
  /// HTTP/2 SETTINGS frame.
  ///
  /// This value is set whenever an underlying connection transitions to the transient failure state
  /// or to the idle state and has an associated error.
  @usableFromInline
  internal var _mostRecentError: Error? = nil

  /// Connection managers and their stream availability state keyed by the ID of the connection
  /// manager.
  ///
  /// Connections are accessed by their ID for connection state changes (infrequent) and when
  /// streams are closed (frequent). However, choosing which connection to succeed a waiter
  /// with (frequent) requires the connections to be ordered by their availability. A dictionary
  /// might not be the most efficient data structure (a queue prioritised by stream availability may
  /// be a better choice given the number of connections is likely to be very low in practice).
  @usableFromInline
  internal var _connections: [ConnectionManagerID: PerConnectionState]

  /// The threshold which, if exceeded when creating a stream, determines whether the pool will
  /// start connecting an idle connection (if one exists).
  ///
  /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of waiters
  /// and the number of reserved streams) to the total number of streams which non-idle connections
  /// could support (this includes the streams that a connection in the connecting state could
  /// support).
  @usableFromInline
  internal let reservationLoadThreshold: Double

  /// The assumed value for the maximum number of concurrent streams a connection can support. We
  /// assume a connection will support this many streams until we know better.
  @usableFromInline
  internal let assumedMaxConcurrentStreams: Int

  /// A queue of waiters which may or may not get a stream in the future.
  @usableFromInline
  internal var waiters: CircularBuffer<Waiter>

  /// The maximum number of waiters allowed; the size of `waiters` must not exceed this value. If
  /// there are this many waiters in the queue then the next waiter will be failed immediately.
  @usableFromInline
  internal let maxWaiters: Int

  /// Configuration for backoff between subsequent connection attempts.
  @usableFromInline
  internal let connectionBackoff: ConnectionBackoff

  /// Provides a channel factory to the `ConnectionManager`.
  @usableFromInline
  internal let channelProvider: ConnectionManagerChannelProvider

  /// The object to notify about changes to stream reservations; in practice this is usually
  /// the `PoolManager`.
  @usableFromInline
  internal let streamLender: StreamLender

  /// A logger which always sets "GRPC" as its source.
  @usableFromInline
  internal let logger: GRPCLogger

  /// Returns a `NIODeadline` representing 'now'. This is useful for testing.
  @usableFromInline
  internal let now: () -> NIODeadline

  /// Logging metadata keys.
  @usableFromInline
  internal enum Metadata {
    /// The ID of this pool.
    @usableFromInline
    static let id = "pool.id"

    /// The number of stream reservations (i.e. number of open streams + number of waiters).
    @usableFromInline
    static let reservationsCount = "pool.reservations.count"

    /// The number of streams this pool can support with non-idle connections at this time.
    @usableFromInline
    static let reservationsCapacity = "pool.reservations.capacity"

    /// The current reservation load (i.e. reservation count / reservation capacity).
    @usableFromInline
    static let reservationsLoad = "pool.reservations.load"

    /// The reservation load threshold, above which a new connection will be created (if possible).
    @usableFromInline
    static let reservationsLoadThreshold = "pool.reservations.loadThreshold"

    /// The current number of waiters in the pool.
    @usableFromInline
    static let waitersCount = "pool.waiters.count"

    /// The maximum number of waiters the pool is configured to allow.
    @usableFromInline
    static let waitersMax = "pool.waiters.max"

    /// The number of waiters which were successfully serviced.
    @usableFromInline
    static let waitersServiced = "pool.waiters.serviced"

    /// The ID of the waiter.
    @usableFromInline
    static let waiterID = "pool.waiter.id"
  }

  @usableFromInline
  init(
    eventLoop: EventLoop,
    maxWaiters: Int,
    reservationLoadThreshold: Double,
    assumedMaxConcurrentStreams: Int,
    connectionBackoff: ConnectionBackoff,
    channelProvider: ConnectionManagerChannelProvider,
    streamLender: StreamLender,
    logger: GRPCLogger,
    now: @escaping () -> NIODeadline = NIODeadline.now
  ) {
    precondition(
      (0.0 ... 1.0).contains(reservationLoadThreshold),
      "reservationLoadThreshold must be within the range 0.0 ... 1.0"
    )
    self.reservationLoadThreshold = reservationLoadThreshold
    self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
    self._connections = [:]
    self.maxWaiters = maxWaiters
    self.waiters = CircularBuffer(initialCapacity: 16)
    self.eventLoop = eventLoop
    self.connectionBackoff = connectionBackoff
    self.channelProvider = channelProvider
    self.streamLender = streamLender
    self.logger = logger
    self.now = now
  }

  /// Initialize the connection pool.
  ///
  /// - Parameter connections: The number of connections to add to the pool.
  internal func initialize(connections: Int) {
    assert(self._connections.isEmpty)
    self._connections.reserveCapacity(connections)
    while self._connections.count < connections {
      self.addConnectionToPool()
    }
  }
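
  // A minimal usage sketch (illustrative only; the pool instance and the connection count are
  // assumed to come from the pool manager, which also ensures this runs on the pool's event loop):
  //
  //   let pool: ConnectionPool = ...  // constructed with the initializer above
  //   pool.initialize(connections: 4)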

  /// Make and add a new connection to the pool.
  private func addConnectionToPool() {
    let manager = ConnectionManager(
      eventLoop: self.eventLoop,
      channelProvider: self.channelProvider,
      callStartBehavior: .waitsForConnectivity,
      connectionBackoff: self.connectionBackoff,
      connectivityDelegate: self,
      http2Delegate: self,
      logger: self.logger.unwrapped
    )
    self._connections[manager.id] = PerConnectionState(manager: manager)
  }

  // MARK: - Called from the pool manager

  /// Make and initialize an HTTP/2 stream `Channel`.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the `promise` must have been resolved.
  ///   - promise: A promise for a `Channel`.
  ///   - logger: A request logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  @inlinable
  internal func makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    if self.eventLoop.inEventLoop {
      self._makeStream(
        deadline: deadline,
        promise: promise,
        logger: logger,
        initializer: initializer
      )
    } else {
      self.eventLoop.execute {
        self._makeStream(
          deadline: deadline,
          promise: promise,
          logger: logger,
          initializer: initializer
        )
      }
    }
  }

  /// See `makeStream(deadline:promise:logger:initializer:)`.
  @inlinable
  internal func makeStream(
    deadline: NIODeadline,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) -> EventLoopFuture<Channel> {
    let promise = self.eventLoop.makePromise(of: Channel.self)
    self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer)
    return promise.futureResult
  }
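
  // A minimal usage sketch (illustrative only; `pool` and `requestLogger` are assumed to exist,
  // with `requestLogger` being a `GRPCLogger`). The initializer configures the stream `Channel`
  // before the returned future is completed with it:
  //
  //   let stream: EventLoopFuture<Channel> = pool.makeStream(
  //     deadline: .now() + .seconds(20),
  //     logger: requestLogger
  //   ) { channel in
  //     // Add the handlers required for the RPC here.
  //     channel.eventLoop.makeSucceededFuture(())
  //   }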

  /// Shutdown the connection pool.
  ///
  /// Existing waiters will be failed and all underlying connections will be shut down. Subsequent
  /// calls to `makeStream` will be failed immediately.
  ///
  /// - Parameter mode: The mode to use when shutting down.
  /// - Returns: A future indicating when shutdown has been completed.
  internal func shutdown(mode: ConnectionManager.ShutdownMode) -> EventLoopFuture<Void> {
    let promise = self.eventLoop.makePromise(of: Void.self)

    if self.eventLoop.inEventLoop {
      self._shutdown(mode: mode, promise: promise)
    } else {
      self.eventLoop.execute {
        self._shutdown(mode: mode, promise: promise)
      }
    }

    return promise.futureResult
  }
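
  // A minimal shutdown sketch (illustrative only; `pool` is assumed to exist and `mode` is
  // whichever `ConnectionManager.ShutdownMode` suits the caller):
  //
  //   pool.shutdown(mode: mode).whenComplete { _ in
  //     // All waiters have been failed and the underlying connections are shut down;
  //     // subsequent calls to `makeStream` fail immediately.
  //   }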

  /// See `makeStream(deadline:promise:logger:initializer:)`.
  ///
  /// - Important: Must be called on the pool's `EventLoop`.
  @inlinable
  internal func _makeStream(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    self.eventLoop.assertInEventLoop()

    guard case .active = self._state else {
      // Fail the promise right away if we're shutting down or already shut down.
      promise.fail(ConnectionPoolError.shutdown)
      return
    }

    // Try to make a stream on an existing connection.
    let streamCreated = self._tryMakeStream(promise: promise, initializer: initializer)

    if !streamCreated {
      // No stream was created, wait for one.
      self._enqueueWaiter(
        deadline: deadline,
        promise: promise,
        logger: logger,
        initializer: initializer
      )
    }
  }

  /// Try to find an existing connection on which we can make a stream.
  ///
  /// - Parameters:
  ///   - promise: A promise to succeed if we can make a stream.
  ///   - initializer: A closure to initialize the stream with.
  /// - Returns: A boolean value indicating whether the stream was created or not.
  @inlinable
  internal func _tryMakeStream(
    promise: EventLoopPromise<Channel>,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) -> Bool {
    // We shouldn't jump the queue.
    guard self.waiters.isEmpty else {
      return false
    }

    // Reserve a stream, if we can.
    guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
      return false
    }

    multiplexer.createStreamChannel(promise: promise, initializer)

    // Has reserving another stream tipped us over the limit for needing another connection?
    if self._shouldBringUpAnotherConnection() {
      self._startConnectingIdleConnection()
    }

    return true
  }

  /// Enqueue a waiter to be provided with a stream at some point in the future.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the promise should have been completed.
  ///   - promise: The promise to complete with the `Channel`.
  ///   - logger: A logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  @inlinable
  internal func _enqueueWaiter(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: GRPCLogger,
    initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) {
    // Don't overwhelm the pool with too many waiters.
    guard self.waiters.count < self.maxWaiters else {
      logger.trace("connection pool has too many waiters", metadata: [
        Metadata.waitersMax: "\(self.maxWaiters)",
      ])
      promise.fail(ConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
      return
    }

    let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)

    // Fail the waiter and punt it from the queue when it times out. It's okay that we schedule the
    // timeout before appending it to the waiters; it won't run until the next event loop tick at
    // the earliest (even if the deadline has already passed).
    waiter.scheduleTimeout(on: self.eventLoop) {
      waiter.fail(ConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))

      if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
        self.waiters.remove(at: index)
        logger.trace("timed out waiting for a connection", metadata: [
          Metadata.waiterID: "\(waiter.id)",
          Metadata.waitersCount: "\(self.waiters.count)",
        ])
      }
    }

    // request logger
    logger.debug("waiting for a connection to become available", metadata: [
      Metadata.waiterID: "\(waiter.id)",
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    self.waiters.append(waiter)

    // pool logger
    self.logger.trace("enqueued connection waiter", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    if self._shouldBringUpAnotherConnection() {
      self._startConnectingIdleConnection()
    }
  }

  /// Compute the current demand and capacity for streams.
  ///
  /// The 'demand' for streams is the number of reserved streams and the number of waiters. The
  /// capacity for streams is the product of max concurrent streams and the number of non-idle
  /// connections.
  ///
  /// - Returns: A tuple of the demand and capacity for streams.
  @usableFromInline
  internal func _computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) {
    let demand = self.sync.reservedStreams + self.sync.waiters

    // TODO: make this cheaper by storing and incrementally updating the number of idle connections
    let capacity = self._connections.values.reduce(0) { sum, state in
      if state.manager.sync.isIdle {
        // Idle connection, no capacity.
        return sum
      } else if let knownMaxAvailableStreams = state.maxAvailableStreams {
        // A known value of max concurrent streams, i.e. the connection is active.
        return sum + knownMaxAvailableStreams
      } else {
        // Not idle and no known value; the connection must be connecting so use our assumed value.
        return sum + self.assumedMaxConcurrentStreams
      }
    }

    return (demand, capacity)
  }

  /// Returns whether the pool should start connecting an idle connection (if one exists).
  @usableFromInline
  internal func _shouldBringUpAnotherConnection() -> Bool {
    let (demand, capacity) = self._computeStreamDemandAndCapacity()

    // Infinite -- i.e. all connections are idle or no connections exist -- is okay here as it
    // will always be greater than the threshold and a new connection will be spun up.
    let load = Double(demand) / Double(capacity)
    let loadExceedsThreshold = load >= self.reservationLoadThreshold

    if loadExceedsThreshold {
      self.logger.debug(
        "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available",
        metadata: [
          Metadata.reservationsCount: "\(demand)",
          Metadata.reservationsCapacity: "\(capacity)",
          Metadata.reservationsLoad: "\(load)",
          Metadata.reservationsLoadThreshold: "\(self.reservationLoadThreshold)",
        ]
      )
    }

    return loadExceedsThreshold
  }
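
  // A worked example of the load calculation above (numbers are illustrative): with two ready
  // connections each advertising 100 concurrent streams and one connecting connection assumed
  // to support 100, the capacity is 300. If 260 streams are reserved and 25 waiters are queued,
  // the demand is 285, so the load is 285 / 300 = 0.95. Against a threshold of 0.9 this triggers
  // connecting an idle connection, if one exists.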

  /// Starts connecting an idle connection, if one exists.
  @usableFromInline
  internal func _startConnectingIdleConnection() {
    if let index = self._connections.values.firstIndex(where: { $0.manager.sync.isIdle }) {
      self._connections.values[index].manager.sync.startConnecting()
    }
  }

  /// Returns the index in `self._connections.values` of the connection with the most available
  /// streams. Returns `self._connections.endIndex` if no connection has at least one stream
  /// available.
  ///
  /// - Note: this is linear in the number of connections.
  @usableFromInline
  internal func _mostAvailableConnectionIndex(
  ) -> Dictionary<ConnectionManagerID, PerConnectionState>.Index {
    var index = self._connections.values.startIndex
    var selectedIndex = self._connections.values.endIndex
    var mostAvailableStreams = 0

    while index != self._connections.values.endIndex {
      let availableStreams = self._connections.values[index].availableStreams
      if availableStreams > mostAvailableStreams {
        mostAvailableStreams = availableStreams
        selectedIndex = index
      }

      self._connections.values.formIndex(after: &index)
    }

    return selectedIndex
  }
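
  // For example (illustrative values): if the connections currently have 0, 3 and 1 available
  // streams, the index of the connection with 3 available streams is selected; if every
  // connection has 0 available streams, `endIndex` is returned and no stream can be reserved.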

  /// Reserves a stream from the connection with the most available streams, if one exists.
  ///
  /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from,
  ///   or `nil` if no stream could be reserved.
  @usableFromInline
  internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? {
    let index = self._mostAvailableConnectionIndex()

    if index != self._connections.endIndex {
      // '!' is okay here; the most available connection must have at least one stream available
      // to reserve.
      return self._connections.values[index].reserveStream()!
    } else {
      return nil
    }
  }

  /// See `shutdown(mode:)`.
  ///
  /// - Parameter promise: A promise to complete when the pool has been shut down.
  @usableFromInline
  internal func _shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
    self.eventLoop.assertInEventLoop()

    switch self._state {
    case .active:
      self.logger.debug("shutting down connection pool")

      // We're shutting down now and when that's done we'll be fully shutdown.
      self._state = .shuttingDown(promise.futureResult)
      promise.futureResult.whenComplete { _ in
        self._state = .shutdown
        self.logger.trace("finished shutting down connection pool")
      }

      // Shutdown all the connections and remove them from the pool.
      let connections = self._connections
      self._connections.removeAll()
      let allShutdown = connections.values.map {
        $0.manager.shutdown(mode: mode)
      }

      // Fail the outstanding waiters.
      while let waiter = self.waiters.popFirst() {
        waiter.fail(ConnectionPoolError.shutdown)
      }

      // Cascade the result of the shutdown into the promise.
      EventLoopFuture.andAllSucceed(allShutdown, promise: promise)

    case let .shuttingDown(future):
      // We're already shutting down, cascade the result.
      promise.completeWith(future)

    case .shutdown:
      // Already shutdown, fine.
      promise.succeed(())
    }
  }
}

extension ConnectionPool: ConnectionManagerConnectivityDelegate {
  // We're interested in a few different situations here:
  //
  // 1. The connection was usable ('ready') and is no longer usable (either it became idle or
  //    encountered an error). If this happens we need to notify the pool manager of the change
  //    as the connection may no longer be used for new RPCs.
  // 2. The connection was not usable but moved to a different unusable state. If this happens and
  //    we know the cause of the state transition (i.e. the error) then we need to update our most
  //    recent error with the error. This information is used when failing waiters to provide some
  //    context as to why they may be failing.
  func connectionStateDidChange(
    _ manager: ConnectionManager,
    from oldState: _ConnectivityState,
    to newState: _ConnectivityState
  ) {
    switch (oldState, newState) {
    case let (.ready, .transientFailure(error)),
         let (.ready, .idle(.some(error))):
      self.updateMostRecentError(error)
      self.connectionUnavailable(manager.id)

    case (.ready, .idle(.none)),
         (.ready, .shutdown):
      self.connectionUnavailable(manager.id)

    case let (_, .transientFailure(error)),
         let (_, .idle(.some(error))):
      self.updateMostRecentError(error)

    default:
      ()
    }
  }

  func connectionIsQuiescing(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    guard let removed = self._connections.removeValue(forKey: manager.id) else {
      return
    }

    // Drop any delegates. We're no longer interested in these events.
    removed.manager.sync.connectivityDelegate = nil
    removed.manager.sync.http2Delegate = nil

    // Replace the connection with a new idle one.
    self.addConnectionToPool()

    // Since we're removing this connection from the pool, the pool manager can ignore any streams
    // reserved against this connection.
    //
    // Note: we don't need to adjust the number of available streams as the number of connections
    // hasn't changed.
    self.streamLender.returnStreams(removed.reservedStreams, to: self)
  }

  private func updateMostRecentError(_ error: Error) {
    self.eventLoop.assertInEventLoop()
    // Update the last known error if there is one. We will use it to provide some context to
    // waiters which may fail.
    self._mostRecentError = error
  }

  /// A connection has become unavailable.
  private func connectionUnavailable(_ id: ConnectionManagerID) {
    self.eventLoop.assertInEventLoop()
    // The connection is no longer available: any streams which haven't been closed will be counted
    // as dropped reservations, so we need to tell the pool manager about them.
    if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 {
      self.streamLender.returnStreams(droppedReservations, to: self)
    }
  }
}

extension ConnectionPool: ConnectionManagerHTTP2Delegate {
  internal func streamClosed(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    // Return the stream to the connection and to the pool manager.
    self._connections[manager.id]?.returnStream()
    self.streamLender.returnStreams(1, to: self)

    // A stream was returned: we may be able to service a waiter now.
    self.tryServiceWaiters()
  }

  internal func receivedSettingsMaxConcurrentStreams(
    _ manager: ConnectionManager,
    maxConcurrentStreams: Int
  ) {
    self.eventLoop.assertInEventLoop()

    // If we received a SETTINGS update then the connection is okay: drop the last known error.
    self._mostRecentError = nil

    let previous = self._connections[manager.id]?.updateMaxConcurrentStreams(maxConcurrentStreams)

    let delta: Int
    if let previousValue = previous {
      // There was a previous value of max concurrent streams, i.e. a change in value for an
      // existing connection.
      delta = maxConcurrentStreams - previousValue
    } else {
      // There was no previous value so this must be a new connection. We'll compare against our
      // assumed default.
      delta = maxConcurrentStreams - self.assumedMaxConcurrentStreams
    }

    if delta != 0 {
      self.streamLender.changeStreamCapacity(by: delta, for: self)
    }

    // We always check, even if `delta` isn't greater than zero, as this might be a new connection.
    self.tryServiceWaiters()
  }
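
  // A worked example of the delta computed above (illustrative values): if a connection
  // previously advertised 100 concurrent streams and a new SETTINGS frame advertises 80, the
  // delta is -20 and the pool manager's capacity shrinks accordingly. For a new connection
  // advertising 128 against an assumed default of 100, the delta is +28.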
}

extension ConnectionPool {
  // MARK: - Waiters

  /// Try to service as many waiters as possible.
  ///
  /// This is an expensive operation; in the worst case it will be `O(W ⨉ N)` where `W` is the
  /// number of waiters and `N` is the number of connections.
  private func tryServiceWaiters() {
    if self.waiters.isEmpty { return }

    self.logger.trace("servicing waiters", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
    ])

    let now = self.now()
    var serviced = 0

    while !self.waiters.isEmpty {
      if self.waiters.first!.deadlineIsAfter(now) {
        if let multiplexer = self._reserveStreamFromMostAvailableConnection() {
          // The waiter's deadline is in the future, and we have a suitable connection. Remove and
          // succeed the waiter.
          let waiter = self.waiters.removeFirst()
          serviced &+= 1
          waiter.succeed(with: multiplexer)
        } else {
          // There are waiters but no available connections, we're done.
          break
        }
      } else {
        // The waiter's deadline has already expired, there's no point completing it. Remove it and
        // let its scheduled timeout fail the promise.
        self.waiters.removeFirst()
      }
    }

    self.logger.trace("done servicing waiters", metadata: [
      Metadata.waitersCount: "\(self.waiters.count)",
      Metadata.waitersServiced: "\(serviced)",
    ])
  }
}

extension ConnectionPool {
  /// Synchronous operations for the pool, mostly used by tests.
  internal struct Sync {
    private let pool: ConnectionPool

    fileprivate init(_ pool: ConnectionPool) {
      self.pool = pool
    }

    /// The number of outstanding connection waiters.
    internal var waiters: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool.waiters.count
    }

    /// The number of connections currently in the pool (in any state).
    internal var connections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.count
    }

    /// The number of idle connections in the pool.
    internal var idleConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) }
    }

    /// The number of streams currently available to reserve across all connections in the pool.
    internal var availableStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 + $1.availableStreams }
    }

    /// The number of streams which have been reserved across all connections in the pool.
    internal var reservedStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.reduce(0) { $0 + $1.reservedStreams }
    }
  }

  internal var sync: Sync {
    return Sync(self)
  }
}
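
// A testing sketch (illustrative only; `pool` is assumed to exist). The `Sync` accessors assert
// they are used on the pool's event loop, so hop onto it first:
//
//   let counts = try pool.eventLoop.submit {
//     (waiters: pool.sync.waiters, idle: pool.sync.idleConnections)
//   }.wait()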

@usableFromInline
internal enum ConnectionPoolError: Error {
  /// The pool is shutdown or shutting down.
  case shutdown

  /// There are too many waiters in the pool.
  case tooManyWaiters(connectionError: Error?)

  /// The deadline for creating a stream has passed.
  case deadlineExceeded(connectionError: Error?)
}

extension ConnectionPoolError: GRPCStatusTransformable {
  @usableFromInline
  internal func makeGRPCStatus() -> GRPCStatus {
    switch self {
    case .shutdown:
      return GRPCStatus(
        code: .unavailable,
        message: "The connection pool is shutdown"
      )

    case let .tooManyWaiters(error):
      return GRPCStatus(
        code: .resourceExhausted,
        message: "The connection pool has no capacity for new RPCs or RPC waiters",
        cause: error
      )

    case let .deadlineExceeded(error):
      return GRPCStatus(
        code: .deadlineExceeded,
        message: "Timed out waiting for an HTTP/2 stream from the connection pool",
        cause: error
      )
    }
  }
}
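
// For example (illustrative only): a waiter which times out is failed with
// `ConnectionPoolError.deadlineExceeded(connectionError:)`, which callers see as:
//
//   let status = ConnectionPoolError.deadlineExceeded(connectionError: nil).makeGRPCStatus()
//   // status.code == .deadlineExceeded; any underlying connection error is attached as the cause.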