| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- /*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Logging
- import NIOCore
- import NIOPosix
- public enum GRPCChannelPool {
- /// Make a new ``GRPCChannel`` on which calls may be made to gRPC services.
- ///
- /// The channel is backed by one connection pool per event loop, each of which may make multiple
- /// connections to the given target. The size of the connection pool, and therefore the maximum
- /// number of connections it may create at a given time is determined by the number of event loops
- /// in the provided `EventLoopGroup` and the value of
- /// ``GRPCChannelPool/Configuration/ConnectionPool-swift.struct/connectionsPerEventLoop``.
- ///
- /// The event loop and therefore connection chosen for a call is determined by
- /// ``CallOptions/eventLoopPreference-swift.property``. If the `indifferent` preference is used
- /// then the least-used event loop is chosen and a connection on that event loop will be selected.
- /// If an `exact` preference is used then a connection on that event loop will be chosen provided
- /// the given event loop belongs to the `EventLoopGroup` used to create this ``GRPCChannel``.
- ///
- /// Each connection in the pool is initially idle, and no connections will be established until
- /// a call is made. The pool also closes connections after they have been inactive (i.e. are not
- /// being used for calls) for some period of time. This is determined by
- /// ``GRPCChannelPool/Configuration/idleTimeout``.
- ///
- /// > Important: The values of `transportSecurity` and `eventLoopGroup` **must** be compatible.
- /// >
- /// > For ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/tls(_:)`` the allowed
- /// > `EventLoopGroup`s depends on the value of ``GRPCTLSConfiguration``. If a TLS configuration
- /// > is known ahead of time, ``PlatformSupport/makeEventLoopGroup(compatibleWith:loopCount:)``
- /// > may be used to construct a compatible `EventLoopGroup`.
- /// >
- /// > If the `EventLoopGroup` is known ahead of time then a default TLS configuration may be
- /// > constructed with ``GRPCTLSConfiguration/makeClientDefault(compatibleWith:)``.
- /// >
- /// > For ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/plaintext`` transport
- /// > security both `MultiThreadedEventLoopGroup` and `NIOTSEventLoopGroup` (and `EventLoop`s
- /// > from either) may be used.
- ///
- /// - Parameters:
- /// - target: The target to connect to.
- /// - transportSecurity: Transport layer security for connections.
- /// - eventLoopGroup: The `EventLoopGroup` to run connections on.
- /// - configure: A closure which may be used to modify defaulted configuration before
- /// constructing the ``GRPCChannel``.
- /// - Throws: If it is not possible to construct an SSL context. This will never happen when
- /// using the ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/plaintext``
- /// transport security.
- /// - Returns: A ``GRPCChannel``.
- @inlinable
- public static func with(
- target: ConnectionTarget,
- transportSecurity: GRPCChannelPool.Configuration.TransportSecurity,
- eventLoopGroup: EventLoopGroup,
- _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
- ) throws -> GRPCChannel {
- let configuration = GRPCChannelPool.Configuration.with(
- target: target,
- transportSecurity: transportSecurity,
- eventLoopGroup: eventLoopGroup,
- configure
- )
- return try PooledChannel(configuration: configuration)
- }
- /// See ``GRPCChannelPool/with(target:transportSecurity:eventLoopGroup:_:)``.
- public static func with(
- configuration: GRPCChannelPool.Configuration
- ) throws -> GRPCChannel {
- return try PooledChannel(configuration: configuration)
- }
- }
- extension GRPCChannelPool {
- public struct Configuration: Sendable {
- @inlinable
- internal init(
- target: ConnectionTarget,
- transportSecurity: TransportSecurity,
- eventLoopGroup: EventLoopGroup
- ) {
- self.target = target
- self.transportSecurity = transportSecurity
- self.eventLoopGroup = eventLoopGroup
- }
- // Note: we use `configure` blocks to avoid having to add new initializers when properties are
- // added to the configuration while allowing the configuration to be constructed as a constant.
- /// Construct and configure a ``GRPCChannelPool/Configuration``.
- ///
- /// - Parameters:
- /// - target: The target to connect to.
- /// - transportSecurity: Transport layer security for connections. Note that the value of
- /// `eventLoopGroup` must be compatible with the value
- /// - eventLoopGroup: The `EventLoopGroup` to run connections on.
- /// - configure: A closure which may be used to modify defaulted configuration.
- @inlinable
- public static func with(
- target: ConnectionTarget,
- transportSecurity: TransportSecurity,
- eventLoopGroup: EventLoopGroup,
- _ configure: (inout Configuration) -> Void = { _ in }
- ) -> Configuration {
- var configuration = Configuration(
- target: target,
- transportSecurity: transportSecurity,
- eventLoopGroup: eventLoopGroup
- )
- configure(&configuration)
- return configuration
- }
- /// The target to connect to.
- public var target: ConnectionTarget
- /// Connection security.
- public var transportSecurity: TransportSecurity
- /// The `EventLoopGroup` used by the connection pool.
- public var eventLoopGroup: EventLoopGroup
- /// Connection pool configuration.
- public var connectionPool: ConnectionPool = .defaults
- /// HTTP/2 configuration.
- public var http2: HTTP2 = .defaults
- /// The connection backoff configuration.
- public var connectionBackoff = ConnectionBackoff()
- /// The amount of time to wait before closing the connection. The idle timeout will start only
- /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
- ///
- /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
- public var idleTimeout = TimeAmount.minutes(30)
- /// The connection keepalive configuration.
- public var keepalive = ClientConnectionKeepalive()
- /// The maximum size in bytes of a message which may be received from a server. Defaults to 4MB.
- ///
- /// Any received messages whose size exceeds this limit will cause RPCs to fail with
- /// a `.resourceExhausted` status code.
- public var maximumReceiveMessageLength: Int = 4 * 1024 * 1024 {
- willSet {
- precondition(newValue >= 0, "maximumReceiveMessageLength must be positive")
- }
- }
- /// A channel initializer which will be run after gRPC has initialized each `NIOCore.Channel`.
- /// This may be used to add additional handlers to the pipeline and is intended for debugging.
- ///
- /// - Warning: The initializer closure may be invoked *multiple times*.
- @preconcurrency
- public var debugChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
- /// An error delegate which is called when errors are caught.
- public var errorDelegate: ClientErrorDelegate?
- /// A delegate which will be notified about changes to the state of connections managed by the
- /// pool.
- public var delegate: GRPCConnectionPoolDelegate?
- /// A logger used for background activity, such as connection state changes.
- public var backgroundActivityLogger = Logger(
- label: "io.grpc",
- factory: { _ in
- return SwiftLogNoOpLogHandler()
- }
- )
- }
- }
- extension GRPCChannelPool.Configuration {
- public struct TransportSecurity: Sendable {
- private init(_ configuration: GRPCTLSConfiguration?) {
- self.tlsConfiguration = configuration
- }
- /// The TLS configuration used. A `nil` value means that no TLS will be used and
- /// communication at the transport layer will be plaintext.
- public var tlsConfiguration: Optional<GRPCTLSConfiguration>
- /// Secure the transport layer with TLS.
- ///
- /// The TLS backend used depends on the value of `configuration`. See ``GRPCTLSConfiguration``
- /// for more details.
- ///
- /// > Important: the value of `configuration` **must** be compatible with
- /// > ``GRPCChannelPool/Configuration/eventLoopGroup``. See the documentation of
- /// > ``GRPCChannelPool/with(target:transportSecurity:eventLoopGroup:_:)`` for more details.
- public static func tls(_ configuration: GRPCTLSConfiguration) -> TransportSecurity {
- return TransportSecurity(configuration)
- }
- /// Insecure plaintext communication.
- public static let plaintext = TransportSecurity(nil)
- }
- }
- extension GRPCChannelPool.Configuration {
- public struct HTTP2: Hashable, Sendable {
- private static let allowedTargetWindowSizes = (1 ... Int(Int32.max))
- private static let allowedMaxFrameSizes = (1 << 14) ... ((1 << 24) - 1)
- /// Default HTTP/2 configuration.
- public static let defaults = HTTP2()
- @inlinable
- public static func with(_ configure: (inout HTTP2) -> Void) -> HTTP2 {
- var configuration = Self.defaults
- configure(&configuration)
- return configuration
- }
- /// The HTTP/2 max frame size. Defaults to 8MB. Values are clamped between 2^14 and 2^24-1
- /// octets inclusive (RFC 7540 § 4.2).
- public var targetWindowSize = 8 * 1024 * 1024 {
- didSet {
- self.targetWindowSize = self.targetWindowSize.clamped(to: Self.allowedTargetWindowSizes)
- }
- }
- /// The HTTP/2 max frame size. Defaults to 16384. Value is clamped between 2^14 and 2^24-1
- /// octets inclusive (the minimum and maximum allowable values - HTTP/2 RFC 7540 4.2).
- public var maxFrameSize: Int = 16384 {
- didSet {
- self.maxFrameSize = self.maxFrameSize.clamped(to: Self.allowedMaxFrameSizes)
- }
- }
- }
- }
- extension GRPCChannelPool.Configuration {
- public struct ConnectionPool: Hashable, Sendable {
- /// Default connection pool configuration.
- public static let defaults = ConnectionPool()
- @inlinable
- public static func with(_ configure: (inout ConnectionPool) -> Void) -> ConnectionPool {
- var configuration = Self.defaults
- configure(&configuration)
- return configuration
- }
- /// The maximum number of connections per `EventLoop` that may be created at a given time.
- ///
- /// Defaults to 1.
- public var connectionsPerEventLoop: Int = 1
- /// The maximum number of callers which may be waiting for a stream at any given time on a
- /// given `EventLoop`.
- ///
- /// Any requests for a stream which would cause this limit to be exceeded will be failed
- /// immediately.
- ///
- /// Defaults to 100.
- public var maxWaitersPerEventLoop: Int = 100
- /// The maximum amount of time a caller is willing to wait for a stream for before timing out.
- ///
- /// Defaults to 30 seconds.
- public var maxWaitTime: TimeAmount = .seconds(30)
- /// The threshold which, if exceeded, when creating a stream determines whether the pool will
- /// establish another connection (if doing so will not violate ``connectionsPerEventLoop``).
- ///
- /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of
- /// waiters and the number of reserved streams) and the total number of streams which each
- /// thread _could support.
- public var reservationLoadThreshold: Double = 0.9
- }
- }
- /// The ID of a connection in the connection pool.
- public struct GRPCConnectionID: Hashable, Sendable, CustomStringConvertible {
- private let id: ConnectionManagerID
- public var description: String {
- return String(describing: self.id)
- }
- internal init(_ id: ConnectionManagerID) {
- self.id = id
- }
- }
- /// A delegate for the connection pool which is notified of various lifecycle events.
- ///
- /// All functions must execute quickly and may be executed on arbitrary threads. The implementor is
- /// responsible for ensuring thread safety.
- public protocol GRPCConnectionPoolDelegate: Sendable {
- /// A new connection was created with the given ID and added to the pool. The connection is not
- /// yet active (or connecting).
- ///
- /// In most cases ``startedConnecting(id:)`` will be the next function called for the given
- /// connection but ``connectionRemoved(id:)`` may also be called.
- func connectionAdded(id: GRPCConnectionID)
- /// The connection with the given ID was removed from the pool.
- func connectionRemoved(id: GRPCConnectionID)
- /// The connection with the given ID has started trying to establish a connection. The outcome
- /// of the connection will be reported as either ``connectSucceeded(id:streamCapacity:)`` or
- /// ``connectFailed(id:error:)``.
- func startedConnecting(id: GRPCConnectionID)
- /// A connection attempt failed with the given error. After some period of
- /// time ``startedConnecting(id:)`` may be called again.
- func connectFailed(id: GRPCConnectionID, error: Error)
- /// A connection was established on the connection with the given ID. `streamCapacity` streams are
- /// available to use on the connection. The maximum number of available streams may change over
- /// time and is reported via ``connectionUtilizationChanged(id:streamsUsed:streamCapacity:)``. The
- func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int)
- /// The utlization of the connection changed; a stream may have been used, returned or the
- /// maximum number of concurrent streams available on the connection changed.
- func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int)
- /// The remote peer is quiescing the connection: no new streams will be created on it. The
- /// connection will eventually be closed and removed from the pool.
- func connectionQuiescing(id: GRPCConnectionID)
- /// The connection was closed. The connection may be established again in the future (notified
- /// via ``startedConnecting(id:)``).
- func connectionClosed(id: GRPCConnectionID, error: Error?)
- }
|