ConnectionManagerTests.swift 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import Logging
  19. import NIOCore
  20. import NIOEmbedded
  21. import NIOHTTP2
  22. import XCTest
  /// Tests covering the connectivity state transitions reported by `ConnectionManager`.
  ///
  /// All work is scheduled on a single `EmbeddedEventLoop`, so nothing progresses until a test
  /// explicitly calls `run()` or `advanceTime(by:)` on that loop.
  class ConnectionManagerTests: GRPCTestCase {
    private let loop = EmbeddedEventLoop()
    private let recorder = RecordingConnectivityDelegate()
    private var monitor: ConnectivityStateMonitor!

    /// A fresh client configuration bound to the embedded loop, with `connectionBackoff`
    /// cleared (`nil`) and background activity logged via `clientLogger`.
    private var defaultConfiguration: ClientConnection.Configuration {
      var configuration = ClientConnection.Configuration.default(
        target: .unixDomainSocket("/ignored"),
        eventLoopGroup: self.loop
      )
      configuration.connectionBackoff = nil
      configuration.backgroundActivityLogger = self.clientLogger
      return configuration
    }

    override func setUp() {
      super.setUp()
      // The monitor forwards state changes to the recording delegate used for expectations.
      self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
      super.tearDown()
    }

    /// Creates a `ConnectionManager` wired to this test's connectivity monitor and logger.
    ///
    /// - Parameters:
    ///   - config: The configuration to use; `defaultConfiguration` when `nil`.
    ///   - channelProvider: An optional hook supplying the channel future for each connection
    ///     attempt. It is wrapped in a `HookedChannelProvider` when present.
    /// - Returns: The newly created manager.
    private func makeConnectionManager(
      configuration config: ClientConnection.Configuration? = nil,
      channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
    ) -> ConnectionManager {
      let configuration = config ?? self.defaultConfiguration
      return ConnectionManager(
        configuration: configuration,
        channelProvider: channelProvider.map { HookedChannelProvider($0) },
        connectivityDelegate: self.monitor,
        logger: self.logger
      )
    }

    /// Runs `body` and waits for exactly one connectivity change, asserting it matches
    /// `from` -> `to`.
    ///
    /// - Parameters:
    ///   - from: The expected state before the change.
    ///   - to: The expected state after the change.
    ///   - timeout: How long to wait for the change to be recorded.
    ///   - file: The file to attribute failures to (defaults to the caller).
    ///   - line: The line to attribute failures to (defaults to the caller).
    ///   - body: The work expected to trigger the change.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChange<Result>(
      from: ConnectivityState,
      to: ConnectivityState,
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #filePath,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChange {
        XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }

    /// Runs `body` and waits for `changes.count` connectivity changes, asserting the recorded
    /// changes equal `changes`.
    ///
    /// - Parameters:
    ///   - changes: The expected changes, compared against the recorded ones.
    ///   - timeout: How long to wait for the changes to be recorded.
    ///   - file: The file to attribute failures to (defaults to the caller).
    ///   - line: The line to attribute failures to (defaults to the caller).
    ///   - body: The work expected to trigger the changes.
    /// - Returns: Whatever `body` returns.
    private func waitForStateChanges<Result>(
      _ changes: [Change],
      timeout: DispatchTimeInterval = .seconds(1),
      file: StaticString = #filePath,
      line: UInt = #line,
      body: () throws -> Result
    ) rethrows -> Result {
      self.recorder.expectChanges(changes.count) {
        // Forward the call site so a mismatch is reported in the failing test, not in this helper.
        XCTAssertEqual($0, changes, file: file, line: line)
      }
      let result = try body()
      self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
      return result
    }
  }
  86. extension ConnectionManagerTests {
  /// Shutting down an idle manager moves it to `.shutdown`, after which multiplexer requests fail.
  func testIdleShutdown() throws {
    let manager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The manager is shut down: asking for a multiplexer must now produce an error.
    let multiplexerFuture = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try multiplexerFuture.wait())
  }
  /// A failed connection attempt without backoff goes `.connecting` -> `.shutdown` and the
  /// pending multiplexer future fails with the channel's error.
  func testConnectFromIdleFailsWithNoReconnect() {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Requesting a multiplexer kicks off the connection attempt.
    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Fail the attempt.
    self.waitForStateChange(from: .connecting, to: .shutdown) {
      pendingChannel.fail(DoomedChannelError())
    }

    XCTAssertThrowsError(try multiplexer.wait()) { error in
      XCTAssertTrue(error is DoomedChannelError)
    }
  }
  /// Connects, becomes ready after the first SETTINGS frame, then shuts down from `.ready`.
  func testConnectAndDisconnect() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for a multiplexer triggers the connection attempt.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // Build the channel pipeline: HTTP/2 handler first, then the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2).wait()
    idle.setMultiplexer(try http2.syncMultiplexer())
    try embedded.pipeline.addHandler(idle).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
        .wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings.encode()))
    }

    // The channel is available now: shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Connects, becomes ready, idles out after the idle timeout, then shuts down from `.idle`.
  func testConnectAndIdle() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for a multiplexer triggers the connection attempt.
    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel pipeline: HTTP/2 handler first, then the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2).wait()
    idle.setMultiplexer(try http2.syncMultiplexer())
    try embedded.pipeline.addHandler(idle).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
        .wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings.encode()))
      // The multiplexer future _must_ have been completed by now.
      XCTAssertNoThrow(try multiplexer.wait())
    }

    // Let the idle timeout elapse; this closes the channel.
    try self.waitForStateChange(from: .ready, to: .idle) {
      self.loop.advanceTime(by: .minutes(5))
      XCTAssertNoThrow(try embedded.closeFuture.wait())
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Lets the first `channelInactive` event through and drops every later one.
  ///
  /// Some tests deliberately misuse a channel by calling `fireChannelInactive` by hand.
  /// Without this filter, the inactive event fired again at teardown could trip
  /// precondition failures caused by that unexpected ordering.
  class SwallowSecondInactiveHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    private var seenAnInactive = false

    func channelInactive(context: ChannelHandlerContext) {
      // Only the first event is forwarded; later ones stop here.
      guard !self.seenAnInactive else {
        return
      }
      self.seenAnInactive = true
      context.fireChannelInactive()
    }
  }
  /// Firing `channelInactive` before `channelActive` must be tolerated (no crash) when no
  /// reconnect is configured.
  func testChannelInactiveBeforeActiveWithNoReconnect() throws {
    let embedded = EmbeddedChannel(loop: self.loop)
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for a multiplexer triggers the connection attempt.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }

    // Pipeline order: inactive filter, HTTP/2, idle handler, close-on-error.
    let pipeline = embedded.pipeline.syncOperations
    try pipeline.addHandler(SwallowSecondInactiveHandler())
    try pipeline.addHandler(http2)
    idle.setMultiplexer(try http2.syncMultiplexer())
    try pipeline.addHandler(idle)
    try pipeline.addHandler(NIOCloseOnErrorHandler())
    pendingChannel.succeed(embedded)

    // Deliberately the wrong order: inactive first, then active. It just must not crash.
    embedded.pipeline.fireChannelInactive()
    embedded.pipeline.fireChannelActive()
    embedded.embeddedEventLoop.run()

    try manager.shutdown(mode: .forceful).wait()
  }
  /// Firing `channelInactive` before `channelActive` on the first channel must be tolerated;
  /// with a one second backoff, a second channel is then used and becomes ready.
  func testChannelInactiveBeforeActiveWillReconnect() throws {
    var channels = [EmbeddedChannel(loop: self.loop), EmbeddedChannel(loop: self.loop)]
    var channelPromises: [EventLoopPromise<Channel>] = [
      self.loop.makePromise(),
      self.loop.makePromise(),
    ]
    // Futures are handed out last-first, mirroring how the promises are popped below.
    var channelFutures = channelPromises.map { $0.futureResult }

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      channelFutures.removeLast()
    }

    // Asking for a multiplexer triggers the first connection attempt.
    self.waitForStateChange(from: .idle, to: .connecting) {
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // First channel: inactive filter, HTTP/2, idle handler, close-on-error.
    let firstChannel = channels.removeLast()
    let firstPromise = channelPromises.removeLast()
    let firstIdle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let firstHTTP2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: firstChannel.eventLoop,
      streamDelegate: firstIdle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    let firstPipeline = firstChannel.pipeline.syncOperations
    try firstPipeline.addHandler(SwallowSecondInactiveHandler())
    try firstPipeline.addHandler(firstHTTP2)
    firstIdle.setMultiplexer(try firstHTTP2.syncMultiplexer())
    try firstPipeline.addHandler(firstIdle)
    try firstPipeline.addHandler(NIOCloseOnErrorHandler())
    firstPromise.succeed(firstChannel)

    // Deliberately the wrong order: inactive first, then active.
    firstChannel.pipeline.fireChannelInactive()
    firstChannel.pipeline.fireChannelActive()

    // Let the backoff elapse so the next attempt begins.
    self.loop.advanceTime(by: .seconds(1))

    // Second channel: same pipeline layout as the first.
    let secondChannel = channels.removeLast()
    let secondPromise = channelPromises.removeLast()
    let secondIdle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let secondHTTP2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: secondChannel.eventLoop,
      streamDelegate: secondIdle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    let secondPipeline = secondChannel.pipeline.syncOperations
    try secondPipeline.addHandler(SwallowSecondInactiveHandler())
    try secondPipeline.addHandler(secondHTTP2)
    secondIdle.setMultiplexer(try secondHTTP2.syncMultiplexer())
    try secondPipeline.addHandler(secondIdle)
    try secondPipeline.addHandler(NIOCloseOnErrorHandler())
    secondPromise.succeed(secondChannel)

    // Activate the second channel and deliver SETTINGS to reach 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      secondChannel.pipeline.fireChannelActive()
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try secondChannel.writeInbound(settings.encode()))
    }
  }
  /// The idle timeout must not idle the connection while a stream is open; once the stream
  /// closes and the timeout elapses again, the connection goes idle.
  func testIdleTimeoutWhenThereAreActiveStreams() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    // Asking for a multiplexer triggers the connection attempt.
    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel pipeline: HTTP/2 handler first, then the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2).wait()
    idle.setMultiplexer(try http2.syncMultiplexer())
    try embedded.pipeline.addHandler(idle).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
        .wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings.encode()))
      // The multiplexer future _must_ have been completed by now.
      XCTAssertNoThrow(try multiplexer.wait())
    }

    // Pretend a stream was opened; its details are irrelevant.
    idle.streamCreated(1, channel: embedded)

    // The idle timeout elapses, but with an open stream this should _not_ idle the channel.
    self.loop.advanceTime(by: .minutes(5))

    // Close the stream, then let the idle timeout elapse once more.
    self.waitForStateChange(from: .ready, to: .idle) {
      idle.streamClosed(1, channel: embedded)
      self.loop.advanceTime(by: .minutes(5))
    }

    // Finally, shut down from idle.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down after the channel connects but before it becomes ready goes
  /// `.connecting` -> `.shutdown` and fails the pending multiplexer future.
  func testConnectAndThenBecomeInactive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel pipeline: HTTP/2 handler first, then the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2).wait()
    idle.setMultiplexer(try http2.syncMultiplexer())
    try embedded.pipeline.addHandler(idle).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
        .wait()
    )

    // Shut down while the multiplexer future is still pending (no SETTINGS frame was seen).
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // No channel was obtained and reconnect isn't configured, so the manager is shut down
    // and the pending multiplexer future must have failed.
    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// The first attempt fails and, after a one second backoff, the second attempt succeeds;
  /// multiplexer futures requested before the retry are completed by it.
  func testConnectOnSecondAttempt() throws {
    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    // First attempt: already failed. Second attempt: completed later by the test.
    let attempts: [EventLoopFuture<Channel>] = [
      self.loop.makeFailedFuture(DoomedChannelError()),
      pendingChannel.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()

    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Requesting a multiplexer starts the first attempt, which fails immediately.
    let firstMultiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChanges([
        Change(from: .idle, to: .connecting),
        Change(from: .connecting, to: .transientFailure),
      ]) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // A second request made while in transient failure; it should also be completed by the retry.
    let secondMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Let the backoff elapse so the next attempt begins.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Build the real channel: HTTP/2 handler first, then the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2).wait()
    idle.setMultiplexer(try http2.syncMultiplexer())
    try embedded.pipeline.addHandler(idle).wait()

    // Complete the second attempt and activate the channel.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
        .wait()
    )

    // A SETTINGS frame on the root stream is what makes the connection 'ready'.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings.encode()))
    }

    // Both multiplexer futures _must_ have been completed by now.
    XCTAssertNoThrow(try firstMultiplexer.wait())
    XCTAssertNoThrow(try secondMultiplexer.wait())

    // Finally, shut down from ready.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }
  }
  /// Shutting down mid-connect fails the pending multiplexer future; a channel that arrives
  /// afterwards is closed.
  func testShutdownWhileConnecting() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Begin shutting down while the channel is still pending.
    let shutdownFuture: EventLoopFuture<Void> = self.waitForStateChange(
      from: .connecting,
      to: .shutdown
    ) {
      let pendingShutdown = manager.shutdown()
      self.loop.run()
      return pendingShutdown
    }

    // The multiplexer that was requested must fail.
    XCTAssertThrowsError(try multiplexer.wait())

    // The channel promise is still outstanding; if it succeeds, that channel should be closed too.
    pendingChannel.succeed(EmbeddedChannel(loop: self.loop))
    let lateChannel = try pendingChannel.futureResult.wait()
    self.loop.run()

    XCTAssertNoThrow(try lateChannel.closeFuture.wait())
    XCTAssertNoThrow(try shutdownFuture.wait())
  }
  /// Shutting down from `.transientFailure` moves to `.shutdown` and fails the pending
  /// multiplexer future.
  func testShutdownWhileTransientFailure() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed
    // Every connection attempt fails immediately.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Requesting a multiplexer starts an attempt, which fails straight away.
    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChanges([
        Change(from: .idle, to: .connecting),
        Change(from: .connecting, to: .transientFailure),
      ]) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Shut down while waiting to retry.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer that was requested must fail.
    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// Shutting down once the channel is active (but not yet ready) goes
  /// `.connecting` -> `.shutdown` and fails the pending multiplexer future.
  func testShutdownWhileActive() throws {
    let pendingChannel = self.loop.makePromise(of: Channel.self)
    let manager = self.makeConnectionManager { _, _ in
      pendingChannel.futureResult
    }

    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the channel pipeline: HTTP/2 handler first, then the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2).wait()
    idle.setMultiplexer(try http2.syncMultiplexer())
    try embedded.pipeline.addHandler(idle).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
        .wait()
    )

    // (Becoming active is an internal state, so no connectivity change is expected above.)

    // Shut down before the connection ever becomes ready.
    try self.waitForStateChange(from: .connecting, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The multiplexer that was requested must fail.
    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// Calling `shutdown()` on an already shut down manager still completes without error.
  func testShutdownWhileShutdown() throws {
    let manager = self.makeConnectionManager()

    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let initialShutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try initialShutdown.wait())
    }

    // A repeated shutdown should succeed as well.
    let repeatedShutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try repeatedShutdown.wait())
  }
  /// Closing an active (not yet ready) channel with backoff configured moves to
  /// `.transientFailure`; the retry fails too, and the multiplexer future fails on shutdown.
  func testTransientFailureWhileActive() throws {
    var configuration = self.defaultConfiguration
    configuration.connectionBackoff = .oneSecondFixed

    let pendingChannel: EventLoopPromise<Channel> = self.loop.makePromise()
    // First attempt: completed later by the test. Second attempt: already failed.
    let attempts: [EventLoopFuture<Channel>] = [
      pendingChannel.futureResult,
      self.loop.makeFailedFuture(DoomedChannelError()),
    ]
    var remainingAttempts = attempts.makeIterator()

    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the first channel: HTTP/2 handler first, then the idle handler.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idle = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2 = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idle
    ) { stream in
      stream.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2).wait()
    idle.setMultiplexer(try http2.syncMultiplexer())
    try embedded.pipeline.addHandler(idle).wait()

    // Hand the channel to the manager and activate it.
    pendingChannel.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
        .wait()
    )

    // (Becoming active is an internal state, so no connectivity change is expected above.)

    // Close the channel, simulating e.g. a failed TLS handshake.
    try self.waitForStateChange(from: .connecting, to: .transientFailure) {
      XCTAssertNoThrow(try embedded.close().wait())
    }

    // Let the backoff elapse: the retry starts and fails immediately.
    self.waitForStateChanges([
      Change(from: .transientFailure, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Shut down while waiting to retry again.
    try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
      let shutdownFuture = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdownFuture.wait())
    }

    // The channel never became ready, so waiting on the multiplexer must throw.
    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// Closes a ready channel which has an open stream and checks that the manager reports a
  /// transient failure, reconnects after the backoff and becomes ready again on a second channel.
  func testTransientFailureWhileReady() throws {
    var config = self.defaultConfiguration
    config.connectionBackoff = .oneSecondFixed

    // One promise per expected connection attempt, handed out in order.
    let firstAttempt = self.loop.makePromise(of: Channel.self)
    let secondAttempt = self.loop.makePromise(of: Channel.self)
    let attempts: [EventLoopFuture<Channel>] = [
      firstAttempt.futureResult,
      secondAttempt.futureResult,
    ]
    var remainingAttempts = attempts.makeIterator()
    let manager = self.makeConnectionManager(configuration: config) { _, _ in
      if let attempt = remainingAttempts.next() {
        return attempt
      }
      XCTFail("Too many channels requested")
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    // Creates an embedded channel holding an HTTP/2 handler and an idle handler, completes
    // `promise` with it and then connects it.
    func connectChannel(
      fulfilling promise: EventLoopPromise<Channel>
    ) throws -> (channel: EmbeddedChannel, idleHandler: GRPCIdleHandler) {
      let channel = EmbeddedChannel(loop: self.loop)
      let idleHandler = GRPCIdleHandler(
        connectionManager: manager,
        idleTimeout: .minutes(5),
        keepalive: .init(),
        logger: self.logger
      )
      let http2Handler = NIOHTTP2Handler(
        mode: .client,
        eventLoop: channel.eventLoop,
        streamDelegate: idleHandler
      ) { streamChannel in
        streamChannel.eventLoop.makeSucceededVoidFuture()
      }
      try channel.pipeline.addHandler(http2Handler).wait()
      idleHandler.setMultiplexer(try http2Handler.syncMultiplexer())
      try channel.pipeline.addHandler(idleHandler).wait()
      promise.succeed(channel)
      XCTAssertNoThrow(
        try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
      )
      return (channel, idleHandler)
    }

    // Requesting a multiplexer starts the first connection attempt.
    let multiplexer = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Bring up the first channel.
    let (firstChannel, firstIdleHandler) = try connectChannel(fulfilling: firstAttempt)

    // A SETTINGS frame on the root stream makes the connection ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try firstChannel.writeInbound(settings.encode()))
    }

    // The multiplexer should be available now.
    XCTAssertNoThrow(try multiplexer.wait())

    // Register a stream before closing the channel: with no active RPC the manager would idle
    // instead of reporting a failure.
    firstIdleHandler.streamCreated(1, channel: firstChannel)
    try self.waitForStateChange(from: .ready, to: .transientFailure) {
      XCTAssertNoThrow(try firstChannel.close().wait())
    }

    // Let the backoff elapse so that the manager connects again.
    self.waitForStateChange(from: .transientFailure, to: .connecting) {
      self.loop.advanceTime(by: .seconds(1))
    }

    // Bring up the second channel.
    let (secondChannel, _) = try connectChannel(fulfilling: secondAttempt)

    // A SETTINGS frame on the root stream makes the connection ready again.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try secondChannel.writeInbound(settings.encode()))
    }

    // Shut the manager down.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdown.wait())
    }
  }
  /// Sends a GOAWAY frame to a ready connection and checks that the manager returns to idle and
  /// that the channel closes.
  func testGoAwayWhenReady() throws {
    let connection: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in
      connection.futureResult
    }

    // Requesting a multiplexer starts the connection attempt.
    let multiplexer = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the channel the manager is waiting for and connect it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2Handler = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idleHandler
    ) { streamChannel in
      streamChannel.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2Handler).wait()
    let streamMultiplexer = try http2Handler.syncMultiplexer()
    idleHandler.setMultiplexer(streamMultiplexer)
    try embedded.pipeline.addHandler(idleHandler).wait()
    connection.succeed(embedded)
    XCTAssertNoThrow(
      try embedded.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    try self.waitForStateChange(from: .connecting, to: .ready) {
      // A SETTINGS frame on the root stream makes the connection ready.
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings.encode()))
    }

    // The multiplexer _must_ be available at this point.
    XCTAssertNoThrow(try multiplexer.wait())

    // Deliver a GOAWAY (its contents are unimportant): the connection should go idle and the
    // channel should close.
    try self.waitForStateChange(from: .ready, to: .idle) {
      let goAway = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
      )
      XCTAssertNoThrow(try embedded.writeInbound(goAway.encode()))
      self.loop.run()
    }
    self.loop.run()
    XCTAssertNoThrow(try embedded.closeFuture.wait())

    // Shut the manager down.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let shutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdown.wait())
    }
  }
  /// With the fast-failure call start behaviour, a multiplexer requested from an idle manager
  /// should fail when the channel provider returns a failed future.
  func testDoomedOptimisticChannelFromIdle() {
    var fastFailing = self.defaultConfiguration
    fastFailing.callStartBehavior = .fastFailure

    // Every channel future this provider hands out has already failed.
    let doomedProvider = HookedChannelProvider { _, eventLoop in
      eventLoop.makeFailedFuture(DoomedChannelError())
    }
    let manager = ConnectionManager(
      configuration: fastFailing,
      channelProvider: doomedProvider,
      connectivityDelegate: nil,
      logger: self.logger
    )

    let multiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try multiplexer.wait())
  }
  /// With the fast-failure call start behaviour, a multiplexer requested while the manager is
  /// connecting should fail as soon as the pending channel future fails.
  func testDoomedOptimisticChannelFromConnecting() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    let promise = self.loop.makePromise(of: Channel.self)
    // The configuration has to be handed to the manager: without it the manager is built from
    // the default configuration and the fast-failure behaviour is never exercised.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return promise.futureResult
    }

    self.waitForStateChange(from: .idle, to: .connecting) {
      // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
    let optimisticChannelMux = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Fail the promise.
    promise.fail(DoomedChannelError())
    XCTAssertThrowsError(try optimisticChannelMux.wait())
  }
  /// With the fast-failure call start behaviour, a multiplexer requested while the manager sits
  /// in transient failure should fail with the error from the failed connection attempt.
  func testOptimisticChannelFromTransientFailure() throws {
    var fastFailing = self.defaultConfiguration
    fastFailing.callStartBehavior = .fastFailure
    fastFailing.connectionBackoff = ConnectionBackoff()

    // Every connection attempt is doomed.
    let manager = self.makeConnectionManager(configuration: fastFailing) { _, _ in
      self.loop.makeFailedFuture(DoomedChannelError())
    }

    self.waitForStateChanges([
      Change(from: .idle, to: .connecting),
      Change(from: .connecting, to: .transientFailure),
    ]) {
      // Only the connection attempt matters here; the returned multiplexer future is dropped.
      _ = manager.getHTTP2Multiplexer()
      self.loop.run()
    }

    // The manager is now in transient failure. Ask for a multiplexer optimistically, as
    // selected in the configuration.
    let optimistic = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try optimistic.wait()) {
      XCTAssertTrue($0 is DoomedChannelError)
    }
  }
  /// With the fast-failure call start behaviour, a multiplexer requested from a manager which
  /// has already shut down should fail.
  func testOptimisticChannelFromShutdown() throws {
    var configuration = self.defaultConfiguration
    configuration.callStartBehavior = .fastFailure

    // The configuration has to be handed to the manager: without it the manager is built from
    // the default configuration and the fast-failure behaviour is never exercised.
    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
      return self.loop.makeFailedFuture(DoomedChannelError())
    }

    let shutdown = manager.shutdown()
    self.loop.run()
    XCTAssertNoThrow(try shutdown.wait())

    // Get a channel optimistically. It'll fail, obviously.
    let channelMux = manager.getHTTP2Multiplexer()
    self.loop.run()
    XCTAssertThrowsError(try channelMux.wait())
  }
  /// Brings a connection to the ready state and then shuts the manager down, expecting a single
  /// transition from ready to shutdown.
  func testForceIdleAfterInactive() throws {
    let connection: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in
      connection.futureResult
    }

    // Requesting a multiplexer starts the connection attempt.
    let multiplexer = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    // Build the real channel, hand it to the manager and activate it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2Handler = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idleHandler
    ) { streamChannel in
      streamChannel.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2Handler).wait()
    let streamMultiplexer = try http2Handler.syncMultiplexer()
    idleHandler.setMultiplexer(streamMultiplexer)
    XCTAssertNoThrow(try embedded.pipeline.addHandler(idleHandler).wait())
    connection.succeed(embedded)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = embedded.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // A SETTINGS frame on the root stream makes the connection ready.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings.encode()))
    }

    // The multiplexer should be available now.
    XCTAssertNoThrow(try multiplexer.wait())

    // Shut the manager down while the connection is ready.
    try self.waitForStateChange(from: .ready, to: .shutdown) {
      let shutdown = manager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try shutdown.wait())
    }
  }
  /// When a ready channel becomes inactive while there are no active RPCs the manager should go
  /// idle rather than enter the transient failure state.
  func testCloseWithoutActiveRPCs() throws {
    let connection: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in
      connection.futureResult
    }

    // Requesting a multiplexer starts the connection attempt.
    let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = self
      .waitForStateChange(from: .idle, to: .connecting) {
        let requested = manager.getHTTP2Multiplexer()
        self.loop.run()
        return requested
      }

    // Build the actual channel, hand it to the manager and activate it.
    let embedded = EmbeddedChannel(loop: self.loop)
    let idleHandler = GRPCIdleHandler(
      connectionManager: manager,
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
    let http2Handler = NIOHTTP2Handler(
      mode: .client,
      eventLoop: embedded.eventLoop,
      streamDelegate: idleHandler
    ) { streamChannel in
      streamChannel.eventLoop.makeSucceededVoidFuture()
    }
    try embedded.pipeline.addHandler(http2Handler).wait()
    let streamMultiplexer = try http2Handler.syncMultiplexer()
    idleHandler.setMultiplexer(streamMultiplexer)
    XCTAssertNoThrow(try embedded.pipeline.addHandler(idleHandler).wait())
    connection.succeed(embedded)
    self.loop.run()

    let address = try SocketAddress(unixDomainSocketPath: "/ignored")
    let connected = embedded.connect(to: address)
    XCTAssertNoThrow(try connected.wait())

    // "ready" the connection with a SETTINGS frame on the root stream.
    try self.waitForStateChange(from: .connecting, to: .ready) {
      let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
      XCTAssertNoThrow(try embedded.writeInbound(settings.encode()))
    }

    // The multiplexer should be available now.
    XCTAssertNoThrow(try multiplexer.wait())

    // Make the channel inactive. No RPCs are active so the expected state is idle, not
    // transient failure.
    self.waitForStateChange(from: .ready, to: .idle) {
      embedded.pipeline.fireChannelInactive()
    }
  }
  /// Reporting a channel error to an idle manager should leave it idle and able to shut down.
  func testIdleErrorDoesNothing() throws {
    let idleManager = self.makeConnectionManager()

    // An error delivered while idle should be harmless.
    idleManager.channelError(DoomedChannelError())

    // The manager is expected to still be idle, so shutdown goes from idle to shutdown.
    try self.waitForStateChange(from: .idle, to: .shutdown) {
      let completed = idleManager.shutdown()
      self.loop.run()
      XCTAssertNoThrow(try completed.wait())
    }
  }
  /// Checks that the HTTP/2 delegate given to the connection manager is told about updates to
  /// the max concurrent streams setting and about streams being opened and closed.
  func testHTTP2Delegates() throws {
    let channel = EmbeddedChannel(loop: self.loop)
    defer {
      XCTAssertNoThrow(try channel.finish())
    }

    // Records the callbacks it receives so that the test can inspect them afterwards.
    class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
      var streamsOpened = 0
      var streamsClosed = 0
      var maxConcurrentStreams = 0

      func streamOpened(_ connectionManager: ConnectionManager) {
        self.streamsOpened += 1
      }

      func streamClosed(_ connectionManager: ConnectionManager) {
        self.streamsClosed += 1
      }

      func receivedSettingsMaxConcurrentStreams(
        _ connectionManager: ConnectionManager,
        maxConcurrentStreams: Int
      ) {
        self.maxConcurrentStreams = maxConcurrentStreams
      }
    }

    let http2 = HTTP2Delegate()

    let manager = ConnectionManager(
      eventLoop: self.loop,
      channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
        let idleHandler = GRPCIdleHandler(
          connectionManager: manager,
          idleTimeout: .minutes(5),
          keepalive: ClientConnectionKeepalive(),
          logger: self.logger
        )

        let h2Handler = NIOHTTP2Handler(
          mode: .client,
          eventLoop: channel.eventLoop,
          streamDelegate: idleHandler
        ) { channel in
          channel.eventLoop.makeSucceededVoidFuture()
        }

        // Stream events are not produced by opening real streams: the test below calls the
        // idle handler's stream delegate methods directly. Any failure to set up the pipeline
        // fails the channel future instead of crashing the test process.
        do {
          let pipeline = channel.pipeline.syncOperations
          try pipeline.addHandler(h2Handler)
          idleHandler.setMultiplexer(try h2Handler.syncMultiplexer())
          try pipeline.addHandler(idleHandler)
        } catch {
          return eventLoop.makeFailedFuture(error)
        }

        return eventLoop.makeSucceededFuture(channel)
      },
      callStartBehavior: .waitsForConnectivity,
      connectionBackoff: ConnectionBackoff(),
      connectivityDelegate: nil,
      http2Delegate: http2,
      logger: self.logger
    )

    // Start connecting.
    let futureMultiplexer = manager.getHTTP2Multiplexer()
    self.loop.run()

    // Do the actual connecting. The future is waited on so that a failed connect is reported
    // by the assertion rather than going unnoticed.
    XCTAssertNoThrow(
      try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait()
    )

    // The channel isn't ready until it's seen a SETTINGS frame.
    func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
      let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
      return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
    }
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42).encode()))

    // We're ready now so the future multiplexer will resolve and we'll have seen an update to
    // max concurrent streams.
    XCTAssertNoThrow(try futureMultiplexer.wait())
    XCTAssertEqual(http2.maxConcurrentStreams, 42)

    // A later SETTINGS frame should update the value again.
    XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13).encode()))
    XCTAssertEqual(http2.maxConcurrentStreams, 13)

    let streamDelegate = try channel.pipeline.handler(type: GRPCIdleHandler.self).wait()

    // Open some streams (IDs 1, 3, 5 and 7).
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      streamDelegate.streamCreated(streamID, channel: channel)
    }

    // ... and then close them.
    for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
      streamDelegate.streamClosed(streamID, channel: channel)
    }

    XCTAssertEqual(http2.streamsOpened, 4)
    XCTAssertEqual(http2.streamsClosed, 4)
  }
  /// Reports `EventLoopError.shutdown` to the manager while it is connecting and checks that it
  /// moves to shutdown and that the pending multiplexer fails.
  func testChannelErrorWhenConnecting() throws {
    let connection: EventLoopPromise<Channel> = self.loop.makePromise()
    let manager = self.makeConnectionManager { _, _ in
      connection.futureResult
    }

    // Requesting a multiplexer starts the connection attempt; the channel is never provided.
    let pending = self.waitForStateChange(
      from: .idle,
      to: .connecting
    ) { () -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> in
      let requested = manager.getHTTP2Multiplexer()
      self.loop.run()
      return requested
    }

    self.waitForStateChange(from: .connecting, to: .shutdown) {
      manager.channelError(EventLoopError.shutdown)
    }

    XCTAssertThrowsError(try pending.wait())
  }
  1129. }
  /// A transition from one connectivity state to another.
  internal struct Change: Hashable {
    var from: ConnectivityState
    var to: ConnectivityState
  }

  extension Change: CustomStringConvertible {
    /// The old state and the new state separated by an arrow.
    var description: String {
      "\(self.from) → \(self.to)"
    }
  }
  // Unchecked as all mutable state is read and modified on a serial queue.
  extension RecordingConnectivityDelegate: @unchecked Sendable {}

  /// A connectivity delegate for tests: state changes are checked against an expectation set by
  /// the test, which can then block until that expectation has been met.
  ///
  /// `expectation` is only touched on `serialQueue`; the semaphores wake up a waiting test.
  internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
    private let serialQueue = DispatchQueue(label: "io.grpc.testing")
    /// Signalled once the current expectation has been verified.
    private let semaphore = DispatchSemaphore(value: 0)
    private var expectation: Expectation = .noExpectation

    /// Signalled each time `connectionStartedQuiescing()` is called.
    private let quiescingSemaphore = DispatchSemaphore(value: 0)

    private enum Expectation {
      /// We have no expectation of any changes. We'll just ignore any changes.
      case noExpectation

      /// We expect one change.
      case one((Change) -> Void)

      /// We expect 'count' changes.
      case some(count: Int, recorded: [Change], ([Change]) -> Void)

      /// The total number of changes this expectation was set up to wait for.
      var count: Int {
        switch self {
        case .noExpectation:
          return 0
        case .one:
          return 1
        case let .some(count, _, _):
          return count
        }
      }
    }

    /// Records a state change against the current expectation. Once the expected number of
    /// changes has been seen the verification closure runs and any waiter is woken up.
    func connectivityStateDidChange(
      from oldState: ConnectivityState,
      to newState: ConnectivityState
    ) {
      self.serialQueue.async {
        switch self.expectation {
        case let .one(verify):
          // We don't care about future changes.
          self.expectation = .noExpectation

          // Verify and notify.
          verify(Change(from: oldState, to: newState))
          self.semaphore.signal()

        case .some(let count, var recorded, let verify):
          recorded.append(Change(from: oldState, to: newState))
          if recorded.count == count {
            // We don't care about future changes.
            self.expectation = .noExpectation

            // Verify and notify.
            verify(recorded)
            self.semaphore.signal()
          } else {
            // Still need more responses.
            self.expectation = .some(count: count, recorded: recorded, verify)
          }

        case .noExpectation:
          // Ignore any changes.
          ()
        }
      }
    }

    /// Wakes up a caller of `waitForQuiescing(timeout:)`.
    func connectionStartedQuiescing() {
      self.serialQueue.async {
        self.quiescingSemaphore.signal()
      }
    }

    /// Expects `count` state changes; `verify` is called with all of them once the last one has
    /// been recorded.
    func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
      self.serialQueue.async {
        self.expectation = .some(count: count, recorded: [], verify)
      }
    }

    /// Expects a single state change; `verify` is called with it.
    func expectChange(verify: @escaping (Change) -> Void) {
      self.serialQueue.async {
        self.expectation = .one(verify)
      }
    }

    /// Blocks until the current expectation has been verified, recording a test failure at
    /// `file`/`line` if `timeout` passes first.
    func waitForExpectedChanges(
      timeout: DispatchTimeInterval,
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      let result = self.semaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        // `expectation` belongs to `serialQueue`: read it there so that this does not race with
        // a state change which is being processed at the same time.
        let expected = self.serialQueue.sync { self.expectation.count }
        XCTFail(
          "Timed out before verifying \(expected) change(s)",
          file: file, line: line
        )
      }
    }

    /// Blocks until `connectionStartedQuiescing()` has been called, recording a test failure at
    /// `file`/`line` if `timeout` passes first.
    func waitForQuiescing(
      timeout: DispatchTimeInterval,
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
      switch result {
      case .success:
        ()
      case .timedOut:
        XCTFail("Timed out waiting for connection to start quiescing", file: file, line: line)
      }
    }
  }
  fileprivate extension ConnectionBackoff {
    /// A backoff whose initial and maximum values are both one second, with a multiplier of one
    /// and no jitter.
    static let oneSecondFixed: ConnectionBackoff = .init(
      initialBackoff: 1.0,
      maximumBackoff: 1.0,
      multiplier: 1.0,
      jitter: 0.0
    )
  }
  /// The error used in this file to fail channel futures and to report channel errors. It
  /// carries no information.
  private struct DoomedChannelError: Error {
  }
  /// A channel provider which creates channels by calling a closure supplied at initialization.
  internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
    /// Called with the connection manager and event loop each time a channel is requested.
    var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>

    init(_ provider: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
      self.provider = provider
    }

    /// Forwards to `provider`; the connect timeout and logger are not used.
    func makeChannel(
      managedBy manager: ConnectionManager,
      onEventLoop loop: EventLoop,
      connectTimeout _: TimeAmount?,
      logger _: Logger
    ) -> EventLoopFuture<Channel> {
      self.provider(manager, loop)
    }
  }
  fileprivate extension ConnectionManager {
    /// Shuts down forcefully. This argument-free spelling exists for backwards compatibility,
    /// to avoid large diffs in these tests.
    func shutdown() -> EventLoopFuture<Void> {
      self.shutdown(mode: .forceful)
    }
  }