PassthroughMessageSource.swift 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  /// The source of messages for a ``PassthroughMessageSequence``.
  ///
  /// Values may be provided to the source with calls to ``yield(_:)`` which returns whether the value
  /// was accepted (and how many values are yet to be consumed) -- or dropped.
  ///
  /// The backing storage has an unbounded capacity and callers should use the number of unconsumed
  /// values returned from ``yield(_:)`` as an indication of when to stop providing values.
  ///
  /// The source must be finished exactly once by calling ``finish(throwing:)``: with no error to end
  /// the sequence normally, or with an error to indicate that the sequence should end with an error.
  /// Anything yielded after the source has finished (including a second finish) is dropped.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  internal final class PassthroughMessageSource<Element: Sendable, Failure: Error> {
    /// The result used to resume a consumer: `.success(element)` for a value, `.success(nil)` for
    /// normal termination and `.failure(error)` for termination with an error.
    @usableFromInline
    internal typealias _ContinuationResult = Result<Element?, Error>

    /// All state in this class must be accessed via the lock.
    ///
    /// - Important: We use a `class` with a lock rather than an `actor` as we must guarantee that
    ///   calls to ``yield(_:)`` are not reordered.
    @usableFromInline
    internal let _lock: Lock

    /// A queue of elements which may be consumed as soon as there is demand.
    @usableFromInline
    internal var _continuationResults: CircularBuffer<_ContinuationResult>

    /// A continuation which will be resumed in the future. The continuation must be `nil`
    /// if `_continuationResults` is not empty.
    @usableFromInline
    internal var _continuation: Optional<CheckedContinuation<Element?, Error>>

    /// True if a terminal continuation result (`.success(nil)` or `.failure()`) has been seen.
    /// No more values may be enqueued to `_continuationResults` if this is `true`.
    @usableFromInline
    internal var _isTerminated: Bool

    /// Creates an empty, unterminated source.
    ///
    /// - Parameter initialBufferCapacity: The initial capacity of the buffer holding values which
    ///   have been yielded but not yet consumed.
    @usableFromInline
    internal init(initialBufferCapacity: Int = 16) {
      self._lock = Lock()
      self._continuationResults = CircularBuffer(initialCapacity: initialBufferCapacity)
      self._continuation = nil
      self._isTerminated = false
    }

    // MARK: - Append / Yield

    /// The outcome of yielding a value to (or finishing) the source.
    @usableFromInline
    internal enum YieldResult: Hashable {
      /// The value was accepted. The `queueDepth` indicates how many elements are waiting to be
      /// consumed.
      ///
      /// If `queueDepth` is zero then the value was consumed immediately.
      case accepted(queueDepth: Int)

      /// The value was dropped because the source has already been finished.
      case dropped
    }

    /// Provides an element to the source.
    ///
    /// - Parameter element: The element to hand to the consumer.
    /// - Returns: `.accepted` with the number of elements still waiting to be consumed, or
    ///   `.dropped` if the source has already been finished.
    @inlinable
    @discardableResult
    internal func yield(_ element: Element) -> YieldResult {
      let continuationResult: _ContinuationResult = .success(element)
      return self._yield(continuationResult, isTerminator: false)
    }

    /// Finishes the source, optionally with an error.
    ///
    /// - Parameter error: The error to end the sequence with, or `nil` to end it normally.
    /// - Returns: `.accepted` if this call terminated the source, or `.dropped` if the source had
    ///   already been finished.
    @inlinable
    @discardableResult
    internal func finish(throwing error: Failure? = nil) -> YieldResult {
      let continuationResult: _ContinuationResult = error.map { .failure($0) } ?? .success(nil)
      return self._yield(continuationResult, isTerminator: true)
    }

    /// The decision made while holding the lock in ``_yield(_:isTerminator:)``.
    @usableFromInline
    internal enum _YieldResult {
      /// The sequence has already been terminated; drop the element.
      case alreadyTerminated
      /// The element was added to the queue to be consumed later.
      case queued(Int)
      /// Demand for an element already existed: complete the continuation with the result being
      /// yielded.
      case resume(CheckedContinuation<Element?, Error>)
    }

    /// Enqueues `continuationResult`, or hands it directly to a waiting consumer.
    ///
    /// - Parameters:
    ///   - continuationResult: The value, end-of-sequence marker or error to deliver.
    ///   - isTerminator: Whether `continuationResult` ends the sequence.
    /// - Returns: Whether the result was accepted or dropped.
    @inlinable
    internal func _yield(
      _ continuationResult: _ContinuationResult, isTerminator: Bool
    ) -> YieldResult {
      let result: _YieldResult = self._lock.withLock {
        if self._isTerminated {
          return .alreadyTerminated
        } else {
          self._isTerminated = isTerminator
        }

        if let continuation = self._continuation {
          self._continuation = nil
          return .resume(continuation)
        } else {
          self._continuationResults.append(continuationResult)
          return .queued(self._continuationResults.count)
        }
      }

      let yieldResult: YieldResult
      switch result {
      case let .queued(size):
        yieldResult = .accepted(queueDepth: size)
      case let .resume(continuation):
        // If we resume a continuation then the queue must be empty. The continuation is resumed
        // outside of the lock so that no consumer code runs while the lock is held.
        yieldResult = .accepted(queueDepth: 0)
        continuation.resume(with: continuationResult)
      case .alreadyTerminated:
        yieldResult = .dropped
      }
      return yieldResult
    }

    // MARK: - Next

    /// Returns a result which can be delivered to the consumer without waiting, if there is one.
    ///
    /// - Important: The caller must hold `_lock`.
    /// - Returns: The oldest buffered result if the buffer is not empty, `.success(nil)` if the
    ///   buffer is empty and the source has terminated, or `nil` if the consumer has to wait.
    @inlinable
    internal func _nextResultIfAvailable() -> _ContinuationResult? {
      if let buffered = self._continuationResults.popFirst() {
        return buffered
      } else if self._isTerminated {
        return _ContinuationResult.success(nil)
      } else {
        return nil
      }
    }

    /// Returns the next element, waiting for one to be yielded if none is buffered.
    ///
    /// - Returns: The next element, or `nil` if the source finished without an error.
    /// - Throws: The error the source was finished with, or `CancellationError` if the calling
    ///   task is cancelled while waiting for an element.
    @inlinable
    internal func consumeNextElement() async throws -> Element? {
      // Fast path: something is already buffered or the source has terminated. The lock is scoped
      // to the closure so it is never held across a suspension point or while the cancellation
      // handler below may run.
      let available: _ContinuationResult? = self._lock.withLock {
        self._nextResultIfAvailable()
      }

      if let available = available {
        return try available.get()
      }

      // Slow path; we need a continuation.
      return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
          let immediate: _ContinuationResult? = self._lock.withLock {
            // The lock was released after the fast path so the state may have changed: check again
            // before parking the continuation.
            if let result = self._nextResultIfAvailable() {
              return result
            } else if Task.isCancelled {
              // The cancellation handler may already have run (and found no continuation to
              // resume); don't park a continuation which nothing would cancel.
              return _ContinuationResult.failure(CancellationError())
            } else {
              // Nothing buffered and not terminated yet: save the continuation for later.
              precondition(self._continuation == nil)
              self._continuation = continuation
              return nil
            }
          }

          // Resume outside of the lock.
          if let immediate = immediate {
            continuation.resume(with: immediate)
          }
        }
      } onCancel: {
        // Take the continuation (if any) while holding the lock and resume it after releasing it.
        let continuation: CheckedContinuation<Element?, Error>? = self._lock.withLock {
          let cont = self._continuation
          self._continuation = nil
          return cont
        }

        continuation?.resume(throwing: CancellationError())
      }
    }
  }
  // `@unchecked` is acceptable here: the class's mutable state is only accessed or modified via
  // its lock.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension PassthroughMessageSource: @unchecked Sendable {}
  154. #endif // compiler(>=5.6)