浏览代码

HTTP/2 connection state and container (#1163)

Motivation:

In order to create a pool of connections we must be able to track the
state of HTTP/2 connections including how many streams exist and what the
connections limit is for stream concurrency.

Modifications:

- Add `HTTP2ConnectionState`, the state of an HTTP/2 connection which
  also provides a means to borrow and return streams from a connection.
- Add `HTTP2Connections`, a container for multiple `HTTP2ConnectionState`s.
  keyed by some `ObjectIdentifier` (in practice this will be the
  identifier of a `ConnectionManager` which manages the underlying
  `Channel` and provides a means to get a multiplexer).

Result:

- We can track the state of an http/2 connection including the number of
  concurrent streams.
- Some progress towards #1034
George Barnett 4 年之前
父节点
当前提交
651165657b

+ 258 - 0
Sources/GRPC/ConnectionPool/HTTP2ConnectionState.swift

@@ -0,0 +1,258 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Logging
+import NIO
+import NIOHTTP2
+
+/// This struct models the state of an HTTP/2 connection and provides the means to indirectly track
+/// active and available HTTP/2 streams on that connection.
+///
+/// The state -- once ready -- holds a multiplexer which it yields when an available 'token' is
+/// borrowed. One token corresponds to the creation of one HTTP/2 stream. The caller is responsible
+/// for later returning the token.
+internal struct HTTP2ConnectionState {
+  /// An identifier for this pooled connection.
+  internal let id: ObjectIdentifier
+
+  /// Indicates whether the pooled connection is idle.
+  internal var isIdle: Bool {
+    return self.state.isIdle
+  }
+
+  /// The number of tokens currently available for this connection. `availableTokens` must be
+  /// greater than zero for `borrowTokens` to be called.
+  ///
+  /// Note that it is also possible for `availableTokens` to be negative.
+  internal var availableTokens: Int {
+    switch self.state {
+    case let .ready(ready):
+      return ready.availableTokens
+    case .idle, .connectingOrBackingOff:
+      return 0
+    }
+  }
+
+  /// The number of tokens currently borrowed from this connection.
+  internal var borrowedTokens: Int {
+    switch self.state {
+    case let .ready(ready):
+      return ready.borrowedTokens
+    case .idle, .connectingOrBackingOff:
+      return 0
+    }
+  }
+
+  /// The state of the pooled connection.
+  private var state: State
+
+  private enum State {
+    /// No connection has been asked for, there are no tokens available.
+    case idle
+
+    /// A connection attempt is underway or we may be waiting to attempt to connect again.
+    case connectingOrBackingOff
+
+    /// We have an active connection which may have tokens borrowed.
+    case ready(ReadyState)
+
+    /// Whether the state is `idle`.
+    var isIdle: Bool {
+      switch self {
+      case .idle:
+        return true
+      case .connectingOrBackingOff, .ready:
+        return false
+      }
+    }
+  }
+
+  private struct ReadyState {
+    internal var multiplexer: HTTP2StreamMultiplexer
+    internal var borrowedTokens: Int
+    internal var tokenLimit: Int
+
+    internal init(multiplexer: HTTP2StreamMultiplexer) {
+      self.multiplexer = multiplexer
+      self.borrowedTokens = 0
+      // 100 is a common value for HTTP/2 SETTINGS_MAX_CONCURRENT_STREAMS so we assume this value
+      // until we know better.
+      self.tokenLimit = 100
+    }
+
+    internal var availableTokens: Int {
+      return self.tokenLimit - self.borrowedTokens
+    }
+
+    internal mutating func borrowTokens(_ count: Int) -> (HTTP2StreamMultiplexer, Int) {
+      self.borrowedTokens += count
+      assert(self.borrowedTokens <= self.tokenLimit)
+      return (self.multiplexer, self.borrowedTokens)
+    }
+
+    internal mutating func returnToken() {
+      self.borrowedTokens -= 1
+      assert(self.borrowedTokens >= 0)
+    }
+
+    internal mutating func updateTokenLimit(_ limit: Int) -> Int {
+      let oldLimit = self.tokenLimit
+      self.tokenLimit = limit
+      return oldLimit
+    }
+  }
+
+  internal init(connectionManagerID: ObjectIdentifier) {
+    self.id = connectionManagerID
+    self.state = .idle
+  }
+
+  // MARK: - Lease Management
+
+  /// Borrow tokens from the pooled connection.
+  ///
+  /// Each borrowed token corresponds to the creation of one HTTP/2 stream using the multiplexer
+  /// returned from this call. The caller must return each token once the stream is no longer
+  /// required using `returnToken(multiplexerID:)` where `multiplexerID` is the `ObjectIdentifier`
+  /// for the `HTTP2StreamMultiplexer` returned from this call.
+  ///
+  /// - Parameter tokensToBorrow: The number of tokens to borrow. This *must not*
+  ///     exceed `availableTokens`.
+  /// - Returns: A tuple of the `HTTP2StreamMultiplexer` on which streams should be created and
+  ///     total number of tokens which have been borrowed from this connection.
+  mutating func borrowTokens(_ tokensToBorrow: Int) -> (HTTP2StreamMultiplexer, Int) {
+    switch self.state {
+    case var .ready(ready):
+      let result = ready.borrowTokens(tokensToBorrow)
+      self.state = .ready(ready)
+      return result
+
+    case .idle, .connectingOrBackingOff:
+      // `availableTokens` is zero for these two states and a precondition for calling this function
+      // is that `tokensToBorrow` must not exceed the available tokens.
+      preconditionFailure()
+    }
+  }
+
+  /// Return a single token to the pooled connection.
+  mutating func returnToken() {
+    switch self.state {
+    case var .ready(ready):
+      ready.returnToken()
+      self.state = .ready(ready)
+
+    case .idle, .connectingOrBackingOff:
+      // A token may have been returned after the connection dropped.
+      ()
+    }
+  }
+
+  /// Updates the maximum number of tokens a connection may vend at any given time and returns the
+  /// previous limit.
+  ///
+  /// If the new limit is higher than the old limit then there may now be some tokens available
+  /// (i.e. `availableTokens > 0`). If the new limit is lower than the old limit `availableTokens`
+  /// will decrease and this connection may not have any available tokens.
+  ///
+  /// - Parameters:
+  ///   - newValue: The maximum number of tokens a connection may vend at a given time.
+  /// - Returns: The previous token limit.
+  mutating func updateMaximumTokens(_ newValue: Int) -> Int {
+    switch self.state {
+    case var .ready(ready):
+      let oldLimit = ready.updateTokenLimit(newValue)
+      self.state = .ready(ready)
+      return oldLimit
+
+    case .idle, .connectingOrBackingOff:
+      preconditionFailure()
+    }
+  }
+
+  /// Notify the state that a connection attempt is about to start.
+  mutating func willStartConnecting() {
+    switch self.state {
+    case .idle, .ready:
+      // We can start connecting from the 'ready' state again if the connection was dropped.
+      self.state = .connectingOrBackingOff
+
+    case .connectingOrBackingOff:
+      preconditionFailure()
+    }
+  }
+
+  /// The connection attempt succeeded.
+  ///
+  /// - Parameter multiplexer: The `HTTP2StreamMultiplexer` from the connection.
+  mutating func connected(multiplexer: HTTP2StreamMultiplexer) {
+    switch self.state {
+    case .connectingOrBackingOff:
+      self.state = .ready(ReadyState(multiplexer: multiplexer))
+
+    case .idle, .ready:
+      preconditionFailure()
+    }
+  }
+
+  /// Notify the state of a change in connectivity from the guts of the connection (as emitted by
+  /// the `ConnectivityStateDelegate`).
+  ///
+  /// - Parameter state: The new state.
+  /// - Returns: Any action to perform as a result of the state change.
+  mutating func connectivityStateChanged(to state: ConnectivityState) -> StateChangeAction {
+    // We only care about a few transitions as we mostly rely on our own state transitions. Namely,
+    // we care about a change from ready to transient failure (as we need to invalidate any borrowed
+    // tokens and start a new connection). We also care about shutting down.
+    switch (state, self.state) {
+    case (.idle, _):
+      // We always need to invalidate any state when the channel becomes idle again.
+      self.state = .idle
+      return .nothing
+
+    case (.connecting, _),
+         (.ready, _):
+      // We may bounce between 'connecting' and 'transientFailure' when we're in
+      // the 'connectingOrBackingOff', it's okay to ignore 'connecting' here.
+      //
+      // We never pay attention to receiving 'ready', rather we rely on 'connected(multiplexer:)'
+      // instead.
+      return .nothing
+
+    case (.transientFailure, .ready):
+      // If we're ready and hit a transient failure, we must start connecting again. We'll defer our
+      // own state transition until 'willStartConnecting()' is called.
+      return .startConnectingAgain
+
+    case (.transientFailure, .idle),
+         (.transientFailure, .connectingOrBackingOff):
+      return .nothing
+
+    case (.shutdown, _):
+      // The connection has been shutdown. We shouldn't pay attention to it anymore.
+      return .removeFromConnectionList
+    }
+  }
+
+  internal enum StateChangeAction: Hashable {
+    /// Do nothing.
+    case nothing
+    /// Remove the connection from the pooled connections, it has been shutdown.
+    case removeFromConnectionList
+    /// Check if any waiters exist for the connection.
+    case checkWaiters
+    /// The connection dropped: ask for a new one.
+    case startConnectingAgain
+  }
+}

+ 159 - 0
Sources/GRPC/ConnectionPool/HTTP2Connections.swift

@@ -0,0 +1,159 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Logging
+import NIO
+import NIOHTTP2
+
+internal struct HTTP2Connections {
+  // TODO: The number of connections is likely to be low and insertions and deletions should be
+  // infrequent. We may benefit from using an array and doing linear scans instead.
+  private var connections: [ObjectIdentifier: HTTP2ConnectionState]
+
+  /// Returns the number of connections.
+  internal var count: Int {
+    return self.connections.count
+  }
+
+  /// The maximum number of connections which may be stored.
+  private let capacity: Int
+
+  internal init(capacity: Int) {
+    self.connections = [:]
+    self.capacity = capacity
+    self.connections.reserveCapacity(capacity)
+  }
+
+  /// Insert a connection.
+  ///
+  /// - Important: A connection with the same `id` must not already exist in the collection, and
+  ///     a connection may only be inserted if the number of connections is less than its capacity.
+  /// - Parameter connection: The connection state to add.
+  internal mutating func insert(_ connection: HTTP2ConnectionState) {
+    assert(self.count < self.capacity)
+    let oldValue = self.connections.updateValue(connection, forKey: connection.id)
+    precondition(oldValue == nil)
+  }
+
+  /// Remove a connection with the given ID.
+  ///
+  /// - Parameter id: The ID of the connection to remove.
+  /// - Returns: The connection, if one matching the given ID was returned.
+  @discardableResult
+  internal mutating func removeConnection(withID id: ObjectIdentifier) -> HTTP2ConnectionState? {
+    return self.connections.removeValue(forKey: id)
+  }
+
+  /// Remove all connections
+  internal mutating func removeAll() {
+    self.connections.removeAll()
+  }
+
+  /// Returns the ID of the first connection matching the predicate, if one exists.
+  internal func firstConnectionID(
+    where predicate: (HTTP2ConnectionState) -> Bool
+  ) -> ObjectIdentifier? {
+    return self.connections.first { _, value in
+      predicate(value)
+    }?.key
+  }
+
+  // MARK: - Tokens
+
+  /// Returns the number of tokens available for the connection with the given ID.
+  ///
+  /// Only active connections may have tokens available, idle connections or those actively
+  /// connecting have zero tokens available.
+  ///
+  /// - Parameter id: The ID of the connection to return the number of available tokens for.
+  /// - Returns: The number of tokens available for the connection identified by the given `id`
+  ///     or `nil` if no such connection exists.
+  internal func availableTokensForConnection(withID id: ObjectIdentifier) -> Int? {
+    return self.connections[id]?.availableTokens
+  }
+
+  /// Borrow tokens from the connection identified by `id`.
+  ///
+  /// - Precondition: A connection must exist with the given `id`.
+  /// - Precondition: `count` must be greater than zero and must not exceed the tokens available for
+  ///     the connection.
+  /// - Parameters:
+  ///   - count: The number of tokens to borrow.
+  ///   - id: The `id` of the connection to borrow tokens from.
+  /// - Returns: The connection's HTTP/2 multiplexer and the total number of tokens currently
+  ///    borrowed from the connection.
+  internal mutating func borrowTokens(
+    _ count: Int,
+    fromConnectionWithID id: ObjectIdentifier
+  ) -> (HTTP2StreamMultiplexer, borrowedTokens: Int) {
+    return self.connections[id]!.borrowTokens(count)
+  }
+
+  /// Return a single token to the connection with the given identifier.
+  ///
+  /// - Parameter id: The `id` of the connection to return a token to.
+  internal mutating func returnTokenToConnection(withID id: ObjectIdentifier) {
+    self.connections[id]?.returnToken()
+  }
+
+  /// Update the maximum number of tokens a connection may lend at a given time.
+  ///
+  /// - Parameters:
+  ///   - maximumTokens: The maximum number of tokens the connection may vend,
+  ///   - id: The `id` of the connection the new limit applies to.
+  /// - Returns: The previous maximum token limit if the connection exists.
+  internal mutating func updateMaximumAvailableTokens(
+    _ maximumTokens: Int,
+    forConnectionWithID id: ObjectIdentifier
+  ) -> Int? {
+    return self.connections[id]?.updateMaximumTokens(maximumTokens)
+  }
+
+  /// Start connecting the connection with the given `id`.
+  ///
+  /// - Parameters:
+  ///   - id: The `id` of the connection to start.
+  ///   - multiplexerFactory: A closure which returns an `EventLoopFuture<HTTP2StreamMultiplexer>`.
+  ///   - onConnected: A closure to execute when the connection has successfully been established.
+  internal mutating func startConnection(
+    withID id: ObjectIdentifier,
+    http2StreamMultiplexerFactory multiplexerFactory: () -> EventLoopFuture<HTTP2StreamMultiplexer>,
+    whenConnected onConnected: @escaping (HTTP2StreamMultiplexer) -> Void
+  ) {
+    self.connections[id]?.willStartConnecting()
+    multiplexerFactory().whenSuccess(onConnected)
+  }
+
+  /// Update the state of the connection identified by `id` to 'ready'.
+  internal mutating func connectionIsReady(
+    withID id: ObjectIdentifier,
+    multiplexer: HTTP2StreamMultiplexer
+  ) {
+    self.connections[id]?.connected(multiplexer: multiplexer)
+  }
+
+  /// Update connectivity state of the connection identified by `id`.
+  ///
+  /// - Parameters:
+  ///   - state: The new state of the underlying connection.
+  ///   - id: The `id` of the connection whose state has changed.
+  /// - Returns: An action to perform as a result of the state change.
+  internal mutating func updateConnectivityState(
+    _ state: ConnectivityState,
+    forConnectionWithID id: ObjectIdentifier
+  ) -> HTTP2ConnectionState.StateChangeAction? {
+    return self.connections[id]?.connectivityStateChanged(to: state)
+  }
+}

+ 126 - 0
Tests/GRPCTests/ConnectionPool/HTTP2ConnectionStateTests.swift

@@ -0,0 +1,126 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@testable import GRPC
+import Logging
+import NIO
+import NIOHTTP2
+import XCTest
+
+final class HTTP2ConnectionStateTests: GRPCTestCase {
+  private final class Placeholder {}
+  private var placeholders: [Placeholder] = []
+
+  private let channel = EmbeddedChannel()
+  private var multiplexer: HTTP2StreamMultiplexer!
+
+  private var eventLoop: EmbeddedEventLoop {
+    return self.channel.embeddedEventLoop
+  }
+
+  override func setUp() {
+    super.setUp()
+    self.multiplexer = HTTP2StreamMultiplexer(
+      mode: .client,
+      channel: self.channel,
+      inboundStreamInitializer: nil
+    )
+  }
+
+  private func makeHTTP2ConnectionState() -> HTTP2ConnectionState {
+    let placeholder = Placeholder()
+    self.placeholders.append(placeholder)
+    return HTTP2ConnectionState(connectionManagerID: ObjectIdentifier(placeholder))
+  }
+
+  func testNewPooledConnection() {
+    let state = self.makeHTTP2ConnectionState()
+    XCTAssertEqual(state.availableTokens, 0)
+    XCTAssertEqual(state.borrowedTokens, 0)
+    XCTAssert(state.isIdle)
+  }
+
+  func testIdleToConnected() {
+    var state = self.makeHTTP2ConnectionState()
+    state.willStartConnecting()
+    XCTAssertEqual(state.availableTokens, 0)
+    XCTAssertFalse(state.isIdle)
+
+    state.connected(multiplexer: self.multiplexer)
+    // 100 is the default value
+    XCTAssertEqual(state.availableTokens, 100)
+
+    let newTokenLimit = 10
+    let oldLimit = state.updateMaximumTokens(newTokenLimit)
+    XCTAssertEqual(oldLimit, 100)
+    XCTAssertEqual(state.availableTokens, newTokenLimit)
+  }
+
+  func testBorrowAndReturnTokens() {
+    var state = self.makeHTTP2ConnectionState()
+
+    state.willStartConnecting()
+    state.connected(multiplexer: self.multiplexer)
+    _ = state.updateMaximumTokens(10)
+
+    XCTAssertEqual(state.availableTokens, 10)
+    XCTAssertEqual(state.borrowedTokens, 0)
+
+    _ = state.borrowTokens(1)
+    XCTAssertEqual(state.borrowedTokens, 1)
+    XCTAssertEqual(state.availableTokens, 9)
+
+    _ = state.borrowTokens(9)
+    XCTAssertEqual(state.borrowedTokens, 10)
+    XCTAssertEqual(state.availableTokens, 0)
+
+    state.returnToken()
+    XCTAssertEqual(state.borrowedTokens, 9)
+    XCTAssertEqual(state.availableTokens, 1)
+  }
+
+  func testConnectivityChanges() {
+    var state = self.makeHTTP2ConnectionState()
+
+    XCTAssert(state.isIdle)
+    XCTAssertEqual(state.connectivityStateChanged(to: .idle), .nothing)
+
+    state.willStartConnecting()
+    XCTAssertFalse(state.isIdle)
+
+    // No changes expected.
+    XCTAssertEqual(state.connectivityStateChanged(to: .connecting), .nothing)
+    XCTAssertEqual(state.connectivityStateChanged(to: .transientFailure), .nothing)
+    XCTAssertEqual(state.connectivityStateChanged(to: .connecting), .nothing)
+
+    // We do nothing on '.ready', instead we wait for '.connected(multiplexer:)' as our signal
+    // that we're actually ready (since it provides the 'HTTP2StreamMultiplexer'.
+    XCTAssertEqual(state.connectivityStateChanged(to: .ready), .nothing)
+
+    state.connected(multiplexer: self.multiplexer)
+    let readyState = state
+
+    // The connection dropped, so the multiplexer we hold is no longer valid, as such we need to ask
+    // for a new one.
+    XCTAssertEqual(state.connectivityStateChanged(to: .transientFailure), .startConnectingAgain)
+
+    // Restore the connection in the ready state.
+    state = readyState
+
+    // Shutdown: we'll drop the connection from the list, it's the end of the road for this
+    // connection.
+    XCTAssertEqual(state.connectivityStateChanged(to: .shutdown), .removeFromConnectionList)
+  }
+}

+ 136 - 0
Tests/GRPCTests/ConnectionPool/HTTP2ConnectionsTests.swift

@@ -0,0 +1,136 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@testable import GRPC
+import Logging
+import NIO
+import NIOHTTP2
+import XCTest
+
+final class HTTP2ConnectionsTests: GRPCTestCase {
+  private final class Placeholder {}
+  private var placeholders: [Placeholder] = []
+
+  private let eventLoop = EmbeddedEventLoop()
+
+  override func setUp() {
+    super.setUp()
+  }
+
+  private func makeID() -> ObjectIdentifier {
+    let placeholder = Placeholder()
+    self.placeholders.append(placeholder)
+    return ObjectIdentifier(placeholder)
+  }
+
+  private func makeConnectionState(withID id: ObjectIdentifier) -> HTTP2ConnectionState {
+    return HTTP2ConnectionState(connectionManagerID: id)
+  }
+
+  func testEmpty() {
+    var connections = HTTP2Connections(capacity: 5)
+    XCTAssertEqual(connections.count, 0)
+
+    XCTAssertNil(connections.availableTokensForConnection(withID: self.makeID()))
+    XCTAssertNil(connections.firstConnectionID(where: { _ in true }))
+    XCTAssertNil(connections.removeConnection(withID: self.makeID()))
+    XCTAssertNil(connections.updateConnectivityState(.shutdown, forConnectionWithID: self.makeID()))
+    XCTAssertNil(
+      connections.updateMaximumAvailableTokens(
+        .max,
+        forConnectionWithID: self.makeID()
+      )
+    )
+  }
+
+  func testInsertAndRemove() {
+    var connections = HTTP2Connections(capacity: 8)
+    let connection1 = self.makeConnectionState(withID: self.makeID())
+    let connection2 = self.makeConnectionState(withID: self.makeID())
+
+    connections.insert(connection1)
+    XCTAssertEqual(connections.count, 1)
+
+    connections.insert(connection2)
+    XCTAssertEqual(connections.count, 2)
+
+    let removed = connections.removeConnection(withID: connection1.id)
+    XCTAssertEqual(connections.count, 1)
+    XCTAssertEqual(removed?.id, connection1.id)
+
+    connections.insert(connection1)
+    XCTAssertEqual(connections.count, 2)
+
+    connections.removeAll()
+    XCTAssertEqual(connections.count, 0)
+  }
+
+  func testFirstConnectionIDWhere() {
+    var connections = HTTP2Connections(capacity: 8)
+    let connection1 = self.makeConnectionState(withID: self.makeID())
+    connections.insert(connection1)
+    let connection2 = self.makeConnectionState(withID: self.makeID())
+    connections.insert(connection2)
+
+    XCTAssertNil(connections.firstConnectionID(where: { _ in false }))
+    XCTAssertNil(connections.firstConnectionID(where: { $0.id == self.makeID() }))
+    XCTAssertEqual(
+      connections.firstConnectionID(where: { $0.id == connection1.id }),
+      connection1.id
+    )
+    XCTAssertNotNil(connections.firstConnectionID(where: { $0.isIdle }))
+  }
+
+  func testSetupBorrowAndReturn() throws {
+    var connections = HTTP2Connections(capacity: 8)
+    let connection = self.makeConnectionState(withID: self.makeID())
+    connections.insert(connection)
+
+    var multiplexers: [HTTP2StreamMultiplexer] = []
+    connections.startConnection(
+      withID: connection.id,
+      http2StreamMultiplexerFactory: {
+        let multiplexer = HTTP2StreamMultiplexer(
+          mode: .client,
+          channel: EmbeddedChannel(loop: self.eventLoop),
+          inboundStreamInitializer: nil
+        )
+        return self.eventLoop.makeSucceededFuture(multiplexer)
+      },
+      whenConnected: {
+        multiplexers.append($0)
+      }
+    )
+
+    // We have an embedded event loop, so we should already have a multiplexer and we can tell
+    // the connections about it.
+    XCTAssertEqual(multiplexers.count, 1)
+    connections.connectionIsReady(withID: connection.id, multiplexer: multiplexers[0])
+
+    // 100 is the default.
+    XCTAssertEqual(connections.availableTokensForConnection(withID: connection.id), 100)
+
+    // Borrow a token.
+    let (mux, borrowed) = connections.borrowTokens(1, fromConnectionWithID: connection.id)
+    // 1 token has been borrowed in total.
+    XCTAssertEqual(borrowed, 1)
+    XCTAssertTrue(mux === multiplexers[0])
+    XCTAssertEqual(connections.availableTokensForConnection(withID: connection.id), 99)
+
+    // Return a token.
+    connections.returnTokenToConnection(withID: connection.id)
+    XCTAssertEqual(connections.availableTokensForConnection(withID: connection.id), 100)
+  }
+}