ConnectionPool+PerConnectionState.swift 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOHTTP2
  extension ConnectionPool {
    /// Per-connection book-keeping used by the pool: the connection's manager plus, while the
    /// connection is available, counters describing how many of its streams are reserved and open.
    @usableFromInline
    internal struct PerConnectionState {
      /// The connection manager for this connection.
      @usableFromInline
      internal var manager: ConnectionManager

      /// Stream availability for this connection, `nil` if the connection is not available.
      @usableFromInline
      internal var _availability: StreamAvailability?

      /// Whether the connection is available, i.e. whether it has stream availability state.
      @usableFromInline
      internal var isAvailable: Bool {
        return self._availability != nil
      }

      /// Whether the connection is quiescing.
      ///
      /// Reads as `false` when the connection is unavailable. Writes are ignored when the
      /// connection is unavailable since there is no availability state to update.
      @usableFromInline
      internal var isQuiescing: Bool {
        get {
          return self._availability?.isQuiescing ?? false
        }
        set {
          // Store the value the caller assigned; previously the setter ignored `newValue` and
          // unconditionally wrote `true`, so assigning `false` marked the connection as quiescing.
          self._availability?.isQuiescing = newValue
        }
      }

      /// Stream counters for an available connection.
      @usableFromInline
      internal struct StreamAvailability {
        /// A snapshot of how many streams are open relative to the connection's stream limit.
        @usableFromInline
        struct Utilization {
          /// Number of streams currently open.
          @usableFromInline
          var used: Int

          /// The maximum number of available streams at the time of the snapshot.
          @usableFromInline
          var capacity: Int

          @usableFromInline
          init(used: Int, capacity: Int) {
            self.used = used
            self.capacity = capacity
          }
        }

        /// The multiplexer handed out by `reserve()`.
        @usableFromInline
        var multiplexer: HTTP2StreamMultiplexer

        /// Maximum number of available streams.
        @usableFromInline
        var maxAvailable: Int

        /// Number of streams reserved.
        @usableFromInline
        var reserved: Int = 0

        /// Number of streams opened.
        @usableFromInline
        var open: Int = 0

        /// Whether the connection is quiescing; no streams are available while this is `true`.
        @usableFromInline
        var isQuiescing = false

        /// Number of available streams.
        ///
        /// Always zero while quiescing. Otherwise this is `maxAvailable - reserved`, which is
        /// negative if `maxAvailable` has been lowered below the number of reserved streams.
        @usableFromInline
        var available: Int {
          return self.isQuiescing ? 0 : self.maxAvailable - self.reserved
        }

        /// Increment the reserved streams and return the multiplexer.
        ///
        /// This does not check capacity; callers should check `available` first. Reserving while
        /// quiescing is a programmer error and is caught by an assertion in debug builds.
        @usableFromInline
        mutating func reserve() -> HTTP2StreamMultiplexer {
          assert(!self.isQuiescing)
          self.reserved += 1
          return self.multiplexer
        }

        /// Increment the open streams by one and return the resulting utilization.
        @usableFromInline
        mutating func opened() -> Utilization {
          self.open += 1
          return .init(used: self.open, capacity: self.maxAvailable)
        }

        /// Decrement both the reserved and the open streams by one and return the resulting
        /// utilization.
        ///
        /// Either counter dropping below zero is a programmer error and is caught by an assertion
        /// in debug builds.
        @usableFromInline
        mutating func `return`() -> Utilization {
          self.reserved -= 1
          self.open -= 1
          assert(self.reserved >= 0)
          assert(self.open >= 0)
          return .init(used: self.open, capacity: self.maxAvailable)
        }
      }

      /// Create state for a connection which is not yet available.
      @usableFromInline
      init(manager: ConnectionManager) {
        self.manager = manager
        self._availability = nil
      }

      /// The number of reserved streams, zero if the connection is not available.
      @usableFromInline
      internal var reservedStreams: Int {
        return self._availability?.reserved ?? 0
      }

      /// The number of streams available to reserve. If this value is greater than zero then it is
      /// safe to call `reserveStream()` and force unwrap the result.
      @usableFromInline
      internal var availableStreams: Int {
        return self._availability?.available ?? 0
      }

      /// The maximum number of concurrent streams which may be available for the connection, if it
      /// is ready.
      @usableFromInline
      internal var maxAvailableStreams: Int? {
        return self._availability?.maxAvailable
      }

      /// Reserve a stream and return the stream multiplexer. Returns `nil` if the connection is
      /// not available.
      ///
      /// The result may be safely unwrapped if `self.availableStreams > 0` when reserving a stream.
      @usableFromInline
      internal mutating func reserveStream() -> HTTP2StreamMultiplexer? {
        return self._availability?.reserve()
      }

      /// Record that a stream was opened on the connection.
      ///
      /// Returns the utilization after opening the stream, or `nil` if the connection is not
      /// available.
      @usableFromInline
      internal mutating func openedStream() -> PerConnectionState.StreamAvailability.Utilization? {
        return self._availability?.opened()
      }

      /// Return a reserved stream to the connection.
      ///
      /// Returns the utilization after returning the stream, or `nil` if the connection is not
      /// available.
      @usableFromInline
      internal mutating func returnStream() -> PerConnectionState.StreamAvailability.Utilization? {
        return self._availability?.return()
      }

      /// Update the maximum concurrent streams available on the connection, marking it as available
      /// if it was not already.
      ///
      /// Returns the previous value for max concurrent streams if the connection was ready.
      @usableFromInline
      internal mutating func updateMaxConcurrentStreams(_ maxConcurrentStreams: Int) -> Int? {
        // Already available: replace the limit in place and hand back the old one.
        if let previous = self._availability?.maxAvailable {
          self._availability?.maxAvailable = maxConcurrentStreams
          return previous
        }

        // Not yet available: create fresh availability state from the manager's multiplexer.
        // If the manager has no multiplexer the connection remains unavailable.
        self._availability = self.manager.sync.multiplexer.map {
          StreamAvailability(multiplexer: $0, maxAvailable: maxConcurrentStreams)
        }
        return nil
      }

      /// Mark the connection as unavailable returning the number of reserved streams.
      @usableFromInline
      internal mutating func unavailable() -> Int {
        // Capture the count before discarding the availability state.
        let reserved = self._availability?.reserved ?? 0
        self._availability = nil
        return reserved
      }
    }
  }