BroadcastAsyncSequence.swift 54 KB


  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import DequeModule
  /// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently.
  ///
  /// This is not a general-purpose broadcast sequence: it is tailored to the requirements of
  /// gRPC Swift, where it is used to support retrying and hedging requests.
  ///
  /// To do this the sequence maintains an internal buffer of elements which is limited in size.
  /// Each iterator ("subscriber") tracks an offset into the elements which the sequence has
  /// produced over time. If the buffer is full and a subscriber is consuming too slowly, the
  /// sequence cancels that subscriber's subscription and drops the oldest buffered element to make
  /// space for more elements. If the buffer is full and all subscribers are equally slow then all
  /// producers are suspended until the buffer drops to a reasonable size.
  ///
  /// The number of subscribers is expected to be low: at most one at a time for retries and at
  /// most five at a time for hedging.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
    /// The storage backing this sequence.
    @usableFromInline
    let _storage: _BroadcastSequenceStorage<Element>

    /// Wraps existing storage in a sequence.
    @inlinable
    init(_storage: _BroadcastSequenceStorage<Element>) {
      self._storage = _storage
    }

    /// Creates a stream together with the continuation used to feed it.
    ///
    /// Both returned values are backed by the same storage.
    ///
    /// - Parameters:
    ///   - elementType: The type of element this sequence produces.
    ///   - bufferSize: The number of elements this sequence may store.
    /// - Returns: A stream and continuation.
    @inlinable
    static func makeStream(
      of elementType: Element.Type = Element.self,
      bufferSize: Int
    ) -> (stream: Self, continuation: Self.Source) {
      let sharedStorage = _BroadcastSequenceStorage<Element>(bufferSize: bufferSize)
      return (
        stream: Self(_storage: sharedStorage),
        continuation: Self.Source(_storage: sharedStorage)
      )
    }

    /// Registers a new subscription with the storage and returns an iterator bound to it.
    @inlinable
    func makeAsyncIterator() -> AsyncIterator {
      let subscriberID = self._storage.subscribe()
      return AsyncIterator(_storage: self._storage, id: subscriberID)
    }

    /// Whether it is known to be safe for the next subscriber to subscribe and successfully
    /// consume elements.
    ///
    /// This can be `false` if there are active subscribers or the internal buffer no longer
    /// contains the first element in the sequence.
    @inlinable
    var isKnownSafeForNextSubscriber: Bool {
      return self._storage.isKnownSafeForNextSubscriber
    }

    /// Invalidates all active subscribers.
    ///
    /// Any active subscriber will receive an error the next time they attempt to consume an
    /// element.
    @inlinable
    func invalidateAllSubscriptions() {
      return self._storage.invalidateAllSubscriptions()
    }
  }
  78. // MARK: - AsyncIterator
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension BroadcastAsyncSequence {
    /// An iterator which pulls elements from the shared storage on behalf of one subscription.
    @usableFromInline
    struct AsyncIterator: AsyncIteratorProtocol {
      /// The storage elements are read from.
      @usableFromInline
      let _storage: _BroadcastSequenceStorage<Element>

      /// The ID identifying this iterator's subscription within the storage.
      @usableFromInline
      let _subscriberID: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID

      /// Creates an iterator for an existing subscription.
      ///
      /// - Parameters:
      ///   - _storage: The storage to read elements from.
      ///   - id: The subscription ID to request elements with.
      @inlinable
      init(
        _storage: _BroadcastSequenceStorage<Element>,
        id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
      ) {
        self._subscriberID = id
        self._storage = _storage
      }

      /// Asks the storage for the next element for this iterator's subscription.
      ///
      /// - Returns: Whatever the storage returns for this subscriber, `nil` included.
      /// - Throws: Any error thrown by the storage.
      @inlinable
      mutating func next() async throws -> Element? {
        let subscriber = self._subscriberID
        return try await self._storage.nextElement(forSubscriber: subscriber)
      }
    }
  }
  101. // MARK: - Continuation
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension BroadcastAsyncSequence {
    /// The producing side of a `BroadcastAsyncSequence`.
    @usableFromInline
    struct Source: Sendable {
      /// The storage elements are written to.
      @usableFromInline
      let _storage: _BroadcastSequenceStorage<Element>

      /// Creates a source which writes into the given storage.
      @usableFromInline
      init(_storage: _BroadcastSequenceStorage<Element>) {
        self._storage = _storage
      }

      /// Yields an element to the storage.
      ///
      /// - Parameter element: The element to write.
      /// - Throws: Any error thrown by the storage when yielding.
      @inlinable
      func write(_ element: Element) async throws {
        let storage = self._storage
        try await storage.yield(element)
      }

      /// Finishes the stream with the given result.
      ///
      /// - Parameter result: `.success` for a clean finish, `.failure` to finish with an error.
      @inlinable
      func finish(with result: Result<Void, Error>) {
        let storage = self._storage
        storage.finish(result)
      }

      /// Finishes the stream successfully.
      @inlinable
      func finish() {
        self._storage.finish(.success(()))
      }

      /// Finishes the stream with an error.
      ///
      /// - Parameter error: The error to finish the stream with.
      @inlinable
      func finish(throwing error: Error) {
        self._storage.finish(.failure(error))
      }
    }
  }
  /// Errors thrown to producers and consumers of a `BroadcastAsyncSequence`.
  @usableFromInline
  enum BroadcastAsyncSequenceError: Error {
    /// The consumer fell too far behind and its subscription is no longer valid.
    case consumingTooSlow

    /// An element was written after the producer had already finished.
    case productionAlreadyFinished
  }
  137. // MARK: - Storage
  /// Lock-protected storage wrapping the broadcast sequence's state machine.
  ///
  /// Each method asks the state machine what to do and then carries out the returned action. Where
  /// the state is accessed via `withLockedValue`, continuations are resumed after that call has
  /// returned.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
    /// The state machine, guarded by a lock.
    @usableFromInline
    let _state: LockedValueBox<_BroadcastSequenceStateMachine<Element>>

    /// Creates storage whose state machine is configured with the given buffer size.
    @inlinable
    init(bufferSize: Int) {
      let machine = _BroadcastSequenceStateMachine<Element>(bufferSize: bufferSize)
      self._state = LockedValueBox(machine)
    }

    deinit {
      // Resume any continuations handed back by the state machine as the storage goes away.
      let onDrop = self._state.withLockedValue { $0.dropResources() }

      if case .resume(let consumers, let producers) = onDrop {
        consumers.resume()
        producers.resume()
      }
    }

    // MARK: - Producer

    /// Yield a single element to the stream. Suspends if the stream's buffer is full.
    ///
    /// - Parameter element: The element to write.
    /// - Throws: `BroadcastAsyncSequenceError.productionAlreadyFinished` if the state machine
    ///   reports that production has already finished, or the error the suspended producer's
    ///   continuation is resumed with.
    @inlinable
    func yield(_ element: Element) async throws {
      let onYield = self._state.withLockedValue { $0.yield(element) }

      switch onYield {
      case .none:
        return

      case .resume(let consumers):
        consumers.resume()

      case .throwAlreadyFinished:
        throw BroadcastAsyncSequenceError.productionAlreadyFinished

      case .suspend(let token):
        // The token identifies this producer to the state machine, both when parking the
        // continuation and when cancelling.
        try await withTaskCancellationHandler {
          try await withCheckedThrowingContinuation { continuation in
            let onWait = self._state.withLockedValue { state in
              state.waitToProduceMore(continuation: continuation, token: token)
            }

            // The state machine may decide there's no need to wait after all.
            if case .resume(let continuation, let result) = onWait {
              continuation.resume(with: result)
            }
          }
        } onCancel: {
          let onCancel = self._state.withLockedValue { state in
            state.cancelProducer(withToken: token)
          }

          if case .resume(let continuation, let result) = onCancel {
            continuation.resume(with: result)
          }
        }
      }
    }

    /// Indicate that no more values will be produced.
    ///
    /// - Parameter result: Whether the stream is finishing cleanly or because of an error.
    @inlinable
    func finish(_ result: Result<Void, Error>) {
      let onFinish = self._state.withLockedValue { $0.finish(result: result) }

      guard case .resume(let subscribers, let producers) = onFinish else {
        return
      }

      subscribers.resume()
      producers.resume()
    }

    // MARK: - Consumer

    /// Create a subscription to the stream.
    ///
    /// - Returns: Returns a unique subscription ID.
    @inlinable
    func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      return self._state.withLockedValue { state in state.subscribe() }
    }

    /// Returns the next element for the given subscriber, if it is available.
    ///
    /// - Parameter id: The ID of the subscriber requesting the element.
    /// - Returns: The next element or `nil` if the stream has been terminated.
    /// - Throws: The error held by the state machine's result for this subscriber, or the error
    ///   the subscriber's continuation is resumed with.
    @inlinable
    func nextElement(
      forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) async throws -> Element? {
      return try await withTaskCancellationHandler {
        // The lock is taken manually: when the subscriber has to suspend, the lock stays held
        // from the element lookup until the continuation has been stored.
        self._state.unsafe.lock()
        let onNext = self._state.unsafe.withValueAssumingLockIsAcquired { state in
          state.nextElement(forSubscriber: id)
        }

        switch onNext {
        case .suspend:
          return try await withCheckedThrowingContinuation { continuation in
            let onStored = self._state.unsafe.withValueAssumingLockIsAcquired {
              $0.setContinuation(continuation, forSubscription: id)
            }
            self._state.unsafe.unlock()

            // Only resume once the lock has been released.
            if case .resume(let continuation, let result) = onStored {
              continuation.resume(with: result)
            }
          }

        case .return(let outcome):
          self._state.unsafe.unlock()
          outcome.producers.resume()
          return try outcome.nextResult.get()
        }
      } onCancel: {
        let onCancel = self._state.withLockedValue { $0.cancelSubscription(withID: id) }

        if case .resume(let continuation, let result) = onCancel {
          continuation.resume(with: result)
        }
      }
    }

    /// Returns true if it's guaranteed that the next subscriber may join and safely begin
    /// consuming elements.
    @inlinable
    var isKnownSafeForNextSubscriber: Bool {
      return self._state.withLockedValue { $0.nextSubscriptionIsValid }
    }

    /// Invalidates all active subscriptions.
    @inlinable
    func invalidateAllSubscriptions() {
      let onInvalidate = self._state.withLockedValue { $0.invalidateAllSubscriptions() }

      if case .resume(let continuations) = onInvalidate {
        continuations.resume()
      }
    }
  }
  287. // MARK: - State machine
  288. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  289. @usableFromInline
  290. struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
  /// The continuation of a suspended consumer; resumed with an optional element or an error.
  @usableFromInline
  typealias ConsumerContinuation = CheckedContinuation<Element?, Error>

  /// The continuation of a suspended producer; resumed with `Void` or an error.
  @usableFromInline
  typealias ProducerContinuation = CheckedContinuation<Void, Error>
  /// One or more consumer continuations paired with the single result to resume them all with.
  @usableFromInline
  struct ConsumerContinuations {
    /// The continuations to resume.
    @usableFromInline
    var continuations: _OneOrMany<ConsumerContinuation>

    /// The result every continuation is resumed with.
    @usableFromInline
    var result: Result<Element?, Error>

    @inlinable
    init(continuations: _OneOrMany<ConsumerContinuation>, result: Result<Element?, Error>) {
      self.result = result
      self.continuations = continuations
    }

    /// Resumes each stored continuation with `result`.
    @inlinable
    func resume() {
      let outcome = self.result

      switch self.continuations {
      case .many(let waiting):
        waiting.forEach { $0.resume(with: outcome) }
      case .one(let waiting):
        waiting.resume(with: outcome)
      }
    }
  }
  /// A list of producer continuations paired with the single result to resume them all with.
  @usableFromInline
  struct ProducerContinuations {
    /// The continuations to resume.
    @usableFromInline
    var continuations: [ProducerContinuation]

    /// The result every continuation is resumed with.
    @usableFromInline
    var result: Result<Void, Error>

    @inlinable
    init(continuations: [ProducerContinuation], result: Result<Void, Error>) {
      self.result = result
      self.continuations = continuations
    }

    /// Resumes each stored continuation, in order, with `result`.
    @inlinable
    func resume() {
      let outcome = self.result
      self.continuations.forEach { $0.resume(with: outcome) }
    }
  }
  336. @usableFromInline
  337. enum State: Sendable {
  338. /// No subscribers and no elements have been produced.
  339. case initial(Initial)
  340. /// Subscribers exist but no elements have been produced.
  341. case subscribed(Subscribed)
  342. /// Elements have been produced, there may or may not be subscribers.
  343. case streaming(Streaming)
  344. /// No more elements will be produced. There may or may not be subscribers.
  345. case finished(Finished)
  346. /// Temporary state to avoid CoWs.
  347. case _modifying
  /// Creates the state in its `initial` case, recording the buffer size for later states.
  @inlinable
  init(bufferSize: Int) {
    let initial = Initial(bufferSize: bufferSize)
    self = .initial(initial)
  }
  /// The payload of the `initial` state; it only records the configured buffer size.
  @usableFromInline
  struct Initial: Sendable {
    /// The buffer size the sequence was created with.
    @usableFromInline
    let bufferSize: Int

    /// - Parameter bufferSize: The buffer size to record.
    @inlinable
    init(bufferSize: Int) {
      self.bufferSize = bufferSize
    }
  }
  /// The payload of the `subscribed` state: subscribers exist but no elements have been produced.
  @usableFromInline
  struct Subscribed: Sendable {
    /// Active subscriptions.
    @usableFromInline
    var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions

    /// Subscriptions to fail and remove when they next request an element.
    @usableFromInline
    var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]

    /// The maximum size of the element buffer.
    @usableFromInline
    let bufferSize: Int

    /// Starts with no subscriptions, keeping the buffer size from the initial state.
    @inlinable
    init(from state: Initial) {
      self.bufferSize = state.bufferSize
      self.subscriptions = Subscriptions()
      self.subscriptionsToDrop = []
    }

    /// Removes the subscribers which have continuations and returns them to be resumed with the
    /// finishing result. There are no producers to resume in this state.
    @inlinable
    mutating func finish(result: Result<Void, Error>) -> OnFinish {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      return .resume(
        .init(continuations: waiting, result: result.map { nil }),
        .init(continuations: [], result: .success(()))
      )
    }

    /// Handles a request for the next element from the subscriber with the given ID.
    ///
    /// Nothing has been produced yet, so the subscriber suspends unless its ID is in the list of
    /// subscriptions to drop, in which case it is removed from that list and failed.
    @inlinable
    mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
      if let index = self.subscriptionsToDrop.firstIndex(of: id) {
        self.subscriptionsToDrop.remove(at: index)
        return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
      }

      return .suspend
    }

    /// Removes the subscriber and, if a continuation was removed with it, returns that
    /// continuation to be resumed with a `CancellationError`.
    @inlinable
    mutating func cancel(
      _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnCancelSubscription {
      let removed = self.subscriptions.removeSubscriber(withID: id)

      guard let continuation = removed.1 else {
        return .none
      }

      return .resume(continuation, .failure(CancellationError()))
    }

    /// Stores the continuation for the subscriber. If it can't be stored the continuation is
    /// returned to be resumed with a `CancellationError`.
    @inlinable
    mutating func setContinuation(
      _ continuation: ConsumerContinuation,
      forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnSetContinuation {
      guard self.subscriptions.setContinuation(continuation, forSubscriber: id) else {
        return .resume(continuation, .failure(CancellationError()))
      }

      return .none
    }

    /// Adds a subscription and returns its ID.
    @inlinable
    mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      return self.subscriptions.subscribe()
    }

    /// Removes every subscription.
    ///
    /// Subscribers with continuations are returned to be failed with `consumingTooSlow`; the
    /// remaining IDs are recorded so that they are failed when they next call 'next'.
    @inlinable
    mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
      // Those with continuations must be taken first; whatever is left has no continuation.
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let remainingIDs = self.subscriptions.removeAllSubscribers()
      self.subscriptionsToDrop.append(contentsOf: remainingIDs)

      return .resume(
        ConsumerContinuations(
          continuations: waiting,
          result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
        )
      )
    }

    /// Removes the subscribers which have continuations and returns them to be failed with
    /// `error`. There are no producers to resume in this state.
    @inlinable
    mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let consumers = ConsumerContinuations(continuations: waiting, result: .failure(error))
      let producers = ProducerContinuations(continuations: [], result: .success(()))
      return .resume(consumers, producers)
    }
  }
  /// The payload of the `streaming` state: elements have been produced, there may or may not be
  /// subscribers.
  @usableFromInline
  struct Streaming: Sendable {
    /// A deque of elements tagged with IDs.
    @usableFromInline
    var elements: Elements

    /// The maximum size of the element buffer.
    @usableFromInline
    let bufferSize: Int

    // TODO: (optimisation) one-or-many Deque to avoid allocations in the case of a single writer
    /// Producers which have been suspended, each paired with its token.
    @usableFromInline
    var producers: [(ProducerContinuation, Int)]

    /// The IDs of producers which have been cancelled.
    @usableFromInline
    var cancelledProducers: [Int]

    /// The next token for a producer.
    @usableFromInline
    var producerToken: Int

    /// Active subscriptions.
    @usableFromInline
    var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions

    /// Subscriptions to fail and remove when they next request an element.
    @usableFromInline
    var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]

    /// Starts streaming with no elements, producers or subscriptions.
    @inlinable
    init(from state: Initial) {
      self.bufferSize = state.bufferSize
      self.elements = Elements()
      self.subscriptions = Subscriptions()
      self.subscriptionsToDrop = []
      self.producers = []
      self.cancelledProducers = []
      self.producerToken = 0
    }

    /// Starts streaming with no elements or producers, keeping the subscriptions (and the
    /// subscriptions to drop) from the subscribed state.
    @inlinable
    init(from state: Subscribed) {
      self.bufferSize = state.bufferSize
      self.elements = Elements()
      self.subscriptions = state.subscriptions
      self.subscriptionsToDrop = state.subscriptionsToDrop
      self.producers = []
      self.cancelledProducers = []
      self.producerToken = 0
    }

    /// Takes the continuations of waiting subscribers, if there are any, to be resumed with
    /// `element`.
    @inlinable
    mutating func _resumeWaitingConsumers(with element: Element) -> OnYield {
      guard let waiting = self.subscriptions.takeContinuations() else {
        return .none
      }

      return .resume(ConsumerContinuations(continuations: waiting, result: .success(element)))
    }

    /// Returns the current producer token and advances the counter.
    @inlinable
    mutating func _makeProducerToken() -> Int {
      let token = self.producerToken
      self.producerToken += 1
      return token
    }

    /// Appends an element to the buffer and decides what the producer should do next.
    ///
    /// - Parameter element: The element to buffer.
    /// - Returns: Whether the producer should suspend, resume consumers, or do nothing.
    @inlinable
    mutating func append(_ element: Element) -> OnYield {
      self.elements.append(element)

      guard self.elements.count >= self.bufferSize, let lowestID = self.elements.lowestID else {
        // The buffer isn't full. Subscriptions which have continuations must be waiting for the
        // value which was just appended.
        return self._resumeWaitingConsumers(with: element)
      }

      // The buffer is too large:
      // - if all subscribers are equally slow suspend the producer
      // - if some subscribers are slow then remove them and the oldest value
      // - if no subscribers are slow then remove the oldest value
      let slowConsumers = self.subscriptions.subscribers(withElementID: lowestID)
      let slowCount = slowConsumers.count

      if slowCount == 0 {
        if self.subscriptions.isEmpty {
          // No consumers at all.
          return .suspend(self._makeProducerToken())
        }

        // No consumers are slow, some subscribers exist, a subset of which are waiting for a
        // value. Drop the oldest value and resume the fastest consumers.
        self.elements.removeFirst()
        return self._resumeWaitingConsumers(with: element)
      }

      if slowCount == self.subscriptions.count {
        // All consumers are slow; stop the production of new values.
        return .suspend(self._makeProducerToken())
      }

      // Some consumers are slow, but not all. Remove the slow consumers and drop the oldest
      // value.
      self.elements.removeFirst()
      self.subscriptions.removeAllSubscribers(in: slowConsumers)
      self.subscriptionsToDrop.append(contentsOf: slowConsumers)
      return .none
    }

    /// Handles a request for the next element from the subscriber with the given ID.
    ///
    /// 1. Look up the subscriber by ID to get the ID of the element it needs next.
    /// 2. If the element exists, advance the subscriber's element ID and return the element.
    /// 3. Else if the element may exist in the future, suspend.
    /// 4. Else the element was dropped from the buffer: fail and remove the subscription.
    @inlinable
    mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
      let lookup = self.subscriptions.withMutableElementID(
        forSubscriber: id
      ) { elementID -> (OnNext, Bool) in
        // The second tuple member indicates whether the subscription should be removed.
        switch self.elements.lookupElement(withID: elementID) {
        case .found(let element):
          // Element exists in the buffer. Advance our element ID.
          elementID.formNext()
          return (.return(.init(nextResult: .success(element))), false)

        case .maybeAvailableLater:
          // Element may exist in the future.
          return (.suspend, false)

        case .noLongerAvailable:
          // Element existed in the past but was dropped from the buffer.
          return (
            .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))),
            true
          )
        }
      }

      guard let outcome = lookup else {
        // No subscription found, must have been dropped or already finished.
        if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
          self.subscriptionsToDrop.remove(at: index)
          return .return(
            .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
          )
        }

        // Unknown subscriber, i.e. already finished.
        return .return(.init(nextResult: .success(nil)))
      }

      switch outcome {
      case .suspend:
        return .suspend

      case .return(var resultAndResume):
        // The producer only suspends when all consumers are equally slow or there are no
        // consumers at all. The latter can't be true: this function can only be called by a
        // consumer. The former can't be true anymore because consumption isn't concurrent, so
        // this consumer must be faster than the others: let the producers resume.
        //
        // Note that this doesn't mean that all other consumers will be dropped.
        let suspended = self.producers.map { $0.0 }
        self.producers.removeAll()
        resultAndResume.producers = ProducerContinuations(
          continuations: suspended,
          result: .success(())
        )
        return .return(resultAndResume)
      }
    }

    /// Stores the continuation for the subscriber. If it can't be stored the continuation is
    /// returned to be resumed with a `CancellationError`.
    @inlinable
    mutating func setContinuation(
      _ continuation: ConsumerContinuation,
      forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnSetContinuation {
      guard self.subscriptions.setContinuation(continuation, forSubscriber: id) else {
        return .resume(continuation, .failure(CancellationError()))
      }

      return .none
    }

    /// Removes the subscriber and, if a continuation was removed with it, returns that
    /// continuation to be resumed with a `CancellationError`.
    @inlinable
    mutating func cancel(
      _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnCancelSubscription {
      let removed = self.subscriptions.removeSubscriber(withID: id)

      guard let continuation = removed.1 else {
        return .none
      }

      return .resume(continuation, .failure(CancellationError()))
    }

    /// Decides whether a producer which was told to suspend really needs to wait.
    ///
    /// - Parameters:
    ///   - continuation: The producer's continuation.
    ///   - token: The token the producer was given when it was told to suspend.
    /// - Returns: `.resume` if the continuation should be resumed immediately, `.none` if it was
    ///   stored to be resumed later.
    @inlinable
    mutating func waitToProduceMore(
      _ continuation: ProducerContinuation,
      token: Int
    ) -> OnWaitToProduceMore {
      guard self.elements.count >= self.bufferSize else {
        // Buffer has free space, no need to suspend.
        return .resume(continuation, .success(()))
      }

      if let index = self.cancelledProducers.firstIndex(of: token) {
        // Producer was cancelled before suspending.
        self.cancelledProducers.remove(at: index)
        return .resume(continuation, .failure(CancellationError()))
      }

      // Store the continuation to resume later.
      self.producers.append((continuation, token))
      return .none
    }

    /// Cancels the producer with the given token.
    ///
    /// If the producer's continuation is stored it is removed and returned to be resumed with a
    /// `CancellationError`. Otherwise the token is recorded in `cancelledProducers`.
    @inlinable
    mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
      if let index = self.producers.firstIndex(where: { $0.1 == token }) {
        let (continuation, _) = self.producers.remove(at: index)
        return .resume(continuation, .failure(CancellationError()))
      }

      self.cancelledProducers.append(token)
      return .none
    }

    /// Removes the subscribers which have continuations and all suspended producers. The
    /// subscribers are to be resumed with the finishing result, the producers with success.
    @inlinable
    mutating func finish(result: Result<Void, Error>) -> OnFinish {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let suspended = self.producers.map { $0.0 }
      self.producers.removeAll()

      return .resume(
        .init(continuations: waiting, result: result.map { nil }),
        .init(continuations: suspended, result: .success(()))
      )
    }

    /// Adds a subscription and returns its ID.
    @inlinable
    mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      return self.subscriptions.subscribe()
    }

    /// Removes every subscription.
    ///
    /// Subscribers with continuations are returned to be failed with `consumingTooSlow`; the
    /// remaining IDs are recorded so that they are failed when they next call 'next'.
    @inlinable
    mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
      // Those with continuations must be taken first; whatever is left has no continuation.
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let remainingIDs = self.subscriptions.removeAllSubscribers()
      self.subscriptionsToDrop.append(contentsOf: remainingIDs)

      return .resume(
        ConsumerContinuations(
          continuations: waiting,
          result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
        )
      )
    }

    /// Removes the subscribers which have continuations and all suspended producers, returning
    /// both to be failed with `error`.
    @inlinable
    mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let consumers = ConsumerContinuations(continuations: waiting, result: .failure(error))

      let suspended = self.producers.map { $0.0 }
      self.producers.removeAll()
      let producers = ProducerContinuations(continuations: suspended, result: .failure(error))

      return .resume(consumers, producers)
    }

    /// Returns true if there are no subscriptions and the lowest element ID in the buffer is the
    /// initial ID.
    @inlinable
    func nextSubscriptionIsValid() -> Bool {
      guard self.subscriptions.isEmpty else {
        return false
      }

      return self.elements.lowestID == .initial
    }
  }
  707. @usableFromInline
  708. struct Finished: Sendable {
  709. /// A deque of elements tagged with IDs.
  710. @usableFromInline
  711. var elements: Elements
  712. /// Active subscriptions.
  713. @usableFromInline
  714. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  715. /// Subscriptions to fail and remove when they next request an element.
  716. @usableFromInline
  717. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  718. /// The terminating result of the sequence.
  719. @usableFromInline
  720. let result: Result<Void, Error>
  721. @inlinable
  722. init(from state: Initial, result: Result<Void, Error>) {
  723. self.elements = Elements()
  724. self.subscriptions = Subscriptions()
  725. self.subscriptionsToDrop = []
  726. self.result = result
  727. }
  728. @inlinable
  729. init(from state: Subscribed, result: Result<Void, Error>) {
  730. self.elements = Elements()
  731. self.subscriptions = state.subscriptions
  732. self.subscriptionsToDrop = []
  733. self.result = result
  734. }
  735. @inlinable
  736. init(from state: Streaming, result: Result<Void, Error>) {
  737. self.elements = state.elements
  738. self.subscriptions = state.subscriptions
  739. self.subscriptionsToDrop = state.subscriptionsToDrop
  740. self.result = result
  741. }
  742. @inlinable
  743. mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
  744. let onNext: OnNext
  745. let onNextForSubscription = self.subscriptions.withMutableElementID(
  746. forSubscriber: id
  747. ) { elementID -> (OnNext, Bool) in
  748. let onNext: OnNext
  749. let removeSubscription: Bool
  750. switch self.elements.lookupElement(withID: elementID) {
  751. case .found(let element):
  752. elementID.formNext()
  753. onNext = .return(.init(nextResult: .success(element)))
  754. removeSubscription = false
  755. case .maybeAvailableLater:
  756. onNext = .return(.init(nextResult: self.result.map { nil }))
  757. removeSubscription = true
  758. case .noLongerAvailable:
  759. onNext = .return(
  760. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  761. )
  762. removeSubscription = true
  763. }
  764. return (onNext, removeSubscription)
  765. }
  766. switch onNextForSubscription {
  767. case .return(let result):
  768. onNext = .return(result)
  769. case .none:
  770. // No subscriber with the given ID, it was likely dropped previously.
  771. if let index = self.subscriptionsToDrop.firstIndex(where: { $0 == id }) {
  772. self.subscriptionsToDrop.remove(at: index)
  773. onNext = .return(
  774. .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
  775. )
  776. } else {
  777. // Unknown subscriber, i.e. already finished.
  778. onNext = .return(.init(nextResult: .success(nil)))
  779. }
  780. case .suspend:
  781. fatalError("Internal inconsistency")
  782. }
  783. return onNext
  784. }
  785. @inlinable
  786. mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
  787. self.subscriptions.subscribe()
  788. }
  789. @inlinable
  790. mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
  791. // Remove subscriptions with continuations, they need to be failed.
  792. let continuations = self.subscriptions.removeSubscribersWithContinuations()
  793. let consumerContinuations = ConsumerContinuations(
  794. continuations: continuations,
  795. result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
  796. )
  797. // Remove any others to be failed when they next call 'next'.
  798. let ids = self.subscriptions.removeAllSubscribers()
  799. self.subscriptionsToDrop.append(contentsOf: ids)
  800. return .resume(consumerContinuations)
  801. }
  /// Removes all subscribers which hold a continuation so that they can be failed with `error`.
  ///
  /// - Parameter error: The error used as the failure for both the consumer and the producer
  ///   continuations.
  /// - Returns: An action carrying the removed consumer continuations and an empty set of
  ///   producer continuations.
  @inlinable
  mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
    let consumers = ConsumerContinuations(
      continuations: self.subscriptions.removeSubscribersWithContinuations(),
      result: .failure(error)
    )
    // No producer continuations are collected here, so the producer set is empty.
    return .resume(
      consumers,
      ProducerContinuations(continuations: [], result: .failure(error))
    )
  }
  /// Returns whether the oldest stored element still has the initial element ID.
  ///
  /// Returns `false` when no elements are stored, because `lowestID` is `nil` in that case.
  @inlinable
  func nextSubscriptionIsValid() -> Bool {
    let oldestStoredID = self.elements.lowestID
    return oldestStoredID == .initial
  }
  /// Removes the subscriber with the given ID.
  ///
  /// - Parameter id: The ID of the subscriber to cancel.
  /// - Returns: `.resume` with a `CancellationError` failure if the removed subscriber held a
  ///   continuation, `.none` otherwise (including when no such subscriber exists).
  @inlinable
  mutating func cancel(
    _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnCancelSubscription {
    let (_, continuation) = self.subscriptions.removeSubscriber(withID: id)
    guard let continuation = continuation else {
      return .none
    }
    return .resume(continuation, .failure(CancellationError()))
  }
  827. }
  828. }
  /// The current state of the state machine.
  @usableFromInline
  var _state: State

  /// Creates a state machine whose state is built from the given buffer size.
  ///
  /// - Parameter bufferSize: Forwarded to `State.init(bufferSize:)`.
  @inlinable
  init(bufferSize: Int) {
    let state = State(bufferSize: bufferSize)
    self._state = state
  }
  /// Whether a subscription created now would be valid.
  ///
  /// Always `true` in the `initial` and `subscribed` states; in the `streaming` and `finished`
  /// states the answer is delegated to the state's own `nextSubscriptionIsValid()`.
  @inlinable
  var nextSubscriptionIsValid: Bool {
    switch self._state {
    case .initial, .subscribed:
      return true
    case .streaming(let streaming):
      return streaming.nextSubscriptionIsValid()
    case .finished(let finished):
      return finished.nextSubscriptionIsValid()
    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after invalidating all subscriptions.
  @usableFromInline
  enum OnInvalidateAllSubscriptions {
    /// Resume the given consumer continuations.
    case resume(ConsumerContinuations)
    /// Do nothing.
    case none
  }

  /// Invalidates all subscriptions in the current state.
  ///
  /// The state itself does not change; only its payload is mutated. While the payload is being
  /// mutated the state is temporarily set to `_modifying`.
  ///
  /// - Returns: The action the caller must take.
  @inlinable
  mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
    switch self._state {
    case .initial:
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.invalidateAllSubscriptions()
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.invalidateAllSubscriptions()
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.invalidateAllSubscriptions()
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after yielding an element.
  @usableFromInline
  enum OnYield {
    /// Do nothing.
    case none
    /// Suspend the producer. The associated value is presumably the token later passed to
    /// `waitToProduceMore(continuation:token:)` -- confirm against the caller.
    case suspend(Int)
    /// Resume the given consumer continuations.
    case resume(ConsumerContinuations)
    /// The sequence has already finished; the caller should throw.
    case throwAlreadyFinished
  }

  /// Appends an element, moving to the `streaming` state if not already in it.
  ///
  /// - Parameter element: The element to append.
  /// - Returns: The action the caller must take; `.throwAlreadyFinished` if the state machine is
  ///   already in the `finished` state.
  @inlinable
  mutating func yield(_ element: Element) -> OnYield {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      // First element: transition to streaming.
      var streaming = State.Streaming(from: initial)
      let action = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .subscribed(let subscribed):
      self._state = ._modifying
      // First element: transition to streaming.
      var streaming = State.Streaming(from: subscribed)
      let action = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .finished:
      return .throwAlreadyFinished

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after finishing the sequence.
  @usableFromInline
  enum OnFinish {
    /// Do nothing.
    case none
    /// Resume the given consumer and producer continuations.
    case resume(ConsumerContinuations, ProducerContinuations)
  }

  /// Moves the state machine to the `finished` state with the given result.
  ///
  /// Calling this when already finished has no effect and returns `.none`.
  ///
  /// - Parameter result: The result the sequence finished with.
  /// - Returns: The action the caller must take.
  @inlinable
  mutating func finish(result: Result<Void, Error>) -> OnFinish {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      self._state = .finished(State.Finished(from: initial, result: result))
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.finish(result: result)
      self._state = .finished(State.Finished(from: subscribed, result: result))
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.finish(result: result)
      self._state = .finished(State.Finished(from: streaming, result: result))
      return action

    case .finished:
      return .none

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take when a subscriber asks for its next element.
  @usableFromInline
  enum OnNext {
    /// Return the result to the subscriber and resume the associated producers.
    case `return`(ReturnAndResumeProducers)
    /// Suspend the subscriber.
    case suspend

    /// The result to hand to a subscriber together with the producers to resume.
    @usableFromInline
    struct ReturnAndResumeProducers {
      /// The result of the subscriber's call to 'next'.
      @usableFromInline
      var nextResult: Result<Element?, Error>

      /// The producer continuations to resume and the result to resume them with.
      @usableFromInline
      var producers: ProducerContinuations

      /// - Parameters:
      ///   - nextResult: The result of the subscriber's call to 'next'.
      ///   - producers: Producer continuations to resume; defaults to none.
      ///   - producerResult: The result to resume producers with; defaults to success.
      @inlinable
      init(
        nextResult: Result<Element?, Error>,
        producers: [ProducerContinuation] = [],
        producerResult: Result<Void, Error> = .success(())
      ) {
        let producerContinuations = ProducerContinuations(
          continuations: producers,
          result: producerResult
        )
        self.nextResult = nextResult
        self.producers = producerContinuations
      }
    }
  }

  /// Determines what to do when the given subscriber asks for its next element.
  ///
  /// Must not be called in the `initial` state.
  ///
  /// - Parameter id: The ID of the subscriber asking for an element.
  /// - Returns: The action the caller must take.
  @inlinable
  mutating func nextElement(
    forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnNext {
    switch self._state {
    case .initial:
      // No subscribers so demand isn't possible.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.next(id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.next(id)
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.next(id)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after attempting to store a consumer continuation.
  @usableFromInline
  enum OnSetContinuation {
    /// Do nothing.
    case none
    /// Resume the continuation with the given result.
    case resume(ConsumerContinuation, Result<Element?, Error>)
  }

  /// Stores a continuation for a subscriber which is waiting for its next element.
  ///
  /// Must not be called in the `initial` state.
  ///
  /// - Parameters:
  ///   - continuation: The continuation to store.
  ///   - id: The ID of the subscription the continuation belongs to.
  /// - Returns: The action the caller must take. In the `finished` state the continuation is
  ///   never stored; it is handed straight back to be resumed.
  @inlinable
  mutating func setContinuation(
    _ continuation: ConsumerContinuation,
    forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnSetContinuation {
    switch self._state {
    case .initial:
      // No subscribers so demand isn't possible.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.setContinuation(continuation, forSubscription: id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.setContinuation(continuation, forSubscription: id)
      self._state = .streaming(streaming)
      return action

    case .finished(let finished):
      // Nothing to wait for: resume immediately with `nil` on success, or with the failure the
      // sequence finished with.
      return .resume(continuation, finished.result.map { _ in nil })

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after cancelling a subscription.
  @usableFromInline
  enum OnCancelSubscription {
    /// Do nothing.
    case none
    /// Resume the continuation with the given result.
    case resume(ConsumerContinuation, Result<Element?, Error>)
  }

  /// Cancels the subscription with the given ID.
  ///
  /// Must not be called in the `initial` state. The state itself does not change; only its
  /// payload is mutated.
  ///
  /// - Parameter id: The ID of the subscription to cancel.
  /// - Returns: The action the caller must take.
  @inlinable
  mutating func cancelSubscription(
    withID id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnCancelSubscription {
    switch self._state {
    case .initial:
      // No subscription has been created in this state, so there is nothing to cancel.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.cancel(id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.cancel(id)
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.cancel(id)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The outcome of subscribing.
  @usableFromInline
  enum OnSubscribe {
    /// A subscription was created with the given ID.
    case subscribed(_BroadcastSequenceStateMachine<Element>.Subscriptions.ID)
  }

  /// Creates a new subscription.
  ///
  /// Moves from `initial` to `subscribed`; every other state keeps its kind and only has its
  /// payload mutated.
  ///
  /// - Returns: The ID of the new subscription.
  @inlinable
  mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      var subscribed = State.Subscribed(from: initial)
      let id = subscribed.subscribe()
      self._state = .subscribed(subscribed)
      return id

    case .subscribed(var subscribed):
      self._state = ._modifying
      let id = subscribed.subscribe()
      self._state = .subscribed(subscribed)
      return id

    case .streaming(var streaming):
      self._state = ._modifying
      let id = streaming.subscribe()
      self._state = .streaming(streaming)
      return id

    case .finished(var finished):
      self._state = ._modifying
      let id = finished.subscribe()
      self._state = .finished(finished)
      return id

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take when a producer asks to wait before producing more.
  @usableFromInline
  enum OnWaitToProduceMore {
    /// Do nothing.
    case none
    /// Resume the producer continuation with the given result.
    case resume(ProducerContinuation, Result<Void, Error>)
  }

  /// Registers a producer continuation to wait until more elements may be produced.
  ///
  /// Must not be called in the `initial` or `subscribed` states.
  ///
  /// - Parameters:
  ///   - continuation: The producer's continuation.
  ///   - token: The token identifying the waiting producer.
  /// - Returns: The action the caller must take. In the `finished` state the continuation is
  ///   handed back to be resumed successfully.
  @inlinable
  mutating func waitToProduceMore(
    continuation: ProducerContinuation,
    token: Int
  ) -> OnWaitToProduceMore {
    switch self._state {
    case .initial, .subscribed:
      // Nothing has been produced yet, so there is no reason to wait to produce.
      fatalError("Internal inconsistency")

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.waitToProduceMore(continuation, token: token)
      self._state = .streaming(streaming)
      return action

    case .finished:
      return .resume(continuation, .success(()))

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after cancelling a producer; shares its shape with
  /// `OnWaitToProduceMore`.
  @usableFromInline
  typealias OnCancelProducer = OnWaitToProduceMore

  /// Cancels the waiting producer identified by the given token.
  ///
  /// Must not be called in the `initial` or `subscribed` states.
  ///
  /// - Parameter token: The token identifying the producer to cancel.
  /// - Returns: The action the caller must take; `.none` in the `finished` state.
  @inlinable
  mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
    switch self._state {
    case .initial, .subscribed:
      // Nothing has been produced yet, so no producer can be waiting.
      fatalError("Internal inconsistency")

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.cancelProducer(withToken: token)
      self._state = .streaming(streaming)
      return action

    case .finished:
      // No producers to cancel; do nothing.
      return .none

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after dropping resources.
  @usableFromInline
  enum OnDropResources {
    /// Do nothing.
    case none
    /// Resume the given consumer and producer continuations.
    case resume(ConsumerContinuations, ProducerContinuations)
  }

  /// Drops resources held by the state machine, leaving it in the `finished` state.
  ///
  /// States which aren't yet finished are finished with
  /// `BroadcastAsyncSequenceError.productionAlreadyFinished`. A state which is already finished
  /// is stored back after its own `dropResources(error:)` has run.
  ///
  /// - Returns: The action the caller must take.
  @inlinable
  mutating func dropResources() -> OnDropResources {
    let error = BroadcastAsyncSequenceError.productionAlreadyFinished

    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      self._state = .finished(State.Finished(from: initial, result: .failure(error)))
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action = subscribed.dropResources(error: error)
      self._state = .finished(State.Finished(from: subscribed, result: .failure(error)))
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action = streaming.dropResources(error: error)
      self._state = .finished(State.Finished(from: streaming, result: .failure(error)))
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action = finished.dropResources(error: error)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  1160. }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension _BroadcastSequenceStateMachine {
    /// A collection of elements tagged with an identifier.
    ///
    /// Identifiers are assigned when elements are added to the collection and are monotonically
    /// increasing. If element 'A' is added before element 'B' then 'A' will have a lower ID than 'B'.
    @usableFromInline
    struct Elements: Sendable {
      /// The ID of an element.
      @usableFromInline
      struct ID: Hashable, Sendable, Comparable, Strideable {
        @usableFromInline
        private(set) var rawValue: Int

        /// The ID given to the first element added to a collection.
        @usableFromInline
        static var initial: Self {
          ID(id: 0)
        }

        private init(id: Int) {
          self.rawValue = id
        }

        /// Advances this ID by one in place.
        @inlinable
        mutating func formNext() {
          self.rawValue += 1
        }

        /// Returns the ID which follows this one.
        @inlinable
        func next() -> Self {
          var copy = self
          copy.formNext()
          return copy
        }

        /// Returns the number of IDs between this ID and `other`; negative if `other` is lower.
        @inlinable
        func distance(to other: Self) -> Int {
          other.rawValue - self.rawValue
        }

        /// Returns the ID offset from this one by `n`.
        @inlinable
        func advanced(by n: Int) -> Self {
          var copy = self
          copy.rawValue += n
          return copy
        }

        @inlinable
        static func < (lhs: Self, rhs: Self) -> Bool {
          lhs.rawValue < rhs.rawValue
        }
      }

      /// An element paired with the ID it was assigned when it was appended.
      @usableFromInline
      struct _IdentifiableElement: Sendable {
        @usableFromInline
        var element: Element

        @usableFromInline
        var id: ID

        @inlinable
        init(element: Element, id: ID) {
          self.element = element
          self.id = id
        }
      }

      /// The stored elements, ordered by ascending ID.
      @usableFromInline
      var _elements: Deque<_IdentifiableElement>

      /// The ID which will be assigned to the next appended element.
      @usableFromInline
      var _nextID: ID

      /// Creates an empty collection whose first element will be assigned `ID.initial`.
      @inlinable
      init() {
        self._nextID = .initial
        self._elements = []
      }

      /// Returns the ID to assign to the next element and advances the internal counter.
      @inlinable
      mutating func nextElementID() -> ID {
        let id = self._nextID
        self._nextID.formNext()
        return id
      }

      /// The highest ID of the stored elements; `nil` if there are no elements.
      @inlinable
      var highestID: ID? { self._elements.last?.id }

      /// The lowest ID of the stored elements; `nil` if there are no elements.
      @inlinable
      var lowestID: ID? { self._elements.first?.id }

      /// The number of stored elements.
      @inlinable
      var count: Int { self._elements.count }

      /// Whether there are no stored elements.
      @inlinable
      var isEmpty: Bool { self._elements.isEmpty }

      /// Appends an element to the collection, assigning it the next ID.
      @inlinable
      mutating func append(_ element: Element) {
        self._elements.append(_IdentifiableElement(element: element, id: self.nextElementID()))
      }

      /// Removes the first element from the collection.
      ///
      /// - Returns: The removed element.
      @discardableResult
      @inlinable
      mutating func removeFirst() -> Element {
        let removed = self._elements.removeFirst()
        return removed.element
      }

      @usableFromInline
      enum ElementLookup {
        /// The element was found in the collection.
        case found(Element)
        /// The element isn't in the collection, but it could be in the future.
        case maybeAvailableLater
        /// The element was in the collection, but is no longer available.
        case noLongerAvailable
      }

      /// Lookup the element with the given ID.
      ///
      /// The lookup only reads the collection, so it is not `mutating`.
      ///
      /// - Parameter id: The ID of the element to lookup.
      /// - Returns: `.found` if an element with the ID is stored, `.noLongerAvailable` if the ID is
      ///   lower than any stored or yet-to-be-assigned ID, and `.maybeAvailableLater` otherwise.
      @inlinable
      func lookupElement(withID id: ID) -> ElementLookup {
        guard let low = self.lowestID, let high = self.highestID else {
          // Must be empty: anything not yet assigned may still arrive, anything else is gone.
          return id >= self._nextID ? .maybeAvailableLater : .noLongerAvailable
        }
        assert(low <= high)

        let lookup: ElementLookup
        if id < low {
          lookup = .noLongerAvailable
        } else if id > high {
          lookup = .maybeAvailableLater
        } else {
          // IDs are monotonically increasing. If the buffer contains the tag we can use it to index
          // into the deque by looking at the offsets.
          let offset = low.distance(to: id)
          let index = self._elements.startIndex.advanced(by: offset)
          lookup = .found(self._elements[index].element)
        }
        return lookup
      }
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension _BroadcastSequenceStateMachine {
    /// A collection of subscriptions.
    @usableFromInline
    struct Subscriptions: Sendable {
      /// The ID of a subscriber.
      @usableFromInline
      struct ID: Hashable, Sendable {
        @usableFromInline
        private(set) var rawValue: Int

        @inlinable
        init() {
          self.rawValue = 0
        }

        /// Advances this ID by one in place.
        @inlinable
        mutating func formNext() {
          self.rawValue += 1
        }

        /// Returns the ID which follows this one.
        @inlinable
        func next() -> Self {
          var copy = self
          copy.formNext()
          return copy
        }
      }

      @usableFromInline
      struct _Subscriber: Sendable {
        /// The ID of the subscriber.
        @usableFromInline
        var id: ID

        /// The ID of the next element the subscriber will consume.
        @usableFromInline
        var nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID

        /// A continuation which will be resumed when the next element becomes available.
        @usableFromInline
        var continuation: ConsumerContinuation?

        @inlinable
        init(
          id: ID,
          nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID,
          continuation: ConsumerContinuation? = nil
        ) {
          self.id = id
          self.nextElementID = nextElementID
          self.continuation = continuation
        }

        /// Returns and sets the continuation to `nil` if one exists.
        ///
        /// The next element ID is advanced if a continuation exists.
        ///
        /// - Returns: The continuation, if one existed.
        @inlinable
        mutating func takeContinuation() -> ConsumerContinuation? {
          guard let continuation = self.continuation else { return nil }
          self.continuation = nil
          self.nextElementID.formNext()
          return continuation
        }
      }

      /// The current subscribers, in subscription order.
      @usableFromInline
      var _subscribers: [_Subscriber]

      /// The ID which will be assigned to the next subscriber.
      @usableFromInline
      var _nextSubscriberID: ID

      @inlinable
      init() {
        self._subscribers = []
        self._nextSubscriberID = ID()
      }

      /// Returns the number of subscribers.
      @inlinable
      var count: Int { self._subscribers.count }

      /// Returns whether the collection is empty.
      @inlinable
      var isEmpty: Bool { self._subscribers.isEmpty }

      /// Adds a new subscriber and returns its unique ID.
      ///
      /// The new subscriber starts at the initial element ID.
      ///
      /// - Returns: The ID of the new subscriber.
      @inlinable
      mutating func subscribe() -> ID {
        let id = self._nextSubscriberID
        self._nextSubscriberID.formNext()
        self._subscribers.append(_Subscriber(id: id, nextElementID: .initial))
        return id
      }

      /// Provides mutable access to the element ID of the given subscriber, if it exists.
      ///
      /// - Parameters:
      ///   - id: The ID of the subscriber.
      ///   - body: A closure to mutate the element ID of the subscriber which returns the result and
      ///     a boolean indicating whether the subscriber should be removed.
      /// - Returns: The result returned from the closure or `nil` if no subscriber exists with the
      ///   given ID.
      @inlinable
      mutating func withMutableElementID<R>(
        forSubscriber id: ID,
        _ body: (
          inout _BroadcastSequenceStateMachine<Element>.Elements.ID
        ) -> (result: R, removeSubscription: Bool)
      ) -> R? {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else { return nil }
        let (result, removeSubscription) = body(&self._subscribers[index].nextElementID)
        if removeSubscription {
          self._subscribers.remove(at: index)
        }
        return result
      }

      /// Sets the continuation for the subscription with the given ID.
      ///
      /// - Parameters:
      ///   - continuation: The continuation to set.
      ///   - id: The ID of the subscriber.
      /// - Returns: A boolean indicating whether the continuation was set or not.
      @inlinable
      mutating func setContinuation(
        _ continuation: ConsumerContinuation,
        forSubscriber id: ID
      ) -> Bool {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
          return false
        }
        // A subscriber is expected to have at most one outstanding continuation.
        assert(self._subscribers[index].continuation == nil)
        self._subscribers[index].continuation = continuation
        return true
      }

      /// Returns an array of IDs of the subscribers whose next element ID is `id`.
      @inlinable
      func subscribers(
        withElementID id: _BroadcastSequenceStateMachine<Element>.Elements.ID
      ) -> [ID] {
        return self._subscribers.filter {
          $0.nextElementID == id
        }.map {
          $0.id
        }
      }

      /// Removes the subscriber with the given ID.
      ///
      /// - Parameter id: The ID of the subscriber to remove.
      /// - Returns: A tuple indicating whether a subscriber was removed and any continuation
      ///   associated with the subscriber.
      @inlinable
      mutating func removeSubscriber(withID id: ID) -> (Bool, ConsumerContinuation?) {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
          return (false, nil)
        }
        let continuation = self._subscribers[index].continuation
        self._subscribers.remove(at: index)
        return (true, continuation)
      }

      /// Remove all subscribers in the given array of IDs.
      @inlinable
      mutating func removeAllSubscribers(in idsToRemove: [ID]) {
        self._subscribers.removeAll {
          idsToRemove.contains($0.id)
        }
      }

      /// Remove all subscribers and return their IDs.
      @inlinable
      mutating func removeAllSubscribers() -> [ID] {
        let subscribers = self._subscribers.map { $0.id }
        self._subscribers.removeAll()
        return subscribers
      }

      /// Returns any continuations set on subscribers, unsetting at the same time.
      ///
      /// Each subscriber whose continuation is taken also has its next element ID advanced.
      ///
      /// - Returns: The continuations, or `nil` if no subscriber had one.
      @inlinable
      mutating func takeContinuations() -> _OneOrMany<ConsumerContinuation>? {
        // Avoid allocs if there's only one subscriber.
        let count = self._countPendingContinuations()
        let result: _OneOrMany<ConsumerContinuation>?

        switch count {
        case 0:
          result = nil

        case 1:
          // The count guarantees exactly one subscriber has a continuation.
          let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
          let continuation = self._subscribers[index].takeContinuation()!
          result = .one(continuation)

        default:
          var continuations = [ConsumerContinuation]()
          continuations.reserveCapacity(count)
          for index in self._subscribers.indices {
            if let continuation = self._subscribers[index].takeContinuation() {
              continuations.append(continuation)
            }
          }
          result = .many(continuations)
        }

        return result
      }

      /// Removes all subscribers which have continuations and returns their continuations.
      ///
      /// - Returns: The continuations of the removed subscribers; an empty `.many` if no subscriber
      ///   had a continuation.
      @inlinable
      mutating func removeSubscribersWithContinuations() -> _OneOrMany<ConsumerContinuation> {
        // Avoid allocs if there's only one subscriber.
        let count = self._countPendingContinuations()
        let result: _OneOrMany<ConsumerContinuation>

        switch count {
        case 0:
          result = .many([])

        case 1:
          // The count guarantees exactly one subscriber has a continuation.
          let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
          let subscription = self._subscribers.remove(at: index)
          result = .one(subscription.continuation!)

        default:
          var continuations = [ConsumerContinuation]()
          continuations.reserveCapacity(count)
          for subscription in self._subscribers {
            if let continuation = subscription.continuation {
              continuations.append(continuation)
            }
          }
          // Remove the owners in a single pass; no intermediate array of IDs is needed because the
          // removal criterion is exactly "has a continuation".
          self._subscribers.removeAll { $0.continuation != nil }
          result = .many(continuations)
        }

        return result
      }

      /// Returns the number of subscribers which currently hold a continuation.
      @inlinable
      func _countPendingContinuations() -> Int {
        return self._subscribers.reduce(into: 0) { count, subscription in
          if subscription.continuation != nil {
            count += 1
          }
        }
      }
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension _BroadcastSequenceStateMachine {
    /// Holds either a single value or an array of values.
    ///
    /// The single-value case lets callers hand back one value without creating an array.
    // TODO: tiny array
    @usableFromInline
    enum _OneOrMany<Value> {
      /// Exactly one value.
      case one(Value)

      /// Any number of values, stored in an array.
      case many([Value])
    }
  }