PoolManagerStateMachine.swift 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  /// Tracks the lifecycle of a pool manager together with the per-pool stream bookkeeping that
  /// exists while the manager is active.
  ///
  /// The transitions implemented here are `inactive` → `active` → `shuttingDown` → `shutdown`;
  /// an inactive manager may also move directly to `shutdown`.
  @usableFromInline
  internal struct PoolManagerStateMachine {
    /// The state the manager is currently in.
    @usableFromInline
    internal var state: State

    /// Creates a state machine which starts in the given state.
    @usableFromInline
    internal init(_ state: State) {
      self.state = state
    }

    /// The lifecycle states of the pool manager.
    @usableFromInline
    internal enum State {
      /// No pools have been provided yet.
      case inactive
      /// Pools have been provided; streams may be reserved from and returned to them.
      case active(ActiveState)
      /// Shutdown has been requested. The future is the one belonging to the promise passed to
      /// `shutdown(promise:)`.
      case shuttingDown(EventLoopFuture<Void>)
      /// Shutdown has completed, or was requested while the manager was still inactive.
      case shutdown
      /// Placeholder stored in `self.state` only while `modifyingState(_:)` runs its closure.
      case _modifying
    }

    /// The data held while the manager is active.
    @usableFromInline
    internal struct ActiveState {
      /// Bookkeeping for each pool, keyed by the ID of the pool's event loop.
      @usableFromInline
      internal var pools: [EventLoopID: PerPoolState]

      /// An optional repeated task kept alongside the pools. It is handed back to the caller via
      /// `ShutdownAction.shutdownPools` when shutdown starts.
      @usableFromInline
      internal var statsTask: RepeatedTask?

      /// Creates the active state with one `PerPoolState` per pool key.
      ///
      /// - Parameters:
      ///   - poolKeys: The index and `EventLoopID` of each pool. Event loop IDs must be unique
      ///       because the dictionary is built with `Dictionary(uniqueKeysWithValues:)`.
      ///   - assumedMaxAvailableStreamsPerPool: The value given to every pool as its assumed
      ///       maximum number of available streams.
      ///   - statsTask: An optional repeated task to store with the pools.
      @usableFromInline
      internal init(
        poolKeys: [PoolManager.ConnectionPoolKey],
        assumedMaxAvailableStreamsPerPool: Int,
        statsTask: RepeatedTask?
      ) {
        let entries = poolKeys.map { key -> (EventLoopID, PerPoolState) in
          (
            key.eventLoopID,
            PerPoolState(
              poolIndex: key.index,
              assumedMaxAvailableStreams: assumedMaxAvailableStreamsPerPool
            )
          )
        }
        self.pools = Dictionary(uniqueKeysWithValues: entries)
        self.statsTask = statsTask
      }
    }

    /// Runs `modify` against the current state, parking `self.state` at `._modifying` while the
    /// closure runs and storing whatever state the closure leaves behind once it returns.
    ///
    /// - Parameter modify: A closure which may inspect and replace the state.
    /// - Returns: The value returned by `modify`.
    @inlinable
    internal mutating func modifyingState<Result>(_ modify: (inout State) -> Result) -> Result {
      // Move the real state out of `self` (presumably so the closure mutates the only reference
      // to its storage) and leave the placeholder behind.
      var detached = State._modifying
      swap(&self.state, &detached)

      let result = modify(&detached)
      self.state = detached
      return result
    }

    /// Whether the manager has shut down or is in the process of doing so.
    @usableFromInline
    internal var isShutdownOrShuttingDown: Bool {
      switch self.state {
      case .inactive, .active:
        return false
      case .shuttingDown, .shutdown:
        return true
      case ._modifying:
        preconditionFailure()
      }
    }

    /// Activate the pool manager by providing an array of connection pools.
    ///
    /// The manager must be inactive; calling this in any other state traps.
    ///
    /// - Parameters:
    ///   - keys: The index and `EventLoopID` of the pools.
    ///   - capacity: The *assumed* maximum number of streams concurrently available to a pool
    ///       (that is, the product of the assumed value of max concurrent streams and the number
    ///       of connections per pool).
    ///   - statsTask: An optional repeated task to store in the active state.
    @usableFromInline
    internal mutating func activatePools(
      keyedBy keys: [PoolManager.ConnectionPoolKey],
      assumingPerPoolCapacity capacity: Int,
      statsTask: RepeatedTask?
    ) {
      self.modifyingState { state in
        // Only an inactive manager may be activated.
        guard case .inactive = state else {
          preconditionFailure()
        }

        state = .active(
          ActiveState(
            poolKeys: keys,
            assumedMaxAvailableStreamsPerPool: capacity,
            statsTask: statsTask
          )
        )
      }
    }

    /// Select a connection pool and reserve a stream from it.
    ///
    /// - Parameter eventLoopID: The ID of the event loop whose pool should be tried first, if
    ///     any.
    /// - Returns: The index of the pool the stream was reserved from, `.notInitialized` if the
    ///     manager is inactive, or `.shutdown` if it is shutting down or has shut down.
    @inlinable
    mutating func reserveStream(
      preferringPoolWithEventLoopID eventLoopID: EventLoopID?
    ) -> Result<PoolManager.ConnectionPoolIndex, PoolManagerError> {
      return self.modifyingState { state in
        switch state {
        case .inactive:
          return .failure(.notInitialized)

        case .shuttingDown, .shutdown:
          return .failure(.shutdown)

        case .active(var active):
          let selected: PoolManager.ConnectionPoolIndex
          if let preferredID = eventLoopID,
            let preferred = active.reserveStreamFromPool(onEventLoopWithID: preferredID)
          {
            selected = preferred
          } else {
            // No preference was given, or there is no pool for the preferred event loop: use
            // the pool with the most available streams instead.
            selected = active.reserveStreamFromPoolWithMostAvailableStreams()
          }
          state = .active(active)
          return .success(selected)

        case ._modifying:
          preconditionFailure()
        }
      }
    }

    /// Return streams to the given pool.
    ///
    /// This is a no-op once shutdown has started and traps if the manager is inactive.
    ///
    /// - Parameters:
    ///   - count: The number of streams being returned.
    ///   - eventLoopID: The ID of the event loop of the pool the streams belong to.
    mutating func returnStreams(_ count: Int, toPoolOnEventLoopWithID eventLoopID: EventLoopID) {
      self.modifyingState { state in
        switch state {
        case .active(var active):
          active.returnStreams(count, toPoolOnEventLoopWithID: eventLoopID)
          state = .active(active)

        case .shuttingDown, .shutdown:
          // Nothing to track any more.
          break

        case .inactive, ._modifying:
          // An inactive manager has no pools, so nothing could be returning streams.
          preconditionFailure()
        }
      }
    }

    /// Update the capacity for the given pool.
    ///
    /// This is a no-op once shutdown has started and traps if the manager is inactive.
    ///
    /// - Parameters:
    ///   - delta: The amount added to the pool's maximum number of available streams.
    ///   - eventLoopID: The ID of the event loop of the pool whose capacity changed.
    mutating func changeStreamCapacity(
      by delta: Int,
      forPoolOnEventLoopWithID eventLoopID: EventLoopID
    ) {
      self.modifyingState { state in
        switch state {
        case .active(var active):
          active.increaseMaxAvailableStreams(by: delta, forPoolOnEventLoopWithID: eventLoopID)
          state = .active(active)

        case .shuttingDown, .shutdown:
          // Nothing to track any more.
          break

        case .inactive, ._modifying:
          // An inactive manager has no pools, so nothing could be updating its capacity.
          preconditionFailure()
        }
      }
    }

    /// What the caller of `shutdown(promise:)` should do next.
    enum ShutdownAction {
      /// The manager was active and is now shutting down. Carries the repeated task stored in
      /// the active state, if there was one.
      case shutdownPools(RepeatedTask?)
      /// The manager was inactive or had already shut down.
      case alreadyShutdown
      /// A shutdown is already in progress; carries the future stored when it began.
      case alreadyShuttingDown(EventLoopFuture<Void>)
    }

    /// Request that the manager shuts down.
    ///
    /// - Parameter promise: A promise whose future is stored in the `shuttingDown` state when
    ///     the manager was active. The state machine never completes the promise itself.
    /// - Returns: The action the caller should take.
    mutating func shutdown(promise: EventLoopPromise<Void>) -> ShutdownAction {
      return self.modifyingState { state in
        switch state {
        case .active(let active):
          state = .shuttingDown(promise.futureResult)
          return .shutdownPools(active.statsTask)

        case .shuttingDown(let future):
          return .alreadyShuttingDown(future)

        case .inactive:
          // There are no pools to shut down, so go straight to the terminal state.
          state = .shutdown
          return .alreadyShutdown

        case .shutdown:
          return .alreadyShutdown

        case ._modifying:
          preconditionFailure()
        }
      }
    }

    /// Record that shutdown has finished.
    ///
    /// The manager must be shutting down; calling this in any other state traps.
    mutating func shutdownComplete() {
      self.modifyingState { state in
        guard case .shuttingDown = state else {
          preconditionFailure()
        }
        state = .shutdown
      }
    }
  }
  extension PoolManagerStateMachine.ActiveState {
    /// Reserve a stream from the pool associated with the given event loop.
    ///
    /// - Parameter eventLoopID: The ID of the event loop whose pool should be used.
    /// - Returns: The value returned by that pool's `reserveStream()`, or `nil` if `pools` has
    ///     no entry for `eventLoopID`.
    @usableFromInline
    mutating func reserveStreamFromPool(
      onEventLoopWithID eventLoopID: EventLoopID
    ) -> PoolManager.ConnectionPoolIndex? {
      guard let position = self.pools.index(forKey: eventLoopID) else {
        return nil
      }
      return self.pools.values[position].reserveStream()
    }

    /// Reserve a stream from the pool reporting the largest `availableStreams`.
    ///
    /// When several pools report the same largest value the first one encountered while walking
    /// the dictionary's values is used.
    ///
    /// - Returns: The value returned by the chosen pool's `reserveStream()`.
    @usableFromInline
    mutating func reserveStreamFromPoolWithMostAvailableStreams() -> PoolManager.ConnectionPoolIndex {
      // An active manager is never expected to have zero pools.
      assert(!self.pools.isEmpty)

      let end = self.pools.values.endIndex
      var best = (position: self.pools.values.startIndex, available: Int.min)
      var cursor = best.position

      while cursor != end {
        let available = self.pools.values[cursor].availableStreams
        // Strictly greater: an equal count never displaces an earlier pool.
        if available > best.available {
          best = (cursor, available)
        }
        cursor = self.pools.values.index(after: cursor)
      }

      return self.pools.values[best.position].reserveStream()
    }

    /// Return previously reserved streams to the pool associated with the given event loop.
    ///
    /// Does nothing if `pools` has no entry for `eventLoopID`.
    ///
    /// - Parameters:
    ///   - count: The number of streams being returned.
    ///   - eventLoopID: The ID of the event loop whose pool the streams belong to.
    mutating func returnStreams(
      _ count: Int,
      toPoolOnEventLoopWithID eventLoopID: EventLoopID
    ) {
      guard let position = self.pools.index(forKey: eventLoopID) else {
        return
      }
      self.pools.values[position].returnReservedStreams(count)
    }

    /// Add `delta` to `maxAvailableStreams` of the pool associated with the given event loop.
    ///
    /// Does nothing if `pools` has no entry for `eventLoopID`.
    ///
    /// - Parameters:
    ///   - delta: The amount to add to the pool's `maxAvailableStreams`.
    ///   - eventLoopID: The ID of the event loop whose pool should be updated.
    mutating func increaseMaxAvailableStreams(
      by delta: Int,
      forPoolOnEventLoopWithID eventLoopID: EventLoopID
    ) {
      guard let position = self.pools.index(forKey: eventLoopID) else {
        return
      }
      self.pools.values[position].maxAvailableStreams += delta
    }
  }