ConnectionPoolTests.swift 26 KB


  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import Logging
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  21. final class ConnectionPoolTests: GRPCTestCase {
  22. private enum TestError: Error {
  23. case noChannelExpected
  24. }
  25. private var eventLoop: EmbeddedEventLoop!
  26. private var tearDownBlocks: [() throws -> Void] = []
  27. override func setUp() {
  28. super.setUp()
  29. self.eventLoop = EmbeddedEventLoop()
  30. }
  31. override func tearDown() {
  32. XCTAssertNoThrow(try self.eventLoop.close())
  33. self.tearDownBlocks.forEach { try? $0() }
  34. super.tearDown()
  35. }
  36. private func noChannelExpected(
  37. _: ConnectionManager,
  38. _ eventLoop: EventLoop,
  39. line: UInt = #line
  40. ) -> EventLoopFuture<Channel> {
  41. XCTFail("Channel unexpectedly created", line: line)
  42. return eventLoop.makeFailedFuture(TestError.noChannelExpected)
  43. }
  44. private func makePool(
  45. waiters: Int = 1000,
  46. reservationLoadThreshold: Double = 0.9,
  47. now: @escaping () -> NIODeadline = { .now() },
  48. onReservationReturned: @escaping (Int) -> Void = { _ in },
  49. onMaximumReservationsChange: @escaping (Int) -> Void = { _ in },
  50. channelProvider: ConnectionManagerChannelProvider
  51. ) -> ConnectionPool {
  52. return ConnectionPool(
  53. eventLoop: self.eventLoop,
  54. maxWaiters: waiters,
  55. reservationLoadThreshold: reservationLoadThreshold,
  56. assumedMaxConcurrentStreams: 100,
  57. channelProvider: channelProvider,
  58. streamLender: HookedStreamLender(
  59. onReturnStreams: onReservationReturned,
  60. onUpdateMaxAvailableStreams: onMaximumReservationsChange
  61. ),
  62. logger: self.logger.wrapped,
  63. now: now
  64. )
  65. }
  66. private func makePool(
  67. waiters: Int = 1000,
  68. makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
  69. ) -> ConnectionPool {
  70. return self.makePool(
  71. waiters: waiters,
  72. channelProvider: HookedChannelProvider(makeChannel)
  73. )
  74. }
  75. private func setUpPoolAndController(
  76. waiters: Int = 1000,
  77. reservationLoadThreshold: Double = 0.9,
  78. now: @escaping () -> NIODeadline = { .now() },
  79. onReservationReturned: @escaping (Int) -> Void = { _ in },
  80. onMaximumReservationsChange: @escaping (Int) -> Void = { _ in }
  81. ) -> (ConnectionPool, ChannelController) {
  82. let controller = ChannelController()
  83. let pool = self.makePool(
  84. waiters: waiters,
  85. reservationLoadThreshold: reservationLoadThreshold,
  86. now: now,
  87. onReservationReturned: onReservationReturned,
  88. onMaximumReservationsChange: onMaximumReservationsChange,
  89. channelProvider: controller
  90. )
  91. self.tearDownBlocks.append {
  92. let shutdown = pool.shutdown()
  93. self.eventLoop.run()
  94. XCTAssertNoThrow(try shutdown.wait())
  95. controller.finish()
  96. }
  97. return (pool, controller)
  98. }
  99. func testEmptyConnectionPool() {
  100. let pool = self.makePool {
  101. self.noChannelExpected($0, $1)
  102. }
  103. XCTAssertEqual(pool.sync.connections, 0)
  104. XCTAssertEqual(pool.sync.waiters, 0)
  105. XCTAssertEqual(pool.sync.availableStreams, 0)
  106. XCTAssertEqual(pool.sync.reservedStreams, 0)
  107. pool.initialize(connections: 20)
  108. XCTAssertEqual(pool.sync.connections, 20)
  109. XCTAssertEqual(pool.sync.waiters, 0)
  110. XCTAssertEqual(pool.sync.availableStreams, 0)
  111. XCTAssertEqual(pool.sync.reservedStreams, 0)
  112. XCTAssertNoThrow(try pool.shutdown().wait())
  113. }
  114. func testShutdownEmptyPool() {
  115. let pool = self.makePool {
  116. self.noChannelExpected($0, $1)
  117. }
  118. XCTAssertNoThrow(try pool.shutdown().wait())
  119. // Shutting down twice should also be fine.
  120. XCTAssertNoThrow(try pool.shutdown().wait())
  121. }
  122. func testMakeStreamWhenShutdown() {
  123. let pool = self.makePool {
  124. self.noChannelExpected($0, $1)
  125. }
  126. XCTAssertNoThrow(try pool.shutdown().wait())
  127. let stream = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  128. $0.eventLoop.makeSucceededVoidFuture()
  129. }
  130. XCTAssertThrowsError(try stream.wait()) { error in
  131. XCTAssertEqual(error as? ConnectionPoolError, .shutdown)
  132. }
  133. }
  134. func testMakeStreamWhenWaiterQueueIsFull() {
  135. let maxWaiters = 5
  136. let pool = self.makePool(waiters: maxWaiters) {
  137. self.noChannelExpected($0, $1)
  138. }
  139. let waiting = (0 ..< maxWaiters).map { _ in
  140. return pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  141. $0.eventLoop.makeSucceededVoidFuture()
  142. }
  143. }
  144. let tooManyWaiters = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  145. $0.eventLoop.makeSucceededVoidFuture()
  146. }
  147. XCTAssertThrowsError(try tooManyWaiters.wait()) { error in
  148. XCTAssertEqual(error as? ConnectionPoolError, .tooManyWaiters)
  149. }
  150. XCTAssertNoThrow(try pool.shutdown().wait())
  151. // All 'waiting' futures will be failed by the shutdown promise.
  152. for waiter in waiting {
  153. XCTAssertThrowsError(try waiter.wait()) { error in
  154. XCTAssertEqual(error as? ConnectionPoolError, .shutdown)
  155. }
  156. }
  157. }
  158. func testWaiterTimingOut() {
  159. let pool = self.makePool {
  160. self.noChannelExpected($0, $1)
  161. }
  162. let waiter = pool.makeStream(deadline: .uptimeNanoseconds(10), logger: self.logger.wrapped) {
  163. $0.eventLoop.makeSucceededVoidFuture()
  164. }
  165. XCTAssertEqual(pool.sync.waiters, 1)
  166. self.eventLoop.advanceTime(to: .uptimeNanoseconds(10))
  167. XCTAssertThrowsError(try waiter.wait()) { error in
  168. XCTAssertEqual(error as? ConnectionPoolError, .deadlineExceeded)
  169. }
  170. XCTAssertEqual(pool.sync.waiters, 0)
  171. }
  172. func testWaiterTimingOutInPast() {
  173. let pool = self.makePool {
  174. self.noChannelExpected($0, $1)
  175. }
  176. self.eventLoop.advanceTime(to: .uptimeNanoseconds(10))
  177. let waiter = pool.makeStream(deadline: .uptimeNanoseconds(5), logger: self.logger.wrapped) {
  178. $0.eventLoop.makeSucceededVoidFuture()
  179. }
  180. XCTAssertEqual(pool.sync.waiters, 1)
  181. self.eventLoop.run()
  182. XCTAssertThrowsError(try waiter.wait()) { error in
  183. XCTAssertEqual(error as? ConnectionPoolError, .deadlineExceeded)
  184. }
  185. XCTAssertEqual(pool.sync.waiters, 0)
  186. }
  187. func testMakeStreamTriggersChannelCreation() {
  188. let (pool, controller) = self.setUpPoolAndController()
  189. pool.initialize(connections: 1)
  190. XCTAssertEqual(pool.sync.connections, 1)
  191. // No channels yet.
  192. XCTAssertEqual(controller.count, 0)
  193. let waiter = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  194. $0.eventLoop.makeSucceededVoidFuture()
  195. }
  196. // Start creating the channel.
  197. self.eventLoop.run()
  198. // We should have been asked for a channel now.
  199. XCTAssertEqual(controller.count, 1)
  200. // The connection isn't ready yet though, so no streams available.
  201. XCTAssertEqual(pool.sync.availableStreams, 0)
  202. // Make the connection 'ready'.
  203. controller.connectChannel(atIndex: 0)
  204. controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: 10)
  205. // We have a multiplexer and a 'ready' connection.
  206. XCTAssertEqual(pool.sync.reservedStreams, 1)
  207. XCTAssertEqual(pool.sync.availableStreams, 9)
  208. XCTAssertEqual(pool.sync.waiters, 0)
  209. // Run the loop to create the stream, we need to fire the event too.
  210. self.eventLoop.run()
  211. XCTAssertNoThrow(try waiter.wait())
  212. controller.openStreamInChannel(atIndex: 0)
  213. // Now close the stream.
  214. controller.closeStreamInChannel(atIndex: 0)
  215. XCTAssertEqual(pool.sync.reservedStreams, 0)
  216. XCTAssertEqual(pool.sync.availableStreams, 10)
  217. }
  218. func testMakeStreamWhenConnectionIsAlreadyAvailable() {
  219. let (pool, controller) = self.setUpPoolAndController()
  220. pool.initialize(connections: 1)
  221. let waiter = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  222. $0.eventLoop.makeSucceededVoidFuture()
  223. }
  224. // Start creating the channel.
  225. self.eventLoop.run()
  226. XCTAssertEqual(controller.count, 1)
  227. // Fire up the connection.
  228. controller.connectChannel(atIndex: 0)
  229. controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: 10)
  230. // Run the loop to create the stream, we need to fire the stream creation event too.
  231. self.eventLoop.run()
  232. XCTAssertNoThrow(try waiter.wait())
  233. controller.openStreamInChannel(atIndex: 0)
  234. // Now we can create another stream, but as there's already an available stream on an active
  235. // connection we won't have to wait.
  236. XCTAssertEqual(pool.sync.waiters, 0)
  237. XCTAssertEqual(pool.sync.reservedStreams, 1)
  238. let notWaiting = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  239. $0.eventLoop.makeSucceededVoidFuture()
  240. }
  241. // Still no waiters.
  242. XCTAssertEqual(pool.sync.waiters, 0)
  243. XCTAssertEqual(pool.sync.reservedStreams, 2)
  244. // Run the loop to create the stream, we need to fire the stream creation event too.
  245. self.eventLoop.run()
  246. XCTAssertNoThrow(try notWaiting.wait())
  247. controller.openStreamInChannel(atIndex: 0)
  248. }
  249. func testMakeMoreWaitersThanConnectionCanHandle() {
  250. var returnedStreams: [Int] = []
  251. let (pool, controller) = self.setUpPoolAndController(onReservationReturned: {
  252. returnedStreams.append($0)
  253. })
  254. pool.initialize(connections: 1)
  255. // Enqueue twice as many waiters as the connection will be able to handle.
  256. let maxConcurrentStreams = 10
  257. let waiters = (0 ..< maxConcurrentStreams * 2).map { _ in
  258. return pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  259. $0.eventLoop.makeSucceededVoidFuture()
  260. }
  261. }
  262. XCTAssertEqual(pool.sync.waiters, 2 * maxConcurrentStreams)
  263. // Fire up the connection.
  264. self.eventLoop.run()
  265. controller.connectChannel(atIndex: 0)
  266. controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: maxConcurrentStreams)
  267. // We should have assigned a bunch of streams to waiters now.
  268. XCTAssertEqual(pool.sync.waiters, maxConcurrentStreams)
  269. XCTAssertEqual(pool.sync.reservedStreams, maxConcurrentStreams)
  270. XCTAssertEqual(pool.sync.availableStreams, 0)
  271. // Do the stream creation and make sure the first batch are succeeded.
  272. self.eventLoop.run()
  273. let firstBatch = waiters.prefix(maxConcurrentStreams)
  274. var others = waiters.dropFirst(maxConcurrentStreams)
  275. for waiter in firstBatch {
  276. XCTAssertNoThrow(try waiter.wait())
  277. controller.openStreamInChannel(atIndex: 0)
  278. }
  279. // Close a stream.
  280. controller.closeStreamInChannel(atIndex: 0)
  281. XCTAssertEqual(returnedStreams, [1])
  282. // We have another stream so a waiter should be succeeded.
  283. XCTAssertEqual(pool.sync.waiters, maxConcurrentStreams - 1)
  284. self.eventLoop.run()
  285. XCTAssertNoThrow(try others.popFirst()?.wait())
  286. // Shutdown the pool: the remaining waiters should be failed.
  287. let shutdown = pool.shutdown()
  288. self.eventLoop.run()
  289. XCTAssertNoThrow(try shutdown.wait())
  290. for waiter in others {
  291. XCTAssertThrowsError(try waiter.wait()) { error in
  292. XCTAssertEqual(error as? ConnectionPoolError, .shutdown)
  293. }
  294. }
  295. }
  296. func testDropConnectionWithOutstandingReservations() {
  297. var streamsReturned: [Int] = []
  298. let (pool, controller) = self.setUpPoolAndController(
  299. onReservationReturned: { streamsReturned.append($0) }
  300. )
  301. pool.initialize(connections: 1)
  302. let waiter = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  303. $0.eventLoop.makeSucceededVoidFuture()
  304. }
  305. // Start creating the channel.
  306. self.eventLoop.run()
  307. XCTAssertEqual(controller.count, 1)
  308. // Fire up the connection.
  309. controller.connectChannel(atIndex: 0)
  310. controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: 10)
  311. // Run the loop to create the stream, we need to fire the stream creation event too.
  312. self.eventLoop.run()
  313. XCTAssertNoThrow(try waiter.wait())
  314. controller.openStreamInChannel(atIndex: 0)
  315. // Create a handful of streams.
  316. XCTAssertEqual(pool.sync.availableStreams, 9)
  317. for _ in 0 ..< 5 {
  318. let notWaiting = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  319. $0.eventLoop.makeSucceededVoidFuture()
  320. }
  321. self.eventLoop.run()
  322. XCTAssertNoThrow(try notWaiting.wait())
  323. controller.openStreamInChannel(atIndex: 0)
  324. }
  325. XCTAssertEqual(pool.sync.availableStreams, 4)
  326. XCTAssertEqual(pool.sync.reservedStreams, 6)
  327. // Blast the connection away. We'll be notified about dropped reservations.
  328. XCTAssertEqual(streamsReturned, [])
  329. controller.throwError(ChannelError.ioOnClosedChannel, inChannelAtIndex: 0)
  330. controller.fireChannelInactiveForChannel(atIndex: 0)
  331. XCTAssertEqual(streamsReturned, [6])
  332. XCTAssertEqual(pool.sync.availableStreams, 0)
  333. XCTAssertEqual(pool.sync.reservedStreams, 0)
  334. }
  335. func testDropConnectionWithOutstandingReservationsAndWaiters() {
  336. var streamsReturned: [Int] = []
  337. let (pool, controller) = self.setUpPoolAndController(
  338. onReservationReturned: { streamsReturned.append($0) }
  339. )
  340. pool.initialize(connections: 1)
  341. // Reserve a bunch of streams.
  342. let waiters = (0 ..< 10).map { _ in
  343. return pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  344. $0.eventLoop.makeSucceededVoidFuture()
  345. }
  346. }
  347. // Connect and setup all the streams.
  348. self.eventLoop.run()
  349. controller.connectChannel(atIndex: 0)
  350. controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: 10)
  351. self.eventLoop.run()
  352. for waiter in waiters {
  353. XCTAssertNoThrow(try waiter.wait())
  354. controller.openStreamInChannel(atIndex: 0)
  355. }
  356. // All streams should be reserved.
  357. XCTAssertEqual(pool.sync.availableStreams, 0)
  358. XCTAssertEqual(pool.sync.reservedStreams, 10)
  359. // Add a waiter.
  360. XCTAssertEqual(pool.sync.waiters, 0)
  361. let waiter = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  362. $0.eventLoop.makeSucceededVoidFuture()
  363. }
  364. XCTAssertEqual(pool.sync.waiters, 1)
  365. // Now bork the connection. We'll be notified about the 10 dropped reservation but not the one
  366. // waiter .
  367. XCTAssertEqual(streamsReturned, [])
  368. controller.throwError(ChannelError.ioOnClosedChannel, inChannelAtIndex: 0)
  369. controller.fireChannelInactiveForChannel(atIndex: 0)
  370. XCTAssertEqual(streamsReturned, [10])
  371. // The connection dropped, let the reconnect kick in.
  372. self.eventLoop.run()
  373. XCTAssertEqual(controller.count, 2)
  374. controller.connectChannel(atIndex: 1)
  375. controller.sendSettingsToChannel(atIndex: 1, maxConcurrentStreams: 10)
  376. self.eventLoop.run()
  377. XCTAssertNoThrow(try waiter.wait())
  378. controller.openStreamInChannel(atIndex: 1)
  379. controller.closeStreamInChannel(atIndex: 1)
  380. XCTAssertEqual(streamsReturned, [10, 1])
  381. XCTAssertEqual(pool.sync.availableStreams, 10)
  382. XCTAssertEqual(pool.sync.reservedStreams, 0)
  383. }
  384. func testDeadlineExceededInSameTickAsSucceedingWaiters() {
  385. // deadline must be exceeded just as servicing waiter is done
  386. // - setup waiter with deadline x
  387. // - start connecting
  388. // - set time to x
  389. // - finish connecting
  390. let (pool, controller) = self.setUpPoolAndController(now: {
  391. return NIODeadline.uptimeNanoseconds(12)
  392. })
  393. pool.initialize(connections: 1)
  394. let waiter1 = pool.makeStream(deadline: .uptimeNanoseconds(10), logger: self.logger.wrapped) {
  395. $0.eventLoop.makeSucceededVoidFuture()
  396. }
  397. let waiter2 = pool.makeStream(deadline: .uptimeNanoseconds(15), logger: self.logger.wrapped) {
  398. $0.eventLoop.makeSucceededVoidFuture()
  399. }
  400. // Start creating the channel.
  401. self.eventLoop.run()
  402. XCTAssertEqual(controller.count, 1)
  403. // Fire up the connection.
  404. controller.connectChannel(atIndex: 0)
  405. controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: 10)
  406. // The deadline for the first waiter is already after 'now', so it'll fail with deadline
  407. // exceeded.
  408. self.eventLoop.run()
  409. // We need to advance the time to fire the timeout to fail the waiter.
  410. self.eventLoop.advanceTime(to: .uptimeNanoseconds(10))
  411. XCTAssertThrowsError(try waiter1.wait()) { error in
  412. XCTAssertEqual(error as? ConnectionPoolError, .deadlineExceeded)
  413. }
  414. self.eventLoop.run()
  415. XCTAssertNoThrow(try waiter2.wait())
  416. controller.openStreamInChannel(atIndex: 0)
  417. XCTAssertEqual(pool.sync.waiters, 0)
  418. XCTAssertEqual(pool.sync.reservedStreams, 1)
  419. XCTAssertEqual(pool.sync.availableStreams, 9)
  420. controller.closeStreamInChannel(atIndex: 0)
  421. XCTAssertEqual(pool.sync.waiters, 0)
  422. XCTAssertEqual(pool.sync.reservedStreams, 0)
  423. XCTAssertEqual(pool.sync.availableStreams, 10)
  424. }
  425. func testConnectionsAreBroughtUpAtAppropriateTimes() {
  426. let (pool, controller) = self.setUpPoolAndController(reservationLoadThreshold: 0.2)
  427. // We'll allow 3 connections and configure max concurrent streams to 10. With our reservation
  428. // threshold we'll bring up a new connection after enqueueing the 1st, 2nd and 4th waiters.
  429. pool.initialize(connections: 3)
  430. let maxConcurrentStreams = 10
  431. // No demand so all three connections are idle.
  432. XCTAssertEqual(pool.sync.idleConnections, 3)
  433. let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  434. $0.eventLoop.makeSucceededVoidFuture()
  435. }
  436. // demand=1, available=0, load=infinite, one connection should be non-idle
  437. XCTAssertEqual(pool.sync.idleConnections, 2)
  438. // Connect the first channel and write the first settings frame; this allows us to lower the
  439. // default max concurrent streams value (from 100).
  440. self.eventLoop.run()
  441. controller.connectChannel(atIndex: 0)
  442. controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: maxConcurrentStreams)
  443. self.eventLoop.run()
  444. XCTAssertNoThrow(try w1.wait())
  445. controller.openStreamInChannel(atIndex: 0)
  446. let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  447. $0.eventLoop.makeSucceededVoidFuture()
  448. }
  449. self.eventLoop.run()
  450. XCTAssertNoThrow(try w2.wait())
  451. controller.openStreamInChannel(atIndex: 0)
  452. // demand=2, available=10, load=0.2; only one idle connection now.
  453. XCTAssertEqual(pool.sync.idleConnections, 1)
  454. // Add more demand before the second connection comes up.
  455. let w3 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  456. $0.eventLoop.makeSucceededVoidFuture()
  457. }
  458. // demand=3, available=20, load=0.15; still one idle connection.
  459. XCTAssertEqual(pool.sync.idleConnections, 1)
  460. // Connection the next channel
  461. self.eventLoop.run()
  462. controller.connectChannel(atIndex: 1)
  463. controller.sendSettingsToChannel(atIndex: 1, maxConcurrentStreams: maxConcurrentStreams)
  464. XCTAssertNoThrow(try w3.wait())
  465. controller.openStreamInChannel(atIndex: 1)
  466. }
  467. func testQuiescingConnectionIsReplaced() {
  468. var reservationsReturned: [Int] = []
  469. let (pool, controller) = self.setUpPoolAndController(onReservationReturned: {
  470. reservationsReturned.append($0)
  471. })
  472. pool.initialize(connections: 1)
  473. XCTAssertEqual(pool.sync.connections, 1)
  474. let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  475. $0.eventLoop.makeSucceededVoidFuture()
  476. }
  477. // Start creating the channel.
  478. self.eventLoop.run()
  479. // Make the connection 'ready'.
  480. controller.connectChannel(atIndex: 0)
  481. controller.sendSettingsToChannel(atIndex: 0)
  482. // Run the loop to create the stream.
  483. self.eventLoop.run()
  484. XCTAssertNoThrow(try w1.wait())
  485. controller.openStreamInChannel(atIndex: 0)
  486. // One stream reserved by 'w1' on the only connection in the pool (which isn't idle).
  487. XCTAssertEqual(pool.sync.reservedStreams, 1)
  488. XCTAssertEqual(pool.sync.connections, 1)
  489. XCTAssertEqual(pool.sync.idleConnections, 0)
  490. // Quiesce the connection. It should be punted from the pool and any active RPCs allowed to run
  491. // their course. A new (idle) connection should replace it in the pool.
  492. controller.sendGoAwayToChannel(atIndex: 0)
  493. // The quiescing connection had 1 stream reserved, it's now returned to the outer pool and we
  494. // have a new idle connection in place of the old one.
  495. XCTAssertEqual(reservationsReturned, [1])
  496. XCTAssertEqual(pool.sync.reservedStreams, 0)
  497. XCTAssertEqual(pool.sync.availableStreams, 0)
  498. XCTAssertEqual(pool.sync.idleConnections, 1)
  499. // Ask for another stream: this will be on the new idle connection.
  500. let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
  501. $0.eventLoop.makeSucceededVoidFuture()
  502. }
  503. self.eventLoop.run()
  504. XCTAssertEqual(controller.count, 2)
  505. // Make the connection 'ready'.
  506. controller.connectChannel(atIndex: 1)
  507. controller.sendSettingsToChannel(atIndex: 1)
  508. self.eventLoop.run()
  509. XCTAssertNoThrow(try w2.wait())
  510. controller.openStreamInChannel(atIndex: 1)
  511. XCTAssertEqual(pool.sync.reservedStreams, 1)
  512. XCTAssertEqual(pool.sync.availableStreams, 99)
  513. // Return a stream for the _quiescing_ connection: nothing should change in the pool.
  514. controller.closeStreamInChannel(atIndex: 0)
  515. XCTAssertEqual(pool.sync.reservedStreams, 1)
  516. XCTAssertEqual(pool.sync.availableStreams, 99)
  517. // Return a stream for the new connection.
  518. controller.closeStreamInChannel(atIndex: 1)
  519. XCTAssertEqual(reservationsReturned, [1, 1])
  520. XCTAssertEqual(pool.sync.reservedStreams, 0)
  521. XCTAssertEqual(pool.sync.availableStreams, 100)
  522. }
  523. }
  524. // MARK: - Helpers
  525. internal final class ChannelController {
  526. private var channels: [EmbeddedChannel] = []
  527. internal var count: Int {
  528. return self.channels.count
  529. }
  530. internal func finish() {
  531. while let channel = self.channels.popLast() {
  532. // We're okay with this throwing: some channels are left in a bad state (i.e. with errors).
  533. _ = try? channel.finish()
  534. }
  535. }
  536. private func isValidIndex(
  537. _ index: Int,
  538. file: StaticString = #file,
  539. line: UInt = #line
  540. ) -> Bool {
  541. let isValid = self.channels.indices.contains(index)
  542. XCTAssertTrue(isValid, "Invalid connection index '\(index)'", file: file, line: line)
  543. return isValid
  544. }
  545. internal func connectChannel(
  546. atIndex index: Int,
  547. file: StaticString = #file,
  548. line: UInt = #line
  549. ) {
  550. guard self.isValidIndex(index, file: file, line: line) else { return }
  551. XCTAssertNoThrow(
  552. try self.channels[index].connect(to: .init(unixDomainSocketPath: "/")),
  553. file: file,
  554. line: line
  555. )
  556. }
  557. internal func fireChannelInactiveForChannel(
  558. atIndex index: Int,
  559. file: StaticString = #file,
  560. line: UInt = #line
  561. ) {
  562. guard self.isValidIndex(index, file: file, line: line) else { return }
  563. self.channels[index].pipeline.fireChannelInactive()
  564. }
  565. internal func throwError(
  566. _ error: Error,
  567. inChannelAtIndex index: Int,
  568. file: StaticString = #file,
  569. line: UInt = #line
  570. ) {
  571. guard self.isValidIndex(index, file: file, line: line) else { return }
  572. self.channels[index].pipeline.fireErrorCaught(error)
  573. }
  574. internal func sendSettingsToChannel(
  575. atIndex index: Int,
  576. maxConcurrentStreams: Int = 100,
  577. file: StaticString = #file,
  578. line: UInt = #line
  579. ) {
  580. guard self.isValidIndex(index, file: file, line: line) else { return }
  581. let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
  582. let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
  583. XCTAssertNoThrow(try self.channels[index].writeInbound(settingsFrame), file: file, line: line)
  584. }
  585. internal func sendGoAwayToChannel(
  586. atIndex index: Int,
  587. file: StaticString = #file,
  588. line: UInt = #line
  589. ) {
  590. guard self.isValidIndex(index, file: file, line: line) else { return }
  591. let goAwayFrame = HTTP2Frame(
  592. streamID: .rootStream,
  593. payload: .goAway(lastStreamID: .maxID, errorCode: .noError, opaqueData: nil)
  594. )
  595. XCTAssertNoThrow(try self.channels[index].writeInbound(goAwayFrame), file: file, line: line)
  596. }
  597. internal func openStreamInChannel(
  598. atIndex index: Int,
  599. file: StaticString = #file,
  600. line: UInt = #line
  601. ) {
  602. guard self.isValidIndex(index, file: file, line: line) else { return }
  603. // The details don't matter here.
  604. let event = NIOHTTP2StreamCreatedEvent(
  605. streamID: .rootStream,
  606. localInitialWindowSize: nil,
  607. remoteInitialWindowSize: nil
  608. )
  609. self.channels[index].pipeline.fireUserInboundEventTriggered(event)
  610. }
  611. internal func closeStreamInChannel(
  612. atIndex index: Int,
  613. file: StaticString = #file,
  614. line: UInt = #line
  615. ) {
  616. guard self.isValidIndex(index, file: file, line: line) else { return }
  617. // The details don't matter here.
  618. let event = StreamClosedEvent(streamID: .rootStream, reason: nil)
  619. self.channels[index].pipeline.fireUserInboundEventTriggered(event)
  620. }
  621. }
  622. extension ChannelController: ConnectionManagerChannelProvider {
  623. internal func makeChannel(
  624. managedBy connectionManager: ConnectionManager,
  625. onEventLoop eventLoop: EventLoop,
  626. connectTimeout: TimeAmount?,
  627. logger: Logger
  628. ) -> EventLoopFuture<Channel> {
  629. let channel = EmbeddedChannel(loop: eventLoop as! EmbeddedEventLoop)
  630. self.channels.append(channel)
  631. let multiplexer = HTTP2StreamMultiplexer(
  632. mode: .client,
  633. channel: channel,
  634. inboundStreamInitializer: nil
  635. )
  636. let idleHandler = GRPCIdleHandler(
  637. connectionManager: connectionManager,
  638. multiplexer: multiplexer,
  639. idleTimeout: .minutes(5),
  640. keepalive: ClientConnectionKeepalive(),
  641. logger: logger
  642. )
  643. XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(idleHandler))
  644. XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(multiplexer))
  645. return eventLoop.makeSucceededFuture(channel)
  646. }
  647. }
  648. internal struct HookedStreamLender: StreamLender {
  649. internal var onReturnStreams: (Int) -> Void
  650. internal var onUpdateMaxAvailableStreams: (Int) -> Void
  651. internal func returnStreams(_ count: Int, to pool: ConnectionPool) {
  652. self.onReturnStreams(count)
  653. }
  654. internal func changeStreamCapacity(by max: Int, for pool: ConnectionPool) {
  655. self.onUpdateMaxAvailableStreams(max)
  656. }
  657. }