ClientConnectionBackoffTests.swift 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import Foundation
  19. import GRPC
  20. import NIOConcurrencyHelpers
  21. import NIOCore
  22. import NIOPosix
  23. import XCTest
  24. class ClientConnectionBackoffTests: GRPCTestCase {
  25. let port = 8080
  26. var client: ClientConnection!
  27. var server: EventLoopFuture<Server>!
  28. var serverGroup: EventLoopGroup!
  29. var clientGroup: EventLoopGroup!
  30. var connectionStateRecorder = RecordingConnectivityDelegate()
  31. override func setUp() {
  32. super.setUp()
  33. self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  34. self.clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  35. }
  36. override func tearDown() {
  37. // We have additional state changes during tear down, in some cases we can over-fulfill a test
  38. // expectation which causes false negatives.
  39. self.client.connectivity.delegate = nil
  40. if let server = self.server {
  41. XCTAssertNoThrow(try server.flatMap { $0.channel.close() }.wait())
  42. }
  43. XCTAssertNoThrow(try? self.serverGroup.syncShutdownGracefully())
  44. self.server = nil
  45. self.serverGroup = nil
  46. // We don't always expect a client to be closed cleanly, since in some cases we deliberately
  47. // timeout the connection.
  48. try? self.client.close().wait()
  49. XCTAssertNoThrow(try self.clientGroup.syncShutdownGracefully())
  50. self.client = nil
  51. self.clientGroup = nil
  52. super.tearDown()
  53. }
  54. func makeServer() -> EventLoopFuture<Server> {
  55. return Server.insecure(group: self.serverGroup)
  56. .withServiceProviders([EchoProvider()])
  57. .withLogger(self.serverLogger)
  58. .bind(host: "localhost", port: self.port)
  59. }
  60. func connectionBuilder() -> ClientConnection.Builder {
  61. return ClientConnection.insecure(group: self.clientGroup)
  62. .withConnectivityStateDelegate(self.connectionStateRecorder)
  63. .withConnectionBackoff(maximum: .milliseconds(100))
  64. .withConnectionTimeout(minimum: .milliseconds(100))
  65. .withBackgroundActivityLogger(self.clientLogger)
  66. }
  67. func testClientConnectionFailsWithNoBackoff() throws {
  68. self.connectionStateRecorder.expectChanges(2) { changes in
  69. XCTAssertEqual(
  70. changes,
  71. [
  72. Change(from: .idle, to: .connecting),
  73. Change(from: .connecting, to: .shutdown),
  74. ]
  75. )
  76. }
  77. self.client = self.connectionBuilder()
  78. .withConnectionReestablishment(enabled: false)
  79. .connect(host: "localhost", port: self.port)
  80. // Start an RPC to trigger creating a channel.
  81. let echo = Echo_EchoNIOClient(
  82. channel: self.client,
  83. defaultCallOptions: self.callOptionsWithLogger
  84. )
  85. _ = echo.get(.with { $0.text = "foo" })
  86. self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
  87. }
  88. func testClientConnectionFailureIsLimited() throws {
  89. self.connectionStateRecorder.expectChanges(4) { changes in
  90. XCTAssertEqual(
  91. changes,
  92. [
  93. Change(from: .idle, to: .connecting),
  94. Change(from: .connecting, to: .transientFailure),
  95. Change(from: .transientFailure, to: .connecting),
  96. Change(from: .connecting, to: .shutdown),
  97. ]
  98. )
  99. }
  100. self.client = self.connectionBuilder()
  101. .withConnectionBackoff(retries: .upTo(1))
  102. .connect(host: "localhost", port: self.port)
  103. // Start an RPC to trigger creating a channel.
  104. let echo = Echo_EchoNIOClient(
  105. channel: self.client,
  106. defaultCallOptions: self.callOptionsWithLogger
  107. )
  108. _ = echo.get(.with { $0.text = "foo" })
  109. self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
  110. }
  111. func testClientEventuallyConnects() throws {
  112. self.connectionStateRecorder.expectChanges(2) { changes in
  113. XCTAssertEqual(
  114. changes,
  115. [
  116. Change(from: .idle, to: .connecting),
  117. Change(from: .connecting, to: .transientFailure),
  118. ]
  119. )
  120. }
  121. // Start the client first.
  122. self.client = self.connectionBuilder()
  123. .connect(host: "localhost", port: self.port)
  124. // Start an RPC to trigger creating a channel.
  125. let echo = Echo_EchoNIOClient(
  126. channel: self.client,
  127. defaultCallOptions: self.callOptionsWithLogger
  128. )
  129. _ = echo.get(.with { $0.text = "foo" })
  130. self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
  131. self.connectionStateRecorder.expectChanges(2) { changes in
  132. XCTAssertEqual(
  133. changes,
  134. [
  135. Change(from: .transientFailure, to: .connecting),
  136. Change(from: .connecting, to: .ready),
  137. ]
  138. )
  139. }
  140. self.server = self.makeServer()
  141. let serverStarted = self.expectation(description: "server started")
  142. self.server.assertSuccess(fulfill: serverStarted)
  143. self.wait(for: [serverStarted], timeout: 5.0)
  144. self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
  145. }
  146. func testClientReconnectsAutomatically() throws {
  147. // Wait for the server to start.
  148. self.server = self.makeServer()
  149. let server = try self.server.wait()
  150. // Prepare the delegate so it expects the connection to hit `.ready`.
  151. self.connectionStateRecorder.expectChanges(2) { changes in
  152. XCTAssertEqual(
  153. changes,
  154. [
  155. Change(from: .idle, to: .connecting),
  156. Change(from: .connecting, to: .ready),
  157. ]
  158. )
  159. }
  160. // Configure the client backoff to have a short backoff.
  161. self.client = self.connectionBuilder()
  162. .withConnectionBackoff(maximum: .seconds(2))
  163. .connect(host: "localhost", port: self.port)
  164. // Start an RPC to trigger creating a channel, it's a streaming RPC so that when the server is
  165. // killed, the client still has one active RPC and transitions to transient failure (rather than
  166. // idle if there were no active RPCs).
  167. let echo = Echo_EchoNIOClient(
  168. channel: self.client,
  169. defaultCallOptions: self.callOptionsWithLogger
  170. )
  171. _ = echo.update { _ in }
  172. // Wait for the connection to be ready.
  173. self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
  174. // Now that we have a healthy connection, prepare for two transient failures:
  175. // 1. when the server has been killed, and
  176. // 2. when the client attempts to reconnect.
  177. self.connectionStateRecorder.expectChanges(3) { changes in
  178. XCTAssertEqual(
  179. changes,
  180. [
  181. Change(from: .ready, to: .transientFailure),
  182. Change(from: .transientFailure, to: .connecting),
  183. Change(from: .connecting, to: .transientFailure),
  184. ]
  185. )
  186. }
  187. // Okay, kill the server!
  188. try server.close().wait()
  189. try self.serverGroup.syncShutdownGracefully()
  190. self.server = nil
  191. self.serverGroup = nil
  192. // Our connection should fail now.
  193. self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
  194. // Get ready for the new healthy connection.
  195. self.connectionStateRecorder.expectChanges(2) { changes in
  196. XCTAssertEqual(
  197. changes,
  198. [
  199. Change(from: .transientFailure, to: .connecting),
  200. Change(from: .connecting, to: .ready),
  201. ]
  202. )
  203. }
  204. // This should succeed once we get a connection again.
  205. let get = echo.get(.with { $0.text = "hello" })
  206. // Start a new server.
  207. self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  208. self.server = self.makeServer()
  209. self.connectionStateRecorder.waitForExpectedChanges(timeout: .seconds(5))
  210. // The call should be able to succeed now.
  211. XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
  212. try self.client.close().wait()
  213. }
  214. }