|
|
@@ -21,64 +21,6 @@ import NIO
|
|
|
import XCTest
|
|
|
import NIOConcurrencyHelpers
|
|
|
|
|
|
-class ConnectivityStateCollectionDelegate: ConnectivityStateDelegate {
|
|
|
- private var _states: [ConnectivityState] = []
|
|
|
- private var lock = Lock()
|
|
|
-
|
|
|
- var states: [ConnectivityState] {
|
|
|
- get {
|
|
|
- return self.lock.withLock {
|
|
|
- return self._states
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- func clearStates() -> [ConnectivityState] {
|
|
|
- self.lock.lock()
|
|
|
- defer {
|
|
|
- self._states.removeAll()
|
|
|
- self.lock.unlock()
|
|
|
- }
|
|
|
- return self._states
|
|
|
- }
|
|
|
-
|
|
|
- private var _expectations: [ConnectivityState: XCTestExpectation] = [:]
|
|
|
-
|
|
|
- var expectations: [ConnectivityState: XCTestExpectation] {
|
|
|
- get {
|
|
|
- return self.lock.withLock {
|
|
|
- self._expectations
|
|
|
- }
|
|
|
- }
|
|
|
- set {
|
|
|
- self.lock.withLockVoid {
|
|
|
- self._expectations = newValue
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- init(
|
|
|
- idle: XCTestExpectation? = nil,
|
|
|
- connecting: XCTestExpectation? = nil,
|
|
|
- ready: XCTestExpectation? = nil,
|
|
|
- transientFailure: XCTestExpectation? = nil,
|
|
|
- shutdown: XCTestExpectation? = nil
|
|
|
- ) {
|
|
|
- self.expectations[.idle] = idle
|
|
|
- self.expectations[.connecting] = connecting
|
|
|
- self.expectations[.ready] = ready
|
|
|
- self.expectations[.transientFailure] = transientFailure
|
|
|
- self.expectations[.shutdown] = shutdown
|
|
|
- }
|
|
|
-
|
|
|
- func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) {
|
|
|
- self.lock.withLockVoid {
|
|
|
- self._states.append(newState)
|
|
|
- self._expectations[newState]?.fulfill()
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
let port = 8080
|
|
|
|
|
|
@@ -88,7 +30,7 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
var serverGroup: EventLoopGroup!
|
|
|
var clientGroup: EventLoopGroup!
|
|
|
|
|
|
- var stateDelegate = ConnectivityStateCollectionDelegate()
|
|
|
+ var connectionStateRecorder = RecordingConnectivityDelegate()
|
|
|
|
|
|
override func setUp() {
|
|
|
self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
|
|
@@ -123,14 +65,19 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
|
|
|
func connectionBuilder() -> ClientConnection.Builder {
|
|
|
return ClientConnection.insecure(group: self.clientGroup)
|
|
|
- .withConnectivityStateDelegate(self.stateDelegate)
|
|
|
+ .withConnectivityStateDelegate(self.connectionStateRecorder)
|
|
|
.withConnectionBackoff(maximum: .milliseconds(100))
|
|
|
.withConnectionTimeout(minimum: .milliseconds(100))
|
|
|
}
|
|
|
|
|
|
func testClientConnectionFailsWithNoBackoff() throws {
|
|
|
- let connectionShutdown = self.expectation(description: "client shutdown")
|
|
|
- self.stateDelegate.expectations[.shutdown] = connectionShutdown
|
|
|
+ self.connectionStateRecorder.expectChanges(2) { changes in
|
|
|
+ XCTAssertEqual(changes, [
|
|
|
+ Change(from: .idle, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .shutdown)
|
|
|
+ ])
|
|
|
+ }
|
|
|
+
|
|
|
self.client = self.connectionBuilder()
|
|
|
.withConnectionReestablishment(enabled: false)
|
|
|
.connect(host: "localhost", port: self.port)
|
|
|
@@ -139,15 +86,18 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
let echo = Echo_EchoClient(channel: self.client)
|
|
|
_ = echo.get(.with { $0.text = "foo" })
|
|
|
|
|
|
- self.wait(for: [connectionShutdown], timeout: 1.0)
|
|
|
- XCTAssertEqual(self.stateDelegate.states, [.connecting, .shutdown])
|
|
|
+ self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
|
|
|
}
|
|
|
|
|
|
func testClientConnectionFailureIsLimited() throws {
|
|
|
- let connectionShutdown = self.expectation(description: "client shutdown")
|
|
|
- let failures = self.expectation(description: "connection failed")
|
|
|
- self.stateDelegate.expectations[.shutdown] = connectionShutdown
|
|
|
- self.stateDelegate.expectations[.transientFailure] = failures
|
|
|
+ self.connectionStateRecorder.expectChanges(4) { changes in
|
|
|
+ XCTAssertEqual(changes, [
|
|
|
+ Change(from: .idle, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .transientFailure),
|
|
|
+ Change(from: .transientFailure, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .shutdown),
|
|
|
+ ])
|
|
|
+ }
|
|
|
|
|
|
self.client = self.connectionBuilder()
|
|
|
.withConnectionBackoff(retries: .upTo(1))
|
|
|
@@ -157,15 +107,16 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
let echo = Echo_EchoClient(channel: self.client)
|
|
|
_ = echo.get(.with { $0.text = "foo" })
|
|
|
|
|
|
- self.wait(for: [connectionShutdown, failures], timeout: 1.0)
|
|
|
- XCTAssertEqual(self.stateDelegate.states, [.connecting, .transientFailure, .connecting, .shutdown])
|
|
|
+ self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
|
|
|
}
|
|
|
|
|
|
func testClientEventuallyConnects() throws {
|
|
|
- let transientFailure = self.expectation(description: "connection transientFailure")
|
|
|
- let connectionReady = self.expectation(description: "connection ready")
|
|
|
- self.stateDelegate.expectations[.transientFailure] = transientFailure
|
|
|
- self.stateDelegate.expectations[.ready] = connectionReady
|
|
|
+ self.connectionStateRecorder.expectChanges(2) { changes in
|
|
|
+ XCTAssertEqual(changes, [
|
|
|
+ Change(from: .idle, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .transientFailure)
|
|
|
+ ])
|
|
|
+ }
|
|
|
|
|
|
// Start the client first.
|
|
|
self.client = self.connectionBuilder()
|
|
|
@@ -175,18 +126,21 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
let echo = Echo_EchoClient(channel: self.client)
|
|
|
_ = echo.get(.with { $0.text = "foo" })
|
|
|
|
|
|
- self.wait(for: [transientFailure], timeout: 1.0)
|
|
|
- self.stateDelegate.expectations[.transientFailure] = nil
|
|
|
- XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .transientFailure])
|
|
|
+ self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
|
|
|
+
|
|
|
+ self.connectionStateRecorder.expectChanges(2) { changes in
|
|
|
+ XCTAssertEqual(changes, [
|
|
|
+ Change(from: .transientFailure, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .ready)
|
|
|
+ ])
|
|
|
+ }
|
|
|
|
|
|
self.server = self.makeServer()
|
|
|
let serverStarted = self.expectation(description: "server started")
|
|
|
self.server.assertSuccess(fulfill: serverStarted)
|
|
|
|
|
|
- self.wait(for: [serverStarted, connectionReady], timeout: 2.0, enforceOrder: true)
|
|
|
- // We can have other transient failures and connection attempts while the server starts, we only
|
|
|
- // care about the last two.
|
|
|
- XCTAssertEqual(self.stateDelegate.states.suffix(2), [.connecting, .ready])
|
|
|
+ self.wait(for: [serverStarted], timeout: 5.0)
|
|
|
+ self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
|
|
|
}
|
|
|
|
|
|
func testClientReconnectsAutomatically() throws {
|
|
|
@@ -195,8 +149,12 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
let server = try self.server.wait()
|
|
|
|
|
|
// Prepare the delegate so it expects the connection to hit `.ready`.
|
|
|
- let connectionReady = self.expectation(description: "connection ready")
|
|
|
- self.stateDelegate.expectations[.ready] = connectionReady
|
|
|
+ self.connectionStateRecorder.expectChanges(2) { changes in
|
|
|
+ XCTAssertEqual(changes, [
|
|
|
+ Change(from: .idle, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .ready)
|
|
|
+ ])
|
|
|
+ }
|
|
|
|
|
|
// Configure the client backoff to have a short backoff.
|
|
|
self.client = self.connectionBuilder()
|
|
|
@@ -208,16 +166,18 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
_ = echo.get(.with { $0.text = "foo" })
|
|
|
|
|
|
// Wait for the connection to be ready.
|
|
|
- self.wait(for: [connectionReady], timeout: 1.0)
|
|
|
- XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])
|
|
|
+ self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
|
|
|
|
|
|
// Now that we have a healthy connection, prepare for two transient failures:
|
|
|
// 1. when the server has been killed, and
|
|
|
// 2. when the client attempts to reconnect.
|
|
|
- let transientFailure = self.expectation(description: "connection transientFailure")
|
|
|
- transientFailure.expectedFulfillmentCount = 2
|
|
|
- self.stateDelegate.expectations[.transientFailure] = transientFailure
|
|
|
- self.stateDelegate.expectations[.ready] = nil
|
|
|
+ self.connectionStateRecorder.expectChanges(3) { changes in
|
|
|
+ XCTAssertEqual(changes, [
|
|
|
+ Change(from: .ready, to: .transientFailure),
|
|
|
+ Change(from: .transientFailure, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .transientFailure),
|
|
|
+ ])
|
|
|
+ }
|
|
|
|
|
|
// Okay, kill the server!
|
|
|
try server.close().wait()
|
|
|
@@ -226,13 +186,15 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
self.serverGroup = nil
|
|
|
|
|
|
// Our connection should fail now.
|
|
|
- self.wait(for: [transientFailure], timeout: 1.0)
|
|
|
- XCTAssertEqual(self.stateDelegate.clearStates(), [.transientFailure, .connecting, .transientFailure])
|
|
|
- self.stateDelegate.expectations[.transientFailure] = nil
|
|
|
-
|
|
|
- // Prepare an expectation for a new healthy connection.
|
|
|
- let reconnectionReady = self.expectation(description: "(re)connection ready")
|
|
|
- self.stateDelegate.expectations[.ready] = reconnectionReady
|
|
|
+ self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
|
|
|
+
|
|
|
+ // Get ready for the new healthy connection.
|
|
|
+ self.connectionStateRecorder.expectChanges(2) { changes in
|
|
|
+ XCTAssertEqual(changes, [
|
|
|
+ Change(from: .transientFailure, to: .connecting),
|
|
|
+ Change(from: .connecting, to: .ready)
|
|
|
+ ])
|
|
|
+ }
|
|
|
|
|
|
// This should succeed once we get a connection again.
|
|
|
let get = echo.get(.with { $0.text = "hello" })
|
|
|
@@ -241,13 +203,11 @@ class ClientConnectionBackoffTests: GRPCTestCase {
|
|
|
self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
|
|
self.server = self.makeServer()
|
|
|
|
|
|
- self.wait(for: [reconnectionReady], timeout: 2.0)
|
|
|
- XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])
|
|
|
+ self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
|
|
|
|
|
|
// The call should be able to succeed now.
|
|
|
XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
|
|
|
|
|
|
try self.client.close().wait()
|
|
|
- XCTAssertEqual(self.stateDelegate.clearStates(), [.shutdown])
|
|
|
}
|
|
|
}
|