ConnectionPool.swift 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. import NIOHTTP2
  20. @usableFromInline
  21. internal final class ConnectionPool {
  22. /// The event loop all connections in this pool are running on.
  23. @usableFromInline
  24. internal let eventLoop: EventLoop
  25. @usableFromInline
  26. internal enum State {
  27. case active
  28. case shuttingDown(EventLoopFuture<Void>)
  29. case shutdown
  30. }
  31. /// The state of the connection pool.
  32. @usableFromInline
  33. internal var _state: State = .active
  34. /// The most recent connection error we have observed.
  35. ///
  36. /// This error is used to provide additional context to failed waiters. A waiter may, for example,
  37. /// timeout because the pool is busy, or because no connection can be established because of an
  38. /// underlying connection error. In the latter case it's useful for the caller to know why the
  39. /// connection is failing at the RPC layer.
  40. ///
  41. /// This value is cleared when a connection becomes 'available'. That is, when we receive an
  42. /// http/2 SETTINGS frame.
  43. ///
  44. /// This value is set whenever an underlying connection transitions to the transient failure state
  45. /// or to the idle state and has an associated error.
  46. @usableFromInline
  47. internal var _mostRecentError: Error? = nil
  48. /// Connection managers and their stream availability state keyed by the ID of the connection
  49. /// manager.
  50. ///
  51. /// Connections are accessed by their ID for connection state changes (infrequent) and when
  52. /// streams are closed (frequent). However when choosing which connection to succeed a waiter
  53. /// with (frequent) requires the connections to be ordered by their availability. A dictionary
  54. /// might not be the most efficient data structure (a queue prioritised by stream availability may
  55. /// be a better choice given the number of connections is likely to be very low in practice).
  56. @usableFromInline
  57. internal var _connections: [ConnectionManagerID: PerConnectionState]
  58. /// The threshold which if exceeded when creating a stream determines whether the pool will
  59. /// start connecting an idle connection (if one exists).
  60. ///
  61. /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of waiters
  62. /// and the number of reserved streams) and the total number of streams which non-idle connections
  63. /// could support (this includes the streams that a connection in the connecting state could
  64. /// support).
  65. @usableFromInline
  66. internal let reservationLoadThreshold: Double
  67. /// The assumed value for the maximum number of concurrent streams a connection can support. We
  68. /// assume a connection will support this many streams until we know better.
  69. @usableFromInline
  70. internal let assumedMaxConcurrentStreams: Int
  71. /// A queue of waiters which may or may not get a stream in the future.
  72. @usableFromInline
  73. internal var waiters: CircularBuffer<Waiter>
  74. /// The maximum number of waiters allowed, the size of `waiters` must not exceed this value. If
  75. /// there are this many waiters in the queue then the next waiter will be failed immediately.
  76. @usableFromInline
  77. internal let maxWaiters: Int
  78. /// The number of connections in the pool that should always be kept open (i.e. they won't go idle).
  79. /// In other words, it's the number of connections for which we should ignore idle timers.
  80. @usableFromInline
  81. internal let minConnections: Int
  82. /// Configuration for backoff between subsequence connection attempts.
  83. @usableFromInline
  84. internal let connectionBackoff: ConnectionBackoff
  85. /// Provides a channel factory to the `ConnectionManager`.
  86. @usableFromInline
  87. internal let channelProvider: ConnectionManagerChannelProvider
  88. /// The object to notify about changes to stream reservations; in practice this is usually
  89. /// the `PoolManager`.
  90. @usableFromInline
  91. internal let streamLender: StreamLender
  92. @usableFromInline
  93. internal var delegate: GRPCConnectionPoolDelegate?
  94. /// A logger which always sets "GRPC" as its source.
  95. @usableFromInline
  96. private(set) var logger: GRPCLogger
  97. /// Returns `NIODeadline` representing 'now'. This is useful for testing.
  98. @usableFromInline
  99. internal let now: () -> NIODeadline
  100. /// Logging metadata keys.
  101. @usableFromInline
  102. internal enum Metadata {
  103. /// The ID of this pool.
  104. @usableFromInline
  105. static let id = "pool.id"
  106. /// The number of stream reservations (i.e. number of open streams + number of waiters).
  107. @usableFromInline
  108. static let reservationsCount = "pool.reservations.count"
  109. /// The number of streams this pool can support with non-idle connections at this time.
  110. @usableFromInline
  111. static let reservationsCapacity = "pool.reservations.capacity"
  112. /// The current reservation load (i.e. reservation count / reservation capacity)
  113. @usableFromInline
  114. static let reservationsLoad = "pool.reservations.load"
  115. /// The reservation load threshold, above which a new connection will be created (if possible).
  116. @usableFromInline
  117. static let reservationsLoadThreshold = "pool.reservations.loadThreshold"
  118. /// The current number of waiters in the pool.
  119. @usableFromInline
  120. static let waitersCount = "pool.waiters.count"
  121. /// The maximum number of waiters the pool is configured to allow.
  122. @usableFromInline
  123. static let waitersMax = "pool.waiters.max"
  124. /// The number of waiters which were successfully serviced.
  125. @usableFromInline
  126. static let waitersServiced = "pool.waiters.serviced"
  127. /// The ID of waiter.
  128. @usableFromInline
  129. static let waiterID = "pool.waiter.id"
  130. /// The maximum number of connections allowed in the pool.
  131. @usableFromInline
  132. static let connectionsMax = "pool.connections.max"
  133. /// The number of connections in the ready state.
  134. @usableFromInline
  135. static let connectionsReady = "pool.connections.ready"
  136. /// The number of connections in the connecting state.
  137. @usableFromInline
  138. static let connectionsConnecting = "pool.connections.connecting"
  139. /// The number of connections in the transient failure state.
  140. @usableFromInline
  141. static let connectionsTransientFailure = "pool.connections.transientFailure"
  142. }
  143. @usableFromInline
  144. init(
  145. eventLoop: EventLoop,
  146. maxWaiters: Int,
  147. minConnections: Int,
  148. reservationLoadThreshold: Double,
  149. assumedMaxConcurrentStreams: Int,
  150. connectionBackoff: ConnectionBackoff,
  151. channelProvider: ConnectionManagerChannelProvider,
  152. streamLender: StreamLender,
  153. delegate: GRPCConnectionPoolDelegate?,
  154. logger: GRPCLogger,
  155. now: @escaping () -> NIODeadline = NIODeadline.now
  156. ) {
  157. precondition(
  158. (0.0 ... 1.0).contains(reservationLoadThreshold),
  159. "reservationLoadThreshold must be within the range 0.0 ... 1.0"
  160. )
  161. self.reservationLoadThreshold = reservationLoadThreshold
  162. self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
  163. self._connections = [:]
  164. self.maxWaiters = maxWaiters
  165. self.minConnections = minConnections
  166. self.waiters = CircularBuffer(initialCapacity: 16)
  167. self.eventLoop = eventLoop
  168. self.connectionBackoff = connectionBackoff
  169. self.channelProvider = channelProvider
  170. self.streamLender = streamLender
  171. self.delegate = delegate
  172. self.logger = logger
  173. self.now = now
  174. }
  175. /// Initialize the connection pool.
  176. ///
  177. /// - Parameter connections: The number of connections to add to the pool.
  178. internal func initialize(connections: Int) {
  179. assert(self._connections.isEmpty)
  180. self.logger.logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"
  181. self.logger.debug(
  182. "initializing new sub-pool",
  183. metadata: [
  184. Metadata.waitersMax: .stringConvertible(self.maxWaiters),
  185. Metadata.connectionsMax: .stringConvertible(connections),
  186. ]
  187. )
  188. self._connections.reserveCapacity(connections)
  189. var numberOfKeepOpenConnections = self.minConnections
  190. while self._connections.count < connections {
  191. // If we have less than the minimum number of connections, don't let
  192. // the new connection close when idle.
  193. let idleBehavior =
  194. numberOfKeepOpenConnections > 0
  195. ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout
  196. numberOfKeepOpenConnections -= 1
  197. self.addConnectionToPool(idleBehavior: idleBehavior)
  198. }
  199. }
  200. /// Make and add a new connection to the pool.
  201. private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) {
  202. let manager = ConnectionManager(
  203. eventLoop: self.eventLoop,
  204. channelProvider: self.channelProvider,
  205. callStartBehavior: .waitsForConnectivity,
  206. idleBehavior: idleBehavior,
  207. connectionBackoff: self.connectionBackoff,
  208. connectivityDelegate: self,
  209. http2Delegate: self,
  210. logger: self.logger.unwrapped
  211. )
  212. let id = manager.id
  213. self._connections[id] = PerConnectionState(manager: manager)
  214. self.delegate?.connectionAdded(id: .init(id))
  215. // If it's one of the connections that should be kept open, then connect
  216. // straight away.
  217. switch idleBehavior {
  218. case .neverGoIdle:
  219. self.eventLoop.execute {
  220. if manager.sync.isIdle {
  221. manager.sync.startConnecting()
  222. }
  223. }
  224. case .closeWhenIdleTimeout:
  225. ()
  226. }
  227. }
  228. // MARK: - Called from the pool manager
  229. /// Make and initialize an HTTP/2 stream `Channel`.
  230. ///
  231. /// - Parameters:
  232. /// - deadline: The point in time by which the `promise` must have been resolved.
  233. /// - promise: A promise for a `Channel`.
  234. /// - logger: A request logger.
  235. /// - initializer: A closure to initialize the `Channel` with.
  236. @inlinable
  237. internal func makeStream(
  238. deadline: NIODeadline,
  239. promise: EventLoopPromise<Channel>,
  240. logger: GRPCLogger,
  241. initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  242. ) {
  243. if self.eventLoop.inEventLoop {
  244. self._makeStream(
  245. deadline: deadline,
  246. promise: promise,
  247. logger: logger,
  248. initializer: initializer
  249. )
  250. } else {
  251. self.eventLoop.execute {
  252. self._makeStream(
  253. deadline: deadline,
  254. promise: promise,
  255. logger: logger,
  256. initializer: initializer
  257. )
  258. }
  259. }
  260. }
  261. /// See `makeStream(deadline:promise:logger:initializer:)`.
  262. @inlinable
  263. internal func makeStream(
  264. deadline: NIODeadline,
  265. logger: GRPCLogger,
  266. initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  267. ) -> EventLoopFuture<Channel> {
  268. let promise = self.eventLoop.makePromise(of: Channel.self)
  269. self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer)
  270. return promise.futureResult
  271. }
  272. /// Shutdown the connection pool.
  273. ///
  274. /// Existing waiters will be failed and all underlying connections will be shutdown. Subsequent
  275. /// calls to `makeStream` will be failed immediately.
  276. ///
  277. /// - Parameter mode: The mode to use when shutting down.
  278. /// - Returns: A future indicated when shutdown has been completed.
  279. internal func shutdown(mode: ConnectionManager.ShutdownMode) -> EventLoopFuture<Void> {
  280. let promise = self.eventLoop.makePromise(of: Void.self)
  281. if self.eventLoop.inEventLoop {
  282. self._shutdown(mode: mode, promise: promise)
  283. } else {
  284. self.eventLoop.execute {
  285. self._shutdown(mode: mode, promise: promise)
  286. }
  287. }
  288. return promise.futureResult
  289. }
  290. /// See `makeStream(deadline:promise:logger:initializer:)`.
  291. ///
  292. /// - Important: Must be called on the pool's `EventLoop`.
  293. @inlinable
  294. internal func _makeStream(
  295. deadline: NIODeadline,
  296. promise: EventLoopPromise<Channel>,
  297. logger: GRPCLogger,
  298. initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  299. ) {
  300. self.eventLoop.assertInEventLoop()
  301. guard case .active = self._state else {
  302. // Fail the promise right away if we're shutting down or already shut down.
  303. promise.fail(GRPCConnectionPoolError.shutdown)
  304. return
  305. }
  306. // Try to make a stream on an existing connection.
  307. let streamCreated = self._tryMakeStream(promise: promise, initializer: initializer)
  308. if !streamCreated {
  309. // No stream was created, wait for one.
  310. self._enqueueWaiter(
  311. deadline: deadline,
  312. promise: promise,
  313. logger: logger,
  314. initializer: initializer
  315. )
  316. }
  317. }
  318. /// Try to find an existing connection on which we can make a stream.
  319. ///
  320. /// - Parameters:
  321. /// - promise: A promise to succeed if we can make a stream.
  322. /// - initializer: A closure to initialize the stream with.
  323. /// - Returns: A boolean value indicating whether the stream was created or not.
  324. @inlinable
  325. internal func _tryMakeStream(
  326. promise: EventLoopPromise<Channel>,
  327. initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  328. ) -> Bool {
  329. // We shouldn't jump the queue.
  330. guard self.waiters.isEmpty else {
  331. return false
  332. }
  333. // Reserve a stream, if we can.
  334. guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
  335. return false
  336. }
  337. multiplexer.createStreamChannel(promise: promise, initializer)
  338. // Has reserving another stream tipped us over the limit for needing another connection?
  339. if self._shouldBringUpAnotherConnection() {
  340. self._startConnectingIdleConnection()
  341. }
  342. return true
  343. }
  344. /// Enqueue a waiter to be provided with a stream at some point in the future.
  345. ///
  346. /// - Parameters:
  347. /// - deadline: The point in time by which the promise should have been completed.
  348. /// - promise: The promise to complete with the `Channel`.
  349. /// - logger: A logger.
  350. /// - initializer: A closure to initialize the `Channel` with.
  351. @inlinable
  352. internal func _enqueueWaiter(
  353. deadline: NIODeadline,
  354. promise: EventLoopPromise<Channel>,
  355. logger: GRPCLogger,
  356. initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  357. ) {
  358. // Don't overwhelm the pool with too many waiters.
  359. guard self.waiters.count < self.maxWaiters else {
  360. logger.trace(
  361. "connection pool has too many waiters",
  362. metadata: [
  363. Metadata.waitersMax: .stringConvertible(self.maxWaiters)
  364. ]
  365. )
  366. promise.fail(GRPCConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
  367. return
  368. }
  369. let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)
  370. // Fail the waiter and punt it from the queue when it times out. It's okay that we schedule the
  371. // timeout before appending it to the waiters, it wont run until the next event loop tick at the
  372. // earliest (even if the deadline has already passed).
  373. waiter.scheduleTimeout(on: self.eventLoop) {
  374. waiter.fail(GRPCConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))
  375. if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
  376. self.waiters.remove(at: index)
  377. logger.trace(
  378. "timed out waiting for a connection",
  379. metadata: [
  380. Metadata.waiterID: "\(waiter.id)",
  381. Metadata.waitersCount: .stringConvertible(self.waiters.count),
  382. ]
  383. )
  384. }
  385. }
  386. // request logger
  387. logger.debug(
  388. "waiting for a connection to become available",
  389. metadata: [
  390. Metadata.waiterID: "\(waiter.id)",
  391. Metadata.waitersCount: .stringConvertible(self.waiters.count),
  392. ]
  393. )
  394. self.waiters.append(waiter)
  395. // pool logger
  396. self.logger.trace(
  397. "enqueued connection waiter",
  398. metadata: [
  399. Metadata.waitersCount: .stringConvertible(self.waiters.count)
  400. ]
  401. )
  402. if self._shouldBringUpAnotherConnection() {
  403. self._startConnectingIdleConnection()
  404. }
  405. }
  406. /// Compute the current demand and capacity for streams.
  407. ///
  408. /// The 'demand' for streams is the number of reserved streams and the number of waiters. The
  409. /// capacity for streams is the product of max concurrent streams and the number of non-idle
  410. /// connections.
  411. ///
  412. /// - Returns: A tuple of the demand and capacity for streams.
  413. @usableFromInline
  414. internal func _computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) {
  415. let demand = self.sync.reservedStreams + self.sync.waiters
  416. // TODO: make this cheaper by storing and incrementally updating the number of idle connections
  417. let capacity = self._connections.values.reduce(0) { sum, state in
  418. if state.manager.sync.isIdle || state.isQuiescing {
  419. // Idle connection or quiescing (so the capacity should be ignored).
  420. return sum
  421. } else if let knownMaxAvailableStreams = state.maxAvailableStreams {
  422. // A known value of max concurrent streams, i.e. the connection is active.
  423. return sum + knownMaxAvailableStreams
  424. } else {
  425. // Not idle and no known value, the connection must be connecting so use our assumed value.
  426. return sum + self.assumedMaxConcurrentStreams
  427. }
  428. }
  429. return (demand, capacity)
  430. }
  431. /// Returns whether the pool should start connecting an idle connection (if one exists).
  432. @usableFromInline
  433. internal func _shouldBringUpAnotherConnection() -> Bool {
  434. let (demand, capacity) = self._computeStreamDemandAndCapacity()
  435. // Infinite -- i.e. all connections are idle or no connections exist -- is okay here as it
  436. // will always be greater than the threshold and a new connection will be spun up.
  437. let load = Double(demand) / Double(capacity)
  438. let loadExceedsThreshold = load >= self.reservationLoadThreshold
  439. if loadExceedsThreshold {
  440. self.logger.debug(
  441. "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available",
  442. metadata: [
  443. Metadata.reservationsCount: .stringConvertible(demand),
  444. Metadata.reservationsCapacity: .stringConvertible(capacity),
  445. Metadata.reservationsLoad: .stringConvertible(load),
  446. Metadata.reservationsLoadThreshold: .stringConvertible(self.reservationLoadThreshold),
  447. ]
  448. )
  449. }
  450. return loadExceedsThreshold
  451. }
  452. /// Starts connecting an idle connection, if one exists.
  453. @usableFromInline
  454. internal func _startConnectingIdleConnection() {
  455. if let index = self._connections.values.firstIndex(where: { $0.manager.sync.isIdle }) {
  456. self._connections.values[index].manager.sync.startConnecting()
  457. } else {
  458. let connecting = self._connections.values.count { $0.manager.sync.isConnecting }
  459. let ready = self._connections.values.count { $0.manager.sync.isReady }
  460. let transientFailure = self._connections.values.count { $0.manager.sync.isTransientFailure }
  461. self.logger.debug(
  462. "no idle connections in pool",
  463. metadata: [
  464. Metadata.connectionsConnecting: .stringConvertible(connecting),
  465. Metadata.connectionsReady: .stringConvertible(ready),
  466. Metadata.connectionsTransientFailure: .stringConvertible(transientFailure),
  467. Metadata.waitersCount: .stringConvertible(self.waiters.count),
  468. ]
  469. )
  470. }
  471. }
  472. /// Returns the index in `self.connections.values` of the connection with the most available
  473. /// streams. Returns `self.connections.endIndex` if no connection has at least one stream
  474. /// available.
  475. ///
  476. /// - Note: this is linear in the number of connections.
  477. @usableFromInline
  478. internal func _mostAvailableConnectionIndex()
  479. -> Dictionary<ConnectionManagerID, PerConnectionState>.Index
  480. {
  481. var index = self._connections.values.startIndex
  482. var selectedIndex = self._connections.values.endIndex
  483. var mostAvailableStreams = 0
  484. while index != self._connections.values.endIndex {
  485. let availableStreams = self._connections.values[index].availableStreams
  486. if availableStreams > mostAvailableStreams {
  487. mostAvailableStreams = availableStreams
  488. selectedIndex = index
  489. }
  490. self._connections.values.formIndex(after: &index)
  491. }
  492. return selectedIndex
  493. }
  494. /// Reserves a stream from the connection with the most available streams, if one exists.
  495. ///
  496. /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from,
  497. /// or `nil` if no stream could be reserved.
  498. @usableFromInline
  499. internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? {
  500. let index = self._mostAvailableConnectionIndex()
  501. if index != self._connections.endIndex {
  502. // '!' is okay here; the most available connection must have at least one stream available
  503. // to reserve.
  504. return self._connections.values[index].reserveStream()!
  505. } else {
  506. return nil
  507. }
  508. }
  509. /// See `shutdown(mode:)`.
  510. ///
  511. /// - Parameter promise: A `promise` to complete when the pool has been shutdown.
  512. @usableFromInline
  513. internal func _shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
  514. self.eventLoop.assertInEventLoop()
  515. switch self._state {
  516. case .active:
  517. self.logger.debug("shutting down connection pool")
  518. // We're shutting down now and when that's done we'll be fully shutdown.
  519. self._state = .shuttingDown(promise.futureResult)
  520. promise.futureResult.whenComplete { _ in
  521. self._state = .shutdown
  522. self.delegate = nil
  523. self.logger.trace("finished shutting down connection pool")
  524. }
  525. // Shutdown all the connections and remove them from the pool.
  526. let connections = self._connections
  527. self._connections.removeAll()
  528. let allShutdown: [EventLoopFuture<Void>] = connections.values.map {
  529. let id = $0.manager.id
  530. let manager = $0.manager
  531. return manager.eventLoop.flatSubmit {
  532. // If the connection was idle/shutdown before calling shutdown then we shouldn't tell
  533. // the delegate the connection closed (because it either never connected or was already
  534. // informed about this).
  535. let connectionIsInactive = manager.sync.isIdle || manager.sync.isShutdown
  536. return manager.shutdown(mode: mode).always { _ in
  537. if !connectionIsInactive {
  538. self.delegate?.connectionClosed(id: .init(id), error: nil)
  539. }
  540. self.delegate?.connectionRemoved(id: .init(id))
  541. }
  542. }
  543. }
  544. // Fail the outstanding waiters.
  545. while let waiter = self.waiters.popFirst() {
  546. waiter.fail(GRPCConnectionPoolError.shutdown)
  547. }
  548. // Cascade the result of the shutdown into the promise.
  549. EventLoopFuture.andAllSucceed(allShutdown, promise: promise)
  550. case let .shuttingDown(future):
  551. // We're already shutting down, cascade the result.
  552. promise.completeWith(future)
  553. case .shutdown:
  554. // Already shutdown, fine.
  555. promise.succeed(())
  556. }
  557. }
  558. }
  559. extension ConnectionPool: ConnectionManagerConnectivityDelegate {
  560. // We're interested in a few different situations here:
  561. //
  562. // 1. The connection was usable ('ready') and is no longer usable (either it became idle or
  563. // encountered an error. If this happens we need to notify any connections of the change as
  564. // they may no longer be used for new RPCs.
  565. // 2. The connection was not usable but moved to a different unusable state. If this happens and
  566. // we know the cause of the state transition (i.e. the error) then we need to update our most
  567. // recent error with the error. This information is used when failing waiters to provide some
  568. // context as to why they may be failing.
  569. func connectionStateDidChange(
  570. _ manager: ConnectionManager,
  571. from oldState: _ConnectivityState,
  572. to newState: _ConnectivityState
  573. ) {
  574. switch (oldState, newState) {
  575. case let (.ready, .transientFailure(error)),
  576. let (.ready, .idle(.some(error))):
  577. self.updateMostRecentError(error)
  578. self.connectionUnavailable(manager.id)
  579. case (.ready, .idle(.none)),
  580. (.ready, .shutdown):
  581. self.connectionUnavailable(manager.id)
  582. case let (_, .transientFailure(error)),
  583. let (_, .idle(.some(error))):
  584. self.updateMostRecentError(error)
  585. default:
  586. ()
  587. }
  588. guard let delegate = self.delegate else { return }
  589. switch (oldState, newState) {
  590. case (.idle, .connecting),
  591. (.transientFailure, .connecting):
  592. delegate.startedConnecting(id: .init(manager.id))
  593. case (.connecting, .ready):
  594. // The connection becoming ready is handled by 'receivedSettingsMaxConcurrentStreams'.
  595. ()
  596. case (.ready, .idle):
  597. delegate.connectionClosed(id: .init(manager.id), error: nil)
  598. case let (.ready, .transientFailure(error)):
  599. delegate.connectionClosed(id: .init(manager.id), error: error)
  600. case let (.connecting, .transientFailure(error)):
  601. delegate.connectFailed(id: .init(manager.id), error: error)
  602. default:
  603. ()
  604. }
  605. }
  /// Handles the connection owned by `manager` starting to quiesce.
  ///
  /// The connection stops receiving connectivity events, is marked as quiescing and is replaced in
  /// the pool by a new connection with the same idle behavior. Streams reserved against it are
  /// returned to the stream lender straight away. Once the manager's current connection closes, the
  /// entry is removed from the pool and the pool delegate is told it was closed and removed.
  ///
  /// - Parameter manager: The manager whose connection started quiescing.
  func connectionIsQuiescing(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    // Find the relevant connection.
    guard let index = self._connections.index(forKey: manager.id) else {
      return
    }

    // Drop the connectivity delegate, we're no longer interested in its events now.
    manager.sync.connectivityDelegate = nil

    // Started quiescing; update our state.
    self._connections.values[index].isQuiescing = true

    // Grab the number of reserved streams now, while `index` is known to be valid. Everything
    // below may mutate `_connections`: adding the replacement connection inserts an entry, and the
    // close callback registered below removes this one. NOTE(review): if `onCurrentConnectionClose`
    // can invoke its callback synchronously (e.g. no current connection, or one that has already
    // closed), reading through `index` after registering it would use an invalidated index.
    let reservedStreams = self._connections.values[index].reservedStreams

    // Notify the pool delegate.
    self.delegate?.connectionQuiescing(id: .init(manager.id))

    // As the connection is quiescing, we need to know when the current connection it's managing
    // has closed. When that happens drop the H2 delegate and update the pool delegate.
    manager.onCurrentConnectionClose { hadActiveConnection in
      assert(hadActiveConnection)
      if let removed = self._connections.removeValue(forKey: manager.id) {
        removed.manager.sync.http2Delegate = nil
        self.delegate?.connectionClosed(id: .init(removed.manager.id), error: nil)
        self.delegate?.connectionRemoved(id: .init(removed.manager.id))
      }
    }

    // Replace the connection with a new idle one. Keep the idle behavior, so that
    // if it's a connection that should be kept alive, we maintain it.
    self.addConnectionToPool(idleBehavior: manager.idleBehavior)

    // Since we're removing this connection from the pool (and no new streams can be created on
    // the connection), the pool manager can ignore any streams reserved against this connection.
    // We do still care about the number of reserved streams for the connection though.
    //
    // Note: we don't need to adjust the number of available streams as the effective number of
    // connections hasn't changed.
    self.streamLender.returnStreams(reservedStreams, to: self)
  }
  /// Records `error` as the most recent error seen by the pool.
  ///
  /// The stored error is later used to give some context to waiters which may fail.
  ///
  /// - Parameter error: The error to remember; it overwrites any previously stored error.
  private func updateMostRecentError(_ error: Error) {
    self.eventLoop.assertInEventLoop()
    self._mostRecentError = error
  }
  /// Handles a connection becoming unavailable.
  ///
  /// The connection's state is updated via `unavailable()`, which returns the number of
  /// reservations that were dropped (streams which hadn't been closed yet). Those reservations are
  /// handed back to the pool manager. Connections the pool doesn't know about are ignored.
  ///
  /// - Parameter id: The ID of the connection which is no longer available.
  private func connectionUnavailable(_ id: ConnectionManagerID) {
    self.eventLoop.assertInEventLoop()

    guard let dropped = self._connections[id]?.unavailable(), dropped > 0 else {
      return
    }

    self.streamLender.returnStreams(dropped, to: self)
  }
  655. }
  extension ConnectionPool: ConnectionManagerHTTP2Delegate {
    /// Records that a stream was opened on `manager`'s connection.
    ///
    /// When the connection reports a new utilization and a pool delegate is set, the delegate is
    /// told about the change.
    internal func streamOpened(_ manager: ConnectionManager) {
      self.eventLoop.assertInEventLoop()

      guard let utilization = self._connections[manager.id]?.openedStream(),
        let delegate = self.delegate
      else {
        return
      }

      delegate.connectionUtilizationChanged(
        id: .init(manager.id),
        streamsUsed: utilization.used,
        streamCapacity: utilization.capacity
      )
    }

    /// Records that a stream was closed on `manager`'s connection.
    ///
    /// The stream is given back to the connection (reporting any utilization change to the pool
    /// delegate) and, for available, non-quiescing connections, to the pool manager as well, after
    /// which the pool tries to service queued waiters.
    internal func streamClosed(_ manager: ConnectionManager) {
      self.eventLoop.assertInEventLoop()

      guard let index = self._connections.index(forKey: manager.id) else {
        return
      }

      // Give the stream back to the connection; report the new utilization if there is one.
      let utilization = self._connections.values[index].returnStream()
      if let utilization = utilization, let delegate = self.delegate {
        delegate.connectionUtilizationChanged(
          id: .init(manager.id),
          streamsUsed: utilization.used,
          streamCapacity: utilization.capacity
        )
      }

      // Only available, non-quiescing connections return the stream to the pool manager: the
      // streams of a quiescing connection were already returned when it started quiescing.
      guard self._connections.values[index].isAvailable,
        !self._connections.values[index].isQuiescing
      else {
        return
      }

      self.streamLender.returnStreams(1, to: self)
      // A stream was returned, so a waiter may now be serviceable.
      self.tryServiceWaiters()
    }

    /// Applies a SETTINGS update for `maxConcurrentStreams` received on `manager`'s connection.
    ///
    /// The difference from the previous value (or from `assumedMaxConcurrentStreams` when there
    /// was no previous value, i.e. a new connection, in which case the delegate is also told the
    /// connection succeeded) is forwarded to the stream lender, then queued waiters are serviced.
    internal func receivedSettingsMaxConcurrentStreams(
      _ manager: ConnectionManager,
      maxConcurrentStreams: Int
    ) {
      self.eventLoop.assertInEventLoop()

      // Unknown connections are ignored, as are quiescing ones: the pool manager is no longer
      // interested in updates to them.
      guard let index = self._connections.index(forKey: manager.id),
        !self._connections.values[index].isQuiescing
      else {
        return
      }

      // A SETTINGS update means this connection is okay, so the last known error is stale.
      self._mostRecentError = nil

      let delta: Int
      switch self._connections.values[index].updateMaxConcurrentStreams(maxConcurrentStreams) {
      case let .some(previousValue):
        // An existing connection changed its limit.
        delta = maxConcurrentStreams - previousValue
      case .none:
        // A brand new connection: compare against the assumed default and announce it.
        delta = maxConcurrentStreams - self.assumedMaxConcurrentStreams
        self.delegate?.connectSucceeded(id: .init(manager.id), streamCapacity: maxConcurrentStreams)
      }

      if delta != 0 {
        self.streamLender.changeStreamCapacity(by: delta, for: self)
      }

      // Checked unconditionally: even with a non-positive delta this may be a new connection.
      self.tryServiceWaiters()
    }
  }
  extension ConnectionPool {
    // MARK: - Waiters

    /// Try to service as many waiters as possible.
    ///
    /// Waiters are examined from the front of the queue. A waiter whose deadline is not after
    /// "now" is removed without being completed; its scheduled timeout is left to fail it. Any other
    /// waiter is removed and succeeded with a stream reserved from the most available connection.
    /// Servicing stops at the first live waiter for which no stream can be reserved.
    ///
    /// This is an expensive operation, in the worst case it will be `O(W ⨉ N)` where `W` is the
    /// number of waiters and `N` is the number of connections.
    private func tryServiceWaiters() {
      // Like every other method touching pool state, this must run on the pool's event loop.
      self.eventLoop.assertInEventLoop()

      if self.waiters.isEmpty { return }

      self.logger.trace(
        "servicing waiters",
        metadata: [
          Metadata.waitersCount: .stringConvertible(self.waiters.count)
        ]
      )

      let now = self.now()
      var serviced = 0

      // Peek at the head of the queue without force-unwrapping; the loop ends once the queue is
      // drained or no connection can take another stream.
      while let head = self.waiters.first {
        guard head.deadlineIsAfter(now) else {
          // The waiter's deadline has already expired, there's no point completing it. Remove it
          // and let its scheduled timeout fail the promise.
          self.waiters.removeFirst()
          continue
        }

        guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
          // There are waiters but no available connections, we're done.
          break
        }

        // The waiter's deadline is in the future, and we have a suitable connection. Remove and
        // succeed the waiter.
        let waiter = self.waiters.removeFirst()
        serviced &+= 1
        waiter.succeed(with: multiplexer)
      }

      self.logger.trace(
        "done servicing waiters",
        metadata: [
          Metadata.waitersCount: .stringConvertible(self.waiters.count),
          Metadata.waitersServiced: .stringConvertible(serviced),
        ]
      )
    }
  }
  extension ConnectionPool {
    /// A synchronous view of the pool's state, mostly used by tests.
    ///
    /// Every accessor asserts that it is called on the pool's event loop.
    internal struct Sync {
      private let pool: ConnectionPool

      fileprivate init(_ pool: ConnectionPool) {
        self.pool = pool
      }

      /// How many connection waiters are currently queued.
      internal var waiters: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool.waiters.count
      }

      /// How many connections the pool currently holds, whatever their state.
      internal var connections: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool._connections.count
      }

      /// How many of the pool's connections are idle.
      internal var idleConnections: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool._connections.values.reduce(into: 0) { total, connection in
          if connection.manager.sync.isIdle {
            total += 1
          }
        }
      }

      /// How many of the pool's connections are active, i.e. either ready or connecting.
      internal var activeConnections: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool._connections.values.reduce(into: 0) { total, connection in
          if connection.manager.sync.isReady || connection.manager.sync.isConnecting {
            total += 1
          }
        }
      }

      /// How many of the pool's connections are in the transient failure state.
      internal var transientFailureConnections: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool._connections.values.reduce(into: 0) { total, connection in
          if connection.manager.sync.isTransientFailure {
            total += 1
          }
        }
      }

      /// The total number of streams still available to reserve, summed over every connection.
      internal var availableStreams: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool._connections.values.reduce(into: 0) { total, connection in
          total += connection.availableStreams
        }
      }

      /// The total number of streams currently reserved, summed over every connection.
      internal var reservedStreams: Int {
        self.pool.eventLoop.assertInEventLoop()
        return self.pool._connections.values.reduce(into: 0) { total, connection in
          total += connection.reservedStreams
        }
      }

      /// Records `error` as the pool's most recent connection error.
      internal func updateMostRecentError(_ error: Error) {
        self.pool.eventLoop.assertInEventLoop()
        self.pool.updateMostRecentError(error)
      }
    }

    /// A fresh synchronous view onto this pool.
    internal var sync: Sync {
      Sync(self)
    }
  }
  /// An error thrown from the ``GRPCChannelPool``.
  public struct GRPCConnectionPoolError: Error, CustomStringConvertible {
    /// Identifies the kind of failure reported by a ``GRPCConnectionPoolError``.
    public struct Code: Hashable, Sendable, CustomStringConvertible {
      /// The concrete set of failure kinds wrapped by this struct.
      enum Code {
        case shutdown
        case tooManyWaiters
        case deadlineExceeded
      }

      fileprivate var code: Code

      private init(_ code: Code) {
        self.code = code
      }

      /// The name of the wrapped case, e.g. `"shutdown"`.
      public var description: String {
        return String(describing: self.code)
      }

      /// The pool is shutdown or shutting down.
      public static var shutdown: Self {
        return Self(.shutdown)
      }

      /// There are too many waiters in the pool.
      public static var tooManyWaiters: Self {
        return Self(.tooManyWaiters)
      }

      /// The deadline for creating a stream has passed.
      public static var deadlineExceeded: Self {
        return Self(.deadlineExceeded)
      }
    }

    /// The error code.
    public var code: Code

    /// An underlying error which caused this error to be thrown.
    public var underlyingError: Error?

    /// Create a new connection pool error with the given code and underlying error.
    ///
    /// - Parameters:
    ///   - code: The error code.
    ///   - underlyingError: The underlying error which led to this error being thrown.
    public init(code: Code, underlyingError: Error? = nil) {
      self.code = code
      self.underlyingError = underlyingError
    }

    /// The code's description, followed by the underlying error in parentheses when there is one.
    public var description: String {
      guard let underlyingError = self.underlyingError else {
        return String(describing: self.code)
      }
      return "\(self.code) (\(underlyingError))"
    }
  }
  extension GRPCConnectionPoolError {
    /// A shared error with the `shutdown` code and no underlying error.
    @usableFromInline
    static let shutdown: GRPCConnectionPoolError = .init(code: .shutdown)

    /// Makes a `tooManyWaiters` error.
    ///
    /// - Parameter connectionError: Attached as the underlying error, if non-`nil`.
    @inlinable
    static func tooManyWaiters(connectionError: Error?) -> Self {
      return .init(code: .tooManyWaiters, underlyingError: connectionError)
    }

    /// Makes a `deadlineExceeded` error.
    ///
    /// - Parameter connectionError: Attached as the underlying error, if non-`nil`.
    @inlinable
    static func deadlineExceeded(connectionError: Error?) -> Self {
      return .init(code: .deadlineExceeded, underlyingError: connectionError)
    }
  }
  extension GRPCConnectionPoolError: GRPCStatusTransformable {
    /// Maps this error onto a ``GRPCStatus``.
    ///
    /// - `shutdown` maps to `unavailable`,
    /// - `tooManyWaiters` maps to `resourceExhausted`,
    /// - `deadlineExceeded` maps to `deadlineExceeded`.
    ///
    /// The underlying error, if any, becomes the status' cause.
    public func makeGRPCStatus() -> GRPCStatus {
      let statusCode: GRPCStatus.Code
      let message: String

      switch self.code.code {
      case .shutdown:
        statusCode = .unavailable
        message = "The connection pool is shutdown"
      case .tooManyWaiters:
        statusCode = .resourceExhausted
        message = "The connection pool has no capacity for new RPCs or RPC waiters"
      case .deadlineExceeded:
        statusCode = .deadlineExceeded
        message = "Timed out waiting for an HTTP/2 stream from the connection pool"
      }

      return GRPCStatus(code: statusCode, message: message, cause: self.underlyingError)
    }
  }
  extension Sequence {
    /// Returns how many elements of the sequence satisfy `predicate`.
    ///
    /// - Parameter predicate: Called once per element, in iteration order.
    /// - Returns: The number of elements for which `predicate` returned `true`.
    fileprivate func count(where predicate: (Element) -> Bool) -> Int {
      var matches = 0
      for element in self where predicate(element) {
        matches += 1
      }
      return matches
    }
  }