2
0

PassthroughMessageSourceTests.swift 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.5)
  17. @testable import GRPC
  18. import XCTest
  19. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  20. class PassthroughMessageSourceTests: GRPCTestCase {
  21. func testBasicUsage() {
  22. XCTAsyncTest {
  23. let source = PassthroughMessageSource<String, Never>()
  24. let sequence = PassthroughMessageSequence(consuming: source)
  25. XCTAssertEqual(source.yield("foo"), .accepted(queueDepth: 1))
  26. XCTAssertEqual(source.yield("bar"), .accepted(queueDepth: 2))
  27. XCTAssertEqual(source.yield("baz"), .accepted(queueDepth: 3))
  28. let firstTwo = try await sequence.prefix(2).collect()
  29. XCTAssertEqual(firstTwo, ["foo", "bar"])
  30. XCTAssertEqual(source.yield("bar"), .accepted(queueDepth: 2))
  31. XCTAssertEqual(source.yield("foo"), .accepted(queueDepth: 3))
  32. XCTAssertEqual(source.finish(), .accepted(queueDepth: 4))
  33. let theRest = try await sequence.collect()
  34. XCTAssertEqual(theRest, ["baz", "bar", "foo"])
  35. }
  36. }
  37. func testFinishWithError() {
  38. XCTAsyncTest {
  39. let source = PassthroughMessageSource<String, TestError>()
  40. XCTAssertEqual(source.yield("one"), .accepted(queueDepth: 1))
  41. XCTAssertEqual(source.yield("two"), .accepted(queueDepth: 2))
  42. XCTAssertEqual(source.yield("three"), .accepted(queueDepth: 3))
  43. XCTAssertEqual(source.finish(throwing: TestError()), .accepted(queueDepth: 4))
  44. // We should still be able to get the elements before the error.
  45. let sequence = PassthroughMessageSequence(consuming: source)
  46. let elements = try await sequence.prefix(3).collect()
  47. XCTAssertEqual(elements, ["one", "two", "three"])
  48. do {
  49. for try await element in sequence {
  50. XCTFail("Unexpected value '\(element)'")
  51. }
  52. XCTFail("AsyncSequence did not throw")
  53. } catch {
  54. XCTAssert(error is TestError)
  55. }
  56. }
  57. }
  58. func testYieldAfterFinish() {
  59. XCTAsyncTest {
  60. let source = PassthroughMessageSource<String, Never>()
  61. XCTAssertEqual(source.finish(), .accepted(queueDepth: 1))
  62. XCTAssertEqual(source.yield("foo"), .dropped)
  63. let sequence = PassthroughMessageSequence(consuming: source)
  64. let elements = try await sequence.count()
  65. XCTAssertEqual(elements, 0)
  66. }
  67. }
  68. func testMultipleFinishes() {
  69. XCTAsyncTest {
  70. let source = PassthroughMessageSource<String, TestError>()
  71. XCTAssertEqual(source.finish(), .accepted(queueDepth: 1))
  72. XCTAssertEqual(source.finish(), .dropped)
  73. XCTAssertEqual(source.finish(throwing: TestError()), .dropped)
  74. let sequence = PassthroughMessageSequence(consuming: source)
  75. let elements = try await sequence.count()
  76. XCTAssertEqual(elements, 0)
  77. }
  78. }
  79. func testConsumeBeforeYield() {
  80. XCTAsyncTest {
  81. let source = PassthroughMessageSource<String, Never>()
  82. let sequence = PassthroughMessageSequence(consuming: source)
  83. await withThrowingTaskGroup(of: Void.self) { group in
  84. group.addTask(priority: .high) {
  85. let iterator = sequence.makeAsyncIterator()
  86. if let next = try await iterator.next() {
  87. XCTAssertEqual(next, "one")
  88. } else {
  89. XCTFail("No value produced")
  90. }
  91. }
  92. group.addTask(priority: .low) {
  93. let result = source.yield("one")
  94. // We can't guarantee that this task will run after the other so we *may* have a queue
  95. // depth of one.
  96. XCTAssert(result == .accepted(queueDepth: 0) || result == .accepted(queueDepth: 1))
  97. }
  98. }
  99. }
  100. }
  101. func testConsumeBeforeFinish() {
  102. XCTAsyncTest {
  103. let source = PassthroughMessageSource<String, TestError>()
  104. let sequence = PassthroughMessageSequence(consuming: source)
  105. await withThrowingTaskGroup(of: Void.self) { group in
  106. group.addTask(priority: .high) {
  107. let iterator = sequence.makeAsyncIterator()
  108. await XCTAssertThrowsError(_ = try await iterator.next()) { error in
  109. XCTAssert(error is TestError)
  110. }
  111. }
  112. group.addTask(priority: .low) {
  113. let result = source.finish(throwing: TestError())
  114. // We can't guarantee that this task will run after the other so we *may* have a queue
  115. // depth of one.
  116. XCTAssert(result == .accepted(queueDepth: 0) || result == .accepted(queueDepth: 1))
  117. }
  118. }
  119. }
  120. }
  121. }
  122. fileprivate struct TestError: Error {}
  123. #endif // compiler(>=5.5)