BroadcastAsyncSequenceTests.swift 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  /*
   * Copyright 2023, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import XCTest
  17. @testable import GRPCCore
  /// Tests for `BroadcastAsyncSequence` covering empty streams, multiple subscribers,
  /// slow consumers, explicit invalidation of subscriptions, and task cancellation on
  /// both the consuming and the producing side.
  final class BroadcastAsyncSequenceTests: XCTestCase {
    /// A single subscriber to a stream which was finished without any writes collects
    /// no elements.
    func testSingleSubscriberToEmptyStream() async throws {
      let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      source.finish()

      let elements = try await stream.collect()
      XCTAssertEqual(elements, [])
    }

    /// Collecting twice, one after the other, from a finished empty stream yields no
    /// elements each time.
    func testMultipleSubscribersToEmptyStream() async throws {
      let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      source.finish()

      do {
        let elements = try await stream.collect()
        XCTAssertEqual(elements, [])
      }

      do {
        let elements = try await stream.collect()
        XCTAssertEqual(elements, [])
      }
    }

    /// An iterator created before the source is finished returns `nil` from `next()`
    /// once the source has finished without writing anything.
    func testSubscribeToEmptyStreamBeforeFinish() async throws {
      let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      var iterator = stream.makeAsyncIterator()

      source.finish()

      let element = try await iterator.next()
      XCTAssertNil(element)
    }

    /// With two iterators on the same stream, the one which never consumes is failed
    /// with `.consumingTooSlow` while the other one receives every element and then
    /// the end of the stream.
    func testSlowConsumerIsLeftBehind() async throws {
      let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      var consumer1 = stream.makeAsyncIterator()
      var consumer2 = stream.makeAsyncIterator()

      for element in 0 ..< 15 {
        try await source.write(element)
      }

      // Buffer should now be full. Consume with one consumer so that the other is dropped on
      // the next yield.
      let element = try await consumer1.next()
      XCTAssertEqual(element, 0)

      // Will invalidate consumer2 as the slowest consumer.
      try await source.write(15)

      await XCTAssertThrowsErrorAsync {
        try await consumer2.next()
      } errorHandler: { error in
        XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
      }

      // consumer1 should be free to continue.
      for expected in 1 ... 15 {
        let element = try await consumer1.next()
        XCTAssertEqual(element, expected)
      }

      // consumer1 should end as expected.
      source.finish()
      let end = try await consumer1.next()
      XCTAssertNil(end)
    }

    /// Iterators created after some elements were written each start from the first
    /// element, can be advanced independently of one another, and all observe every
    /// element followed by the end of the stream. A subscription attempted after 20
    /// elements have been written is expected to fail with `.consumingTooSlow`.
    func testConsumerJoiningAfterSomeElements() async throws {
      let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      for element in 0 ..< 10 {
        try await source.write(element)
      }

      // Add the first consumer, consume the first eight elements.
      var consumer1 = stream.makeAsyncIterator()
      do {
        for expected in 0 ..< 8 {
          let element = try await consumer1.next()
          XCTAssertEqual(element, expected)
        }
      }

      // Add a second consumer, consume the first four elements.
      var consumer2 = stream.makeAsyncIterator()
      do {
        for expected in 0 ..< 4 {
          let element = try await consumer2.next()
          XCTAssertEqual(element, expected)
        }
      }

      // Add another consumer, consume the first two elements.
      var consumer3 = stream.makeAsyncIterator()
      do {
        for expected in 0 ..< 2 {
          let element = try await consumer3.next()
          XCTAssertEqual(element, expected)
        }
      }

      // Advance each consumer in lock-step.
      for offset in 0 ..< 10 {
        try await source.write(10 + offset)
        let element1 = try await consumer1.next()
        XCTAssertEqual(element1, 8 + offset)
        let element2 = try await consumer2.next()
        XCTAssertEqual(element2, 4 + offset)
        let element3 = try await consumer3.next()
        XCTAssertEqual(element3, 2 + offset)
      }

      // Subscribing isn't possible.
      await XCTAssertThrowsErrorAsync {
        try await stream.collect()
      } errorHandler: { error in
        XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
      }

      source.finish()

      // All elements are present. The existing consumers can finish however they choose.
      do {
        for expected in 18 ..< 20 {
          let element = try await consumer1.next()
          XCTAssertEqual(element, expected)
        }
        let end = try await consumer1.next()
        XCTAssertNil(end)
      }

      do {
        for expected in 14 ..< 20 {
          let element = try await consumer2.next()
          XCTAssertEqual(element, expected)
        }
        let end = try await consumer2.next()
        XCTAssertNil(end)
      }

      do {
        for expected in 12 ..< 20 {
          let element = try await consumer3.next()
          XCTAssertEqual(element, expected)
        }
        let end = try await consumer3.next()
        XCTAssertNil(end)
      }
    }

    /// `invalidateAllSubscriptions()` makes an existing iterator throw
    /// `.consumingTooSlow` from `next()`. An iterator created afterwards starts again
    /// from the first element and is failed in the same way by a second invalidation.
    func testInvalidateAllConsumersForSingleConcurrentConsumer() async throws {
      let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      for element in 0 ..< 10 {
        try await source.write(element)
      }

      var consumer1 = stream.makeAsyncIterator()
      stream.invalidateAllSubscriptions()
      await XCTAssertThrowsErrorAsync {
        try await consumer1.next()
      } errorHandler: { error in
        XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
      }

      // Subscribe, consume one, then cancel.
      var consumer2 = stream.makeAsyncIterator()
      do {
        let value = try await consumer2.next()
        XCTAssertEqual(value, 0)
      }

      stream.invalidateAllSubscriptions()
      await XCTAssertThrowsErrorAsync {
        try await consumer2.next()
      } errorHandler: { error in
        XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
      }
    }

    /// `invalidateAllSubscriptions()` fails every one of several iterators that exist
    /// at the same time with `.consumingTooSlow`.
    func testInvalidateAllConsumersForMultipleConcurrentConsumer() async throws {
      let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      for element in 0 ..< 10 {
        try await source.write(element)
      }

      let consumers: [BroadcastAsyncSequence<Int>.AsyncIterator] = (0 ..< 5).map { _ in
        stream.makeAsyncIterator()
      }

      // Each consumer reads the first element before the subscriptions are invalidated.
      for var consumer in consumers {
        let value = try await consumer.next()
        XCTAssertEqual(value, 0)
      }

      stream.invalidateAllSubscriptions()

      for var consumer in consumers {
        await XCTAssertThrowsErrorAsync {
          try await consumer.next()
        } errorHandler: { error in
          XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
        }
      }
    }

    /// Collecting from the stream inside an already-cancelled task throws
    /// `CancellationError`.
    func testCancelSubscriber() async throws {
      let (stream, _) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)

      await withTaskGroup(of: Void.self) { group in
        // Cancel first so that the child task added below starts out cancelled.
        group.cancelAll()
        group.addTask {
          do {
            _ = try await stream.collect()
            XCTFail("Expected collect() to throw CancellationError in a cancelled task")
          } catch {
            XCTAssert(error is CancellationError, "Expected CancellationError, got \(error)")
          }
        }
      }
    }

    /// Writing to the source from already-cancelled tasks throws `CancellationError`
    /// for every writer.
    func testCancelProducer() async throws {
      let (_, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
      for i in 0 ..< 15 {
        try await source.write(i)
      }

      let writerCount = 10

      try await withThrowingTaskGroup(of: Void.self) { group in
        // Cancel first so that every child task added below starts out cancelled.
        group.cancelAll()
        for _ in 0 ..< writerCount {
          group.addTask {
            try await source.write(42)
          }
        }

        // Count the results so the test cannot pass without checking any writer.
        var resultCount = 0
        while let result = await group.nextResult() {
          resultCount += 1
          XCTAssertThrowsError(try result.get()) { error in
            XCTAssert(error is CancellationError, "Expected CancellationError, got \(error)")
          }
        }
        XCTAssertEqual(resultCount, writerCount, "Not every writer reported a result")
      }
    }
  }