PoolManager.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOConcurrencyHelpers
  19. internal final class PoolManager {
  20. /// Configuration used for each connection pool.
  21. internal struct PerPoolConfiguration {
  22. /// The maximum number of connections per pool.
  23. var maxConnections: Int
  24. /// The maximum number of waiters per pool.
  25. var maxWaiters: Int
  26. /// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started
  27. /// (assuming there is a connection available to start).
  28. var loadThreshold: Double
  29. /// The assumed value of HTTP/2 'SETTINGS_MAX_CONCURRENT_STREAMS'.
  30. var assumedMaxConcurrentStreams: Int = 100
  31. /// The assumed maximum number of streams concurrently available in the pool.
  32. var assumedStreamCapacity: Int {
  33. return self.maxConnections * self.assumedMaxConcurrentStreams
  34. }
  35. /// A `Channel` provider.
  36. var channelProvider: DefaultChannelProvider
  37. }
  38. /// Logging metadata keys
  39. private enum Metadata {
  40. /// The ID of the pool manager.
  41. static let id = "poolmanager.id"
  42. /// The number of managed connection pools.
  43. static let poolCount = "poolmanager.pools.count"
  44. /// The maximum number of connections per pool.
  45. static let connectionsPerPool = "poolmanager.pools.conns_per_pool"
  46. /// The maximum number of waiters per pool.
  47. static let waitersPerPool = "poolmanager.pools.waiters_per_pool"
  48. }
  49. /// The current state of the pool manager, `lock` must be held when accessing or
  50. /// modifying `state`.
  51. private var state: PoolManagerStateMachine
  52. private var pools: [ConnectionPool]
  53. private let lock = Lock()
  54. /// The `EventLoopGroup` providing `EventLoop`s for connection pools. Once initialized the manager
  55. /// will hold as many pools as there are loops in this `EventLoopGroup`.
  56. private let group: EventLoopGroup
  57. /// Make a new pool manager and initialize it.
  58. ///
  59. /// The pool manager manages one connection pool per event loop in the provided `EventLoopGroup`.
  60. /// Each connection pool is configured using the `perPoolConfiguration`.
  61. ///
  62. /// - Parameters:
  63. /// - group: The `EventLoopGroup` providing `EventLoop`s to connections managed by the pool
  64. /// manager.
  65. /// - perPoolConfiguration: Configuration used by each connection pool managed by the manager.
  66. /// - logger: A logger.
  67. /// - Returns: An initialized pool manager.
  68. internal static func makeInitializedPoolManager(
  69. using group: EventLoopGroup,
  70. perPoolConfiguration: PerPoolConfiguration,
  71. logger: GRPCLogger
  72. ) -> PoolManager {
  73. let manager = PoolManager(group: group)
  74. manager.initialize(perPoolConfiguration: perPoolConfiguration, logger: logger)
  75. return manager
  76. }
  77. private init(group: EventLoopGroup) {
  78. self.state = PoolManagerStateMachine(.inactive)
  79. self.pools = []
  80. self.group = group
  81. // The pool relies on the identity of each `EventLoop` in the `EventLoopGroup` being unique. In
  82. // practice this is unlikely to happen unless a custom `EventLoopGroup` is constructed, because
  83. // of that we'll only check when running in debug mode.
  84. debugOnly {
  85. let eventLoopIDs = group.makeIterator().map { ObjectIdentifier($0) }
  86. let uniqueEventLoopIDs = Set(eventLoopIDs)
  87. assert(
  88. eventLoopIDs.count == uniqueEventLoopIDs.count,
  89. "'group' contains non-unique event loops"
  90. )
  91. }
  92. }
  93. deinit {
  94. self.lock.withLockVoid {
  95. assert(
  96. self.state.isShutdownOrShuttingDown,
  97. "The pool manager (\(ObjectIdentifier(self))) must be shutdown before going out of scope."
  98. )
  99. }
  100. }
  101. /// Initialize the pool manager, create and initialize one connection pool per event loop in the
  102. /// pools `EventLoopGroup`.
  103. ///
  104. /// - Important: Must only be called once.
  105. /// - Parameters:
  106. /// - configuration: The configuration used for each connection pool.
  107. /// - logger: A logger.
  108. private func initialize(
  109. perPoolConfiguration configuration: PerPoolConfiguration,
  110. logger: GRPCLogger
  111. ) {
  112. var logger = logger
  113. logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"
  114. let pools = self.makePools(perPoolConfiguration: configuration, logger: logger)
  115. logger.debug("initializing connection pool manager", metadata: [
  116. Metadata.poolCount: "\(pools.count)",
  117. Metadata.connectionsPerPool: "\(configuration.maxConnections)",
  118. Metadata.waitersPerPool: "\(configuration.maxWaiters)",
  119. ])
  120. // The assumed maximum number of streams concurrently available in each pool.
  121. let assumedCapacity = configuration.assumedStreamCapacity
  122. // The state machine stores the per-pool state keyed by the pools `EventLoopID` and tells the
  123. // pool manager about which pool to use/operate via the pools index in `self.pools`.
  124. let poolKeys = pools.indices.map { index in
  125. return ConnectionPoolKey(
  126. index: ConnectionPoolIndex(index),
  127. eventLoopID: pools[index].eventLoop.id
  128. )
  129. }
  130. self.lock.withLockVoid {
  131. assert(self.pools.isEmpty)
  132. self.pools = pools
  133. // We'll blow up if we've already been initialized, that's fine, we don't allow callers to
  134. // call `initialize` directly.
  135. self.state.activatePools(keyedBy: poolKeys, assumingPerPoolCapacity: assumedCapacity)
  136. }
  137. for pool in pools {
  138. pool.initialize(connections: configuration.maxConnections)
  139. }
  140. }
  141. /// Make one pool per `EventLoop` in the pool's `EventLoopGroup`.
  142. /// - Parameters:
  143. /// - configuration: The configuration to make each pool with.
  144. /// - logger: A logger.
  145. /// - Returns: An array of `ConnectionPool`s.
  146. private func makePools(
  147. perPoolConfiguration configuration: PerPoolConfiguration,
  148. logger: GRPCLogger
  149. ) -> [ConnectionPool] {
  150. let eventLoops = self.group.makeIterator()
  151. return eventLoops.map { eventLoop in
  152. // We're creating a retain cycle here as each pool will reference the manager and the per-pool
  153. // state will hold the pool which will in turn be held by the pool manager. That's okay: when
  154. // the pool is shutdown the per-pool state and in turn each connection pool will be dropped.
  155. // and we'll break the cycle.
  156. return ConnectionPool(
  157. eventLoop: eventLoop,
  158. maxWaiters: configuration.maxWaiters,
  159. reservationLoadThreshold: configuration.loadThreshold,
  160. assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
  161. channelProvider: configuration.channelProvider,
  162. streamLender: self,
  163. logger: logger
  164. )
  165. }
  166. }
  167. // MARK: Stream Creation
  168. /// A future for a `Channel` from a managed connection pool. The `eventLoop` indicates the loop
  169. /// that the `Channel` is running on and therefore which event loop the RPC will use for its
  170. /// transport.
  171. internal struct PooledStreamChannel {
  172. /// The future `Channel`.
  173. var futureResult: EventLoopFuture<Channel>
  174. /// The `EventLoop` that the `Channel` is using.
  175. var eventLoop: EventLoop {
  176. return self.futureResult.eventLoop
  177. }
  178. }
  179. /// Make a stream and initialize it.
  180. ///
  181. /// - Parameters:
  182. /// - preferredEventLoop: The `EventLoop` that the stream should be created on, if possible. If
  183. /// a pool exists running this `EventLoop` then it will be chosen over all other pools,
  184. /// irregardless of the load on the pool. If no pool exists on the preferred `EventLoop` or
  185. /// no preference is given then the pool with the most streams available will be selected.
  186. /// The `EventLoop` of the selected pool will be the same as the `EventLoop` of
  187. /// the `EventLoopFuture<Channel>` returned from this call.
  188. /// - deadline: The point in time by which the stream must have been selected. If this deadline
  189. /// is passed then the returned `EventLoopFuture` will be failed.
  190. /// - logger: A logger.
  191. /// - initializer: A closure to initialize the `Channel` with.
  192. /// - Returns: A `PoolStreamChannel` indicating the future channel and `EventLoop` as that the
  193. /// `Channel` is using. The future will be failed if the pool manager has been shutdown,
  194. /// the deadline has passed before a stream was created or if the selected connection pool
  195. /// is unable to create a stream (if there is too much demand on that pool, for example).
  196. internal func makeStream(
  197. preferredEventLoop: EventLoop?,
  198. deadline: NIODeadline,
  199. logger: GRPCLogger,
  200. streamInitializer initializer: @escaping (Channel) -> EventLoopFuture<Void>
  201. ) -> PooledStreamChannel {
  202. let preferredEventLoopID = preferredEventLoop.map { EventLoopID($0) }
  203. let reservedPool = self.lock.withLock {
  204. return self.state.reserveStream(preferringPoolWithEventLoopID: preferredEventLoopID).map {
  205. return self.pools[$0.value]
  206. }
  207. }
  208. switch reservedPool {
  209. case let .success(pool):
  210. let channel = pool.makeStream(deadline: deadline, logger: logger, initializer: initializer)
  211. return PooledStreamChannel(futureResult: channel)
  212. case let .failure(error):
  213. let eventLoop = preferredEventLoop ?? self.group.next()
  214. return PooledStreamChannel(futureResult: eventLoop.makeFailedFuture(error))
  215. }
  216. }
  217. // MARK: Shutdown
  218. /// Shutdown the pool manager and all connection pools it manages.
  219. internal func shutdown(promise: EventLoopPromise<Void>) {
  220. let (action, pools): (PoolManagerStateMachine.ShutdownAction, [ConnectionPool]?) = self.lock
  221. .withLock {
  222. let action = self.state.shutdown(promise: promise)
  223. switch action {
  224. case .shutdownPools:
  225. // Clear out the pools; we need to shut them down.
  226. let pools = self.pools
  227. self.pools.removeAll(keepingCapacity: true)
  228. return (action, pools)
  229. case .alreadyShutdown, .alreadyShuttingDown:
  230. return (action, nil)
  231. }
  232. }
  233. switch (action, pools) {
  234. case let (.shutdownPools, .some(pools)):
  235. promise.futureResult.whenComplete { _ in self.shutdownComplete() }
  236. EventLoopFuture.andAllSucceed(pools.map { $0.shutdown() }, promise: promise)
  237. case let (.alreadyShuttingDown(future), .none):
  238. promise.completeWith(future)
  239. case (.alreadyShutdown, .none):
  240. promise.succeed(())
  241. case (.shutdownPools, .none),
  242. (.alreadyShuttingDown, .some),
  243. (.alreadyShutdown, .some):
  244. preconditionFailure()
  245. }
  246. }
  247. private func shutdownComplete() {
  248. self.lock.withLockVoid {
  249. self.state.shutdownComplete()
  250. }
  251. }
  252. }
  253. // MARK: - Connection Pool to Pool Manager
  254. extension PoolManager: StreamLender {
  255. internal func returnStreams(_ count: Int, to pool: ConnectionPool) {
  256. self.lock.withLockVoid {
  257. self.state.returnStreams(count, toPoolOnEventLoopWithID: pool.eventLoop.id)
  258. }
  259. }
  260. internal func changeStreamCapacity(by delta: Int, for pool: ConnectionPool) {
  261. self.lock.withLockVoid {
  262. self.state.changeStreamCapacity(by: delta, forPoolOnEventLoopWithID: pool.eventLoop.id)
  263. }
  264. }
  265. }
  266. internal enum PoolManagerError: Error {
  267. /// The pool manager has not been initialized yet.
  268. case notInitialized
  269. /// The pool manager has been shutdown or is in the process of shutting down.
  270. case shutdown
  271. }