ConnectionManagerTests.swift 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import Logging
  19. import NIOCore
  20. import NIOEmbedded
  21. import NIOHTTP2
  22. import XCTest
  /// Tests for `ConnectionManager` which run on an `EmbeddedEventLoop` and observe connectivity
  /// state changes through a `RecordingConnectivityDelegate`.
  class ConnectionManagerTests: GRPCTestCase {
    private let loop = EmbeddedEventLoop()
    private let recorder = RecordingConnectivityDelegate()
    // Created in `setUp()`; forwards state changes to `recorder`.
    private var monitor: ConnectivityStateMonitor!

    /// The configuration used by tests which do not supply their own: a placeholder Unix domain
    /// socket target on the embedded loop, `connectionBackoff` set to `nil` and background
    /// activity logged to `clientLogger`.
    private var defaultConfiguration: ClientConnection.Configuration {
      var configuration = ClientConnection.Configuration.default(
        target: .unixDomainSocket("/ignored"),
        eventLoopGroup: self.loop
      )
      configuration.connectionBackoff = nil
      configuration.backgroundActivityLogger = self.clientLogger
      return configuration
    }

    override func setUp() {
      super.setUp()
      self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
      super.tearDown()
    }

    /// Makes a `ConnectionManager` whose connectivity delegate is this test's monitor.
    ///
    /// - Parameters:
    ///   - config: The configuration to use, or `nil` to use `defaultConfiguration`.
    ///   - channelProvider: An optional hook which supplies the channel future; when present it is
    ///     wrapped in a `HookedChannelProvider`.
    /// - Returns: The newly created manager.
    private func makeConnectionManager(
      configuration config: ClientConnection.Configuration? = nil,
      channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
    ) -> ConnectionManager {
      let configuration = config ?? self.defaultConfiguration
      return ConnectionManager(
        configuration: configuration,
        channelProvider: channelProvider.map { HookedChannelProvider($0) },
        connectivityDelegate: self.monitor,
        logger: self.logger
      )
    }

    /// Registers an expectation for a single state change, runs `body`, then waits for the
    /// recorder to observe the expected change.
    ///
    /// - Parameters:
    ///   - from: The state the change is expected to start in.
    ///   - to: The state the change is expected to end in.
    ///   - timeout: How long to wait for the change.
    ///   - file: The file to attribute failures to.
    ///   - line: The line to attribute failures to.
    ///   - body: The work expected to trigger the change.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChange<Result>(
      from: ConnectivityState,
      to: ConnectivityState,
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChange {
        XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }

    /// Registers an expectation for a sequence of state changes, runs `body`, then waits for the
    /// recorder to observe `changes.count` changes.
    ///
    /// - Parameters:
    ///   - changes: The changes expected, compared against what the recorder observed.
    ///   - timeout: How long to wait for the changes.
    ///   - file: The file to attribute failures to.
    ///   - line: The line to attribute failures to.
    ///   - body: The work expected to trigger the changes.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChanges<Result>(
      _ changes: [Change],
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChanges(changes.count) {
        // Forward the call site so a mismatch is reported against the calling test rather than
        // this helper, matching `waitForStateChange`.
        XCTAssertEqual($0, changes, file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }
  }
  86. extension ConnectionManagerTests {
  /// Shutting down a manager which never connected moves it from `.idle` to `.shutdown`, after
  /// which asking for a multiplexer fails.
  func testIdleShutdown() throws {
    let manager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // A shut down manager must not hand out a multiplexer.
    let muxFuture = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// With the default configuration, a failed connection attempt takes the manager from
  /// `.connecting` to `.shutdown` and fails the pending multiplexer with the channel's error.
  func testConnectFromIdleFailsWithNoReconnect() {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Asking for a multiplexer starts the connection attempt.
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Fail the connection attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      pendingChannel.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try muxFuture.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Walks a connection through `.idle` -> `.connecting` -> `.ready` and then shuts it down.
  func testConnectAndDisconnect() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer starts the connection; the future itself isn't needed here.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Build the real channel with an idle handler installed, then hand it over and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream is what makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The channel is available now: shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// A ready connection with no activity goes back to `.idle` once the idle timeout elapses, and
  /// can then be shut down from `.idle`.
  func testConnectAndIdle() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Kick off the connection attempt.
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Build the channel with an idle handler installed, then hand it over and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream is what makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
      // The multiplexer _must_ be available by this point.
      XCTAssertNoThrow(try muxFuture.wait())
    }

    // Let the idle timeout elapse; going idle shuts the channel down.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try channel.closeFuture.wait())
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// The idle timeout must not idle the connection while a stream is open; once the stream is
  /// closed a further timeout moves the connection to `.idle`.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Kick off the connection attempt.
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Build the channel with an idle handler installed, then hand it over and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream is what makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
      // The HTTP/2 stream multiplexer _must_ be available by this point.
      XCTAssertNoThrow(try muxFuture.wait())
    }

    // Pretend a stream was opened; its details are irrelevant.
    let streamOpened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    channel.pipeline.fireUserInboundEventTriggered(streamOpened)

    // Let the idle timeout pass: with a stream open this should _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Close the stream, then let the idle timeout pass again: now we expect to go idle.
    self.waitForStateChange(from: .ready, to: .idle) {
      let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamClosed)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down after the channel is connected but before it becomes ready moves the manager
  /// from `.connecting` to `.shutdown` and fails the pending multiplexer.
  func testConnectAndThenBecomeInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Build the channel with an idle handler installed, then hand it over and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut the manager down now; the multiplexer future has not been completed yet.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No channel became ready and reconnecting isn't configured: we're shut down, so the
    // multiplexer future should error.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// With a one second fixed backoff, a failed first attempt is followed by a second attempt
  /// which succeeds; multiplexers requested along the way become available once ready.
  func testConnectOnSecondAttempt() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)

    // The first attempt fails immediately, the second is completed by hand further down.
    let attempts: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      pendingChannel.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let next = remainingAttempts.next() {
        return next
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let firstAttemptChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChanges(firstAttemptChanges) {
        // Request an HTTP/2 stream multiplexer.
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Request another multiplexer: it's a future for the one requested above.
    let secondMuxFuture = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Advance time by the backoff interval to begin the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the real channel with an idle handler installed and complete the promise.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // Both multiplexer futures _must_ be available by this point.
    XCTAssertNoThrow(try muxFuture.wait())
    XCTAssertNoThrow(try secondMuxFuture.wait())

    // Finally, shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down mid-connect fails the pending multiplexer; a channel which arrives afterwards
  /// is closed.
  func testShutdownWhileConnecting() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Shut down while the connection attempt is still outstanding.
    let shutdownFuture: EventLoopFuture<Void> =
      self.waitForStateChange(from: .connecting, to: .shutdown) {
        let future = manager.shutdown()
        self.loop.run()
        return future
      }

    // The multiplexer requested earlier should fail.
    XCTAssertThrowsError(try muxFuture.wait())

    // The channel promise is still outstanding: if it succeeds the channel should be closed too.
    pendingChannel.succeed(EmbeddedChannel(loop: self.loop))
    let lateChannel = try pendingChannel.futureResult.wait()
    self.loop.run()
    XCTAssertNoThrow(try lateChannel.closeFuture.wait())
    XCTAssertNoThrow(try shutdownFuture.wait())
  }
  /// Shutting down while in `.transientFailure` moves the manager to `.shutdown` and fails the
  /// pending multiplexer.
  func testShutdownWhileTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // Every connection attempt fails.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      self.loop.makeFailedFuture(DoomedChannelError())
    }

    let failedAttemptChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChanges(failedAttemptChanges) {
        // Request an HTTP/2 stream multiplexer.
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Shut down while in transient failure.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The HTTP/2 stream multiplexer requested earlier should fail.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// Shutting down once the channel is active but not yet ready reports `.connecting` ->
  /// `.shutdown` and fails the pending multiplexer.
  func testShutdownWhileActive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Build the channel with an idle handler installed, then hand it over and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no state change is expected above.)

    // Shut down.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The HTTP/2 stream multiplexer requested earlier should fail.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// Shutting down a manager which is already shut down still completes successfully.
  func testShutdownWhileShutdown() throws {
    let manager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let initialShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try initialShutdown.wait())
    }

    // Shut down a second time.
    let repeatedShutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try repeatedShutdown.wait())
  }
  /// Closing a channel which is active but not yet ready results in `.transientFailure`; with
  /// backoff configured another attempt is made, which in this test fails as well.
  func testTransientFailureWhileActive() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let pendingChannel = self.loop.makePromise(of: Channel.self)

    // The first attempt is completed by hand below, the second fails immediately.
    let attempts: [EventLoopFuture<Channel>] = [
      pendingChannel.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let next = remainingAttempts.next() {
        return next
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Build the first channel with an idle handler installed, then hand it over and activate it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try firstChannel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no state change is expected above.)

    // Close the channel, simulating e.g. a failed TLS handshake.
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // After the backoff we start connecting again; that attempt fails too.
    let retryChanges = [
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(retryChanges) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The channel never became ready, so waiting for the multiplexer should throw.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// Losing a ready channel which has an open stream results in `.transientFailure`; after the
  /// backoff a second channel is connected and becomes ready.
  func testTransientFailureWhileReady() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let firstPendingChannel = self.loop.makePromise(of: Channel.self)
    let secondPendingChannel = self.loop.makePromise(of: Channel.self)

    // One future per expected connection attempt, both completed by hand below.
    let attempts: [EventLoopFuture<Channel>] = [
      firstPendingChannel.futureResult,
      secondPendingChannel.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let next = remainingAttempts.next() {
        return next
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Build the first channel with an idle handler installed, then hand it over and activate it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let firstMultiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let firstIdleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: firstMultiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try firstChannel.pipeline.addHandler(firstIdleHandler).wait()

    firstPendingChannel.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the first channel ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try firstChannel.writeInbound(settings))
    }

    // The multiplexer should be available now.
    XCTAssertNoThrow(try muxFuture.wait())

    // Make sure there's an active RPC before killing the channel, otherwise we'd go idle.
    let streamOpened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    firstChannel.pipeline.fireUserInboundEventTriggered(streamOpened)

    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // Advance time by the backoff interval to begin connecting again.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the second channel in the same way, then hand it over and activate it.
    let secondChannel = EmbeddedChannel(loop: self.loop)
    let secondMultiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: secondChannel,
      inboundStreamInitializer: nil
    )
    let secondIdleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: secondMultiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try secondChannel.pipeline.addHandler(secondIdleHandler).wait()

    secondPendingChannel.succeed(secondChannel)
    XCTAssertNoThrow(
      try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the second channel ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try secondChannel.writeInbound(settings))
    }

    // Shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Receiving a GOAWAY on a ready connection moves it to `.idle` and closes the channel.
  func testGoAwayWhenReady() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Build the channel with an idle handler installed, then hand it over and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The HTTP/2 stream multiplexer _must_ be available by this point.
    XCTAssertNoThrow(try muxFuture.wait())

    // Deliver a GOAWAY (its details are irrelevant): the connection should go idle and the
    // channel should close.
    try self.waitForStateChange(from: .ready, to: .idle) {
      let goAwayFrame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      XCTAssertNoThrow(try channel.writeInbound(goAwayFrame))
      self.loop.run()
    }

    self.loop.run()
    XCTAssertNoThrow(try channel.closeFuture.wait())

    // Shut down.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Requests a multiplexer from a freshly created manager whose channel provider always fails,
  /// and checks that waiting on that multiplexer throws.
  ///
  /// The manager's configuration has `callStartBehavior` set to `.fastFailure`.
  func testDoomedOptimisticChannelFromIdle() {
    var fastFailureConfiguration = self.defaultConfiguration
    fastFailureConfiguration.callStartBehavior = .fastFailure

    // Every channel handed out by this provider is already failed.
    let doomedProvider = HookedChannelProvider { _, eventLoop in
      eventLoop.makeFailedFuture(DoomedChannelError())
    }

    let manager = ConnectionManager(
      configuration: fastFailureConfiguration,
      channelProvider: doomedProvider,
      connectivityDelegate: nil,
      logger: self.logger
    )

    // Ask for the multiplexer and let the embedded loop process any pending work.
    let multiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// Checks that a multiplexer requested while the manager is connecting fails once the pending
  /// channel future is failed.
  ///
  /// The manager is created from a configuration whose `callStartBehavior` is `.fastFailure`.
  func testDoomedOptimisticChannelFromConnecting() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    let promise = self.loop.makePromise(of: Channel.self)
    // The configuration must be handed to the manager: otherwise the `.fastFailure` setting above
    // is never applied and the test does not cover the behaviour it describes.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return promise.futureResult
    }

    self.waitForStateChange(from: .idle, to: .connecting) {
      // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream
      // multiplexer.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
    let optimisticChannelMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Fail the promise.
    promise.fail(DoomedChannelError())

    XCTAssertThrowsError(try optimisticChannelMux.wait())
  }
  /// Drives a manager through idle → connecting → transient failure using a provider that always
  /// fails, then checks that a multiplexer requested afterwards fails with `DoomedChannelError`.
  ///
  /// The configuration uses `.fastFailure` and a default-constructed `ConnectionBackoff`.
  func testOptimisticChannelFromTransientFailure() throws {
    var config = self.defaultConfiguration
    config.callStartBehavior = .fastFailure
    config.connectionBackoff = ConnectionBackoff()

    let manager = self.makeConnectionManager(configuration: config) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(expectedChanges) {
      // Kick off channel creation and a connection attempt; the returned multiplexer future is
      // of no interest here.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // The manager has reported transient failure. Request a multiplexer optimistically, as
    // selected in the configuration.
    let optimisticMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try optimisticMux.wait()) {
      XCTAssertTrue($0 is DoomedChannelError)
    }
  }
  /// Checks that requesting a multiplexer from a manager which has already been shut down yields
  /// a failed future.
  ///
  /// The manager is created from a configuration whose `callStartBehavior` is `.fastFailure`.
  func testOptimisticChannelFromShutdown() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // The configuration must be handed to the manager: otherwise the `.fastFailure` setting above
    // is never applied and the variable is dead code.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let shutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdown.wait())

    // Get a channel optimistically. It'll fail, obviously.
    let channelMux = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try channelMux.wait())
  }
  /// Brings a manager from idle to ready over an `EmbeddedChannel`, then shuts it down and expects
  /// a ready → shutdown state change.
  func testForceIdleAfterInactive() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      return channelPromise.futureResult
    }

    // Kick off the connection attempt and hold on to the future multiplexer.
    let readyMux = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let mux = manager.getHTTP2Multiplexer()
      self.loop.run()
      return mux
    }

    // Build the real channel with an idle handler installed, then hand it to the manager.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())
    channelPromise.succeed(channel)
    self.loop.run()

    // Activate the channel.
    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connect = channel.connect(to: address)
    XCTAssertNoThrow(try connect.wait())

    // A SETTINGS frame on the root stream moves the manager from connecting to ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The multiplexer future should now have resolved.
    XCTAssertNoThrow(try readyMux.wait())

    // Shut the manager down and expect ready → shutdown.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Brings a manager to ready over an `EmbeddedChannel`, then fires `channelInactive` and expects
  /// the manager to move from ready to idle.
  func testCloseWithoutActiveRPCs() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      return channelPromise.futureResult
    }

    // Kick off the connection attempt and hold on to the future multiplexer.
    let readyMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let mux = manager.getHTTP2Multiplexer()
        self.loop.run()
        return mux
      }

    // Build the actual channel with an idle handler installed, then hand it to the manager.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())
    channelPromise.succeed(channel)
    self.loop.run()

    // Activate the channel.
    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connect = channel.connect(to: address)
    XCTAssertNoThrow(try connect.wait())

    // A SETTINGS frame on the root stream "readies" the connection.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The HTTP/2 stream multiplexer future should now have resolved.
    XCTAssertNoThrow(try readyMux.wait())

    // Close the channel. No RPCs are active, so the expected change is to idle rather than to
    // transient failure.
    self.waitForStateChange(from: .ready, to: .idle) {
      channel.pipeline.fireChannelInactive()
    }
  }
  /// Reports a channel error to a manager that has never connected, then checks it can still be
  /// shut down with an idle → shutdown state change.
  func testIdleErrorDoesNothing() throws {
    let manager = self.makeConnectionManager()

    // Reporting an error before any connection attempt should be harmless.
    manager.channelError(DoomedChannelError())

    // The manager is expected to still be idle, so shutting down goes idle → shutdown.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Checks that a `ConnectionManagerHTTP2Delegate` is told about `maxConcurrentStreams` values
  /// carried by SETTINGS frames and about each closed stream.
  ///
  /// The idle handler is installed in an `EmbeddedChannel` without the multiplexer, so stream
  /// created/closed events can be fired directly into the pipeline.
  func testHTTP2Delegates() throws {
    let channel = EmbeddedChannel(loop: self.loop)
    defer {
      XCTAssertNoThrow(try channel.finish())
    }

    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )

    /// Records what the connection manager reports to its HTTP/2 delegate.
    final class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
      /// Number of `streamClosed` callbacks received.
      var streamsClosed = 0
      /// Most recent value passed to `receivedSettingsMaxConcurrentStreams`.
      var maxConcurrentStreams = 0

      func streamClosed(_ connectionManager: ConnectionManager) {
        self.streamsClosed += 1
      }

      func receivedSettingsMaxConcurrentStreams(
        _ connectionManager: ConnectionManager,
        maxConcurrentStreams: Int
      ) {
        self.maxConcurrentStreams = maxConcurrentStreams
      }
    }

    let http2 = HTTP2Delegate()

    let manager = ConnectionManager(
      eventLoop: self.loop,
      channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
        let idleHandler = GRPCIdleHandler(
          connectionManager: manager,
          multiplexer: multiplexer,
          idleTimeout: .minutes(5),
          keepalive: ClientConnectionKeepalive(),
          logger: self.logger
        )

        // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
        // us to just fire stream created/closed events into the channel.
        do {
          try channel.pipeline.syncOperations.addHandler(idleHandler)
        } catch {
          return eventLoop.makeFailedFuture(error)
        }

        return eventLoop.makeSucceededFuture(channel)
      },
      callStartBehavior: .waitsForConnectivity,
      connectionBackoff: ConnectionBackoff(),
      connectivityDelegate: nil,
      http2Delegate: http2,
      logger: self.logger
    )

    // Start connecting.
    let futureMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Do the actual connecting. Wait on the returned future so that a failed connect is reported
    // by this assertion rather than passing unnoticed.
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    /// Builds a root-stream SETTINGS frame carrying only `maxConcurrentStreams`.
    func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
      let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
      return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
    }

    // The channel isn't ready until it's seen a SETTINGS frame.
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))

    // We're ready now so the future multiplexer will resolve and we'll have seen an update to
    // max concurrent streams.
    XCTAssertNoThrow(try futureMultiplexer.wait())
    XCTAssertEqual(http2.maxConcurrentStreams, 42)

    // A later SETTINGS frame replaces the recorded value.
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
    XCTAssertEqual(http2.maxConcurrentStreams, 13)

    // Open some streams (IDs 1, 3, 5 and 7).
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      let streamCreated = NIOHTTP2StreamCreatedEvent(
        streamID: streamID,
        localInitialWindowSize: nil,
        remoteInitialWindowSize: nil
      )
      channel.pipeline.fireUserInboundEventTriggered(streamCreated)
    }

    // ... and then close them.
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamClosed)
    }

    XCTAssertEqual(http2.streamsClosed, 4)
  }
  971. }
  /// A single connectivity state transition, described by the state it left and the state it
  /// entered.
  internal struct Change: Hashable, CustomStringConvertible {
    /// The state before the transition.
    var from: ConnectivityState

    /// The state after the transition.
    var to: ConnectivityState

    /// Renders the transition as "<from> → <to>".
    var description: String {
      "\(self.from) → \(self.to)"
    }
  }
  /// A `ConnectivityStateDelegate` for tests: it hands observed state changes to a verification
  /// closure registered up front and lets the test block until the expected changes have arrived.
  ///
  /// The current expectation is only ever mutated from blocks running on `serialQueue`.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    /// Signalled once the registered expectation has been verified.
    private let semaphore = DispatchSemaphore(value: 0)
    private var expectation: Expectation = .noExpectation

    /// Signalled each time `connectionStartedQuiescing()` is called.
    private let quiescingSemaphore = DispatchSemaphore(value: 0)

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The number of changes this expectation waits for.
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Feeds a state change into the current expectation, on the serial queue.
    ///
    /// When the expectation is complete its `verify` closure is called, the expectation is reset
    /// to `.noExpectation` and `semaphore` is signalled.
    func connectivityStateDidChange(from oldState: ConnectivityState,
                                    to newState: ConnectivityState) {
      self.serialQueue.async {
        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(Change(from: oldState, to: newState))
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(Change(from: oldState, to: newState))
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Signals the quiescing semaphore, on the serial queue.
    func connectionStartedQuiescing() {
      self.serialQueue.async {
        self.quiescingSemaphore.signal()
      }
    }

    /// Registers an expectation of `count` changes; `verify` receives them once all have arrived.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Registers an expectation of a single change, passed to `verify` when it arrives.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks until the registered expectation has been verified or `timeout` elapses, recording
    /// a test failure at `file`/`line` on timeout.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #file,
      line: UInt = #line
    ) {
      let result = self.semaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        // `expectation` is mutated on `serialQueue`, so read it there too. Reading it directly
        // from the calling thread would race with a state change that is still being processed.
        let outstanding = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(outstanding) change(s)",
          file: file, line: line
        )
      }
    }

    /// Blocks until `connectionStartedQuiescing()` has been observed or `timeout` elapses,
    /// recording a test failure on timeout.
    func waitForQuiescing(timeout: DispatchTimeInterval) {
      let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        XCTFail("Timed out waiting for connection to start quiescing")
      }
    }
  }
  private extension ConnectionBackoff {
    /// A backoff whose initial and maximum values are both 1.0, with a multiplier of 1.0 and no
    /// jitter (the name suggests the unit is seconds; confirm against `ConnectionBackoff`).
    static let oneSecondFixed: ConnectionBackoff = .init(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// Payload-free error used by the tests in this file to fail channel futures and to report
  /// channel errors.
  private struct DoomedChannelError: Error {
  }
  /// A `ConnectionManagerChannelProvider` that delegates channel creation to a closure.
  internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
    /// Closure invoked for every `makeChannel` request.
    internal var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>

    /// Creates a provider backed by `hook`.
    init(_ hook: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
      self.provider = hook
    }

    /// Forwards the connection manager and event loop to the stored closure and returns its
    /// result; `connectTimeout` and `logger` are not used.
    func makeChannel(
      managedBy connectionManager: ConnectionManager,
      onEventLoop eventLoop: EventLoop,
      connectTimeout: TimeAmount?,
      logger: Logger
    ) -> EventLoopFuture<Channel> {
      let makeChannel = self.provider
      return makeChannel(connectionManager, eventLoop)
    }
  }
  fileprivate extension ConnectionManager {
    /// Shuts the manager down using the `.forceful` mode.
    ///
    /// Kept for backwards compatibility so these tests can keep calling `shutdown()` without
    /// arguments, avoiding large diffs.
    func shutdown() -> EventLoopFuture<Void> {
      self.shutdown(mode: .forceful)
    }
  }
  1099. }