|
|
@@ -98,7 +98,7 @@ import DequeModule
|
|
|
/// ```
|
|
|
/// extension QuakeMonitor {
|
|
|
///
|
|
|
-/// static var throwingQuakes: BufferedStream<Quake, Error> {
|
|
|
+/// static var throwingQuakes: BufferedStream<Quake, any Error> {
|
|
|
/// BufferedStream { continuation in
|
|
|
/// let monitor = QuakeMonitor()
|
|
|
/// monitor.quakeHandler = { quake in
|
|
|
@@ -392,7 +392,7 @@ extension BufferedStream {
|
|
|
@inlinable
|
|
|
internal func enqueueCallback(
|
|
|
callbackToken: WriteResult.CallbackToken,
|
|
|
- onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
|
|
|
+ onProduceMore: @escaping @Sendable (Result<Void, any Error>) -> Void
|
|
|
) {
|
|
|
self._backing.storage.enqueueProducer(
|
|
|
callbackToken: callbackToken,
|
|
|
@@ -426,14 +426,14 @@ extension BufferedStream {
|
|
|
@inlinable
|
|
|
internal func write<S>(
|
|
|
contentsOf sequence: S,
|
|
|
- onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
|
|
|
+ onProduceMore: @escaping @Sendable (Result<Void, any Error>) -> Void
|
|
|
) where Element == S.Element, S: Sequence {
|
|
|
do {
|
|
|
let writeResult = try self.write(contentsOf: sequence)
|
|
|
|
|
|
switch writeResult {
|
|
|
case .produceMore:
|
|
|
- onProduceMore(Result<Void, Error>.success(()))
|
|
|
+ onProduceMore(Result<Void, any Error>.success(()))
|
|
|
|
|
|
case .enqueueCallback(let callbackToken):
|
|
|
self.enqueueCallback(callbackToken: callbackToken, onProduceMore: onProduceMore)
|
|
|
@@ -456,7 +456,7 @@ extension BufferedStream {
|
|
|
@inlinable
|
|
|
internal func write(
|
|
|
_ element: Element,
|
|
|
- onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
|
|
|
+ onProduceMore: @escaping @Sendable (Result<Void, any Error>) -> Void
|
|
|
) {
|
|
|
self.write(contentsOf: CollectionOfOne(element), onProduceMore: onProduceMore)
|
|
|
}
|
|
|
@@ -542,7 +542,7 @@ extension BufferedStream {
|
|
|
/// - Parameters:
|
|
|
/// - error: The error to throw, or `nil`, to finish normally.
|
|
|
@inlinable
|
|
|
- internal func finish(throwing error: Error?) {
|
|
|
+ internal func finish(throwing error: (any Error)?) {
|
|
|
self._backing.storage.finish(error)
|
|
|
}
|
|
|
}
|
|
|
@@ -558,9 +558,9 @@ extension BufferedStream {
|
|
|
@inlinable
|
|
|
internal static func makeStream(
|
|
|
of elementType: Element.Type = Element.self,
|
|
|
- throwing failureType: Error.Type = Error.self,
|
|
|
+ throwing failureType: (any Error).Type = (any Error).self,
|
|
|
backPressureStrategy: Source.BackPressureStrategy
|
|
|
- ) -> (`Self`, Source) where Error == Error {
|
|
|
+ ) -> (`Self`, Source) where any Error == any Error {
|
|
|
let storage = _BackPressuredStorage(
|
|
|
backPressureStrategy: backPressureStrategy._internalBackPressureStrategy
|
|
|
)
|
|
|
@@ -774,7 +774,7 @@ extension BufferedStream {
|
|
|
@inlinable
|
|
|
func enqueueProducer(
|
|
|
callbackToken: Source.WriteResult.CallbackToken,
|
|
|
- onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
|
|
|
+ onProduceMore: @escaping @Sendable (Result<Void, any Error>) -> Void
|
|
|
) {
|
|
|
let action = self._stateMachine.withCriticalRegion {
|
|
|
$0.enqueueProducer(callbackToken: callbackToken, onProduceMore: onProduceMore)
|
|
|
@@ -782,10 +782,10 @@ extension BufferedStream {
|
|
|
|
|
|
switch action {
|
|
|
case .resumeProducer(let onProduceMore):
|
|
|
- onProduceMore(Result<Void, Error>.success(()))
|
|
|
+ onProduceMore(Result<Void, any Error>.success(()))
|
|
|
|
|
|
case .resumeProducerWithError(let onProduceMore, let error):
|
|
|
- onProduceMore(Result<Void, Error>.failure(error))
|
|
|
+ onProduceMore(Result<Void, any Error>.failure(error))
|
|
|
|
|
|
case .none:
|
|
|
break
|
|
|
@@ -800,7 +800,7 @@ extension BufferedStream {
|
|
|
|
|
|
switch action {
|
|
|
case .resumeProducerWithCancellationError(let onProduceMore):
|
|
|
- onProduceMore(Result<Void, Error>.failure(CancellationError()))
|
|
|
+ onProduceMore(Result<Void, any Error>.failure(CancellationError()))
|
|
|
|
|
|
case .none:
|
|
|
break
|
|
|
@@ -808,7 +808,7 @@ extension BufferedStream {
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
- func finish(_ failure: Error?) {
|
|
|
+ func finish(_ failure: (any Error)?) {
|
|
|
let action = self._stateMachine.withCriticalRegion {
|
|
|
$0.finish(failure)
|
|
|
}
|
|
|
@@ -853,7 +853,7 @@ extension BufferedStream {
|
|
|
|
|
|
case .returnElementAndResumeProducers(let element, let producerContinuations):
|
|
|
for producerContinuation in producerContinuations {
|
|
|
- producerContinuation(Result<Void, Error>.success(()))
|
|
|
+ producerContinuation(Result<Void, any Error>.success(()))
|
|
|
}
|
|
|
|
|
|
return element
|
|
|
@@ -895,7 +895,7 @@ extension BufferedStream {
|
|
|
):
|
|
|
continuation.resume(returning: element)
|
|
|
for producerContinuation in producerContinuations {
|
|
|
- producerContinuation(Result<Void, Error>.success(()))
|
|
|
+ producerContinuation(Result<Void, any Error>.success(()))
|
|
|
}
|
|
|
|
|
|
case .resumeConsumerWithErrorAndCallOnTermination(
|
|
|
@@ -996,10 +996,10 @@ extension BufferedStream {
|
|
|
var buffer: Deque<Element>
|
|
|
/// The optional consumer continuation.
|
|
|
@usableFromInline
|
|
|
- var consumerContinuation: CheckedContinuation<Element?, Error>?
|
|
|
+ var consumerContinuation: CheckedContinuation<Element?, any Error>?
|
|
|
/// The producer continuations.
|
|
|
@usableFromInline
|
|
|
- var producerContinuations: Deque<(UInt, (Result<Void, Error>) -> Void)>
|
|
|
+ var producerContinuations: Deque<(UInt, (Result<Void, any Error>) -> Void)>
|
|
|
/// The producers that have been cancelled.
|
|
|
@usableFromInline
|
|
|
var cancelledAsyncProducers: Deque<UInt>
|
|
|
@@ -1013,8 +1013,8 @@ extension BufferedStream {
|
|
|
iteratorInitialized: Bool,
|
|
|
onTermination: (@Sendable () -> Void)? = nil,
|
|
|
buffer: Deque<Element>,
|
|
|
- consumerContinuation: CheckedContinuation<Element?, Error>? = nil,
|
|
|
- producerContinuations: Deque<(UInt, (Result<Void, Error>) -> Void)>,
|
|
|
+ consumerContinuation: CheckedContinuation<Element?, any Error>? = nil,
|
|
|
+ producerContinuations: Deque<(UInt, (Result<Void, any Error>) -> Void)>,
|
|
|
cancelledAsyncProducers: Deque<UInt>,
|
|
|
hasOutstandingDemand: Bool
|
|
|
) {
|
|
|
@@ -1039,7 +1039,7 @@ extension BufferedStream {
|
|
|
var buffer: Deque<Element>
|
|
|
/// The failure that should be thrown after the last element has been consumed.
|
|
|
@usableFromInline
|
|
|
- var failure: Error?
|
|
|
+ var failure: (any Error)?
|
|
|
/// The onTermination callback.
|
|
|
@usableFromInline
|
|
|
var onTermination: (@Sendable () -> Void)?
|
|
|
@@ -1048,7 +1048,7 @@ extension BufferedStream {
|
|
|
init(
|
|
|
iteratorInitialized: Bool,
|
|
|
buffer: Deque<Element>,
|
|
|
- failure: Error? = nil,
|
|
|
+ failure: (any Error)? = nil,
|
|
|
onTermination: (@Sendable () -> Void)?
|
|
|
) {
|
|
|
self.iteratorInitialized = iteratorInitialized
|
|
|
@@ -1157,7 +1157,7 @@ extension BufferedStream {
|
|
|
case callOnTermination((@Sendable () -> Void)?)
|
|
|
/// Indicates that all producers should be failed and `onTermination` should be called.
|
|
|
case failProducersAndCallOnTermination(
|
|
|
- [(Result<Void, Error>) -> Void],
|
|
|
+ [(Result<Void, any Error>) -> Void],
|
|
|
(@Sendable () -> Void)?
|
|
|
)
|
|
|
}
|
|
|
@@ -1269,7 +1269,7 @@ extension BufferedStream {
|
|
|
case callOnTermination((@Sendable () -> Void)?)
|
|
|
/// Indicates that all producers should be failed and `onTermination` should be called.
|
|
|
case failProducersAndCallOnTermination(
|
|
|
- [(Result<Void, Error>) -> Void],
|
|
|
+ [(Result<Void, any Error>) -> Void],
|
|
|
(@Sendable () -> Void)?
|
|
|
)
|
|
|
}
|
|
|
@@ -1331,12 +1331,12 @@ extension BufferedStream {
|
|
|
case callOnTermination((() -> Void)?)
|
|
|
/// Indicates that all producers should be failed and `onTermination` should be called.
|
|
|
case failProducersAndCallOnTermination(
|
|
|
- CheckedContinuation<Element?, Error>?,
|
|
|
- [(Result<Void, Error>) -> Void],
|
|
|
+ CheckedContinuation<Element?, any Error>?,
|
|
|
+ [(Result<Void, any Error>) -> Void],
|
|
|
(@Sendable () -> Void)?
|
|
|
)
|
|
|
/// Indicates that all producers should be failed.
|
|
|
- case failProducers([(Result<Void, Error>) -> Void])
|
|
|
+ case failProducers([(Result<Void, any Error>) -> Void])
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
@@ -1395,12 +1395,12 @@ extension BufferedStream {
|
|
|
)
|
|
|
/// Indicates that the consumer should be resumed and the producer should be notified to produce more.
|
|
|
case resumeConsumerAndReturnProduceMore(
|
|
|
- continuation: CheckedContinuation<Element?, Error>,
|
|
|
+ continuation: CheckedContinuation<Element?, any Error>,
|
|
|
element: Element
|
|
|
)
|
|
|
/// Indicates that the consumer should be resumed and the producer should be suspended.
|
|
|
case resumeConsumerAndReturnEnqueue(
|
|
|
- continuation: CheckedContinuation<Element?, Error>,
|
|
|
+ continuation: CheckedContinuation<Element?, any Error>,
|
|
|
element: Element,
|
|
|
callbackToken: Source.WriteResult.CallbackToken
|
|
|
)
|
|
|
@@ -1410,7 +1410,7 @@ extension BufferedStream {
|
|
|
@inlinable
|
|
|
init(
|
|
|
callbackToken: Source.WriteResult.CallbackToken?,
|
|
|
- continuationAndElement: (CheckedContinuation<Element?, Error>, Element)? = nil
|
|
|
+ continuationAndElement: (CheckedContinuation<Element?, any Error>, Element)? = nil
|
|
|
) {
|
|
|
switch (callbackToken, continuationAndElement) {
|
|
|
case (.none, .none):
|
|
|
@@ -1510,15 +1510,15 @@ extension BufferedStream {
|
|
|
@usableFromInline
|
|
|
enum EnqueueProducerAction {
|
|
|
/// Indicates that the producer should be notified to produce more.
|
|
|
- case resumeProducer((Result<Void, Error>) -> Void)
|
|
|
+ case resumeProducer((Result<Void, any Error>) -> Void)
|
|
|
/// Indicates that the producer should be notified about an error.
|
|
|
- case resumeProducerWithError((Result<Void, Error>) -> Void, Error)
|
|
|
+ case resumeProducerWithError((Result<Void, any Error>) -> Void, any Error)
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
mutating func enqueueProducer(
|
|
|
callbackToken: Source.WriteResult.CallbackToken,
|
|
|
- onProduceMore: @Sendable @escaping (Result<Void, Error>) -> Void
|
|
|
+ onProduceMore: @Sendable @escaping (Result<Void, any Error>) -> Void
|
|
|
) -> EnqueueProducerAction? {
|
|
|
switch self._state {
|
|
|
case .initial:
|
|
|
@@ -1560,7 +1560,7 @@ extension BufferedStream {
|
|
|
@usableFromInline
|
|
|
enum CancelProducerAction {
|
|
|
/// Indicates that the producer should be notified about cancellation.
|
|
|
- case resumeProducerWithCancellationError((Result<Void, Error>) -> Void)
|
|
|
+ case resumeProducerWithCancellationError((Result<Void, any Error>) -> Void)
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
@@ -1610,18 +1610,18 @@ extension BufferedStream {
|
|
|
/// Indicates that the consumer should be resumed with the failure, the producers
|
|
|
/// should be resumed with an error and `onTermination` should be called.
|
|
|
case resumeConsumerAndCallOnTermination(
|
|
|
- consumerContinuation: CheckedContinuation<Element?, Error>,
|
|
|
- failure: Error?,
|
|
|
+ consumerContinuation: CheckedContinuation<Element?, any Error>,
|
|
|
+ failure: (any Error)?,
|
|
|
onTermination: (() -> Void)?
|
|
|
)
|
|
|
/// Indicates that the producers should be resumed with an error.
|
|
|
case resumeProducers(
|
|
|
- producerContinuations: [(Result<Void, Error>) -> Void]
|
|
|
+ producerContinuations: [(Result<Void, any Error>) -> Void]
|
|
|
)
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
- mutating func finish(_ failure: Error?) -> FinishAction? {
|
|
|
+ mutating func finish(_ failure: (any Error)?) -> FinishAction? {
|
|
|
switch self._state {
|
|
|
case .initial(let initial):
|
|
|
// Nothing was yielded nor did anybody call next
|
|
|
@@ -1685,9 +1685,9 @@ extension BufferedStream {
|
|
|
/// Indicates that the element should be returned to the caller.
|
|
|
case returnElement(Element)
|
|
|
/// Indicates that the element should be returned to the caller and that all producers should be called.
|
|
|
- case returnElementAndResumeProducers(Element, [(Result<Void, Error>) -> Void])
|
|
|
+ case returnElementAndResumeProducers(Element, [(Result<Void, any Error>) -> Void])
|
|
|
/// Indicates that the `Error` should be returned to the caller and that `onTermination` should be called.
|
|
|
- case returnErrorAndCallOnTermination(Error?, (() -> Void)?)
|
|
|
+ case returnErrorAndCallOnTermination((any Error)?, (() -> Void)?)
|
|
|
/// Indicates that the `nil` should be returned to the caller.
|
|
|
case returnNil
|
|
|
/// Indicates that the `Task` of the caller should be suspended.
|
|
|
@@ -1779,26 +1779,26 @@ extension BufferedStream {
|
|
|
@usableFromInline
|
|
|
enum SuspendNextAction {
|
|
|
/// Indicates that the consumer should be resumed.
|
|
|
- case resumeConsumerWithElement(CheckedContinuation<Element?, Error>, Element)
|
|
|
+ case resumeConsumerWithElement(CheckedContinuation<Element?, any Error>, Element)
|
|
|
/// Indicates that the consumer and all producers should be resumed.
|
|
|
case resumeConsumerWithElementAndProducers(
|
|
|
- CheckedContinuation<Element?, Error>,
|
|
|
+ CheckedContinuation<Element?, any Error>,
|
|
|
Element,
|
|
|
- [(Result<Void, Error>) -> Void]
|
|
|
+ [(Result<Void, any Error>) -> Void]
|
|
|
)
|
|
|
/// Indicates that the consumer should be resumed with the failure and that `onTermination` should be called.
|
|
|
case resumeConsumerWithErrorAndCallOnTermination(
|
|
|
- CheckedContinuation<Element?, Error>,
|
|
|
- Error?,
|
|
|
+ CheckedContinuation<Element?, any Error>,
|
|
|
+ (any Error)?,
|
|
|
(() -> Void)?
|
|
|
)
|
|
|
/// Indicates that the consumer should be resumed with `nil`.
|
|
|
- case resumeConsumerWithNil(CheckedContinuation<Element?, Error>)
|
|
|
+ case resumeConsumerWithNil(CheckedContinuation<Element?, any Error>)
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
mutating func suspendNext(
|
|
|
- continuation: CheckedContinuation<Element?, Error>
|
|
|
+ continuation: CheckedContinuation<Element?, any Error>
|
|
|
) -> SuspendNextAction? {
|
|
|
switch self._state {
|
|
|
case .initial:
|
|
|
@@ -1879,11 +1879,11 @@ extension BufferedStream {
|
|
|
enum CancelNextAction {
|
|
|
/// Indicates that the continuation should be resumed with a cancellation error, the producers should be finished and call onTermination.
|
|
|
case resumeConsumerWithCancellationErrorAndCallOnTermination(
|
|
|
- CheckedContinuation<Element?, Error>,
|
|
|
+ CheckedContinuation<Element?, any Error>,
|
|
|
(() -> Void)?
|
|
|
)
|
|
|
/// Indicates that the producers should be finished and call onTermination.
|
|
|
- case failProducersAndCallOnTermination([(Result<Void, Error>) -> Void], (() -> Void)?)
|
|
|
+ case failProducersAndCallOnTermination([(Result<Void, any Error>) -> Void], (() -> Void)?)
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
@@ -1930,7 +1930,7 @@ extension BufferedStream.Source: ClosableRPCWriterProtocol {
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
- func finish(throwing error: Error) {
|
|
|
- self.finish(throwing: error as Error?)
|
|
|
+ func finish(throwing error: any Error) {
|
|
|
+ self.finish(throwing: error as (any Error)?)
|
|
|
}
|
|
|
}
|