PoolManagerStateMachineTests.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. import NIOEmbedded
  20. import XCTest
  21. class PoolManagerStateMachineTests: GRPCTestCase {
  22. private func makeConnectionPool(
  23. on eventLoop: EventLoop,
  24. maxWaiters: Int = 100,
  25. maxConcurrentStreams: Int = 100,
  26. loadThreshold: Double = 0.9,
  27. makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
  28. ) -> ConnectionPool {
  29. return ConnectionPool(
  30. eventLoop: eventLoop,
  31. maxWaiters: maxWaiters,
  32. reservationLoadThreshold: loadThreshold,
  33. assumedMaxConcurrentStreams: maxConcurrentStreams,
  34. channelProvider: HookedChannelProvider(makeChannel),
  35. streamLender: HookedStreamLender(
  36. onReturnStreams: { _ in },
  37. onUpdateMaxAvailableStreams: { _ in }
  38. ),
  39. logger: self.logger.wrapped
  40. )
  41. }
  42. private func makeInitializedPools(
  43. group: EmbeddedEventLoopGroup,
  44. connectionsPerPool: Int = 1
  45. ) -> [ConnectionPool] {
  46. let pools = group.loops.map {
  47. self.makeConnectionPool(on: $0) { _, _ in fatalError() }
  48. }
  49. for pool in pools {
  50. pool.initialize(connections: 1)
  51. }
  52. return pools
  53. }
  54. private func makeConnectionPoolKeys(
  55. for pools: [ConnectionPool]
  56. ) -> [PoolManager.ConnectionPoolKey] {
  57. return pools.enumerated().map { index, pool in
  58. return .init(index: .init(index), eventLoopID: pool.eventLoop.id)
  59. }
  60. }
  61. func testReserveStreamOnPreferredEventLoop() {
  62. let group = EmbeddedEventLoopGroup(loops: 5)
  63. defer {
  64. XCTAssertNoThrow(try group.syncShutdownGracefully())
  65. }
  66. let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
  67. let keys = self.makeConnectionPoolKeys(for: pools)
  68. var state = PoolManagerStateMachine(
  69. .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
  70. )
  71. for (index, loop) in group.loops.enumerated() {
  72. let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: loop.id)
  73. reservePreferredLoop.assertSuccess {
  74. XCTAssertEqual($0, PoolManager.ConnectionPoolIndex(index))
  75. }
  76. }
  77. }
  78. func testReserveStreamOnPreferredEventLoopWhichNoPoolUses() {
  79. let group = EmbeddedEventLoopGroup(loops: 1)
  80. defer {
  81. XCTAssertNoThrow(try group.syncShutdownGracefully())
  82. }
  83. let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
  84. let keys = self.makeConnectionPoolKeys(for: pools)
  85. var state = PoolManagerStateMachine(
  86. .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
  87. )
  88. let anotherLoop = EmbeddedEventLoop()
  89. let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: anotherLoop.id)
  90. reservePreferredLoop.assertSuccess {
  91. XCTAssert((0 ..< pools.count).contains($0.value))
  92. }
  93. }
  94. func testReserveStreamWithNoPreferenceReturnsPoolWithHighestAvailability() {
  95. let group = EmbeddedEventLoopGroup(loops: 5)
  96. defer {
  97. XCTAssertNoThrow(try group.syncShutdownGracefully())
  98. }
  99. let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
  100. let keys = self.makeConnectionPoolKeys(for: pools)
  101. var state = PoolManagerStateMachine(.inactive)
  102. state.activatePools(keyedBy: keys, assumingPerPoolCapacity: 100)
  103. // Reserve some streams.
  104. for (index, loop) in group.loops.enumerated() {
  105. for _ in 0 ..< 2 * index {
  106. state.reserveStream(preferringPoolWithEventLoopID: loop.id).assertSuccess()
  107. }
  108. }
  109. // We expect pools[0] to be reserved.
  110. // index: 0 1 2 3 4
  111. // available: 100 98 96 94 92
  112. state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
  113. XCTAssertEqual(poolIndex.value, 0)
  114. }
  115. // We expect pools[0] to be reserved again.
  116. // index: 0 1 2 3 4
  117. // available: 99 98 96 94 92
  118. state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
  119. XCTAssertEqual(poolIndex.value, 0)
  120. }
  121. // Return some streams to pools[3].
  122. state.returnStreams(5, toPoolOnEventLoopWithID: pools[3].eventLoop.id)
  123. // As we returned streams to pools[3] we expect this to be the current state:
  124. // index: 0 1 2 3 4
  125. // available: 98 98 96 99 92
  126. state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
  127. XCTAssertEqual(poolIndex.value, 3)
  128. }
  129. // Give an event loop preference for a pool which has more streams reserved.
  130. state.reserveStream(
  131. preferringPoolWithEventLoopID: pools[2].eventLoop.id
  132. ).assertSuccess { poolIndex in
  133. XCTAssertEqual(poolIndex.value, 2)
  134. }
  135. // Update the capacity for one pool, this makes it relatively more available.
  136. state.changeStreamCapacity(by: 900, forPoolOnEventLoopWithID: pools[4].eventLoop.id)
  137. // pools[4] has a bunch more streams now:
  138. // index: 0 1 2 3 4
  139. // available: 98 98 96 99 992
  140. state.reserveStream(preferringPoolWithEventLoopID: nil).assertSuccess { poolIndex in
  141. XCTAssertEqual(poolIndex.value, 4)
  142. }
  143. }
  144. func testReserveStreamWithNoEventLoopPreference() {
  145. let group = EmbeddedEventLoopGroup(loops: 1)
  146. defer {
  147. XCTAssertNoThrow(try group.syncShutdownGracefully())
  148. }
  149. let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
  150. let keys = self.makeConnectionPoolKeys(for: pools)
  151. var state = PoolManagerStateMachine(
  152. .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
  153. )
  154. let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: nil)
  155. reservePreferredLoop.assertSuccess()
  156. }
  157. func testReserveStreamWhenInactive() {
  158. var state = PoolManagerStateMachine(.inactive)
  159. let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
  160. action.assertFailure { error in
  161. XCTAssertEqual(error, .notInitialized)
  162. }
  163. }
  164. func testReserveStreamWhenShuttingDown() {
  165. let future = EmbeddedEventLoop().makeSucceededFuture(())
  166. var state = PoolManagerStateMachine(.shuttingDown(future))
  167. let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
  168. action.assertFailure { error in
  169. XCTAssertEqual(error, .shutdown)
  170. }
  171. }
  172. func testReserveStreamWhenShutdown() {
  173. var state = PoolManagerStateMachine(.shutdown)
  174. let action = state.reserveStream(preferringPoolWithEventLoopID: nil)
  175. action.assertFailure { error in
  176. XCTAssertEqual(error, .shutdown)
  177. }
  178. }
  179. func testShutdownWhenInactive() {
  180. let loop = EmbeddedEventLoop()
  181. let promise = loop.makePromise(of: Void.self)
  182. var state = PoolManagerStateMachine(.inactive)
  183. let action = state.shutdown(promise: promise)
  184. action.assertAlreadyShutdown()
  185. // Don't leak the promise.
  186. promise.succeed(())
  187. }
  188. func testShutdownWhenActive() {
  189. let group = EmbeddedEventLoopGroup(loops: 5)
  190. defer {
  191. XCTAssertNoThrow(try group.syncShutdownGracefully())
  192. }
  193. let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1)
  194. let keys = self.makeConnectionPoolKeys(for: pools)
  195. var state = PoolManagerStateMachine(
  196. .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100))
  197. )
  198. let promise = group.loops[0].makePromise(of: Void.self)
  199. promise.succeed(())
  200. state.shutdown(promise: promise).assertShutdownPools()
  201. }
  202. func testShutdownWhenShuttingDown() {
  203. let loop = EmbeddedEventLoop()
  204. let future = loop.makeSucceededVoidFuture()
  205. var state = PoolManagerStateMachine(.shuttingDown(future))
  206. let promise = loop.makePromise(of: Void.self)
  207. promise.succeed(())
  208. let action = state.shutdown(promise: promise)
  209. action.assertAlreadyShuttingDown {
  210. XCTAssert($0 === future)
  211. }
  212. // Fully shutdown.
  213. state.shutdownComplete()
  214. state.shutdown(promise: promise).assertAlreadyShutdown()
  215. }
  216. func testShutdownWhenShutdown() {
  217. let loop = EmbeddedEventLoop()
  218. var state = PoolManagerStateMachine(.shutdown)
  219. let promise = loop.makePromise(of: Void.self)
  220. promise.succeed(())
  221. let action = state.shutdown(promise: promise)
  222. action.assertAlreadyShutdown()
  223. }
  224. }
  225. // MARK: - Test Helpers
  226. extension Result {
  227. internal func assertSuccess(
  228. file: StaticString = #file,
  229. line: UInt = #line,
  230. verify: (Success) -> Void = { _ in }
  231. ) {
  232. if case let .success(value) = self {
  233. verify(value)
  234. } else {
  235. XCTFail("Expected '.success' but got '\(self)'", file: file, line: line)
  236. }
  237. }
  238. internal func assertFailure(
  239. file: StaticString = #file,
  240. line: UInt = #line,
  241. verify: (Failure) -> Void = { _ in }
  242. ) {
  243. if case let .failure(value) = self {
  244. verify(value)
  245. } else {
  246. XCTFail("Expected '.failure' but got '\(self)'", file: file, line: line)
  247. }
  248. }
  249. }
  250. extension PoolManagerStateMachine.ShutdownAction {
  251. internal func assertShutdownPools(
  252. file: StaticString = #file,
  253. line: UInt = #line
  254. ) {
  255. if case .shutdownPools = self {
  256. ()
  257. } else {
  258. XCTFail("Expected '.shutdownPools' but got '\(self)'", file: file, line: line)
  259. }
  260. }
  261. internal func assertAlreadyShuttingDown(
  262. file: StaticString = #file,
  263. line: UInt = #line,
  264. verify: (EventLoopFuture<Void>) -> Void = { _ in }
  265. ) {
  266. if case let .alreadyShuttingDown(future) = self {
  267. verify(future)
  268. } else {
  269. XCTFail("Expected '.alreadyShuttingDown' but got '\(self)'", file: file, line: line)
  270. }
  271. }
  272. internal func assertAlreadyShutdown(file: StaticString = #file, line: UInt = #line) {
  273. if case .alreadyShutdown = self {
  274. ()
  275. } else {
  276. XCTFail("Expected '.alreadyShutdown' but got '\(self)'", file: file, line: line)
  277. }
  278. }
  279. }
  280. /// An `EventLoopGroup` of `EmbeddedEventLoop`s.
  281. private final class EmbeddedEventLoopGroup: EventLoopGroup {
  282. internal let loops: [EmbeddedEventLoop]
  283. internal let lock = Lock()
  284. internal var index = 0
  285. internal init(loops: Int) {
  286. self.loops = (0 ..< loops).map { _ in EmbeddedEventLoop() }
  287. }
  288. internal func next() -> EventLoop {
  289. let index: Int = self.lock.withLock {
  290. let index = self.index
  291. self.index += 1
  292. return index
  293. }
  294. return self.loops[index % self.loops.count]
  295. }
  296. internal func makeIterator() -> EventLoopIterator {
  297. return EventLoopIterator(self.loops)
  298. }
  299. internal func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
  300. var shutdownError: Error?
  301. for loop in self.loops {
  302. loop.shutdownGracefully(queue: queue) { error in
  303. if let error = error {
  304. shutdownError = error
  305. }
  306. }
  307. }
  308. queue.sync {
  309. callback(shutdownError)
  310. }
  311. }
  312. }