GRPCChannelPool.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOPosix
  19. import struct Foundation.UUID
  20. #if canImport(Network)
  21. import Network
  22. #endif
  23. public enum GRPCChannelPool {
  24. /// Make a new ``GRPCChannel`` on which calls may be made to gRPC services.
  25. ///
  26. /// The channel is backed by one connection pool per event loop, each of which may make multiple
  27. /// connections to the given target. The size of the connection pool, and therefore the maximum
  28. /// number of connections it may create at a given time is determined by the number of event loops
  29. /// in the provided `EventLoopGroup` and the value of
  30. /// ``GRPCChannelPool/Configuration/ConnectionPool-swift.struct/connectionsPerEventLoop``.
  31. ///
  32. /// The event loop and therefore connection chosen for a call is determined by
  33. /// ``CallOptions/eventLoopPreference-swift.property``. If the `indifferent` preference is used
  34. /// then the least-used event loop is chosen and a connection on that event loop will be selected.
  35. /// If an `exact` preference is used then a connection on that event loop will be chosen provided
  36. /// the given event loop belongs to the `EventLoopGroup` used to create this ``GRPCChannel``.
  37. ///
  38. /// Each connection in the pool is initially idle, and no connections will be established until
  39. /// a call is made. The pool also closes connections after they have been inactive (i.e. are not
  40. /// being used for calls) for some period of time. This is determined by
  41. /// ``GRPCChannelPool/Configuration/idleTimeout``.
  42. ///
  43. /// > Important: The values of `transportSecurity` and `eventLoopGroup` **must** be compatible.
  44. /// >
  45. /// > For ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/tls(_:)`` the allowed
  46. /// > `EventLoopGroup`s depends on the value of ``GRPCTLSConfiguration``. If a TLS configuration
  47. /// > is known ahead of time, ``PlatformSupport/makeEventLoopGroup(compatibleWith:loopCount:)``
  48. /// > may be used to construct a compatible `EventLoopGroup`.
  49. /// >
  50. /// > If the `EventLoopGroup` is known ahead of time then a default TLS configuration may be
  51. /// > constructed with ``GRPCTLSConfiguration/makeClientDefault(compatibleWith:)``.
  52. /// >
  53. /// > For ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/plaintext`` transport
  54. /// > security both `MultiThreadedEventLoopGroup` and `NIOTSEventLoopGroup` (and `EventLoop`s
  55. /// > from either) may be used.
  56. ///
  57. /// - Parameters:
  58. /// - target: The target to connect to.
  59. /// - transportSecurity: Transport layer security for connections.
  60. /// - eventLoopGroup: The `EventLoopGroup` to run connections on.
  61. /// - configure: A closure which may be used to modify defaulted configuration before
  62. /// constructing the ``GRPCChannel``.
  63. /// - Throws: If it is not possible to construct an SSL context. This will never happen when
  64. /// using the ``GRPCChannelPool/Configuration/TransportSecurity-swift.struct/plaintext``
  65. /// transport security.
  66. /// - Returns: A ``GRPCChannel``.
  67. @inlinable
  68. public static func with(
  69. target: ConnectionTarget,
  70. transportSecurity: GRPCChannelPool.Configuration.TransportSecurity,
  71. eventLoopGroup: EventLoopGroup,
  72. _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  73. ) throws -> GRPCChannel {
  74. let configuration = GRPCChannelPool.Configuration.with(
  75. target: target,
  76. transportSecurity: transportSecurity,
  77. eventLoopGroup: eventLoopGroup,
  78. configure
  79. )
  80. return try PooledChannel(configuration: configuration)
  81. }
  82. /// See ``GRPCChannelPool/with(target:transportSecurity:eventLoopGroup:_:)``.
  83. public static func with(
  84. configuration: GRPCChannelPool.Configuration
  85. ) throws -> GRPCChannel {
  86. return try PooledChannel(configuration: configuration)
  87. }
  88. }
  89. extension GRPCChannelPool {
  90. public struct Configuration: Sendable {
  91. @inlinable
  92. internal init(
  93. target: ConnectionTarget,
  94. transportSecurity: TransportSecurity,
  95. eventLoopGroup: EventLoopGroup
  96. ) {
  97. self.target = target
  98. self.transportSecurity = transportSecurity
  99. self.eventLoopGroup = eventLoopGroup
  100. }
  101. // Note: we use `configure` blocks to avoid having to add new initializers when properties are
  102. // added to the configuration while allowing the configuration to be constructed as a constant.
  103. /// Construct and configure a ``GRPCChannelPool/Configuration``.
  104. ///
  105. /// - Parameters:
  106. /// - target: The target to connect to.
  107. /// - transportSecurity: Transport layer security for connections. Note that the value of
  108. /// `eventLoopGroup` must be compatible with the value
  109. /// - eventLoopGroup: The `EventLoopGroup` to run connections on.
  110. /// - configure: A closure which may be used to modify defaulted configuration.
  111. @inlinable
  112. public static func with(
  113. target: ConnectionTarget,
  114. transportSecurity: TransportSecurity,
  115. eventLoopGroup: EventLoopGroup,
  116. _ configure: (inout Configuration) -> Void = { _ in }
  117. ) -> Configuration {
  118. var configuration = Configuration(
  119. target: target,
  120. transportSecurity: transportSecurity,
  121. eventLoopGroup: eventLoopGroup
  122. )
  123. configure(&configuration)
  124. return configuration
  125. }
  126. /// The target to connect to.
  127. public var target: ConnectionTarget
  128. /// Connection security.
  129. public var transportSecurity: TransportSecurity
  130. /// The `EventLoopGroup` used by the connection pool.
  131. public var eventLoopGroup: EventLoopGroup
  132. /// Connection pool configuration.
  133. public var connectionPool: ConnectionPool = .defaults
  134. /// HTTP/2 configuration.
  135. public var http2: HTTP2 = .defaults
  136. /// The connection backoff configuration.
  137. public var connectionBackoff = ConnectionBackoff()
  138. /// The amount of time to wait before closing the connection. The idle timeout will start only
  139. /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
  140. ///
  141. /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
  142. public var idleTimeout = TimeAmount.minutes(30)
  143. /// The maximum allowed age of a connection.
  144. ///
  145. /// If set, no new RPCs will be started on the connection after the connection has been opened
  146. /// for this period of time. Existing RPCs will be allowed to continue and the connection will
  147. /// close once all RPCs on the connection have finished. If this isn't set then connections have
  148. /// no limit on their lifetime.
  149. public var maxConnectionAge: TimeAmount? = nil
  150. /// The connection keepalive configuration.
  151. public var keepalive = ClientConnectionKeepalive()
  152. /// The maximum size in bytes of a message which may be received from a server. Defaults to 4MB.
  153. ///
  154. /// Any received messages whose size exceeds this limit will cause RPCs to fail with
  155. /// a `.resourceExhausted` status code.
  156. public var maximumReceiveMessageLength: Int = 4 * 1024 * 1024 {
  157. willSet {
  158. precondition(newValue >= 0, "maximumReceiveMessageLength must be positive")
  159. }
  160. }
  161. /// A channel initializer which will be run after gRPC has initialized each `NIOCore.Channel`.
  162. /// This may be used to add additional handlers to the pipeline and is intended for debugging.
  163. ///
  164. /// - Warning: The initializer closure may be invoked *multiple times*.
  165. @preconcurrency
  166. public var debugChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
  167. /// An error delegate which is called when errors are caught.
  168. public var errorDelegate: ClientErrorDelegate?
  169. /// A delegate which will be notified about changes to the state of connections managed by the
  170. /// pool.
  171. public var delegate: GRPCConnectionPoolDelegate?
  172. /// The period at which connection pool stats are published to the ``delegate``.
  173. ///
  174. /// Ignored if either this value or ``delegate`` are `nil`.
  175. public var statsPeriod: TimeAmount?
  176. /// A logger used for background activity, such as connection state changes.
  177. public var backgroundActivityLogger = Logger(
  178. label: "io.grpc",
  179. factory: { _ in
  180. return SwiftLogNoOpLogHandler()
  181. }
  182. )
  183. #if canImport(Network)
  184. /// `TransportServices` related configuration. This will be ignored unless an appropriate event loop group
  185. /// (e.g. `NIOTSEventLoopGroup`) is used.
  186. public var transportServices: TransportServices = .defaults
  187. #endif
  188. }
  189. }
  190. extension GRPCChannelPool.Configuration {
  191. public struct TransportSecurity: Sendable {
  192. private init(_ configuration: GRPCTLSConfiguration?) {
  193. self.tlsConfiguration = configuration
  194. }
  195. /// The TLS configuration used. A `nil` value means that no TLS will be used and
  196. /// communication at the transport layer will be plaintext.
  197. public var tlsConfiguration: Optional<GRPCTLSConfiguration>
  198. /// Secure the transport layer with TLS.
  199. ///
  200. /// The TLS backend used depends on the value of `configuration`. See ``GRPCTLSConfiguration``
  201. /// for more details.
  202. ///
  203. /// > Important: the value of `configuration` **must** be compatible with
  204. /// > ``GRPCChannelPool/Configuration/eventLoopGroup``. See the documentation of
  205. /// > ``GRPCChannelPool/with(target:transportSecurity:eventLoopGroup:_:)`` for more details.
  206. public static func tls(_ configuration: GRPCTLSConfiguration) -> TransportSecurity {
  207. return TransportSecurity(configuration)
  208. }
  209. /// Insecure plaintext communication.
  210. public static let plaintext = TransportSecurity(nil)
  211. }
  212. }
  213. extension GRPCChannelPool.Configuration {
  214. public struct HTTP2: Hashable, Sendable {
  215. private static let allowedTargetWindowSizes = (1 ... Int(Int32.max))
  216. private static let allowedMaxFrameSizes = (1 << 14) ... ((1 << 24) - 1)
  217. /// Default HTTP/2 configuration.
  218. public static let defaults = HTTP2()
  219. @inlinable
  220. public static func with(_ configure: (inout HTTP2) -> Void) -> HTTP2 {
  221. var configuration = Self.defaults
  222. configure(&configuration)
  223. return configuration
  224. }
  225. /// The HTTP/2 max frame size. Defaults to 8MB. Values are clamped between 2^14 and 2^24-1
  226. /// octets inclusive (RFC 7540 § 4.2).
  227. public var targetWindowSize = 8 * 1024 * 1024 {
  228. didSet {
  229. self.targetWindowSize = self.targetWindowSize.clamped(to: Self.allowedTargetWindowSizes)
  230. }
  231. }
  232. /// The HTTP/2 max frame size. Defaults to 16384. Value is clamped between 2^14 and 2^24-1
  233. /// octets inclusive (the minimum and maximum allowable values - HTTP/2 RFC 7540 4.2).
  234. public var maxFrameSize: Int = 16384 {
  235. didSet {
  236. self.maxFrameSize = self.maxFrameSize.clamped(to: Self.allowedMaxFrameSizes)
  237. }
  238. }
  239. /// The HTTP/2 max number of reset streams. Defaults to 32. Must be non-negative.
  240. public var maxResetStreams: Int = 32 {
  241. willSet {
  242. precondition(newValue >= 0, "maxResetStreams must be non-negative")
  243. }
  244. }
  245. }
  246. }
  247. extension GRPCChannelPool.Configuration {
  248. public struct ConnectionPool: Hashable, Sendable {
  249. /// Default connection pool configuration.
  250. public static let defaults = ConnectionPool()
  251. @inlinable
  252. public static func with(_ configure: (inout ConnectionPool) -> Void) -> ConnectionPool {
  253. var configuration = Self.defaults
  254. configure(&configuration)
  255. return configuration
  256. }
  257. /// The maximum number of connections per `EventLoop` that may be created at a given time.
  258. ///
  259. /// Defaults to 1.
  260. public var connectionsPerEventLoop: Int = 1
  261. /// The maximum number of callers which may be waiting for a stream at any given time on a
  262. /// given `EventLoop`.
  263. ///
  264. /// Any requests for a stream which would cause this limit to be exceeded will be failed
  265. /// immediately.
  266. ///
  267. /// Defaults to 100.
  268. public var maxWaitersPerEventLoop: Int = 100
  269. /// The minimum number of connections to keep open in this pool, per EventLoop.
  270. /// This number of connections per EventLoop will never go idle and be closed.
  271. public var minConnectionsPerEventLoop: Int = 0
  272. /// The maximum amount of time a caller is willing to wait for a stream for before timing out.
  273. ///
  274. /// Defaults to 30 seconds.
  275. public var maxWaitTime: TimeAmount = .seconds(30)
  276. /// The threshold which, if exceeded, when creating a stream determines whether the pool will
  277. /// establish another connection (if doing so will not violate ``connectionsPerEventLoop``).
  278. ///
  279. /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of
  280. /// waiters and the number of reserved streams) and the total number of streams which each
  281. /// thread _could support.
  282. public var reservationLoadThreshold: Double = 0.9
  283. }
  284. }
  285. #if canImport(Network)
  286. extension GRPCChannelPool.Configuration {
  287. public struct TransportServices: Sendable {
  288. /// Default transport services configuration.
  289. public static let defaults = Self()
  290. @inlinable
  291. public static func with(_ configure: (inout Self) -> Void) -> Self {
  292. var configuration = Self.defaults
  293. configure(&configuration)
  294. return configuration
  295. }
  296. /// A closure allowing to customise the `NWParameters` used when establishing a connection using `NIOTransportServices`.
  297. @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
  298. public var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)? {
  299. get {
  300. self._nwParametersConfigurator as! (@Sendable (NWParameters) -> Void)?
  301. }
  302. set {
  303. self._nwParametersConfigurator = newValue
  304. }
  305. }
  306. private var _nwParametersConfigurator: (any Sendable)?
  307. }
  308. }
  309. #endif // canImport(Network)
  310. /// The ID of a connection in the connection pool.
  311. public struct GRPCConnectionID: Hashable, Sendable, CustomStringConvertible {
  312. private enum Value: Sendable, Hashable {
  313. case managerID(ConnectionManagerID)
  314. case uuid(UUID)
  315. }
  316. private let id: Value
  317. public var description: String {
  318. switch self.id {
  319. case .managerID(let id):
  320. return String(describing: id)
  321. case .uuid(let uuid):
  322. return String(describing: uuid)
  323. }
  324. }
  325. internal init(_ id: ConnectionManagerID) {
  326. self.id = .managerID(id)
  327. }
  328. /// Create a new unique connection ID.
  329. ///
  330. /// Normally you don't have to create connection IDs, gRPC will create them on your behalf.
  331. /// However creating them manually is useful when testing the ``GRPCConnectionPoolDelegate``.
  332. public init() {
  333. self.id = .uuid(UUID())
  334. }
  335. }
  336. /// A delegate for the connection pool which is notified of various lifecycle events.
  337. ///
  338. /// All functions must execute quickly and may be executed on arbitrary threads. The implementor is
  339. /// responsible for ensuring thread safety.
  340. public protocol GRPCConnectionPoolDelegate: Sendable {
  341. /// A new connection was created with the given ID and added to the pool. The connection is not
  342. /// yet active (or connecting).
  343. ///
  344. /// In most cases ``startedConnecting(id:)`` will be the next function called for the given
  345. /// connection but ``connectionRemoved(id:)`` may also be called.
  346. func connectionAdded(id: GRPCConnectionID)
  347. /// The connection with the given ID was removed from the pool.
  348. func connectionRemoved(id: GRPCConnectionID)
  349. /// The connection with the given ID has started trying to establish a connection. The outcome
  350. /// of the connection will be reported as either ``connectSucceeded(id:streamCapacity:)`` or
  351. /// ``connectFailed(id:error:)``.
  352. func startedConnecting(id: GRPCConnectionID)
  353. /// A connection attempt failed with the given error. After some period of
  354. /// time ``startedConnecting(id:)`` may be called again.
  355. func connectFailed(id: GRPCConnectionID, error: Error)
  356. /// A connection was established on the connection with the given ID. `streamCapacity` streams are
  357. /// available to use on the connection. The maximum number of available streams may change over
  358. /// time and is reported via ``connectionUtilizationChanged(id:streamsUsed:streamCapacity:)``. The
  359. func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int)
  360. /// The utilization of the connection changed; a stream may have been used, returned or the
  361. /// maximum number of concurrent streams available on the connection changed.
  362. func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int)
  363. /// The remote peer is quiescing the connection: no new streams will be created on it. The
  364. /// connection will eventually be closed and removed from the pool.
  365. func connectionQuiescing(id: GRPCConnectionID)
  366. /// The connection was closed. The connection may be established again in the future (notified
  367. /// via ``startedConnecting(id:)``).
  368. func connectionClosed(id: GRPCConnectionID, error: Error?)
  369. /// Stats about the current state of the connection pool.
  370. ///
  371. /// Each ``GRPCConnectionPoolStats`` includes the stats for a sub-pool. Each sub-pool is tied
  372. /// to an `EventLoop`.
  373. ///
  374. /// Unlike the other delegate methods, this is called periodically based on the value
  375. /// of ``GRPCChannelPool/Configuration/statsPeriod``.
  376. func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID)
  377. }
  378. extension GRPCConnectionPoolDelegate {
  379. public func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID) {
  380. // Default conformance to avoid breaking changes.
  381. }
  382. }
  383. public struct GRPCSubPoolStats: Sendable, Hashable {
  384. public struct ConnectionStates: Sendable, Hashable {
  385. /// The number of idle connections.
  386. public var idle: Int
  387. /// The number of connections trying to establish a connection.
  388. public var connecting: Int
  389. /// The number of connections which are ready to use.
  390. public var ready: Int
  391. /// The number of connections which are backing off waiting to attempt to connect.
  392. public var transientFailure: Int
  393. public init() {
  394. self.idle = 0
  395. self.connecting = 0
  396. self.ready = 0
  397. self.transientFailure = 0
  398. }
  399. }
  400. /// The ID of the subpool.
  401. public var id: GRPCSubPoolID
  402. /// Counts of connection states.
  403. public var connectionStates: ConnectionStates
  404. /// The number of streams currently being used.
  405. public var streamsInUse: Int
  406. /// The number of streams which are currently free to use.
  407. ///
  408. /// The sum of this value and `streamsInUse` gives the capacity of the pool.
  409. public var streamsFreeToUse: Int
  410. /// The number of RPCs currently waiting for a stream.
  411. ///
  412. /// RPCs waiting for a stream are also known as 'waiters'.
  413. public var rpcsWaiting: Int
  414. public init(id: GRPCSubPoolID) {
  415. self.id = id
  416. self.connectionStates = ConnectionStates()
  417. self.streamsInUse = 0
  418. self.streamsFreeToUse = 0
  419. self.rpcsWaiting = 0
  420. }
  421. }