2
0

PoolManagerStateMachine.swift 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. @usableFromInline
  18. internal struct PoolManagerStateMachine {
  19. /// The current state.
  20. @usableFromInline
  21. internal var state: State
  22. @usableFromInline
  23. internal init(_ state: State) {
  24. self.state = state
  25. }
  26. @usableFromInline
  27. internal enum State {
  28. case inactive
  29. case active(ActiveState)
  30. case shuttingDown(EventLoopFuture<Void>)
  31. case shutdown
  32. case _modifying
  33. }
  34. @usableFromInline
  35. internal struct ActiveState {
  36. @usableFromInline
  37. internal var pools: [EventLoopID: PerPoolState]
  38. @usableFromInline
  39. internal init(
  40. poolKeys: [PoolManager.ConnectionPoolKey],
  41. assumedMaxAvailableStreamsPerPool: Int
  42. ) {
  43. self.pools = Dictionary(uniqueKeysWithValues: poolKeys.map { key in
  44. let value = PerPoolState(
  45. poolIndex: key.index,
  46. assumedMaxAvailableStreams: assumedMaxAvailableStreamsPerPool
  47. )
  48. return (key.eventLoopID, value)
  49. })
  50. }
  51. }
  52. /// Temporarily sets `self.state` to `._modifying` before calling the provided closure and setting
  53. /// `self.state` to the `State` modified by the closure.
  54. @inlinable
  55. internal mutating func modifyingState<Result>(_ modify: (inout State) -> Result) -> Result {
  56. var state = State._modifying
  57. swap(&self.state, &state)
  58. defer {
  59. self.state = state
  60. }
  61. return modify(&state)
  62. }
  63. /// Returns whether the pool is shutdown or in the process of shutting down.
  64. @usableFromInline
  65. internal var isShutdownOrShuttingDown: Bool {
  66. switch self.state {
  67. case .shuttingDown, .shutdown:
  68. return true
  69. case .inactive, .active:
  70. return false
  71. case ._modifying:
  72. preconditionFailure()
  73. }
  74. }
  75. /// Activate the pool manager by providing an array of connection pools.
  76. ///
  77. /// - Parameters:
  78. /// - keys: The index and `EventLoopID` of the pools.
  79. /// - capacity: The *assumed* maximum number of streams concurrently available to a pool (that
  80. /// is, the product of the assumed value of max concurrent streams and the number of
  81. /// connections per pool).
  82. @usableFromInline
  83. internal mutating func activatePools(
  84. keyedBy keys: [PoolManager.ConnectionPoolKey],
  85. assumingPerPoolCapacity capacity: Int
  86. ) {
  87. self.modifyingState { state in
  88. switch state {
  89. case .inactive:
  90. state = .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: capacity))
  91. case .active, .shuttingDown, .shutdown, ._modifying:
  92. preconditionFailure()
  93. }
  94. }
  95. }
  96. /// Select and reserve a stream from a connection pool.
  97. @inlinable
  98. mutating func reserveStream(
  99. preferringPoolWithEventLoopID eventLoopID: EventLoopID?
  100. ) -> Result<PoolManager.ConnectionPoolIndex, PoolManagerError> {
  101. return self.modifyingState { state in
  102. switch state {
  103. case var .active(active):
  104. let connectionPoolIndex: PoolManager.ConnectionPoolIndex
  105. if let index = eventLoopID.flatMap({ eventLoopID in
  106. active.reserveStreamFromPool(onEventLoopWithID: eventLoopID)
  107. }) {
  108. connectionPoolIndex = index
  109. } else {
  110. // Nothing on the preferred event loop; fallback to the pool with the most available
  111. // streams.
  112. connectionPoolIndex = active.reserveStreamFromPoolWithMostAvailableStreams()
  113. }
  114. state = .active(active)
  115. return .success(connectionPoolIndex)
  116. case .inactive:
  117. return .failure(.notInitialized)
  118. case .shuttingDown, .shutdown:
  119. return .failure(.shutdown)
  120. case ._modifying:
  121. preconditionFailure()
  122. }
  123. }
  124. }
  125. /// Return streams to the given pool.
  126. mutating func returnStreams(_ count: Int, toPoolOnEventLoopWithID eventLoopID: EventLoopID) {
  127. self.modifyingState { state in
  128. switch state {
  129. case var .active(active):
  130. active.returnStreams(count, toPoolOnEventLoopWithID: eventLoopID)
  131. state = .active(active)
  132. case .shuttingDown, .shutdown:
  133. ()
  134. case .inactive, ._modifying:
  135. // If the manager is inactive there are no pools which can return streams.
  136. preconditionFailure()
  137. }
  138. }
  139. }
  140. /// Update the capacity for the given pool.
  141. mutating func changeStreamCapacity(
  142. by delta: Int,
  143. forPoolOnEventLoopWithID eventLoopID: EventLoopID
  144. ) {
  145. self.modifyingState { state in
  146. switch state {
  147. case var .active(active):
  148. active.increaseMaxAvailableStreams(by: delta, forPoolOnEventLoopWithID: eventLoopID)
  149. state = .active(active)
  150. case .shuttingDown, .shutdown:
  151. ()
  152. case .inactive, ._modifying:
  153. // If the manager is inactive there are no pools which can update their capacity.
  154. preconditionFailure()
  155. }
  156. }
  157. }
  158. enum ShutdownAction {
  159. case shutdownPools
  160. case alreadyShutdown
  161. case alreadyShuttingDown(EventLoopFuture<Void>)
  162. }
  163. mutating func shutdown(promise: EventLoopPromise<Void>) -> ShutdownAction {
  164. self.modifyingState { state in
  165. switch state {
  166. case .inactive:
  167. state = .shutdown
  168. return .alreadyShutdown
  169. case .active:
  170. state = .shuttingDown(promise.futureResult)
  171. return .shutdownPools
  172. case let .shuttingDown(future):
  173. return .alreadyShuttingDown(future)
  174. case .shutdown:
  175. return .alreadyShutdown
  176. case ._modifying:
  177. preconditionFailure()
  178. }
  179. }
  180. }
  181. mutating func shutdownComplete() {
  182. self.modifyingState { state in
  183. switch state {
  184. case .shuttingDown:
  185. state = .shutdown
  186. case .inactive, .active, .shutdown, ._modifying:
  187. preconditionFailure()
  188. }
  189. }
  190. }
  191. }
  192. extension PoolManagerStateMachine.ActiveState {
  193. @usableFromInline
  194. mutating func reserveStreamFromPool(
  195. onEventLoopWithID eventLoopID: EventLoopID
  196. ) -> PoolManager.ConnectionPoolIndex? {
  197. return self.pools[eventLoopID]?.reserveStream()
  198. }
  199. @usableFromInline
  200. mutating func reserveStreamFromPoolWithMostAvailableStreams() -> PoolManager.ConnectionPoolIndex {
  201. // We don't allow pools to be empty (while active).
  202. assert(!self.pools.isEmpty)
  203. var mostAvailableStreams = Int.min
  204. var mostAvailableIndex = self.pools.values.startIndex
  205. var index = mostAvailableIndex
  206. while index != self.pools.values.endIndex {
  207. let availableStreams = self.pools.values[index].availableStreams
  208. if availableStreams > mostAvailableStreams {
  209. mostAvailableIndex = index
  210. mostAvailableStreams = availableStreams
  211. }
  212. self.pools.values.formIndex(after: &index)
  213. }
  214. return self.pools.values[mostAvailableIndex].reserveStream()
  215. }
  216. mutating func returnStreams(
  217. _ count: Int,
  218. toPoolOnEventLoopWithID eventLoopID: EventLoopID
  219. ) {
  220. self.pools[eventLoopID]?.returnReservedStreams(count)
  221. }
  222. mutating func increaseMaxAvailableStreams(
  223. by delta: Int,
  224. forPoolOnEventLoopWithID eventLoopID: EventLoopID
  225. ) {
  226. self.pools[eventLoopID]?.maxAvailableStreams += delta
  227. }
  228. }