ConnectionManagerTests.swift 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import Logging
  18. import NIOCore
  19. import NIOEmbedded
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPC
  23. class ConnectionManagerTests: GRPCTestCase {
  24. private let loop = EmbeddedEventLoop()
  25. private let recorder = RecordingConnectivityDelegate()
  26. private var monitor: ConnectivityStateMonitor!
  27. private var defaultConfiguration: ClientConnection.Configuration {
  28. var configuration = ClientConnection.Configuration.default(
  29. target: .unixDomainSocket("/ignored"),
  30. eventLoopGroup: self.loop
  31. )
  32. configuration.connectionBackoff = nil
  33. configuration.backgroundActivityLogger = self.clientLogger
  34. return configuration
  35. }
  36. override func setUp() {
  37. super.setUp()
  38. self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
  39. }
  40. override func tearDown() {
  41. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  42. super.tearDown()
  43. }
  44. private func makeConnectionManager(
  45. configuration config: ClientConnection.Configuration? = nil,
  46. channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
  47. ) -> ConnectionManager {
  48. let configuration = config ?? self.defaultConfiguration
  49. return ConnectionManager(
  50. configuration: configuration,
  51. channelProvider: channelProvider.map { HookedChannelProvider($0) },
  52. connectivityDelegate: self.monitor,
  53. logger: self.logger
  54. )
  55. }
  56. private func waitForStateChange<Result>(
  57. from: ConnectivityState,
  58. to: ConnectivityState,
  59. timeout: DispatchTimeInterval = .seconds(1),
  60. file: StaticString = #filePath,
  61. line: UInt = #line,
  62. body: () throws -> Result
  63. ) rethrows -> Result {
  64. self.recorder.expectChange {
  65. XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
  66. }
  67. let result = try body()
  68. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  69. return result
  70. }
  71. private func waitForStateChanges<Result>(
  72. _ changes: [Change],
  73. timeout: DispatchTimeInterval = .seconds(1),
  74. file: StaticString = #filePath,
  75. line: UInt = #line,
  76. body: () throws -> Result
  77. ) rethrows -> Result {
  78. self.recorder.expectChanges(changes.count) {
  79. XCTAssertEqual($0, changes)
  80. }
  81. let result = try body()
  82. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  83. return result
  84. }
  85. }
  86. extension ConnectionManagerTests {
  87. func testIdleShutdown() throws {
  88. let manager = self.makeConnectionManager()
  89. try self.waitForStateChange(from: .idle, to: .shutdown) {
  90. let shutdown = manager.shutdown()
  91. self.loop.run()
  92. XCTAssertNoThrow(try shutdown.wait())
  93. }
  94. // Getting a multiplexer should fail.
  95. let multiplexer = manager.getHTTP2Multiplexer()
  96. self.loop.run()
  97. XCTAssertThrowsError(try multiplexer.wait())
  98. }
  99. func testConnectFromIdleFailsWithNoReconnect() {
  100. let channelPromise = self.loop.makePromise(of: Channel.self)
  101. let manager = self.makeConnectionManager { _, _ in
  102. return channelPromise.futureResult
  103. }
  104. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> =
  105. self
  106. .waitForStateChange(from: .idle, to: .connecting) {
  107. let channel = manager.getHTTP2Multiplexer()
  108. self.loop.run()
  109. return channel
  110. }
  111. self.waitForStateChange(from: .connecting, to: .shutdown) {
  112. channelPromise.fail(DoomedChannelError())
  113. }
  114. XCTAssertThrowsError(try multiplexer.wait()) {
  115. XCTAssertTrue($0 is DoomedChannelError)
  116. }
  117. }
  118. func testConnectAndDisconnect() throws {
  119. let channelPromise = self.loop.makePromise(of: Channel.self)
  120. let manager = self.makeConnectionManager { _, _ in
  121. return channelPromise.futureResult
  122. }
  123. // Start the connection.
  124. self.waitForStateChange(from: .idle, to: .connecting) {
  125. _ = manager.getHTTP2Multiplexer()
  126. self.loop.run()
  127. }
  128. // Setup the real channel and activate it.
  129. let channel = EmbeddedChannel(loop: self.loop)
  130. let h2mux = HTTP2StreamMultiplexer(
  131. mode: .client,
  132. channel: channel,
  133. inboundStreamInitializer: nil
  134. )
  135. try channel.pipeline.addHandler(
  136. GRPCIdleHandler(
  137. connectionManager: manager,
  138. multiplexer: h2mux,
  139. idleTimeout: .minutes(5),
  140. keepalive: .init(),
  141. logger: self.logger
  142. )
  143. ).wait()
  144. channelPromise.succeed(channel)
  145. XCTAssertNoThrow(
  146. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  147. .wait()
  148. )
  149. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  150. try self.waitForStateChange(from: .connecting, to: .ready) {
  151. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  152. XCTAssertNoThrow(try channel.writeInbound(frame))
  153. }
  154. // Close the channel.
  155. try self.waitForStateChange(from: .ready, to: .shutdown) {
  156. // Now the channel should be available: shut it down.
  157. let shutdown = manager.shutdown()
  158. self.loop.run()
  159. XCTAssertNoThrow(try shutdown.wait())
  160. }
  161. }
  162. func testConnectAndIdle() throws {
  163. let channelPromise = self.loop.makePromise(of: Channel.self)
  164. let manager = self.makeConnectionManager { _, _ in
  165. return channelPromise.futureResult
  166. }
  167. // Start the connection.
  168. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  169. self
  170. .waitForStateChange(from: .idle, to: .connecting) {
  171. let readyChannelMux = manager.getHTTP2Multiplexer()
  172. self.loop.run()
  173. return readyChannelMux
  174. }
  175. // Setup the channel.
  176. let channel = EmbeddedChannel(loop: self.loop)
  177. let h2mux = HTTP2StreamMultiplexer(
  178. mode: .client,
  179. channel: channel,
  180. inboundStreamInitializer: nil
  181. )
  182. try channel.pipeline.addHandler(
  183. GRPCIdleHandler(
  184. connectionManager: manager,
  185. multiplexer: h2mux,
  186. idleTimeout: .minutes(5),
  187. keepalive: .init(),
  188. logger: self.logger
  189. )
  190. ).wait()
  191. channelPromise.succeed(channel)
  192. XCTAssertNoThrow(
  193. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  194. .wait()
  195. )
  196. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  197. try self.waitForStateChange(from: .connecting, to: .ready) {
  198. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  199. XCTAssertNoThrow(try channel.writeInbound(frame))
  200. // Wait for the multiplexer, it _must_ be ready now.
  201. XCTAssertNoThrow(try readyChannelMux.wait())
  202. }
  203. // Go idle. This will shutdown the channel.
  204. try self.waitForStateChange(from: .ready, to: .idle) {
  205. self.loop.advanceTime(by: .minutes(5))
  206. XCTAssertNoThrow(try channel.closeFuture.wait())
  207. }
  208. // Now shutdown.
  209. try self.waitForStateChange(from: .idle, to: .shutdown) {
  210. let shutdown = manager.shutdown()
  211. self.loop.run()
  212. XCTAssertNoThrow(try shutdown.wait())
  213. }
  214. }
  215. func testChannelInactiveBeforeActiveWithNoReconnect() throws {
  216. let channel = EmbeddedChannel(loop: self.loop)
  217. let channelPromise = self.loop.makePromise(of: Channel.self)
  218. let manager = self.makeConnectionManager { _, _ in
  219. return channelPromise.futureResult
  220. }
  221. // Start the connection.
  222. self.waitForStateChange(from: .idle, to: .connecting) {
  223. // Triggers the connect.
  224. _ = manager.getHTTP2Multiplexer()
  225. self.loop.run()
  226. }
  227. try channel.pipeline.syncOperations.addHandler(
  228. GRPCIdleHandler(
  229. connectionManager: manager,
  230. multiplexer: HTTP2StreamMultiplexer(
  231. mode: .client,
  232. channel: channel,
  233. inboundStreamInitializer: nil
  234. ),
  235. idleTimeout: .minutes(5),
  236. keepalive: .init(),
  237. logger: self.logger
  238. )
  239. )
  240. channelPromise.succeed(channel)
  241. // Oops: wrong way around. We should tolerate this.
  242. self.waitForStateChange(from: .connecting, to: .shutdown) {
  243. channel.pipeline.fireChannelInactive()
  244. }
  245. // Should be ignored.
  246. channel.pipeline.fireChannelActive()
  247. }
  248. func testChannelInactiveBeforeActiveWillReconnect() throws {
  249. var channels = [EmbeddedChannel(loop: self.loop), EmbeddedChannel(loop: self.loop)]
  250. var channelPromises: [EventLoopPromise<Channel>] = [
  251. self.loop.makePromise(),
  252. self.loop.makePromise(),
  253. ]
  254. var channelFutures = Array(channelPromises.map { $0.futureResult })
  255. var configuration = self.defaultConfiguration
  256. configuration.connectionBackoff = .oneSecondFixed
  257. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  258. return channelFutures.removeLast()
  259. }
  260. // Start the connection.
  261. self.waitForStateChange(from: .idle, to: .connecting) {
  262. // Triggers the connect.
  263. _ = manager.getHTTP2Multiplexer()
  264. self.loop.run()
  265. }
  266. // Setup the channel.
  267. let channel1 = channels.removeLast()
  268. let channel1Promise = channelPromises.removeLast()
  269. try channel1.pipeline.syncOperations.addHandler(
  270. GRPCIdleHandler(
  271. connectionManager: manager,
  272. multiplexer: HTTP2StreamMultiplexer(
  273. mode: .client,
  274. channel: channel1,
  275. inboundStreamInitializer: nil
  276. ),
  277. idleTimeout: .minutes(5),
  278. keepalive: .init(),
  279. logger: self.logger
  280. )
  281. )
  282. channel1Promise.succeed(channel1)
  283. // Oops: wrong way around. We should tolerate this.
  284. self.waitForStateChange(from: .connecting, to: .transientFailure) {
  285. channel1.pipeline.fireChannelInactive()
  286. }
  287. channel1.pipeline.fireChannelActive()
  288. // Start the next attempt.
  289. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  290. self.loop.advanceTime(by: .seconds(1))
  291. }
  292. let channel2 = channels.removeLast()
  293. let channel2Promise = channelPromises.removeLast()
  294. try channel2.pipeline.syncOperations.addHandler(
  295. GRPCIdleHandler(
  296. connectionManager: manager,
  297. multiplexer: HTTP2StreamMultiplexer(
  298. mode: .client,
  299. channel: channel1,
  300. inboundStreamInitializer: nil
  301. ),
  302. idleTimeout: .minutes(5),
  303. keepalive: .init(),
  304. logger: self.logger
  305. )
  306. )
  307. channel2Promise.succeed(channel2)
  308. try self.waitForStateChange(from: .connecting, to: .ready) {
  309. channel2.pipeline.fireChannelActive()
  310. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  311. XCTAssertNoThrow(try channel2.writeInbound(frame))
  312. }
  313. }
  314. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  315. let channelPromise = self.loop.makePromise(of: Channel.self)
  316. let manager = self.makeConnectionManager { _, _ in
  317. return channelPromise.futureResult
  318. }
  319. // Start the connection.
  320. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  321. self
  322. .waitForStateChange(from: .idle, to: .connecting) {
  323. let readyChannelMux = manager.getHTTP2Multiplexer()
  324. self.loop.run()
  325. return readyChannelMux
  326. }
  327. // Setup the channel.
  328. let channel = EmbeddedChannel(loop: self.loop)
  329. let h2mux = HTTP2StreamMultiplexer(
  330. mode: .client,
  331. channel: channel,
  332. inboundStreamInitializer: nil
  333. )
  334. try channel.pipeline.addHandler(
  335. GRPCIdleHandler(
  336. connectionManager: manager,
  337. multiplexer: h2mux,
  338. idleTimeout: .minutes(5),
  339. keepalive: .init(),
  340. logger: self.logger
  341. )
  342. ).wait()
  343. channelPromise.succeed(channel)
  344. XCTAssertNoThrow(
  345. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  346. .wait()
  347. )
  348. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  349. try self.waitForStateChange(from: .connecting, to: .ready) {
  350. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  351. XCTAssertNoThrow(try channel.writeInbound(frame))
  352. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  353. XCTAssertNoThrow(try readyChannelMux.wait())
  354. }
  355. // "create" a stream; the details don't matter here.
  356. let streamCreated = NIOHTTP2StreamCreatedEvent(
  357. streamID: 1,
  358. localInitialWindowSize: nil,
  359. remoteInitialWindowSize: nil
  360. )
  361. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  362. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  363. self.loop.advanceTime(by: .minutes(5))
  364. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  365. self.waitForStateChange(from: .ready, to: .idle) {
  366. // Close the stream.
  367. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  368. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  369. // ... wait for the idle timeout,
  370. self.loop.advanceTime(by: .minutes(5))
  371. }
  372. // Now shutdown.
  373. try self.waitForStateChange(from: .idle, to: .shutdown) {
  374. let shutdown = manager.shutdown()
  375. self.loop.run()
  376. XCTAssertNoThrow(try shutdown.wait())
  377. }
  378. }
  379. func testConnectAndThenBecomeInactive() throws {
  380. let channelPromise = self.loop.makePromise(of: Channel.self)
  381. let manager = self.makeConnectionManager { _, _ in
  382. return channelPromise.futureResult
  383. }
  384. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  385. self
  386. .waitForStateChange(from: .idle, to: .connecting) {
  387. let readyChannelMux = manager.getHTTP2Multiplexer()
  388. self.loop.run()
  389. return readyChannelMux
  390. }
  391. // Setup the channel.
  392. let channel = EmbeddedChannel(loop: self.loop)
  393. let h2mux = HTTP2StreamMultiplexer(
  394. mode: .client,
  395. channel: channel,
  396. inboundStreamInitializer: nil
  397. )
  398. try channel.pipeline.addHandler(
  399. GRPCIdleHandler(
  400. connectionManager: manager,
  401. multiplexer: h2mux,
  402. idleTimeout: .minutes(5),
  403. keepalive: .init(),
  404. logger: self.logger
  405. )
  406. ).wait()
  407. channelPromise.succeed(channel)
  408. XCTAssertNoThrow(
  409. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  410. .wait()
  411. )
  412. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  413. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  414. let shutdown = manager.shutdown()
  415. self.loop.run()
  416. XCTAssertNoThrow(try shutdown.wait())
  417. }
  418. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  419. // the `readyChannelMux` should error.
  420. XCTAssertThrowsError(try readyChannelMux.wait())
  421. }
  422. func testConnectOnSecondAttempt() throws {
  423. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  424. let channelFutures: [EventLoopFuture<Channel>] = [
  425. self.loop.makeFailedFuture(DoomedChannelError()),
  426. channelPromise.futureResult,
  427. ]
  428. var channelFutureIterator = channelFutures.makeIterator()
  429. var configuration = self.defaultConfiguration
  430. configuration.connectionBackoff = .oneSecondFixed
  431. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  432. guard let next = channelFutureIterator.next() else {
  433. XCTFail("Too many channels requested")
  434. return self.loop.makeFailedFuture(DoomedChannelError())
  435. }
  436. return next
  437. }
  438. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  439. Change(from: .idle, to: .connecting),
  440. Change(from: .connecting, to: .transientFailure),
  441. ]) {
  442. // Get a HTTP/2 stream multiplexer.
  443. let readyChannelMux = manager.getHTTP2Multiplexer()
  444. self.loop.run()
  445. return readyChannelMux
  446. }
  447. // Get a HTTP/2 stream mux from the manager - it is a future for the one we made earlier.
  448. let anotherReadyChannelMux = manager.getHTTP2Multiplexer()
  449. self.loop.run()
  450. // Move time forwards by a second to start the next connection attempt.
  451. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  452. self.loop.advanceTime(by: .seconds(1))
  453. }
  454. // Setup the actual channel and complete the promise.
  455. let channel = EmbeddedChannel(loop: self.loop)
  456. let h2mux = HTTP2StreamMultiplexer(
  457. mode: .client,
  458. channel: channel,
  459. inboundStreamInitializer: nil
  460. )
  461. try channel.pipeline.addHandler(
  462. GRPCIdleHandler(
  463. connectionManager: manager,
  464. multiplexer: h2mux,
  465. idleTimeout: .minutes(5),
  466. keepalive: .init(),
  467. logger: self.logger
  468. )
  469. ).wait()
  470. channelPromise.succeed(channel)
  471. XCTAssertNoThrow(
  472. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  473. .wait()
  474. )
  475. // Write a SETTINGS frame on the root stream.
  476. try self.waitForStateChange(from: .connecting, to: .ready) {
  477. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  478. XCTAssertNoThrow(try channel.writeInbound(frame))
  479. }
  480. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  481. XCTAssertNoThrow(try readyChannelMux.wait())
  482. XCTAssertNoThrow(try anotherReadyChannelMux.wait())
  483. // Now shutdown.
  484. try self.waitForStateChange(from: .ready, to: .shutdown) {
  485. let shutdown = manager.shutdown()
  486. self.loop.run()
  487. XCTAssertNoThrow(try shutdown.wait())
  488. }
  489. }
  490. func testShutdownWhileConnecting() throws {
  491. let channelPromise = self.loop.makePromise(of: Channel.self)
  492. let manager = self.makeConnectionManager { _, _ in
  493. return channelPromise.futureResult
  494. }
  495. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  496. self
  497. .waitForStateChange(from: .idle, to: .connecting) {
  498. let readyChannelMux = manager.getHTTP2Multiplexer()
  499. self.loop.run()
  500. return readyChannelMux
  501. }
  502. // Now shutdown.
  503. let shutdownFuture: EventLoopFuture<Void> = self.waitForStateChange(
  504. from: .connecting,
  505. to: .shutdown
  506. ) {
  507. let shutdown = manager.shutdown()
  508. self.loop.run()
  509. return shutdown
  510. }
  511. // The multiplexer we were requesting should fail.
  512. XCTAssertThrowsError(try readyChannelMux.wait())
  513. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  514. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  515. let channel = try channelPromise.futureResult.wait()
  516. self.loop.run()
  517. XCTAssertNoThrow(try channel.closeFuture.wait())
  518. XCTAssertNoThrow(try shutdownFuture.wait())
  519. }
  520. func testShutdownWhileTransientFailure() throws {
  521. var configuration = self.defaultConfiguration
  522. configuration.connectionBackoff = .oneSecondFixed
  523. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  524. self.loop.makeFailedFuture(DoomedChannelError())
  525. }
  526. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  527. Change(from: .idle, to: .connecting),
  528. Change(from: .connecting, to: .transientFailure),
  529. ]) {
  530. // Get a HTTP/2 stream multiplexer.
  531. let readyChannelMux = manager.getHTTP2Multiplexer()
  532. self.loop.run()
  533. return readyChannelMux
  534. }
  535. // Now shutdown.
  536. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  537. let shutdown = manager.shutdown()
  538. self.loop.run()
  539. XCTAssertNoThrow(try shutdown.wait())
  540. }
  541. // The HTTP/2 stream mux we were requesting should fail.
  542. XCTAssertThrowsError(try readyChannelMux.wait())
  543. }
  544. func testShutdownWhileActive() throws {
  545. let channelPromise = self.loop.makePromise(of: Channel.self)
  546. let manager = self.makeConnectionManager { _, _ in
  547. return channelPromise.futureResult
  548. }
  549. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  550. self
  551. .waitForStateChange(from: .idle, to: .connecting) {
  552. let readyChannelMux = manager.getHTTP2Multiplexer()
  553. self.loop.run()
  554. return readyChannelMux
  555. }
  556. // Prepare the channel
  557. let channel = EmbeddedChannel(loop: self.loop)
  558. let h2mux = HTTP2StreamMultiplexer(
  559. mode: .client,
  560. channel: channel,
  561. inboundStreamInitializer: nil
  562. )
  563. try channel.pipeline.addHandler(
  564. GRPCIdleHandler(
  565. connectionManager: manager,
  566. multiplexer: h2mux,
  567. idleTimeout: .minutes(5),
  568. keepalive: .init(),
  569. logger: self.logger
  570. )
  571. ).wait()
  572. channelPromise.succeed(channel)
  573. XCTAssertNoThrow(
  574. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  575. .wait()
  576. )
  577. // (No state change expected here: active is an internal state.)
  578. // Now shutdown.
  579. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  580. let shutdown = manager.shutdown()
  581. self.loop.run()
  582. XCTAssertNoThrow(try shutdown.wait())
  583. }
  584. // The HTTP/2 stream multiplexer we were requesting should fail.
  585. XCTAssertThrowsError(try readyChannelMux.wait())
  586. }
  587. func testShutdownWhileShutdown() throws {
  588. let manager = self.makeConnectionManager()
  589. try self.waitForStateChange(from: .idle, to: .shutdown) {
  590. let firstShutdown = manager.shutdown()
  591. self.loop.run()
  592. XCTAssertNoThrow(try firstShutdown.wait())
  593. }
  594. let secondShutdown = manager.shutdown()
  595. self.loop.run()
  596. XCTAssertNoThrow(try secondShutdown.wait())
  597. }
  598. func testTransientFailureWhileActive() throws {
  599. var configuration = self.defaultConfiguration
  600. configuration.connectionBackoff = .oneSecondFixed
  601. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  602. let channelFutures: [EventLoopFuture<Channel>] = [
  603. channelPromise.futureResult,
  604. self.loop.makeFailedFuture(DoomedChannelError()),
  605. ]
  606. var channelFutureIterator = channelFutures.makeIterator()
  607. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  608. guard let next = channelFutureIterator.next() else {
  609. XCTFail("Too many channels requested")
  610. return self.loop.makeFailedFuture(DoomedChannelError())
  611. }
  612. return next
  613. }
  614. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  615. self
  616. .waitForStateChange(from: .idle, to: .connecting) {
  617. let readyChannelMux = manager.getHTTP2Multiplexer()
  618. self.loop.run()
  619. return readyChannelMux
  620. }
  621. // Prepare the channel
  622. let firstChannel = EmbeddedChannel(loop: self.loop)
  623. let h2mux = HTTP2StreamMultiplexer(
  624. mode: .client,
  625. channel: firstChannel,
  626. inboundStreamInitializer: nil
  627. )
  628. try firstChannel.pipeline.addHandler(
  629. GRPCIdleHandler(
  630. connectionManager: manager,
  631. multiplexer: h2mux,
  632. idleTimeout: .minutes(5),
  633. keepalive: .init(),
  634. logger: self.logger
  635. )
  636. ).wait()
  637. channelPromise.succeed(firstChannel)
  638. XCTAssertNoThrow(
  639. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  640. .wait()
  641. )
  642. // (No state change expected here: active is an internal state.)
  643. // Close the channel (simulate e.g. TLS handshake failed)
  644. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  645. XCTAssertNoThrow(try firstChannel.close().wait())
  646. }
  647. // Start connecting again.
  648. self.waitForStateChanges([
  649. Change(from: .transientFailure, to: .connecting),
  650. Change(from: .connecting, to: .transientFailure),
  651. ]) {
  652. self.loop.advanceTime(by: .seconds(1))
  653. }
  654. // Now shutdown
  655. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  656. let shutdown = manager.shutdown()
  657. self.loop.run()
  658. XCTAssertNoThrow(try shutdown.wait())
  659. }
  660. // The channel never came up: it should be throw.
  661. XCTAssertThrowsError(try readyChannelMux.wait())
  662. }
  663. func testTransientFailureWhileReady() throws {
  664. var configuration = self.defaultConfiguration
  665. configuration.connectionBackoff = .oneSecondFixed
  666. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  667. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  668. let channelFutures: [EventLoopFuture<Channel>] = [
  669. firstChannelPromise.futureResult,
  670. secondChannelPromise.futureResult,
  671. ]
  672. var channelFutureIterator = channelFutures.makeIterator()
  673. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  674. guard let next = channelFutureIterator.next() else {
  675. XCTFail("Too many channels requested")
  676. return self.loop.makeFailedFuture(DoomedChannelError())
  677. }
  678. return next
  679. }
  680. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  681. self
  682. .waitForStateChange(from: .idle, to: .connecting) {
  683. let readyChannelMux = manager.getHTTP2Multiplexer()
  684. self.loop.run()
  685. return readyChannelMux
  686. }
  687. // Prepare the first channel
  688. let firstChannel = EmbeddedChannel(loop: self.loop)
  689. let firstH2mux = HTTP2StreamMultiplexer(
  690. mode: .client,
  691. channel: firstChannel,
  692. inboundStreamInitializer: nil
  693. )
  694. try firstChannel.pipeline.addHandler(
  695. GRPCIdleHandler(
  696. connectionManager: manager,
  697. multiplexer: firstH2mux,
  698. idleTimeout: .minutes(5),
  699. keepalive: .init(),
  700. logger: self.logger
  701. )
  702. ).wait()
  703. firstChannelPromise.succeed(firstChannel)
  704. XCTAssertNoThrow(
  705. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  706. .wait()
  707. )
  708. // Write a SETTINGS frame on the root stream.
  709. try self.waitForStateChange(from: .connecting, to: .ready) {
  710. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  711. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  712. }
  713. // Channel should now be ready.
  714. XCTAssertNoThrow(try readyChannelMux.wait())
  715. // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
  716. let streamCreated = NIOHTTP2StreamCreatedEvent(
  717. streamID: 1,
  718. localInitialWindowSize: nil,
  719. remoteInitialWindowSize: nil
  720. )
  721. firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
  722. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  723. XCTAssertNoThrow(try firstChannel.close().wait())
  724. }
  725. // Run to start connecting again.
  726. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  727. self.loop.advanceTime(by: .seconds(1))
  728. }
  729. // Prepare the second channel
  730. let secondChannel = EmbeddedChannel(loop: self.loop)
  731. let secondH2mux = HTTP2StreamMultiplexer(
  732. mode: .client,
  733. channel: secondChannel,
  734. inboundStreamInitializer: nil
  735. )
  736. try secondChannel.pipeline.addHandler(
  737. GRPCIdleHandler(
  738. connectionManager: manager,
  739. multiplexer: secondH2mux,
  740. idleTimeout: .minutes(5),
  741. keepalive: .init(),
  742. logger: self.logger
  743. )
  744. ).wait()
  745. secondChannelPromise.succeed(secondChannel)
  746. XCTAssertNoThrow(
  747. try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  748. .wait()
  749. )
  750. // Write a SETTINGS frame on the root stream.
  751. try self.waitForStateChange(from: .connecting, to: .ready) {
  752. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  753. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  754. }
  755. // Now shutdown
  756. try self.waitForStateChange(from: .ready, to: .shutdown) {
  757. let shutdown = manager.shutdown()
  758. self.loop.run()
  759. XCTAssertNoThrow(try shutdown.wait())
  760. }
  761. }
  762. func testGoAwayWhenReady() throws {
  763. let channelPromise = self.loop.makePromise(of: Channel.self)
  764. let manager = self.makeConnectionManager { _, _ in
  765. return channelPromise.futureResult
  766. }
  767. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
  768. self
  769. .waitForStateChange(from: .idle, to: .connecting) {
  770. let readyChannelMux = manager.getHTTP2Multiplexer()
  771. self.loop.run()
  772. return readyChannelMux
  773. }
  774. // Setup the channel.
  775. let channel = EmbeddedChannel(loop: self.loop)
  776. let h2mux = HTTP2StreamMultiplexer(
  777. mode: .client,
  778. channel: channel,
  779. inboundStreamInitializer: nil
  780. )
  781. try channel.pipeline.addHandler(
  782. GRPCIdleHandler(
  783. connectionManager: manager,
  784. multiplexer: h2mux,
  785. idleTimeout: .minutes(5),
  786. keepalive: .init(),
  787. logger: self.logger
  788. )
  789. ).wait()
  790. channelPromise.succeed(channel)
  791. XCTAssertNoThrow(
  792. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  793. .wait()
  794. )
  795. try self.waitForStateChange(from: .connecting, to: .ready) {
  796. // Write a SETTINGS frame on the root stream.
  797. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  798. XCTAssertNoThrow(try channel.writeInbound(frame))
  799. }
  800. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  801. XCTAssertNoThrow(try readyChannelMux.wait())
  802. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  803. // channel to close.
  804. try self.waitForStateChange(from: .ready, to: .idle) {
  805. let goAway = HTTP2Frame(
  806. streamID: .rootStream,
  807. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  808. )
  809. XCTAssertNoThrow(try channel.writeInbound(goAway))
  810. self.loop.run()
  811. }
  812. self.loop.run()
  813. XCTAssertNoThrow(try channel.closeFuture.wait())
  814. // Now shutdown
  815. try self.waitForStateChange(from: .idle, to: .shutdown) {
  816. let shutdown = manager.shutdown()
  817. self.loop.run()
  818. XCTAssertNoThrow(try shutdown.wait())
  819. }
  820. }
  821. func testDoomedOptimisticChannelFromIdle() {
  822. var configuration = self.defaultConfiguration
  823. configuration.callStartBehavior = .fastFailure
  824. let manager = ConnectionManager(
  825. configuration: configuration,
  826. channelProvider: HookedChannelProvider { _, loop in
  827. return loop.makeFailedFuture(DoomedChannelError())
  828. },
  829. connectivityDelegate: nil,
  830. logger: self.logger
  831. )
  832. let candidate = manager.getHTTP2Multiplexer()
  833. self.loop.run()
  834. XCTAssertThrowsError(try candidate.wait())
  835. }
  836. func testDoomedOptimisticChannelFromConnecting() throws {
  837. var configuration = self.defaultConfiguration
  838. configuration.callStartBehavior = .fastFailure
  839. let promise = self.loop.makePromise(of: Channel.self)
  840. let manager = self.makeConnectionManager { _, _ in
  841. return promise.futureResult
  842. }
  843. self.waitForStateChange(from: .idle, to: .connecting) {
  844. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  845. _ = manager.getHTTP2Multiplexer()
  846. self.loop.run()
  847. }
  848. // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
  849. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  850. self.loop.run()
  851. // Fail the promise.
  852. promise.fail(DoomedChannelError())
  853. XCTAssertThrowsError(try optimisticChannelMux.wait())
  854. }
  855. func testOptimisticChannelFromTransientFailure() throws {
  856. var configuration = self.defaultConfiguration
  857. configuration.callStartBehavior = .fastFailure
  858. configuration.connectionBackoff = ConnectionBackoff()
  859. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  860. self.loop.makeFailedFuture(DoomedChannelError())
  861. }
  862. self.waitForStateChanges([
  863. Change(from: .idle, to: .connecting),
  864. Change(from: .connecting, to: .transientFailure),
  865. ]) {
  866. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  867. _ = manager.getHTTP2Multiplexer()
  868. self.loop.run()
  869. }
  870. // Now we're sitting in transient failure. Get a HTTP/2 stream mux optimistically - selected in config.
  871. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  872. self.loop.run()
  873. XCTAssertThrowsError(try optimisticChannelMux.wait()) { error in
  874. XCTAssertTrue(error is DoomedChannelError)
  875. }
  876. }
  877. func testOptimisticChannelFromShutdown() throws {
  878. var configuration = self.defaultConfiguration
  879. configuration.callStartBehavior = .fastFailure
  880. let manager = self.makeConnectionManager { _, _ in
  881. return self.loop.makeFailedFuture(DoomedChannelError())
  882. }
  883. let shutdown = manager.shutdown()
  884. self.loop.run()
  885. XCTAssertNoThrow(try shutdown.wait())
  886. // Get a channel optimistically. It'll fail, obviously.
  887. let channelMux = manager.getHTTP2Multiplexer()
  888. self.loop.run()
  889. XCTAssertThrowsError(try channelMux.wait())
  890. }
  891. func testForceIdleAfterInactive() throws {
  892. let channelPromise = self.loop.makePromise(of: Channel.self)
  893. let manager = self.makeConnectionManager { _, _ in
  894. return channelPromise.futureResult
  895. }
  896. // Start the connection.
  897. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
  898. from: .idle,
  899. to: .connecting
  900. ) {
  901. let readyChannelMux = manager.getHTTP2Multiplexer()
  902. self.loop.run()
  903. return readyChannelMux
  904. }
  905. // Setup the real channel and activate it.
  906. let channel = EmbeddedChannel(loop: self.loop)
  907. let h2mux = HTTP2StreamMultiplexer(
  908. mode: .client,
  909. channel: channel,
  910. inboundStreamInitializer: nil
  911. )
  912. XCTAssertNoThrow(
  913. try channel.pipeline.addHandlers([
  914. GRPCIdleHandler(
  915. connectionManager: manager,
  916. multiplexer: h2mux,
  917. idleTimeout: .minutes(5),
  918. keepalive: .init(),
  919. logger: self.logger
  920. )
  921. ]).wait()
  922. )
  923. channelPromise.succeed(channel)
  924. self.loop.run()
  925. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  926. XCTAssertNoThrow(try connect.wait())
  927. // Write a SETTINGS frame on the root stream.
  928. try self.waitForStateChange(from: .connecting, to: .ready) {
  929. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  930. XCTAssertNoThrow(try channel.writeInbound(frame))
  931. }
  932. // The channel should now be ready.
  933. XCTAssertNoThrow(try readyChannelMux.wait())
  934. // Now drop the connection.
  935. try self.waitForStateChange(from: .ready, to: .shutdown) {
  936. let shutdown = manager.shutdown()
  937. self.loop.run()
  938. XCTAssertNoThrow(try shutdown.wait())
  939. }
  940. }
  941. func testCloseWithoutActiveRPCs() throws {
  942. let channelPromise = self.loop.makePromise(of: Channel.self)
  943. let manager = self.makeConnectionManager { _, _ in
  944. return channelPromise.futureResult
  945. }
  946. // Start the connection.
  947. let readyChannelMux = self.waitForStateChange(
  948. from: .idle,
  949. to: .connecting
  950. ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
  951. let readyChannelMux = manager.getHTTP2Multiplexer()
  952. self.loop.run()
  953. return readyChannelMux
  954. }
  955. // Setup the actual channel and activate it.
  956. let channel = EmbeddedChannel(loop: self.loop)
  957. let h2mux = HTTP2StreamMultiplexer(
  958. mode: .client,
  959. channel: channel,
  960. inboundStreamInitializer: nil
  961. )
  962. XCTAssertNoThrow(
  963. try channel.pipeline.addHandlers([
  964. GRPCIdleHandler(
  965. connectionManager: manager,
  966. multiplexer: h2mux,
  967. idleTimeout: .minutes(5),
  968. keepalive: .init(),
  969. logger: self.logger
  970. )
  971. ]).wait()
  972. )
  973. channelPromise.succeed(channel)
  974. self.loop.run()
  975. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  976. XCTAssertNoThrow(try connect.wait())
  977. // "ready" the connection.
  978. try self.waitForStateChange(from: .connecting, to: .ready) {
  979. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  980. XCTAssertNoThrow(try channel.writeInbound(frame))
  981. }
  982. // The HTTP/2 stream multiplexer should now be ready.
  983. XCTAssertNoThrow(try readyChannelMux.wait())
  984. // Close the channel. There are no active RPCs so we should idle rather than be in the transient
  985. // failure state.
  986. self.waitForStateChange(from: .ready, to: .idle) {
  987. channel.pipeline.fireChannelInactive()
  988. }
  989. }
  990. func testIdleErrorDoesNothing() throws {
  991. let manager = self.makeConnectionManager()
  992. // Dropping an error on this manager should be fine.
  993. manager.channelError(DoomedChannelError())
  994. // Shutting down is then safe.
  995. try self.waitForStateChange(from: .idle, to: .shutdown) {
  996. let shutdown = manager.shutdown()
  997. self.loop.run()
  998. XCTAssertNoThrow(try shutdown.wait())
  999. }
  1000. }
  1001. func testHTTP2Delegates() throws {
  1002. let channel = EmbeddedChannel(loop: self.loop)
  1003. defer {
  1004. XCTAssertNoThrow(try channel.finish())
  1005. }
  1006. let multiplexer = HTTP2StreamMultiplexer(
  1007. mode: .client,
  1008. channel: channel,
  1009. inboundStreamInitializer: nil
  1010. )
  1011. class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
  1012. var streamsOpened = 0
  1013. var streamsClosed = 0
  1014. var maxConcurrentStreams = 0
  1015. func streamOpened(_ connectionManager: ConnectionManager) {
  1016. self.streamsOpened += 1
  1017. }
  1018. func streamClosed(_ connectionManager: ConnectionManager) {
  1019. self.streamsClosed += 1
  1020. }
  1021. func receivedSettingsMaxConcurrentStreams(
  1022. _ connectionManager: ConnectionManager,
  1023. maxConcurrentStreams: Int
  1024. ) {
  1025. self.maxConcurrentStreams = maxConcurrentStreams
  1026. }
  1027. }
  1028. let http2 = HTTP2Delegate()
  1029. let manager = ConnectionManager(
  1030. eventLoop: self.loop,
  1031. channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
  1032. let idleHandler = GRPCIdleHandler(
  1033. connectionManager: manager,
  1034. multiplexer: multiplexer,
  1035. idleTimeout: .minutes(5),
  1036. keepalive: ClientConnectionKeepalive(),
  1037. logger: self.logger
  1038. )
  1039. // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
  1040. // us to just fire stream created/closed events into the channel.
  1041. do {
  1042. try channel.pipeline.syncOperations.addHandler(idleHandler)
  1043. } catch {
  1044. return eventLoop.makeFailedFuture(error)
  1045. }
  1046. return eventLoop.makeSucceededFuture(channel)
  1047. },
  1048. callStartBehavior: .waitsForConnectivity,
  1049. connectionBackoff: ConnectionBackoff(),
  1050. connectivityDelegate: nil,
  1051. http2Delegate: http2,
  1052. logger: self.logger
  1053. )
  1054. // Start connecting.
  1055. let futureMultiplexer = manager.getHTTP2Multiplexer()
  1056. self.loop.run()
  1057. // Do the actual connecting.
  1058. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")))
  1059. // The channel isn't ready until it's seen a SETTINGS frame.
  1060. func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
  1061. let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
  1062. return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
  1063. }
  1064. XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))
  1065. // We're ready now so the future multiplexer will resolve and we'll have seen an update to
  1066. // max concurrent streams.
  1067. XCTAssertNoThrow(try futureMultiplexer.wait())
  1068. XCTAssertEqual(http2.maxConcurrentStreams, 42)
  1069. XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
  1070. XCTAssertEqual(http2.maxConcurrentStreams, 13)
  1071. // Open some streams.
  1072. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
  1073. let streamCreated = NIOHTTP2StreamCreatedEvent(
  1074. streamID: streamID,
  1075. localInitialWindowSize: nil,
  1076. remoteInitialWindowSize: nil
  1077. )
  1078. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  1079. }
  1080. // ... and then close them.
  1081. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
  1082. let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
  1083. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  1084. }
  1085. XCTAssertEqual(http2.streamsOpened, 4)
  1086. XCTAssertEqual(http2.streamsClosed, 4)
  1087. }
  1088. func testChannelErrorWhenConnecting() throws {
  1089. let channelPromise = self.loop.makePromise(of: Channel.self)
  1090. let manager = self.makeConnectionManager { _, _ in
  1091. return channelPromise.futureResult
  1092. }
  1093. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
  1094. from: .idle,
  1095. to: .connecting
  1096. ) {
  1097. let channel = manager.getHTTP2Multiplexer()
  1098. self.loop.run()
  1099. return channel
  1100. }
  1101. self.waitForStateChange(from: .connecting, to: .shutdown) {
  1102. manager.channelError(EventLoopError.shutdown)
  1103. }
  1104. XCTAssertThrowsError(try multiplexer.wait())
  1105. }
  1106. }
  1107. internal struct Change: Hashable, CustomStringConvertible {
  1108. var from: ConnectivityState
  1109. var to: ConnectivityState
  1110. var description: String {
  1111. return "\(self.from) → \(self.to)"
  1112. }
  1113. }
  1114. // Unchecked as all mutable state is modified from a serial queue.
  1115. extension RecordingConnectivityDelegate: @unchecked Sendable {}
  1116. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  1117. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  1118. private let semaphore = DispatchSemaphore(value: 0)
  1119. private var expectation: Expectation = .noExpectation
  1120. private let quiescingSemaphore = DispatchSemaphore(value: 0)
  1121. private enum Expectation {
  1122. /// We have no expectation of any changes. We'll just ignore any changes.
  1123. case noExpectation
  1124. /// We expect one change.
  1125. case one((Change) -> Void)
  1126. /// We expect 'count' changes.
  1127. case some(count: Int, recorded: [Change], ([Change]) -> Void)
  1128. var count: Int {
  1129. switch self {
  1130. case .noExpectation:
  1131. return 0
  1132. case .one:
  1133. return 1
  1134. case let .some(count, _, _):
  1135. return count
  1136. }
  1137. }
  1138. }
  1139. func connectivityStateDidChange(
  1140. from oldState: ConnectivityState,
  1141. to newState: ConnectivityState
  1142. ) {
  1143. self.serialQueue.async {
  1144. switch self.expectation {
  1145. case let .one(verify):
  1146. // We don't care about future changes.
  1147. self.expectation = .noExpectation
  1148. // Verify and notify.
  1149. verify(Change(from: oldState, to: newState))
  1150. self.semaphore.signal()
  1151. case .some(let count, var recorded, let verify):
  1152. recorded.append(Change(from: oldState, to: newState))
  1153. if recorded.count == count {
  1154. // We don't care about future changes.
  1155. self.expectation = .noExpectation
  1156. // Verify and notify.
  1157. verify(recorded)
  1158. self.semaphore.signal()
  1159. } else {
  1160. // Still need more responses.
  1161. self.expectation = .some(count: count, recorded: recorded, verify)
  1162. }
  1163. case .noExpectation:
  1164. // Ignore any changes.
  1165. ()
  1166. }
  1167. }
  1168. }
  1169. func connectionStartedQuiescing() {
  1170. self.serialQueue.async {
  1171. self.quiescingSemaphore.signal()
  1172. }
  1173. }
  1174. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
  1175. self.serialQueue.async {
  1176. self.expectation = .some(count: count, recorded: [], verify)
  1177. }
  1178. }
  1179. func expectChange(verify: @escaping (Change) -> Void) {
  1180. self.serialQueue.async {
  1181. self.expectation = .one(verify)
  1182. }
  1183. }
  1184. func waitForExpectedChanges(
  1185. timeout: DispatchTimeInterval,
  1186. file: StaticString = #filePath,
  1187. line: UInt = #line
  1188. ) {
  1189. let result = self.semaphore.wait(timeout: .now() + timeout)
  1190. switch result {
  1191. case .success:
  1192. ()
  1193. case .timedOut:
  1194. XCTFail(
  1195. "Timed out before verifying \(self.expectation.count) change(s)",
  1196. file: file,
  1197. line: line
  1198. )
  1199. }
  1200. }
  1201. func waitForQuiescing(timeout: DispatchTimeInterval) {
  1202. let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
  1203. switch result {
  1204. case .success:
  1205. ()
  1206. case .timedOut:
  1207. XCTFail("Timed out waiting for connection to start quiescing")
  1208. }
  1209. }
  1210. }
  1211. extension ConnectionBackoff {
  1212. fileprivate static let oneSecondFixed = ConnectionBackoff(
  1213. initialBackoff: 1.0,
  1214. maximumBackoff: 1.0,
  1215. multiplier: 1.0,
  1216. jitter: 0.0
  1217. )
  1218. }
  1219. private struct DoomedChannelError: Error {}
  1220. internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
  1221. internal var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
  1222. init(_ provider: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
  1223. self.provider = provider
  1224. }
  1225. func makeChannel(
  1226. managedBy connectionManager: ConnectionManager,
  1227. onEventLoop eventLoop: EventLoop,
  1228. connectTimeout: TimeAmount?,
  1229. logger: Logger
  1230. ) -> EventLoopFuture<Channel> {
  1231. return self.provider(connectionManager, eventLoop)
  1232. }
  1233. }
  1234. extension ConnectionManager {
  1235. // For backwards compatibility, to avoid large diffs in these tests.
  1236. fileprivate func shutdown() -> EventLoopFuture<Void> {
  1237. return self.shutdown(mode: .forceful)
  1238. }
  1239. }