// ClientConnectionBackoffTests.swift
/*
 * Copyright 2019, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import EchoImplementation
  17. import EchoModel
  18. import Foundation
  19. import GRPC
  20. import NIO
  21. import NIOConcurrencyHelpers
  22. import XCTest
  /// Tests covering how a `ClientConnection` behaves when the server it targets is unavailable,
  /// becomes available later, or disappears mid-connection.
  ///
  /// Each test primes `connectionStateRecorder` with the exact sequence of connectivity state
  /// changes it expects and then waits for that sequence to be observed.
  class ClientConnectionBackoffTests: GRPCTestCase {
    /// Port shared by the server (when one is started) and the client.
    let port = 8080

    /// The client under test. Stays `nil` until a test assigns it.
    var client: ClientConnection!
    /// The pending or bound server. Stays `nil` in tests which never start one.
    var server: EventLoopFuture<Server>!
    var serverGroup: EventLoopGroup!
    var clientGroup: EventLoopGroup!

    /// Connectivity delegate installed on every client built by `connectionBuilder()`.
    var connectionStateRecorder = RecordingConnectivityDelegate()

    // MARK: - Set up and tear down

    /// Creates a single-threaded event loop group each for the server and the client.
    override func setUp() {
      super.setUp()
      self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
      self.clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    }

    /// Shuts down whatever the test created: the server and its group first, then the client and
    /// its group.
    ///
    /// Every implicitly-unwrapped property is accessed through optional chaining. A test may throw
    /// before it has assigned `client` (for example if the server fails to bind), and trapping here
    /// would abort the whole test run rather than report a single failure.
    override func tearDown() {
      // We have additional state changes during tear down, in some cases we can over-fulfill a test
      // expectation which causes false negatives.
      self.client?.connectivity.delegate = nil

      if let server = self.server {
        XCTAssertNoThrow(try server.flatMap { $0.channel.close() }.wait())
      }
      // Best effort: errors shutting down the server group are deliberately ignored. (This was
      // previously wrapped in `XCTAssertNoThrow(try? ...)`, which can never fail.)
      try? self.serverGroup?.syncShutdownGracefully()
      self.server = nil
      self.serverGroup = nil

      // We don't always expect a client to be closed cleanly, since in some cases we deliberately
      // timeout the connection.
      try? self.client?.close().wait()
      XCTAssertNoThrow(try self.clientGroup?.syncShutdownGracefully())
      self.client = nil
      self.clientGroup = nil

      super.tearDown()
    }

    // MARK: - Helpers

    /// Starts binding an insecure server offering the Echo service on `localhost:port`.
    ///
    /// - Returns: A future for the server, running on `serverGroup`.
    func makeServer() -> EventLoopFuture<Server> {
      return Server.insecure(group: self.serverGroup)
        .withServiceProviders([EchoProvider()])
        .withLogger(self.serverLogger)
        .bind(host: "localhost", port: self.port)
    }

    /// Makes a builder for an insecure client which reports connectivity changes to
    /// `connectionStateRecorder` and uses a 100ms maximum backoff and 100ms minimum connection
    /// timeout.
    ///
    /// - Returns: A builder which tests may customise further before calling `connect`.
    func connectionBuilder() -> ClientConnection.Builder {
      return ClientConnection.insecure(group: self.clientGroup)
        .withConnectivityStateDelegate(self.connectionStateRecorder)
        .withConnectionBackoff(maximum: .milliseconds(100))
        .withConnectionTimeout(minimum: .milliseconds(100))
        .withBackgroundActivityLogger(self.clientLogger)
    }

    // MARK: - Tests

    /// With connection re-establishment disabled and no server running, the connection is expected
    /// to go from connecting straight to shutdown.
    func testClientConnectionFailsWithNoBackoff() throws {
      self.connectionStateRecorder.expectChanges(2) { changes in
        XCTAssertEqual(changes, [
          Change(from: .idle, to: .connecting),
          Change(from: .connecting, to: .shutdown),
        ])
      }

      self.client = self.connectionBuilder()
        .withConnectionReestablishment(enabled: false)
        .connect(host: "localhost", port: self.port)

      // Start an RPC to trigger creating a channel.
      let echo = Echo_EchoClient(channel: self.client, defaultCallOptions: self.callOptionsWithLogger)
      _ = echo.get(.with { $0.text = "foo" })

      self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
    }

    /// With retries limited to one and no server running, the connection is expected to fail once,
    /// try again, and then shut down.
    func testClientConnectionFailureIsLimited() throws {
      self.connectionStateRecorder.expectChanges(4) { changes in
        XCTAssertEqual(changes, [
          Change(from: .idle, to: .connecting),
          Change(from: .connecting, to: .transientFailure),
          Change(from: .transientFailure, to: .connecting),
          Change(from: .connecting, to: .shutdown),
        ])
      }

      self.client = self.connectionBuilder()
        .withConnectionBackoff(retries: .upTo(1))
        .connect(host: "localhost", port: self.port)

      // Start an RPC to trigger creating a channel.
      let echo = Echo_EchoClient(channel: self.client, defaultCallOptions: self.callOptionsWithLogger)
      _ = echo.get(.with { $0.text = "foo" })

      self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
    }

    /// A client started before its server is expected to fail at first and then become ready once
    /// the server has been started.
    func testClientEventuallyConnects() throws {
      self.connectionStateRecorder.expectChanges(2) { changes in
        XCTAssertEqual(changes, [
          Change(from: .idle, to: .connecting),
          Change(from: .connecting, to: .transientFailure),
        ])
      }

      // Start the client first.
      self.client = self.connectionBuilder()
        .connect(host: "localhost", port: self.port)

      // Start an RPC to trigger creating a channel.
      let echo = Echo_EchoClient(channel: self.client, defaultCallOptions: self.callOptionsWithLogger)
      _ = echo.get(.with { $0.text = "foo" })

      self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))

      // The server is about to start: the next attempt should take us to `.ready`.
      self.connectionStateRecorder.expectChanges(2) { changes in
        XCTAssertEqual(changes, [
          Change(from: .transientFailure, to: .connecting),
          Change(from: .connecting, to: .ready),
        ])
      }

      self.server = self.makeServer()
      let serverStarted = self.expectation(description: "server started")
      self.server.assertSuccess(fulfill: serverStarted)

      self.wait(for: [serverStarted], timeout: 5.0)
      self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
    }

    /// A ready connection whose server is killed is expected to fail, keep retrying, and become
    /// ready again once a new server is listening on the same port.
    func testClientReconnectsAutomatically() throws {
      // Wait for the server to start.
      self.server = self.makeServer()
      let server = try self.server.wait()

      // Prepare the delegate so it expects the connection to hit `.ready`.
      self.connectionStateRecorder.expectChanges(2) { changes in
        XCTAssertEqual(changes, [
          Change(from: .idle, to: .connecting),
          Change(from: .connecting, to: .ready),
        ])
      }

      // Configure the client backoff to have a short backoff.
      self.client = self.connectionBuilder()
        .withConnectionBackoff(maximum: .seconds(2))
        .connect(host: "localhost", port: self.port)

      // Start an RPC to trigger creating a channel, it's a streaming RPC so that when the server is
      // killed, the client still has one active RPC and transitions to transient failure (rather than
      // idle if there were no active RPCs).
      let echo = Echo_EchoClient(channel: self.client, defaultCallOptions: self.callOptionsWithLogger)
      _ = echo.update { _ in }

      // Wait for the connection to be ready.
      self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))

      // Now that we have a healthy connection, prepare for two transient failures:
      // 1. when the server has been killed, and
      // 2. when the client attempts to reconnect.
      self.connectionStateRecorder.expectChanges(3) { changes in
        XCTAssertEqual(changes, [
          Change(from: .ready, to: .transientFailure),
          Change(from: .transientFailure, to: .connecting),
          Change(from: .connecting, to: .transientFailure),
        ])
      }

      // Okay, kill the server!
      try server.close().wait()
      try self.serverGroup.syncShutdownGracefully()
      self.server = nil
      self.serverGroup = nil

      // Our connection should fail now.
      self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))

      // Get ready for the new healthy connection.
      self.connectionStateRecorder.expectChanges(2) { changes in
        XCTAssertEqual(changes, [
          Change(from: .transientFailure, to: .connecting),
          Change(from: .connecting, to: .ready),
        ])
      }

      // This should succeed once we get a connection again.
      let get = echo.get(.with { $0.text = "hello" })

      // Start a new server.
      self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
      self.server = self.makeServer()

      self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))

      // The call should be able to succeed now.
      XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)

      try self.client.close().wait()
    }
  }