AsyncWriterTests.swift 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.5)
  17. @testable import GRPC
  18. import NIOConcurrencyHelpers
  19. import XCTest
  /// Tests for `AsyncWriter` driven through a `CollectingDelegate`, which records what the writer
  /// hands to its delegate so each test can assert on it.
  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  internal class AsyncWriterTests: GRPCTestCase {
    /// Writes two elements and then finishes, checking after each step that the delegate has
    /// recorded exactly what was written.
    func testSingleWriterHappyPath() {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        try await writer.write("jimmy")
        XCTAssertEqual(sink.elements, ["jimmy"])

        try await writer.write("jab")
        XCTAssertEqual(sink.elements, ["jimmy", "jab"])

        try await writer.finish(99)
        XCTAssertEqual(sink.end, 99)
      }
    }

    /// Starts a write while the writer is paused, checks nothing has reached the delegate, then
    /// resumes and checks the element (and later the end value) is delivered.
    func testPauseAndResumeWrites() {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        // First toggle: pause.
        await writer.toggleWritability()

        async let pendingWrite: Void = writer.write("wunch")
        XCTAssertTrue(sink.elements.isEmpty)

        // Second toggle: resume.
        await writer.toggleWritability()
        try await pendingWrite
        XCTAssertEqual(sink.elements, ["wunch"])

        try await writer.finish(0)
        XCTAssertEqual(sink.end, 0)
      }
    }

    /// Expects `.tooManyPendingWrites` when writing to a paused writer created with
    /// `maxPendingElements: 0`, and checks the rejected element never reaches the delegate.
    func testTooManyWrites() throws {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        // With zero pending elements allowed, any write made while paused triggers an error.
        let writer = AsyncWriter(maxPendingElements: 0, delegate: sink)

        // Pause.
        await writer.toggleWritability()

        await XCTAssertThrowsError(try await writer.write("pontiac")) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .tooManyPendingWrites)
        }

        // Resume (we must finish the writer).
        await writer.toggleWritability()
        try await writer.finish(0)
        XCTAssertEqual(sink.end, 0)
        XCTAssertTrue(sink.elements.isEmpty)
      }
    }

    /// Expects `.alreadyFinished` when writing after `finish`, and checks no element was recorded.
    func testWriteAfterFinish() throws {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        try await writer.finish(0)
        XCTAssertEqual(sink.end, 0)

        await XCTAssertThrowsError(try await writer.write("cheddar")) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .alreadyFinished)
        }
        XCTAssertTrue(sink.elements.isEmpty)
      }
    }

    /// Expects `.alreadyFinished` from a second `finish` and checks the recorded end value is
    /// still the one from the first call.
    func testTooManyCallsToFinish() throws {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        try await writer.finish(0)
        XCTAssertEqual(sink.end, 0)

        await XCTAssertThrowsError(try await writer.finish(1)) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .alreadyFinished)
        }

        // The end value must not have been replaced: still 0.
        XCTAssertEqual(sink.end, 0)
      }
    }

    /// Starts a `finish` while the writer is paused, checks the delegate has no end value yet,
    /// then resumes and checks the end value is delivered.
    func testCallToFinishWhilePending() throws {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        // Pause.
        await writer.toggleWritability()

        async let pendingFinish: Void = writer.finish(42)
        XCTAssertNil(sink.end)

        // Resume.
        await writer.toggleWritability()
        try await pendingFinish
        XCTAssertEqual(sink.end, 42)
      }
    }

    /// Calls `finish` from two child tasks while the writer is paused, then checks that a further
    /// `finish` throws `.alreadyFinished`.
    func testTooManyCallsToFinishWhilePending() throws {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        // Pause.
        await writer.toggleWritability()

        // The behaviour under test: once one `finish` has suspended, a `finish` from another task
        // fails with `AsyncWriterError.alreadyFinished`.
        //
        // This is hard to arrange reliably in an obvious way, since neither the ordering of
        // `Task`s nor the point at which they suspend inside `finish` can be guaranteed. Pausing
        // the writer and calling `finish` from two separate tasks sidesteps that: one of them
        // runs first and suspends (the writer is paused), and the other throws. Whichever task
        // throws resumes the writer, which lets the suspended task complete successfully.
        await withThrowingTaskGroup(of: Void.self) { group in
          for endValue in [1, 2] {
            group.addTask {
              do {
                try await writer.finish(endValue)
              } catch {
                XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
                // Resume.
                await writer.toggleWritability()
              }
            }
          }
        }

        // Both tasks have completed, so the writer is definitely finished by now.
        await XCTAssertThrowsError(try await writer.finish(3)) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .alreadyFinished)
        }
      }
    }

    /// Cancels the writer while a write is outstanding on a paused writer. The outstanding write
    /// must fail, later writes must throw `.alreadyFinished`, and the delegate must have
    /// recorded nothing.
    func testCancellationForPendingWrite() {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        // Pause.
        await writer.toggleWritability()

        async let pendingWrite: Void = writer.write("foo")
        await writer.cancel()

        do {
          try await pendingWrite
          XCTFail("Expected to throw an error.")
        } catch {
          switch error {
          case is CancellationError:
            // Acceptable: the cancel arrived while the write was pending.
            break
          case let writerError as GRPCAsyncWriterError:
            // Also acceptable: the cancel arrived before the write was enqueued.
            XCTAssertEqual(writerError, .alreadyFinished)
          default:
            XCTFail("Unexpected error: \(error)")
          }
        }

        await XCTAssertThrowsError(try await writer.write("bar")) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .alreadyFinished)
        }

        XCTAssertTrue(sink.elements.isEmpty)
        XCTAssertNil(sink.end)
      }
    }

    /// Cancels the writer while a `finish` is outstanding on a paused writer. The outstanding
    /// finish must fail, a later finish must throw `.alreadyFinished`, and the delegate must
    /// have recorded nothing.
    func testCancellationForPendingFinish() {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        // Pause.
        await writer.toggleWritability()

        async let pendingFinish: Void = writer.finish(42)
        await writer.cancel()

        do {
          try await pendingFinish
          XCTFail("Expected to throw an error.")
        } catch {
          switch error {
          case is CancellationError:
            // Acceptable: the cancel arrived while the finish was pending.
            break
          case let writerError as GRPCAsyncWriterError:
            // Also acceptable: the cancel arrived before the finish was enqueued.
            XCTAssertEqual(writerError, .alreadyFinished)
          default:
            XCTFail("Unexpected error: \(error)")
          }
        }

        await XCTAssertThrowsError(try await writer.finish(42)) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .alreadyFinished)
        }

        XCTAssertTrue(sink.elements.isEmpty)
        XCTAssertNil(sink.end)
      }
    }

    /// Cancels the writer twice, checking after each cancel that writes throw `.alreadyFinished`
    /// and, at the end, that the delegate recorded nothing.
    func testMultipleCancellations() {
      XCTAsyncTest {
        let sink = CollectingDelegate<String, Int>()
        let writer = AsyncWriter(delegate: sink)

        await writer.cancel()
        await XCTAssertThrowsError(try await writer.write("1")) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .alreadyFinished)
        }

        // A second cancel is fine: it need not throw and nothing should change.
        await writer.cancel()
        await XCTAssertThrowsError(try await writer.write("2")) { thrown in
          XCTAssertEqual(thrown as? GRPCAsyncWriterError, .alreadyFinished)
        }

        XCTAssertTrue(sink.elements.isEmpty)
        XCTAssertNil(sink.end)
      }
    }
  }
  /// An `AsyncWriterDelegate` for tests which records every element and the end value it is
  /// given. All reads and writes of the recorded state are performed while holding `lock`.
  fileprivate final class CollectingDelegate<Element, End>: AsyncWriterDelegate {
    private let lock = Lock()
    private var recordedElements: [Element] = []
    private var recordedEnd: End?

    /// The elements passed to `write(_:)` so far, in the order they were appended.
    internal var elements: [Element] {
      lock.withLock { recordedElements }
    }

    /// The value most recently passed to `writeEnd(_:)`, or `nil` if none has been recorded.
    internal var end: End? {
      lock.withLock { recordedEnd }
    }

    /// Appends `element` to the recorded elements.
    internal func write(_ element: Element) {
      lock.withLockVoid { recordedElements.append(element) }
    }

    /// Records `end` as the end value, replacing any previously recorded one.
    internal func writeEnd(_ end: End) {
      lock.withLockVoid { recordedEnd = end }
    }
  }
  239. #endif // compiler(>=5.5)