PoolManagerStateMachineTests.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOConcurrencyHelpers
  17. import NIOCore
  18. import NIOEmbedded
  19. import XCTest
  20. @testable import GRPC
  21. class PoolManagerStateMachineTests: GRPCTestCase {
  /// Builds a `ConnectionPool` bound to `eventLoop` for use by the tests in this file.
  ///
  /// The pool is created with a minimum of zero connections, no delegate, the test case's logger
  /// and a stream lender whose callbacks do nothing.
  ///
  /// - Parameters:
  ///   - eventLoop: The event loop the pool is created on.
  ///   - maxWaiters: Forwarded as the pool's `maxWaiters`.
  ///   - maxConcurrentStreams: Forwarded as the pool's `assumedMaxConcurrentStreams`.
  ///   - loadThreshold: Forwarded as the pool's `reservationLoadThreshold`.
  ///   - connectionBackoff: Forwarded as the pool's `connectionBackoff`.
  ///   - makeChannel: Wrapped in a `HookedChannelProvider` and used as the pool's channel provider.
  /// - Returns: The newly created pool.
  private func makeConnectionPool(
    on eventLoop: EventLoop,
    maxWaiters: Int = 100,
    maxConcurrentStreams: Int = 100,
    loadThreshold: Double = 0.9,
    connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
    makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
  ) -> ConnectionPool {
    // Assemble the hooked collaborators up front so the initializer call stays readable.
    let channelProvider = HookedChannelProvider(makeChannel)
    let streamLender = HookedStreamLender(
      onReturnStreams: { _ in },
      onUpdateMaxAvailableStreams: { _ in }
    )

    let pool = ConnectionPool(
      eventLoop: eventLoop,
      maxWaiters: maxWaiters,
      minConnections: 0,
      reservationLoadThreshold: loadThreshold,
      assumedMaxConcurrentStreams: maxConcurrentStreams,
      connectionBackoff: connectionBackoff,
      channelProvider: channelProvider,
      streamLender: streamLender,
      delegate: nil,
      logger: self.logger
    )
    return pool
  }
  /// Creates one connection pool per event loop in `group` and initializes each of them.
  ///
  /// The channel factory given to every pool calls `fatalError()`, so a test which causes a pool
  /// to actually create a channel will crash.
  ///
  /// - Parameters:
  ///   - group: The group whose loops each get their own pool.
  ///   - connectionsPerPool: The number of connections each pool is initialized with.
  /// - Returns: The initialized pools, in the same order as `group.loops`.
  private func makeInitializedPools(
    group: EmbeddedEventLoopGroup,
    connectionsPerPool: Int = 1
  ) -> [ConnectionPool] {
    let pools = group.loops.map { loop in
      self.makeConnectionPool(on: loop) { _, _ in fatalError() }
    }

    for pool in pools {
      // Honor the caller's request; this was previously hard-coded to 1, which silently ignored
      // the `connectionsPerPool` argument.
      pool.initialize(connections: connectionsPerPool)
    }

    return pools
  }
  /// Builds one pool key per pool, pairing each pool's position in `pools` with the ID of the
  /// event loop that pool runs on.
  ///
  /// - Parameter pools: The pools to create keys for.
  /// - Returns: The keys, in the same order as `pools`.
  private func makeConnectionPoolKeys(
    for pools: [ConnectionPool]
  ) -> [PoolManager.ConnectionPoolKey] {
    var keys: [PoolManager.ConnectionPoolKey] = []

    for (offset, pool) in pools.enumerated() {
      let key = PoolManager.ConnectionPoolKey(
        index: .init(offset),
        eventLoopID: pool.eventLoop.id
      )
      keys.append(key)
    }

    return keys
  }
  /// Reserving a stream while preferring a given event loop must select the pool keyed by that
  /// loop.
  func testReserveStreamOnPreferredEventLoop() {
    let group = EmbeddedEventLoopGroup(loops: 5)
    defer {
      XCTAssertNoThrow(try group.syncShutdownGracefully())
    }

    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
    var stateMachine = PoolManagerStateMachine(
      .active(
        .init(
          poolKeys: self.makeConnectionPoolKeys(for: pools),
          assumedMaxAvailableStreamsPerPool: 100,
          statsTask: nil
        )
      )
    )

    // Pools were created in loop order, so the pool index matches the loop's position.
    for (offset, eventLoop) in group.loops.enumerated() {
      let expectedIndex = PoolManager.ConnectionPoolIndex(offset)
      let reservation = stateMachine.reserveStream(preferringPoolWithEventLoopID: eventLoop.id)
      reservation.assertSuccess { reservedIndex in
        XCTAssertEqual(reservedIndex, expectedIndex)
      }
    }
  }
  /// Preferring an event loop that no pool is keyed by must still succeed and yield the index of
  /// one of the known pools.
  func testReserveStreamOnPreferredEventLoopWhichNoPoolUses() {
    let group = EmbeddedEventLoopGroup(loops: 1)
    defer {
      XCTAssertNoThrow(try group.syncShutdownGracefully())
    }

    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
    var stateMachine = PoolManagerStateMachine(
      .active(
        .init(
          poolKeys: self.makeConnectionPoolKeys(for: pools),
          assumedMaxAvailableStreamsPerPool: 100,
          statsTask: nil
        )
      )
    )

    // This loop is not part of `group`, so none of the pool keys refer to it.
    let unrelatedLoop = EmbeddedEventLoop()
    stateMachine.reserveStream(
      preferringPoolWithEventLoopID: unrelatedLoop.id
    ).assertSuccess { reservedIndex in
      XCTAssert(pools.indices.contains(reservedIndex.value))
    }
  }
  /// Without an event loop preference, the state machine must pick the pool with the most
  /// available streams; with a preference, it must pick the preferred pool even when that pool is
  /// more heavily loaded.
  ///
  /// The tables in the comments below track the number of available streams this test expects
  /// each pool to have, starting from the assumed per-pool capacity of 100.
  func testReserveStreamWithNoPreferenceReturnsPoolWithHighestAvailability() {
    let group = EmbeddedEventLoopGroup(loops: 5)
    defer {
      XCTAssertNoThrow(try group.syncShutdownGracefully())
    }

    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
    let poolKeys = self.makeConnectionPoolKeys(for: pools)

    var stateMachine = PoolManagerStateMachine(.inactive)
    stateMachine.activatePools(keyedBy: poolKeys, assumingPerPoolCapacity: 100, statsTask: nil)

    // Load the pools unevenly: the pool at position `i` gets `2 * i` reservations.
    for (offset, eventLoop) in group.loops.enumerated() {
      let reservationCount = 2 * offset
      for _ in 0 ..< reservationCount {
        let reservation = stateMachine.reserveStream(preferringPoolWithEventLoopID: eventLoop.id)
        reservation.assertSuccess()
      }
    }

    // pools[0] has the most available streams, so it should be picked.
    //
    //     index:       0   1   2   3   4
    //     available: 100  98  96  94  92
    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { reserved in
      XCTAssertEqual(reserved.value, 0)
    }

    // pools[0] is still the most available, so it should be picked again.
    //
    //     index:       0   1   2   3   4
    //     available:  99  98  96  94  92
    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { reserved in
      XCTAssertEqual(reserved.value, 0)
    }

    // Hand five streams back to pools[3], making it the most available pool.
    stateMachine.returnStreams(5, toPoolOnEventLoopWithID: pools[3].eventLoop.id)

    //     index:       0   1   2   3   4
    //     available:  98  98  96  99  92
    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { reserved in
      XCTAssertEqual(reserved.value, 3)
    }

    // An explicit preference wins even though pools[2] has more streams reserved than others.
    //
    //     index:       0   1   2   3   4
    //     available:  98  98  96  98  92
    let preferredLoopID = pools[2].eventLoop.id
    stateMachine.reserveStream(
      preferringPoolWithEventLoopID: preferredLoopID
    ).assertSuccess { reserved in
      XCTAssertEqual(reserved.value, 2)
    }

    // Grow the capacity of pools[4] so that it becomes, by far, the most available pool.
    stateMachine.changeStreamCapacity(by: 900, forPoolOnEventLoopWithID: pools[4].eventLoop.id)

    //     index:       0   1   2   3   4
    //     available:  98  98  95  98 992
    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { reserved in
      XCTAssertEqual(reserved.value, 4)
    }
  }
  /// Reserving a stream from an active state machine without any event loop preference succeeds.
  func testReserveStreamWithNoEventLoopPreference() {
    let group = EmbeddedEventLoopGroup(loops: 1)
    defer {
      XCTAssertNoThrow(try group.syncShutdownGracefully())
    }

    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
    var stateMachine = PoolManagerStateMachine(
      .active(
        .init(
          poolKeys: self.makeConnectionPoolKeys(for: pools),
          assumedMaxAvailableStreamsPerPool: 100,
          statsTask: nil
        )
      )
    )

    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess()
  }
  /// Reserving a stream from an inactive state machine fails with `.notInitialized`.
  func testReserveStreamWhenInactive() {
    var stateMachine = PoolManagerStateMachine(.inactive)

    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertFailure {
      XCTAssertEqual($0, .notInitialized)
    }
  }
  /// Reserving a stream from a state machine which is shutting down fails with `.shutdown`.
  func testReserveStreamWhenShuttingDown() {
    let shutdownFuture = EmbeddedEventLoop().makeSucceededFuture(())
    var stateMachine = PoolManagerStateMachine(.shuttingDown(shutdownFuture))

    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertFailure {
      XCTAssertEqual($0, .shutdown)
    }
  }
  /// Reserving a stream from a state machine which has already shut down fails with `.shutdown`.
  func testReserveStreamWhenShutdown() {
    var stateMachine = PoolManagerStateMachine(.shutdown)

    stateMachine.reserveStream(preferringPoolWithEventLoopID: nil).assertFailure {
      XCTAssertEqual($0, .shutdown)
    }
  }
  /// Shutting down a state machine which was never activated reports `.alreadyShutdown`.
  func testShutdownWhenInactive() {
    let eventLoop = EmbeddedEventLoop()
    let shutdownPromise = eventLoop.makePromise(of: Void.self)
    // Complete the promise on the way out so that it isn't leaked.
    defer {
      shutdownPromise.succeed(())
    }

    var stateMachine = PoolManagerStateMachine(.inactive)
    stateMachine.shutdown(promise: shutdownPromise).assertAlreadyShutdown()
  }
  /// Shutting down an active state machine asks the caller to shut down the pools.
  func testShutdownWhenActive() {
    let group = EmbeddedEventLoopGroup(loops: 5)
    defer {
      XCTAssertNoThrow(try group.syncShutdownGracefully())
    }

    let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
    var stateMachine = PoolManagerStateMachine(
      .active(
        .init(
          poolKeys: self.makeConnectionPoolKeys(for: pools),
          assumedMaxAvailableStreamsPerPool: 100,
          statsTask: nil
        )
      )
    )

    // The promise is completed before it is handed over to the state machine.
    let shutdownPromise = group.loops[0].makePromise(of: Void.self)
    shutdownPromise.succeed(())

    let action = stateMachine.shutdown(promise: shutdownPromise)
    action.assertShutdownPools()
  }
  /// Shutting down while a shutdown is already in progress hands back the existing shutdown
  /// future; once the shutdown has completed, further attempts report `.alreadyShutdown`.
  func testShutdownWhenShuttingDown() {
    let eventLoop = EmbeddedEventLoop()
    let inProgressShutdown = eventLoop.makeSucceededVoidFuture()
    var stateMachine = PoolManagerStateMachine(.shuttingDown(inProgressShutdown))

    let shutdownPromise = eventLoop.makePromise(of: Void.self)
    shutdownPromise.succeed(())

    // The very same future the state machine was created with must be returned.
    stateMachine.shutdown(promise: shutdownPromise).assertAlreadyShuttingDown { returnedFuture in
      XCTAssert(returnedFuture === inProgressShutdown)
    }

    // Finish shutting down, then try once more.
    stateMachine.shutdownComplete()
    let actionAfterCompletion = stateMachine.shutdown(promise: shutdownPromise)
    actionAfterCompletion.assertAlreadyShutdown()
  }
  /// Shutting down a state machine which has already shut down reports `.alreadyShutdown`.
  func testShutdownWhenShutdown() {
    let eventLoop = EmbeddedEventLoop()
    var stateMachine = PoolManagerStateMachine(.shutdown)

    let shutdownPromise = eventLoop.makePromise(of: Void.self)
    shutdownPromise.succeed(())

    stateMachine.shutdown(promise: shutdownPromise).assertAlreadyShutdown()
  }
  228. }
  229. // MARK: - Test Helpers
  extension Result {
    /// Asserts that this result is a `.success`, recording a test failure otherwise.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    ///   - verify: Called with the success value so the caller can make further assertions.
    ///     It is not called when the result is a `.failure`.
    internal func assertSuccess(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (Success) -> Void = { _ in }
    ) {
      switch self {
      case let .success(value):
        verify(value)
      case .failure:
        XCTFail("Expected '.success' but got '\(self)'", file: file, line: line)
      }
    }

    /// Asserts that this result is a `.failure`, recording a test failure otherwise.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    ///   - verify: Called with the error so the caller can make further assertions.
    ///     It is not called when the result is a `.success`.
    internal func assertFailure(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (Failure) -> Void = { _ in }
    ) {
      switch self {
      case let .failure(error):
        verify(error)
      case .success:
        XCTFail("Expected '.failure' but got '\(self)'", file: file, line: line)
      }
    }
  }
  extension PoolManagerStateMachine.ShutdownAction {
    /// Asserts that this action is `.shutdownPools`, recording a test failure otherwise.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    internal func assertShutdownPools(
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      guard case .shutdownPools = self else {
        XCTFail("Expected '.shutdownPools' but got '\(self)'", file: file, line: line)
        return
      }
    }

    /// Asserts that this action is `.alreadyShuttingDown`, recording a test failure otherwise.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    ///   - verify: Called with the future carried by the action so the caller can make further
    ///     assertions. It is not called when the action is anything else.
    internal func assertAlreadyShuttingDown(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (EventLoopFuture<Void>) -> Void = { _ in }
    ) {
      guard case let .alreadyShuttingDown(shutdownFuture) = self else {
        XCTFail("Expected '.alreadyShuttingDown' but got '\(self)'", file: file, line: line)
        return
      }
      verify(shutdownFuture)
    }

    /// Asserts that this action is `.alreadyShutdown`, recording a test failure otherwise.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    internal func assertAlreadyShutdown(file: StaticString = #filePath, line: UInt = #line) {
      guard case .alreadyShutdown = self else {
        XCTFail("Expected '.alreadyShutdown' but got '\(self)'", file: file, line: line)
        return
      }
    }
  }
  /// An `EventLoopGroup` made up of a fixed set of `EmbeddedEventLoop`s.
  ///
  /// `next()` hands the loops out in order, wrapping around once the last loop has been returned.
  private final class EmbeddedEventLoopGroup: EventLoopGroup {
    /// The loops owned by this group, in creation order.
    internal let loops: [EmbeddedEventLoop]
    /// Guards `index`.
    internal let lock = NIOLock()
    /// The number of times `next()` has been called.
    internal var index = 0

    /// Creates a group containing `loops` freshly created embedded event loops.
    internal init(loops: Int) {
      var created: [EmbeddedEventLoop] = []
      for _ in 0 ..< loops {
        created.append(EmbeddedEventLoop())
      }
      self.loops = created
    }

    /// Returns the next loop in round-robin order.
    internal func next() -> EventLoop {
      // Read the current position and advance it while holding the lock.
      let position: Int = self.lock.withLock {
        defer { self.index += 1 }
        return self.index
      }
      let wrapped = position % self.loops.count
      return self.loops[wrapped]
    }

    /// Returns an iterator over every loop in the group.
    internal func makeIterator() -> EventLoopIterator {
      return EventLoopIterator(self.loops)
    }

    /// Shuts down every loop in the group and then invokes `callback` on `queue`.
    ///
    /// - Parameters:
    ///   - queue: Passed to each loop's `shutdownGracefully` and used to run `callback`.
    ///   - callback: Receives the most recently recorded error reported by the loops' shutdown
    ///     callbacks, or `nil` if none had been recorded by the time it runs.
    internal func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
      var lastError: Error?

      for loop in self.loops {
        loop.shutdownGracefully(queue: queue) { error in
          // Only overwrite the recorded error when this loop actually reported one.
          guard let error = error else {
            return
          }
          lastError = error
        }
      }

      queue.sync {
        callback(lastError)
      }
    }
  }