| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- /*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Logging
- import NIOConcurrencyHelpers
- import NIOCore
- // Unchecked because all mutable state is protected by a lock.
- extension PooledChannel: @unchecked Sendable {}
- @usableFromInline
- internal final class PoolManager {
- /// Configuration used for each connection pool.
- @usableFromInline
- internal struct PerPoolConfiguration {
- /// The maximum number of connections per pool.
- @usableFromInline
- var maxConnections: Int
- /// The maximum number of waiters per pool.
- @usableFromInline
- var maxWaiters: Int
- /// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started
- /// (assuming there is a connection available to start).
- @usableFromInline
- var loadThreshold: Double
- /// The assumed value of HTTP/2 'SETTINGS_MAX_CONCURRENT_STREAMS'.
- @usableFromInline
- var assumedMaxConcurrentStreams: Int
- /// The assumed maximum number of streams concurrently available in the pool.
- @usableFromInline
- var assumedStreamCapacity: Int {
- return self.maxConnections * self.assumedMaxConcurrentStreams
- }
- @usableFromInline
- var connectionBackoff: ConnectionBackoff
- /// A `Channel` provider.
- @usableFromInline
- var channelProvider: DefaultChannelProvider
- @usableFromInline
- var delegate: GRPCConnectionPoolDelegate?
- @usableFromInline
- internal init(
- maxConnections: Int,
- maxWaiters: Int,
- loadThreshold: Double,
- assumedMaxConcurrentStreams: Int = 100,
- connectionBackoff: ConnectionBackoff,
- channelProvider: DefaultChannelProvider,
- delegate: GRPCConnectionPoolDelegate?
- ) {
- self.maxConnections = maxConnections
- self.maxWaiters = maxWaiters
- self.loadThreshold = loadThreshold
- self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
- self.connectionBackoff = connectionBackoff
- self.channelProvider = channelProvider
- self.delegate = delegate
- }
- }
- /// Logging metadata keys
- private enum Metadata {
- /// The ID of the pool manager.
- static let id = "poolmanager.id"
- /// The number of managed connection pools.
- static let poolCount = "poolmanager.pools.count"
- /// The maximum number of connections per pool.
- static let connectionsPerPool = "poolmanager.pools.conns_per_pool"
- /// The maximum number of waiters per pool.
- static let waitersPerPool = "poolmanager.pools.waiters_per_pool"
- }
- /// The current state of the pool manager, `lock` must be held when accessing or
- /// modifying `state`.
- @usableFromInline
- internal var _state: PoolManagerStateMachine
- @usableFromInline
- internal var _pools: [ConnectionPool]
- @usableFromInline
- internal let lock = NIOLock()
- /// The `EventLoopGroup` providing `EventLoop`s for connection pools. Once initialized the manager
- /// will hold as many pools as there are loops in this `EventLoopGroup`.
- @usableFromInline
- internal let group: EventLoopGroup
- /// Make a new pool manager and initialize it.
- ///
- /// The pool manager manages one connection pool per event loop in the provided `EventLoopGroup`.
- /// Each connection pool is configured using the `perPoolConfiguration`.
- ///
- /// - Parameters:
- /// - group: The `EventLoopGroup` providing `EventLoop`s to connections managed by the pool
- /// manager.
- /// - perPoolConfiguration: Configuration used by each connection pool managed by the manager.
- /// - logger: A logger.
- /// - Returns: An initialized pool manager.
- @usableFromInline
- internal static func makeInitializedPoolManager(
- using group: EventLoopGroup,
- perPoolConfiguration: PerPoolConfiguration,
- logger: GRPCLogger
- ) -> PoolManager {
- let manager = PoolManager(privateButUsableFromInline_group: group)
- manager.initialize(perPoolConfiguration: perPoolConfiguration, logger: logger)
- return manager
- }
- @usableFromInline
- internal init(privateButUsableFromInline_group group: EventLoopGroup) {
- self._state = PoolManagerStateMachine(.inactive)
- self._pools = []
- self.group = group
- // The pool relies on the identity of each `EventLoop` in the `EventLoopGroup` being unique. In
- // practice this is unlikely to happen unless a custom `EventLoopGroup` is constructed, because
- // of that we'll only check when running in debug mode.
- debugOnly {
- let eventLoopIDs = group.makeIterator().map { ObjectIdentifier($0) }
- let uniqueEventLoopIDs = Set(eventLoopIDs)
- assert(
- eventLoopIDs.count == uniqueEventLoopIDs.count,
- "'group' contains non-unique event loops"
- )
- }
- }
- deinit {
- self.lock.withLock {
- assert(
- self._state.isShutdownOrShuttingDown,
- "The pool manager (\(ObjectIdentifier(self))) must be shutdown before going out of scope."
- )
- }
- }
- /// Initialize the pool manager, create and initialize one connection pool per event loop in the
- /// pools `EventLoopGroup`.
- ///
- /// - Important: Must only be called once.
- /// - Parameters:
- /// - configuration: The configuration used for each connection pool.
- /// - logger: A logger.
- private func initialize(
- perPoolConfiguration configuration: PerPoolConfiguration,
- logger: GRPCLogger
- ) {
- var logger = logger
- logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"
- let pools = self.makePools(perPoolConfiguration: configuration, logger: logger)
- logger.debug(
- "initializing connection pool manager",
- metadata: [
- Metadata.poolCount: "\(pools.count)",
- Metadata.connectionsPerPool: "\(configuration.maxConnections)",
- Metadata.waitersPerPool: "\(configuration.maxWaiters)",
- ]
- )
- // The assumed maximum number of streams concurrently available in each pool.
- let assumedCapacity = configuration.assumedStreamCapacity
- // The state machine stores the per-pool state keyed by the pools `EventLoopID` and tells the
- // pool manager about which pool to use/operate via the pools index in `self.pools`.
- let poolKeys = pools.indices.map { index in
- return ConnectionPoolKey(
- index: ConnectionPoolIndex(index),
- eventLoopID: pools[index].eventLoop.id
- )
- }
- self.lock.withLock {
- assert(self._pools.isEmpty)
- self._pools = pools
- // We'll blow up if we've already been initialized, that's fine, we don't allow callers to
- // call `initialize` directly.
- self._state.activatePools(keyedBy: poolKeys, assumingPerPoolCapacity: assumedCapacity)
- }
- for pool in pools {
- pool.initialize(connections: configuration.maxConnections)
- }
- }
- /// Make one pool per `EventLoop` in the pool's `EventLoopGroup`.
- /// - Parameters:
- /// - configuration: The configuration to make each pool with.
- /// - logger: A logger.
- /// - Returns: An array of `ConnectionPool`s.
- private func makePools(
- perPoolConfiguration configuration: PerPoolConfiguration,
- logger: GRPCLogger
- ) -> [ConnectionPool] {
- let eventLoops = self.group.makeIterator()
- return eventLoops.map { eventLoop in
- // We're creating a retain cycle here as each pool will reference the manager and the per-pool
- // state will hold the pool which will in turn be held by the pool manager. That's okay: when
- // the pool is shutdown the per-pool state and in turn each connection pool will be dropped.
- // and we'll break the cycle.
- return ConnectionPool(
- eventLoop: eventLoop,
- maxWaiters: configuration.maxWaiters,
- reservationLoadThreshold: configuration.loadThreshold,
- assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
- connectionBackoff: configuration.connectionBackoff,
- channelProvider: configuration.channelProvider,
- streamLender: self,
- delegate: configuration.delegate,
- logger: logger
- )
- }
- }
- // MARK: Stream Creation
- /// A future for a `Channel` from a managed connection pool. The `eventLoop` indicates the loop
- /// that the `Channel` is running on and therefore which event loop the RPC will use for its
- /// transport.
- @usableFromInline
- internal struct PooledStreamChannel {
- @inlinable
- internal init(futureResult: EventLoopFuture<Channel>) {
- self.futureResult = futureResult
- }
- /// The future `Channel`.
- @usableFromInline
- var futureResult: EventLoopFuture<Channel>
- /// The `EventLoop` that the `Channel` is using.
- @usableFromInline
- var eventLoop: EventLoop {
- return self.futureResult.eventLoop
- }
- }
- /// Make a stream and initialize it.
- ///
- /// - Parameters:
- /// - preferredEventLoop: The `EventLoop` that the stream should be created on, if possible. If
- /// a pool exists running this `EventLoop` then it will be chosen over all other pools,
- /// irregardless of the load on the pool. If no pool exists on the preferred `EventLoop` or
- /// no preference is given then the pool with the most streams available will be selected.
- /// The `EventLoop` of the selected pool will be the same as the `EventLoop` of
- /// the `EventLoopFuture<Channel>` returned from this call.
- /// - deadline: The point in time by which the stream must have been selected. If this deadline
- /// is passed then the returned `EventLoopFuture` will be failed.
- /// - logger: A logger.
- /// - initializer: A closure to initialize the `Channel` with.
- /// - Returns: A `PoolStreamChannel` indicating the future channel and `EventLoop` as that the
- /// `Channel` is using. The future will be failed if the pool manager has been shutdown,
- /// the deadline has passed before a stream was created or if the selected connection pool
- /// is unable to create a stream (if there is too much demand on that pool, for example).
- @inlinable
- internal func makeStream(
- preferredEventLoop: EventLoop?,
- deadline: NIODeadline,
- logger: GRPCLogger,
- streamInitializer initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
- ) -> PooledStreamChannel {
- let preferredEventLoopID = preferredEventLoop.map { EventLoopID($0) }
- let reservedPool = self.lock.withLock {
- return self._state.reserveStream(preferringPoolWithEventLoopID: preferredEventLoopID).map {
- return self._pools[$0.value]
- }
- }
- switch reservedPool {
- case let .success(pool):
- let channel = pool.makeStream(deadline: deadline, logger: logger, initializer: initializer)
- return PooledStreamChannel(futureResult: channel)
- case let .failure(error):
- let eventLoop = preferredEventLoop ?? self.group.next()
- return PooledStreamChannel(futureResult: eventLoop.makeFailedFuture(error))
- }
- }
- // MARK: Shutdown
- /// Shutdown the pool manager and all connection pools it manages.
- @usableFromInline
- internal func shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
- let (action, pools): (PoolManagerStateMachine.ShutdownAction, [ConnectionPool]?) = self.lock
- .withLock {
- let action = self._state.shutdown(promise: promise)
- switch action {
- case .shutdownPools:
- // Clear out the pools; we need to shut them down.
- let pools = self._pools
- self._pools.removeAll(keepingCapacity: true)
- return (action, pools)
- case .alreadyShutdown, .alreadyShuttingDown:
- return (action, nil)
- }
- }
- switch (action, pools) {
- case let (.shutdownPools, .some(pools)):
- promise.futureResult.whenComplete { _ in self.shutdownComplete() }
- EventLoopFuture.andAllSucceed(pools.map { $0.shutdown(mode: mode) }, promise: promise)
- case let (.alreadyShuttingDown(future), .none):
- promise.completeWith(future)
- case (.alreadyShutdown, .none):
- promise.succeed(())
- case (.shutdownPools, .none),
- (.alreadyShuttingDown, .some),
- (.alreadyShutdown, .some):
- preconditionFailure()
- }
- }
- private func shutdownComplete() {
- self.lock.withLock {
- self._state.shutdownComplete()
- }
- }
- }
- // MARK: - Connection Pool to Pool Manager
- extension PoolManager: StreamLender {
- @usableFromInline
- internal func returnStreams(_ count: Int, to pool: ConnectionPool) {
- self.lock.withLock {
- self._state.returnStreams(count, toPoolOnEventLoopWithID: pool.eventLoop.id)
- }
- }
- @usableFromInline
- internal func changeStreamCapacity(by delta: Int, for pool: ConnectionPool) {
- self.lock.withLock {
- self._state.changeStreamCapacity(by: delta, forPoolOnEventLoopWithID: pool.eventLoop.id)
- }
- }
- }
- @usableFromInline
- internal enum PoolManagerError: Error {
- /// The pool manager has not been initialized yet.
- case notInitialized
- /// The pool manager has been shutdown or is in the process of shutting down.
- case shutdown
- }
|