ConnectionManagerTests.swift 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  21. class ConnectionManagerTests: GRPCTestCase {
  22. private let loop = EmbeddedEventLoop()
  23. private let recorder = RecordingConnectivityDelegate()
  24. private var defaultConfiguration: ClientConnection.Configuration {
  25. return ClientConnection.Configuration(
  26. target: .unixDomainSocket("/ignored"),
  27. eventLoopGroup: self.loop,
  28. connectivityStateDelegate: self.recorder,
  29. connectionBackoff: nil,
  30. backgroundActivityLogger: self.clientLogger
  31. )
  32. }
  33. override func tearDown() {
  34. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  35. super.tearDown()
  36. }
  37. private func waitForStateChange<Result>(
  38. from: ConnectivityState,
  39. to: ConnectivityState,
  40. timeout: DispatchTimeInterval = .seconds(1),
  41. file: StaticString = #file,
  42. line: UInt = #line,
  43. body: () throws -> Result
  44. ) rethrows -> Result {
  45. self.recorder.expectChange {
  46. XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
  47. }
  48. let result = try body()
  49. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  50. return result
  51. }
  52. private func waitForStateChanges<Result>(
  53. _ changes: [Change],
  54. timeout: DispatchTimeInterval = .seconds(1),
  55. file: StaticString = #file,
  56. line: UInt = #line,
  57. body: () throws -> Result
  58. ) rethrows -> Result {
  59. self.recorder.expectChanges(changes.count) {
  60. XCTAssertEqual($0, changes)
  61. }
  62. let result = try body()
  63. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  64. return result
  65. }
  66. }
  67. extension ConnectionManagerTests {
  68. func testIdleShutdown() throws {
  69. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  70. try self.waitForStateChange(from: .idle, to: .shutdown) {
  71. let shutdown = manager.shutdown()
  72. self.loop.run()
  73. XCTAssertNoThrow(try shutdown.wait())
  74. }
  75. // Getting a multiplexer should fail.
  76. let multiplexer = manager.getHTTP2Multiplexer()
  77. self.loop.run()
  78. XCTAssertThrowsError(try multiplexer.wait())
  79. }
  80. func testConnectFromIdleFailsWithNoReconnect() {
  81. let channelPromise = self.loop.makePromise(of: Channel.self)
  82. let manager = ConnectionManager.testingOnly(
  83. configuration: self.defaultConfiguration,
  84. logger: self.logger
  85. ) {
  86. channelPromise.futureResult
  87. }
  88. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
  89. .waitForStateChange(from: .idle, to: .connecting) {
  90. let channel = manager.getHTTP2Multiplexer()
  91. self.loop.run()
  92. return channel
  93. }
  94. self.waitForStateChange(from: .connecting, to: .shutdown) {
  95. channelPromise.fail(DoomedChannelError())
  96. }
  97. XCTAssertThrowsError(try multiplexer.wait()) {
  98. XCTAssertTrue($0 is DoomedChannelError)
  99. }
  100. }
  101. func testConnectAndDisconnect() throws {
  102. let channelPromise = self.loop.makePromise(of: Channel.self)
  103. let manager = ConnectionManager.testingOnly(
  104. configuration: self.defaultConfiguration,
  105. logger: self.logger
  106. ) {
  107. channelPromise.futureResult
  108. }
  109. // Start the connection.
  110. self.waitForStateChange(from: .idle, to: .connecting) {
  111. _ = manager.getHTTP2Multiplexer()
  112. self.loop.run()
  113. }
  114. // Setup the real channel and activate it.
  115. let channel = EmbeddedChannel(loop: self.loop)
  116. let h2mux = HTTP2StreamMultiplexer(
  117. mode: .client,
  118. channel: channel,
  119. inboundStreamInitializer: nil
  120. )
  121. try channel.pipeline.addHandler(
  122. GRPCIdleHandler(
  123. mode: .client(manager, h2mux),
  124. logger: self.logger,
  125. idleTimeout: .minutes(5)
  126. )
  127. ).wait()
  128. channelPromise.succeed(channel)
  129. XCTAssertNoThrow(
  130. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  131. .wait()
  132. )
  133. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  134. try self.waitForStateChange(from: .connecting, to: .ready) {
  135. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  136. XCTAssertNoThrow(try channel.writeInbound(frame))
  137. }
  138. // Close the channel.
  139. try self.waitForStateChange(from: .ready, to: .shutdown) {
  140. // Now the channel should be available: shut it down.
  141. let shutdown = manager.shutdown()
  142. self.loop.run()
  143. XCTAssertNoThrow(try shutdown.wait())
  144. }
  145. }
  146. func testConnectAndIdle() throws {
  147. let channelPromise = self.loop.makePromise(of: Channel.self)
  148. let manager = ConnectionManager.testingOnly(
  149. configuration: self.defaultConfiguration,
  150. logger: self.logger
  151. ) {
  152. channelPromise.futureResult
  153. }
  154. // Start the connection.
  155. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  156. .waitForStateChange(from: .idle, to: .connecting) {
  157. let readyChannelMux = manager.getHTTP2Multiplexer()
  158. self.loop.run()
  159. return readyChannelMux
  160. }
  161. // Setup the channel.
  162. let channel = EmbeddedChannel(loop: self.loop)
  163. let h2mux = HTTP2StreamMultiplexer(
  164. mode: .client,
  165. channel: channel,
  166. inboundStreamInitializer: nil
  167. )
  168. try channel.pipeline.addHandler(
  169. GRPCIdleHandler(
  170. mode: .client(manager, h2mux),
  171. logger: self.logger,
  172. idleTimeout: .minutes(5)
  173. )
  174. ).wait()
  175. channelPromise.succeed(channel)
  176. XCTAssertNoThrow(
  177. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  178. .wait()
  179. )
  180. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  181. try self.waitForStateChange(from: .connecting, to: .ready) {
  182. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  183. XCTAssertNoThrow(try channel.writeInbound(frame))
  184. // Wait for the multiplexer, it _must_ be ready now.
  185. XCTAssertNoThrow(try readyChannelMux.wait())
  186. }
  187. // Go idle. This will shutdown the channel.
  188. try self.waitForStateChange(from: .ready, to: .idle) {
  189. self.loop.advanceTime(by: .minutes(5))
  190. XCTAssertNoThrow(try channel.closeFuture.wait())
  191. }
  192. // Now shutdown.
  193. try self.waitForStateChange(from: .idle, to: .shutdown) {
  194. let shutdown = manager.shutdown()
  195. self.loop.run()
  196. XCTAssertNoThrow(try shutdown.wait())
  197. }
  198. }
  199. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  200. let channelPromise = self.loop.makePromise(of: Channel.self)
  201. let manager = ConnectionManager.testingOnly(
  202. configuration: self.defaultConfiguration,
  203. logger: self.logger
  204. ) {
  205. channelPromise.futureResult
  206. }
  207. // Start the connection.
  208. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  209. .waitForStateChange(from: .idle, to: .connecting) {
  210. let readyChannelMux = manager.getHTTP2Multiplexer()
  211. self.loop.run()
  212. return readyChannelMux
  213. }
  214. // Setup the channel.
  215. let channel = EmbeddedChannel(loop: self.loop)
  216. let h2mux = HTTP2StreamMultiplexer(
  217. mode: .client,
  218. channel: channel,
  219. inboundStreamInitializer: nil
  220. )
  221. try channel.pipeline.addHandler(
  222. GRPCIdleHandler(
  223. mode: .client(manager, h2mux),
  224. logger: self.logger,
  225. idleTimeout: .minutes(5)
  226. )
  227. ).wait()
  228. channelPromise.succeed(channel)
  229. XCTAssertNoThrow(
  230. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  231. .wait()
  232. )
  233. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  234. try self.waitForStateChange(from: .connecting, to: .ready) {
  235. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  236. XCTAssertNoThrow(try channel.writeInbound(frame))
  237. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  238. XCTAssertNoThrow(try readyChannelMux.wait())
  239. }
  240. // "create" a stream; the details don't matter here.
  241. let streamCreated = NIOHTTP2StreamCreatedEvent(
  242. streamID: 1,
  243. localInitialWindowSize: nil,
  244. remoteInitialWindowSize: nil
  245. )
  246. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  247. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  248. self.loop.advanceTime(by: .minutes(5))
  249. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  250. self.waitForStateChange(from: .ready, to: .idle) {
  251. // Close the stream.
  252. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  253. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  254. // ... wait for the idle timeout,
  255. self.loop.advanceTime(by: .minutes(5))
  256. }
  257. // Now shutdown.
  258. try self.waitForStateChange(from: .idle, to: .shutdown) {
  259. let shutdown = manager.shutdown()
  260. self.loop.run()
  261. XCTAssertNoThrow(try shutdown.wait())
  262. }
  263. }
  264. func testConnectAndThenBecomeInactive() throws {
  265. let channelPromise = self.loop.makePromise(of: Channel.self)
  266. let manager = ConnectionManager.testingOnly(
  267. configuration: self.defaultConfiguration,
  268. logger: self.logger
  269. ) {
  270. channelPromise.futureResult
  271. }
  272. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  273. .waitForStateChange(from: .idle, to: .connecting) {
  274. let readyChannelMux = manager.getHTTP2Multiplexer()
  275. self.loop.run()
  276. return readyChannelMux
  277. }
  278. // Setup the channel.
  279. let channel = EmbeddedChannel(loop: self.loop)
  280. let h2mux = HTTP2StreamMultiplexer(
  281. mode: .client,
  282. channel: channel,
  283. inboundStreamInitializer: nil
  284. )
  285. try channel.pipeline.addHandler(
  286. GRPCIdleHandler(
  287. mode: .client(manager, h2mux),
  288. logger: self.logger,
  289. idleTimeout: .minutes(5)
  290. )
  291. ).wait()
  292. channelPromise.succeed(channel)
  293. XCTAssertNoThrow(
  294. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  295. .wait()
  296. )
  297. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  298. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  299. let shutdown = manager.shutdown()
  300. self.loop.run()
  301. XCTAssertNoThrow(try shutdown.wait())
  302. }
  303. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  304. // the `readyChannelMux` should error.
  305. XCTAssertThrowsError(try readyChannelMux.wait())
  306. }
  307. func testConnectOnSecondAttempt() throws {
  308. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  309. let channelFutures: [EventLoopFuture<Channel>] = [
  310. self.loop.makeFailedFuture(DoomedChannelError()),
  311. channelPromise.futureResult,
  312. ]
  313. var channelFutureIterator = channelFutures.makeIterator()
  314. var configuration = self.defaultConfiguration
  315. configuration.connectionBackoff = .oneSecondFixed
  316. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  317. guard let next = channelFutureIterator.next() else {
  318. XCTFail("Too many channels requested")
  319. return self.loop.makeFailedFuture(DoomedChannelError())
  320. }
  321. return next
  322. }
  323. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  324. Change(from: .idle, to: .connecting),
  325. Change(from: .connecting, to: .transientFailure),
  326. ]) {
  327. // Get a HTTP/2 stream multiplexer.
  328. let readyChannelMux = manager.getHTTP2Multiplexer()
  329. self.loop.run()
  330. return readyChannelMux
  331. }
  332. // Get a HTTP/2 stream mux from the manager - it is a future for the one we made earlier.
  333. let anotherReadyChannelMux = manager.getHTTP2Multiplexer()
  334. self.loop.run()
  335. // Move time forwards by a second to start the next connection attempt.
  336. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  337. self.loop.advanceTime(by: .seconds(1))
  338. }
  339. // Setup the actual channel and complete the promise.
  340. let channel = EmbeddedChannel(loop: self.loop)
  341. let h2mux = HTTP2StreamMultiplexer(
  342. mode: .client,
  343. channel: channel,
  344. inboundStreamInitializer: nil
  345. )
  346. try channel.pipeline.addHandler(
  347. GRPCIdleHandler(
  348. mode: .client(manager, h2mux),
  349. logger: self.logger,
  350. idleTimeout: .minutes(5)
  351. )
  352. ).wait()
  353. channelPromise.succeed(channel)
  354. XCTAssertNoThrow(
  355. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  356. .wait()
  357. )
  358. // Write a SETTINGS frame on the root stream.
  359. try self.waitForStateChange(from: .connecting, to: .ready) {
  360. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  361. XCTAssertNoThrow(try channel.writeInbound(frame))
  362. }
  363. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  364. XCTAssertNoThrow(try readyChannelMux.wait())
  365. XCTAssertNoThrow(try anotherReadyChannelMux.wait())
  366. // Now shutdown.
  367. try self.waitForStateChange(from: .ready, to: .shutdown) {
  368. let shutdown = manager.shutdown()
  369. self.loop.run()
  370. XCTAssertNoThrow(try shutdown.wait())
  371. }
  372. }
  373. func testShutdownWhileConnecting() throws {
  374. let channelPromise = self.loop.makePromise(of: Channel.self)
  375. let manager = ConnectionManager.testingOnly(
  376. configuration: self.defaultConfiguration,
  377. logger: self.logger
  378. ) {
  379. channelPromise.futureResult
  380. }
  381. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  382. .waitForStateChange(from: .idle, to: .connecting) {
  383. let readyChannelMux = manager.getHTTP2Multiplexer()
  384. self.loop.run()
  385. return readyChannelMux
  386. }
  387. // Now shutdown.
  388. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  389. let shutdown = manager.shutdown()
  390. self.loop.run()
  391. XCTAssertNoThrow(try shutdown.wait())
  392. }
  393. // The multiplexer we were requesting should fail.
  394. XCTAssertThrowsError(try readyChannelMux.wait())
  395. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  396. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  397. let channel = try channelPromise.futureResult.wait()
  398. self.loop.run()
  399. XCTAssertNoThrow(try channel.closeFuture.wait())
  400. }
  401. func testShutdownWhileTransientFailure() throws {
  402. var configuration = self.defaultConfiguration
  403. configuration.connectionBackoff = .oneSecondFixed
  404. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  405. self.loop.makeFailedFuture(DoomedChannelError())
  406. }
  407. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  408. Change(from: .idle, to: .connecting),
  409. Change(from: .connecting, to: .transientFailure),
  410. ]) {
  411. // Get a HTTP/2 stream multiplexer.
  412. let readyChannelMux = manager.getHTTP2Multiplexer()
  413. self.loop.run()
  414. return readyChannelMux
  415. }
  416. // Now shutdown.
  417. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  418. let shutdown = manager.shutdown()
  419. self.loop.run()
  420. XCTAssertNoThrow(try shutdown.wait())
  421. }
  422. // The HTTP/2 stream mux we were requesting should fail.
  423. XCTAssertThrowsError(try readyChannelMux.wait())
  424. }
  425. func testShutdownWhileActive() throws {
  426. let channelPromise = self.loop.makePromise(of: Channel.self)
  427. let manager = ConnectionManager.testingOnly(
  428. configuration: self.defaultConfiguration,
  429. logger: self.logger
  430. ) {
  431. channelPromise.futureResult
  432. }
  433. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  434. .waitForStateChange(from: .idle, to: .connecting) {
  435. let readyChannelMux = manager.getHTTP2Multiplexer()
  436. self.loop.run()
  437. return readyChannelMux
  438. }
  439. // Prepare the channel
  440. let channel = EmbeddedChannel(loop: self.loop)
  441. let h2mux = HTTP2StreamMultiplexer(
  442. mode: .client,
  443. channel: channel,
  444. inboundStreamInitializer: nil
  445. )
  446. try channel.pipeline.addHandler(
  447. GRPCIdleHandler(
  448. mode: .client(manager, h2mux),
  449. logger: self.logger,
  450. idleTimeout: .minutes(5)
  451. )
  452. ).wait()
  453. channelPromise.succeed(channel)
  454. XCTAssertNoThrow(
  455. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  456. .wait()
  457. )
  458. // (No state change expected here: active is an internal state.)
  459. // Now shutdown.
  460. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  461. let shutdown = manager.shutdown()
  462. self.loop.run()
  463. XCTAssertNoThrow(try shutdown.wait())
  464. }
  465. // The HTTP/2 stream multiplexer we were requesting should fail.
  466. XCTAssertThrowsError(try readyChannelMux.wait())
  467. }
  468. func testShutdownWhileShutdown() throws {
  469. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  470. try self.waitForStateChange(from: .idle, to: .shutdown) {
  471. let firstShutdown = manager.shutdown()
  472. self.loop.run()
  473. XCTAssertNoThrow(try firstShutdown.wait())
  474. }
  475. let secondShutdown = manager.shutdown()
  476. self.loop.run()
  477. XCTAssertNoThrow(try secondShutdown.wait())
  478. }
  479. func testTransientFailureWhileActive() throws {
  480. var configuration = self.defaultConfiguration
  481. configuration.connectionBackoff = .oneSecondFixed
  482. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  483. let channelFutures: [EventLoopFuture<Channel>] = [
  484. channelPromise.futureResult,
  485. self.loop.makeFailedFuture(DoomedChannelError()),
  486. ]
  487. var channelFutureIterator = channelFutures.makeIterator()
  488. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  489. guard let next = channelFutureIterator.next() else {
  490. XCTFail("Too many channels requested")
  491. return self.loop.makeFailedFuture(DoomedChannelError())
  492. }
  493. return next
  494. }
  495. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  496. .waitForStateChange(from: .idle, to: .connecting) {
  497. let readyChannelMux = manager.getHTTP2Multiplexer()
  498. self.loop.run()
  499. return readyChannelMux
  500. }
  501. // Prepare the channel
  502. let firstChannel = EmbeddedChannel(loop: self.loop)
  503. let h2mux = HTTP2StreamMultiplexer(
  504. mode: .client,
  505. channel: firstChannel,
  506. inboundStreamInitializer: nil
  507. )
  508. try firstChannel.pipeline.addHandler(
  509. GRPCIdleHandler(
  510. mode: .client(manager, h2mux),
  511. logger: self.logger,
  512. idleTimeout: .minutes(5)
  513. )
  514. ).wait()
  515. channelPromise.succeed(firstChannel)
  516. XCTAssertNoThrow(
  517. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  518. .wait()
  519. )
  520. // (No state change expected here: active is an internal state.)
  521. // Close the channel (simulate e.g. TLS handshake failed)
  522. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  523. XCTAssertNoThrow(try firstChannel.close().wait())
  524. }
  525. // Start connecting again.
  526. self.waitForStateChanges([
  527. Change(from: .transientFailure, to: .connecting),
  528. Change(from: .connecting, to: .transientFailure),
  529. ]) {
  530. self.loop.advanceTime(by: .seconds(1))
  531. }
  532. // Now shutdown
  533. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  534. let shutdown = manager.shutdown()
  535. self.loop.run()
  536. XCTAssertNoThrow(try shutdown.wait())
  537. }
  538. // The channel never came up: it should be throw.
  539. XCTAssertThrowsError(try readyChannelMux.wait())
  540. }
  541. func testTransientFailureWhileReady() throws {
  542. var configuration = self.defaultConfiguration
  543. configuration.connectionBackoff = .oneSecondFixed
  544. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  545. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  546. let channelFutures: [EventLoopFuture<Channel>] = [
  547. firstChannelPromise.futureResult,
  548. secondChannelPromise.futureResult,
  549. ]
  550. var channelFutureIterator = channelFutures.makeIterator()
  551. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  552. guard let next = channelFutureIterator.next() else {
  553. XCTFail("Too many channels requested")
  554. return self.loop.makeFailedFuture(DoomedChannelError())
  555. }
  556. return next
  557. }
  558. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  559. .waitForStateChange(from: .idle, to: .connecting) {
  560. let readyChannelMux = manager.getHTTP2Multiplexer()
  561. self.loop.run()
  562. return readyChannelMux
  563. }
  564. // Prepare the first channel
  565. let firstChannel = EmbeddedChannel(loop: self.loop)
  566. let firstH2mux = HTTP2StreamMultiplexer(
  567. mode: .client,
  568. channel: firstChannel,
  569. inboundStreamInitializer: nil
  570. )
  571. try firstChannel.pipeline.addHandler(
  572. GRPCIdleHandler(
  573. mode: .client(manager, firstH2mux),
  574. logger: self.logger,
  575. idleTimeout: .minutes(5)
  576. )
  577. ).wait()
  578. firstChannelPromise.succeed(firstChannel)
  579. XCTAssertNoThrow(
  580. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  581. .wait()
  582. )
  583. // Write a SETTINGS frame on the root stream.
  584. try self.waitForStateChange(from: .connecting, to: .ready) {
  585. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  586. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  587. }
  588. // Channel should now be ready.
  589. XCTAssertNoThrow(try readyChannelMux.wait())
  590. // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
  591. let streamCreated = NIOHTTP2StreamCreatedEvent(
  592. streamID: 1,
  593. localInitialWindowSize: nil,
  594. remoteInitialWindowSize: nil
  595. )
  596. firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
  597. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  598. XCTAssertNoThrow(try firstChannel.close().wait())
  599. }
  600. // Run to start connecting again.
  601. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  602. self.loop.advanceTime(by: .seconds(1))
  603. }
  604. // Prepare the second channel
  605. let secondChannel = EmbeddedChannel(loop: self.loop)
  606. let secondH2mux = HTTP2StreamMultiplexer(
  607. mode: .client,
  608. channel: secondChannel,
  609. inboundStreamInitializer: nil
  610. )
  611. try secondChannel.pipeline.addHandler(
  612. GRPCIdleHandler(
  613. mode: .client(manager, secondH2mux),
  614. logger: self.logger,
  615. idleTimeout: .minutes(5)
  616. )
  617. ).wait()
  618. secondChannelPromise.succeed(secondChannel)
  619. XCTAssertNoThrow(
  620. try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  621. .wait()
  622. )
  623. // Write a SETTINGS frame on the root stream.
  624. try self.waitForStateChange(from: .connecting, to: .ready) {
  625. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  626. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  627. }
  628. // Now shutdown
  629. try self.waitForStateChange(from: .ready, to: .shutdown) {
  630. let shutdown = manager.shutdown()
  631. self.loop.run()
  632. XCTAssertNoThrow(try shutdown.wait())
  633. }
  634. }
  635. func testGoAwayWhenReady() throws {
  636. let channelPromise = self.loop.makePromise(of: Channel.self)
  637. let manager = ConnectionManager.testingOnly(
  638. configuration: self.defaultConfiguration,
  639. logger: self.logger
  640. ) {
  641. channelPromise.futureResult
  642. }
  643. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  644. .waitForStateChange(from: .idle, to: .connecting) {
  645. let readyChannelMux = manager.getHTTP2Multiplexer()
  646. self.loop.run()
  647. return readyChannelMux
  648. }
  649. // Setup the channel.
  650. let channel = EmbeddedChannel(loop: self.loop)
  651. let h2mux = HTTP2StreamMultiplexer(
  652. mode: .client,
  653. channel: channel,
  654. inboundStreamInitializer: nil
  655. )
  656. try channel.pipeline.addHandler(
  657. GRPCIdleHandler(
  658. mode: .client(manager, h2mux),
  659. logger: self.logger,
  660. idleTimeout: .minutes(5)
  661. )
  662. ).wait()
  663. channelPromise.succeed(channel)
  664. XCTAssertNoThrow(
  665. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  666. .wait()
  667. )
  668. try self.waitForStateChange(from: .connecting, to: .ready) {
  669. // Write a SETTINGS frame on the root stream.
  670. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  671. XCTAssertNoThrow(try channel.writeInbound(frame))
  672. }
  673. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  674. XCTAssertNoThrow(try readyChannelMux.wait())
  675. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  676. // channel to close.
  677. try self.waitForStateChange(from: .ready, to: .idle) {
  678. let goAway = HTTP2Frame(
  679. streamID: .rootStream,
  680. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  681. )
  682. XCTAssertNoThrow(try channel.writeInbound(goAway))
  683. }
  684. self.loop.run()
  685. XCTAssertNoThrow(try channel.closeFuture.wait())
  686. // Now shutdown
  687. try self.waitForStateChange(from: .idle, to: .shutdown) {
  688. let shutdown = manager.shutdown()
  689. self.loop.run()
  690. XCTAssertNoThrow(try shutdown.wait())
  691. }
  692. }
  693. func testDoomedOptimisticChannelFromIdle() {
  694. var configuration = self.defaultConfiguration
  695. configuration.callStartBehavior = .fastFailure
  696. let manager = ConnectionManager.testingOnly(
  697. configuration: configuration,
  698. logger: self.logger
  699. ) {
  700. self.loop.makeFailedFuture(DoomedChannelError())
  701. }
  702. let candidate = manager.getHTTP2Multiplexer()
  703. self.loop.run()
  704. XCTAssertThrowsError(try candidate.wait())
  705. }
  706. func testDoomedOptimisticChannelFromConnecting() throws {
  707. var configuration = self.defaultConfiguration
  708. configuration.callStartBehavior = .fastFailure
  709. let promise = self.loop.makePromise(of: Channel.self)
  710. let manager = ConnectionManager.testingOnly(
  711. configuration: configuration,
  712. logger: self.logger
  713. ) {
  714. promise.futureResult
  715. }
  716. self.waitForStateChange(from: .idle, to: .connecting) {
  717. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  718. _ = manager.getHTTP2Multiplexer()
  719. self.loop.run()
  720. }
  721. // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
  722. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  723. self.loop.run()
  724. // Fail the promise.
  725. promise.fail(DoomedChannelError())
  726. XCTAssertThrowsError(try optimisticChannelMux.wait())
  727. }
  728. func testOptimisticChannelFromTransientFailure() throws {
  729. var configuration = self.defaultConfiguration
  730. configuration.callStartBehavior = .fastFailure
  731. configuration.connectionBackoff = ConnectionBackoff()
  732. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  733. self.loop.makeFailedFuture(DoomedChannelError())
  734. }
  735. self.waitForStateChanges([
  736. Change(from: .idle, to: .connecting),
  737. Change(from: .connecting, to: .transientFailure),
  738. ]) {
  739. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  740. _ = manager.getHTTP2Multiplexer()
  741. self.loop.run()
  742. }
  743. // Now we're sitting in transient failure. Get a HTTP/2 stream mux optimistically - selected in config.
  744. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  745. self.loop.run()
  746. XCTAssertThrowsError(try optimisticChannelMux.wait()) { error in
  747. XCTAssertTrue(error is DoomedChannelError)
  748. }
  749. }
  750. func testOptimisticChannelFromShutdown() throws {
  751. var configuration = self.defaultConfiguration
  752. configuration.callStartBehavior = .fastFailure
  753. let manager = ConnectionManager.testingOnly(
  754. configuration: configuration,
  755. logger: self.logger
  756. ) {
  757. self.loop.makeFailedFuture(DoomedChannelError())
  758. }
  759. let shutdown = manager.shutdown()
  760. self.loop.run()
  761. XCTAssertNoThrow(try shutdown.wait())
  762. // Get a channel optimistically. It'll fail, obviously.
  763. let channelMux = manager.getHTTP2Multiplexer()
  764. self.loop.run()
  765. XCTAssertThrowsError(try channelMux.wait())
  766. }
  767. func testForceIdleAfterInactive() throws {
  768. let channelPromise = self.loop.makePromise(of: Channel.self)
  769. let manager = ConnectionManager.testingOnly(
  770. configuration: self.defaultConfiguration,
  771. logger: self.logger
  772. ) {
  773. channelPromise.futureResult
  774. }
  775. // Start the connection.
  776. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
  777. from: .idle,
  778. to: .connecting
  779. ) {
  780. let readyChannelMux = manager.getHTTP2Multiplexer()
  781. self.loop.run()
  782. return readyChannelMux
  783. }
  784. // Setup the real channel and activate it.
  785. let channel = EmbeddedChannel(loop: self.loop)
  786. let h2mux = HTTP2StreamMultiplexer(
  787. mode: .client,
  788. channel: channel,
  789. inboundStreamInitializer: nil
  790. )
  791. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  792. GRPCIdleHandler(
  793. mode: .client(manager, h2mux),
  794. logger: manager.logger,
  795. idleTimeout: .minutes(5)
  796. ),
  797. ]).wait())
  798. channelPromise.succeed(channel)
  799. self.loop.run()
  800. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  801. XCTAssertNoThrow(try connect.wait())
  802. // Write a SETTINGS frame on the root stream.
  803. try self.waitForStateChange(from: .connecting, to: .ready) {
  804. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  805. XCTAssertNoThrow(try channel.writeInbound(frame))
  806. }
  807. // The channel should now be ready.
  808. XCTAssertNoThrow(try readyChannelMux.wait())
  809. // Now drop the connection.
  810. try self.waitForStateChange(from: .ready, to: .shutdown) {
  811. let shutdown = manager.shutdown()
  812. self.loop.run()
  813. XCTAssertNoThrow(try shutdown.wait())
  814. }
  815. // Fire a connection idled event, i.e. keepalive timeout has fired. This should be a no-op.
  816. // Previously this would hit a precondition failure.
  817. channel.pipeline.fireUserInboundEventTriggered(ConnectionIdledEvent())
  818. }
  819. func testCloseWithoutActiveRPCs() throws {
  820. let channelPromise = self.loop.makePromise(of: Channel.self)
  821. let manager = ConnectionManager.testingOnly(
  822. configuration: self.defaultConfiguration,
  823. logger: self.logger
  824. ) {
  825. channelPromise.futureResult
  826. }
  827. // Start the connection.
  828. let readyChannelMux = self.waitForStateChange(
  829. from: .idle,
  830. to: .connecting
  831. ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
  832. let readyChannelMux = manager.getHTTP2Multiplexer()
  833. self.loop.run()
  834. return readyChannelMux
  835. }
  836. // Setup the actual channel and activate it.
  837. let channel = EmbeddedChannel(loop: self.loop)
  838. let h2mux = HTTP2StreamMultiplexer(
  839. mode: .client,
  840. channel: channel,
  841. inboundStreamInitializer: nil
  842. )
  843. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  844. GRPCIdleHandler(
  845. mode: .client(manager, h2mux),
  846. logger: manager.logger,
  847. idleTimeout: .minutes(5)
  848. ),
  849. ]).wait())
  850. channelPromise.succeed(channel)
  851. self.loop.run()
  852. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  853. XCTAssertNoThrow(try connect.wait())
  854. // "ready" the connection.
  855. try self.waitForStateChange(from: .connecting, to: .ready) {
  856. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  857. XCTAssertNoThrow(try channel.writeInbound(frame))
  858. }
  859. // The HTTP/2 stream multiplexer should now be ready.
  860. XCTAssertNoThrow(try readyChannelMux.wait())
  861. // Close the channel. There are no active RPCs so we should idle rather than be in the transient
  862. // failure state.
  863. self.waitForStateChange(from: .ready, to: .idle) {
  864. channel.pipeline.fireChannelInactive()
  865. }
  866. }
  867. }
  868. internal struct Change: Hashable, CustomStringConvertible {
  869. var from: ConnectivityState
  870. var to: ConnectivityState
  871. var description: String {
  872. return "\(self.from) → \(self.to)"
  873. }
  874. }
  875. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  876. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  877. private let semaphore = DispatchSemaphore(value: 0)
  878. private var expectation: Expectation = .noExpectation
  879. private let quiescingSemaphore = DispatchSemaphore(value: 0)
  880. private enum Expectation {
  881. /// We have no expectation of any changes. We'll just ignore any changes.
  882. case noExpectation
  883. /// We expect one change.
  884. case one((Change) -> Void)
  885. /// We expect 'count' changes.
  886. case some(count: Int, recorded: [Change], ([Change]) -> Void)
  887. var count: Int {
  888. switch self {
  889. case .noExpectation:
  890. return 0
  891. case .one:
  892. return 1
  893. case let .some(count, _, _):
  894. return count
  895. }
  896. }
  897. }
  898. func connectivityStateDidChange(from oldState: ConnectivityState,
  899. to newState: ConnectivityState) {
  900. self.serialQueue.async {
  901. switch self.expectation {
  902. case let .one(verify):
  903. // We don't care about future changes.
  904. self.expectation = .noExpectation
  905. // Verify and notify.
  906. verify(Change(from: oldState, to: newState))
  907. self.semaphore.signal()
  908. case .some(let count, var recorded, let verify):
  909. recorded.append(Change(from: oldState, to: newState))
  910. if recorded.count == count {
  911. // We don't care about future changes.
  912. self.expectation = .noExpectation
  913. // Verify and notify.
  914. verify(recorded)
  915. self.semaphore.signal()
  916. } else {
  917. // Still need more responses.
  918. self.expectation = .some(count: count, recorded: recorded, verify)
  919. }
  920. case .noExpectation:
  921. // Ignore any changes.
  922. ()
  923. }
  924. }
  925. }
  926. func connectionStartedQuiescing() {
  927. self.serialQueue.async {
  928. self.quiescingSemaphore.signal()
  929. }
  930. }
  931. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
  932. self.serialQueue.async {
  933. self.expectation = .some(count: count, recorded: [], verify)
  934. }
  935. }
  936. func expectChange(verify: @escaping (Change) -> Void) {
  937. self.serialQueue.async {
  938. self.expectation = .one(verify)
  939. }
  940. }
  941. func waitForExpectedChanges(
  942. timeout: DispatchTimeInterval,
  943. file: StaticString = #file,
  944. line: UInt = #line
  945. ) {
  946. let result = self.semaphore.wait(timeout: .now() + timeout)
  947. switch result {
  948. case .success:
  949. ()
  950. case .timedOut:
  951. XCTFail(
  952. "Timed out before verifying \(self.expectation.count) change(s)",
  953. file: file, line: line
  954. )
  955. }
  956. }
  957. func waitForQuiescing(timeout: DispatchTimeInterval) {
  958. let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
  959. switch result {
  960. case .success:
  961. ()
  962. case .timedOut:
  963. XCTFail("Timed out waiting for connection to start quiescing")
  964. }
  965. }
  966. }
  967. private extension ConnectionBackoff {
  968. static let oneSecondFixed = ConnectionBackoff(
  969. initialBackoff: 1.0,
  970. maximumBackoff: 1.0,
  971. multiplier: 1.0,
  972. jitter: 0.0
  973. )
  974. }
  975. private struct DoomedChannelError: Error {}