- /*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Logging
- import NIO
- import NIOHTTP2
- /// This struct models the state of an HTTP/2 connection and provides the means to indirectly track
- /// active and available HTTP/2 streams on that connection.
- ///
- /// The state -- once ready -- holds a multiplexer which it yields when an available 'token' is
- /// borrowed. One token corresponds to the creation of one HTTP/2 stream. The caller is responsible
- /// for later returning the token.
- internal struct HTTP2ConnectionState {
- /// An identifier for this pooled connection.
- internal let id: ObjectIdentifier
- /// Indicates whether the pooled connection is idle.
- internal var isIdle: Bool {
- return self.state.isIdle
- }
- /// The number of tokens currently available for this connection. `availableTokens` must be
- /// greater than zero for `borrowTokens` to be called.
- ///
- /// Note that `availableTokens` may be negative, for example after the token limit is lowered
- /// below the number of tokens already borrowed.
- internal var availableTokens: Int {
- switch self.state {
- case let .ready(ready):
- return ready.availableTokens
- case .idle, .connectingOrBackingOff:
- return 0
- }
- }
- /// The number of tokens currently borrowed from this connection.
- internal var borrowedTokens: Int {
- switch self.state {
- case let .ready(ready):
- return ready.borrowedTokens
- case .idle, .connectingOrBackingOff:
- return 0
- }
- }
- /// The state of the pooled connection.
- private var state: State
- private enum State {
- /// No connection has been asked for; there are no tokens available.
- case idle
- /// A connection attempt is underway or we may be waiting to attempt to connect again.
- case connectingOrBackingOff
- /// We have an active connection which may have tokens borrowed.
- case ready(ReadyState)
- /// Whether the state is `idle`.
- var isIdle: Bool {
- switch self {
- case .idle:
- return true
- case .connectingOrBackingOff, .ready:
- return false
- }
- }
- }
- private struct ReadyState {
- internal var multiplexer: HTTP2StreamMultiplexer
- internal var borrowedTokens: Int
- internal var tokenLimit: Int
- internal init(multiplexer: HTTP2StreamMultiplexer) {
- self.multiplexer = multiplexer
- self.borrowedTokens = 0
- // 100 is a common value for HTTP/2 SETTINGS_MAX_CONCURRENT_STREAMS so we assume this value
- // until we know better.
- self.tokenLimit = 100
- }
- internal var availableTokens: Int {
- return self.tokenLimit - self.borrowedTokens
- }
- internal mutating func borrowTokens(_ count: Int) -> (HTTP2StreamMultiplexer, Int) {
- self.borrowedTokens += count
- assert(self.borrowedTokens <= self.tokenLimit)
- return (self.multiplexer, self.borrowedTokens)
- }
- internal mutating func returnToken() {
- self.borrowedTokens -= 1
- assert(self.borrowedTokens >= 0)
- }
- internal mutating func updateTokenLimit(_ limit: Int) -> Int {
- let oldLimit = self.tokenLimit
- self.tokenLimit = limit
- return oldLimit
- }
- }
- internal init(connectionManagerID: ObjectIdentifier) {
- self.id = connectionManagerID
- self.state = .idle
- }
- // MARK: - Lease Management
- /// Borrow tokens from the pooled connection.
- ///
- /// Each borrowed token corresponds to the creation of one HTTP/2 stream using the multiplexer
- /// returned from this call. The caller must return each token once the stream is no longer
- /// required by calling `returnToken()`.
- ///
- /// - Parameter tokensToBorrow: The number of tokens to borrow. This *must not*
- /// exceed `availableTokens`.
- /// - Returns: A tuple of the `HTTP2StreamMultiplexer` on which streams should be created and
- /// the total number of tokens which have been borrowed from this connection.
- mutating func borrowTokens(_ tokensToBorrow: Int) -> (HTTP2StreamMultiplexer, Int) {
- switch self.state {
- case var .ready(ready):
- let result = ready.borrowTokens(tokensToBorrow)
- self.state = .ready(ready)
- return result
- case .idle, .connectingOrBackingOff:
- // `availableTokens` is zero for these two states and a precondition for calling this function
- // is that `tokensToBorrow` must not exceed the available tokens.
- preconditionFailure("Tokens may only be borrowed from a ready connection")
- }
- }
- /// Return a single token to the pooled connection.
- mutating func returnToken() {
- switch self.state {
- case var .ready(ready):
- ready.returnToken()
- self.state = .ready(ready)
- case .idle, .connectingOrBackingOff:
- // A token may have been returned after the connection dropped.
- ()
- }
- }
- /// Updates the maximum number of tokens a connection may vend at any given time and returns the
- /// previous limit.
- ///
- /// If the new limit is higher than the old limit then there may now be some tokens available
- /// (i.e. `availableTokens > 0`). If the new limit is lower than the old limit, `availableTokens`
- /// will decrease, possibly to zero or below, and this connection may have no available tokens.
- ///
- /// - Parameter newValue: The maximum number of tokens a connection may vend at a given time.
- /// - Returns: The previous token limit.
- mutating func updateMaximumTokens(_ newValue: Int) -> Int {
- switch self.state {
- case var .ready(ready):
- let oldLimit = ready.updateTokenLimit(newValue)
- self.state = .ready(ready)
- return oldLimit
- case .idle, .connectingOrBackingOff:
- preconditionFailure()
- }
- }
- /// Notify the state that a connection attempt is about to start.
- mutating func willStartConnecting() {
- switch self.state {
- case .idle, .ready:
- // We can start connecting from the 'ready' state again if the connection was dropped.
- self.state = .connectingOrBackingOff
- case .connectingOrBackingOff:
- preconditionFailure()
- }
- }
- /// The connection attempt succeeded.
- ///
- /// - Parameter multiplexer: The `HTTP2StreamMultiplexer` from the connection.
- mutating func connected(multiplexer: HTTP2StreamMultiplexer) {
- switch self.state {
- case .connectingOrBackingOff:
- self.state = .ready(ReadyState(multiplexer: multiplexer))
- case .idle, .ready:
- preconditionFailure()
- }
- }
- /// Notify the state of a change in connectivity from the guts of the connection (as emitted by
- /// the `ConnectivityStateDelegate`).
- ///
- /// - Parameter state: The new state.
- /// - Returns: Any action to perform as a result of the state change.
- mutating func connectivityStateChanged(to state: ConnectivityState) -> StateChangeAction {
- // We only care about a few transitions as we mostly rely on our own state transitions. Namely,
- // we care about a change from ready to transient failure (as we need to invalidate any borrowed
- // tokens and start a new connection). We also care about becoming idle and shutting down.
- switch (state, self.state) {
- case (.idle, _):
- // We always need to invalidate any state when the channel becomes idle again.
- self.state = .idle
- return .nothing
- case (.connecting, _),
- (.ready, _):
- // We may bounce between 'connecting' and 'transientFailure' while we're in the
- // 'connectingOrBackingOff' state, so it's okay to ignore 'connecting' here.
- //
- // We never pay attention to receiving 'ready', rather we rely on 'connected(multiplexer:)'
- // instead.
- return .nothing
- case (.transientFailure, .ready):
- // If we're ready and hit a transient failure, we must start connecting again. We'll defer our
- // own state transition until 'willStartConnecting()' is called.
- return .startConnectingAgain
- case (.transientFailure, .idle),
- (.transientFailure, .connectingOrBackingOff):
- return .nothing
- case (.shutdown, _):
- // The connection has been shut down. We shouldn't pay attention to it anymore.
- return .removeFromConnectionList
- }
- }
- internal enum StateChangeAction: Hashable {
- /// Do nothing.
- case nothing
- /// Remove the connection from the pooled connections; it has been shut down.
- case removeFromConnectionList
- /// Check if any waiters exist for the connection.
- case checkWaiters
- /// The connection dropped: ask for a new one.
- case startConnectingAgain
- }
- }
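
The token accounting in `ReadyState` can be tried out in isolation. The following is a minimal sketch, not the library's implementation: `TokenBucket` and its members are hypothetical names introduced here, the `HTTP2StreamMultiplexer` is left out so the snippet needs no NIO dependency, and borrowing is guarded with a `precondition` up front rather than an `assert` afterwards. It shows how lowering the limit below the number of borrowed tokens makes the available count negative.

```swift
// Hypothetical stand-in for `ReadyState`: it tracks only the counters and
// omits the multiplexer so that it compiles without NIO.
struct TokenBucket {
  private(set) var borrowed = 0
  private(set) var limit: Int

  // 100 mirrors the default assumed for SETTINGS_MAX_CONCURRENT_STREAMS above.
  init(limit: Int = 100) {
    self.limit = limit
  }

  /// Tokens which may still be borrowed. This may be negative.
  var available: Int {
    return self.limit - self.borrowed
  }

  /// Borrows `count` tokens and returns the total number borrowed so far.
  mutating func borrow(_ count: Int) -> Int {
    precondition(count <= self.available, "cannot borrow more than is available")
    self.borrowed += count
    return self.borrowed
  }

  /// Returns a single token.
  mutating func returnOne() {
    precondition(self.borrowed > 0, "no tokens to return")
    self.borrowed -= 1
  }

  /// Replaces the limit and returns the previous one.
  mutating func updateLimit(_ newLimit: Int) -> Int {
    let oldLimit = self.limit
    self.limit = newLimit
    return oldLimit
  }
}

var bucket = TokenBucket()
let total = bucket.borrow(10)         // total == 10, available == 90
let oldLimit = bucket.updateLimit(5)  // oldLimit == 100, available == -5
bucket.returnOne()                    // borrowed == 9, available == -4
```

A caller in this situation would have to wait until enough tokens are returned for `available` to become positive before borrowing again.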