ClientConnectionBackoffTests.swift 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import GRPC
  18. import NIO
  19. import XCTest
  20. import NIOConcurrencyHelpers
  /// A `ConnectivityStateDelegate` for tests: it records every state the connection transitions
  /// to and fulfils the expectation (if any) registered for that state.
  ///
  /// The recorded states and the per-state expectations are all guarded by a single lock. Tests
  /// replace expectations from the test thread while a connection is live, and the delegate
  /// callback is presumably invoked from the connection's event loop (not visible here), so
  /// both sides must be synchronised.
  class ConnectivityStateCollectionDelegate: ConnectivityStateDelegate {
    /// Guards every underscore-prefixed stored property below.
    private let lock = Lock()

    private var _states: [ConnectivityState] = []
    private var _idleExpectation: XCTestExpectation?
    private var _connectingExpectation: XCTestExpectation?
    private var _readyExpectation: XCTestExpectation?
    private var _transientFailureExpectation: XCTestExpectation?
    private var _shutdownExpectation: XCTestExpectation?

    /// A snapshot of the states recorded so far, in the order they were reported.
    var states: [ConnectivityState] {
      return self.lock.withLock { self._states }
    }

    /// Returns the states recorded so far and resets the recording to empty.
    ///
    /// Reading and clearing happen under one lock acquisition, so a state change reported
    /// concurrently is either part of the returned array or kept for the next call.
    func clearStates() -> [ConnectivityState] {
      return self.lock.withLock { () -> [ConnectivityState] in
        let recorded = self._states
        self._states.removeAll()
        return recorded
      }
    }

    /// Fulfilled each time the connection reports `.idle`.
    var idleExpectation: XCTestExpectation? {
      get { return self.lock.withLock { self._idleExpectation } }
      set { self.lock.withLockVoid { self._idleExpectation = newValue } }
    }

    /// Fulfilled each time the connection reports `.connecting`.
    var connectingExpectation: XCTestExpectation? {
      get { return self.lock.withLock { self._connectingExpectation } }
      set { self.lock.withLockVoid { self._connectingExpectation = newValue } }
    }

    /// Fulfilled each time the connection reports `.ready`.
    var readyExpectation: XCTestExpectation? {
      get { return self.lock.withLock { self._readyExpectation } }
      set { self.lock.withLockVoid { self._readyExpectation = newValue } }
    }

    /// Fulfilled each time the connection reports `.transientFailure`.
    var transientFailureExpectation: XCTestExpectation? {
      get { return self.lock.withLock { self._transientFailureExpectation } }
      set { self.lock.withLockVoid { self._transientFailureExpectation = newValue } }
    }

    /// Fulfilled each time the connection reports `.shutdown`.
    var shutdownExpectation: XCTestExpectation? {
      get { return self.lock.withLock { self._shutdownExpectation } }
      set { self.lock.withLockVoid { self._shutdownExpectation = newValue } }
    }

    /// Creates a delegate, optionally pre-registering an expectation for each state.
    ///
    /// - Parameters:
    ///   - idle: Fulfilled when the state becomes `.idle`.
    ///   - connecting: Fulfilled when the state becomes `.connecting`.
    ///   - ready: Fulfilled when the state becomes `.ready`.
    ///   - transientFailure: Fulfilled when the state becomes `.transientFailure`.
    ///   - shutdown: Fulfilled when the state becomes `.shutdown`.
    init(
      idle: XCTestExpectation? = nil,
      connecting: XCTestExpectation? = nil,
      ready: XCTestExpectation? = nil,
      transientFailure: XCTestExpectation? = nil,
      shutdown: XCTestExpectation? = nil
    ) {
      // No locking needed: the instance cannot be shared before `init` returns.
      self._idleExpectation = idle
      self._connectingExpectation = connecting
      self._readyExpectation = ready
      self._transientFailureExpectation = transientFailure
      self._shutdownExpectation = shutdown
    }

    /// Records `newState` and fulfils the expectation registered for it, if there is one.
    ///
    /// - Parameters:
    ///   - oldState: The state being left; not used.
    ///   - newState: The state being entered.
    func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) {
      // Record the state and pick the matching expectation in one critical section so the
      // two cannot get out of step with a concurrent `clearStates()` or expectation swap.
      let expectation = self.lock.withLock { () -> XCTestExpectation? in
        self._states.append(newState)

        switch newState {
        case .idle:
          return self._idleExpectation
        case .connecting:
          return self._connectingExpectation
        case .ready:
          return self._readyExpectation
        case .transientFailure:
          return self._transientFailureExpectation
        case .shutdown:
          return self._shutdownExpectation
        }
      }

      // Fulfil outside the lock so we never call out while holding it.
      expectation?.fulfill()
    }
  }
  /// Tests how a `ClientConnection` moves through its connectivity states when the server is
  /// unavailable, becomes available, or goes away and comes back, with and without a
  /// `ConnectionBackoff` configured.
  class ClientConnectionBackoffTests: GRPCTestCase {
    /// Fixed port used by both the client and the server. It has to be known up front because
    /// some tests start the client before any server is listening.
    let port = 8080

    var client: ClientConnection!
    var server: EventLoopFuture<Server>!
    var serverGroup: EventLoopGroup!
    var clientGroup: EventLoopGroup!

    /// Collects the client's connectivity state changes and fulfils per-state expectations.
    var stateDelegate = ConnectivityStateCollectionDelegate()

    override func setUp() {
      super.setUp()
      self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
      self.clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    }

    override func tearDown() {
      // We have additional state changes during tear down; in some cases we can over-fulfill a
      // test expectation, which causes spurious failures.
      //
      // `client` may still be nil here if the test threw before creating it (for example when
      // the server failed to start), so never force-unwrap it during tear down.
      self.client?.connectivity.delegate = nil

      if let server = self.server {
        XCTAssertNoThrow(try server.flatMap { $0.channel.close() }.wait())
      }

      // Best effort: a test may already have shut down (or released) its server group.
      try? self.serverGroup?.syncShutdownGracefully()
      self.server = nil
      self.serverGroup = nil

      // We don't always expect a client to be closed cleanly, since in some cases we
      // deliberately time out the connection.
      try? self.client?.close().wait()
      XCTAssertNoThrow(try self.clientGroup?.syncShutdownGracefully())
      self.client = nil
      self.clientGroup = nil

      super.tearDown()
    }

    /// Starts an echo server on `localhost:port` using `serverGroup`.
    ///
    /// - Returns: A future for the started server.
    func makeServer() -> EventLoopFuture<Server> {
      let configuration = Server.Configuration(
        target: .hostAndPort("localhost", self.port),
        eventLoopGroup: self.serverGroup,
        serviceProviders: [EchoProvider()])

      return Server.start(configuration: configuration)
    }

    /// Builds a client configuration targeting `localhost:port` which reports connectivity
    /// changes to `stateDelegate` and uses a `ConnectionBackoff` with a `maximumBackoff` and
    /// `minimumConnectionTimeout` of 0.1.
    func makeClientConfiguration() -> ClientConnection.Configuration {
      return .init(
        target: .hostAndPort("localhost", self.port),
        eventLoopGroup: self.clientGroup,
        connectivityStateDelegate: self.stateDelegate,
        connectionBackoff: ConnectionBackoff(maximumBackoff: 0.1,
                                             minimumConnectionTimeout: 0.1))
    }

    /// With no backoff configured and no server running, the connection should go straight
    /// from `.connecting` to `.shutdown`.
    func testClientConnectionFailsWithNoBackoff() throws {
      var configuration = self.makeClientConfiguration()
      configuration.connectionBackoff = nil

      let connectionShutdown = self.expectation(description: "client shutdown")
      self.stateDelegate.shutdownExpectation = connectionShutdown
      self.client = ClientConnection(configuration: configuration)

      self.wait(for: [connectionShutdown], timeout: 1.0)
      XCTAssertEqual(self.stateDelegate.states, [.connecting, .shutdown])
    }

    /// A client started before the server should fail transiently and then become ready once
    /// the server is up.
    func testClientEventuallyConnects() throws {
      let transientFailure = self.expectation(description: "connection transientFailure")
      let connectionReady = self.expectation(description: "connection ready")
      self.stateDelegate.transientFailureExpectation = transientFailure
      self.stateDelegate.readyExpectation = connectionReady

      // Start the client first.
      self.client = ClientConnection(configuration: self.makeClientConfiguration())

      self.wait(for: [transientFailure], timeout: 1.0)
      // Drop the expectation so that further transient failures don't over-fulfill it.
      self.stateDelegate.transientFailureExpectation = nil
      XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .transientFailure])

      self.server = self.makeServer()
      let serverStarted = self.expectation(description: "server started")
      self.server.assertSuccess(fulfill: serverStarted)

      self.wait(for: [serverStarted, connectionReady], timeout: 2.0, enforceOrder: true)
      // We can have other transient failures and connection attempts while the server starts;
      // we only care about the last two.
      XCTAssertEqual(self.stateDelegate.states.suffix(2), [.connecting, .ready])
    }

    /// A ready client should notice the server going away, and reconnect (completing a call
    /// made in the meantime) once a new server is started on the same port.
    func testClientReconnectsAutomatically() throws {
      self.server = self.makeServer()
      let server = try self.server.wait()

      var configuration = self.makeClientConfiguration()
      configuration.connectionBackoff!.maximumBackoff = 2.0

      let connectionReady = self.expectation(description: "connection ready")
      let transientFailure = self.expectation(description: "connection transientFailure")
      self.stateDelegate.readyExpectation = connectionReady
      self.stateDelegate.transientFailureExpectation = transientFailure

      self.client = ClientConnection(configuration: configuration)

      // Once the connection is ready we can kill the server.
      self.wait(for: [connectionReady], timeout: 1.0)
      XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])

      try server.close().wait()
      try self.serverGroup.syncShutdownGracefully()
      self.server = nil
      self.serverGroup = nil

      self.wait(for: [transientFailure], timeout: 1.0)
      XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .transientFailure])

      // Replace the ready expectation (since it's already been fulfilled).
      let reconnectionReady = self.expectation(description: "(re)connection ready")
      self.stateDelegate.readyExpectation = reconnectionReady

      let echo = Echo_EchoServiceClient(connection: self.client)
      // This should succeed once we get a connection again.
      let get = echo.get(.with { $0.text = "hello" })

      // Start a new server.
      self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
      self.server = self.makeServer()

      self.wait(for: [reconnectionReady], timeout: 2.0)
      XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])

      // The call should be able to succeed now.
      XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)

      try self.client.close().wait()
      XCTAssertEqual(self.stateDelegate.clearStates(), [.shutdown])
    }
  }