AsyncWriterTests.swift 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. @testable import GRPC
  18. import NIOConcurrencyHelpers
  19. import XCTest
  20. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  21. internal class AsyncWriterTests: GRPCTestCase {
  22. func testSingleWriterHappyPath() async throws {
  23. let delegate = CollectingDelegate<String, Int>()
  24. let writer = AsyncWriter(delegate: delegate)
  25. try await writer.write("jimmy")
  26. XCTAssertEqual(delegate.elements, ["jimmy"])
  27. try await writer.write("jab")
  28. XCTAssertEqual(delegate.elements, ["jimmy", "jab"])
  29. try await writer.finish(99)
  30. XCTAssertEqual(delegate.end, 99)
  31. }
  32. func testPauseAndResumeWrites() async throws {
  33. let delegate = CollectingDelegate<String, Int>()
  34. let writer = AsyncWriter(delegate: delegate)
  35. // pause
  36. await writer.toggleWritability()
  37. async let written1: Void = writer.write("wunch")
  38. XCTAssert(delegate.elements.isEmpty)
  39. // resume
  40. await writer.toggleWritability()
  41. try await written1
  42. XCTAssertEqual(delegate.elements, ["wunch"])
  43. try await writer.finish(0)
  44. XCTAssertEqual(delegate.end, 0)
  45. }
  46. func testTooManyWrites() async throws {
  47. let delegate = CollectingDelegate<String, Int>()
  48. // Zero pending elements means that any write when paused will trigger an error.
  49. let writer = AsyncWriter(maxPendingElements: 0, delegate: delegate)
  50. // pause
  51. await writer.toggleWritability()
  52. await XCTAssertThrowsError(try await writer.write("pontiac")) { error in
  53. XCTAssertEqual(error as? GRPCAsyncWriterError, .tooManyPendingWrites)
  54. }
  55. // resume (we must finish the writer.)
  56. await writer.toggleWritability()
  57. try await writer.finish(0)
  58. XCTAssertEqual(delegate.end, 0)
  59. XCTAssertTrue(delegate.elements.isEmpty)
  60. }
  61. func testWriteAfterFinish() async throws {
  62. let delegate = CollectingDelegate<String, Int>()
  63. let writer = AsyncWriter(delegate: delegate)
  64. try await writer.finish(0)
  65. XCTAssertEqual(delegate.end, 0)
  66. await XCTAssertThrowsError(try await writer.write("cheddar")) { error in
  67. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  68. }
  69. XCTAssertTrue(delegate.elements.isEmpty)
  70. }
  71. func testTooManyCallsToFinish() async throws {
  72. let delegate = CollectingDelegate<String, Int>()
  73. let writer = AsyncWriter(delegate: delegate)
  74. try await writer.finish(0)
  75. XCTAssertEqual(delegate.end, 0)
  76. await XCTAssertThrowsError(try await writer.finish(1)) { error in
  77. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  78. }
  79. // Still 0.
  80. XCTAssertEqual(delegate.end, 0)
  81. }
  82. func testCallToFinishWhilePending() async throws {
  83. let delegate = CollectingDelegate<String, Int>()
  84. let writer = AsyncWriter(delegate: delegate)
  85. // Pause.
  86. await writer.toggleWritability()
  87. async let finished: Void = writer.finish(42)
  88. XCTAssertNil(delegate.end)
  89. // Resume.
  90. await writer.toggleWritability()
  91. try await finished
  92. XCTAssertEqual(delegate.end, 42)
  93. }
  94. func testTooManyCallsToFinishWhilePending() async throws {
  95. let delegate = CollectingDelegate<String, Int>()
  96. let writer = AsyncWriter(delegate: delegate)
  97. // Pause.
  98. await writer.toggleWritability()
  99. // We want to test that when a finish has suspended that another task calling finish results
  100. // in an `AsyncWriterError.alreadyFinished` error.
  101. //
  102. // It's hard to achieve this reliably in an obvious way because we can't guarantee the
  103. // ordering of `Task`s or when they will be suspended during `finish`. However, by pausing the
  104. // writer and calling finish in two separate tasks we guarantee that one will run first and
  105. // suspend (because the writer is paused) and the other will throw an error. When one throws
  106. // an error it can resume the writer allowing the other task to resume successfully.
  107. await withThrowingTaskGroup(of: Void.self) { group in
  108. group.addTask {
  109. do {
  110. try await writer.finish(1)
  111. } catch {
  112. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  113. // Resume.
  114. await writer.toggleWritability()
  115. }
  116. }
  117. group.addTask {
  118. do {
  119. try await writer.finish(2)
  120. } catch {
  121. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  122. // Resume.
  123. await writer.toggleWritability()
  124. }
  125. }
  126. }
  127. // We should definitely be finished by this point.
  128. await XCTAssertThrowsError(try await writer.finish(3)) { error in
  129. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  130. }
  131. }
  132. func testCancellationForPendingWrite() async throws {
  133. let delegate = CollectingDelegate<String, Int>()
  134. let writer = AsyncWriter(delegate: delegate)
  135. // Pause.
  136. await writer.toggleWritability()
  137. async let pendingWrite: Void = writer.write("foo")
  138. await writer.cancel()
  139. do {
  140. try await pendingWrite
  141. XCTFail("Expected to throw an error.")
  142. } catch is CancellationError {
  143. // Cancellation is fine: we cancelled while the write was pending.
  144. ()
  145. } catch let error as GRPCAsyncWriterError {
  146. // Already finish is also fine: we cancelled before the write was enqueued.
  147. XCTAssertEqual(error, .alreadyFinished)
  148. } catch {
  149. XCTFail("Unexpected error: \(error)")
  150. }
  151. await XCTAssertThrowsError(try await writer.write("bar")) { error in
  152. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  153. }
  154. XCTAssertTrue(delegate.elements.isEmpty)
  155. XCTAssertNil(delegate.end)
  156. }
  157. func testCancellationForPendingFinish() async throws {
  158. let delegate = CollectingDelegate<String, Int>()
  159. let writer = AsyncWriter(delegate: delegate)
  160. // Pause.
  161. await writer.toggleWritability()
  162. async let pendingWrite: Void = writer.finish(42)
  163. await writer.cancel()
  164. do {
  165. try await pendingWrite
  166. XCTFail("Expected to throw an error.")
  167. } catch is CancellationError {
  168. // Cancellation is fine: we cancelled while the write was pending.
  169. ()
  170. } catch let error as GRPCAsyncWriterError {
  171. // Already finish is also fine: we cancelled before the write was enqueued.
  172. XCTAssertEqual(error, .alreadyFinished)
  173. } catch {
  174. XCTFail("Unexpected error: \(error)")
  175. }
  176. await XCTAssertThrowsError(try await writer.finish(42)) { error in
  177. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  178. }
  179. XCTAssertTrue(delegate.elements.isEmpty)
  180. XCTAssertNil(delegate.end)
  181. }
  182. func testMultipleCancellations() async throws {
  183. let delegate = CollectingDelegate<String, Int>()
  184. let writer = AsyncWriter(delegate: delegate)
  185. await writer.cancel()
  186. await XCTAssertThrowsError(try await writer.write("1")) { error in
  187. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  188. }
  189. // Fine, no need to throw. Nothing should change.
  190. await writer.cancel()
  191. await XCTAssertThrowsError(try await writer.write("2")) { error in
  192. XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
  193. }
  194. XCTAssertTrue(delegate.elements.isEmpty)
  195. XCTAssertNil(delegate.end)
  196. }
  197. }
  198. fileprivate final class CollectingDelegate<
  199. Element: Sendable,
  200. End: Sendable
  201. >: AsyncWriterDelegate, @unchecked Sendable {
  202. private let lock = Lock()
  203. private var _elements: [Element] = []
  204. private var _end: End?
  205. internal var elements: [Element] {
  206. return self.lock.withLock { self._elements }
  207. }
  208. internal var end: End? {
  209. return self.lock.withLock { self._end }
  210. }
  211. internal func write(_ element: Element) {
  212. self.lock.withLockVoid {
  213. self._elements.append(element)
  214. }
  215. }
  216. internal func writeEnd(_ end: End) {
  217. self.lock.withLockVoid {
  218. self._end = end
  219. }
  220. }
  221. }
  222. #endif // compiler(>=5.6)