ConnectionManagerTests.swift 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import Logging
  19. import NIOCore
  20. import NIOEmbedded
  21. import NIOHTTP2
  22. import XCTest
  23. class ConnectionManagerTests: GRPCTestCase {
  24. private let loop = EmbeddedEventLoop()
  25. private let recorder = RecordingConnectivityDelegate()
  26. private var monitor: ConnectivityStateMonitor!
  27. private var defaultConfiguration: ClientConnection.Configuration {
  28. var configuration = ClientConnection.Configuration.default(
  29. target: .unixDomainSocket("/ignored"),
  30. eventLoopGroup: self.loop
  31. )
  32. configuration.connectionBackoff = nil
  33. configuration.backgroundActivityLogger = self.clientLogger
  34. return configuration
  35. }
  36. override func setUp() {
  37. super.setUp()
  38. self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
  39. }
  40. override func tearDown() {
  41. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  42. super.tearDown()
  43. }
  44. private func makeConnectionManager(
  45. configuration config: ClientConnection.Configuration? = nil,
  46. channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
  47. ) -> ConnectionManager {
  48. let configuration = config ?? self.defaultConfiguration
  49. return ConnectionManager(
  50. configuration: configuration,
  51. channelProvider: channelProvider.map { HookedChannelProvider($0) },
  52. connectivityDelegate: self.monitor,
  53. logger: self.logger
  54. )
  55. }
  56. private func waitForStateChange<Result>(
  57. from: ConnectivityState,
  58. to: ConnectivityState,
  59. timeout: DispatchTimeInterval = .seconds(1),
  60. file: StaticString = #file,
  61. line: UInt = #line,
  62. body: () throws -> Result
  63. ) rethrows -> Result {
  64. self.recorder.expectChange {
  65. XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
  66. }
  67. let result = try body()
  68. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  69. return result
  70. }
  71. private func waitForStateChanges<Result>(
  72. _ changes: [Change],
  73. timeout: DispatchTimeInterval = .seconds(1),
  74. file: StaticString = #file,
  75. line: UInt = #line,
  76. body: () throws -> Result
  77. ) rethrows -> Result {
  78. self.recorder.expectChanges(changes.count) {
  79. XCTAssertEqual($0, changes)
  80. }
  81. let result = try body()
  82. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  83. return result
  84. }
  85. }
  86. extension ConnectionManagerTests {
  87. func testIdleShutdown() throws {
  88. let manager = self.makeConnectionManager()
  89. try self.waitForStateChange(from: .idle, to: .shutdown) {
  90. let shutdown = manager.shutdown()
  91. self.loop.run()
  92. XCTAssertNoThrow(try shutdown.wait())
  93. }
  94. // Getting a multiplexer should fail.
  95. let multiplexer = manager.getHTTP2Multiplexer()
  96. self.loop.run()
  97. XCTAssertThrowsError(try multiplexer.wait())
  98. }
  99. func testConnectFromIdleFailsWithNoReconnect() {
  100. let channelPromise = self.loop.makePromise(of: Channel.self)
  101. let manager = self.makeConnectionManager { _, _ in
  102. return channelPromise.futureResult
  103. }
  104. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
  105. .waitForStateChange(from: .idle, to: .connecting) {
  106. let channel = manager.getHTTP2Multiplexer()
  107. self.loop.run()
  108. return channel
  109. }
  110. self.waitForStateChange(from: .connecting, to: .shutdown) {
  111. channelPromise.fail(DoomedChannelError())
  112. }
  113. XCTAssertThrowsError(try multiplexer.wait()) {
  114. XCTAssertTrue($0 is DoomedChannelError)
  115. }
  116. }
  117. func testConnectAndDisconnect() throws {
  118. let channelPromise = self.loop.makePromise(of: Channel.self)
  119. let manager = self.makeConnectionManager { _, _ in
  120. return channelPromise.futureResult
  121. }
  122. // Start the connection.
  123. self.waitForStateChange(from: .idle, to: .connecting) {
  124. _ = manager.getHTTP2Multiplexer()
  125. self.loop.run()
  126. }
  127. // Setup the real channel and activate it.
  128. let channel = EmbeddedChannel(loop: self.loop)
  129. let h2mux = HTTP2StreamMultiplexer(
  130. mode: .client,
  131. channel: channel,
  132. inboundStreamInitializer: nil
  133. )
  134. try channel.pipeline.addHandler(
  135. GRPCIdleHandler(
  136. connectionManager: manager,
  137. multiplexer: h2mux,
  138. idleTimeout: .minutes(5),
  139. keepalive: .init(),
  140. logger: self.logger
  141. )
  142. ).wait()
  143. channelPromise.succeed(channel)
  144. XCTAssertNoThrow(
  145. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  146. .wait()
  147. )
  148. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  149. try self.waitForStateChange(from: .connecting, to: .ready) {
  150. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  151. XCTAssertNoThrow(try channel.writeInbound(frame))
  152. }
  153. // Close the channel.
  154. try self.waitForStateChange(from: .ready, to: .shutdown) {
  155. // Now the channel should be available: shut it down.
  156. let shutdown = manager.shutdown()
  157. self.loop.run()
  158. XCTAssertNoThrow(try shutdown.wait())
  159. }
  160. }
  161. func testConnectAndIdle() throws {
  162. let channelPromise = self.loop.makePromise(of: Channel.self)
  163. let manager = self.makeConnectionManager { _, _ in
  164. return channelPromise.futureResult
  165. }
  166. // Start the connection.
  167. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  168. .waitForStateChange(from: .idle, to: .connecting) {
  169. let readyChannelMux = manager.getHTTP2Multiplexer()
  170. self.loop.run()
  171. return readyChannelMux
  172. }
  173. // Setup the channel.
  174. let channel = EmbeddedChannel(loop: self.loop)
  175. let h2mux = HTTP2StreamMultiplexer(
  176. mode: .client,
  177. channel: channel,
  178. inboundStreamInitializer: nil
  179. )
  180. try channel.pipeline.addHandler(
  181. GRPCIdleHandler(
  182. connectionManager: manager,
  183. multiplexer: h2mux,
  184. idleTimeout: .minutes(5),
  185. keepalive: .init(),
  186. logger: self.logger
  187. )
  188. ).wait()
  189. channelPromise.succeed(channel)
  190. XCTAssertNoThrow(
  191. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  192. .wait()
  193. )
  194. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  195. try self.waitForStateChange(from: .connecting, to: .ready) {
  196. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  197. XCTAssertNoThrow(try channel.writeInbound(frame))
  198. // Wait for the multiplexer, it _must_ be ready now.
  199. XCTAssertNoThrow(try readyChannelMux.wait())
  200. }
  201. // Go idle. This will shutdown the channel.
  202. try self.waitForStateChange(from: .ready, to: .idle) {
  203. self.loop.advanceTime(by: .minutes(5))
  204. XCTAssertNoThrow(try channel.closeFuture.wait())
  205. }
  206. // Now shutdown.
  207. try self.waitForStateChange(from: .idle, to: .shutdown) {
  208. let shutdown = manager.shutdown()
  209. self.loop.run()
  210. XCTAssertNoThrow(try shutdown.wait())
  211. }
  212. }
  213. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  214. let channelPromise = self.loop.makePromise(of: Channel.self)
  215. let manager = self.makeConnectionManager { _, _ in
  216. return channelPromise.futureResult
  217. }
  218. // Start the connection.
  219. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  220. .waitForStateChange(from: .idle, to: .connecting) {
  221. let readyChannelMux = manager.getHTTP2Multiplexer()
  222. self.loop.run()
  223. return readyChannelMux
  224. }
  225. // Setup the channel.
  226. let channel = EmbeddedChannel(loop: self.loop)
  227. let h2mux = HTTP2StreamMultiplexer(
  228. mode: .client,
  229. channel: channel,
  230. inboundStreamInitializer: nil
  231. )
  232. try channel.pipeline.addHandler(
  233. GRPCIdleHandler(
  234. connectionManager: manager,
  235. multiplexer: h2mux,
  236. idleTimeout: .minutes(5),
  237. keepalive: .init(),
  238. logger: self.logger
  239. )
  240. ).wait()
  241. channelPromise.succeed(channel)
  242. XCTAssertNoThrow(
  243. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  244. .wait()
  245. )
  246. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  247. try self.waitForStateChange(from: .connecting, to: .ready) {
  248. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  249. XCTAssertNoThrow(try channel.writeInbound(frame))
  250. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  251. XCTAssertNoThrow(try readyChannelMux.wait())
  252. }
  253. // "create" a stream; the details don't matter here.
  254. let streamCreated = NIOHTTP2StreamCreatedEvent(
  255. streamID: 1,
  256. localInitialWindowSize: nil,
  257. remoteInitialWindowSize: nil
  258. )
  259. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  260. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  261. self.loop.advanceTime(by: .minutes(5))
  262. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  263. self.waitForStateChange(from: .ready, to: .idle) {
  264. // Close the stream.
  265. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  266. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  267. // ... wait for the idle timeout,
  268. self.loop.advanceTime(by: .minutes(5))
  269. }
  270. // Now shutdown.
  271. try self.waitForStateChange(from: .idle, to: .shutdown) {
  272. let shutdown = manager.shutdown()
  273. self.loop.run()
  274. XCTAssertNoThrow(try shutdown.wait())
  275. }
  276. }
  277. func testConnectAndThenBecomeInactive() throws {
  278. let channelPromise = self.loop.makePromise(of: Channel.self)
  279. let manager = self.makeConnectionManager { _, _ in
  280. return channelPromise.futureResult
  281. }
  282. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  283. .waitForStateChange(from: .idle, to: .connecting) {
  284. let readyChannelMux = manager.getHTTP2Multiplexer()
  285. self.loop.run()
  286. return readyChannelMux
  287. }
  288. // Setup the channel.
  289. let channel = EmbeddedChannel(loop: self.loop)
  290. let h2mux = HTTP2StreamMultiplexer(
  291. mode: .client,
  292. channel: channel,
  293. inboundStreamInitializer: nil
  294. )
  295. try channel.pipeline.addHandler(
  296. GRPCIdleHandler(
  297. connectionManager: manager,
  298. multiplexer: h2mux,
  299. idleTimeout: .minutes(5),
  300. keepalive: .init(),
  301. logger: self.logger
  302. )
  303. ).wait()
  304. channelPromise.succeed(channel)
  305. XCTAssertNoThrow(
  306. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  307. .wait()
  308. )
  309. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  310. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  311. let shutdown = manager.shutdown()
  312. self.loop.run()
  313. XCTAssertNoThrow(try shutdown.wait())
  314. }
  315. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  316. // the `readyChannelMux` should error.
  317. XCTAssertThrowsError(try readyChannelMux.wait())
  318. }
  319. func testConnectOnSecondAttempt() throws {
  320. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  321. let channelFutures: [EventLoopFuture<Channel>] = [
  322. self.loop.makeFailedFuture(DoomedChannelError()),
  323. channelPromise.futureResult,
  324. ]
  325. var channelFutureIterator = channelFutures.makeIterator()
  326. var configuration = self.defaultConfiguration
  327. configuration.connectionBackoff = .oneSecondFixed
  328. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  329. guard let next = channelFutureIterator.next() else {
  330. XCTFail("Too many channels requested")
  331. return self.loop.makeFailedFuture(DoomedChannelError())
  332. }
  333. return next
  334. }
  335. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  336. Change(from: .idle, to: .connecting),
  337. Change(from: .connecting, to: .transientFailure),
  338. ]) {
  339. // Get a HTTP/2 stream multiplexer.
  340. let readyChannelMux = manager.getHTTP2Multiplexer()
  341. self.loop.run()
  342. return readyChannelMux
  343. }
  344. // Get a HTTP/2 stream mux from the manager - it is a future for the one we made earlier.
  345. let anotherReadyChannelMux = manager.getHTTP2Multiplexer()
  346. self.loop.run()
  347. // Move time forwards by a second to start the next connection attempt.
  348. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  349. self.loop.advanceTime(by: .seconds(1))
  350. }
  351. // Setup the actual channel and complete the promise.
  352. let channel = EmbeddedChannel(loop: self.loop)
  353. let h2mux = HTTP2StreamMultiplexer(
  354. mode: .client,
  355. channel: channel,
  356. inboundStreamInitializer: nil
  357. )
  358. try channel.pipeline.addHandler(
  359. GRPCIdleHandler(
  360. connectionManager: manager,
  361. multiplexer: h2mux,
  362. idleTimeout: .minutes(5),
  363. keepalive: .init(),
  364. logger: self.logger
  365. )
  366. ).wait()
  367. channelPromise.succeed(channel)
  368. XCTAssertNoThrow(
  369. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  370. .wait()
  371. )
  372. // Write a SETTINGS frame on the root stream.
  373. try self.waitForStateChange(from: .connecting, to: .ready) {
  374. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  375. XCTAssertNoThrow(try channel.writeInbound(frame))
  376. }
  377. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  378. XCTAssertNoThrow(try readyChannelMux.wait())
  379. XCTAssertNoThrow(try anotherReadyChannelMux.wait())
  380. // Now shutdown.
  381. try self.waitForStateChange(from: .ready, to: .shutdown) {
  382. let shutdown = manager.shutdown()
  383. self.loop.run()
  384. XCTAssertNoThrow(try shutdown.wait())
  385. }
  386. }
  387. func testShutdownWhileConnecting() throws {
  388. let channelPromise = self.loop.makePromise(of: Channel.self)
  389. let manager = self.makeConnectionManager { _, _ in
  390. return channelPromise.futureResult
  391. }
  392. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  393. .waitForStateChange(from: .idle, to: .connecting) {
  394. let readyChannelMux = manager.getHTTP2Multiplexer()
  395. self.loop.run()
  396. return readyChannelMux
  397. }
  398. // Now shutdown.
  399. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  400. let shutdown = manager.shutdown()
  401. self.loop.run()
  402. XCTAssertNoThrow(try shutdown.wait())
  403. }
  404. // The multiplexer we were requesting should fail.
  405. XCTAssertThrowsError(try readyChannelMux.wait())
  406. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  407. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  408. let channel = try channelPromise.futureResult.wait()
  409. self.loop.run()
  410. XCTAssertNoThrow(try channel.closeFuture.wait())
  411. }
  412. func testShutdownWhileTransientFailure() throws {
  413. var configuration = self.defaultConfiguration
  414. configuration.connectionBackoff = .oneSecondFixed
  415. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  416. self.loop.makeFailedFuture(DoomedChannelError())
  417. }
  418. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  419. Change(from: .idle, to: .connecting),
  420. Change(from: .connecting, to: .transientFailure),
  421. ]) {
  422. // Get a HTTP/2 stream multiplexer.
  423. let readyChannelMux = manager.getHTTP2Multiplexer()
  424. self.loop.run()
  425. return readyChannelMux
  426. }
  427. // Now shutdown.
  428. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  429. let shutdown = manager.shutdown()
  430. self.loop.run()
  431. XCTAssertNoThrow(try shutdown.wait())
  432. }
  433. // The HTTP/2 stream mux we were requesting should fail.
  434. XCTAssertThrowsError(try readyChannelMux.wait())
  435. }
  436. func testShutdownWhileActive() throws {
  437. let channelPromise = self.loop.makePromise(of: Channel.self)
  438. let manager = self.makeConnectionManager { _, _ in
  439. return channelPromise.futureResult
  440. }
  441. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  442. .waitForStateChange(from: .idle, to: .connecting) {
  443. let readyChannelMux = manager.getHTTP2Multiplexer()
  444. self.loop.run()
  445. return readyChannelMux
  446. }
  447. // Prepare the channel
  448. let channel = EmbeddedChannel(loop: self.loop)
  449. let h2mux = HTTP2StreamMultiplexer(
  450. mode: .client,
  451. channel: channel,
  452. inboundStreamInitializer: nil
  453. )
  454. try channel.pipeline.addHandler(
  455. GRPCIdleHandler(
  456. connectionManager: manager,
  457. multiplexer: h2mux,
  458. idleTimeout: .minutes(5),
  459. keepalive: .init(),
  460. logger: self.logger
  461. )
  462. ).wait()
  463. channelPromise.succeed(channel)
  464. XCTAssertNoThrow(
  465. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  466. .wait()
  467. )
  468. // (No state change expected here: active is an internal state.)
  469. // Now shutdown.
  470. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  471. let shutdown = manager.shutdown()
  472. self.loop.run()
  473. XCTAssertNoThrow(try shutdown.wait())
  474. }
  475. // The HTTP/2 stream multiplexer we were requesting should fail.
  476. XCTAssertThrowsError(try readyChannelMux.wait())
  477. }
  478. func testShutdownWhileShutdown() throws {
  479. let manager = self.makeConnectionManager()
  480. try self.waitForStateChange(from: .idle, to: .shutdown) {
  481. let firstShutdown = manager.shutdown()
  482. self.loop.run()
  483. XCTAssertNoThrow(try firstShutdown.wait())
  484. }
  485. let secondShutdown = manager.shutdown()
  486. self.loop.run()
  487. XCTAssertNoThrow(try secondShutdown.wait())
  488. }
  489. func testTransientFailureWhileActive() throws {
  490. var configuration = self.defaultConfiguration
  491. configuration.connectionBackoff = .oneSecondFixed
  492. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  493. let channelFutures: [EventLoopFuture<Channel>] = [
  494. channelPromise.futureResult,
  495. self.loop.makeFailedFuture(DoomedChannelError()),
  496. ]
  497. var channelFutureIterator = channelFutures.makeIterator()
  498. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  499. guard let next = channelFutureIterator.next() else {
  500. XCTFail("Too many channels requested")
  501. return self.loop.makeFailedFuture(DoomedChannelError())
  502. }
  503. return next
  504. }
  505. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  506. .waitForStateChange(from: .idle, to: .connecting) {
  507. let readyChannelMux = manager.getHTTP2Multiplexer()
  508. self.loop.run()
  509. return readyChannelMux
  510. }
  511. // Prepare the channel
  512. let firstChannel = EmbeddedChannel(loop: self.loop)
  513. let h2mux = HTTP2StreamMultiplexer(
  514. mode: .client,
  515. channel: firstChannel,
  516. inboundStreamInitializer: nil
  517. )
  518. try firstChannel.pipeline.addHandler(
  519. GRPCIdleHandler(
  520. connectionManager: manager,
  521. multiplexer: h2mux,
  522. idleTimeout: .minutes(5),
  523. keepalive: .init(),
  524. logger: self.logger
  525. )
  526. ).wait()
  527. channelPromise.succeed(firstChannel)
  528. XCTAssertNoThrow(
  529. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  530. .wait()
  531. )
  532. // (No state change expected here: active is an internal state.)
  533. // Close the channel (simulate e.g. TLS handshake failed)
  534. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  535. XCTAssertNoThrow(try firstChannel.close().wait())
  536. }
  537. // Start connecting again.
  538. self.waitForStateChanges([
  539. Change(from: .transientFailure, to: .connecting),
  540. Change(from: .connecting, to: .transientFailure),
  541. ]) {
  542. self.loop.advanceTime(by: .seconds(1))
  543. }
  544. // Now shutdown
  545. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  546. let shutdown = manager.shutdown()
  547. self.loop.run()
  548. XCTAssertNoThrow(try shutdown.wait())
  549. }
  550. // The channel never came up: it should be throw.
  551. XCTAssertThrowsError(try readyChannelMux.wait())
  552. }
  553. func testTransientFailureWhileReady() throws {
  554. var configuration = self.defaultConfiguration
  555. configuration.connectionBackoff = .oneSecondFixed
  556. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  557. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  558. let channelFutures: [EventLoopFuture<Channel>] = [
  559. firstChannelPromise.futureResult,
  560. secondChannelPromise.futureResult,
  561. ]
  562. var channelFutureIterator = channelFutures.makeIterator()
  563. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  564. guard let next = channelFutureIterator.next() else {
  565. XCTFail("Too many channels requested")
  566. return self.loop.makeFailedFuture(DoomedChannelError())
  567. }
  568. return next
  569. }
  570. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  571. .waitForStateChange(from: .idle, to: .connecting) {
  572. let readyChannelMux = manager.getHTTP2Multiplexer()
  573. self.loop.run()
  574. return readyChannelMux
  575. }
  576. // Prepare the first channel
  577. let firstChannel = EmbeddedChannel(loop: self.loop)
  578. let firstH2mux = HTTP2StreamMultiplexer(
  579. mode: .client,
  580. channel: firstChannel,
  581. inboundStreamInitializer: nil
  582. )
  583. try firstChannel.pipeline.addHandler(
  584. GRPCIdleHandler(
  585. connectionManager: manager,
  586. multiplexer: firstH2mux,
  587. idleTimeout: .minutes(5),
  588. keepalive: .init(),
  589. logger: self.logger
  590. )
  591. ).wait()
  592. firstChannelPromise.succeed(firstChannel)
  593. XCTAssertNoThrow(
  594. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  595. .wait()
  596. )
  597. // Write a SETTINGS frame on the root stream.
  598. try self.waitForStateChange(from: .connecting, to: .ready) {
  599. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  600. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  601. }
  602. // Channel should now be ready.
  603. XCTAssertNoThrow(try readyChannelMux.wait())
  604. // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
  605. let streamCreated = NIOHTTP2StreamCreatedEvent(
  606. streamID: 1,
  607. localInitialWindowSize: nil,
  608. remoteInitialWindowSize: nil
  609. )
  610. firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
  611. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  612. XCTAssertNoThrow(try firstChannel.close().wait())
  613. }
  614. // Run to start connecting again.
  615. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  616. self.loop.advanceTime(by: .seconds(1))
  617. }
  618. // Prepare the second channel
  619. let secondChannel = EmbeddedChannel(loop: self.loop)
  620. let secondH2mux = HTTP2StreamMultiplexer(
  621. mode: .client,
  622. channel: secondChannel,
  623. inboundStreamInitializer: nil
  624. )
  625. try secondChannel.pipeline.addHandler(
  626. GRPCIdleHandler(
  627. connectionManager: manager,
  628. multiplexer: secondH2mux,
  629. idleTimeout: .minutes(5),
  630. keepalive: .init(),
  631. logger: self.logger
  632. )
  633. ).wait()
  634. secondChannelPromise.succeed(secondChannel)
  635. XCTAssertNoThrow(
  636. try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  637. .wait()
  638. )
  639. // Write a SETTINGS frame on the root stream.
  640. try self.waitForStateChange(from: .connecting, to: .ready) {
  641. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  642. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  643. }
  644. // Now shutdown
  645. try self.waitForStateChange(from: .ready, to: .shutdown) {
  646. let shutdown = manager.shutdown()
  647. self.loop.run()
  648. XCTAssertNoThrow(try shutdown.wait())
  649. }
  650. }
  651. func testGoAwayWhenReady() throws {
  652. let channelPromise = self.loop.makePromise(of: Channel.self)
  653. let manager = self.makeConnectionManager { _, _ in
  654. return channelPromise.futureResult
  655. }
  656. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  657. .waitForStateChange(from: .idle, to: .connecting) {
  658. let readyChannelMux = manager.getHTTP2Multiplexer()
  659. self.loop.run()
  660. return readyChannelMux
  661. }
  662. // Setup the channel.
  663. let channel = EmbeddedChannel(loop: self.loop)
  664. let h2mux = HTTP2StreamMultiplexer(
  665. mode: .client,
  666. channel: channel,
  667. inboundStreamInitializer: nil
  668. )
  669. try channel.pipeline.addHandler(
  670. GRPCIdleHandler(
  671. connectionManager: manager,
  672. multiplexer: h2mux,
  673. idleTimeout: .minutes(5),
  674. keepalive: .init(),
  675. logger: self.logger
  676. )
  677. ).wait()
  678. channelPromise.succeed(channel)
  679. XCTAssertNoThrow(
  680. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  681. .wait()
  682. )
  683. try self.waitForStateChange(from: .connecting, to: .ready) {
  684. // Write a SETTINGS frame on the root stream.
  685. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  686. XCTAssertNoThrow(try channel.writeInbound(frame))
  687. }
  688. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  689. XCTAssertNoThrow(try readyChannelMux.wait())
  690. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  691. // channel to close.
  692. try self.waitForStateChange(from: .ready, to: .idle) {
  693. let goAway = HTTP2Frame(
  694. streamID: .rootStream,
  695. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  696. )
  697. XCTAssertNoThrow(try channel.writeInbound(goAway))
  698. }
  699. self.loop.run()
  700. XCTAssertNoThrow(try channel.closeFuture.wait())
  701. // Now shutdown
  702. try self.waitForStateChange(from: .idle, to: .shutdown) {
  703. let shutdown = manager.shutdown()
  704. self.loop.run()
  705. XCTAssertNoThrow(try shutdown.wait())
  706. }
  707. }
  708. func testDoomedOptimisticChannelFromIdle() {
  709. var configuration = self.defaultConfiguration
  710. configuration.callStartBehavior = .fastFailure
  711. let manager = ConnectionManager(
  712. configuration: configuration,
  713. channelProvider: HookedChannelProvider { _, loop in
  714. return loop.makeFailedFuture(DoomedChannelError())
  715. },
  716. connectivityDelegate: nil,
  717. logger: self.logger
  718. )
  719. let candidate = manager.getHTTP2Multiplexer()
  720. self.loop.run()
  721. XCTAssertThrowsError(try candidate.wait())
  722. }
  723. func testDoomedOptimisticChannelFromConnecting() throws {
  724. var configuration = self.defaultConfiguration
  725. configuration.callStartBehavior = .fastFailure
  726. let promise = self.loop.makePromise(of: Channel.self)
  727. let manager = self.makeConnectionManager { _, _ in
  728. return promise.futureResult
  729. }
  730. self.waitForStateChange(from: .idle, to: .connecting) {
  731. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  732. _ = manager.getHTTP2Multiplexer()
  733. self.loop.run()
  734. }
  735. // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
  736. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  737. self.loop.run()
  738. // Fail the promise.
  739. promise.fail(DoomedChannelError())
  740. XCTAssertThrowsError(try optimisticChannelMux.wait())
  741. }
  742. func testOptimisticChannelFromTransientFailure() throws {
  743. var configuration = self.defaultConfiguration
  744. configuration.callStartBehavior = .fastFailure
  745. configuration.connectionBackoff = ConnectionBackoff()
  746. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  747. self.loop.makeFailedFuture(DoomedChannelError())
  748. }
  749. self.waitForStateChanges([
  750. Change(from: .idle, to: .connecting),
  751. Change(from: .connecting, to: .transientFailure),
  752. ]) {
  753. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  754. _ = manager.getHTTP2Multiplexer()
  755. self.loop.run()
  756. }
  757. // Now we're sitting in transient failure. Get a HTTP/2 stream mux optimistically - selected in config.
  758. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  759. self.loop.run()
  760. XCTAssertThrowsError(try optimisticChannelMux.wait()) { error in
  761. XCTAssertTrue(error is DoomedChannelError)
  762. }
  763. }
  764. func testOptimisticChannelFromShutdown() throws {
  765. var configuration = self.defaultConfiguration
  766. configuration.callStartBehavior = .fastFailure
  767. let manager = self.makeConnectionManager { _, _ in
  768. return self.loop.makeFailedFuture(DoomedChannelError())
  769. }
  770. let shutdown = manager.shutdown()
  771. self.loop.run()
  772. XCTAssertNoThrow(try shutdown.wait())
  773. // Get a channel optimistically. It'll fail, obviously.
  774. let channelMux = manager.getHTTP2Multiplexer()
  775. self.loop.run()
  776. XCTAssertThrowsError(try channelMux.wait())
  777. }
  778. func testForceIdleAfterInactive() throws {
  779. let channelPromise = self.loop.makePromise(of: Channel.self)
  780. let manager = self.makeConnectionManager { _, _ in
  781. return channelPromise.futureResult
  782. }
  783. // Start the connection.
  784. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
  785. from: .idle,
  786. to: .connecting
  787. ) {
  788. let readyChannelMux = manager.getHTTP2Multiplexer()
  789. self.loop.run()
  790. return readyChannelMux
  791. }
  792. // Setup the real channel and activate it.
  793. let channel = EmbeddedChannel(loop: self.loop)
  794. let h2mux = HTTP2StreamMultiplexer(
  795. mode: .client,
  796. channel: channel,
  797. inboundStreamInitializer: nil
  798. )
  799. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  800. GRPCIdleHandler(
  801. connectionManager: manager,
  802. multiplexer: h2mux,
  803. idleTimeout: .minutes(5),
  804. keepalive: .init(),
  805. logger: self.logger
  806. ),
  807. ]).wait())
  808. channelPromise.succeed(channel)
  809. self.loop.run()
  810. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  811. XCTAssertNoThrow(try connect.wait())
  812. // Write a SETTINGS frame on the root stream.
  813. try self.waitForStateChange(from: .connecting, to: .ready) {
  814. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  815. XCTAssertNoThrow(try channel.writeInbound(frame))
  816. }
  817. // The channel should now be ready.
  818. XCTAssertNoThrow(try readyChannelMux.wait())
  819. // Now drop the connection.
  820. try self.waitForStateChange(from: .ready, to: .shutdown) {
  821. let shutdown = manager.shutdown()
  822. self.loop.run()
  823. XCTAssertNoThrow(try shutdown.wait())
  824. }
  825. }
  826. func testCloseWithoutActiveRPCs() throws {
  827. let channelPromise = self.loop.makePromise(of: Channel.self)
  828. let manager = self.makeConnectionManager { _, _ in
  829. return channelPromise.futureResult
  830. }
  831. // Start the connection.
  832. let readyChannelMux = self.waitForStateChange(
  833. from: .idle,
  834. to: .connecting
  835. ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
  836. let readyChannelMux = manager.getHTTP2Multiplexer()
  837. self.loop.run()
  838. return readyChannelMux
  839. }
  840. // Setup the actual channel and activate it.
  841. let channel = EmbeddedChannel(loop: self.loop)
  842. let h2mux = HTTP2StreamMultiplexer(
  843. mode: .client,
  844. channel: channel,
  845. inboundStreamInitializer: nil
  846. )
  847. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  848. GRPCIdleHandler(
  849. connectionManager: manager,
  850. multiplexer: h2mux,
  851. idleTimeout: .minutes(5),
  852. keepalive: .init(),
  853. logger: self.logger
  854. ),
  855. ]).wait())
  856. channelPromise.succeed(channel)
  857. self.loop.run()
  858. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  859. XCTAssertNoThrow(try connect.wait())
  860. // "ready" the connection.
  861. try self.waitForStateChange(from: .connecting, to: .ready) {
  862. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  863. XCTAssertNoThrow(try channel.writeInbound(frame))
  864. }
  865. // The HTTP/2 stream multiplexer should now be ready.
  866. XCTAssertNoThrow(try readyChannelMux.wait())
  867. // Close the channel. There are no active RPCs so we should idle rather than be in the transient
  868. // failure state.
  869. self.waitForStateChange(from: .ready, to: .idle) {
  870. channel.pipeline.fireChannelInactive()
  871. }
  872. }
  873. func testIdleErrorDoesNothing() throws {
  874. let manager = self.makeConnectionManager()
  875. // Dropping an error on this manager should be fine.
  876. manager.channelError(DoomedChannelError())
  877. // Shutting down is then safe.
  878. try self.waitForStateChange(from: .idle, to: .shutdown) {
  879. let shutdown = manager.shutdown()
  880. self.loop.run()
  881. XCTAssertNoThrow(try shutdown.wait())
  882. }
  883. }
  884. func testHTTP2Delegates() throws {
  885. let channel = EmbeddedChannel(loop: self.loop)
  886. defer {
  887. XCTAssertNoThrow(try channel.finish())
  888. }
  889. let multiplexer = HTTP2StreamMultiplexer(
  890. mode: .client,
  891. channel: channel,
  892. inboundStreamInitializer: nil
  893. )
  894. class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
  895. var streamsClosed = 0
  896. var maxConcurrentStreams = 0
  897. func streamClosed(_ connectionManager: ConnectionManager) {
  898. self.streamsClosed += 1
  899. }
  900. func receivedSettingsMaxConcurrentStreams(
  901. _ connectionManager: ConnectionManager,
  902. maxConcurrentStreams: Int
  903. ) {
  904. self.maxConcurrentStreams = maxConcurrentStreams
  905. }
  906. }
  907. let http2 = HTTP2Delegate()
  908. let manager = ConnectionManager(
  909. eventLoop: self.loop,
  910. channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
  911. let idleHandler = GRPCIdleHandler(
  912. connectionManager: manager,
  913. multiplexer: multiplexer,
  914. idleTimeout: .minutes(5),
  915. keepalive: ClientConnectionKeepalive(),
  916. logger: self.logger
  917. )
  918. // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
  919. // us to just fire stream created/closed events into the channel.
  920. do {
  921. try channel.pipeline.syncOperations.addHandler(idleHandler)
  922. } catch {
  923. return eventLoop.makeFailedFuture(error)
  924. }
  925. return eventLoop.makeSucceededFuture(channel)
  926. },
  927. callStartBehavior: .waitsForConnectivity,
  928. connectionBackoff: ConnectionBackoff(),
  929. connectivityDelegate: nil,
  930. http2Delegate: http2,
  931. logger: self.logger
  932. )
  933. // Start connecting.
  934. let futureMultiplexer = manager.getHTTP2Multiplexer()
  935. self.loop.run()
  936. // Do the actual connecting.
  937. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")))
  938. // The channel isn't ready until it's seen a SETTINGS frame.
  939. func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
  940. let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
  941. return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
  942. }
  943. XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))
  944. // We're ready now so the future multiplexer will resolve and we'll have seen an update to
  945. // max concurrent streams.
  946. XCTAssertNoThrow(try futureMultiplexer.wait())
  947. XCTAssertEqual(http2.maxConcurrentStreams, 42)
  948. XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
  949. XCTAssertEqual(http2.maxConcurrentStreams, 13)
  950. // Open some streams.
  951. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
  952. let streamCreated = NIOHTTP2StreamCreatedEvent(
  953. streamID: streamID,
  954. localInitialWindowSize: nil,
  955. remoteInitialWindowSize: nil
  956. )
  957. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  958. }
  959. // ... and then close them.
  960. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
  961. let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
  962. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  963. }
  964. XCTAssertEqual(http2.streamsClosed, 4)
  965. }
  966. }
  967. internal struct Change: Hashable, CustomStringConvertible {
  968. var from: ConnectivityState
  969. var to: ConnectivityState
  970. var description: String {
  971. return "\(self.from) → \(self.to)"
  972. }
  973. }
  974. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  975. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  976. private let semaphore = DispatchSemaphore(value: 0)
  977. private var expectation: Expectation = .noExpectation
  978. private let quiescingSemaphore = DispatchSemaphore(value: 0)
  979. private enum Expectation {
  980. /// We have no expectation of any changes. We'll just ignore any changes.
  981. case noExpectation
  982. /// We expect one change.
  983. case one((Change) -> Void)
  984. /// We expect 'count' changes.
  985. case some(count: Int, recorded: [Change], ([Change]) -> Void)
  986. var count: Int {
  987. switch self {
  988. case .noExpectation:
  989. return 0
  990. case .one:
  991. return 1
  992. case let .some(count, _, _):
  993. return count
  994. }
  995. }
  996. }
  997. func connectivityStateDidChange(from oldState: ConnectivityState,
  998. to newState: ConnectivityState) {
  999. self.serialQueue.async {
  1000. switch self.expectation {
  1001. case let .one(verify):
  1002. // We don't care about future changes.
  1003. self.expectation = .noExpectation
  1004. // Verify and notify.
  1005. verify(Change(from: oldState, to: newState))
  1006. self.semaphore.signal()
  1007. case .some(let count, var recorded, let verify):
  1008. recorded.append(Change(from: oldState, to: newState))
  1009. if recorded.count == count {
  1010. // We don't care about future changes.
  1011. self.expectation = .noExpectation
  1012. // Verify and notify.
  1013. verify(recorded)
  1014. self.semaphore.signal()
  1015. } else {
  1016. // Still need more responses.
  1017. self.expectation = .some(count: count, recorded: recorded, verify)
  1018. }
  1019. case .noExpectation:
  1020. // Ignore any changes.
  1021. ()
  1022. }
  1023. }
  1024. }
  1025. func connectionStartedQuiescing() {
  1026. self.serialQueue.async {
  1027. self.quiescingSemaphore.signal()
  1028. }
  1029. }
  1030. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
  1031. self.serialQueue.async {
  1032. self.expectation = .some(count: count, recorded: [], verify)
  1033. }
  1034. }
  1035. func expectChange(verify: @escaping (Change) -> Void) {
  1036. self.serialQueue.async {
  1037. self.expectation = .one(verify)
  1038. }
  1039. }
  1040. func waitForExpectedChanges(
  1041. timeout: DispatchTimeInterval,
  1042. file: StaticString = #file,
  1043. line: UInt = #line
  1044. ) {
  1045. let result = self.semaphore.wait(timeout: .now() + timeout)
  1046. switch result {
  1047. case .success:
  1048. ()
  1049. case .timedOut:
  1050. XCTFail(
  1051. "Timed out before verifying \(self.expectation.count) change(s)",
  1052. file: file, line: line
  1053. )
  1054. }
  1055. }
  1056. func waitForQuiescing(timeout: DispatchTimeInterval) {
  1057. let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
  1058. switch result {
  1059. case .success:
  1060. ()
  1061. case .timedOut:
  1062. XCTFail("Timed out waiting for connection to start quiescing")
  1063. }
  1064. }
  1065. }
  1066. private extension ConnectionBackoff {
  1067. static let oneSecondFixed = ConnectionBackoff(
  1068. initialBackoff: 1.0,
  1069. maximumBackoff: 1.0,
  1070. multiplier: 1.0,
  1071. jitter: 0.0
  1072. )
  1073. }
  1074. private struct DoomedChannelError: Error {}
  1075. internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
  1076. internal var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
  1077. init(_ provider: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
  1078. self.provider = provider
  1079. }
  1080. func makeChannel(
  1081. managedBy connectionManager: ConnectionManager,
  1082. onEventLoop eventLoop: EventLoop,
  1083. connectTimeout: TimeAmount?,
  1084. logger: Logger
  1085. ) -> EventLoopFuture<Channel> {
  1086. return self.provider(connectionManager, eventLoop)
  1087. }
  1088. }