Browse Source

Backup a couple of changes (#1168)

George Barnett 4 years ago
parent
commit
8c43144fd6

+ 0 - 258
Sources/GRPC/ConnectionPool/HTTP2ConnectionState.swift

@@ -1,258 +0,0 @@
-/*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Logging
-import NIO
-import NIOHTTP2
-
-/// This struct models the state of an HTTP/2 connection and provides the means to indirectly track
-/// active and available HTTP/2 streams on that connection.
-///
-/// The state -- once ready -- holds a multiplexer which it yields when an available 'token' is
-/// borrowed. One token corresponds to the creation of one HTTP/2 stream. The caller is responsible
-/// for later returning the token.
-internal struct HTTP2ConnectionState {
-  /// An identifier for this pooled connection.
-  internal let id: ObjectIdentifier
-
-  /// Indicates whether the pooled connection is idle.
-  internal var isIdle: Bool {
-    return self.state.isIdle
-  }
-
-  /// The number of tokens currently available for this connection. `availableTokens` must be
-  /// greater than zero for `borrowTokens` to be called.
-  ///
-  /// Note that it is also possible for `availableTokens` to be negative.
-  internal var availableTokens: Int {
-    switch self.state {
-    case let .ready(ready):
-      return ready.availableTokens
-    case .idle, .connectingOrBackingOff:
-      return 0
-    }
-  }
-
-  /// The number of tokens currently borrowed from this connection.
-  internal var borrowedTokens: Int {
-    switch self.state {
-    case let .ready(ready):
-      return ready.borrowedTokens
-    case .idle, .connectingOrBackingOff:
-      return 0
-    }
-  }
-
-  /// The state of the pooled connection.
-  private var state: State
-
-  private enum State {
-    /// No connection has been asked for, there are no tokens available.
-    case idle
-
-    /// A connection attempt is underway or we may be waiting to attempt to connect again.
-    case connectingOrBackingOff
-
-    /// We have an active connection which may have tokens borrowed.
-    case ready(ReadyState)
-
-    /// Whether the state is `idle`.
-    var isIdle: Bool {
-      switch self {
-      case .idle:
-        return true
-      case .connectingOrBackingOff, .ready:
-        return false
-      }
-    }
-  }
-
-  private struct ReadyState {
-    internal var multiplexer: HTTP2StreamMultiplexer
-    internal var borrowedTokens: Int
-    internal var tokenLimit: Int
-
-    internal init(multiplexer: HTTP2StreamMultiplexer) {
-      self.multiplexer = multiplexer
-      self.borrowedTokens = 0
-      // 100 is a common value for HTTP/2 SETTINGS_MAX_CONCURRENT_STREAMS so we assume this value
-      // until we know better.
-      self.tokenLimit = 100
-    }
-
-    internal var availableTokens: Int {
-      return self.tokenLimit - self.borrowedTokens
-    }
-
-    internal mutating func borrowTokens(_ count: Int) -> (HTTP2StreamMultiplexer, Int) {
-      self.borrowedTokens += count
-      assert(self.borrowedTokens <= self.tokenLimit)
-      return (self.multiplexer, self.borrowedTokens)
-    }
-
-    internal mutating func returnToken() {
-      self.borrowedTokens -= 1
-      assert(self.borrowedTokens >= 0)
-    }
-
-    internal mutating func updateTokenLimit(_ limit: Int) -> Int {
-      let oldLimit = self.tokenLimit
-      self.tokenLimit = limit
-      return oldLimit
-    }
-  }
-
-  internal init(connectionManagerID: ObjectIdentifier) {
-    self.id = connectionManagerID
-    self.state = .idle
-  }
-
-  // MARK: - Lease Management
-
-  /// Borrow tokens from the pooled connection.
-  ///
-  /// Each borrowed token corresponds to the creation of one HTTP/2 stream using the multiplexer
-  /// returned from this call. The caller must return each token once the stream is no longer
-  /// required using `returnToken(multiplexerID:)` where `multiplexerID` is the `ObjectIdentifier`
-  /// for the `HTTP2StreamMultiplexer` returned from this call.
-  ///
-  /// - Parameter tokensToBorrow: The number of tokens to borrow. This *must not*
-  ///     exceed `availableTokens`.
-  /// - Returns: A tuple of the `HTTP2StreamMultiplexer` on which streams should be created and
-  ///     total number of tokens which have been borrowed from this connection.
-  mutating func borrowTokens(_ tokensToBorrow: Int) -> (HTTP2StreamMultiplexer, Int) {
-    switch self.state {
-    case var .ready(ready):
-      let result = ready.borrowTokens(tokensToBorrow)
-      self.state = .ready(ready)
-      return result
-
-    case .idle, .connectingOrBackingOff:
-      // `availableTokens` is zero for these two states and a precondition for calling this function
-      // is that `tokensToBorrow` must not exceed the available tokens.
-      preconditionFailure()
-    }
-  }
-
-  /// Return a single token to the pooled connection.
-  mutating func returnToken() {
-    switch self.state {
-    case var .ready(ready):
-      ready.returnToken()
-      self.state = .ready(ready)
-
-    case .idle, .connectingOrBackingOff:
-      // A token may have been returned after the connection dropped.
-      ()
-    }
-  }
-
-  /// Updates the maximum number of tokens a connection may vend at any given time and returns the
-  /// previous limit.
-  ///
-  /// If the new limit is higher than the old limit then there may now be some tokens available
-  /// (i.e. `availableTokens > 0`). If the new limit is lower than the old limit `availableTokens`
-  /// will decrease and this connection may not have any available tokens.
-  ///
-  /// - Parameters:
-  ///   - newValue: The maximum number of tokens a connection may vend at a given time.
-  /// - Returns: The previous token limit.
-  mutating func updateMaximumTokens(_ newValue: Int) -> Int {
-    switch self.state {
-    case var .ready(ready):
-      let oldLimit = ready.updateTokenLimit(newValue)
-      self.state = .ready(ready)
-      return oldLimit
-
-    case .idle, .connectingOrBackingOff:
-      preconditionFailure()
-    }
-  }
-
-  /// Notify the state that a connection attempt is about to start.
-  mutating func willStartConnecting() {
-    switch self.state {
-    case .idle, .ready:
-      // We can start connecting from the 'ready' state again if the connection was dropped.
-      self.state = .connectingOrBackingOff
-
-    case .connectingOrBackingOff:
-      preconditionFailure()
-    }
-  }
-
-  /// The connection attempt succeeded.
-  ///
-  /// - Parameter multiplexer: The `HTTP2StreamMultiplexer` from the connection.
-  mutating func connected(multiplexer: HTTP2StreamMultiplexer) {
-    switch self.state {
-    case .connectingOrBackingOff:
-      self.state = .ready(ReadyState(multiplexer: multiplexer))
-
-    case .idle, .ready:
-      preconditionFailure()
-    }
-  }
-
-  /// Notify the state of a change in connectivity from the guts of the connection (as emitted by
-  /// the `ConnectivityStateDelegate`).
-  ///
-  /// - Parameter state: The new state.
-  /// - Returns: Any action to perform as a result of the state change.
-  mutating func connectivityStateChanged(to state: ConnectivityState) -> StateChangeAction {
-    // We only care about a few transitions as we mostly rely on our own state transitions. Namely,
-    // we care about a change from ready to transient failure (as we need to invalidate any borrowed
-    // tokens and start a new connection). We also care about shutting down.
-    switch (state, self.state) {
-    case (.idle, _):
-      // We always need to invalidate any state when the channel becomes idle again.
-      self.state = .idle
-      return .nothing
-
-    case (.connecting, _),
-         (.ready, _):
-      // We may bounce between 'connecting' and 'transientFailure' when we're in
-      // the 'connectingOrBackingOff', it's okay to ignore 'connecting' here.
-      //
-      // We never pay attention to receiving 'ready', rather we rely on 'connected(multiplexer:)'
-      // instead.
-      return .nothing
-
-    case (.transientFailure, .ready):
-      // If we're ready and hit a transient failure, we must start connecting again. We'll defer our
-      // own state transition until 'willStartConnecting()' is called.
-      return .startConnectingAgain
-
-    case (.transientFailure, .idle),
-         (.transientFailure, .connectingOrBackingOff):
-      return .nothing
-
-    case (.shutdown, _):
-      // The connection has been shutdown. We shouldn't pay attention to it anymore.
-      return .removeFromConnectionList
-    }
-  }
-
-  internal enum StateChangeAction: Hashable {
-    /// Do nothing.
-    case nothing
-    /// Remove the connection from the pooled connections, it has been shutdown.
-    case removeFromConnectionList
-    /// Check if any waiters exist for the connection.
-    case checkWaiters
-    /// The connection dropped: ask for a new one.
-    case startConnectingAgain
-  }
-}

+ 0 - 159
Sources/GRPC/ConnectionPool/HTTP2Connections.swift

@@ -1,159 +0,0 @@
-/*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Logging
-import NIO
-import NIOHTTP2
-
-internal struct HTTP2Connections {
-  // TODO: The number of connections is likely to be low and insertions and deletions should be
-  // infrequent. We may benefit from using an array and doing linear scans instead.
-  private var connections: [ObjectIdentifier: HTTP2ConnectionState]
-
-  /// Returns the number of connections.
-  internal var count: Int {
-    return self.connections.count
-  }
-
-  /// The maximum number of connections which may be stored.
-  private let capacity: Int
-
-  internal init(capacity: Int) {
-    self.connections = [:]
-    self.capacity = capacity
-    self.connections.reserveCapacity(capacity)
-  }
-
-  /// Insert a connection.
-  ///
-  /// - Important: A connection with the same `id` must not already exist in the collection, and
-  ///     a connection may only be inserted if the number of connections is less than its capacity.
-  /// - Parameter connection: The connection state to add.
-  internal mutating func insert(_ connection: HTTP2ConnectionState) {
-    assert(self.count < self.capacity)
-    let oldValue = self.connections.updateValue(connection, forKey: connection.id)
-    precondition(oldValue == nil)
-  }
-
-  /// Remove a connection with the given ID.
-  ///
-  /// - Parameter id: The ID of the connection to remove.
-  /// - Returns: The connection, if one matching the given ID was returned.
-  @discardableResult
-  internal mutating func removeConnection(withID id: ObjectIdentifier) -> HTTP2ConnectionState? {
-    return self.connections.removeValue(forKey: id)
-  }
-
-  /// Remove all connections
-  internal mutating func removeAll() {
-    self.connections.removeAll()
-  }
-
-  /// Returns the ID of the first connection matching the predicate, if one exists.
-  internal func firstConnectionID(
-    where predicate: (HTTP2ConnectionState) -> Bool
-  ) -> ObjectIdentifier? {
-    return self.connections.first { _, value in
-      predicate(value)
-    }?.key
-  }
-
-  // MARK: - Tokens
-
-  /// Returns the number of tokens available for the connection with the given ID.
-  ///
-  /// Only active connections may have tokens available, idle connections or those actively
-  /// connecting have zero tokens available.
-  ///
-  /// - Parameter id: The ID of the connection to return the number of available tokens for.
-  /// - Returns: The number of tokens available for the connection identified by the given `id`
-  ///     or `nil` if no such connection exists.
-  internal func availableTokensForConnection(withID id: ObjectIdentifier) -> Int? {
-    return self.connections[id]?.availableTokens
-  }
-
-  /// Borrow tokens from the connection identified by `id`.
-  ///
-  /// - Precondition: A connection must exist with the given `id`.
-  /// - Precondition: `count` must be greater than zero and must not exceed the tokens available for
-  ///     the connection.
-  /// - Parameters:
-  ///   - count: The number of tokens to borrow.
-  ///   - id: The `id` of the connection to borrow tokens from.
-  /// - Returns: The connection's HTTP/2 multiplexer and the total number of tokens currently
-  ///    borrowed from the connection.
-  internal mutating func borrowTokens(
-    _ count: Int,
-    fromConnectionWithID id: ObjectIdentifier
-  ) -> (HTTP2StreamMultiplexer, borrowedTokens: Int) {
-    return self.connections[id]!.borrowTokens(count)
-  }
-
-  /// Return a single token to the connection with the given identifier.
-  ///
-  /// - Parameter id: The `id` of the connection to return a token to.
-  internal mutating func returnTokenToConnection(withID id: ObjectIdentifier) {
-    self.connections[id]?.returnToken()
-  }
-
-  /// Update the maximum number of tokens a connection may lend at a given time.
-  ///
-  /// - Parameters:
-  ///   - maximumTokens: The maximum number of tokens the connection may vend,
-  ///   - id: The `id` of the connection the new limit applies to.
-  /// - Returns: The previous maximum token limit if the connection exists.
-  internal mutating func updateMaximumAvailableTokens(
-    _ maximumTokens: Int,
-    forConnectionWithID id: ObjectIdentifier
-  ) -> Int? {
-    return self.connections[id]?.updateMaximumTokens(maximumTokens)
-  }
-
-  /// Start connecting the connection with the given `id`.
-  ///
-  /// - Parameters:
-  ///   - id: The `id` of the connection to start.
-  ///   - multiplexerFactory: A closure which returns an `EventLoopFuture<HTTP2StreamMultiplexer>`.
-  ///   - onConnected: A closure to execute when the connection has successfully been established.
-  internal mutating func startConnection(
-    withID id: ObjectIdentifier,
-    http2StreamMultiplexerFactory multiplexerFactory: () -> EventLoopFuture<HTTP2StreamMultiplexer>,
-    whenConnected onConnected: @escaping (HTTP2StreamMultiplexer) -> Void
-  ) {
-    self.connections[id]?.willStartConnecting()
-    multiplexerFactory().whenSuccess(onConnected)
-  }
-
-  /// Update the state of the connection identified by `id` to 'ready'.
-  internal mutating func connectionIsReady(
-    withID id: ObjectIdentifier,
-    multiplexer: HTTP2StreamMultiplexer
-  ) {
-    self.connections[id]?.connected(multiplexer: multiplexer)
-  }
-
-  /// Update connectivity state of the connection identified by `id`.
-  ///
-  /// - Parameters:
-  ///   - state: The new state of the underlying connection.
-  ///   - id: The `id` of the connection whose state has changed.
-  /// - Returns: An action to perform as a result of the state change.
-  internal mutating func updateConnectivityState(
-    _ state: ConnectivityState,
-    forConnectionWithID id: ObjectIdentifier
-  ) -> HTTP2ConnectionState.StateChangeAction? {
-    return self.connections[id]?.connectivityStateChanged(to: state)
-  }
-}

+ 1 - 35
Sources/GRPC/ConnectivityState.swift

@@ -74,7 +74,6 @@ public class ConnectivityStateMonitor {
 
   private let delegateLock = Lock()
   private var _delegate: ConnectivityStateDelegate?
-  private var _http2Delegate: HTTP2ConnectionDelegate?
   private let delegateCallbackQueue: DispatchQueue
 
   /// Creates a new connectivity state monitor.
@@ -83,7 +82,7 @@ public class ConnectivityStateMonitor {
   /// - Parameter queue: The `DispatchQueue` on which the delegate will be called.
   init(delegate: ConnectivityStateDelegate?, queue: DispatchQueue?) {
     self._delegate = delegate
-    self.delegateCallbackQueue = DispatchQueue(label: "io.grpc.connectivity", target: queue)
+    self.delegateCallbackQueue = queue ?? DispatchQueue(label: "io.grpc.connectivity")
   }
 
   /// The current state of connectivity.
@@ -106,9 +105,7 @@ public class ConnectivityStateMonitor {
       }
     }
   }
-}
 
-extension ConnectivityStateMonitor {
   internal func updateState(to newValue: ConnectivityState, logger: Logger) {
     let change: (ConnectivityState, ConnectivityState)? = self.stateLock.withLock {
       let oldValue = self._state
@@ -143,34 +140,3 @@ extension ConnectivityStateMonitor {
     }
   }
 }
-
-extension ConnectivityStateMonitor: HTTP2ConnectionDelegate {
-  internal final var http2Delegate: HTTP2ConnectionDelegate? {
-    get {
-      return self.delegateLock.withLock {
-        return self._http2Delegate
-      }
-    }
-    set {
-      self.delegateLock.withLockVoid {
-        self._http2Delegate = newValue
-      }
-    }
-  }
-
-  internal final func streamClosed() {
-    if let delegate = self.http2Delegate {
-      self.delegateCallbackQueue.async {
-        delegate.streamClosed()
-      }
-    }
-  }
-
-  internal final func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
-    if let delegate = self.http2Delegate {
-      self.delegateCallbackQueue.async {
-        delegate.maxConcurrentStreamsChanged(maxConcurrentStreams)
-      }
-    }
-  }
-}

+ 0 - 9
Sources/GRPC/GRPCIdleHandler.swift

@@ -130,12 +130,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
       }
     }
 
-    // Max concurrent streams changed.
-    if let manager = self.mode.connectionManager,
-      let maxConcurrentStreams = operations.maxConcurrentStreamsChange {
-      manager.monitor.maxConcurrentStreamsChanged(maxConcurrentStreams)
-    }
-
     // Handle idle timeout creation/cancellation.
     if let idleTask = operations.idleTask {
       switch idleTask {
@@ -228,9 +222,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
     } else if let closed = event as? StreamClosedEvent {
       self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
       self.handlePingAction(self.pingHandler.streamClosed())
-      if let manager = self.mode.connectionManager {
-        manager.monitor.streamClosed()
-      }
       context.fireUserInboundEventTriggered(event)
     } else if event is ChannelShouldQuiesceEvent {
       self.perform(operations: self.stateMachine.initiateGracefulShutdown())

+ 0 - 9
Sources/GRPC/GRPCIdleHandlerStateMachine.swift

@@ -186,9 +186,6 @@ struct GRPCIdleHandlerStateMachine {
     /// Whether the channel should be closed.
     private(set) var shouldCloseChannel: Bool
 
-    /// The new value of 'SETTINGS_MAX_CONCURRENT_STREAMS'.
-    private(set) var maxConcurrentStreamsChange: Int?
-
     fileprivate static let none = Operations()
 
     fileprivate mutating func sendGoAwayFrame(lastPeerInitiatedStreamID streamID: HTTP2StreamID) {
@@ -211,10 +208,6 @@ struct GRPCIdleHandlerStateMachine {
       self.connectionManagerEvent = event
     }
 
-    fileprivate mutating func maxConcurrentStreamsChanged(_ count: Int) {
-      self.maxConcurrentStreamsChange = count
-    }
-
     private init() {
       self.connectionManagerEvent = nil
       self.idleTask = nil
@@ -509,7 +502,6 @@ struct GRPCIdleHandlerStateMachine {
 
       // Update max concurrent streams.
       if let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value {
-        operations.maxConcurrentStreamsChanged(maxStreams)
         state.maxConcurrentStreams = maxStreams
       }
       self.state = .operating(state)
@@ -517,7 +509,6 @@ struct GRPCIdleHandlerStateMachine {
     case var .waitingToIdle(state):
       // Update max concurrent streams.
       if let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value {
-        operations.maxConcurrentStreamsChanged(maxStreams)
         state.maxConcurrentStreams = maxStreams
       }
       self.state = .waitingToIdle(state)

+ 0 - 26
Sources/GRPC/HTTP2ConnectionDelegate.swift

@@ -1,26 +0,0 @@
-/*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import NIO
-import NIOHTTP2
-
-internal protocol HTTP2ConnectionDelegate {
-  /// An HTTP/2 stream was closed.
-  func streamClosed()
-
-  /// The value of 'SETTINGS_MAX_CONCURRENT_STREAMS' changed.
-  /// - Parameter maxConcurrentStreams: The new value for 'SETTINGS_MAX_CONCURRENT_STREAMS'.
-  func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int)
-}

+ 0 - 126
Tests/GRPCTests/ConnectionPool/HTTP2ConnectionStateTests.swift

@@ -1,126 +0,0 @@
-/*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@testable import GRPC
-import Logging
-import NIO
-import NIOHTTP2
-import XCTest
-
-final class HTTP2ConnectionStateTests: GRPCTestCase {
-  private final class Placeholder {}
-  private var placeholders: [Placeholder] = []
-
-  private let channel = EmbeddedChannel()
-  private var multiplexer: HTTP2StreamMultiplexer!
-
-  private var eventLoop: EmbeddedEventLoop {
-    return self.channel.embeddedEventLoop
-  }
-
-  override func setUp() {
-    super.setUp()
-    self.multiplexer = HTTP2StreamMultiplexer(
-      mode: .client,
-      channel: self.channel,
-      inboundStreamInitializer: nil
-    )
-  }
-
-  private func makeHTTP2ConnectionState() -> HTTP2ConnectionState {
-    let placeholder = Placeholder()
-    self.placeholders.append(placeholder)
-    return HTTP2ConnectionState(connectionManagerID: ObjectIdentifier(placeholder))
-  }
-
-  func testNewPooledConnection() {
-    let state = self.makeHTTP2ConnectionState()
-    XCTAssertEqual(state.availableTokens, 0)
-    XCTAssertEqual(state.borrowedTokens, 0)
-    XCTAssert(state.isIdle)
-  }
-
-  func testIdleToConnected() {
-    var state = self.makeHTTP2ConnectionState()
-    state.willStartConnecting()
-    XCTAssertEqual(state.availableTokens, 0)
-    XCTAssertFalse(state.isIdle)
-
-    state.connected(multiplexer: self.multiplexer)
-    // 100 is the default value
-    XCTAssertEqual(state.availableTokens, 100)
-
-    let newTokenLimit = 10
-    let oldLimit = state.updateMaximumTokens(newTokenLimit)
-    XCTAssertEqual(oldLimit, 100)
-    XCTAssertEqual(state.availableTokens, newTokenLimit)
-  }
-
-  func testBorrowAndReturnTokens() {
-    var state = self.makeHTTP2ConnectionState()
-
-    state.willStartConnecting()
-    state.connected(multiplexer: self.multiplexer)
-    _ = state.updateMaximumTokens(10)
-
-    XCTAssertEqual(state.availableTokens, 10)
-    XCTAssertEqual(state.borrowedTokens, 0)
-
-    _ = state.borrowTokens(1)
-    XCTAssertEqual(state.borrowedTokens, 1)
-    XCTAssertEqual(state.availableTokens, 9)
-
-    _ = state.borrowTokens(9)
-    XCTAssertEqual(state.borrowedTokens, 10)
-    XCTAssertEqual(state.availableTokens, 0)
-
-    state.returnToken()
-    XCTAssertEqual(state.borrowedTokens, 9)
-    XCTAssertEqual(state.availableTokens, 1)
-  }
-
-  func testConnectivityChanges() {
-    var state = self.makeHTTP2ConnectionState()
-
-    XCTAssert(state.isIdle)
-    XCTAssertEqual(state.connectivityStateChanged(to: .idle), .nothing)
-
-    state.willStartConnecting()
-    XCTAssertFalse(state.isIdle)
-
-    // No changes expected.
-    XCTAssertEqual(state.connectivityStateChanged(to: .connecting), .nothing)
-    XCTAssertEqual(state.connectivityStateChanged(to: .transientFailure), .nothing)
-    XCTAssertEqual(state.connectivityStateChanged(to: .connecting), .nothing)
-
-    // We do nothing on '.ready', instead we wait for '.connected(multiplexer:)' as our signal
-    // that we're actually ready (since it provides the 'HTTP2StreamMultiplexer'.
-    XCTAssertEqual(state.connectivityStateChanged(to: .ready), .nothing)
-
-    state.connected(multiplexer: self.multiplexer)
-    let readyState = state
-
-    // The connection dropped, so the multiplexer we hold is no longer valid, as such we need to ask
-    // for a new one.
-    XCTAssertEqual(state.connectivityStateChanged(to: .transientFailure), .startConnectingAgain)
-
-    // Restore the connection in the ready state.
-    state = readyState
-
-    // Shutdown: we'll drop the connection from the list, it's the end of the road for this
-    // connection.
-    XCTAssertEqual(state.connectivityStateChanged(to: .shutdown), .removeFromConnectionList)
-  }
-}

+ 0 - 136
Tests/GRPCTests/ConnectionPool/HTTP2ConnectionsTests.swift

@@ -1,136 +0,0 @@
-/*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@testable import GRPC
-import Logging
-import NIO
-import NIOHTTP2
-import XCTest
-
-final class HTTP2ConnectionsTests: GRPCTestCase {
-  private final class Placeholder {}
-  private var placeholders: [Placeholder] = []
-
-  private let eventLoop = EmbeddedEventLoop()
-
-  override func setUp() {
-    super.setUp()
-  }
-
-  private func makeID() -> ObjectIdentifier {
-    let placeholder = Placeholder()
-    self.placeholders.append(placeholder)
-    return ObjectIdentifier(placeholder)
-  }
-
-  private func makeConnectionState(withID id: ObjectIdentifier) -> HTTP2ConnectionState {
-    return HTTP2ConnectionState(connectionManagerID: id)
-  }
-
-  func testEmpty() {
-    var connections = HTTP2Connections(capacity: 5)
-    XCTAssertEqual(connections.count, 0)
-
-    XCTAssertNil(connections.availableTokensForConnection(withID: self.makeID()))
-    XCTAssertNil(connections.firstConnectionID(where: { _ in true }))
-    XCTAssertNil(connections.removeConnection(withID: self.makeID()))
-    XCTAssertNil(connections.updateConnectivityState(.shutdown, forConnectionWithID: self.makeID()))
-    XCTAssertNil(
-      connections.updateMaximumAvailableTokens(
-        .max,
-        forConnectionWithID: self.makeID()
-      )
-    )
-  }
-
-  func testInsertAndRemove() {
-    var connections = HTTP2Connections(capacity: 8)
-    let connection1 = self.makeConnectionState(withID: self.makeID())
-    let connection2 = self.makeConnectionState(withID: self.makeID())
-
-    connections.insert(connection1)
-    XCTAssertEqual(connections.count, 1)
-
-    connections.insert(connection2)
-    XCTAssertEqual(connections.count, 2)
-
-    let removed = connections.removeConnection(withID: connection1.id)
-    XCTAssertEqual(connections.count, 1)
-    XCTAssertEqual(removed?.id, connection1.id)
-
-    connections.insert(connection1)
-    XCTAssertEqual(connections.count, 2)
-
-    connections.removeAll()
-    XCTAssertEqual(connections.count, 0)
-  }
-
-  func testFirstConnectionIDWhere() {
-    var connections = HTTP2Connections(capacity: 8)
-    let connection1 = self.makeConnectionState(withID: self.makeID())
-    connections.insert(connection1)
-    let connection2 = self.makeConnectionState(withID: self.makeID())
-    connections.insert(connection2)
-
-    XCTAssertNil(connections.firstConnectionID(where: { _ in false }))
-    XCTAssertNil(connections.firstConnectionID(where: { $0.id == self.makeID() }))
-    XCTAssertEqual(
-      connections.firstConnectionID(where: { $0.id == connection1.id }),
-      connection1.id
-    )
-    XCTAssertNotNil(connections.firstConnectionID(where: { $0.isIdle }))
-  }
-
-  func testSetupBorrowAndReturn() throws {
-    var connections = HTTP2Connections(capacity: 8)
-    let connection = self.makeConnectionState(withID: self.makeID())
-    connections.insert(connection)
-
-    var multiplexers: [HTTP2StreamMultiplexer] = []
-    connections.startConnection(
-      withID: connection.id,
-      http2StreamMultiplexerFactory: {
-        let multiplexer = HTTP2StreamMultiplexer(
-          mode: .client,
-          channel: EmbeddedChannel(loop: self.eventLoop),
-          inboundStreamInitializer: nil
-        )
-        return self.eventLoop.makeSucceededFuture(multiplexer)
-      },
-      whenConnected: {
-        multiplexers.append($0)
-      }
-    )
-
-    // We have an embedded event loop, so we should already have a multiplexer and we can tell
-    // the connections about it.
-    XCTAssertEqual(multiplexers.count, 1)
-    connections.connectionIsReady(withID: connection.id, multiplexer: multiplexers[0])
-
-    // 100 is the default.
-    XCTAssertEqual(connections.availableTokensForConnection(withID: connection.id), 100)
-
-    // Borrow a token.
-    let (mux, borrowed) = connections.borrowTokens(1, fromConnectionWithID: connection.id)
-    // 1 token has been borrowed in total.
-    XCTAssertEqual(borrowed, 1)
-    XCTAssertTrue(mux === multiplexers[0])
-    XCTAssertEqual(connections.availableTokensForConnection(withID: connection.id), 99)
-
-    // Return a token.
-    connections.returnTokenToConnection(withID: connection.id)
-    XCTAssertEqual(connections.availableTokensForConnection(withID: connection.id), 100)
-  }
-}

+ 0 - 31
Tests/GRPCTests/ConnectivityStateMonitorTests.swift

@@ -43,35 +43,4 @@ class ConnectivityStateMonitorTests: GRPCTestCase {
 
     recorder.waitForExpectedChanges(timeout: .seconds(1))
   }
-
-  func testHTTP2Delegate() {
-    let http2Delegate = RecordingHTTP2Delegate()
-    let queue = DispatchQueue(label: "io.grpc.testing")
-    let monitor = ConnectivityStateMonitor(delegate: nil, queue: queue)
-    monitor.http2Delegate = http2Delegate
-
-    monitor.streamClosed()
-    monitor.streamClosed()
-    monitor.streamClosed()
-
-    monitor.maxConcurrentStreamsChanged(31)
-    monitor.maxConcurrentStreamsChanged(41)
-    monitor.maxConcurrentStreamsChanged(49)
-
-    XCTAssertEqual(queue.sync { http2Delegate.streamsClosed }, 3)
-    XCTAssertEqual(queue.sync { http2Delegate.maxConcurrentStreamsChanges }, [31, 41, 49])
-  }
-}
-
-internal final class RecordingHTTP2Delegate: HTTP2ConnectionDelegate {
-  private(set) var streamsClosed = 0
-  private(set) var maxConcurrentStreamsChanges: [Int] = []
-
-  internal func streamClosed() {
-    self.streamsClosed += 1
-  }
-
-  internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
-    self.maxConcurrentStreamsChanges.append(maxConcurrentStreams)
-  }
 }

+ 0 - 68
Tests/GRPCTests/HTTP2DelegateTests.swift

@@ -1,68 +0,0 @@
-/*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import EchoImplementation
-import EchoModel
-@testable import GRPC
-import NIO
-import XCTest
-
-final class HTTP2ConnectionDelegateTests: GRPCTestCase {
-  private var group: EventLoopGroup!
-  private var server: Server!
-  private var connection: ClientConnection!
-  private let queue = DispatchQueue(label: "io.grpc.testing")
-
-  override func setUp() {
-    super.setUp()
-    self.group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
-    self.server = try! Server.insecure(group: self.group)
-      .withLogger(self.serverLogger)
-      .withServiceProviders([EchoProvider()])
-      .bind(host: "127.0.0.1", port: 0)
-      .wait()
-
-    self.connection = ClientConnection.insecure(group: self.group)
-      .withBackgroundActivityLogger(self.clientLogger)
-      // The http/2 delegate is internal but uses the same queue as the connectivity state delegate,
-      // so this looks odd but is fine.
-      .withConnectivityStateDelegate(nil, executingOn: self.queue)
-      .connect(host: "127.0.0.1", port: self.server!.channel.localAddress!.port!)
-  }
-
-  override func tearDown() {
-    XCTAssertNoThrow(try self.connection.close().wait())
-    XCTAssertNoThrow(try self.server.close().wait())
-    XCTAssertNoThrow(try self.group.syncShutdownGracefully())
-    super.tearDown()
-  }
-
-  func testDelegate() {
-    let http2Delegate = RecordingHTTP2Delegate()
-    self.connection.connectivity.http2Delegate = http2Delegate
-
-    let echo = Echo_EchoClient(channel: self.connection)
-
-    // Fire off a handful of RPCs.
-    for _ in 0 ..< 10 {
-      let get = echo.get(.with { $0.text = "" })
-      XCTAssertNoThrow(try get.status.wait())
-    }
-
-    // 10 RPCs, 10 streams closed.
-    XCTAssertEqual(self.queue.sync { http2Delegate.streamsClosed }, 10)
-    XCTAssertEqual(self.queue.sync { http2Delegate.maxConcurrentStreamsChanges }, [100])
-  }
-}