ConnectionManagerTests.swift 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import Logging
  18. import NIOCore
  19. import NIOEmbedded
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPC
  /// Exercises the connectivity state transitions of `ConnectionManager` on an
  /// `EmbeddedEventLoop`.
  ///
  /// State changes are observed through a `RecordingConnectivityDelegate`, which is installed as
  /// the delegate of the `ConnectivityStateMonitor` handed to every manager created by
  /// `makeConnectionManager(configuration:channelProvider:)`.
  class ConnectionManagerTests: GRPCTestCase {
    private let loop = EmbeddedEventLoop()
    private let recorder = RecordingConnectivityDelegate()
    // Recreated in `setUp()` before every test.
    private var monitor: ConnectivityStateMonitor!

    /// The configuration used when a test does not supply its own: it targets the Unix domain
    /// socket path "/ignored" on the embedded loop, has `connectionBackoff` set to `nil` and logs
    /// background activity to the client logger.
    private var defaultConfiguration: ClientConnection.Configuration {
      var configuration = ClientConnection.Configuration.default(
        target: .unixDomainSocket("/ignored"),
        eventLoopGroup: self.loop
      )
      configuration.connectionBackoff = nil
      configuration.backgroundActivityLogger = self.clientLogger
      return configuration
    }

    override func setUp() {
      super.setUp()
      self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
      super.tearDown()
    }

    /// Creates a `ConnectionManager` wired up to this test case's monitor and logger.
    ///
    /// - Parameters:
    ///   - config: The configuration to use; `defaultConfiguration` is used when `nil`.
    ///   - channelProvider: An optional hook producing the channel future for each connection
    ///     attempt. When provided it is wrapped in a `HookedChannelProvider`.
    /// - Returns: The newly created manager.
    private func makeConnectionManager(
      configuration config: ClientConnection.Configuration? = nil,
      channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
    ) -> ConnectionManager {
      let configuration = config ?? self.defaultConfiguration
      return ConnectionManager(
        configuration: configuration,
        channelProvider: channelProvider.map { HookedChannelProvider($0) },
        connectivityDelegate: self.monitor,
        logger: self.logger
      )
    }

    /// Expects exactly one connectivity state change while running `body`.
    ///
    /// The expectation is registered with the recorder *before* `body` runs so that changes
    /// triggered synchronously by `body` are captured.
    ///
    /// - Parameters:
    ///   - from: The state the change is expected to start in.
    ///   - to: The state the change is expected to end in.
    ///   - timeout: How long to wait for the expected change after `body` returns.
    ///   - file: The file assertion failures are attributed to.
    ///   - line: The line assertion failures are attributed to.
    ///   - body: The work which should trigger the state change.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`.
    private func waitForStateChange<Result>(
      from: ConnectivityState,
      to: ConnectivityState,
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #filePath,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChange {
        XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
      }

      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }

    /// Expects the given sequence of connectivity state changes while running `body`.
    ///
    /// - Parameters:
    ///   - changes: The changes which are expected, compared for equality against the recorded
    ///     changes.
    ///   - timeout: How long to wait for the expected changes after `body` returns.
    ///   - file: The file assertion failures are attributed to.
    ///   - line: The line assertion failures are attributed to.
    ///   - body: The work which should trigger the state changes.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`.
    private func waitForStateChanges<Result>(
      _ changes: [Change],
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #filePath,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChanges(changes.count) {
        // Forward the caller's location so a mismatch is reported against the test which set up
        // the expectation rather than against this helper.
        XCTAssertEqual($0, changes, file: file, line: line)
      }

      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }
  }
  86. extension ConnectionManagerTests {
  /// Shutting down a manager which never left `.idle` moves it to `.shutdown`; any later request
  /// for a multiplexer fails.
  func testIdleShutdown() throws {
    let connectionManager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The manager has been shut down, so the multiplexer future must fail.
    let failedMultiplexer = connectionManager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try failedMultiplexer.wait())
  }
  /// With the default configuration (`connectionBackoff` is `nil`), a failed connection attempt
  /// moves the manager from `.connecting` to `.shutdown` and fails the pending multiplexer with
  /// the connection error.
  func testConnectFromIdleFailsWithNoReconnect() {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Asking for a multiplexer starts the connection attempt.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Fail the attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      pendingChannel.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try multiplexerFuture.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Connects, becomes `.ready` once a SETTINGS frame arrives on the root stream and then shuts
  /// down from `.ready`.
  func testConnectAndDisconnect() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Build the channel backing the connection and install the idle handler on it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
    }

    // The channel is available now: shut the manager (and with it the channel) down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Connects, becomes `.ready`, returns to `.idle` after the idle timeout elapses and is then
  /// shut down from `.idle`.
  func testConnectAndIdle() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel backing the connection and install the idle handler on it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
      // The multiplexer _must_ be available by now.
      XCTAssertNoThrow(try multiplexerFuture.wait())
    }

    // Let the idle timeout elapse; idling closes the channel.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try embedded.closeFuture.wait())
    }

    // Finally shut the idle manager down.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// A channel which reports 'inactive' before 'active' is tolerated: with the default
  /// configuration (`connectionBackoff` is `nil`) the manager goes from `.connecting` to
  /// `.shutdown` and the late 'active' event is ignored.
  func testChannelInactiveBeforeActiveWithNoReconnect() throws {
    let embedded = EmbeddedChannel(loop: self.loop)
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer triggers the connect.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
    }

    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.syncOperations.addHandler(idleHandler)
    pendingChannel.succeed(embedded)

    // The events arrive in the wrong order: 'inactive' first. This must be tolerated.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      embedded.pipeline.fireChannelInactive()
    }

    // The subsequent 'active' should be ignored.
    embedded.pipeline.fireChannelActive()
  }
  /// A channel which reports 'inactive' before 'active' is tolerated: with a one second fixed
  /// backoff configured the manager goes to `.transientFailure`, reconnects after the backoff
  /// and becomes `.ready` on the second channel.
  func testChannelInactiveBeforeActiveWillReconnect() throws {
    var channels = [EmbeddedChannel(loop: self.loop), EmbeddedChannel(loop: self.loop)]
    var channelPromises: [EventLoopPromise<Channel>] = [
      self.loop.makePromise(),
      self.loop.makePromise(),
    ]
    // Futures, promises and channels are all consumed from the back so that the indices line up:
    // the n-th connection attempt is handed the future of the n-th promise taken below.
    var channelFutures = channelPromises.map { $0.futureResult }

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return channelFutures.removeLast()
    }

    // Start the connection.
    self.waitForStateChange(from: .idle, to: .connecting) {
      // Triggers the connect.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Setup the first channel.
    let channel1 = channels.removeLast()
    let channel1Promise = channelPromises.removeLast()

    try channel1.pipeline.syncOperations.addHandler(
      GRPCIdleHandler(
        connectionManager: manager,
        multiplexer: HTTP2StreamMultiplexer(
          mode: .client,
          channel: channel1,
          inboundStreamInitializer: nil
        ),
        idleTimeout: .minutes(5),
        keepalive: .init(),
        logger: self.logger
      )
    )
    channel1Promise.succeed(channel1)

    // Oops: wrong way around. We should tolerate this.
    self.waitForStateChange(from: .connecting, to: .transientFailure) {
      channel1.pipeline.fireChannelInactive()
    }

    // The late 'active' for the first channel is fired after the failure was already recorded.
    channel1.pipeline.fireChannelActive()

    // Start the next attempt by letting the one second backoff elapse.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Setup the second channel. Its multiplexer must be bound to `channel2` itself, not to the
    // first (failed) channel.
    let channel2 = channels.removeLast()
    let channel2Promise = channelPromises.removeLast()

    try channel2.pipeline.syncOperations.addHandler(
      GRPCIdleHandler(
        connectionManager: manager,
        multiplexer: HTTP2StreamMultiplexer(
          mode: .client,
          channel: channel2,
          inboundStreamInitializer: nil
        ),
        idleTimeout: .minutes(5),
        keepalive: .init(),
        logger: self.logger
      )
    )
    channel2Promise.succeed(channel2)

    // This time the events arrive in the right order: 'active' followed by a SETTINGS frame.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      channel2.pipeline.fireChannelActive()
      let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try channel2.writeInbound(frame))
    }
  }
  /// The idle timeout elapsing while a stream is open does not idle the connection; once the
  /// stream has closed and the timeout elapses again the manager moves to `.idle`.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel backing the connection and install the idle handler on it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
      // The HTTP/2 stream multiplexer _must_ be available by now.
      XCTAssertNoThrow(try multiplexerFuture.wait())
    }

    // Pretend a stream was created; only the event matters, not the stream itself.
    let openedEvent = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    embedded.pipeline.fireUserInboundEventTriggered(openedEvent)

    // Let the idle timeout elapse: with a stream open this should _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Closing the stream and letting the timeout elapse again should idle the connection.
    self.waitForStateChange(from: .ready, to: .idle) {
      let closedEvent = StreamClosedEvent(streamID: 1, reason: nil)
      embedded.pipeline.fireUserInboundEventTriggered(closedEvent)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally shut the idle manager down.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down after the channel has connected but before it became ready moves the manager
  /// from `.connecting` to `.shutdown` and fails the pending multiplexer.
  func testConnectAndThenBecomeInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel backing the connection and install the idle handler on it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut down now: the multiplexer future requested above has not been completed yet.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No channel became ready and no reconnect is configured, so the manager is shut down and
    // the multiplexer future should have failed.
    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  /// The first connection attempt fails and the second succeeds after the one second backoff;
  /// multiplexer futures requested before and during the backoff both succeed once ready.
  func testConnectOnSecondAttempt() throws {
    let secondAttempt: EventLoopPromise<Channel> = self.loop.makePromise()
    // Attempt one fails immediately, attempt two is completed by hand further down.
    let attempts: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      secondAttempt.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let connectionManager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Requesting a multiplexer starts the first attempt, which fails straight away.
    let failedFirstAttempt = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges(
      failedFirstAttempt
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Ask again while backing off: this is a future for the multiplexer requested above.
    let secondMultiplexerFuture = connectionManager.getHTTP2Multiplexer()
    self.loop.run()

    // Let the one second backoff elapse to start the next connection attempt.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the real channel, install the idle handler and complete the second attempt.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    secondAttempt.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings))
    }

    // Both requests for the HTTP/2 stream multiplexer _must_ be satisfied by now.
    XCTAssertNoThrow(try multiplexerFuture.wait())
    XCTAssertNoThrow(try secondMultiplexerFuture.wait())

    // Finally shut down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down while a connection attempt is outstanding fails the pending multiplexer; a
  /// channel which is delivered afterwards is closed.
  func testShutdownWhileConnecting() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Shut down while the channel promise is still unfulfilled.
    let shutdownFuture: EventLoopFuture<Void> = self.waitForStateChange(
      from: .connecting,
      to: .shutdown
    ) {
      let pendingShutdown = connectionManager.shutdown()
      self.loop.run()
      return pendingShutdown
    }

    // The multiplexer which was requested should have failed.
    XCTAssertThrowsError(try multiplexerFuture.wait())

    // The channel promise is still outstanding: when it succeeds the channel should be closed
    // as well.
    pendingChannel.succeed(EmbeddedChannel(loop: self.loop))
    let lateChannel = try pendingChannel.futureResult.wait()
    self.loop.run()

    XCTAssertNoThrow(try lateChannel.closeFuture.wait())
    XCTAssertNoThrow(try shutdownFuture.wait())
  }
  /// Shutting down while backing off after a failed attempt moves the manager from
  /// `.transientFailure` to `.shutdown` and fails the pending multiplexer.
  func testShutdownWhileTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // Every connection attempt fails immediately.
    let connectionManager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Requesting a multiplexer starts the first attempt, which fails straight away.
    let failedFirstAttempt = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges(
      failedFirstAttempt
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Shut down during the backoff.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The HTTP/2 stream multiplexer which was requested should have failed.
    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  /// Shutting down after the channel became active, but before it became ready, is reported as
  /// `.connecting` to `.shutdown` and fails the pending multiplexer.
  func testShutdownWhileActive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let connectionManager = self.makeConnectionManager { _, _ in pendingChannel.futureResult }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel backing the connection and install the idle handler on it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // Hand the channel to the manager and activate it. No state change is expected for this:
    // 'active' is an internal state.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Shut down before the connection became ready.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The HTTP/2 stream multiplexer which was requested should have failed.
    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  /// Shutting down a manager which has already been shut down succeeds.
  func testShutdownWhileShutdown() throws {
    let connectionManager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let initialShutdown = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try initialShutdown.wait())
    }

    // Asking a second time must not throw either.
    let repeatedShutdown = connectionManager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try repeatedShutdown.wait())
  }
  /// A channel which closes after becoming active, but before becoming ready, results in
  /// `.transientFailure`; the reconnect attempt after the backoff fails as well and the manager
  /// is then shut down.
  func testTransientFailureWhileActive() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let firstAttempt: EventLoopPromise<Channel> = self.loop.makePromise()
    // Attempt one is completed by hand further down, attempt two fails immediately.
    let attempts: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]
    var remainingAttempts = attempts.makeIterator()

    let connectionManager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Requesting a multiplexer kicks off the first connection attempt.
    let multiplexerFuture: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) {
      let requested = connectionManager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the first channel and install the idle handler on it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: embedded,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try embedded.pipeline.addHandler(idleHandler).wait()

    // Hand the channel to the manager and activate it. No state change is expected for this:
    // 'active' is an internal state.
    firstAttempt.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // Close the channel (simulating, for example, a failed TLS handshake).
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try embedded.close().wait())
    }

    // Let the backoff elapse: the reconnect attempt starts and fails straight away.
    let failedReconnect = [
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(failedReconnect) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down during the second backoff.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = connectionManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The channel never became ready, so waiting for the multiplexer should throw.
    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  /// Brings a connection to `.ready`, closes its channel while a stream is open and checks that
  /// the manager reports `.transientFailure`, starts connecting again after the fixed one second
  /// backoff, becomes `.ready` on a second channel and can finally be shut down.
  func testTransientFailureWhileReady() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    // One promise per expected connection attempt; the provider hands out their futures in order.
    let firstAttempt = self.loop.makePromise(of: Channel.self)
    let secondAttempt = self.loop.makePromise(of: Channel.self)
    var remainingAttempts = [firstAttempt.futureResult, secondAttempt.futureResult].makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Builds an embedded channel holding an idle handler, completes `promise` with it and then
    // connects the channel.
    func bringUpChannel(completing promise: EventLoopPromise<Channel>) throws -> EmbeddedChannel {
      let channel = EmbeddedChannel(loop: self.loop)
      let multiplexer = HTTP2StreamMultiplexer(
        mode: .client,
        channel: channel,
        inboundStreamInitializer: nil
      )
      let idleHandler = GRPCIdleHandler(
        connectionManager: manager,
        multiplexer: multiplexer,
        idleTimeout: .minutes(5),
        keepalive: .init(),
        logger: self.logger
      )
      try channel.pipeline.addHandler(idleHandler).wait()
      promise.succeed(channel)
      XCTAssertNoThrow(
        try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
      )
      return channel
    }

    // An empty SETTINGS frame on the root stream; writing it inbound makes a channel ready.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))

    // Asking for the multiplexer kicks off the first connection attempt.
    let readyChannelMux = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let multiplexer = manager.getHTTP2Multiplexer()
      self.loop.run()
      return multiplexer
    }

    // First channel: connect it and make it ready.
    let firstChannel = try bringUpChannel(completing: firstAttempt)
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try firstChannel.writeInbound(settingsFrame))
    }
    XCTAssertNoThrow(try readyChannelMux.wait())

    // Report an open stream before killing the channel: without an active RPC the manager would
    // idle instead of entering transient failure.
    let streamCreated = NIOHTTP2StreamCreatedEvent(
      streamID: 1,
      localInitialWindowSize: nil,
      remoteInitialWindowSize: nil
    )
    firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // Let the backoff elapse so that the next connection attempt starts.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Second channel: connect it and make it ready.
    let secondChannel = try bringUpChannel(completing: secondAttempt)
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try secondChannel.writeInbound(settingsFrame))
    }

    // Finally, shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdown.wait())
    }
  }
  /// Checks that receiving a GOAWAY frame on a ready connection moves the manager to `.idle`,
  /// closes the channel, and that the manager can then be shut down from `.idle`.
  func testGoAwayWhenReady() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for the multiplexer starts the connection attempt.
    let readyChannelMux = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let multiplexer = manager.getHTTP2Multiplexer()
      self.loop.run()
      return multiplexer
    }

    // Build the channel with an idle handler, hand it to the manager and connect it.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    try channel.pipeline.addHandler(idleHandler).wait()
    pendingChannel.succeed(channel)
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // A SETTINGS frame on the root stream makes the connection ready.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The multiplexer future _must_ have resolved by now.
    XCTAssertNoThrow(try readyChannelMux.wait())

    // Deliver a GOAWAY (its contents are irrelevant): the connection should go idle and the
    // channel should close.
    let goAway = HTTP2Frame(
      streamID: .rootStream,
      payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
    )
    try self.waitForStateChange(from: .ready, to: .idle) {
      XCTAssertNoThrow(try channel.writeInbound(goAway))
      self.loop.run()
    }

    self.loop.run()
    XCTAssertNoThrow(try channel.closeFuture.wait())

    // Shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdown.wait())
    }
  }
  /// With `.fastFailure` call start behavior and a provider whose channels always fail, the
  /// multiplexer requested from a fresh manager must fail.
  func testDoomedOptimisticChannelFromIdle() {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // Every connection attempt fails immediately.
    let doomedProvider = HookedChannelProvider { _, loop in
      loop.makeFailedFuture(DoomedChannelError())
    }
    let manager = ConnectionManager(
      configuration: configuration,
      channelProvider: doomedProvider,
      connectivityDelegate: nil,
      logger: self.logger
    )

    let multiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested while the manager is
  /// connecting must fail once the pending connection attempt fails.
  func testDoomedOptimisticChannelFromConnecting() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    let promise = self.loop.makePromise(of: Channel.self)
    // The configuration must be handed to the manager: previously it was built but never used,
    // so the manager ran with the default call start behavior instead of `.fastFailure`.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return promise.futureResult
    }

    self.waitForStateChange(from: .idle, to: .connecting) {
      // Trigger channel creation and a connection attempt; this multiplexer is not needed.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // We're connecting: ask for a multiplexer optimistically, as selected in the configuration.
    let optimisticChannelMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Fail the connection attempt; the optimistic multiplexer must fail with it.
    promise.fail(DoomedChannelError())
    XCTAssertThrowsError(try optimisticChannelMux.wait())
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested while the manager sits in
  /// `.transientFailure` must fail with the error of the failed connection attempt.
  func testOptimisticChannelFromTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure
    configuration.connectionBackoff = ConnectionBackoff()

    // Every connection attempt fails with `DoomedChannelError`.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]
    self.waitForStateChanges(expectedChanges) {
      // Trigger channel creation and a connection attempt; this multiplexer is not needed.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // The manager is now in transient failure. Ask for a multiplexer optimistically, as selected
    // in the configuration.
    let optimisticMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    XCTAssertThrowsError(try optimisticMultiplexer.wait()) {
      XCTAssertTrue($0 is DoomedChannelError)
    }
  }
  /// With `.fastFailure` call start behavior, a multiplexer requested after the manager has been
  /// shut down must fail.
  func testOptimisticChannelFromShutdown() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // The configuration must be handed to the manager: previously it was built but never used,
    // so the manager ran with the default call start behavior instead of `.fastFailure`.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let shutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdown.wait())

    // Ask for a multiplexer optimistically. The manager is shut down, so this has to fail.
    let channelMux = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try channelMux.wait())
  }
  /// Brings a connection to `.ready` and then shuts the manager down, expecting a direct
  /// `.ready` to `.shutdown` transition.
  func testForceIdleAfterInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for the multiplexer starts the connection attempt.
    let readyChannelMux = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let multiplexer = manager.getHTTP2Multiplexer()
      self.loop.run()
      return multiplexer
    }

    // Build the real channel with an idle handler in its pipeline.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(channel)
    self.loop.run()
    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // A SETTINGS frame on the root stream makes the connection ready.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The multiplexer future should have resolved by now.
    XCTAssertNoThrow(try readyChannelMux.wait())

    // Drop the connection by shutting the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdown.wait())
    }
  }
  /// Checks that a ready connection whose channel becomes inactive while no RPCs are active
  /// moves to `.idle` rather than to `.transientFailure`.
  func testCloseWithoutActiveRPCs() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for the multiplexer starts the connection attempt.
    let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> =
      self.waitForStateChange(from: .idle, to: .connecting) {
        let multiplexer = manager.getHTTP2Multiplexer()
        self.loop.run()
        return multiplexer
      }

    // Build the actual channel with an idle handler in its pipeline.
    let channel = EmbeddedChannel(loop: self.loop)
    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: multiplexer,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    XCTAssertNoThrow(try channel.pipeline.addHandlers([idleHandler]).wait())

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(channel)
    self.loop.run()
    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = channel.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // "Ready" the connection with a SETTINGS frame on the root stream.
    let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    try self.waitForStateChange(from: .connecting, to: .ready) {
      XCTAssertNoThrow(try channel.writeInbound(settingsFrame))
    }

    // The multiplexer future should have resolved by now.
    XCTAssertNoThrow(try readyChannelMux.wait())

    // Make the channel inactive. No RPCs are active, so the manager should idle instead of
    // entering the transient failure state.
    self.waitForStateChange(from: .ready, to: .idle) {
      channel.pipeline.fireChannelInactive()
    }
  }
  /// Reporting a channel error to an idle manager must leave it in `.idle`, from where it can
  /// still be shut down.
  func testIdleErrorDoesNothing() throws {
    let manager = self.makeConnectionManager()

    // An error delivered while idle should be harmless.
    let error = DoomedChannelError()
    manager.channelError(error)

    // The manager is still idle, so shutting down goes straight from idle to shutdown.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Checks that the HTTP/2 delegate of a `ConnectionManager` is told about the
  /// max-concurrent-streams setting received in SETTINGS frames and about every stream that is
  /// opened and closed on the connection.
  func testHTTP2Delegates() throws {
    let channel = EmbeddedChannel(loop: self.loop)
    defer {
      XCTAssertNoThrow(try channel.finish())
    }

    let multiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      inboundStreamInitializer: nil
    )

    // Records every callback made by the connection manager.
    final class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
      var streamsOpened = 0
      var streamsClosed = 0
      var maxConcurrentStreams = 0

      func streamOpened(_ connectionManager: ConnectionManager) {
        self.streamsOpened += 1
      }

      func streamClosed(_ connectionManager: ConnectionManager) {
        self.streamsClosed += 1
      }

      func receivedSettingsMaxConcurrentStreams(
        _ connectionManager: ConnectionManager,
        maxConcurrentStreams: Int
      ) {
        self.maxConcurrentStreams = maxConcurrentStreams
      }
    }

    let http2 = HTTP2Delegate()

    let manager = ConnectionManager(
      eventLoop: self.loop,
      channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
        let idleHandler = GRPCIdleHandler(
          connectionManager: manager,
          multiplexer: multiplexer,
          idleTimeout: .minutes(5),
          keepalive: ClientConnectionKeepalive(),
          logger: self.logger
        )

        // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
        // us to just fire stream created/closed events into the channel.
        do {
          try channel.pipeline.syncOperations.addHandler(idleHandler)
        } catch {
          return eventLoop.makeFailedFuture(error)
        }

        return eventLoop.makeSucceededFuture(channel)
      },
      callStartBehavior: .waitsForConnectivity,
      connectionBackoff: ConnectionBackoff(),
      connectivityDelegate: nil,
      http2Delegate: http2,
      logger: self.logger
    )

    // Start connecting.
    let futureMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Do the actual connecting. Wait on the returned future so that a failed connect is caught
    // here; without the wait only the construction of the socket address was being checked.
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // The channel isn't ready until it's seen a SETTINGS frame.
    func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
      let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
      return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
    }
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))

    // We're ready now so the future multiplexer will resolve and we'll have seen an update to
    // max concurrent streams.
    XCTAssertNoThrow(try futureMultiplexer.wait())
    XCTAssertEqual(http2.maxConcurrentStreams, 42)

    // A later SETTINGS frame updates the value again.
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
    XCTAssertEqual(http2.maxConcurrentStreams, 13)

    // Open some streams (IDs 1, 3, 5 and 7) ...
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      let streamCreated = NIOHTTP2StreamCreatedEvent(
        streamID: streamID,
        localInitialWindowSize: nil,
        remoteInitialWindowSize: nil
      )
      channel.pipeline.fireUserInboundEventTriggered(streamCreated)
    }

    // ... and then close them.
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
      channel.pipeline.fireUserInboundEventTriggered(streamClosed)
    }

    XCTAssertEqual(http2.streamsOpened, 4)
    XCTAssertEqual(http2.streamsClosed, 4)
  }
  /// Failing the pending channel with `EventLoopError.shutdown` while connecting must move the
  /// manager from `.connecting` to `.shutdown` and fail the requested multiplexer.
  func testChannelErrorWhenConnecting() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for the multiplexer starts the connection attempt.
    let multiplexer = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Fail the connection attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      pendingChannel.fail(EventLoopError.shutdown)
    }

    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// Regression test for a path through the connection manager which used to end in an invalid
  /// state: a connect failure arriving in a state other than connecting. The path is triggered
  /// by firing an error through the pipeline holding the idle handler and then failing the
  /// connect promise.
  func testChannelErrorAndConnectFailWhenConnecting() throws {
    // Completed with the channel built inside the provider, so the test can reach its pipeline.
    let escapedChannel = self.loop.makePromise(of: Channel.self)
    // The future the manager actually waits on for its connection attempt.
    let connectAttempt = self.loop.makePromise(of: Channel.self)

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = ConnectionBackoff()

    let manager = self.makeConnectionManager(
      configuration: configuration
    ) { connectionManager, loop in
      let channel = EmbeddedChannel(loop: loop as! EmbeddedEventLoop)
      let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: channel) {
        $0.eventLoop.makeSucceededVoidFuture()
      }
      let idleHandler = GRPCIdleHandler(
        connectionManager: connectionManager,
        multiplexer: multiplexer,
        idleTimeout: .minutes(60),
        keepalive: .init(),
        logger: self.clientLogger
      )

      // Leak the channel to the test once the idle handler is in place.
      channel.pipeline.addHandler(idleHandler).whenSuccess {
        escapedChannel.succeed(channel)
      }

      return connectAttempt.futureResult
    }

    // Requesting the multiplexer triggers channel creation.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // First fire an error down the pipeline ...
    let channel = try escapedChannel.futureResult.wait()
    channel.pipeline.fireErrorCaught(GRPCStatus(code: .unavailable))

    // ... then fail the connection attempt.
    connectAttempt.fail(GRPCStatus(code: .unavailable))
  }
  /// Jittering a 2 second client keepalive interval by at most 500 milliseconds must yield an
  /// interval between 1.5 and 2.5 seconds (inclusive).
  func testClientKeepaliveJitterWithoutClamping() {
    let lowerBound: TimeAmount = .milliseconds(1500)
    let upperBound: TimeAmount = .milliseconds(2500)

    let jittered = ClientConnectionKeepalive(interval: .seconds(2), timeout: .seconds(1))
      .jitteringInterval(byAtMost: .milliseconds(500))

    XCTAssertGreaterThanOrEqual(jittered.interval, lowerBound)
    XCTAssertLessThanOrEqual(jittered.interval, upperBound)
  }
  /// Jittering a 2 second client keepalive interval by at most 2 seconds must yield an interval
  /// strictly greater than the 1 second timeout and no greater than 4 seconds.
  func testClientKeepaliveJitterClampedToTimeout() {
    let timeout: TimeAmount = .seconds(1)
    let upperBound: TimeAmount = .seconds(4)

    let jittered = ClientConnectionKeepalive(interval: .seconds(2), timeout: .seconds(1))
      .jitteringInterval(byAtMost: .seconds(2))

    // The interval has to stay strictly above the timeout.
    XCTAssertGreaterThan(jittered.interval, timeout)
    XCTAssertLessThanOrEqual(jittered.interval, upperBound)
  }
  /// Jittering a 2 second server keepalive interval by at most 500 milliseconds must yield an
  /// interval between 1.5 and 2.5 seconds (inclusive).
  func testServerKeepaliveJitterWithoutClamping() {
    let lowerBound: TimeAmount = .milliseconds(1500)
    let upperBound: TimeAmount = .milliseconds(2500)

    let jittered = ServerConnectionKeepalive(interval: .seconds(2), timeout: .seconds(1))
      .jitteringInterval(byAtMost: .milliseconds(500))

    XCTAssertGreaterThanOrEqual(jittered.interval, lowerBound)
    XCTAssertLessThanOrEqual(jittered.interval, upperBound)
  }
  /// Jittering a 2 second server keepalive interval by at most 2 seconds must yield an interval
  /// strictly greater than the 1 second timeout and no greater than 4 seconds.
  func testServerKeepaliveJitterClampedToTimeout() {
    let timeout: TimeAmount = .seconds(1)
    let upperBound: TimeAmount = .seconds(4)

    let jittered = ServerConnectionKeepalive(interval: .seconds(2), timeout: .seconds(1))
      .jitteringInterval(byAtMost: .seconds(2))

    // The interval has to stay strictly above the timeout.
    XCTAssertGreaterThan(jittered.interval, timeout)
    XCTAssertLessThanOrEqual(jittered.interval, upperBound)
  }
  /// Checks that the minimum connection timeout configured on the backoff is passed to the
  /// channel provider as the connect timeout, and that with retries disabled a failed attempt
  /// takes the manager from `.connecting` straight to `.shutdown`.
  func testConnectTimeoutIsRespectedWithNoRetries() {
    // A channel provider which verifies the connect timeout it is given and then fails.
    struct TimeoutCheckingProvider: ConnectionManagerChannelProvider {
      func makeChannel(
        managedBy connectionManager: ConnectionManager,
        onEventLoop eventLoop: any EventLoop,
        connectTimeout: TimeAmount?,
        logger: Logger
      ) -> EventLoopFuture<Channel> {
        XCTAssertEqual(connectTimeout, .seconds(314_159_265))
        return eventLoop.makeFailedFuture(DoomedChannelError())
      }
    }

    let backoff = ConnectionBackoff(minimumConnectionTimeout: 314_159_265, retries: .none)
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = backoff

    let manager = ConnectionManager(
      configuration: configuration,
      channelProvider: TimeoutCheckingProvider(),
      connectivityDelegate: self.monitor,
      logger: self.logger
    )

    // Asking for the multiplexer triggers the state changes. No connect retries are configured
    // and the provider always fails, so connecting is expected to end in shutdown.
    let expectedChanges = [
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .shutdown),
    ]
    let multiplexer = self.waitForStateChanges(
      expectedChanges
    ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    XCTAssertThrowsError(try multiplexer.wait()) {
      XCTAssert($0 is DoomedChannelError)
    }
  }
  1210. }
  /// A single connectivity state transition: the state left and the state entered.
  internal struct Change: Hashable {
    /// The state before the transition.
    var from: ConnectivityState
    /// The state after the transition.
    var to: ConnectivityState
  }

  extension Change: CustomStringConvertible {
    /// The transition rendered as "<from> → <to>".
    var description: String {
      "\(from) → \(to)"
    }
  }
  // Unchecked: the compiler cannot verify this conformance. The only mutable state
  // (`expectation`) is read and written exclusively on `serialQueue`.
  extension RecordingConnectivityDelegate: @unchecked Sendable {}

  /// A connectivity state delegate for tests. It compares the state changes it is told about
  /// with an expectation registered up front and lets the test block until that expectation has
  /// been met or a timeout expires.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    /// Serializes every access to `expectation`.
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    /// Signalled once the current expectation has been verified.
    private let semaphore = DispatchSemaphore(value: 0)
    /// The changes we are currently waiting for. Only touch this on `serialQueue`.
    private var expectation: Expectation = .noExpectation
    /// Signalled each time the connection reports that it started quiescing.
    private let quiescingSemaphore = DispatchSemaphore(value: 0)

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The total number of changes this expectation is waiting for.
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Records a state change against the current expectation. Once the expectation is complete
    /// its verification closure runs (on the serial queue) and any waiter is woken up.
    func connectivityStateDidChange(
      from oldState: ConnectivityState,
      to newState: ConnectivityState
    ) {
      self.serialQueue.async {
        let change = Change(from: oldState, to: newState)

        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(change)
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(change)
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Wakes up anyone blocked in `waitForQuiescing(timeout:)`.
    func connectionStartedQuiescing() {
      self.serialQueue.async {
        self.quiescingSemaphore.signal()
      }
    }

    /// Expects exactly `count` state changes; `verify` is called with all of them, in the order
    /// they were recorded, once the last one has arrived.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Expects a single state change; `verify` is called with it when it arrives.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks until the current expectation has been verified, recording a test failure at
    /// `file`:`line` if that does not happen within `timeout`.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      switch self.semaphore.wait(timeout: .now() + timeout) {
      case .success:
        ()
      case .timedOut:
        // `expectation` belongs to the serial queue: read it there rather than racing with a
        // state change which may be in the middle of updating it.
        let expected = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(expected) change(s)",
          file: file,
          line: line
        )
      }
    }

    /// Blocks until the connection has reported that it started quiescing, recording a test
    /// failure at `file`:`line` if that does not happen within `timeout`.
    func waitForQuiescing(
      timeout: DispatchTimeInterval,
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      switch self.quiescingSemaphore.wait(timeout: .now() + timeout) {
      case .success:
        ()
      case .timedOut:
        XCTFail("Timed out waiting for connection to start quiescing", file: file, line: line)
      }
    }
  }
  extension ConnectionBackoff {
    /// A backoff whose initial and maximum values are both one second, with a multiplier of one
    /// and no jitter.
    fileprivate static let oneSecondFixed: ConnectionBackoff = .init(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// The error used by these tests to fail connection attempts on purpose.
  private struct DoomedChannelError: Error {
  }
  /// A channel provider which forwards every request for a channel to a closure.
  internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
    /// Called with the connection manager and the event loop each time a channel is requested.
    internal var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>

    /// Creates a provider which makes its channels by calling `provider`.
    init(_ provider: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
      self.provider = provider
    }

    /// Forwards to the stored closure. The connect timeout and the logger are not used.
    func makeChannel(
      managedBy connectionManager: ConnectionManager,
      onEventLoop eventLoop: EventLoop,
      connectTimeout: TimeAmount?,
      logger: Logger
    ) -> EventLoopFuture<Channel> {
      let makeChannel = self.provider
      return makeChannel(connectionManager, eventLoop)
    }
  }
  extension ConnectionManager {
    /// Shuts the manager down forcefully.
    ///
    /// Kept for backwards compatibility, so that these tests don't all have to spell out the
    /// shutdown mode.
    fileprivate func shutdown() -> EventLoopFuture<Void> {
      self.shutdown(mode: .forceful)
    }
  }