// PoolManager.swift (13 KB)
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  #if compiler(>=5.6)
  // Unchecked because all mutable state is protected by a lock.
  // NOTE(review): `PooledChannel` is not defined in this file, so the claim above cannot be
  // checked from here; confirm that its mutable state really is guarded by a lock.
  extension PooledChannel: @unchecked Sendable {}
  #endif // compiler(>=5.6)
  23. @usableFromInline
  24. internal final class PoolManager {
  /// Settings shared by every connection pool created by the pool manager.
  @usableFromInline
  internal struct PerPoolConfiguration {
    /// Upper bound on the number of connections held by a single pool.
    @usableFromInline
    var maxConnections: Int

    /// Upper bound on the number of waiters queued on a single pool.
    @usableFromInline
    var maxWaiters: Int

    /// Load threshold, in the range `0.0 ... 1.0`, above which a pool starts another connection
    /// (provided it still has a connection available to start).
    @usableFromInline
    var loadThreshold: Double

    /// The value assumed for HTTP/2 'SETTINGS_MAX_CONCURRENT_STREAMS'.
    @usableFromInline
    var assumedMaxConcurrentStreams: Int

    /// The number of streams a pool is assumed to be able to serve concurrently: the maximum
    /// number of connections multiplied by the assumed number of streams per connection.
    @usableFromInline
    var assumedStreamCapacity: Int {
      let connections = self.maxConnections
      return connections * self.assumedMaxConcurrentStreams
    }

    /// The connection backoff handed to each pool.
    @usableFromInline
    var connectionBackoff: ConnectionBackoff

    /// The provider of `Channel`s handed to each pool.
    @usableFromInline
    var channelProvider: DefaultChannelProvider

    /// An optional connection pool delegate handed to each pool.
    @usableFromInline
    var delegate: GRPCConnectionPoolDelegate?

    /// Create a per-pool configuration.
    ///
    /// - Parameters:
    ///   - maxConnections: The maximum number of connections per pool.
    ///   - maxWaiters: The maximum number of waiters per pool.
    ///   - loadThreshold: The load threshold in the range `0.0 ... 1.0`.
    ///   - assumedMaxConcurrentStreams: The assumed value of 'SETTINGS_MAX_CONCURRENT_STREAMS',
    ///     defaults to 100.
    ///   - connectionBackoff: The connection backoff used by each pool.
    ///   - channelProvider: The `Channel` provider used by each pool.
    ///   - delegate: An optional delegate passed to each pool.
    @usableFromInline
    internal init(
      maxConnections: Int,
      maxWaiters: Int,
      loadThreshold: Double,
      assumedMaxConcurrentStreams: Int = 100,
      connectionBackoff: ConnectionBackoff,
      channelProvider: DefaultChannelProvider,
      delegate: GRPCConnectionPoolDelegate?
    ) {
      // Sizing limits.
      self.maxConnections = maxConnections
      self.maxWaiters = maxWaiters
      self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
      self.loadThreshold = loadThreshold

      // Collaborators forwarded to each pool.
      self.channelProvider = channelProvider
      self.connectionBackoff = connectionBackoff
      self.delegate = delegate
    }
  }
  /// Keys used for the logging metadata emitted by the pool manager.
  private enum Metadata {
    /// Key for the ID of the pool manager.
    static let id: String = "poolmanager.id"

    /// Key for the number of connection pools being managed.
    static let poolCount: String = "poolmanager.pools.count"

    /// Key for the maximum number of connections in each pool.
    static let connectionsPerPool: String = "poolmanager.pools.conns_per_pool"

    /// Key for the maximum number of waiters in each pool.
    static let waitersPerPool: String = "poolmanager.pools.waiters_per_pool"
  }
  /// The current state of the pool manager. `lock` must be held when accessing or modifying
  /// `_state`.
  @usableFromInline
  internal var _state: PoolManagerStateMachine

  /// The connection pools owned by the manager. Within this file the array is assigned in
  /// `initialize`, read in `makeStream` and emptied in `shutdown`, each time while holding `lock`.
  @usableFromInline
  internal var _pools: [ConnectionPool]

  /// The lock held while `_state` and `_pools` are accessed.
  @usableFromInline
  internal let lock = NIOLock()

  /// The `EventLoopGroup` providing `EventLoop`s for connection pools. Once initialized the manager
  /// will hold as many pools as there are loops in this `EventLoopGroup`.
  @usableFromInline
  internal let group: EventLoopGroup
  /// Create a pool manager and initialize it before handing it back.
  ///
  /// One connection pool is managed for each event loop in `group`; every pool is configured
  /// from `perPoolConfiguration`.
  ///
  /// - Parameters:
  ///   - group: The `EventLoopGroup` whose `EventLoop`s are used by the connections managed by
  ///     the pool manager.
  ///   - perPoolConfiguration: The configuration applied to each managed connection pool.
  ///   - logger: A logger.
  /// - Returns: A pool manager which has already been initialized.
  @usableFromInline
  internal static func makeInitializedPoolManager(
    using group: EventLoopGroup,
    perPoolConfiguration: PerPoolConfiguration,
    logger: GRPCLogger
  ) -> PoolManager {
    let poolManager = PoolManager(privateButUsableFromInline_group: group)
    poolManager.initialize(perPoolConfiguration: perPoolConfiguration, logger: logger)
    return poolManager
  }
  /// Create an uninitialized pool manager: the state machine starts as inactive and no pools
  /// exist until `initialize` is called.
  ///
  /// - Parameter group: The `EventLoopGroup` providing an `EventLoop` for each connection pool.
  @usableFromInline
  internal init(privateButUsableFromInline_group group: EventLoopGroup) {
    self.group = group
    self._pools = []
    self._state = PoolManagerStateMachine(.inactive)

    // Each `EventLoop` in the group must have a unique identity as pools are looked up by their
    // event loop. Duplicates are unlikely unless a custom `EventLoopGroup` is in use, so the
    // check is only made when running in debug mode.
    debugOnly {
      let loopIdentities = group.makeIterator().map { loop in ObjectIdentifier(loop) }
      let distinctIdentities = Set(loopIdentities)
      assert(
        loopIdentities.count == distinctIdentities.count,
        "'group' contains non-unique event loops"
      )
    }
  }
  /// Checks, via `assert` and while holding `lock`, that the manager was shut down (or is in the
  /// process of shutting down) before being deallocated.
  deinit {
    self.lock.withLock {
      assert(
        self._state.isShutdownOrShuttingDown,
        "The pool manager (\(ObjectIdentifier(self))) must be shutdown before going out of scope."
      )
    }
  }
  /// Initialize the pool manager: create one connection pool for each event loop in the
  /// manager's `EventLoopGroup`, register the pools with the state machine and then initialize
  /// each pool.
  ///
  /// - Important: Must only be called once.
  /// - Parameters:
  ///   - configuration: The configuration used for each connection pool.
  ///   - logger: A logger.
  private func initialize(
    perPoolConfiguration configuration: PerPoolConfiguration,
    logger: GRPCLogger
  ) {
    // Tag everything logged from here on with the identity of this manager.
    var managerLogger = logger
    managerLogger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"

    let newPools = self.makePools(perPoolConfiguration: configuration, logger: managerLogger)

    managerLogger.debug("initializing connection pool manager", metadata: [
      Metadata.poolCount: "\(newPools.count)",
      Metadata.connectionsPerPool: "\(configuration.maxConnections)",
      Metadata.waitersPerPool: "\(configuration.maxWaiters)",
    ])

    // The assumed maximum number of streams concurrently available in each pool.
    let perPoolCapacity = configuration.assumedStreamCapacity

    // The state machine keys its per-pool state by the `EventLoopID` of each pool and refers to
    // pools by their index in `self._pools` when telling the manager which pool to use.
    let keys = newPools.enumerated().map { offset, pool in
      ConnectionPoolKey(index: ConnectionPoolIndex(offset), eventLoopID: pool.eventLoop.id)
    }

    self.lock.withLock {
      assert(self._pools.isEmpty)
      self._pools = newPools
      // This blows up if the manager was already initialized. That's fine: callers can't call
      // `initialize` directly.
      self._state.activatePools(keyedBy: keys, assumingPerPoolCapacity: perPoolCapacity)
    }

    // Initialize the pools only after the lock has been released.
    for pool in newPools {
      pool.initialize(connections: configuration.maxConnections)
    }
  }
  /// Create a connection pool for each `EventLoop` in the manager's `EventLoopGroup`.
  ///
  /// - Parameters:
  ///   - configuration: The configuration each pool is created with.
  ///   - logger: A logger.
  /// - Returns: The newly created `ConnectionPool`s, one per event loop.
  private func makePools(
    perPoolConfiguration configuration: PerPoolConfiguration,
    logger: GRPCLogger
  ) -> [ConnectionPool] {
    // A retain cycle is created here: each pool references the manager (as its stream lender),
    // the per-pool state holds the pool, and the manager holds that state. That's okay: on
    // shutdown the per-pool state, and with it each connection pool, is dropped, which breaks
    // the cycle.
    return self.group.makeIterator().map { loop in
      ConnectionPool(
        eventLoop: loop,
        maxWaiters: configuration.maxWaiters,
        reservationLoadThreshold: configuration.loadThreshold,
        assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
        connectionBackoff: configuration.connectionBackoff,
        channelProvider: configuration.channelProvider,
        streamLender: self,
        delegate: configuration.delegate,
        logger: logger
      )
    }
  }
  209. // MARK: Stream Creation
  /// Wraps the future `Channel` vended by a managed connection pool. Its `eventLoop` is the loop
  /// of that future, that is, the loop the `Channel` runs on and so the loop an RPC will use
  /// for its transport.
  @usableFromInline
  internal struct PooledStreamChannel {
    /// A future for the `Channel`.
    @usableFromInline
    var futureResult: EventLoopFuture<Channel>

    /// The `EventLoop` used by the `Channel`; this is the event loop of `futureResult`.
    @usableFromInline
    var eventLoop: EventLoop {
      let future = self.futureResult
      return future.eventLoop
    }

    /// Wrap a future `Channel`.
    @inlinable
    internal init(futureResult: EventLoopFuture<Channel>) {
      self.futureResult = futureResult
    }
  }
  /// Reserve a stream from one of the managed pools, then create and initialize it.
  ///
  /// - Parameters:
  ///   - preferredEventLoop: The `EventLoop` the stream should be created on, if possible. If a
  ///       pool is running on this `EventLoop` it is chosen over all other pools, regardless of
  ///       its load. If there is no such pool, or no preference was given, the pool with the
  ///       most streams available is selected. The `EventLoop` of the selected pool is the same
  ///       as the `EventLoop` of the `EventLoopFuture<Channel>` returned from this call.
  ///   - deadline: The point in time by which the stream must have been selected. If the deadline
  ///       passes first, the returned `EventLoopFuture` is failed.
  ///   - logger: A logger.
  ///   - initializer: A closure used to initialize the `Channel`.
  /// - Returns: A `PooledStreamChannel` holding the future channel and the `EventLoop` which that
  ///     `Channel` is using. The future is failed if the pool manager has been shutdown, if the
  ///     deadline passes before a stream was created, or if the selected connection pool is
  ///     unable to create a stream (if there is too much demand on that pool, for example).
  @inlinable
  internal func makeStream(
    preferredEventLoop: EventLoop?,
    deadline: NIODeadline,
    logger: GRPCLogger,
    streamInitializer initializer: @escaping (Channel) -> EventLoopFuture<Void>
  ) -> PooledStreamChannel {
    let preferredID = preferredEventLoop.map { loop in EventLoopID(loop) }

    // Reserve the stream and look up the pool it was reserved from in one critical section.
    let reservation = self.lock.withLock {
      self._state
        .reserveStream(preferringPoolWithEventLoopID: preferredID)
        .map { poolIndex in self._pools[poolIndex.value] }
    }

    switch reservation {
    case .success(let pool):
      let futureChannel = pool.makeStream(
        deadline: deadline,
        logger: logger,
        initializer: initializer
      )
      return PooledStreamChannel(futureResult: futureChannel)

    case .failure(let error):
      // No pool was reserved: fail the future on the preferred loop, or on any loop from the
      // group if there was no preference.
      let loop = preferredEventLoop ?? self.group.next()
      return PooledStreamChannel(futureResult: loop.makeFailedFuture(error))
    }
  }
  267. // MARK: Shutdown
  /// Shut down the pool manager along with every connection pool it manages.
  ///
  /// - Parameters:
  ///   - mode: The shutdown mode passed on to each connection pool.
  ///   - promise: A promise completed once shutdown has finished. If a shutdown is already in
  ///     progress it is completed with the result of that shutdown; if the manager has already
  ///     been shut down it is succeeded immediately.
  @usableFromInline
  internal func shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
    typealias Outcome = (PoolManagerStateMachine.ShutdownAction, [ConnectionPool]?)

    // Decide what to do and, if required, take ownership of the pools while holding the lock.
    let (action, takenPools): Outcome = self.lock.withLock {
      let nextAction = self._state.shutdown(promise: promise)

      switch nextAction {
      case .shutdownPools:
        // Clear out the pools; we need to shut them down.
        let current = self._pools
        self._pools.removeAll(keepingCapacity: true)
        return (nextAction, current)

      case .alreadyShutdown, .alreadyShuttingDown:
        return (nextAction, nil)
      }
    }

    // Act on the decision outside of the lock.
    switch (action, takenPools) {
    case (.shutdownPools, .some(let pools)):
      // Tell the state machine once shutdown has finished, whatever the outcome.
      promise.futureResult.whenComplete { _ in self.shutdownComplete() }
      let poolShutdowns = pools.map { pool in pool.shutdown(mode: mode) }
      EventLoopFuture.andAllSucceed(poolShutdowns, promise: promise)

    case (.alreadyShuttingDown(let future), .none):
      promise.completeWith(future)

    case (.alreadyShutdown, .none):
      promise.succeed(())

    case (.shutdownPools, .none),
         (.alreadyShuttingDown, .some),
         (.alreadyShutdown, .some):
      // Pools are returned if, and only if, the action is `.shutdownPools`.
      preconditionFailure()
    }
  }
  /// Tell the state machine, while holding `lock`, that shutdown has completed. Called from
  /// `shutdown` when the shutdown promise completes.
  private func shutdownComplete() {
    self.lock.withLock {
      self._state.shutdownComplete()
    }
  }
  303. }
  304. // MARK: - Connection Pool to Pool Manager
  extension PoolManager: StreamLender {
    /// Record that `count` streams were returned to `pool`, identified by the ID of its event
    /// loop.
    ///
    /// - Parameters:
    ///   - count: The number of streams being returned.
    ///   - pool: The connection pool the streams are returned to.
    @usableFromInline
    internal func returnStreams(_ count: Int, to pool: ConnectionPool) {
      let loopID = pool.eventLoop.id
      self.lock.withLock {
        self._state.returnStreams(count, toPoolOnEventLoopWithID: loopID)
      }
    }

    /// Record a change of `delta` in the stream capacity of `pool`, identified by the ID of its
    /// event loop.
    ///
    /// - Parameters:
    ///   - delta: The amount by which the stream capacity changed.
    ///   - pool: The connection pool whose capacity changed.
    @usableFromInline
    internal func changeStreamCapacity(by delta: Int, for pool: ConnectionPool) {
      let loopID = pool.eventLoop.id
      self.lock.withLock {
        self._state.changeStreamCapacity(by: delta, forPoolOnEventLoopWithID: loopID)
      }
    }
  }
  /// Errors describing a pool manager which is not in a usable state.
  ///
  /// NOTE(review): neither case is created in this file; presumably they are produced by
  /// `PoolManagerStateMachine` (for example when reserving a stream) — confirm against its source.
  @usableFromInline
  internal enum PoolManagerError: Error {
    /// The pool manager has not been initialized yet.
    case notInitialized

    /// The pool manager has been shutdown or is in the process of shutting down.
    case shutdown
  }
  325. }