ConnectionPool+PerConnectionState.swift 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  /*
   * Copyright 2021, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import NIOHTTP2
  extension ConnectionPool {
    /// Bookkeeping the pool keeps for a single connection: the manager which owns the connection
    /// and, while the connection is usable, how many of its streams have been handed out.
    @usableFromInline
    internal struct PerConnectionState {
      /// The manager responsible for this connection.
      @usableFromInline
      internal var manager: ConnectionManager

      /// Stream accounting for the connection. This is `nil` for as long as the connection is
      /// not available.
      @usableFromInline
      internal var _availability: StreamAvailability?

      /// Stream accounting for a connection which is currently available.
      @usableFromInline
      internal struct StreamAvailability {
        /// The multiplexer handed out to callers which reserve a stream.
        @usableFromInline
        var multiplexer: HTTP2StreamMultiplexer

        /// The upper bound on the number of streams which may be reserved at once.
        @usableFromInline
        var maxAvailable: Int

        /// How many streams are currently reserved.
        @usableFromInline
        var reserved: Int = 0

        /// How many more streams may be reserved: `maxAvailable` less `reserved`.
        ///
        /// This value is not clamped, so it is zero or negative whenever `reserved` is equal to
        /// or larger than `maxAvailable`.
        @usableFromInline
        var available: Int {
          return maxAvailable - reserved
        }

        /// Records one more reserved stream and hands back the multiplexer.
        ///
        /// No check is made against `maxAvailable`; callers are expected to look at `available`
        /// first.
        @usableFromInline
        mutating func reserve() -> HTTP2StreamMultiplexer {
          reserved += 1
          return multiplexer
        }

        /// Records that one previously reserved stream has been given back.
        @usableFromInline
        mutating func `return`() {
          reserved -= 1
          // More streams were returned than were ever reserved if this fires.
          assert(reserved >= 0)
        }
      }

      /// Creates the state for a connection which is not yet available.
      ///
      /// - Parameter manager: The manager for the connection.
      @usableFromInline
      init(manager: ConnectionManager) {
        self._availability = nil
        self.manager = manager
      }

      /// The number of streams currently reserved, or zero if the connection is not available.
      @usableFromInline
      internal var reservedStreams: Int {
        guard let availability = _availability else {
          return 0
        }
        return availability.reserved
      }

      /// The number of streams which may still be reserved, or zero if the connection is not
      /// available. When this is greater than zero, `reserveStream()` may be called and its
      /// result force unwrapped.
      @usableFromInline
      internal var availableStreams: Int {
        guard let availability = _availability else {
          return 0
        }
        return availability.available
      }

      /// The current limit on concurrent streams for the connection, or `nil` if the connection
      /// is not available.
      @usableFromInline
      internal var maxAvailableStreams: Int? {
        return _availability.map { $0.maxAvailable }
      }

      /// Reserves a stream on the connection and returns the multiplexer to create it with.
      ///
      /// The reservation is counted whenever the connection is available; this method does not
      /// itself compare against the stream limit. Callers should check that
      /// `self.availableStreams > 0` beforehand, in which case the result is never `nil`.
      ///
      /// - Returns: The stream multiplexer, or `nil` if the connection is not available.
      @usableFromInline
      internal mutating func reserveStream() -> HTTP2StreamMultiplexer? {
        guard var availability = _availability else {
          return nil
        }
        let multiplexer = availability.reserve()
        _availability = availability
        return multiplexer
      }

      /// Gives a previously reserved stream back to the connection. Does nothing if the
      /// connection is not available.
      @usableFromInline
      internal mutating func returnStream() {
        guard var availability = _availability else {
          return
        }
        availability.return()
        _availability = availability
      }

      /// Sets the limit on concurrent streams for the connection.
      ///
      /// If the connection was already available only its limit is replaced; the reserved count
      /// is left as it was. Otherwise the connection becomes available with no streams reserved,
      /// provided that the manager currently has a multiplexer; without one the connection
      /// remains unavailable.
      ///
      /// - Parameter maxConcurrentStreams: The new limit on concurrent streams.
      /// - Returns: The limit which was replaced, or `nil` if the connection was not available
      ///   before this call.
      @usableFromInline
      internal mutating func updateMaxConcurrentStreams(_ maxConcurrentStreams: Int) -> Int? {
        guard var availability = _availability else {
          // Not available yet: start tracking streams only if there is a multiplexer to hand out.
          if let multiplexer = manager.sync.multiplexer {
            _availability = StreamAvailability(
              multiplexer: multiplexer,
              maxAvailable: maxConcurrentStreams
            )
          }
          return nil
        }

        let previousLimit = availability.maxAvailable
        availability.maxAvailable = maxConcurrentStreams
        _availability = availability
        return previousLimit
      }

      /// Marks the connection as unavailable, discarding its stream accounting.
      ///
      /// - Returns: The number of streams which were reserved at the time of the call, or zero
      ///   if the connection was already unavailable.
      @usableFromInline
      internal mutating func unavailable() -> Int {
        let outstanding = reservedStreams
        _availability = nil
        return outstanding
      }
    }
  }