PoolManagerStateMachineTests.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. import NIOEmbedded
  20. import XCTest
  /// Tests for `PoolManagerStateMachine`: reserving streams (with and without an event loop
  /// preference) and shutting down from each of the state machine's states.
  class PoolManagerStateMachineTests: GRPCTestCase {
    /// Makes a `ConnectionPool` on the given event loop.
    ///
    /// The pool is created without a delegate, uses this test case's logger, and is given a
    /// stream lender whose callbacks do nothing.
    ///
    /// - Parameters:
    ///   - eventLoop: The event loop the pool is created on.
    ///   - maxWaiters: Forwarded to the pool as `maxWaiters`.
    ///   - maxConcurrentStreams: Forwarded to the pool as `assumedMaxConcurrentStreams`.
    ///   - loadThreshold: Forwarded to the pool as `reservationLoadThreshold`.
    ///   - connectionBackoff: Forwarded to the pool as `connectionBackoff`.
    ///   - makeChannel: Wrapped in a `HookedChannelProvider` and used as the channel provider.
    /// - Returns: The newly created pool.
    private func makeConnectionPool(
      on eventLoop: EventLoop,
      maxWaiters: Int = 100,
      maxConcurrentStreams: Int = 100,
      loadThreshold: Double = 0.9,
      connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
      makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
    ) -> ConnectionPool {
      return ConnectionPool(
        eventLoop: eventLoop,
        maxWaiters: maxWaiters,
        reservationLoadThreshold: loadThreshold,
        assumedMaxConcurrentStreams: maxConcurrentStreams,
        connectionBackoff: connectionBackoff,
        channelProvider: HookedChannelProvider(makeChannel),
        streamLender: HookedStreamLender(
          onReturnStreams: { _ in },
          onUpdateMaxAvailableStreams: { _ in }
        ),
        delegate: nil,
        logger: self.logger.wrapped
      )
    }

    /// Makes one pool per event loop in `group` and initializes each of them.
    ///
    /// The channel factory given to each pool traps if it is ever invoked.
    ///
    /// - Parameters:
    ///   - group: The group whose loops the pools are created on.
    ///   - connectionsPerPool: The number of connections each pool is initialized with.
    /// - Returns: The initialized pools, in the same order as `group.loops`.
    private func makeInitializedPools(
      group: EmbeddedEventLoopGroup,
      connectionsPerPool: Int = 1
    ) -> [ConnectionPool] {
      let pools = group.loops.map {
        self.makeConnectionPool(on: $0) { _, _ in fatalError() }
      }

      for pool in pools {
        // Honour the caller's request rather than always initializing a single connection.
        pool.initialize(connections: connectionsPerPool)
      }

      return pools
    }

    /// Builds a key for each pool from its position in `pools` and the ID of its event loop.
    private func makeConnectionPoolKeys(
      for pools: [ConnectionPool]
    ) -> [PoolManager.ConnectionPoolKey] {
      return pools.enumerated().map { index, pool in
        return .init(index: .init(index), eventLoopID: pool.eventLoop.id)
      }
    }

    /// Reserving with a preference for each loop in turn should yield the pool at the same index.
    func testReserveStreamOnPreferredEventLoop() {
      let group = EmbeddedEventLoopGroup(loops: 5)
      defer {
        XCTAssertNoThrow(try group.syncShutdownGracefully())
      }

      let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
      let keys = self.makeConnectionPoolKeys(for: pools)
      var state = PoolManagerStateMachine(
        .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
      )

      for (index, loop) in group.loops.enumerated() {
        let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: loop.id)
        reservePreferredLoop.assertSuccess {
          XCTAssertEqual($0, PoolManager.ConnectionPoolIndex(index))
        }
      }
    }

    /// A preference for a loop which no pool uses should still succeed with a valid pool index.
    func testReserveStreamOnPreferredEventLoopWhichNoPoolUses() {
      let group = EmbeddedEventLoopGroup(loops: 1)
      defer {
        XCTAssertNoThrow(try group.syncShutdownGracefully())
      }

      let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
      let keys = self.makeConnectionPoolKeys(for: pools)
      var state = PoolManagerStateMachine(
        .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
      )

      // This loop is not part of `group`, so none of the pools were created on it.
      let anotherLoop = EmbeddedEventLoop()
      let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: anotherLoop.id)
      reservePreferredLoop.assertSuccess {
        XCTAssert(pools.indices.contains($0.value))
      }
    }

    /// Without a preference the pool with the most available streams should be chosen.
    func testReserveStreamWithNoPreferenceReturnsPoolWithHighestAvailability() {
      let group = EmbeddedEventLoopGroup(loops: 5)
      defer {
        XCTAssertNoThrow(try group.syncShutdownGracefully())
      }

      let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
      let keys = self.makeConnectionPoolKeys(for: pools)
      var state = PoolManagerStateMachine(.inactive)
      state.activatePools(keyedBy: keys, assumingPerPoolCapacity: 100)

      // Reserve some streams: 2 * index streams from the pool at each index.
      for (index, loop) in group.loops.enumerated() {
        for _ in 0 ..< 2 * index {
          state.reserveStream(preferringPoolWithEventLoopID: loop.id).assertSuccess()
        }
      }

      // We expect pools[0] to be reserved.
      //     index:   0   1   2   3   4
      // available: 100  98  96  94  92
      state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
        XCTAssertEqual(poolIndex.value, 0)
      }

      // We expect pools[0] to be reserved again.
      //     index:   0   1   2   3   4
      // available:  99  98  96  94  92
      state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
        XCTAssertEqual(poolIndex.value, 0)
      }

      // Return some streams to pools[3].
      state.returnStreams(5, toPoolOnEventLoopWithID: pools[3].eventLoop.id)

      // As we returned streams to pools[3] we expect this to be the current state:
      //     index:   0   1   2   3   4
      // available:  98  98  96  99  92
      state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
        XCTAssertEqual(poolIndex.value, 3)
      }

      // Give an event loop preference for a pool which has more streams reserved.
      state.reserveStream(
        preferringPoolWithEventLoopID: pools[2].eventLoop.id
      ).assertSuccess { poolIndex in
        XCTAssertEqual(poolIndex.value, 2)
      }

      // Update the capacity for one pool, this makes it relatively more available.
      state.changeStreamCapacity(by: 900, forPoolOnEventLoopWithID: pools[4].eventLoop.id)

      // pools[4] has a bunch more streams now (the two reservations above are included):
      //     index:   0   1   2   3   4
      // available:  98  98  95  98 992
      state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
        XCTAssertEqual(poolIndex.value, 4)
      }
    }

    /// Reserving without a preference from an active state machine should succeed.
    func testReserveStreamWithNoEventLoopPreference() {
      let group = EmbeddedEventLoopGroup(loops: 1)
      defer {
        XCTAssertNoThrow(try group.syncShutdownGracefully())
      }

      let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
      let keys = self.makeConnectionPoolKeys(for: pools)
      var state = PoolManagerStateMachine(
        .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
      )

      let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: nil)
      reservePreferredLoop.assertSuccess()
    }

    /// Reserving from an inactive state machine should fail with `.notInitialized`.
    func testReserveStreamWhenInactive() {
      var state = PoolManagerStateMachine(.inactive)
      let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
      action.assertFailure { error in
        XCTAssertEqual(error, .notInitialized)
      }
    }

    /// Reserving while shutting down should fail with `.shutdown`.
    func testReserveStreamWhenShuttingDown() {
      let future = EmbeddedEventLoop().makeSucceededFuture(())
      var state = PoolManagerStateMachine(.shuttingDown(future))
      let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
      action.assertFailure { error in
        XCTAssertEqual(error, .shutdown)
      }
    }

    /// Reserving after shutdown should fail with `.shutdown`.
    func testReserveStreamWhenShutdown() {
      var state = PoolManagerStateMachine(.shutdown)
      let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
      action.assertFailure { error in
        XCTAssertEqual(error, .shutdown)
      }
    }

    /// Shutting down an inactive state machine should report `.alreadyShutdown`.
    func testShutdownWhenInactive() {
      let loop = EmbeddedEventLoop()
      let promise = loop.makePromise(of: Void.self)

      var state = PoolManagerStateMachine(.inactive)
      let action = state.shutdown(promise: promise)
      action.assertAlreadyShutdown()

      // Don't leak the promise.
      promise.succeed(())
    }

    /// Shutting down an active state machine should ask for the pools to be shut down.
    func testShutdownWhenActive() {
      let group = EmbeddedEventLoopGroup(loops: 5)
      defer {
        XCTAssertNoThrow(try group.syncShutdownGracefully())
      }

      let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
      let keys = self.makeConnectionPoolKeys(for: pools)
      var state = PoolManagerStateMachine(
        .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
      )

      // The promise is completed up front so that it is not leaked.
      let promise = group.loops[0].makePromise(of: Void.self)
      promise.succeed(())

      state.shutdown(promise: promise).assertShutdownPools()
    }

    /// Shutting down while already shutting down should hand back the in-flight future; once
    /// shutdown has completed a further shutdown should report `.alreadyShutdown`.
    func testShutdownWhenShuttingDown() {
      let loop = EmbeddedEventLoop()
      let future = loop.makeSucceededVoidFuture()
      var state = PoolManagerStateMachine(.shuttingDown(future))

      let promise = loop.makePromise(of: Void.self)
      promise.succeed(())

      let action = state.shutdown(promise: promise)
      action.assertAlreadyShuttingDown {
        // The very same future the state machine was created with should be returned.
        XCTAssert($0 === future)
      }

      // Fully shutdown.
      state.shutdownComplete()
      state.shutdown(promise: promise).assertAlreadyShutdown()
    }

    /// Shutting down an already shut down state machine should report `.alreadyShutdown`.
    func testShutdownWhenShutdown() {
      let loop = EmbeddedEventLoop()
      var state = PoolManagerStateMachine(.shutdown)

      let promise = loop.makePromise(of: Void.self)
      promise.succeed(())

      let action = state.shutdown(promise: promise)
      action.assertAlreadyShutdown()
    }
  }
  228. // MARK: - Test Helpers
  extension Result {
    /// Asserts that this result is `.success`, passing the wrapped value to `verify`.
    ///
    /// - Parameters:
    ///   - file: The file a failure is attributed to; defaults to the call site.
    ///   - line: The line a failure is attributed to; defaults to the call site.
    ///   - verify: Called with the success value for any further checks. Does nothing by default.
    internal func assertSuccess(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (Success) -> Void = { _ in }
    ) {
      switch self {
      case let .success(payload):
        verify(payload)
      case .failure:
        XCTFail("Expected '.success' but got '\(self)'", file: file, line: line)
      }
    }

    /// Asserts that this result is `.failure`, passing the wrapped error to `verify`.
    ///
    /// - Parameters:
    ///   - file: The file a failure is attributed to; defaults to the call site.
    ///   - line: The line a failure is attributed to; defaults to the call site.
    ///   - verify: Called with the error for any further checks. Does nothing by default.
    internal func assertFailure(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (Failure) -> Void = { _ in }
    ) {
      switch self {
      case let .failure(problem):
        verify(problem)
      case .success:
        XCTFail("Expected '.failure' but got '\(self)'", file: file, line: line)
      }
    }
  }
  extension PoolManagerStateMachine.ShutdownAction {
    /// Records a test failure unless this action is `.shutdownPools`.
    ///
    /// - Parameters:
    ///   - file: The file a failure is attributed to; defaults to the call site.
    ///   - line: The line a failure is attributed to; defaults to the call site.
    internal func assertShutdownPools(
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      guard case .shutdownPools = self else {
        XCTFail("Expected '.shutdownPools' but got '\(self)'", file: file, line: line)
        return
      }
    }

    /// Records a test failure unless this action is `.alreadyShuttingDown`; otherwise hands the
    /// associated future to `verify`.
    ///
    /// - Parameters:
    ///   - file: The file a failure is attributed to; defaults to the call site.
    ///   - line: The line a failure is attributed to; defaults to the call site.
    ///   - verify: Called with the associated future for further checks. Does nothing by default.
    internal func assertAlreadyShuttingDown(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (EventLoopFuture<Void>) -> Void = { _ in }
    ) {
      guard case let .alreadyShuttingDown(pending) = self else {
        XCTFail("Expected '.alreadyShuttingDown' but got '\(self)'", file: file, line: line)
        return
      }
      verify(pending)
    }

    /// Records a test failure unless this action is `.alreadyShutdown`.
    ///
    /// - Parameters:
    ///   - file: The file a failure is attributed to; defaults to the call site.
    ///   - line: The line a failure is attributed to; defaults to the call site.
    internal func assertAlreadyShutdown(file: StaticString = #filePath, line: UInt = #line) {
      guard case .alreadyShutdown = self else {
        XCTFail("Expected '.alreadyShutdown' but got '\(self)'", file: file, line: line)
        return
      }
    }
  }
  /// An `EventLoopGroup` of `EmbeddedEventLoop`s.
  ///
  /// Loops are handed out by `next()` in round-robin order.
  private final class EmbeddedEventLoopGroup: EventLoopGroup {
    /// The loops making up this group, created once in `init(loops:)`.
    internal let loops: [EmbeddedEventLoop]
    /// Guards reads and writes of `index` in `next()`.
    internal let lock = NIOLock()
    /// A running count of `next()` calls, used to pick the loop to hand out.
    internal var index = 0

    /// Creates a group containing `loops` freshly made embedded event loops.
    internal init(loops: Int) {
      var embedded: [EmbeddedEventLoop] = []
      for _ in 0 ..< loops {
        embedded.append(EmbeddedEventLoop())
      }
      self.loops = embedded
    }

    /// Returns the next loop, wrapping back to the first after the last one.
    internal func next() -> EventLoop {
      // Take the current counter value and bump it while holding the lock.
      let ticket = self.lock.withLock { () -> Int in
        defer { self.index += 1 }
        return self.index
      }
      let slot = ticket % self.loops.count
      return self.loops[slot]
    }

    /// Returns an iterator over the loops in this group.
    internal func makeIterator() -> EventLoopIterator {
      return EventLoopIterator(self.loops)
    }

    /// Asks every loop to shut down and then invokes `callback` synchronously on `queue`.
    ///
    /// `callback` receives the most recent error recorded from the loops' own shutdown
    /// callbacks at the point it is invoked, or `nil` if none was recorded.
    internal func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
      var lastFailure: Error?

      self.loops.forEach { loop in
        loop.shutdownGracefully(queue: queue) { outcome in
          // Only overwrite the recorded error when this loop actually reported one.
          lastFailure = outcome ?? lastFailure
        }
      }

      queue.sync {
        callback(lastFailure)
      }
    }
  }