ConnectionManagerTests.swift 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import Logging
  19. import NIOCore
  20. import NIOEmbedded
  21. import NIOHTTP2
  22. import XCTest
  /// Tests for the connectivity state transitions of `ConnectionManager`.
  ///
  /// Every test runs on a single `EmbeddedEventLoop`. State changes are observed through a
  /// `RecordingConnectivityDelegate` wired up via a `ConnectivityStateMonitor`.
  class ConnectionManagerTests: GRPCTestCase {
    private let loop = EmbeddedEventLoop()
    private let recorder = RecordingConnectivityDelegate()
    private var monitor: ConnectivityStateMonitor!

    /// The baseline client configuration used by the tests.
    ///
    /// The connection backoff is cleared here; tests which need reconnect attempts set their own
    /// backoff on a copy of this configuration.
    private var defaultConfiguration: ClientConnection.Configuration {
      var configuration = ClientConnection.Configuration.default(
        target: .unixDomainSocket("/ignored"),
        eventLoopGroup: self.loop
      )
      configuration.connectionBackoff = nil
      configuration.backgroundActivityLogger = self.clientLogger
      return configuration
    }

    override func setUp() {
      super.setUp()
      // A fresh monitor per test, reporting to the shared recorder.
      self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
      super.tearDown()
    }

    /// Makes a `ConnectionManager` whose connectivity delegate is this test's monitor.
    ///
    /// - Parameters:
    ///   - config: The configuration to use; `defaultConfiguration` is used when `nil`.
    ///   - channelProvider: An optional hook supplying the channel future for each connection
    ///     attempt. When present it is wrapped in a `HookedChannelProvider`.
    /// - Returns: The newly created manager.
    private func makeConnectionManager(
      configuration config: ClientConnection.Configuration? = nil,
      channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
    ) -> ConnectionManager {
      let configuration = config ?? self.defaultConfiguration
      return ConnectionManager(
        configuration: configuration,
        channelProvider: channelProvider.map { HookedChannelProvider($0) },
        connectivityDelegate: self.monitor,
        logger: self.logger
      )
    }

    /// Runs `body` and asserts that exactly the state change `from` -> `to` is recorded.
    ///
    /// - Parameters:
    ///   - from: The expected state before the change.
    ///   - to: The expected state after the change.
    ///   - timeout: How long to wait for the change to be recorded.
    ///   - file: The file to attribute assertion failures to.
    ///   - line: The line to attribute assertion failures to.
    ///   - body: The work expected to trigger the change.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChange<Result>(
      from: ConnectivityState,
      to: ConnectivityState,
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #filePath,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChange {
        XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }

    /// Runs `body` and asserts that the recorded state changes equal `changes`.
    ///
    /// - Parameters:
    ///   - changes: The expected sequence of changes.
    ///   - timeout: How long to wait for the changes to be recorded.
    ///   - file: The file to attribute assertion failures to.
    ///   - line: The line to attribute assertion failures to.
    ///   - body: The work expected to trigger the changes.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChanges<Result>(
      _ changes: [Change],
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #filePath,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChanges(changes.count) {
        // Forward the caller's location so that a mismatch is reported against the test which
        // called this helper rather than against the helper itself.
        XCTAssertEqual($0, changes, file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }
  }
  86. extension ConnectionManagerTests {
  /// Shutting down a manager which never connected goes from idle to shutdown, after which
  /// requests for a multiplexer fail.
  func testIdleShutdown() throws {
    let manager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The manager is shut down: asking for a multiplexer must now fail.
    let muxFuture = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// With the default configuration (no backoff), a failed connection attempt shuts the manager
  /// down and the pending multiplexer fails with the connection error.
  func testConnectFromIdleFailsWithNoReconnect() {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Fail the attempt: the manager should shut down.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      pendingChannel.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try muxFuture.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Connects, becomes ready on receipt of a SETTINGS frame, then shuts down from ready.
  func testConnectAndDisconnect() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Kick off the connection attempt; the multiplexer future itself is not needed here.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Build the channel the manager will receive, install the idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the connection to 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The channel is available now, so shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Connects, becomes ready, idles after the five minute idle timeout and then shuts down.
  func testConnectAndIdle() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Request a multiplexer to start connecting.
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Build the channel, install the idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the connection to 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
      // The multiplexer _must_ be available at this point.
      XCTAssertNoThrow(try muxFuture.wait())
    }

    // Let the idle timeout elapse; idling closes the channel.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try channel.closeFuture.wait())
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// The idle timeout must not idle the connection while a stream is open; once the stream is
  /// closed and the timeout elapses again the connection goes idle.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Request a multiplexer to start connecting.
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Build the channel, install the idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the connection to 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
      // The HTTP/2 stream multiplexer _must_ be available at this point.
      XCTAssertNoThrow(try muxFuture.wait())
    }

    // Pretend a stream was opened; only the event matters, not its details.
    let streamOpened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    channel.pipeline.fireUserInboundEventTriggered(streamOpened)

    // The idle timeout elapses, but with a stream open this should _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Close the stream and let the idle timeout elapse once more: now we expect to go idle.
    self.waitForStateChange(from: .ready, to: .idle) {
      let streamFinished = StreamClosedEvent(streamID: 1, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamFinished)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down after the channel is active, but before it is ready, moves straight from
  /// connecting to shutdown and fails the pending multiplexer.
  func testConnectAndThenBecomeInactive() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Build the channel, install the idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut down now, while the multiplexer future is still pending.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No channel became ready and no reconnect is configured, so the manager is shut down and
    // the multiplexer future must have failed.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// The first connection attempt fails; after the one second backoff the second attempt
  /// succeeds and both multiplexer futures requested along the way are fulfilled.
  func testConnectOnSecondAttempt() throws {
    let secondAttempt = self.loop.makePromise(of: Channel.self)
    // Attempts are served in order: an immediate failure, then the promise completed below.
    let attempts: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      secondAttempt.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChanges(expectedChanges) {
        // Requesting a multiplexer triggers the first (doomed) attempt.
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Ask again: this is a future for the multiplexer requested above.
    let secondMuxFuture = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Advancing by the backoff (one second) starts the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the real channel and complete the second attempt with it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    secondAttempt.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the connection to 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // Both multiplexer futures _must_ be fulfilled now.
    XCTAssertNoThrow(try muxFuture.wait())
    XCTAssertNoThrow(try secondMuxFuture.wait())

    // Finally, shut down from ready.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down mid-connect fails the pending multiplexer; a channel which arrives after the
  /// shutdown is closed as well.
  func testShutdownWhileConnecting() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Shut down while the connection attempt is still outstanding.
    let shutdownFuture: EventLoopFuture<Void> = self
      .waitForStateChange(from: .connecting, to: .shutdown) {
        let future = manager.shutdown()
        self.loop.run()
        return future
      }

    // The multiplexer requested earlier should have failed.
    XCTAssertThrowsError(try muxFuture.wait())

    // The channel promise is still outstanding: complete it successfully and check that the
    // late-arriving channel gets closed too.
    pendingChannel.succeed(EmbeddedChannel(loop: self.loop))
    let lateChannel = try pendingChannel.futureResult.wait()
    self.loop.run()
    XCTAssertNoThrow(try lateChannel.closeFuture.wait())
    XCTAssertNoThrow(try shutdownFuture.wait())
  }
  /// Shutting down while waiting to reconnect moves from transient failure to shutdown and
  /// fails the pending multiplexer.
  func testShutdownWhileTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // Every connection attempt fails immediately.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChanges(expectedChanges) {
        // Requesting a multiplexer triggers the (doomed) connection attempt.
        let future = manager.getHTTP2Multiplexer()
        self.loop.run()
        return future
      }

    // Shut down before the next attempt is made.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer requested earlier should have failed.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// Shutting down once the channel is active (but not yet ready) moves from connecting to
  /// shutdown and fails the pending multiplexer.
  func testShutdownWhileActive() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Build the channel, install the idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no state change is expected for it.)

    // Shut down while still 'connecting' from the delegate's point of view.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer requested earlier should have failed.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// Shutting down a manager which is already shut down completes without error.
  func testShutdownWhileShutdown() throws {
    let manager = self.makeConnectionManager()

    // The first shutdown performs the idle -> shutdown transition.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let initialShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try initialShutdown.wait())
    }

    // A repeated shutdown should also succeed.
    let repeatedShutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try repeatedShutdown.wait())
  }
  /// A channel which closes after becoming active, but before becoming ready, results in a
  /// transient failure; the subsequent reconnect attempt fails too and the manager is then
  /// shut down.
  func testTransientFailureWhileActive() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let firstAttempt = self.loop.makePromise(of: Channel.self)
    // Attempts are served in order: the promise completed below, then an immediate failure.
    let attempts: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Build the first channel, install the idle handler and activate it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try firstChannel.pipeline.addHandler(idleHandler).wait()
    firstAttempt.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no state change is expected for it.)

    // Close the channel, simulating e.g. a failed TLS handshake.
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // After the backoff the manager reconnects; that attempt fails straight away.
    let reconnectChanges = [
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(reconnectChanges) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down from transient failure.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No channel ever became ready, so waiting for the multiplexer should throw.
    XCTAssertThrowsError(try muxFuture.wait())
  }
  /// A ready channel with an open stream is closed, causing a transient failure; after the
  /// backoff a second channel is connected and becomes ready, then the manager is shut down.
  func testTransientFailureWhileReady() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let firstAttempt = self.loop.makePromise(of: Channel.self)
    let secondAttempt = self.loop.makePromise(of: Channel.self)
    // Attempts are served in order.
    let attempts: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      secondAttempt.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Build the first channel, install the idle handler and activate it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let firstH2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let firstIdleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: firstH2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try firstChannel.pipeline.addHandler(firstIdleHandler).wait()
    firstAttempt.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the connection to 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try firstChannel.writeInbound(settingsFrame))
    }

    // The multiplexer should be available now.
    XCTAssertNoThrow(try muxFuture.wait())

    // Open a stream before killing the channel: without an active RPC we would idle instead.
    let streamOpened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    firstChannel.pipeline.fireUserInboundEventTriggered(streamOpened)

    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // Advancing by the backoff (one second) starts the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the second channel, install the idle handler and activate it.
    let secondChannel = EmbeddedChannel(loop: self.loop)
    let secondH2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: secondChannel,
      inboundStreamInitializer: nil
    )
    let secondIdleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: secondH2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try secondChannel.pipeline.addHandler(secondIdleHandler).wait()
    secondAttempt.succeed(secondChannel)
    XCTAssertNoThrow(
      try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the second connection to 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try secondChannel.writeInbound(settingsFrame))
    }

    // Finally, shut down from ready.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Receiving a GOAWAY frame while ready idles the connection and closes the channel.
  func testGoAwayWhenReady() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let future = manager.getHTTP2Multiplexer()
      self.loop.run()
      return future
    }

    // Build the channel, install the idle handler and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let h2mux = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: h2mux,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the connection to 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The HTTP/2 stream multiplexer _must_ be available now.
    XCTAssertNoThrow(try muxFuture.wait())

    // Deliver a GOAWAY (its details are unimportant): the connection should go idle and the
    // channel should close.
    try self.waitForStateChange(from: .ready, to: .idle) {
      let goAwayFrame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      XCTAssertNoThrow(try channel.writeInbound(goAwayFrame))
      self.loop.run()
    }

    self.loop.run()
    XCTAssertNoThrow(try channel.closeFuture.wait())

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested from the idle state must
  /// fail when the channel provider fails to produce a channel.
  func testDoomedOptimisticChannelFromIdle() {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // A provider whose channel future always fails.
    let doomedProvider = HookedChannelProvider { _, loop in
      loop.makeFailedFuture(DoomedChannelError())
    }

    let manager = ConnectionManager(
      configuration: configuration,
      channelProvider: doomedProvider,
      connectivityDelegate: nil,
      logger: self.logger
    )

    let optimisticMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try optimisticMultiplexer.wait())
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested while the manager is
  /// connecting must fail once the pending channel future fails.
  func testDoomedOptimisticChannelFromConnecting() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    let promise = self.loop.makePromise(of: Channel.self)
    // The configuration must be handed to the manager, otherwise the `.fastFailure` behavior set
    // above is never used and the test exercises the default call start behavior instead.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return promise.futureResult
    }

    self.waitForStateChange(from: .idle, to: .connecting) {
      // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream
      // multiplexer.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
    let optimisticChannelMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Fail the promise; the optimistic multiplexer must fail with it.
    promise.fail(DoomedChannelError())
    XCTAssertThrowsError(try optimisticChannelMux.wait())
  }
  /// With `.fastFailure` call start behavior and connection backoff configured, a multiplexer
  /// requested while in `.transientFailure` must fail with the error from the channel provider.
  func testOptimisticChannelFromTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure
    configuration.connectionBackoff = ConnectionBackoff()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(expectedChanges) {
      // Kick off channel creation and a connection attempt; the resulting multiplexer future is
      // of no interest here.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // The manager is now in transient failure. Ask for a multiplexer optimistically, as selected
    // in the configuration.
    let optimisticMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try optimisticMultiplexer.wait()) {
      XCTAssertTrue($0 is DoomedChannelError)
    }
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested after the manager has been
  /// shut down must fail.
  func testOptimisticChannelFromShutdown() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // The configuration must be handed to the manager, otherwise the `.fastFailure` behavior set
    // above is never used and the test exercises the default call start behavior instead.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let shutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdown.wait())

    // Get a channel optimistically. It'll fail, obviously.
    let channelMux = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try channelMux.wait())
  }
  /// Drives a connection to `.ready` and then shuts the manager down, expecting a direct
  /// `.ready` to `.shutdown` transition.
  func testForceIdleAfterInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer starts the connection.
    let futureMultiplexer = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Create the real channel with the idle handler installed, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let http2Multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: http2Multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())

    pendingChannel.succeed(channel)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // A SETTINGS frame on the root stream readies the connection.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The multiplexer requested earlier should have resolved.
    XCTAssertNoThrow(try futureMultiplexer.wait())

    // Drop the connection by shutting the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Drives a connection to `.ready` and then makes the channel inactive, expecting the manager
  /// to move to `.idle`.
  func testCloseWithoutActiveRPCs() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer starts the connection.
    let futureMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Create the actual channel with the idle handler installed, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let http2Multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: http2Multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())

    pendingChannel.succeed(channel)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // "Ready" the connection with a SETTINGS frame on the root stream.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The HTTP/2 stream multiplexer requested earlier should have resolved.
    XCTAssertNoThrow(try futureMultiplexer.wait())

    // Close the channel. No RPCs are active, so the manager should idle rather than enter the
    // transient failure state.
    self.waitForStateChange(from: .ready, to: .idle) {
      channel.pipeline.fireChannelInactive()
    }
  }
  /// Reports a channel error to an idle manager and checks that the manager can still be shut
  /// down from `.idle` afterwards.
  func testIdleErrorDoesNothing() throws {
    let manager = self.makeConnectionManager()

    // An error delivered to an idle manager should be harmless.
    manager.channelError(DoomedChannelError())

    // The manager is expected to still be idle, so shutting down goes idle -> shutdown.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Checks that the HTTP/2 delegate of a `ConnectionManager` is told about the
  /// `maxConcurrentStreams` setting from inbound SETTINGS frames and about streams being opened
  /// and closed.
  func testHTTP2Delegates() throws {
    let channel = EmbeddedChannel(loop: self.loop)
    defer {
      XCTAssertNoThrow(try channel.finish())
    }

    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )

    /// Records every callback so the test can assert on them afterwards.
    final class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
      var streamsOpened = 0
      var streamsClosed = 0
      var maxConcurrentStreams = 0

      func streamOpened(_ connectionManager: ConnectionManager) {
        self.streamsOpened += 1
      }

      func streamClosed(_ connectionManager: ConnectionManager) {
        self.streamsClosed += 1
      }

      func receivedSettingsMaxConcurrentStreams(
        _ connectionManager: ConnectionManager,
        maxConcurrentStreams: Int
      ) {
        self.maxConcurrentStreams = maxConcurrentStreams
      }
    }

    let http2 = HTTP2Delegate()

    let manager = ConnectionManager(
      eventLoop: self.loop,
      channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
        let idleHandler = GRPCIdleHandler(
          connectionManager: manager,
          multiplexer: multiplexer,
          idleTimeout: .minutes(5),
          keepalive: ClientConnectionKeepalive(),
          logger: self.logger
        )

        // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
        // us to just fire stream created/closed events into the channel.
        do {
          try channel.pipeline.syncOperations.addHandler(idleHandler)
        } catch {
          return eventLoop.makeFailedFuture(error)
        }

        return eventLoop.makeSucceededFuture(channel)
      },
      callStartBehavior: .waitsForConnectivity,
      connectionBackoff: ConnectionBackoff(),
      connectivityDelegate: nil,
      http2Delegate: http2,
      logger: self.logger
    )

    // Start connecting.
    let futureMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Do the actual connecting. Wait on the returned future so that a failed connect is reported
    // rather than silently dropped.
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // The channel isn't ready until it's seen a SETTINGS frame.
    func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
      let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
      return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
    }
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))

    // We're ready now so the future multiplexer will resolve and we'll have seen an update to
    // max concurrent streams.
    XCTAssertNoThrow(try futureMultiplexer.wait())
    XCTAssertEqual(http2.maxConcurrentStreams, 42)

    // A later SETTINGS frame updates the value again.
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
    XCTAssertEqual(http2.maxConcurrentStreams, 13)

    // Open some streams (IDs 1, 3, 5 and 7).
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      let streamCreated = NIOHTTP2StreamCreatedEvent(
        streamID: streamID,
        localInitialWindowSize: nil,
        remoteInitialWindowSize: nil
      )
      channel.pipeline.fireUserInboundEventTriggered(streamCreated)
    }

    // ... and then close them.
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamClosed)
    }

    XCTAssertEqual(http2.streamsOpened, 4)
    XCTAssertEqual(http2.streamsClosed, 4)
  }
  /// Reports a channel error while the manager is connecting and expects a `.connecting` to
  /// `.shutdown` transition, with the pending multiplexer future failing.
  func testChannelErrorWhenConnecting() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer starts the connection attempt; the channel is never provided.
    let futureMultiplexer = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    self.waitForStateChange(from: .connecting, to: .shutdown) {
      manager.channelError(EventLoopError.shutdown)
    }

    XCTAssertThrowsError(try futureMultiplexer.wait())
  }
  994. }
  /// A transition from one connectivity state to another.
  internal struct Change: Hashable, CustomStringConvertible {
    /// The state being left.
    var from: ConnectivityState
    /// The state being entered.
    var to: ConnectivityState

    /// Renders the transition as "<from> → <to>".
    var description: String { "\(self.from) → \(self.to)" }
  }
  #if compiler(>=5.6)
  // Unchecked as all mutable state is accessed from a serial queue.
  extension RecordingConnectivityDelegate: @unchecked Sendable {}
  #endif // compiler(>=5.6)

  /// A connectivity state delegate for tests: it records state changes against an expectation
  /// registered up front and lets the test block until the expectation has been met.
  ///
  /// The current expectation is only read and written on `serialQueue`; the semaphores are used to
  /// hand completion back to the waiting test.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    /// Signalled once the current expectation has been verified.
    private let semaphore = DispatchSemaphore(value: 0)
    private var expectation: Expectation = .noExpectation

    /// Signalled each time `connectionStartedQuiescing()` is called.
    private let quiescingSemaphore = DispatchSemaphore(value: 0)

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The total number of changes this expectation is waiting for.
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Records the change against the current expectation, if there is one. Once the expectation
    /// has been met it is cleared, its `verify` closure is called and the semaphore is signalled.
    func connectivityStateDidChange(
      from oldState: ConnectivityState,
      to newState: ConnectivityState
    ) {
      self.serialQueue.async {
        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(Change(from: oldState, to: newState))
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(Change(from: oldState, to: newState))
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Signals the quiescing semaphore so that `waitForQuiescing(timeout:)` can return.
    func connectionStartedQuiescing() {
      self.serialQueue.async {
        self.quiescingSemaphore.signal()
      }
    }

    /// Registers an expectation for `count` changes; `verify` receives them in recorded order.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Registers an expectation for a single change which is passed to `verify`.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks until the registered expectation has been verified, or records a test failure if
    /// `timeout` elapses first.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      let result = self.semaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        // `expectation` is mutated on the serial queue, so read it there as well rather than
        // racing with a state change which may still be in flight.
        let expectedCount = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(expectedCount) change(s)",
          file: file, line: line
        )
      }
    }

    /// Blocks until `connectionStartedQuiescing()` has been called, or records a test failure if
    /// `timeout` elapses first.
    func waitForQuiescing(timeout: DispatchTimeInterval) {
      let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        XCTFail("Timed out waiting for connection to start quiescing")
      }
    }
  }
  extension ConnectionBackoff {
    /// A backoff whose initial and maximum values are both 1.0, with a multiplier of 1.0 and no
    /// jitter.
    fileprivate static let oneSecondFixed: ConnectionBackoff = .init(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// An error with no payload, used by these tests to fail channel futures.
  private struct DoomedChannelError: Error {
  }
  /// A channel provider which delegates channel creation to a closure supplied by the test.
  internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
    /// Called with the connection manager and event loop each time a channel is requested.
    internal var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>

    init(_ provider: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
      self.provider = provider
    }

    /// Forwards to `provider`; the connect timeout and logger are ignored.
    func makeChannel(
      managedBy connectionManager: ConnectionManager,
      onEventLoop eventLoop: EventLoop,
      connectTimeout: TimeAmount?,
      logger: Logger
    ) -> EventLoopFuture<Channel> {
      let makeChannel = self.provider
      return makeChannel(connectionManager, eventLoop)
    }
  }
  extension ConnectionManager {
    /// Shuts the manager down using the `.forceful` mode.
    ///
    /// Kept for backwards compatibility so the tests in this file don't need large diffs.
    fileprivate func shutdown() -> EventLoopFuture<Void> {
      self.shutdown(mode: .forceful)
    }
  }
  1128. }