PoolManager.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  // The conformance is `@unchecked`, so the compiler does not verify it. It is justified by all
  // mutable state being protected by a lock.
  extension PooledChannel: @unchecked Sendable {}
  21. @usableFromInline
  22. internal final class PoolManager {
  /// Settings applied to every connection pool created by the pool manager.
  @usableFromInline
  internal struct PerPoolConfiguration {
    /// Upper bound on the number of connections in each pool.
    @usableFromInline
    var maxConnections: Int

    /// Upper bound on the number of waiters in each pool.
    @usableFromInline
    var maxWaiters: Int

    /// Lower bound on the number of open connections in each pool. This many connections are
    /// never closed for being idle.
    @usableFromInline
    var minConnections: Int

    /// Load, in the range `0.0 ... 1.0`, beyond which a pool starts another connection
    /// (provided there is a connection available to start).
    @usableFromInline
    var loadThreshold: Double

    /// The value assumed for HTTP/2 'SETTINGS_MAX_CONCURRENT_STREAMS'.
    @usableFromInline
    var assumedMaxConcurrentStreams: Int

    /// The assumed number of streams concurrently available in a pool: `maxConnections`
    /// multiplied by `assumedMaxConcurrentStreams`.
    @usableFromInline
    var assumedStreamCapacity: Int {
      self.maxConnections * self.assumedMaxConcurrentStreams
    }

    /// Connection backoff configuration handed to each connection pool.
    @usableFromInline
    var connectionBackoff: ConnectionBackoff

    /// The `Channel` provider handed to each connection pool.
    @usableFromInline
    var channelProvider: DefaultChannelProvider

    /// An optional delegate. It is handed to each connection pool and, when `statsPeriod` is
    /// also set, receives the periodically collected pool stats.
    @usableFromInline
    var delegate: GRPCConnectionPoolDelegate?

    /// How often pool stats are collected for the `delegate`. Stats are only collected when
    /// both this and `delegate` are non-`nil`.
    @usableFromInline
    var statsPeriod: TimeAmount?

    /// Creates a per-pool configuration.
    ///
    /// - Parameters:
    ///   - maxConnections: The maximum number of connections per pool.
    ///   - maxWaiters: The maximum number of waiters per pool.
    ///   - minConnections: The minimum number of connections to keep open per pool.
    ///   - loadThreshold: The load threshold, in `0.0 ... 1.0`, for starting another connection.
    ///   - assumedMaxConcurrentStreams: The assumed 'SETTINGS_MAX_CONCURRENT_STREAMS' value,
    ///     defaults to 100.
    ///   - connectionBackoff: Connection backoff configuration.
    ///   - channelProvider: A `Channel` provider.
    ///   - delegate: An optional connection pool delegate.
    ///   - statsPeriod: The optional period at which stats are collected.
    @usableFromInline
    internal init(
      maxConnections: Int,
      maxWaiters: Int,
      minConnections: Int,
      loadThreshold: Double,
      assumedMaxConcurrentStreams: Int = 100,
      connectionBackoff: ConnectionBackoff,
      channelProvider: DefaultChannelProvider,
      delegate: GRPCConnectionPoolDelegate?,
      statsPeriod: TimeAmount?
    ) {
      self.maxConnections = maxConnections
      self.maxWaiters = maxWaiters
      self.minConnections = minConnections
      self.loadThreshold = loadThreshold
      self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
      self.connectionBackoff = connectionBackoff
      self.channelProvider = channelProvider
      self.delegate = delegate
      self.statsPeriod = statsPeriod
    }
  }
  /// Keys used for the pool manager's logging metadata.
  private enum Metadata {
    /// Key for the ID of the pool manager.
    static let id = "poolmanager.id"

    /// Key for the number of connection pools being managed.
    static let poolCount = "poolmanager.pools.count"

    /// Key for the maximum number of connections allowed in each pool.
    static let connectionsPerPool = "poolmanager.pools.conns_per_pool"

    /// Key for the maximum number of waiters allowed in each pool.
    static let waitersPerPool = "poolmanager.pools.waiters_per_pool"
  }
  /// The state machine tracking the pool manager's state. `lock` must be held whenever
  /// `_state` is read or modified.
  @usableFromInline
  internal var _state: PoolManagerStateMachine

  /// The connection pools being managed. Empty until the manager is initialized and emptied
  /// again on shutdown; accessed while holding `lock` once the manager has been created.
  @usableFromInline
  internal var _pools: [ConnectionPool]

  /// The lock guarding `_state` and `_pools`.
  @usableFromInline
  internal let lock = NIOLock()

  /// The `EventLoopGroup` supplying an `EventLoop` for each connection pool. After
  /// initialization the manager holds one pool per loop in this group.
  @usableFromInline
  internal let group: EventLoopGroup

  /// The ID of this pool manager. It is attached to log metadata and passed to the delegate
  /// alongside pool stats.
  @usableFromInline
  internal let id: GRPCConnectionPoolID
  /// Create a pool manager and initialize it before handing it back.
  ///
  /// One connection pool is managed for each event loop in the given `EventLoopGroup`, and every
  /// pool is set up from the same `perPoolConfiguration`.
  ///
  /// - Parameters:
  ///   - group: The `EventLoopGroup` whose `EventLoop`s are used by the connections that the
  ///     pool manager manages.
  ///   - perPoolConfiguration: The configuration applied to each managed connection pool.
  ///   - logger: A logger.
  /// - Returns: A pool manager which has already been initialized.
  @usableFromInline
  internal static func makeInitializedPoolManager(
    using group: EventLoopGroup,
    perPoolConfiguration: PerPoolConfiguration,
    logger: Logger
  ) -> PoolManager {
    let poolManager = PoolManager(privateButUsableFromInline_group: group)
    poolManager.initialize(perPoolConfiguration: perPoolConfiguration, logger: logger)
    return poolManager
  }
  /// Creates an inactive pool manager with no pools. Callers are expected to go through
  /// `makeInitializedPoolManager(using:perPoolConfiguration:logger:)` instead.
  ///
  /// - Parameter group: The `EventLoopGroup` providing one `EventLoop` per connection pool.
  @usableFromInline
  internal init(privateButUsableFromInline_group group: EventLoopGroup) {
    self._state = PoolManagerStateMachine(.inactive)
    self._pools = []
    self.group = group
    self.id = .next()

    // Each `EventLoop` in the `EventLoopGroup` must have a unique identity for the pool to work.
    // Duplicates are unlikely unless a custom `EventLoopGroup` has been constructed, so the
    // check is only made when running in debug mode.
    debugOnly {
      let loops = Array(group.makeIterator())
      let distinctLoopIDs = Set(loops.map { ObjectIdentifier($0) })
      assert(
        loops.count == distinctLoopIDs.count,
        "'group' contains non-unique event loops"
      )
    }
  }
  /// Checks (via `assert`) that the manager was shut down, or is shutting down, before it is
  /// deallocated. The state is read while holding `lock`.
  deinit {
    self.lock.withLock {
      assert(
        self._state.isShutdownOrShuttingDown,
        "The pool manager (\(self.id)) must be shutdown before going out of scope."
      )
    }
  }
  /// Initialize the pool manager: make one connection pool per event loop in the manager's
  /// `EventLoopGroup`, activate the state machine with those pools and then initialize each pool.
  ///
  /// - Important: Must only be called once.
  /// - Parameters:
  ///   - configuration: The configuration used for each connection pool.
  ///   - logger: A logger; the pool manager's ID is added to its metadata.
  private func initialize(
    perPoolConfiguration configuration: PerPoolConfiguration,
    logger: Logger
  ) {
    var logger = logger
    logger[metadataKey: Metadata.id] = "\(self.id)"

    let pools = self.makePools(perPoolConfiguration: configuration, logger: logger)

    logger.debug(
      "initializing connection pool manager",
      metadata: [
        Metadata.poolCount: "\(pools.count)",
        Metadata.connectionsPerPool: "\(configuration.maxConnections)",
        Metadata.waitersPerPool: "\(configuration.maxWaiters)",
      ]
    )

    // The assumed maximum number of streams concurrently available in each pool.
    let perPoolCapacity = configuration.assumedStreamCapacity

    // Per-pool state in the state machine is keyed by the pool's `EventLoopID`; the state machine
    // tells the manager which pool to use by way of the pool's index in `self._pools`.
    let keys = pools.enumerated().map { index, pool in
      ConnectionPoolKey(index: ConnectionPoolIndex(index), eventLoopID: pool.eventLoop.id)
    }

    // Stats are only emitted when both a period and a delegate have been configured.
    let statsTask: RepeatedTask?
    switch (configuration.statsPeriod, configuration.delegate) {
    case let (.some(period), .some(delegate)):
      let statsLoop = self.group.next()
      statsTask = statsLoop.scheduleRepeatedTask(initialDelay: period, delay: period) { _ in
        self.emitStats(delegate: delegate)
      }
    case (.none, _), (_, .none):
      statsTask = nil
    }

    self.lock.withLock {
      assert(self._pools.isEmpty)
      self._pools = pools

      // Activating twice will blow up. That's acceptable because callers can't call `initialize`
      // directly.
      self._state.activatePools(
        keyedBy: keys,
        assumingPerPoolCapacity: perPoolCapacity,
        statsTask: statsTask
      )
    }

    pools.forEach { pool in
      pool.initialize(connections: configuration.maxConnections)
    }
  }
  /// Make a connection pool for every `EventLoop` in the manager's `EventLoopGroup`.
  ///
  /// - Parameters:
  ///   - configuration: The configuration each pool is made with.
  ///   - logger: A logger.
  /// - Returns: One `ConnectionPool` per `EventLoop`, in iteration order of the group.
  private func makePools(
    perPoolConfiguration configuration: PerPoolConfiguration,
    logger: Logger
  ) -> [ConnectionPool] {
    return self.group.makeIterator().map { eventLoop in
      // This creates a retain cycle: each pool references the manager (as its stream lender), the
      // per-pool state holds the pool, and the manager holds that state. That's okay: on shutdown
      // the per-pool state, and in turn each connection pool, is dropped, which breaks the cycle.
      ConnectionPool(
        eventLoop: eventLoop,
        maxWaiters: configuration.maxWaiters,
        minConnections: configuration.minConnections,
        reservationLoadThreshold: configuration.loadThreshold,
        assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
        connectionBackoff: configuration.connectionBackoff,
        channelProvider: configuration.channelProvider,
        streamLender: self,
        delegate: configuration.delegate,
        logger: logger
      )
    }
  }
  // MARK: Stream Creation

  /// Wraps the future `Channel` obtained from a managed connection pool. Its `eventLoop` is the
  /// loop the `Channel` runs on, and so the loop the RPC will use for its transport.
  @usableFromInline
  internal struct PooledStreamChannel {
    /// The future `Channel`.
    @usableFromInline
    var futureResult: EventLoopFuture<Channel>

    /// The `EventLoop` used by the `Channel`; this is the event loop of `futureResult`.
    @usableFromInline
    var eventLoop: EventLoop {
      self.futureResult.eventLoop
    }

    /// Wraps the given future `Channel`.
    @inlinable
    internal init(futureResult: EventLoopFuture<Channel>) {
      self.futureResult = futureResult
    }
  }
  /// Make a stream and initialize it.
  ///
  /// - Parameters:
  ///   - preferredEventLoop: The `EventLoop` that the stream should be created on, if possible.
  ///     If a pool exists running this `EventLoop` then it will be chosen over all other pools,
  ///     regardless of the load on the pool. If no pool exists on the preferred `EventLoop` or
  ///     no preference is given then the pool with the most streams available will be selected.
  ///     The `EventLoop` of the selected pool will be the same as the `EventLoop` of
  ///     the `EventLoopFuture<Channel>` returned from this call.
  ///   - deadline: The point in time by which the stream must have been selected. If this
  ///     deadline is passed then the returned `EventLoopFuture` will be failed.
  ///   - logger: A logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  /// - Returns: A `PooledStreamChannel` holding the future channel and the `EventLoop` that the
  ///   `Channel` is using. The future will be failed if the pool manager has been shutdown,
  ///   the deadline has passed before a stream was created or if the selected connection pool
  ///   is unable to create a stream (if there is too much demand on that pool, for example).
  ///   When no pool could be reserved the failed future is created on `preferredEventLoop` if one
  ///   was given, otherwise on the next loop of the manager's `EventLoopGroup`.
  @inlinable
  internal func makeStream(
    preferredEventLoop: EventLoop?,
    deadline: NIODeadline,
    logger: Logger,
    streamInitializer initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  ) -> PooledStreamChannel {
    let preferredID = preferredEventLoop.map { loop in EventLoopID(loop) }

    // Reserve a stream and resolve the pool for it while holding the lock.
    let reservation = self.lock.withLock {
      self._state
        .reserveStream(preferringPoolWithEventLoopID: preferredID)
        .map { poolIndex in self._pools[poolIndex.value] }
    }

    switch reservation {
    case let .failure(error):
      let failureLoop = preferredEventLoop ?? self.group.next()
      return PooledStreamChannel(futureResult: failureLoop.makeFailedFuture(error))

    case let .success(pool):
      let futureChannel = pool.makeStream(
        deadline: deadline,
        logger: logger,
        initializer: initializer
      )
      return PooledStreamChannel(futureResult: futureChannel)
    }
  }
  // MARK: Shutdown

  /// Shutdown the pool manager and all connection pools it manages.
  ///
  /// - Parameters:
  ///   - mode: The shutdown mode, forwarded to every managed connection pool.
  ///   - promise: A promise completed once shutdown has finished. If this call starts the
  ///     shutdown, the promise is completed when all pools have shut down. If a shutdown is
  ///     already in progress, it is completed with the result of that shutdown. If the manager
  ///     has already been shut down, it is succeeded immediately.
  @usableFromInline
  internal func shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
    // The pools are only handed back when this call is the one starting the shutdown; the other
    // actions get an empty array. Keeping the array non-optional means the action alone decides
    // what happens next and no impossible (action, pools) combinations need to be handled.
    let (action, pools): (PoolManagerStateMachine.ShutdownAction, [ConnectionPool]) = self.lock
      .withLock {
        let action = self._state.shutdown(promise: promise)

        switch action {
        case .shutdownPools:
          // Clear out the pools; we need to shut them down.
          let pools = self._pools
          self._pools.removeAll(keepingCapacity: true)
          return (action, pools)

        case .alreadyShutdown, .alreadyShuttingDown:
          return (action, [])
        }
      }

    // Act on the decision outside of the lock.
    switch action {
    case let .shutdownPools(statsTask):
      statsTask?.cancel(promise: nil)
      // Tell the state machine once the shutdown has completed (whether it succeeded or not).
      promise.futureResult.whenComplete { _ in self.shutdownComplete() }
      EventLoopFuture.andAllSucceed(pools.map { $0.shutdown(mode: mode) }, promise: promise)

    case let .alreadyShuttingDown(future):
      promise.completeWith(future)

    case .alreadyShutdown:
      promise.succeed(())
    }
  }
  /// Tells the state machine, while holding `lock`, that the shutdown has completed.
  private func shutdownComplete() {
    self.lock.withLock { self._state.shutdownComplete() }
  }
  // MARK: - Stats

  /// Gathers stats from every managed pool and, if gathering succeeds, passes them to the
  /// delegate along with the pool manager's ID. Does nothing when there are no pools.
  ///
  /// - Parameter delegate: The delegate to notify with the gathered stats.
  private func emitStats(delegate: GRPCConnectionPoolDelegate) {
    // Take a snapshot of the pools under the lock; the stats are gathered outside of it.
    let pools = self.lock.withLock { self._pools }
    guard !pools.isEmpty else { return }

    let allStats = EventLoopFuture.whenAllSucceed(pools.map { $0.stats() }, on: self.group.any())
    allStats.whenSuccess { stats in
      delegate.connectionPoolStats(stats, id: self.id)
    }
  }
  341. }
  // MARK: - Connection Pool to Pool Manager

  extension PoolManager: StreamLender {
    /// Records, in the state machine, that `count` streams were returned to `pool`. The pool is
    /// identified by the ID of its event loop.
    @usableFromInline
    internal func returnStreams(_ count: Int, to pool: ConnectionPool) {
      let eventLoopID = pool.eventLoop.id
      self.lock.withLock {
        self._state.returnStreams(count, toPoolOnEventLoopWithID: eventLoopID)
      }
    }

    /// Records, in the state machine, that the stream capacity of `pool` changed by `delta`. The
    /// pool is identified by the ID of its event loop.
    @usableFromInline
    internal func changeStreamCapacity(by delta: Int, for pool: ConnectionPool) {
      let eventLoopID = pool.eventLoop.id
      self.lock.withLock {
        self._state.changeStreamCapacity(by: delta, forPoolOnEventLoopWithID: eventLoopID)
      }
    }
  }
  /// Errors reported by the pool manager.
  @usableFromInline
  internal enum PoolManagerError: Error {
    /// The pool manager hasn't been initialized yet.
    case notInitialized

    /// The pool manager is shutting down or has already been shutdown.
    case shutdown
  }