| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- /*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import EchoImplementation
- import EchoModel
- import Foundation
- import GRPC
- import NIOConcurrencyHelpers
- import NIOCore
- import NIOPosix
- import XCTest
- class ClientConnectionBackoffTests: GRPCTestCase {
- let port = 8080
- var client: ClientConnection!
- var server: EventLoopFuture<Server>!
- var serverGroup: EventLoopGroup!
- var clientGroup: EventLoopGroup!
- var connectionStateRecorder = RecordingConnectivityDelegate()
- override func setUp() {
- super.setUp()
- self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
- self.clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
- }
- override func tearDown() {
- // We have additional state changes during tear down, in some cases we can over-fulfill a test
- // expectation which causes false negatives.
- self.client.connectivity.delegate = nil
- if let server = self.server {
- XCTAssertNoThrow(try server.flatMap { $0.channel.close() }.wait())
- }
- XCTAssertNoThrow(try? self.serverGroup.syncShutdownGracefully())
- self.server = nil
- self.serverGroup = nil
- // We don't always expect a client to be closed cleanly, since in some cases we deliberately
- // timeout the connection.
- try? self.client.close().wait()
- XCTAssertNoThrow(try self.clientGroup.syncShutdownGracefully())
- self.client = nil
- self.clientGroup = nil
- super.tearDown()
- }
- func makeServer() -> EventLoopFuture<Server> {
- return Server.insecure(group: self.serverGroup)
- .withServiceProviders([EchoProvider()])
- .withLogger(self.serverLogger)
- .bind(host: "localhost", port: self.port)
- }
- func connectionBuilder() -> ClientConnection.Builder {
- return ClientConnection.insecure(group: self.clientGroup)
- .withConnectivityStateDelegate(self.connectionStateRecorder)
- .withConnectionBackoff(maximum: .milliseconds(100))
- .withConnectionTimeout(minimum: .milliseconds(100))
- .withBackgroundActivityLogger(self.clientLogger)
- }
- func testClientConnectionFailsWithNoBackoff() throws {
- self.connectionStateRecorder.expectChanges(2) { changes in
- XCTAssertEqual(
- changes,
- [
- Change(from: .idle, to: .connecting),
- Change(from: .connecting, to: .shutdown),
- ]
- )
- }
- self.client = self.connectionBuilder()
- .withConnectionReestablishment(enabled: false)
- .connect(host: "localhost", port: self.port)
- // Start an RPC to trigger creating a channel.
- let echo = Echo_EchoNIOClient(
- channel: self.client,
- defaultCallOptions: self.callOptionsWithLogger
- )
- _ = echo.get(.with { $0.text = "foo" })
- self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
- }
- func testClientConnectionFailureIsLimited() throws {
- self.connectionStateRecorder.expectChanges(4) { changes in
- XCTAssertEqual(
- changes,
- [
- Change(from: .idle, to: .connecting),
- Change(from: .connecting, to: .transientFailure),
- Change(from: .transientFailure, to: .connecting),
- Change(from: .connecting, to: .shutdown),
- ]
- )
- }
- self.client = self.connectionBuilder()
- .withConnectionBackoff(retries: .upTo(1))
- .connect(host: "localhost", port: self.port)
- // Start an RPC to trigger creating a channel.
- let echo = Echo_EchoNIOClient(
- channel: self.client,
- defaultCallOptions: self.callOptionsWithLogger
- )
- _ = echo.get(.with { $0.text = "foo" })
- self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
- }
- func testClientEventuallyConnects() throws {
- self.connectionStateRecorder.expectChanges(2) { changes in
- XCTAssertEqual(
- changes,
- [
- Change(from: .idle, to: .connecting),
- Change(from: .connecting, to: .transientFailure),
- ]
- )
- }
- // Start the client first.
- self.client = self.connectionBuilder()
- .connect(host: "localhost", port: self.port)
- // Start an RPC to trigger creating a channel.
- let echo = Echo_EchoNIOClient(
- channel: self.client,
- defaultCallOptions: self.callOptionsWithLogger
- )
- _ = echo.get(.with { $0.text = "foo" })
- self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
- self.connectionStateRecorder.expectChanges(2) { changes in
- XCTAssertEqual(
- changes,
- [
- Change(from: .transientFailure, to: .connecting),
- Change(from: .connecting, to: .ready),
- ]
- )
- }
- self.server = self.makeServer()
- let serverStarted = self.expectation(description: "server started")
- self.server.assertSuccess(fulfill: serverStarted)
- self.wait(for: [serverStarted], timeout: 5.0)
- self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
- }
- func testClientReconnectsAutomatically() throws {
- // Wait for the server to start.
- self.server = self.makeServer()
- let server = try self.server.wait()
- // Prepare the delegate so it expects the connection to hit `.ready`.
- self.connectionStateRecorder.expectChanges(2) { changes in
- XCTAssertEqual(
- changes,
- [
- Change(from: .idle, to: .connecting),
- Change(from: .connecting, to: .ready),
- ]
- )
- }
- // Configure the client backoff to have a short backoff.
- self.client = self.connectionBuilder()
- .withConnectionBackoff(maximum: .seconds(2))
- .connect(host: "localhost", port: self.port)
- // Start an RPC to trigger creating a channel, it's a streaming RPC so that when the server is
- // killed, the client still has one active RPC and transitions to transient failure (rather than
- // idle if there were no active RPCs).
- let echo = Echo_EchoNIOClient(
- channel: self.client,
- defaultCallOptions: self.callOptionsWithLogger
- )
- _ = echo.update { _ in }
- // Wait for the connection to be ready.
- self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
- // Now that we have a healthy connection, prepare for two transient failures:
- // 1. when the server has been killed, and
- // 2. when the client attempts to reconnect.
- self.connectionStateRecorder.expectChanges(3) { changes in
- XCTAssertEqual(
- changes,
- [
- Change(from: .ready, to: .transientFailure),
- Change(from: .transientFailure, to: .connecting),
- Change(from: .connecting, to: .transientFailure),
- ]
- )
- }
- // Okay, kill the server!
- try server.close().wait()
- try self.serverGroup.syncShutdownGracefully()
- self.server = nil
- self.serverGroup = nil
- // Our connection should fail now.
- self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
- // Get ready for the new healthy connection.
- self.connectionStateRecorder.expectChanges(2) { changes in
- XCTAssertEqual(
- changes,
- [
- Change(from: .transientFailure, to: .connecting),
- Change(from: .connecting, to: .ready),
- ]
- )
- }
- // This should succeed once we get a connection again.
- let get = echo.get(.with { $0.text = "hello" })
- // Start a new server.
- self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
- self.server = self.makeServer()
- self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
- // The call should be able to succeed now.
- XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
- try self.client.close().wait()
- }
- }
|