BroadcastAsyncSequence.swift 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. public import DequeModule // should be @usableFromInline
  17. /// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently.
  18. ///
  19. /// The sequence is not a general-purpose broadcast sequence; it is tailored specifically for the
  20. /// requirements of gRPC Swift, in particular it is used to support retrying and hedging requests.
  21. ///
  22. /// In order to achieve this it maintains on an internal buffer of elements which is limited in
  23. /// size. Each iterator ("subscriber") maintains an offset into the elements which the sequence has
  24. /// produced over time. If a subscriber is consuming too slowly (and the buffer is full) then the
  25. /// sequence will cancel the subscriber's subscription to the stream, dropping the oldest element
  26. /// in the buffer to make space for more elements. If the buffer is full and all subscribers are
  27. /// equally slow then all producers are suspended until the buffer drops to a reasonable size.
  28. ///
  29. /// The expectation is that the number of subscribers will be low; for retries there will be at most
  30. /// one subscriber at a time, for hedging there may be at most five subscribers at a time.
  31. @usableFromInline
  32. struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
  33. @usableFromInline
  34. let _storage: _BroadcastSequenceStorage<Element>
  35. @inlinable
  36. init(_storage: _BroadcastSequenceStorage<Element>) {
  37. self._storage = _storage
  38. }
  39. /// Make a new stream and continuation.
  40. ///
  41. /// - Parameters:
  42. /// - elementType: The type of element this sequence produces.
  43. /// - bufferSize: The number of elements this sequence may store.
  44. /// - Returns: A stream and continuation.
  45. @inlinable
  46. static func makeStream(
  47. of elementType: Element.Type = Element.self,
  48. bufferSize: Int
  49. ) -> (stream: Self, continuation: Self.Source) {
  50. let storage = _BroadcastSequenceStorage<Element>(bufferSize: bufferSize)
  51. let stream = Self(_storage: storage)
  52. let continuation = Self.Source(_storage: storage)
  53. return (stream, continuation)
  54. }
  55. @inlinable
  56. func makeAsyncIterator() -> AsyncIterator {
  57. let id = self._storage.subscribe()
  58. return AsyncIterator(_storage: _storage, id: id)
  59. }
  60. /// Returns true if it is known to be safe for the next subscriber to subscribe and successfully
  61. /// consume elements.
  62. ///
  63. /// This function can return `false` if there are active subscribers or the internal buffer no
  64. /// longer contains the first element in the sequence.
  65. @inlinable
  66. var isKnownSafeForNextSubscriber: Bool {
  67. self._storage.isKnownSafeForNextSubscriber
  68. }
  69. /// Invalidates all active subscribers.
  70. ///
  71. /// Any active subscriber will receive an error the next time they attempt to consume an element.
  72. @inlinable
  73. func invalidateAllSubscriptions() {
  74. self._storage.invalidateAllSubscriptions()
  75. }
  76. }
  77. // MARK: - AsyncIterator
  78. extension BroadcastAsyncSequence {
  79. @usableFromInline
  80. struct AsyncIterator: AsyncIteratorProtocol {
  81. @usableFromInline
  82. let _storage: _BroadcastSequenceStorage<Element>
  83. @usableFromInline
  84. let _subscriberID: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  85. @inlinable
  86. init(
  87. _storage: _BroadcastSequenceStorage<Element>,
  88. id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  89. ) {
  90. self._storage = _storage
  91. self._subscriberID = id
  92. }
  93. @inlinable
  94. mutating func next() async throws -> Element? {
  95. try await self._storage.nextElement(forSubscriber: self._subscriberID)
  96. }
  97. }
  98. }
  99. // MARK: - Continuation
  100. extension BroadcastAsyncSequence {
  101. @usableFromInline
  102. struct Source: Sendable {
  103. @usableFromInline
  104. let _storage: _BroadcastSequenceStorage<Element>
  105. @usableFromInline
  106. init(_storage: _BroadcastSequenceStorage<Element>) {
  107. self._storage = _storage
  108. }
  109. @inlinable
  110. func write(_ element: Element) async throws {
  111. try await self._storage.yield(element)
  112. }
  113. @inlinable
  114. func finish(with result: Result<Void, any Error>) {
  115. self._storage.finish(result)
  116. }
  117. @inlinable
  118. func finish() {
  119. self.finish(with: .success(()))
  120. }
  121. @inlinable
  122. func finish(throwing error: any Error) {
  123. self.finish(with: .failure(error))
  124. }
  125. }
  126. }
  127. @usableFromInline
  128. enum BroadcastAsyncSequenceError: Error {
  129. /// The consumer was too slow.
  130. case consumingTooSlow
  131. /// The producer has already finished.
  132. case productionAlreadyFinished
  133. }
  134. // MARK: - Storage
  135. @usableFromInline
  136. final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
  137. @usableFromInline
  138. let _state: LockedValueBox<_BroadcastSequenceStateMachine<Element>>
  139. @inlinable
  140. init(bufferSize: Int) {
  141. self._state = LockedValueBox(_BroadcastSequenceStateMachine(bufferSize: bufferSize))
  142. }
  143. deinit {
  144. let onDrop = self._state.withLockedValue { state in
  145. state.dropResources()
  146. }
  147. switch onDrop {
  148. case .none:
  149. ()
  150. case .resume(let consumers, let producers):
  151. consumers.resume()
  152. producers.resume()
  153. }
  154. }
  155. // MARK - Producer
  156. /// Yield a single element to the stream. Suspends if the stream's buffer is full.
  157. ///
  158. /// - Parameter element: The element to write.
  159. @inlinable
  160. func yield(_ element: Element) async throws {
  161. let onYield = self._state.withLockedValue { state in state.yield(element) }
  162. switch onYield {
  163. case .none:
  164. ()
  165. case .resume(let continuations):
  166. continuations.resume()
  167. case .suspend(let token):
  168. try await withTaskCancellationHandler {
  169. try await withCheckedThrowingContinuation { continuation in
  170. let onProduceMore = self._state.withLockedValue { state in
  171. state.waitToProduceMore(continuation: continuation, token: token)
  172. }
  173. switch onProduceMore {
  174. case .resume(let continuation, let result):
  175. continuation.resume(with: result)
  176. case .none:
  177. ()
  178. }
  179. }
  180. } onCancel: {
  181. let onCancel = self._state.withLockedValue { state in
  182. state.cancelProducer(withToken: token)
  183. }
  184. switch onCancel {
  185. case .resume(let continuation, let result):
  186. continuation.resume(with: result)
  187. case .none:
  188. ()
  189. }
  190. }
  191. case .throwAlreadyFinished:
  192. throw BroadcastAsyncSequenceError.productionAlreadyFinished
  193. }
  194. }
  195. /// Indicate that no more values will be produced.
  196. ///
  197. /// - Parameter result: Whether the stream is finishing cleanly or because of an error.
  198. @inlinable
  199. func finish(_ result: Result<Void, any Error>) {
  200. let action = self._state.withLockedValue { state in state.finish(result: result) }
  201. switch action {
  202. case .none:
  203. ()
  204. case .resume(let subscribers, let producers):
  205. subscribers.resume()
  206. producers.resume()
  207. }
  208. }
  209. // MARK: - Consumer
  210. /// Create a subscription to the stream.
  211. ///
  212. /// - Returns: Returns a unique subscription ID.
  213. @inlinable
  214. func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  215. self._state.withLockedValue { $0.subscribe() }
  216. }
  217. /// Returns the next element for the given subscriber, if it is available.
  218. ///
  219. /// - Parameter id: The ID of the subscriber requesting the element.
  220. /// - Returns: The next element or `nil` if the stream has been terminated.
  221. @inlinable
  222. func nextElement(
  223. forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  224. ) async throws -> Element? {
  225. return try await withTaskCancellationHandler {
  226. self._state.unsafe.lock()
  227. let onNext = self._state.unsafe.withValueAssumingLockIsAcquired {
  228. $0.nextElement(forSubscriber: id)
  229. }
  230. switch onNext {
  231. case .return(let returnAndProduceMore):
  232. self._state.unsafe.unlock()
  233. returnAndProduceMore.producers.resume()
  234. return try returnAndProduceMore.nextResult.get()
  235. case .suspend:
  236. return try await withCheckedThrowingContinuation { continuation in
  237. let onSetContinuation = self._state.unsafe.withValueAssumingLockIsAcquired { state in
  238. state.setContinuation(continuation, forSubscription: id)
  239. }
  240. self._state.unsafe.unlock()
  241. switch onSetContinuation {
  242. case .resume(let continuation, let result):
  243. continuation.resume(with: result)
  244. case .none:
  245. ()
  246. }
  247. }
  248. }
  249. } onCancel: {
  250. let onCancel = self._state.withLockedValue { state in
  251. state.cancelSubscription(withID: id)
  252. }
  253. switch onCancel {
  254. case .resume(let continuation, let result):
  255. continuation.resume(with: result)
  256. case .none:
  257. ()
  258. }
  259. }
  260. }
  261. /// Returns true if it's guaranteed that the next subscriber may join and safely begin consuming
  262. /// elements.
  263. @inlinable
  264. var isKnownSafeForNextSubscriber: Bool {
  265. self._state.withLockedValue { state in
  266. state.nextSubscriptionIsValid
  267. }
  268. }
  269. /// Invalidates all active subscriptions.
  270. @inlinable
  271. func invalidateAllSubscriptions() {
  272. let action = self._state.withLockedValue { state in
  273. state.invalidateAllSubscriptions()
  274. }
  275. switch action {
  276. case .resume(let continuations):
  277. continuations.resume()
  278. case .none:
  279. ()
  280. }
  281. }
  282. }
  283. // MARK: - State machine
  284. @usableFromInline
  285. struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
  286. @usableFromInline
  287. typealias ConsumerContinuation = CheckedContinuation<Element?, any Error>
  288. @usableFromInline
  289. typealias ProducerContinuation = CheckedContinuation<Void, any Error>
  290. @usableFromInline
  291. struct ConsumerContinuations {
  292. @usableFromInline
  293. var continuations: _OneOrMany<ConsumerContinuation>
  294. @usableFromInline
  295. var result: Result<Element?, any Error>
  296. @inlinable
  297. init(continuations: _OneOrMany<ConsumerContinuation>, result: Result<Element?, any Error>) {
  298. self.continuations = continuations
  299. self.result = result
  300. }
  301. @inlinable
  302. func resume() {
  303. switch self.continuations {
  304. case .one(let continuation):
  305. continuation.resume(with: self.result)
  306. case .many(let continuations):
  307. for continuation in continuations {
  308. continuation.resume(with: self.result)
  309. }
  310. }
  311. }
  312. }
  313. @usableFromInline
  314. struct ProducerContinuations {
  315. @usableFromInline
  316. var continuations: [ProducerContinuation]
  317. @usableFromInline
  318. var result: Result<Void, any Error>
  319. @inlinable
  320. init(continuations: [ProducerContinuation], result: Result<Void, any Error>) {
  321. self.continuations = continuations
  322. self.result = result
  323. }
  324. @inlinable
  325. func resume() {
  326. for continuation in self.continuations {
  327. continuation.resume(with: self.result)
  328. }
  329. }
  330. }
  331. @usableFromInline
  332. enum State: Sendable {
  333. /// No subscribers and no elements have been produced.
  334. case initial(Initial)
  335. /// Subscribers exist but no elements have been produced.
  336. case subscribed(Subscribed)
  337. /// Elements have been produced, there may or may not be subscribers.
  338. case streaming(Streaming)
  339. /// No more elements will be produced. There may or may not been subscribers.
  340. case finished(Finished)
  341. /// Temporary state to avoid CoWs.
  342. case _modifying
  343. @inlinable
  344. init(bufferSize: Int) {
  345. self = .initial(Initial(bufferSize: bufferSize))
  346. }
  347. @usableFromInline
  348. struct Initial: Sendable {
  349. @usableFromInline
  350. let bufferSize: Int
  351. @inlinable
  352. init(bufferSize: Int) {
  353. self.bufferSize = bufferSize
  354. }
  355. }
  356. @usableFromInline
  357. struct Subscribed: Sendable {
  358. /// Active subscriptions.
  359. @usableFromInline
  360. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  361. /// Subscriptions to fail and remove when they next request an element.
  362. @usableFromInline
  363. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  364. /// The maximum size of the element buffer.
  365. @usableFromInline
  366. let bufferSize: Int
  367. @inlinable
  368. init(from state: Initial) {
  369. self.subscriptions = Subscriptions()
  370. self.subscriptionsToDrop = []
  371. self.bufferSize = state.bufferSize
  372. }
  373. @inlinable
  374. mutating func finish(result: Result<Void, any Error>) -> OnFinish {
  375. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  376. return .resume(
  377. .init(continuations: continuations, result: result.map { nil }),
  378. .init(continuations: [], result: .success(()))
  379. )
  380. }
  381. @inlinable
  382. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  383. // Not streaming, so suspend or remove if the subscription should be dropped.
  384. guard let index = self.subscriptionsToDrop.firstIndex(of: id) else {
  385. return .suspend
  386. }
  387. self.subscriptionsToDrop.remove(at: index)
  388. return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
  389. }
  390. @inlinable
  391. mutating func cancel(
  392. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  393. ) -> OnCancelSubscription {
  394. let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
  395. if let continuation = continuation {
  396. return .resume(continuation, .failure(CancellationError()))
  397. } else {
  398. return .none
  399. }
  400. }
  401. @inlinable
  402. mutating func setContinuation(
  403. _ continuation: ConsumerContinuation,
  404. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  405. ) -> OnSetContinuation {
  406. if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
  407. return .none
  408. } else {
  409. return .resume(continuation, .failure(CancellationError()))
  410. }
  411. }
  412. @inlinable
  413. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  414. self.subscriptions.subscribe()
  415. }
  416. @inlinable
  417. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  418. // Remove subscriptions with continuations, they need to be failed.
  419. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  420. let consumerContinuations = ConsumerContinuations(
  421. continuations: continuations,
  422. result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
  423. )
  424. // Remove any others to be failed when they next call 'next'.
  425. let ids = self.subscriptions.removeAllSubscribers()
  426. self.subscriptionsToDrop.append(contentsOf: ids)
  427. return .resume(consumerContinuations)
  428. }
  429. @inlinable
  430. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  431. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  432. let consumerContinuations = ConsumerContinuations(
  433. continuations: continuations,
  434. result: .failure(error)
  435. )
  436. let producerContinuations = ProducerContinuations(continuations: [], result: .success(()))
  437. return .resume(consumerContinuations, producerContinuations)
  438. }
  439. }
  440. @usableFromInline
  441. struct Streaming: Sendable {
  442. /// A deque of elements tagged with IDs.
  443. @usableFromInline
  444. var elements: Elements
  445. /// The maximum size of the element buffer.
  446. @usableFromInline
  447. let bufferSize: Int
  448. // TODO: (optimisation) one-or-many Deque to avoid allocations in the case of a single writer
  449. /// Producers which have been suspended.
  450. @usableFromInline
  451. var producers: [(ProducerContinuation, Int)]
  452. /// The IDs of producers which have been cancelled.
  453. @usableFromInline
  454. var cancelledProducers: [Int]
  455. /// The next token for a producer.
  456. @usableFromInline
  457. var producerToken: Int
  458. /// Active subscriptions.
  459. @usableFromInline
  460. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  461. /// Subscriptions to fail and remove when they next request an element.
  462. @usableFromInline
  463. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  464. @inlinable
  465. init(from state: Initial) {
  466. self.elements = Elements()
  467. self.producers = []
  468. self.producerToken = 0
  469. self.cancelledProducers = []
  470. self.subscriptions = Subscriptions()
  471. self.subscriptionsToDrop = []
  472. self.bufferSize = state.bufferSize
  473. }
  474. @inlinable
  475. init(from state: Subscribed) {
  476. self.elements = Elements()
  477. self.producers = []
  478. self.producerToken = 0
  479. self.cancelledProducers = []
  480. self.subscriptions = state.subscriptions
  481. self.subscriptionsToDrop = state.subscriptionsToDrop
  482. self.bufferSize = state.bufferSize
  483. }
  484. @inlinable
  485. mutating func append(_ element: Element) -> OnYield {
  486. let onYield: OnYield
  487. self.elements.append(element)
  488. if self.elements.count >= self.bufferSize, let lowestID = self.elements.lowestID {
  489. // If the buffer is too large then:
  490. // - if all subscribers are equally slow suspend the producer
  491. // - if some subscribers are slow then remove them and the oldest value
  492. // - if no subscribers are slow then remove the oldest value
  493. let slowConsumers = self.subscriptions.subscribers(withElementID: lowestID)
  494. switch slowConsumers.count {
  495. case 0:
  496. if self.subscriptions.isEmpty {
  497. // No consumers.
  498. let token = self.producerToken
  499. self.producerToken += 1
  500. onYield = .suspend(token)
  501. } else {
  502. // No consumers are slow, some subscribers exist, a subset of which are waiting
  503. // for a value. Drop the oldest value and resume the fastest consumers.
  504. self.elements.removeFirst()
  505. let continuations = self.subscriptions.takeContinuations().map {
  506. ConsumerContinuations(continuations: $0, result: .success(element))
  507. }
  508. if let continuations = continuations {
  509. onYield = .resume(continuations)
  510. } else {
  511. onYield = .none
  512. }
  513. }
  514. case self.subscriptions.count:
  515. // All consumers are slow; stop the production of new value.
  516. let token = self.producerToken
  517. self.producerToken += 1
  518. onYield = .suspend(token)
  519. default:
  520. // Some consumers are slow, but not all. Remove the slow consumers and drop the
  521. // oldest value.
  522. self.elements.removeFirst()
  523. self.subscriptions.removeAllSubscribers(in: slowConsumers)
  524. self.subscriptionsToDrop.append(contentsOf: slowConsumers)
  525. onYield = .none
  526. }
  527. } else {
  528. // The buffer isn't full. Take the continuations of subscriptions which have them; they
  529. // must be waiting for the value we just appended.
  530. let continuations = self.subscriptions.takeContinuations().map {
  531. ConsumerContinuations(continuations: $0, result: .success(element))
  532. }
  533. if let continuations = continuations {
  534. onYield = .resume(continuations)
  535. } else {
  536. onYield = .none
  537. }
  538. }
  539. return onYield
  540. }
  541. @inlinable
  542. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  543. let onNext: OnNext
  544. // 1. Lookup the subscriber by ID to get their next offset
  545. // 2. If the element exists, update the element pointer and return the element
  546. // 3. Else if the ID is in the future, wait
  547. // 4. Else the ID is in the past, fail and remove the subscription.
  548. // Lookup the subscriber with the given ID.
  549. let onNextForSubscription = self.subscriptions.withMutableElementID(
  550. forSubscriber: id
  551. ) { elementID -> (OnNext, Bool) in
  552. let onNext: OnNext
  553. let removeSubscription: Bool
  554. // Subscriber exists; do we have the element it requires next?
  555. switch self.elements.lookupElement(withID: elementID) {
  556. case .found(let element):
  557. // Element exists in the buffer. Advance our element ID.
  558. elementID.formNext()
  559. onNext = .return(.init(nextResult: .success(element)))
  560. removeSubscription = false
  561. case .maybeAvailableLater:
  562. // Element may exist in the future.
  563. onNext = .suspend
  564. removeSubscription = false
  565. case .noLongerAvailable:
  566. // Element existed in the past but was dropped from the buffer.
  567. onNext = .return(
  568. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  569. )
  570. removeSubscription = true
  571. }
  572. return (onNext, removeSubscription)
  573. }
  574. switch onNextForSubscription {
  575. case .return(var resultAndResume):
  576. // The producer only suspends when all consumers are equally slow or there are no
  577. // consumers at all. The latter can't be true: this function can only be called by a
  578. // consumer. The former can't be true anymore because consumption isn't concurrent
  579. // so this consumer must be faster than the others so let the producer resume.
  580. //
  581. // Note that this doesn't mean that all other consumers will be dropped: they can continue
  582. // to produce until the producer provides more values.
  583. resultAndResume.producers = ProducerContinuations(
  584. continuations: self.producers.map { $0.0 },
  585. result: .success(())
  586. )
  587. self.producers.removeAll()
  588. onNext = .return(resultAndResume)
  589. case .suspend:
  590. onNext = .suspend
  591. case .none:
  592. // No subscription found, must have been dropped or already finished.
  593. if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
  594. self.subscriptionsToDrop.remove(at: index)
  595. onNext = .return(
  596. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  597. )
  598. } else {
  599. // Unknown subscriber, i.e. already finished.
  600. onNext = .return(.init(nextResult: .success(nil)))
  601. }
  602. }
  603. return onNext
  604. }
  605. @inlinable
  606. mutating func setContinuation(
  607. _ continuation: ConsumerContinuation,
  608. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  609. ) -> OnSetContinuation {
  610. if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
  611. return .none
  612. } else {
  613. return .resume(continuation, .failure(CancellationError()))
  614. }
  615. }
  616. @inlinable
  617. mutating func cancel(
  618. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  619. ) -> OnCancelSubscription {
  620. let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
  621. if let continuation = continuation {
  622. return .resume(continuation, .failure(CancellationError()))
  623. } else {
  624. return .none
  625. }
  626. }
  627. @inlinable
  628. mutating func waitToProduceMore(
  629. _ continuation: ProducerContinuation,
  630. token: Int
  631. ) -> OnWaitToProduceMore {
  632. let onWaitToProduceMore: OnWaitToProduceMore
  633. if self.elements.count < self.bufferSize {
  634. // Buffer has free space, no need to suspend.
  635. onWaitToProduceMore = .resume(continuation, .success(()))
  636. } else if let index = self.cancelledProducers.firstIndex(of: token) {
  637. // Producer was cancelled before suspending.
  638. self.cancelledProducers.remove(at: index)
  639. onWaitToProduceMore = .resume(continuation, .failure(CancellationError()))
  640. } else {
  641. // Store the continuation to resume later.
  642. self.producers.append((continuation, token))
  643. onWaitToProduceMore = .none
  644. }
  645. return onWaitToProduceMore
  646. }
  647. @inlinable
  648. mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
  649. guard let index = self.producers.firstIndex(where: { $0.1 == token }) else {
  650. self.cancelledProducers.append(token)
  651. return .none
  652. }
  653. let (continuation, _) = self.producers.remove(at: index)
  654. return .resume(continuation, .failure(CancellationError()))
  655. }
  656. @inlinable
  657. mutating func finish(result: Result<Void, any Error>) -> OnFinish {
  658. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  659. let producers = self.producers.map { $0.0 }
  660. self.producers.removeAll()
  661. return .resume(
  662. .init(continuations: continuations, result: result.map { nil }),
  663. .init(continuations: producers, result: .success(()))
  664. )
  665. }
  666. @inlinable
  667. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  668. self.subscriptions.subscribe()
  669. }
  670. @inlinable
  671. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  672. // Remove subscriptions with continuations, they need to be failed.
  673. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  674. let consumerContinuations = ConsumerContinuations(
  675. continuations: continuations,
  676. result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
  677. )
  678. // Remove any others to be failed when they next call 'next'.
  679. let ids = self.subscriptions.removeAllSubscribers()
  680. self.subscriptionsToDrop.append(contentsOf: ids)
  681. return .resume(consumerContinuations)
  682. }
  683. @inlinable
  684. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  685. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  686. let consumerContinuations = ConsumerContinuations(
  687. continuations: continuations,
  688. result: .failure(error)
  689. )
  690. let producers = ProducerContinuations(
  691. continuations: self.producers.map { $0.0 },
  692. result: .failure(error)
  693. )
  694. self.producers.removeAll()
  695. return .resume(consumerContinuations, producers)
  696. }
  697. @inlinable
  698. func nextSubscriptionIsValid() -> Bool {
  699. return self.subscriptions.isEmpty && self.elements.lowestID == .initial
  700. }
  701. }
  702. @usableFromInline
  703. struct Finished: Sendable {
  704. /// A deque of elements tagged with IDs.
  705. @usableFromInline
  706. var elements: Elements
  707. /// Active subscriptions.
  708. @usableFromInline
  709. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  710. /// Subscriptions to fail and remove when they next request an element.
  711. @usableFromInline
  712. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  713. /// The terminating result of the sequence.
  714. @usableFromInline
  715. let result: Result<Void, any Error>
  716. @inlinable
  717. init(from state: Initial, result: Result<Void, any Error>) {
  718. self.elements = Elements()
  719. self.subscriptions = Subscriptions()
  720. self.subscriptionsToDrop = []
  721. self.result = result
  722. }
  723. @inlinable
  724. init(from state: Subscribed, result: Result<Void, any Error>) {
  725. self.elements = Elements()
  726. self.subscriptions = state.subscriptions
  727. self.subscriptionsToDrop = []
  728. self.result = result
  729. }
  730. @inlinable
  731. init(from state: Streaming, result: Result<Void, any Error>) {
  732. self.elements = state.elements
  733. self.subscriptions = state.subscriptions
  734. self.subscriptionsToDrop = state.subscriptionsToDrop
  735. self.result = result
  736. }
  737. @inlinable
  738. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  739. let onNext: OnNext
  740. let onNextForSubscription = self.subscriptions.withMutableElementID(
  741. forSubscriber: id
  742. ) { elementID -> (OnNext, Bool) in
  743. let onNext: OnNext
  744. let removeSubscription: Bool
  745. switch self.elements.lookupElement(withID: elementID) {
  746. case .found(let element):
  747. elementID.formNext()
  748. onNext = .return(.init(nextResult: .success(element)))
  749. removeSubscription = false
  750. case .maybeAvailableLater:
  751. onNext = .return(.init(nextResult: self.result.map { nil }))
  752. removeSubscription = true
  753. case .noLongerAvailable:
  754. onNext = .return(
  755. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  756. )
  757. removeSubscription = true
  758. }
  759. return (onNext, removeSubscription)
  760. }
  761. switch onNextForSubscription {
  762. case .return(let result):
  763. onNext = .return(result)
  764. case .none:
  765. // No subscriber with the given ID, it was likely dropped previously.
  766. if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
  767. self.subscriptionsToDrop.remove(at: index)
  768. onNext = .return(
  769. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  770. )
  771. } else {
  772. // Unknown subscriber, i.e. already finished.
  773. onNext = .return(.init(nextResult: .success(nil)))
  774. }
  775. case .suspend:
  776. fatalError("Internal inconsistency")
  777. }
  778. return onNext
  779. }
  780. @inlinable
  781. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  782. self.subscriptions.subscribe()
  783. }
  784. @inlinable
  785. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  786. // Remove subscriptions with continuations, they need to be failed.
  787. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  788. let consumerContinuations = ConsumerContinuations(
  789. continuations: continuations,
  790. result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
  791. )
  792. // Remove any others to be failed when they next call 'next'.
  793. let ids = self.subscriptions.removeAllSubscribers()
  794. self.subscriptionsToDrop.append(contentsOf: ids)
  795. return .resume(consumerContinuations)
  796. }
  797. @inlinable
  798. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  799. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  800. let consumerContinuations = ConsumerContinuations(
  801. continuations: continuations,
  802. result: .failure(error)
  803. )
  804. let producers = ProducerContinuations(continuations: [], result: .failure(error))
  805. return .resume(consumerContinuations, producers)
  806. }
  807. @inlinable
  808. func nextSubscriptionIsValid() -> Bool {
  809. self.elements.lowestID == .initial
  810. }
  811. @inlinable
  812. mutating func cancel(
  813. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  814. ) -> OnCancelSubscription {
  815. let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
  816. if let continuation = continuation {
  817. return .resume(continuation, .failure(CancellationError()))
  818. } else {
  819. return .none
  820. }
  821. }
  822. }
  823. }
  824. @usableFromInline
  825. var _state: State
  826. @inlinable
  827. init(bufferSize: Int) {
  828. self._state = State(bufferSize: bufferSize)
  829. }
  830. @inlinable
  831. var nextSubscriptionIsValid: Bool {
  832. let isValid: Bool
  833. switch self._state {
  834. case .initial:
  835. isValid = true
  836. case .subscribed:
  837. isValid = true
  838. case .streaming(let state):
  839. isValid = state.nextSubscriptionIsValid()
  840. case .finished(let state):
  841. isValid = state.nextSubscriptionIsValid()
  842. case ._modifying:
  843. fatalError("Internal inconsistency")
  844. }
  845. return isValid
  846. }
  847. @usableFromInline
  848. enum OnInvalidateAllSubscriptions {
  849. case resume(ConsumerContinuations)
  850. case none
  851. }
  852. @inlinable
  853. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  854. let onCancel: OnInvalidateAllSubscriptions
  855. switch self._state {
  856. case .initial:
  857. onCancel = .none
  858. case .subscribed(var state):
  859. self._state = ._modifying
  860. onCancel = state.invalidateAllSubscriptions()
  861. self._state = .subscribed(state)
  862. case .streaming(var state):
  863. self._state = ._modifying
  864. onCancel = state.invalidateAllSubscriptions()
  865. self._state = .streaming(state)
  866. case .finished(var state):
  867. self._state = ._modifying
  868. onCancel = state.invalidateAllSubscriptions()
  869. self._state = .finished(state)
  870. case ._modifying:
  871. fatalError("Internal inconsistency")
  872. }
  873. return onCancel
  874. }
  875. @usableFromInline
  876. enum OnYield {
  877. case none
  878. case suspend(Int)
  879. case resume(ConsumerContinuations)
  880. case throwAlreadyFinished
  881. }
  882. @inlinable
  883. mutating func yield(_ element: Element) -> OnYield {
  884. let onYield: OnYield
  885. switch self._state {
  886. case .initial(let state):
  887. self._state = ._modifying
  888. // Move to streaming.
  889. var state = State.Streaming(from: state)
  890. onYield = state.append(element)
  891. self._state = .streaming(state)
  892. case .subscribed(let state):
  893. self._state = ._modifying
  894. var state = State.Streaming(from: state)
  895. onYield = state.append(element)
  896. self._state = .streaming(state)
  897. case .streaming(var state):
  898. self._state = ._modifying
  899. onYield = state.append(element)
  900. self._state = .streaming(state)
  901. case .finished:
  902. onYield = .throwAlreadyFinished
  903. case ._modifying:
  904. fatalError("Internal inconsistency")
  905. }
  906. return onYield
  907. }
  908. @usableFromInline
  909. enum OnFinish {
  910. case none
  911. case resume(ConsumerContinuations, ProducerContinuations)
  912. }
  913. @inlinable
  914. mutating func finish(result: Result<Void, any Error>) -> OnFinish {
  915. let onFinish: OnFinish
  916. switch self._state {
  917. case .initial(let state):
  918. self._state = ._modifying
  919. let state = State.Finished(from: state, result: result)
  920. self._state = .finished(state)
  921. onFinish = .none
  922. case .subscribed(var state):
  923. self._state = ._modifying
  924. onFinish = state.finish(result: result)
  925. self._state = .finished(State.Finished(from: state, result: result))
  926. case .streaming(var state):
  927. self._state = ._modifying
  928. onFinish = state.finish(result: result)
  929. self._state = .finished(State.Finished(from: state, result: result))
  930. case .finished:
  931. onFinish = .none
  932. case ._modifying:
  933. fatalError("Internal inconsistency")
  934. }
  935. return onFinish
  936. }
  937. @usableFromInline
  938. enum OnNext {
  939. @usableFromInline
  940. struct ReturnAndResumeProducers {
  941. @usableFromInline
  942. var nextResult: Result<Element?, any Error>
  943. @usableFromInline
  944. var producers: ProducerContinuations
  945. @inlinable
  946. init(
  947. nextResult: Result<Element?, any Error>,
  948. producers: [ProducerContinuation] = [],
  949. producerResult: Result<Void, any Error> = .success(())
  950. ) {
  951. self.nextResult = nextResult
  952. self.producers = ProducerContinuations(continuations: producers, result: producerResult)
  953. }
  954. }
  955. case `return`(ReturnAndResumeProducers)
  956. case suspend
  957. }
  958. @inlinable
  959. mutating func nextElement(
  960. forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  961. ) -> OnNext {
  962. let onNext: OnNext
  963. switch self._state {
  964. case .initial:
  965. // No subscribers so demand isn't possible.
  966. fatalError("Internal inconsistency")
  967. case .subscribed(var state):
  968. self._state = ._modifying
  969. onNext = state.next(id)
  970. self._state = .subscribed(state)
  971. case .streaming(var state):
  972. self._state = ._modifying
  973. onNext = state.next(id)
  974. self._state = .streaming(state)
  975. case .finished(var state):
  976. self._state = ._modifying
  977. onNext = state.next(id)
  978. self._state = .finished(state)
  979. case ._modifying:
  980. fatalError("Internal inconsistency")
  981. }
  982. return onNext
  983. }
  984. @usableFromInline
  985. enum OnSetContinuation {
  986. case none
  987. case resume(ConsumerContinuation, Result<Element?, any Error>)
  988. }
  989. @inlinable
  990. mutating func setContinuation(
  991. _ continuation: ConsumerContinuation,
  992. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  993. ) -> OnSetContinuation {
  994. let onSetContinuation: OnSetContinuation
  995. switch self._state {
  996. case .initial:
  997. // No subscribers so demand isn't possible.
  998. fatalError("Internal inconsistency")
  999. case .subscribed(var state):
  1000. self._state = ._modifying
  1001. onSetContinuation = state.setContinuation(continuation, forSubscription: id)
  1002. self._state = .subscribed(state)
  1003. case .streaming(var state):
  1004. self._state = ._modifying
  1005. onSetContinuation = state.setContinuation(continuation, forSubscription: id)
  1006. self._state = .streaming(state)
  1007. case .finished(let state):
  1008. onSetContinuation = .resume(continuation, state.result.map { _ in nil })
  1009. case ._modifying:
  1010. // All values must have been produced, nothing to wait for.
  1011. fatalError("Internal inconsistency")
  1012. }
  1013. return onSetContinuation
  1014. }
  1015. @usableFromInline
  1016. enum OnCancelSubscription {
  1017. case none
  1018. case resume(ConsumerContinuation, Result<Element?, any Error>)
  1019. }
  1020. @inlinable
  1021. mutating func cancelSubscription(
  1022. withID id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  1023. ) -> OnCancelSubscription {
  1024. let onCancel: OnCancelSubscription
  1025. switch self._state {
  1026. case .initial:
  1027. // No subscribers so demand isn't possible.
  1028. fatalError("Internal inconsistency")
  1029. case .subscribed(var state):
  1030. self._state = ._modifying
  1031. onCancel = state.cancel(id)
  1032. self._state = .subscribed(state)
  1033. case .streaming(var state):
  1034. self._state = ._modifying
  1035. onCancel = state.cancel(id)
  1036. self._state = .streaming(state)
  1037. case .finished(var state):
  1038. self._state = ._modifying
  1039. onCancel = state.cancel(id)
  1040. self._state = .finished(state)
  1041. case ._modifying:
  1042. // All values must have been produced, nothing to wait for.
  1043. fatalError("Internal inconsistency")
  1044. }
  1045. return onCancel
  1046. }
  1047. @usableFromInline
  1048. enum OnSubscribe {
  1049. case subscribed(_BroadcastSequenceStateMachine<Element>.Subscriptions.ID)
  1050. }
  1051. @inlinable
  1052. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  1053. let id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  1054. switch self._state {
  1055. case .initial(let state):
  1056. self._state = ._modifying
  1057. var state = State.Subscribed(from: state)
  1058. id = state.subscribe()
  1059. self._state = .subscribed(state)
  1060. case .subscribed(var state):
  1061. self._state = ._modifying
  1062. id = state.subscribe()
  1063. self._state = .subscribed(state)
  1064. case .streaming(var state):
  1065. self._state = ._modifying
  1066. id = state.subscribe()
  1067. self._state = .streaming(state)
  1068. case .finished(var state):
  1069. self._state = ._modifying
  1070. id = state.subscribe()
  1071. self._state = .finished(state)
  1072. case ._modifying:
  1073. fatalError("Internal inconsistency")
  1074. }
  1075. return id
  1076. }
  1077. @usableFromInline
  1078. enum OnWaitToProduceMore {
  1079. case none
  1080. case resume(ProducerContinuation, Result<Void, any Error>)
  1081. }
  1082. @inlinable
  1083. mutating func waitToProduceMore(
  1084. continuation: ProducerContinuation,
  1085. token: Int
  1086. ) -> OnWaitToProduceMore {
  1087. let onWaitToProduceMore: OnWaitToProduceMore
  1088. switch self._state {
  1089. case .initial, .subscribed:
  1090. // Nothing produced yet, so no reason have to wait to produce.
  1091. fatalError("Internal inconsistency")
  1092. case .streaming(var state):
  1093. self._state = ._modifying
  1094. onWaitToProduceMore = state.waitToProduceMore(continuation, token: token)
  1095. self._state = .streaming(state)
  1096. case .finished:
  1097. onWaitToProduceMore = .resume(continuation, .success(()))
  1098. case ._modifying:
  1099. fatalError("Internal inconsistency")
  1100. }
  1101. return onWaitToProduceMore
  1102. }
  1103. @usableFromInline
  1104. typealias OnCancelProducer = OnWaitToProduceMore
  1105. @inlinable
  1106. mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
  1107. let onCancelProducer: OnCancelProducer
  1108. switch self._state {
  1109. case .initial, .subscribed:
  1110. // Nothing produced yet, so no reason have to wait to produce.
  1111. fatalError("Internal inconsistency")
  1112. case .streaming(var state):
  1113. self._state = ._modifying
  1114. onCancelProducer = state.cancelProducer(withToken: token)
  1115. self._state = .streaming(state)
  1116. case .finished:
  1117. // No producers to cancel; do nothing.
  1118. onCancelProducer = .none
  1119. case ._modifying:
  1120. fatalError("Internal inconsistency")
  1121. }
  1122. return onCancelProducer
  1123. }
  1124. @usableFromInline
  1125. enum OnDropResources {
  1126. case none
  1127. case resume(ConsumerContinuations, ProducerContinuations)
  1128. }
  1129. @inlinable
  1130. mutating func dropResources() -> OnDropResources {
  1131. let error = BroadcastAsyncSequenceError.productionAlreadyFinished
  1132. let onDrop: OnDropResources
  1133. switch self._state {
  1134. case .initial(let state):
  1135. self._state = ._modifying
  1136. onDrop = .none
  1137. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1138. case .subscribed(var state):
  1139. self._state = ._modifying
  1140. onDrop = state.dropResources(error: error)
  1141. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1142. case .streaming(var state):
  1143. self._state = ._modifying
  1144. onDrop = state.dropResources(error: error)
  1145. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1146. case .finished(var state):
  1147. self._state = ._modifying
  1148. onDrop = state.dropResources(error: error)
  1149. self._state = .finished(state)
  1150. case ._modifying:
  1151. fatalError("Internal inconsistency")
  1152. }
  1153. return onDrop
  1154. }
  1155. }
  1156. extension _BroadcastSequenceStateMachine {
  1157. /// A collection of elements tagged with an identifier.
  1158. ///
  1159. /// Identifiers are assigned when elements are added to the collection and are monotonically
  1160. /// increasing. If element 'A' is added before element 'B' then 'A' will have a lower ID than 'B'.
  1161. @usableFromInline
  1162. struct Elements: Sendable {
  1163. /// The ID of an element
  1164. @usableFromInline
  1165. struct ID: Hashable, Sendable, Comparable, Strideable {
  1166. @usableFromInline
  1167. private(set) var rawValue: Int
  1168. @usableFromInline
  1169. static var initial: Self {
  1170. ID(id: 0)
  1171. }
  1172. private init(id: Int) {
  1173. self.rawValue = id
  1174. }
  1175. @inlinable
  1176. mutating func formNext() {
  1177. self.rawValue += 1
  1178. }
  1179. @inlinable
  1180. func next() -> Self {
  1181. var copy = self
  1182. copy.formNext()
  1183. return copy
  1184. }
  1185. @inlinable
  1186. func distance(to other: Self) -> Int {
  1187. other.rawValue - self.rawValue
  1188. }
  1189. @inlinable
  1190. func advanced(by n: Int) -> Self {
  1191. var copy = self
  1192. copy.rawValue += n
  1193. return copy
  1194. }
  1195. @inlinable
  1196. static func < (lhs: Self, rhs: Self) -> Bool {
  1197. lhs.rawValue < rhs.rawValue
  1198. }
  1199. }
  1200. @usableFromInline
  1201. struct _IdentifiableElement: Sendable {
  1202. @usableFromInline
  1203. var element: Element
  1204. @usableFromInline
  1205. var id: ID
  1206. @inlinable
  1207. init(element: Element, id: ID) {
  1208. self.element = element
  1209. self.id = id
  1210. }
  1211. }
  1212. @usableFromInline
  1213. var _elements: Deque<_IdentifiableElement>
  1214. @usableFromInline
  1215. var _nextID: ID
  1216. @inlinable
  1217. init() {
  1218. self._nextID = .initial
  1219. self._elements = []
  1220. }
  1221. @inlinable
  1222. mutating func nextElementID() -> ID {
  1223. let id = self._nextID
  1224. self._nextID.formNext()
  1225. return id
  1226. }
  1227. /// The highest ID of the stored elements; `nil` if there are no elements.
  1228. @inlinable
  1229. var highestID: ID? { self._elements.last?.id }
  1230. /// The lowest ID of the stored elements; `nil` if there are no elements.
  1231. @inlinable
  1232. var lowestID: ID? { self._elements.first?.id }
  1233. /// The number of stored elements.
  1234. @inlinable
  1235. var count: Int { self._elements.count }
  1236. /// Whether there are no stored elements.
  1237. @inlinable
  1238. var isEmpty: Bool { self._elements.isEmpty }
  1239. /// Appends an element to the collection.
  1240. @inlinable
  1241. mutating func append(_ element: Element) {
  1242. self._elements.append(_IdentifiableElement(element: element, id: self.nextElementID()))
  1243. }
  1244. /// Removes the first element from the collection.
  1245. @discardableResult
  1246. @inlinable
  1247. mutating func removeFirst() -> Element {
  1248. let removed = self._elements.removeFirst()
  1249. return removed.element
  1250. }
  1251. @usableFromInline
  1252. enum ElementLookup {
  1253. /// The element was found in the collection.
  1254. case found(Element)
  1255. /// The element isn't in the collection, but it could be in the future.
  1256. case maybeAvailableLater
  1257. /// The element was in the collection, but is no longer available.
  1258. case noLongerAvailable
  1259. }
  1260. /// Lookup the element with the given ID.
  1261. ///
  1262. /// - Parameter id: The ID of the element to lookup.
  1263. @inlinable
  1264. mutating func lookupElement(withID id: ID) -> ElementLookup {
  1265. guard let low = self.lowestID, let high = self.highestID else {
  1266. // Must be empty.
  1267. return id >= self._nextID ? .maybeAvailableLater : .noLongerAvailable
  1268. }
  1269. assert(low <= high)
  1270. let lookup: ElementLookup
  1271. if id < low {
  1272. lookup = .noLongerAvailable
  1273. } else if id > high {
  1274. lookup = .maybeAvailableLater
  1275. } else {
  1276. // IDs are monotonically increasing. If the buffer contains the tag we can use it to index
  1277. // into the deque by looking at the offsets.
  1278. let offset = low.distance(to: id)
  1279. let index = self._elements.startIndex.advanced(by: offset)
  1280. lookup = .found(self._elements[index].element)
  1281. }
  1282. return lookup
  1283. }
  1284. }
  1285. }
  1286. extension _BroadcastSequenceStateMachine {
  1287. /// A collection of subscriptions.
  1288. @usableFromInline
  1289. struct Subscriptions: Sendable {
  1290. @usableFromInline
  1291. struct ID: Hashable, Sendable {
  1292. @usableFromInline
  1293. private(set) var rawValue: Int
  1294. @inlinable
  1295. init() {
  1296. self.rawValue = 0
  1297. }
  1298. @inlinable
  1299. mutating func formNext() {
  1300. self.rawValue += 1
  1301. }
  1302. @inlinable
  1303. func next() -> Self {
  1304. var copy = self
  1305. copy.formNext()
  1306. return copy
  1307. }
  1308. }
  1309. @usableFromInline
  1310. struct _Subscriber: Sendable {
  1311. /// The ID of the subscriber.
  1312. @usableFromInline
  1313. var id: ID
  1314. /// The ID of the next element the subscriber will consume.
  1315. @usableFromInline
  1316. var nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID
  1317. /// A continuation which which will be resumed when the next element becomes available.
  1318. @usableFromInline
  1319. var continuation: ConsumerContinuation?
  1320. @inlinable
  1321. init(
  1322. id: ID,
  1323. nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID,
  1324. continuation: ConsumerContinuation? = nil
  1325. ) {
  1326. self.id = id
  1327. self.nextElementID = nextElementID
  1328. self.continuation = continuation
  1329. }
  1330. /// Returns and sets the continuation to `nil` if one exists.
  1331. ///
  1332. /// The next element ID is advanced if a contination exists.
  1333. ///
  1334. /// - Returns: The continuation, if one existed.
  1335. @inlinable
  1336. mutating func takeContinuation() -> ConsumerContinuation? {
  1337. guard let continuation = self.continuation else { return nil }
  1338. self.continuation = nil
  1339. self.nextElementID.formNext()
  1340. return continuation
  1341. }
  1342. }
  1343. @usableFromInline
  1344. var _subscribers: [_Subscriber]
  1345. @usableFromInline
  1346. var _nextSubscriberID: ID
  1347. @inlinable
  1348. init() {
  1349. self._subscribers = []
  1350. self._nextSubscriberID = ID()
  1351. }
  1352. /// Returns the number of subscribers.
  1353. @inlinable
  1354. var count: Int { self._subscribers.count }
  1355. /// Returns whether the collection is empty.
  1356. @inlinable
  1357. var isEmpty: Bool { self._subscribers.isEmpty }
  1358. /// Adds a new subscriber and returns its unique ID.
  1359. ///
  1360. /// - Returns: The ID of the new subscriber.
  1361. @inlinable
  1362. mutating func subscribe() -> ID {
  1363. let id = self._nextSubscriberID
  1364. self._nextSubscriberID.formNext()
  1365. self._subscribers.append(_Subscriber(id: id, nextElementID: .initial))
  1366. return id
  1367. }
  1368. /// Provides mutable access to the element ID of the given subscriber, if it exists.
  1369. ///
  1370. /// - Parameters:
  1371. /// - id: The ID of the subscriber.
  1372. /// - body: A closure to mutate the element ID of the subscriber which returns the result and
  1373. /// a boolean indicating whether the subscriber should be removed.
  1374. /// - Returns: The result returned from the closure or `nil` if no subscriber exists with the
  1375. /// given ID.
  1376. @inlinable
  1377. mutating func withMutableElementID<R>(
  1378. forSubscriber id: ID,
  1379. _ body: (
  1380. inout _BroadcastSequenceStateMachine<Element>.Elements.ID
  1381. ) -> (result: R, removeSubscription: Bool)
  1382. ) -> R? {
  1383. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else { return nil }
  1384. let (result, removeSubscription) = body(&self._subscribers[index].nextElementID)
  1385. if removeSubscription {
  1386. self._subscribers.remove(at: index)
  1387. }
  1388. return result
  1389. }
  1390. /// Sets the continuation for the subscription with the given ID.
  1391. /// - Parameters:
  1392. /// - continuation: The continuation to set.
  1393. /// - id: The ID of the subscriber.
  1394. /// - Returns: A boolean indicating whether the continuation was set or not.
  1395. @inlinable
  1396. mutating func setContinuation(
  1397. _ continuation: ConsumerContinuation,
  1398. forSubscriber id: ID
  1399. ) -> Bool {
  1400. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
  1401. return false
  1402. }
  1403. assert(self._subscribers[index].continuation == nil)
  1404. self._subscribers[index].continuation = continuation
  1405. return true
  1406. }
  1407. /// Returns an array of subscriber IDs which are whose next element ID is `id`.
  1408. @inlinable
  1409. func subscribers(
  1410. withElementID id: _BroadcastSequenceStateMachine<Element>.Elements.ID
  1411. ) -> [ID] {
  1412. return self._subscribers.filter {
  1413. $0.nextElementID == id
  1414. }.map {
  1415. $0.id
  1416. }
  1417. }
  1418. /// Removes the subscriber with the given ID.
  1419. /// - Parameter id: The ID of the subscriber to remove.
  1420. /// - Returns: A tuple indicating whether a subscriber was removed and any continuation
  1421. /// associated with the subscriber.
  1422. @inlinable
  1423. mutating func removeSubscriber(withID id: ID) -> (Bool, ConsumerContinuation?) {
  1424. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
  1425. return (false, nil)
  1426. }
  1427. let continuation = self._subscribers[index].continuation
  1428. self._subscribers.remove(at: index)
  1429. return (true, continuation)
  1430. }
  1431. /// Remove all subscribers in the given array of IDs.
  1432. @inlinable
  1433. mutating func removeAllSubscribers(in idsToRemove: [ID]) {
  1434. self._subscribers.removeAll {
  1435. idsToRemove.contains($0.id)
  1436. }
  1437. }
  1438. /// Remove all subscribers and return their IDs.
  1439. @inlinable
  1440. mutating func removeAllSubscribers() -> [ID] {
  1441. let subscribers = self._subscribers.map { $0.id }
  1442. self._subscribers.removeAll()
  1443. return subscribers
  1444. }
  1445. /// Returns any continuations set on subscribers, unsetting at the same time.
  1446. @inlinable
  1447. mutating func takeContinuations() -> _OneOrMany<ConsumerContinuation>? {
  1448. // Avoid allocs if there's only one subscriber.
  1449. let count = self._countPendingContinuations()
  1450. let result: _OneOrMany<ConsumerContinuation>?
  1451. switch count {
  1452. case 0:
  1453. result = nil
  1454. case 1:
  1455. let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
  1456. let continuation = self._subscribers[index].takeContinuation()!
  1457. result = .one(continuation)
  1458. default:
  1459. var continuations = [ConsumerContinuation]()
  1460. continuations.reserveCapacity(count)
  1461. for index in self._subscribers.indices {
  1462. if let continuation = self._subscribers[index].takeContinuation() {
  1463. continuations.append(continuation)
  1464. }
  1465. }
  1466. result = .many(continuations)
  1467. }
  1468. return result
  1469. }
  1470. /// Removes all subscribers which have continuations and return their continuations.
  1471. @inlinable
  1472. mutating func removeSubscribersWithContinuations() -> _OneOrMany<ConsumerContinuation> {
  1473. // Avoid allocs if there's only one subscriber.
  1474. let count = self._countPendingContinuations()
  1475. let result: _OneOrMany<ConsumerContinuation>
  1476. switch count {
  1477. case 0:
  1478. result = .many([])
  1479. case 1:
  1480. let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
  1481. let subscription = self._subscribers.remove(at: index)
  1482. result = .one(subscription.continuation!)
  1483. default:
  1484. var continuations = [ConsumerContinuation]()
  1485. continuations.reserveCapacity(count)
  1486. var removable = [ID]()
  1487. removable.reserveCapacity(count)
  1488. for subscription in self._subscribers {
  1489. if let continuation = subscription.continuation {
  1490. continuations.append(continuation)
  1491. removable.append(subscription.id)
  1492. }
  1493. }
  1494. self._subscribers.removeAll {
  1495. removable.contains($0.id)
  1496. }
  1497. result = .many(continuations)
  1498. }
  1499. return result
  1500. }
  1501. @inlinable
  1502. func _countPendingContinuations() -> Int {
  1503. return self._subscribers.reduce(into: 0) { count, subscription in
  1504. if subscription.continuation != nil {
  1505. count += 1
  1506. }
  1507. }
  1508. }
  1509. }
  1510. }
  1511. extension _BroadcastSequenceStateMachine {
  1512. // TODO: tiny array
  1513. @usableFromInline
  1514. enum _OneOrMany<Value> {
  1515. case one(Value)
  1516. case many([Value])
  1517. }
  1518. }