| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- /*
- * Copyright 2023, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import XCTest
- @testable import GRPCCore
- @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
- final class BroadcastAsyncSequenceTests: XCTestCase {
- func testSingleSubscriberToEmptyStream() async throws {
- let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- source.finish()
- let elements = try await stream.collect()
- XCTAssertEqual(elements, [])
- }
- func testMultipleSubscribersToEmptyStream() async throws {
- let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- source.finish()
- do {
- let elements = try await stream.collect()
- XCTAssertEqual(elements, [])
- }
- do {
- let elements = try await stream.collect()
- XCTAssertEqual(elements, [])
- }
- }
- func testSubscribeToEmptyStreamBeforeFinish() async throws {
- let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- var iterator = stream.makeAsyncIterator()
- source.finish()
- let element = try await iterator.next()
- XCTAssertNil(element)
- }
- func testSlowConsumerIsLeftBehind() async throws {
- let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- var consumer1 = stream.makeAsyncIterator()
- var consumer2 = stream.makeAsyncIterator()
- for element in 0 ..< 15 {
- try await source.write(element)
- }
- // Buffer should now be full. Consume with one consumer so that the other is dropped on
- // the next yield.
- let element = try await consumer1.next()
- XCTAssertEqual(element, 0)
- // Will invalidate consumer2 as the slowest consumer.
- try await source.write(15)
- await XCTAssertThrowsErrorAsync {
- try await consumer2.next()
- } errorHandler: { error in
- XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
- }
- // consumer1 should be free to continue.
- for expected in 1 ... 15 {
- let element = try await consumer1.next()
- XCTAssertEqual(element, expected)
- }
- // consumer1 should end as expected.
- source.finish()
- let end = try await consumer1.next()
- XCTAssertNil(end)
- }
- func testConsumerJoiningAfterSomeElements() async throws {
- let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- for element in 0 ..< 10 {
- try await source.write(element)
- }
- var consumer1 = stream.makeAsyncIterator()
- do {
- for expected in 0 ..< 8 {
- let element = try await consumer1.next()
- XCTAssertEqual(element, expected)
- }
- }
- // Add a second consumer, consume the first four elements.
- var consumer2 = stream.makeAsyncIterator()
- do {
- for expected in 0 ..< 4 {
- let element = try await consumer2.next()
- XCTAssertEqual(element, expected)
- }
- }
- // Add another consumer, consume the first two elements.
- var consumer3 = stream.makeAsyncIterator()
- do {
- for expected in 0 ..< 2 {
- let element = try await consumer3.next()
- XCTAssertEqual(element, expected)
- }
- }
- // Advance each consumer in lock-step.
- for offset in 0 ..< 10 {
- try await source.write(10 + offset)
- let element1 = try await consumer1.next()
- XCTAssertEqual(element1, 8 + offset)
- let element2 = try await consumer2.next()
- XCTAssertEqual(element2, 4 + offset)
- let element3 = try await consumer3.next()
- XCTAssertEqual(element3, 2 + offset)
- }
- // Subscribing isn't possible.
- await XCTAssertThrowsErrorAsync {
- try await stream.collect()
- } errorHandler: { error in
- XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
- }
- source.finish()
- // All elements are present. The existing consumers can finish however they choose.
- do {
- for expected in 18 ..< 20 {
- let element = try await consumer1.next()
- XCTAssertEqual(element, expected)
- }
- let end = try await consumer1.next()
- XCTAssertNil(end)
- }
- do {
- for expected in 14 ..< 20 {
- let element = try await consumer2.next()
- XCTAssertEqual(element, expected)
- }
- let end = try await consumer2.next()
- XCTAssertNil(end)
- }
- do {
- for expected in 12 ..< 20 {
- let element = try await consumer3.next()
- XCTAssertEqual(element, expected)
- }
- let end = try await consumer3.next()
- XCTAssertNil(end)
- }
- }
- func testInvalidateAllConsumersForSingleConcurrentConsumer() async throws {
- let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- for element in 0 ..< 10 {
- try await source.write(element)
- }
- var consumer1 = stream.makeAsyncIterator()
- stream.invalidateAllSubscriptions()
- await XCTAssertThrowsErrorAsync {
- try await consumer1.next()
- } errorHandler: { error in
- XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
- }
- // Subscribe, consume one, then cancel.
- var consumer2 = stream.makeAsyncIterator()
- do {
- let value = try await consumer2.next()
- XCTAssertEqual(value, 0)
- }
- stream.invalidateAllSubscriptions()
- await XCTAssertThrowsErrorAsync {
- try await consumer2.next()
- } errorHandler: { error in
- XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
- }
- }
- func testInvalidateAllConsumersForMultipleConcurrentConsumer() async throws {
- let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- for element in 0 ..< 10 {
- try await source.write(element)
- }
- let consumers: [BroadcastAsyncSequence<Int>.AsyncIterator] = (0 ..< 5).map { _ in
- stream.makeAsyncIterator()
- }
- for var consumer in consumers {
- let value = try await consumer.next()
- XCTAssertEqual(value, 0)
- }
- stream.invalidateAllSubscriptions()
- for var consumer in consumers {
- await XCTAssertThrowsErrorAsync {
- try await consumer.next()
- } errorHandler: { error in
- XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
- }
- }
- }
- func testCancelSubscriber() async throws {
- let (stream, _) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- await withTaskGroup(of: Void.self) { group in
- group.cancelAll()
- group.addTask {
- do {
- _ = try await stream.collect()
- XCTFail()
- } catch {
- XCTAssert(error is CancellationError)
- }
- }
- }
- }
- func testCancelProducer() async throws {
- let (_, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
- for i in 0 ..< 15 {
- try await source.write(i)
- }
- try await withThrowingTaskGroup(of: Void.self) { group in
- group.cancelAll()
- for _ in 0 ..< 10 {
- group.addTask {
- try await source.write(42)
- }
- }
- while let result = await group.nextResult() {
- XCTAssertThrowsError(try result.get()) { error in
- XCTAssert(error is CancellationError)
- }
- }
- }
- }
- }
|