2
0

PassthroughMessageSourceTests.swift 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. @testable import GRPC
  18. import XCTest
  /// Tests for `PassthroughMessageSource` and for `PassthroughMessageSequence`, the
  /// `AsyncSequence` used here to consume a source.
  ///
  /// The expectations below pin down the results returned by `yield(_:)`, `finish()` and
  /// `finish(throwing:)` (either `.accepted(queueDepth:)` or `.dropped`) as well as the elements
  /// and errors observed by a consumer.
  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  class PassthroughMessageSourceTests: GRPCTestCase {
    /// Yields elements before and after partially consuming the sequence and checks both the
    /// reported queue depths and the order in which elements are consumed.
    func testBasicUsage() async throws {
      let source = PassthroughMessageSource<String, Never>()
      let sequence = PassthroughMessageSequence(consuming: source)

      XCTAssertEqual(source.yield("foo"), .accepted(queueDepth: 1))
      XCTAssertEqual(source.yield("bar"), .accepted(queueDepth: 2))
      XCTAssertEqual(source.yield("baz"), .accepted(queueDepth: 3))

      // Consuming two elements leaves only "baz" queued, so the next yield reports a depth of two.
      let firstTwo = try await sequence.prefix(2).collect()
      XCTAssertEqual(firstTwo, ["foo", "bar"])

      XCTAssertEqual(source.yield("bar"), .accepted(queueDepth: 2))
      XCTAssertEqual(source.yield("foo"), .accepted(queueDepth: 3))
      // Finishing is reported with a queue depth as well.
      XCTAssertEqual(source.finish(), .accepted(queueDepth: 4))

      let theRest = try await sequence.collect()
      XCTAssertEqual(theRest, ["baz", "bar", "foo"])
    }

    /// Checks that elements yielded before `finish(throwing:)` are still delivered and that the
    /// error is thrown to the consumer once those elements have been consumed.
    func testFinishWithError() async throws {
      let source = PassthroughMessageSource<String, TestError>()

      XCTAssertEqual(source.yield("one"), .accepted(queueDepth: 1))
      XCTAssertEqual(source.yield("two"), .accepted(queueDepth: 2))
      XCTAssertEqual(source.yield("three"), .accepted(queueDepth: 3))
      XCTAssertEqual(source.finish(throwing: TestError()), .accepted(queueDepth: 4))

      // We should still be able to get the elements before the error.
      let sequence = PassthroughMessageSequence(consuming: source)
      let elements = try await sequence.prefix(3).collect()
      XCTAssertEqual(elements, ["one", "two", "three"])

      // Nothing but the error should be left in the sequence.
      do {
        for try await element in sequence {
          XCTFail("Unexpected value '\(element)'")
        }
        XCTFail("AsyncSequence did not throw")
      } catch {
        XCTAssert(error is TestError)
      }
    }

    /// Checks that an element yielded after `finish()` is dropped and never reaches the consumer.
    func testYieldAfterFinish() async throws {
      let source = PassthroughMessageSource<String, Never>()

      XCTAssertEqual(source.finish(), .accepted(queueDepth: 1))
      XCTAssertEqual(source.yield("foo"), .dropped)

      let sequence = PassthroughMessageSequence(consuming: source)
      let elements = try await sequence.count()
      XCTAssertEqual(elements, 0)
    }

    /// Checks that only the first finish is accepted; later calls to `finish()` and
    /// `finish(throwing:)` are dropped and the sequence ends without throwing.
    func testMultipleFinishes() async throws {
      let source = PassthroughMessageSource<String, TestError>()

      XCTAssertEqual(source.finish(), .accepted(queueDepth: 1))
      XCTAssertEqual(source.finish(), .dropped)
      XCTAssertEqual(source.finish(throwing: TestError()), .dropped)

      let sequence = PassthroughMessageSequence(consuming: source)
      let elements = try await sequence.count()
      XCTAssertEqual(elements, 0)
    }

    /// Starts a consumer and a producer concurrently and checks that the consumer receives the
    /// yielded element regardless of which of the two tasks runs first.
    func testConsumeBeforeYield() async throws {
      let source = PassthroughMessageSource<String, Never>()
      let sequence = PassthroughMessageSequence(consuming: source)

      try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask(priority: .high) {
          let iterator = sequence.makeAsyncIterator()
          if let next = try await iterator.next() {
            XCTAssertEqual(next, "one")
          } else {
            XCTFail("No value produced")
          }
        }

        group.addTask(priority: .low) {
          let result = source.yield("one")
          // We can't guarantee that this task will run after the other so we *may* have a queue
          // depth of one.
          XCTAssert(result == .accepted(queueDepth: 0) || result == .accepted(queueDepth: 1))
        }

        // Errors thrown by child tasks are discarded unless the results are awaited; wait for
        // every task so that an unexpected error fails the test instead of being lost.
        try await group.waitForAll()
      }
    }

    /// Starts a consumer and a finishing producer concurrently and checks that the consumer
    /// observes the error passed to `finish(throwing:)`.
    func testConsumeBeforeFinish() async throws {
      let source = PassthroughMessageSource<String, TestError>()
      let sequence = PassthroughMessageSequence(consuming: source)

      try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask(priority: .high) {
          let iterator = sequence.makeAsyncIterator()
          await XCTAssertThrowsError(_ = try await iterator.next()) { error in
            XCTAssert(error is TestError)
          }
        }

        group.addTask(priority: .low) {
          let result = source.finish(throwing: TestError())
          // We can't guarantee that this task will run after the other so we *may* have a queue
          // depth of one.
          XCTAssert(result == .accepted(queueDepth: 0) || result == .accepted(queueDepth: 1))
        }

        // Surface any error thrown by a child task rather than silently dropping it.
        try await group.waitForAll()
      }
    }

    /// Checks that `consumeNextElement()` throws `CancellationError` when the task waiting on it
    /// is cancelled.
    func testCooperativeCancellationOfSourceOnNext() async throws {
      let source = PassthroughMessageSource<String, TestError>()

      try await withTaskCancelledAfter(nanoseconds: 100_000) {
        do {
          _ = try await source.consumeNextElement()
          XCTFail("consumeNextElement() should throw CancellationError")
        } catch {
          XCTAssert(error is CancellationError)
        }
      }
    }

    /// Checks that iterating the sequence throws `CancellationError` when the iterating task is
    /// cancelled.
    func testCooperativeCancellationOfSequenceOnNext() async throws {
      let source = PassthroughMessageSource<String, TestError>()
      let sequence = PassthroughMessageSequence(consuming: source)

      try await withTaskCancelledAfter(nanoseconds: 100_000) {
        do {
          // Nothing is ever yielded, so the loop body must never run.
          for try await _ in sequence {
            XCTFail("consumeNextElement() should throw CancellationError")
          }
          XCTFail("consumeNextElement() should throw CancellationError")
        } catch {
          XCTAssert(error is CancellationError)
        }
      }
    }
  }
  /// An empty error type used by the tests in this file as the failure type of a source and as
  /// the error passed to `finish(throwing:)`.
  private struct TestError: Error {}
  136. #endif // compiler(>=5.6)