| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- /*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Foundation
- import GRPC
- import EchoModel
- import EchoImplementation
- import NIO
- import XCTest
- import NIOConcurrencyHelpers
/// A `ConnectivityStateDelegate` for tests which records every state it is notified about and
/// fulfills the expectation (if any) registered for that state.
///
/// The recorded states and the registered expectations are only read and written while holding
/// `lock`.
class ConnectivityStateCollectionDelegate: ConnectivityStateDelegate {
  private var _states: [ConnectivityState] = []
  // The lock itself is never reassigned, so it is a constant.
  private let lock = Lock()

  /// A snapshot of the states recorded so far, in the order they were received.
  var states: [ConnectivityState] {
    return self.lock.withLock {
      return self._states
    }
  }

  /// Returns the states recorded so far and removes them from the delegate.
  ///
  /// The read and the removal happen under a single acquisition of the lock, so a state appended
  /// by `connectivityStateDidChange(from:to:)` is either part of the returned array or kept for the
  /// next call.
  ///
  /// - Returns: The states recorded since the delegate was created or last cleared.
  func clearStates() -> [ConnectivityState] {
    return self.lock.withLock { () -> [ConnectivityState] in
      let recorded = self._states
      self._states.removeAll()
      return recorded
    }
  }

  private var _expectations: [ConnectivityState: XCTestExpectation] = [:]

  /// The expectations to fulfill, keyed by the state which fulfills them.
  ///
  /// Assigning `nil` for a state through the subscript removes its expectation. Note that a
  /// subscript assignment is a locked read followed by a locked write, not one atomic update.
  var expectations: [ConnectivityState: XCTestExpectation] {
    get {
      return self.lock.withLock {
        return self._expectations
      }
    }
    set {
      self.lock.withLockVoid {
        self._expectations = newValue
      }
    }
  }

  /// Creates a delegate, optionally registering an expectation for each connectivity state.
  ///
  /// - Parameters:
  ///   - idle: Expectation fulfilled when the state changes to `.idle`.
  ///   - connecting: Expectation fulfilled when the state changes to `.connecting`.
  ///   - ready: Expectation fulfilled when the state changes to `.ready`.
  ///   - transientFailure: Expectation fulfilled when the state changes to `.transientFailure`.
  ///   - shutdown: Expectation fulfilled when the state changes to `.shutdown`.
  init(
    idle: XCTestExpectation? = nil,
    connecting: XCTestExpectation? = nil,
    ready: XCTestExpectation? = nil,
    transientFailure: XCTestExpectation? = nil,
    shutdown: XCTestExpectation? = nil
  ) {
    // Build the dictionary locally and store it once; assigning `nil` leaves the key absent.
    var registered: [ConnectivityState: XCTestExpectation] = [:]
    registered[.idle] = idle
    registered[.connecting] = connecting
    registered[.ready] = ready
    registered[.transientFailure] = transientFailure
    registered[.shutdown] = shutdown
    self.expectations = registered
  }

  /// Records `newState` and fulfills the expectation registered for it, if there is one.
  ///
  /// - Parameters:
  ///   - oldState: The state being left; not used.
  ///   - newState: The state being entered.
  func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) {
    self.lock.withLockVoid {
      self._states.append(newState)
      // Fulfilled while holding the lock, so the state is already recorded when a waiter resumes.
      self._expectations[newState]?.fulfill()
    }
  }
}
/// Tests covering how a `ClientConnection` behaves when its server is unavailable, becomes
/// available later, or disappears and comes back.
///
/// Each test observes connectivity state changes through `stateDelegate` and waits on
/// expectations registered for the states of interest.
class ClientConnectionBackoffTests: GRPCTestCase {
  /// The port the test server binds to and the client connects to.
  let port = 8080

  var client: ClientConnection!
  var server: EventLoopFuture<Server>!

  var serverGroup: EventLoopGroup!
  var clientGroup: EventLoopGroup!

  var stateDelegate = ConnectivityStateCollectionDelegate()

  override func setUp() {
    super.setUp()
    self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    self.clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  }

  override func tearDown() {
    // We have additional state changes during tear down, in some cases we can over-fulfill a test
    // expectation which causes false negatives.
    //
    // `client` is still nil if a test threw before connecting (for example because the server
    // could not be started), so it must not be force-unwrapped here.
    self.client?.connectivity.delegate = nil

    if let server = self.server {
      XCTAssertNoThrow(try server.flatMap { $0.channel.close() }.wait())
    }
    // Best effort: errors from shutting down the server group are deliberately ignored.
    try? self.serverGroup?.syncShutdownGracefully()
    self.server = nil
    self.serverGroup = nil

    // We don't always expect a client to be closed cleanly, since in some cases we deliberately
    // timeout the connection.
    try? self.client?.close().wait()
    XCTAssertNoThrow(try self.clientGroup?.syncShutdownGracefully())
    self.client = nil
    self.clientGroup = nil

    super.tearDown()
  }

  /// Starts binding an insecure Echo server on `localhost` at `port` using `serverGroup`.
  ///
  /// - Returns: A future for the server, completed once the bind has finished.
  func makeServer() -> EventLoopFuture<Server> {
    return Server.insecure(group: self.serverGroup)
      .withServiceProviders([EchoProvider()])
      .bind(host: "localhost", port: self.port)
  }

  /// Returns a builder for an insecure client on `clientGroup` which reports state changes to
  /// `stateDelegate` and uses a 100ms maximum backoff and a 100ms minimum connection timeout.
  func connectionBuilder() -> ClientConnection.Builder {
    return ClientConnection.insecure(group: self.clientGroup)
      .withConnectivityStateDelegate(self.stateDelegate)
      .withConnectionBackoff(maximum: .milliseconds(100))
      .withConnectionTimeout(minimum: .milliseconds(100))
  }

  /// With connection re-establishment disabled and no server running, the connection should go
  /// from `.connecting` straight to `.shutdown`.
  func testClientConnectionFailsWithNoBackoff() throws {
    let connectionShutdown = self.expectation(description: "client shutdown")
    self.stateDelegate.expectations[.shutdown] = connectionShutdown

    self.client = self.connectionBuilder()
      .withConnectionReestablishment(enabled: false)
      .connect(host: "localhost", port: self.port)

    self.wait(for: [connectionShutdown], timeout: 1.0)
    XCTAssertEqual(self.stateDelegate.states, [.connecting, .shutdown])
  }

  /// A client started before its server should see a transient failure and then become ready
  /// once the server is started.
  func testClientEventuallyConnects() throws {
    let transientFailure = self.expectation(description: "connection transientFailure")
    let connectionReady = self.expectation(description: "connection ready")
    self.stateDelegate.expectations[.transientFailure] = transientFailure
    self.stateDelegate.expectations[.ready] = connectionReady

    // Start the client first.
    self.client = self.connectionBuilder()
      .connect(host: "localhost", port: self.port)

    self.wait(for: [transientFailure], timeout: 1.0)
    // Stop tracking transient failures so that later ones do not over-fulfill the expectation.
    self.stateDelegate.expectations[.transientFailure] = nil
    XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .transientFailure])

    self.server = self.makeServer()
    let serverStarted = self.expectation(description: "server started")
    self.server.assertSuccess(fulfill: serverStarted)

    self.wait(for: [serverStarted, connectionReady], timeout: 2.0, enforceOrder: true)
    // We can have other transient failures and connection attempts while the server starts, we only
    // care about the last two.
    XCTAssertEqual(self.stateDelegate.states.suffix(2), [.connecting, .ready])
  }

  /// A ready client should report transient failures when its server is killed and become ready
  /// again, completing a pending call, once a new server is started.
  func testClientReconnectsAutomatically() throws {
    // Wait for the server to start.
    self.server = self.makeServer()
    let server = try self.server.wait()

    // Prepare the delegate so it expects the connection to hit `.ready`.
    let connectionReady = self.expectation(description: "connection ready")
    self.stateDelegate.expectations[.ready] = connectionReady

    // Configure the client backoff to have a short backoff.
    self.client = self.connectionBuilder()
      .withConnectionBackoff(maximum: .seconds(2))
      .connect(host: "localhost", port: self.port)

    // Wait for the connection to be ready.
    self.wait(for: [connectionReady], timeout: 1.0)
    XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])

    // Now that we have a healthy connection, prepare for two transient failures:
    // 1. when the server has been killed, and
    // 2. when the client attempts to reconnect.
    let transientFailure = self.expectation(description: "connection transientFailure")
    transientFailure.expectedFulfillmentCount = 2
    self.stateDelegate.expectations[.transientFailure] = transientFailure
    self.stateDelegate.expectations[.ready] = nil

    // Okay, kill the server!
    try server.close().wait()
    try self.serverGroup.syncShutdownGracefully()
    self.server = nil
    self.serverGroup = nil

    // Our connection should fail now.
    self.wait(for: [transientFailure], timeout: 1.0)
    XCTAssertEqual(self.stateDelegate.clearStates(), [.transientFailure, .connecting, .transientFailure])
    self.stateDelegate.expectations[.transientFailure] = nil

    // Prepare an expectation for a new healthy connection.
    let reconnectionReady = self.expectation(description: "(re)connection ready")
    self.stateDelegate.expectations[.ready] = reconnectionReady

    let echo = Echo_EchoClient(channel: self.client)
    // This should succeed once we get a connection again.
    let get = echo.get(.with { $0.text = "hello" })

    // Start a new server.
    self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    self.server = self.makeServer()

    self.wait(for: [reconnectionReady], timeout: 2.0)
    XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])

    // The call should be able to succeed now.
    XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)

    try self.client.close().wait()
    XCTAssertEqual(self.stateDelegate.clearStates(), [.shutdown])
  }
}
|