BroadcastAsyncSequence.swift 54 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import DequeModule
  17. /// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently.
  18. ///
  19. /// The sequence is not a general-purpose broadcast sequence; it is tailored specifically for the
  20. /// requirements of gRPC Swift, in particular it is used to support retrying and hedging requests.
  21. ///
  22. /// In order to achieve this it maintains on an internal buffer of elements which is limited in
  23. /// size. Each iterator ("subscriber") maintains an offset into the elements which the sequence has
  24. /// produced over time. If a subscriber is consuming too slowly (and the buffer is full) then the
  25. /// sequence will cancel the subscriber's subscription to the stream, dropping the oldest element
  26. /// in the buffer to make space for more elements. If the buffer is full and all subscribers are
  27. /// equally slow then all producers are suspended until the buffer drops to a reasonable size.
  28. ///
  29. /// The expectation is that the number of subscribers will be low; for retries there will be at most
  30. /// one subscriber at a time, for hedging there may be at most five subscribers at a time.
  31. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  32. @usableFromInline
  33. struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
  34. @usableFromInline
  35. let _storage: _BroadcastSequenceStorage<Element>
  36. @inlinable
  37. init(_storage: _BroadcastSequenceStorage<Element>) {
  38. self._storage = _storage
  39. }
  40. /// Make a new stream and continuation.
  41. ///
  42. /// - Parameters:
  43. /// - elementType: The type of element this sequence produces.
  44. /// - bufferSize: The number of elements this sequence may store.
  45. /// - Returns: A stream and continuation.
  46. @inlinable
  47. static func makeStream(
  48. of elementType: Element.Type = Element.self,
  49. bufferSize: Int
  50. ) -> (stream: Self, continuation: Self.Source) {
  51. let storage = _BroadcastSequenceStorage<Element>(bufferSize: bufferSize)
  52. let stream = Self(_storage: storage)
  53. let continuation = Self.Source(_storage: storage)
  54. return (stream, continuation)
  55. }
  56. @inlinable
  57. func makeAsyncIterator() -> AsyncIterator {
  58. let id = self._storage.subscribe()
  59. return AsyncIterator(_storage: _storage, id: id)
  60. }
  61. /// Returns true if it is known to be safe for the next subscriber to subscribe and successfully
  62. /// consume elements.
  63. ///
  64. /// This function can return `false` if there are active subscribers or the internal buffer no
  65. /// longer contains the first element in the sequence.
  66. @inlinable
  67. var isKnownSafeForNextSubscriber: Bool {
  68. self._storage.isKnownSafeForNextSubscriber
  69. }
  70. /// Invalidates all active subscribers.
  71. ///
  72. /// Any active subscriber will receive an error the next time they attempt to consume an element.
  73. @inlinable
  74. func invalidateAllSubscriptions() {
  75. self._storage.invalidateAllSubscriptions()
  76. }
  77. }
  78. // MARK: - AsyncIterator
  79. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  80. extension BroadcastAsyncSequence {
  81. @usableFromInline
  82. struct AsyncIterator: AsyncIteratorProtocol {
  83. @usableFromInline
  84. let _storage: _BroadcastSequenceStorage<Element>
  85. @usableFromInline
  86. let _subscriberID: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  87. @inlinable
  88. init(
  89. _storage: _BroadcastSequenceStorage<Element>,
  90. id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  91. ) {
  92. self._storage = _storage
  93. self._subscriberID = id
  94. }
  95. @inlinable
  96. mutating func next() async throws -> Element? {
  97. try await self._storage.nextElement(forSubscriber: self._subscriberID)
  98. }
  99. }
  100. }
  101. // MARK: - Continuation
  102. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  103. extension BroadcastAsyncSequence {
  104. @usableFromInline
  105. struct Source: Sendable {
  106. @usableFromInline
  107. let _storage: _BroadcastSequenceStorage<Element>
  108. @usableFromInline
  109. init(_storage: _BroadcastSequenceStorage<Element>) {
  110. self._storage = _storage
  111. }
  112. @inlinable
  113. func write(_ element: Element) async throws {
  114. try await self._storage.yield(element)
  115. }
  116. @inlinable
  117. func finish(with result: Result<Void, Error>) {
  118. self._storage.finish(result)
  119. }
  120. @inlinable
  121. func finish() {
  122. self.finish(with: .success(()))
  123. }
  124. @inlinable
  125. func finish(throwing error: Error) {
  126. self.finish(with: .failure(error))
  127. }
  128. }
  129. }
  130. @usableFromInline
  131. enum BroadcastAsyncSequenceError: Error {
  132. /// The consumer was too slow.
  133. case consumingTooSlow
  134. /// The producer has already finished.
  135. case productionAlreadyFinished
  136. }
  137. // MARK: - Storage
  138. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  139. @usableFromInline
  140. final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
  141. @usableFromInline
  142. let _state: LockedValueBox<_BroadcastSequenceStateMachine<Element>>
  143. @inlinable
  144. init(bufferSize: Int) {
  145. self._state = LockedValueBox(_BroadcastSequenceStateMachine(bufferSize: bufferSize))
  146. }
  147. deinit {
  148. let onDrop = self._state.withLockedValue { state in
  149. state.dropResources()
  150. }
  151. switch onDrop {
  152. case .none:
  153. ()
  154. case .resume(let consumers, let producers):
  155. consumers.resume()
  156. producers.resume()
  157. }
  158. }
  159. // MARK - Producer
  160. /// Yield a single element to the stream. Suspends if the stream's buffer is full.
  161. ///
  162. /// - Parameter element: The element to write.
  163. @inlinable
  164. func yield(_ element: Element) async throws {
  165. let onYield = self._state.withLockedValue { state in state.yield(element) }
  166. switch onYield {
  167. case .none:
  168. ()
  169. case .resume(let continuations):
  170. continuations.resume()
  171. case .suspend(let token):
  172. try await withTaskCancellationHandler {
  173. try await withCheckedThrowingContinuation { continuation in
  174. let onProduceMore = self._state.withLockedValue { state in
  175. state.waitToProduceMore(continuation: continuation, token: token)
  176. }
  177. switch onProduceMore {
  178. case .resume(let continuation, let result):
  179. continuation.resume(with: result)
  180. case .none:
  181. ()
  182. }
  183. }
  184. } onCancel: {
  185. let onCancel = self._state.withLockedValue { state in
  186. state.cancelProducer(withToken: token)
  187. }
  188. switch onCancel {
  189. case .resume(let continuation, let result):
  190. continuation.resume(with: result)
  191. case .none:
  192. ()
  193. }
  194. }
  195. case .throwAlreadyFinished:
  196. throw BroadcastAsyncSequenceError.productionAlreadyFinished
  197. }
  198. }
  199. /// Indicate that no more values will be produced.
  200. ///
  201. /// - Parameter result: Whether the stream is finishing cleanly or because of an error.
  202. @inlinable
  203. func finish(_ result: Result<Void, Error>) {
  204. let action = self._state.withLockedValue { state in state.finish(result: result) }
  205. switch action {
  206. case .none:
  207. ()
  208. case .resume(let subscribers, let producers):
  209. subscribers.resume()
  210. producers.resume()
  211. }
  212. }
  213. // MARK: - Consumer
  214. /// Create a subscription to the stream.
  215. ///
  216. /// - Returns: Returns a unique subscription ID.
  217. @inlinable
  218. func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  219. self._state.withLockedValue { $0.subscribe() }
  220. }
  221. /// Returns the next element for the given subscriber, if it is available.
  222. ///
  223. /// - Parameter id: The ID of the subscriber requesting the element.
  224. /// - Returns: The next element or `nil` if the stream has been terminated.
  225. @inlinable
  226. func nextElement(
  227. forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  228. ) async throws -> Element? {
  229. let onNext = self._state.withLockedValue { $0.nextElement(forSubscriber: id) }
  230. switch onNext {
  231. case .return(let returnAndProduceMore):
  232. returnAndProduceMore.producers.resume()
  233. return try returnAndProduceMore.nextResult.get()
  234. case .suspend:
  235. return try await withTaskCancellationHandler {
  236. return try await withCheckedThrowingContinuation { continuation in
  237. let onSetContinuation = self._state.withLockedValue { state in
  238. state.setContinuation(continuation, forSubscription: id)
  239. }
  240. switch onSetContinuation {
  241. case .resume(let continuation, let result):
  242. continuation.resume(with: result)
  243. case .none:
  244. ()
  245. }
  246. }
  247. } onCancel: {
  248. let onCancel = self._state.withLockedValue { state in
  249. state.cancelSubscription(withID: id)
  250. }
  251. switch onCancel {
  252. case .resume(let continuation, let result):
  253. continuation.resume(with: result)
  254. case .none:
  255. ()
  256. }
  257. }
  258. }
  259. }
  260. /// Returns true if it's guaranteed that the next subscriber may join and safely begin consuming
  261. /// elements.
  262. @inlinable
  263. var isKnownSafeForNextSubscriber: Bool {
  264. self._state.withLockedValue { state in
  265. state.nextSubscriptionIsValid
  266. }
  267. }
  268. /// Invalidates all active subscriptions.
  269. @inlinable
  270. func invalidateAllSubscriptions() {
  271. let action = self._state.withLockedValue { state in
  272. state.invalidateAllSubscriptions()
  273. }
  274. switch action {
  275. case .resume(let continuations):
  276. continuations.resume()
  277. case .none:
  278. ()
  279. }
  280. }
  281. }
  282. // MARK: - State machine
  283. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  284. @usableFromInline
  285. struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
  286. @usableFromInline
  287. typealias ConsumerContinuation = CheckedContinuation<Element?, Error>
  288. @usableFromInline
  289. typealias ProducerContinuation = CheckedContinuation<Void, Error>
  290. @usableFromInline
  291. struct ConsumerContinuations {
  292. @usableFromInline
  293. var continuations: _OneOrMany<ConsumerContinuation>
  294. @usableFromInline
  295. var result: Result<Element?, Error>
  296. @inlinable
  297. init(continuations: _OneOrMany<ConsumerContinuation>, result: Result<Element?, Error>) {
  298. self.continuations = continuations
  299. self.result = result
  300. }
  301. @inlinable
  302. func resume() {
  303. switch self.continuations {
  304. case .one(let continuation):
  305. continuation.resume(with: self.result)
  306. case .many(let continuations):
  307. for continuation in continuations {
  308. continuation.resume(with: self.result)
  309. }
  310. }
  311. }
  312. }
  313. @usableFromInline
  314. struct ProducerContinuations {
  315. @usableFromInline
  316. var continuations: [ProducerContinuation]
  317. @usableFromInline
  318. var result: Result<Void, Error>
  319. @inlinable
  320. init(continuations: [ProducerContinuation], result: Result<Void, Error>) {
  321. self.continuations = continuations
  322. self.result = result
  323. }
  324. @inlinable
  325. func resume() {
  326. for continuation in self.continuations {
  327. continuation.resume(with: self.result)
  328. }
  329. }
  330. }
  331. @usableFromInline
  332. enum State: Sendable {
  333. /// No subscribers and no elements have been produced.
  334. case initial(Initial)
  335. /// Subscribers exist but no elements have been produced.
  336. case subscribed(Subscribed)
  337. /// Elements have been produced, there may or may not be subscribers.
  338. case streaming(Streaming)
  339. /// No more elements will be produced. There may or may not been subscribers.
  340. case finished(Finished)
  341. /// Temporary state to avoid CoWs.
  342. case _modifying
  343. @inlinable
  344. init(bufferSize: Int) {
  345. self = .initial(Initial(bufferSize: bufferSize))
  346. }
  347. @usableFromInline
  348. struct Initial: Sendable {
  349. @usableFromInline
  350. let bufferSize: Int
  351. @inlinable
  352. init(bufferSize: Int) {
  353. self.bufferSize = bufferSize
  354. }
  355. }
  356. @usableFromInline
  357. struct Subscribed: Sendable {
  358. /// Active subscriptions.
  359. @usableFromInline
  360. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  361. /// Subscriptions to fail and remove when they next request an element.
  362. @usableFromInline
  363. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  364. /// The maximum size of the element buffer.
  365. @usableFromInline
  366. let bufferSize: Int
  367. @inlinable
  368. init(from state: Initial) {
  369. self.subscriptions = Subscriptions()
  370. self.subscriptionsToDrop = []
  371. self.bufferSize = state.bufferSize
  372. }
  373. @inlinable
  374. mutating func finish(result: Result<Void, Error>) -> OnFinish {
  375. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  376. return .resume(
  377. .init(continuations: continuations, result: result.map { nil }),
  378. .init(continuations: [], result: .success(()))
  379. )
  380. }
  381. @inlinable
  382. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  383. // Not streaming, so suspend or remove if the subscription should be dropped.
  384. guard let index = self.subscriptionsToDrop.firstIndex(of: id) else {
  385. return .suspend
  386. }
  387. self.subscriptionsToDrop.remove(at: index)
  388. return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
  389. }
  390. @inlinable
  391. mutating func cancel(
  392. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  393. ) -> OnCancelSubscription {
  394. let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
  395. if let continuation = continuation {
  396. return .resume(continuation, .failure(CancellationError()))
  397. } else {
  398. return .none
  399. }
  400. }
  401. @inlinable
  402. mutating func setContinuation(
  403. _ continuation: ConsumerContinuation,
  404. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  405. ) -> OnSetContinuation {
  406. if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
  407. return .none
  408. } else {
  409. return .resume(continuation, .failure(CancellationError()))
  410. }
  411. }
  412. @inlinable
  413. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  414. self.subscriptions.subscribe()
  415. }
  416. @inlinable
  417. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  418. // Remove subscriptions with continuations, they need to be failed.
  419. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  420. let consumerContinuations = ConsumerContinuations(
  421. continuations: continuations,
  422. result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
  423. )
  424. // Remove any others to be failed when they next call 'next'.
  425. let ids = self.subscriptions.removeAllSubscribers()
  426. self.subscriptionsToDrop.append(contentsOf: ids)
  427. return .resume(consumerContinuations)
  428. }
  429. @inlinable
  430. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  431. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  432. let consumerContinuations = ConsumerContinuations(
  433. continuations: continuations,
  434. result: .failure(error)
  435. )
  436. let producerContinuations = ProducerContinuations(continuations: [], result: .success(()))
  437. return .resume(consumerContinuations, producerContinuations)
  438. }
  439. }
  440. @usableFromInline
  441. struct Streaming: Sendable {
  442. /// A deque of elements tagged with IDs.
  443. @usableFromInline
  444. var elements: Elements
  445. /// The maximum size of the element buffer.
  446. @usableFromInline
  447. let bufferSize: Int
  448. // TODO: (optimisation) one-or-many Deque to avoid allocations in the case of a single writer
  449. /// Producers which have been suspended.
  450. @usableFromInline
  451. var producers: [(ProducerContinuation, Int)]
  452. /// The IDs of producers which have been cancelled.
  453. @usableFromInline
  454. var cancelledProducers: [Int]
  455. /// The next token for a producer.
  456. @usableFromInline
  457. var producerToken: Int
  458. /// Active subscriptions.
  459. @usableFromInline
  460. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  461. /// Subscriptions to fail and remove when they next request an element.
  462. @usableFromInline
  463. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  464. @inlinable
  465. init(from state: Initial) {
  466. self.elements = Elements()
  467. self.producers = []
  468. self.producerToken = 0
  469. self.cancelledProducers = []
  470. self.subscriptions = Subscriptions()
  471. self.subscriptionsToDrop = []
  472. self.bufferSize = state.bufferSize
  473. }
  474. @inlinable
  475. init(from state: Subscribed) {
  476. self.elements = Elements()
  477. self.producers = []
  478. self.producerToken = 0
  479. self.cancelledProducers = []
  480. self.subscriptions = state.subscriptions
  481. self.subscriptionsToDrop = state.subscriptionsToDrop
  482. self.bufferSize = state.bufferSize
  483. }
  484. @inlinable
  485. mutating func append(_ element: Element) -> OnYield {
  486. let onYield: OnYield
  487. self.elements.append(element)
  488. if self.elements.count >= self.bufferSize, let lowestID = self.elements.lowestID {
  489. // If the buffer is too large then:
  490. // - if all subscribers are equally slow suspend the producer
  491. // - if some subscribers are slow then remove them and the oldest value
  492. // - if no subscribers are slow then remove the oldest value
  493. let slowConsumers = self.subscriptions.subscribers(withElementID: lowestID)
  494. switch slowConsumers.count {
  495. case 0:
  496. if self.subscriptions.isEmpty {
  497. // No consumers.
  498. let token = self.producerToken
  499. self.producerToken += 1
  500. onYield = .suspend(token)
  501. } else {
  502. // No consumers are slow. Remove the oldest value.
  503. self.elements.removeFirst()
  504. onYield = .none
  505. }
  506. case self.subscriptions.count:
  507. // All consumers are slow; stop the production of new value.
  508. let token = self.producerToken
  509. self.producerToken += 1
  510. onYield = .suspend(token)
  511. default:
  512. // Some consumers are slow, but not all. Remove the slow consumers and drop the
  513. // oldest value.
  514. self.elements.removeFirst()
  515. self.subscriptions.removeAllSubscribers(in: slowConsumers)
  516. self.subscriptionsToDrop.append(contentsOf: slowConsumers)
  517. onYield = .none
  518. }
  519. } else {
  520. // The buffer isn't full. Take the continuations of subscriptions which have them; they
  521. // must be waiting for the value we just appended.
  522. let continuations = self.subscriptions.takeContinuations().map {
  523. ConsumerContinuations(continuations: $0, result: .success(element))
  524. }
  525. if let continuations = continuations {
  526. onYield = .resume(continuations)
  527. } else {
  528. onYield = .none
  529. }
  530. }
  531. return onYield
  532. }
  533. @inlinable
  534. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  535. let onNext: OnNext
  536. // 1. Lookup the subscriber by ID to get their next offset
  537. // 2. If the element exists, update the element pointer and return the element
  538. // 3. Else if the ID is in the future, wait
  539. // 4. Else the ID is in the past, fail and remove the subscription.
  540. // Lookup the subscriber with the given ID.
  541. let onNextForSubscription = self.subscriptions.withMutableElementID(
  542. forSubscriber: id
  543. ) { elementID -> (OnNext, Bool) in
  544. let onNext: OnNext
  545. let removeSubscription: Bool
  546. // Subscriber exists; do we have the element it requires next?
  547. switch self.elements.lookupElement(withID: elementID) {
  548. case .found(let element):
  549. // Element exists in the buffer. Advance our element ID.
  550. elementID.formNext()
  551. onNext = .return(.init(nextResult: .success(element)))
  552. removeSubscription = false
  553. case .maybeAvailableLater:
  554. // Element may exist in the future.
  555. onNext = .suspend
  556. removeSubscription = false
  557. case .noLongerAvailable:
  558. // Element existed in the past but was dropped from the buffer.
  559. onNext = .return(
  560. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  561. )
  562. removeSubscription = true
  563. }
  564. return (onNext, removeSubscription)
  565. }
  566. switch onNextForSubscription {
  567. case .return(var resultAndResume):
  568. // The producer only suspends when all consumers are equally slow or there are no
  569. // consumers at all. The latter can't be true: this function can only be called by a
  570. // consumer. The former can't be true anymore because consumption isn't concurrent
  571. // so this consumer must be faster than the others so let the producer resume.
  572. //
  573. // Note that this doesn't mean that all other consumers will be dropped: they can continue
  574. // to produce until the producer provides more values.
  575. resultAndResume.producers = ProducerContinuations(
  576. continuations: self.producers.map { $0.0 },
  577. result: .success(())
  578. )
  579. self.producers.removeAll()
  580. onNext = .return(resultAndResume)
  581. case .suspend:
  582. onNext = .suspend
  583. case .none:
  584. // No subscription found, must have been dropped or already finished.
  585. if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
  586. self.subscriptionsToDrop.remove(at: index)
  587. onNext = .return(
  588. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  589. )
  590. } else {
  591. // Unknown subscriber, i.e. already finished.
  592. onNext = .return(.init(nextResult: .success(nil)))
  593. }
  594. }
  595. return onNext
  596. }
  597. @inlinable
  598. mutating func setContinuation(
  599. _ continuation: ConsumerContinuation,
  600. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  601. ) -> OnSetContinuation {
  602. if self.subscriptions.setContinuation(continuation, forSubscriber: id) {
  603. return .none
  604. } else {
  605. return .resume(continuation, .failure(CancellationError()))
  606. }
  607. }
  608. @inlinable
  609. mutating func cancel(
  610. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  611. ) -> OnCancelSubscription {
  612. let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
  613. if let continuation = continuation {
  614. return .resume(continuation, .failure(CancellationError()))
  615. } else {
  616. return .none
  617. }
  618. }
  619. @inlinable
  620. mutating func waitToProduceMore(
  621. _ continuation: ProducerContinuation,
  622. token: Int
  623. ) -> OnWaitToProduceMore {
  624. let onWaitToProduceMore: OnWaitToProduceMore
  625. if self.elements.count < self.bufferSize {
  626. // Buffer has free space, no need to suspend.
  627. onWaitToProduceMore = .resume(continuation, .success(()))
  628. } else if let index = self.cancelledProducers.firstIndex(of: token) {
  629. // Producer was cancelled before suspending.
  630. self.cancelledProducers.remove(at: index)
  631. onWaitToProduceMore = .resume(continuation, .failure(CancellationError()))
  632. } else {
  633. // Store the continuation to resume later.
  634. self.producers.append((continuation, token))
  635. onWaitToProduceMore = .none
  636. }
  637. return onWaitToProduceMore
  638. }
  639. @inlinable
  640. mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
  641. guard let index = self.producers.firstIndex(where: { $0.1 == token }) else {
  642. self.cancelledProducers.append(token)
  643. return .none
  644. }
  645. let (continuation, _) = self.producers.remove(at: index)
  646. return .resume(continuation, .failure(CancellationError()))
  647. }
  648. @inlinable
  649. mutating func finish(result: Result<Void, Error>) -> OnFinish {
  650. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  651. let producers = self.producers.map { $0.0 }
  652. self.producers.removeAll()
  653. return .resume(
  654. .init(continuations: continuations, result: result.map { nil }),
  655. .init(continuations: producers, result: .success(()))
  656. )
  657. }
  658. @inlinable
  659. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  660. self.subscriptions.subscribe()
  661. }
  662. @inlinable
  663. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  664. // Remove subscriptions with continuations, they need to be failed.
  665. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  666. let consumerContinuations = ConsumerContinuations(
  667. continuations: continuations,
  668. result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
  669. )
  670. // Remove any others to be failed when they next call 'next'.
  671. let ids = self.subscriptions.removeAllSubscribers()
  672. self.subscriptionsToDrop.append(contentsOf: ids)
  673. return .resume(consumerContinuations)
  674. }
  675. @inlinable
  676. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  677. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  678. let consumerContinuations = ConsumerContinuations(
  679. continuations: continuations,
  680. result: .failure(error)
  681. )
  682. let producers = ProducerContinuations(
  683. continuations: self.producers.map { $0.0 },
  684. result: .failure(error)
  685. )
  686. self.producers.removeAll()
  687. return .resume(consumerContinuations, producers)
  688. }
  689. @inlinable
  690. func nextSubscriptionIsValid() -> Bool {
  691. return self.subscriptions.isEmpty && self.elements.lowestID == .initial
  692. }
  693. }
  694. @usableFromInline
  695. struct Finished: Sendable {
  696. /// A deque of elements tagged with IDs.
  697. @usableFromInline
  698. var elements: Elements
  699. /// Active subscriptions.
  700. @usableFromInline
  701. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  702. /// Subscriptions to fail and remove when they next request an element.
  703. @usableFromInline
  704. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  705. /// The terminating result of the sequence.
  706. @usableFromInline
  707. let result: Result<Void, Error>
  708. @inlinable
  709. init(from state: Initial, result: Result<Void, Error>) {
  710. self.elements = Elements()
  711. self.subscriptions = Subscriptions()
  712. self.subscriptionsToDrop = []
  713. self.result = result
  714. }
  715. @inlinable
  716. init(from state: Subscribed, result: Result<Void, Error>) {
  717. self.elements = Elements()
  718. self.subscriptions = state.subscriptions
  719. self.subscriptionsToDrop = []
  720. self.result = result
  721. }
  722. @inlinable
  723. init(from state: Streaming, result: Result<Void, Error>) {
  724. self.elements = state.elements
  725. self.subscriptions = state.subscriptions
  726. self.subscriptionsToDrop = state.subscriptionsToDrop
  727. self.result = result
  728. }
  729. @inlinable
  730. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  731. let onNext: OnNext
  732. let onNextForSubscription = self.subscriptions.withMutableElementID(
  733. forSubscriber: id
  734. ) { elementID -> (OnNext, Bool) in
  735. let onNext: OnNext
  736. let removeSubscription: Bool
  737. switch self.elements.lookupElement(withID: elementID) {
  738. case .found(let element):
  739. elementID.formNext()
  740. onNext = .return(.init(nextResult: .success(element)))
  741. removeSubscription = false
  742. case .maybeAvailableLater:
  743. onNext = .return(.init(nextResult: self.result.map { nil }))
  744. removeSubscription = true
  745. case .noLongerAvailable:
  746. onNext = .return(
  747. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  748. )
  749. removeSubscription = true
  750. }
  751. return (onNext, removeSubscription)
  752. }
  753. switch onNextForSubscription {
  754. case .return(let result):
  755. onNext = .return(result)
  756. case .none:
  757. // No subscriber with the given ID, it was likely dropped previously.
  758. if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
  759. self.subscriptionsToDrop.remove(at: index)
  760. onNext = .return(
  761. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  762. )
  763. } else {
  764. // Unknown subscriber, i.e. already finished.
  765. onNext = .return(.init(nextResult: .success(nil)))
  766. }
  767. case .suspend:
  768. fatalError("Internal inconsistency")
  769. }
  770. return onNext
  771. }
  772. @inlinable
  773. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  774. self.subscriptions.subscribe()
  775. }
  776. @inlinable
  777. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  778. // Remove subscriptions with continuations, they need to be failed.
  779. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  780. let consumerContinuations = ConsumerContinuations(
  781. continuations: continuations,
  782. result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
  783. )
  784. // Remove any others to be failed when they next call 'next'.
  785. let ids = self.subscriptions.removeAllSubscribers()
  786. self.subscriptionsToDrop.append(contentsOf: ids)
  787. return .resume(consumerContinuations)
  788. }
  789. @inlinable
  790. mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
  791. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  792. let consumerContinuations = ConsumerContinuations(
  793. continuations: continuations,
  794. result: .failure(error)
  795. )
  796. let producers = ProducerContinuations(continuations: [], result: .failure(error))
  797. return .resume(consumerContinuations, producers)
  798. }
  799. @inlinable
  800. func nextSubscriptionIsValid() -> Bool {
  801. self.elements.lowestID == .initial
  802. }
  803. @inlinable
  804. mutating func cancel(
  805. _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  806. ) -> OnCancelSubscription {
  807. let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
  808. if let continuation = continuation {
  809. return .resume(continuation, .failure(CancellationError()))
  810. } else {
  811. return .none
  812. }
  813. }
  814. }
  815. }
  816. @usableFromInline
  817. var _state: State
  818. @inlinable
  819. init(bufferSize: Int) {
  820. self._state = State(bufferSize: bufferSize)
  821. }
  822. @inlinable
  823. var nextSubscriptionIsValid: Bool {
  824. let isValid: Bool
  825. switch self._state {
  826. case .initial:
  827. isValid = true
  828. case .subscribed:
  829. isValid = true
  830. case .streaming(let state):
  831. isValid = state.nextSubscriptionIsValid()
  832. case .finished(let state):
  833. isValid = state.nextSubscriptionIsValid()
  834. case ._modifying:
  835. fatalError("Internal inconsistency")
  836. }
  837. return isValid
  838. }
  839. @usableFromInline
  840. enum OnInvalidateAllSubscriptions {
  841. case resume(ConsumerContinuations)
  842. case none
  843. }
  844. @inlinable
  845. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  846. let onCancel: OnInvalidateAllSubscriptions
  847. switch self._state {
  848. case .initial:
  849. onCancel = .none
  850. case .subscribed(var state):
  851. self._state = ._modifying
  852. onCancel = state.invalidateAllSubscriptions()
  853. self._state = .subscribed(state)
  854. case .streaming(var state):
  855. self._state = ._modifying
  856. onCancel = state.invalidateAllSubscriptions()
  857. self._state = .streaming(state)
  858. case .finished(var state):
  859. self._state = ._modifying
  860. onCancel = state.invalidateAllSubscriptions()
  861. self._state = .finished(state)
  862. case ._modifying:
  863. fatalError("Internal inconsistency")
  864. }
  865. return onCancel
  866. }
  867. @usableFromInline
  868. enum OnYield {
  869. case none
  870. case suspend(Int)
  871. case resume(ConsumerContinuations)
  872. case throwAlreadyFinished
  873. }
  874. @inlinable
  875. mutating func yield(_ element: Element) -> OnYield {
  876. let onYield: OnYield
  877. switch self._state {
  878. case .initial(let state):
  879. self._state = ._modifying
  880. // Move to streaming.
  881. var state = State.Streaming(from: state)
  882. onYield = state.append(element)
  883. self._state = .streaming(state)
  884. case .subscribed(let state):
  885. self._state = ._modifying
  886. var state = State.Streaming(from: state)
  887. onYield = state.append(element)
  888. self._state = .streaming(state)
  889. case .streaming(var state):
  890. self._state = ._modifying
  891. onYield = state.append(element)
  892. self._state = .streaming(state)
  893. case .finished:
  894. onYield = .throwAlreadyFinished
  895. case ._modifying:
  896. fatalError("Internal inconsistency")
  897. }
  898. return onYield
  899. }
  900. @usableFromInline
  901. enum OnFinish {
  902. case none
  903. case resume(ConsumerContinuations, ProducerContinuations)
  904. }
  905. @inlinable
  906. mutating func finish(result: Result<Void, Error>) -> OnFinish {
  907. let onFinish: OnFinish
  908. switch self._state {
  909. case .initial(let state):
  910. self._state = ._modifying
  911. let state = State.Finished(from: state, result: result)
  912. self._state = .finished(state)
  913. onFinish = .none
  914. case .subscribed(var state):
  915. self._state = ._modifying
  916. onFinish = state.finish(result: result)
  917. self._state = .finished(State.Finished(from: state, result: result))
  918. case .streaming(var state):
  919. self._state = ._modifying
  920. onFinish = state.finish(result: result)
  921. self._state = .finished(State.Finished(from: state, result: result))
  922. case .finished:
  923. onFinish = .none
  924. case ._modifying:
  925. fatalError("Internal inconsistency")
  926. }
  927. return onFinish
  928. }
  929. @usableFromInline
  930. enum OnNext {
  931. @usableFromInline
  932. struct ReturnAndResumeProducers {
  933. @usableFromInline
  934. var nextResult: Result<Element?, Error>
  935. @usableFromInline
  936. var producers: ProducerContinuations
  937. @inlinable
  938. init(
  939. nextResult: Result<Element?, Error>,
  940. producers: [ProducerContinuation] = [],
  941. producerResult: Result<Void, Error> = .success(())
  942. ) {
  943. self.nextResult = nextResult
  944. self.producers = ProducerContinuations(continuations: producers, result: producerResult)
  945. }
  946. }
  947. case `return`(ReturnAndResumeProducers)
  948. case suspend
  949. }
  950. @inlinable
  951. mutating func nextElement(
  952. forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  953. ) -> OnNext {
  954. let onNext: OnNext
  955. switch self._state {
  956. case .initial:
  957. // No subscribers so demand isn't possible.
  958. fatalError("Internal inconsistency")
  959. case .subscribed(var state):
  960. self._state = ._modifying
  961. onNext = state.next(id)
  962. self._state = .subscribed(state)
  963. case .streaming(var state):
  964. self._state = ._modifying
  965. onNext = state.next(id)
  966. self._state = .streaming(state)
  967. case .finished(var state):
  968. self._state = ._modifying
  969. onNext = state.next(id)
  970. self._state = .finished(state)
  971. case ._modifying:
  972. fatalError("Internal inconsistency")
  973. }
  974. return onNext
  975. }
  976. @usableFromInline
  977. enum OnSetContinuation {
  978. case none
  979. case resume(ConsumerContinuation, Result<Element?, Error>)
  980. }
  981. @inlinable
  982. mutating func setContinuation(
  983. _ continuation: ConsumerContinuation,
  984. forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  985. ) -> OnSetContinuation {
  986. let onSetContinuation: OnSetContinuation
  987. switch self._state {
  988. case .initial:
  989. // No subscribers so demand isn't possible.
  990. fatalError("Internal inconsistency")
  991. case .subscribed(var state):
  992. self._state = ._modifying
  993. onSetContinuation = state.setContinuation(continuation, forSubscription: id)
  994. self._state = .subscribed(state)
  995. case .streaming(var state):
  996. self._state = ._modifying
  997. onSetContinuation = state.setContinuation(continuation, forSubscription: id)
  998. self._state = .streaming(state)
  999. case .finished(let state):
  1000. onSetContinuation = .resume(continuation, state.result.map { _ in nil })
  1001. case ._modifying:
  1002. // All values must have been produced, nothing to wait for.
  1003. fatalError("Internal inconsistency")
  1004. }
  1005. return onSetContinuation
  1006. }
  1007. @usableFromInline
  1008. enum OnCancelSubscription {
  1009. case none
  1010. case resume(ConsumerContinuation, Result<Element?, Error>)
  1011. }
  1012. @inlinable
  1013. mutating func cancelSubscription(
  1014. withID id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  1015. ) -> OnCancelSubscription {
  1016. let onCancel: OnCancelSubscription
  1017. switch self._state {
  1018. case .initial:
  1019. // No subscribers so demand isn't possible.
  1020. fatalError("Internal inconsistency")
  1021. case .subscribed(var state):
  1022. self._state = ._modifying
  1023. onCancel = state.cancel(id)
  1024. self._state = .subscribed(state)
  1025. case .streaming(var state):
  1026. self._state = ._modifying
  1027. onCancel = state.cancel(id)
  1028. self._state = .streaming(state)
  1029. case .finished(var state):
  1030. self._state = ._modifying
  1031. onCancel = state.cancel(id)
  1032. self._state = .finished(state)
  1033. case ._modifying:
  1034. // All values must have been produced, nothing to wait for.
  1035. fatalError("Internal inconsistency")
  1036. }
  1037. return onCancel
  1038. }
  1039. @usableFromInline
  1040. enum OnSubscribe {
  1041. case subscribed(_BroadcastSequenceStateMachine<Element>.Subscriptions.ID)
  1042. }
  1043. @inlinable
  1044. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  1045. let id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  1046. switch self._state {
  1047. case .initial(let state):
  1048. self._state = ._modifying
  1049. var state = State.Subscribed(from: state)
  1050. id = state.subscribe()
  1051. self._state = .subscribed(state)
  1052. case .subscribed(var state):
  1053. self._state = ._modifying
  1054. id = state.subscribe()
  1055. self._state = .subscribed(state)
  1056. case .streaming(var state):
  1057. self._state = ._modifying
  1058. id = state.subscribe()
  1059. self._state = .streaming(state)
  1060. case .finished(var state):
  1061. self._state = ._modifying
  1062. id = state.subscribe()
  1063. self._state = .finished(state)
  1064. case ._modifying:
  1065. fatalError("Internal inconsistency")
  1066. }
  1067. return id
  1068. }
  1069. @usableFromInline
  1070. enum OnWaitToProduceMore {
  1071. case none
  1072. case resume(ProducerContinuation, Result<Void, Error>)
  1073. }
  1074. @inlinable
  1075. mutating func waitToProduceMore(
  1076. continuation: ProducerContinuation,
  1077. token: Int
  1078. ) -> OnWaitToProduceMore {
  1079. let onWaitToProduceMore: OnWaitToProduceMore
  1080. switch self._state {
  1081. case .initial, .subscribed:
  1082. // Nothing produced yet, so no reason have to wait to produce.
  1083. fatalError("Internal inconsistency")
  1084. case .streaming(var state):
  1085. self._state = ._modifying
  1086. onWaitToProduceMore = state.waitToProduceMore(continuation, token: token)
  1087. self._state = .streaming(state)
  1088. case .finished:
  1089. onWaitToProduceMore = .resume(continuation, .success(()))
  1090. case ._modifying:
  1091. fatalError("Internal inconsistency")
  1092. }
  1093. return onWaitToProduceMore
  1094. }
  1095. @usableFromInline
  1096. typealias OnCancelProducer = OnWaitToProduceMore
  1097. @inlinable
  1098. mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
  1099. let onCancelProducer: OnCancelProducer
  1100. switch self._state {
  1101. case .initial, .subscribed:
  1102. // Nothing produced yet, so no reason have to wait to produce.
  1103. fatalError("Internal inconsistency")
  1104. case .streaming(var state):
  1105. self._state = ._modifying
  1106. onCancelProducer = state.cancelProducer(withToken: token)
  1107. self._state = .streaming(state)
  1108. case .finished:
  1109. // No producers to cancel; do nothing.
  1110. onCancelProducer = .none
  1111. case ._modifying:
  1112. fatalError("Internal inconsistency")
  1113. }
  1114. return onCancelProducer
  1115. }
  1116. @usableFromInline
  1117. enum OnDropResources {
  1118. case none
  1119. case resume(ConsumerContinuations, ProducerContinuations)
  1120. }
  1121. @inlinable
  1122. mutating func dropResources() -> OnDropResources {
  1123. let error = BroadcastAsyncSequenceError.productionAlreadyFinished
  1124. let onDrop: OnDropResources
  1125. switch self._state {
  1126. case .initial(let state):
  1127. self._state = ._modifying
  1128. onDrop = .none
  1129. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1130. case .subscribed(var state):
  1131. self._state = ._modifying
  1132. onDrop = state.dropResources(error: error)
  1133. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1134. case .streaming(var state):
  1135. self._state = ._modifying
  1136. onDrop = state.dropResources(error: error)
  1137. self._state = .finished(State.Finished(from: state, result: .failure(error)))
  1138. case .finished(var state):
  1139. self._state = ._modifying
  1140. onDrop = state.dropResources(error: error)
  1141. self._state = .finished(state)
  1142. case ._modifying:
  1143. fatalError("Internal inconsistency")
  1144. }
  1145. return onDrop
  1146. }
  1147. }
  1148. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  1149. extension _BroadcastSequenceStateMachine {
  1150. /// A collection of elements tagged with an identifier.
  1151. ///
  1152. /// Identifiers are assigned when elements are added to the collection and are monotonically
  1153. /// increasing. If element 'A' is added before element 'B' then 'A' will have a lower ID than 'B'.
  1154. @usableFromInline
  1155. struct Elements: Sendable {
  1156. /// The ID of an element
  1157. @usableFromInline
  1158. struct ID: Hashable, Sendable, Comparable, Strideable {
  1159. @usableFromInline
  1160. private(set) var rawValue: Int
  1161. @usableFromInline
  1162. static var initial: Self {
  1163. ID(id: 0)
  1164. }
  1165. private init(id: Int) {
  1166. self.rawValue = id
  1167. }
  1168. @inlinable
  1169. mutating func formNext() {
  1170. self.rawValue += 1
  1171. }
  1172. @inlinable
  1173. func next() -> Self {
  1174. var copy = self
  1175. copy.formNext()
  1176. return copy
  1177. }
  1178. @inlinable
  1179. func distance(to other: Self) -> Int {
  1180. other.rawValue - self.rawValue
  1181. }
  1182. @inlinable
  1183. func advanced(by n: Int) -> Self {
  1184. var copy = self
  1185. copy.rawValue += n
  1186. return copy
  1187. }
  1188. @inlinable
  1189. static func < (lhs: Self, rhs: Self) -> Bool {
  1190. lhs.rawValue < rhs.rawValue
  1191. }
  1192. }
  1193. @usableFromInline
  1194. struct _IdentifiableElement: Sendable {
  1195. @usableFromInline
  1196. var element: Element
  1197. @usableFromInline
  1198. var id: ID
  1199. @inlinable
  1200. init(element: Element, id: ID) {
  1201. self.element = element
  1202. self.id = id
  1203. }
  1204. }
  1205. @usableFromInline
  1206. var _elements: Deque<_IdentifiableElement>
  1207. @usableFromInline
  1208. var _nextID: ID
  1209. @inlinable
  1210. init() {
  1211. self._nextID = .initial
  1212. self._elements = []
  1213. }
  1214. @inlinable
  1215. mutating func nextElementID() -> ID {
  1216. let id = self._nextID
  1217. self._nextID.formNext()
  1218. return id
  1219. }
  1220. /// The highest ID of the stored elements; `nil` if there are no elements.
  1221. @inlinable
  1222. var highestID: ID? { self._elements.last?.id }
  1223. /// The lowest ID of the stored elements; `nil` if there are no elements.
  1224. @inlinable
  1225. var lowestID: ID? { self._elements.first?.id }
  1226. /// The number of stored elements.
  1227. @inlinable
  1228. var count: Int { self._elements.count }
  1229. /// Whether there are no stored elements.
  1230. @inlinable
  1231. var isEmpty: Bool { self._elements.isEmpty }
  1232. /// Appends an element to the collection.
  1233. @inlinable
  1234. mutating func append(_ element: Element) {
  1235. self._elements.append(_IdentifiableElement(element: element, id: self.nextElementID()))
  1236. }
  1237. /// Removes the first element from the collection.
  1238. @discardableResult
  1239. @inlinable
  1240. mutating func removeFirst() -> Element {
  1241. let removed = self._elements.removeFirst()
  1242. return removed.element
  1243. }
  1244. @usableFromInline
  1245. enum ElementLookup {
  1246. /// The element was found in the collection.
  1247. case found(Element)
  1248. /// The element isn't in the collection, but it could be in the future.
  1249. case maybeAvailableLater
  1250. /// The element was in the collection, but is no longer available.
  1251. case noLongerAvailable
  1252. }
  1253. /// Lookup the element with the given ID.
  1254. ///
  1255. /// - Parameter id: The ID of the element to lookup.
  1256. @inlinable
  1257. mutating func lookupElement(withID id: ID) -> ElementLookup {
  1258. guard let low = self.lowestID, let high = self.highestID else {
  1259. // Must be empty.
  1260. return id >= self._nextID ? .maybeAvailableLater : .noLongerAvailable
  1261. }
  1262. assert(low <= high)
  1263. let lookup: ElementLookup
  1264. if id < low {
  1265. lookup = .noLongerAvailable
  1266. } else if id > high {
  1267. lookup = .maybeAvailableLater
  1268. } else {
  1269. // IDs are monotonically increasing. If the buffer contains the tag we can use it to index
  1270. // into the deque by looking at the offsets.
  1271. let offset = low.distance(to: id)
  1272. let index = self._elements.startIndex.advanced(by: offset)
  1273. lookup = .found(self._elements[index].element)
  1274. }
  1275. return lookup
  1276. }
  1277. }
  1278. }
  1279. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  1280. extension _BroadcastSequenceStateMachine {
  1281. /// A collection of subscriptions.
  1282. @usableFromInline
  1283. struct Subscriptions: Sendable {
  1284. @usableFromInline
  1285. struct ID: Hashable, Sendable {
  1286. @usableFromInline
  1287. private(set) var rawValue: Int
  1288. @inlinable
  1289. init() {
  1290. self.rawValue = 0
  1291. }
  1292. @inlinable
  1293. mutating func formNext() {
  1294. self.rawValue += 1
  1295. }
  1296. @inlinable
  1297. func next() -> Self {
  1298. var copy = self
  1299. copy.formNext()
  1300. return copy
  1301. }
  1302. }
  1303. @usableFromInline
  1304. struct _Subscriber: Sendable {
  1305. /// The ID of the subscriber.
  1306. @usableFromInline
  1307. var id: ID
  1308. /// The ID of the next element the subscriber will consume.
  1309. @usableFromInline
  1310. var nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID
  1311. /// A continuation which which will be resumed when the next element becomes available.
  1312. @usableFromInline
  1313. var continuation: ConsumerContinuation?
  1314. @inlinable
  1315. init(
  1316. id: ID,
  1317. nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID,
  1318. continuation: ConsumerContinuation? = nil
  1319. ) {
  1320. self.id = id
  1321. self.nextElementID = nextElementID
  1322. self.continuation = continuation
  1323. }
  1324. /// Returns and sets the continuation to `nil` if one exists.
  1325. ///
  1326. /// The next element ID is advanced if a contination exists.
  1327. ///
  1328. /// - Returns: The continuation, if one existed.
  1329. @inlinable
  1330. mutating func takeContinuation() -> ConsumerContinuation? {
  1331. guard let continuation = self.continuation else { return nil }
  1332. self.continuation = nil
  1333. self.nextElementID.formNext()
  1334. return continuation
  1335. }
  1336. }
  1337. @usableFromInline
  1338. var _subscribers: [_Subscriber]
  1339. @usableFromInline
  1340. var _nextSubscriberID: ID
  1341. @inlinable
  1342. init() {
  1343. self._subscribers = []
  1344. self._nextSubscriberID = ID()
  1345. }
  1346. /// Returns the number of subscribers.
  1347. @inlinable
  1348. var count: Int { self._subscribers.count }
  1349. /// Returns whether the collection is empty.
  1350. @inlinable
  1351. var isEmpty: Bool { self._subscribers.isEmpty }
  1352. /// Adds a new subscriber and returns its unique ID.
  1353. ///
  1354. /// - Returns: The ID of the new subscriber.
  1355. @inlinable
  1356. mutating func subscribe() -> ID {
  1357. let id = self._nextSubscriberID
  1358. self._nextSubscriberID.formNext()
  1359. self._subscribers.append(_Subscriber(id: id, nextElementID: .initial))
  1360. return id
  1361. }
  1362. /// Provides mutable access to the element ID of the given subscriber, if it exists.
  1363. ///
  1364. /// - Parameters:
  1365. /// - id: The ID of the subscriber.
  1366. /// - body: A closure to mutate the element ID of the subscriber which returns the result and
  1367. /// a boolean indicating whether the subscriber should be removed.
  1368. /// - Returns: The result returned from the closure or `nil` if no subscriber exists with the
  1369. /// given ID.
  1370. @inlinable
  1371. mutating func withMutableElementID<R>(
  1372. forSubscriber id: ID,
  1373. _ body: (
  1374. inout _BroadcastSequenceStateMachine<Element>.Elements.ID
  1375. ) -> (result: R, removeSubscription: Bool)
  1376. ) -> R? {
  1377. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else { return nil }
  1378. let (result, removeSubscription) = body(&self._subscribers[index].nextElementID)
  1379. if removeSubscription {
  1380. self._subscribers.remove(at: index)
  1381. }
  1382. return result
  1383. }
  1384. /// Sets the continuation for the subscription with the given ID.
  1385. /// - Parameters:
  1386. /// - continuation: The continuation to set.
  1387. /// - id: The ID of the subscriber.
  1388. /// - Returns: A boolean indicating whether the continuation was set or not.
  1389. @inlinable
  1390. mutating func setContinuation(
  1391. _ continuation: ConsumerContinuation,
  1392. forSubscriber id: ID
  1393. ) -> Bool {
  1394. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
  1395. return false
  1396. }
  1397. assert(self._subscribers[index].continuation == nil)
  1398. self._subscribers[index].continuation = continuation
  1399. return true
  1400. }
  1401. /// Returns an array of subscriber IDs which are whose next element ID is `id`.
  1402. @inlinable
  1403. func subscribers(
  1404. withElementID id: _BroadcastSequenceStateMachine<Element>.Elements.ID
  1405. ) -> [ID] {
  1406. return self._subscribers.filter {
  1407. $0.nextElementID == id
  1408. }.map {
  1409. $0.id
  1410. }
  1411. }
  1412. /// Removes the subscriber with the given ID.
  1413. /// - Parameter id: The ID of the subscriber to remove.
  1414. /// - Returns: A tuple indicating whether a subscriber was removed and any continuation
  1415. /// associated with the subscriber.
  1416. @inlinable
  1417. mutating func removeSubscriber(withID id: ID) -> (Bool, ConsumerContinuation?) {
  1418. guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
  1419. return (false, nil)
  1420. }
  1421. let continuation = self._subscribers[index].continuation
  1422. self._subscribers.remove(at: index)
  1423. return (true, continuation)
  1424. }
  1425. /// Remove all subscribers in the given array of IDs.
  1426. @inlinable
  1427. mutating func removeAllSubscribers(in idsToRemove: [ID]) {
  1428. self._subscribers.removeAll {
  1429. idsToRemove.contains($0.id)
  1430. }
  1431. }
  1432. /// Remove all subscribers and return their IDs.
  1433. @inlinable
  1434. mutating func removeAllSubscribers() -> [ID] {
  1435. let subscribers = self._subscribers.map { $0.id }
  1436. self._subscribers.removeAll()
  1437. return subscribers
  1438. }
  1439. /// Returns any continuations set on subscribers, unsetting at the same time.
  1440. @inlinable
  1441. mutating func takeContinuations() -> _OneOrMany<ConsumerContinuation>? {
  1442. // Avoid allocs if there's only one subscriber.
  1443. let count = self._countPendingContinuations()
  1444. let result: _OneOrMany<ConsumerContinuation>?
  1445. switch count {
  1446. case 0:
  1447. result = nil
  1448. case 1:
  1449. let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
  1450. let continuation = self._subscribers[index].takeContinuation()!
  1451. result = .one(continuation)
  1452. default:
  1453. var continuations = [ConsumerContinuation]()
  1454. continuations.reserveCapacity(count)
  1455. for index in self._subscribers.indices {
  1456. if let continuation = self._subscribers[index].takeContinuation() {
  1457. continuations.append(continuation)
  1458. }
  1459. }
  1460. result = .many(continuations)
  1461. }
  1462. return result
  1463. }
  1464. /// Removes all subscribers which have continuations and return their continuations.
  1465. @inlinable
  1466. mutating func removeSubscribersWithContinuations() -> _OneOrMany<ConsumerContinuation> {
  1467. // Avoid allocs if there's only one subscriber.
  1468. let count = self._countPendingContinuations()
  1469. let result: _OneOrMany<ConsumerContinuation>
  1470. switch count {
  1471. case 0:
  1472. result = .many([])
  1473. case 1:
  1474. let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
  1475. let subscription = self._subscribers.remove(at: index)
  1476. result = .one(subscription.continuation!)
  1477. default:
  1478. var continuations = [ConsumerContinuation]()
  1479. continuations.reserveCapacity(count)
  1480. var removable = [ID]()
  1481. removable.reserveCapacity(count)
  1482. for subscription in self._subscribers {
  1483. if let continuation = subscription.continuation {
  1484. continuations.append(continuation)
  1485. removable.append(subscription.id)
  1486. }
  1487. }
  1488. self._subscribers.removeAll {
  1489. removable.contains($0.id)
  1490. }
  1491. result = .many(continuations)
  1492. }
  1493. return result
  1494. }
  1495. @inlinable
  1496. func _countPendingContinuations() -> Int {
  1497. return self._subscribers.reduce(into: 0) { count, subscription in
  1498. if subscription.continuation != nil {
  1499. count += 1
  1500. }
  1501. }
  1502. }
  1503. }
  1504. }
  1505. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  1506. extension _BroadcastSequenceStateMachine {
  1507. // TODO: tiny array
  1508. @usableFromInline
  1509. enum _OneOrMany<Value> {
  1510. case one(Value)
  1511. case many([Value])
  1512. }
  1513. }