ConnectionManagerTests.swift 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  /// Tests for the connectivity state changes reported by `ConnectionManager`, driven manually
  /// on an `EmbeddedEventLoop` and observed through a `RecordingConnectivityDelegate`.
  class ConnectionManagerTests: GRPCTestCase {
    /// Event loop used for every promise, channel and manager created by the tests.
    private let loop = EmbeddedEventLoop()

    /// Connectivity delegate installed by `defaultConfiguration`; the helpers below register
    /// expectations on it and wait for them.
    private let recorder = RecordingConnectivityDelegate()

    /// Baseline configuration: a placeholder Unix domain socket target, `loop` as the event loop
    /// group, `recorder` as the connectivity delegate and no connection backoff.
    private var defaultConfiguration: ClientConnection.Configuration {
      return ClientConnection.Configuration(
        target: .unixDomainSocket("/ignored"),
        eventLoopGroup: self.loop,
        connectivityStateDelegate: self.recorder,
        connectionBackoff: nil,
        backgroundActivityLogger: self.clientLogger
      )
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
      super.tearDown()
    }

    /// Registers an expectation for one connectivity change equal to `Change(from:to:)`, runs
    /// `body`, then waits for the recorder's expected changes.
    ///
    /// - Parameters:
    ///   - from: The state the change must start from.
    ///   - to: The state the change must end in.
    ///   - timeout: How long to wait for the expected change after `body` returns.
    ///   - file: Source file attributed to assertion failures; defaults to the call site.
    ///   - line: Source line attributed to assertion failures; defaults to the call site.
    ///   - body: The work expected to trigger the change.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`.
    private func waitForStateChange<Result>(
      from: ConnectivityState,
      to: ConnectivityState,
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChange {
        XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
      }

      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }

    /// Registers an expectation for `changes.count` connectivity changes equal to `changes`,
    /// runs `body`, then waits for the recorder's expected changes.
    ///
    /// - Parameters:
    ///   - changes: The changes the recorder is expected to observe.
    ///   - timeout: How long to wait for the expected changes after `body` returns.
    ///   - file: Source file attributed to assertion failures; defaults to the call site.
    ///   - line: Source line attributed to assertion failures; defaults to the call site.
    ///   - body: The work expected to trigger the changes.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`.
    private func waitForStateChanges<Result>(
      _ changes: [Change],
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #file,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChanges(changes.count) {
        // Forward the caller's location so a mismatch is reported at the call site, matching
        // what `waitForStateChange` does.
        XCTAssertEqual($0, changes, file: file, line: line)
      }

      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }
  }
  67. extension ConnectionManagerTests {
  /// Shuts down a manager that never left `.idle`, then checks that a multiplexer requested
  /// afterwards fails.
  func testIdleShutdown() throws {
    let connectionManager = ConnectionManager(
      configuration: self.defaultConfiguration,
      logger: self.logger
    )

    // The shutdown is expected to be reported as a single idle -> shutdown change.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // A multiplexer requested after the shutdown must complete with an error.
    let muxAfterShutdown = connectionManager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try muxAfterShutdown.wait())
  }
  /// Fails the only connection attempt while no backoff is configured: the state is expected to
  /// go connecting -> shutdown and the pending multiplexer to fail with `DoomedChannelError`.
  func testConnectFromIdleFailsWithNoReconnect() {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Requesting a multiplexer is what starts the connection attempt.
    let muxFuture = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Failing the channel future ends the attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      pendingChannel.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try muxFuture.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Drives a connection idle -> connecting -> ready and then shuts the manager down.
  func testConnectAndDisconnect() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Kick off the connection attempt; the multiplexer future itself is not needed here.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Build the channel the manager will be handed, with the idle handler installed.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // Hand the channel over, then activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // An inbound SETTINGS frame on the root stream makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
    }

    // The channel is available now: shutting down should be reported as ready -> shutdown.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Brings a connection to `.ready`, lets the idle timeout elapse so that it returns to
  /// `.idle`, then shuts the manager down.
  func testConnectAndIdle() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel with a five minute idle timeout.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // An inbound SETTINGS frame on the root stream makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
      // The multiplexer future _must_ be complete by now.
      XCTAssertNoThrow(try muxWhenReady.wait())
    }

    // Let the idle timeout elapse; going idle closes the channel.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try embedded.closeFuture.wait())
    }

    // Finally shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Checks that the idle timeout does not idle a connection with an open stream, and that the
  /// connection does go idle once the stream is closed and the timeout elapses again.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel with a five minute idle timeout.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // An inbound SETTINGS frame on the root stream makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
      // The multiplexer future _must_ be complete by now.
      XCTAssertNoThrow(try muxWhenReady.wait())
    }

    // Pretend a stream was opened; only the event matters, not the stream's details.
    let openStream: HTTP2StreamID = 1
    let created = NIOHTTP2StreamCreatedEvent(
      streamID: openStream,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    embedded.pipeline.fireUserInboundEventTriggered(created)

    // A full idle timeout passes here, but with a stream open it should _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Close the stream and let the timeout elapse again: now the connection should idle.
    self.waitForStateChange(from: .ready, to: .idle) {
      let closed = StreamClosedEvent(streamID: openStream, reason: nil)
      embedded.pipeline.fireUserInboundEventTriggered(closed)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Connects a channel but shuts the manager down before any SETTINGS frame is written; the
  /// pending multiplexer is expected to fail.
  func testConnectAndThenBecomeInactive() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel and install the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut down while the multiplexer future is still incomplete.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No usable channel was obtained and no reconnect is configured, so the manager is shut
    // down and the multiplexer future must fail.
    XCTAssertThrowsError(try muxWhenReady.wait())
  }
  /// The first connection attempt fails and, after the one second backoff, the second attempt
  /// succeeds: multiplexers requested before and during the backoff should both become ready.
  func testConnectOnSecondAttempt() throws {
    let secondAttempt: EventLoopPromise<Channel> = self.loop.makePromise()

    // Channels handed to the manager, in order: a failure first, then the real channel.
    var queuedChannels: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      secondAttempt.futureResult,
    ]

    var backoffConfiguration = self.defaultConfiguration
    backoffConfiguration.connectionBackoff = .oneSecondFixed

    let connectionManager = ConnectionManager.testingOnly(
      configuration: backoffConfiguration,
      logger: self.logger
    ) {
      if queuedChannels.isEmpty {
        XCTFail("Too many channels requested")
        return self.loop.makeFailedFuture(DoomedChannelError())
      }
      return queuedChannels.removeFirst()
    }

    // Requesting a multiplexer triggers the first (failing) attempt.
    let firstAttemptChanges: [Change] = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let muxWhenReady = self.waitForStateChanges(
      firstAttemptChanges
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // A second request made during the backoff: a future for the same eventual multiplexer.
    let secondMuxWhenReady = connectionManager.getHTTP2Multiplexer()
    self.loop.run()

    // Advancing time by the backoff starts the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the real channel and complete the second attempt with it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    secondAttempt.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // An inbound SETTINGS frame on the root stream makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
    }

    // Both multiplexer futures _must_ be complete by now.
    XCTAssertNoThrow(try muxWhenReady.wait())
    XCTAssertNoThrow(try secondMuxWhenReady.wait())

    // Finally shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shuts down while the channel future is still pending: the requested multiplexer should
  /// fail, and a channel delivered after the shutdown should end up closed.
  func testShutdownWhileConnecting() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Shut down before the channel future has been completed.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer that was being waited on must fail.
    XCTAssertThrowsError(try muxWhenReady.wait())

    // The channel promise is still outstanding; if it succeeds now, that channel should be
    // closed as well.
    let lateChannel = EmbeddedChannel(loop: self.loop)
    pendingChannel.succeed(lateChannel)
    let delivered = try pendingChannel.futureResult.wait()
    self.loop.run()
    XCTAssertNoThrow(try delivered.closeFuture.wait())
  }
  /// Shuts down while waiting out the backoff after a failed attempt; the requested multiplexer
  /// is expected to fail.
  func testShutdownWhileTransientFailure() throws {
    var backoffConfiguration = self.defaultConfiguration
    backoffConfiguration.connectionBackoff = .oneSecondFixed

    // Every connection attempt fails immediately.
    let connectionManager = ConnectionManager.testingOnly(
      configuration: backoffConfiguration,
      logger: self.logger
    ) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Requesting a multiplexer triggers the failing attempt.
    let failedAttemptChanges: [Change] = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let muxWhenReady = self.waitForStateChanges(
      failedAttemptChanges
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Shut down without letting the backoff elapse.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer that was being waited on must fail.
    XCTAssertThrowsError(try muxWhenReady.wait())
  }
  /// Shuts down after the channel has connected but before it became ready; the reported state
  /// is still `.connecting` at that point and the requested multiplexer is expected to fail.
  func testShutdownWhileActive() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel and install the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // 'Active' is an internal state, so connecting the channel reports no state change.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut down before any SETTINGS frame has arrived.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer that was being waited on must fail.
    XCTAssertThrowsError(try muxWhenReady.wait())
  }
  /// Calls `shutdown()` twice; the second call on an already shut down manager must not throw.
  func testShutdownWhileShutdown() throws {
    let connectionManager = ConnectionManager(
      configuration: self.defaultConfiguration,
      logger: self.logger
    )

    // The first shutdown is reported as idle -> shutdown.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let initialShutdown = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try initialShutdown.wait())
    }

    // The repeated shutdown registers no further expected change and should simply succeed.
    let repeatedShutdown = connectionManager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try repeatedShutdown.wait())
  }
  /// Closes a connected-but-not-ready channel (transient failure), lets the retry fail as well,
  /// then shuts down; the multiplexer requested at the start is expected to fail.
  func testTransientFailureWhileActive() throws {
    var backoffConfiguration = self.defaultConfiguration
    backoffConfiguration.connectionBackoff = .oneSecondFixed

    let firstAttempt: EventLoopPromise<Channel> = self.loop.makePromise()

    // Channels handed to the manager, in order: the real channel first, then a failure.
    var queuedChannels: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]

    let connectionManager = ConnectionManager.testingOnly(
      configuration: backoffConfiguration,
      logger: self.logger
    ) {
      if queuedChannels.isEmpty {
        XCTFail("Too many channels requested")
        return self.loop.makeFailedFuture(DoomedChannelError())
      }
      return queuedChannels.removeFirst()
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the first channel and install the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // 'Active' is an internal state, so connecting the channel reports no state change.
    firstAttempt.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Closing the channel here simulates e.g. a failed TLS handshake.
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try embedded.close().wait())
    }

    // After the backoff the manager reconnects; that attempt gets the failed future.
    let retryChanges: [Change] = [
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(retryChanges) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down from transient failure.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The connection never became ready, so waiting on the multiplexer should throw.
    XCTAssertThrowsError(try muxWhenReady.wait())
  }
  /// Closes a ready channel that has an open stream (transient failure), reconnects on a second
  /// channel after the backoff, brings that one to `.ready` and finally shuts down.
  func testTransientFailureWhileReady() throws {
    var backoffConfiguration = self.defaultConfiguration
    backoffConfiguration.connectionBackoff = .oneSecondFixed

    let firstAttempt: EventLoopPromise<Channel> = self.loop.makePromise()
    let secondAttempt: EventLoopPromise<Channel> = self.loop.makePromise()

    // Channels handed to the manager, in order of the connection attempts.
    var queuedChannels: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      secondAttempt.futureResult,
    ]

    let connectionManager = ConnectionManager.testingOnly(
      configuration: backoffConfiguration,
      logger: self.logger
    ) {
      if queuedChannels.isEmpty {
        XCTFail("Too many channels requested")
        return self.loop.makeFailedFuture(DoomedChannelError())
      }
      return queuedChannels.removeFirst()
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the first channel and install its idle handler.
    let initialChannel = EmbeddedChannel(loop: self.loop)
    let initialMultiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: initialChannel,
      inboundStreamInitializer: nil
    )
    let initialIdleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: initialMultiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try initialChannel.pipeline.addHandler(initialIdleHandler).wait()

    firstAttempt.succeed(initialChannel)
    XCTAssertNoThrow(
      try initialChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // An inbound SETTINGS frame on the root stream makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try initialChannel.writeInbound(settings))
    }

    // The multiplexer future should be complete now.
    XCTAssertNoThrow(try muxWhenReady.wait())

    // Make sure there is an active RPC before killing the channel, otherwise we would idle
    // rather than enter transient failure.
    let created = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    initialChannel.pipeline.fireUserInboundEventTriggered(created)

    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try initialChannel.close().wait())
    }

    // Advancing time by the backoff starts the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the replacement channel and install its idle handler.
    let replacementChannel = EmbeddedChannel(loop: self.loop)
    let replacementMultiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: replacementChannel,
      inboundStreamInitializer: nil
    )
    let replacementIdleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: replacementMultiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try replacementChannel.pipeline.addHandler(replacementIdleHandler).wait()

    secondAttempt.succeed(replacementChannel)
    XCTAssertNoThrow(
      try replacementChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // An inbound SETTINGS frame makes the replacement connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try replacementChannel.writeInbound(settings))
    }

    // Finally shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Delivers a GOAWAY frame to a ready connection: the state is expected to go ready -> idle
  /// and the channel to close, after which the manager is shut down.
  func testGoAwayWhenReady() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Ask for a multiplexer: this starts connecting.
    let muxWhenReady = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel and install the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // An inbound SETTINGS frame on the root stream makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
    }

    // The multiplexer future _must_ be complete by now.
    XCTAssertNoThrow(try muxWhenReady.wait())

    // Deliver a GOAWAY (its details are irrelevant): the connection should go idle and the
    // channel should close.
    try self.waitForStateChange(from: .ready, to: .idle) {
      let goAwayFrame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      XCTAssertNoThrow(try embedded.writeInbound(goAwayFrame))
    }

    self.loop.run()
    XCTAssertNoThrow(try embedded.closeFuture.wait())

    // Finally shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// With `.fastFailure` call start behavior and a channel provider that always fails, a
  /// multiplexer requested from idle is expected to fail.
  func testDoomedOptimisticChannelFromIdle() {
    var fastFailureConfiguration = self.defaultConfiguration
    fastFailureConfiguration.callStartBehavior = .fastFailure

    // Every connection attempt fails immediately.
    let connectionManager = ConnectionManager.testingOnly(
      configuration: fastFailureConfiguration,
      logger: self.logger
    ) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let doomedMux = connectionManager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try doomedMux.wait())
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested while connecting is
  /// expected to fail once the pending channel future fails.
  func testDoomedOptimisticChannelFromConnecting() throws {
    var fastFailureConfiguration = self.defaultConfiguration
    fastFailureConfiguration.callStartBehavior = .fastFailure

    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    let connectionManager = ConnectionManager.testingOnly(
      configuration: fastFailureConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Start a connection attempt; this first multiplexer future is deliberately discarded.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Now in 'connecting': request the optimistic multiplexer selected by the configuration.
    let optimisticMux = connectionManager.getHTTP2Multiplexer()
    self.loop.run()

    // Failing the channel future should fail the optimistic multiplexer too.
    pendingChannel.fail(DoomedChannelError())
    XCTAssertThrowsError(try optimisticMux.wait())
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested while the manager sits in
  /// transient failure must fail with the error produced by the channel provider.
  func testOptimisticChannelFromTransientFailure() throws {
    var fastFailureConfig = self.defaultConfiguration
    fastFailureConfig.callStartBehavior = .fastFailure
    fastFailureConfig.connectionBackoff = ConnectionBackoff()

    let manager = ConnectionManager.testingOnly(
      configuration: fastFailureConfig,
      logger: self.logger
    ) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // The first connection attempt fails, so we expect to pass through connecting and land in
    // transient failure.
    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(expectedChanges) {
      // Kick off channel creation and a connection attempt. The multiplexer returned here is
      // of no interest.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // We are now in transient failure: ask for the optimistic multiplexer selected by the
    // configuration.
    let optimisticMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try optimisticMux.wait()) {
      XCTAssertTrue($0 is DoomedChannelError)
    }
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested after the manager has been
  /// shut down must fail.
  func testOptimisticChannelFromShutdown() throws {
    var fastFailureConfig = self.defaultConfiguration
    fastFailureConfig.callStartBehavior = .fastFailure

    let manager = ConnectionManager.testingOnly(
      configuration: fastFailureConfig,
      logger: self.logger
    ) {
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Shut the manager down before any multiplexer is requested.
    let shutdownFuture = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdownFuture.wait())

    // An optimistic request made after shutdown is expected to fail.
    let muxAfterShutdown = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try muxAfterShutdown.wait())
  }
  /// Brings a connection all the way to `.ready` over an embedded channel and then shuts the
  /// manager down, expecting a direct `.ready` to `.shutdown` transition.
  ///
  /// NOTE(review): the test name mentions idling after inactivity, but the transitions asserted
  /// here end in `.shutdown` - confirm the name still reflects the intent.
  func testForceIdleAfterInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Requesting a multiplexer starts the connection attempt.
    let readyMux = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let mux = manager.getHTTP2Multiplexer()
      self.loop.run()
      return mux
    }

    // Build the channel the manager will end up using: an embedded channel carrying only the
    // idle handler, which reports back to the manager.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        GRPCIdleHandler(
          connectionManager: manager,
          multiplexer: multiplexer,
          idleTimeout: .minutes(5),
          keepalive: .init(),
          logger: self.logger
        ),
      ]).wait()
    )

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(channel)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // An inbound SETTINGS frame on the root stream moves the connection to ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The multiplexer requested at the start should now be available.
    XCTAssertNoThrow(try readyMux.wait())

    // Drop the connection by shutting the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Brings a connection to `.ready` over an embedded channel and then fires channel-inactive
  /// while no RPCs are active, expecting the manager to go `.idle` rather than enter transient
  /// failure.
  func testCloseWithoutActiveRPCs() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = ConnectionManager.testingOnly(
      configuration: self.defaultConfiguration,
      logger: self.logger
    ) {
      return pendingChannel.futureResult
    }

    // Requesting a multiplexer starts the connection attempt.
    let readyMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let mux = manager.getHTTP2Multiplexer()
      self.loop.run()
      return mux
    }

    // Build the channel the manager will end up using: an embedded channel carrying only the
    // idle handler, which reports back to the manager.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    XCTAssertNoThrow(
      try channel.pipeline.addHandlers([
        GRPCIdleHandler(
          connectionManager: manager,
          multiplexer: multiplexer,
          idleTimeout: .minutes(5),
          keepalive: .init(),
          logger: self.logger
        ),
      ]).wait()
    )

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(channel)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // An inbound SETTINGS frame on the root stream "readies" the connection.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel.writeInbound(settings))
    }

    // The multiplexer requested at the start should now be available.
    XCTAssertNoThrow(try readyMux.wait())

    // Close the channel. No RPCs are active, so the manager should idle instead of moving to
    // the transient failure state.
    self.waitForStateChange(from: .ready, to: .idle) {
      channel.pipeline.fireChannelInactive()
    }
  }
  888. }
  /// A single connectivity state transition: the state that was left and the state that was
  /// entered.
  internal struct Change: Hashable {
    /// The state the connection moved away from.
    var from: ConnectivityState
    /// The state the connection moved into.
    var to: ConnectivityState
  }

  extension Change: CustomStringConvertible {
    /// The transition rendered as the old state, an arrow, and the new state.
    var description: String {
      return "\(self.from) → \(self.to)"
    }
  }
  /// A `ConnectivityStateDelegate` for tests: it records connectivity state changes, hands them
  /// to a verification closure and lets the test thread block until the expected changes have
  /// been seen (or a timeout elapses).
  ///
  /// `expectation` is only ever read or written on `serialQueue`. Expectations are installed
  /// asynchronously on that queue, and state changes are processed on the same queue, so an
  /// expectation registered before a change is triggered is in place when the change arrives.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    /// Serializes every access to `expectation`.
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    /// Signalled once the current expectation has been met and verified.
    private let semaphore = DispatchSemaphore(value: 0)
    private var expectation: Expectation = .noExpectation
    /// Signalled each time the connection reports that it has started quiescing.
    private let quiescingSemaphore = DispatchSemaphore(value: 0)

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The total number of changes this expectation asks for (not the number still missing).
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Records a state change against the current expectation; once the expectation is met the
    /// verification closure runs and any thread blocked in `waitForExpectedChanges` is woken.
    func connectivityStateDidChange(from oldState: ConnectivityState,
                                    to newState: ConnectivityState) {
      self.serialQueue.async {
        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(Change(from: oldState, to: newState))
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(Change(from: oldState, to: newState))
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Wakes any thread blocked in `waitForQuiescing`.
    func connectionStartedQuiescing() {
      self.serialQueue.async {
        self.quiescingSemaphore.signal()
      }
    }

    /// Expects `count` state changes; `verify` is called with all of them once the last arrives.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Expects a single state change; `verify` is called with it when it arrives.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks until the current expectation has been met, failing the test on timeout.
    ///
    /// - Parameters:
    ///   - timeout: How long to wait before recording a test failure.
    ///   - file: The file the failure is attributed to.
    ///   - line: The line the failure is attributed to.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #file,
      line: UInt = #line
    ) {
      let result = self.semaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        // `expectation` belongs to `serialQueue`: read it there rather than racing with a state
        // change which may be updating it concurrently.
        let expected = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(expected) change(s)",
          file: file, line: line
        )
      }
    }

    /// Blocks until the connection has started quiescing, failing the test on timeout.
    ///
    /// - Parameters:
    ///   - timeout: How long to wait before recording a test failure.
    ///   - file: The file the failure is attributed to.
    ///   - line: The line the failure is attributed to.
    func waitForQuiescing(
      timeout: DispatchTimeInterval,
      file: StaticString = #file,
      line: UInt = #line
    ) {
      let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        XCTFail("Timed out waiting for connection to start quiescing", file: file, line: line)
      }
    }
  }
  extension ConnectionBackoff {
    /// A fixed backoff for tests: the initial and maximum backoff are both 1.0, the multiplier
    /// is 1.0 and there is no jitter (presumably one second per attempt - confirm the unit
    /// against `ConnectionBackoff`).
    fileprivate static let oneSecondFixed: ConnectionBackoff = .init(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// An empty marker error the tests in this file use to fail connection attempts on purpose.
  private struct DoomedChannelError: Error {
  }