ConnectionManagerTests.swift 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import NIOHTTP2
  19. import XCTest
  20. class ConnectionManagerTests: GRPCTestCase {
  21. private let loop = EmbeddedEventLoop()
  22. private let recorder = RecordingConnectivityDelegate()
  23. private var defaultConfiguration: ClientConnection.Configuration {
  24. return ClientConnection.Configuration(
  25. target: .unixDomainSocket("/ignored"),
  26. eventLoopGroup: self.loop,
  27. connectivityStateDelegate: self.recorder,
  28. connectionBackoff: nil,
  29. backgroundActivityLogger: self.clientLogger
  30. )
  31. }
  32. override func tearDown() {
  33. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  34. super.tearDown()
  35. }
  36. private func waitForStateChange<Result>(
  37. from: ConnectivityState,
  38. to: ConnectivityState,
  39. timeout: DispatchTimeInterval = .seconds(1),
  40. file: StaticString = #file,
  41. line: UInt = #line,
  42. body: () throws -> Result
  43. ) rethrows -> Result {
  44. self.recorder.expectChange {
  45. XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
  46. }
  47. let result = try body()
  48. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  49. return result
  50. }
  51. private func waitForStateChanges<Result>(
  52. _ changes: [Change],
  53. timeout: DispatchTimeInterval = .seconds(1),
  54. file: StaticString = #file,
  55. line: UInt = #line,
  56. body: () throws -> Result
  57. ) rethrows -> Result {
  58. self.recorder.expectChanges(changes.count) {
  59. XCTAssertEqual($0, changes)
  60. }
  61. let result = try body()
  62. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  63. return result
  64. }
  65. }
  66. extension ConnectionManagerTests {
  /// Shutting down a manager that never connected moves it from idle to shutdown; channels
  /// requested afterwards fail.
  func testIdleShutdown() throws {
    let manager = ConnectionManager(
      configuration: self.defaultConfiguration,
      logger: self.logger
    )

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // Asking for a channel after shutdown must produce a failed future.
    let doomedChannel = manager.getChannel()
    self.loop.run()
    XCTAssertThrowsError(try doomedChannel.wait())
  }
  /// With no backoff configured, a failed connection attempt moves the manager from connecting
  /// to shutdown and the requested channel fails with the connection error.
  func testConnectFromIdleFailsWithNoReconnect() {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    let pendingChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Fail the connection attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      connectPromise.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try pendingChannel.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Connects, becomes ready on the first SETTINGS frame, then shuts down from ready.
  func testConnectAndDisconnect() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    // Kick off the connection attempt.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getChannel()
      self.loop.run()
    }

    // Build the real channel, hand it to the manager and activate it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let channel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    connectPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The channel is available now: shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Connects, becomes ready, goes idle once the idle timeout elapses and then shuts down.
  func testConnectAndIdle() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    // Kick off the connection attempt.
    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the channel, hand it to the manager and activate it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let channel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    connectPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
      // The requested channel _must_ be available at this point.
      XCTAssertNoThrow(try readyChannel.wait())
    }

    // Let the idle timeout elapse; the channel is expected to close as a result.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try readyChannel.flatMap { ready in ready.closeFuture }.wait())
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// The idle timeout must not idle the connection while a stream is open; once the stream is
  /// closed and the timeout elapses again the connection goes idle.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    // Kick off the connection attempt.
    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the channel, hand it to the manager and activate it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let channel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    connectPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
      // The requested channel _must_ be available at this point.
      XCTAssertNoThrow(try readyChannel.wait())
    }

    // Pretend a stream was opened; its details are irrelevant.
    let opened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    channel.pipeline.fireUserInboundEventTriggered(opened)

    // Let the idle timeout pass: with a stream open this should _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Close the stream and let the timeout pass again; now the connection should idle.
    self.waitForStateChange(from: .ready, to: .idle) {
      let closed = StreamClosedEvent(streamID: 1, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(closed)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Activates a channel but never makes it ready, then shuts the manager down: the state moves
  /// from connecting to shutdown and the pending channel request fails.
  func testConnectAndThenBecomeInactive() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the channel, hand it to the manager and activate it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let channel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    connectPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // No SETTINGS frame has been written, so `readyChannel` is still pending when the manager
    // is shut down here.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The channel never became ready and no reconnect is configured: the manager is shut down
    // and `readyChannel` must fail.
    XCTAssertThrowsError(try readyChannel.wait())
  }
  /// The first connection attempt fails, the manager backs off for one second and the second
  /// attempt succeeds; every pending channel request resolves once the channel is ready.
  func testConnectOnSecondAttempt() throws {
    let secondAttempt = self.loop.makePromise(of: Channel.self)
    let attempts: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      secondAttempt.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges(expectedChanges) {
      // Request a channel; the first attempt fails straight away.
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // A second request yields a future for the same channel.
    let anotherReadyChannel = manager.getChannel()
    self.loop.run()

    // Advance time by one second to begin the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the actual channel and complete the second attempt with it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let channel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    secondAttempt.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // Both requests _must_ be satisfied now.
    XCTAssertNoThrow(try readyChannel.wait())
    XCTAssertNoThrow(try anotherReadyChannel.wait())

    // Finally, shut down from ready.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down during a connection attempt fails the pending channel request, and a
  /// channel delivered after the shutdown is closed.
  func testShutdownWhileConnecting() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Shut down while the attempt is still in flight.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The outstanding request must fail.
    XCTAssertThrowsError(try readyChannel.wait())

    // The connect promise is still unfulfilled: if it succeeds now, that channel should be
    // closed too.
    connectPromise.succeed(EmbeddedChannel(loop: self.loop))
    let lateChannel = try connectPromise.futureResult.wait()
    self.loop.run()
    XCTAssertNoThrow(try lateChannel.closeFuture.wait())
  }
  /// Shutting down while in transient failure (waiting to reconnect) fails the pending channel
  /// request.
  func testShutdownWhileTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges(expectedChanges) {
      // Request a channel; the attempt fails straight away.
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Shut down before the next attempt is made.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The outstanding request must fail.
    XCTAssertThrowsError(try readyChannel.wait())
  }
  /// Shutting down after the channel is active but before it is ready moves the reported state
  /// from connecting to shutdown and fails the pending channel request.
  func testShutdownWhileActive() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the channel, hand it to the manager and activate it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let channel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    connectPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no change is reported for it.)

    // Shut down.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The outstanding request must fail.
    XCTAssertThrowsError(try readyChannel.wait())
  }
  /// Calling shutdown on a manager that is already shut down succeeds.
  func testShutdownWhileShutdown() throws {
    let manager = ConnectionManager(
      configuration: self.defaultConfiguration,
      logger: self.logger
    )

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let initialShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try initialShutdown.wait())
    }

    // Shutting down again must not throw.
    let repeatedShutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try repeatedShutdown.wait())
  }
  /// With backoff configured, closing an active (not yet ready) channel moves the manager to
  /// transient failure; the following attempt fails as well and the manager is then shut down.
  func testTransientFailureWhileActive() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let firstAttempt = self.loop.makePromise(of: Channel.self)
    let attempts: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the first channel, hand it to the manager and activate it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let firstChannel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    firstAttempt.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // (Becoming active is an internal state, so no change is reported for it.)

    // Close the channel, simulating e.g. a failed TLS handshake.
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // After the backoff a new attempt starts and fails immediately.
    let retryChanges = [
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(retryChanges) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down from transient failure.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The channel never became ready, so waiting on it must throw.
    XCTAssertThrowsError(try readyChannel.wait())
  }
  /// A ready channel with an open stream is closed: the manager moves to transient failure,
  /// reconnects after the backoff, becomes ready on the second channel and is then shut down.
  func testTransientFailureWhileReady() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let firstAttempt = self.loop.makePromise(of: Channel.self)
    let secondAttempt = self.loop.makePromise(of: Channel.self)
    let attempts: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      secondAttempt.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the first channel, hand it to the manager and activate it.
    let firstIdleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let firstChannel = EmbeddedChannel(handler: firstIdleHandler, loop: self.loop)
    firstAttempt.succeed(firstChannel)
    XCTAssertNoThrow(
      try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the first channel 'ready'.
    let firstSettings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try firstChannel.writeInbound(firstSettings))
    }

    // The requested channel should be available now.
    XCTAssertNoThrow(try readyChannel.wait())

    // Open a stream before killing the first channel; without an active RPC we would idle
    // instead.
    let opened = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    firstChannel.pipeline.fireUserInboundEventTriggered(opened)

    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // Advance past the backoff so the next connection attempt starts.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the second channel, hand it to the manager and activate it.
    let secondIdleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let secondChannel = EmbeddedChannel(handler: secondIdleHandler, loop: self.loop)
    secondAttempt.succeed(secondChannel)
    XCTAssertNoThrow(
      try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the second channel 'ready'.
    let secondSettings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try secondChannel.writeInbound(secondSettings))
    }

    // Finally, shut down from ready.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// A GOAWAY frame received while ready moves the manager to idle and closes the channel; the
  /// manager is then shut down from idle.
  func testGoAwayWhenReady() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the channel, hand it to the manager and activate it.
    let idleHandler = GRPCIdleHandler(
      mode: .client(manager),
      logger: self.logger,
      idleTimeout: .minutes(5)
    )
    let channel = EmbeddedChannel(handler: idleHandler, loop: self.loop)
    connectPromise.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The requested channel _must_ be available at this point.
    XCTAssertNoThrow(try readyChannel.wait())

    // Deliver a GOAWAY (its details are irrelevant): the connection should go idle and the
    // channel should close.
    let goAwayFrame = HTTP2Frame(
      streamID: .rootStream,
      payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
    )
    try self.waitForStateChange(from: .ready, to: .idle) {
      XCTAssertNoThrow(try channel.writeInbound(goAwayFrame))
    }

    self.loop.run()
    XCTAssertNoThrow(try channel.closeFuture.wait())

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// An optimistic channel requested from idle fails when the connection attempt fails.
  func testDoomedOptimisticChannelFromIdle() {
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let optimisticChannel = manager.getOptimisticChannel()
    self.loop.run()
    XCTAssertThrowsError(try optimisticChannel.wait())
  }
  /// An optimistic channel requested while connecting fails when that connection attempt fails.
  func testDoomedOptimisticChannelFromConnecting() throws {
    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    self.waitForStateChange(from: .idle, to: .connecting) {
      // Requesting a channel starts the connection attempt; the returned future is not needed.
      _ = manager.getChannel()
      self.loop.run()
    }

    // While connecting, ask for an optimistic channel.
    let optimisticChannel = manager.getOptimisticChannel()
    self.loop.run()

    // Fail the connection attempt; the optimistic channel must fail with it.
    connectPromise.fail(DoomedChannelError())
    XCTAssertThrowsError(try optimisticChannel.wait())
  }
  /// An optimistic channel requested while in transient failure fails.
  func testOptimisticChannelFromTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = ConnectionBackoff()

    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(expectedChanges) {
      // Requesting a channel starts the connection attempt; the returned future is not needed.
      _ = manager.getChannel()
      self.loop.run()
    }

    // The manager is now in transient failure: ask for a channel optimistically.
    let optimisticChannel = manager.getOptimisticChannel()
    self.loop.run()
    XCTAssertThrowsError(try optimisticChannel.wait())
  }
  /// An optimistic channel requested after shutdown fails.
  func testOptimisticChannelFromShutdown() throws {
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let shutdownFuture = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdownFuture.wait())

    // Ask for a channel optimistically; it is expected to fail.
    let optimisticChannel = manager.getOptimisticChannel()
    self.loop.run()
    XCTAssertThrowsError(try optimisticChannel.wait())
  }
  /// Regression test: when the channel close triggered by a GOAWAY is dropped, the idle timeout
  /// that fires later must not trip a precondition.
  func testDoubleIdle() throws {
    /// Outbound handler that refuses every close by failing the close promise.
    final class CloseDroppingHandler: ChannelOutboundHandler {
      typealias OutboundIn = Any

      func close(
        context: ChannelHandlerContext,
        mode: CloseMode,
        promise: EventLoopPromise<Void>?
      ) {
        if let promise = promise {
          promise.fail(
            GRPCStatus(code: .unavailable, message: "Purposefully dropping channel close")
          )
        }
      }
    }

    let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return connectPromise.futureResult
    }

    // Kick off the connection attempt.
    let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = manager.getChannel()
      self.loop.run()
      return requested
    }

    // Build the real channel with the close-dropping handler ahead of the idle handler, then
    // hand it to the manager and activate it.
    let channel = EmbeddedChannel(loop: self.loop)
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        CloseDroppingHandler(),
        GRPCIdleHandler(mode: .client(manager), logger: manager.logger, idleTimeout: .minutes(5)),
      ]).wait()
    )
    connectPromise.succeed(channel)
    self.loop.run()
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the channel 'ready'.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The requested channel should be available now.
    XCTAssertNoThrow(try readyChannel.wait())

    // Deliver a GOAWAY (its details are irrelevant): the connection should go idle and attempt
    // to close the channel.
    let goAwayFrame = HTTP2Frame(
      streamID: .rootStream,
      payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
    )
    try self.waitForStateChange(from: .ready, to: .idle) {
      XCTAssertNoThrow(try channel.writeInbound(goAwayFrame))
    }

    // The close was dropped; let the scheduled idle timeout fire.
    //
    // Previously doing this would fail a precondition.
    self.loop.advanceTime(by: .minutes(5))
  }
  /// Regression test: a `ConnectionIdledEvent` fired after the manager has been shut down should
  /// be a no-op. Per the original note, this used to hit a precondition failure.
  func testForceIdleAfterInactive() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return channelPromise.futureResult
    }

    // Kick off the connection attempt: idle → connecting.
    let readyChannel = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<Channel> in
      let pendingChannel = manager.getChannel()
      self.loop.run()
      return pendingChannel
    }

    // Build the real channel with an idle handler and hand it to the manager.
    let channel = EmbeddedChannel(loop: self.loop)
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        GRPCIdleHandler(mode: .client(manager), logger: manager.logger, idleTimeout: .minutes(5)),
      ]).wait()
    )
    channelPromise.succeed(channel)
    self.loop.run()

    // Activate the channel; the address itself is not used for anything.
    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // A SETTINGS frame on the root stream takes us from connecting to ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The channel future should now be fulfilled.
    XCTAssertNoThrow(try readyChannel.wait())

    // Shut the manager down: ready → shutdown.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // Fire a connection idled event, i.e. keepalive timeout has fired. This should be a no-op.
    // Previously this would hit a precondition failure.
    channel.pipeline.fireUserInboundEventTriggered(ConnectionIdledEvent())
  }
  /// Checks that a ready connection whose channel becomes inactive while there are no active
  /// RPCs moves to idle rather than to transient failure.
  func testCloseWithoutActiveRPCs() throws {
    let channelPromise = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return channelPromise.futureResult
    }

    // Kick off the connection attempt: idle → connecting.
    let readyChannel: EventLoopFuture<Channel> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let pendingChannel = manager.getChannel()
        self.loop.run()
        return pendingChannel
      }

    // Build the actual channel with an idle handler and hand it to the manager.
    let channel = EmbeddedChannel(loop: self.loop)
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        GRPCIdleHandler(mode: .client(manager), logger: manager.logger, idleTimeout: .minutes(5)),
      ]).wait()
    )
    channelPromise.succeed(channel)
    self.loop.run()

    // Activate the channel; the address itself is not used for anything.
    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // "Ready" the connection by writing a SETTINGS frame on the root stream.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The channel future should now be fulfilled.
    XCTAssertNoThrow(try readyChannel.wait())

    // Make the channel inactive. With no active RPCs the expected transition is ready → idle,
    // not ready → transient failure.
    self.waitForStateChange(from: .ready, to: .idle) {
      channel.pipeline.fireChannelInactive()
    }
  }
  844. }
  /// A single connectivity state transition, from one `ConnectivityState` to another.
  internal struct Change: Hashable, CustomStringConvertible {
    /// The state being left.
    var from: ConnectivityState

    /// The state being entered.
    var to: ConnectivityState

    /// The transition rendered as the old state, an arrow, and the new state.
    var description: String {
      get {
        return "\(self.from) → \(self.to)"
      }
    }
  }
  /// A `ConnectivityStateDelegate` for tests which records connectivity state changes and passes
  /// them to a verification closure once the expected number of changes has been seen.
  ///
  /// Typical use: register an expectation with `expectChange(verify:)` or
  /// `expectChanges(_:verify:)`, trigger the change(s), then block in
  /// `waitForExpectedChanges(timeout:file:line:)`.
  ///
  /// `expectation` is only read and written on `serialQueue`. `semaphore` is signalled once after
  /// a registered expectation has been satisfied and its `verify` closure has run.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    private let semaphore = DispatchSemaphore(value: 0)
    private var expectation: Expectation = .noExpectation

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The total number of changes this expectation was registered for (not the number which
      /// are still outstanding).
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Records a state change against the current expectation, if there is one.
    ///
    /// The work is dispatched asynchronously onto `serialQueue`. When the change completes the
    /// current expectation, the expectation is cleared, `verify` is called on that queue and the
    /// semaphore is signalled.
    func connectivityStateDidChange(from oldState: ConnectivityState,
                                    to newState: ConnectivityState) {
      self.serialQueue.async {
        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(Change(from: oldState, to: newState))
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(Change(from: oldState, to: newState))
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Registers an expectation for `count` changes, replacing any existing expectation.
    ///
    /// - Parameters:
    ///   - count: The number of changes to record before verifying.
    ///   - verify: Called on the delegate's serial queue with the recorded changes, in the order
    ///     they were received.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Registers an expectation for a single change, replacing any existing expectation.
    ///
    /// - Parameter verify: Called on the delegate's serial queue with the next change.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks the calling thread until the current expectation has been verified, or records a
    /// test failure if `timeout` elapses first.
    ///
    /// Must not be called from a `verify` closure: those run on `serialQueue`, which this method
    /// synchronously dispatches onto when reporting a timeout.
    ///
    /// - Parameters:
    ///   - timeout: How long to wait for the expectation to be met.
    ///   - file: The file to attribute a timeout failure to.
    ///   - line: The line to attribute a timeout failure to.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #file,
      line: UInt = #line
    ) {
      let result = self.semaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        // `expectation` is mutated on `serialQueue`, possibly concurrently with this timeout, so
        // read it on that queue as well instead of racing from the waiting thread.
        let expectedCount = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(expectedCount) change(s)",
          file: file, line: line
        )
      }
    }
  }
  private extension ConnectionBackoff {
    /// A backoff whose initial and maximum values are both `1.0`, with a multiplier of `1.0` and
    /// zero jitter. Going by the name, this is presumably a constant one second backoff.
    static let oneSecondFixed: ConnectionBackoff = .init(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// Placeholder error used by tests in this file to fail the future returned from a channel
  /// provider.
  private struct DoomedChannelError: Error {
  }