BroadcastAsyncSequenceTests.swift 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import XCTest
  17. @testable import GRPCCore
  18. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  19. final class BroadcastAsyncSequenceTests: XCTestCase {
  20. func testSingleSubscriberToEmptyStream() async throws {
  21. let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  22. source.finish()
  23. let elements = try await stream.collect()
  24. XCTAssertEqual(elements, [])
  25. }
  26. func testMultipleSubscribersToEmptyStream() async throws {
  27. let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  28. source.finish()
  29. do {
  30. let elements = try await stream.collect()
  31. XCTAssertEqual(elements, [])
  32. }
  33. do {
  34. let elements = try await stream.collect()
  35. XCTAssertEqual(elements, [])
  36. }
  37. }
  38. func testSubscribeToEmptyStreamBeforeFinish() async throws {
  39. let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  40. var iterator = stream.makeAsyncIterator()
  41. source.finish()
  42. let element = try await iterator.next()
  43. XCTAssertNil(element)
  44. }
  45. func testSlowConsumerIsLeftBehind() async throws {
  46. let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  47. var consumer1 = stream.makeAsyncIterator()
  48. var consumer2 = stream.makeAsyncIterator()
  49. for element in 0 ..< 15 {
  50. try await source.write(element)
  51. }
  52. // Buffer should now be full. Consume with one consumer so that the other is dropped on
  53. // the next yield.
  54. let element = try await consumer1.next()
  55. XCTAssertEqual(element, 0)
  56. // Will invalidate consumer2 as the slowest consumer.
  57. try await source.write(15)
  58. await XCTAssertThrowsErrorAsync {
  59. try await consumer2.next()
  60. } errorHandler: { error in
  61. XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
  62. }
  63. // consumer1 should be free to continue.
  64. for expected in 1 ... 15 {
  65. let element = try await consumer1.next()
  66. XCTAssertEqual(element, expected)
  67. }
  68. // consumer1 should end as expected.
  69. source.finish()
  70. let end = try await consumer1.next()
  71. XCTAssertNil(end)
  72. }
  73. func testConsumerJoiningAfterSomeElements() async throws {
  74. let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  75. for element in 0 ..< 10 {
  76. try await source.write(element)
  77. }
  78. var consumer1 = stream.makeAsyncIterator()
  79. do {
  80. for expected in 0 ..< 8 {
  81. let element = try await consumer1.next()
  82. XCTAssertEqual(element, expected)
  83. }
  84. }
  85. // Add a second consumer, consume the first four elements.
  86. var consumer2 = stream.makeAsyncIterator()
  87. do {
  88. for expected in 0 ..< 4 {
  89. let element = try await consumer2.next()
  90. XCTAssertEqual(element, expected)
  91. }
  92. }
  93. // Add another consumer, consume the first two elements.
  94. var consumer3 = stream.makeAsyncIterator()
  95. do {
  96. for expected in 0 ..< 2 {
  97. let element = try await consumer3.next()
  98. XCTAssertEqual(element, expected)
  99. }
  100. }
  101. // Advance each consumer in lock-step.
  102. for offset in 0 ..< 10 {
  103. try await source.write(10 + offset)
  104. let element1 = try await consumer1.next()
  105. XCTAssertEqual(element1, 8 + offset)
  106. let element2 = try await consumer2.next()
  107. XCTAssertEqual(element2, 4 + offset)
  108. let element3 = try await consumer3.next()
  109. XCTAssertEqual(element3, 2 + offset)
  110. }
  111. // Subscribing isn't possible.
  112. await XCTAssertThrowsErrorAsync {
  113. try await stream.collect()
  114. } errorHandler: { error in
  115. XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
  116. }
  117. source.finish()
  118. // All elements are present. The existing consumers can finish however they choose.
  119. do {
  120. for expected in 18 ..< 20 {
  121. let element = try await consumer1.next()
  122. XCTAssertEqual(element, expected)
  123. }
  124. let end = try await consumer1.next()
  125. XCTAssertNil(end)
  126. }
  127. do {
  128. for expected in 14 ..< 20 {
  129. let element = try await consumer2.next()
  130. XCTAssertEqual(element, expected)
  131. }
  132. let end = try await consumer2.next()
  133. XCTAssertNil(end)
  134. }
  135. do {
  136. for expected in 12 ..< 20 {
  137. let element = try await consumer3.next()
  138. XCTAssertEqual(element, expected)
  139. }
  140. let end = try await consumer3.next()
  141. XCTAssertNil(end)
  142. }
  143. }
  144. func testInvalidateAllConsumersForSingleConcurrentConsumer() async throws {
  145. let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  146. for element in 0 ..< 10 {
  147. try await source.write(element)
  148. }
  149. var consumer1 = stream.makeAsyncIterator()
  150. stream.invalidateAllSubscriptions()
  151. await XCTAssertThrowsErrorAsync {
  152. try await consumer1.next()
  153. } errorHandler: { error in
  154. XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
  155. }
  156. // Subscribe, consume one, then cancel.
  157. var consumer2 = stream.makeAsyncIterator()
  158. do {
  159. let value = try await consumer2.next()
  160. XCTAssertEqual(value, 0)
  161. }
  162. stream.invalidateAllSubscriptions()
  163. await XCTAssertThrowsErrorAsync {
  164. try await consumer2.next()
  165. } errorHandler: { error in
  166. XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
  167. }
  168. }
  169. func testInvalidateAllConsumersForMultipleConcurrentConsumer() async throws {
  170. let (stream, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  171. for element in 0 ..< 10 {
  172. try await source.write(element)
  173. }
  174. let consumers: [BroadcastAsyncSequence<Int>.AsyncIterator] = (0 ..< 5).map { _ in
  175. stream.makeAsyncIterator()
  176. }
  177. for var consumer in consumers {
  178. let value = try await consumer.next()
  179. XCTAssertEqual(value, 0)
  180. }
  181. stream.invalidateAllSubscriptions()
  182. for var consumer in consumers {
  183. await XCTAssertThrowsErrorAsync {
  184. try await consumer.next()
  185. } errorHandler: { error in
  186. XCTAssertEqual(error as? BroadcastAsyncSequenceError, .consumingTooSlow)
  187. }
  188. }
  189. }
  190. func testCancelSubscriber() async throws {
  191. let (stream, _) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  192. await withTaskGroup(of: Void.self) { group in
  193. group.cancelAll()
  194. group.addTask {
  195. do {
  196. _ = try await stream.collect()
  197. XCTFail()
  198. } catch {
  199. XCTAssert(error is CancellationError)
  200. }
  201. }
  202. }
  203. }
  204. func testCancelProducer() async throws {
  205. let (_, source) = BroadcastAsyncSequence.makeStream(of: Int.self, bufferSize: 16)
  206. for i in 0 ..< 15 {
  207. try await source.write(i)
  208. }
  209. try await withThrowingTaskGroup(of: Void.self) { group in
  210. group.cancelAll()
  211. for _ in 0 ..< 10 {
  212. group.addTask {
  213. try await source.write(42)
  214. }
  215. }
  216. while let result = await group.nextResult() {
  217. XCTAssertThrowsError(try result.get()) { error in
  218. XCTAssert(error is CancellationError)
  219. }
  220. }
  221. }
  222. }
  223. }