2
0

PoolManagerStateMachine.swift 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  @usableFromInline
  internal struct PoolManagerStateMachine {
    /// The current state.
    @usableFromInline
    internal var state: State

    /// Creates a state machine which starts in the given state.
    @usableFromInline
    internal init(_ state: State) {
      self.state = state
    }

    /// The states the pool manager can be in.
    @usableFromInline
    internal enum State {
      /// No pools have been provided yet; `activatePools(keyedBy:assumingPerPoolCapacity:)` moves
      /// the manager to `.active`.
      case inactive
      /// Pools have been provided; holds the per-pool bookkeeping.
      case active(ActiveState)
      /// `shutdown(promise:)` was called while active; holds the future of the promise passed to
      /// that call.
      case shuttingDown(EventLoopFuture<Void>)
      /// Shutdown finished, or `shutdown(promise:)` was called while inactive.
      case shutdown
      /// Placeholder stored while a state transition is in progress.
      case _modifying
    }

    /// The bookkeeping held while the manager is active.
    @usableFromInline
    internal struct ActiveState {
      /// Per-pool state keyed by the `EventLoopID` taken from each pool's key.
      @usableFromInline
      internal var pools: [EventLoopID: PerPoolState]

      /// Creates one `PerPoolState` per key, each with the same assumed stream capacity.
      ///
      /// `Dictionary(uniqueKeysWithValues:)` requires unique keys, so two keys sharing an
      /// `eventLoopID` is a runtime error.
      ///
      /// - Parameters:
      ///   - poolKeys: The index and `EventLoopID` of each pool.
      ///   - assumedMaxAvailableStreamsPerPool: The value passed to every `PerPoolState` as its
      ///     `assumedMaxAvailableStreams`.
      @usableFromInline
      internal init(
        poolKeys: [PoolManager.ConnectionPoolKey],
        assumedMaxAvailableStreamsPerPool: Int
      ) {
        self.pools = Dictionary(
          uniqueKeysWithValues: poolKeys.map { key in
            let value = PerPoolState(
              poolIndex: key.index,
              assumedMaxAvailableStreams: assumedMaxAvailableStreamsPerPool
            )
            return (key.eventLoopID, value)
          }
        )
      }
    }

    /// Temporarily sets `self.state` to `._modifying` before calling the provided closure and setting
    /// `self.state` to the `State` modified by the closure.
    ///
    /// - Parameter modify: A closure which is given the current state to inspect and update.
    /// - Returns: The value returned by `modify`.
    @inlinable
    internal mutating func modifyingState<Result>(_ modify: (inout State) -> Result) -> Result {
      var state = State._modifying
      swap(&self.state, &state)
      defer {
        self.state = state
      }
      return modify(&state)
    }

    /// Returns whether the pool is shutdown or in the process of shutting down.
    @usableFromInline
    internal var isShutdownOrShuttingDown: Bool {
      switch self.state {
      case .shuttingDown, .shutdown:
        return true
      case .inactive, .active:
        return false
      case ._modifying:
        preconditionFailure()
      }
    }

    /// Activate the pool manager by providing an array of connection pools.
    ///
    /// The manager must be `.inactive`; calling this in any other state is a precondition failure.
    ///
    /// - Parameters:
    ///   - keys: The index and `EventLoopID` of the pools.
    ///   - capacity: The *assumed* maximum number of streams concurrently available to a pool (that
    ///       is, the product of the assumed value of max concurrent streams and the number of
    ///       connections per pool).
    @usableFromInline
    internal mutating func activatePools(
      keyedBy keys: [PoolManager.ConnectionPoolKey],
      assumingPerPoolCapacity capacity: Int
    ) {
      self.modifyingState { state in
        switch state {
        case .inactive:
          state = .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: capacity))
        case .active, .shuttingDown, .shutdown, ._modifying:
          preconditionFailure()
        }
      }
    }

    /// Select and reserve a stream from a connection pool.
    ///
    /// - Parameter eventLoopID: The ID of the preferred event loop, if any. When a pool is stored
    ///     for that ID the stream is reserved from it; otherwise the pool with the most available
    ///     streams is used.
    /// - Returns: The index of the pool the stream was reserved from, `.notInitialized` if the
    ///     manager is inactive, or `.shutdown` if it is shutting down or shutdown.
    @inlinable
    mutating func reserveStream(
      preferringPoolWithEventLoopID eventLoopID: EventLoopID?
    ) -> Result<PoolManager.ConnectionPoolIndex, PoolManagerError> {
      return self.modifyingState { state in
        switch state {
        case var .active(active):
          // `active` is a copy of the payload. Drop `state`'s reference to it so the pools are
          // uniquely referenced and can be mutated in place instead of being copied first.
          state = ._modifying

          let connectionPoolIndex: PoolManager.ConnectionPoolIndex
          if let index = eventLoopID.flatMap({ eventLoopID in
            active.reserveStreamFromPool(onEventLoopWithID: eventLoopID)
          }) {
            connectionPoolIndex = index
          } else {
            // Nothing on the preferred event loop; fallback to the pool with the most available
            // streams.
            connectionPoolIndex = active.reserveStreamFromPoolWithMostAvailableStreams()
          }
          state = .active(active)
          return .success(connectionPoolIndex)

        case .inactive:
          return .failure(.notInitialized)

        case .shuttingDown, .shutdown:
          return .failure(.shutdown)

        case ._modifying:
          preconditionFailure()
        }
      }
    }

    /// Return streams to the given pool.
    ///
    /// Does nothing if the manager is shutting down or shutdown; calling this while inactive is a
    /// precondition failure.
    ///
    /// - Parameters:
    ///   - count: The number of streams being returned.
    ///   - eventLoopID: The ID of the event loop identifying the pool.
    mutating func returnStreams(_ count: Int, toPoolOnEventLoopWithID eventLoopID: EventLoopID) {
      self.modifyingState { state in
        switch state {
        case var .active(active):
          // Release `state`'s reference to the payload so `active` is mutated in place rather
          // than on a copy of the pools.
          state = ._modifying
          active.returnStreams(count, toPoolOnEventLoopWithID: eventLoopID)
          state = .active(active)
        case .shuttingDown, .shutdown:
          ()
        case .inactive, ._modifying:
          // If the manager is inactive there are no pools which can return streams.
          preconditionFailure()
        }
      }
    }

    /// Update the capacity for the given pool.
    ///
    /// Does nothing if the manager is shutting down or shutdown; calling this while inactive is a
    /// precondition failure.
    ///
    /// - Parameters:
    ///   - delta: The amount added to the pool's maximum available streams.
    ///   - eventLoopID: The ID of the event loop identifying the pool.
    mutating func changeStreamCapacity(
      by delta: Int,
      forPoolOnEventLoopWithID eventLoopID: EventLoopID
    ) {
      self.modifyingState { state in
        switch state {
        case var .active(active):
          // Release `state`'s reference to the payload so `active` is mutated in place rather
          // than on a copy of the pools.
          state = ._modifying
          active.increaseMaxAvailableStreams(by: delta, forPoolOnEventLoopWithID: eventLoopID)
          state = .active(active)
        case .shuttingDown, .shutdown:
          ()
        case .inactive, ._modifying:
          // If the manager is inactive there are no pools which can update their capacity.
          preconditionFailure()
        }
      }
    }

    /// The outcome of a call to `shutdown(promise:)`.
    enum ShutdownAction {
      /// The manager was active and is now shutting down.
      case shutdownPools
      /// The manager was inactive or already shutdown; it is now shutdown.
      case alreadyShutdown
      /// A shutdown was already in progress; carries the future stored when it began.
      case alreadyShuttingDown(EventLoopFuture<Void>)
    }

    /// Request that the manager shuts down.
    ///
    /// - Parameter promise: A promise whose future is stored in the `.shuttingDown` state when this
    ///     call moves the manager from `.active` to `.shuttingDown`. It is not used otherwise.
    /// - Returns: The action describing which transition, if any, took place.
    mutating func shutdown(promise: EventLoopPromise<Void>) -> ShutdownAction {
      self.modifyingState { state in
        switch state {
        case .inactive:
          // There are no pools yet, so go straight to shutdown.
          state = .shutdown
          return .alreadyShutdown
        case .active:
          state = .shuttingDown(promise.futureResult)
          return .shutdownPools
        case let .shuttingDown(future):
          return .alreadyShuttingDown(future)
        case .shutdown:
          return .alreadyShutdown
        case ._modifying:
          preconditionFailure()
        }
      }
    }

    /// Move from `.shuttingDown` to `.shutdown`.
    ///
    /// Calling this in any state other than `.shuttingDown` is a precondition failure.
    mutating func shutdownComplete() {
      self.modifyingState { state in
        switch state {
        case .shuttingDown:
          state = .shutdown
        case .inactive, .active, .shutdown, ._modifying:
          preconditionFailure()
        }
      }
    }
  }
  extension PoolManagerStateMachine.ActiveState {
    /// Reserve a stream from the pool stored for the given event loop ID.
    ///
    /// - Parameter eventLoopID: The ID of the event loop identifying the pool.
    /// - Returns: The value returned by that pool's `reserveStream()`, or `nil` if no pool is
    ///     stored for `eventLoopID`.
    @usableFromInline
    mutating func reserveStreamFromPool(
      onEventLoopWithID eventLoopID: EventLoopID
    ) -> PoolManager.ConnectionPoolIndex? {
      self.pools[eventLoopID]?.reserveStream()
    }

    /// Reserve a stream from whichever pool currently reports the largest `availableStreams`.
    ///
    /// When several pools report the same value, the first one encountered while walking
    /// `pools.values` is used.
    ///
    /// - Returns: The value returned by the chosen pool's `reserveStream()`.
    @usableFromInline
    mutating func reserveStreamFromPoolWithMostAvailableStreams() -> PoolManager.ConnectionPoolIndex {
      // We don't allow pools to be empty (while active).
      assert(!self.pools.isEmpty)

      // Walk the values by index so the winning pool can be mutated in place afterwards.
      let end = self.pools.values.endIndex
      var cursor = self.pools.values.startIndex
      var bestIndex = cursor
      var bestAvailable = Int.min

      while cursor != end {
        let candidate = self.pools.values[cursor].availableStreams
        if candidate > bestAvailable {
          bestAvailable = candidate
          bestIndex = cursor
        }
        self.pools.values.formIndex(after: &cursor)
      }

      return self.pools.values[bestIndex].reserveStream()
    }

    /// Return reserved streams to the pool stored for the given event loop ID.
    ///
    /// Does nothing if no pool is stored for `eventLoopID`.
    ///
    /// - Parameters:
    ///   - count: The number of streams passed to the pool's `returnReservedStreams(_:)`.
    ///   - eventLoopID: The ID of the event loop identifying the pool.
    mutating func returnStreams(_ count: Int, toPoolOnEventLoopWithID eventLoopID: EventLoopID) {
      self.pools[eventLoopID]?.returnReservedStreams(count)
    }

    /// Add `delta` to `maxAvailableStreams` of the pool stored for the given event loop ID.
    ///
    /// Does nothing if no pool is stored for `eventLoopID`.
    ///
    /// - Parameters:
    ///   - delta: The amount to add to the pool's `maxAvailableStreams`.
    ///   - eventLoopID: The ID of the event loop identifying the pool.
    mutating func increaseMaxAvailableStreams(
      by delta: Int,
      forPoolOnEventLoopWithID eventLoopID: EventLoopID
    ) {
      self.pools[eventLoopID]?.maxAvailableStreams += delta
    }
  }