ConnectionManagerTests.swift 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import NIOHTTP2
  19. import XCTest
  20. class ConnectionManagerTests: GRPCTestCase {
  21. private let loop = EmbeddedEventLoop()
  22. private let recorder = RecordingConnectivityDelegate()
  23. private var defaultConfiguration: ClientConnection.Configuration {
  24. return ClientConnection.Configuration(
  25. target: .unixDomainSocket("/ignored"),
  26. eventLoopGroup: self.loop,
  27. connectivityStateDelegate: self.recorder,
  28. connectionBackoff: nil,
  29. backgroundActivityLogger: self.clientLogger
  30. )
  31. }
  32. override func tearDown() {
  33. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  34. super.tearDown()
  35. }
  36. private func waitForStateChange<Result>(
  37. from: ConnectivityState,
  38. to: ConnectivityState,
  39. timeout: DispatchTimeInterval = .seconds(1),
  40. body: () throws -> Result
  41. ) rethrows -> Result {
  42. self.recorder.expectChange {
  43. XCTAssertEqual($0, Change(from: from, to: to))
  44. }
  45. let result = try body()
  46. self.recorder.waitForExpectedChanges(timeout: timeout)
  47. return result
  48. }
  49. private func waitForStateChanges<Result>(
  50. _ changes: [Change],
  51. timeout: DispatchTimeInterval = .seconds(1),
  52. body: () throws -> Result
  53. ) rethrows -> Result {
  54. self.recorder.expectChanges(changes.count) {
  55. XCTAssertEqual($0, changes)
  56. }
  57. let result = try body()
  58. self.recorder.waitForExpectedChanges(timeout: timeout)
  59. return result
  60. }
  61. }
  62. extension ConnectionManagerTests {
  63. func testIdleShutdown() throws {
  64. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  65. try self.waitForStateChange(from: .idle, to: .shutdown) {
  66. let shutdown = manager.shutdown()
  67. self.loop.run()
  68. XCTAssertNoThrow(try shutdown.wait())
  69. }
  70. // Getting a channel should fail.
  71. let channel = manager.getChannel()
  72. self.loop.run()
  73. XCTAssertThrowsError(try channel.wait())
  74. }
  75. func testConnectFromIdleFailsWithNoReconnect() {
  76. let channelPromise = self.loop.makePromise(of: Channel.self)
  77. let manager = ConnectionManager.testingOnly(
  78. configuration: self.defaultConfiguration,
  79. logger: self.logger
  80. ) {
  81. channelPromise.futureResult
  82. }
  83. let channel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  84. let channel = manager.getChannel()
  85. self.loop.run()
  86. return channel
  87. }
  88. self.waitForStateChange(from: .connecting, to: .shutdown) {
  89. channelPromise.fail(DoomedChannelError())
  90. }
  91. XCTAssertThrowsError(try channel.wait()) {
  92. XCTAssertTrue($0 is DoomedChannelError)
  93. }
  94. }
  95. func testConnectAndDisconnect() throws {
  96. let channelPromise = self.loop.makePromise(of: Channel.self)
  97. let manager = ConnectionManager.testingOnly(
  98. configuration: self.defaultConfiguration,
  99. logger: self.logger
  100. ) {
  101. channelPromise.futureResult
  102. }
  103. // Start the connection.
  104. let readyChannel: EventLoopFuture<Channel> = self
  105. .waitForStateChange(from: .idle, to: .connecting) {
  106. let readyChannel = manager.getChannel()
  107. self.loop.run()
  108. return readyChannel
  109. }
  110. // Setup the real channel and activate it.
  111. let channel = EmbeddedChannel(
  112. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  113. loop: self.loop
  114. )
  115. channelPromise.succeed(channel)
  116. XCTAssertNoThrow(
  117. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  118. .wait()
  119. )
  120. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  121. try self.waitForStateChange(from: .connecting, to: .ready) {
  122. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  123. XCTAssertNoThrow(try channel.writeInbound(frame))
  124. }
  125. // Close the channel.
  126. try self.waitForStateChange(from: .ready, to: .shutdown) {
  127. // Now the channel should be available: shut it down,
  128. XCTAssertNoThrow(try readyChannel.flatMap { $0.close(mode: .all) }.wait())
  129. }
  130. }
  131. func testConnectAndIdle() throws {
  132. let channelPromise = self.loop.makePromise(of: Channel.self)
  133. let manager = ConnectionManager.testingOnly(
  134. configuration: self.defaultConfiguration,
  135. logger: self.logger
  136. ) {
  137. channelPromise.futureResult
  138. }
  139. // Start the connection.
  140. let readyChannel: EventLoopFuture<Channel> = self
  141. .waitForStateChange(from: .idle, to: .connecting) {
  142. let readyChannel = manager.getChannel()
  143. self.loop.run()
  144. return readyChannel
  145. }
  146. // Setup the channel.
  147. let channel = EmbeddedChannel(
  148. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  149. loop: self.loop
  150. )
  151. channelPromise.succeed(channel)
  152. XCTAssertNoThrow(
  153. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  154. .wait()
  155. )
  156. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  157. try self.waitForStateChange(from: .connecting, to: .ready) {
  158. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  159. XCTAssertNoThrow(try channel.writeInbound(frame))
  160. // Wait for the channel, it _must_ be ready now.
  161. XCTAssertNoThrow(try readyChannel.wait())
  162. }
  163. // Go idle. This will shutdown the channel.
  164. try self.waitForStateChange(from: .ready, to: .idle) {
  165. self.loop.advanceTime(by: .minutes(5))
  166. XCTAssertNoThrow(try readyChannel.flatMap { $0.closeFuture }.wait())
  167. }
  168. // Now shutdown.
  169. try self.waitForStateChange(from: .idle, to: .shutdown) {
  170. let shutdown = manager.shutdown()
  171. self.loop.run()
  172. XCTAssertNoThrow(try shutdown.wait())
  173. }
  174. }
  175. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  176. let channelPromise = self.loop.makePromise(of: Channel.self)
  177. let manager = ConnectionManager.testingOnly(
  178. configuration: self.defaultConfiguration,
  179. logger: self.logger
  180. ) {
  181. channelPromise.futureResult
  182. }
  183. // Start the connection.
  184. let readyChannel: EventLoopFuture<Channel> = self
  185. .waitForStateChange(from: .idle, to: .connecting) {
  186. let readyChannel = manager.getChannel()
  187. self.loop.run()
  188. return readyChannel
  189. }
  190. // Setup the channel.
  191. let channel = EmbeddedChannel(
  192. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  193. loop: self.loop
  194. )
  195. channelPromise.succeed(channel)
  196. XCTAssertNoThrow(
  197. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  198. .wait()
  199. )
  200. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  201. try self.waitForStateChange(from: .connecting, to: .ready) {
  202. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  203. XCTAssertNoThrow(try channel.writeInbound(frame))
  204. // Wait for the channel, it _must_ be ready now.
  205. XCTAssertNoThrow(try readyChannel.wait())
  206. }
  207. // "create" a stream; the details don't matter here.
  208. let streamCreated = NIOHTTP2StreamCreatedEvent(
  209. streamID: 1,
  210. localInitialWindowSize: nil,
  211. remoteInitialWindowSize: nil
  212. )
  213. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  214. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  215. self.loop.advanceTime(by: .minutes(5))
  216. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  217. self.waitForStateChange(from: .ready, to: .idle) {
  218. // Close the stream.
  219. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  220. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  221. // ... wait for the idle timeout,
  222. self.loop.advanceTime(by: .minutes(5))
  223. }
  224. // Now shutdown.
  225. try self.waitForStateChange(from: .idle, to: .shutdown) {
  226. let shutdown = manager.shutdown()
  227. self.loop.run()
  228. XCTAssertNoThrow(try shutdown.wait())
  229. }
  230. }
  231. func testConnectAndThenBecomeInactive() throws {
  232. let channelPromise = self.loop.makePromise(of: Channel.self)
  233. let manager = ConnectionManager.testingOnly(
  234. configuration: self.defaultConfiguration,
  235. logger: self.logger
  236. ) {
  237. channelPromise.futureResult
  238. }
  239. let readyChannel: EventLoopFuture<Channel> = self
  240. .waitForStateChange(from: .idle, to: .connecting) {
  241. let readyChannel = manager.getChannel()
  242. self.loop.run()
  243. return readyChannel
  244. }
  245. // Setup the channel.
  246. let channel = EmbeddedChannel(
  247. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  248. loop: self.loop
  249. )
  250. channelPromise.succeed(channel)
  251. XCTAssertNoThrow(
  252. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  253. .wait()
  254. )
  255. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  256. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  257. XCTAssertNoThrow(try channel.close(mode: .all).wait())
  258. }
  259. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  260. // the `readyChannel` should error.
  261. XCTAssertThrowsError(try readyChannel.wait())
  262. }
  263. func testConnectOnSecondAttempt() throws {
  264. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  265. let channelFutures: [EventLoopFuture<Channel>] = [
  266. self.loop.makeFailedFuture(DoomedChannelError()),
  267. channelPromise.futureResult,
  268. ]
  269. var channelFutureIterator = channelFutures.makeIterator()
  270. var configuration = self.defaultConfiguration
  271. configuration.connectionBackoff = .oneSecondFixed
  272. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  273. guard let next = channelFutureIterator.next() else {
  274. XCTFail("Too many channels requested")
  275. return self.loop.makeFailedFuture(DoomedChannelError())
  276. }
  277. return next
  278. }
  279. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges([
  280. Change(from: .idle, to: .connecting),
  281. Change(from: .connecting, to: .transientFailure),
  282. ]) {
  283. // Get a channel.
  284. let readyChannel = manager.getChannel()
  285. self.loop.run()
  286. return readyChannel
  287. }
  288. // Get a channel from the manager: it is a future for the same channel.
  289. let anotherReadyChannel = manager.getChannel()
  290. self.loop.run()
  291. // Move time forwards by a second to start the next connection attempt.
  292. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  293. self.loop.advanceTime(by: .seconds(1))
  294. }
  295. // Setup the actual channel and complete the promise.
  296. let channel = EmbeddedChannel(
  297. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  298. loop: self.loop
  299. )
  300. channelPromise.succeed(channel)
  301. XCTAssertNoThrow(
  302. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  303. .wait()
  304. )
  305. // Write a SETTINGS frame on the root stream.
  306. try self.waitForStateChange(from: .connecting, to: .ready) {
  307. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  308. XCTAssertNoThrow(try channel.writeInbound(frame))
  309. }
  310. // Wait for the channel, it _must_ be ready now.
  311. XCTAssertNoThrow(try readyChannel.wait())
  312. XCTAssertNoThrow(try anotherReadyChannel.wait())
  313. // Now shutdown.
  314. try self.waitForStateChange(from: .ready, to: .shutdown) {
  315. let shutdown = manager.shutdown()
  316. self.loop.run()
  317. XCTAssertNoThrow(try shutdown.wait())
  318. }
  319. }
  320. func testShutdownWhileConnecting() throws {
  321. let channelPromise = self.loop.makePromise(of: Channel.self)
  322. let manager = ConnectionManager.testingOnly(
  323. configuration: self.defaultConfiguration,
  324. logger: self.logger
  325. ) {
  326. channelPromise.futureResult
  327. }
  328. let readyChannel: EventLoopFuture<Channel> = self
  329. .waitForStateChange(from: .idle, to: .connecting) {
  330. let readyChannel = manager.getChannel()
  331. self.loop.run()
  332. return readyChannel
  333. }
  334. // Now shutdown.
  335. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  336. let shutdown = manager.shutdown()
  337. self.loop.run()
  338. XCTAssertNoThrow(try shutdown.wait())
  339. }
  340. // The channel we were requesting should fail.
  341. XCTAssertThrowsError(try readyChannel.wait())
  342. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  343. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  344. let channel = try channelPromise.futureResult.wait()
  345. self.loop.run()
  346. XCTAssertNoThrow(try channel.closeFuture.wait())
  347. }
  348. func testShutdownWhileTransientFailure() throws {
  349. var configuration = self.defaultConfiguration
  350. configuration.connectionBackoff = .oneSecondFixed
  351. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  352. self.loop.makeFailedFuture(DoomedChannelError())
  353. }
  354. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges([
  355. Change(from: .idle, to: .connecting),
  356. Change(from: .connecting, to: .transientFailure),
  357. ]) {
  358. // Get a channel.
  359. let readyChannel = manager.getChannel()
  360. self.loop.run()
  361. return readyChannel
  362. }
  363. // Now shutdown.
  364. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  365. let shutdown = manager.shutdown()
  366. self.loop.run()
  367. XCTAssertNoThrow(try shutdown.wait())
  368. }
  369. // The channel we were requesting should fail.
  370. XCTAssertThrowsError(try readyChannel.wait())
  371. }
  372. func testShutdownWhileActive() throws {
  373. let channelPromise = self.loop.makePromise(of: Channel.self)
  374. let manager = ConnectionManager.testingOnly(
  375. configuration: self.defaultConfiguration,
  376. logger: self.logger
  377. ) {
  378. channelPromise.futureResult
  379. }
  380. let readyChannel: EventLoopFuture<Channel> = self
  381. .waitForStateChange(from: .idle, to: .connecting) {
  382. let readyChannel = manager.getChannel()
  383. self.loop.run()
  384. return readyChannel
  385. }
  386. // Prepare the channel
  387. let channel = EmbeddedChannel(
  388. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  389. loop: self.loop
  390. )
  391. channelPromise.succeed(channel)
  392. XCTAssertNoThrow(
  393. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  394. .wait()
  395. )
  396. // (No state change expected here: active is an internal state.)
  397. // Now shutdown.
  398. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  399. let shutdown = manager.shutdown()
  400. self.loop.run()
  401. XCTAssertNoThrow(try shutdown.wait())
  402. }
  403. // The channel we were requesting should fail.
  404. XCTAssertThrowsError(try readyChannel.wait())
  405. }
  406. func testShutdownWhileShutdown() throws {
  407. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  408. try self.waitForStateChange(from: .idle, to: .shutdown) {
  409. let firstShutdown = manager.shutdown()
  410. self.loop.run()
  411. XCTAssertNoThrow(try firstShutdown.wait())
  412. }
  413. let secondShutdown = manager.shutdown()
  414. self.loop.run()
  415. XCTAssertNoThrow(try secondShutdown.wait())
  416. }
  417. func testTransientFailureWhileActive() throws {
  418. var configuration = self.defaultConfiguration
  419. configuration.connectionBackoff = .oneSecondFixed
  420. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  421. let channelFutures: [EventLoopFuture<Channel>] = [
  422. channelPromise.futureResult,
  423. self.loop.makeFailedFuture(DoomedChannelError()),
  424. ]
  425. var channelFutureIterator = channelFutures.makeIterator()
  426. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  427. guard let next = channelFutureIterator.next() else {
  428. XCTFail("Too many channels requested")
  429. return self.loop.makeFailedFuture(DoomedChannelError())
  430. }
  431. return next
  432. }
  433. let readyChannel: EventLoopFuture<Channel> = self
  434. .waitForStateChange(from: .idle, to: .connecting) {
  435. let readyChannel = manager.getChannel()
  436. self.loop.run()
  437. return readyChannel
  438. }
  439. // Prepare the channel
  440. let firstChannel = EmbeddedChannel(
  441. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  442. loop: self.loop
  443. )
  444. channelPromise.succeed(firstChannel)
  445. XCTAssertNoThrow(
  446. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  447. .wait()
  448. )
  449. // (No state change expected here: active is an internal state.)
  450. // Close the channel (simulate e.g. TLS handshake failed)
  451. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  452. XCTAssertNoThrow(try firstChannel.close().wait())
  453. }
  454. // Start connecting again.
  455. self.waitForStateChanges([
  456. Change(from: .transientFailure, to: .connecting),
  457. Change(from: .connecting, to: .transientFailure),
  458. ]) {
  459. self.loop.advanceTime(by: .seconds(1))
  460. }
  461. // Now shutdown
  462. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  463. let shutdown = manager.shutdown()
  464. self.loop.run()
  465. XCTAssertNoThrow(try shutdown.wait())
  466. }
  467. // The channel never came up: it should be throw.
  468. XCTAssertThrowsError(try readyChannel.wait())
  469. }
  470. func testTransientFailureWhileReady() throws {
  471. var configuration = self.defaultConfiguration
  472. configuration.connectionBackoff = .oneSecondFixed
  473. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  474. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  475. let channelFutures: [EventLoopFuture<Channel>] = [
  476. firstChannelPromise.futureResult,
  477. secondChannelPromise.futureResult,
  478. ]
  479. var channelFutureIterator = channelFutures.makeIterator()
  480. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  481. guard let next = channelFutureIterator.next() else {
  482. XCTFail("Too many channels requested")
  483. return self.loop.makeFailedFuture(DoomedChannelError())
  484. }
  485. return next
  486. }
  487. let readyChannel: EventLoopFuture<Channel> = self
  488. .waitForStateChange(from: .idle, to: .connecting) {
  489. let readyChannel = manager.getChannel()
  490. self.loop.run()
  491. return readyChannel
  492. }
  493. // Prepare the first channel
  494. let firstChannel = EmbeddedChannel(
  495. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  496. loop: self.loop
  497. )
  498. firstChannelPromise.succeed(firstChannel)
  499. XCTAssertNoThrow(
  500. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  501. .wait()
  502. )
  503. // Write a SETTINGS frame on the root stream.
  504. try self.waitForStateChange(from: .connecting, to: .ready) {
  505. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  506. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  507. }
  508. // Channel should now be ready.
  509. XCTAssertNoThrow(try readyChannel.wait())
  510. // Kill the first channel.
  511. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  512. XCTAssertNoThrow(try firstChannel.close().wait())
  513. }
  514. // Run to start connecting again.
  515. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  516. self.loop.advanceTime(by: .seconds(1))
  517. }
  518. // Prepare the second channel
  519. let secondChannel = EmbeddedChannel(
  520. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  521. loop: self.loop
  522. )
  523. secondChannelPromise.succeed(secondChannel)
  524. XCTAssertNoThrow(
  525. try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  526. .wait()
  527. )
  528. // Write a SETTINGS frame on the root stream.
  529. try self.waitForStateChange(from: .connecting, to: .ready) {
  530. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  531. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  532. }
  533. // Now shutdown
  534. try self.waitForStateChange(from: .ready, to: .shutdown) {
  535. let shutdown = manager.shutdown()
  536. self.loop.run()
  537. XCTAssertNoThrow(try shutdown.wait())
  538. }
  539. }
  540. func testGoAwayWhenReady() throws {
  541. let channelPromise = self.loop.makePromise(of: Channel.self)
  542. let manager = ConnectionManager.testingOnly(
  543. configuration: self.defaultConfiguration,
  544. logger: self.logger
  545. ) {
  546. channelPromise.futureResult
  547. }
  548. let readyChannel: EventLoopFuture<Channel> = self
  549. .waitForStateChange(from: .idle, to: .connecting) {
  550. let readyChannel = manager.getChannel()
  551. self.loop.run()
  552. return readyChannel
  553. }
  554. // Setup the channel.
  555. let channel = EmbeddedChannel(
  556. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  557. loop: self.loop
  558. )
  559. channelPromise.succeed(channel)
  560. XCTAssertNoThrow(
  561. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  562. .wait()
  563. )
  564. try self.waitForStateChange(from: .connecting, to: .ready) {
  565. // Write a SETTINGS frame on the root stream.
  566. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  567. XCTAssertNoThrow(try channel.writeInbound(frame))
  568. }
  569. // Wait for the channel, it _must_ be ready now.
  570. XCTAssertNoThrow(try readyChannel.wait())
  571. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  572. // channel to close.
  573. try self.waitForStateChange(from: .ready, to: .idle) {
  574. let goAway = HTTP2Frame(
  575. streamID: .rootStream,
  576. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  577. )
  578. XCTAssertNoThrow(try channel.writeInbound(goAway))
  579. }
  580. self.loop.run()
  581. XCTAssertNoThrow(try channel.closeFuture.wait())
  582. // Now shutdown
  583. try self.waitForStateChange(from: .idle, to: .shutdown) {
  584. let shutdown = manager.shutdown()
  585. self.loop.run()
  586. XCTAssertNoThrow(try shutdown.wait())
  587. }
  588. }
  589. func testDoomedOptimisticChannelFromIdle() {
  590. let manager = ConnectionManager.testingOnly(
  591. configuration: self.defaultConfiguration,
  592. logger: self.logger
  593. ) {
  594. self.loop.makeFailedFuture(DoomedChannelError())
  595. }
  596. let candidate = manager.getOptimisticChannel()
  597. self.loop.run()
  598. XCTAssertThrowsError(try candidate.wait())
  599. }
  600. func testDoomedOptimisticChannelFromConnecting() throws {
  601. let promise = self.loop.makePromise(of: Channel.self)
  602. let manager = ConnectionManager.testingOnly(
  603. configuration: self.defaultConfiguration,
  604. logger: self.logger
  605. ) {
  606. promise.futureResult
  607. }
  608. self.waitForStateChange(from: .idle, to: .connecting) {
  609. // Trigger channel creation, and a connection attempt, we don't care about the channel.
  610. _ = manager.getChannel()
  611. self.loop.run()
  612. }
  613. // We're connecting: get an optimistic channel.
  614. let optimisticChannel = manager.getOptimisticChannel()
  615. self.loop.run()
  616. // Fail the promise.
  617. promise.fail(DoomedChannelError())
  618. XCTAssertThrowsError(try optimisticChannel.wait())
  619. }
  620. func testOptimisticChannelFromTransientFailure() throws {
  621. var configuration = self.defaultConfiguration
  622. configuration.connectionBackoff = ConnectionBackoff()
  623. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  624. self.loop.makeFailedFuture(DoomedChannelError())
  625. }
  626. self.waitForStateChanges([
  627. Change(from: .idle, to: .connecting),
  628. Change(from: .connecting, to: .transientFailure),
  629. ]) {
  630. // Trigger channel creation, and a connection attempt, we don't care about the channel.
  631. _ = manager.getChannel()
  632. self.loop.run()
  633. }
  634. // Now we're sitting in transient failure. Get a channel optimistically.
  635. let optimisticChannel = manager.getOptimisticChannel()
  636. self.loop.run()
  637. XCTAssertThrowsError(try optimisticChannel.wait())
  638. }
  639. func testOptimisticChannelFromShutdown() throws {
  640. let manager = ConnectionManager.testingOnly(
  641. configuration: self.defaultConfiguration,
  642. logger: self.logger
  643. ) {
  644. self.loop.makeFailedFuture(DoomedChannelError())
  645. }
  646. let shutdown = manager.shutdown()
  647. self.loop.run()
  648. XCTAssertNoThrow(try shutdown.wait())
  649. // Get a channel optimistically. It'll fail, obviously.
  650. let channel = manager.getOptimisticChannel()
  651. self.loop.run()
  652. XCTAssertThrowsError(try channel.wait())
  653. }
  654. func testDoubleIdle() throws {
  655. class CloseDroppingHandler: ChannelOutboundHandler {
  656. typealias OutboundIn = Any
  657. func close(context: ChannelHandlerContext, mode: CloseMode,
  658. promise: EventLoopPromise<Void>?) {
  659. promise?
  660. .fail(GRPCStatus(code: .unavailable, message: "Purposefully dropping channel close"))
  661. }
  662. }
  663. let channelPromise = self.loop.makePromise(of: Channel.self)
  664. let manager = ConnectionManager.testingOnly(
  665. configuration: self.defaultConfiguration,
  666. logger: self.logger
  667. ) {
  668. channelPromise.futureResult
  669. }
  670. // Start the connection.
  671. let readyChannel: EventLoopFuture<Channel> = self
  672. .waitForStateChange(from: .idle, to: .connecting) {
  673. let readyChannel = manager.getChannel()
  674. self.loop.run()
  675. return readyChannel
  676. }
  677. // Setup the real channel and activate it.
  678. let channel = EmbeddedChannel(loop: self.loop)
  679. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  680. CloseDroppingHandler(),
  681. GRPCIdleHandler(mode: .client(manager), logger: manager.logger),
  682. ]).wait())
  683. channelPromise.succeed(channel)
  684. self.loop.run()
  685. XCTAssertNoThrow(
  686. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  687. .wait()
  688. )
  689. // Write a SETTINGS frame on the root stream.
  690. try self.waitForStateChange(from: .connecting, to: .ready) {
  691. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  692. XCTAssertNoThrow(try channel.writeInbound(frame))
  693. }
  694. // The channel should now be ready.
  695. XCTAssertNoThrow(try readyChannel.wait())
  696. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  697. // channel to close.
  698. try self.waitForStateChange(from: .ready, to: .idle) {
  699. let goAway = HTTP2Frame(
  700. streamID: .rootStream,
  701. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  702. )
  703. XCTAssertNoThrow(try channel.writeInbound(goAway))
  704. }
  705. // We dropped the close; now wait for the scheduled idle to fire.
  706. //
  707. // Previously doing this this would fail a precondition.
  708. self.loop.advanceTime(by: .minutes(5))
  709. }
  710. }
  711. internal struct Change: Hashable, CustomStringConvertible {
  712. var from: ConnectivityState
  713. var to: ConnectivityState
  714. var description: String {
  715. return "\(self.from) → \(self.to)"
  716. }
  717. }
  718. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  719. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  720. private let semaphore = DispatchSemaphore(value: 0)
  721. private var expectation: Expectation = .noExpectation
  722. private enum Expectation {
  723. /// We have no expectation of any changes. We'll just ignore any changes.
  724. case noExpectation
  725. /// We expect one change.
  726. case one((Change) -> Void)
  727. /// We expect 'count' changes.
  728. case some(count: Int, recorded: [Change], ([Change]) -> Void)
  729. var count: Int {
  730. switch self {
  731. case .noExpectation:
  732. return 0
  733. case .one:
  734. return 1
  735. case let .some(count, _, _):
  736. return count
  737. }
  738. }
  739. }
  740. func connectivityStateDidChange(from oldState: ConnectivityState,
  741. to newState: ConnectivityState) {
  742. self.serialQueue.async {
  743. switch self.expectation {
  744. case let .one(verify):
  745. // We don't care about future changes.
  746. self.expectation = .noExpectation
  747. // Verify and notify.
  748. verify(Change(from: oldState, to: newState))
  749. self.semaphore.signal()
  750. case .some(let count, var recorded, let verify):
  751. recorded.append(Change(from: oldState, to: newState))
  752. if recorded.count == count {
  753. // We don't care about future changes.
  754. self.expectation = .noExpectation
  755. // Verify and notify.
  756. verify(recorded)
  757. self.semaphore.signal()
  758. } else {
  759. // Still need more responses.
  760. self.expectation = .some(count: count, recorded: recorded, verify)
  761. }
  762. case .noExpectation:
  763. // Ignore any changes.
  764. ()
  765. }
  766. }
  767. }
  768. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
  769. self.serialQueue.async {
  770. self.expectation = .some(count: count, recorded: [], verify)
  771. }
  772. }
  773. func expectChange(verify: @escaping (Change) -> Void) {
  774. self.serialQueue.async {
  775. self.expectation = .one(verify)
  776. }
  777. }
  778. func waitForExpectedChanges(timeout: DispatchTimeInterval) {
  779. let result = self.semaphore.wait(timeout: .now() + timeout)
  780. switch result {
  781. case .success:
  782. ()
  783. case .timedOut:
  784. XCTFail("Timed out before verifying \(self.expectation.count) change(s)")
  785. }
  786. }
  787. }
  788. private extension ConnectionBackoff {
  789. static let oneSecondFixed = ConnectionBackoff(
  790. initialBackoff: 1.0,
  791. maximumBackoff: 1.0,
  792. multiplier: 1.0,
  793. jitter: 0.0
  794. )
  795. }
  796. private struct DoomedChannelError: Error {}