ConnectionManagerTests.swift 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import NIOHTTP2
  19. import XCTest
  20. class ConnectionManagerTests: GRPCTestCase {
  21. private let loop = EmbeddedEventLoop()
  22. private let recorder = RecordingConnectivityDelegate()
  23. private var defaultConfiguration: ClientConnection.Configuration {
  24. return ClientConnection.Configuration(
  25. target: .unixDomainSocket("/ignored"),
  26. eventLoopGroup: self.loop,
  27. connectivityStateDelegate: self.recorder,
  28. connectionBackoff: nil,
  29. backgroundActivityLogger: self.clientLogger
  30. )
  31. }
  32. override func tearDown() {
  33. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  34. super.tearDown()
  35. }
  36. private func waitForStateChange<Result>(
  37. from: ConnectivityState,
  38. to: ConnectivityState,
  39. timeout: DispatchTimeInterval = .seconds(1),
  40. file: StaticString = #file,
  41. line: UInt = #line,
  42. body: () throws -> Result
  43. ) rethrows -> Result {
  44. self.recorder.expectChange {
  45. XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
  46. }
  47. let result = try body()
  48. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  49. return result
  50. }
  51. private func waitForStateChanges<Result>(
  52. _ changes: [Change],
  53. timeout: DispatchTimeInterval = .seconds(1),
  54. file: StaticString = #file,
  55. line: UInt = #line,
  56. body: () throws -> Result
  57. ) rethrows -> Result {
  58. self.recorder.expectChanges(changes.count) {
  59. XCTAssertEqual($0, changes)
  60. }
  61. let result = try body()
  62. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  63. return result
  64. }
  65. }
  66. extension ConnectionManagerTests {
  67. func testIdleShutdown() throws {
  68. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  69. try self.waitForStateChange(from: .idle, to: .shutdown) {
  70. let shutdown = manager.shutdown()
  71. self.loop.run()
  72. XCTAssertNoThrow(try shutdown.wait())
  73. }
  74. // Getting a channel should fail.
  75. let channel = manager.getChannel()
  76. self.loop.run()
  77. XCTAssertThrowsError(try channel.wait())
  78. }
  79. func testConnectFromIdleFailsWithNoReconnect() {
  80. let channelPromise = self.loop.makePromise(of: Channel.self)
  81. let manager = ConnectionManager.testingOnly(
  82. configuration: self.defaultConfiguration,
  83. logger: self.logger
  84. ) {
  85. channelPromise.futureResult
  86. }
  87. let channel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  88. let channel = manager.getChannel()
  89. self.loop.run()
  90. return channel
  91. }
  92. self.waitForStateChange(from: .connecting, to: .shutdown) {
  93. channelPromise.fail(DoomedChannelError())
  94. }
  95. XCTAssertThrowsError(try channel.wait()) {
  96. XCTAssertTrue($0 is DoomedChannelError)
  97. }
  98. }
  99. func testConnectAndDisconnect() throws {
  100. let channelPromise = self.loop.makePromise(of: Channel.self)
  101. let manager = ConnectionManager.testingOnly(
  102. configuration: self.defaultConfiguration,
  103. logger: self.logger
  104. ) {
  105. channelPromise.futureResult
  106. }
  107. // Start the connection.
  108. self.waitForStateChange(from: .idle, to: .connecting) {
  109. _ = manager.getChannel()
  110. self.loop.run()
  111. }
  112. // Setup the real channel and activate it.
  113. let channel = EmbeddedChannel(
  114. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  115. loop: self.loop
  116. )
  117. channelPromise.succeed(channel)
  118. XCTAssertNoThrow(
  119. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  120. .wait()
  121. )
  122. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  123. try self.waitForStateChange(from: .connecting, to: .ready) {
  124. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  125. XCTAssertNoThrow(try channel.writeInbound(frame))
  126. }
  127. // Close the channel.
  128. try self.waitForStateChange(from: .ready, to: .shutdown) {
  129. // Now the channel should be available: shut it down.
  130. let shutdown = manager.shutdown()
  131. self.loop.run()
  132. XCTAssertNoThrow(try shutdown.wait())
  133. }
  134. }
  135. func testConnectAndIdle() throws {
  136. let channelPromise = self.loop.makePromise(of: Channel.self)
  137. let manager = ConnectionManager.testingOnly(
  138. configuration: self.defaultConfiguration,
  139. logger: self.logger
  140. ) {
  141. channelPromise.futureResult
  142. }
  143. // Start the connection.
  144. let readyChannel: EventLoopFuture<Channel> = self
  145. .waitForStateChange(from: .idle, to: .connecting) {
  146. let readyChannel = manager.getChannel()
  147. self.loop.run()
  148. return readyChannel
  149. }
  150. // Setup the channel.
  151. let channel = EmbeddedChannel(
  152. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  153. loop: self.loop
  154. )
  155. channelPromise.succeed(channel)
  156. XCTAssertNoThrow(
  157. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  158. .wait()
  159. )
  160. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  161. try self.waitForStateChange(from: .connecting, to: .ready) {
  162. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  163. XCTAssertNoThrow(try channel.writeInbound(frame))
  164. // Wait for the channel, it _must_ be ready now.
  165. XCTAssertNoThrow(try readyChannel.wait())
  166. }
  167. // Go idle. This will shutdown the channel.
  168. try self.waitForStateChange(from: .ready, to: .idle) {
  169. self.loop.advanceTime(by: .minutes(5))
  170. XCTAssertNoThrow(try readyChannel.flatMap { $0.closeFuture }.wait())
  171. }
  172. // Now shutdown.
  173. try self.waitForStateChange(from: .idle, to: .shutdown) {
  174. let shutdown = manager.shutdown()
  175. self.loop.run()
  176. XCTAssertNoThrow(try shutdown.wait())
  177. }
  178. }
  179. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  180. let channelPromise = self.loop.makePromise(of: Channel.self)
  181. let manager = ConnectionManager.testingOnly(
  182. configuration: self.defaultConfiguration,
  183. logger: self.logger
  184. ) {
  185. channelPromise.futureResult
  186. }
  187. // Start the connection.
  188. let readyChannel: EventLoopFuture<Channel> = self
  189. .waitForStateChange(from: .idle, to: .connecting) {
  190. let readyChannel = manager.getChannel()
  191. self.loop.run()
  192. return readyChannel
  193. }
  194. // Setup the channel.
  195. let channel = EmbeddedChannel(
  196. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  197. loop: self.loop
  198. )
  199. channelPromise.succeed(channel)
  200. XCTAssertNoThrow(
  201. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  202. .wait()
  203. )
  204. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  205. try self.waitForStateChange(from: .connecting, to: .ready) {
  206. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  207. XCTAssertNoThrow(try channel.writeInbound(frame))
  208. // Wait for the channel, it _must_ be ready now.
  209. XCTAssertNoThrow(try readyChannel.wait())
  210. }
  211. // "create" a stream; the details don't matter here.
  212. let streamCreated = NIOHTTP2StreamCreatedEvent(
  213. streamID: 1,
  214. localInitialWindowSize: nil,
  215. remoteInitialWindowSize: nil
  216. )
  217. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  218. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  219. self.loop.advanceTime(by: .minutes(5))
  220. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  221. self.waitForStateChange(from: .ready, to: .idle) {
  222. // Close the stream.
  223. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  224. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  225. // ... wait for the idle timeout,
  226. self.loop.advanceTime(by: .minutes(5))
  227. }
  228. // Now shutdown.
  229. try self.waitForStateChange(from: .idle, to: .shutdown) {
  230. let shutdown = manager.shutdown()
  231. self.loop.run()
  232. XCTAssertNoThrow(try shutdown.wait())
  233. }
  234. }
  235. func testConnectAndThenBecomeInactive() throws {
  236. let channelPromise = self.loop.makePromise(of: Channel.self)
  237. let manager = ConnectionManager.testingOnly(
  238. configuration: self.defaultConfiguration,
  239. logger: self.logger
  240. ) {
  241. channelPromise.futureResult
  242. }
  243. let readyChannel: EventLoopFuture<Channel> = self
  244. .waitForStateChange(from: .idle, to: .connecting) {
  245. let readyChannel = manager.getChannel()
  246. self.loop.run()
  247. return readyChannel
  248. }
  249. // Setup the channel.
  250. let channel = EmbeddedChannel(
  251. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  252. loop: self.loop
  253. )
  254. channelPromise.succeed(channel)
  255. XCTAssertNoThrow(
  256. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  257. .wait()
  258. )
  259. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  260. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  261. let shutdown = manager.shutdown()
  262. self.loop.run()
  263. XCTAssertNoThrow(try shutdown.wait())
  264. }
  265. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  266. // the `readyChannel` should error.
  267. XCTAssertThrowsError(try readyChannel.wait())
  268. }
  269. func testConnectOnSecondAttempt() throws {
  270. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  271. let channelFutures: [EventLoopFuture<Channel>] = [
  272. self.loop.makeFailedFuture(DoomedChannelError()),
  273. channelPromise.futureResult,
  274. ]
  275. var channelFutureIterator = channelFutures.makeIterator()
  276. var configuration = self.defaultConfiguration
  277. configuration.connectionBackoff = .oneSecondFixed
  278. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  279. guard let next = channelFutureIterator.next() else {
  280. XCTFail("Too many channels requested")
  281. return self.loop.makeFailedFuture(DoomedChannelError())
  282. }
  283. return next
  284. }
  285. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges([
  286. Change(from: .idle, to: .connecting),
  287. Change(from: .connecting, to: .transientFailure),
  288. ]) {
  289. // Get a channel.
  290. let readyChannel = manager.getChannel()
  291. self.loop.run()
  292. return readyChannel
  293. }
  294. // Get a channel from the manager: it is a future for the same channel.
  295. let anotherReadyChannel = manager.getChannel()
  296. self.loop.run()
  297. // Move time forwards by a second to start the next connection attempt.
  298. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  299. self.loop.advanceTime(by: .seconds(1))
  300. }
  301. // Setup the actual channel and complete the promise.
  302. let channel = EmbeddedChannel(
  303. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  304. loop: self.loop
  305. )
  306. channelPromise.succeed(channel)
  307. XCTAssertNoThrow(
  308. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  309. .wait()
  310. )
  311. // Write a SETTINGS frame on the root stream.
  312. try self.waitForStateChange(from: .connecting, to: .ready) {
  313. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  314. XCTAssertNoThrow(try channel.writeInbound(frame))
  315. }
  316. // Wait for the channel, it _must_ be ready now.
  317. XCTAssertNoThrow(try readyChannel.wait())
  318. XCTAssertNoThrow(try anotherReadyChannel.wait())
  319. // Now shutdown.
  320. try self.waitForStateChange(from: .ready, to: .shutdown) {
  321. let shutdown = manager.shutdown()
  322. self.loop.run()
  323. XCTAssertNoThrow(try shutdown.wait())
  324. }
  325. }
  326. func testShutdownWhileConnecting() throws {
  327. let channelPromise = self.loop.makePromise(of: Channel.self)
  328. let manager = ConnectionManager.testingOnly(
  329. configuration: self.defaultConfiguration,
  330. logger: self.logger
  331. ) {
  332. channelPromise.futureResult
  333. }
  334. let readyChannel: EventLoopFuture<Channel> = self
  335. .waitForStateChange(from: .idle, to: .connecting) {
  336. let readyChannel = manager.getChannel()
  337. self.loop.run()
  338. return readyChannel
  339. }
  340. // Now shutdown.
  341. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  342. let shutdown = manager.shutdown()
  343. self.loop.run()
  344. XCTAssertNoThrow(try shutdown.wait())
  345. }
  346. // The channel we were requesting should fail.
  347. XCTAssertThrowsError(try readyChannel.wait())
  348. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  349. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  350. let channel = try channelPromise.futureResult.wait()
  351. self.loop.run()
  352. XCTAssertNoThrow(try channel.closeFuture.wait())
  353. }
  354. func testShutdownWhileTransientFailure() throws {
  355. var configuration = self.defaultConfiguration
  356. configuration.connectionBackoff = .oneSecondFixed
  357. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  358. self.loop.makeFailedFuture(DoomedChannelError())
  359. }
  360. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges([
  361. Change(from: .idle, to: .connecting),
  362. Change(from: .connecting, to: .transientFailure),
  363. ]) {
  364. // Get a channel.
  365. let readyChannel = manager.getChannel()
  366. self.loop.run()
  367. return readyChannel
  368. }
  369. // Now shutdown.
  370. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  371. let shutdown = manager.shutdown()
  372. self.loop.run()
  373. XCTAssertNoThrow(try shutdown.wait())
  374. }
  375. // The channel we were requesting should fail.
  376. XCTAssertThrowsError(try readyChannel.wait())
  377. }
  378. func testShutdownWhileActive() throws {
  379. let channelPromise = self.loop.makePromise(of: Channel.self)
  380. let manager = ConnectionManager.testingOnly(
  381. configuration: self.defaultConfiguration,
  382. logger: self.logger
  383. ) {
  384. channelPromise.futureResult
  385. }
  386. let readyChannel: EventLoopFuture<Channel> = self
  387. .waitForStateChange(from: .idle, to: .connecting) {
  388. let readyChannel = manager.getChannel()
  389. self.loop.run()
  390. return readyChannel
  391. }
  392. // Prepare the channel
  393. let channel = EmbeddedChannel(
  394. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  395. loop: self.loop
  396. )
  397. channelPromise.succeed(channel)
  398. XCTAssertNoThrow(
  399. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  400. .wait()
  401. )
  402. // (No state change expected here: active is an internal state.)
  403. // Now shutdown.
  404. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  405. let shutdown = manager.shutdown()
  406. self.loop.run()
  407. XCTAssertNoThrow(try shutdown.wait())
  408. }
  409. // The channel we were requesting should fail.
  410. XCTAssertThrowsError(try readyChannel.wait())
  411. }
  412. func testShutdownWhileShutdown() throws {
  413. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  414. try self.waitForStateChange(from: .idle, to: .shutdown) {
  415. let firstShutdown = manager.shutdown()
  416. self.loop.run()
  417. XCTAssertNoThrow(try firstShutdown.wait())
  418. }
  419. let secondShutdown = manager.shutdown()
  420. self.loop.run()
  421. XCTAssertNoThrow(try secondShutdown.wait())
  422. }
  423. func testTransientFailureWhileActive() throws {
  424. var configuration = self.defaultConfiguration
  425. configuration.connectionBackoff = .oneSecondFixed
  426. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  427. let channelFutures: [EventLoopFuture<Channel>] = [
  428. channelPromise.futureResult,
  429. self.loop.makeFailedFuture(DoomedChannelError()),
  430. ]
  431. var channelFutureIterator = channelFutures.makeIterator()
  432. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  433. guard let next = channelFutureIterator.next() else {
  434. XCTFail("Too many channels requested")
  435. return self.loop.makeFailedFuture(DoomedChannelError())
  436. }
  437. return next
  438. }
  439. let readyChannel: EventLoopFuture<Channel> = self
  440. .waitForStateChange(from: .idle, to: .connecting) {
  441. let readyChannel = manager.getChannel()
  442. self.loop.run()
  443. return readyChannel
  444. }
  445. // Prepare the channel
  446. let firstChannel = EmbeddedChannel(
  447. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  448. loop: self.loop
  449. )
  450. channelPromise.succeed(firstChannel)
  451. XCTAssertNoThrow(
  452. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  453. .wait()
  454. )
  455. // (No state change expected here: active is an internal state.)
  456. // Close the channel (simulate e.g. TLS handshake failed)
  457. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  458. XCTAssertNoThrow(try firstChannel.close().wait())
  459. }
  460. // Start connecting again.
  461. self.waitForStateChanges([
  462. Change(from: .transientFailure, to: .connecting),
  463. Change(from: .connecting, to: .transientFailure),
  464. ]) {
  465. self.loop.advanceTime(by: .seconds(1))
  466. }
  467. // Now shutdown
  468. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  469. let shutdown = manager.shutdown()
  470. self.loop.run()
  471. XCTAssertNoThrow(try shutdown.wait())
  472. }
  473. // The channel never came up: it should be throw.
  474. XCTAssertThrowsError(try readyChannel.wait())
  475. }
  476. func testTransientFailureWhileReady() throws {
  477. var configuration = self.defaultConfiguration
  478. configuration.connectionBackoff = .oneSecondFixed
  479. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  480. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  481. let channelFutures: [EventLoopFuture<Channel>] = [
  482. firstChannelPromise.futureResult,
  483. secondChannelPromise.futureResult,
  484. ]
  485. var channelFutureIterator = channelFutures.makeIterator()
  486. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  487. guard let next = channelFutureIterator.next() else {
  488. XCTFail("Too many channels requested")
  489. return self.loop.makeFailedFuture(DoomedChannelError())
  490. }
  491. return next
  492. }
  493. let readyChannel: EventLoopFuture<Channel> = self
  494. .waitForStateChange(from: .idle, to: .connecting) {
  495. let readyChannel = manager.getChannel()
  496. self.loop.run()
  497. return readyChannel
  498. }
  499. // Prepare the first channel
  500. let firstChannel = EmbeddedChannel(
  501. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  502. loop: self.loop
  503. )
  504. firstChannelPromise.succeed(firstChannel)
  505. XCTAssertNoThrow(
  506. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  507. .wait()
  508. )
  509. // Write a SETTINGS frame on the root stream.
  510. try self.waitForStateChange(from: .connecting, to: .ready) {
  511. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  512. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  513. }
  514. // Channel should now be ready.
  515. XCTAssertNoThrow(try readyChannel.wait())
  516. // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
  517. let streamCreated = NIOHTTP2StreamCreatedEvent(
  518. streamID: 1,
  519. localInitialWindowSize: nil,
  520. remoteInitialWindowSize: nil
  521. )
  522. firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
  523. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  524. XCTAssertNoThrow(try firstChannel.close().wait())
  525. }
  526. // Run to start connecting again.
  527. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  528. self.loop.advanceTime(by: .seconds(1))
  529. }
  530. // Prepare the second channel
  531. let secondChannel = EmbeddedChannel(
  532. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  533. loop: self.loop
  534. )
  535. secondChannelPromise.succeed(secondChannel)
  536. XCTAssertNoThrow(
  537. try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  538. .wait()
  539. )
  540. // Write a SETTINGS frame on the root stream.
  541. try self.waitForStateChange(from: .connecting, to: .ready) {
  542. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  543. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  544. }
  545. // Now shutdown
  546. try self.waitForStateChange(from: .ready, to: .shutdown) {
  547. let shutdown = manager.shutdown()
  548. self.loop.run()
  549. XCTAssertNoThrow(try shutdown.wait())
  550. }
  551. }
  552. func testGoAwayWhenReady() throws {
  553. let channelPromise = self.loop.makePromise(of: Channel.self)
  554. let manager = ConnectionManager.testingOnly(
  555. configuration: self.defaultConfiguration,
  556. logger: self.logger
  557. ) {
  558. channelPromise.futureResult
  559. }
  560. let readyChannel: EventLoopFuture<Channel> = self
  561. .waitForStateChange(from: .idle, to: .connecting) {
  562. let readyChannel = manager.getChannel()
  563. self.loop.run()
  564. return readyChannel
  565. }
  566. // Setup the channel.
  567. let channel = EmbeddedChannel(
  568. handler: GRPCIdleHandler(mode: .client(manager), logger: self.logger),
  569. loop: self.loop
  570. )
  571. channelPromise.succeed(channel)
  572. XCTAssertNoThrow(
  573. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  574. .wait()
  575. )
  576. try self.waitForStateChange(from: .connecting, to: .ready) {
  577. // Write a SETTINGS frame on the root stream.
  578. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  579. XCTAssertNoThrow(try channel.writeInbound(frame))
  580. }
  581. // Wait for the channel, it _must_ be ready now.
  582. XCTAssertNoThrow(try readyChannel.wait())
  583. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  584. // channel to close.
  585. try self.waitForStateChange(from: .ready, to: .idle) {
  586. let goAway = HTTP2Frame(
  587. streamID: .rootStream,
  588. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  589. )
  590. XCTAssertNoThrow(try channel.writeInbound(goAway))
  591. }
  592. self.loop.run()
  593. XCTAssertNoThrow(try channel.closeFuture.wait())
  594. // Now shutdown
  595. try self.waitForStateChange(from: .idle, to: .shutdown) {
  596. let shutdown = manager.shutdown()
  597. self.loop.run()
  598. XCTAssertNoThrow(try shutdown.wait())
  599. }
  600. }
  601. func testDoomedOptimisticChannelFromIdle() {
  602. let manager = ConnectionManager.testingOnly(
  603. configuration: self.defaultConfiguration,
  604. logger: self.logger
  605. ) {
  606. self.loop.makeFailedFuture(DoomedChannelError())
  607. }
  608. let candidate = manager.getOptimisticChannel()
  609. self.loop.run()
  610. XCTAssertThrowsError(try candidate.wait())
  611. }
  612. func testDoomedOptimisticChannelFromConnecting() throws {
  613. let promise = self.loop.makePromise(of: Channel.self)
  614. let manager = ConnectionManager.testingOnly(
  615. configuration: self.defaultConfiguration,
  616. logger: self.logger
  617. ) {
  618. promise.futureResult
  619. }
  620. self.waitForStateChange(from: .idle, to: .connecting) {
  621. // Trigger channel creation, and a connection attempt, we don't care about the channel.
  622. _ = manager.getChannel()
  623. self.loop.run()
  624. }
  625. // We're connecting: get an optimistic channel.
  626. let optimisticChannel = manager.getOptimisticChannel()
  627. self.loop.run()
  628. // Fail the promise.
  629. promise.fail(DoomedChannelError())
  630. XCTAssertThrowsError(try optimisticChannel.wait())
  631. }
  632. func testOptimisticChannelFromTransientFailure() throws {
  633. var configuration = self.defaultConfiguration
  634. configuration.connectionBackoff = ConnectionBackoff()
  635. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  636. self.loop.makeFailedFuture(DoomedChannelError())
  637. }
  638. self.waitForStateChanges([
  639. Change(from: .idle, to: .connecting),
  640. Change(from: .connecting, to: .transientFailure),
  641. ]) {
  642. // Trigger channel creation, and a connection attempt, we don't care about the channel.
  643. _ = manager.getChannel()
  644. self.loop.run()
  645. }
  646. // Now we're sitting in transient failure. Get a channel optimistically.
  647. let optimisticChannel = manager.getOptimisticChannel()
  648. self.loop.run()
  649. XCTAssertThrowsError(try optimisticChannel.wait())
  650. }
  651. func testOptimisticChannelFromShutdown() throws {
  652. let manager = ConnectionManager.testingOnly(
  653. configuration: self.defaultConfiguration,
  654. logger: self.logger
  655. ) {
  656. self.loop.makeFailedFuture(DoomedChannelError())
  657. }
  658. let shutdown = manager.shutdown()
  659. self.loop.run()
  660. XCTAssertNoThrow(try shutdown.wait())
  661. // Get a channel optimistically. It'll fail, obviously.
  662. let channel = manager.getOptimisticChannel()
  663. self.loop.run()
  664. XCTAssertThrowsError(try channel.wait())
  665. }
  666. func testDoubleIdle() throws {
  667. class CloseDroppingHandler: ChannelOutboundHandler {
  668. typealias OutboundIn = Any
  669. func close(context: ChannelHandlerContext, mode: CloseMode,
  670. promise: EventLoopPromise<Void>?) {
  671. promise?
  672. .fail(GRPCStatus(code: .unavailable, message: "Purposefully dropping channel close"))
  673. }
  674. }
  675. let channelPromise = self.loop.makePromise(of: Channel.self)
  676. let manager = ConnectionManager.testingOnly(
  677. configuration: self.defaultConfiguration,
  678. logger: self.logger
  679. ) {
  680. channelPromise.futureResult
  681. }
  682. // Start the connection.
  683. let readyChannel: EventLoopFuture<Channel> = self
  684. .waitForStateChange(from: .idle, to: .connecting) {
  685. let readyChannel = manager.getChannel()
  686. self.loop.run()
  687. return readyChannel
  688. }
  689. // Setup the real channel and activate it.
  690. let channel = EmbeddedChannel(loop: self.loop)
  691. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  692. CloseDroppingHandler(),
  693. GRPCIdleHandler(mode: .client(manager), logger: manager.logger),
  694. ]).wait())
  695. channelPromise.succeed(channel)
  696. self.loop.run()
  697. XCTAssertNoThrow(
  698. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  699. .wait()
  700. )
  701. // Write a SETTINGS frame on the root stream.
  702. try self.waitForStateChange(from: .connecting, to: .ready) {
  703. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  704. XCTAssertNoThrow(try channel.writeInbound(frame))
  705. }
  706. // The channel should now be ready.
  707. XCTAssertNoThrow(try readyChannel.wait())
  708. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  709. // channel to close.
  710. try self.waitForStateChange(from: .ready, to: .idle) {
  711. let goAway = HTTP2Frame(
  712. streamID: .rootStream,
  713. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  714. )
  715. XCTAssertNoThrow(try channel.writeInbound(goAway))
  716. }
  717. // We dropped the close; now wait for the scheduled idle to fire.
  718. //
  719. // Previously doing this this would fail a precondition.
  720. self.loop.advanceTime(by: .minutes(5))
  721. }
  722. func testForceIdleAfterInactive() throws {
  723. let channelPromise = self.loop.makePromise(of: Channel.self)
  724. let manager = ConnectionManager.testingOnly(
  725. configuration: self.defaultConfiguration,
  726. logger: self.logger
  727. ) {
  728. channelPromise.futureResult
  729. }
  730. // Start the connection.
  731. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(
  732. from: .idle,
  733. to: .connecting
  734. ) {
  735. let readyChannel = manager.getChannel()
  736. self.loop.run()
  737. return readyChannel
  738. }
  739. // Setup the real channel and activate it.
  740. let channel = EmbeddedChannel(loop: self.loop)
  741. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  742. GRPCIdleHandler(mode: .client(manager), logger: manager.logger),
  743. ]).wait())
  744. channelPromise.succeed(channel)
  745. self.loop.run()
  746. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  747. XCTAssertNoThrow(try connect.wait())
  748. // Write a SETTINGS frame on the root stream.
  749. try self.waitForStateChange(from: .connecting, to: .ready) {
  750. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  751. XCTAssertNoThrow(try channel.writeInbound(frame))
  752. }
  753. // The channel should now be ready.
  754. XCTAssertNoThrow(try readyChannel.wait())
  755. // Now drop the connection.
  756. try self.waitForStateChange(from: .ready, to: .shutdown) {
  757. let shutdown = manager.shutdown()
  758. self.loop.run()
  759. XCTAssertNoThrow(try shutdown.wait())
  760. }
  761. // Fire a connection idled event, i.e. keepalive timeout has fired. This should be a no-op.
  762. // Previously this would hit a precondition failure.
  763. channel.pipeline.fireUserInboundEventTriggered(ConnectionIdledEvent())
  764. }
  765. func testCloseWithoutActiveRPCs() throws {
  766. let channelPromise = self.loop.makePromise(of: Channel.self)
  767. let manager = ConnectionManager.testingOnly(
  768. configuration: self.defaultConfiguration,
  769. logger: self.logger
  770. ) {
  771. channelPromise.futureResult
  772. }
  773. // Start the connection.
  774. let readyChannel = self.waitForStateChange(
  775. from: .idle,
  776. to: .connecting
  777. ) { () -> EventLoopFuture<Channel> in
  778. let readyChannel = manager.getChannel()
  779. self.loop.run()
  780. return readyChannel
  781. }
  782. // Setup the actual channel and activate it.
  783. let channel = EmbeddedChannel(loop: self.loop)
  784. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  785. GRPCIdleHandler(mode: .client(manager), logger: manager.logger),
  786. ]).wait())
  787. channelPromise.succeed(channel)
  788. self.loop.run()
  789. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  790. XCTAssertNoThrow(try connect.wait())
  791. // "ready" the connection.
  792. try self.waitForStateChange(from: .connecting, to: .ready) {
  793. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  794. XCTAssertNoThrow(try channel.writeInbound(frame))
  795. }
  796. // The channel should now be ready.
  797. XCTAssertNoThrow(try readyChannel.wait())
  798. // Close the channel. There are no active RPCs so we should idle rather than be in the transient
  799. // failure state.
  800. self.waitForStateChange(from: .ready, to: .idle) {
  801. channel.pipeline.fireChannelInactive()
  802. }
  803. }
  804. }
  805. internal struct Change: Hashable, CustomStringConvertible {
  806. var from: ConnectivityState
  807. var to: ConnectivityState
  808. var description: String {
  809. return "\(self.from) → \(self.to)"
  810. }
  811. }
  812. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  813. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  814. private let semaphore = DispatchSemaphore(value: 0)
  815. private var expectation: Expectation = .noExpectation
  816. private enum Expectation {
  817. /// We have no expectation of any changes. We'll just ignore any changes.
  818. case noExpectation
  819. /// We expect one change.
  820. case one((Change) -> Void)
  821. /// We expect 'count' changes.
  822. case some(count: Int, recorded: [Change], ([Change]) -> Void)
  823. var count: Int {
  824. switch self {
  825. case .noExpectation:
  826. return 0
  827. case .one:
  828. return 1
  829. case let .some(count, _, _):
  830. return count
  831. }
  832. }
  833. }
  834. func connectivityStateDidChange(from oldState: ConnectivityState,
  835. to newState: ConnectivityState) {
  836. self.serialQueue.async {
  837. switch self.expectation {
  838. case let .one(verify):
  839. // We don't care about future changes.
  840. self.expectation = .noExpectation
  841. // Verify and notify.
  842. verify(Change(from: oldState, to: newState))
  843. self.semaphore.signal()
  844. case .some(let count, var recorded, let verify):
  845. recorded.append(Change(from: oldState, to: newState))
  846. if recorded.count == count {
  847. // We don't care about future changes.
  848. self.expectation = .noExpectation
  849. // Verify and notify.
  850. verify(recorded)
  851. self.semaphore.signal()
  852. } else {
  853. // Still need more responses.
  854. self.expectation = .some(count: count, recorded: recorded, verify)
  855. }
  856. case .noExpectation:
  857. // Ignore any changes.
  858. ()
  859. }
  860. }
  861. }
  862. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
  863. self.serialQueue.async {
  864. self.expectation = .some(count: count, recorded: [], verify)
  865. }
  866. }
  867. func expectChange(verify: @escaping (Change) -> Void) {
  868. self.serialQueue.async {
  869. self.expectation = .one(verify)
  870. }
  871. }
  872. func waitForExpectedChanges(
  873. timeout: DispatchTimeInterval,
  874. file: StaticString = #file,
  875. line: UInt = #line
  876. ) {
  877. let result = self.semaphore.wait(timeout: .now() + timeout)
  878. switch result {
  879. case .success:
  880. ()
  881. case .timedOut:
  882. XCTFail(
  883. "Timed out before verifying \(self.expectation.count) change(s)",
  884. file: file, line: line
  885. )
  886. }
  887. }
  888. }
  889. private extension ConnectionBackoff {
  890. static let oneSecondFixed = ConnectionBackoff(
  891. initialBackoff: 1.0,
  892. maximumBackoff: 1.0,
  893. multiplier: 1.0,
  894. jitter: 0.0
  895. )
  896. }
  897. private struct DoomedChannelError: Error {}