ClientConnectionBackoffTests.swift 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  /*
   * Copyright 2019, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import Foundation
  17. import GRPC
  18. import NIO
  19. import XCTest
  /// A `ConnectivityStateDelegate` which records every state it is told the connection has
  /// moved to, so that tests can assert on the observed sequence of states.
  ///
  /// NOTE(review): the delegate callback is presumably invoked from the client's event loop
  /// thread while tests read the recorded states from the test thread -- confirm against
  /// `ClientConnection`. All access to the recorded states is therefore serialised by a lock.
  class ConnectivityStateCollectionDelegate: ConnectivityStateDelegate {
    /// Guards `recordedStates`.
    private let lock = NSLock()

    /// Backing storage for `states`; only touched while `lock` is held.
    private var recordedStates: [ConnectivityState] = []

    /// The states recorded so far, in the order in which they were reported.
    var states: [ConnectivityState] {
      get {
        self.lock.lock()
        defer { self.lock.unlock() }
        return self.recordedStates
      }
      set {
        self.lock.lock()
        defer { self.lock.unlock() }
        self.recordedStates = newValue
      }
    }

    /// Returns the states recorded so far and resets the record to empty.
    ///
    /// The read and the reset happen under a single lock acquisition so that a state reported
    /// concurrently is either part of the returned array or kept for the next call; it is
    /// never dropped.
    ///
    /// - Returns: The states recorded since the last call (or since creation).
    func clearStates() -> [ConnectivityState] {
      self.lock.lock()
      defer { self.lock.unlock() }

      let recorded = self.recordedStates
      self.recordedStates = []
      return recorded
    }

    /// Records `newState`; `oldState` is ignored.
    func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) {
      self.lock.lock()
      defer { self.lock.unlock() }
      self.recordedStates.append(newState)
    }
  }
  /// Tests for the connectivity states a `ClientConnection` passes through when its server is
  /// unavailable, becomes available later, or goes away and comes back.
  class ClientConnectionBackoffTests: XCTestCase {
    /// Port used by both the client and any server started by a test.
    let port = 8080

    var client: ClientConnection!
    var server: EventLoopFuture<Server>!

    var serverGroup: EventLoopGroup!
    var clientGroup: EventLoopGroup!

    /// Records the connectivity states reported for `client`.
    var stateDelegate = ConnectivityStateCollectionDelegate()

    /// Creates a single-threaded event loop group each for the server and the client.
    override func setUp() {
      super.setUp()
      self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
      self.clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    }

    /// Closes the server (if one was started) and the client (if one was created), and shuts
    /// down both event loop groups.
    override func tearDown() {
      if let server = self.server {
        XCTAssertNoThrow(try server.flatMap { $0.channel.close() }.wait())
      }

      // Best effort: a test may already have shut this group down and set it to nil, so the
      // group is optional-chained and any error is deliberately ignored.
      try? self.serverGroup?.syncShutdownGracefully()
      self.server = nil
      self.serverGroup = nil

      // We don't always expect a client to be closed cleanly, since in some cases we deliberately
      // timeout the connection. The client may also never have been created if the test threw
      // early, so it must not be force unwrapped here.
      try? self.client?.close().wait()
      XCTAssertNoThrow(try self.clientGroup.syncShutdownGracefully())
      self.client = nil
      self.clientGroup = nil

      super.tearDown()
    }

    /// Starts a server providing the Echo service on `localhost:port` using `serverGroup`.
    ///
    /// - Returns: A future for the started server.
    func makeServer() -> EventLoopFuture<Server> {
      let configuration = Server.Configuration(
        target: .hostAndPort("localhost", self.port),
        eventLoopGroup: self.serverGroup,
        serviceProviders: [EchoProvider()])

      return Server.start(configuration: configuration)
    }

    /// Makes a client configuration targeting `localhost:port` which reports connectivity
    /// changes to `stateDelegate` and uses a connection backoff with a maximum backoff of 0.1.
    func makeClientConfiguration() -> ClientConnection.Configuration {
      return .init(
        target: .hostAndPort("localhost", self.port),
        eventLoopGroup: self.clientGroup,
        connectivityStateDelegate: self.stateDelegate,
        connectionBackoff: ConnectionBackoff(maximumBackoff: 0.1))
    }

    /// Makes a client connection from the given configuration.
    func makeClientConnection(
      _ configuration: ClientConnection.Configuration
    ) -> ClientConnection {
      return ClientConnection(configuration: configuration)
    }

    /// With backoff disabled and no server started, the connection is expected to go from
    /// connecting directly to shutdown.
    func testClientConnectionFailsWithNoBackoff() throws {
      var configuration = self.makeClientConfiguration()
      configuration.connectionBackoff = nil

      let connectionShutdown = self.expectation(description: "client shutdown")
      self.client = self.makeClientConnection(configuration)
      self.client.connectivity.onNext(state: .shutdown) {
        connectionShutdown.fulfill()
      }

      self.wait(for: [connectionShutdown], timeout: 1.0)
      XCTAssertEqual(self.stateDelegate.states, [.connecting, .shutdown])
    }

    /// A client started before its server is expected to report a transient failure and then
    /// become ready once the server has been started.
    func testClientEventuallyConnects() throws {
      // Start the client first.
      self.client = self.makeClientConnection(self.makeClientConfiguration())

      let transientFailure = self.expectation(description: "connection transientFailure")
      self.client.connectivity.onNext(state: .transientFailure) {
        transientFailure.fulfill()
      }

      let connectionReady = self.expectation(description: "connection ready")
      self.client.connectivity.onNext(state: .ready) {
        connectionReady.fulfill()
      }

      // Only start the server once the client has failed to connect at least once.
      self.wait(for: [transientFailure], timeout: 1.0)

      self.server = self.makeServer()
      let serverStarted = self.expectation(description: "server started")
      self.server.assertSuccess(fulfill: serverStarted)

      self.wait(for: [serverStarted, connectionReady], timeout: 2.0, enforceOrder: true)
      XCTAssertEqual(self.stateDelegate.states, [.connecting, .transientFailure, .connecting, .ready])
    }

    /// With no server ever started, the connection is expected to retry once and then shut down.
    func testClientEventuallyTimesOut() throws {
      let connectionShutdown = self.expectation(description: "connection shutdown")
      self.client = self.makeClientConnection(self.makeClientConfiguration())
      self.client.connectivity.onNext(state: .shutdown) {
        connectionShutdown.fulfill()
      }

      self.wait(for: [connectionShutdown], timeout: 1.0)
      XCTAssertEqual(self.stateDelegate.states, [.connecting, .transientFailure, .connecting, .shutdown])
    }

    /// A ready connection whose server is stopped is expected to report a transient failure,
    /// become ready again once a new server is started, and then complete a pending call.
    func testClientReconnectsAutomatically() throws {
      self.server = self.makeServer()
      let server = try self.server.wait()

      let connectionReady = self.expectation(description: "connection ready")
      var configuration = self.makeClientConfiguration()
      // `makeClientConfiguration()` always sets a backoff, so this unwrap cannot fail.
      configuration.connectionBackoff!.maximumBackoff = 2.0
      self.client = self.makeClientConnection(configuration)
      self.client.connectivity.onNext(state: .ready) {
        connectionReady.fulfill()
      }

      // Once the connection is ready we can kill the server.
      self.wait(for: [connectionReady], timeout: 1.0)
      XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])

      try server.close().wait()
      try self.serverGroup.syncShutdownGracefully()
      self.server = nil
      self.serverGroup = nil

      let transientFailure = self.expectation(description: "connection transientFailure")
      self.client.connectivity.onNext(state: .transientFailure) {
        transientFailure.fulfill()
      }

      self.wait(for: [transientFailure], timeout: 1.0)
      XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .transientFailure])

      let reconnectionReady = self.expectation(description: "(re)connection ready")
      self.client.connectivity.onNext(state: .ready) {
        reconnectionReady.fulfill()
      }

      let echo = Echo_EchoServiceClient(connection: self.client)
      // This should succeed once we get a connection again.
      let get = echo.get(.with { $0.text = "hello" })

      // Start a new server.
      self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
      self.server = self.makeServer()

      self.wait(for: [reconnectionReady], timeout: 2.0)
      XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])

      // The call should be able to succeed now.
      XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)

      try self.client.close().wait()
      XCTAssertEqual(self.stateDelegate.clearStates(), [.shutdown])
    }
  }