BroadcastAsyncSequence.swift 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import DequeModule
  17. /// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently.
  18. ///
  19. /// The sequence is not a general-purpose broadcast sequence; it is tailored specifically for the
  20. /// requirements of gRPC Swift, in particular it is used to support retrying and hedging requests.
  21. ///
  22. /// In order to achieve this it maintains on an internal buffer of elements which is limited in
  23. /// size. Each iterator ("subscriber") maintains an offset into the elements which the sequence has
  24. /// produced over time. If a subscriber is consuming too slowly (and the buffer is full) then the
  25. /// sequence will cancel the subscriber's subscription to the stream, dropping the oldest element
  26. /// in the buffer to make space for more elements. If the buffer is full and all subscribers are
  27. /// equally slow then all producers are suspended until the buffer drops to a reasonable size.
  28. ///
  29. /// The expectation is that the number of subscribers will be low; for retries there will be at most
  30. /// one subscriber at a time, for hedging there may be at most five subscribers at a time.
  31. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  32. @usableFromInline
  33. struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
  34. @usableFromInline
  35. let _storage: _BroadcastSequenceStorage<Element>
  36. @inlinable
  37. init(_storage: _BroadcastSequenceStorage<Element>) {
  38. self._storage = _storage
  39. }
  40. /// Make a new stream and continuation.
  41. ///
  42. /// - Parameters:
  43. /// - elementType: The type of element this sequence produces.
  44. /// - bufferSize: The number of elements this sequence may store.
  45. /// - Returns: A stream and continuation.
  46. @inlinable
  47. static func makeStream(
  48. of elementType: Element.Type = Element.self,
  49. bufferSize: Int
  50. ) -> (stream: Self, continuation: Self.Source) {
  51. let storage = _BroadcastSequenceStorage<Element>(bufferSize: bufferSize)
  52. let stream = Self(_storage: storage)
  53. let continuation = Self.Source(_storage: storage)
  54. return (stream, continuation)
  55. }
  56. @inlinable
  57. func makeAsyncIterator() -> AsyncIterator {
  58. let id = self._storage.subscribe()
  59. return AsyncIterator(_storage: _storage, id: id)
  60. }
  61. /// Returns true if it is known to be safe for the next subscriber to subscribe and successfully
  62. /// consume elements.
  63. ///
  64. /// This function can return `false` if there are active subscribers or the internal buffer no
  65. /// longer contains the first element in the sequence.
  66. @inlinable
  67. var isKnownSafeForNextSubscriber: Bool {
  68. self._storage.isKnownSafeForNextSubscriber
  69. }
  70. /// Invalidates all active subscribers.
  71. ///
  72. /// Any active subscriber will receive an error the next time they attempt to consume an element.
  73. @inlinable
  74. func invalidateAllSubscriptions() {
  75. self._storage.invalidateAllSubscriptions()
  76. }
  77. }
  78. // MARK: - AsyncIterator
  79. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  80. extension BroadcastAsyncSequence {
  81. @usableFromInline
  82. struct AsyncIterator: AsyncIteratorProtocol {
  83. @usableFromInline
  84. let _storage: _BroadcastSequenceStorage<Element>
  85. @usableFromInline
  86. let _subscriberID: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  87. @inlinable
  88. init(
  89. _storage: _BroadcastSequenceStorage<Element>,
  90. id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  91. ) {
  92. self._storage = _storage
  93. self._subscriberID = id
  94. }
  95. @inlinable
  96. mutating func next() async throws -> Element? {
  97. try await self._storage.nextElement(forSubscriber: self._subscriberID)
  98. }
  99. }
  100. }
  101. // MARK: - Continuation
  102. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  103. extension BroadcastAsyncSequence {
  104. @usableFromInline
  105. struct Source: Sendable {
  106. @usableFromInline
  107. let _storage: _BroadcastSequenceStorage<Element>
  108. @usableFromInline
  109. init(_storage: _BroadcastSequenceStorage<Element>) {
  110. self._storage = _storage
  111. }
  112. @inlinable
  113. func write(_ element: Element) async throws {
  114. try await self._storage.yield(element)
  115. }
  116. @inlinable
  117. func finish(with result: Result<Void, Error>) {
  118. self._storage.finish(result)
  119. }
  120. @inlinable
  121. func finish() {
  122. self.finish(with: .success(()))
  123. }
  124. @inlinable
  125. func finish(throwing error: Error) {
  126. self.finish(with: .failure(error))
  127. }
  128. }
  129. }
  130. @usableFromInline
  131. enum BroadcastAsyncSequenceError: Error {
  132. /// The consumer was too slow.
  133. case consumingTooSlow
  134. /// The producer has already finished.
  135. case productionAlreadyFinished
  136. }
  137. // MARK: - Storage
  138. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  139. @usableFromInline
  140. final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
  141. @usableFromInline
  142. let _state: LockedValueBox<_BroadcastSequenceStateMachine<Element>>
  143. @inlinable
  144. init(bufferSize: Int) {
  145. self._state = LockedValueBox(_BroadcastSequenceStateMachine(bufferSize: bufferSize))
  146. }
  147. deinit {
  148. let onDrop = self._state.withLockedValue { state in
  149. state.dropResources()
  150. }
  151. switch onDrop {
  152. case .none:
  153. ()
  154. case .resume(let consumers, let producers):
  155. consumers.resume()
  156. producers.resume()
  157. }
  158. }
  159. // MARK - Producer
  160. /// Yield a single element to the stream. Suspends if the stream's buffer is full.
  161. ///
  162. /// - Parameter element: The element to write.
  163. @inlinable
  164. func yield(_ element: Element) async throws {
  165. let onYield = self._state.withLockedValue { state in state.yield(element) }
  166. switch onYield {
  167. case .none:
  168. ()
  169. case .resume(let continuations):
  170. continuations.resume()
  171. case .suspend(let token):
  172. try await withTaskCancellationHandler {
  173. try await withCheckedThrowingContinuation { continuation in
  174. let onProduceMore = self._state.withLockedValue { state in
  175. state.waitToProduceMore(continuation: continuation, token: token)
  176. }
  177. switch onProduceMore {
  178. case .resume(let continuation, let result):
  179. continuation.resume(with: result)
  180. case .none:
  181. ()
  182. }
  183. }
  184. } onCancel: {
  185. let onCancel = self._state.withLockedValue { state in
  186. state.cancelProducer(withToken: token)
  187. }
  188. switch onCancel {
  189. case .resume(let continuation, let result):
  190. continuation.resume(with: result)
  191. case .none:
  192. ()
  193. }
  194. }
  195. case .throwAlreadyFinished:
  196. throw BroadcastAsyncSequenceError.productionAlreadyFinished
  197. }
  198. }
  199. /// Indicate that no more values will be produced.
  200. ///
  201. /// - Parameter result: Whether the stream is finishing cleanly or because of an error.
  202. @inlinable
  203. func finish(_ result: Result<Void, Error>) {
  204. let action = self._state.withLockedValue { state in state.finish(result: result) }
  205. switch action {
  206. case .none:
  207. ()
  208. case .resume(let subscribers, let producers):
  209. subscribers.resume()
  210. producers.resume()
  211. }
  212. }
  213. // MARK: - Consumer
  214. /// Create a subscription to the stream.
  215. ///
  216. /// - Returns: Returns a unique subscription ID.
  217. @inlinable
  218. func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  219. self._state.withLockedValue { $0.subscribe() }
  220. }
  221. /// Returns the next element for the given subscriber, if it is available.
  222. ///
  223. /// - Parameter id: The ID of the subscriber requesting the element.
  224. /// - Returns: The next element or `nil` if the stream has been terminated.
  225. @inlinable
  226. func nextElement(
  227. forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  228. ) async throws -> Element? {
  229. let onNext = self._state.withLockedValue { $0.nextElement(forSubscriber: id) }
  230. switch onNext {
  231. case .return(let returnAndProduceMore):
  232. returnAndProduceMore.producers.resume()
  233. return try returnAndProduceMore.nextResult.get()
  234. case .suspend:
  235. return try await withTaskCancellationHandler {
  236. return try await withCheckedThrowingContinuation { continuation in
  237. let onSetContinuation = self._state.withLockedValue { state in
  238. state.setContinuation(continuation, forSubscription: id)
  239. }
  240. switch onSetContinuation {
  241. case .resume(let continuation, let result):
  242. continuation.resume(with: result)
  243. case .none:
  244. ()
  245. }
  246. }
  247. } onCancel: {
  248. let onCancel = self._state.withLockedValue { state in
  249. state.cancelSubscription(withID: id)
  250. }
  251. switch onCancel {
  252. case .resume(let continuation, let result):
  253. continuation.resume(with: result)
  254. case .none:
  255. ()
  256. }
  257. }
  258. }
  259. }
  260. /// Returns true if it's guaranteed that the next subscriber may join and safely begin consuming
  261. /// elements.
  262. @inlinable
  263. var isKnownSafeForNextSubscriber: Bool {
  264. self._state.withLockedValue { state in
  265. state.nextSubscriptionIsValid
  266. }
  267. }
  268. /// Invalidates all active subscriptions.
  269. @inlinable
  270. func invalidateAllSubscriptions() {
  271. let action = self._state.withLockedValue { state in
  272. state.invalidateAllSubscriptions()
  273. }
  274. switch action {
  275. case .resume(let continuations):
  276. continuations.resume()
  277. case .none:
  278. ()
  279. }
  280. }
  281. }
  282. // MARK: - State machine
  283. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  284. @usableFromInline
  285. struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
  286. @usableFromInline
  287. typealias ConsumerContinuation = CheckedContinuation<Element?, Error>
  288. @usableFromInline
  289. typealias ProducerContinuation = CheckedContinuation<Void, Error>
  290. @usableFromInline
  291. struct ConsumerContinuations {
  292. @usableFromInline
  293. var continuations: _OneOrMany<ConsumerContinuation>
  294. @usableFromInline
  295. var result: Result<Element?, Error>
  296. @inlinable
  297. init(continuations: _OneOrMany<ConsumerContinuation>, result: Result<Element?, Error>) {
  298. self.continuations = continuations
  299. self.result = result
  300. }
  301. @inlinable
  302. func resume() {
  303. switch self.continuations {
  304. case .one(let continuation):
  305. continuation.resume(with: self.result)
  306. case .many(let continuations):
  307. for continuation in continuations {
  308. continuation.resume(with: self.result)
  309. }
  310. }
  311. }
  312. }
  313. @usableFromInline
  314. struct ProducerContinuations {
  315. @usableFromInline
  316. var continuations: [ProducerContinuation]
  317. @usableFromInline
  318. var result: Result<Void, Error>
  319. @inlinable
  320. init(continuations: [ProducerContinuation], result: Result<Void, Error>) {
  321. self.continuations = continuations
  322. self.result = result
  323. }
  324. @inlinable
  325. func resume() {
  326. for continuation in self.continuations {
  327. continuation.resume(with: self.result)
  328. }
  329. }
  330. }
  331. @usableFromInline
  332. enum State: Sendable {
  333. /// No subscribers and no elements have been produced.
  334. case initial(Initial)
  335. /// Subscribers exist but no elements have been produced.
  336. case subscribed(Subscribed)
  337. /// Elements have been produced, there may or may not be subscribers.
  338. case streaming(Streaming)
  339. /// No more elements will be produced. There may or may not been subscribers.
  340. case finished(Finished)
  341. /// Temporary state to avoid CoWs.
  342. case _modifying
  343. @inlinable
  344. init(bufferSize: Int) {
  345. self = .initial(Initial(bufferSize: bufferSize))
  346. }
  347. @usableFromInline
  348. struct Initial: Sendable {
  349. @usableFromInline
  350. let bufferSize: Int
  351. @inlinable
  352. init(bufferSize: Int) {
  353. self.bufferSize = bufferSize
  354. }
  355. }
  356. @usableFromInline
  357. struct Subscribed: Sendable {
  358. /// Active subscriptions.
  359. @usableFromInline
  360. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  361. /// Subscriptions to fail and remove when they next request an element.
  362. @usableFromInline
  363. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  364. /// The maximum size of the element buffer.
  365. @usableFromInline
  366. let bufferSize: Int
  367. @inlinable
  368. init(from state: Initial) {
  369. self.subscriptions = Subscriptions()
  370. self.subscriptionsToDrop = []
  371. self.bufferSize = state.bufferSize
  372. }
  373. @inlinable
  374. mutating func finish(result: Result<Void, Error>) -> OnFinish {
  375. guard let continuations = self.subscriptions.removeSubscribersWithContinuations() else {
  376. return .none
  377. }
  378. return .resume(
  379. .init(continuations: continuations, result: result.map { nil }),
  380. .init(continuations: [], result: .success(()))
  381. )
  382. }
  383. @inlinable
  384. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  385. // Not streaming, so suspend or remove if the subscription should be dropped.
  386. guard let index = self.subscriptionsToDrop.firstIndex(of: id) else {
  387. return .suspend
  388. }
  389. self.subscriptionsToDrop.remove(at: index)
  390. return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
  391. }
  392. @inlinable
  393. mutating func cancel(
  394. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  395. ) -> OnCancelSubscription {
  396. let (removed, continuation) = self.subscriptions.removeSubscriber(withID: id)
  397. assert(removed)
  398. if let continuation = continuation {
  399. return .resume(continuation, .failure(CancellationError()))
  400. } else {
  401. return .none
  402. }
  403. }
  404. @inlinable
  405. mutating func setContinuation(
  406. _ continuation: ConsumerContinuation,
  407. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  408. ) -> OnSetContinuation {
  409. let didSet = self.subscriptions.setContinuation(continuation, forSubscriber: id)
  410. return didSet ? .none : .resume(continuation, .failure(CancellationError()))
  411. }
  412. @inlinable
  413. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  414. self.subscriptions.subscribe()
  415. }
  416. @inlinable
  417. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  418. let ids = self.subscriptions.removeAllSubscribers()
  419. self.subscriptionsToDrop.append(contentsOf: ids)
  420. return .none
  421. }
  422. @inlinable
  423. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  424. if let continuations = self.subscriptions.removeSubscribersWithContinuations() {
  425. let consumerContinuations = ConsumerContinuations(
  426. continuations: continuations,
  427. result: .failure(error)
  428. )
  429. let producerContinuations = ProducerContinuations(continuations: [], result: .success(()))
  430. return .resume(consumerContinuations, producerContinuations)
  431. } else {
  432. return .none
  433. }
  434. }
  435. }
  436. @usableFromInline
  437. struct Streaming: Sendable {
  438. /// A deque of elements tagged with IDs.
  439. @usableFromInline
  440. var elements: Elements
  441. /// The maximum size of the element buffer.
  442. @usableFromInline
  443. let bufferSize: Int
  444. // TODO: (optimisation) one-or-many Deque to avoid allocations in the case of a single writer
  445. /// Producers which have been suspended.
  446. @usableFromInline
  447. var producers: [(ProducerContinuation, Int)]
  448. /// The IDs of producers which have been cancelled.
  449. @usableFromInline
  450. var cancelledProducers: [Int]
  451. /// The next token for a producer.
  452. @usableFromInline
  453. var producerToken: Int
  454. /// Active subscriptions.
  455. @usableFromInline
  456. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  457. /// Subscriptions to fail and remove when they next request an element.
  458. @usableFromInline
  459. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  460. @inlinable
  461. init(from state: Initial) {
  462. self.elements = Elements()
  463. self.producers = []
  464. self.producerToken = 0
  465. self.cancelledProducers = []
  466. self.subscriptions = Subscriptions()
  467. self.subscriptionsToDrop = []
  468. self.bufferSize = state.bufferSize
  469. }
  470. @inlinable
  471. init(from state: Subscribed) {
  472. self.elements = Elements()
  473. self.producers = []
  474. self.producerToken = 0
  475. self.cancelledProducers = []
  476. self.subscriptions = state.subscriptions
  477. self.subscriptionsToDrop = state.subscriptionsToDrop
  478. self.bufferSize = state.bufferSize
  479. }
  480. @inlinable
  481. mutating func append(_ element: Element) -> OnYield {
  482. let onYield: OnYield
  483. self.elements.append(element)
  484. if self.elements.count >= self.bufferSize, let lowestID = self.elements.lowestID {
  485. // If the buffer is too large then:
  486. // - if all subscribers are equally slow suspend the producer
  487. // - if some subscribers are slow then remove them and the oldest value
  488. // - if no subscribers are slow then remove the oldest value
  489. let slowConsumers = self.subscriptions.subscribers(withElementID: lowestID)
  490. switch slowConsumers.count {
  491. case 0:
  492. if self.subscriptions.isEmpty {
  493. // No consumers.
  494. let token = self.producerToken
  495. self.producerToken += 1
  496. onYield = .suspend(token)
  497. } else {
  498. // No consumers are slow. Remove the oldest value.
  499. self.elements.removeFirst()
  500. onYield = .none
  501. }
  502. case self.subscriptions.count:
  503. // All consumers are slow; stop the production of new value.
  504. let token = self.producerToken
  505. self.producerToken += 1
  506. onYield = .suspend(token)
  507. default:
  508. // Some consumers are slow, but not all. Remove the slow consumers and drop the
  509. // oldest value.
  510. self.elements.removeFirst()
  511. self.subscriptions.removeAllSubscribers(in: slowConsumers)
  512. self.subscriptionsToDrop.append(contentsOf: slowConsumers)
  513. onYield = .none
  514. }
  515. } else {
  516. // The buffer isn't full. Take the continuations of subscriptions which have them; they
  517. // must be waiting for the value we just appended.
  518. let continuations = self.subscriptions.takeContinuations().map {
  519. ConsumerContinuations(continuations: $0, result: .success(element))
  520. }
  521. if let continuations = continuations {
  522. onYield = .resume(continuations)
  523. } else {
  524. onYield = .none
  525. }
  526. }
  527. return onYield
  528. }
  529. @inlinable
  530. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  531. let onNext: OnNext
  532. // 1. Lookup the subscriber by ID to get their next offset
  533. // 2. If the element exists, update the element pointer and return the element
  534. // 3. Else if the ID is in the future, wait
  535. // 4. Else the ID is in the past, fail and remove the subscription.
  536. // Lookup the subscriber with the given ID.
  537. let onNextForSubscription = self.subscriptions.withMutableElementID(
  538. forSubscriber: id
  539. ) { elementID -> (OnNext, Bool) in
  540. let onNext: OnNext
  541. let removeSubscription: Bool
  542. // Subscriber exists; do we have the element it requires next?
  543. switch self.elements.lookupElement(withID: elementID) {
  544. case .found(let element):
  545. // Element exists in the buffer. Advance our element ID.
  546. elementID.formNext()
  547. onNext = .return(.init(nextResult: .success(element)))
  548. removeSubscription = false
  549. case .maybeAvailableLater:
  550. // Element may exist in the future.
  551. onNext = .suspend
  552. removeSubscription = false
  553. case .noLongerAvailable:
  554. // Element existed in the past but was dropped from the buffer.
  555. onNext = .return(
  556. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  557. )
  558. removeSubscription = true
  559. }
  560. return (onNext, removeSubscription)
  561. }
  562. switch onNextForSubscription {
  563. case .return(var resultAndResume):
  564. // The producer only suspends when all consumers are equally slow or there are no
  565. // consumers at all. The latter can't be true: this function can only be called by a
  566. // consumer. The former can't be true anymore because consumption isn't concurrent
  567. // so this consumer must be faster than the others so let the producer resume.
  568. //
  569. // Note that this doesn't mean that all other consumers will be dropped: they can continue
  570. // to produce until the producer provides more values.
  571. resultAndResume.producers = ProducerContinuations(
  572. continuations: self.producers.map { $0.0 },
  573. result: .success(())
  574. )
  575. self.producers.removeAll()
  576. onNext = .return(resultAndResume)
  577. case .suspend:
  578. onNext = .suspend
  579. case .none:
  580. // No subscription found, must have been dropped or already finished.
  581. if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
  582. self.subscriptionsToDrop.remove(at: index)
  583. onNext = .return(
  584. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  585. )
  586. } else {
  587. // Unknown subscriber, i.e. already finished.
  588. onNext = .return(.init(nextResult: .success(nil)))
  589. }
  590. }
  591. return onNext
  592. }
  593. @inlinable
  594. mutating func setContinuation(
  595. _ continuation: ConsumerContinuation,
  596. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  597. ) -> OnSetContinuation {
  598. let didSet = self.subscriptions.setContinuation(continuation, forSubscriber: id)
  599. return didSet ? .none : .resume(continuation, .failure(CancellationError()))
  600. }
  601. @inlinable
  602. mutating func cancel(
  603. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  604. ) -> OnCancelSubscription {
  605. let (removed, continuation) = self.subscriptions.removeSubscriber(withID: id)
  606. assert(removed)
  607. if let continuation = continuation {
  608. return .resume(continuation, .failure(CancellationError()))
  609. } else {
  610. return .none
  611. }
  612. }
  613. @inlinable
  614. mutating func waitToProduceMore(
  615. _ continuation: ProducerContinuation,
  616. token: Int
  617. ) -> OnWaitToProduceMore {
  618. let onWaitToProduceMore: OnWaitToProduceMore
  619. if self.elements.count < self.bufferSize {
  620. // Buffer has free space, no need to suspend.
  621. onWaitToProduceMore = .resume(continuation, .success(()))
  622. } else if let index = self.cancelledProducers.firstIndex(of: token) {
  623. // Producer was cancelled before suspending.
  624. self.cancelledProducers.remove(at: index)
  625. onWaitToProduceMore = .resume(continuation, .failure(CancellationError()))
  626. } else {
  627. // Store the continuation to resume later.
  628. self.producers.append((continuation, token))
  629. onWaitToProduceMore = .none
  630. }
  631. return onWaitToProduceMore
  632. }
  633. @inlinable
  634. mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
  635. guard let index = self.producers.firstIndex(where: { $0.1 == token }) else {
  636. self.cancelledProducers.append(token)
  637. return .none
  638. }
  639. let (continuation, _) = self.producers.remove(at: index)
  640. return .resume(continuation, .failure(CancellationError()))
  641. }
  642. @inlinable
  643. mutating func finish(result: Result<Void, Error>) -> OnFinish {
  644. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  645. let producers = self.producers.map { $0.0 }
  646. self.producers.removeAll()
  647. return .resume(
  648. .init(continuations: continuations ?? .many([]), result: result.map { nil }),
  649. .init(continuations: producers, result: .success(()))
  650. )
  651. }
  652. @inlinable
  653. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  654. self.subscriptions.subscribe()
  655. }
  656. @inlinable
  657. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  658. let onCancel: OnInvalidateAllSubscriptions
  659. // Remove subscriptions with continuations, they need to be failed.
  660. switch self.subscriptions.removeSubscribersWithContinuations() {
  661. case .some(let oneOrMany):
  662. let continuations = ConsumerContinuations(
  663. continuations: oneOrMany,
  664. result: .failure(
  665. BroadcastAsyncSequenceError.consumingTooSlow
  666. )
  667. )
  668. onCancel = .resume(continuations)
  669. case .none:
  670. onCancel = .none
  671. }
  672. // Remove any others to be failed when they next call 'next'.
  673. let ids = self.subscriptions.removeAllSubscribers()
  674. self.subscriptionsToDrop.append(contentsOf: ids)
  675. return onCancel
  676. }
  677. @inlinable
  678. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  679. let consumers = self.subscriptions.removeSubscribersWithContinuations().map {
  680. ConsumerContinuations(continuations: $0, result: .failure(error))
  681. }
  682. let producers = ProducerContinuations(
  683. continuations: self.producers.map { $0.0 },
  684. result: .failure(error)
  685. )
  686. self.producers.removeAll()
  687. return .resume(
  688. consumers ?? ConsumerContinuations(continuations: .many([]), result: .failure(error)),
  689. producers
  690. )
  691. }
  692. @inlinable
  693. func nextSubscriptionIsValid() -> Bool {
  694. return self.subscriptions.isEmpty && self.elements.lowestID == .initial
  695. }
  696. }
  697. @usableFromInline
  698. struct Finished: Sendable {
  699. /// A deque of elements tagged with IDs.
  700. @usableFromInline
  701. var elements: Elements
  702. /// Active subscriptions.
  703. @usableFromInline
  704. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  705. /// Subscriptions to fail and remove when they next request an element.
  706. @usableFromInline
  707. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  708. /// The terminating result of the sequence.
  709. @usableFromInline
  710. let result: Result<Void, Error>
  711. @inlinable
  712. init(from state: Initial, result: Result<Void, Error>) {
  713. self.elements = Elements()
  714. self.subscriptions = Subscriptions()
  715. self.subscriptionsToDrop = []
  716. self.result = result
  717. }
  718. @inlinable
  719. init(from state: Subscribed, result: Result<Void, Error>) {
  720. self.elements = Elements()
  721. self.subscriptions = state.subscriptions
  722. self.subscriptionsToDrop = []
  723. self.result = result
  724. }
  725. @inlinable
  726. init(from state: Streaming, result: Result<Void, Error>) {
  727. self.elements = state.elements
  728. self.subscriptions = state.subscriptions
  729. self.subscriptionsToDrop = state.subscriptionsToDrop
  730. self.result = result
  731. }
  732. @inlinable
  733. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  734. let onNext: OnNext
  735. let onNextForSubscription = self.subscriptions.withMutableElementID(
  736. forSubscriber: id
  737. ) { elementID -> (OnNext, Bool) in
  738. let onNext: OnNext
  739. let removeSubscription: Bool
  740. switch self.elements.lookupElement(withID: elementID) {
  741. case .found(let element):
  742. elementID.formNext()
  743. onNext = .return(.init(nextResult: .success(element)))
  744. removeSubscription = false
  745. case .maybeAvailableLater:
  746. onNext = .return(.init(nextResult: self.result.map { nil }))
  747. removeSubscription = true
  748. case .noLongerAvailable:
  749. onNext = .return(
  750. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  751. )
  752. removeSubscription = true
  753. }
  754. return (onNext, removeSubscription)
  755. }
  756. switch onNextForSubscription {
  757. case .return(let result):
  758. onNext = .return(result)
  759. case .none:
  760. // No subscriber with the given ID, it was likely dropped previously.
  761. if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
  762. self.subscriptionsToDrop.remove(at: index)
  763. onNext = .return(
  764. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  765. )
  766. } else {
  767. // Unknown subscriber, i.e. already finished.
  768. onNext = .return(.init(nextResult: .success(nil)))
  769. }
  770. case .suspend:
  771. fatalError("Internal inconsistency")
  772. }
  773. return onNext
  774. }
  775. @inlinable
  776. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  777. self.subscriptions.subscribe()
  778. }
  779. @inlinable
  780. func nextSubscriptionIsValid() -> Bool {
  781. self.elements.lowestID == .initial
  782. }
  783. }
  784. }
  785. @usableFromInline
  786. var _state: State
  787. @inlinable
  788. init(bufferSize: Int) {
  789. self._state = State(bufferSize: bufferSize)
  790. }
  791. @inlinable
  792. var nextSubscriptionIsValid: Bool {
  793. let isValid: Bool
  794. switch self._state {
  795. case .initial:
  796. isValid = true
  797. case .subscribed:
  798. isValid = true
  799. case .streaming(let state):
  800. isValid = state.nextSubscriptionIsValid()
  801. case .finished(let state):
  802. isValid = state.nextSubscriptionIsValid()
  803. case ._modifying:
  804. fatalError("Internal inconsistency")
  805. }
  806. return isValid
  807. }
  808. @usableFromInline
  809. enum OnInvalidateAllSubscriptions {
  810. case resume(ConsumerContinuations)
  811. case none
  812. }
  813. @inlinable
  814. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  815. let onCancel: OnInvalidateAllSubscriptions
  816. switch self._state {
  817. case .initial:
  818. onCancel = .none
  819. case .subscribed(var state):
  820. onCancel = state.invalidateAllSubscriptions()
  821. self._state = .subscribed(state)
  822. case .streaming(var state):
  823. onCancel = state.invalidateAllSubscriptions()
  824. self._state = .streaming(state)
  825. case .finished:
  826. onCancel = .none
  827. case ._modifying:
  828. fatalError("Internal inconsistency")
  829. }
  830. return onCancel
  831. }
  832. @usableFromInline
  833. enum OnYield {
  834. case none
  835. case suspend(Int)
  836. case resume(ConsumerContinuations)
  837. case throwAlreadyFinished
  838. }
  839. @inlinable
  840. mutating func yield(_ element: Element) -> OnYield {
  841. let onYield: OnYield
  842. switch self._state {
  843. case .initial(let state):
  844. self._state = ._modifying
  845. // Move to streaming.
  846. var state = State.Streaming(from: state)
  847. onYield = state.append(element)
  848. self._state = .streaming(state)
  849. case .subscribed(let state):
  850. self._state = ._modifying
  851. var state = State.Streaming(from: state)
  852. onYield = state.append(element)
  853. self._state = .streaming(state)
  854. case .streaming(var state):
  855. self._state = ._modifying
  856. onYield = state.append(element)
  857. self._state = .streaming(state)
  858. case .finished:
  859. onYield = .throwAlreadyFinished
  860. case ._modifying:
  861. fatalError("Internal inconsistency")
  862. }
  863. return onYield
  864. }
  865. @usableFromInline
  866. enum OnFinish {
  867. case none
  868. case resume(ConsumerContinuations, ProducerContinuations)
  869. }
  870. @inlinable
  871. mutating func finish(result: Result<Void, Error>) -> OnFinish {
  872. let onFinish: OnFinish
  873. switch self._state {
  874. case .initial(let state):
  875. self._state = ._modifying
  876. let state = State.Finished(from: state, result: result)
  877. self._state = .finished(state)
  878. onFinish = .none
  879. case .subscribed(var state):
  880. self._state = ._modifying
  881. onFinish = state.finish(result: result)
  882. self._state = .finished(State.Finished(from: state, result: result))
  883. case .streaming(var state):
  884. self._state = ._modifying
  885. onFinish = state.finish(result: result)
  886. self._state = .finished(State.Finished(from: state, result: result))
  887. case .finished:
  888. onFinish = .none
  889. case ._modifying:
  890. fatalError("Internal inconsistency")
  891. }
  892. return onFinish
  893. }
  894. @usableFromInline
  895. enum OnNext {
  896. @usableFromInline
  897. struct ReturnAndResumeProducers {
  898. @usableFromInline
  899. var nextResult: Result<Element?, Error>
  900. @usableFromInline
  901. var producers: ProducerContinuations
  902. @inlinable
  903. init(
  904. nextResult: Result<Element?, Error>,
  905. producers: [ProducerContinuation] = [],
  906. producerResult: Result<Void, Error> = .success(())
  907. ) {
  908. self.nextResult = nextResult
  909. self.producers = ProducerContinuations(continuations: producers, result: producerResult)
  910. }
  911. }
  912. case `return`(ReturnAndResumeProducers)
  913. case suspend
  914. }
  915. @inlinable
  916. mutating func nextElement(
  917. forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  918. ) -> OnNext {
  919. let onNext: OnNext
  920. switch self._state {
  921. case .initial:
  922. // No subscribers so demand isn't possible.
  923. fatalError("Internal inconsistency")
  924. case .subscribed(var state):
  925. self._state = ._modifying
  926. onNext = state.next(id)
  927. self._state = .subscribed(state)
  928. case .streaming(var state):
  929. self._state = ._modifying
  930. onNext = state.next(id)
  931. self._state = .streaming(state)
  932. case .finished(var state):
  933. self._state = ._modifying
  934. onNext = state.next(id)
  935. self._state = .finished(state)
  936. case ._modifying:
  937. fatalError("Internal inconsistency")
  938. }
  939. return onNext
  940. }
  941. @usableFromInline
  942. enum OnSetContinuation {
  943. case none
  944. case resume(ConsumerContinuation, Result<Element?, Error>)
  945. }
  946. @inlinable
  947. mutating func setContinuation(
  948. _ continuation: ConsumerContinuation,
  949. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  950. ) -> OnSetContinuation {
  951. let onSetContinuation: OnSetContinuation
  952. switch self._state {
  953. case .initial:
  954. // No subscribers so demand isn't possible.
  955. fatalError("Internal inconsistency")
  956. case .subscribed(var state):
  957. self._state = ._modifying
  958. onSetContinuation = state.setContinuation(continuation, forSubscription: id)
  959. self._state = .subscribed(state)
  960. case .streaming(var state):
  961. self._state = ._modifying
  962. onSetContinuation = state.setContinuation(continuation, forSubscription: id)
  963. self._state = .streaming(state)
  964. case .finished, ._modifying:
  965. // All values must have been produced, nothing to wait for.
  966. fatalError("Internal inconsistency")
  967. }
  968. return onSetContinuation
  969. }
  970. @usableFromInline
  971. enum OnCancelSubscription {
  972. case none
  973. case resume(ConsumerContinuation, Result<Element?, Error>)
  974. }
  975. @inlinable
  976. mutating func cancelSubscription(
  977. withID id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  978. ) -> OnCancelSubscription {
  979. let onCancel: OnCancelSubscription
  980. switch self._state {
  981. case .initial:
  982. // No subscribers so demand isn't possible.
  983. fatalError("Internal inconsistency")
  984. case .subscribed(var state):
  985. self._state = ._modifying
  986. onCancel = state.cancel(id)
  987. self._state = .subscribed(state)
  988. case .streaming(var state):
  989. self._state = ._modifying
  990. onCancel = state.cancel(id)
  991. self._state = .streaming(state)
  992. case .finished, ._modifying:
  993. // All values must have been produced, nothing to wait for.
  994. fatalError("Internal inconsistency")
  995. }
  996. return onCancel
  997. }
  998. @usableFromInline
  999. enum OnSubscribe {
  1000. case subscribed(_BroadcastSequenceStateMachine<Element>.Subscriptions.ID)
  1001. }
  1002. @inlinable
  1003. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  1004. let id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  1005. switch self._state {
  1006. case .initial(let state):
  1007. self._state = ._modifying
  1008. var state = State.Subscribed(from: state)
  1009. id = state.subscribe()
  1010. self._state = .subscribed(state)
  1011. case .subscribed(var state):
  1012. self._state = ._modifying
  1013. id = state.subscribe()
  1014. self._state = .subscribed(state)
  1015. case .streaming(var state):
  1016. self._state = ._modifying
  1017. id = state.subscribe()
  1018. self._state = .streaming(state)
  1019. case .finished(var state):
  1020. self._state = ._modifying
  1021. id = state.subscribe()
  1022. self._state = .finished(state)
  1023. case ._modifying:
  1024. fatalError("Internal inconsistency")
  1025. }
  1026. return id
  1027. }
  1028. @usableFromInline
  1029. enum OnWaitToProduceMore {
  1030. case none
  1031. case resume(ProducerContinuation, Result<Void, Error>)
  1032. }
  1033. @inlinable
  1034. mutating func waitToProduceMore(
  1035. continuation: ProducerContinuation,
  1036. token: Int
  1037. ) -> OnWaitToProduceMore {
  1038. let onWaitToProduceMore: OnWaitToProduceMore
  1039. switch self._state {
  1040. case .initial, .subscribed:
  1041. // Nothing produced yet, so no reason have to wait to produce.
  1042. fatalError("Internal inconsistency")
  1043. case .streaming(var state):
  1044. self._state = ._modifying
  1045. onWaitToProduceMore = state.waitToProduceMore(continuation, token: token)
  1046. self._state = .streaming(state)
  1047. case .finished:
  1048. onWaitToProduceMore = .resume(continuation, .success(()))
  1049. case ._modifying:
  1050. fatalError("Internal inconsistency")
  1051. }
  1052. return onWaitToProduceMore
  1053. }
  1054. @usableFromInline
  1055. typealias OnCancelProducer = OnWaitToProduceMore
  1056. @inlinable
  1057. mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
  1058. let onCancelProducer: OnCancelProducer
  1059. switch self._state {
  1060. case .initial, .subscribed:
  1061. // Nothing produced yet, so no reason have to wait to produce.
  1062. fatalError("Internal inconsistency")
  1063. case .streaming(var state):
  1064. self._state = ._modifying
  1065. onCancelProducer = state.cancelProducer(withToken: token)
  1066. self._state = .streaming(state)
  1067. case .finished:
  1068. // No producers to cancel; do nothing.
  1069. onCancelProducer = .none
  1070. case ._modifying:
  1071. fatalError("Internal inconsistency")
  1072. }
  1073. return onCancelProducer
  1074. }
  1075. @usableFromInline
  1076. enum OnDropResources {
  1077. case none
  1078. case resume(ConsumerContinuations, ProducerContinuations)
  1079. }
  1080. @inlinable
  1081. mutating func dropResources() -> OnDropResources {
  1082. let error = BroadcastAsyncSequenceError.productionAlreadyFinished
  1083. let onDrop: OnDropResources
  1084. switch self._state {
  1085. case .initial(let state):
  1086. self._state = ._modifying
  1087. onDrop = .none
  1088. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1089. case .subscribed(var state):
  1090. self._state = ._modifying
  1091. onDrop = state.dropResources(error: error)
  1092. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1093. case .streaming(var state):
  1094. self._state = ._modifying
  1095. onDrop = state.dropResources(error: error)
  1096. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1097. case .finished:
  1098. onDrop = .none
  1099. case ._modifying:
  1100. fatalError("Internal inconsistency")
  1101. }
  1102. return onDrop
  1103. }
  1104. }
  1105. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  1106. extension _BroadcastSequenceStateMachine {
  1107. /// A collection of elements tagged with an identifier.
  1108. ///
  1109. /// Identifiers are assigned when elements are added to the collection and are monotonically
  1110. /// increasing. If element 'A' is added before element 'B' then 'A' will have a lower ID than 'B'.
  1111. @usableFromInline
  1112. struct Elements: Sendable {
  1113. /// The ID of an element
  1114. @usableFromInline
  1115. struct ID: Hashable, Sendable, Comparable, Strideable {
  1116. @usableFromInline
  1117. private(set) var rawValue: Int
  1118. @usableFromInline
  1119. static var initial: Self {
  1120. ID(id: 0)
  1121. }
  1122. private init(id: Int) {
  1123. self.rawValue = id
  1124. }
  1125. @inlinable
  1126. mutating func formNext() {
  1127. self.rawValue += 1
  1128. }
  1129. @inlinable
  1130. func next() -> Self {
  1131. var copy = self
  1132. copy.formNext()
  1133. return copy
  1134. }
  1135. @inlinable
  1136. func distance(to other: Self) -> Int {
  1137. other.rawValue - self.rawValue
  1138. }
  1139. @inlinable
  1140. func advanced(by n: Int) -> Self {
  1141. var copy = self
  1142. copy.rawValue += n
  1143. return copy
  1144. }
  1145. @inlinable
  1146. static func < (lhs: Self, rhs: Self) -> Bool {
  1147. lhs.rawValue < rhs.rawValue
  1148. }
  1149. }
  1150. @usableFromInline
  1151. struct _IdentifiableElement: Sendable {
  1152. @usableFromInline
  1153. var element: Element
  1154. @usableFromInline
  1155. var id: ID
  1156. @inlinable
  1157. init(element: Element, id: ID) {
  1158. self.element = element
  1159. self.id = id
  1160. }
  1161. }
  1162. @usableFromInline
  1163. var _elements: Deque<_IdentifiableElement>
  1164. @usableFromInline
  1165. var _nextID: ID
  1166. @inlinable
  1167. init() {
  1168. self._nextID = .initial
  1169. self._elements = []
  1170. }
  1171. @inlinable
  1172. mutating func nextElementID() -> ID {
  1173. let id = self._nextID
  1174. self._nextID.formNext()
  1175. return id
  1176. }
  1177. /// The highest ID of the stored elements; `nil` if there are no elements.
  1178. @inlinable
  1179. var highestID: ID? { self._elements.last?.id }
  1180. /// The lowest ID of the stored elements; `nil` if there are no elements.
  1181. @inlinable
  1182. var lowestID: ID? { self._elements.first?.id }
  1183. /// The number of stored elements.
  1184. @inlinable
  1185. var count: Int { self._elements.count }
  1186. /// Whether there are no stored elements.
  1187. @inlinable
  1188. var isEmpty: Bool { self._elements.isEmpty }
  1189. /// Appends an element to the collection.
  1190. @inlinable
  1191. mutating func append(_ element: Element) {
  1192. self._elements.append(_IdentifiableElement(element: element, id: self.nextElementID()))
  1193. }
  1194. /// Removes the first element from the collection.
  1195. @discardableResult
  1196. @inlinable
  1197. mutating func removeFirst() -> Element {
  1198. let removed = self._elements.removeFirst()
  1199. return removed.element
  1200. }
  1201. @usableFromInline
  1202. enum ElementLookup {
  1203. /// The element was found in the collection.
  1204. case found(Element)
  1205. /// The element isn't in the collection, but it could be in the future.
  1206. case maybeAvailableLater
  1207. /// The element was in the collection, but is no longer available.
  1208. case noLongerAvailable
  1209. }
  1210. /// Lookup the element with the given ID.
  1211. ///
  1212. /// - Parameter id: The ID of the element to lookup.
  1213. @inlinable
  1214. mutating func lookupElement(withID id: ID) -> ElementLookup {
  1215. guard let low = self.lowestID, let high = self.highestID else {
  1216. // Must be empty.
  1217. return id >= self._nextID ? .maybeAvailableLater : .noLongerAvailable
  1218. }
  1219. assert(low <= high)
  1220. let lookup: ElementLookup
  1221. if id < low {
  1222. lookup = .noLongerAvailable
  1223. } else if id > high {
  1224. lookup = .maybeAvailableLater
  1225. } else {
  1226. // IDs are monotonically increasing. If the buffer contains the tag we can use it to index
  1227. // into the deque by looking at the offsets.
  1228. let offset = low.distance(to: id)
  1229. let index = self._elements.startIndex.advanced(by: offset)
  1230. lookup = .found(self._elements[index].element)
  1231. }
  1232. return lookup
  1233. }
  1234. }
  1235. }
  1236. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  1237. extension _BroadcastSequenceStateMachine {
  1238. /// A collection of subcriptions.
  1239. @usableFromInline
  1240. struct Subscriptions: Sendable {
  1241. @usableFromInline
  1242. struct ID: Hashable, Sendable {
  1243. @usableFromInline
  1244. private(set) var rawValue: Int
  1245. @inlinable
  1246. init() {
  1247. self.rawValue = 0
  1248. }
  1249. @inlinable
  1250. mutating func formNext() {
  1251. self.rawValue += 1
  1252. }
  1253. @inlinable
  1254. func next() -> Self {
  1255. var copy = self
  1256. copy.formNext()
  1257. return copy
  1258. }
  1259. }
  1260. @usableFromInline
  1261. struct _Subscriber: Sendable {
  1262. /// The ID of the subscriber.
  1263. @usableFromInline
  1264. var id: ID
  1265. /// The ID of the next element the subscriber will consume.
  1266. @usableFromInline
  1267. var nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID
  1268. /// A continuation which which will be resumed when the next element becomes available.
  1269. @usableFromInline
  1270. var continuation: ConsumerContinuation?
  1271. @inlinable
  1272. init(
  1273. id: ID,
  1274. nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID,
  1275. continuation: ConsumerContinuation? = nil
  1276. ) {
  1277. self.id = id
  1278. self.nextElementID = nextElementID
  1279. self.continuation = continuation
  1280. }
  1281. /// Returns and sets the continuation to `nil` if one exists.
  1282. ///
  1283. /// The next element ID is advanced if a contination exists.
  1284. ///
  1285. /// - Returns: The continuation, if one existed.
  1286. @inlinable
  1287. mutating func takeContinuation() -> ConsumerContinuation? {
  1288. guard let continuation = self.continuation else { return nil }
  1289. self.continuation = nil
  1290. self.nextElementID.formNext()
  1291. return continuation
  1292. }
  1293. }
  1294. @usableFromInline
  1295. var _subscribers: [_Subscriber]
  1296. @usableFromInline
  1297. var _nextSubscriberID: ID
  1298. @inlinable
  1299. init() {
  1300. self._subscribers = []
  1301. self._nextSubscriberID = ID()
  1302. }
  1303. /// Returns the number of subscribers.
  1304. @inlinable
  1305. var count: Int { self._subscribers.count }
  1306. /// Returns whether the collection is empty.
  1307. @inlinable
  1308. var isEmpty: Bool { self._subscribers.isEmpty }
  1309. /// Adds a new subscriber and returns its unique ID.
  1310. ///
  1311. /// - Returns: The ID of the new subscriber.
  1312. @inlinable
  1313. mutating func subscribe() -> ID {
  1314. let id = self._nextSubscriberID
  1315. self._nextSubscriberID.formNext()
  1316. self._subscribers.append(_Subscriber(id: id, nextElementID: .initial))
  1317. return id
  1318. }
  1319. /// Provides mutable access to the element ID of the given subscriber, if it exists.
  1320. ///
  1321. /// - Parameters:
  1322. /// - id: The ID of the subscriber.
  1323. /// - body: A closure to mutate the element ID of the subscriber which returns the result and
  1324. /// a boolean indicating whether the subscriber should be removed.
  1325. /// - Returns: The result returned from the closure or `nil` if no subscriber exists with the
  1326. /// given ID.
  1327. @inlinable
  1328. mutating func withMutableElementID<R>(
  1329. forSubscriber id: ID,
  1330. _ body: (
  1331. inout _BroadcastSequenceStateMachine<Element>.Elements.ID
  1332. ) -> (result: R, removeSubscription: Bool)
  1333. ) -> R? {
  1334. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else { return nil }
  1335. let (result, removeSubscription) = body(&self._subscribers[index].nextElementID)
  1336. if removeSubscription {
  1337. self._subscribers.remove(at: index)
  1338. }
  1339. return result
  1340. }
  1341. /// Sets the continuation for the subscription with the given ID.
  1342. /// - Parameters:
  1343. /// - continuation: The continuation to set.
  1344. /// - id: The ID of the subscriber.
  1345. /// - Returns: A boolean indicating whether the continuation was set or not.
  1346. @inlinable
  1347. mutating func setContinuation(
  1348. _ continuation: ConsumerContinuation,
  1349. forSubscriber id: ID
  1350. ) -> Bool {
  1351. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
  1352. return false
  1353. }
  1354. assert(self._subscribers[index].continuation == nil)
  1355. self._subscribers[index].continuation = continuation
  1356. return true
  1357. }
  1358. /// Returns an array of subscriber IDs which are whose next element ID is `id`.
  1359. @inlinable
  1360. func subscribers(
  1361. withElementID id: _BroadcastSequenceStateMachine<Element>.Elements.ID
  1362. ) -> [ID] {
  1363. return self._subscribers.filter {
  1364. $0.nextElementID == id
  1365. }.map {
  1366. $0.id
  1367. }
  1368. }
  1369. /// Removes the subscriber with the given ID.
  1370. /// - Parameter id: The ID of the subscriber to remove.
  1371. /// - Returns: A tuple indicating whether a subscriber was removed and any continuation
  1372. /// associated with the subscriber.
  1373. @inlinable
  1374. mutating func removeSubscriber(withID id: ID) -> (Bool, ConsumerContinuation?) {
  1375. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
  1376. return (false, nil)
  1377. }
  1378. let continuation = self._subscribers[index].continuation
  1379. self._subscribers.remove(at: index)
  1380. return (true, continuation)
  1381. }
  1382. /// Remove all subscribers in the given array of IDs.
  1383. @inlinable
  1384. mutating func removeAllSubscribers(in idsToRemove: [ID]) {
  1385. self._subscribers.removeAll {
  1386. idsToRemove.contains($0.id)
  1387. }
  1388. }
  1389. /// Remove all subscribers and return their IDs.
  1390. @inlinable
  1391. mutating func removeAllSubscribers() -> [ID] {
  1392. let subscribers = self._subscribers.map { $0.id }
  1393. self._subscribers.removeAll()
  1394. return subscribers
  1395. }
  1396. /// Returns any continuations set on subscribers, unsetting at the same time.
  1397. @inlinable
  1398. mutating func takeContinuations() -> _OneOrMany<ConsumerContinuation>? {
  1399. // Avoid allocs if there's only one subscriber.
  1400. let count = self._countPendingContinuations()
  1401. let result: _OneOrMany<ConsumerContinuation>?
  1402. switch count {
  1403. case 0:
  1404. result = nil
  1405. case 1:
  1406. let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
  1407. let continuation = self._subscribers[index].takeContinuation()!
  1408. result = .one(continuation)
  1409. default:
  1410. var continuations = [ConsumerContinuation]()
  1411. continuations.reserveCapacity(count)
  1412. for index in self._subscribers.indices {
  1413. if let continuation = self._subscribers[index].takeContinuation() {
  1414. continuations.append(continuation)
  1415. }
  1416. }
  1417. result = .many(continuations)
  1418. }
  1419. return result
  1420. }
  1421. /// Removes all subscribers which have continuations and return their continuations.
  1422. @inlinable
  1423. mutating func removeSubscribersWithContinuations() -> _OneOrMany<ConsumerContinuation>? {
  1424. // Avoid allocs if there's only one subscriber.
  1425. let count = self._countPendingContinuations()
  1426. let result: _OneOrMany<ConsumerContinuation>?
  1427. switch count {
  1428. case 0:
  1429. result = nil
  1430. case 1:
  1431. let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
  1432. let subscription = self._subscribers.remove(at: index)
  1433. result = .one(subscription.continuation!)
  1434. default:
  1435. var continuations = [ConsumerContinuation]()
  1436. continuations.reserveCapacity(count)
  1437. var removable = [ID]()
  1438. removable.reserveCapacity(count)
  1439. for subscription in self._subscribers {
  1440. if let continuation = subscription.continuation {
  1441. continuations.append(continuation)
  1442. removable.append(subscription.id)
  1443. }
  1444. }
  1445. self._subscribers.removeAll {
  1446. removable.contains($0.id)
  1447. }
  1448. result = .many(continuations)
  1449. }
  1450. return result
  1451. }
  1452. @inlinable
  1453. func _countPendingContinuations() -> Int {
  1454. return self._subscribers.reduce(into: 0) { count, subscription in
  1455. if subscription.continuation != nil {
  1456. count += 1
  1457. }
  1458. }
  1459. }
  1460. }
  1461. }
  1462. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  1463. extension _BroadcastSequenceStateMachine {
  1464. // TODO: tiny array
  1465. @usableFromInline
  1466. enum _OneOrMany<Value> {
  1467. case one(Value)
  1468. case many([Value])
  1469. }
  1470. }