ConnectionManagerTests.swift 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  /// Tests which drive a `ConnectionManager` on an `EmbeddedEventLoop` and verify the connectivity
  /// state changes observed by a `RecordingConnectivityDelegate`.
  class ConnectionManagerTests: GRPCTestCase {
    /// The event loop on which every test runs; time is advanced manually by the tests.
    private let loop = EmbeddedEventLoop()

    /// Records connectivity state changes so that tests can assert on them.
    private let recorder = RecordingConnectivityDelegate()

    /// The base configuration used by the tests. It is built with a `nil` connection backoff;
    /// tests which need a backoff set one on a copy.
    private var defaultConfiguration: ClientConnection.Configuration {
      return ClientConnection.Configuration(
        target: .unixDomainSocket("/ignored"),
        eventLoopGroup: self.loop,
        connectivityStateDelegate: self.recorder,
        connectionBackoff: nil,
        backgroundActivityLogger: self.clientLogger
      )
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
      super.tearDown()
    }

    /// Runs `body` and waits for exactly one state change, asserting that it is `from` -> `to`.
    ///
    /// - Parameters:
    ///   - from: The expected state before the change.
    ///   - to: The expected state after the change.
    ///   - timeout: How long to wait for the change after `body` has run.
    ///   - file: The file to attribute failures to; defaults to the caller's file.
    ///   - line: The line to attribute failures to; defaults to the caller's line.
    ///   - body: The work expected to trigger the state change.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`.
    private func waitForStateChange<Result>(
      from: ConnectivityState,
      to: ConnectivityState,
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChange {
        XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }

    /// Runs `body` and waits for `changes.count` state changes, asserting that they equal
    /// `changes`.
    ///
    /// - Parameters:
    ///   - changes: The expected state changes.
    ///   - timeout: How long to wait for the changes after `body` has run.
    ///   - file: The file to attribute failures to; defaults to the caller's file.
    ///   - line: The line to attribute failures to; defaults to the caller's line.
    ///   - body: The work expected to trigger the state changes.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`.
    private func waitForStateChanges<Result>(
      _ changes: [Change],
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChanges(changes.count) {
        // Forward the caller's location so a mismatch is reported against the test which
        // requested the changes rather than against this helper.
        XCTAssertEqual($0, changes, file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }
  }
  67. extension ConnectionManagerTests {
  /// Shutting down a manager which never connected should move it from idle to shutdown, after
  /// which requesting a multiplexer fails.
  func testIdleShutdown() throws {
    let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The manager is shut down: asking for a multiplexer must fail.
    let multiplexerFuture = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  /// With the default configuration, a failed connection attempt should take the manager from
  /// connecting to shutdown and fail the pending multiplexer with the connection error.
  func testConnectFromIdleFailsWithNoReconnect() {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Fail the connection attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      pendingChannel.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try multiplexer.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Walks the manager through idle -> connecting -> ready and then shuts it down.
  func testConnectAndDisconnect() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Kick off the connection attempt; the multiplexer itself is not needed here.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Build the channel the manager is waiting for, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The channel is ready: shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// A ready connection should go idle, closing its channel, once the five minute idle timeout
  /// elapses; the manager can then be shut down from idle.
  func testConnectAndIdle() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Kick off the connection attempt.
    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel the manager is waiting for, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
      // The multiplexer requested earlier _must_ be available by now.
      XCTAssertNoThrow(try readyMultiplexer.wait())
    }

    // Let the idle timeout elapse: the connection goes idle and the channel is closed.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try channel.closeFuture.wait())
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// The idle timeout should not idle the connection while a stream is open; once the stream
  /// closes, a further idle timeout should move the connection to idle.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Kick off the connection attempt.
    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel the manager is waiting for, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
      // The multiplexer requested earlier _must_ be available by now.
      XCTAssertNoThrow(try readyMultiplexer.wait())
    }

    // Pretend a stream was opened; only the event matters, not its details.
    let streamOpened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    channel.pipeline.fireUserInboundEventTriggered(streamOpened)

    // Let the idle timeout elapse: with a stream open this must _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Close the stream, then let the idle timeout elapse again: now we expect to go idle.
    self.waitForStateChange(from: .ready, to: .idle) {
      let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamClosed)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down after the channel has connected but before it became ready should move the
  /// manager from connecting to shutdown and fail the pending multiplexer.
  func testConnectAndThenBecomeInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel the manager is waiting for, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut down now, while the multiplexer future is still pending.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No channel became ready and no reconnect is configured: the manager is shut down and the
    // pending multiplexer must fail.
    XCTAssertThrowsError(try readyMultiplexer.wait())
  }
  /// With a one second fixed backoff, a failed first attempt should be followed by a second
  /// attempt which succeeds and satisfies every multiplexer requested in the meantime.
  func testConnectOnSecondAttempt() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)

    // The first attempt fails immediately; the second completes when we fulfil the promise.
    let attempts: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      pendingChannel.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      guard let attempt = remainingAttempts.next() else {
        XCTFail("Too many channels requested")
        return self.loop.makeFailedFuture(DoomedChannelError())
      }
      return attempt
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChanges(expectedChanges) {
        // Requesting a multiplexer starts the (doomed) first attempt.
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Ask again: this is a future for the multiplexer we are already waiting on.
    let secondReadyMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Advance by the one second backoff to begin the next attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the real channel and complete the promise with it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // Both requested multiplexers _must_ be available by now.
    XCTAssertNoThrow(try readyMultiplexer.wait())
    XCTAssertNoThrow(try secondReadyMultiplexer.wait())

    // Finally, shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down while a connection attempt is in flight should fail the pending multiplexer;
  /// a channel which arrives afterwards should be closed.
  func testShutdownWhileConnecting() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Shut down before the connection attempt has completed.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer we asked for can never arrive.
    XCTAssertThrowsError(try readyMultiplexer.wait())

    // The channel promise is still outstanding: if it succeeds now, that channel should be closed
    // as well.
    pendingChannel.succeed(EmbeddedChannel(loop: self.loop))
    let lateChannel = try pendingChannel.futureResult.wait()
    self.loop.run()
    XCTAssertNoThrow(try lateChannel.closeFuture.wait())
  }
  /// Shutting down while in transient failure (waiting out the backoff) should move the manager
  /// to shutdown and fail the pending multiplexer.
  func testShutdownWhileTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // Every connection attempt fails.
    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChanges(expectedChanges) {
        // Requesting a multiplexer starts the (doomed) connection attempt.
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Shut down while sitting in transient failure.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer we asked for can never arrive.
    XCTAssertThrowsError(try readyMultiplexer.wait())
  }
  /// Shutting down once the channel is active, but before it is ready, should be reported as
  /// connecting -> shutdown and should fail the pending multiplexer.
  func testShutdownWhileActive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel the manager is waiting for, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no state change is expected for it.)

    // Shut down.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer we asked for can never arrive.
    XCTAssertThrowsError(try readyMultiplexer.wait())
  }
  /// Shutting down a manager which is already shut down should still succeed.
  func testShutdownWhileShutdown() throws {
    let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let initialShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try initialShutdown.wait())
    }

    // Shut down a second time; no state change is awaited for this one.
    let repeatedShutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try repeatedShutdown.wait())
  }
  /// If an active (but not yet ready) channel closes while a backoff is configured, the manager
  /// should enter transient failure and retry after the backoff; here the retry also fails.
  func testTransientFailureWhileActive() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let pendingChannel = self.loop.makePromise(of: Channel.self)

    // The first attempt completes when we fulfil the promise; the second fails immediately.
    let attempts: [EventLoopFuture<Channel>] = [
      pendingChannel.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      guard let attempt = remainingAttempts.next() else {
        XCTFail("Too many channels requested")
        return self.loop.makeFailedFuture(DoomedChannelError())
      }
      return attempt
    }

    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the first channel, then activate it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try firstChannel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no state change is expected for it.)

    // Close the channel, simulating e.g. a failed TLS handshake.
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // After the backoff the manager reconnects; that attempt fails straight away.
    let retryChanges = [
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(retryChanges) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down from transient failure.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No channel ever became ready, so waiting for the multiplexer should throw.
    XCTAssertThrowsError(try readyMultiplexer.wait())
  }
  /// If a ready channel with an open stream closes while a backoff is configured, the manager
  /// should enter transient failure, reconnect after the backoff and become ready again.
  func testTransientFailureWhileReady() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let firstPendingChannel = self.loop.makePromise(of: Channel.self)
    let secondPendingChannel = self.loop.makePromise(of: Channel.self)

    // One pending channel per expected connection attempt.
    let attempts: [EventLoopFuture<Channel>] = [
      firstPendingChannel.futureResult,
      secondPendingChannel.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      guard let attempt = remainingAttempts.next() else {
        XCTFail("Too many channels requested")
        return self.loop.makeFailedFuture(DoomedChannelError())
      }
      return attempt
    }

    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the first channel, then activate it.
    let firstChannel = EmbeddedChannel(loop: self.loop)
    let firstMultiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: firstChannel,
      inboundStreamInitializer: nil
    )
    let firstIdleHandler = GRPCIdleHandler(
      mode: .client(manager, firstMultiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try firstChannel.pipeline.addHandler(firstIdleHandler).wait()
    firstPendingChannel.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the first channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try firstChannel.writeInbound(settings))
    }

    // The multiplexer requested earlier should be available now.
    XCTAssertNoThrow(try readyMultiplexer.wait())

    // Open a stream before killing the channel so there is an active RPC; without one we would
    // idle instead.
    let streamOpened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    firstChannel.pipeline.fireUserInboundEventTriggered(streamOpened)

    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // Advance by the one second backoff to begin reconnecting.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the second channel, then activate it.
    let secondChannel = EmbeddedChannel(loop: self.loop)
    let secondMultiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: secondChannel,
      inboundStreamInitializer: nil
    )
    let secondIdleHandler = GRPCIdleHandler(
      mode: .client(manager, secondMultiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try secondChannel.pipeline.addHandler(secondIdleHandler).wait()
    secondPendingChannel.succeed(secondChannel)
    XCTAssertNoThrow(
      try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the second channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try secondChannel.writeInbound(settings))
    }

    // Finally, shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Receiving a GOAWAY frame on a ready connection should move it to idle and close the channel.
  func testGoAwayWhenReady() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    let readyMultiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel the manager is waiting for, then activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager, multiplexer),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The multiplexer requested earlier _must_ be available by now.
    XCTAssertNoThrow(try readyMultiplexer.wait())

    // Deliver a GOAWAY (its contents are irrelevant): the connection should go idle and the
    // channel should close.
    try self.waitForStateChange(from: .ready, to: .idle) {
      let goAwayFrame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      XCTAssertNoThrow(try channel.writeInbound(goAwayFrame))
    }

    self.loop.run()
    XCTAssertNoThrow(try channel.closeFuture.wait())

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested from idle should fail when
  /// the connection attempt fails.
  func testDoomedOptimisticChannelFromIdle() {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // Every connection attempt fails.
    let manager = ConnectionManager.testingOnly(
      configuration: configuration,
      logger: self.logger
    ) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let optimisticMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try optimisticMultiplexer.wait())
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested while connecting should fail
  /// when the in-flight connection attempt fails.
  func testDoomedOptimisticChannelFromConnecting() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: configuration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    self.waitForStateChange(from: .idle, to: .connecting) {
      // This request only exists to start a connection attempt; its multiplexer is ignored.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Now that we are connecting, request a multiplexer optimistically, as configured above.
    let optimisticMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Fail the connection attempt.
    pendingChannel.fail(DoomedChannelError())
    XCTAssertThrowsError(try optimisticMultiplexer.wait())
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested while the manager sits in
  /// transient failure must fail with the error produced by the channel provider.
  func testOptimisticChannelFromTransientFailure() throws {
    var fastFailureConfiguration = self.defaultConfiguration
    fastFailureConfiguration.callStartBehavior = .fastFailure
    fastFailureConfiguration.connectionBackoff = ConnectionBackoff()

    let manager = ConnectionManager.testingOnly(
      configuration: fastFailureConfiguration,
      logger: self.logger
    ) {
      self.loop.makeFailedFuture(DoomedChannelError())
    }

    // The first request starts a connection attempt which fails straight away; the multiplexer
    // it returns is deliberately discarded.
    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(expectedChanges) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Now in transient failure: the configuration selects the optimistic multiplexer.
    let doomedMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try doomedMux.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// A multiplexer requested after the manager has been shut down must fail.
  func testOptimisticChannelFromShutdown() throws {
    var fastFailureConfiguration = self.defaultConfiguration
    fastFailureConfiguration.callStartBehavior = .fastFailure

    let manager = ConnectionManager.testingOnly(
      configuration: fastFailureConfiguration,
      logger: self.logger
    ) {
      self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Shut the manager down before asking it for anything.
    let shutdownFuture = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdownFuture.wait())

    // An optimistic request made after shutdown is expected to fail.
    let doomedMux = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try doomedMux.wait())
  }
  /// Regression test: when the channel close triggered by going idle is dropped, the idle
  /// timeout scheduled on the same connection fires later. That second idle used to fail a
  /// precondition.
  func testDoubleIdle() throws {
    /// Outbound handler which fails every close request instead of forwarding it.
    class CloseDroppingHandler: ChannelOutboundHandler {
      typealias OutboundIn = Any

      func close(
        context: ChannelHandlerContext,
        mode: CloseMode,
        promise: EventLoopPromise<Void>?
      ) {
        promise?.fail(
          GRPCStatus(code: .unavailable, message: "Purposefully dropping channel close")
        )
      }
    }

    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      pendingChannel.futureResult
    }

    // Begin connecting by asking for a multiplexer.
    let muxFuture = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the real channel: the close-dropping handler followed by the idle handler.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        CloseDroppingHandler(),
        GRPCIdleHandler(
          mode: .client(manager, multiplexer),
          logger: manager.logger,
          idleTimeout: .minutes(5)
        ),
      ]).wait()
    )

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(channel)
    self.loop.run()
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream moves the connection to ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The multiplexer requested earlier should now be available.
    XCTAssertNoThrow(try muxFuture.wait())

    // A GO_AWAY (its details are irrelevant) idles the connection and asks the channel to
    // close.
    try self.waitForStateChange(from: .ready, to: .idle) {
      let goAwayFrame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      XCTAssertNoThrow(try channel.writeInbound(goAwayFrame))
    }

    // The close was dropped, so let the scheduled idle timeout fire.
    //
    // Previously doing this would fail a precondition.
    self.loop.advanceTime(by: .minutes(5))
  }
  /// Regression test: a `ConnectionIdledEvent` delivered after the manager has been shut down
  /// should be a no-op. It used to hit a precondition failure.
  func testForceIdleAfterInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      pendingChannel.futureResult
    }

    // Begin connecting by asking for a multiplexer.
    let muxFuture = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the real channel with an idle handler attached.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        GRPCIdleHandler(
          mode: .client(manager, multiplexer),
          logger: manager.logger,
          idleTimeout: .minutes(5)
        ),
      ]).wait()
    )

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(channel)
    self.loop.run()
    let connectFuture = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
    XCTAssertNoThrow(try connectFuture.wait())

    // A SETTINGS frame on the root stream moves the connection to ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The multiplexer requested earlier should now be available.
    XCTAssertNoThrow(try muxFuture.wait())

    // Drop the connection by shutting the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // Simulate the keepalive timeout firing by sending a connection idled event. This should
    // be a no-op; previously it would hit a precondition failure.
    channel.pipeline.fireUserInboundEventTriggered(ConnectionIdledEvent())
  }
  /// When a ready channel becomes inactive while there are no active RPCs, the manager should
  /// move to idle rather than to transient failure.
  func testCloseWithoutActiveRPCs() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      pendingChannel.futureResult
    }

    // Begin connecting by asking for a multiplexer.
    let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the actual channel with an idle handler attached.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        GRPCIdleHandler(
          mode: .client(manager, multiplexer),
          logger: manager.logger,
          idleTimeout: .minutes(5)
        ),
      ]).wait()
    )

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(channel)
    self.loop.run()
    let connectFuture = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
    XCTAssertNoThrow(try connectFuture.wait())

    // A SETTINGS frame on the root stream makes the connection "ready".
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The multiplexer requested earlier should now be available.
    XCTAssertNoThrow(try muxFuture.wait())

    // Make the channel inactive. With no active RPCs the expected transition is to idle, not
    // to transient failure.
    self.waitForStateChange(from: .ready, to: .idle) {
      channel.pipeline.fireChannelInactive()
    }
  }
  932. }
  /// A single connectivity state transition, from one state to another.
  internal struct Change: Hashable {
    /// The state being left.
    var from: ConnectivityState
    /// The state being entered.
    var to: ConnectivityState
  }

  extension Change: CustomStringConvertible {
    /// Renders the transition as "<from> → <to>".
    var description: String {
      "\(self.from) → \(self.to)"
    }
  }
  /// A `ConnectivityStateDelegate` for tests. It checks connectivity state changes against an
  /// expectation registered up front and lets the test block until that expectation is met.
  ///
  /// Typical use: register an expectation with `expectChange(verify:)` or
  /// `expectChanges(_:verify:)`, trigger the state changes, then call
  /// `waitForExpectedChanges(timeout:file:line:)`.
  ///
  /// `expectation` is only ever read or written on `serialQueue`.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    /// Serializes every access to `expectation`.
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    /// Signalled once each time an expectation has been completely met and verified.
    private let semaphore = DispatchSemaphore(value: 0)
    /// The currently registered expectation. Only touch this on `serialQueue`.
    private var expectation: Expectation = .noExpectation

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The total number of changes this expectation waits for.
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Records a state change against the current expectation, if there is one.
    ///
    /// Once the expectation has seen all the changes it waits for, it is cleared, its `verify`
    /// closure is called on `serialQueue` and the semaphore is signalled.
    func connectivityStateDidChange(from oldState: ConnectivityState,
                                    to newState: ConnectivityState) {
      self.serialQueue.async {
        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(Change(from: oldState, to: newState))
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(Change(from: oldState, to: newState))
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Registers an expectation for exactly `count` changes.
    ///
    /// - Parameters:
    ///   - count: The number of changes to collect before verifying.
    ///   - verify: Called with the collected changes, in arrival order, once `count` changes
    ///     have been recorded.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Registers an expectation for a single change.
    ///
    /// - Parameter verify: Called with the next change to be reported.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks until the registered expectation has been met and verified, or fails the test
    /// if `timeout` elapses first.
    ///
    /// Must not be called from a `verify` closure: those run on `serialQueue`, which this
    /// method synchronizes with when reporting a timeout.
    ///
    /// - Parameters:
    ///   - timeout: How long to wait for the expectation to be met.
    ///   - file: The file to attribute a timeout failure to.
    ///   - line: The line to attribute a timeout failure to.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #file,
      line: UInt = #line
    ) {
      let result = self.semaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        // `expectation` is mutated on `serialQueue` and a late change may still be in flight,
        // so read it on the queue rather than racing with that mutation from this thread.
        let expectedCount = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(expectedCount) change(s)",
          file: file, line: line
        )
      }
    }
  }
  private extension ConnectionBackoff {
    /// A backoff whose initial and maximum values are both 1.0, with a multiplier of 1.0 and
    /// no jitter.
    static let oneSecondFixed: ConnectionBackoff = .init(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// The error used by these tests to fail connection attempts on purpose.
  private struct DoomedChannelError {}

  extension DoomedChannelError: Error {}