|
@@ -15,6 +15,7 @@
|
|
|
*/
|
|
*/
|
|
|
|
|
|
|
|
public import DequeModule // should be @usableFromInline
|
|
public import DequeModule // should be @usableFromInline
|
|
|
|
|
+public import Synchronization // should be @usableFromInline
|
|
|
|
|
|
|
|
/// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently.
|
|
/// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently.
|
|
|
///
|
|
///
|
|
@@ -156,15 +157,15 @@ enum BroadcastAsyncSequenceError: Error {
|
|
|
@usableFromInline
|
|
@usableFromInline
|
|
|
final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
@usableFromInline
|
|
@usableFromInline
|
|
|
- let _state: LockedValueBox<_BroadcastSequenceStateMachine<Element>>
|
|
|
|
|
|
|
+ let _state: Mutex<_BroadcastSequenceStateMachine<Element>>
|
|
|
|
|
|
|
|
@inlinable
|
|
@inlinable
|
|
|
init(bufferSize: Int) {
|
|
init(bufferSize: Int) {
|
|
|
- self._state = LockedValueBox(_BroadcastSequenceStateMachine(bufferSize: bufferSize))
|
|
|
|
|
|
|
+ self._state = Mutex(_BroadcastSequenceStateMachine(bufferSize: bufferSize))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
deinit {
|
|
deinit {
|
|
|
- let onDrop = self._state.withLockedValue { state in
|
|
|
|
|
|
|
+ let onDrop = self._state.withLock { state in
|
|
|
state.dropResources()
|
|
state.dropResources()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -184,7 +185,7 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
/// - Parameter element: The element to write.
|
|
/// - Parameter element: The element to write.
|
|
|
@inlinable
|
|
@inlinable
|
|
|
func yield(_ element: Element) async throws {
|
|
func yield(_ element: Element) async throws {
|
|
|
- let onYield = self._state.withLockedValue { state in state.yield(element) }
|
|
|
|
|
|
|
+ let onYield = self._state.withLock { state in state.yield(element) }
|
|
|
|
|
|
|
|
switch onYield {
|
|
switch onYield {
|
|
|
case .none:
|
|
case .none:
|
|
@@ -196,7 +197,7 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
case .suspend(let token):
|
|
case .suspend(let token):
|
|
|
try await withTaskCancellationHandler {
|
|
try await withTaskCancellationHandler {
|
|
|
try await withCheckedThrowingContinuation { continuation in
|
|
try await withCheckedThrowingContinuation { continuation in
|
|
|
- let onProduceMore = self._state.withLockedValue { state in
|
|
|
|
|
|
|
+ let onProduceMore = self._state.withLock { state in
|
|
|
state.waitToProduceMore(continuation: continuation, token: token)
|
|
state.waitToProduceMore(continuation: continuation, token: token)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -208,7 +209,7 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
} onCancel: {
|
|
} onCancel: {
|
|
|
- let onCancel = self._state.withLockedValue { state in
|
|
|
|
|
|
|
+ let onCancel = self._state.withLock { state in
|
|
|
state.cancelProducer(withToken: token)
|
|
state.cancelProducer(withToken: token)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -230,7 +231,7 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
/// - Parameter result: Whether the stream is finishing cleanly or because of an error.
|
|
/// - Parameter result: Whether the stream is finishing cleanly or because of an error.
|
|
|
@inlinable
|
|
@inlinable
|
|
|
func finish(_ result: Result<Void, any Error>) {
|
|
func finish(_ result: Result<Void, any Error>) {
|
|
|
- let action = self._state.withLockedValue { state in state.finish(result: result) }
|
|
|
|
|
|
|
+ let action = self._state.withLock { state in state.finish(result: result) }
|
|
|
switch action {
|
|
switch action {
|
|
|
case .none:
|
|
case .none:
|
|
|
()
|
|
()
|
|
@@ -247,7 +248,7 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
/// - Returns: Returns a unique subscription ID.
|
|
/// - Returns: Returns a unique subscription ID.
|
|
|
@inlinable
|
|
@inlinable
|
|
|
func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
|
|
func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
|
|
|
- self._state.withLockedValue { $0.subscribe() }
|
|
|
|
|
|
|
+ self._state.withLock { $0.subscribe() }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Returns the next element for the given subscriber, if it is available.
|
|
/// Returns the next element for the given subscriber, if it is available.
|
|
@@ -259,35 +260,32 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
|
|
forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
|
|
|
) async throws -> Element? {
|
|
) async throws -> Element? {
|
|
|
return try await withTaskCancellationHandler {
|
|
return try await withTaskCancellationHandler {
|
|
|
- self._state.unsafe.lock()
|
|
|
|
|
- let onNext = self._state.unsafe.withValueAssumingLockIsAcquired {
|
|
|
|
|
|
|
+ let onNext = self._state.withLock {
|
|
|
$0.nextElement(forSubscriber: id)
|
|
$0.nextElement(forSubscriber: id)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
switch onNext {
|
|
switch onNext {
|
|
|
case .return(let returnAndProduceMore):
|
|
case .return(let returnAndProduceMore):
|
|
|
- self._state.unsafe.unlock()
|
|
|
|
|
returnAndProduceMore.producers.resume()
|
|
returnAndProduceMore.producers.resume()
|
|
|
return try returnAndProduceMore.nextResult.get()
|
|
return try returnAndProduceMore.nextResult.get()
|
|
|
|
|
|
|
|
case .suspend:
|
|
case .suspend:
|
|
|
return try await withCheckedThrowingContinuation { continuation in
|
|
return try await withCheckedThrowingContinuation { continuation in
|
|
|
- let onSetContinuation = self._state.unsafe.withValueAssumingLockIsAcquired { state in
|
|
|
|
|
|
|
+ let onSetContinuation = self._state.withLock { state in
|
|
|
state.setContinuation(continuation, forSubscription: id)
|
|
state.setContinuation(continuation, forSubscription: id)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- self._state.unsafe.unlock()
|
|
|
|
|
-
|
|
|
|
|
switch onSetContinuation {
|
|
switch onSetContinuation {
|
|
|
- case .resume(let continuation, let result):
|
|
|
|
|
|
|
+ case .resume(let continuation, let result, let producerContinuations):
|
|
|
continuation.resume(with: result)
|
|
continuation.resume(with: result)
|
|
|
|
|
+ producerContinuations?.resume()
|
|
|
case .none:
|
|
case .none:
|
|
|
()
|
|
()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
} onCancel: {
|
|
} onCancel: {
|
|
|
- let onCancel = self._state.withLockedValue { state in
|
|
|
|
|
|
|
+ let onCancel = self._state.withLock { state in
|
|
|
state.cancelSubscription(withID: id)
|
|
state.cancelSubscription(withID: id)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -304,7 +302,7 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
/// elements.
|
|
/// elements.
|
|
|
@inlinable
|
|
@inlinable
|
|
|
var isKnownSafeForNextSubscriber: Bool {
|
|
var isKnownSafeForNextSubscriber: Bool {
|
|
|
- self._state.withLockedValue { state in
|
|
|
|
|
|
|
+ self._state.withLock { state in
|
|
|
state.nextSubscriptionIsValid
|
|
state.nextSubscriptionIsValid
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -312,7 +310,7 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
|
|
|
/// Invalidates all active subscriptions.
|
|
/// Invalidates all active subscriptions.
|
|
|
@inlinable
|
|
@inlinable
|
|
|
func invalidateAllSubscriptions() {
|
|
func invalidateAllSubscriptions() {
|
|
|
- let action = self._state.withLockedValue { state in
|
|
|
|
|
|
|
+ let action = self._state.withLock { state in
|
|
|
state.invalidateAllSubscriptions()
|
|
state.invalidateAllSubscriptions()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -467,10 +465,17 @@ struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
|
|
|
_ continuation: ConsumerContinuation,
|
|
_ continuation: ConsumerContinuation,
|
|
|
forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
|
|
forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
|
|
|
) -> OnSetContinuation {
|
|
) -> OnSetContinuation {
|
|
|
- if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
|
|
|
|
|
- return .none
|
|
|
|
|
- } else {
|
|
|
|
|
- return .resume(continuation, .failure(CancellationError()))
|
|
|
|
|
|
|
+ // 'next(id)' must be checked first: an element might've been provided between the lock
|
|
|
|
|
+ // being dropped and a continuation being created and the lock being acquired again.
|
|
|
|
|
+ switch self.next(id) {
|
|
|
|
|
+ case .return(let resultAndProducers):
|
|
|
|
|
+ return .resume(continuation, resultAndProducers.nextResult, resultAndProducers.producers)
|
|
|
|
|
+ case .suspend:
|
|
|
|
|
+ if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
|
|
|
|
|
+ return .none
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return .resume(continuation, .failure(CancellationError()), nil)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -697,10 +702,17 @@ struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
|
|
|
_ continuation: ConsumerContinuation,
|
|
_ continuation: ConsumerContinuation,
|
|
|
forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
|
|
forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
|
|
|
) -> OnSetContinuation {
|
|
) -> OnSetContinuation {
|
|
|
- if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
|
|
|
|
|
- return .none
|
|
|
|
|
- } else {
|
|
|
|
|
- return .resume(continuation, .failure(CancellationError()))
|
|
|
|
|
|
|
+ // 'next(id)' must be checked first: an element might've been provided between the lock
|
|
|
|
|
+ // being dropped and a continuation being created and the lock being acquired again.
|
|
|
|
|
+ switch self.next(id) {
|
|
|
|
|
+ case .return(let resultAndProducers):
|
|
|
|
|
+ return .resume(continuation, resultAndProducers.nextResult, resultAndProducers.producers)
|
|
|
|
|
+ case .suspend:
|
|
|
|
|
+ if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
|
|
|
|
|
+ return .none
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return .resume(continuation, .failure(CancellationError()), nil)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1149,7 +1161,7 @@ struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
|
|
|
@usableFromInline
|
|
@usableFromInline
|
|
|
enum OnSetContinuation {
|
|
enum OnSetContinuation {
|
|
|
case none
|
|
case none
|
|
|
- case resume(ConsumerContinuation, Result<Element?, any Error>)
|
|
|
|
|
|
|
+ case resume(ConsumerContinuation, Result<Element?, any Error>, ProducerContinuations?)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@inlinable
|
|
@inlinable
|
|
@@ -1175,7 +1187,7 @@ struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
|
|
|
self._state = .streaming(state)
|
|
self._state = .streaming(state)
|
|
|
|
|
|
|
|
case .finished(let state):
|
|
case .finished(let state):
|
|
|
- onSetContinuation = .resume(continuation, state.result.map { _ in nil })
|
|
|
|
|
|
|
+ onSetContinuation = .resume(continuation, state.result.map { _ in nil }, nil)
|
|
|
|
|
|
|
|
case ._modifying:
|
|
case ._modifying:
|
|
|
// All values must have been produced, nothing to wait for.
|
|
// All values must have been produced, nothing to wait for.
|