ConnectionManagerTests.swift 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import Logging
  19. import NIO
  20. import NIOHTTP2
  21. import XCTest
  22. class ConnectionManagerTests: GRPCTestCase {
  23. private let loop = EmbeddedEventLoop()
  24. private let recorder = RecordingConnectivityDelegate()
  25. private var monitor: ConnectivityStateMonitor!
  26. private var defaultConfiguration: ClientConnection.Configuration {
  27. var configuration = ClientConnection.Configuration.default(
  28. target: .unixDomainSocket("/ignored"),
  29. eventLoopGroup: self.loop
  30. )
  31. configuration.connectionBackoff = nil
  32. configuration.backgroundActivityLogger = self.clientLogger
  33. return configuration
  34. }
  35. override func setUp() {
  36. super.setUp()
  37. self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
  38. }
  39. override func tearDown() {
  40. XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
  41. super.tearDown()
  42. }
  43. private func makeConnectionManager(
  44. configuration config: ClientConnection.Configuration? = nil,
  45. channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
  46. ) -> ConnectionManager {
  47. let configuration = config ?? self.defaultConfiguration
  48. return ConnectionManager(
  49. configuration: configuration,
  50. channelProvider: channelProvider.map { HookedChannelProvider($0) },
  51. connectivityDelegate: self.monitor,
  52. logger: self.logger
  53. )
  54. }
  55. private func waitForStateChange<Result>(
  56. from: ConnectivityState,
  57. to: ConnectivityState,
  58. timeout: DispatchTimeInterval = .seconds(1),
  59. file: StaticString = #file,
  60. line: UInt = #line,
  61. body: () throws -> Result
  62. ) rethrows -> Result {
  63. self.recorder.expectChange {
  64. XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
  65. }
  66. let result = try body()
  67. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  68. return result
  69. }
  70. private func waitForStateChanges<Result>(
  71. _ changes: [Change],
  72. timeout: DispatchTimeInterval = .seconds(1),
  73. file: StaticString = #file,
  74. line: UInt = #line,
  75. body: () throws -> Result
  76. ) rethrows -> Result {
  77. self.recorder.expectChanges(changes.count) {
  78. XCTAssertEqual($0, changes)
  79. }
  80. let result = try body()
  81. self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
  82. return result
  83. }
  84. }
  85. extension ConnectionManagerTests {
  86. func testIdleShutdown() throws {
  87. let manager = self.makeConnectionManager()
  88. try self.waitForStateChange(from: .idle, to: .shutdown) {
  89. let shutdown = manager.shutdown()
  90. self.loop.run()
  91. XCTAssertNoThrow(try shutdown.wait())
  92. }
  93. // Getting a multiplexer should fail.
  94. let multiplexer = manager.getHTTP2Multiplexer()
  95. self.loop.run()
  96. XCTAssertThrowsError(try multiplexer.wait())
  97. }
  98. func testConnectFromIdleFailsWithNoReconnect() {
  99. let channelPromise = self.loop.makePromise(of: Channel.self)
  100. let manager = self.makeConnectionManager { _, _ in
  101. return channelPromise.futureResult
  102. }
  103. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
  104. .waitForStateChange(from: .idle, to: .connecting) {
  105. let channel = manager.getHTTP2Multiplexer()
  106. self.loop.run()
  107. return channel
  108. }
  109. self.waitForStateChange(from: .connecting, to: .shutdown) {
  110. channelPromise.fail(DoomedChannelError())
  111. }
  112. XCTAssertThrowsError(try multiplexer.wait()) {
  113. XCTAssertTrue($0 is DoomedChannelError)
  114. }
  115. }
  116. func testConnectAndDisconnect() throws {
  117. let channelPromise = self.loop.makePromise(of: Channel.self)
  118. let manager = self.makeConnectionManager { _, _ in
  119. return channelPromise.futureResult
  120. }
  121. // Start the connection.
  122. self.waitForStateChange(from: .idle, to: .connecting) {
  123. _ = manager.getHTTP2Multiplexer()
  124. self.loop.run()
  125. }
  126. // Setup the real channel and activate it.
  127. let channel = EmbeddedChannel(loop: self.loop)
  128. let h2mux = HTTP2StreamMultiplexer(
  129. mode: .client,
  130. channel: channel,
  131. inboundStreamInitializer: nil
  132. )
  133. try channel.pipeline.addHandler(
  134. GRPCIdleHandler(
  135. connectionManager: manager,
  136. multiplexer: h2mux,
  137. idleTimeout: .minutes(5),
  138. keepalive: .init(),
  139. logger: self.logger
  140. )
  141. ).wait()
  142. channelPromise.succeed(channel)
  143. XCTAssertNoThrow(
  144. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  145. .wait()
  146. )
  147. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  148. try self.waitForStateChange(from: .connecting, to: .ready) {
  149. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  150. XCTAssertNoThrow(try channel.writeInbound(frame))
  151. }
  152. // Close the channel.
  153. try self.waitForStateChange(from: .ready, to: .shutdown) {
  154. // Now the channel should be available: shut it down.
  155. let shutdown = manager.shutdown()
  156. self.loop.run()
  157. XCTAssertNoThrow(try shutdown.wait())
  158. }
  159. }
  160. func testConnectAndIdle() throws {
  161. let channelPromise = self.loop.makePromise(of: Channel.self)
  162. let manager = self.makeConnectionManager { _, _ in
  163. return channelPromise.futureResult
  164. }
  165. // Start the connection.
  166. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  167. .waitForStateChange(from: .idle, to: .connecting) {
  168. let readyChannelMux = manager.getHTTP2Multiplexer()
  169. self.loop.run()
  170. return readyChannelMux
  171. }
  172. // Setup the channel.
  173. let channel = EmbeddedChannel(loop: self.loop)
  174. let h2mux = HTTP2StreamMultiplexer(
  175. mode: .client,
  176. channel: channel,
  177. inboundStreamInitializer: nil
  178. )
  179. try channel.pipeline.addHandler(
  180. GRPCIdleHandler(
  181. connectionManager: manager,
  182. multiplexer: h2mux,
  183. idleTimeout: .minutes(5),
  184. keepalive: .init(),
  185. logger: self.logger
  186. )
  187. ).wait()
  188. channelPromise.succeed(channel)
  189. XCTAssertNoThrow(
  190. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  191. .wait()
  192. )
  193. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  194. try self.waitForStateChange(from: .connecting, to: .ready) {
  195. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  196. XCTAssertNoThrow(try channel.writeInbound(frame))
  197. // Wait for the multiplexer, it _must_ be ready now.
  198. XCTAssertNoThrow(try readyChannelMux.wait())
  199. }
  200. // Go idle. This will shutdown the channel.
  201. try self.waitForStateChange(from: .ready, to: .idle) {
  202. self.loop.advanceTime(by: .minutes(5))
  203. XCTAssertNoThrow(try channel.closeFuture.wait())
  204. }
  205. // Now shutdown.
  206. try self.waitForStateChange(from: .idle, to: .shutdown) {
  207. let shutdown = manager.shutdown()
  208. self.loop.run()
  209. XCTAssertNoThrow(try shutdown.wait())
  210. }
  211. }
  212. func testIdleTimeoutWhenThereAreActiveStreams() throws {
  213. let channelPromise = self.loop.makePromise(of: Channel.self)
  214. let manager = self.makeConnectionManager { _, _ in
  215. return channelPromise.futureResult
  216. }
  217. // Start the connection.
  218. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  219. .waitForStateChange(from: .idle, to: .connecting) {
  220. let readyChannelMux = manager.getHTTP2Multiplexer()
  221. self.loop.run()
  222. return readyChannelMux
  223. }
  224. // Setup the channel.
  225. let channel = EmbeddedChannel(loop: self.loop)
  226. let h2mux = HTTP2StreamMultiplexer(
  227. mode: .client,
  228. channel: channel,
  229. inboundStreamInitializer: nil
  230. )
  231. try channel.pipeline.addHandler(
  232. GRPCIdleHandler(
  233. connectionManager: manager,
  234. multiplexer: h2mux,
  235. idleTimeout: .minutes(5),
  236. keepalive: .init(),
  237. logger: self.logger
  238. )
  239. ).wait()
  240. channelPromise.succeed(channel)
  241. XCTAssertNoThrow(
  242. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  243. .wait()
  244. )
  245. // Write a settings frame on the root stream; this'll make the channel 'ready'.
  246. try self.waitForStateChange(from: .connecting, to: .ready) {
  247. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  248. XCTAssertNoThrow(try channel.writeInbound(frame))
  249. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  250. XCTAssertNoThrow(try readyChannelMux.wait())
  251. }
  252. // "create" a stream; the details don't matter here.
  253. let streamCreated = NIOHTTP2StreamCreatedEvent(
  254. streamID: 1,
  255. localInitialWindowSize: nil,
  256. remoteInitialWindowSize: nil
  257. )
  258. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  259. // Wait for the idle timeout: this should _not_ cause the channel to idle.
  260. self.loop.advanceTime(by: .minutes(5))
  261. // Now we're going to close the stream and wait for an idle timeout and then shutdown.
  262. self.waitForStateChange(from: .ready, to: .idle) {
  263. // Close the stream.
  264. let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
  265. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  266. // ... wait for the idle timeout,
  267. self.loop.advanceTime(by: .minutes(5))
  268. }
  269. // Now shutdown.
  270. try self.waitForStateChange(from: .idle, to: .shutdown) {
  271. let shutdown = manager.shutdown()
  272. self.loop.run()
  273. XCTAssertNoThrow(try shutdown.wait())
  274. }
  275. }
  276. func testConnectAndThenBecomeInactive() throws {
  277. let channelPromise = self.loop.makePromise(of: Channel.self)
  278. let manager = self.makeConnectionManager { _, _ in
  279. return channelPromise.futureResult
  280. }
  281. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  282. .waitForStateChange(from: .idle, to: .connecting) {
  283. let readyChannelMux = manager.getHTTP2Multiplexer()
  284. self.loop.run()
  285. return readyChannelMux
  286. }
  287. // Setup the channel.
  288. let channel = EmbeddedChannel(loop: self.loop)
  289. let h2mux = HTTP2StreamMultiplexer(
  290. mode: .client,
  291. channel: channel,
  292. inboundStreamInitializer: nil
  293. )
  294. try channel.pipeline.addHandler(
  295. GRPCIdleHandler(
  296. connectionManager: manager,
  297. multiplexer: h2mux,
  298. idleTimeout: .minutes(5),
  299. keepalive: .init(),
  300. logger: self.logger
  301. )
  302. ).wait()
  303. channelPromise.succeed(channel)
  304. XCTAssertNoThrow(
  305. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  306. .wait()
  307. )
  308. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  309. // Okay: now close the channel; the `readyChannel` future has not been completed yet.
  310. let shutdown = manager.shutdown()
  311. self.loop.run()
  312. XCTAssertNoThrow(try shutdown.wait())
  313. }
  314. // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
  315. // the `readyChannelMux` should error.
  316. XCTAssertThrowsError(try readyChannelMux.wait())
  317. }
  318. func testConnectOnSecondAttempt() throws {
  319. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  320. let channelFutures: [EventLoopFuture<Channel>] = [
  321. self.loop.makeFailedFuture(DoomedChannelError()),
  322. channelPromise.futureResult,
  323. ]
  324. var channelFutureIterator = channelFutures.makeIterator()
  325. var configuration = self.defaultConfiguration
  326. configuration.connectionBackoff = .oneSecondFixed
  327. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  328. guard let next = channelFutureIterator.next() else {
  329. XCTFail("Too many channels requested")
  330. return self.loop.makeFailedFuture(DoomedChannelError())
  331. }
  332. return next
  333. }
  334. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  335. Change(from: .idle, to: .connecting),
  336. Change(from: .connecting, to: .transientFailure),
  337. ]) {
  338. // Get a HTTP/2 stream multiplexer.
  339. let readyChannelMux = manager.getHTTP2Multiplexer()
  340. self.loop.run()
  341. return readyChannelMux
  342. }
  343. // Get a HTTP/2 stream mux from the manager - it is a future for the one we made earlier.
  344. let anotherReadyChannelMux = manager.getHTTP2Multiplexer()
  345. self.loop.run()
  346. // Move time forwards by a second to start the next connection attempt.
  347. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  348. self.loop.advanceTime(by: .seconds(1))
  349. }
  350. // Setup the actual channel and complete the promise.
  351. let channel = EmbeddedChannel(loop: self.loop)
  352. let h2mux = HTTP2StreamMultiplexer(
  353. mode: .client,
  354. channel: channel,
  355. inboundStreamInitializer: nil
  356. )
  357. try channel.pipeline.addHandler(
  358. GRPCIdleHandler(
  359. connectionManager: manager,
  360. multiplexer: h2mux,
  361. idleTimeout: .minutes(5),
  362. keepalive: .init(),
  363. logger: self.logger
  364. )
  365. ).wait()
  366. channelPromise.succeed(channel)
  367. XCTAssertNoThrow(
  368. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  369. .wait()
  370. )
  371. // Write a SETTINGS frame on the root stream.
  372. try self.waitForStateChange(from: .connecting, to: .ready) {
  373. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  374. XCTAssertNoThrow(try channel.writeInbound(frame))
  375. }
  376. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  377. XCTAssertNoThrow(try readyChannelMux.wait())
  378. XCTAssertNoThrow(try anotherReadyChannelMux.wait())
  379. // Now shutdown.
  380. try self.waitForStateChange(from: .ready, to: .shutdown) {
  381. let shutdown = manager.shutdown()
  382. self.loop.run()
  383. XCTAssertNoThrow(try shutdown.wait())
  384. }
  385. }
  386. func testShutdownWhileConnecting() throws {
  387. let channelPromise = self.loop.makePromise(of: Channel.self)
  388. let manager = self.makeConnectionManager { _, _ in
  389. return channelPromise.futureResult
  390. }
  391. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  392. .waitForStateChange(from: .idle, to: .connecting) {
  393. let readyChannelMux = manager.getHTTP2Multiplexer()
  394. self.loop.run()
  395. return readyChannelMux
  396. }
  397. // Now shutdown.
  398. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  399. let shutdown = manager.shutdown()
  400. self.loop.run()
  401. XCTAssertNoThrow(try shutdown.wait())
  402. }
  403. // The multiplexer we were requesting should fail.
  404. XCTAssertThrowsError(try readyChannelMux.wait())
  405. // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
  406. channelPromise.succeed(EmbeddedChannel(loop: self.loop))
  407. let channel = try channelPromise.futureResult.wait()
  408. self.loop.run()
  409. XCTAssertNoThrow(try channel.closeFuture.wait())
  410. }
  411. func testShutdownWhileTransientFailure() throws {
  412. var configuration = self.defaultConfiguration
  413. configuration.connectionBackoff = .oneSecondFixed
  414. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  415. self.loop.makeFailedFuture(DoomedChannelError())
  416. }
  417. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
  418. Change(from: .idle, to: .connecting),
  419. Change(from: .connecting, to: .transientFailure),
  420. ]) {
  421. // Get a HTTP/2 stream multiplexer.
  422. let readyChannelMux = manager.getHTTP2Multiplexer()
  423. self.loop.run()
  424. return readyChannelMux
  425. }
  426. // Now shutdown.
  427. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  428. let shutdown = manager.shutdown()
  429. self.loop.run()
  430. XCTAssertNoThrow(try shutdown.wait())
  431. }
  432. // The HTTP/2 stream mux we were requesting should fail.
  433. XCTAssertThrowsError(try readyChannelMux.wait())
  434. }
  435. func testShutdownWhileActive() throws {
  436. let channelPromise = self.loop.makePromise(of: Channel.self)
  437. let manager = self.makeConnectionManager { _, _ in
  438. return channelPromise.futureResult
  439. }
  440. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  441. .waitForStateChange(from: .idle, to: .connecting) {
  442. let readyChannelMux = manager.getHTTP2Multiplexer()
  443. self.loop.run()
  444. return readyChannelMux
  445. }
  446. // Prepare the channel
  447. let channel = EmbeddedChannel(loop: self.loop)
  448. let h2mux = HTTP2StreamMultiplexer(
  449. mode: .client,
  450. channel: channel,
  451. inboundStreamInitializer: nil
  452. )
  453. try channel.pipeline.addHandler(
  454. GRPCIdleHandler(
  455. connectionManager: manager,
  456. multiplexer: h2mux,
  457. idleTimeout: .minutes(5),
  458. keepalive: .init(),
  459. logger: self.logger
  460. )
  461. ).wait()
  462. channelPromise.succeed(channel)
  463. XCTAssertNoThrow(
  464. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  465. .wait()
  466. )
  467. // (No state change expected here: active is an internal state.)
  468. // Now shutdown.
  469. try self.waitForStateChange(from: .connecting, to: .shutdown) {
  470. let shutdown = manager.shutdown()
  471. self.loop.run()
  472. XCTAssertNoThrow(try shutdown.wait())
  473. }
  474. // The HTTP/2 stream multiplexer we were requesting should fail.
  475. XCTAssertThrowsError(try readyChannelMux.wait())
  476. }
  477. func testShutdownWhileShutdown() throws {
  478. let manager = self.makeConnectionManager()
  479. try self.waitForStateChange(from: .idle, to: .shutdown) {
  480. let firstShutdown = manager.shutdown()
  481. self.loop.run()
  482. XCTAssertNoThrow(try firstShutdown.wait())
  483. }
  484. let secondShutdown = manager.shutdown()
  485. self.loop.run()
  486. XCTAssertNoThrow(try secondShutdown.wait())
  487. }
  488. func testTransientFailureWhileActive() throws {
  489. var configuration = self.defaultConfiguration
  490. configuration.connectionBackoff = .oneSecondFixed
  491. let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  492. let channelFutures: [EventLoopFuture<Channel>] = [
  493. channelPromise.futureResult,
  494. self.loop.makeFailedFuture(DoomedChannelError()),
  495. ]
  496. var channelFutureIterator = channelFutures.makeIterator()
  497. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  498. guard let next = channelFutureIterator.next() else {
  499. XCTFail("Too many channels requested")
  500. return self.loop.makeFailedFuture(DoomedChannelError())
  501. }
  502. return next
  503. }
  504. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  505. .waitForStateChange(from: .idle, to: .connecting) {
  506. let readyChannelMux = manager.getHTTP2Multiplexer()
  507. self.loop.run()
  508. return readyChannelMux
  509. }
  510. // Prepare the channel
  511. let firstChannel = EmbeddedChannel(loop: self.loop)
  512. let h2mux = HTTP2StreamMultiplexer(
  513. mode: .client,
  514. channel: firstChannel,
  515. inboundStreamInitializer: nil
  516. )
  517. try firstChannel.pipeline.addHandler(
  518. GRPCIdleHandler(
  519. connectionManager: manager,
  520. multiplexer: h2mux,
  521. idleTimeout: .minutes(5),
  522. keepalive: .init(),
  523. logger: self.logger
  524. )
  525. ).wait()
  526. channelPromise.succeed(firstChannel)
  527. XCTAssertNoThrow(
  528. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  529. .wait()
  530. )
  531. // (No state change expected here: active is an internal state.)
  532. // Close the channel (simulate e.g. TLS handshake failed)
  533. try self.waitForStateChange(from: .connecting, to: .transientFailure) {
  534. XCTAssertNoThrow(try firstChannel.close().wait())
  535. }
  536. // Start connecting again.
  537. self.waitForStateChanges([
  538. Change(from: .transientFailure, to: .connecting),
  539. Change(from: .connecting, to: .transientFailure),
  540. ]) {
  541. self.loop.advanceTime(by: .seconds(1))
  542. }
  543. // Now shutdown
  544. try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
  545. let shutdown = manager.shutdown()
  546. self.loop.run()
  547. XCTAssertNoThrow(try shutdown.wait())
  548. }
  549. // The channel never came up: it should be throw.
  550. XCTAssertThrowsError(try readyChannelMux.wait())
  551. }
  552. func testTransientFailureWhileReady() throws {
  553. var configuration = self.defaultConfiguration
  554. configuration.connectionBackoff = .oneSecondFixed
  555. let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  556. let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
  557. let channelFutures: [EventLoopFuture<Channel>] = [
  558. firstChannelPromise.futureResult,
  559. secondChannelPromise.futureResult,
  560. ]
  561. var channelFutureIterator = channelFutures.makeIterator()
  562. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  563. guard let next = channelFutureIterator.next() else {
  564. XCTFail("Too many channels requested")
  565. return self.loop.makeFailedFuture(DoomedChannelError())
  566. }
  567. return next
  568. }
  569. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  570. .waitForStateChange(from: .idle, to: .connecting) {
  571. let readyChannelMux = manager.getHTTP2Multiplexer()
  572. self.loop.run()
  573. return readyChannelMux
  574. }
  575. // Prepare the first channel
  576. let firstChannel = EmbeddedChannel(loop: self.loop)
  577. let firstH2mux = HTTP2StreamMultiplexer(
  578. mode: .client,
  579. channel: firstChannel,
  580. inboundStreamInitializer: nil
  581. )
  582. try firstChannel.pipeline.addHandler(
  583. GRPCIdleHandler(
  584. connectionManager: manager,
  585. multiplexer: firstH2mux,
  586. idleTimeout: .minutes(5),
  587. keepalive: .init(),
  588. logger: self.logger
  589. )
  590. ).wait()
  591. firstChannelPromise.succeed(firstChannel)
  592. XCTAssertNoThrow(
  593. try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  594. .wait()
  595. )
  596. // Write a SETTINGS frame on the root stream.
  597. try self.waitForStateChange(from: .connecting, to: .ready) {
  598. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  599. XCTAssertNoThrow(try firstChannel.writeInbound(frame))
  600. }
  601. // Channel should now be ready.
  602. XCTAssertNoThrow(try readyChannelMux.wait())
  603. // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
  604. let streamCreated = NIOHTTP2StreamCreatedEvent(
  605. streamID: 1,
  606. localInitialWindowSize: nil,
  607. remoteInitialWindowSize: nil
  608. )
  609. firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
  610. try self.waitForStateChange(from: .ready, to: .transientFailure) {
  611. XCTAssertNoThrow(try firstChannel.close().wait())
  612. }
  613. // Run to start connecting again.
  614. self.waitForStateChange(from: .transientFailure, to: .connecting) {
  615. self.loop.advanceTime(by: .seconds(1))
  616. }
  617. // Prepare the second channel
  618. let secondChannel = EmbeddedChannel(loop: self.loop)
  619. let secondH2mux = HTTP2StreamMultiplexer(
  620. mode: .client,
  621. channel: secondChannel,
  622. inboundStreamInitializer: nil
  623. )
  624. try secondChannel.pipeline.addHandler(
  625. GRPCIdleHandler(
  626. connectionManager: manager,
  627. multiplexer: secondH2mux,
  628. idleTimeout: .minutes(5),
  629. keepalive: .init(),
  630. logger: self.logger
  631. )
  632. ).wait()
  633. secondChannelPromise.succeed(secondChannel)
  634. XCTAssertNoThrow(
  635. try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  636. .wait()
  637. )
  638. // Write a SETTINGS frame on the root stream.
  639. try self.waitForStateChange(from: .connecting, to: .ready) {
  640. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  641. XCTAssertNoThrow(try secondChannel.writeInbound(frame))
  642. }
  643. // Now shutdown
  644. try self.waitForStateChange(from: .ready, to: .shutdown) {
  645. let shutdown = manager.shutdown()
  646. self.loop.run()
  647. XCTAssertNoThrow(try shutdown.wait())
  648. }
  649. }
  650. func testGoAwayWhenReady() throws {
  651. let channelPromise = self.loop.makePromise(of: Channel.self)
  652. let manager = self.makeConnectionManager { _, _ in
  653. return channelPromise.futureResult
  654. }
  655. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
  656. .waitForStateChange(from: .idle, to: .connecting) {
  657. let readyChannelMux = manager.getHTTP2Multiplexer()
  658. self.loop.run()
  659. return readyChannelMux
  660. }
  661. // Setup the channel.
  662. let channel = EmbeddedChannel(loop: self.loop)
  663. let h2mux = HTTP2StreamMultiplexer(
  664. mode: .client,
  665. channel: channel,
  666. inboundStreamInitializer: nil
  667. )
  668. try channel.pipeline.addHandler(
  669. GRPCIdleHandler(
  670. connectionManager: manager,
  671. multiplexer: h2mux,
  672. idleTimeout: .minutes(5),
  673. keepalive: .init(),
  674. logger: self.logger
  675. )
  676. ).wait()
  677. channelPromise.succeed(channel)
  678. XCTAssertNoThrow(
  679. try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
  680. .wait()
  681. )
  682. try self.waitForStateChange(from: .connecting, to: .ready) {
  683. // Write a SETTINGS frame on the root stream.
  684. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  685. XCTAssertNoThrow(try channel.writeInbound(frame))
  686. }
  687. // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
  688. XCTAssertNoThrow(try readyChannelMux.wait())
  689. // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
  690. // channel to close.
  691. try self.waitForStateChange(from: .ready, to: .idle) {
  692. let goAway = HTTP2Frame(
  693. streamID: .rootStream,
  694. payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
  695. )
  696. XCTAssertNoThrow(try channel.writeInbound(goAway))
  697. }
  698. self.loop.run()
  699. XCTAssertNoThrow(try channel.closeFuture.wait())
  700. // Now shutdown
  701. try self.waitForStateChange(from: .idle, to: .shutdown) {
  702. let shutdown = manager.shutdown()
  703. self.loop.run()
  704. XCTAssertNoThrow(try shutdown.wait())
  705. }
  706. }
  707. func testDoomedOptimisticChannelFromIdle() {
  708. var configuration = self.defaultConfiguration
  709. configuration.callStartBehavior = .fastFailure
  710. let manager = ConnectionManager(
  711. configuration: configuration,
  712. channelProvider: HookedChannelProvider { _, loop in
  713. return loop.makeFailedFuture(DoomedChannelError())
  714. },
  715. connectivityDelegate: nil,
  716. logger: self.logger
  717. )
  718. let candidate = manager.getHTTP2Multiplexer()
  719. self.loop.run()
  720. XCTAssertThrowsError(try candidate.wait())
  721. }
  722. func testDoomedOptimisticChannelFromConnecting() throws {
  723. var configuration = self.defaultConfiguration
  724. configuration.callStartBehavior = .fastFailure
  725. let promise = self.loop.makePromise(of: Channel.self)
  726. let manager = self.makeConnectionManager { _, _ in
  727. return promise.futureResult
  728. }
  729. self.waitForStateChange(from: .idle, to: .connecting) {
  730. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  731. _ = manager.getHTTP2Multiplexer()
  732. self.loop.run()
  733. }
  734. // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
  735. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  736. self.loop.run()
  737. // Fail the promise.
  738. promise.fail(DoomedChannelError())
  739. XCTAssertThrowsError(try optimisticChannelMux.wait())
  740. }
  741. func testOptimisticChannelFromTransientFailure() throws {
  742. var configuration = self.defaultConfiguration
  743. configuration.callStartBehavior = .fastFailure
  744. configuration.connectionBackoff = ConnectionBackoff()
  745. let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
  746. self.loop.makeFailedFuture(DoomedChannelError())
  747. }
  748. self.waitForStateChanges([
  749. Change(from: .idle, to: .connecting),
  750. Change(from: .connecting, to: .transientFailure),
  751. ]) {
  752. // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
  753. _ = manager.getHTTP2Multiplexer()
  754. self.loop.run()
  755. }
  756. // Now we're sitting in transient failure. Get a HTTP/2 stream mux optimistically - selected in config.
  757. let optimisticChannelMux = manager.getHTTP2Multiplexer()
  758. self.loop.run()
  759. XCTAssertThrowsError(try optimisticChannelMux.wait()) { error in
  760. XCTAssertTrue(error is DoomedChannelError)
  761. }
  762. }
  763. func testOptimisticChannelFromShutdown() throws {
  764. var configuration = self.defaultConfiguration
  765. configuration.callStartBehavior = .fastFailure
  766. let manager = self.makeConnectionManager { _, _ in
  767. return self.loop.makeFailedFuture(DoomedChannelError())
  768. }
  769. let shutdown = manager.shutdown()
  770. self.loop.run()
  771. XCTAssertNoThrow(try shutdown.wait())
  772. // Get a channel optimistically. It'll fail, obviously.
  773. let channelMux = manager.getHTTP2Multiplexer()
  774. self.loop.run()
  775. XCTAssertThrowsError(try channelMux.wait())
  776. }
  777. func testForceIdleAfterInactive() throws {
  778. let channelPromise = self.loop.makePromise(of: Channel.self)
  779. let manager = self.makeConnectionManager { _, _ in
  780. return channelPromise.futureResult
  781. }
  782. // Start the connection.
  783. let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
  784. from: .idle,
  785. to: .connecting
  786. ) {
  787. let readyChannelMux = manager.getHTTP2Multiplexer()
  788. self.loop.run()
  789. return readyChannelMux
  790. }
  791. // Setup the real channel and activate it.
  792. let channel = EmbeddedChannel(loop: self.loop)
  793. let h2mux = HTTP2StreamMultiplexer(
  794. mode: .client,
  795. channel: channel,
  796. inboundStreamInitializer: nil
  797. )
  798. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  799. GRPCIdleHandler(
  800. connectionManager: manager,
  801. multiplexer: h2mux,
  802. idleTimeout: .minutes(5),
  803. keepalive: .init(),
  804. logger: self.logger
  805. ),
  806. ]).wait())
  807. channelPromise.succeed(channel)
  808. self.loop.run()
  809. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  810. XCTAssertNoThrow(try connect.wait())
  811. // Write a SETTINGS frame on the root stream.
  812. try self.waitForStateChange(from: .connecting, to: .ready) {
  813. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  814. XCTAssertNoThrow(try channel.writeInbound(frame))
  815. }
  816. // The channel should now be ready.
  817. XCTAssertNoThrow(try readyChannelMux.wait())
  818. // Now drop the connection.
  819. try self.waitForStateChange(from: .ready, to: .shutdown) {
  820. let shutdown = manager.shutdown()
  821. self.loop.run()
  822. XCTAssertNoThrow(try shutdown.wait())
  823. }
  824. }
  825. func testCloseWithoutActiveRPCs() throws {
  826. let channelPromise = self.loop.makePromise(of: Channel.self)
  827. let manager = self.makeConnectionManager { _, _ in
  828. return channelPromise.futureResult
  829. }
  830. // Start the connection.
  831. let readyChannelMux = self.waitForStateChange(
  832. from: .idle,
  833. to: .connecting
  834. ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
  835. let readyChannelMux = manager.getHTTP2Multiplexer()
  836. self.loop.run()
  837. return readyChannelMux
  838. }
  839. // Setup the actual channel and activate it.
  840. let channel = EmbeddedChannel(loop: self.loop)
  841. let h2mux = HTTP2StreamMultiplexer(
  842. mode: .client,
  843. channel: channel,
  844. inboundStreamInitializer: nil
  845. )
  846. XCTAssertNoThrow(try channel.pipeline.addHandlers([
  847. GRPCIdleHandler(
  848. connectionManager: manager,
  849. multiplexer: h2mux,
  850. idleTimeout: .minutes(5),
  851. keepalive: .init(),
  852. logger: self.logger
  853. ),
  854. ]).wait())
  855. channelPromise.succeed(channel)
  856. self.loop.run()
  857. let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
  858. XCTAssertNoThrow(try connect.wait())
  859. // "ready" the connection.
  860. try self.waitForStateChange(from: .connecting, to: .ready) {
  861. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  862. XCTAssertNoThrow(try channel.writeInbound(frame))
  863. }
  864. // The HTTP/2 stream multiplexer should now be ready.
  865. XCTAssertNoThrow(try readyChannelMux.wait())
  866. // Close the channel. There are no active RPCs so we should idle rather than be in the transient
  867. // failure state.
  868. self.waitForStateChange(from: .ready, to: .idle) {
  869. channel.pipeline.fireChannelInactive()
  870. }
  871. }
  872. func testIdleErrorDoesNothing() throws {
  873. let manager = self.makeConnectionManager()
  874. // Dropping an error on this manager should be fine.
  875. manager.channelError(DoomedChannelError())
  876. // Shutting down is then safe.
  877. try self.waitForStateChange(from: .idle, to: .shutdown) {
  878. let shutdown = manager.shutdown()
  879. self.loop.run()
  880. XCTAssertNoThrow(try shutdown.wait())
  881. }
  882. }
  883. func testHTTP2Delegates() throws {
  884. let channel = EmbeddedChannel(loop: self.loop)
  885. defer {
  886. XCTAssertNoThrow(try channel.finish())
  887. }
  888. let multiplexer = HTTP2StreamMultiplexer(
  889. mode: .client,
  890. channel: channel,
  891. inboundStreamInitializer: nil
  892. )
  893. class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
  894. var streamsClosed = 0
  895. var maxConcurrentStreams = 0
  896. func streamClosed(_ connectionManager: ConnectionManager) {
  897. self.streamsClosed += 1
  898. }
  899. func receivedSettingsMaxConcurrentStreams(
  900. _ connectionManager: ConnectionManager,
  901. maxConcurrentStreams: Int
  902. ) {
  903. self.maxConcurrentStreams = maxConcurrentStreams
  904. }
  905. }
  906. let http2 = HTTP2Delegate()
  907. let manager = ConnectionManager(
  908. eventLoop: self.loop,
  909. channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
  910. let idleHandler = GRPCIdleHandler(
  911. connectionManager: manager,
  912. multiplexer: multiplexer,
  913. idleTimeout: .minutes(5),
  914. keepalive: ClientConnectionKeepalive(),
  915. logger: self.logger
  916. )
  917. // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
  918. // us to just fire stream created/closed events into the channel.
  919. do {
  920. try channel.pipeline.syncOperations.addHandler(idleHandler)
  921. } catch {
  922. return eventLoop.makeFailedFuture(error)
  923. }
  924. return eventLoop.makeSucceededFuture(channel)
  925. },
  926. callStartBehavior: .waitsForConnectivity,
  927. connectionBackoff: ConnectionBackoff(),
  928. connectivityDelegate: nil,
  929. http2Delegate: http2,
  930. logger: self.logger
  931. )
  932. // Start connecting.
  933. let futureMultiplexer = manager.getHTTP2Multiplexer()
  934. self.loop.run()
  935. // Do the actual connecting.
  936. XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")))
  937. // The channel isn't ready until it's seen a SETTINGS frame.
  938. func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
  939. let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
  940. return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
  941. }
  942. XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))
  943. // We're ready now so the future multiplexer will resolve and we'll have seen an update to
  944. // max concurrent streams.
  945. XCTAssertNoThrow(try futureMultiplexer.wait())
  946. XCTAssertEqual(http2.maxConcurrentStreams, 42)
  947. XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
  948. XCTAssertEqual(http2.maxConcurrentStreams, 13)
  949. // Open some streams.
  950. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
  951. let streamCreated = NIOHTTP2StreamCreatedEvent(
  952. streamID: streamID,
  953. localInitialWindowSize: nil,
  954. remoteInitialWindowSize: nil
  955. )
  956. channel.pipeline.fireUserInboundEventTriggered(streamCreated)
  957. }
  958. // ... and then close them.
  959. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
  960. let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
  961. channel.pipeline.fireUserInboundEventTriggered(streamClosed)
  962. }
  963. XCTAssertEqual(http2.streamsClosed, 4)
  964. }
  965. }
  966. internal struct Change: Hashable, CustomStringConvertible {
  967. var from: ConnectivityState
  968. var to: ConnectivityState
  969. var description: String {
  970. return "\(self.from) → \(self.to)"
  971. }
  972. }
  973. internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
  974. private let serialQueue = DispatchQueue(label: "io.grpc.testing")
  975. private let semaphore = DispatchSemaphore(value: 0)
  976. private var expectation: Expectation = .noExpectation
  977. private let quiescingSemaphore = DispatchSemaphore(value: 0)
  978. private enum Expectation {
  979. /// We have no expectation of any changes. We'll just ignore any changes.
  980. case noExpectation
  981. /// We expect one change.
  982. case one((Change) -> Void)
  983. /// We expect 'count' changes.
  984. case some(count: Int, recorded: [Change], ([Change]) -> Void)
  985. var count: Int {
  986. switch self {
  987. case .noExpectation:
  988. return 0
  989. case .one:
  990. return 1
  991. case let .some(count, _, _):
  992. return count
  993. }
  994. }
  995. }
  996. func connectivityStateDidChange(from oldState: ConnectivityState,
  997. to newState: ConnectivityState) {
  998. self.serialQueue.async {
  999. switch self.expectation {
  1000. case let .one(verify):
  1001. // We don't care about future changes.
  1002. self.expectation = .noExpectation
  1003. // Verify and notify.
  1004. verify(Change(from: oldState, to: newState))
  1005. self.semaphore.signal()
  1006. case .some(let count, var recorded, let verify):
  1007. recorded.append(Change(from: oldState, to: newState))
  1008. if recorded.count == count {
  1009. // We don't care about future changes.
  1010. self.expectation = .noExpectation
  1011. // Verify and notify.
  1012. verify(recorded)
  1013. self.semaphore.signal()
  1014. } else {
  1015. // Still need more responses.
  1016. self.expectation = .some(count: count, recorded: recorded, verify)
  1017. }
  1018. case .noExpectation:
  1019. // Ignore any changes.
  1020. ()
  1021. }
  1022. }
  1023. }
  1024. func connectionStartedQuiescing() {
  1025. self.serialQueue.async {
  1026. self.quiescingSemaphore.signal()
  1027. }
  1028. }
  1029. func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
  1030. self.serialQueue.async {
  1031. self.expectation = .some(count: count, recorded: [], verify)
  1032. }
  1033. }
  1034. func expectChange(verify: @escaping (Change) -> Void) {
  1035. self.serialQueue.async {
  1036. self.expectation = .one(verify)
  1037. }
  1038. }
  1039. func waitForExpectedChanges(
  1040. timeout: DispatchTimeInterval,
  1041. file: StaticString = #file,
  1042. line: UInt = #line
  1043. ) {
  1044. let result = self.semaphore.wait(timeout: .now() + timeout)
  1045. switch result {
  1046. case .success:
  1047. ()
  1048. case .timedOut:
  1049. XCTFail(
  1050. "Timed out before verifying \(self.expectation.count) change(s)",
  1051. file: file, line: line
  1052. )
  1053. }
  1054. }
  1055. func waitForQuiescing(timeout: DispatchTimeInterval) {
  1056. let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
  1057. switch result {
  1058. case .success:
  1059. ()
  1060. case .timedOut:
  1061. XCTFail("Timed out waiting for connection to start quiescing")
  1062. }
  1063. }
  1064. }
  1065. private extension ConnectionBackoff {
  1066. static let oneSecondFixed = ConnectionBackoff(
  1067. initialBackoff: 1.0,
  1068. maximumBackoff: 1.0,
  1069. multiplier: 1.0,
  1070. jitter: 0.0
  1071. )
  1072. }
  1073. private struct DoomedChannelError: Error {}
  1074. internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
  1075. internal var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
  1076. init(_ provider: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
  1077. self.provider = provider
  1078. }
  1079. func makeChannel(
  1080. managedBy connectionManager: ConnectionManager,
  1081. onEventLoop eventLoop: EventLoop,
  1082. connectTimeout: TimeAmount?,
  1083. logger: Logger
  1084. ) -> EventLoopFuture<Channel> {
  1085. return self.provider(connectionManager, eventLoop)
  1086. }
  1087. }