| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104 |
- /*
- * Copyright 2023, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import XCTest
- @testable import GRPCCore
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- final class BufferedStreamTests: XCTestCase {
- // MARK: - sequenceDeinitialized
- func testSequenceDeinitialized_whenNoIterator() async throws {
- var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source.onTermination = {
- onTerminationContinuation.finish()
- }
- await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- withExtendedLifetime(stream) {}
- stream = nil
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- do {
- _ = try { try source.write(2) }()
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is AlreadyFinishedError)
- }
- group.cancelAll()
- }
- }
- func testSequenceDeinitialized_whenIterator() async throws {
- var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- var iterator = stream?.makeAsyncIterator()
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source.onTermination = {
- onTerminationContinuation.finish()
- }
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- try withExtendedLifetime(stream) {
- let writeResult = try source.write(1)
- writeResult.assertIsProducerMore()
- }
- stream = nil
- do {
- let writeResult = try { try source.write(2) }()
- writeResult.assertIsProducerMore()
- } catch {
- XCTFail("Expected no error to be thrown")
- }
- let element1 = try await iterator?.next()
- XCTAssertEqual(element1, 1)
- let element2 = try await iterator?.next()
- XCTAssertEqual(element2, 2)
- group.cancelAll()
- }
- }
- func testSequenceDeinitialized_whenFinished() async throws {
- var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source.onTermination = {
- onTerminationContinuation.finish()
- }
- await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- withExtendedLifetime(stream) {
- source.finish(throwing: nil)
- }
- stream = nil
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- do {
- _ = try { try source.write(1) }()
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is AlreadyFinishedError)
- }
- group.cancelAll()
- }
- }
- func testSequenceDeinitialized_whenStreaming_andSuspendedProducer() async throws {
- var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 2)
- )
- _ = try { try source.write(1) }()
- do {
- try await withCheckedThrowingContinuation { continuation in
- source.write(1) { result in
- continuation.resume(with: result)
- }
- stream = nil
- _ = stream?.makeAsyncIterator()
- }
- } catch {
- XCTAssertTrue(error is AlreadyFinishedError)
- }
- }
- // MARK: - iteratorInitialized
- func testIteratorInitialized_whenInitial() async throws {
- let (stream, _) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- _ = stream.makeAsyncIterator()
- }
- func testIteratorInitialized_whenStreaming() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- try await source.write(1)
- var iterator = stream.makeAsyncIterator()
- let element = try await iterator.next()
- XCTAssertEqual(element, 1)
- }
- func testIteratorInitialized_whenSourceFinished() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- try await source.write(1)
- source.finish(throwing: nil)
- var iterator = stream.makeAsyncIterator()
- let element1 = try await iterator.next()
- XCTAssertEqual(element1, 1)
- let element2 = try await iterator.next()
- XCTAssertNil(element2)
- }
- func testIteratorInitialized_whenFinished() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- source.finish(throwing: nil)
- var iterator = stream.makeAsyncIterator()
- let element = try await iterator.next()
- XCTAssertNil(element)
- }
- // MARK: - iteratorDeinitialized
- func testIteratorDeinitialized_whenInitial() async throws {
- var (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source.onTermination = {
- onTerminationContinuation.finish()
- }
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
- iterator = nil
- _ = try await iterator?.next()
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testIteratorDeinitialized_whenStreaming() async throws {
- var (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source.onTermination = {
- onTerminationContinuation.finish()
- }
- try await source.write(1)
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
- iterator = nil
- _ = try await iterator?.next()
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testIteratorDeinitialized_whenSourceFinished() async throws {
- var (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source.onTermination = {
- onTerminationContinuation.finish()
- }
- try await source.write(1)
- source.finish(throwing: nil)
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
- iterator = nil
- _ = try await iterator?.next()
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testIteratorDeinitialized_whenFinished() async throws {
- var (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source.onTermination = {
- onTerminationContinuation.finish()
- }
- source.finish(throwing: nil)
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
- iterator = nil
- _ = try await iterator?.next()
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testIteratorDeinitialized_whenStreaming_andSuspendedProducer() async throws {
- var (stream, source): (BufferedStream?, BufferedStream.Source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 2)
- )
- var iterator: BufferedStream<Int>.AsyncIterator? = stream?.makeAsyncIterator()
- stream = nil
- _ = try { try source.write(1) }()
- do {
- try await withCheckedThrowingContinuation { continuation in
- source.write(1) { result in
- continuation.resume(with: result)
- }
- iterator = nil
- }
- } catch {
- XCTAssertTrue(error is AlreadyFinishedError)
- }
- _ = try await iterator?.next()
- }
- // MARK: - sourceDeinitialized
- func testSourceDeinitialized_whenInitial() async throws {
- var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source?.onTermination = {
- onTerminationContinuation.finish()
- }
- await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- source = nil
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- withExtendedLifetime(stream) {}
- }
- func testSourceDeinitialized_whenStreaming_andEmptyBuffer() async throws {
- var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source?.onTermination = {
- onTerminationContinuation.finish()
- }
- try await source?.write(1)
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
- _ = try await iterator?.next()
- source = nil
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testSourceDeinitialized_whenStreaming_andNotEmptyBuffer() async throws {
- var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source?.onTermination = {
- onTerminationContinuation.finish()
- }
- try await source?.write(1)
- try await source?.write(2)
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
- _ = try await iterator?.next()
- source = nil
- _ = await onTerminationIterator.next()
- _ = try await iterator?.next()
- _ = try await iterator?.next()
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testSourceDeinitialized_whenSourceFinished() async throws {
- var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source?.onTermination = {
- onTerminationContinuation.finish()
- }
- try await source?.write(1)
- try await source?.write(2)
- source?.finish(throwing: nil)
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- var iterator: BufferedStream<Int>.AsyncIterator? = stream.makeAsyncIterator()
- _ = try await iterator?.next()
- source = nil
- _ = await onTerminationIterator.next()
- _ = try await iterator?.next()
- _ = try await iterator?.next()
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testSourceDeinitialized_whenFinished() async throws {
- var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 5, high: 10)
- )
- let (onTerminationStream, onTerminationContinuation) = AsyncStream<Void>.makeStream()
- source?.onTermination = {
- onTerminationContinuation.finish()
- }
- source?.finish(throwing: nil)
- await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while !Task.isCancelled {
- onTerminationContinuation.yield()
- try await Task.sleep(nanoseconds: 200_000_000)
- }
- }
- var onTerminationIterator = onTerminationStream.makeAsyncIterator()
- _ = await onTerminationIterator.next()
- _ = stream.makeAsyncIterator()
- source = nil
- _ = await onTerminationIterator.next()
- let terminationResult: Void? = await onTerminationIterator.next()
- XCTAssertNil(terminationResult)
- group.cancelAll()
- }
- }
- func testSourceDeinitialized_whenStreaming_andSuspendedProducer() async throws {
- var (stream, source): (BufferedStream, BufferedStream.Source?) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 0, high: 0)
- )
- let (producerStream, producerContinuation) = AsyncThrowingStream<Void, any Error>.makeStream()
- var iterator = stream.makeAsyncIterator()
- source?.write(1) {
- producerContinuation.yield(with: $0)
- }
- _ = try await iterator.next()
- source = nil
- do {
- try await producerStream.first { _ in true }
- XCTFail("We expected to throw here")
- } catch {
- XCTAssertTrue(error is AlreadyFinishedError)
- }
- }
- // MARK: - write
- func testWrite_whenInitial() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 5)
- )
- try await source.write(1)
- var iterator = stream.makeAsyncIterator()
- let element = try await iterator.next()
- XCTAssertEqual(element, 1)
- }
- func testWrite_whenStreaming_andNoConsumer() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 5)
- )
- try await source.write(1)
- try await source.write(2)
- var iterator = stream.makeAsyncIterator()
- let element1 = try await iterator.next()
- XCTAssertEqual(element1, 1)
- let element2 = try await iterator.next()
- XCTAssertEqual(element2, 2)
- }
- func testWrite_whenStreaming_andSuspendedConsumer() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 5)
- )
- try await withThrowingTaskGroup(of: Int?.self) { group in
- group.addTask {
- return try await stream.first { _ in true }
- }
- // This is always going to be a bit racy since we need the call to next() suspend
- try await Task.sleep(nanoseconds: 500_000_000)
- try await source.write(1)
- let element = try await group.next()
- XCTAssertEqual(element, 1)
- }
- }
- func testWrite_whenStreaming_andSuspendedConsumer_andEmptySequence() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 5)
- )
- try await withThrowingTaskGroup(of: Int?.self) { group in
- group.addTask {
- return try await stream.first { _ in true }
- }
- // This is always going to be a bit racy since we need the call to next() suspend
- try await Task.sleep(nanoseconds: 500_000_000)
- try await source.write(contentsOf: [])
- try await source.write(contentsOf: [1])
- let element = try await group.next()
- XCTAssertEqual(element, 1)
- }
- }
- // MARK: - enqueueProducer
- func testEnqueueProducer_whenStreaming_andAndCancelled() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 2)
- )
- let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
- try await source.write(1)
- let writeResult = try { try source.write(2) }()
- switch writeResult {
- case .produceMore:
- preconditionFailure()
- case .enqueueCallback(let callbackToken):
- source.cancelCallback(callbackToken: callbackToken)
- source.enqueueCallback(callbackToken: callbackToken) { result in
- producerSource.yield(with: result)
- }
- }
- do {
- _ = try await producerStream.first { _ in true }
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is CancellationError)
- }
- let element = try await stream.first { _ in true }
- XCTAssertEqual(element, 1)
- }
- func testEnqueueProducer_whenStreaming_andAndCancelled_andAsync() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 2)
- )
- try await source.write(1)
- await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- try await source.write(2)
- }
- group.cancelAll()
- do {
- try await group.next()
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is CancellationError)
- }
- }
- let element = try await stream.first { _ in true }
- XCTAssertEqual(element, 1)
- }
- func testEnqueueProducer_whenStreaming_andInterleaving() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 1)
- )
- var iterator = stream.makeAsyncIterator()
- let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
- let writeResult = try { try source.write(1) }()
- switch writeResult {
- case .produceMore:
- preconditionFailure()
- case .enqueueCallback(let callbackToken):
- let element = try await iterator.next()
- XCTAssertEqual(element, 1)
- source.enqueueCallback(callbackToken: callbackToken) { result in
- producerSource.yield(with: result)
- }
- }
- do {
- _ = try await producerStream.first { _ in true }
- } catch {
- XCTFail("Expected no error to be thrown")
- }
- }
- func testEnqueueProducer_whenStreaming_andSuspending() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 1)
- )
- var iterator = stream.makeAsyncIterator()
- let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
- let writeResult = try { try source.write(1) }()
- switch writeResult {
- case .produceMore:
- preconditionFailure()
- case .enqueueCallback(let callbackToken):
- source.enqueueCallback(callbackToken: callbackToken) { result in
- producerSource.yield(with: result)
- }
- }
- let element = try await iterator.next()
- XCTAssertEqual(element, 1)
- do {
- _ = try await producerStream.first { _ in true }
- } catch {
- XCTFail("Expected no error to be thrown")
- }
- }
- func testEnqueueProducer_whenFinished() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 1)
- )
- var iterator = stream.makeAsyncIterator()
- let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
- let writeResult = try { try source.write(1) }()
- switch writeResult {
- case .produceMore:
- preconditionFailure()
- case .enqueueCallback(let callbackToken):
- source.finish(throwing: nil)
- source.enqueueCallback(callbackToken: callbackToken) { result in
- producerSource.yield(with: result)
- }
- }
- let element = try await iterator.next()
- XCTAssertEqual(element, 1)
- do {
- _ = try await producerStream.first { _ in true }
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is AlreadyFinishedError)
- }
- }
- // MARK: - cancelProducer
- func testCancelProducer_whenStreaming() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 2)
- )
- let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
- try await source.write(1)
- let writeResult = try { try source.write(2) }()
- switch writeResult {
- case .produceMore:
- preconditionFailure()
- case .enqueueCallback(let callbackToken):
- source.enqueueCallback(callbackToken: callbackToken) { result in
- producerSource.yield(with: result)
- }
- source.cancelCallback(callbackToken: callbackToken)
- }
- do {
- _ = try await producerStream.first { _ in true }
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is CancellationError)
- }
- let element = try await stream.first { _ in true }
- XCTAssertEqual(element, 1)
- }
- func testCancelProducer_whenSourceFinished() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 2)
- )
- let (producerStream, producerSource) = AsyncThrowingStream<Void, any Error>.makeStream()
- try await source.write(1)
- let writeResult = try { try source.write(2) }()
- switch writeResult {
- case .produceMore:
- preconditionFailure()
- case .enqueueCallback(let callbackToken):
- source.enqueueCallback(callbackToken: callbackToken) { result in
- producerSource.yield(with: result)
- }
- source.finish(throwing: nil)
- source.cancelCallback(callbackToken: callbackToken)
- }
- do {
- _ = try await producerStream.first { _ in true }
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is AlreadyFinishedError)
- }
- let element = try await stream.first { _ in true }
- XCTAssertEqual(element, 1)
- }
- // MARK: - finish
- func testFinish_whenStreaming_andConsumerSuspended() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 1)
- )
- try await withThrowingTaskGroup(of: Int?.self) { group in
- group.addTask {
- return try await stream.first { $0 == 2 }
- }
- // This is always going to be a bit racy since we need the call to next() suspend
- try await Task.sleep(nanoseconds: 500_000_000)
- source.finish(throwing: nil)
- let element = try await group.next()
- XCTAssertEqual(element, .some(nil))
- }
- }
- func testFinish_whenInitial() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 1, high: 1)
- )
- source.finish(throwing: CancellationError())
- do {
- for try await _ in stream {}
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is CancellationError)
- }
- }
- // MARK: - Backpressure
- func testBackPressure() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 4)
- )
- let (backPressureEventStream, backPressureEventContinuation) = AsyncStream.makeStream(
- of: Void.self
- )
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- while true {
- backPressureEventContinuation.yield(())
- try await source.write(contentsOf: [1])
- }
- }
- var backPressureEventIterator = backPressureEventStream.makeAsyncIterator()
- var iterator = stream.makeAsyncIterator()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- _ = try await iterator.next()
- _ = try await iterator.next()
- _ = try await iterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- group.cancelAll()
- }
- }
- func testBackPressureSync() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 4)
- )
- let (backPressureEventStream, backPressureEventContinuation) = AsyncStream.makeStream(
- of: Void.self
- )
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.addTask {
- @Sendable func yield() {
- backPressureEventContinuation.yield(())
- source.write(contentsOf: [1]) { result in
- switch result {
- case .success:
- yield()
- case .failure:
- break
- }
- }
- }
- yield()
- }
- var backPressureEventIterator = backPressureEventStream.makeAsyncIterator()
- var iterator = stream.makeAsyncIterator()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- _ = try await iterator.next()
- _ = try await iterator.next()
- _ = try await iterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- await backPressureEventIterator.next()
- group.cancelAll()
- }
- }
- func testThrowsError() async throws {
- let (stream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 4)
- )
- try await source.write(1)
- try await source.write(2)
- source.finish(throwing: CancellationError())
- var elements = [Int]()
- var iterator = stream.makeAsyncIterator()
- do {
- while let element = try await iterator.next() {
- elements.append(element)
- }
- XCTFail("Expected an error to be thrown")
- } catch {
- XCTAssertTrue(error is CancellationError)
- XCTAssertEqual(elements, [1, 2])
- }
- let element = try await iterator.next()
- XCTAssertNil(element)
- }
- func testAsyncSequenceWrite() async throws {
- let (stream, continuation) = AsyncStream<Int>.makeStream()
- let (backpressuredStream, source) = BufferedStream.makeStream(
- of: Int.self,
- backPressureStrategy: .watermark(low: 2, high: 4)
- )
- continuation.yield(1)
- continuation.yield(2)
- continuation.finish()
- try await source.write(contentsOf: stream)
- source.finish(throwing: nil)
- let elements = try await backpressuredStream.collect()
- XCTAssertEqual(elements, [1, 2])
- }
- }
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- extension BufferedStream.Source.WriteResult {
- func assertIsProducerMore() {
- switch self {
- case .produceMore:
- return
- case .enqueueCallback:
- XCTFail("Expected produceMore")
- }
- }
- func assertIsEnqueueCallback() {
- switch self {
- case .produceMore:
- XCTFail("Expected enqueueCallback")
- case .enqueueCallback:
- return
- }
- }
- }
|