ConnectionManagerTests.swift 36 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  21. class ConnectionManagerTests: GRPCTestCase {
  22. private let loop = EmbeddedEventLoop()
  23. private let recorder = RecordingConnectivityDelegate()
  24. private var defaultConfiguration: ClientConnection.Configuration {
  25. return ClientConnection.Configuration(
  26. target: .unixDomainSocket("/ignored"),
  27. eventLoopGroup: self.loop,
  28. connectivityStateDelegate: self.recorder,
  29. connectionBackoff: nil,
  30. backgroundActivityLogger: self.clientLogger
  31. )
  32. }
  33. override func tearDown() {
  34. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  35. super.tearDown()
  36. }
  37. private func waitForStateChange<Result>(
  38. from: ConnectivityState,
  39. to: ConnectivityState,
  40. timeout: DispatchTimeInterval = .seconds(1),
  41. file: StaticString = #file,
  42. line: UInt = #line,
  43. body: () throws -> Result
  44. ) rethrows -> Result {
  45. self.recorder.expectChange {
  46. XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
  47. }
  48. let result = try body()
  49. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  50. return result
  51. }
  52. private func waitForStateChanges<Result>(
  53. _ changes: [Change],
  54. timeout: DispatchTimeInterval = .seconds(1),
  55. file: StaticString = #file,
  56. line: UInt = #line,
  57. body: () throws -> Result
  58. ) rethrows -> Result {
  59. self.recorder.expectChanges(changes.count) {
  60. XCTAssertEqual($0, changes)
  61. }
  62. let result = try body()
  63. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  64. return result
  65. }
  66. }
  67. extension ConnectionManagerTests {
  68. func testIdleShutdown() throws {
  69. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  70. try self.waitForStateChange(from: .idle, to: .shutdown) {
  71. let shutdown = manager.shutdown()
  72. self.loop.run()
  73. XCTAssertNoThrow(try shutdown.wait())
  74. }
  75. // Getting a multiplexer should fail.
  76. let multiplexer = manager.getHTTP2Multiplexer()
  77. self.loop.run()
  78. XCTAssertThrowsError(try multiplexer.wait())
  79. }
  80. func testConnectFromIdleFailsWithNoReconnect() {
  81. let channelPromise = self.loop.makePromise(of: Channel.self)
  82. let manager = ConnectionManager.testingOnly(
  83. configuration: self.defaultConfiguration,
  84. logger: self.logger
  85. ) {
  86. channelPromise.futureResult
  87. }
  88. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
  89. .waitForStateChange(from: .idle, to: .connecting) {
  90. let channel = manager.getHTTP2Multiplexer()
  91. self.loop.run()
  92. return channel
  93. }
  94. self.waitForStateChange(from: .connecting, to: .shutdown) {
  95. channelPromise.fail(DoomedChannelError())
  96. }
  97. XCTAssertThrowsError(try multiplexer.wait()) {
  98. XCTAssertTrue($0 is DoomedChannelError)
  99. }
  100. }
  101. func testConnectAndDisconnect() throws {
  102. let channelPromise = self.loop.makePromise(of: Channel.self)
  103. let manager = ConnectionManager.testingOnly(
  104. configuration: self.defaultConfiguration,
  105. logger: self.logger
  106. ) {
  107. channelPromise.futureResult
  108. }
  109. // Start the connection.
  110. self.waitForStateChange(from: .idle, to: .connecting) {
  111. _ = manager.getHTTP2Multiplexer()
  112. self.loop.run()
  113. }
  114. // Setup the real channel and activate it.
  115. let channel = EmbeddedChannel(loop: self.loop)
  116. let h2mux = HTTP2StreamMultiplexer(
  117. mode: .client,
  118. channel: channel,
  119. inboundStreamInitializer: nil
  120. )
  121. try channel.pipeline.addHandler(
  122. GRPCIdleHandler(
  123. connectionManager: manager,
  124. multiplexer: h2mux,
  125. idleTimeout: .minutes(5),
  126. keepalive: .init(),
  127. logger: self.logger
  128. )
  129. ).wait()
  130. channelPromise.succeed(channel)
  131. XCTAssertNoThrow(
  132. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  133. .wait()
  134. )
  135. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  136. try self.waitForStateChange(from: .connecting, to: .ready) {
  137. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  138. XCTAssertNoThrow(try channel.writeInbound(frame))
  139. }
  140. // Close the channel.
  141. try self.waitForStateChange(from: .ready, to: .shutdown) {
  142. // Now the channel should be available: shut it down.
  143. let shutdown = manager.shutdown()
  144. self.loop.run()
  145. XCTAssertNoThrow(try shutdown.wait())
  146. }
  147. }
  148. func testConnectAndIdle() throws {
  149. let channelPromise = self.loop.makePromise(of: Channel.self)
  150. let manager = ConnectionManager.testingOnly(
  151. configuration: self.defaultConfiguration,
  152. logger: self.logger
  153. ) {
  154. channelPromise.futureResult
  155. }
  156. // Start the connection.
  157. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  158. .waitForStateChange(from: .idle, to: .connecting) {
  159. let readyChannelMux = manager.getHTTP2Multiplexer()
  160. self.loop.run()
  161. return readyChannelMux
  162. }
  163. // Setup the channel.
  164. let channel = EmbeddedChannel(loop: self.loop)
  165. let h2mux = HTTP2StreamMultiplexer(
  166. mode: .client,
  167. channel: channel,
  168. inboundStreamInitializer: nil
  169. )
  170. try channel.pipeline.addHandler(
  171. GRPCIdleHandler(
  172. connectionManager: manager,
  173. multiplexer: h2mux,
  174. idleTimeout: .minutes(5),
  175. keepalive: .init(),
  176. logger: self.logger
  177. )
  178. ).wait()
  179. channelPromise.succeed(channel)
  180. XCTAssertNoThrow(
  181. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  182. .wait()
  183. )
  184. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  185. try self.waitForStateChange(from: .connecting, to: .ready) {
  186. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  187. XCTAssertNoThrow(try channel.writeInbound(frame))
  188. // Wait for the multiplexer, it _must_ be ready now.
  189. XCTAssertNoThrow(try readyChannelMux.wait())
  190. }
  191. // Go idle. This will shutdown the channel.
  192. try self.waitForStateChange(from: .ready, to: .idle) {
  193. self.loop.advanceTime(by: .minutes(5))
  194. XCTAssertNoThrow(try channel.closeFuture.wait())
  195. }
  196. // Now shutdown.
  197. try self.waitForStateChange(from: .idle, to: .shutdown) {
  198. let shutdown = manager.shutdown()
  199. self.loop.run()
  200. XCTAssertNoThrow(try shutdown.wait())
  201. }
  202. }
  203. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  204. let channelPromise = self.loop.makePromise(of: Channel.self)
  205. let manager = ConnectionManager.testingOnly(
  206. configuration: self.defaultConfiguration,
  207. logger: self.logger
  208. ) {
  209. channelPromise.futureResult
  210. }
  211. // Start the connection.
  212. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  213. .waitForStateChange(from: .idle, to: .connecting) {
  214. let readyChannelMux = manager.getHTTP2Multiplexer()
  215. self.loop.run()
  216. return readyChannelMux
  217. }
  218. // Setup the channel.
  219. let channel = EmbeddedChannel(loop: self.loop)
  220. let h2mux = HTTP2StreamMultiplexer(
  221. mode: .client,
  222. channel: channel,
  223. inboundStreamInitializer: nil
  224. )
  225. try channel.pipeline.addHandler(
  226. GRPCIdleHandler(
  227. connectionManager: manager,
  228. multiplexer: h2mux,
  229. idleTimeout: .minutes(5),
  230. keepalive: .init(),
  231. logger: self.logger
  232. )
  233. ).wait()
  234. channelPromise.succeed(channel)
  235. XCTAssertNoThrow(
  236. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  237. .wait()
  238. )
  239. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  240. try self.waitForStateChange(from: .connecting, to: .ready) {
  241. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  242. XCTAssertNoThrow(try channel.writeInbound(frame))
  243. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  244. XCTAssertNoThrow(try readyChannelMux.wait())
  245. }
  246. // "create" a stream; the details don't matter here.
  247. let streamCreated = NIOHTTP2StreamCreatedEvent(
  248. streamID: 1,
  249. localInitialWindowSize: nil,
  250. remoteInitialWindowSize: nil
  251. )
  252. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  253. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  254. self.loop.advanceTime(by: .minutes(5))
  255. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  256. self.waitForStateChange(from: .ready, to: .idle) {
  257. // Close the stream.
  258. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  259. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  260. // ... wait for the idle timeout,
  261. self.loop.advanceTime(by: .minutes(5))
  262. }
  263. // Now shutdown.
  264. try self.waitForStateChange(from: .idle, to: .shutdown) {
  265. let shutdown = manager.shutdown()
  266. self.loop.run()
  267. XCTAssertNoThrow(try shutdown.wait())
  268. }
  269. }
  270. func testConnectAndThenBecomeInactive() throws {
  271. let channelPromise = self.loop.makePromise(of: Channel.self)
  272. let manager = ConnectionManager.testingOnly(
  273. configuration: self.defaultConfiguration,
  274. logger: self.logger
  275. ) {
  276. channelPromise.futureResult
  277. }
  278. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  279. .waitForStateChange(from: .idle, to: .connecting) {
  280. let readyChannelMux = manager.getHTTP2Multiplexer()
  281. self.loop.run()
  282. return readyChannelMux
  283. }
  284. // Setup the channel.
  285. let channel = EmbeddedChannel(loop: self.loop)
  286. let h2mux = HTTP2StreamMultiplexer(
  287. mode: .client,
  288. channel: channel,
  289. inboundStreamInitializer: nil
  290. )
  291. try channel.pipeline.addHandler(
  292. GRPCIdleHandler(
  293. connectionManager: manager,
  294. multiplexer: h2mux,
  295. idleTimeout: .minutes(5),
  296. keepalive: .init(),
  297. logger: self.logger
  298. )
  299. ).wait()
  300. channelPromise.succeed(channel)
  301. XCTAssertNoThrow(
  302. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  303. .wait()
  304. )
  305. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  306. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  307. let shutdown = manager.shutdown()
  308. self.loop.run()
  309. XCTAssertNoThrow(try shutdown.wait())
  310. }
  311. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  312. // the `readyChannelMux` should error.
  313. XCTAssertThrowsError(try readyChannelMux.wait())
  314. }
  315. func testConnectOnSecondAttempt() throws {
  316. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  317. let channelFutures: [EventLoopFuture<Channel>] = [
  318. self.loop.makeFailedFuture(DoomedChannelError()),
  319. channelPromise.futureResult,
  320. ]
  321. var channelFutureIterator = channelFutures.makeIterator()
  322. var configuration = self.defaultConfiguration
  323. configuration.connectionBackoff = .oneSecondFixed
  324. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  325. guard let next = channelFutureIterator.next() else {
  326. XCTFail("Too many channels requested")
  327. return self.loop.makeFailedFuture(DoomedChannelError())
  328. }
  329. return next
  330. }
  331. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  332. Change(from: .idle, to: .connecting),
  333. Change(from: .connecting, to: .transientFailure),
  334. ]) {
  335. // Get a HTTP/2 stream multiplexer.
  336. let readyChannelMux = manager.getHTTP2Multiplexer()
  337. self.loop.run()
  338. return readyChannelMux
  339. }
  340. // Get a HTTP/2 stream mux from the manager - it is a future for the one we made earlier.
  341. let anotherReadyChannelMux = manager.getHTTP2Multiplexer()
  342. self.loop.run()
  343. // Move time forwards by a second to start the next connection attempt.
  344. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  345. self.loop.advanceTime(by: .seconds(1))
  346. }
  347. // Setup the actual channel and complete the promise.
  348. let channel = EmbeddedChannel(loop: self.loop)
  349. let h2mux = HTTP2StreamMultiplexer(
  350. mode: .client,
  351. channel: channel,
  352. inboundStreamInitializer: nil
  353. )
  354. try channel.pipeline.addHandler(
  355. GRPCIdleHandler(
  356. connectionManager: manager,
  357. multiplexer: h2mux,
  358. idleTimeout: .minutes(5),
  359. keepalive: .init(),
  360. logger: self.logger
  361. )
  362. ).wait()
  363. channelPromise.succeed(channel)
  364. XCTAssertNoThrow(
  365. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  366. .wait()
  367. )
  368. // Write a SETTINGS frame on the root stream.
  369. try self.waitForStateChange(from: .connecting, to: .ready) {
  370. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  371. XCTAssertNoThrow(try channel.writeInbound(frame))
  372. }
  373. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  374. XCTAssertNoThrow(try readyChannelMux.wait())
  375. XCTAssertNoThrow(try anotherReadyChannelMux.wait())
  376. // Now shutdown.
  377. try self.waitForStateChange(from: .ready, to: .shutdown) {
  378. let shutdown = manager.shutdown()
  379. self.loop.run()
  380. XCTAssertNoThrow(try shutdown.wait())
  381. }
  382. }
  383. func testShutdownWhileConnecting() throws {
  384. let channelPromise = self.loop.makePromise(of: Channel.self)
  385. let manager = ConnectionManager.testingOnly(
  386. configuration: self.defaultConfiguration,
  387. logger: self.logger
  388. ) {
  389. channelPromise.futureResult
  390. }
  391. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  392. .waitForStateChange(from: .idle, to: .connecting) {
  393. let readyChannelMux = manager.getHTTP2Multiplexer()
  394. self.loop.run()
  395. return readyChannelMux
  396. }
  397. // Now shutdown.
  398. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  399. let shutdown = manager.shutdown()
  400. self.loop.run()
  401. XCTAssertNoThrow(try shutdown.wait())
  402. }
  403. // The multiplexer we were requesting should fail.
  404. XCTAssertThrowsError(try readyChannelMux.wait())
  405. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  406. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  407. let channel = try channelPromise.futureResult.wait()
  408. self.loop.run()
  409. XCTAssertNoThrow(try channel.closeFuture.wait())
  410. }
  411. func testShutdownWhileTransientFailure() throws {
  412. var configuration = self.defaultConfiguration
  413. configuration.connectionBackoff = .oneSecondFixed
  414. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  415. self.loop.makeFailedFuture(DoomedChannelError())
  416. }
  417. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  418. Change(from: .idle, to: .connecting),
  419. Change(from: .connecting, to: .transientFailure),
  420. ]) {
  421. // Get a HTTP/2 stream multiplexer.
  422. let readyChannelMux = manager.getHTTP2Multiplexer()
  423. self.loop.run()
  424. return readyChannelMux
  425. }
  426. // Now shutdown.
  427. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  428. let shutdown = manager.shutdown()
  429. self.loop.run()
  430. XCTAssertNoThrow(try shutdown.wait())
  431. }
  432. // The HTTP/2 stream mux we were requesting should fail.
  433. XCTAssertThrowsError(try readyChannelMux.wait())
  434. }
  435. func testShutdownWhileActive() throws {
  436. let channelPromise = self.loop.makePromise(of: Channel.self)
  437. let manager = ConnectionManager.testingOnly(
  438. configuration: self.defaultConfiguration,
  439. logger: self.logger
  440. ) {
  441. channelPromise.futureResult
  442. }
  443. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  444. .waitForStateChange(from: .idle, to: .connecting) {
  445. let readyChannelMux = manager.getHTTP2Multiplexer()
  446. self.loop.run()
  447. return readyChannelMux
  448. }
  449. // Prepare the channel
  450. let channel = EmbeddedChannel(loop: self.loop)
  451. let h2mux = HTTP2StreamMultiplexer(
  452. mode: .client,
  453. channel: channel,
  454. inboundStreamInitializer: nil
  455. )
  456. try channel.pipeline.addHandler(
  457. GRPCIdleHandler(
  458. connectionManager: manager,
  459. multiplexer: h2mux,
  460. idleTimeout: .minutes(5),
  461. keepalive: .init(),
  462. logger: self.logger
  463. )
  464. ).wait()
  465. channelPromise.succeed(channel)
  466. XCTAssertNoThrow(
  467. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  468. .wait()
  469. )
  470. // (No state change expected here: active is an internal state.)
  471. // Now shutdown.
  472. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  473. let shutdown = manager.shutdown()
  474. self.loop.run()
  475. XCTAssertNoThrow(try shutdown.wait())
  476. }
  477. // The HTTP/2 stream multiplexer we were requesting should fail.
  478. XCTAssertThrowsError(try readyChannelMux.wait())
  479. }
  480. func testShutdownWhileShutdown() throws {
  481. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  482. try self.waitForStateChange(from: .idle, to: .shutdown) {
  483. let firstShutdown = manager.shutdown()
  484. self.loop.run()
  485. XCTAssertNoThrow(try firstShutdown.wait())
  486. }
  487. let secondShutdown = manager.shutdown()
  488. self.loop.run()
  489. XCTAssertNoThrow(try secondShutdown.wait())
  490. }
  491. func testTransientFailureWhileActive() throws {
  492. var configuration = self.defaultConfiguration
  493. configuration.connectionBackoff = .oneSecondFixed
  494. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  495. let channelFutures: [EventLoopFuture<Channel>] = [
  496. channelPromise.futureResult,
  497. self.loop.makeFailedFuture(DoomedChannelError()),
  498. ]
  499. var channelFutureIterator = channelFutures.makeIterator()
  500. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  501. guard let next = channelFutureIterator.next() else {
  502. XCTFail("Too many channels requested")
  503. return self.loop.makeFailedFuture(DoomedChannelError())
  504. }
  505. return next
  506. }
  507. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  508. .waitForStateChange(from: .idle, to: .connecting) {
  509. let readyChannelMux = manager.getHTTP2Multiplexer()
  510. self.loop.run()
  511. return readyChannelMux
  512. }
  513. // Prepare the channel
  514. let firstChannel = EmbeddedChannel(loop: self.loop)
  515. let h2mux = HTTP2StreamMultiplexer(
  516. mode: .client,
  517. channel: firstChannel,
  518. inboundStreamInitializer: nil
  519. )
  520. try firstChannel.pipeline.addHandler(
  521. GRPCIdleHandler(
  522. connectionManager: manager,
  523. multiplexer: h2mux,
  524. idleTimeout: .minutes(5),
  525. keepalive: .init(),
  526. logger: self.logger
  527. )
  528. ).wait()
  529. channelPromise.succeed(firstChannel)
  530. XCTAssertNoThrow(
  531. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  532. .wait()
  533. )
  534. // (No state change expected here: active is an internal state.)
  535. // Close the channel (simulate e.g. TLS handshake failed)
  536. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  537. XCTAssertNoThrow(try firstChannel.close().wait())
  538. }
  539. // Start connecting again.
  540. self.waitForStateChanges([
  541. Change(from: .transientFailure, to: .connecting),
  542. Change(from: .connecting, to: .transientFailure),
  543. ]) {
  544. self.loop.advanceTime(by: .seconds(1))
  545. }
  546. // Now shutdown
  547. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  548. let shutdown = manager.shutdown()
  549. self.loop.run()
  550. XCTAssertNoThrow(try shutdown.wait())
  551. }
  552. // The channel never came up: it should be throw.
  553. XCTAssertThrowsError(try readyChannelMux.wait())
  554. }
  555. func testTransientFailureWhileReady() throws {
  556. var configuration = self.defaultConfiguration
  557. configuration.connectionBackoff = .oneSecondFixed
  558. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  559. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  560. let channelFutures: [EventLoopFuture<Channel>] = [
  561. firstChannelPromise.futureResult,
  562. secondChannelPromise.futureResult,
  563. ]
  564. var channelFutureIterator = channelFutures.makeIterator()
  565. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  566. guard let next = channelFutureIterator.next() else {
  567. XCTFail("Too many channels requested")
  568. return self.loop.makeFailedFuture(DoomedChannelError())
  569. }
  570. return next
  571. }
  572. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  573. .waitForStateChange(from: .idle, to: .connecting) {
  574. let readyChannelMux = manager.getHTTP2Multiplexer()
  575. self.loop.run()
  576. return readyChannelMux
  577. }
  578. // Prepare the first channel
  579. let firstChannel = EmbeddedChannel(loop: self.loop)
  580. let firstH2mux = HTTP2StreamMultiplexer(
  581. mode: .client,
  582. channel: firstChannel,
  583. inboundStreamInitializer: nil
  584. )
  585. try firstChannel.pipeline.addHandler(
  586. GRPCIdleHandler(
  587. connectionManager: manager,
  588. multiplexer: firstH2mux,
  589. idleTimeout: .minutes(5),
  590. keepalive: .init(),
  591. logger: self.logger
  592. )
  593. ).wait()
  594. firstChannelPromise.succeed(firstChannel)
  595. XCTAssertNoThrow(
  596. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  597. .wait()
  598. )
  599. // Write a SETTINGS frame on the root stream.
  600. try self.waitForStateChange(from: .connecting, to: .ready) {
  601. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  602. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  603. }
  604. // Channel should now be ready.
  605. XCTAssertNoThrow(try readyChannelMux.wait())
  606. // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
  607. let streamCreated = NIOHTTP2StreamCreatedEvent(
  608. streamID: 1,
  609. localInitialWindowSize: nil,
  610. remoteInitialWindowSize: nil
  611. )
  612. firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
  613. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  614. XCTAssertNoThrow(try firstChannel.close().wait())
  615. }
  616. // Run to start connecting again.
  617. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  618. self.loop.advanceTime(by: .seconds(1))
  619. }
  620. // Prepare the second channel
  621. let secondChannel = EmbeddedChannel(loop: self.loop)
  622. let secondH2mux = HTTP2StreamMultiplexer(
  623. mode: .client,
  624. channel: secondChannel,
  625. inboundStreamInitializer: nil
  626. )
  627. try secondChannel.pipeline.addHandler(
  628. GRPCIdleHandler(
  629. connectionManager: manager,
  630. multiplexer: secondH2mux,
  631. idleTimeout: .minutes(5),
  632. keepalive: .init(),
  633. logger: self.logger
  634. )
  635. ).wait()
  636. secondChannelPromise.succeed(secondChannel)
  637. XCTAssertNoThrow(
  638. try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  639. .wait()
  640. )
  641. // Write a SETTINGS frame on the root stream.
  642. try self.waitForStateChange(from: .connecting, to: .ready) {
  643. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  644. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  645. }
  646. // Now shutdown
  647. try self.waitForStateChange(from: .ready, to: .shutdown) {
  648. let shutdown = manager.shutdown()
  649. self.loop.run()
  650. XCTAssertNoThrow(try shutdown.wait())
  651. }
  652. }
  653. func testGoAwayWhenReady() throws {
  654. let channelPromise = self.loop.makePromise(of: Channel.self)
  655. let manager = ConnectionManager.testingOnly(
  656. configuration: self.defaultConfiguration,
  657. logger: self.logger
  658. ) {
  659. channelPromise.futureResult
  660. }
  661. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  662. .waitForStateChange(from: .idle, to: .connecting) {
  663. let readyChannelMux = manager.getHTTP2Multiplexer()
  664. self.loop.run()
  665. return readyChannelMux
  666. }
  667. // Setup the channel.
  668. let channel = EmbeddedChannel(loop: self.loop)
  669. let h2mux = HTTP2StreamMultiplexer(
  670. mode: .client,
  671. channel: channel,
  672. inboundStreamInitializer: nil
  673. )
  674. try channel.pipeline.addHandler(
  675. GRPCIdleHandler(
  676. connectionManager: manager,
  677. multiplexer: h2mux,
  678. idleTimeout: .minutes(5),
  679. keepalive: .init(),
  680. logger: self.logger
  681. )
  682. ).wait()
  683. channelPromise.succeed(channel)
  684. XCTAssertNoThrow(
  685. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  686. .wait()
  687. )
  688. try self.waitForStateChange(from: .connecting, to: .ready) {
  689. // Write a SETTINGS frame on the root stream.
  690. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  691. XCTAssertNoThrow(try channel.writeInbound(frame))
  692. }
  693. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  694. XCTAssertNoThrow(try readyChannelMux.wait())
  695. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  696. // channel to close.
  697. try self.waitForStateChange(from: .ready, to: .idle) {
  698. let goAway = HTTP2Frame(
  699. streamID: .rootStream,
  700. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  701. )
  702. XCTAssertNoThrow(try channel.writeInbound(goAway))
  703. }
  704. self.loop.run()
  705. XCTAssertNoThrow(try channel.closeFuture.wait())
  706. // Now shutdown
  707. try self.waitForStateChange(from: .idle, to: .shutdown) {
  708. let shutdown = manager.shutdown()
  709. self.loop.run()
  710. XCTAssertNoThrow(try shutdown.wait())
  711. }
  712. }
  713. func testDoomedOptimisticChannelFromIdle() {
  714. var configuration = self.defaultConfiguration
  715. configuration.callStartBehavior = .fastFailure
  716. let manager = ConnectionManager.testingOnly(
  717. configuration: configuration,
  718. logger: self.logger
  719. ) {
  720. self.loop.makeFailedFuture(DoomedChannelError())
  721. }
  722. let candidate = manager.getHTTP2Multiplexer()
  723. self.loop.run()
  724. XCTAssertThrowsError(try candidate.wait())
  725. }
  726. func testDoomedOptimisticChannelFromConnecting() throws {
  727. var configuration = self.defaultConfiguration
  728. configuration.callStartBehavior = .fastFailure
  729. let promise = self.loop.makePromise(of: Channel.self)
  730. let manager = ConnectionManager.testingOnly(
  731. configuration: configuration,
  732. logger: self.logger
  733. ) {
  734. promise.futureResult
  735. }
  736. self.waitForStateChange(from: .idle, to: .connecting) {
  737. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  738. _ = manager.getHTTP2Multiplexer()
  739. self.loop.run()
  740. }
  741. // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
  742. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  743. self.loop.run()
  744. // Fail the promise.
  745. promise.fail(DoomedChannelError())
  746. XCTAssertThrowsError(try optimisticChannelMux.wait())
  747. }
  748. func testOptimisticChannelFromTransientFailure() throws {
  749. var configuration = self.defaultConfiguration
  750. configuration.callStartBehavior = .fastFailure
  751. configuration.connectionBackoff = ConnectionBackoff()
  752. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  753. self.loop.makeFailedFuture(DoomedChannelError())
  754. }
  755. self.waitForStateChanges([
  756. Change(from: .idle, to: .connecting),
  757. Change(from: .connecting, to: .transientFailure),
  758. ]) {
  759. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  760. _ = manager.getHTTP2Multiplexer()
  761. self.loop.run()
  762. }
  763. // Now we're sitting in transient failure. Get a HTTP/2 stream mux optimistically - selected in config.
  764. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  765. self.loop.run()
  766. XCTAssertThrowsError(try optimisticChannelMux.wait()) { error in
  767. XCTAssertTrue(error is DoomedChannelError)
  768. }
  769. }
  770. func testOptimisticChannelFromShutdown() throws {
  771. var configuration = self.defaultConfiguration
  772. configuration.callStartBehavior = .fastFailure
  773. let manager = ConnectionManager.testingOnly(
  774. configuration: configuration,
  775. logger: self.logger
  776. ) {
  777. self.loop.makeFailedFuture(DoomedChannelError())
  778. }
  779. let shutdown = manager.shutdown()
  780. self.loop.run()
  781. XCTAssertNoThrow(try shutdown.wait())
  782. // Get a channel optimistically. It'll fail, obviously.
  783. let channelMux = manager.getHTTP2Multiplexer()
  784. self.loop.run()
  785. XCTAssertThrowsError(try channelMux.wait())
  786. }
  787. func testForceIdleAfterInactive() throws {
  788. let channelPromise = self.loop.makePromise(of: Channel.self)
  789. let manager = ConnectionManager.testingOnly(
  790. configuration: self.defaultConfiguration,
  791. logger: self.logger
  792. ) {
  793. channelPromise.futureResult
  794. }
  795. // Start the connection.
  796. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
  797. from: .idle,
  798. to: .connecting
  799. ) {
  800. let readyChannelMux = manager.getHTTP2Multiplexer()
  801. self.loop.run()
  802. return readyChannelMux
  803. }
  804. // Setup the real channel and activate it.
  805. let channel = EmbeddedChannel(loop: self.loop)
  806. let h2mux = HTTP2StreamMultiplexer(
  807. mode: .client,
  808. channel: channel,
  809. inboundStreamInitializer: nil
  810. )
  811. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  812. GRPCIdleHandler(
  813. connectionManager: manager,
  814. multiplexer: h2mux,
  815. idleTimeout: .minutes(5),
  816. keepalive: .init(),
  817. logger: self.logger
  818. ),
  819. ]).wait())
  820. channelPromise.succeed(channel)
  821. self.loop.run()
  822. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  823. XCTAssertNoThrow(try connect.wait())
  824. // Write a SETTINGS frame on the root stream.
  825. try self.waitForStateChange(from: .connecting, to: .ready) {
  826. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  827. XCTAssertNoThrow(try channel.writeInbound(frame))
  828. }
  829. // The channel should now be ready.
  830. XCTAssertNoThrow(try readyChannelMux.wait())
  831. // Now drop the connection.
  832. try self.waitForStateChange(from: .ready, to: .shutdown) {
  833. let shutdown = manager.shutdown()
  834. self.loop.run()
  835. XCTAssertNoThrow(try shutdown.wait())
  836. }
  837. }
  838. func testCloseWithoutActiveRPCs() throws {
  839. let channelPromise = self.loop.makePromise(of: Channel.self)
  840. let manager = ConnectionManager.testingOnly(
  841. configuration: self.defaultConfiguration,
  842. logger: self.logger
  843. ) {
  844. channelPromise.futureResult
  845. }
  846. // Start the connection.
  847. let readyChannelMux = self.waitForStateChange(
  848. from: .idle,
  849. to: .connecting
  850. ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
  851. let readyChannelMux = manager.getHTTP2Multiplexer()
  852. self.loop.run()
  853. return readyChannelMux
  854. }
  855. // Setup the actual channel and activate it.
  856. let channel = EmbeddedChannel(loop: self.loop)
  857. let h2mux = HTTP2StreamMultiplexer(
  858. mode: .client,
  859. channel: channel,
  860. inboundStreamInitializer: nil
  861. )
  862. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  863. GRPCIdleHandler(
  864. connectionManager: manager,
  865. multiplexer: h2mux,
  866. idleTimeout: .minutes(5),
  867. keepalive: .init(),
  868. logger: self.logger
  869. ),
  870. ]).wait())
  871. channelPromise.succeed(channel)
  872. self.loop.run()
  873. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  874. XCTAssertNoThrow(try connect.wait())
  875. // "ready" the connection.
  876. try self.waitForStateChange(from: .connecting, to: .ready) {
  877. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  878. XCTAssertNoThrow(try channel.writeInbound(frame))
  879. }
  880. // The HTTP/2 stream multiplexer should now be ready.
  881. XCTAssertNoThrow(try readyChannelMux.wait())
  882. // Close the channel. There are no active RPCs so we should idle rather than be in the transient
  883. // failure state.
  884. self.waitForStateChange(from: .ready, to: .idle) {
  885. channel.pipeline.fireChannelInactive()
  886. }
  887. }
  888. func testIdleErrorDoesNothing() throws {
  889. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  890. // Dropping an error on this manager should be fine.
  891. manager.channelError(DoomedChannelError())
  892. // Shutting down is then safe.
  893. try self.waitForStateChange(from: .idle, to: .shutdown) {
  894. let shutdown = manager.shutdown()
  895. self.loop.run()
  896. XCTAssertNoThrow(try shutdown.wait())
  897. }
  898. }
  899. }
  900. internal struct Change: Hashable, CustomStringConvertible {
  901. var from: ConnectivityState
  902. var to: ConnectivityState
  903. var description: String {
  904. return "\(self.from) → \(self.to)"
  905. }
  906. }
  907. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  908. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  909. private let semaphore = DispatchSemaphore(value: 0)
  910. private var expectation: Expectation = .noExpectation
  911. private let quiescingSemaphore = DispatchSemaphore(value: 0)
  912. private enum Expectation {
  913. /// We have no expectation of any changes. We'll just ignore any changes.
  914. case noExpectation
  915. /// We expect one change.
  916. case one((Change) -> Void)
  917. /// We expect 'count' changes.
  918. case some(count: Int, recorded: [Change], ([Change]) -> Void)
  919. var count: Int {
  920. switch self {
  921. case .noExpectation:
  922. return 0
  923. case .one:
  924. return 1
  925. case let .some(count, _, _):
  926. return count
  927. }
  928. }
  929. }
  930. func connectivityStateDidChange(from oldState: ConnectivityState,
  931. to newState: ConnectivityState) {
  932. self.serialQueue.async {
  933. switch self.expectation {
  934. case let .one(verify):
  935. // We don't care about future changes.
  936. self.expectation = .noExpectation
  937. // Verify and notify.
  938. verify(Change(from: oldState, to: newState))
  939. self.semaphore.signal()
  940. case .some(let count, var recorded, let verify):
  941. recorded.append(Change(from: oldState, to: newState))
  942. if recorded.count == count {
  943. // We don't care about future changes.
  944. self.expectation = .noExpectation
  945. // Verify and notify.
  946. verify(recorded)
  947. self.semaphore.signal()
  948. } else {
  949. // Still need more responses.
  950. self.expectation = .some(count: count, recorded: recorded, verify)
  951. }
  952. case .noExpectation:
  953. // Ignore any changes.
  954. ()
  955. }
  956. }
  957. }
  958. func connectionStartedQuiescing() {
  959. self.serialQueue.async {
  960. self.quiescingSemaphore.signal()
  961. }
  962. }
  963. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
  964. self.serialQueue.async {
  965. self.expectation = .some(count: count, recorded: [], verify)
  966. }
  967. }
  968. func expectChange(verify: @escaping (Change) -> Void) {
  969. self.serialQueue.async {
  970. self.expectation = .one(verify)
  971. }
  972. }
  973. func waitForExpectedChanges(
  974. timeout: DispatchTimeInterval,
  975. file: StaticString = #file,
  976. line: UInt = #line
  977. ) {
  978. let result = self.semaphore.wait(timeout: .now() + timeout)
  979. switch result {
  980. case .success:
  981. ()
  982. case .timedOut:
  983. XCTFail(
  984. "Timed out before verifying \(self.expectation.count) change(s)",
  985. file: file, line: line
  986. )
  987. }
  988. }
  989. func waitForQuiescing(timeout: DispatchTimeInterval) {
  990. let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
  991. switch result {
  992. case .success:
  993. ()
  994. case .timedOut:
  995. XCTFail("Timed out waiting for connection to start quiescing")
  996. }
  997. }
  998. }
  999. private extension ConnectionBackoff {
  1000. static let oneSecondFixed = ConnectionBackoff(
  1001. initialBackoff: 1.0,
  1002. maximumBackoff: 1.0,
  1003. multiplier: 1.0,
  1004. jitter: 0.0
  1005. )
  1006. }
  1007. private struct DoomedChannelError: Error {}