ConnectionManagerTests.swift 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import Logging
  19. import NIOCore
  20. import NIOEmbedded
  21. import NIOHTTP2
  22. import XCTest
  /// Tests for the connectivity state transitions reported by `ConnectionManager`.
  ///
  /// All tests share one `EmbeddedEventLoop` and observe state changes through a
  /// `RecordingConnectivityDelegate` wired up via a `ConnectivityStateMonitor`.
  class ConnectionManagerTests: GRPCTestCase {
    private let loop = EmbeddedEventLoop()
    private let recorder = RecordingConnectivityDelegate()
    // Re-created in `setUp()` for every test.
    private var monitor: ConnectivityStateMonitor!

    /// Baseline client configuration: targets an ignored Unix domain socket path on the embedded
    /// loop, with connection backoff disabled (`nil`).
    private var defaultConfiguration: ClientConnection.Configuration {
      var configuration = ClientConnection.Configuration.default(
        target: .unixDomainSocket("/ignored"),
        eventLoopGroup: self.loop
      )
      configuration.connectionBackoff = nil
      configuration.backgroundActivityLogger = self.clientLogger
      return configuration
    }

    override func setUp() {
      super.setUp()
      self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
      super.tearDown()
    }

    /// Creates a `ConnectionManager` which reports connectivity changes to `self.monitor`.
    ///
    /// - Parameters:
    ///   - config: Configuration to use; `defaultConfiguration` is used when `nil`.
    ///   - channelProvider: Optional hook supplying the channel future for each connection
    ///     attempt. It is wrapped in a `HookedChannelProvider` when present.
    /// - Returns: The newly created manager.
    private func makeConnectionManager(
      configuration config: ClientConnection.Configuration? = nil,
      channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
    ) -> ConnectionManager {
      let configuration = config ?? self.defaultConfiguration
      return ConnectionManager(
        configuration: configuration,
        channelProvider: channelProvider.map { HookedChannelProvider($0) },
        connectivityDelegate: self.monitor,
        logger: self.logger
      )
    }

    /// Runs `body` and asserts that it causes exactly the state change `from` -> `to`.
    ///
    /// - Parameters:
    ///   - from: The expected state before the change.
    ///   - to: The expected state after the change.
    ///   - timeout: How long to wait for the change to be recorded.
    ///   - file: Call-site file, used when reporting assertion failures.
    ///   - line: Call-site line, used when reporting assertion failures.
    ///   - body: The work expected to trigger the change.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChange<Result>(
      from: ConnectivityState,
      to: ConnectivityState,
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChange {
        XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }

    /// Runs `body` and asserts that it causes the given sequence of state changes.
    ///
    /// - Parameters:
    ///   - changes: The expected changes, in order.
    ///   - timeout: How long to wait for the changes to be recorded.
    ///   - file: Call-site file, used when reporting assertion failures.
    ///   - line: Call-site line, used when reporting assertion failures.
    ///   - body: The work expected to trigger the changes.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChanges<Result>(
      _ changes: [Change],
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChanges(changes.count) {
        // Forward the call site so a mismatch is attributed to the test, not to this helper.
        XCTAssertEqual($0, changes, file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }
  }
  86. extension ConnectionManagerTests {
  /// Shutting down an idle manager reports `.idle` -> `.shutdown`; a multiplexer requested
  /// afterwards must fail.
  func testIdleShutdown() throws {
    let manager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }

    // The manager is shut down: asking for a multiplexer is expected to throw.
    let muxFuture = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// With the default configuration (backoff is `nil`) a failed connection attempt moves the
  /// manager from `.connecting` to `.shutdown` and fails the pending multiplexer.
  func testConnectFromIdleFailsWithNoReconnect() {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Fail the connection attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      channelPromise.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try multiplexer.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Walks a connection through `.idle` -> `.connecting` -> `.ready` -> `.shutdown`.
  func testConnectAndDisconnect() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    // Kick off the connection attempt; the multiplexer itself is not needed here.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Build the real channel with an idle handler, then hand it over and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The channel is available now: shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }
  }
  /// A ready connection with no activity goes back to `.idle` once the idle timeout elapses,
  /// and can then be shut down.
  func testConnectAndIdle() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    // Kick off the connection attempt.
    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Build the channel with an idle handler using a five minute timeout.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
      // The multiplexer _must_ be available at this point.
      XCTAssertNoThrow(try readyChannelMux.wait())
    }

    // Let the idle timeout elapse; the channel is expected to close as a result.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try channel.closeFuture.wait())
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }
  }
  /// The idle timeout elapsing while a stream is open should not idle the connection; once the
  /// stream closes and the timeout elapses again the connection goes `.ready` -> `.idle`.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    // Kick off the connection attempt.
    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Build the channel with an idle handler using a five minute timeout.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
      // The HTTP/2 stream multiplexer _must_ be available at this point.
      XCTAssertNoThrow(try readyChannelMux.wait())
    }

    // Pretend a stream was opened; only the event matters, not the stream's details.
    let streamID: HTTP2StreamID = 1
    let streamCreated = NIOHTTP2StreamCreatedEvent(
      streamID: streamID,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    channel.pipeline.fireUserInboundEventTriggered(streamCreated)

    // Let the idle timeout elapse: with a stream open this should _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Close the stream, then let the idle timeout elapse again.
    self.waitForStateChange(from: .ready, to: .idle) {
      let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamClosed)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }
  }
  /// Shutting down after the channel is connected but before it becomes ready reports
  /// `.connecting` -> `.shutdown` and fails the pending multiplexer.
  func testConnectAndThenBecomeInactive() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Build the channel with an idle handler and connect it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut down now; no SETTINGS frame was written so `readyChannelMux` is still pending.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }

    // No ready channel was obtained and no reconnect is configured, so the manager is shut
    // down and the pending multiplexer must fail.
    XCTAssertThrowsError(try readyChannelMux.wait())
  }
  /// With a one second fixed backoff, a failed first attempt is followed by a second attempt
  /// which succeeds; both multiplexer futures requested along the way must then succeed.
  func testConnectOnSecondAttempt() throws {
    // First attempt fails immediately; the second is completed manually further down.
    let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let attempts: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      channelPromise.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChanges(expectedChanges) {
        // Request a HTTP/2 stream multiplexer.
        let pending = manager.getHTTP2Multiplexer()
        self.loop.run()
        return pending
      }

    // Request another multiplexer from the manager while it is in transient failure.
    let anotherReadyChannelMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Advancing time by the backoff period starts the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the actual channel and complete the promise for the second attempt.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // Both multiplexer futures _must_ be complete now.
    XCTAssertNoThrow(try readyChannelMux.wait())
    XCTAssertNoThrow(try anotherReadyChannelMux.wait())

    // Finally, shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }
  }
  /// Shutting down mid-connect fails the pending multiplexer; a channel which arrives after the
  /// shutdown is expected to be closed.
  func testShutdownWhileConnecting() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Shut down while the connection attempt is still outstanding.
    let shutdownFuture: EventLoopFuture<Void> = self.waitForStateChange(
      from: .connecting,
      to: .shutdown
    ) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      return didShutdown
    }

    // The multiplexer requested earlier must fail.
    XCTAssertThrowsError(try readyChannelMux.wait())

    // The channel promise is still unfulfilled: succeed it and check the channel gets closed.
    let lateChannel = EmbeddedChannel(loop: self.loop)
    channelPromise.succeed(lateChannel)
    let channel = try channelPromise.futureResult.wait()
    self.loop.run()

    XCTAssertNoThrow(try channel.closeFuture.wait())
    XCTAssertNoThrow(try shutdownFuture.wait())
  }
  /// Shutting down from `.transientFailure` reports `.transientFailure` -> `.shutdown` and
  /// fails the pending multiplexer.
  func testShutdownWhileTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // Every connection attempt fails immediately.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChanges(expectedChanges) {
        // Request a HTTP/2 stream multiplexer.
        let pending = manager.getHTTP2Multiplexer()
        self.loop.run()
        return pending
      }

    // Shut down while waiting to reconnect.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }

    // The HTTP/2 stream multiplexer requested earlier must fail.
    XCTAssertThrowsError(try readyChannelMux.wait())
  }
  /// Shutting down once the channel is connected (but not yet ready) reports
  /// `.connecting` -> `.shutdown` and fails the pending multiplexer.
  func testShutdownWhileActive() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Build the channel with an idle handler and connect it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (No state change is expected for the connect above: 'active' is an internal state.)

    // Shut down.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }

    // The HTTP/2 stream multiplexer requested earlier must fail.
    XCTAssertThrowsError(try readyChannelMux.wait())
  }
  /// Calling `shutdown()` on a manager which is already shut down must still succeed.
  func testShutdownWhileShutdown() throws {
    let manager = self.makeConnectionManager()

    // First shutdown: idle -> shutdown.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let first = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try first.wait())
    }

    // Second shutdown: expected to complete without throwing.
    let second = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try second.wait())
  }
  /// A channel closing after it connected but before it became ready reports
  /// `.connecting` -> `.transientFailure`; the follow-up attempt also fails and the manager is
  /// then shut down.
  func testTransientFailureWhileActive() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // The first attempt is completed manually below; the second fails immediately.
    let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let attempts: [EventLoopFuture<Channel>] = [
      channelPromise.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Build the first channel with an idle handler and connect it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try firstChannel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (No state change is expected for the connect above: 'active' is an internal state.)

    // Close the channel, simulating e.g. a failed TLS handshake.
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // After the backoff period the manager reconnects; that attempt fails straight away.
    let reconnectChanges = [
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(reconnectChanges) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }

    // The channel never became ready, so waiting on the multiplexer should throw.
    XCTAssertThrowsError(try readyChannelMux.wait())
  }
  /// A ready channel with an open stream closing reports `.ready` -> `.transientFailure`; the
  /// manager then reconnects with a second channel, becomes ready again and is shut down.
  func testTransientFailureWhileReady() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // Both attempts are completed manually below.
    let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let attempts: [EventLoopFuture<Channel>] = [
      firstChannelPromise.futureResult,
      secondChannelPromise.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Build the first channel with an idle handler and connect it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let firstH2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let firstIdleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: firstH2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try firstChannel.pipeline.addHandler(firstIdleHandler).wait()

    firstChannelPromise.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the first channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try firstChannel.writeInbound(settings))
    }

    // The multiplexer should be available now.
    XCTAssertNoThrow(try readyChannelMux.wait())

    // Signal an active RPC before killing the channel, otherwise we'd go idle instead.
    let streamCreated = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)

    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // Advancing time by the backoff period starts the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the second channel with its own idle handler and connect it.
    let secondChannel = EmbeddedChannel(loop: self.loop)
    let secondH2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: secondChannel,
      inboundStreamInitializer: nil
    )
    let secondIdleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: secondH2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try secondChannel.pipeline.addHandler(secondIdleHandler).wait()

    secondChannelPromise.succeed(secondChannel)
    XCTAssertNoThrow(
      try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the second channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try secondChannel.writeInbound(settings))
    }

    // Finally, shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }
  }
  /// Receiving a GOAWAY frame on a ready connection reports `.ready` -> `.idle` and the channel
  /// is expected to close; the manager can then be shut down from idle.
  func testGoAwayWhenReady() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in channelPromise.futureResult }

    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let pending = manager.getHTTP2Multiplexer()
      self.loop.run()
      return pending
    }

    // Build the channel with an idle handler and connect it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()

    channelPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The HTTP/2 stream multiplexer _must_ be available at this point.
    XCTAssertNoThrow(try readyChannelMux.wait())

    // Deliver a GOAWAY (its contents are unimportant): the connection should go idle and the
    // channel should close.
    try self.waitForStateChange(from: .ready, to: .idle) {
      let goAway = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      XCTAssertNoThrow(try channel.writeInbound(goAway))
      self.loop.run()
    }

    self.loop.run()
    XCTAssertNoThrow(try channel.closeFuture.wait())

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let didShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try didShutdown.wait())
    }
  }
  /// Checks that, with `.fastFailure` call start behavior, a multiplexer requested from an idle
  /// manager fails when the channel provider fails.
  func testDoomedOptimisticChannelFromIdle() {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // A provider whose channel future always fails.
    let failingProvider = HookedChannelProvider { _, loop in
      loop.makeFailedFuture(DoomedChannelError())
    }

    let manager = ConnectionManager(
      configuration: configuration,
      channelProvider: failingProvider,
      connectivityDelegate: nil,
      logger: self.logger
    )

    let multiplexerFuture = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  /// Checks that a multiplexer obtained optimistically (`.fastFailure`) while a connection
  /// attempt is in flight fails once that attempt fails.
  func testDoomedOptimisticChannelFromConnecting() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    let promise = self.loop.makePromise(of: Channel.self)
    // Pass the configuration explicitly: without it the manager is built from the default
    // configuration and the `.fastFailure` behavior under test is never exercised.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return promise.futureResult
    }

    self.waitForStateChange(from: .idle, to: .connecting) {
      // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2
      // stream multiplexer.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
    let optimisticChannelMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Fail the promise; the optimistic multiplexer must fail with it.
    promise.fail(DoomedChannelError())
    XCTAssertThrowsError(try optimisticChannelMux.wait())
  }
  /// Checks that, with `.fastFailure` call start behavior, a multiplexer requested while the
  /// manager is in transient failure fails with the error from the connection attempt.
  func testOptimisticChannelFromTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure
    configuration.connectionBackoff = ConnectionBackoff()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // The failed connection attempt should walk us through these states, in order.
    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]

    self.waitForStateChanges(expectedChanges) {
      // Kick off channel creation and a connection attempt; the HTTP/2 stream multiplexer itself
      // is of no interest here.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // We are now in transient failure. Ask for a HTTP/2 stream multiplexer optimistically, as
    // selected in the configuration.
    let multiplexerFuture = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try multiplexerFuture.wait()) { thrown in
      XCTAssertTrue(thrown is DoomedChannelError)
    }
  }
  /// Checks that a multiplexer requested optimistically (`.fastFailure`) from a manager which
  /// has already been shut down fails.
  func testOptimisticChannelFromShutdown() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // Pass the configuration explicitly: without it the manager is built from the default
    // configuration and the `.fastFailure` behavior under test is never exercised.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Shut down before any connection attempt has been made.
    let shutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdown.wait())

    // Get a channel optimistically. It'll fail, obviously.
    let channelMux = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try channelMux.wait())
  }
  /// Brings a connection to the ready state and then shuts the manager down, expecting a direct
  /// transition from ready to shutdown.
  func testForceIdleAfterInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Requesting a multiplexer starts the connection.
    let multiplexerFuture = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Create the real channel, give it an idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())

    pendingChannel.succeed(channel)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // A SETTINGS frame on the root stream moves us from connecting to ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The multiplexer future should have been completed.
    XCTAssertNoThrow(try multiplexerFuture.wait())

    // Shut the manager down while ready.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Checks that a ready connection whose channel becomes inactive goes back to idle, rather
  /// than to transient failure, when there are no active RPCs.
  func testCloseWithoutActiveRPCs() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Requesting a multiplexer starts the connection.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Create the actual channel, give it an idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())

    pendingChannel.succeed(channel)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // Make the connection "ready" by delivering a SETTINGS frame on the root stream.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The HTTP/2 stream multiplexer future should have been completed.
    XCTAssertNoThrow(try multiplexerFuture.wait())

    // Make the channel inactive. With no active RPCs we expect to idle instead of entering the
    // transient failure state.
    self.waitForStateChange(from: .ready, to: .idle) {
      channel.pipeline.fireChannelInactive()
    }
  }
  /// Checks that reporting a channel error to an idle manager leaves it idle, so that it can
  /// still be shut down from the idle state.
  func testIdleErrorDoesNothing() throws {
    let manager = self.makeConnectionManager()

    // An error delivered while idle should be harmless.
    manager.channelError(DoomedChannelError())

    // We should still be idle, so shutting down transitions idle → shutdown.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Checks that the manager's HTTP/2 delegate is told about changes to the
  /// `maxConcurrentStreams` setting and about each stream that closes.
  func testHTTP2Delegates() throws {
    let channel = EmbeddedChannel(loop: self.loop)
    defer {
      XCTAssertNoThrow(try channel.finish())
    }

    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )

    /// Records the callbacks it receives so the test can assert on them.
    class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
      /// Number of `streamClosed` callbacks received.
      var streamsClosed = 0
      /// The value from the most recent `receivedSettingsMaxConcurrentStreams` callback.
      var maxConcurrentStreams = 0

      func streamClosed(_ connectionManager: ConnectionManager) {
        self.streamsClosed += 1
      }

      func receivedSettingsMaxConcurrentStreams(
        _ connectionManager: ConnectionManager,
        maxConcurrentStreams: Int
      ) {
        self.maxConcurrentStreams = maxConcurrentStreams
      }
    }

    let http2 = HTTP2Delegate()

    let manager = ConnectionManager(
      eventLoop: self.loop,
      channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
        let idleHandler = GRPCIdleHandler(
          connectionManager: manager,
          multiplexer: multiplexer,
          idleTimeout: .minutes(5),
          keepalive: ClientConnectionKeepalive(),
          logger: self.logger
        )

        // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
        // us to just fire stream created/closed events into the channel.
        do {
          try channel.pipeline.syncOperations.addHandler(idleHandler)
        } catch {
          return eventLoop.makeFailedFuture(error)
        }

        return eventLoop.makeSucceededFuture(channel)
      },
      callStartBehavior: .waitsForConnectivity,
      connectionBackoff: ConnectionBackoff(),
      connectivityDelegate: nil,
      http2Delegate: http2,
      logger: self.logger
    )

    // Start connecting.
    let futureMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Do the actual connecting. Wait on the future so that a failed connect is reported rather
    // than silently discarded.
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // The channel isn't ready until it's seen a SETTINGS frame.
    func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
      let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
      return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
    }

    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))

    // We're ready now so the future multiplexer will resolve and we'll have seen an update to
    // max concurrent streams.
    XCTAssertNoThrow(try futureMultiplexer.wait())
    XCTAssertEqual(http2.maxConcurrentStreams, 42)

    // A later SETTINGS frame updates the value again.
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
    XCTAssertEqual(http2.maxConcurrentStreams, 13)

    // The stream IDs 1, 3, 5 and 7; shared by the open and close loops so that the expected
    // number of close callbacks cannot drift from the streams actually used.
    let streamIDs = Array(stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2))

    // Open some streams.
    for streamID in streamIDs {
      let streamCreated = NIOHTTP2StreamCreatedEvent(
        streamID: streamID,
        localInitialWindowSize: nil,
        remoteInitialWindowSize: nil
      )
      channel.pipeline.fireUserInboundEventTriggered(streamCreated)
    }

    // ... and then close them.
    for streamID in streamIDs {
      let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamClosed)
    }

    XCTAssertEqual(http2.streamsClosed, streamIDs.count)
  }
  /// Checks that an `EventLoopError.shutdown` channel error reported while connecting moves the
  /// manager to shutdown and fails the pending multiplexer.
  func testChannelErrorWhenConnecting() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Requesting a multiplexer starts the connection attempt; the channel is never provided.
    let multiplexerFuture = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Report the error while the attempt is still in flight.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      manager.channelError(EventLoopError.shutdown)
    }

    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  989. }
  /// A single connectivity state transition: the state left and the state entered.
  struct Change: Hashable, CustomStringConvertible {
    /// The state before the transition.
    var from: ConnectivityState
    /// The state after the transition.
    var to: ConnectivityState

    /// The transition rendered as "<from> → <to>".
    var description: String {
      "\(self.from) → \(self.to)"
    }
  }
  /// A `ConnectivityStateDelegate` for tests: callers register an expectation for one or more
  /// state changes along with a verification closure, then block until the expected number of
  /// changes has been observed (or a timeout elapses).
  ///
  /// All access to the current expectation is serialized on a private dispatch queue;
  /// verification closures are invoked on that queue.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    /// Serializes access to `expectation`.
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    /// Signalled once an expectation has been fulfilled and verified.
    private let semaphore = DispatchSemaphore(value: 0)
    /// The current expectation. Only read or written on `serialQueue`.
    private var expectation: Expectation = .noExpectation

    /// Signalled each time the connection reports that it started quiescing.
    private let quiescingSemaphore = DispatchSemaphore(value: 0)

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The total number of changes this expectation is waiting for.
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Records a state change against the current expectation, verifying and signalling any
    /// waiter once the expected number of changes has been seen.
    func connectivityStateDidChange(from oldState: ConnectivityState,
                                    to newState: ConnectivityState) {
      self.serialQueue.async {
        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(Change(from: oldState, to: newState))
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(Change(from: oldState, to: newState))
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Signals anyone blocked in `waitForQuiescing(timeout:)`.
    func connectionStartedQuiescing() {
      self.serialQueue.async {
        self.quiescingSemaphore.signal()
      }
    }

    /// Expects `count` changes; `verify` is called with all of them once the last one arrives.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Expects a single change; `verify` is called with it when it arrives.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks until the current expectation has been fulfilled, recording a test failure at
    /// `file`/`line` if `timeout` elapses first.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #file,
      line: UInt = #line
    ) {
      let result = self.semaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        // `expectation` is mutated on `serialQueue`, so read it there instead of racing with a
        // change which may be in the middle of being recorded.
        let expectedCount = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(expectedCount) change(s)",
          file: file, line: line
        )
      }
    }

    /// Blocks until the connection has reported that it started quiescing, recording a test
    /// failure if `timeout` elapses first.
    func waitForQuiescing(timeout: DispatchTimeInterval) {
      let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        XCTFail("Timed out waiting for connection to start quiescing")
      }
    }
  }
  extension ConnectionBackoff {
    /// A backoff configured with an initial and maximum backoff of 1.0, a multiplier of 1.0 and
    /// no jitter.
    fileprivate static let oneSecondFixed = ConnectionBackoff(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// The error these tests use to fail channel creation on purpose.
  private struct DoomedChannelError: Error {
  }
  /// A `ConnectionManagerChannelProvider` which delegates channel creation to a closure, letting
  /// tests decide exactly which channel future a connection manager receives.
  struct HookedChannelProvider: ConnectionManagerChannelProvider {
    /// Called with the connection manager and event loop each time a channel is requested.
    var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>

    /// Creates a provider which forwards every channel request to `hook`.
    init(_ hook: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
      self.provider = hook
    }

    /// Forwards to the stored closure; `connectTimeout` and `logger` are ignored.
    func makeChannel(
      managedBy connectionManager: ConnectionManager,
      onEventLoop eventLoop: EventLoop,
      connectTimeout: TimeAmount?,
      logger: Logger
    ) -> EventLoopFuture<Channel> {
      let channelFuture = self.provider(connectionManager, eventLoop)
      return channelFuture
    }
  }
  fileprivate extension ConnectionManager {
    /// Forceful shutdown shorthand, kept for backwards compatibility so these tests don't need
    /// to spell out the mode at every call site.
    func shutdown() -> EventLoopFuture<Void> {
      self.shutdown(mode: .forceful)
    }
  }