ConnectionPool+PerConnectionState.swift 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOHTTP2
  extension ConnectionPool {
    /// Book-keeping held for a single connection: the manager driving the connection and, while
    /// the connection is available, how many of its streams are reserved and open.
    @usableFromInline
    internal struct PerConnectionState {
      /// The connection manager for this connection.
      @usableFromInline
      internal var manager: ConnectionManager

      /// Stream availability for this connection, `nil` if the connection is not available.
      @usableFromInline
      internal var _availability: StreamAvailability?

      /// Whether the connection is quiescing.
      ///
      /// Reads as `false` when the connection is not available. Writes are forwarded to the
      /// underlying availability and are dropped when the connection is not available.
      @usableFromInline
      internal var isQuiescing: Bool {
        get {
          return self._availability?.isQuiescing ?? false
        }
        set {
          // Store the value which was actually assigned rather than unconditionally storing
          // `true`, so that `isQuiescing = false` does not mark the connection as quiescing.
          self._availability?.isQuiescing = newValue
        }
      }

      /// Stream accounting for a connection which is available.
      @usableFromInline
      internal struct StreamAvailability {
        /// A snapshot of how many streams are open relative to the maximum available.
        @usableFromInline
        struct Utilization {
          /// Number of open streams at the time of the snapshot.
          @usableFromInline
          var used: Int

          /// Maximum number of available streams at the time of the snapshot.
          @usableFromInline
          var capacity: Int

          @usableFromInline
          init(used: Int, capacity: Int) {
            self.used = used
            self.capacity = capacity
          }
        }

        /// The multiplexer handed out by `reserve()`.
        @usableFromInline
        var multiplexer: HTTP2StreamMultiplexer

        /// Maximum number of available streams.
        @usableFromInline
        var maxAvailable: Int

        /// Number of streams reserved.
        @usableFromInline
        var reserved: Int = 0

        /// Number of streams opened.
        @usableFromInline
        var open: Int = 0

        /// Whether the connection is quiescing; no streams are available while this is `true`.
        @usableFromInline
        var isQuiescing = false

        /// Number of available streams: zero while quiescing, otherwise `maxAvailable - reserved`.
        @usableFromInline
        var available: Int {
          return self.isQuiescing ? 0 : self.maxAvailable - self.reserved
        }

        /// Increment the reserved streams and return the multiplexer.
        ///
        /// Must not be called while quiescing (checked by an `assert`). The number of available
        /// streams is not checked here.
        @usableFromInline
        mutating func reserve() -> HTTP2StreamMultiplexer {
          assert(!self.isQuiescing)
          self.reserved += 1
          return self.multiplexer
        }

        /// Increment the open streams by one.
        ///
        /// - Returns: The utilization after the stream was counted as open.
        @usableFromInline
        mutating func opened() -> Utilization {
          self.open += 1
          return .init(used: self.open, capacity: self.maxAvailable)
        }

        /// Decrement both the reserved and the open streams by one.
        ///
        /// Neither count may become negative (checked by `assert`s).
        ///
        /// - Returns: The utilization after the stream was returned.
        @usableFromInline
        mutating func `return`() -> Utilization {
          self.reserved -= 1
          self.open -= 1
          assert(self.reserved >= 0)
          assert(self.open >= 0)
          return .init(used: self.open, capacity: self.maxAvailable)
        }
      }

      /// Create the state for a connection which is not yet available.
      @usableFromInline
      init(manager: ConnectionManager) {
        self.manager = manager
        self._availability = nil
      }

      /// The number of reserved streams, zero if the connection is not available.
      @usableFromInline
      internal var reservedStreams: Int {
        return self._availability?.reserved ?? 0
      }

      /// The number of streams available to reserve. If this value is greater than zero then it is
      /// safe to call `reserveStream()` and force unwrap the result.
      @usableFromInline
      internal var availableStreams: Int {
        return self._availability?.available ?? 0
      }

      /// The maximum number of concurrent streams which may be available for the connection, if it
      /// is ready.
      @usableFromInline
      internal var maxAvailableStreams: Int? {
        return self._availability?.maxAvailable
      }

      /// Reserve a stream and return the stream multiplexer. Returns `nil` if the connection is
      /// not available.
      ///
      /// The result may be safely unwrapped if `self.availableStreams > 0` when reserving a stream.
      @usableFromInline
      internal mutating func reserveStream() -> HTTP2StreamMultiplexer? {
        return self._availability?.reserve()
      }

      /// Record that a stream was opened on the connection.
      ///
      /// - Returns: The resulting utilization, or `nil` if the connection is not available.
      @usableFromInline
      internal mutating func openedStream() -> PerConnectionState.StreamAvailability.Utilization? {
        return self._availability?.opened()
      }

      /// Return a reserved stream to the connection.
      ///
      /// - Returns: The resulting utilization, or `nil` if the connection is not available.
      @usableFromInline
      internal mutating func returnStream() -> PerConnectionState.StreamAvailability.Utilization? {
        return self._availability?.return()
      }

      /// Update the maximum concurrent streams available on the connection, marking it as available
      /// if it was not already.
      ///
      /// Returns the previous value for max concurrent streams if the connection was ready.
      @usableFromInline
      internal mutating func updateMaxConcurrentStreams(_ maxConcurrentStreams: Int) -> Int? {
        if let previous = self._availability?.maxAvailable {
          // Already available: replace the limit in place and hand back the old one.
          self._availability?.maxAvailable = maxConcurrentStreams
          return previous
        }

        // Not available yet: availability is only created if the manager yields a multiplexer,
        // otherwise it remains `nil`.
        self._availability = self.manager.sync.multiplexer.map {
          StreamAvailability(multiplexer: $0, maxAvailable: maxConcurrentStreams)
        }
        return nil
      }

      /// Mark the connection as unavailable returning the number of reserved streams.
      @usableFromInline
      internal mutating func unavailable() -> Int {
        // Read the count before the availability (and its counters) is discarded.
        let reserved = self._availability?.reserved ?? 0
        self._availability = nil
        return reserved
      }
    }
  }