| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799 |
- /*
- * Copyright 2023, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- public import DequeModule // should be @usableFromInline
- /// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently.
- ///
- /// The sequence is not a general-purpose broadcast sequence; it is tailored specifically for the
- /// requirements of gRPC Swift, in particular it is used to support retrying and hedging requests.
- ///
- /// In order to achieve this it maintains on an internal buffer of elements which is limited in
- /// size. Each iterator ("subscriber") maintains an offset into the elements which the sequence has
- /// produced over time. If a subscriber is consuming too slowly (and the buffer is full) then the
- /// sequence will cancel the subscriber's subscription to the stream, dropping the oldest element
- /// in the buffer to make space for more elements. If the buffer is full and all subscribers are
- /// equally slow then all producers are suspended until the buffer drops to a reasonable size.
- ///
- /// The expectation is that the number of subscribers will be low; for retries there will be at most
- /// one subscriber at a time, for hedging there may be at most five subscribers at a time.
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- @usableFromInline
- struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
- @usableFromInline
- let _storage: _BroadcastSequenceStorage<Element>
- @inlinable
- init(_storage: _BroadcastSequenceStorage<Element>) {
- self._storage = _storage
- }
- /// Make a new stream and continuation.
- ///
- /// - Parameters:
- /// - elementType: The type of element this sequence produces.
- /// - bufferSize: The number of elements this sequence may store.
- /// - Returns: A stream and continuation.
- @inlinable
- static func makeStream(
- of elementType: Element.Type = Element.self,
- bufferSize: Int
- ) -> (stream: Self, continuation: Self.Source) {
- let storage = _BroadcastSequenceStorage<Element>(bufferSize: bufferSize)
- let stream = Self(_storage: storage)
- let continuation = Self.Source(_storage: storage)
- return (stream, continuation)
- }
- @inlinable
- func makeAsyncIterator() -> AsyncIterator {
- let id = self._storage.subscribe()
- return AsyncIterator(_storage: _storage, id: id)
- }
- /// Returns true if it is known to be safe for the next subscriber to subscribe and successfully
- /// consume elements.
- ///
- /// This function can return `false` if there are active subscribers or the internal buffer no
- /// longer contains the first element in the sequence.
- @inlinable
- var isKnownSafeForNextSubscriber: Bool {
- self._storage.isKnownSafeForNextSubscriber
- }
- /// Invalidates all active subscribers.
- ///
- /// Any active subscriber will receive an error the next time they attempt to consume an element.
- @inlinable
- func invalidateAllSubscriptions() {
- self._storage.invalidateAllSubscriptions()
- }
- }
- // MARK: - AsyncIterator
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension BroadcastAsyncSequence {
- @usableFromInline
- struct AsyncIterator: AsyncIteratorProtocol {
- @usableFromInline
- let _storage: _BroadcastSequenceStorage<Element>
- @usableFromInline
- let _subscriberID: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- @inlinable
- init(
- _storage: _BroadcastSequenceStorage<Element>,
- id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) {
- self._storage = _storage
- self._subscriberID = id
- }
- @inlinable
- mutating func next() async throws -> Element? {
- try await self._storage.nextElement(forSubscriber: self._subscriberID)
- }
- }
- }
- // MARK: - Continuation
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension BroadcastAsyncSequence {
- @usableFromInline
- struct Source: Sendable {
- @usableFromInline
- let _storage: _BroadcastSequenceStorage<Element>
- @usableFromInline
- init(_storage: _BroadcastSequenceStorage<Element>) {
- self._storage = _storage
- }
- @inlinable
- func write(_ element: Element) async throws {
- try await self._storage.yield(element)
- }
- @inlinable
- func finish(with result: Result<Void, any Error>) {
- self._storage.finish(result)
- }
- @inlinable
- func finish() {
- self.finish(with: .success(()))
- }
- @inlinable
- func finish(throwing error: any Error) {
- self.finish(with: .failure(error))
- }
- }
- }
- @usableFromInline
- enum BroadcastAsyncSequenceError: Error {
- /// The consumer was too slow.
- case consumingTooSlow
- /// The producer has already finished.
- case productionAlreadyFinished
- }
- // MARK: - Storage
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- @usableFromInline
- final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
- @usableFromInline
- let _state: LockedValueBox<_BroadcastSequenceStateMachine<Element>>
- @inlinable
- init(bufferSize: Int) {
- self._state = LockedValueBox(_BroadcastSequenceStateMachine(bufferSize: bufferSize))
- }
- deinit {
- let onDrop = self._state.withLockedValue { state in
- state.dropResources()
- }
- switch onDrop {
- case .none:
- ()
- case .resume(let consumers, let producers):
- consumers.resume()
- producers.resume()
- }
- }
- // MARK - Producer
- /// Yield a single element to the stream. Suspends if the stream's buffer is full.
- ///
- /// - Parameter element: The element to write.
- @inlinable
- func yield(_ element: Element) async throws {
- let onYield = self._state.withLockedValue { state in state.yield(element) }
- switch onYield {
- case .none:
- ()
- case .resume(let continuations):
- continuations.resume()
- case .suspend(let token):
- try await withTaskCancellationHandler {
- try await withCheckedThrowingContinuation { continuation in
- let onProduceMore = self._state.withLockedValue { state in
- state.waitToProduceMore(continuation: continuation, token: token)
- }
- switch onProduceMore {
- case .resume(let continuation, let result):
- continuation.resume(with: result)
- case .none:
- ()
- }
- }
- } onCancel: {
- let onCancel = self._state.withLockedValue { state in
- state.cancelProducer(withToken: token)
- }
- switch onCancel {
- case .resume(let continuation, let result):
- continuation.resume(with: result)
- case .none:
- ()
- }
- }
- case .throwAlreadyFinished:
- throw BroadcastAsyncSequenceError.productionAlreadyFinished
- }
- }
- /// Indicate that no more values will be produced.
- ///
- /// - Parameter result: Whether the stream is finishing cleanly or because of an error.
- @inlinable
- func finish(_ result: Result<Void, any Error>) {
- let action = self._state.withLockedValue { state in state.finish(result: result) }
- switch action {
- case .none:
- ()
- case .resume(let subscribers, let producers):
- subscribers.resume()
- producers.resume()
- }
- }
- // MARK: - Consumer
- /// Create a subscription to the stream.
- ///
- /// - Returns: Returns a unique subscription ID.
- @inlinable
- func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
- self._state.withLockedValue { $0.subscribe() }
- }
- /// Returns the next element for the given subscriber, if it is available.
- ///
- /// - Parameter id: The ID of the subscriber requesting the element.
- /// - Returns: The next element or `nil` if the stream has been terminated.
- @inlinable
- func nextElement(
- forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) async throws -> Element? {
- return try await withTaskCancellationHandler {
- self._state.unsafe.lock()
- let onNext = self._state.unsafe.withValueAssumingLockIsAcquired {
- $0.nextElement(forSubscriber: id)
- }
- switch onNext {
- case .return(let returnAndProduceMore):
- self._state.unsafe.unlock()
- returnAndProduceMore.producers.resume()
- return try returnAndProduceMore.nextResult.get()
- case .suspend:
- return try await withCheckedThrowingContinuation { continuation in
- let onSetContinuation = self._state.unsafe.withValueAssumingLockIsAcquired { state in
- state.setContinuation(continuation, forSubscription: id)
- }
- self._state.unsafe.unlock()
- switch onSetContinuation {
- case .resume(let continuation, let result):
- continuation.resume(with: result)
- case .none:
- ()
- }
- }
- }
- } onCancel: {
- let onCancel = self._state.withLockedValue { state in
- state.cancelSubscription(withID: id)
- }
- switch onCancel {
- case .resume(let continuation, let result):
- continuation.resume(with: result)
- case .none:
- ()
- }
- }
- }
- /// Returns true if it's guaranteed that the next subscriber may join and safely begin consuming
- /// elements.
- @inlinable
- var isKnownSafeForNextSubscriber: Bool {
- self._state.withLockedValue { state in
- state.nextSubscriptionIsValid
- }
- }
- /// Invalidates all active subscriptions.
- @inlinable
- func invalidateAllSubscriptions() {
- let action = self._state.withLockedValue { state in
- state.invalidateAllSubscriptions()
- }
- switch action {
- case .resume(let continuations):
- continuations.resume()
- case .none:
- ()
- }
- }
- }
- // MARK: - State machine
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- @usableFromInline
- struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
- @usableFromInline
- typealias ConsumerContinuation = CheckedContinuation<Element?, any Error>
- @usableFromInline
- typealias ProducerContinuation = CheckedContinuation<Void, any Error>
- @usableFromInline
- struct ConsumerContinuations {
- @usableFromInline
- var continuations: _OneOrMany<ConsumerContinuation>
- @usableFromInline
- var result: Result<Element?, any Error>
- @inlinable
- init(continuations: _OneOrMany<ConsumerContinuation>, result: Result<Element?, any Error>) {
- self.continuations = continuations
- self.result = result
- }
- @inlinable
- func resume() {
- switch self.continuations {
- case .one(let continuation):
- continuation.resume(with: self.result)
- case .many(let continuations):
- for continuation in continuations {
- continuation.resume(with: self.result)
- }
- }
- }
- }
- @usableFromInline
- struct ProducerContinuations {
- @usableFromInline
- var continuations: [ProducerContinuation]
- @usableFromInline
- var result: Result<Void, any Error>
- @inlinable
- init(continuations: [ProducerContinuation], result: Result<Void, any Error>) {
- self.continuations = continuations
- self.result = result
- }
- @inlinable
- func resume() {
- for continuation in self.continuations {
- continuation.resume(with: self.result)
- }
- }
- }
- @usableFromInline
- enum State: Sendable {
- /// No subscribers and no elements have been produced.
- case initial(Initial)
- /// Subscribers exist but no elements have been produced.
- case subscribed(Subscribed)
- /// Elements have been produced, there may or may not be subscribers.
- case streaming(Streaming)
- /// No more elements will be produced. There may or may not been subscribers.
- case finished(Finished)
- /// Temporary state to avoid CoWs.
- case _modifying
- @inlinable
- init(bufferSize: Int) {
- self = .initial(Initial(bufferSize: bufferSize))
- }
- @usableFromInline
- struct Initial: Sendable {
- @usableFromInline
- let bufferSize: Int
- @inlinable
- init(bufferSize: Int) {
- self.bufferSize = bufferSize
- }
- }
- @usableFromInline
- struct Subscribed: Sendable {
- /// Active subscriptions.
- @usableFromInline
- var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
- /// Subscriptions to fail and remove when they next request an element.
- @usableFromInline
- var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
- /// The maximum size of the element buffer.
- @usableFromInline
- let bufferSize: Int
- @inlinable
- init(from state: Initial) {
- self.subscriptions = Subscriptions()
- self.subscriptionsToDrop = []
- self.bufferSize = state.bufferSize
- }
- @inlinable
- mutating func finish(result: Result<Void, any Error>) -> OnFinish {
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- return .resume(
- .init(continuations: continuations, result: result.map { nil }),
- .init(continuations: [], result: .success(()))
- )
- }
- @inlinable
- mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
- // Not streaming, so suspend or remove if the subscription should be dropped.
- guard let index = self.subscriptionsToDrop.firstIndex(of: id) else {
- return .suspend
- }
- self.subscriptionsToDrop.remove(at: index)
- return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
- }
- @inlinable
- mutating func cancel(
- _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnCancelSubscription {
- let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
- if let continuation = continuation {
- return .resume(continuation, .failure(CancellationError()))
- } else {
- return .none
- }
- }
- @inlinable
- mutating func setContinuation(
- _ continuation: ConsumerContinuation,
- forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnSetContinuation {
- if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
- return .none
- } else {
- return .resume(continuation, .failure(CancellationError()))
- }
- }
- @inlinable
- mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
- self.subscriptions.subscribe()
- }
- @inlinable
- mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
- // Remove subscriptions with continuations, they need to be failed.
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- let consumerContinuations = ConsumerContinuations(
- continuations: continuations,
- result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
- )
- // Remove any others to be failed when they next call 'next'.
- let ids = self.subscriptions.removeAllSubscribers()
- self.subscriptionsToDrop.append(contentsOf: ids)
- return .resume(consumerContinuations)
- }
- @inlinable
- mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- let consumerContinuations = ConsumerContinuations(
- continuations: continuations,
- result: .failure(error)
- )
- let producerContinuations = ProducerContinuations(continuations: [], result: .success(()))
- return .resume(consumerContinuations, producerContinuations)
- }
- }
- @usableFromInline
- struct Streaming: Sendable {
- /// A deque of elements tagged with IDs.
- @usableFromInline
- var elements: Elements
- /// The maximum size of the element buffer.
- @usableFromInline
- let bufferSize: Int
- // TODO: (optimisation) one-or-many Deque to avoid allocations in the case of a single writer
- /// Producers which have been suspended.
- @usableFromInline
- var producers: [(ProducerContinuation, Int)]
- /// The IDs of producers which have been cancelled.
- @usableFromInline
- var cancelledProducers: [Int]
- /// The next token for a producer.
- @usableFromInline
- var producerToken: Int
- /// Active subscriptions.
- @usableFromInline
- var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
- /// Subscriptions to fail and remove when they next request an element.
- @usableFromInline
- var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
- @inlinable
- init(from state: Initial) {
- self.elements = Elements()
- self.producers = []
- self.producerToken = 0
- self.cancelledProducers = []
- self.subscriptions = Subscriptions()
- self.subscriptionsToDrop = []
- self.bufferSize = state.bufferSize
- }
- @inlinable
- init(from state: Subscribed) {
- self.elements = Elements()
- self.producers = []
- self.producerToken = 0
- self.cancelledProducers = []
- self.subscriptions = state.subscriptions
- self.subscriptionsToDrop = state.subscriptionsToDrop
- self.bufferSize = state.bufferSize
- }
- @inlinable
- mutating func append(_ element: Element) -> OnYield {
- let onYield: OnYield
- self.elements.append(element)
- if self.elements.count >= self.bufferSize, let lowestID = self.elements.lowestID {
- // If the buffer is too large then:
- // - if all subscribers are equally slow suspend the producer
- // - if some subscribers are slow then remove them and the oldest value
- // - if no subscribers are slow then remove the oldest value
- let slowConsumers = self.subscriptions.subscribers(withElementID: lowestID)
- switch slowConsumers.count {
- case 0:
- if self.subscriptions.isEmpty {
- // No consumers.
- let token = self.producerToken
- self.producerToken += 1
- onYield = .suspend(token)
- } else {
- // No consumers are slow, some subscribers exist, a subset of which are waiting
- // for a value. Drop the oldest value and resume the fastest consumers.
- self.elements.removeFirst()
- let continuations = self.subscriptions.takeContinuations().map {
- ConsumerContinuations(continuations: $0, result: .success(element))
- }
- if let continuations = continuations {
- onYield = .resume(continuations)
- } else {
- onYield = .none
- }
- }
- case self.subscriptions.count:
- // All consumers are slow; stop the production of new value.
- let token = self.producerToken
- self.producerToken += 1
- onYield = .suspend(token)
- default:
- // Some consumers are slow, but not all. Remove the slow consumers and drop the
- // oldest value.
- self.elements.removeFirst()
- self.subscriptions.removeAllSubscribers(in: slowConsumers)
- self.subscriptionsToDrop.append(contentsOf: slowConsumers)
- onYield = .none
- }
- } else {
- // The buffer isn't full. Take the continuations of subscriptions which have them; they
- // must be waiting for the value we just appended.
- let continuations = self.subscriptions.takeContinuations().map {
- ConsumerContinuations(continuations: $0, result: .success(element))
- }
- if let continuations = continuations {
- onYield = .resume(continuations)
- } else {
- onYield = .none
- }
- }
- return onYield
- }
- @inlinable
- mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
- let onNext: OnNext
- // 1. Lookup the subscriber by ID to get their next offset
- // 2. If the element exists, update the element pointer and return the element
- // 3. Else if the ID is in the future, wait
- // 4. Else the ID is in the past, fail and remove the subscription.
- // Lookup the subscriber with the given ID.
- let onNextForSubscription = self.subscriptions.withMutableElementID(
- forSubscriber: id
- ) { elementID -> (OnNext, Bool) in
- let onNext: OnNext
- let removeSubscription: Bool
- // Subscriber exists; do we have the element it requires next?
- switch self.elements.lookupElement(withID: elementID) {
- case .found(let element):
- // Element exists in the buffer. Advance our element ID.
- elementID.formNext()
- onNext = .return(.init(nextResult: .success(element)))
- removeSubscription = false
- case .maybeAvailableLater:
- // Element may exist in the future.
- onNext = .suspend
- removeSubscription = false
- case .noLongerAvailable:
- // Element existed in the past but was dropped from the buffer.
- onNext = .return(
- .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
- )
- removeSubscription = true
- }
- return (onNext, removeSubscription)
- }
- switch onNextForSubscription {
- case .return(var resultAndResume):
- // The producer only suspends when all consumers are equally slow or there are no
- // consumers at all. The latter can't be true: this function can only be called by a
- // consumer. The former can't be true anymore because consumption isn't concurrent
- // so this consumer must be faster than the others so let the producer resume.
- //
- // Note that this doesn't mean that all other consumers will be dropped: they can continue
- // to produce until the producer provides more values.
- resultAndResume.producers = ProducerContinuations(
- continuations: self.producers.map { $0.0 },
- result: .success(())
- )
- self.producers.removeAll()
- onNext = .return(resultAndResume)
- case .suspend:
- onNext = .suspend
- case .none:
- // No subscription found, must have been dropped or already finished.
- if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
- self.subscriptionsToDrop.remove(at: index)
- onNext = .return(
- .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
- )
- } else {
- // Unknown subscriber, i.e. already finished.
- onNext = .return(.init(nextResult: .success(nil)))
- }
- }
- return onNext
- }
- @inlinable
- mutating func setContinuation(
- _ continuation: ConsumerContinuation,
- forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnSetContinuation {
- if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
- return .none
- } else {
- return .resume(continuation, .failure(CancellationError()))
- }
- }
- @inlinable
- mutating func cancel(
- _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnCancelSubscription {
- let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
- if let continuation = continuation {
- return .resume(continuation, .failure(CancellationError()))
- } else {
- return .none
- }
- }
- @inlinable
- mutating func waitToProduceMore(
- _ continuation: ProducerContinuation,
- token: Int
- ) -> OnWaitToProduceMore {
- let onWaitToProduceMore: OnWaitToProduceMore
- if self.elements.count < self.bufferSize {
- // Buffer has free space, no need to suspend.
- onWaitToProduceMore = .resume(continuation, .success(()))
- } else if let index = self.cancelledProducers.firstIndex(of: token) {
- // Producer was cancelled before suspending.
- self.cancelledProducers.remove(at: index)
- onWaitToProduceMore = .resume(continuation, .failure(CancellationError()))
- } else {
- // Store the continuation to resume later.
- self.producers.append((continuation, token))
- onWaitToProduceMore = .none
- }
- return onWaitToProduceMore
- }
- @inlinable
- mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
- guard let index = self.producers.firstIndex(where: { $0.1 == token }) else {
- self.cancelledProducers.append(token)
- return .none
- }
- let (continuation, _) = self.producers.remove(at: index)
- return .resume(continuation, .failure(CancellationError()))
- }
- @inlinable
- mutating func finish(result: Result<Void, any Error>) -> OnFinish {
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- let producers = self.producers.map { $0.0 }
- self.producers.removeAll()
- return .resume(
- .init(continuations: continuations, result: result.map { nil }),
- .init(continuations: producers, result: .success(()))
- )
- }
- @inlinable
- mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
- self.subscriptions.subscribe()
- }
- @inlinable
- mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
- // Remove subscriptions with continuations, they need to be failed.
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- let consumerContinuations = ConsumerContinuations(
- continuations: continuations,
- result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
- )
- // Remove any others to be failed when they next call 'next'.
- let ids = self.subscriptions.removeAllSubscribers()
- self.subscriptionsToDrop.append(contentsOf: ids)
- return .resume(consumerContinuations)
- }
- @inlinable
- mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- let consumerContinuations = ConsumerContinuations(
- continuations: continuations,
- result: .failure(error)
- )
- let producers = ProducerContinuations(
- continuations: self.producers.map { $0.0 },
- result: .failure(error)
- )
- self.producers.removeAll()
- return .resume(consumerContinuations, producers)
- }
- @inlinable
- func nextSubscriptionIsValid() -> Bool {
- return self.subscriptions.isEmpty && self.elements.lowestID == .initial
- }
- }
- @usableFromInline
- struct Finished: Sendable {
- /// A deque of elements tagged with IDs.
- @usableFromInline
- var elements: Elements
- /// Active subscriptions.
- @usableFromInline
- var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
- /// Subscriptions to fail and remove when they next request an element.
- @usableFromInline
- var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
- /// The terminating result of the sequence.
- @usableFromInline
- let result: Result<Void, any Error>
- @inlinable
- init(from state: Initial, result: Result<Void, any Error>) {
- self.elements = Elements()
- self.subscriptions = Subscriptions()
- self.subscriptionsToDrop = []
- self.result = result
- }
- @inlinable
- init(from state: Subscribed, result: Result<Void, any Error>) {
- self.elements = Elements()
- self.subscriptions = state.subscriptions
- self.subscriptionsToDrop = []
- self.result = result
- }
- @inlinable
- init(from state: Streaming, result: Result<Void, any Error>) {
- self.elements = state.elements
- self.subscriptions = state.subscriptions
- self.subscriptionsToDrop = state.subscriptionsToDrop
- self.result = result
- }
- @inlinable
- mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
- let onNext: OnNext
- let onNextForSubscription = self.subscriptions.withMutableElementID(
- forSubscriber: id
- ) { elementID -> (OnNext, Bool) in
- let onNext: OnNext
- let removeSubscription: Bool
- switch self.elements.lookupElement(withID: elementID) {
- case .found(let element):
- elementID.formNext()
- onNext = .return(.init(nextResult: .success(element)))
- removeSubscription = false
- case .maybeAvailableLater:
- onNext = .return(.init(nextResult: self.result.map { nil }))
- removeSubscription = true
- case .noLongerAvailable:
- onNext = .return(
- .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
- )
- removeSubscription = true
- }
- return (onNext, removeSubscription)
- }
- switch onNextForSubscription {
- case .return(let result):
- onNext = .return(result)
- case .none:
- // No subscriber with the given ID, it was likely dropped previously.
- if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
- self.subscriptionsToDrop.remove(at: index)
- onNext = .return(
- .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
- )
- } else {
- // Unknown subscriber, i.e. already finished.
- onNext = .return(.init(nextResult: .success(nil)))
- }
- case .suspend:
- fatalError("Internal inconsistency")
- }
- return onNext
- }
- @inlinable
- mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
- self.subscriptions.subscribe()
- }
- @inlinable
- mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
- // Remove subscriptions with continuations, they need to be failed.
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- let consumerContinuations = ConsumerContinuations(
- continuations: continuations,
- result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
- )
- // Remove any others to be failed when they next call 'next'.
- let ids = self.subscriptions.removeAllSubscribers()
- self.subscriptionsToDrop.append(contentsOf: ids)
- return .resume(consumerContinuations)
- }
- @inlinable
- mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
- let continuations = self.subscriptions.removeSubscribersWithContinuations()
- let consumerContinuations = ConsumerContinuations(
- continuations: continuations,
- result: .failure(error)
- )
- let producers = ProducerContinuations(continuations: [], result: .failure(error))
- return .resume(consumerContinuations, producers)
- }
- @inlinable
- func nextSubscriptionIsValid() -> Bool {
- self.elements.lowestID == .initial
- }
- @inlinable
- mutating func cancel(
- _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnCancelSubscription {
- let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
- if let continuation = continuation {
- return .resume(continuation, .failure(CancellationError()))
- } else {
- return .none
- }
- }
- }
- }
- @usableFromInline
- var _state: State
- @inlinable
- init(bufferSize: Int) {
- self._state = State(bufferSize: bufferSize)
- }
- @inlinable
- var nextSubscriptionIsValid: Bool {
- let isValid: Bool
- switch self._state {
- case .initial:
- isValid = true
- case .subscribed:
- isValid = true
- case .streaming(let state):
- isValid = state.nextSubscriptionIsValid()
- case .finished(let state):
- isValid = state.nextSubscriptionIsValid()
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return isValid
- }
- @usableFromInline
- enum OnInvalidateAllSubscriptions {
- case resume(ConsumerContinuations)
- case none
- }
- @inlinable
- mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
- let onCancel: OnInvalidateAllSubscriptions
- switch self._state {
- case .initial:
- onCancel = .none
- case .subscribed(var state):
- self._state = ._modifying
- onCancel = state.invalidateAllSubscriptions()
- self._state = .subscribed(state)
- case .streaming(var state):
- self._state = ._modifying
- onCancel = state.invalidateAllSubscriptions()
- self._state = .streaming(state)
- case .finished(var state):
- self._state = ._modifying
- onCancel = state.invalidateAllSubscriptions()
- self._state = .finished(state)
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return onCancel
- }
- @usableFromInline
- enum OnYield {
- case none
- case suspend(Int)
- case resume(ConsumerContinuations)
- case throwAlreadyFinished
- }
- @inlinable
- mutating func yield(_ element: Element) -> OnYield {
- let onYield: OnYield
- switch self._state {
- case .initial(let state):
- self._state = ._modifying
- // Move to streaming.
- var state = State.Streaming(from: state)
- onYield = state.append(element)
- self._state = .streaming(state)
- case .subscribed(let state):
- self._state = ._modifying
- var state = State.Streaming(from: state)
- onYield = state.append(element)
- self._state = .streaming(state)
- case .streaming(var state):
- self._state = ._modifying
- onYield = state.append(element)
- self._state = .streaming(state)
- case .finished:
- onYield = .throwAlreadyFinished
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return onYield
- }
- @usableFromInline
- enum OnFinish {
- case none
- case resume(ConsumerContinuations, ProducerContinuations)
- }
- @inlinable
- mutating func finish(result: Result<Void, any Error>) -> OnFinish {
- let onFinish: OnFinish
- switch self._state {
- case .initial(let state):
- self._state = ._modifying
- let state = State.Finished(from: state, result: result)
- self._state = .finished(state)
- onFinish = .none
- case .subscribed(var state):
- self._state = ._modifying
- onFinish = state.finish(result: result)
- self._state = .finished(State.Finished(from: state, result: result))
- case .streaming(var state):
- self._state = ._modifying
- onFinish = state.finish(result: result)
- self._state = .finished(State.Finished(from: state, result: result))
- case .finished:
- onFinish = .none
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return onFinish
- }
- @usableFromInline
- enum OnNext {
- @usableFromInline
- struct ReturnAndResumeProducers {
- @usableFromInline
- var nextResult: Result<Element?, any Error>
- @usableFromInline
- var producers: ProducerContinuations
- @inlinable
- init(
- nextResult: Result<Element?, any Error>,
- producers: [ProducerContinuation] = [],
- producerResult: Result<Void, any Error> = .success(())
- ) {
- self.nextResult = nextResult
- self.producers = ProducerContinuations(continuations: producers, result: producerResult)
- }
- }
- case `return`(ReturnAndResumeProducers)
- case suspend
- }
- @inlinable
- mutating func nextElement(
- forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnNext {
- let onNext: OnNext
- switch self._state {
- case .initial:
- // No subscribers so demand isn't possible.
- fatalError("Internal inconsistency")
- case .subscribed(var state):
- self._state = ._modifying
- onNext = state.next(id)
- self._state = .subscribed(state)
- case .streaming(var state):
- self._state = ._modifying
- onNext = state.next(id)
- self._state = .streaming(state)
- case .finished(var state):
- self._state = ._modifying
- onNext = state.next(id)
- self._state = .finished(state)
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return onNext
- }
- @usableFromInline
- enum OnSetContinuation {
- case none
- case resume(ConsumerContinuation, Result<Element?, any Error>)
- }
- @inlinable
- mutating func setContinuation(
- _ continuation: ConsumerContinuation,
- forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnSetContinuation {
- let onSetContinuation: OnSetContinuation
- switch self._state {
- case .initial:
- // No subscribers so demand isn't possible.
- fatalError("Internal inconsistency")
- case .subscribed(var state):
- self._state = ._modifying
- onSetContinuation = state.setContinuation(continuation, forSubscription: id)
- self._state = .subscribed(state)
- case .streaming(var state):
- self._state = ._modifying
- onSetContinuation = state.setContinuation(continuation, forSubscription: id)
- self._state = .streaming(state)
- case .finished(let state):
- onSetContinuation = .resume(continuation, state.result.map { _ in nil })
- case ._modifying:
- // All values must have been produced, nothing to wait for.
- fatalError("Internal inconsistency")
- }
- return onSetContinuation
- }
- @usableFromInline
- enum OnCancelSubscription {
- case none
- case resume(ConsumerContinuation, Result<Element?, any Error>)
- }
- @inlinable
- mutating func cancelSubscription(
- withID id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- ) -> OnCancelSubscription {
- let onCancel: OnCancelSubscription
- switch self._state {
- case .initial:
- // No subscribers so demand isn't possible.
- fatalError("Internal inconsistency")
- case .subscribed(var state):
- self._state = ._modifying
- onCancel = state.cancel(id)
- self._state = .subscribed(state)
- case .streaming(var state):
- self._state = ._modifying
- onCancel = state.cancel(id)
- self._state = .streaming(state)
- case .finished(var state):
- self._state = ._modifying
- onCancel = state.cancel(id)
- self._state = .finished(state)
- case ._modifying:
- // All values must have been produced, nothing to wait for.
- fatalError("Internal inconsistency")
- }
- return onCancel
- }
- @usableFromInline
- enum OnSubscribe {
- case subscribed(_BroadcastSequenceStateMachine<Element>.Subscriptions.ID)
- }
- @inlinable
- mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
- let id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
- switch self._state {
- case .initial(let state):
- self._state = ._modifying
- var state = State.Subscribed(from: state)
- id = state.subscribe()
- self._state = .subscribed(state)
- case .subscribed(var state):
- self._state = ._modifying
- id = state.subscribe()
- self._state = .subscribed(state)
- case .streaming(var state):
- self._state = ._modifying
- id = state.subscribe()
- self._state = .streaming(state)
- case .finished(var state):
- self._state = ._modifying
- id = state.subscribe()
- self._state = .finished(state)
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return id
- }
- @usableFromInline
- enum OnWaitToProduceMore {
- case none
- case resume(ProducerContinuation, Result<Void, any Error>)
- }
- @inlinable
- mutating func waitToProduceMore(
- continuation: ProducerContinuation,
- token: Int
- ) -> OnWaitToProduceMore {
- let onWaitToProduceMore: OnWaitToProduceMore
- switch self._state {
- case .initial, .subscribed:
- // Nothing produced yet, so no reason have to wait to produce.
- fatalError("Internal inconsistency")
- case .streaming(var state):
- self._state = ._modifying
- onWaitToProduceMore = state.waitToProduceMore(continuation, token: token)
- self._state = .streaming(state)
- case .finished:
- onWaitToProduceMore = .resume(continuation, .success(()))
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return onWaitToProduceMore
- }
- @usableFromInline
- typealias OnCancelProducer = OnWaitToProduceMore
- @inlinable
- mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
- let onCancelProducer: OnCancelProducer
- switch self._state {
- case .initial, .subscribed:
- // Nothing produced yet, so no reason have to wait to produce.
- fatalError("Internal inconsistency")
- case .streaming(var state):
- self._state = ._modifying
- onCancelProducer = state.cancelProducer(withToken: token)
- self._state = .streaming(state)
- case .finished:
- // No producers to cancel; do nothing.
- onCancelProducer = .none
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return onCancelProducer
- }
- @usableFromInline
- enum OnDropResources {
- case none
- case resume(ConsumerContinuations, ProducerContinuations)
- }
- @inlinable
- mutating func dropResources() -> OnDropResources {
- let error = BroadcastAsyncSequenceError.productionAlreadyFinished
- let onDrop: OnDropResources
- switch self._state {
- case .initial(let state):
- self._state = ._modifying
- onDrop = .none
- self._state = .finished(State.Finished(from: state, result: .failure(error)))
- case .subscribed(var state):
- self._state = ._modifying
- onDrop = state.dropResources(error: error)
- self._state = .finished(State.Finished(from: state, result: .failure(error)))
- case .streaming(var state):
- self._state = ._modifying
- onDrop = state.dropResources(error: error)
- self._state = .finished(State.Finished(from: state, result: .failure(error)))
- case .finished(var state):
- self._state = ._modifying
- onDrop = state.dropResources(error: error)
- self._state = .finished(state)
- case ._modifying:
- fatalError("Internal inconsistency")
- }
- return onDrop
- }
- }
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension _BroadcastSequenceStateMachine {
- /// A collection of elements tagged with an identifier.
- ///
- /// Identifiers are assigned when elements are added to the collection and are monotonically
- /// increasing. If element 'A' is added before element 'B' then 'A' will have a lower ID than 'B'.
- @usableFromInline
- struct Elements: Sendable {
- /// The ID of an element
- @usableFromInline
- struct ID: Hashable, Sendable, Comparable, Strideable {
- @usableFromInline
- private(set) var rawValue: Int
- @usableFromInline
- static var initial: Self {
- ID(id: 0)
- }
- private init(id: Int) {
- self.rawValue = id
- }
- @inlinable
- mutating func formNext() {
- self.rawValue += 1
- }
- @inlinable
- func next() -> Self {
- var copy = self
- copy.formNext()
- return copy
- }
- @inlinable
- func distance(to other: Self) -> Int {
- other.rawValue - self.rawValue
- }
- @inlinable
- func advanced(by n: Int) -> Self {
- var copy = self
- copy.rawValue += n
- return copy
- }
- @inlinable
- static func < (lhs: Self, rhs: Self) -> Bool {
- lhs.rawValue < rhs.rawValue
- }
- }
- @usableFromInline
- struct _IdentifiableElement: Sendable {
- @usableFromInline
- var element: Element
- @usableFromInline
- var id: ID
- @inlinable
- init(element: Element, id: ID) {
- self.element = element
- self.id = id
- }
- }
- @usableFromInline
- var _elements: Deque<_IdentifiableElement>
- @usableFromInline
- var _nextID: ID
- @inlinable
- init() {
- self._nextID = .initial
- self._elements = []
- }
- @inlinable
- mutating func nextElementID() -> ID {
- let id = self._nextID
- self._nextID.formNext()
- return id
- }
- /// The highest ID of the stored elements; `nil` if there are no elements.
- @inlinable
- var highestID: ID? { self._elements.last?.id }
- /// The lowest ID of the stored elements; `nil` if there are no elements.
- @inlinable
- var lowestID: ID? { self._elements.first?.id }
- /// The number of stored elements.
- @inlinable
- var count: Int { self._elements.count }
- /// Whether there are no stored elements.
- @inlinable
- var isEmpty: Bool { self._elements.isEmpty }
- /// Appends an element to the collection.
- @inlinable
- mutating func append(_ element: Element) {
- self._elements.append(_IdentifiableElement(element: element, id: self.nextElementID()))
- }
- /// Removes the first element from the collection.
- @discardableResult
- @inlinable
- mutating func removeFirst() -> Element {
- let removed = self._elements.removeFirst()
- return removed.element
- }
- @usableFromInline
- enum ElementLookup {
- /// The element was found in the collection.
- case found(Element)
- /// The element isn't in the collection, but it could be in the future.
- case maybeAvailableLater
- /// The element was in the collection, but is no longer available.
- case noLongerAvailable
- }
- /// Lookup the element with the given ID.
- ///
- /// - Parameter id: The ID of the element to lookup.
- @inlinable
- mutating func lookupElement(withID id: ID) -> ElementLookup {
- guard let low = self.lowestID, let high = self.highestID else {
- // Must be empty.
- return id >= self._nextID ? .maybeAvailableLater : .noLongerAvailable
- }
- assert(low <= high)
- let lookup: ElementLookup
- if id < low {
- lookup = .noLongerAvailable
- } else if id > high {
- lookup = .maybeAvailableLater
- } else {
- // IDs are monotonically increasing. If the buffer contains the tag we can use it to index
- // into the deque by looking at the offsets.
- let offset = low.distance(to: id)
- let index = self._elements.startIndex.advanced(by: offset)
- lookup = .found(self._elements[index].element)
- }
- return lookup
- }
- }
- }
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension _BroadcastSequenceStateMachine {
- /// A collection of subscriptions.
- @usableFromInline
- struct Subscriptions: Sendable {
- @usableFromInline
- struct ID: Hashable, Sendable {
- @usableFromInline
- private(set) var rawValue: Int
- @inlinable
- init() {
- self.rawValue = 0
- }
- @inlinable
- mutating func formNext() {
- self.rawValue += 1
- }
- @inlinable
- func next() -> Self {
- var copy = self
- copy.formNext()
- return copy
- }
- }
- @usableFromInline
- struct _Subscriber: Sendable {
- /// The ID of the subscriber.
- @usableFromInline
- var id: ID
- /// The ID of the next element the subscriber will consume.
- @usableFromInline
- var nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID
- /// A continuation which which will be resumed when the next element becomes available.
- @usableFromInline
- var continuation: ConsumerContinuation?
- @inlinable
- init(
- id: ID,
- nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID,
- continuation: ConsumerContinuation? = nil
- ) {
- self.id = id
- self.nextElementID = nextElementID
- self.continuation = continuation
- }
- /// Returns and sets the continuation to `nil` if one exists.
- ///
- /// The next element ID is advanced if a contination exists.
- ///
- /// - Returns: The continuation, if one existed.
- @inlinable
- mutating func takeContinuation() -> ConsumerContinuation? {
- guard let continuation = self.continuation else { return nil }
- self.continuation = nil
- self.nextElementID.formNext()
- return continuation
- }
- }
- @usableFromInline
- var _subscribers: [_Subscriber]
- @usableFromInline
- var _nextSubscriberID: ID
- @inlinable
- init() {
- self._subscribers = []
- self._nextSubscriberID = ID()
- }
- /// Returns the number of subscribers.
- @inlinable
- var count: Int { self._subscribers.count }
- /// Returns whether the collection is empty.
- @inlinable
- var isEmpty: Bool { self._subscribers.isEmpty }
- /// Adds a new subscriber and returns its unique ID.
- ///
- /// - Returns: The ID of the new subscriber.
- @inlinable
- mutating func subscribe() -> ID {
- let id = self._nextSubscriberID
- self._nextSubscriberID.formNext()
- self._subscribers.append(_Subscriber(id: id, nextElementID: .initial))
- return id
- }
- /// Provides mutable access to the element ID of the given subscriber, if it exists.
- ///
- /// - Parameters:
- /// - id: The ID of the subscriber.
- /// - body: A closure to mutate the element ID of the subscriber which returns the result and
- /// a boolean indicating whether the subscriber should be removed.
- /// - Returns: The result returned from the closure or `nil` if no subscriber exists with the
- /// given ID.
- @inlinable
- mutating func withMutableElementID<R>(
- forSubscriber id: ID,
- _ body: (
- inout _BroadcastSequenceStateMachine<Element>.Elements.ID
- ) -> (result: R, removeSubscription: Bool)
- ) -> R? {
- guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else { return nil }
- let (result, removeSubscription) = body(&self._subscribers[index].nextElementID)
- if removeSubscription {
- self._subscribers.remove(at: index)
- }
- return result
- }
- /// Sets the continuation for the subscription with the given ID.
- /// - Parameters:
- /// - continuation: The continuation to set.
- /// - id: The ID of the subscriber.
- /// - Returns: A boolean indicating whether the continuation was set or not.
- @inlinable
- mutating func setContinuation(
- _ continuation: ConsumerContinuation,
- forSubscriber id: ID
- ) -> Bool {
- guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
- return false
- }
- assert(self._subscribers[index].continuation == nil)
- self._subscribers[index].continuation = continuation
- return true
- }
- /// Returns an array of subscriber IDs which are whose next element ID is `id`.
- @inlinable
- func subscribers(
- withElementID id: _BroadcastSequenceStateMachine<Element>.Elements.ID
- ) -> [ID] {
- return self._subscribers.filter {
- $0.nextElementID == id
- }.map {
- $0.id
- }
- }
- /// Removes the subscriber with the given ID.
- /// - Parameter id: The ID of the subscriber to remove.
- /// - Returns: A tuple indicating whether a subscriber was removed and any continuation
- /// associated with the subscriber.
- @inlinable
- mutating func removeSubscriber(withID id: ID) -> (Bool, ConsumerContinuation?) {
- guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
- return (false, nil)
- }
- let continuation = self._subscribers[index].continuation
- self._subscribers.remove(at: index)
- return (true, continuation)
- }
- /// Remove all subscribers in the given array of IDs.
- @inlinable
- mutating func removeAllSubscribers(in idsToRemove: [ID]) {
- self._subscribers.removeAll {
- idsToRemove.contains($0.id)
- }
- }
- /// Remove all subscribers and return their IDs.
- @inlinable
- mutating func removeAllSubscribers() -> [ID] {
- let subscribers = self._subscribers.map { $0.id }
- self._subscribers.removeAll()
- return subscribers
- }
- /// Returns any continuations set on subscribers, unsetting at the same time.
- @inlinable
- mutating func takeContinuations() -> _OneOrMany<ConsumerContinuation>? {
- // Avoid allocs if there's only one subscriber.
- let count = self._countPendingContinuations()
- let result: _OneOrMany<ConsumerContinuation>?
- switch count {
- case 0:
- result = nil
- case 1:
- let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
- let continuation = self._subscribers[index].takeContinuation()!
- result = .one(continuation)
- default:
- var continuations = [ConsumerContinuation]()
- continuations.reserveCapacity(count)
- for index in self._subscribers.indices {
- if let continuation = self._subscribers[index].takeContinuation() {
- continuations.append(continuation)
- }
- }
- result = .many(continuations)
- }
- return result
- }
- /// Removes all subscribers which have continuations and return their continuations.
- @inlinable
- mutating func removeSubscribersWithContinuations() -> _OneOrMany<ConsumerContinuation> {
- // Avoid allocs if there's only one subscriber.
- let count = self._countPendingContinuations()
- let result: _OneOrMany<ConsumerContinuation>
- switch count {
- case 0:
- result = .many([])
- case 1:
- let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
- let subscription = self._subscribers.remove(at: index)
- result = .one(subscription.continuation!)
- default:
- var continuations = [ConsumerContinuation]()
- continuations.reserveCapacity(count)
- var removable = [ID]()
- removable.reserveCapacity(count)
- for subscription in self._subscribers {
- if let continuation = subscription.continuation {
- continuations.append(continuation)
- removable.append(subscription.id)
- }
- }
- self._subscribers.removeAll {
- removable.contains($0.id)
- }
- result = .many(continuations)
- }
- return result
- }
- @inlinable
- func _countPendingContinuations() -> Int {
- return self._subscribers.reduce(into: 0) { count, subscription in
- if subscription.continuation != nil {
- count += 1
- }
- }
- }
- }
- }
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension _BroadcastSequenceStateMachine {
- // TODO: tiny array
- @usableFromInline
- enum _OneOrMany<Value> {
- case one(Value)
- case many([Value])
- }
- }
|