PoolManagerStateMachine.swift 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. internal struct PoolManagerStateMachine {
  18. /// The current state.
  19. private var state: State
  20. internal init(_ state: State) {
  21. self.state = state
  22. }
  23. internal enum State {
  24. case inactive
  25. case active(ActiveState)
  26. case shuttingDown(EventLoopFuture<Void>)
  27. case shutdown
  28. case _modifying
  29. }
  30. internal struct ActiveState {
  31. internal var pools: [EventLoopID: PerPoolState]
  32. internal init(
  33. poolKeys: [PoolManager.ConnectionPoolKey],
  34. assumedMaxAvailableStreamsPerPool: Int
  35. ) {
  36. self.pools = Dictionary(uniqueKeysWithValues: poolKeys.map { key in
  37. let value = PerPoolState(
  38. poolIndex: key.index,
  39. assumedMaxAvailableStreams: assumedMaxAvailableStreamsPerPool
  40. )
  41. return (key.eventLoopID, value)
  42. })
  43. }
  44. }
  45. /// Temporarily sets `self.state` to `._modifying` before calling the provided closure and setting
  46. /// `self.state` to the `State` modified by the closure.
  47. private mutating func modifyingState<Result>(_ modify: (inout State) -> Result) -> Result {
  48. var state = State._modifying
  49. swap(&self.state, &state)
  50. defer {
  51. self.state = state
  52. }
  53. return modify(&state)
  54. }
  55. /// Returns whether the pool is shutdown or in the process of shutting down.
  56. internal var isShutdownOrShuttingDown: Bool {
  57. switch self.state {
  58. case .shuttingDown, .shutdown:
  59. return true
  60. case .inactive, .active:
  61. return false
  62. case ._modifying:
  63. preconditionFailure()
  64. }
  65. }
  66. /// Activate the pool manager by providing an array of connection pools.
  67. ///
  68. /// - Parameters:
  69. /// - keys: The index and `EventLoopID` of the pools.
  70. /// - capacity: The *assumed* maximum number of streams concurrently available to a pool (that
  71. /// is, the product of the assumed value of max concurrent streams and the number of
  72. /// connections per pool).
  73. internal mutating func activatePools(
  74. keyedBy keys: [PoolManager.ConnectionPoolKey],
  75. assumingPerPoolCapacity capacity: Int
  76. ) {
  77. self.modifyingState { state in
  78. switch state {
  79. case .inactive:
  80. state = .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: capacity))
  81. case .active, .shuttingDown, .shutdown, ._modifying:
  82. preconditionFailure()
  83. }
  84. }
  85. }
  86. /// Select and reserve a stream from a connection pool.
  87. mutating func reserveStream(
  88. preferringPoolWithEventLoopID eventLoopID: EventLoopID?
  89. ) -> Result<PoolManager.ConnectionPoolIndex, PoolManagerError> {
  90. return self.modifyingState { state in
  91. switch state {
  92. case var .active(active):
  93. let connectionPoolIndex: PoolManager.ConnectionPoolIndex
  94. if let index = eventLoopID.flatMap({ eventLoopID in
  95. active.reserveStreamFromPool(onEventLoopWithID: eventLoopID)
  96. }) {
  97. connectionPoolIndex = index
  98. } else {
  99. // Nothing on the preferred event loop; fallback to the pool with the most available
  100. // streams.
  101. connectionPoolIndex = active.reserveStreamFromPoolWithMostAvailableStreams()
  102. }
  103. state = .active(active)
  104. return .success(connectionPoolIndex)
  105. case .inactive:
  106. return .failure(.notInitialized)
  107. case .shuttingDown, .shutdown:
  108. return .failure(.shutdown)
  109. case ._modifying:
  110. preconditionFailure()
  111. }
  112. }
  113. }
  114. /// Return streams to the given pool.
  115. mutating func returnStreams(_ count: Int, toPoolOnEventLoopWithID eventLoopID: EventLoopID) {
  116. self.modifyingState { state in
  117. switch state {
  118. case var .active(active):
  119. active.returnStreams(count, toPoolOnEventLoopWithID: eventLoopID)
  120. state = .active(active)
  121. case .shuttingDown, .shutdown:
  122. ()
  123. case .inactive, ._modifying:
  124. // If the manager is inactive there are no pools which can return streams.
  125. preconditionFailure()
  126. }
  127. }
  128. }
  129. /// Update the capacity for the given pool.
  130. mutating func changeStreamCapacity(
  131. by delta: Int,
  132. forPoolOnEventLoopWithID eventLoopID: EventLoopID
  133. ) {
  134. self.modifyingState { state in
  135. switch state {
  136. case var .active(active):
  137. active.increaseMaxAvailableStreams(by: delta, forPoolOnEventLoopWithID: eventLoopID)
  138. state = .active(active)
  139. case .shuttingDown, .shutdown:
  140. ()
  141. case .inactive, ._modifying:
  142. // If the manager is inactive there are no pools which can update their capacity.
  143. preconditionFailure()
  144. }
  145. }
  146. }
  147. enum ShutdownAction {
  148. case shutdownPools
  149. case alreadyShutdown
  150. case alreadyShuttingDown(EventLoopFuture<Void>)
  151. }
  152. mutating func shutdown(promise: EventLoopPromise<Void>) -> ShutdownAction {
  153. self.modifyingState { state in
  154. switch state {
  155. case .inactive:
  156. state = .shutdown
  157. return .alreadyShutdown
  158. case .active:
  159. state = .shuttingDown(promise.futureResult)
  160. return .shutdownPools
  161. case let .shuttingDown(future):
  162. return .alreadyShuttingDown(future)
  163. case .shutdown:
  164. return .alreadyShutdown
  165. case ._modifying:
  166. preconditionFailure()
  167. }
  168. }
  169. }
  170. mutating func shutdownComplete() {
  171. self.modifyingState { state in
  172. switch state {
  173. case .shuttingDown:
  174. state = .shutdown
  175. case .inactive, .active, .shutdown, ._modifying:
  176. preconditionFailure()
  177. }
  178. }
  179. }
  180. }
  181. extension PoolManagerStateMachine.ActiveState {
  182. mutating func reserveStreamFromPool(
  183. onEventLoopWithID eventLoopID: EventLoopID
  184. ) -> PoolManager.ConnectionPoolIndex? {
  185. return self.pools[eventLoopID]?.reserveStream()
  186. }
  187. mutating func reserveStreamFromPoolWithMostAvailableStreams() -> PoolManager.ConnectionPoolIndex {
  188. // We don't allow pools to be empty (while active).
  189. assert(!self.pools.isEmpty)
  190. var mostAvailableStreams = Int.min
  191. var mostAvailableIndex = self.pools.values.startIndex
  192. var index = mostAvailableIndex
  193. while index != self.pools.values.endIndex {
  194. let availableStreams = self.pools.values[index].availableStreams
  195. if availableStreams > mostAvailableStreams {
  196. mostAvailableIndex = index
  197. mostAvailableStreams = availableStreams
  198. }
  199. self.pools.values.formIndex(after: &index)
  200. }
  201. return self.pools.values[mostAvailableIndex].reserveStream()
  202. }
  203. mutating func returnStreams(
  204. _ count: Int,
  205. toPoolOnEventLoopWithID eventLoopID: EventLoopID
  206. ) {
  207. self.pools[eventLoopID]?.returnReservedStreams(count)
  208. }
  209. mutating func increaseMaxAvailableStreams(
  210. by delta: Int,
  211. forPoolOnEventLoopWithID eventLoopID: EventLoopID
  212. ) {
  213. self.pools[eventLoopID]?.maxAvailableStreams += delta
  214. }
  215. }