BroadcastAsyncSequence.swift 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. public import DequeModule // should be @usableFromInline
  17. public import Synchronization // should be @usableFromInline
  /// An `AsyncSequence` whose values can be consumed by several subscribers concurrently.
  ///
  /// This is not a general-purpose broadcast sequence: it is tailored to the requirements of
  /// gRPC Swift, where it is used to support retrying and hedging requests.
  ///
  /// To achieve this it maintains an internal buffer of elements which is limited in size. Each
  /// iterator (a "subscriber") keeps an offset into the elements the sequence has produced over
  /// time. If the buffer is full and a subscriber is consuming too slowly, the sequence cancels
  /// that subscriber's subscription and drops the oldest buffered element to make space for more
  /// elements. If the buffer is full and all subscribers are equally slow, all producers are
  /// suspended until the buffer drops to a reasonable size.
  ///
  /// The number of subscribers is expected to be low: retries use at most one subscriber at a
  /// time and hedging may use at most five subscribers at a time.
  @usableFromInline
  @available(gRPCSwift 2.0, *)
  struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
    /// Storage shared between this sequence, its iterators and its `Source`.
    @usableFromInline
    let _storage: _BroadcastSequenceStorage<Element>

    /// Wraps existing storage in a sequence.
    ///
    /// - Parameter _storage: The storage backing this sequence.
    @inlinable
    init(_storage: _BroadcastSequenceStorage<Element>) {
      self._storage = _storage
    }

    /// Creates a stream together with the continuation used to feed it.
    ///
    /// Both returned values are backed by the same, newly created storage.
    ///
    /// - Parameters:
    ///   - elementType: The type of element this sequence produces.
    ///   - bufferSize: The number of elements this sequence may store.
    /// - Returns: A stream and continuation.
    @inlinable
    static func makeStream(
      of elementType: Element.Type = Element.self,
      bufferSize: Int
    ) -> (stream: Self, continuation: Self.Source) {
      let sharedStorage = _BroadcastSequenceStorage<Element>(bufferSize: bufferSize)
      return (
        stream: Self(_storage: sharedStorage),
        continuation: Self.Source(_storage: sharedStorage)
      )
    }

    /// Registers a new subscription with the storage and returns an iterator bound to it.
    @inlinable
    func makeAsyncIterator() -> AsyncIterator {
      let subscriptionID = self._storage.subscribe()
      return AsyncIterator(_storage: self._storage, id: subscriptionID)
    }

    /// Whether it is known to be safe for the next subscriber to subscribe and successfully
    /// consume elements.
    ///
    /// This can be `false` if there are active subscribers or the internal buffer no longer
    /// contains the first element in the sequence.
    @inlinable
    var isKnownSafeForNextSubscriber: Bool {
      return self._storage.isKnownSafeForNextSubscriber
    }

    /// Invalidates all active subscribers.
    ///
    /// Any active subscriber will receive an error the next time they attempt to consume an
    /// element.
    @inlinable
    func invalidateAllSubscriptions() {
      let storage = self._storage
      storage.invalidateAllSubscriptions()
    }
  }
  79. // MARK: - AsyncIterator
  @available(gRPCSwift 2.0, *)
  extension BroadcastAsyncSequence {
    /// An iterator over the broadcast sequence; each iterator is one subscription.
    @usableFromInline
    struct AsyncIterator: AsyncIteratorProtocol {
      /// Storage shared with the sequence that created this iterator.
      @usableFromInline
      let _storage: _BroadcastSequenceStorage<Element>

      /// Identifies this iterator's subscription within the storage.
      @usableFromInline
      let _subscriberID: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID

      /// - Parameters:
      ///   - _storage: The storage to pull elements from.
      ///   - id: The subscription ID to request elements for.
      @inlinable
      init(
        _storage: _BroadcastSequenceStorage<Element>,
        id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
      ) {
        self._subscriberID = id
        self._storage = _storage
      }

      /// Asks the storage for this subscriber's next element.
      ///
      /// - Returns: Whatever the storage returns for this subscriber; `nil` marks the end of the
      ///   stream.
      /// - Throws: Any error thrown by the storage.
      @inlinable
      mutating func next() async throws -> Element? {
        let element = try await self._storage.nextElement(forSubscriber: self._subscriberID)
        return element
      }
    }
  }
  102. // MARK: - Continuation
  @available(gRPCSwift 2.0, *)
  extension BroadcastAsyncSequence {
    /// The producing side of a `BroadcastAsyncSequence`.
    @usableFromInline
    struct Source: Sendable {
      /// Storage shared with the sequence this source feeds.
      @usableFromInline
      let _storage: _BroadcastSequenceStorage<Element>

      /// - Parameter _storage: The storage to write elements into.
      @usableFromInline
      init(_storage: _BroadcastSequenceStorage<Element>) {
        self._storage = _storage
      }

      /// Writes a single element by yielding it to the storage.
      ///
      /// - Parameter element: The element to write.
      /// - Throws: Any error thrown by the storage while yielding.
      @inlinable
      func write(_ element: Element) async throws {
        let storage = self._storage
        try await storage.yield(element)
      }

      /// Finishes the stream with the given result.
      ///
      /// - Parameter result: `.success` for a clean finish, `.failure` to finish with an error.
      @inlinable
      func finish(with result: Result<Void, any Error>) {
        let storage = self._storage
        storage.finish(result)
      }

      /// Finishes the stream cleanly.
      @inlinable
      func finish() {
        let success: Result<Void, any Error> = .success(())
        self.finish(with: success)
      }

      /// Finishes the stream with an error.
      ///
      /// - Parameter error: The error to finish the stream with.
      @inlinable
      func finish(throwing error: any Error) {
        let failure: Result<Void, any Error> = .failure(error)
        self.finish(with: failure)
      }
    }
  }
  /// Errors surfaced to producers and consumers of a `BroadcastAsyncSequence`.
  @usableFromInline
  @available(gRPCSwift 2.0, *)
  enum BroadcastAsyncSequenceError: Error {
    /// The consumer fell too far behind and its subscription was dropped or invalidated.
    case consumingTooSlow

    /// An element was written after the producer had already finished.
    case productionAlreadyFinished
  }
  139. // MARK: - Storage
  /// Reference-typed storage shared by a broadcast sequence, its iterators and its source.
  ///
  /// All state lives in a `_BroadcastSequenceStateMachine` guarded by a `Mutex`. Each method asks
  /// the state machine what to do while holding the lock and then carries out the returned action
  /// (resuming continuations, throwing or suspending) after `withLock` has returned.
  @usableFromInline
  @available(gRPCSwift 2.0, *)
  final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
    /// The lock-protected state machine.
    @usableFromInline
    let _state: Mutex<_BroadcastSequenceStateMachine<Element>>

    /// - Parameter bufferSize: The buffer size handed to the state machine.
    @inlinable
    init(bufferSize: Int) {
      self._state = Mutex(_BroadcastSequenceStateMachine<Element>(bufferSize: bufferSize))
    }

    deinit {
      // Ask the state machine to drop its resources, then resume whatever it hands back so that
      // no continuation is left suspended once the storage is gone.
      let action = self._state.withLock { $0.dropResources() }

      switch action {
      case .resume(let consumers, let producers):
        consumers.resume()
        producers.resume()
      case .none:
        ()
      }
    }

    // MARK: - Producer

    /// Yield a single element to the stream. Suspends if the stream's buffer is full.
    ///
    /// - Parameter element: The element to write.
    /// - Throws: `BroadcastAsyncSequenceError.productionAlreadyFinished` if the state machine
    ///   reports that production already finished, or the error the suspended producer is
    ///   resumed with.
    @inlinable
    func yield(_ element: Element) async throws {
      let action = self._state.withLock { $0.yield(element) }

      switch action {
      case .throwAlreadyFinished:
        throw BroadcastAsyncSequenceError.productionAlreadyFinished

      case .resume(let consumers):
        consumers.resume()

      case .suspend(let token):
        // The token ties this suspension to its cancellation handler.
        try await withTaskCancellationHandler {
          try await withCheckedThrowingContinuation { continuation in
            let onWait = self._state.withLock {
              $0.waitToProduceMore(continuation: continuation, token: token)
            }

            switch onWait {
            case .none:
              // The state machine kept the continuation; it will be resumed later.
              ()
            case .resume(let producer, let result):
              producer.resume(with: result)
            }
          }
        } onCancel: {
          let onCancelled = self._state.withLock { $0.cancelProducer(withToken: token) }

          switch onCancelled {
          case .none:
            ()
          case .resume(let producer, let result):
            producer.resume(with: result)
          }
        }

      case .none:
        ()
      }
    }

    /// Indicate that no more values will be produced.
    ///
    /// - Parameter result: Whether the stream is finishing cleanly or because of an error.
    @inlinable
    func finish(_ result: Result<Void, any Error>) {
      let onFinish = self._state.withLock { $0.finish(result: result) }

      switch onFinish {
      case .resume(let subscribers, let producers):
        subscribers.resume()
        producers.resume()
      case .none:
        ()
      }
    }

    // MARK: - Consumer

    /// Create a subscription to the stream.
    ///
    /// - Returns: Returns a unique subscription ID.
    @inlinable
    func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      return self._state.withLock { state in
        state.subscribe()
      }
    }

    /// Returns the next element for the given subscriber, if it is available.
    ///
    /// If no element is ready the caller is suspended until the state machine resumes it.
    /// Cancelling the calling task cancels the subscription.
    ///
    /// - Parameter id: The ID of the subscriber requesting the element.
    /// - Returns: The next element or `nil` if the stream has been terminated.
    @inlinable
    func nextElement(
      forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) async throws -> Element? {
      return try await withTaskCancellationHandler {
        let onNext = self._state.withLock { state in
          state.nextElement(forSubscriber: id)
        }

        switch onNext {
        case .return(let ready):
          // A result is available now; let any producers handed back continue first.
          ready.producers.resume()
          return try ready.nextResult.get()

        case .suspend:
          return try await withCheckedThrowingContinuation { continuation in
            let onStored = self._state.withLock {
              $0.setContinuation(continuation, forSubscription: id)
            }

            switch onStored {
            case .none:
              // The state machine kept the continuation; it will be resumed later.
              ()
            case .resume(let consumer, let result, let producers):
              consumer.resume(with: result)
              producers?.resume()
            }
          }
        }
      } onCancel: {
        let onCancelled = self._state.withLock { $0.cancelSubscription(withID: id) }

        switch onCancelled {
        case .none:
          ()
        case .resume(let consumer, let result):
          consumer.resume(with: result)
        }
      }
    }

    /// Returns true if it's guaranteed that the next subscriber may join and safely begin
    /// consuming elements.
    @inlinable
    var isKnownSafeForNextSubscriber: Bool {
      return self._state.withLock { $0.nextSubscriptionIsValid }
    }

    /// Invalidates all active subscriptions.
    @inlinable
    func invalidateAllSubscriptions() {
      let onInvalidate = self._state.withLock { $0.invalidateAllSubscriptions() }

      switch onInvalidate {
      case .none:
        ()
      case .resume(let consumers):
        consumers.resume()
      }
    }
  }
  287. // MARK: - State machine
  288. @usableFromInline
  289. @available(gRPCSwift 2.0, *)
  290. struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
  /// The continuation type of a suspended consumer: resumes with an optional element or an error.
  @usableFromInline
  typealias ConsumerContinuation = CheckedContinuation<Element?, any Error>

  /// The continuation type of a suspended producer: resumes with `Void` or an error.
  @usableFromInline
  typealias ProducerContinuation = CheckedContinuation<Void, any Error>
  /// One or more consumer continuations bundled with the single result they are all resumed with.
  @usableFromInline
  struct ConsumerContinuations {
    /// The continuations to resume.
    @usableFromInline
    var continuations: _OneOrMany<ConsumerContinuation>

    /// The result delivered to every continuation.
    @usableFromInline
    var result: Result<Element?, any Error>

    /// - Parameters:
    ///   - continuations: The continuations to resume.
    ///   - result: The result to resume each of them with.
    @inlinable
    init(continuations: _OneOrMany<ConsumerContinuation>, result: Result<Element?, any Error>) {
      self.result = result
      self.continuations = continuations
    }

    /// Resumes every held continuation with `result`.
    @inlinable
    func resume() {
      switch self.continuations {
      case .many(let waiters):
        for waiter in waiters {
          waiter.resume(with: self.result)
        }
      case .one(let waiter):
        waiter.resume(with: self.result)
      }
    }
  }
  /// A batch of producer continuations bundled with the single result they are all resumed with.
  @usableFromInline
  struct ProducerContinuations {
    /// The continuations to resume; may be empty.
    @usableFromInline
    var continuations: [ProducerContinuation]

    /// The result delivered to every continuation.
    @usableFromInline
    var result: Result<Void, any Error>

    /// - Parameters:
    ///   - continuations: The continuations to resume.
    ///   - result: The result to resume each of them with.
    @inlinable
    init(continuations: [ProducerContinuation], result: Result<Void, any Error>) {
      self.result = result
      self.continuations = continuations
    }

    /// Resumes every held continuation, in array order, with `result`.
    @inlinable
    func resume() {
      for producer in self.continuations {
        producer.resume(with: self.result)
      }
    }
  }
  336. @usableFromInline
  337. enum State: Sendable {
  338. /// No subscribers and no elements have been produced.
  339. case initial(Initial)
  340. /// Subscribers exist but no elements have been produced.
  341. case subscribed(Subscribed)
  342. /// Elements have been produced, there may or may not be subscribers.
  343. case streaming(Streaming)
  344. /// No more elements will be produced. There may or may not be subscribers.
  345. case finished(Finished)
  346. /// Temporary state to avoid CoWs.
  347. case _modifying
  /// Creates the state in its `.initial` case.
  ///
  /// - Parameter bufferSize: The buffer size recorded in the initial state.
  @inlinable
  init(bufferSize: Int) {
    let initial = Initial(bufferSize: bufferSize)
    self = .initial(initial)
  }
  /// Payload of the `.initial` state; it only records the configured buffer size.
  @usableFromInline
  struct Initial: Sendable {
    /// The buffer size carried over into later states.
    @usableFromInline
    let bufferSize: Int

    /// - Parameter bufferSize: The buffer size to record.
    @inlinable
    init(bufferSize size: Int) {
      self.bufferSize = size
    }
  }
  /// Payload of the `.subscribed` state: subscribers exist but no elements have been produced.
  @usableFromInline
  struct Subscribed: Sendable {
    /// Active subscriptions.
    @usableFromInline
    var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions

    /// Subscriptions to fail and remove when they next request an element.
    @usableFromInline
    var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]

    /// The maximum size of the element buffer.
    @usableFromInline
    let bufferSize: Int

    /// Starts with no subscriptions, keeping the buffer size of the initial state.
    @inlinable
    init(from state: Initial) {
      self.bufferSize = state.bufferSize
      self.subscriptionsToDrop = []
      self.subscriptions = Subscriptions()
    }

    /// Removes every subscriber that has a stored continuation so it can be resumed with the
    /// outcome of the stream: `nil` for a clean finish, the error otherwise.
    ///
    /// No producers can be suspended in this state, so the producer batch is empty.
    @inlinable
    mutating func finish(result: Result<Void, any Error>) -> OnFinish {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let consumers = ConsumerContinuations(continuations: waiting, result: result.map { nil })
      let producers = ProducerContinuations(continuations: [], result: .success(()))
      return .resume(consumers, producers)
    }

    /// Decides what a subscriber asking for its next element should do.
    ///
    /// Nothing has been produced yet, so the subscriber suspends unless it was marked to be
    /// dropped, in which case it is forgotten and fails with `consumingTooSlow`.
    @inlinable
    mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
      if let position = self.subscriptionsToDrop.firstIndex(of: id) {
        self.subscriptionsToDrop.remove(at: position)
        return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
      }

      return .suspend
    }

    /// Removes the subscriber; if it had a stored continuation, that continuation is handed back
    /// to be resumed with a `CancellationError`.
    @inlinable
    mutating func cancel(
      _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnCancelSubscription {
      let removed = self.subscriptions.removeSubscriber(withID: id)

      guard let continuation = removed.1 else {
        return .none
      }

      return .resume(continuation, .failure(CancellationError()))
    }

    /// Stores the continuation of a subscriber that is about to suspend.
    ///
    /// `next(id)` is consulted first because the outcome may have changed between the lock being
    /// dropped, the continuation being created and the lock being acquired again. If the
    /// subscription cannot take the continuation it is resumed with a `CancellationError`.
    @inlinable
    mutating func setContinuation(
      _ continuation: ConsumerContinuation,
      forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnSetContinuation {
      switch self.next(id) {
      case .suspend:
        guard self.subscriptions.setContinuation(continuation, forSubscriber: id) else {
          return .resume(continuation, .failure(CancellationError()), nil)
        }
        return .none

      case .return(let ready):
        return .resume(continuation, ready.nextResult, ready.producers)
      }
    }

    /// Adds a subscription and returns its ID.
    @inlinable
    mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      return self.subscriptions.subscribe()
    }

    /// Fails every suspended subscriber with `consumingTooSlow` and marks all remaining
    /// subscribers to be failed the next time they request an element.
    @inlinable
    mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
      // Subscribers with continuations are suspended right now and must be failed by the caller.
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let toFail = ConsumerContinuations(
        continuations: waiting,
        result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
      )

      // Everyone else learns about it on their next call to 'next'.
      let remainingIDs = self.subscriptions.removeAllSubscribers()
      self.subscriptionsToDrop.append(contentsOf: remainingIDs)

      return .resume(toFail)
    }

    /// Removes every suspended subscriber so it can be failed with `error`.
    ///
    /// No producers can be suspended in this state, so the producer batch is empty.
    @inlinable
    mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
      let waiting = self.subscriptions.removeSubscribersWithContinuations()
      let consumers = ConsumerContinuations(continuations: waiting, result: .failure(error))
      let producers = ProducerContinuations(continuations: [], result: .success(()))
      return .resume(consumers, producers)
    }
  }
  452. @usableFromInline
  453. struct Streaming: Sendable {
  454. /// A deque of elements tagged with IDs.
  455. @usableFromInline
  456. var elements: Elements
  457. /// The maximum size of the element buffer.
  458. @usableFromInline
  459. let bufferSize: Int
  460. // TODO: (optimisation) one-or-many Deque to avoid allocations in the case of a single writer
  461. /// Producers which have been suspended.
  462. @usableFromInline
  463. var producers: [(ProducerContinuation, Int)]
  464. /// The IDs of producers which have been cancelled.
  465. @usableFromInline
  466. var cancelledProducers: [Int]
  467. /// The next token for a producer.
  468. @usableFromInline
  469. var producerToken: Int
  470. /// Active subscriptions.
  471. @usableFromInline
  472. var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions
  473. /// Subscriptions to fail and remove when they next request an element.
  474. @usableFromInline
  475. var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]
  /// Enters the streaming state directly from the initial state: no elements, no suspended or
  /// cancelled producers and no subscriptions yet.
  @inlinable
  init(from state: Initial) {
    // Carried over from the previous state.
    self.bufferSize = state.bufferSize

    // Fresh streaming state.
    self.subscriptions = Subscriptions()
    self.subscriptionsToDrop = []
    self.elements = Elements()
    self.producers = []
    self.cancelledProducers = []
    self.producerToken = 0
  }
  /// Enters the streaming state from the subscribed state, keeping its subscriptions, pending
  /// drops and buffer size; elements and producer bookkeeping start empty.
  @inlinable
  init(from state: Subscribed) {
    // Carried over from the previous state.
    self.bufferSize = state.bufferSize
    self.subscriptions = state.subscriptions
    self.subscriptionsToDrop = state.subscriptionsToDrop

    // Fresh streaming state.
    self.elements = Elements()
    self.producers = []
    self.cancelledProducers = []
    self.producerToken = 0
  }
  /// Appends an element to the buffer and decides what the producer should do next.
  ///
  /// While the buffer is below `bufferSize` any waiting subscribers are resumed with the new
  /// element. Once it reaches `bufferSize`:
  /// - with no subscribers at all, or with every subscriber still needing the oldest element,
  ///   the producer is told to suspend (a fresh token is issued);
  /// - if no subscriber needs the oldest element, it is dropped and waiting subscribers are
  ///   resumed with the new element;
  /// - if only some subscribers need the oldest element, it is dropped and those subscribers
  ///   are removed and marked to fail on their next request.
  ///
  /// - Parameter element: The element to buffer.
  /// - Returns: The action the caller must take once it has released the lock.
  @inlinable
  mutating func append(_ element: Element) -> OnYield {
    self.elements.append(element)

    guard self.elements.count >= self.bufferSize, let oldestID = self.elements.lowestID else {
      // The buffer isn't full. Subscriptions holding continuations must be waiting for the
      // value which was just appended.
      guard let waiting = self.subscriptions.takeContinuations() else {
        return .none
      }
      return .resume(ConsumerContinuations(continuations: waiting, result: .success(element)))
    }

    // The buffer is too large; slow consumers are those still pointing at the oldest element.
    let laggards = self.subscriptions.subscribers(withElementID: oldestID)
    let laggardCount = laggards.count

    if laggardCount == 0 {
      if self.subscriptions.isEmpty {
        // No consumers at all: stop producing.
        let token = self.producerToken
        self.producerToken += 1
        return .suspend(token)
      }

      // Subscribers exist and none of them are slow; a subset may be waiting for a value. Drop
      // the oldest value and resume the fastest consumers.
      self.elements.removeFirst()
      guard let waiting = self.subscriptions.takeContinuations() else {
        return .none
      }
      return .resume(ConsumerContinuations(continuations: waiting, result: .success(element)))
    } else if laggardCount == self.subscriptions.count {
      // All consumers are slow; stop the production of new values.
      let token = self.producerToken
      self.producerToken += 1
      return .suspend(token)
    } else {
      // Some consumers are slow, but not all. Remove the slow consumers and drop the oldest
      // value.
      self.elements.removeFirst()
      self.subscriptions.removeAllSubscribers(in: laggards)
      self.subscriptionsToDrop.append(contentsOf: laggards)
      return .none
    }
  }
  /// Decides what a subscriber asking for its next element should do.
  ///
  /// The subscriber's element ID is looked up in the buffer:
  /// - if the element is buffered it is returned and the subscriber's ID advances;
  /// - if it may exist in the future the subscriber suspends;
  /// - if it was already dropped the subscriber fails with `consumingTooSlow` and is flagged
  ///   for removal.
  ///
  /// A subscriber which is not known fails with `consumingTooSlow` if it was marked to be
  /// dropped, otherwise it receives `nil`.
  ///
  /// - Parameter id: The ID of the subscriber requesting an element.
  /// - Returns: Either `.suspend` or a result to return, together with any suspended producers
  ///   which should be resumed.
  @inlinable
  mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
    // The closure yields the action plus whether the subscription should be removed.
    let lookup = self.subscriptions.withMutableElementID(
      forSubscriber: id
    ) { elementID -> (OnNext, Bool) in
      switch self.elements.lookupElement(withID: elementID) {
      case .maybeAvailableLater:
        // Element may exist in the future.
        return (.suspend, false)

      case .found(let element):
        // Element exists in the buffer. Advance the subscriber's element ID.
        elementID.formNext()
        let found: OnNext = .return(.init(nextResult: .success(element)))
        return (found, false)

      case .noLongerAvailable:
        // Element existed in the past but was dropped from the buffer.
        let tooSlow: OnNext = .return(
          .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
        )
        return (tooSlow, true)
      }
    }

    switch lookup {
    case .suspend:
      return .suspend

    case .return(var ready):
      // The producer only suspends when all consumers are equally slow or there are no
      // consumers at all. The latter can't be true: only a consumer can call this function.
      // The former can't be true anymore: consumption isn't concurrent, so this consumer must
      // now be ahead of the others. Let the producers resume.
      //
      // This doesn't mean the other consumers will be dropped: they can continue to consume
      // until the producer provides more values.
      let suspendedProducers = self.producers.map { $0.0 }
      self.producers.removeAll()
      ready.producers = ProducerContinuations(
        continuations: suspendedProducers,
        result: .success(())
      )
      return .return(ready)

    case .none:
      // No subscription found, it must have been dropped or already finished.
      guard let position = self.subscriptionsToDrop.firstIndex(of: id) else {
        // Unknown subscriber, i.e. already finished.
        return .return(.init(nextResult: .success(nil)))
      }

      self.subscriptionsToDrop.remove(at: position)
      return .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow)))
    }
  }
  /// Stores the continuation of a subscriber that is about to suspend.
  ///
  /// `next(id)` is consulted first because an element might have been provided between the lock
  /// being dropped, the continuation being created and the lock being acquired again. If the
  /// subscription cannot take the continuation it is resumed with a `CancellationError`.
  ///
  /// - Parameters:
  ///   - continuation: The continuation of the suspending subscriber.
  ///   - id: The ID of the subscriber.
  /// - Returns: `.none` if the continuation was stored, otherwise the continuation together with
  ///   the result to resume it with and any producers to resume.
  @inlinable
  mutating func setContinuation(
    _ continuation: ConsumerContinuation,
    forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnSetContinuation {
    switch self.next(id) {
    case .suspend:
      guard self.subscriptions.setContinuation(continuation, forSubscriber: id) else {
        return .resume(continuation, .failure(CancellationError()), nil)
      }
      return .none

    case .return(let ready):
      return .resume(continuation, ready.nextResult, ready.producers)
    }
  }
  /// Removes the subscriber with the given ID.
  ///
  /// - Parameter id: The ID of the subscriber to cancel.
  /// - Returns: The subscriber's stored continuation paired with a `CancellationError` to resume
  ///   it with, or `.none` if no continuation was stored.
  @inlinable
  mutating func cancel(
    _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnCancelSubscription {
    let removed = self.subscriptions.removeSubscriber(withID: id)

    guard let continuation = removed.1 else {
      return .none
    }

    return .resume(continuation, .failure(CancellationError()))
  }
  /// Decides whether a producer which was told to suspend should actually do so.
  ///
  /// - If the buffer has free space again the producer is resumed successfully.
  /// - Otherwise, if the producer was cancelled before it got here, it is resumed with a
  ///   `CancellationError`.
  /// - Otherwise the continuation is stored, tagged with its token, to be resumed later.
  ///
  /// The token is always removed from `cancelledProducers` if present, whichever branch is
  /// taken. Tokens are never reused, so an entry left behind would never be matched again and
  /// would only make later scans of the array longer.
  ///
  /// - Parameters:
  ///   - continuation: The continuation of the producer which wants to suspend.
  ///   - token: The token issued to the producer when it was told to suspend.
  /// - Returns: `.none` if the continuation was stored, otherwise the continuation and the result
  ///   to resume it with.
  @inlinable
  mutating func waitToProduceMore(
    _ continuation: ProducerContinuation,
    token: Int
  ) -> OnWaitToProduceMore {
    // Consume the cancellation marker (if any) up front so that it cannot be left behind.
    let wasCancelled: Bool
    if let index = self.cancelledProducers.firstIndex(of: token) {
      self.cancelledProducers.remove(at: index)
      wasCancelled = true
    } else {
      wasCancelled = false
    }

    if self.elements.count < self.bufferSize {
      // Buffer has free space, no need to suspend. This takes priority over cancellation.
      return .resume(continuation, .success(()))
    } else if wasCancelled {
      // Producer was cancelled before suspending.
      return .resume(continuation, .failure(CancellationError()))
    } else {
      // Store the continuation to resume later.
      self.producers.append((continuation, token))
      return .none
    }
  }
  /// Cancels the producer identified by `token`.
  ///
  /// - Parameter token: The token of the producer to cancel.
  /// - Returns: `.resume` with a `CancellationError` if a continuation was stored for the token,
  ///   otherwise `.none`.
  @inlinable
  mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
    if let position = self.producers.firstIndex(where: { $0.1 == token }) {
      // The producer is suspended: take its continuation out and fail it.
      let suspended = self.producers.remove(at: position)
      return .resume(suspended.0, .failure(CancellationError()))
    }

    // No stored continuation for this token: remember the token so that
    // 'waitToProduceMore' can observe the cancellation.
    self.cancelledProducers.append(token)
    return .none
  }
  /// Collects everything which must be resumed when the sequence finishes.
  ///
  /// Subscribers with stored continuations are removed and all stored producers are cleared.
  ///
  /// - Parameter result: The terminating result of the sequence.
  /// - Returns: `.resume` with the consumer continuations (to be resumed with `result`, mapped
  ///   to `nil` on success) and the producer continuations (to be resumed successfully).
  @inlinable
  mutating func finish(result: Result<Void, any Error>) -> OnFinish {
    let parkedConsumers = self.subscriptions.removeSubscribersWithContinuations()

    let suspendedProducers = self.producers.map { $0.0 }
    self.producers.removeAll()

    let consumers = ConsumerContinuations(
      continuations: parkedConsumers,
      result: result.map { nil }
    )
    let producers = ProducerContinuations(
      continuations: suspendedProducers,
      result: .success(())
    )
    return .resume(consumers, producers)
  }
  /// Adds a new subscriber.
  ///
  /// - Returns: The ID of the new subscription.
  @inlinable
  mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
    let newID = self.subscriptions.subscribe()
    return newID
  }
  /// Invalidates every current subscription.
  ///
  /// - Returns: `.resume` with the continuations of subscribers which were waiting for an
  ///   element; they are to be failed with `consumingTooSlow`.
  @inlinable
  mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
    // Subscribers which are currently waiting are removed and failed right away.
    let parked = self.subscriptions.removeSubscribersWithContinuations()

    // Everyone else is removed too, but remembered so they are failed on their next call
    // to 'next'.
    let remainingIDs = self.subscriptions.removeAllSubscribers()
    self.subscriptionsToDrop.append(contentsOf: remainingIDs)

    return .resume(
      ConsumerContinuations(
        continuations: parked,
        result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
      )
    )
  }
  /// Removes all waiting consumers and suspended producers so that they can be failed.
  ///
  /// - Parameter error: The error to fail the consumers and producers with.
  /// - Returns: `.resume` with the consumer and producer continuations, both to be resumed
  ///   with `error`.
  @inlinable
  mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
    let parkedConsumers = self.subscriptions.removeSubscribersWithContinuations()

    let suspendedProducers = self.producers.map { $0.0 }
    self.producers.removeAll()

    return .resume(
      ConsumerContinuations(continuations: parkedConsumers, result: .failure(error)),
      ProducerContinuations(continuations: suspendedProducers, result: .failure(error))
    )
  }
  /// Returns whether a new subscription would be valid.
  ///
  /// A subscription is only valid if there are no subscribers and the lowest buffered element
  /// ID is still the initial one.
  @inlinable
  func nextSubscriptionIsValid() -> Bool {
    guard self.subscriptions.isEmpty else {
      return false
    }
    return self.elements.lowestID == .initial
  }
  720. }
  /// The state after the sequence has been finished.
  ///
  /// Buffered elements and subscriptions carried over from the previous state are retained so
  /// that subscribers can still be served via `next(_:)`.
  @usableFromInline
  struct Finished: Sendable {
    /// A deque of elements tagged with IDs.
    @usableFromInline
    var elements: Elements

    /// Active subscriptions.
    @usableFromInline
    var subscriptions: _BroadcastSequenceStateMachine<Element>.Subscriptions

    /// Subscriptions to fail and remove when they next request an element.
    @usableFromInline
    var subscriptionsToDrop: [_BroadcastSequenceStateMachine<Element>.Subscriptions.ID]

    /// The terminating result of the sequence.
    @usableFromInline
    let result: Result<Void, any Error>

    /// Finishes from the initial state: starts with no elements and no subscriptions.
    @inlinable
    init(from state: Initial, result: Result<Void, any Error>) {
      self.result = result
      self.elements = Elements()
      self.subscriptions = Subscriptions()
      self.subscriptionsToDrop = []
    }

    /// Finishes from the subscribed state: keeps the subscriptions, starts with no elements.
    @inlinable
    init(from state: Subscribed, result: Result<Void, any Error>) {
      self.result = result
      self.elements = Elements()
      self.subscriptions = state.subscriptions
      self.subscriptionsToDrop = []
    }

    /// Finishes from the streaming state: keeps elements, subscriptions and the subscriptions
    /// which are pending removal.
    @inlinable
    init(from state: Streaming, result: Result<Void, any Error>) {
      self.result = result
      self.elements = state.elements
      self.subscriptions = state.subscriptions
      self.subscriptionsToDrop = state.subscriptionsToDrop
    }

    /// Returns the next result for the subscriber with the given ID.
    ///
    /// - Parameter id: The ID of the subscriber requesting an element.
    /// - Returns: Always a `.return` action: the next buffered element, the terminating result
    ///   once the subscriber has caught up, or a `consumingTooSlow` failure.
    @inlinable
    mutating func next(_ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID) -> OnNext {
      let outcome = self.subscriptions.withMutableElementID(
        forSubscriber: id
      ) { cursor -> (OnNext, Bool) in
        // The boolean indicates whether the subscription should be removed.
        switch self.elements.lookupElement(withID: cursor) {
        case .found(let element):
          cursor.formNext()
          return (.return(.init(nextResult: .success(element))), false)

        case .maybeAvailableLater:
          // Nothing more will be produced: deliver the terminating result.
          return (.return(.init(nextResult: self.result.map { nil })), true)

        case .noLongerAvailable:
          return (
            .return(.init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))),
            true
          )
        }
      }

      guard let known = outcome else {
        // No subscriber with the given ID, it was likely dropped previously.
        guard let position = self.subscriptionsToDrop.firstIndex(of: id) else {
          // Unknown subscriber, i.e. already finished.
          return .return(.init(nextResult: .success(nil)))
        }
        self.subscriptionsToDrop.remove(at: position)
        return .return(
          .init(nextResult: .failure(BroadcastAsyncSequenceError.consumingTooSlow))
        )
      }

      switch known {
      case .return:
        return known
      case .suspend:
        fatalError("Internal inconsistency")
      }
    }

    /// Adds a new subscriber and returns its ID.
    @inlinable
    mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
      let newID = self.subscriptions.subscribe()
      return newID
    }

    /// Invalidates every current subscription.
    ///
    /// - Returns: `.resume` with the continuations of waiting subscribers; they are to be
    ///   failed with `consumingTooSlow`.
    @inlinable
    mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
      // Subscribers with continuations are removed and failed right away.
      let parked = self.subscriptions.removeSubscribersWithContinuations()

      // The rest are removed and failed when they next call 'next'.
      let remainingIDs = self.subscriptions.removeAllSubscribers()
      self.subscriptionsToDrop.append(contentsOf: remainingIDs)

      return .resume(
        ConsumerContinuations(
          continuations: parked,
          result: .failure(BroadcastAsyncSequenceError.consumingTooSlow)
        )
      )
    }

    /// Removes all waiting consumers so that they can be failed with `error`.
    ///
    /// The returned producer continuations are always empty in this state.
    @inlinable
    mutating func dropResources(error: BroadcastAsyncSequenceError) -> OnDropResources {
      let parked = self.subscriptions.removeSubscribersWithContinuations()
      return .resume(
        ConsumerContinuations(continuations: parked, result: .failure(error)),
        ProducerContinuations(continuations: [], result: .failure(error))
      )
    }

    /// Returns whether a new subscription would be valid, i.e. whether the lowest buffered
    /// element ID is still the initial one.
    @inlinable
    func nextSubscriptionIsValid() -> Bool {
      return self.elements.lowestID == .initial
    }

    /// Removes the subscription with the given ID.
    ///
    /// - Returns: `.resume` with a `CancellationError` if the removed subscriber had a stored
    ///   continuation, otherwise `.none`.
    @inlinable
    mutating func cancel(
      _ id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
    ) -> OnCancelSubscription {
      let removal = self.subscriptions.removeSubscriber(withID: id)
      guard let parked = removal.1 else {
        return .none
      }
      return .resume(parked, .failure(CancellationError()))
    }
  }
  842. }
  /// The current state of the state machine.
  @usableFromInline
  var _state: State

  /// Creates a state machine in its starting state.
  ///
  /// - Parameter bufferSize: The buffer size, forwarded to `State(bufferSize:)`.
  @inlinable
  init(bufferSize: Int) {
    let startingState = State(bufferSize: bufferSize)
    self._state = startingState
  }
  /// Whether a subscription made now would be valid.
  ///
  /// Always `true` before any element has been yielded; otherwise the streaming or finished
  /// state decides.
  @inlinable
  var nextSubscriptionIsValid: Bool {
    switch self._state {
    case .initial, .subscribed:
      return true
    case .streaming(let streaming):
      return streaming.nextSubscriptionIsValid()
    case .finished(let finished):
      return finished.nextSubscriptionIsValid()
    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after invalidating all subscriptions.
  @usableFromInline
  enum OnInvalidateAllSubscriptions {
    /// Resume the given consumer continuations.
    case resume(ConsumerContinuations)
    /// There is nothing to do.
    case none
  }

  /// Invalidates all subscriptions in the current state.
  ///
  /// - Returns: `.none` in the initial state, otherwise whatever the current state's
  ///   `invalidateAllSubscriptions()` returns.
  @inlinable
  mutating func invalidateAllSubscriptions() -> OnInvalidateAllSubscriptions {
    // In each mutating case the state is parked in '_modifying' while the extracted value is
    // mutated, then written back.
    switch self._state {
    case .initial:
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action: OnInvalidateAllSubscriptions = subscribed.invalidateAllSubscriptions()
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnInvalidateAllSubscriptions = streaming.invalidateAllSubscriptions()
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action: OnInvalidateAllSubscriptions = finished.invalidateAllSubscriptions()
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after yielding an element.
  @usableFromInline
  enum OnYield {
    /// There is nothing to do.
    case none
    /// Suspend the producer; the associated value is its token.
    case suspend(Int)
    /// Resume the given consumer continuations.
    case resume(ConsumerContinuations)
    /// The sequence has already finished; the caller should throw.
    case throwAlreadyFinished
  }

  /// Yields an element, moving to the streaming state if not already in it.
  ///
  /// - Parameter element: The element to append.
  /// - Returns: The action produced by appending the element, or `.throwAlreadyFinished` if
  ///   the sequence is finished.
  @inlinable
  mutating func yield(_ element: Element) -> OnYield {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      // Move to streaming.
      var streaming = State.Streaming(from: initial)
      let action: OnYield = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .subscribed(let subscribed):
      self._state = ._modifying
      // Move to streaming.
      var streaming = State.Streaming(from: subscribed)
      let action: OnYield = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnYield = streaming.append(element)
      self._state = .streaming(streaming)
      return action

    case .finished:
      return .throwAlreadyFinished

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after finishing the sequence.
  @usableFromInline
  enum OnFinish {
    /// There is nothing to do.
    case none
    /// Resume the given consumer and producer continuations.
    case resume(ConsumerContinuations, ProducerContinuations)
  }

  /// Finishes the sequence with the given result and moves to the finished state.
  ///
  /// Finishing an already finished sequence has no effect.
  ///
  /// - Parameter result: The terminating result of the sequence.
  /// - Returns: The continuations to resume, if any.
  @inlinable
  mutating func finish(result: Result<Void, any Error>) -> OnFinish {
    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      let finished = State.Finished(from: initial, result: result)
      self._state = .finished(finished)
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action: OnFinish = subscribed.finish(result: result)
      self._state = .finished(State.Finished(from: subscribed, result: result))
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnFinish = streaming.finish(result: result)
      self._state = .finished(State.Finished(from: streaming, result: result))
      return action

    case .finished:
      return .none

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take when a subscriber asks for its next element.
  @usableFromInline
  enum OnNext {
    /// Return the contained result to the subscriber and resume the contained producers.
    case `return`(ReturnAndResumeProducers)
    /// No result is available yet; the subscriber should suspend.
    case suspend

    /// A result for a subscriber paired with producer continuations to resume.
    @usableFromInline
    struct ReturnAndResumeProducers {
      /// The result to hand to the subscriber.
      @usableFromInline
      var nextResult: Result<Element?, any Error>

      /// The producers to resume, along with the result to resume them with.
      @usableFromInline
      var producers: ProducerContinuations

      /// - Parameters:
      ///   - nextResult: The result to hand to the subscriber.
      ///   - producers: Producer continuations to resume; empty by default.
      ///   - producerResult: The result to resume the producers with; success by default.
      @inlinable
      init(
        nextResult: Result<Element?, any Error>,
        producers: [ProducerContinuation] = [],
        producerResult: Result<Void, any Error> = .success(())
      ) {
        let resumable = ProducerContinuations(continuations: producers, result: producerResult)
        self.nextResult = nextResult
        self.producers = resumable
      }
    }
  }
  /// Asks the current state for the next element for the given subscriber.
  ///
  /// Must not be called in the initial state.
  ///
  /// - Parameter id: The ID of the subscriber requesting an element.
  /// - Returns: Whether to return a result to the subscriber or to suspend it.
  @inlinable
  mutating func nextElement(
    forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnNext {
    switch self._state {
    case .initial:
      // No subscribers so demand isn't possible.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action: OnNext = subscribed.next(id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnNext = streaming.next(id)
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action: OnNext = finished.next(id)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after attempting to store a consumer continuation.
  @usableFromInline
  enum OnSetContinuation {
    /// The continuation was stored; there is nothing to do.
    case none
    /// Resume the continuation with the given result and, if present, resume the producers.
    case resume(ConsumerContinuation, Result<Element?, any Error>, ProducerContinuations?)
  }

  /// Stores a continuation for a subscriber which was previously told to suspend.
  ///
  /// Must not be called in the initial state.
  ///
  /// - Parameters:
  ///   - continuation: The consumer continuation waiting for the next element.
  ///   - id: The ID of the subscription the continuation belongs to.
  /// - Returns: `.none` if the continuation was stored, or `.resume` with the continuation and
  ///   the result it should be resumed with.
  @inlinable
  mutating func setContinuation(
    _ continuation: ConsumerContinuation,
    forSubscription id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnSetContinuation {
    let onSetContinuation: OnSetContinuation

    switch self._state {
    case .initial:
      // No subscribers so demand isn't possible.
      fatalError("Internal inconsistency")

    case .subscribed(var state):
      self._state = ._modifying
      onSetContinuation = state.setContinuation(continuation, forSubscription: id)
      self._state = .subscribed(state)

    case .streaming(var state):
      self._state = ._modifying
      onSetContinuation = state.setContinuation(continuation, forSubscription: id)
      self._state = .streaming(state)

    case .finished(var state):
      // The sequence may have yielded elements and then finished between the subscriber being
      // told to suspend and this call. The finished state retains buffered elements, so ask it
      // for the subscriber's next result rather than unconditionally delivering the
      // terminating result, which would silently drop those elements.
      self._state = ._modifying
      switch state.next(id) {
      case .return(let resultAndProducers):
        onSetContinuation = .resume(
          continuation,
          resultAndProducers.nextResult,
          resultAndProducers.producers
        )
      case .suspend:
        // Nothing more can be produced once finished, so there is nothing to wait for.
        fatalError("Internal inconsistency")
      }
      self._state = .finished(state)

    case ._modifying:
      fatalError("Internal inconsistency")
    }

    return onSetContinuation
  }
  /// The action to take after cancelling a subscription.
  @usableFromInline
  enum OnCancelSubscription {
    /// There is nothing to do.
    case none
    /// Resume the continuation with the given result.
    case resume(ConsumerContinuation, Result<Element?, any Error>)
  }

  /// Cancels the subscription with the given ID in the current state.
  ///
  /// Must not be called in the initial state.
  ///
  /// - Parameter id: The ID of the subscription to cancel.
  /// - Returns: The continuation to resume, if the subscriber had one stored.
  @inlinable
  mutating func cancelSubscription(
    withID id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
  ) -> OnCancelSubscription {
    switch self._state {
    case .initial:
      // The state only leaves 'initial' on the first subscribe or yield, so no subscription
      // can exist to cancel.
      fatalError("Internal inconsistency")

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action: OnCancelSubscription = subscribed.cancel(id)
      self._state = .subscribed(subscribed)
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnCancelSubscription = streaming.cancel(id)
      self._state = .streaming(streaming)
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action: OnCancelSubscription = finished.cancel(id)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The outcome of subscribing.
  @usableFromInline
  enum OnSubscribe {
    /// A subscription was created with the given ID.
    case subscribed(_BroadcastSequenceStateMachine<Element>.Subscriptions.ID)
  }

  /// Adds a new subscriber, moving from the initial to the subscribed state if necessary.
  ///
  /// - Returns: The ID of the new subscription.
  @inlinable
  mutating func subscribe() -> _BroadcastSequenceStateMachine<Element>.Subscriptions.ID {
    typealias SubscriptionID = _BroadcastSequenceStateMachine<Element>.Subscriptions.ID

    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      var subscribed = State.Subscribed(from: initial)
      let newID: SubscriptionID = subscribed.subscribe()
      self._state = .subscribed(subscribed)
      return newID

    case .subscribed(var subscribed):
      self._state = ._modifying
      let newID: SubscriptionID = subscribed.subscribe()
      self._state = .subscribed(subscribed)
      return newID

    case .streaming(var streaming):
      self._state = ._modifying
      let newID: SubscriptionID = streaming.subscribe()
      self._state = .streaming(streaming)
      return newID

    case .finished(var finished):
      self._state = ._modifying
      let newID: SubscriptionID = finished.subscribe()
      self._state = .finished(finished)
      return newID

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take when a producer asks to wait before producing more.
  @usableFromInline
  enum OnWaitToProduceMore {
    /// The continuation was stored; there is nothing to do.
    case none
    /// Resume the producer continuation with the given result.
    case resume(ProducerContinuation, Result<Void, any Error>)
  }

  /// Asks the current state whether a producer must wait before producing more.
  ///
  /// Must only be called once the state machine is streaming or finished.
  ///
  /// - Parameters:
  ///   - continuation: The producer continuation to store or resume.
  ///   - token: The token identifying the producer.
  /// - Returns: `.none` if the continuation was stored, otherwise the continuation and the
  ///   result to resume it with.
  @inlinable
  mutating func waitToProduceMore(
    continuation: ProducerContinuation,
    token: Int
  ) -> OnWaitToProduceMore {
    switch self._state {
    case .initial, .subscribed:
      // Nothing has been produced yet, so there is no reason to have to wait to produce.
      fatalError("Internal inconsistency")

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnWaitToProduceMore = streaming.waitToProduceMore(continuation, token: token)
      self._state = .streaming(streaming)
      return action

    case .finished:
      // Resume the producer successfully straight away.
      return .resume(continuation, .success(()))

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after cancelling a producer; shares its shape with
  /// `OnWaitToProduceMore`.
  @usableFromInline
  typealias OnCancelProducer = OnWaitToProduceMore

  /// Cancels the producer identified by `token`.
  ///
  /// Must only be called once the state machine is streaming or finished.
  ///
  /// - Parameter token: The token of the producer to cancel.
  /// - Returns: The continuation to resume, if one was stored for the producer.
  @inlinable
  mutating func cancelProducer(withToken token: Int) -> OnCancelProducer {
    switch self._state {
    case .initial, .subscribed:
      // Nothing has been produced yet, so no producer can be waiting to be cancelled.
      fatalError("Internal inconsistency")

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnCancelProducer = streaming.cancelProducer(withToken: token)
      self._state = .streaming(streaming)
      return action

    case .finished:
      // No producers to cancel; do nothing.
      return .none

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  /// The action to take after dropping resources.
  @usableFromInline
  enum OnDropResources {
    /// There is nothing to do.
    case none
    /// Resume the given consumer and producer continuations.
    case resume(ConsumerContinuations, ProducerContinuations)
  }

  /// Drops waiting consumers and producers and leaves the state machine finished.
  ///
  /// From any non-finished state this moves to the finished state with a
  /// `productionAlreadyFinished` failure. An already finished state keeps its result.
  ///
  /// - Returns: The continuations to resume, if any.
  @inlinable
  mutating func dropResources() -> OnDropResources {
    let error = BroadcastAsyncSequenceError.productionAlreadyFinished

    switch self._state {
    case .initial(let initial):
      self._state = ._modifying
      self._state = .finished(State.Finished(from: initial, result: .failure(error)))
      return .none

    case .subscribed(var subscribed):
      self._state = ._modifying
      let action: OnDropResources = subscribed.dropResources(error: error)
      self._state = .finished(State.Finished(from: subscribed, result: .failure(error)))
      return action

    case .streaming(var streaming):
      self._state = ._modifying
      let action: OnDropResources = streaming.dropResources(error: error)
      self._state = .finished(State.Finished(from: streaming, result: .failure(error)))
      return action

    case .finished(var finished):
      self._state = ._modifying
      let action: OnDropResources = finished.dropResources(error: error)
      self._state = .finished(finished)
      return action

    case ._modifying:
      fatalError("Internal inconsistency")
    }
  }
  1174. }
  @available(gRPCSwift 2.0, *)
  extension _BroadcastSequenceStateMachine {
    /// A collection of elements tagged with an identifier.
    ///
    /// Identifiers are assigned when elements are added to the collection and are monotonically
    /// increasing. If element 'A' is added before element 'B' then 'A' will have a lower ID than 'B'.
    @usableFromInline
    struct Elements: Sendable {
      /// The ID of an element.
      @usableFromInline
      struct ID: Hashable, Sendable, Comparable, Strideable {
        @usableFromInline
        private(set) var rawValue: Int

        /// The ID assigned to the first element added to a collection.
        @usableFromInline
        static var initial: Self {
          ID(id: 0)
        }

        private init(id: Int) {
          self.rawValue = id
        }

        /// Advances this ID to the one following it.
        @inlinable
        mutating func formNext() {
          self.rawValue += 1
        }

        /// Returns the ID following this one.
        @inlinable
        func next() -> Self {
          var copy = self
          copy.formNext()
          return copy
        }

        @inlinable
        func distance(to other: Self) -> Int {
          other.rawValue - self.rawValue
        }

        @inlinable
        func advanced(by n: Int) -> Self {
          var copy = self
          copy.rawValue += n
          return copy
        }

        @inlinable
        static func < (lhs: Self, rhs: Self) -> Bool {
          lhs.rawValue < rhs.rawValue
        }
      }

      /// An element paired with the ID it was assigned when appended.
      @usableFromInline
      struct _IdentifiableElement: Sendable {
        @usableFromInline
        var element: Element

        @usableFromInline
        var id: ID

        @inlinable
        init(element: Element, id: ID) {
          self.element = element
          self.id = id
        }
      }

      /// The stored elements, ordered by ascending ID.
      @usableFromInline
      var _elements: Deque<_IdentifiableElement>

      /// The ID which will be assigned to the next appended element.
      @usableFromInline
      var _nextID: ID

      @inlinable
      init() {
        self._nextID = .initial
        self._elements = []
      }

      /// Returns the ID for the next element and advances the internal counter.
      @inlinable
      mutating func nextElementID() -> ID {
        let id = self._nextID
        self._nextID.formNext()
        return id
      }

      /// The highest ID of the stored elements; `nil` if there are no elements.
      @inlinable
      var highestID: ID? { self._elements.last?.id }

      /// The lowest ID of the stored elements; `nil` if there are no elements.
      @inlinable
      var lowestID: ID? { self._elements.first?.id }

      /// The number of stored elements.
      @inlinable
      var count: Int { self._elements.count }

      /// Whether there are no stored elements.
      @inlinable
      var isEmpty: Bool { self._elements.isEmpty }

      /// Appends an element to the collection, tagging it with the next ID.
      @inlinable
      mutating func append(_ element: Element) {
        self._elements.append(_IdentifiableElement(element: element, id: self.nextElementID()))
      }

      /// Removes the first element from the collection.
      ///
      /// - Returns: The removed element.
      @discardableResult
      @inlinable
      mutating func removeFirst() -> Element {
        let removed = self._elements.removeFirst()
        return removed.element
      }

      @usableFromInline
      enum ElementLookup {
        /// The element was found in the collection.
        case found(Element)
        /// The element isn't in the collection, but it could be in the future.
        case maybeAvailableLater
        /// The element was in the collection, but is no longer available.
        case noLongerAvailable
      }

      /// Lookup the element with the given ID.
      ///
      /// This is a pure read: it never modifies the collection, so it is non-mutating and does
      /// not require exclusive access from callers.
      ///
      /// - Parameter id: The ID of the element to lookup.
      /// - Returns: The element if it is stored, otherwise whether it may become available later
      ///   or is no longer available.
      @inlinable
      func lookupElement(withID id: ID) -> ElementLookup {
        guard let low = self.lowestID, let high = self.highestID else {
          // Must be empty: IDs which haven't been handed out yet may still arrive, anything
          // lower has already been removed.
          return id >= self._nextID ? .maybeAvailableLater : .noLongerAvailable
        }
        assert(low <= high)

        if id < low {
          return .noLongerAvailable
        }
        if id > high {
          return .maybeAvailableLater
        }

        // IDs are monotonically increasing. If the buffer contains the tag we can use it to index
        // into the deque by looking at the offsets.
        let offset = low.distance(to: id)
        let index = self._elements.startIndex.advanced(by: offset)
        return .found(self._elements[index].element)
      }
    }
  }
  @available(gRPCSwift 2.0, *)
  extension _BroadcastSequenceStateMachine {
    /// A collection of subscriptions.
    @usableFromInline
    struct Subscriptions: Sendable {
      /// The ID of a subscriber.
      @usableFromInline
      struct ID: Hashable, Sendable {
        @usableFromInline
        private(set) var rawValue: Int

        @inlinable
        init() {
          self.rawValue = 0
        }

        /// Advances this ID to the one following it.
        @inlinable
        mutating func formNext() {
          self.rawValue += 1
        }

        /// Returns the ID following this one.
        @inlinable
        func next() -> Self {
          var copy = self
          copy.formNext()
          return copy
        }
      }

      @usableFromInline
      struct _Subscriber: Sendable {
        /// The ID of the subscriber.
        @usableFromInline
        var id: ID

        /// The ID of the next element the subscriber will consume.
        @usableFromInline
        var nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID

        /// A continuation which will be resumed when the next element becomes available.
        @usableFromInline
        var continuation: ConsumerContinuation?

        @inlinable
        init(
          id: ID,
          nextElementID: _BroadcastSequenceStateMachine<Element>.Elements.ID,
          continuation: ConsumerContinuation? = nil
        ) {
          self.id = id
          self.nextElementID = nextElementID
          self.continuation = continuation
        }

        /// Returns and sets the continuation to `nil` if one exists.
        ///
        /// The next element ID is advanced if a continuation exists.
        ///
        /// - Returns: The continuation, if one existed.
        @inlinable
        mutating func takeContinuation() -> ConsumerContinuation? {
          guard let continuation = self.continuation else { return nil }
          self.continuation = nil
          self.nextElementID.formNext()
          return continuation
        }
      }

      @usableFromInline
      var _subscribers: [_Subscriber]

      /// The ID which will be assigned to the next subscriber.
      @usableFromInline
      var _nextSubscriberID: ID

      @inlinable
      init() {
        self._subscribers = []
        self._nextSubscriberID = ID()
      }

      /// Returns the number of subscribers.
      @inlinable
      var count: Int { self._subscribers.count }

      /// Returns whether the collection is empty.
      @inlinable
      var isEmpty: Bool { self._subscribers.isEmpty }

      /// Adds a new subscriber and returns its unique ID.
      ///
      /// The subscriber starts at the initial element ID.
      ///
      /// - Returns: The ID of the new subscriber.
      @inlinable
      mutating func subscribe() -> ID {
        let id = self._nextSubscriberID
        self._nextSubscriberID.formNext()
        self._subscribers.append(_Subscriber(id: id, nextElementID: .initial))
        return id
      }

      /// Provides mutable access to the element ID of the given subscriber, if it exists.
      ///
      /// - Parameters:
      ///   - id: The ID of the subscriber.
      ///   - body: A closure to mutate the element ID of the subscriber which returns the result and
      ///     a boolean indicating whether the subscriber should be removed.
      /// - Returns: The result returned from the closure or `nil` if no subscriber exists with the
      ///   given ID.
      @inlinable
      mutating func withMutableElementID<R>(
        forSubscriber id: ID,
        _ body: (
          inout _BroadcastSequenceStateMachine<Element>.Elements.ID
        ) -> (result: R, removeSubscription: Bool)
      ) -> R? {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else { return nil }
        let (result, removeSubscription) = body(&self._subscribers[index].nextElementID)
        if removeSubscription {
          self._subscribers.remove(at: index)
        }
        return result
      }

      /// Sets the continuation for the subscription with the given ID.
      ///
      /// - Parameters:
      ///   - continuation: The continuation to set.
      ///   - id: The ID of the subscriber.
      /// - Returns: A boolean indicating whether the continuation was set or not.
      @inlinable
      mutating func setContinuation(
        _ continuation: ConsumerContinuation,
        forSubscriber id: ID
      ) -> Bool {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
          return false
        }
        assert(self._subscribers[index].continuation == nil)
        self._subscribers[index].continuation = continuation
        return true
      }

      /// Returns an array of the IDs of subscribers whose next element ID is `id`.
      @inlinable
      func subscribers(
        withElementID id: _BroadcastSequenceStateMachine<Element>.Elements.ID
      ) -> [ID] {
        // Single pass; avoids building an intermediate array of subscribers.
        return self._subscribers.compactMap { subscriber in
          subscriber.nextElementID == id ? subscriber.id : nil
        }
      }

      /// Removes the subscriber with the given ID.
      ///
      /// - Parameter id: The ID of the subscriber to remove.
      /// - Returns: A tuple indicating whether a subscriber was removed and any continuation
      ///   associated with the subscriber.
      @inlinable
      mutating func removeSubscriber(withID id: ID) -> (Bool, ConsumerContinuation?) {
        guard let index = self._subscribers.firstIndex(where: { $0.id == id }) else {
          return (false, nil)
        }
        let continuation = self._subscribers[index].continuation
        self._subscribers.remove(at: index)
        return (true, continuation)
      }

      /// Remove all subscribers in the given array of IDs.
      @inlinable
      mutating func removeAllSubscribers(in idsToRemove: [ID]) {
        self._subscribers.removeAll {
          idsToRemove.contains($0.id)
        }
      }

      /// Remove all subscribers and return their IDs.
      @inlinable
      mutating func removeAllSubscribers() -> [ID] {
        let subscribers = self._subscribers.map { $0.id }
        self._subscribers.removeAll()
        return subscribers
      }

      /// Returns any continuations set on subscribers, unsetting at the same time.
      ///
      /// Subscribers are kept; each one whose continuation is taken has its next element ID
      /// advanced.
      @inlinable
      mutating func takeContinuations() -> _OneOrMany<ConsumerContinuation>? {
        // Avoid allocs if there's only one subscriber.
        let count = self._countPendingContinuations()
        let result: _OneOrMany<ConsumerContinuation>?

        switch count {
        case 0:
          result = nil

        case 1:
          // 'count' guarantees exactly one subscriber has a continuation.
          let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
          let continuation = self._subscribers[index].takeContinuation()!
          result = .one(continuation)

        default:
          var continuations = [ConsumerContinuation]()
          continuations.reserveCapacity(count)

          for index in self._subscribers.indices {
            if let continuation = self._subscribers[index].takeContinuation() {
              continuations.append(continuation)
            }
          }
          result = .many(continuations)
        }

        return result
      }

      /// Removes all subscribers which have continuations and return their continuations.
      @inlinable
      mutating func removeSubscribersWithContinuations() -> _OneOrMany<ConsumerContinuation> {
        // Avoid allocs if there's only one subscriber.
        let count = self._countPendingContinuations()
        let result: _OneOrMany<ConsumerContinuation>

        switch count {
        case 0:
          result = .many([])

        case 1:
          // 'count' guarantees exactly one subscriber has a continuation.
          let index = self._subscribers.firstIndex(where: { $0.continuation != nil })!
          let subscription = self._subscribers.remove(at: index)
          result = .one(subscription.continuation!)

        default:
          var continuations = [ConsumerContinuation]()
          continuations.reserveCapacity(count)

          for subscription in self._subscribers {
            if let continuation = subscription.continuation {
              continuations.append(continuation)
            }
          }

          // Remove by the same predicate used to collect above. This is linear in the number
          // of subscribers, rather than scanning a list of IDs for every subscriber.
          self._subscribers.removeAll { $0.continuation != nil }

          result = .many(continuations)
        }

        return result
      }

      /// Returns the number of subscribers which currently have a continuation set.
      @inlinable
      func _countPendingContinuations() -> Int {
        return self._subscribers.reduce(into: 0) { count, subscription in
          if subscription.continuation != nil {
            count += 1
          }
        }
      }
    }
  }
  @available(gRPCSwift 2.0, *)
  extension _BroadcastSequenceStateMachine {
    /// Holds either a single value or an array of values.
    ///
    /// The single-value case lets callers avoid allocating an array when there is only one
    /// value to carry.
    // TODO: tiny array
    @usableFromInline
    enum _OneOrMany<Value> {
      /// Exactly one value.
      case one(Value)

      /// Any number of values, possibly none.
      case many([Value])
    }
  }