BroadcastAsyncSequence.swift 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. public import DequeModule // should be @usableFromInline
  17. public import Synchronization // should be @usableFromInline
  /// An `AsyncSequence` that fans its elements out to several concurrent consumers.
  ///
  /// This is not a general-purpose broadcast sequence: it is tailored to the needs of gRPC Swift,
  /// where it is used to support retrying and hedging requests.
  ///
  /// Elements are kept in an internal buffer of limited size and each iterator ("subscriber")
  /// tracks its own offset into the elements produced over time. If the buffer is full and a
  /// subscriber is consuming too slowly, its subscription is cancelled and the oldest buffered
  /// element is dropped to make space. If the buffer is full and all subscribers are equally
  /// slow, producers are suspended until the buffer drops to a reasonable size.
  ///
  /// The number of subscribers is expected to be low: at most one at a time for retries and at
  /// most five at a time for hedging.
  @usableFromInline
  struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
    /// Storage shared between this sequence, its iterators and its `Source`.
    @usableFromInline
    let _storage: _BroadcastSequenceStorage<Element>

    /// Wraps existing storage in a sequence.
    @inlinable
    init(_storage: _BroadcastSequenceStorage<Element>) {
      self._storage = _storage
    }

    /// Creates a stream together with the continuation used to feed it.
    ///
    /// - Parameters:
    ///   - elementType: The type of element the sequence produces.
    ///   - bufferSize: The number of elements the sequence may store.
    /// - Returns: The stream and a continuation backed by the same storage.
    @inlinable
    static func makeStream(
      of elementType: Element.Type = Element.self,
      bufferSize: Int
    ) -> (stream: Self, continuation: Self.Source) {
      let sharedStorage = _BroadcastSequenceStorage<Element>(bufferSize: bufferSize)
      return (
        stream: Self(_storage: sharedStorage),
        continuation: Self.Source(_storage: sharedStorage)
      )
    }

    /// Registers a new subscription with the storage and returns an iterator bound to it.
    @inlinable
    func makeAsyncIterator() -> AsyncIterator {
      let subscriptionID = self._storage.subscribe()
      return AsyncIterator(_storage: self._storage, id: subscriptionID)
    }

    /// Whether it is known to be safe for the next subscriber to subscribe and successfully
    /// consume elements.
    ///
    /// This can be `false` if there are active subscribers or the internal buffer no longer
    /// contains the first element of the sequence.
    @inlinable
    var isKnownSafeForNextSubscriber: Bool {
      return self._storage.isKnownSafeForNextSubscriber
    }

    /// Invalidates every active subscriber.
    ///
    /// Each active subscriber receives an error the next time it attempts to consume an element.
    @inlinable
    func invalidateAllSubscriptions() {
      self._storage.invalidateAllSubscriptions()
    }
  }
  78. // MARK: - AsyncIterator
  extension BroadcastAsyncSequence {
    /// Iterator over a `BroadcastAsyncSequence`; each iterator is tied to one subscription ID.
    @usableFromInline
    struct AsyncIterator: AsyncIteratorProtocol {
      /// Storage shared with the sequence that created this iterator.
      @usableFromInline
      let _storage: _BroadcastSequenceStorage<Element>

      /// The subscription this iterator consumes elements for.
      @usableFromInline
      let _subscriberID: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID

      @inlinable
      init(
        _storage: _BroadcastSequenceStorage<Element>,
        id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
      ) {
        self._subscriberID = id
        self._storage = _storage
      }

      /// Asks the storage for this subscriber's next element, forwarding whatever it returns
      /// or throws.
      @inlinable
      mutating func next() async throws -> Element? {
        let element = try await self._storage.nextElement(forSubscriber: self._subscriberID)
        return element
      }
    }
  }
  100. // MARK: - Continuation
  extension BroadcastAsyncSequence {
    /// The producing side of a `BroadcastAsyncSequence`.
    @usableFromInline
    struct Source: Sendable {
      /// Storage shared with the sequence this source feeds.
      @usableFromInline
      let _storage: _BroadcastSequenceStorage<Element>

      @usableFromInline
      init(_storage: _BroadcastSequenceStorage<Element>) {
        self._storage = _storage
      }

      /// Writes a single element by yielding it to the shared storage.
      ///
      /// - Parameter element: The element to write.
      @inlinable
      func write(_ element: Element) async throws {
        try await self._storage.yield(element)
      }

      /// Finishes the stream with the given result.
      ///
      /// - Parameter result: `.success` to finish cleanly, `.failure` to finish with an error.
      @inlinable
      func finish(with result: Result<Void, any Error>) {
        self._storage.finish(result)
      }

      /// Finishes the stream cleanly.
      @inlinable
      func finish() {
        let cleanFinish: Result<Void, any Error> = .success(())
        self.finish(with: cleanFinish)
      }

      /// Finishes the stream with an error.
      ///
      /// - Parameter error: The error to finish the stream with.
      @inlinable
      func finish(throwing error: any Error) {
        let failedFinish: Result<Void, any Error> = .failure(error)
        self.finish(with: failedFinish)
      }
    }
  }
  /// Errors surfaced by `BroadcastAsyncSequence` to its consumers and producers.
  @usableFromInline
  enum BroadcastAsyncSequenceError: Error {
    /// The consumer did not keep up with the stream (it was too slow).
    case consumingTooSlow

    /// An element was written after the producer had already finished.
    case productionAlreadyFinished
  }
  135. // MARK: - Storage
  /// Reference-typed storage shared by a `BroadcastAsyncSequence`, its iterators and its source.
  ///
  /// All state lives in a `_BroadcastSequenceStateMachine` guarded by a `Mutex`. Every method has
  /// the same shape: mutate the state machine inside `withLock`, receive an action describing
  /// what to do next, then carry that action out (resume continuations, throw, or suspend) after
  /// the `withLock` closure has returned.
  @usableFromInline
  final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
    /// The lock-protected state machine.
    @usableFromInline
    let _state: Mutex<_BroadcastSequenceStateMachine<Element>>

    /// Creates storage whose state machine is configured with the given buffer size.
    @inlinable
    init(bufferSize: Int) {
      self._state = Mutex(_BroadcastSequenceStateMachine(bufferSize: bufferSize))
    }

    deinit {
      // Ask the state machine to drop its resources and resume any continuations it hands back.
      let onDrop = self._state.withLock { $0.dropResources() }

      switch onDrop {
      case .resume(let consumers, let producers):
        consumers.resume()
        producers.resume()
      case .none:
        break
      }
    }

    // MARK: - Producer

    /// Yield a single element to the stream. Suspends if the stream's buffer is full.
    ///
    /// - Parameter element: The element to write.
    /// - Throws: `BroadcastAsyncSequenceError.productionAlreadyFinished` when the state machine
    ///   reports that production has already finished, or whatever error a suspended producer
    ///   is resumed with.
    @inlinable
    func yield(_ element: Element) async throws {
      let onYield = self._state.withLock { $0.yield(element) }

      switch onYield {
      case .throwAlreadyFinished:
        throw BroadcastAsyncSequenceError.productionAlreadyFinished

      case .resume(let consumers):
        consumers.resume()

      case .suspend(let token):
        // The token identifies this suspension so the cancellation handler can refer to it.
        try await withTaskCancellationHandler {
          try await withCheckedThrowingContinuation { continuation in
            let onWait = self._state.withLock {
              $0.waitToProduceMore(continuation: continuation, token: token)
            }

            switch onWait {
            case .resume(let toResume, let outcome):
              toResume.resume(with: outcome)
            case .none:
              break
            }
          }
        } onCancel: {
          let onCancel = self._state.withLock { $0.cancelProducer(withToken: token) }

          switch onCancel {
          case .resume(let toResume, let outcome):
            toResume.resume(with: outcome)
          case .none:
            break
          }
        }

      case .none:
        break
      }
    }

    /// Indicate that no more values will be produced.
    ///
    /// - Parameter result: Whether the stream is finishing cleanly or because of an error.
    @inlinable
    func finish(_ result: Result<Void, any Error>) {
      let onFinish = self._state.withLock { $0.finish(result: result) }

      switch onFinish {
      case .resume(let subscribers, let producers):
        subscribers.resume()
        producers.resume()
      case .none:
        break
      }
    }

    // MARK: - Consumer

    /// Create a subscription to the stream.
    ///
    /// - Returns: Returns a unique subscription ID.
    @inlinable
    func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      return self._state.withLock { state in state.subscribe() }
    }

    /// Returns the next element for the given subscriber, if it is available.
    ///
    /// - Parameter id: The ID of the subscriber requesting the element.
    /// - Returns: The next element or `nil` if the stream has been terminated.
    @inlinable
    func nextElement(
      forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) async throws -> Element? {
      return try await withTaskCancellationHandler {
        let onNext = self._state.withLock { state in
          state.nextElement(forSubscriber: id)
        }

        switch onNext {
        case .suspend:
          return try await withCheckedThrowingContinuation { continuation in
            let onSet = self._state.withLock {
              $0.setContinuation(continuation, forSubscription: id)
            }

            switch onSet {
            case .resume(let toResume, let outcome, let producers):
              toResume.resume(with: outcome)
              producers?.resume()
            case .none:
              break
            }
          }

        case .return(let ready):
          // Resume the producers handed back alongside the result before returning it.
          ready.producers.resume()
          return try ready.nextResult.get()
        }
      } onCancel: {
        let onCancel = self._state.withLock { $0.cancelSubscription(withID: id) }

        switch onCancel {
        case .resume(let toResume, let outcome):
          toResume.resume(with: outcome)
        case .none:
          break
        }
      }
    }

    /// Returns true if it's guaranteed that the next subscriber may join and safely begin
    /// consuming elements.
    @inlinable
    var isKnownSafeForNextSubscriber: Bool {
      return self._state.withLock { $0.nextSubscriptionIsValid }
    }

    /// Invalidates all active subscriptions.
    @inlinable
    func invalidateAllSubscriptions() {
      let onInvalidate = self._state.withLock { $0.invalidateAllSubscriptions() }

      switch onInvalidate {
      case .resume(let consumers):
        consumers.resume()
      case .none:
        break
      }
    }
  }
  282. // MARK: - State machine
  283. @usableFromInline
  284. struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
  /// Continuation of a suspended consumer; resumed with an optional element or an error.
  @usableFromInline
  typealias ConsumerContinuation = CheckedContinuation<Element?, any Error>

  /// Continuation of a suspended producer; resumed with `Void` or an error.
  @usableFromInline
  typealias ProducerContinuation = CheckedContinuation<Void, any Error>
  /// One or more consumer continuations bundled with the single result they are resumed with.
  @usableFromInline
  struct ConsumerContinuations {
    /// The continuations to resume.
    @usableFromInline
    var continuations: _OneOrMany<ConsumerContinuation>

    /// The result handed to every continuation.
    @usableFromInline
    var result: Result<Element?, any Error>

    @inlinable
    init(continuations: _OneOrMany<ConsumerContinuation>, result: Result<Element?, any Error>) {
      self.result = result
      self.continuations = continuations
    }

    /// Resumes every stored continuation with `result`.
    @inlinable
    func resume() {
      let outcome = self.result

      switch self.continuations {
      case .many(let waiting):
        for continuation in waiting {
          continuation.resume(with: outcome)
        }
      case .one(let waiting):
        waiting.resume(with: outcome)
      }
    }
  }
  /// Producer continuations bundled with the single result they are resumed with.
  @usableFromInline
  struct ProducerContinuations {
    /// The continuations to resume; may be empty.
    @usableFromInline
    var continuations: [ProducerContinuation]

    /// The result handed to every continuation.
    @usableFromInline
    var result: Result<Void, any Error>

    @inlinable
    init(continuations: [ProducerContinuation], result: Result<Void, any Error>) {
      self.result = result
      self.continuations = continuations
    }

    /// Resumes every stored continuation, in array order, with `result`.
    @inlinable
    func resume() {
      let outcome = self.result
      for producer in self.continuations {
        producer.resume(with: outcome)
      }
    }
  }
  330. @usableFromInline
  331. enum State: Sendable {
  332. /// No subscribers and no elements have been produced.
  333. case initial(Initial)
  334. /// Subscribers exist but no elements have been produced.
  335. case subscribed(Subscribed)
  336. /// Elements have been produced, there may or may not be subscribers.
  337. case streaming(Streaming)
  338. /// No more elements will be produced. There may or may not be subscribers.
  339. case finished(Finished)
  340. /// Temporary state to avoid CoWs.
  341. case _modifying
  /// Creates the state in its `.initial` case, remembering the buffer size.
  @inlinable
  init(bufferSize: Int) {
    let initial = Initial(bufferSize: bufferSize)
    self = .initial(initial)
  }
  /// Payload of the `.initial` state: only the configured buffer size is known so far.
  @usableFromInline
  struct Initial: Sendable {
    /// The buffer size carried forward into the later states.
    @usableFromInline
    let bufferSize: Int

    @inlinable
    init(bufferSize size: Int) {
      self.bufferSize = size
    }
  }
  /// Payload of the `.subscribed` state: subscribers exist but no elements have been produced.
  @usableFromInline
  struct Subscribed: Sendable {
    /// Active subscriptions.
    @usableFromInline
    var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions

    /// Subscriptions to fail and remove when they next request an element.
    @usableFromInline
    var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]

    /// The maximum size of the element buffer.
    @usableFromInline
    let bufferSize: Int

    /// Starts with no subscriptions, carrying over the buffer size from the initial state.
    @inlinable
    init(from state: Initial) {
      self.bufferSize = state.bufferSize
      self.subscriptions = Subscriptions()
      self.subscriptionsToDrop = []
    }

    /// Removes every subscriber that has a stored continuation so it can be resumed with the
    /// finish result (`nil` on success, the error on failure). No producers are resumed.
    @inlinable
    mutating func finish(result: Result<Void, any Error>) -> OnFinish {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      return .resume(
        .init(continuations: waiting, result: result.map { nil }),
        .init(continuations: [], result: .success(()))
      )
    }

    /// Decides what a subscriber asking for its next element should do.
    ///
    /// Nothing is being streamed in this state, so the subscriber suspends unless its ID is
    /// listed in `subscriptionsToDrop`; in that case the ID is removed from the list and the
    /// subscriber is failed with `consumingTooSlow`.
    @inlinable
    mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
      if let index = self.subscriptionsToDrop.firstIndex(of: id) {
        self.subscriptionsToDrop.remove(at: index)
        return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
      }
      return .suspend
    }

    /// Removes the subscriber; if a continuation was stored for it, that continuation is
    /// returned to be resumed with a `CancellationError`.
    @inlinable
    mutating func cancel(
      _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnCancelSubscription {
      let removed = self.subscriptions.removeSubscriber(withID: id)
      guard let continuation = removed.1 else {
        return .none
      }
      return .resume(continuation, .failure(CancellationError()))
    }

    /// Stores the continuation of a consumer that is about to suspend.
    ///
    /// `next(id)` is consulted first because the outcome may have changed between the lock being
    /// dropped, the continuation being created and the lock being acquired again.
    @inlinable
    mutating func setContinuation(
      _ continuation: ConsumerContinuation,
      forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnSetContinuation {
      switch self.next(id) {
      case .suspend:
        // If the continuation could not be stored, fail it with a cancellation error.
        guard self.subscriptions.setContinuation(continuation, forSubscriber: id) else {
          return .resume(continuation, .failure(CancellationError()), nil)
        }
        return .none

      case .return(let ready):
        return .resume(continuation, ready.nextResult, ready.producers)
      }
    }

    /// Adds a subscription and returns its ID.
    @inlinable
    mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      return self.subscriptions.subscribe()
    }

    /// Fails subscribers that have a stored continuation with `consumingTooSlow` and queues
    /// every other subscriber to be failed when it next calls `next`.
    @inlinable
    mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
      // Subscribers with a continuation are failed right away.
      let toFail = ConsumerContinuations(
        continuations: self.subscriptions.removeSubscribersWithContinuations(),
        result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
      )

      // The rest are failed lazily, the next time they ask for an element.
      let remainingIDs = self.subscriptions.removeAllSubscribers()
      self.subscriptionsToDrop.append(contentsOf: remainingIDs)

      return .resume(toFail)
    }

    /// Removes subscribers that have a stored continuation so they can be failed with `error`.
    /// The returned producer continuations are empty.
    @inlinable
    mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
      let consumers = ConsumerContinuations(
        continuations: self.subscriptions.removeSubscribersWithContinuations(),
        result: .failure(error)
      )
      let producers = ProducerContinuations(continuations: [], result: .success(()))
      return .resume(consumers, producers)
    }
  }
  446. @usableFromInline
  447. struct Streaming: Sendable {
  448. /// A deque of elements tagged with IDs.
  449. @usableFromInline
  450. var elements: Elements
  451. /// The maximum size of the element buffer.
  452. @usableFromInline
  453. let bufferSize: Int
  454. // TODO: (optimisation) one-or-many Deque to avoid allocations in the case of a single writer
  455. /// Producers which have been suspended.
  456. @usableFromInline
  457. var producers: [(ProducerContinuation, Int)]
  458. /// The IDs of producers which have been cancelled.
  459. @usableFromInline
  460. var cancelledProducers: [Int]
  461. /// The next token for a producer.
  462. @usableFromInline
  463. var producerToken: Int
  464. /// Active subscriptions.
  465. @usableFromInline
  466. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  467. /// Subscriptions to fail and remove when they next request an element.
  468. @usableFromInline
  469. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  /// Starts streaming straight from the initial state: empty buffer, no producers, no
  /// subscriptions, and the buffer size carried over.
  @inlinable
  init(from state: Initial) {
    self.bufferSize = state.bufferSize
    self.elements = Elements()
    self.subscriptions = Subscriptions()
    self.subscriptionsToDrop = []
    self.producers = []
    self.cancelledProducers = []
    self.producerToken = 0
  }
  /// Starts streaming from the subscribed state: the existing subscriptions, the pending drops
  /// and the buffer size are carried over; the buffer and producer bookkeeping start empty.
  @inlinable
  init(from state: Subscribed) {
    self.bufferSize = state.bufferSize
    self.subscriptions = state.subscriptions
    self.subscriptionsToDrop = state.subscriptionsToDrop
    self.elements = Elements()
    self.producers = []
    self.cancelledProducers = []
    self.producerToken = 0
  }
  /// Appends an element to the buffer and works out what the producer should do next.
  ///
  /// - Parameter element: The element to buffer.
  /// - Returns: `.resume` with the consumers waiting for this element, `.suspend` with a fresh
  ///   producer token when production should pause, or `.none`.
  @inlinable
  mutating func append(_ element: Element) -> OnYield {
    self.elements.append(element)

    guard self.elements.count >= self.bufferSize, let lowestID = self.elements.lowestID else {
      // The buffer isn't full. Subscriptions holding a continuation must be waiting for the
      // element which was just appended, so hand it to them.
      if let waiting = self.subscriptions.takeContinuations() {
        return .resume(ConsumerContinuations(continuations: waiting, result: .success(element)))
      }
      return .none
    }

    // The buffer is too large:
    // - if all subscribers are equally slow (or there are none) suspend the producer
    // - if some subscribers are slow then remove them and the oldest value
    // - if no subscribers are slow then remove the oldest value
    let slowConsumers = self.subscriptions.subscribers(withElementID: lowestID)

    switch slowConsumers.count {
    case 0 where self.subscriptions.isEmpty:
      // No consumers at all.
      let token = self.producerToken
      self.producerToken += 1
      return .suspend(token)

    case 0:
      // Subscribers exist and none of them are slow; a subset may be waiting for a value.
      // Drop the oldest value and resume those which are waiting.
      self.elements.removeFirst()
      if let waiting = self.subscriptions.takeContinuations() {
        return .resume(ConsumerContinuations(continuations: waiting, result: .success(element)))
      }
      return .none

    case self.subscriptions.count:
      // All consumers are slow; stop the production of new values.
      let token = self.producerToken
      self.producerToken += 1
      return .suspend(token)

    default:
      // Some consumers are slow, but not all. Remove the slow consumers and drop the oldest
      // value.
      // NOTE(review): unlike the other non-suspending branches, no continuations are taken
      // here - confirm that no remaining subscriber can be waiting for this element.
      self.elements.removeFirst()
      self.subscriptions.removeAllSubscribers(in: slowConsumers)
      self.subscriptionsToDrop.append(contentsOf: slowConsumers)
      return .none
    }
  }
  /// Works out what a subscriber asking for its next element should do.
  ///
  /// 1. Look up the subscriber by ID to get its next element ID.
  /// 2. If that element is buffered, advance the ID and return the element.
  /// 3. If the element may arrive in the future, suspend.
  /// 4. If the element was already dropped, fail and remove the subscription.
  ///
  /// - Parameter id: The ID of the subscriber.
  /// - Returns: `.return` with a result (and any producers to resume) or `.suspend`.
  @inlinable
  mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
    // The closure yields the action plus a flag saying whether to remove the subscription.
    let lookup = self.subscriptions.withMutableElementID(
      forSubscriber: id
    ) { elementID -> (OnNext, Bool) in
      switch self.elements.lookupElement(withID: elementID) {
      case .found(let element):
        // Element exists in the buffer. Advance the subscriber's element ID.
        elementID.formNext()
        return (.return(.init(nextResult: .success(element))), false)

      case .maybeAvailableLater:
        // Element may exist in the future.
        return (.suspend, false)

      case .noLongerAvailable:
        // Element existed in the past but was dropped from the buffer.
        return (
          .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))),
          true
        )
      }
    }

    switch lookup {
    case .return(var ready):
      // The producer only suspends when all consumers are equally slow or there are no
      // consumers at all. The latter can't be true: this function can only be called by a
      // consumer. The former can't be true anymore because consumption isn't concurrent, so
      // this consumer must be faster than the others; let the producers resume.
      //
      // Note that this doesn't mean that all other consumers will be dropped: they can continue
      // to consume until the producer provides more values.
      let suspended = self.producers.map { $0.0 }
      self.producers.removeAll()
      ready.producers = ProducerContinuations(continuations: suspended, result: .success(()))
      return .return(ready)

    case .suspend:
      return .suspend

    case .none:
      // No subscription found: it must have been dropped or already finished.
      guard let index = self.subscriptionsToDrop.firstIndex(of: id) else {
        // Unknown subscriber, i.e. already finished.
        return .return(.init(nextResult: .success(nil)))
      }
      self.subscriptionsToDrop.remove(at: index)
      return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
    }
  }
  /// Stores the continuation of a consumer that is about to suspend.
  ///
  /// `next(id)` is consulted first: an element might have been provided between the lock being
  /// dropped, the continuation being created and the lock being acquired again.
  ///
  /// - Returns: `.resume` when the continuation should be resumed immediately, else `.none`.
  @inlinable
  mutating func setContinuation(
    _ continuation: ConsumerContinuation,
    forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnSetContinuation {
    switch self.next(id) {
    case .suspend:
      // If the continuation could not be stored, fail it with a cancellation error.
      guard self.subscriptions.setContinuation(continuation, forSubscriber: id) else {
        return .resume(continuation, .failure(CancellationError()), nil)
      }
      return .none

    case .return(let ready):
      return .resume(continuation, ready.nextResult, ready.producers)
    }
  }
  /// Removes the subscriber; if a continuation was stored for it, that continuation is returned
  /// to be resumed with a `CancellationError`.
  @inlinable
  mutating func cancel(
    _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnCancelSubscription {
    let removed = self.subscriptions.removeSubscriber(withID: id)
    guard let continuation = removed.1 else {
      return .none
    }
    return .resume(continuation, .failure(CancellationError()))
  }
  /// Registers a producer which wants to suspend until the buffer has space.
  ///
  /// The outcome is decided in this order:
  /// 1. the buffer has free space: resume the producer successfully,
  /// 2. the producer was cancelled before it got here: resume it with a `CancellationError`,
  /// 3. otherwise: store the continuation to be resumed later.
  ///
  /// A token is only ever presented here once, so any cancellation recorded for it by
  /// `cancelProducer(withToken:)` is discarded whichever outcome is chosen. Previously the token
  /// was only removed in case 2, leaving a stale entry in `cancelledProducers` whenever the
  /// buffer had free space.
  ///
  /// - Parameters:
  ///   - continuation: The continuation of the producer.
  ///   - token: The token the producer was given when it was told to suspend.
  /// - Returns: `.resume` when the continuation should be resumed immediately, else `.none`.
  @inlinable
  mutating func waitToProduceMore(
    _ continuation: ProducerContinuation,
    token: Int
  ) -> OnWaitToProduceMore {
    // Consume the cancellation marker (if any) up front so it cannot outlive this call.
    let cancelledBeforeSuspending: Bool
    if let index = self.cancelledProducers.firstIndex(of: token) {
      self.cancelledProducers.remove(at: index)
      cancelledBeforeSuspending = true
    } else {
      cancelledBeforeSuspending = false
    }

    if self.elements.count < self.bufferSize {
      // Buffer has free space, no need to suspend.
      return .resume(continuation, .success(()))
    } else if cancelledBeforeSuspending {
      // Producer was cancelled before suspending.
      return .resume(continuation, .failure(CancellationError()))
    } else {
      // Store the continuation to resume later.
      self.producers.append((continuation, token))
      return .none
    }
  }
  /// Cancels the producer identified by `token`.
  ///
  /// - Parameter token: The token of the producer to cancel.
  /// - Returns: `.resume` with a `CancellationError` if the producer had a stored continuation.
  ///   Otherwise the token is recorded as cancelled and `.none` is returned.
  @inlinable
  mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
    if let position = self.producers.firstIndex(where: { $0.1 == token }) {
      let suspended = self.producers.remove(at: position)
      return .resume(suspended.0, .failure(CancellationError()))
    }

    // No stored continuation for this token: remember the cancellation so that
    // 'waitToProduceMore' can fail the producer if it arrives later.
    self.cancelledProducers.append(token)
    return .none
  }
  /// Removes all subscribers with stored continuations and all stored producers so that they
  /// can be resumed.
  ///
  /// - Parameter result: The terminating result of the sequence.
  /// - Returns: The consumer continuations (to be resumed with `result` mapped to `nil`) and the
  ///   producer continuations (to be resumed successfully).
  @inlinable
  mutating func finish(result: Result<Void, any Error>) -> OnFinish {
    let waitingConsumers = self.subscriptions.removeSubscribersWithContinuations()

    let suspendedProducers = self.producers.map { $0.0 }
    self.producers.removeAll()

    let consumers = ConsumerContinuations(
      continuations: waitingConsumers,
      result: result.map { nil }
    )
    let producers = ProducerContinuations(
      continuations: suspendedProducers,
      result: .success(())
    )
    return .resume(consumers, producers)
  }
  /// Adds a new subscriber.
  ///
  /// - Returns: The ID of the new subscriber.
  @inlinable
  mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
    let id = self.subscriptions.subscribe()
    return id
  }
  /// Removes every subscriber.
  ///
  /// Subscribers with stored continuations are returned so they can be failed with
  /// `consumingTooSlow`. The IDs of all remaining subscribers are recorded in
  /// `subscriptionsToDrop` so they can be failed when they next call 'next'.
  ///
  /// - Returns: The consumer continuations to resume.
  @inlinable
  mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
    // Subscribers which are currently waiting need to be failed.
    let waiting = self.subscriptions.removeSubscribersWithContinuations()
    let toFail = ConsumerContinuations(
      continuations: waiting,
      result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
    )

    // Everyone else is dropped now and failed lazily.
    let remainingIDs = self.subscriptions.removeAllSubscribers()
    self.subscriptionsToDrop.append(contentsOf: remainingIDs)

    return .resume(toFail)
  }
  /// Removes all subscribers with stored continuations and all stored producers so that they
  /// can be failed with `error`.
  ///
  /// - Parameter error: The error to fail the continuations with.
  /// - Returns: The consumer and producer continuations to resume.
  @inlinable
  mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
    let waitingConsumers = self.subscriptions.removeSubscribersWithContinuations()
    let failedConsumers = ConsumerContinuations(
      continuations: waitingConsumers,
      result: .failure(error)
    )

    let suspendedProducers = self.producers.map { $0.0 }
    let failedProducers = ProducerContinuations(
      continuations: suspendedProducers,
      result: .failure(error)
    )
    self.producers.removeAll()

    return .resume(failedConsumers, failedProducers)
  }
  /// Returns `true` only if there are no subscribers and the lowest ID of the stored elements
  /// is the initial element ID.
  @inlinable
  func nextSubscriptionIsValid() -> Bool {
    guard self.subscriptions.isEmpty else {
      return false
    }
    return self.elements.lowestID == .initial
  }
  714. }
  /// The state holding the terminating result of the sequence along with any elements and
  /// subscriptions carried over from the previous state.
  @usableFromInline
  struct Finished: Sendable {
    /// A deque of elements tagged with IDs.
    @usableFromInline
    var elements: Elements

    /// Active subscriptions.
    @usableFromInline
    var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions

    /// Subscriptions to fail and remove when they next request an element.
    @usableFromInline
    var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]

    /// The terminating result of the sequence.
    @usableFromInline
    let result: Result<Void, any Error>

    /// Creates the finished state from the initial state: no elements and no subscriptions.
    @inlinable
    init(from state: Initial, result: Result<Void, any Error>) {
      self.result = result
      self.elements = Elements()
      self.subscriptions = Subscriptions()
      self.subscriptionsToDrop = []
    }

    /// Creates the finished state from the subscribed state, keeping its subscriptions.
    @inlinable
    init(from state: Subscribed, result: Result<Void, any Error>) {
      self.result = result
      self.elements = Elements()
      self.subscriptions = state.subscriptions
      self.subscriptionsToDrop = []
    }

    /// Creates the finished state from the streaming state, keeping its elements, its
    /// subscriptions and its subscriptions to drop.
    @inlinable
    init(from state: Streaming, result: Result<Void, any Error>) {
      self.result = result
      self.elements = state.elements
      self.subscriptions = state.subscriptions
      self.subscriptionsToDrop = state.subscriptionsToDrop
    }

    /// Returns the next result for the subscriber with the given ID.
    ///
    /// This never suspends: the subscriber receives its next stored element, the terminating
    /// result, or a `consumingTooSlow` failure.
    ///
    /// - Parameter id: The ID of the subscriber requesting an element.
    /// - Returns: The result to return to the subscriber.
    @inlinable
    mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
      let lookedUp = self.subscriptions.withMutableElementID(
        forSubscriber: id
      ) { cursor -> (OnNext, Bool) in
        switch self.elements.lookupElement(withID: cursor) {
        case .found(let element):
          // Hand over the element and move the subscriber on to the following one.
          cursor.formNext()
          return (.return(.init(nextResult: .success(element))), false)

        case .maybeAvailableLater:
          // No further elements will arrive: deliver the terminating result and remove the
          // subscriber.
          let finalResult: Result<Element?, any Error> = self.result.map { nil }
          return (.return(.init(nextResult: finalResult)), true)

        case .noLongerAvailable:
          // The element is no longer stored: fail and remove the subscriber.
          let tooSlow: Result<Element?, any Error> = .failure(
            BroadcastAsyncSequenceError.consumingTooSlow
          )
          return (.return(.init(nextResult: tooSlow)), true)
        }
      }

      guard let known = lookedUp else {
        // No subscriber with the given ID, it was likely dropped previously.
        if let position = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
          self.subscriptionsToDrop.remove(at: position)
          return .return(
            .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
          )
        }
        // Unknown subscriber, i.e. already finished.
        return .return(.init(nextResult: .success(nil)))
      }

      switch known {
      case .return(let result):
        return .return(result)
      case .suspend:
        // The lookup closure above never produces '.suspend'.
        fatalError("Internal inconsistency")
      }
    }

    /// Adds a new subscriber.
    ///
    /// - Returns: The ID of the new subscriber.
    @inlinable
    mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      let id = self.subscriptions.subscribe()
      return id
    }

    /// Removes every subscriber.
    ///
    /// Subscribers with stored continuations are returned so they can be failed with
    /// `consumingTooSlow`; the IDs of the others are recorded in `subscriptionsToDrop` so they
    /// can be failed when they next call 'next'.
    @inlinable
    mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
      // Subscribers which are currently waiting need to be failed.
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let toFail = ConsumerContinuations(
        continuations: waiting,
        result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
      )

      // Everyone else is dropped now and failed lazily.
      let remainingIDs = self.subscriptions.removeAllSubscribers()
      self.subscriptionsToDrop.append(contentsOf: remainingIDs)

      return .resume(toFail)
    }

    /// Removes all subscribers with stored continuations so that they can be failed with
    /// `error`. No producer continuations are returned from this state.
    @inlinable
    mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let failedConsumers = ConsumerContinuations(
        continuations: waiting,
        result: .failure(error)
      )
      let noProducers = ProducerContinuations(continuations: [], result: .failure(error))
      return .resume(failedConsumers, noProducers)
    }

    /// Returns `true` if the lowest ID of the stored elements is the initial element ID.
    @inlinable
    func nextSubscriptionIsValid() -> Bool {
      let lowest = self.elements.lowestID
      return lowest == .initial
    }

    /// Removes the subscriber with the given ID.
    ///
    /// - Returns: `.resume` with a `CancellationError` if the subscriber had a stored
    ///   continuation, otherwise `.none`.
    @inlinable
    mutating func cancel(
      _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnCancelSubscription {
      let removal = self.subscriptions.removeSubscriber(withID: id)
      guard let pending = removal.1 else {
        return .none
      }
      return .resume(pending, .failure(CancellationError()))
    }
  }
  836. }
  /// The current state of the state machine.
  @usableFromInline
  var _state: State

  /// Creates a state machine.
  ///
  /// - Parameter bufferSize: The buffer size passed to the initial state.
  @inlinable
  init(bufferSize: Int) {
    let initialState = State(bufferSize: bufferSize)
    self._state = initialState
  }
  /// Whether the next subscription is valid.
  ///
  /// Always `true` before any element has been yielded (initial and subscribed states);
  /// otherwise the answer is delegated to the streaming or finished state.
  @inlinable
  var nextSubscriptionIsValid: Bool {
    switch self._state {
    case .initial, .subscribed:
      return true

    case .streaming(let streaming):
      return streaming.nextSubscriptionIsValid()

    case .finished(let finished):
      return finished.nextSubscriptionIsValid()

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after invalidating all subscriptions.
  @usableFromInline
  enum OnInvalidateAllSubscriptions {
    /// Resume the given consumer continuations.
    case resume(ConsumerContinuations)
    /// Nothing to do.
    case none
  }

  /// Invalidates all subscriptions held by the current state.
  ///
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
    switch self._state {
    case .initial:
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.invalidateAllSubscriptions()
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.invalidateAllSubscriptions()
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.invalidateAllSubscriptions()
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after yielding an element.
  @usableFromInline
  enum OnYield {
    case none
    case suspend(Int)
    case resume(ConsumerContinuations)
    /// The sequence has already finished, so the element was not accepted.
    case throwAlreadyFinished
  }

  /// Yields an element to the sequence, moving to the streaming state if necessary.
  ///
  /// - Parameter element: The element to append.
  /// - Returns: The action the caller should take; `.throwAlreadyFinished` if the sequence has
  ///   already finished.
  @inlinable
  mutating func yield(_ element: Element) -> OnYield {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      // The first element moves the state machine to streaming.
      var streaming = State.Streaming(from: initial)
      let action = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .subscribed(let subscribed):
      self._state = ._modifying
      // The first element moves the state machine to streaming.
      var streaming = State.Streaming(from: subscribed)
      let action = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .finished:
      return .throwAlreadyFinished

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after finishing the sequence.
  @usableFromInline
  enum OnFinish {
    /// Nothing to do.
    case none
    /// Resume the given consumer and producer continuations.
    case resume(ConsumerContinuations, ProducerContinuations)
  }

  /// Finishes the sequence with the given result and moves to the finished state.
  ///
  /// Finishing a sequence which has already finished has no effect.
  ///
  /// - Parameter result: The terminating result of the sequence.
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func finish(result: Result<Void, any Error>) -> OnFinish {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      let finished = State.Finished(from: initial, result: result)
      self._state = .finished(finished)
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.finish(result: result)
      self._state = .finished(State.Finished(from: subscribed, result: result))
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.finish(result: result)
      self._state = .finished(State.Finished(from: streaming, result: result))
      return action

    case .finished:
      return .none

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take when a subscriber requests its next element.
  @usableFromInline
  enum OnNext {
    /// The result to return to the subscriber together with producer continuations to resume.
    @usableFromInline
    struct ReturnAndResumeProducers {
      /// The result to return to the subscriber.
      @usableFromInline
      var nextResult: Result<Element?, any Error>

      /// The producer continuations to resume.
      @usableFromInline
      var producers: ProducerContinuations

      /// - Parameters:
      ///   - nextResult: The result to return to the subscriber.
      ///   - producers: Producer continuations to resume; empty by default.
      ///   - producerResult: The result to resume the producers with; success by default.
      @inlinable
      init(
        nextResult: Result<Element?, any Error>,
        producers: [ProducerContinuation] = [],
        producerResult: Result<Void, any Error> = .success(())
      ) {
        self.producers = ProducerContinuations(continuations: producers, result: producerResult)
        self.nextResult = nextResult
      }
    }

    /// Return the wrapped result.
    case `return`(ReturnAndResumeProducers)
    /// Suspend the subscriber.
    case suspend
  }
  /// Requests the next element for the subscriber with the given ID.
  ///
  /// Must not be called in the initial state, where no subscribers exist.
  ///
  /// - Parameter id: The ID of the subscriber requesting an element.
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func nextElement(
    forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnNext {
    switch self._state {
    case .initial:
      // No subscribers so demand isn't possible.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.next(id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.next(id)
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.next(id)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after trying to store a consumer's continuation.
  @usableFromInline
  enum OnSetContinuation {
    /// The continuation was stored; nothing to do.
    case none
    /// Resume the consumer continuation with the given result and, if present, resume the
    /// producer continuations.
    case resume(ConsumerContinuation, Result<Element?, any Error>, ProducerContinuations?)
  }

  /// Sets the continuation for the subscriber with the given ID.
  ///
  /// Must not be called in the initial state, where no subscribers exist.
  ///
  /// - Parameters:
  ///   - continuation: The consumer's continuation.
  ///   - id: The ID of the subscriber.
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func setContinuation(
    _ continuation: ConsumerContinuation,
    forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnSetContinuation {
    switch self._state {
    case .initial:
      // No subscribers so demand isn't possible.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.setContinuation(continuation, forSubscription: id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.setContinuation(continuation, forSubscription: id)
      self._state = .streaming(streaming)
      return action

    case .finished(let finished):
      // All values must have been produced, nothing to wait for: resume with the terminating
      // result straight away.
      // NOTE(review): unlike the streaming state, stored elements are not re-checked for this
      // subscriber here — presumably callers never reach this case with unconsumed elements;
      // confirm against the caller's locking.
      return .resume(continuation, finished.result.map { _ in nil }, nil)

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after cancelling a subscription.
  @usableFromInline
  enum OnCancelSubscription {
    /// Nothing to do.
    case none
    /// Resume the consumer continuation with the given result.
    case resume(ConsumerContinuation, Result<Element?, any Error>)
  }

  /// Cancels the subscription with the given ID.
  ///
  /// Must not be called in the initial state, where no subscribers exist.
  ///
  /// - Parameter id: The ID of the subscription to cancel.
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func cancelSubscription(
    withID id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnCancelSubscription {
    switch self._state {
    case .initial:
      // No subscribers, so there is nothing which could be cancelled.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.cancel(id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.cancel(id)
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.cancel(id)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The result of subscribing.
  @usableFromInline
  enum OnSubscribe {
    /// Subscribed with the given ID.
    case subscribed(_BroadcastSequenceStateMachine<Element>.Subscriptions.ID)
  }

  /// Adds a new subscriber, moving from the initial state to the subscribed state if this is
  /// the first one.
  ///
  /// - Returns: The ID of the new subscriber.
  @inlinable
  mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      var subscribed = State.Subscribed(from: initial)
      let id = subscribed.subscribe()
      self._state = .subscribed(subscribed)
      return id

    case .subscribed(var subscribed):
      self._state = ._modifying
      let id = subscribed.subscribe()
      self._state = .subscribed(subscribed)
      return id

    case .streaming(var streaming):
      self._state = ._modifying
      let id = streaming.subscribe()
      self._state = .streaming(streaming)
      return id

    case .finished(var finished):
      self._state = ._modifying
      let id = finished.subscribe()
      self._state = .finished(finished)
      return id

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take when a producer asks to wait until it can produce more.
  @usableFromInline
  enum OnWaitToProduceMore {
    /// The continuation was stored; nothing to do.
    case none
    /// Resume the producer continuation with the given result.
    case resume(ProducerContinuation, Result<Void, any Error>)
  }

  /// Registers a producer which wants to wait until more elements can be produced.
  ///
  /// Must only be called once an element has been yielded, i.e. not in the initial or
  /// subscribed states.
  ///
  /// - Parameters:
  ///   - continuation: The producer's continuation.
  ///   - token: The token identifying the producer.
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func waitToProduceMore(
    continuation: ProducerContinuation,
    token: Int
  ) -> OnWaitToProduceMore {
    switch self._state {
    case .initial, .subscribed:
      // Nothing has been produced yet, so there is no reason to wait to produce more.
      fatalError("Internal inconsistency")

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.waitToProduceMore(continuation, token: token)
      self._state = .streaming(streaming)
      return action

    case .finished:
      // Resume the producer immediately and successfully rather than storing it.
      return .resume(continuation, .success(()))

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after cancelling a producer; shares its shape with
  /// `OnWaitToProduceMore`.
  @usableFromInline
  typealias OnCancelProducer = OnWaitToProduceMore

  /// Cancels the producer identified by `token`.
  ///
  /// Must only be called once an element has been yielded, i.e. not in the initial or
  /// subscribed states.
  ///
  /// - Parameter token: The token of the producer to cancel.
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
    switch self._state {
    case .initial, .subscribed:
      // Nothing has been produced yet, so no producer can be waiting to produce more.
      fatalError("Internal inconsistency")

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.cancelProducer(withToken: token)
      self._state = .streaming(streaming)
      return action

    case .finished:
      // No producers to cancel; do nothing.
      return .none

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after dropping resources.
  @usableFromInline
  enum OnDropResources {
    /// Nothing to do.
    case none
    /// Resume the given consumer and producer continuations.
    case resume(ConsumerContinuations, ProducerContinuations)
  }

  /// Drops resources held by the current state and leaves the state machine finished.
  ///
  /// Every state other than finished moves to finished with a `productionAlreadyFinished`
  /// failure as its result; an already finished state keeps its existing result.
  ///
  /// - Returns: The action the caller should take.
  @inlinable
  mutating func dropResources() -> OnDropResources {
    let error = BroadcastAsyncSequenceError.productionAlreadyFinished

    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      self._state = .finished(State.Finished(from: initial, result: .failure(error)))
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.dropResources(error: error)
      self._state = .finished(State.Finished(from: subscribed, result: .failure(error)))
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.dropResources(error: error)
      self._state = .finished(State.Finished(from: streaming, result: .failure(error)))
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.dropResources(error: error)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  1168. }
  extension _BroadcastSequenceStateMachine {
    /// A collection of elements tagged with an identifier.
    ///
    /// Identifiers are assigned when elements are added to the collection and are monotonically
    /// increasing. If element 'A' is added before element 'B' then 'A' will have a lower ID than 'B'.
    @usableFromInline
    struct Elements: Sendable {
      /// The ID of an element.
      @usableFromInline
      struct ID: Hashable, Sendable, Comparable, Strideable {
        @usableFromInline
        private(set) var rawValue: Int

        /// The ID assigned to the first element added to a collection.
        @usableFromInline
        static var initial: Self {
          ID(id: 0)
        }

        private init(id: Int) {
          self.rawValue = id
        }

        /// Advances this ID to the one which follows it.
        @inlinable
        mutating func formNext() {
          self.rawValue += 1
        }

        /// Returns the ID which follows this one.
        @inlinable
        func next() -> Self {
          var copy = self
          copy.formNext()
          return copy
        }

        @inlinable
        func distance(to other: Self) -> Int {
          other.rawValue - self.rawValue
        }

        @inlinable
        func advanced(by n: Int) -> Self {
          var copy = self
          copy.rawValue += n
          return copy
        }

        @inlinable
        static func < (lhs: Self, rhs: Self) -> Bool {
          lhs.rawValue < rhs.rawValue
        }
      }

      /// An element paired with the ID it was assigned when appended.
      @usableFromInline
      struct _IdentifiableElement: Sendable {
        @usableFromInline
        var element: Element

        @usableFromInline
        var id: ID

        @inlinable
        init(element: Element, id: ID) {
          self.element = element
          self.id = id
        }
      }

      /// The stored elements, in the order they were appended.
      @usableFromInline
      var _elements: Deque<_IdentifiableElement>

      /// The ID which will be assigned to the next appended element.
      @usableFromInline
      var _nextID: ID

      @inlinable
      init() {
        self._nextID = .initial
        self._elements = []
      }

      /// Returns the ID to assign to the next element and advances the counter.
      @inlinable
      mutating func nextElementID() -> ID {
        let id = self._nextID
        self._nextID.formNext()
        return id
      }

      /// The highest ID of the stored elements; `nil` if there are no elements.
      @inlinable
      var highestID: ID? { self._elements.last?.id }

      /// The lowest ID of the stored elements; `nil` if there are no elements.
      @inlinable
      var lowestID: ID? { self._elements.first?.id }

      /// The number of stored elements.
      @inlinable
      var count: Int { self._elements.count }

      /// Whether there are no stored elements.
      @inlinable
      var isEmpty: Bool { self._elements.isEmpty }

      /// Appends an element to the collection, tagging it with the next ID.
      @inlinable
      mutating func append(_ element: Element) {
        self._elements.append(_IdentifiableElement(element: element, id: self.nextElementID()))
      }

      /// Removes the first element from the collection.
      ///
      /// - Returns: The removed element.
      @discardableResult
      @inlinable
      mutating func removeFirst() -> Element {
        let removed = self._elements.removeFirst()
        return removed.element
      }

      @usableFromInline
      enum ElementLookup {
        /// The element was found in the collection.
        case found(Element)
        /// The element isn't in the collection, but it could be in the future.
        case maybeAvailableLater
        /// The element was in the collection, but is no longer available.
        case noLongerAvailable
      }

      /// Lookup the element with the given ID.
      ///
      /// This only reads from the collection, so it is not `mutating`.
      ///
      /// - Parameter id: The ID of the element to lookup.
      /// - Returns: `.found` if an element with the ID is stored, `.maybeAvailableLater` if the
      ///   ID is higher than every stored ID (or, when empty, hasn't been assigned yet), and
      ///   `.noLongerAvailable` otherwise.
      @inlinable
      func lookupElement(withID id: ID) -> ElementLookup {
        guard let low = self.lowestID, let high = self.highestID else {
          // Must be empty: IDs below '_nextID' have already been assigned and since removed.
          return id >= self._nextID ? .maybeAvailableLater : .noLongerAvailable
        }
        assert(low <= high)

        let lookup: ElementLookup
        if id < low {
          lookup = .noLongerAvailable
        } else if id > high {
          lookup = .maybeAvailableLater
        } else {
          // IDs are monotonically increasing. If the buffer contains the tag we can use it to index
          // into the deque by looking at the offsets.
          let offset = low.distance(to: id)
          let index = self._elements.startIndex.advanced(by: offset)
          lookup = .found(self._elements[index].element)
        }
        return lookup
      }
    }
  }
  extension _BroadcastSequenceStateMachine {
    /// A collection of subscriptions.
    @usableFromInline
    struct Subscriptions: Sendable {
      /// The ID of a subscriber.
      @usableFromInline
      struct ID: Hashable, Sendable {
        @usableFromInline
        private(set) var rawValue: Int

        @inlinable
        init() {
          self.rawValue = 0
        }

        /// Advances this ID to the one which follows it.
        @inlinable
        mutating func formNext() {
          self.rawValue += 1
        }

        /// Returns the ID which follows this one.
        @inlinable
        func next() -> Self {
          var copy = self
          copy.formNext()
          return copy
        }
      }

      @usableFromInline
      struct _Subscriber: Sendable {
        /// The ID of the subscriber.
        @usableFromInline
        var id: ID

        /// The ID of the next element the subscriber will consume.
        @usableFromInline
        var nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID

        /// A continuation which will be resumed when the next element becomes available.
        @usableFromInline
        var continuation: ConsumerContinuation?

        @inlinable
        init(
          id: ID,
          nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID,
          continuation: ConsumerContinuation? = nil
        ) {
          self.id = id
          self.nextElementID = nextElementID
          self.continuation = continuation
        }

        /// Returns and sets the continuation to `nil` if one exists.
        ///
        /// The next element ID is advanced if a continuation exists.
        ///
        /// - Returns: The continuation, if one existed.
        @inlinable
        mutating func takeContinuation() -> ConsumerContinuation? {
          guard let continuation = self.continuation else { return nil }
          self.continuation = nil
          self.nextElementID.formNext()
          return continuation
        }
      }

      /// The current subscribers.
      @usableFromInline
      var _subscribers: [_Subscriber]

      /// The ID which will be assigned to the next subscriber.
      @usableFromInline
      var _nextSubscriberID: ID

      @inlinable
      init() {
        self._subscribers = []
        self._nextSubscriberID = ID()
      }

      /// Returns the number of subscribers.
      @inlinable
      var count: Int { self._subscribers.count }

      /// Returns whether the collection is empty.
      @inlinable
      var isEmpty: Bool { self._subscribers.isEmpty }

      /// Adds a new subscriber and returns its unique ID.
      ///
      /// The subscriber starts at the initial element ID.
      ///
      /// - Returns: The ID of the new subscriber.
      @inlinable
      mutating func subscribe() -> ID {
        let id = self._nextSubscriberID
        self._nextSubscriberID.formNext()
        self._subscribers.append(_Subscriber(id: id, nextElementID: .initial))
        return id
      }

      /// Provides mutable access to the element ID of the given subscriber, if it exists.
      ///
      /// - Parameters:
      ///   - id: The ID of the subscriber.
      ///   - body: A closure to mutate the element ID of the subscriber which returns the result and
      ///     a boolean indicating whether the subscriber should be removed.
      /// - Returns: The result returned from the closure or `nil` if no subscriber exists with the
      ///   given ID.
      @inlinable
      mutating func withMutableElementID<R>(
        forSubscriber id: ID,
        _ body: (
          inout _BroadcastSequenceStateMachine<Element>.Elements.ID
        ) -> (result: R, removeSubscription: Bool)
      ) -> R? {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else { return nil }
        let (result, removeSubscription) = body(&self._subscribers[index].nextElementID)
        if removeSubscription {
          self._subscribers.remove(at: index)
        }
        return result
      }

      /// Sets the continuation for the subscription with the given ID.
      ///
      /// - Parameters:
      ///   - continuation: The continuation to set.
      ///   - id: The ID of the subscriber.
      /// - Returns: A boolean indicating whether the continuation was set or not.
      @inlinable
      mutating func setContinuation(
        _ continuation: ConsumerContinuation,
        forSubscriber id: ID
      ) -> Bool {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
          return false
        }
        // A subscriber is expected to have at most one outstanding continuation.
        assert(self._subscribers[index].continuation == nil)
        self._subscribers[index].continuation = continuation
        return true
      }

      /// Returns an array of IDs of the subscribers whose next element ID is `id`.
      @inlinable
      func subscribers(
        withElementID id: _BroadcastSequenceStateMachine<Element>.Elements.ID
      ) -> [ID] {
        return self._subscribers.filter {
          $0.nextElementID == id
        }.map {
          $0.id
        }
      }

      /// Removes the subscriber with the given ID.
      ///
      /// - Parameter id: The ID of the subscriber to remove.
      /// - Returns: A tuple indicating whether a subscriber was removed and any continuation
      ///   associated with the subscriber.
      @inlinable
      mutating func removeSubscriber(withID id: ID) -> (Bool, ConsumerContinuation?) {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
          return (false, nil)
        }
        let continuation = self._subscribers[index].continuation
        self._subscribers.remove(at: index)
        return (true, continuation)
      }

      /// Remove all subscribers in the given array of IDs.
      @inlinable
      mutating func removeAllSubscribers(in idsToRemove: [ID]) {
        self._subscribers.removeAll {
          idsToRemove.contains($0.id)
        }
      }

      /// Remove all subscribers and return their IDs.
      @inlinable
      mutating func removeAllSubscribers() -> [ID] {
        let subscribers = self._subscribers.map { $0.id }
        self._subscribers.removeAll()
        return subscribers
      }

      /// Returns any continuations set on subscribers, unsetting at the same time.
      ///
      /// Subscribers are kept; each one which had a continuation has its next element ID
      /// advanced (see `_Subscriber.takeContinuation()`).
      ///
      /// - Returns: The continuations, or `nil` if no subscriber had one.
      @inlinable
      mutating func takeContinuations() -> _OneOrMany<ConsumerContinuation>? {
        // Avoid allocs if there's only one subscriber.
        let count = self._countPendingContinuations()
        let result: _OneOrMany<ConsumerContinuation>?

        switch count {
        case 0:
          result = nil

        case 1:
          // Exactly one subscriber has a continuation, so both unwraps succeed.
          let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
          let continuation = self._subscribers[index].takeContinuation()!
          result = .one(continuation)

        default:
          var continuations = [ConsumerContinuation]()
          continuations.reserveCapacity(count)
          for index in self._subscribers.indices {
            if let continuation = self._subscribers[index].takeContinuation() {
              continuations.append(continuation)
            }
          }
          result = .many(continuations)
        }

        return result
      }

      /// Removes all subscribers which have continuations and returns their continuations.
      ///
      /// - Returns: The continuations of the removed subscribers, in subscriber order; an empty
      ///   `.many` if no subscriber had a continuation.
      @inlinable
      mutating func removeSubscribersWithContinuations() -> _OneOrMany<ConsumerContinuation> {
        // Avoid allocs if there's only one subscriber.
        let count = self._countPendingContinuations()
        let result: _OneOrMany<ConsumerContinuation>

        switch count {
        case 0:
          result = .many([])

        case 1:
          // Exactly one subscriber has a continuation, so both unwraps succeed.
          let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
          let subscription = self._subscribers.remove(at: index)
          result = .one(subscription.continuation!)

        default:
          var continuations = [ConsumerContinuation]()
          continuations.reserveCapacity(count)
          for subscription in self._subscribers {
            if let continuation = subscription.continuation {
              continuations.append(continuation)
            }
          }
          // Remove by the same condition used to collect the continuations. This removes exactly
          // the same subscribers in a single pass, without building a list of IDs and searching
          // it for every subscriber.
          self._subscribers.removeAll { $0.continuation != nil }
          result = .many(continuations)
        }

        return result
      }

      /// Returns the number of subscribers which currently have a continuation set.
      @inlinable
      func _countPendingContinuations() -> Int {
        return self._subscribers.reduce(into: 0) { count, subscription in
          if subscription.continuation != nil {
            count += 1
          }
        }
      }
    }
  }
  extension _BroadcastSequenceStateMachine {
    /// Holds either a single value or an array of values.
    ///
    /// The single-value case lets callers avoid allocating an array when there is only one
    /// value to carry.
    // TODO: tiny array
    @usableFromInline
    enum _OneOrMany<Value> {
      /// Exactly one value.
      case one(Value)
      /// Any number of values, possibly none.
      case many([Value])
    }
  }