| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- /*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #if compiler(>=5.5)
- import NIOConcurrencyHelpers
- import NIOCore
- /// The source of messages for a ``PassthroughMessageSequence``.`
- ///
- /// Values may be provided to the source with calls to ``yield(_:)`` which returns whether the value
- /// was accepted (and how many values are yet to be consumed) -- or dropped.
- ///
- /// The backing storage has an unbounded capacity and callers should use the number of unconsumed
- /// values returned from ``yield(_:)`` as an indication of when to stop providing values.
- ///
- /// The source must be finished exactly once by calling ``finish()`` or ``finish(throwing:)`` to
- /// indicate that the sequence should end with an error.
- @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
- @usableFromInline
- internal final class PassthroughMessageSource<Element, Failure: Error> {
- @usableFromInline
- internal typealias _ContinuationResult = Result<Element?, Error>
- /// All state in this class must be accessed via the lock.
- ///
- /// - Important: We use a `class` with a lock rather than an `actor` as we must guarantee that
- /// calls to ``yield(_:)`` are not reordered.
- @usableFromInline
- internal let _lock: Lock
- /// A queue of elements which may be consumed as soon as there is demand.
- @usableFromInline
- internal var _continuationResults: CircularBuffer<_ContinuationResult>
- /// A continuation which will be resumed in the future. The continuation must be `nil`
- /// if ``continuationResults`` is not empty.
- @usableFromInline
- internal var _continuation: Optional<CheckedContinuation<Element?, Error>>
- /// True if a terminal continuation result (`.success(nil)` or `.failure()`) has been seen.
- /// No more values may be enqueued to `continuationResults` if this is `true`.
- @usableFromInline
- internal var _isTerminated: Bool
- @usableFromInline
- internal init(initialBufferCapacity: Int = 16) {
- self._lock = Lock()
- self._continuationResults = CircularBuffer(initialCapacity: initialBufferCapacity)
- self._continuation = nil
- self._isTerminated = false
- }
- // MARK: - Append / Yield
- @usableFromInline
- internal enum YieldResult: Hashable {
- /// The value was accepted. The `queueDepth` indicates how many elements are waiting to be
- /// consumed.
- ///
- /// If `queueDepth` is zero then the value was consumed immediately.
- case accepted(queueDepth: Int)
- /// The value was dropped because the source has already been finished.
- case dropped
- }
- @inlinable
- internal func yield(_ element: Element) -> YieldResult {
- let continuationResult: _ContinuationResult = .success(element)
- return self._yield(continuationResult, isTerminator: false)
- }
- @inlinable
- internal func finish(throwing error: Failure? = nil) -> YieldResult {
- let continuationResult: _ContinuationResult = error.map { .failure($0) } ?? .success(nil)
- return self._yield(continuationResult, isTerminator: true)
- }
- @usableFromInline
- internal enum _YieldResult {
- /// The sequence has already been terminated; drop the element.
- case alreadyTerminated
- /// The element was added to the queue to be consumed later.
- case queued(Int)
- /// Demand for an element already existed: complete the continuation with the result being
- /// yielded.
- case resume(CheckedContinuation<Element?, Error>)
- }
- @inlinable
- internal func _yield(
- _ continuationResult: _ContinuationResult, isTerminator: Bool
- ) -> YieldResult {
- let result: _YieldResult = self._lock.withLock {
- if self._isTerminated {
- return .alreadyTerminated
- } else if let continuation = self._continuation {
- self._continuation = nil
- return .resume(continuation)
- } else {
- self._isTerminated = isTerminator
- self._continuationResults.append(continuationResult)
- return .queued(self._continuationResults.count)
- }
- }
- let yieldResult: YieldResult
- switch result {
- case let .queued(size):
- yieldResult = .accepted(queueDepth: size)
- case let .resume(continuation):
- // If we resume a continuation then the queue must be empty
- yieldResult = .accepted(queueDepth: 0)
- continuation.resume(with: continuationResult)
- case .alreadyTerminated:
- yieldResult = .dropped
- }
- return yieldResult
- }
- // MARK: - Next
- @inlinable
- internal func consumeNextElement() async throws -> Element? {
- return try await withCheckedThrowingContinuation {
- self._consumeNextElement(continuation: $0)
- }
- }
- @inlinable
- internal func _consumeNextElement(continuation: CheckedContinuation<Element?, Error>) {
- let continuationResult: _ContinuationResult? = self._lock.withLock {
- if let nextResult = self._continuationResults.popFirst() {
- return nextResult
- } else {
- // Nothing buffered and not terminated yet: save the continuation for later.
- assert(self._continuation == nil)
- self._continuation = continuation
- return nil
- }
- }
- if let continuationResult = continuationResult {
- continuation.resume(with: continuationResult)
- }
- }
- }
- #endif // compiler(>=5.5)
|