BufferedStream.swift 67 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. //===----------------------------------------------------------------------===//
  17. //
  18. // This source file is part of the Swift.org open source project
  19. //
  20. // Copyright (c) 2020-2021 Apple Inc. and the Swift project authors
  21. // Licensed under Apache License v2.0 with Runtime Library Exception
  22. //
  23. // See https://swift.org/LICENSE.txt for license information
  24. // See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
  25. //
  26. //===----------------------------------------------------------------------===//
  27. import DequeModule
  28. /// An asynchronous sequence generated from an error-throwing closure that
  29. /// calls a continuation to produce new elements.
  30. ///
  31. /// `BufferedStream` conforms to `AsyncSequence`, providing a convenient
  32. /// way to create an asynchronous sequence without manually implementing an
  33. /// asynchronous iterator. In particular, an asynchronous stream is well-suited
  34. /// to adapt callback- or delegation-based APIs to participate with
  35. /// `async`-`await`.
  36. ///
  37. /// In contrast to `AsyncStream`, this type can throw an error from the awaited
  38. /// `next()`, which terminates the stream with the thrown error.
  39. ///
  40. /// You initialize an `BufferedStream` with a closure that receives an
  41. /// `BufferedStream.Continuation`. Produce elements in this closure, then
  42. /// provide them to the stream by calling the continuation's `yield(_:)` method.
  43. /// When there are no further elements to produce, call the continuation's
  44. /// `finish()` method. This causes the sequence iterator to produce a `nil`,
  45. /// which terminates the sequence. If an error occurs, call the continuation's
  46. /// `finish(throwing:)` method, which causes the iterator's `next()` method to
  47. /// throw the error to the awaiting call point. The continuation is `Sendable`,
  48. /// which permits calling it from concurrent contexts external to the iteration
  49. /// of the `BufferedStream`.
  50. ///
  51. /// An arbitrary source of elements can produce elements faster than they are
  52. /// consumed by a caller iterating over them. Because of this, `BufferedStream`
  53. /// defines a buffering behavior, allowing the stream to buffer a specific
  54. /// number of oldest or newest elements. By default, the buffer limit is
  55. /// `Int.max`, which means it's unbounded.
  56. ///
  57. /// ### Adapting Existing Code to Use Streams
  58. ///
  59. /// To adapt existing callback code to use `async`-`await`, use the callbacks
  60. /// to provide values to the stream, by using the continuation's `yield(_:)`
  61. /// method.
  62. ///
  63. /// Consider a hypothetical `QuakeMonitor` type that provides callers with
  64. /// `Quake` instances every time it detects an earthquake. To receive callbacks,
  65. /// callers set a custom closure as the value of the monitor's
  66. /// `quakeHandler` property, which the monitor calls back as necessary. Callers
  67. /// can also set an `errorHandler` to receive asynchronous error notifications,
  68. /// such as the monitor service suddenly becoming unavailable.
  69. ///
  70. /// class QuakeMonitor {
  71. /// var quakeHandler: ((Quake) -> Void)?
  72. /// var errorHandler: ((Error) -> Void)?
  73. ///
  74. /// func startMonitoring() {…}
  75. /// func stopMonitoring() {…}
  76. /// }
  77. ///
  78. /// To adapt this to use `async`-`await`, extend the `QuakeMonitor` to add a
  79. /// `quakes` property, of type `BufferedStream<Quake>`. In the getter for
  80. /// this property, return an `BufferedStream`, whose `build` closure --
  81. /// called at runtime to create the stream -- uses the continuation to
  82. /// perform the following steps:
  83. ///
  84. /// 1. Creates a `QuakeMonitor` instance.
  85. /// 2. Sets the monitor's `quakeHandler` property to a closure that receives
  86. /// each `Quake` instance and forwards it to the stream by calling the
  87. /// continuation's `yield(_:)` method.
  88. /// 3. Sets the monitor's `errorHandler` property to a closure that receives
  89. /// any error from the monitor and forwards it to the stream by calling the
  90. /// continuation's `finish(throwing:)` method. This causes the stream's
  91. /// iterator to throw the error and terminate the stream.
  92. /// 4. Sets the continuation's `onTermination` property to a closure that
  93. /// calls `stopMonitoring()` on the monitor.
  94. /// 5. Calls `startMonitoring` on the `QuakeMonitor`.
  95. ///
  96. /// ```
  97. /// extension QuakeMonitor {
  98. ///
  99. /// static var throwingQuakes: BufferedStream<Quake, Error> {
  100. /// BufferedStream { continuation in
  101. /// let monitor = QuakeMonitor()
  102. /// monitor.quakeHandler = { quake in
  103. /// continuation.yield(quake)
  104. /// }
  105. /// monitor.errorHandler = { error in
  106. /// continuation.finish(throwing: error)
  107. /// }
  108. /// continuation.onTermination = { @Sendable _ in
  109. /// monitor.stopMonitoring()
  110. /// }
  111. /// monitor.startMonitoring()
  112. /// }
  113. /// }
  114. /// }
  115. /// ```
  116. ///
  117. ///
  118. /// Because the stream is an `AsyncSequence`, the call point uses the
  119. /// `for`-`await`-`in` syntax to process each `Quake` instance as produced by the stream:
  120. ///
  121. /// do {
  122. /// for try await quake in quakeStream {
  123. /// print("Quake: \(quake.date)")
  124. /// }
  125. /// print("Stream done.")
  126. /// } catch {
  127. /// print("Error: \(error)")
  128. /// }
  129. ///
  130. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  131. @usableFromInline
  132. internal struct BufferedStream<Element> {
  133. @usableFromInline
  134. final class _Backing: Sendable {
  135. @usableFromInline
  136. let storage: _BackPressuredStorage
  137. @inlinable
  138. init(storage: _BackPressuredStorage) {
  139. self.storage = storage
  140. }
  141. deinit {
  142. self.storage.sequenceDeinitialized()
  143. }
  144. }
  145. @usableFromInline
  146. enum _Implementation: Sendable {
  147. /// This is the implementation with backpressure based on the Source
  148. case backpressured(_Backing)
  149. }
  150. @usableFromInline
  151. let implementation: _Implementation
  152. }
  153. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  154. extension BufferedStream: AsyncSequence {
  155. /// The asynchronous iterator for iterating an asynchronous stream.
  156. ///
  157. /// This type is not `Sendable`. Don't use it from multiple
  158. /// concurrent contexts. It is a programmer error to invoke `next()` from a
  159. /// concurrent context that contends with another such call, which
  160. /// results in a call to `fatalError()`.
  161. @usableFromInline
  162. internal struct Iterator: AsyncIteratorProtocol {
  163. @usableFromInline
  164. final class _Backing {
  165. @usableFromInline
  166. let storage: _BackPressuredStorage
  167. @inlinable
  168. init(storage: _BackPressuredStorage) {
  169. self.storage = storage
  170. self.storage.iteratorInitialized()
  171. }
  172. deinit {
  173. self.storage.iteratorDeinitialized()
  174. }
  175. }
  176. @usableFromInline
  177. enum _Implementation {
  178. /// This is the implementation with backpressure based on the Source
  179. case backpressured(_Backing)
  180. }
  181. @usableFromInline
  182. var implementation: _Implementation
  183. @inlinable
  184. init(implementation: _Implementation) {
  185. self.implementation = implementation
  186. }
  187. /// The next value from the asynchronous stream.
  188. ///
  189. /// When `next()` returns `nil`, this signifies the end of the
  190. /// `BufferedStream`.
  191. ///
  192. /// It is a programmer error to invoke `next()` from a concurrent context
  193. /// that contends with another such call, which results in a call to
  194. /// `fatalError()`.
  195. ///
  196. /// If you cancel the task this iterator is running in while `next()` is
  197. /// awaiting a value, the `BufferedStream` terminates. In this case,
  198. /// `next()` may return `nil` immediately, or else return `nil` on
  199. /// subsequent calls.
  200. @inlinable
  201. internal mutating func next() async throws -> Element? {
  202. switch self.implementation {
  203. case .backpressured(let backing):
  204. return try await backing.storage.next()
  205. }
  206. }
  207. }
  208. /// Creates the asynchronous iterator that produces elements of this
  209. /// asynchronous sequence.
  210. @inlinable
  211. internal func makeAsyncIterator() -> Iterator {
  212. switch self.implementation {
  213. case .backpressured(let backing):
  214. return Iterator(implementation: .backpressured(.init(storage: backing.storage)))
  215. }
  216. }
  217. }
  218. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  219. extension BufferedStream: Sendable where Element: Sendable {}
  220. @usableFromInline
  221. internal struct _ManagedCriticalState<State>: @unchecked Sendable {
  222. @usableFromInline
  223. let lock: LockedValueBox<State>
  224. @inlinable
  225. internal init(_ initial: State) {
  226. self.lock = .init(initial)
  227. }
  228. @inlinable
  229. internal func withCriticalRegion<R>(
  230. _ critical: (inout State) throws -> R
  231. ) rethrows -> R {
  232. try self.lock.withLockedValue(critical)
  233. }
  234. }
  235. @usableFromInline
  236. internal struct AlreadyFinishedError: Error {
  237. @inlinable
  238. init() {}
  239. }
  240. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  241. extension BufferedStream {
  242. /// A mechanism to interface between producer code and an asynchronous stream.
  243. ///
  244. /// Use this source to provide elements to the stream by calling one of the `write` methods, then terminate the stream normally
  245. /// by calling the `finish()` method. You can also use the source's `finish(throwing:)` method to terminate the stream by
  246. /// throwing an error.
  247. @usableFromInline
  248. internal struct Source: Sendable {
  249. /// A strategy that handles the backpressure of the asynchronous stream.
  250. @usableFromInline
  251. internal struct BackPressureStrategy: Sendable {
  252. /// When the high watermark is reached producers will be suspended. All producers will be resumed again once
  253. /// the low watermark is reached.
  254. @inlinable
  255. internal static func watermark(low: Int, high: Int) -> BackPressureStrategy {
  256. BackPressureStrategy(
  257. internalBackPressureStrategy: .watermark(.init(low: low, high: high))
  258. )
  259. }
  260. @inlinable
  261. init(internalBackPressureStrategy: _InternalBackPressureStrategy) {
  262. self._internalBackPressureStrategy = internalBackPressureStrategy
  263. }
  264. @usableFromInline
  265. let _internalBackPressureStrategy: _InternalBackPressureStrategy
  266. }
  267. /// A type that indicates the result of writing elements to the source.
  268. @frozen
  269. @usableFromInline
  270. internal enum WriteResult: Sendable {
  271. /// A token that is returned when the asynchronous stream's backpressure strategy indicated that production should
  272. /// be suspended. Use this token to enqueue a callback by calling the ``enqueueCallback(_:)`` method.
  273. @usableFromInline
  274. internal struct CallbackToken: Sendable {
  275. @usableFromInline
  276. let id: UInt
  277. @usableFromInline
  278. init(id: UInt) {
  279. self.id = id
  280. }
  281. }
  282. /// Indicates that more elements should be produced and written to the source.
  283. case produceMore
  284. /// Indicates that a callback should be enqueued.
  285. ///
  286. /// The associated token should be passed to the ``enqueueCallback(_:)`` method.
  287. case enqueueCallback(CallbackToken)
  288. }
  289. /// Backing class for the source used to hook a deinit.
  290. @usableFromInline
  291. final class _Backing: Sendable {
  292. @usableFromInline
  293. let storage: _BackPressuredStorage
  294. @inlinable
  295. init(storage: _BackPressuredStorage) {
  296. self.storage = storage
  297. }
  298. deinit {
  299. self.storage.sourceDeinitialized()
  300. }
  301. }
  302. /// A callback to invoke when the stream finished.
  303. ///
  304. /// The stream finishes and calls this closure in the following cases:
  305. /// - No iterator was created and the sequence was deinited
  306. /// - An iterator was created and deinited
  307. /// - After ``finish(throwing:)`` was called and all elements have been consumed
  308. /// - The consuming task got cancelled
  309. @inlinable
  310. internal var onTermination: (@Sendable () -> Void)? {
  311. set {
  312. self._backing.storage.onTermination = newValue
  313. }
  314. get {
  315. self._backing.storage.onTermination
  316. }
  317. }
  318. @usableFromInline
  319. var _backing: _Backing
  320. @inlinable
  321. internal init(storage: _BackPressuredStorage) {
  322. self._backing = .init(storage: storage)
  323. }
  324. /// Writes new elements to the asynchronous stream.
  325. ///
  326. /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
  327. /// first element of the provided sequence. If the asynchronous stream already terminated then this method will throw an error
  328. /// indicating the failure.
  329. ///
  330. /// - Parameter sequence: The elements to write to the asynchronous stream.
  331. /// - Returns: The result that indicates if more elements should be produced at this time.
  332. @inlinable
  333. internal func write<S>(contentsOf sequence: S) throws -> WriteResult
  334. where Element == S.Element, S: Sequence {
  335. try self._backing.storage.write(contentsOf: sequence)
  336. }
  337. /// Write the element to the asynchronous stream.
  338. ///
  339. /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
  340. /// provided element. If the asynchronous stream already terminated then this method will throw an error
  341. /// indicating the failure.
  342. ///
  343. /// - Parameter element: The element to write to the asynchronous stream.
  344. /// - Returns: The result that indicates if more elements should be produced at this time.
  345. @inlinable
  346. internal func write(_ element: Element) throws -> WriteResult {
  347. try self._backing.storage.write(contentsOf: CollectionOfOne(element))
  348. }
  349. /// Enqueues a callback that will be invoked once more elements should be produced.
  350. ///
  351. /// Call this method after ``write(contentsOf:)`` or ``write(:)`` returned ``WriteResult/enqueueCallback(_:)``.
  352. ///
  353. /// - Important: Enqueueing the same token multiple times is not allowed.
  354. ///
  355. /// - Parameters:
  356. /// - callbackToken: The callback token.
  357. /// - onProduceMore: The callback which gets invoked once more elements should be produced.
  358. @inlinable
  359. internal func enqueueCallback(
  360. callbackToken: WriteResult.CallbackToken,
  361. onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
  362. ) {
  363. self._backing.storage.enqueueProducer(
  364. callbackToken: callbackToken,
  365. onProduceMore: onProduceMore
  366. )
  367. }
  368. /// Cancel an enqueued callback.
  369. ///
  370. /// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method.
  371. ///
  372. /// - Note: This methods supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and
  373. /// will mark the passed `callbackToken` as cancelled.
  374. ///
  375. /// - Parameter callbackToken: The callback token.
  376. @inlinable
  377. internal func cancelCallback(callbackToken: WriteResult.CallbackToken) {
  378. self._backing.storage.cancelProducer(callbackToken: callbackToken)
  379. }
  380. /// Write new elements to the asynchronous stream and provide a callback which will be invoked once more elements should be produced.
  381. ///
  382. /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
  383. /// first element of the provided sequence. If the asynchronous stream already terminated then `onProduceMore` will be invoked with
  384. /// a `Result.failure`.
  385. ///
  386. /// - Parameters:
  387. /// - sequence: The elements to write to the asynchronous stream.
  388. /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be
  389. /// invoked during the call to ``write(contentsOf:onProduceMore:)``.
  390. @inlinable
  391. internal func write<S>(
  392. contentsOf sequence: S,
  393. onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
  394. ) where Element == S.Element, S: Sequence {
  395. do {
  396. let writeResult = try self.write(contentsOf: sequence)
  397. switch writeResult {
  398. case .produceMore:
  399. onProduceMore(Result<Void, Error>.success(()))
  400. case .enqueueCallback(let callbackToken):
  401. self.enqueueCallback(callbackToken: callbackToken, onProduceMore: onProduceMore)
  402. }
  403. } catch {
  404. onProduceMore(.failure(error))
  405. }
  406. }
  407. /// Writes the element to the asynchronous stream.
  408. ///
  409. /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
  410. /// provided element. If the asynchronous stream already terminated then `onProduceMore` will be invoked with
  411. /// a `Result.failure`.
  412. ///
  413. /// - Parameters:
  414. /// - sequence: The element to write to the asynchronous stream.
  415. /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be
  416. /// invoked during the call to ``write(_:onProduceMore:)``.
  417. @inlinable
  418. internal func write(
  419. _ element: Element,
  420. onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
  421. ) {
  422. self.write(contentsOf: CollectionOfOne(element), onProduceMore: onProduceMore)
  423. }
  424. /// Write new elements to the asynchronous stream.
  425. ///
  426. /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
  427. /// first element of the provided sequence. If the asynchronous stream already terminated then this method will throw an error
  428. /// indicating the failure.
  429. ///
  430. /// This method returns once more elements should be produced.
  431. ///
  432. /// - Parameters:
  433. /// - sequence: The elements to write to the asynchronous stream.
  434. @inlinable
  435. internal func write<S>(contentsOf sequence: S) async throws
  436. where Element == S.Element, S: Sequence {
  437. let writeResult = try { try self.write(contentsOf: sequence) }()
  438. switch writeResult {
  439. case .produceMore:
  440. return
  441. case .enqueueCallback(let callbackToken):
  442. try await withTaskCancellationHandler {
  443. try await withCheckedThrowingContinuation { continuation in
  444. self.enqueueCallback(
  445. callbackToken: callbackToken,
  446. onProduceMore: { result in
  447. switch result {
  448. case .success():
  449. continuation.resume(returning: ())
  450. case .failure(let error):
  451. continuation.resume(throwing: error)
  452. }
  453. }
  454. )
  455. }
  456. } onCancel: {
  457. self.cancelCallback(callbackToken: callbackToken)
  458. }
  459. }
  460. }
  461. /// Write new element to the asynchronous stream.
  462. ///
  463. /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
  464. /// provided element. If the asynchronous stream already terminated then this method will throw an error
  465. /// indicating the failure.
  466. ///
  467. /// This method returns once more elements should be produced.
  468. ///
  469. /// - Parameters:
  470. /// - sequence: The element to write to the asynchronous stream.
  471. @inlinable
  472. internal func write(_ element: Element) async throws {
  473. try await self.write(contentsOf: CollectionOfOne(element))
  474. }
  475. /// Write the elements of the asynchronous sequence to the asynchronous stream.
  476. ///
  477. /// This method returns once the provided asynchronous sequence or the the asynchronous stream finished.
  478. ///
  479. /// - Important: This method does not finish the source if consuming the upstream sequence terminated.
  480. ///
  481. /// - Parameters:
  482. /// - sequence: The elements to write to the asynchronous stream.
  483. @inlinable
  484. internal func write<S>(contentsOf sequence: S) async throws
  485. where Element == S.Element, S: AsyncSequence {
  486. for try await element in sequence {
  487. try await self.write(contentsOf: CollectionOfOne(element))
  488. }
  489. }
  490. /// Indicates that the production terminated.
  491. ///
  492. /// After all buffered elements are consumed the next iteration point will return `nil` or throw an error.
  493. ///
  494. /// Calling this function more than once has no effect. After calling finish, the stream enters a terminal state and doesn't accept
  495. /// new elements.
  496. ///
  497. /// - Parameters:
  498. /// - error: The error to throw, or `nil`, to finish normally.
  499. @inlinable
  500. internal func finish(throwing error: Error?) {
  501. self._backing.storage.finish(error)
  502. }
  503. }
  504. /// Initializes a new ``BufferedStream`` and an ``BufferedStream/Source``.
  505. ///
  506. /// - Parameters:
  507. /// - elementType: The element type of the stream.
  508. /// - failureType: The failure type of the stream.
  509. /// - backPressureStrategy: The backpressure strategy that the stream should use.
  510. /// - Returns: A tuple containing the stream and its source. The source should be passed to the
  511. /// producer while the stream should be passed to the consumer.
  512. @inlinable
  513. internal static func makeStream(
  514. of elementType: Element.Type = Element.self,
  515. throwing failureType: Error.Type = Error.self,
  516. backPressureStrategy: Source.BackPressureStrategy
  517. ) -> (`Self`, Source) where Error == Error {
  518. let storage = _BackPressuredStorage(
  519. backPressureStrategy: backPressureStrategy._internalBackPressureStrategy
  520. )
  521. let source = Source(storage: storage)
  522. return (.init(storage: storage), source)
  523. }
  524. @inlinable
  525. init(storage: _BackPressuredStorage) {
  526. self.implementation = .backpressured(.init(storage: storage))
  527. }
  528. }
  529. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  530. extension BufferedStream {
  531. @usableFromInline
  532. struct _WatermarkBackPressureStrategy: Sendable {
  533. /// The low watermark where demand should start.
  534. @usableFromInline
  535. let _low: Int
  536. /// The high watermark where demand should be stopped.
  537. @usableFromInline
  538. let _high: Int
  539. /// Initializes a new ``_WatermarkBackPressureStrategy``.
  540. ///
  541. /// - Parameters:
  542. /// - low: The low watermark where demand should start.
  543. /// - high: The high watermark where demand should be stopped.
  544. @inlinable
  545. init(low: Int, high: Int) {
  546. precondition(low <= high)
  547. self._low = low
  548. self._high = high
  549. }
  550. @inlinable
  551. func didYield(bufferDepth: Int) -> Bool {
  552. // We are demanding more until we reach the high watermark
  553. return bufferDepth < self._high
  554. }
  555. @inlinable
  556. func didConsume(bufferDepth: Int) -> Bool {
  557. // We start demanding again once we are below the low watermark
  558. return bufferDepth < self._low
  559. }
  560. }
  561. @usableFromInline
  562. enum _InternalBackPressureStrategy: Sendable {
  563. case watermark(_WatermarkBackPressureStrategy)
  564. @inlinable
  565. mutating func didYield(bufferDepth: Int) -> Bool {
  566. switch self {
  567. case .watermark(let strategy):
  568. return strategy.didYield(bufferDepth: bufferDepth)
  569. }
  570. }
  571. @inlinable
  572. mutating func didConsume(bufferDepth: Int) -> Bool {
  573. switch self {
  574. case .watermark(let strategy):
  575. return strategy.didConsume(bufferDepth: bufferDepth)
  576. }
  577. }
  578. }
  579. }
  580. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  581. extension BufferedStream {
  582. // We are unchecked Sendable since we are protecting our state with a lock.
  583. @usableFromInline
  584. final class _BackPressuredStorage: Sendable {
  585. /// The state machine
  586. @usableFromInline
  587. let _stateMachine: _ManagedCriticalState<_StateMachine>
  588. @usableFromInline
  589. var onTermination: (@Sendable () -> Void)? {
  590. set {
  591. self._stateMachine.withCriticalRegion {
  592. $0._onTermination = newValue
  593. }
  594. }
  595. get {
  596. self._stateMachine.withCriticalRegion {
  597. $0._onTermination
  598. }
  599. }
  600. }
  601. @inlinable
  602. init(
  603. backPressureStrategy: _InternalBackPressureStrategy
  604. ) {
  605. self._stateMachine = .init(.init(backPressureStrategy: backPressureStrategy))
  606. }
  607. @inlinable
  608. func sequenceDeinitialized() {
  609. let action = self._stateMachine.withCriticalRegion {
  610. $0.sequenceDeinitialized()
  611. }
  612. switch action {
  613. case .callOnTermination(let onTermination):
  614. onTermination?()
  615. case .failProducersAndCallOnTermination(let producerContinuations, let onTermination):
  616. for producerContinuation in producerContinuations {
  617. producerContinuation(.failure(AlreadyFinishedError()))
  618. }
  619. onTermination?()
  620. case .none:
  621. break
  622. }
  623. }
  624. @inlinable
  625. func iteratorInitialized() {
  626. self._stateMachine.withCriticalRegion {
  627. $0.iteratorInitialized()
  628. }
  629. }
  630. @inlinable
  631. func iteratorDeinitialized() {
  632. let action = self._stateMachine.withCriticalRegion {
  633. $0.iteratorDeinitialized()
  634. }
  635. switch action {
  636. case .callOnTermination(let onTermination):
  637. onTermination?()
  638. case .failProducersAndCallOnTermination(let producerContinuations, let onTermination):
  639. for producerContinuation in producerContinuations {
  640. producerContinuation(.failure(AlreadyFinishedError()))
  641. }
  642. onTermination?()
  643. case .none:
  644. break
  645. }
  646. }
  647. @inlinable
  648. func sourceDeinitialized() {
  649. let action = self._stateMachine.withCriticalRegion {
  650. $0.sourceDeinitialized()
  651. }
  652. switch action {
  653. case .callOnTermination(let onTermination):
  654. onTermination?()
  655. case .failProducersAndCallOnTermination(
  656. let consumer,
  657. let producerContinuations,
  658. let onTermination
  659. ):
  660. consumer?.resume(returning: nil)
  661. for producerContinuation in producerContinuations {
  662. producerContinuation(.failure(AlreadyFinishedError()))
  663. }
  664. onTermination?()
  665. case .failProducers(let producerContinuations):
  666. for producerContinuation in producerContinuations {
  667. producerContinuation(.failure(AlreadyFinishedError()))
  668. }
  669. case .none:
  670. break
  671. }
  672. }
  673. @inlinable
  674. func write(
  675. contentsOf sequence: some Sequence<Element>
  676. ) throws -> Source.WriteResult {
  677. let action = self._stateMachine.withCriticalRegion {
  678. return $0.write(sequence)
  679. }
  680. switch action {
  681. case .returnProduceMore:
  682. return .produceMore
  683. case .returnEnqueue(let callbackToken):
  684. return .enqueueCallback(callbackToken)
  685. case .resumeConsumerAndReturnProduceMore(let continuation, let element):
  686. continuation.resume(returning: element)
  687. return .produceMore
  688. case .resumeConsumerAndReturnEnqueue(let continuation, let element, let callbackToken):
  689. continuation.resume(returning: element)
  690. return .enqueueCallback(callbackToken)
  691. case .throwFinishedError:
  692. throw AlreadyFinishedError()
  693. }
  694. }
  695. @inlinable
  696. func enqueueProducer(
  697. callbackToken: Source.WriteResult.CallbackToken,
  698. onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
  699. ) {
  700. let action = self._stateMachine.withCriticalRegion {
  701. $0.enqueueProducer(callbackToken: callbackToken, onProduceMore: onProduceMore)
  702. }
  703. switch action {
  704. case .resumeProducer(let onProduceMore):
  705. onProduceMore(Result<Void, Error>.success(()))
  706. case .resumeProducerWithError(let onProduceMore, let error):
  707. onProduceMore(Result<Void, Error>.failure(error))
  708. case .none:
  709. break
  710. }
  711. }
  712. @inlinable
  713. func cancelProducer(callbackToken: Source.WriteResult.CallbackToken) {
  714. let action = self._stateMachine.withCriticalRegion {
  715. $0.cancelProducer(callbackToken: callbackToken)
  716. }
  717. switch action {
  718. case .resumeProducerWithCancellationError(let onProduceMore):
  719. onProduceMore(Result<Void, Error>.failure(CancellationError()))
  720. case .none:
  721. break
  722. }
  723. }
  724. @inlinable
  725. func finish(_ failure: Error?) {
  726. let action = self._stateMachine.withCriticalRegion {
  727. $0.finish(failure)
  728. }
  729. switch action {
  730. case .callOnTermination(let onTermination):
  731. onTermination?()
  732. case .resumeConsumerAndCallOnTermination(
  733. let consumerContinuation,
  734. let failure,
  735. let onTermination
  736. ):
  737. switch failure {
  738. case .some(let error):
  739. consumerContinuation.resume(throwing: error)
  740. case .none:
  741. consumerContinuation.resume(returning: nil)
  742. }
  743. onTermination?()
  744. case .resumeProducers(let producerContinuations):
  745. for producerContinuation in producerContinuations {
  746. producerContinuation(.failure(AlreadyFinishedError()))
  747. }
  748. case .none:
  749. break
  750. }
  751. }
  752. @inlinable
  753. func next() async throws -> Element? {
  754. let action = self._stateMachine.withCriticalRegion {
  755. $0.next()
  756. }
  757. switch action {
  758. case .returnElement(let element):
  759. return element
  760. case .returnElementAndResumeProducers(let element, let producerContinuations):
  761. for producerContinuation in producerContinuations {
  762. producerContinuation(Result<Void, Error>.success(()))
  763. }
  764. return element
  765. case .returnErrorAndCallOnTermination(let failure, let onTermination):
  766. onTermination?()
  767. switch failure {
  768. case .some(let error):
  769. throw error
  770. case .none:
  771. return nil
  772. }
  773. case .returnNil:
  774. return nil
  775. case .suspendTask:
  776. return try await self.suspendNext()
  777. }
  778. }
  779. @inlinable
  780. func suspendNext() async throws -> Element? {
  781. return try await withTaskCancellationHandler {
  782. return try await withCheckedThrowingContinuation { continuation in
  783. let action = self._stateMachine.withCriticalRegion {
  784. $0.suspendNext(continuation: continuation)
  785. }
  786. switch action {
  787. case .resumeConsumerWithElement(let continuation, let element):
  788. continuation.resume(returning: element)
  789. case .resumeConsumerWithElementAndProducers(
  790. let continuation,
  791. let element,
  792. let producerContinuations
  793. ):
  794. continuation.resume(returning: element)
  795. for producerContinuation in producerContinuations {
  796. producerContinuation(Result<Void, Error>.success(()))
  797. }
  798. case .resumeConsumerWithErrorAndCallOnTermination(
  799. let continuation,
  800. let failure,
  801. let onTermination
  802. ):
  803. switch failure {
  804. case .some(let error):
  805. continuation.resume(throwing: error)
  806. case .none:
  807. continuation.resume(returning: nil)
  808. }
  809. onTermination?()
  810. case .resumeConsumerWithNil(let continuation):
  811. continuation.resume(returning: nil)
  812. case .none:
  813. break
  814. }
  815. }
  816. } onCancel: {
  817. let action = self._stateMachine.withCriticalRegion {
  818. $0.cancelNext()
  819. }
  820. switch action {
  821. case .resumeConsumerWithCancellationErrorAndCallOnTermination(
  822. let continuation,
  823. let onTermination
  824. ):
  825. continuation.resume(throwing: CancellationError())
  826. onTermination?()
  827. case .failProducersAndCallOnTermination(
  828. let producerContinuations,
  829. let onTermination
  830. ):
  831. for producerContinuation in producerContinuations {
  832. producerContinuation(.failure(AlreadyFinishedError()))
  833. }
  834. onTermination?()
  835. case .none:
  836. break
  837. }
  838. }
  839. }
  840. }
  841. }
  842. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  843. extension BufferedStream {
  844. /// The state machine of the backpressured async stream.
  845. @usableFromInline
  846. struct _StateMachine {
  847. @usableFromInline
  848. enum _State {
  849. @usableFromInline
  850. struct Initial {
  851. /// The backpressure strategy.
  852. @usableFromInline
  853. var backPressureStrategy: _InternalBackPressureStrategy
  854. /// Indicates if the iterator was initialized.
  855. @usableFromInline
  856. var iteratorInitialized: Bool
  857. /// The onTermination callback.
  858. @usableFromInline
  859. var onTermination: (@Sendable () -> Void)?
  860. @inlinable
  861. init(
  862. backPressureStrategy: _InternalBackPressureStrategy,
  863. iteratorInitialized: Bool,
  864. onTermination: (@Sendable () -> Void)? = nil
  865. ) {
  866. self.backPressureStrategy = backPressureStrategy
  867. self.iteratorInitialized = iteratorInitialized
  868. self.onTermination = onTermination
  869. }
  870. }
  871. @usableFromInline
  872. struct Streaming {
  873. /// The backpressure strategy.
  874. @usableFromInline
  875. var backPressureStrategy: _InternalBackPressureStrategy
  876. /// Indicates if the iterator was initialized.
  877. @usableFromInline
  878. var iteratorInitialized: Bool
  879. /// The onTermination callback.
  880. @usableFromInline
  881. var onTermination: (@Sendable () -> Void)?
  882. /// The buffer of elements.
  883. @usableFromInline
  884. var buffer: Deque<Element>
  885. /// The optional consumer continuation.
  886. @usableFromInline
  887. var consumerContinuation: CheckedContinuation<Element?, Error>?
  888. /// The producer continuations.
  889. @usableFromInline
  890. var producerContinuations: Deque<(UInt, (Result<Void, Error>) -> Void)>
  891. /// The producers that have been cancelled.
  892. @usableFromInline
  893. var cancelledAsyncProducers: Deque<UInt>
  894. /// Indicates if we currently have outstanding demand.
  895. @usableFromInline
  896. var hasOutstandingDemand: Bool
  897. @inlinable
  898. init(
  899. backPressureStrategy: _InternalBackPressureStrategy,
  900. iteratorInitialized: Bool,
  901. onTermination: (@Sendable () -> Void)? = nil,
  902. buffer: Deque<Element>,
  903. consumerContinuation: CheckedContinuation<Element?, Error>? = nil,
  904. producerContinuations: Deque<(UInt, (Result<Void, Error>) -> Void)>,
  905. cancelledAsyncProducers: Deque<UInt>,
  906. hasOutstandingDemand: Bool
  907. ) {
  908. self.backPressureStrategy = backPressureStrategy
  909. self.iteratorInitialized = iteratorInitialized
  910. self.onTermination = onTermination
  911. self.buffer = buffer
  912. self.consumerContinuation = consumerContinuation
  913. self.producerContinuations = producerContinuations
  914. self.cancelledAsyncProducers = cancelledAsyncProducers
  915. self.hasOutstandingDemand = hasOutstandingDemand
  916. }
  917. }
  918. @usableFromInline
  919. struct SourceFinished {
  920. /// Indicates if the iterator was initialized.
  921. @usableFromInline
  922. var iteratorInitialized: Bool
  923. /// The buffer of elements.
  924. @usableFromInline
  925. var buffer: Deque<Element>
  926. /// The failure that should be thrown after the last element has been consumed.
  927. @usableFromInline
  928. var failure: Error?
  929. /// The onTermination callback.
  930. @usableFromInline
  931. var onTermination: (@Sendable () -> Void)?
  932. @inlinable
  933. init(
  934. iteratorInitialized: Bool,
  935. buffer: Deque<Element>,
  936. failure: Error? = nil,
  937. onTermination: (@Sendable () -> Void)?
  938. ) {
  939. self.iteratorInitialized = iteratorInitialized
  940. self.buffer = buffer
  941. self.failure = failure
  942. self.onTermination = onTermination
  943. }
  944. }
  945. case initial(Initial)
  946. /// The state once either any element was yielded or `next()` was called.
  947. case streaming(Streaming)
  948. /// The state once the underlying source signalled that it is finished.
  949. case sourceFinished(SourceFinished)
  950. /// The state once there can be no outstanding demand. This can happen if:
  951. /// 1. The iterator was deinited
  952. /// 2. The underlying source finished and all buffered elements have been consumed
  953. case finished(iteratorInitialized: Bool)
  954. /// An intermediate state to avoid CoWs.
  955. case modify
  956. }
  957. /// The state machine's current state.
  958. @usableFromInline
  959. var _state: _State
  960. // The ID used for the next CallbackToken.
  961. @usableFromInline
  962. var nextCallbackTokenID: UInt = 0
  963. @inlinable
  964. var _onTermination: (@Sendable () -> Void)? {
  965. set {
  966. switch self._state {
  967. case .initial(var initial):
  968. initial.onTermination = newValue
  969. self._state = .initial(initial)
  970. case .streaming(var streaming):
  971. streaming.onTermination = newValue
  972. self._state = .streaming(streaming)
  973. case .sourceFinished(var sourceFinished):
  974. sourceFinished.onTermination = newValue
  975. self._state = .sourceFinished(sourceFinished)
  976. case .finished:
  977. break
  978. case .modify:
  979. fatalError("AsyncStream internal inconsistency")
  980. }
  981. }
  982. get {
  983. switch self._state {
  984. case .initial(let initial):
  985. return initial.onTermination
  986. case .streaming(let streaming):
  987. return streaming.onTermination
  988. case .sourceFinished(let sourceFinished):
  989. return sourceFinished.onTermination
  990. case .finished:
  991. return nil
  992. case .modify:
  993. fatalError("AsyncStream internal inconsistency")
  994. }
  995. }
  996. }
  997. /// Initializes a new `StateMachine`.
  998. ///
  999. /// We are passing and holding the back-pressure strategy here because
  1000. /// it is a customizable extension of the state machine.
  1001. ///
  1002. /// - Parameter backPressureStrategy: The back-pressure strategy.
  1003. @inlinable
  1004. init(
  1005. backPressureStrategy: _InternalBackPressureStrategy
  1006. ) {
  1007. self._state = .initial(
  1008. .init(
  1009. backPressureStrategy: backPressureStrategy,
  1010. iteratorInitialized: false
  1011. )
  1012. )
  1013. }
  1014. /// Generates the next callback token.
  1015. @inlinable
  1016. mutating func nextCallbackToken() -> Source.WriteResult.CallbackToken {
  1017. let id = self.nextCallbackTokenID
  1018. self.nextCallbackTokenID += 1
  1019. return .init(id: id)
  1020. }
  1021. /// Actions returned by `sequenceDeinitialized()`.
  1022. @usableFromInline
  1023. enum SequenceDeinitializedAction {
  1024. /// Indicates that `onTermination` should be called.
  1025. case callOnTermination((@Sendable () -> Void)?)
  1026. /// Indicates that all producers should be failed and `onTermination` should be called.
  1027. case failProducersAndCallOnTermination(
  1028. [(Result<Void, Error>) -> Void],
  1029. (@Sendable () -> Void)?
  1030. )
  1031. }
  1032. @inlinable
  1033. mutating func sequenceDeinitialized() -> SequenceDeinitializedAction? {
  1034. switch self._state {
  1035. case .initial(let initial):
  1036. if initial.iteratorInitialized {
  1037. // An iterator was created and we deinited the sequence.
  1038. // This is an expected pattern and we just continue on normal.
  1039. return .none
  1040. } else {
  1041. // No iterator was created so we can transition to finished right away.
  1042. self._state = .finished(iteratorInitialized: false)
  1043. return .callOnTermination(initial.onTermination)
  1044. }
  1045. case .streaming(let streaming):
  1046. if streaming.iteratorInitialized {
  1047. // An iterator was created and we deinited the sequence.
  1048. // This is an expected pattern and we just continue on normal.
  1049. return .none
  1050. } else {
  1051. // No iterator was created so we can transition to finished right away.
  1052. self._state = .finished(iteratorInitialized: false)
  1053. return .failProducersAndCallOnTermination(
  1054. Array(streaming.producerContinuations.map { $0.1 }),
  1055. streaming.onTermination
  1056. )
  1057. }
  1058. case .sourceFinished(let sourceFinished):
  1059. if sourceFinished.iteratorInitialized {
  1060. // An iterator was created and we deinited the sequence.
  1061. // This is an expected pattern and we just continue on normal.
  1062. return .none
  1063. } else {
  1064. // No iterator was created so we can transition to finished right away.
  1065. self._state = .finished(iteratorInitialized: false)
  1066. return .callOnTermination(sourceFinished.onTermination)
  1067. }
  1068. case .finished:
  1069. // We are already finished so there is nothing left to clean up.
  1070. // This is just the references dropping afterwards.
  1071. return .none
  1072. case .modify:
  1073. fatalError("AsyncStream internal inconsistency")
  1074. }
  1075. }
  1076. @inlinable
  1077. mutating func iteratorInitialized() {
  1078. switch self._state {
  1079. case .initial(var initial):
  1080. if initial.iteratorInitialized {
  1081. // Our sequence is a unicast sequence and does not support multiple AsyncIterator's
  1082. fatalError("Only a single AsyncIterator can be created")
  1083. } else {
  1084. // The first and only iterator was initialized.
  1085. initial.iteratorInitialized = true
  1086. self._state = .initial(initial)
  1087. }
  1088. case .streaming(var streaming):
  1089. if streaming.iteratorInitialized {
  1090. // Our sequence is a unicast sequence and does not support multiple AsyncIterator's
  1091. fatalError("Only a single AsyncIterator can be created")
  1092. } else {
  1093. // The first and only iterator was initialized.
  1094. streaming.iteratorInitialized = true
  1095. self._state = .streaming(streaming)
  1096. }
  1097. case .sourceFinished(var sourceFinished):
  1098. if sourceFinished.iteratorInitialized {
  1099. // Our sequence is a unicast sequence and does not support multiple AsyncIterator's
  1100. fatalError("Only a single AsyncIterator can be created")
  1101. } else {
  1102. // The first and only iterator was initialized.
  1103. sourceFinished.iteratorInitialized = true
  1104. self._state = .sourceFinished(sourceFinished)
  1105. }
  1106. case .finished(iteratorInitialized: true):
  1107. // Our sequence is a unicast sequence and does not support multiple AsyncIterator's
  1108. fatalError("Only a single AsyncIterator can be created")
  1109. case .finished(iteratorInitialized: false):
  1110. // It is strange that an iterator is created after we are finished
  1111. // but it can definitely happen, e.g.
  1112. // Sequence.init -> source.finish -> sequence.makeAsyncIterator
  1113. self._state = .finished(iteratorInitialized: true)
  1114. case .modify:
  1115. fatalError("AsyncStream internal inconsistency")
  1116. }
  1117. }
  1118. /// Actions returned by `iteratorDeinitialized()`.
  1119. @usableFromInline
  1120. enum IteratorDeinitializedAction {
  1121. /// Indicates that `onTermination` should be called.
  1122. case callOnTermination((@Sendable () -> Void)?)
  1123. /// Indicates that all producers should be failed and `onTermination` should be called.
  1124. case failProducersAndCallOnTermination(
  1125. [(Result<Void, Error>) -> Void],
  1126. (@Sendable () -> Void)?
  1127. )
  1128. }
  1129. @inlinable
  1130. mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? {
  1131. switch self._state {
  1132. case .initial(let initial):
  1133. if initial.iteratorInitialized {
  1134. // An iterator was created and deinited. Since we only support
  1135. // a single iterator we can now transition to finish.
  1136. self._state = .finished(iteratorInitialized: true)
  1137. return .callOnTermination(initial.onTermination)
  1138. } else {
  1139. // An iterator needs to be initialized before it can be deinitialized.
  1140. fatalError("AsyncStream internal inconsistency")
  1141. }
  1142. case .streaming(let streaming):
  1143. if streaming.iteratorInitialized {
  1144. // An iterator was created and deinited. Since we only support
  1145. // a single iterator we can now transition to finish.
  1146. self._state = .finished(iteratorInitialized: true)
  1147. return .failProducersAndCallOnTermination(
  1148. Array(streaming.producerContinuations.map { $0.1 }),
  1149. streaming.onTermination
  1150. )
  1151. } else {
  1152. // An iterator needs to be initialized before it can be deinitialized.
  1153. fatalError("AsyncStream internal inconsistency")
  1154. }
  1155. case .sourceFinished(let sourceFinished):
  1156. if sourceFinished.iteratorInitialized {
  1157. // An iterator was created and deinited. Since we only support
  1158. // a single iterator we can now transition to finish.
  1159. self._state = .finished(iteratorInitialized: true)
  1160. return .callOnTermination(sourceFinished.onTermination)
  1161. } else {
  1162. // An iterator needs to be initialized before it can be deinitialized.
  1163. fatalError("AsyncStream internal inconsistency")
  1164. }
  1165. case .finished:
  1166. // We are already finished so there is nothing left to clean up.
  1167. // This is just the references dropping afterwards.
  1168. return .none
  1169. case .modify:
  1170. fatalError("AsyncStream internal inconsistency")
  1171. }
  1172. }
  1173. /// Actions returned by `sourceDeinitialized()`.
  1174. @usableFromInline
  1175. enum SourceDeinitializedAction {
  1176. /// Indicates that `onTermination` should be called.
  1177. case callOnTermination((() -> Void)?)
  1178. /// Indicates that all producers should be failed and `onTermination` should be called.
  1179. case failProducersAndCallOnTermination(
  1180. CheckedContinuation<Element?, Error>?,
  1181. [(Result<Void, Error>) -> Void],
  1182. (@Sendable () -> Void)?
  1183. )
  1184. /// Indicates that all producers should be failed.
  1185. case failProducers([(Result<Void, Error>) -> Void])
  1186. }
  1187. @inlinable
  1188. mutating func sourceDeinitialized() -> SourceDeinitializedAction? {
  1189. switch self._state {
  1190. case .initial(let initial):
  1191. // The source got deinited before anything was written
  1192. self._state = .finished(iteratorInitialized: initial.iteratorInitialized)
  1193. return .callOnTermination(initial.onTermination)
  1194. case .streaming(let streaming):
  1195. if streaming.buffer.isEmpty {
  1196. // We can transition to finished right away since the buffer is empty now
  1197. self._state = .finished(iteratorInitialized: streaming.iteratorInitialized)
  1198. return .failProducersAndCallOnTermination(
  1199. streaming.consumerContinuation,
  1200. Array(streaming.producerContinuations.map { $0.1 }),
  1201. streaming.onTermination
  1202. )
  1203. } else {
  1204. // The continuation must be `nil` if the buffer has elements
  1205. precondition(streaming.consumerContinuation == nil)
  1206. self._state = .sourceFinished(
  1207. .init(
  1208. iteratorInitialized: streaming.iteratorInitialized,
  1209. buffer: streaming.buffer,
  1210. failure: nil,
  1211. onTermination: streaming.onTermination
  1212. )
  1213. )
  1214. return .failProducers(
  1215. Array(streaming.producerContinuations.map { $0.1 })
  1216. )
  1217. }
  1218. case .sourceFinished, .finished:
  1219. // This is normal and we just have to tolerate it
  1220. return .none
  1221. case .modify:
  1222. fatalError("AsyncStream internal inconsistency")
  1223. }
  1224. }
  1225. /// Actions returned by `write()`.
  1226. @usableFromInline
  1227. enum WriteAction {
  1228. /// Indicates that the producer should be notified to produce more.
  1229. case returnProduceMore
  1230. /// Indicates that the producer should be suspended to stop producing.
  1231. case returnEnqueue(
  1232. callbackToken: Source.WriteResult.CallbackToken
  1233. )
  1234. /// Indicates that the consumer should be resumed and the producer should be notified to produce more.
  1235. case resumeConsumerAndReturnProduceMore(
  1236. continuation: CheckedContinuation<Element?, Error>,
  1237. element: Element
  1238. )
  1239. /// Indicates that the consumer should be resumed and the producer should be suspended.
  1240. case resumeConsumerAndReturnEnqueue(
  1241. continuation: CheckedContinuation<Element?, Error>,
  1242. element: Element,
  1243. callbackToken: Source.WriteResult.CallbackToken
  1244. )
  1245. /// Indicates that the producer has been finished.
  1246. case throwFinishedError
  1247. @inlinable
  1248. init(
  1249. callbackToken: Source.WriteResult.CallbackToken?,
  1250. continuationAndElement: (CheckedContinuation<Element?, Error>, Element)? = nil
  1251. ) {
  1252. switch (callbackToken, continuationAndElement) {
  1253. case (.none, .none):
  1254. self = .returnProduceMore
  1255. case (.some(let callbackToken), .none):
  1256. self = .returnEnqueue(callbackToken: callbackToken)
  1257. case (.none, .some((let continuation, let element))):
  1258. self = .resumeConsumerAndReturnProduceMore(
  1259. continuation: continuation,
  1260. element: element
  1261. )
  1262. case (.some(let callbackToken), .some((let continuation, let element))):
  1263. self = .resumeConsumerAndReturnEnqueue(
  1264. continuation: continuation,
  1265. element: element,
  1266. callbackToken: callbackToken
  1267. )
  1268. }
  1269. }
  1270. }
  1271. @inlinable
  1272. mutating func write(_ sequence: some Sequence<Element>) -> WriteAction {
  1273. switch self._state {
  1274. case .initial(var initial):
  1275. var buffer = Deque<Element>()
  1276. buffer.append(contentsOf: sequence)
  1277. let shouldProduceMore = initial.backPressureStrategy.didYield(
  1278. bufferDepth: buffer.count
  1279. )
  1280. let callbackToken = shouldProduceMore ? nil : self.nextCallbackToken()
  1281. self._state = .streaming(
  1282. .init(
  1283. backPressureStrategy: initial.backPressureStrategy,
  1284. iteratorInitialized: initial.iteratorInitialized,
  1285. onTermination: initial.onTermination,
  1286. buffer: buffer,
  1287. consumerContinuation: nil,
  1288. producerContinuations: .init(),
  1289. cancelledAsyncProducers: .init(),
  1290. hasOutstandingDemand: shouldProduceMore
  1291. )
  1292. )
  1293. return .init(callbackToken: callbackToken)
  1294. case .streaming(var streaming):
  1295. self._state = .modify
  1296. streaming.buffer.append(contentsOf: sequence)
  1297. // We have an element and can resume the continuation
  1298. let shouldProduceMore = streaming.backPressureStrategy.didYield(
  1299. bufferDepth: streaming.buffer.count
  1300. )
  1301. streaming.hasOutstandingDemand = shouldProduceMore
  1302. let callbackToken = shouldProduceMore ? nil : self.nextCallbackToken()
  1303. if let consumerContinuation = streaming.consumerContinuation {
  1304. guard let element = streaming.buffer.popFirst() else {
  1305. // We got a yield of an empty sequence. We just tolerate this.
  1306. self._state = .streaming(streaming)
  1307. return .init(callbackToken: callbackToken)
  1308. }
  1309. // We got a consumer continuation and an element. We can resume the consumer now
  1310. streaming.consumerContinuation = nil
  1311. self._state = .streaming(streaming)
  1312. return .init(
  1313. callbackToken: callbackToken,
  1314. continuationAndElement: (consumerContinuation, element)
  1315. )
  1316. } else {
  1317. // We don't have a suspended consumer so we just buffer the elements
  1318. self._state = .streaming(streaming)
  1319. return .init(
  1320. callbackToken: callbackToken
  1321. )
  1322. }
  1323. case .sourceFinished, .finished:
  1324. // If the source has finished we are dropping the elements.
  1325. return .throwFinishedError
  1326. case .modify:
  1327. fatalError("AsyncStream internal inconsistency")
  1328. }
  1329. }
  1330. /// Actions returned by `enqueueProducer()`.
  1331. @usableFromInline
  1332. enum EnqueueProducerAction {
  1333. /// Indicates that the producer should be notified to produce more.
  1334. case resumeProducer((Result<Void, Error>) -> Void)
  1335. /// Indicates that the producer should be notified about an error.
  1336. case resumeProducerWithError((Result<Void, Error>) -> Void, Error)
  1337. }
  1338. @inlinable
  1339. mutating func enqueueProducer(
  1340. callbackToken: Source.WriteResult.CallbackToken,
  1341. onProduceMore: @Sendable @escaping (Result<Void, Error>) -> Void
  1342. ) -> EnqueueProducerAction? {
  1343. switch self._state {
  1344. case .initial:
  1345. // We need to transition to streaming before we can suspend
  1346. // This is enforced because the CallbackToken has no internal init so
  1347. // one must create it by calling `write` first.
  1348. fatalError("AsyncStream internal inconsistency")
  1349. case .streaming(var streaming):
  1350. if let index = streaming.cancelledAsyncProducers.firstIndex(of: callbackToken.id) {
  1351. // Our producer got marked as cancelled.
  1352. self._state = .modify
  1353. streaming.cancelledAsyncProducers.remove(at: index)
  1354. self._state = .streaming(streaming)
  1355. return .resumeProducerWithError(onProduceMore, CancellationError())
  1356. } else if streaming.hasOutstandingDemand {
  1357. // We hit an edge case here where we wrote but the consuming thread got interleaved
  1358. return .resumeProducer(onProduceMore)
  1359. } else {
  1360. self._state = .modify
  1361. streaming.producerContinuations.append((callbackToken.id, onProduceMore))
  1362. self._state = .streaming(streaming)
  1363. return .none
  1364. }
  1365. case .sourceFinished, .finished:
  1366. // Since we are unlocking between yielding and suspending the yield
  1367. // It can happen that the source got finished or the consumption fully finishes.
  1368. return .resumeProducerWithError(onProduceMore, AlreadyFinishedError())
  1369. case .modify:
  1370. fatalError("AsyncStream internal inconsistency")
  1371. }
  1372. }
  1373. /// Actions returned by `cancelProducer()`.
  1374. @usableFromInline
  1375. enum CancelProducerAction {
  1376. /// Indicates that the producer should be notified about cancellation.
  1377. case resumeProducerWithCancellationError((Result<Void, Error>) -> Void)
  1378. }
  1379. @inlinable
  1380. mutating func cancelProducer(
  1381. callbackToken: Source.WriteResult.CallbackToken
  1382. ) -> CancelProducerAction? {
  1383. switch self._state {
  1384. case .initial:
  1385. // We need to transition to streaming before we can suspend
  1386. fatalError("AsyncStream internal inconsistency")
  1387. case .streaming(var streaming):
  1388. if let index = streaming.producerContinuations.firstIndex(where: {
  1389. $0.0 == callbackToken.id
  1390. }) {
  1391. // We have an enqueued producer that we need to resume now
  1392. self._state = .modify
  1393. let continuation = streaming.producerContinuations.remove(at: index).1
  1394. self._state = .streaming(streaming)
  1395. return .resumeProducerWithCancellationError(continuation)
  1396. } else {
  1397. // The task that yields was cancelled before yielding so the cancellation handler
  1398. // got invoked right away
  1399. self._state = .modify
  1400. streaming.cancelledAsyncProducers.append(callbackToken.id)
  1401. self._state = .streaming(streaming)
  1402. return .none
  1403. }
  1404. case .sourceFinished, .finished:
  1405. // Since we are unlocking between yielding and suspending the yield
  1406. // It can happen that the source got finished or the consumption fully finishes.
  1407. return .none
  1408. case .modify:
  1409. fatalError("AsyncStream internal inconsistency")
  1410. }
  1411. }
  1412. /// Actions returned by `finish()`.
  1413. @usableFromInline
  1414. enum FinishAction {
  1415. /// Indicates that `onTermination` should be called.
  1416. case callOnTermination((() -> Void)?)
  1417. /// Indicates that the consumer should be resumed with the failure, the producers
  1418. /// should be resumed with an error and `onTermination` should be called.
  1419. case resumeConsumerAndCallOnTermination(
  1420. consumerContinuation: CheckedContinuation<Element?, Error>,
  1421. failure: Error?,
  1422. onTermination: (() -> Void)?
  1423. )
  1424. /// Indicates that the producers should be resumed with an error.
  1425. case resumeProducers(
  1426. producerContinuations: [(Result<Void, Error>) -> Void]
  1427. )
  1428. }
  1429. @inlinable
  1430. mutating func finish(_ failure: Error?) -> FinishAction? {
  1431. switch self._state {
  1432. case .initial(let initial):
  1433. // Nothing was yielded nor did anybody call next
  1434. // This means we can transition to sourceFinished and store the failure
  1435. self._state = .sourceFinished(
  1436. .init(
  1437. iteratorInitialized: initial.iteratorInitialized,
  1438. buffer: .init(),
  1439. failure: failure,
  1440. onTermination: initial.onTermination
  1441. )
  1442. )
  1443. return .callOnTermination(initial.onTermination)
  1444. case .streaming(let streaming):
  1445. if let consumerContinuation = streaming.consumerContinuation {
  1446. // We have a continuation, this means our buffer must be empty
  1447. // Furthermore, we can now transition to finished
  1448. // and resume the continuation with the failure
  1449. precondition(streaming.buffer.isEmpty, "Expected an empty buffer")
  1450. precondition(
  1451. streaming.producerContinuations.isEmpty,
  1452. "Expected no suspended producers"
  1453. )
  1454. self._state = .finished(iteratorInitialized: streaming.iteratorInitialized)
  1455. return .resumeConsumerAndCallOnTermination(
  1456. consumerContinuation: consumerContinuation,
  1457. failure: failure,
  1458. onTermination: streaming.onTermination
  1459. )
  1460. } else {
  1461. self._state = .sourceFinished(
  1462. .init(
  1463. iteratorInitialized: streaming.iteratorInitialized,
  1464. buffer: streaming.buffer,
  1465. failure: failure,
  1466. onTermination: streaming.onTermination
  1467. )
  1468. )
  1469. return .resumeProducers(
  1470. producerContinuations: Array(streaming.producerContinuations.map { $0.1 })
  1471. )
  1472. }
  1473. case .sourceFinished, .finished:
  1474. // If the source has finished, finishing again has no effect.
  1475. return .none
  1476. case .modify:
  1477. fatalError("AsyncStream internal inconsistency")
  1478. }
  1479. }
  1480. /// Actions returned by `next()`.
  1481. @usableFromInline
  1482. enum NextAction {
  1483. /// Indicates that the element should be returned to the caller.
  1484. case returnElement(Element)
  1485. /// Indicates that the element should be returned to the caller and that all producers should be called.
  1486. case returnElementAndResumeProducers(Element, [(Result<Void, Error>) -> Void])
  1487. /// Indicates that the `Error` should be returned to the caller and that `onTermination` should be called.
  1488. case returnErrorAndCallOnTermination(Error?, (() -> Void)?)
  1489. /// Indicates that the `nil` should be returned to the caller.
  1490. case returnNil
  1491. /// Indicates that the `Task` of the caller should be suspended.
  1492. case suspendTask
  1493. }
  1494. @inlinable
  1495. mutating func next() -> NextAction {
  1496. switch self._state {
  1497. case .initial(let initial):
  1498. // We are not interacting with the back-pressure strategy here because
  1499. // we are doing this inside `next(:)`
  1500. self._state = .streaming(
  1501. .init(
  1502. backPressureStrategy: initial.backPressureStrategy,
  1503. iteratorInitialized: initial.iteratorInitialized,
  1504. onTermination: initial.onTermination,
  1505. buffer: Deque<Element>(),
  1506. consumerContinuation: nil,
  1507. producerContinuations: .init(),
  1508. cancelledAsyncProducers: .init(),
  1509. hasOutstandingDemand: false
  1510. )
  1511. )
  1512. return .suspendTask
  1513. case .streaming(var streaming):
  1514. guard streaming.consumerContinuation == nil else {
  1515. // We have multiple AsyncIterators iterating the sequence
  1516. fatalError("AsyncStream internal inconsistency")
  1517. }
  1518. self._state = .modify
  1519. if let element = streaming.buffer.popFirst() {
  1520. // We have an element to fulfil the demand right away.
  1521. let shouldProduceMore = streaming.backPressureStrategy.didConsume(
  1522. bufferDepth: streaming.buffer.count
  1523. )
  1524. streaming.hasOutstandingDemand = shouldProduceMore
  1525. if shouldProduceMore {
  1526. // There is demand and we have to resume our producers
  1527. let producers = Array(streaming.producerContinuations.map { $0.1 })
  1528. streaming.producerContinuations.removeAll()
  1529. self._state = .streaming(streaming)
  1530. return .returnElementAndResumeProducers(element, producers)
  1531. } else {
  1532. // We don't have any new demand, so we can just return the element.
  1533. self._state = .streaming(streaming)
  1534. return .returnElement(element)
  1535. }
  1536. } else {
  1537. // There is nothing in the buffer to fulfil the demand so we need to suspend.
  1538. // We are not interacting with the back-pressure strategy here because
  1539. // we are doing this inside `suspendNext`
  1540. self._state = .streaming(streaming)
  1541. return .suspendTask
  1542. }
  1543. case .sourceFinished(var sourceFinished):
  1544. // Check if we have an element left in the buffer and return it
  1545. self._state = .modify
  1546. if let element = sourceFinished.buffer.popFirst() {
  1547. self._state = .sourceFinished(sourceFinished)
  1548. return .returnElement(element)
  1549. } else {
  1550. // We are returning the queued failure now and can transition to finished
  1551. self._state = .finished(iteratorInitialized: sourceFinished.iteratorInitialized)
  1552. return .returnErrorAndCallOnTermination(
  1553. sourceFinished.failure,
  1554. sourceFinished.onTermination
  1555. )
  1556. }
  1557. case .finished:
  1558. return .returnNil
  1559. case .modify:
  1560. fatalError("AsyncStream internal inconsistency")
  1561. }
  1562. }
  1563. /// Actions returned by `suspendNext()`.
  1564. @usableFromInline
  1565. enum SuspendNextAction {
  1566. /// Indicates that the consumer should be resumed.
  1567. case resumeConsumerWithElement(CheckedContinuation<Element?, Error>, Element)
  1568. /// Indicates that the consumer and all producers should be resumed.
  1569. case resumeConsumerWithElementAndProducers(
  1570. CheckedContinuation<Element?, Error>,
  1571. Element,
  1572. [(Result<Void, Error>) -> Void]
  1573. )
  1574. /// Indicates that the consumer should be resumed with the failure and that `onTermination` should be called.
  1575. case resumeConsumerWithErrorAndCallOnTermination(
  1576. CheckedContinuation<Element?, Error>,
  1577. Error?,
  1578. (() -> Void)?
  1579. )
  1580. /// Indicates that the consumer should be resumed with `nil`.
  1581. case resumeConsumerWithNil(CheckedContinuation<Element?, Error>)
  1582. }
  1583. @inlinable
  1584. mutating func suspendNext(
  1585. continuation: CheckedContinuation<Element?, Error>
  1586. ) -> SuspendNextAction? {
  1587. switch self._state {
  1588. case .initial:
  1589. // We need to transition to streaming before we can suspend
  1590. preconditionFailure("AsyncStream internal inconsistency")
  1591. case .streaming(var streaming):
  1592. guard streaming.consumerContinuation == nil else {
  1593. // We have multiple AsyncIterators iterating the sequence
  1594. fatalError(
  1595. "This should never happen since we only allow a single Iterator to be created"
  1596. )
  1597. }
  1598. self._state = .modify
  1599. // We have to check here again since we might have a producer interleave next and suspendNext
  1600. if let element = streaming.buffer.popFirst() {
  1601. // We have an element to fulfil the demand right away.
  1602. let shouldProduceMore = streaming.backPressureStrategy.didConsume(
  1603. bufferDepth: streaming.buffer.count
  1604. )
  1605. streaming.hasOutstandingDemand = shouldProduceMore
  1606. if shouldProduceMore {
  1607. // There is demand and we have to resume our producers
  1608. let producers = Array(streaming.producerContinuations.map { $0.1 })
  1609. streaming.producerContinuations.removeAll()
  1610. self._state = .streaming(streaming)
  1611. return .resumeConsumerWithElementAndProducers(
  1612. continuation,
  1613. element,
  1614. producers
  1615. )
  1616. } else {
  1617. // We don't have any new demand, so we can just return the element.
  1618. self._state = .streaming(streaming)
  1619. return .resumeConsumerWithElement(continuation, element)
  1620. }
  1621. } else {
  1622. // There is nothing in the buffer to fulfil the demand so we to store the continuation.
  1623. streaming.consumerContinuation = continuation
  1624. self._state = .streaming(streaming)
  1625. return .none
  1626. }
  1627. case .sourceFinished(var sourceFinished):
  1628. // Check if we have an element left in the buffer and return it
  1629. self._state = .modify
  1630. if let element = sourceFinished.buffer.popFirst() {
  1631. self._state = .sourceFinished(sourceFinished)
  1632. return .resumeConsumerWithElement(continuation, element)
  1633. } else {
  1634. // We are returning the queued failure now and can transition to finished
  1635. self._state = .finished(iteratorInitialized: sourceFinished.iteratorInitialized)
  1636. return .resumeConsumerWithErrorAndCallOnTermination(
  1637. continuation,
  1638. sourceFinished.failure,
  1639. sourceFinished.onTermination
  1640. )
  1641. }
  1642. case .finished:
  1643. return .resumeConsumerWithNil(continuation)
  1644. case .modify:
  1645. fatalError("AsyncStream internal inconsistency")
  1646. }
  1647. }
  1648. /// Actions returned by `cancelNext()`.
  1649. @usableFromInline
  1650. enum CancelNextAction {
  1651. /// Indicates that the continuation should be resumed with a cancellation error, the producers should be finished and call onTermination.
  1652. case resumeConsumerWithCancellationErrorAndCallOnTermination(
  1653. CheckedContinuation<Element?, Error>,
  1654. (() -> Void)?
  1655. )
  1656. /// Indicates that the producers should be finished and call onTermination.
  1657. case failProducersAndCallOnTermination([(Result<Void, Error>) -> Void], (() -> Void)?)
  1658. }
  1659. @inlinable
  1660. mutating func cancelNext() -> CancelNextAction? {
  1661. switch self._state {
  1662. case .initial:
  1663. // We need to transition to streaming before we can suspend
  1664. fatalError("AsyncStream internal inconsistency")
  1665. case .streaming(let streaming):
  1666. self._state = .finished(iteratorInitialized: streaming.iteratorInitialized)
  1667. if let consumerContinuation = streaming.consumerContinuation {
  1668. precondition(
  1669. streaming.producerContinuations.isEmpty,
  1670. "Internal inconsistency. Unexpected producer continuations."
  1671. )
  1672. return .resumeConsumerWithCancellationErrorAndCallOnTermination(
  1673. consumerContinuation,
  1674. streaming.onTermination
  1675. )
  1676. } else {
  1677. return .failProducersAndCallOnTermination(
  1678. Array(streaming.producerContinuations.map { $0.1 }),
  1679. streaming.onTermination
  1680. )
  1681. }
  1682. case .sourceFinished, .finished:
  1683. return .none
  1684. case .modify:
  1685. fatalError("AsyncStream internal inconsistency")
  1686. }
  1687. }
  1688. }
  1689. }
  1690. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  1691. extension BufferedStream.Source: ClosableRPCWriterProtocol {
  1692. @inlinable
  1693. func finish() {
  1694. self.finish(throwing: nil)
  1695. }
  1696. @inlinable
  1697. func finish(throwing error: Error) {
  1698. self.finish(throwing: error as Error?)
  1699. }
  1700. }