ConnectionManagerTests.swift 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import NIOHTTP2
  19. import XCTest
  20. class ConnectionManagerTests: GRPCTestCase {
  21. private let loop = EmbeddedEventLoop()
  22. private let recorder = RecordingConnectivityDelegate()
  23. private var defaultConfiguration: ClientConnection.Configuration {
  24. return ClientConnection.Configuration(
  25. target: .unixDomainSocket("/ignored"),
  26. eventLoopGroup: self.loop,
  27. connectivityStateDelegate: self.recorder,
  28. connectionBackoff: nil
  29. )
  30. }
  31. override func tearDown() {
  32. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  33. super.tearDown()
  34. }
  35. private func waitForStateChange<Result>(
  36. from: ConnectivityState,
  37. to: ConnectivityState,
  38. timeout: DispatchTimeInterval = .seconds(1),
  39. body: () throws -> Result
  40. ) rethrows -> Result {
  41. self.recorder.expectChange {
  42. XCTAssertEqual($0, Change(from: from, to: to))
  43. }
  44. let result = try body()
  45. self.recorder.waitForExpectedChanges(timeout: timeout)
  46. return result
  47. }
  48. private func waitForStateChanges<Result>(
  49. _ changes: [Change],
  50. timeout: DispatchTimeInterval = .seconds(1),
  51. body: () throws -> Result
  52. ) rethrows -> Result {
  53. self.recorder.expectChanges(changes.count) {
  54. XCTAssertEqual($0, changes)
  55. }
  56. let result = try body()
  57. self.recorder.waitForExpectedChanges(timeout: timeout)
  58. return result
  59. }
  60. }
  61. extension ConnectionManagerTests {
  62. func testIdleShutdown() throws {
  63. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  64. try self.waitForStateChange(from: .idle, to: .shutdown) {
  65. let shutdown = manager.shutdown()
  66. self.loop.run()
  67. XCTAssertNoThrow(try shutdown.wait())
  68. }
  69. // Getting a channel should fail.
  70. let channel = manager.getChannel()
  71. self.loop.run()
  72. XCTAssertThrowsError(try channel.wait())
  73. }
  74. func testConnectFromIdleFailsWithNoReconnect() {
  75. let channelPromise = self.loop.makePromise(of: Channel.self)
  76. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  77. return channelPromise.futureResult
  78. }
  79. let channel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  80. let channel = manager.getChannel()
  81. self.loop.run()
  82. return channel
  83. }
  84. self.waitForStateChange(from: .connecting, to: .shutdown) {
  85. channelPromise.fail(DoomedChannelError())
  86. }
  87. XCTAssertThrowsError(try channel.wait()) {
  88. XCTAssertTrue($0 is DoomedChannelError)
  89. }
  90. }
  91. func testConnectAndDisconnect() throws {
  92. let channelPromise = self.loop.makePromise(of: Channel.self)
  93. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  94. return channelPromise.futureResult
  95. }
  96. // Start the connection.
  97. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  98. let readyChannel = manager.getChannel()
  99. self.loop.run()
  100. return readyChannel
  101. }
  102. // Setup the real channel and activate it.
  103. let channel = EmbeddedChannel(
  104. handler: GRPCIdleHandler(mode: .client(manager)),
  105. loop: self.loop
  106. )
  107. channelPromise.succeed(channel)
  108. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  109. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  110. try self.waitForStateChange(from: .connecting, to: .ready) {
  111. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  112. XCTAssertNoThrow(try channel.writeInbound(frame))
  113. }
  114. // Close the channel.
  115. try self.waitForStateChange(from: .ready, to: .shutdown) {
  116. // Now the channel should be available: shut it down,
  117. XCTAssertNoThrow(try readyChannel.flatMap { $0.close(mode: .all) }.wait())
  118. }
  119. }
  120. func testConnectAndIdle() throws {
  121. let channelPromise = self.loop.makePromise(of: Channel.self)
  122. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  123. return channelPromise.futureResult
  124. }
  125. // Start the connection.
  126. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  127. let readyChannel = manager.getChannel()
  128. self.loop.run()
  129. return readyChannel
  130. }
  131. // Setup the channel.
  132. let channel = EmbeddedChannel(
  133. handler: GRPCIdleHandler(mode: .client(manager)),
  134. loop: self.loop
  135. )
  136. channelPromise.succeed(channel)
  137. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  138. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  139. try self.waitForStateChange(from: .connecting, to: .ready) {
  140. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  141. XCTAssertNoThrow(try channel.writeInbound(frame))
  142. // Wait for the channel, it _must_ be ready now.
  143. XCTAssertNoThrow(try readyChannel.wait())
  144. }
  145. // Go idle. This will shutdown the channel.
  146. try self.waitForStateChange(from: .ready, to: .idle) {
  147. self.loop.advanceTime(by: .minutes(5))
  148. XCTAssertNoThrow(try readyChannel.flatMap { $0.closeFuture }.wait())
  149. }
  150. // Now shutdown.
  151. try self.waitForStateChange(from: .idle, to: .shutdown) {
  152. let shutdown = manager.shutdown()
  153. self.loop.run()
  154. XCTAssertNoThrow(try shutdown.wait())
  155. }
  156. }
  157. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  158. let channelPromise = self.loop.makePromise(of: Channel.self)
  159. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  160. return channelPromise.futureResult
  161. }
  162. // Start the connection.
  163. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  164. let readyChannel = manager.getChannel()
  165. self.loop.run()
  166. return readyChannel
  167. }
  168. // Setup the channel.
  169. let channel = EmbeddedChannel(
  170. handler: GRPCIdleHandler(mode: .client(manager)),
  171. loop: self.loop
  172. )
  173. channelPromise.succeed(channel)
  174. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  175. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  176. try self.waitForStateChange(from: .connecting, to: .ready) {
  177. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  178. XCTAssertNoThrow(try channel.writeInbound(frame))
  179. // Wait for the channel, it _must_ be ready now.
  180. XCTAssertNoThrow(try readyChannel.wait())
  181. }
  182. // "create" a stream; the details don't matter here.
  183. let streamCreated = NIOHTTP2StreamCreatedEvent(
  184. streamID: 1,
  185. localInitialWindowSize: nil,
  186. remoteInitialWindowSize: nil
  187. )
  188. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  189. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  190. self.loop.advanceTime(by: .minutes(5))
  191. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  192. self.waitForStateChange(from: .ready, to: .idle) {
  193. // Close the stream.
  194. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  195. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  196. // ... wait for the idle timeout,
  197. self.loop.advanceTime(by: .minutes(5))
  198. }
  199. // Now shutdown.
  200. try self.waitForStateChange(from: .idle, to: .shutdown) {
  201. let shutdown = manager.shutdown()
  202. self.loop.run()
  203. XCTAssertNoThrow(try shutdown.wait())
  204. }
  205. }
  206. func testConnectAndThenBecomeInactive() throws {
  207. let channelPromise = self.loop.makePromise(of: Channel.self)
  208. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  209. return channelPromise.futureResult
  210. }
  211. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  212. let readyChannel = manager.getChannel()
  213. self.loop.run()
  214. return readyChannel
  215. }
  216. // Setup the channel.
  217. let channel = EmbeddedChannel(
  218. handler: GRPCIdleHandler(mode: .client(manager)),
  219. loop: self.loop
  220. )
  221. channelPromise.succeed(channel)
  222. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  223. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  224. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  225. XCTAssertNoThrow(try channel.close(mode: .all).wait())
  226. }
  227. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  228. // the `readyChannel` should error.
  229. XCTAssertThrowsError(try readyChannel.wait())
  230. }
  231. func testConnectOnSecondAttempt() throws {
  232. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  233. let channelFutures: [EventLoopFuture<Channel>] = [
  234. self.loop.makeFailedFuture(DoomedChannelError()),
  235. channelPromise.futureResult
  236. ]
  237. var channelFutureIterator = channelFutures.makeIterator()
  238. var configuration = self.defaultConfiguration
  239. configuration.connectionBackoff = .oneSecondFixed
  240. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  241. guard let next = channelFutureIterator.next() else {
  242. XCTFail("Too many channels requested")
  243. return self.loop.makeFailedFuture(DoomedChannelError())
  244. }
  245. return next
  246. }
  247. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges([
  248. Change(from: .idle, to: .connecting),
  249. Change(from: .connecting, to: .transientFailure)
  250. ]) {
  251. // Get a channel.
  252. let readyChannel = manager.getChannel()
  253. self.loop.run()
  254. return readyChannel
  255. }
  256. // Get a channel from the manager: it is a future for the same channel.
  257. let anotherReadyChannel = manager.getChannel()
  258. self.loop.run()
  259. // Move time forwards by a second to start the next connection attempt.
  260. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  261. self.loop.advanceTime(by: .seconds(1))
  262. }
  263. // Setup the actual channel and complete the promise.
  264. let channel = EmbeddedChannel(
  265. handler: GRPCIdleHandler(mode: .client(manager)),
  266. loop: self.loop
  267. )
  268. channelPromise.succeed(channel)
  269. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  270. // Write a SETTINGS frame on the root stream.
  271. try self.waitForStateChange(from: .connecting, to: .ready) {
  272. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  273. XCTAssertNoThrow(try channel.writeInbound(frame))
  274. }
  275. // Wait for the channel, it _must_ be ready now.
  276. XCTAssertNoThrow(try readyChannel.wait())
  277. XCTAssertNoThrow(try anotherReadyChannel.wait())
  278. // Now shutdown.
  279. try self.waitForStateChange(from: .ready, to: .shutdown) {
  280. let shutdown = manager.shutdown()
  281. self.loop.run()
  282. XCTAssertNoThrow(try shutdown.wait())
  283. }
  284. }
  285. func testShutdownWhileConnecting() throws {
  286. let channelPromise = self.loop.makePromise(of: Channel.self)
  287. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  288. return channelPromise.futureResult
  289. }
  290. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  291. let readyChannel = manager.getChannel()
  292. self.loop.run()
  293. return readyChannel
  294. }
  295. // Now shutdown.
  296. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  297. let shutdown = manager.shutdown()
  298. self.loop.run()
  299. XCTAssertNoThrow(try shutdown.wait())
  300. }
  301. // The channel we were requesting should fail.
  302. XCTAssertThrowsError(try readyChannel.wait())
  303. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  304. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  305. let channel = try channelPromise.futureResult.wait()
  306. self.loop.run()
  307. XCTAssertNoThrow(try channel.closeFuture.wait())
  308. }
  309. func testShutdownWhileTransientFailure() throws {
  310. var configuration = self.defaultConfiguration
  311. configuration.connectionBackoff = .oneSecondFixed
  312. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  313. return self.loop.makeFailedFuture(DoomedChannelError())
  314. }
  315. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChanges([
  316. Change(from: .idle, to: .connecting),
  317. Change(from: .connecting, to: .transientFailure)
  318. ]) {
  319. // Get a channel.
  320. let readyChannel = manager.getChannel()
  321. self.loop.run()
  322. return readyChannel
  323. }
  324. // Now shutdown.
  325. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  326. let shutdown = manager.shutdown()
  327. self.loop.run()
  328. XCTAssertNoThrow(try shutdown.wait())
  329. }
  330. // The channel we were requesting should fail.
  331. XCTAssertThrowsError(try readyChannel.wait())
  332. }
  333. func testShutdownWhileActive() throws {
  334. let channelPromise = self.loop.makePromise(of: Channel.self)
  335. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  336. return channelPromise.futureResult
  337. }
  338. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  339. let readyChannel = manager.getChannel()
  340. self.loop.run()
  341. return readyChannel
  342. }
  343. // Prepare the channel
  344. let channel = EmbeddedChannel(
  345. handler: GRPCIdleHandler(mode: .client(manager)),
  346. loop: self.loop
  347. )
  348. channelPromise.succeed(channel)
  349. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  350. // (No state change expected here: active is an internal state.)
  351. // Now shutdown.
  352. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  353. let shutdown = manager.shutdown()
  354. self.loop.run()
  355. XCTAssertNoThrow(try shutdown.wait())
  356. }
  357. // The channel we were requesting should fail.
  358. XCTAssertThrowsError(try readyChannel.wait())
  359. }
  360. func testShutdownWhileShutdown() throws {
  361. let manager = ConnectionManager(configuration: self.defaultConfiguration, logger: self.logger)
  362. try self.waitForStateChange(from: .idle, to: .shutdown) {
  363. let firstShutdown = manager.shutdown()
  364. self.loop.run()
  365. XCTAssertNoThrow(try firstShutdown.wait())
  366. }
  367. let secondShutdown = manager.shutdown()
  368. self.loop.run()
  369. XCTAssertNoThrow(try secondShutdown.wait())
  370. }
  371. func testTransientFailureWhileActive() throws {
  372. var configuration = self.defaultConfiguration
  373. configuration.connectionBackoff = .oneSecondFixed
  374. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  375. let channelFutures: [EventLoopFuture<Channel>] = [
  376. channelPromise.futureResult,
  377. self.loop.makeFailedFuture(DoomedChannelError())
  378. ]
  379. var channelFutureIterator = channelFutures.makeIterator()
  380. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  381. guard let next = channelFutureIterator.next() else {
  382. XCTFail("Too many channels requested")
  383. return self.loop.makeFailedFuture(DoomedChannelError())
  384. }
  385. return next
  386. }
  387. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  388. let readyChannel = manager.getChannel()
  389. self.loop.run()
  390. return readyChannel
  391. }
  392. // Prepare the channel
  393. let firstChannel = EmbeddedChannel(
  394. handler: GRPCIdleHandler(mode: .client(manager)),
  395. loop: self.loop
  396. )
  397. channelPromise.succeed(firstChannel)
  398. XCTAssertNoThrow(try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  399. // (No state change expected here: active is an internal state.)
  400. // Close the channel (simulate e.g. TLS handshake failed)
  401. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  402. XCTAssertNoThrow(try firstChannel.close().wait())
  403. }
  404. // Start connecting again.
  405. self.waitForStateChanges([
  406. Change(from: .transientFailure, to: .connecting),
  407. Change(from: .connecting, to: .transientFailure)
  408. ]) {
  409. self.loop.advanceTime(by: .seconds(1))
  410. }
  411. // Now shutdown
  412. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  413. let shutdown = manager.shutdown()
  414. self.loop.run()
  415. XCTAssertNoThrow(try shutdown.wait())
  416. }
  417. // The channel never came up: it should be throw.
  418. XCTAssertThrowsError(try readyChannel.wait())
  419. }
  420. func testTransientFailureWhileReady() throws {
  421. var configuration = self.defaultConfiguration
  422. configuration.connectionBackoff = .oneSecondFixed
  423. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  424. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  425. let channelFutures: [EventLoopFuture<Channel>] = [
  426. firstChannelPromise.futureResult,
  427. secondChannelPromise.futureResult
  428. ]
  429. var channelFutureIterator = channelFutures.makeIterator()
  430. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  431. guard let next = channelFutureIterator.next() else {
  432. XCTFail("Too many channels requested")
  433. return self.loop.makeFailedFuture(DoomedChannelError())
  434. }
  435. return next
  436. }
  437. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  438. let readyChannel = manager.getChannel()
  439. self.loop.run()
  440. return readyChannel
  441. }
  442. // Prepare the first channel
  443. let firstChannel = EmbeddedChannel(
  444. handler: GRPCIdleHandler(mode: .client(manager)),
  445. loop: self.loop
  446. )
  447. firstChannelPromise.succeed(firstChannel)
  448. XCTAssertNoThrow(try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  449. // Write a SETTINGS frame on the root stream.
  450. try self.waitForStateChange(from: .connecting, to: .ready) {
  451. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  452. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  453. }
  454. // Channel should now be ready.
  455. XCTAssertNoThrow(try readyChannel.wait())
  456. // Kill the first channel.
  457. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  458. XCTAssertNoThrow(try firstChannel.close().wait())
  459. }
  460. // Run to start connecting again.
  461. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  462. self.loop.advanceTime(by: .seconds(1))
  463. }
  464. // Prepare the second channel
  465. let secondChannel = EmbeddedChannel(
  466. handler: GRPCIdleHandler(mode: .client(manager)),
  467. loop: self.loop
  468. )
  469. secondChannelPromise.succeed(secondChannel)
  470. XCTAssertNoThrow(try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  471. // Write a SETTINGS frame on the root stream.
  472. try self.waitForStateChange(from: .connecting, to: .ready) {
  473. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  474. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  475. }
  476. // Now shutdown
  477. try self.waitForStateChange(from: .ready, to: .shutdown) {
  478. let shutdown = manager.shutdown()
  479. self.loop.run()
  480. XCTAssertNoThrow(try shutdown.wait())
  481. }
  482. }
  483. func testGoAwayWhenReady() throws {
  484. let channelPromise = self.loop.makePromise(of: Channel.self)
  485. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  486. return channelPromise.futureResult
  487. }
  488. let readyChannel: EventLoopFuture<Channel> = self.waitForStateChange(from: .idle, to: .connecting) {
  489. let readyChannel = manager.getChannel()
  490. self.loop.run()
  491. return readyChannel
  492. }
  493. // Setup the channel.
  494. let channel = EmbeddedChannel(
  495. handler: GRPCIdleHandler(mode: .client(manager)),
  496. loop: self.loop
  497. )
  498. channelPromise.succeed(channel)
  499. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")).wait())
  500. try self.waitForStateChange(from: .connecting, to: .ready) {
  501. // Write a SETTINGS frame on the root stream.
  502. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  503. XCTAssertNoThrow(try channel.writeInbound(frame))
  504. }
  505. // Wait for the channel, it _must_ be ready now.
  506. XCTAssertNoThrow(try readyChannel.wait())
  507. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  508. // channel to close.
  509. try self.waitForStateChange(from: .ready, to: .idle) {
  510. let goAway = HTTP2Frame(
  511. streamID: .rootStream,
  512. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  513. )
  514. XCTAssertNoThrow(try channel.writeInbound(goAway))
  515. }
  516. self.loop.run()
  517. XCTAssertNoThrow(try channel.closeFuture.wait())
  518. // Now shutdown
  519. try self.waitForStateChange(from: .idle, to: .shutdown) {
  520. let shutdown = manager.shutdown()
  521. self.loop.run()
  522. XCTAssertNoThrow(try shutdown.wait())
  523. }
  524. }
  525. func testDoomedOptimisticChannelFromIdle() {
  526. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  527. return self.loop.makeFailedFuture(DoomedChannelError())
  528. }
  529. let candidate = manager.getOptimisticChannel()
  530. self.loop.run()
  531. XCTAssertThrowsError(try candidate.wait())
  532. }
  533. func testDoomedOptimisticChannelFromConnecting() throws {
  534. let promise = self.loop.makePromise(of: Channel.self)
  535. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  536. return promise.futureResult
  537. }
  538. self.waitForStateChange(from: .idle, to: .connecting) {
  539. // Trigger channel creation, and a connection attempt, we don't care about the channel.
  540. _ = manager.getChannel()
  541. self.loop.run()
  542. }
  543. // We're connecting: get an optimistic channel.
  544. let optimisticChannel = manager.getOptimisticChannel()
  545. self.loop.run()
  546. // Fail the promise.
  547. promise.fail(DoomedChannelError())
  548. XCTAssertThrowsError(try optimisticChannel.wait())
  549. }
  550. func testOptimisticChannelFromTransientFailure() throws {
  551. var configuration = self.defaultConfiguration
  552. configuration.connectionBackoff = ConnectionBackoff()
  553. let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
  554. return self.loop.makeFailedFuture(DoomedChannelError())
  555. }
  556. self.waitForStateChanges([
  557. Change(from: .idle, to: .connecting),
  558. Change(from: .connecting, to: .transientFailure)
  559. ]) {
  560. // Trigger channel creation, and a connection attempt, we don't care about the channel.
  561. _ = manager.getChannel()
  562. self.loop.run()
  563. }
  564. // Now we're sitting in transient failure. Get a channel optimistically.
  565. let optimisticChannel = manager.getOptimisticChannel()
  566. self.loop.run()
  567. XCTAssertThrowsError(try optimisticChannel.wait())
  568. }
  569. func testOptimisticChannelFromShutdown() throws {
  570. let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
  571. return self.loop.makeFailedFuture(DoomedChannelError())
  572. }
  573. let shutdown = manager.shutdown()
  574. self.loop.run()
  575. XCTAssertNoThrow(try shutdown.wait())
  576. // Get a channel optimistically. It'll fail, obviously.
  577. let channel = manager.getOptimisticChannel()
  578. self.loop.run()
  579. XCTAssertThrowsError(try channel.wait())
  580. }
  581. }
  582. internal struct Change: Hashable, CustomStringConvertible {
  583. var from: ConnectivityState
  584. var to: ConnectivityState
  585. var description: String {
  586. return "\(self.from) → \(self.to)"
  587. }
  588. }
  589. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  590. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  591. private let semaphore = DispatchSemaphore(value: 0)
  592. private var expectation: Expectation = .noExpectation
  593. private enum Expectation {
  594. /// We have no expectation of any changes. We'll just ignore any changes.
  595. case noExpectation
  596. /// We expect one change.
  597. case one((Change) -> ())
  598. /// We expect 'count' changes.
  599. case some(count: Int, recorded: [Change], ([Change]) -> ())
  600. var count: Int {
  601. switch self {
  602. case .noExpectation:
  603. return 0
  604. case .one:
  605. return 1
  606. case .some(let count, _, _):
  607. return count
  608. }
  609. }
  610. }
  611. func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) {
  612. self.serialQueue.async {
  613. switch self.expectation {
  614. case .one(let verify):
  615. // We don't care about future changes.
  616. self.expectation = .noExpectation
  617. // Verify and notify.
  618. verify(Change(from: oldState, to: newState))
  619. self.semaphore.signal()
  620. case .some(let count, var recorded, let verify):
  621. recorded.append(Change(from: oldState, to: newState))
  622. if recorded.count == count {
  623. // We don't care about future changes.
  624. self.expectation = .noExpectation
  625. // Verify and notify.
  626. verify(recorded)
  627. self.semaphore.signal()
  628. } else {
  629. // Still need more responses.
  630. self.expectation = .some(count: count, recorded: recorded, verify)
  631. }
  632. case .noExpectation:
  633. // Ignore any changes.
  634. ()
  635. }
  636. }
  637. }
  638. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> ()) {
  639. self.serialQueue.async {
  640. self.expectation = .some(count: count, recorded: [], verify)
  641. }
  642. }
  643. func expectChange(verify: @escaping (Change) -> ()) {
  644. self.serialQueue.async {
  645. self.expectation = .one(verify)
  646. }
  647. }
  648. func waitForExpectedChanges(timeout: DispatchTimeInterval) {
  649. let result = self.semaphore.wait(timeout: .now() + timeout)
  650. switch result {
  651. case .success:
  652. ()
  653. case .timedOut:
  654. XCTFail("Timed out before verifying \(self.expectation.count) change(s)")
  655. }
  656. }
  657. }
  658. fileprivate extension ConnectionBackoff {
  659. static let oneSecondFixed = ConnectionBackoff(
  660. initialBackoff: 1.0,
  661. maximumBackoff: 1.0,
  662. multiplier: 1.0,
  663. jitter: 0.0
  664. )
  665. }
  666. fileprivate struct DoomedChannelError: Error {}