BufferedStreamTests.swift 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import XCTest
  17. @testable import GRPCCore
  18. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  19. final class BufferedStreamTests: XCTestCase {
  20. // MARK: - sequenceDeinitialized
  21. func testSequenceDeinitialized_whenNoIterator() async throws {
  22. var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
  23. of: Int.self,
  24. backPressureStrategy: .watermark(low: 5, high: 10)
  25. )
  26. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  27. source.onTermination = {
  28. onTerminationContinuation.finish()
  29. }
  30. await withThrowingTaskGroup(of: Void.self) { group in
  31. group.addTask {
  32. while !Task.isCancelled {
  33. onTerminationContinuation.yield()
  34. try await Task.sleep(nanoseconds: 200_000_000)
  35. }
  36. }
  37. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  38. _ = await onTerminationIterator.next()
  39. withExtendedLifetime(stream) {}
  40. stream = nil
  41. let terminationResult: Void? = await onTerminationIterator.next()
  42. XCTAssertNil(terminationResult)
  43. do {
  44. _ = try { try source.write(2) }()
  45. XCTFail("Expected an error to be thrown")
  46. } catch {
  47. XCTAssertTrue(error is AlreadyFinishedError)
  48. }
  49. group.cancelAll()
  50. }
  51. }
  52. func testSequenceDeinitialized_whenIterator() async throws {
  53. var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
  54. of: Int.self,
  55. backPressureStrategy: .watermark(low: 5, high: 10)
  56. )
  57. var iterator = stream?.makeAsyncIterator()
  58. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  59. source.onTermination = {
  60. onTerminationContinuation.finish()
  61. }
  62. try await withThrowingTaskGroup(of: Void.self) { group in
  63. group.addTask {
  64. while !Task.isCancelled {
  65. onTerminationContinuation.yield()
  66. try await Task.sleep(nanoseconds: 200_000_000)
  67. }
  68. }
  69. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  70. _ = await onTerminationIterator.next()
  71. try withExtendedLifetime(stream) {
  72. let writeResult = try source.write(1)
  73. writeResult.assertIsProducerMore()
  74. }
  75. stream = nil
  76. do {
  77. let writeResult = try { try source.write(2) }()
  78. writeResult.assertIsProducerMore()
  79. } catch {
  80. XCTFail("Expected no error to be thrown")
  81. }
  82. let element1 = try await iterator?.next()
  83. XCTAssertEqual(element1, 1)
  84. let element2 = try await iterator?.next()
  85. XCTAssertEqual(element2, 2)
  86. group.cancelAll()
  87. }
  88. }
  89. func testSequenceDeinitialized_whenFinished() async throws {
  90. var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
  91. of: Int.self,
  92. backPressureStrategy: .watermark(low: 5, high: 10)
  93. )
  94. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  95. source.onTermination = {
  96. onTerminationContinuation.finish()
  97. }
  98. await withThrowingTaskGroup(of: Void.self) { group in
  99. group.addTask {
  100. while !Task.isCancelled {
  101. onTerminationContinuation.yield()
  102. try await Task.sleep(nanoseconds: 200_000_000)
  103. }
  104. }
  105. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  106. _ = await onTerminationIterator.next()
  107. withExtendedLifetime(stream) {
  108. source.finish(throwing: nil)
  109. }
  110. stream = nil
  111. let terminationResult: Void? = await onTerminationIterator.next()
  112. XCTAssertNil(terminationResult)
  113. do {
  114. _ = try { try source.write(1) }()
  115. XCTFail("Expected an error to be thrown")
  116. } catch {
  117. XCTAssertTrue(error is AlreadyFinishedError)
  118. }
  119. group.cancelAll()
  120. }
  121. }
  122. func testSequenceDeinitialized_whenStreaming_andSuspendedProducer() async throws {
  123. var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
  124. of: Int.self,
  125. backPressureStrategy: .watermark(low: 1, high: 2)
  126. )
  127. _ = try { try source.write(1) }()
  128. do {
  129. try await withCheckedThrowingContinuation { continuation in
  130. source.write(1) { result in
  131. continuation.resume(with: result)
  132. }
  133. stream = nil
  134. _ = stream?.makeAsyncIterator()
  135. }
  136. } catch {
  137. XCTAssertTrue(error is AlreadyFinishedError)
  138. }
  139. }
  140. // MARK: - iteratorInitialized
  141. func testIteratorInitialized_whenInitial() async throws {
  142. let (stream, _) = BufferedStream.makeStream(
  143. of: Int.self,
  144. backPressureStrategy: .watermark(low: 5, high: 10)
  145. )
  146. _ = stream.makeAsyncIterator()
  147. }
  148. func testIteratorInitialized_whenStreaming() async throws {
  149. let (stream, source) = BufferedStream.makeStream(
  150. of: Int.self,
  151. backPressureStrategy: .watermark(low: 5, high: 10)
  152. )
  153. try await source.write(1)
  154. var iterator = stream.makeAsyncIterator()
  155. let element = try await iterator.next()
  156. XCTAssertEqual(element, 1)
  157. }
  158. func testIteratorInitialized_whenSourceFinished() async throws {
  159. let (stream, source) = BufferedStream.makeStream(
  160. of: Int.self,
  161. backPressureStrategy: .watermark(low: 5, high: 10)
  162. )
  163. try await source.write(1)
  164. source.finish(throwing: nil)
  165. var iterator = stream.makeAsyncIterator()
  166. let element1 = try await iterator.next()
  167. XCTAssertEqual(element1, 1)
  168. let element2 = try await iterator.next()
  169. XCTAssertNil(element2)
  170. }
  171. func testIteratorInitialized_whenFinished() async throws {
  172. let (stream, source) = BufferedStream.makeStream(
  173. of: Int.self,
  174. backPressureStrategy: .watermark(low: 5, high: 10)
  175. )
  176. source.finish(throwing: nil)
  177. var iterator = stream.makeAsyncIterator()
  178. let element = try await iterator.next()
  179. XCTAssertNil(element)
  180. }
  181. // MARK: - iteratorDeinitialized
  182. func testIteratorDeinitialized_whenInitial() async throws {
  183. var (stream, source) = BufferedStream.makeStream(
  184. of: Int.self,
  185. backPressureStrategy: .watermark(low: 5, high: 10)
  186. )
  187. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  188. source.onTermination = {
  189. onTerminationContinuation.finish()
  190. }
  191. try await withThrowingTaskGroup(of: Void.self) { group in
  192. group.addTask {
  193. while !Task.isCancelled {
  194. onTerminationContinuation.yield()
  195. try await Task.sleep(nanoseconds: 200_000_000)
  196. }
  197. }
  198. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  199. _ = await onTerminationIterator.next()
  200. var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
  201. iterator = nil
  202. _ = try await iterator?.next()
  203. let terminationResult: Void? = await onTerminationIterator.next()
  204. XCTAssertNil(terminationResult)
  205. group.cancelAll()
  206. }
  207. }
  208. func testIteratorDeinitialized_whenStreaming() async throws {
  209. var (stream, source) = BufferedStream.makeStream(
  210. of: Int.self,
  211. backPressureStrategy: .watermark(low: 5, high: 10)
  212. )
  213. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  214. source.onTermination = {
  215. onTerminationContinuation.finish()
  216. }
  217. try await source.write(1)
  218. try await withThrowingTaskGroup(of: Void.self) { group in
  219. group.addTask {
  220. while !Task.isCancelled {
  221. onTerminationContinuation.yield()
  222. try await Task.sleep(nanoseconds: 200_000_000)
  223. }
  224. }
  225. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  226. _ = await onTerminationIterator.next()
  227. var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
  228. iterator = nil
  229. _ = try await iterator?.next()
  230. let terminationResult: Void? = await onTerminationIterator.next()
  231. XCTAssertNil(terminationResult)
  232. group.cancelAll()
  233. }
  234. }
  235. func testIteratorDeinitialized_whenSourceFinished() async throws {
  236. var (stream, source) = BufferedStream.makeStream(
  237. of: Int.self,
  238. backPressureStrategy: .watermark(low: 5, high: 10)
  239. )
  240. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  241. source.onTermination = {
  242. onTerminationContinuation.finish()
  243. }
  244. try await source.write(1)
  245. source.finish(throwing: nil)
  246. try await withThrowingTaskGroup(of: Void.self) { group in
  247. group.addTask {
  248. while !Task.isCancelled {
  249. onTerminationContinuation.yield()
  250. try await Task.sleep(nanoseconds: 200_000_000)
  251. }
  252. }
  253. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  254. _ = await onTerminationIterator.next()
  255. var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
  256. iterator = nil
  257. _ = try await iterator?.next()
  258. let terminationResult: Void? = await onTerminationIterator.next()
  259. XCTAssertNil(terminationResult)
  260. group.cancelAll()
  261. }
  262. }
  263. func testIteratorDeinitialized_whenFinished() async throws {
  264. var (stream, source) = BufferedStream.makeStream(
  265. of: Int.self,
  266. backPressureStrategy: .watermark(low: 5, high: 10)
  267. )
  268. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  269. source.onTermination = {
  270. onTerminationContinuation.finish()
  271. }
  272. source.finish(throwing: nil)
  273. try await withThrowingTaskGroup(of: Void.self) { group in
  274. group.addTask {
  275. while !Task.isCancelled {
  276. onTerminationContinuation.yield()
  277. try await Task.sleep(nanoseconds: 200_000_000)
  278. }
  279. }
  280. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  281. _ = await onTerminationIterator.next()
  282. var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
  283. iterator = nil
  284. _ = try await iterator?.next()
  285. let terminationResult: Void? = await onTerminationIterator.next()
  286. XCTAssertNil(terminationResult)
  287. group.cancelAll()
  288. }
  289. }
  290. func testIteratorDeinitialized_whenStreaming_andSuspendedProducer() async throws {
  291. var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
  292. of: Int.self,
  293. backPressureStrategy: .watermark(low: 1, high: 2)
  294. )
  295. var iterator: BufferedStream<Int>.AsyncIterator? = stream?.makeAsyncIterator()
  296. stream = nil
  297. _ = try { try source.write(1) }()
  298. do {
  299. try await withCheckedThrowingContinuation { continuation in
  300. source.write(1) { result in
  301. continuation.resume(with: result)
  302. }
  303. iterator = nil
  304. }
  305. } catch {
  306. XCTAssertTrue(error is AlreadyFinishedError)
  307. }
  308. _ = try await iterator?.next()
  309. }
  310. // MARK: - sourceDeinitialized
  311. func testSourceDeinitialized_whenInitial() async throws {
  312. var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
  313. of: Int.self,
  314. backPressureStrategy: .watermark(low: 5, high: 10)
  315. )
  316. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  317. source?.onTermination = {
  318. onTerminationContinuation.finish()
  319. }
  320. await withThrowingTaskGroup(of: Void.self) { group in
  321. group.addTask {
  322. while !Task.isCancelled {
  323. onTerminationContinuation.yield()
  324. try await Task.sleep(nanoseconds: 200_000_000)
  325. }
  326. }
  327. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  328. _ = await onTerminationIterator.next()
  329. source = nil
  330. let terminationResult: Void? = await onTerminationIterator.next()
  331. XCTAssertNil(terminationResult)
  332. group.cancelAll()
  333. }
  334. withExtendedLifetime(stream) {}
  335. }
  336. func testSourceDeinitialized_whenStreaming_andEmptyBuffer() async throws {
  337. var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
  338. of: Int.self,
  339. backPressureStrategy: .watermark(low: 5, high: 10)
  340. )
  341. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  342. source?.onTermination = {
  343. onTerminationContinuation.finish()
  344. }
  345. try await source?.write(1)
  346. try await withThrowingTaskGroup(of: Void.self) { group in
  347. group.addTask {
  348. while !Task.isCancelled {
  349. onTerminationContinuation.yield()
  350. try await Task.sleep(nanoseconds: 200_000_000)
  351. }
  352. }
  353. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  354. _ = await onTerminationIterator.next()
  355. var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
  356. _ = try await iterator?.next()
  357. source = nil
  358. let terminationResult: Void? = await onTerminationIterator.next()
  359. XCTAssertNil(terminationResult)
  360. group.cancelAll()
  361. }
  362. }
  363. func testSourceDeinitialized_whenStreaming_andNotEmptyBuffer() async throws {
  364. var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
  365. of: Int.self,
  366. backPressureStrategy: .watermark(low: 5, high: 10)
  367. )
  368. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  369. source?.onTermination = {
  370. onTerminationContinuation.finish()
  371. }
  372. try await source?.write(1)
  373. try await source?.write(2)
  374. try await withThrowingTaskGroup(of: Void.self) { group in
  375. group.addTask {
  376. while !Task.isCancelled {
  377. onTerminationContinuation.yield()
  378. try await Task.sleep(nanoseconds: 200_000_000)
  379. }
  380. }
  381. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  382. _ = await onTerminationIterator.next()
  383. var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
  384. _ = try await iterator?.next()
  385. source = nil
  386. _ = await onTerminationIterator.next()
  387. _ = try await iterator?.next()
  388. _ = try await iterator?.next()
  389. let terminationResult: Void? = await onTerminationIterator.next()
  390. XCTAssertNil(terminationResult)
  391. group.cancelAll()
  392. }
  393. }
  394. func testSourceDeinitialized_whenSourceFinished() async throws {
  395. var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
  396. of: Int.self,
  397. backPressureStrategy: .watermark(low: 5, high: 10)
  398. )
  399. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  400. source?.onTermination = {
  401. onTerminationContinuation.finish()
  402. }
  403. try await source?.write(1)
  404. try await source?.write(2)
  405. source?.finish(throwing: nil)
  406. try await withThrowingTaskGroup(of: Void.self) { group in
  407. group.addTask {
  408. while !Task.isCancelled {
  409. onTerminationContinuation.yield()
  410. try await Task.sleep(nanoseconds: 200_000_000)
  411. }
  412. }
  413. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  414. _ = await onTerminationIterator.next()
  415. var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
  416. _ = try await iterator?.next()
  417. source = nil
  418. _ = await onTerminationIterator.next()
  419. _ = try await iterator?.next()
  420. _ = try await iterator?.next()
  421. let terminationResult: Void? = await onTerminationIterator.next()
  422. XCTAssertNil(terminationResult)
  423. group.cancelAll()
  424. }
  425. }
  426. func testSourceDeinitialized_whenFinished() async throws {
  427. var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
  428. of: Int.self,
  429. backPressureStrategy: .watermark(low: 5, high: 10)
  430. )
  431. let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
  432. source?.onTermination = {
  433. onTerminationContinuation.finish()
  434. }
  435. source?.finish(throwing: nil)
  436. await withThrowingTaskGroup(of: Void.self) { group in
  437. group.addTask {
  438. while !Task.isCancelled {
  439. onTerminationContinuation.yield()
  440. try await Task.sleep(nanoseconds: 200_000_000)
  441. }
  442. }
  443. var onTerminationIterator = onTerminationStream.makeAsyncIterator()
  444. _ = await onTerminationIterator.next()
  445. _ = stream.makeAsyncIterator()
  446. source = nil
  447. _ = await onTerminationIterator.next()
  448. let terminationResult: Void? = await onTerminationIterator.next()
  449. XCTAssertNil(terminationResult)
  450. group.cancelAll()
  451. }
  452. }
  453. func testSourceDeinitialized_whenStreaming_andSuspendedProducer() async throws {
  454. var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
  455. of: Int.self,
  456. backPressureStrategy: .watermark(low: 0, high: 0)
  457. )
  458. let (producerStream, producerContinuation) = AsyncThrowingStream<Void, any Error>.makeStream()
  459. var iterator = stream.makeAsyncIterator()
  460. source?.write(1) {
  461. producerContinuation.yield(with: $0)
  462. }
  463. _ = try await iterator.next()
  464. source = nil
  465. do {
  466. try await producerStream.first { _ in true }
  467. XCTFail("We expected to throw here")
  468. } catch {
  469. XCTAssertTrue(error is AlreadyFinishedError)
  470. }
  471. }
  472. // MARK: - write
  473. func testWrite_whenInitial() async throws {
  474. let (stream, source) = BufferedStream.makeStream(
  475. of: Int.self,
  476. backPressureStrategy: .watermark(low: 2, high: 5)
  477. )
  478. try await source.write(1)
  479. var iterator = stream.makeAsyncIterator()
  480. let element = try await iterator.next()
  481. XCTAssertEqual(element, 1)
  482. }
  483. func testWrite_whenStreaming_andNoConsumer() async throws {
  484. let (stream, source) = BufferedStream.makeStream(
  485. of: Int.self,
  486. backPressureStrategy: .watermark(low: 2, high: 5)
  487. )
  488. try await source.write(1)
  489. try await source.write(2)
  490. var iterator = stream.makeAsyncIterator()
  491. let element1 = try await iterator.next()
  492. XCTAssertEqual(element1, 1)
  493. let element2 = try await iterator.next()
  494. XCTAssertEqual(element2, 2)
  495. }
  496. func testWrite_whenStreaming_andSuspendedConsumer() async throws {
  497. let (stream, source) = BufferedStream.makeStream(
  498. of: Int.self,
  499. backPressureStrategy: .watermark(low: 2, high: 5)
  500. )
  501. try await withThrowingTaskGroup(of: Int?.self) { group in
  502. group.addTask {
  503. return try await stream.first { _ in true }
  504. }
  505. // This is always going to be a bit racy since we need the call to next() suspend
  506. try await Task.sleep(nanoseconds: 500_000_000)
  507. try await source.write(1)
  508. let element = try await group.next()
  509. XCTAssertEqual(element, 1)
  510. }
  511. }
  512. func testWrite_whenStreaming_andSuspendedConsumer_andEmptySequence() async throws {
  513. let (stream, source) = BufferedStream.makeStream(
  514. of: Int.self,
  515. backPressureStrategy: .watermark(low: 2, high: 5)
  516. )
  517. try await withThrowingTaskGroup(of: Int?.self) { group in
  518. group.addTask {
  519. return try await stream.first { _ in true }
  520. }
  521. // This is always going to be a bit racy since we need the call to next() suspend
  522. try await Task.sleep(nanoseconds: 500_000_000)
  523. try await source.write(contentsOf: [])
  524. try await source.write(contentsOf: [1])
  525. let element = try await group.next()
  526. XCTAssertEqual(element, 1)
  527. }
  528. }
  529. // MARK: - enqueueProducer
  530. func testEnqueueProducer_whenStreaming_andAndCancelled() async throws {
  531. let (stream, source) = BufferedStream.makeStream(
  532. of: Int.self,
  533. backPressureStrategy: .watermark(low: 1, high: 2)
  534. )
  535. let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
  536. try await source.write(1)
  537. let writeResult = try { try source.write(2) }()
  538. switch writeResult {
  539. case .produceMore:
  540. preconditionFailure()
  541. case .enqueueCallback(let callbackToken):
  542. source.cancelCallback(callbackToken: callbackToken)
  543. source.enqueueCallback(callbackToken: callbackToken) { result in
  544. producerSource.yield(with: result)
  545. }
  546. }
  547. do {
  548. _ = try await producerStream.first { _ in true }
  549. XCTFail("Expected an error to be thrown")
  550. } catch {
  551. XCTAssertTrue(error is CancellationError)
  552. }
  553. let element = try await stream.first { _ in true }
  554. XCTAssertEqual(element, 1)
  555. }
  556. func testEnqueueProducer_whenStreaming_andAndCancelled_andAsync() async throws {
  557. let (stream, source) = BufferedStream.makeStream(
  558. of: Int.self,
  559. backPressureStrategy: .watermark(low: 1, high: 2)
  560. )
  561. try await source.write(1)
  562. await withThrowingTaskGroup(of: Void.self) { group in
  563. group.addTask {
  564. try await source.write(2)
  565. }
  566. group.cancelAll()
  567. do {
  568. try await group.next()
  569. XCTFail("Expected an error to be thrown")
  570. } catch {
  571. XCTAssertTrue(error is CancellationError)
  572. }
  573. }
  574. let element = try await stream.first { _ in true }
  575. XCTAssertEqual(element, 1)
  576. }
  577. func testEnqueueProducer_whenStreaming_andInterleaving() async throws {
  578. let (stream, source) = BufferedStream.makeStream(
  579. of: Int.self,
  580. backPressureStrategy: .watermark(low: 1, high: 1)
  581. )
  582. var iterator = stream.makeAsyncIterator()
  583. let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
  584. let writeResult = try { try source.write(1) }()
  585. switch writeResult {
  586. case .produceMore:
  587. preconditionFailure()
  588. case .enqueueCallback(let callbackToken):
  589. let element = try await iterator.next()
  590. XCTAssertEqual(element, 1)
  591. source.enqueueCallback(callbackToken: callbackToken) { result in
  592. producerSource.yield(with: result)
  593. }
  594. }
  595. do {
  596. _ = try await producerStream.first { _ in true }
  597. } catch {
  598. XCTFail("Expected no error to be thrown")
  599. }
  600. }
  601. func testEnqueueProducer_whenStreaming_andSuspending() async throws {
  602. let (stream, source) = BufferedStream.makeStream(
  603. of: Int.self,
  604. backPressureStrategy: .watermark(low: 1, high: 1)
  605. )
  606. var iterator = stream.makeAsyncIterator()
  607. let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
  608. let writeResult = try { try source.write(1) }()
  609. switch writeResult {
  610. case .produceMore:
  611. preconditionFailure()
  612. case .enqueueCallback(let callbackToken):
  613. source.enqueueCallback(callbackToken: callbackToken) { result in
  614. producerSource.yield(with: result)
  615. }
  616. }
  617. let element = try await iterator.next()
  618. XCTAssertEqual(element, 1)
  619. do {
  620. _ = try await producerStream.first { _ in true }
  621. } catch {
  622. XCTFail("Expected no error to be thrown")
  623. }
  624. }
  625. func testEnqueueProducer_whenFinished() async throws {
  626. let (stream, source) = BufferedStream.makeStream(
  627. of: Int.self,
  628. backPressureStrategy: .watermark(low: 1, high: 1)
  629. )
  630. var iterator = stream.makeAsyncIterator()
  631. let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
  632. let writeResult = try { try source.write(1) }()
  633. switch writeResult {
  634. case .produceMore:
  635. preconditionFailure()
  636. case .enqueueCallback(let callbackToken):
  637. source.finish(throwing: nil)
  638. source.enqueueCallback(callbackToken: callbackToken) { result in
  639. producerSource.yield(with: result)
  640. }
  641. }
  642. let element = try await iterator.next()
  643. XCTAssertEqual(element, 1)
  644. do {
  645. _ = try await producerStream.first { _ in true }
  646. XCTFail("Expected an error to be thrown")
  647. } catch {
  648. XCTAssertTrue(error is AlreadyFinishedError)
  649. }
  650. }
  651. // MARK: - cancelProducer
  652. func testCancelProducer_whenStreaming() async throws {
  653. let (stream, source) = BufferedStream.makeStream(
  654. of: Int.self,
  655. backPressureStrategy: .watermark(low: 1, high: 2)
  656. )
  657. let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
  658. try await source.write(1)
  659. let writeResult = try { try source.write(2) }()
  660. switch writeResult {
  661. case .produceMore:
  662. preconditionFailure()
  663. case .enqueueCallback(let callbackToken):
  664. source.enqueueCallback(callbackToken: callbackToken) { result in
  665. producerSource.yield(with: result)
  666. }
  667. source.cancelCallback(callbackToken: callbackToken)
  668. }
  669. do {
  670. _ = try await producerStream.first { _ in true }
  671. XCTFail("Expected an error to be thrown")
  672. } catch {
  673. XCTAssertTrue(error is CancellationError)
  674. }
  675. let element = try await stream.first { _ in true }
  676. XCTAssertEqual(element, 1)
  677. }
  678. func testCancelProducer_whenSourceFinished() async throws {
  679. let (stream, source) = BufferedStream.makeStream(
  680. of: Int.self,
  681. backPressureStrategy: .watermark(low: 1, high: 2)
  682. )
  683. let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
  684. try await source.write(1)
  685. let writeResult = try { try source.write(2) }()
  686. switch writeResult {
  687. case .produceMore:
  688. preconditionFailure()
  689. case .enqueueCallback(let callbackToken):
  690. source.enqueueCallback(callbackToken: callbackToken) { result in
  691. producerSource.yield(with: result)
  692. }
  693. source.finish(throwing: nil)
  694. source.cancelCallback(callbackToken: callbackToken)
  695. }
  696. do {
  697. _ = try await producerStream.first { _ in true }
  698. XCTFail("Expected an error to be thrown")
  699. } catch {
  700. XCTAssertTrue(error is AlreadyFinishedError)
  701. }
  702. let element = try await stream.first { _ in true }
  703. XCTAssertEqual(element, 1)
  704. }
  705. // MARK: - finish
  706. func testFinish_whenStreaming_andConsumerSuspended() async throws {
  707. let (stream, source) = BufferedStream.makeStream(
  708. of: Int.self,
  709. backPressureStrategy: .watermark(low: 1, high: 1)
  710. )
  711. try await withThrowingTaskGroup(of: Int?.self) { group in
  712. group.addTask {
  713. return try await stream.first { $0 == 2 }
  714. }
  715. // This is always going to be a bit racy since we need the call to next() suspend
  716. try await Task.sleep(nanoseconds: 500_000_000)
  717. source.finish(throwing: nil)
  718. let element = try await group.next()
  719. XCTAssertEqual(element, .some(nil))
  720. }
  721. }
  722. func testFinish_whenInitial() async throws {
  723. let (stream, source) = BufferedStream.makeStream(
  724. of: Int.self,
  725. backPressureStrategy: .watermark(low: 1, high: 1)
  726. )
  727. source.finish(throwing: CancellationError())
  728. do {
  729. for try await _ in stream {}
  730. XCTFail("Expected an error to be thrown")
  731. } catch {
  732. XCTAssertTrue(error is CancellationError)
  733. }
  734. }
  735. // MARK: - Backpressure
  736. func testBackPressure() async throws {
  737. let (stream, source) = BufferedStream.makeStream(
  738. of: Int.self,
  739. backPressureStrategy: .watermark(low: 2, high: 4)
  740. )
  741. let (backPressureEventStream, backPressureEventContinuation) = AsyncStream.makeStream(
  742. of: Void.self
  743. )
  744. try await withThrowingTaskGroup(of: Void.self) { group in
  745. group.addTask {
  746. while true {
  747. backPressureEventContinuation.yield(())
  748. try await source.write(contentsOf: [1])
  749. }
  750. }
  751. var backPressureEventIterator = backPressureEventStream.makeAsyncIterator()
  752. var iterator = stream.makeAsyncIterator()
  753. await backPressureEventIterator.next()
  754. await backPressureEventIterator.next()
  755. await backPressureEventIterator.next()
  756. await backPressureEventIterator.next()
  757. _ = try await iterator.next()
  758. _ = try await iterator.next()
  759. _ = try await iterator.next()
  760. await backPressureEventIterator.next()
  761. await backPressureEventIterator.next()
  762. await backPressureEventIterator.next()
  763. group.cancelAll()
  764. }
  765. }
  766. func testBackPressureSync() async throws {
  767. let (stream, source) = BufferedStream.makeStream(
  768. of: Int.self,
  769. backPressureStrategy: .watermark(low: 2, high: 4)
  770. )
  771. let (backPressureEventStream, backPressureEventContinuation) = AsyncStream.makeStream(
  772. of: Void.self
  773. )
  774. try await withThrowingTaskGroup(of: Void.self) { group in
  775. group.addTask {
  776. @Sendable func yield() {
  777. backPressureEventContinuation.yield(())
  778. source.write(contentsOf: [1]) { result in
  779. switch result {
  780. case .success:
  781. yield()
  782. case .failure:
  783. break
  784. }
  785. }
  786. }
  787. yield()
  788. }
  789. var backPressureEventIterator = backPressureEventStream.makeAsyncIterator()
  790. var iterator = stream.makeAsyncIterator()
  791. await backPressureEventIterator.next()
  792. await backPressureEventIterator.next()
  793. await backPressureEventIterator.next()
  794. await backPressureEventIterator.next()
  795. _ = try await iterator.next()
  796. _ = try await iterator.next()
  797. _ = try await iterator.next()
  798. await backPressureEventIterator.next()
  799. await backPressureEventIterator.next()
  800. await backPressureEventIterator.next()
  801. group.cancelAll()
  802. }
  803. }
  804. func testThrowsError() async throws {
  805. let (stream, source) = BufferedStream.makeStream(
  806. of: Int.self,
  807. backPressureStrategy: .watermark(low: 2, high: 4)
  808. )
  809. try await source.write(1)
  810. try await source.write(2)
  811. source.finish(throwing: CancellationError())
  812. var elements = [Int]()
  813. var iterator = stream.makeAsyncIterator()
  814. do {
  815. while let element = try await iterator.next() {
  816. elements.append(element)
  817. }
  818. XCTFail("Expected an error to be thrown")
  819. } catch {
  820. XCTAssertTrue(error is CancellationError)
  821. XCTAssertEqual(elements, [1, 2])
  822. }
  823. let element = try await iterator.next()
  824. XCTAssertNil(element)
  825. }
  826. func testAsyncSequenceWrite() async throws {
  827. let (stream, continuation) = AsyncStream<Int>.makeStream()
  828. let (backpressuredStream, source) = BufferedStream.makeStream(
  829. of: Int.self,
  830. backPressureStrategy: .watermark(low: 2, high: 4)
  831. )
  832. continuation.yield(1)
  833. continuation.yield(2)
  834. continuation.finish()
  835. try await source.write(contentsOf: stream)
  836. source.finish(throwing: nil)
  837. let elements = try await backpressuredStream.collect()
  838. XCTAssertEqual(elements, [1, 2])
  839. }
  840. }
  841. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  842. extension BufferedStream.Source.WriteResult {
  843. func assertIsProducerMore() {
  844. switch self {
  845. case .produceMore:
  846. return
  847. case .enqueueCallback:
  848. XCTFail("Expected produceMore")
  849. }
  850. }
  851. func assertIsEnqueueCallback() {
  852. switch self {
  853. case .produceMore:
  854. XCTFail("Expected enqueueCallback")
  855. case .enqueueCallback:
  856. return
  857. }
  858. }
  859. }