RequestQueueTests.swift 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import GRPCCore
  17. import XCTest
  18. @testable import GRPCHTTP2Core
  /// Tests for `RequestQueue`: continuations are appended and then taken back out via
  /// `popFirst()`, `removeEntry(withID:)`, `removeFastFailingEntries()` and `removeAll()`.
  ///
  /// Every continuation handed to the queue is eventually resumed by throwing
  /// `AnErrorToAvoidALeak`, so no test leaves a checked continuation un-resumed.
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  final class RequestQueueTests: XCTestCase {
    /// Error used to resume continuations taken out of the queue so they are not leaked.
    struct AnErrorToAvoidALeak: Error {}

    /// Popping from a freshly created queue yields `nil`.
    func testPopFirstEmpty() {
      var queue = RequestQueue()
      XCTAssertNil(queue.popFirst())
    }

    /// Appending one continuation and popping returns it, after which the queue is empty.
    func testPopFirstNonEmpty() async {
      _ = try? await withCheckedThrowingContinuation { continuation in
        var queue = RequestQueue()
        let id = QueueEntryID()
        queue.append(continuation: continuation, waitForReady: false, id: id)

        guard let popped = queue.popFirst() else {
          XCTFail("Missing continuation")
          // Resume the continuation ourselves: returning without resuming it would leak
          // it and leave the test suspended forever instead of reporting the failure.
          continuation.resume(throwing: AnErrorToAvoidALeak())
          return
        }

        XCTAssertNil(queue.popFirst())
        popped.resume(throwing: AnErrorToAvoidALeak())
      }
    }

    /// Two continuations appended in a known order are popped in that same order.
    func testPopFirstMultiple() async {
      await withTaskGroup(of: QueueEntryID.self) { group in
        let queue = _LockedValueBox(RequestQueue())
        let signal1 = AsyncStream.makeStream(of: Void.self)
        let signal2 = AsyncStream.makeStream(of: Void.self)

        let id1 = QueueEntryID()
        let id2 = QueueEntryID()

        group.addTask {
          _ = try? await withCheckedThrowingContinuation { continuation in
            queue.withLockedValue {
              $0.append(continuation: continuation, waitForReady: false, id: id1)
            }

            // Let the second task know that the first entry has been appended.
            signal1.continuation.yield()
            signal1.continuation.finish()
          }

          return id1
        }

        group.addTask {
          // Wait until instructed to append.
          for await _ in signal1.stream {}

          _ = try? await withCheckedThrowingContinuation { continuation in
            queue.withLockedValue {
              $0.append(continuation: continuation, waitForReady: false, id: id2)
            }

            signal2.continuation.yield()
            signal2.continuation.finish()
          }

          return id2
        }

        // Wait for both continuations to be enqueued.
        for await _ in signal2.stream {}

        // Each pop resumes one child task; the task that finishes must be the one whose
        // entry was appended first.
        for id in [id1, id2] {
          let continuation = queue.withLockedValue { $0.popFirst() }
          continuation?.resume(throwing: AnErrorToAvoidALeak())
          let actual = await group.next()
          XCTAssertEqual(id, actual)
        }
      }
    }

    /// An entry can be removed by its ID exactly once; a second removal yields `nil`.
    func testRemoveEntryByID() async {
      _ = try? await withCheckedThrowingContinuation { continuation in
        var queue = RequestQueue()
        let id = QueueEntryID()
        queue.append(continuation: continuation, waitForReady: false, id: id)

        guard let popped = queue.removeEntry(withID: id) else {
          XCTFail("Missing continuation")
          // Resume the continuation ourselves: returning without resuming it would leak
          // it and leave the test suspended forever instead of reporting the failure.
          continuation.resume(throwing: AnErrorToAvoidALeak())
          return
        }

        XCTAssertNil(queue.removeEntry(withID: id))
        popped.resume(throwing: AnErrorToAvoidALeak())
      }
    }

    /// With two entries enqueued, removing by ID resumes the task which owns that ID.
    func testRemoveEntryByIDMultiple() async {
      await withTaskGroup(of: QueueEntryID.self) { group in
        let queue = _LockedValueBox(RequestQueue())
        let signal1 = AsyncStream.makeStream(of: Void.self)
        let signal2 = AsyncStream.makeStream(of: Void.self)

        let id1 = QueueEntryID()
        let id2 = QueueEntryID()

        group.addTask {
          _ = try? await withCheckedThrowingContinuation { continuation in
            queue.withLockedValue {
              $0.append(continuation: continuation, waitForReady: false, id: id1)
            }

            // Let the second task know that the first entry has been appended.
            signal1.continuation.yield()
            signal1.continuation.finish()
          }

          return id1
        }

        group.addTask {
          // Wait until instructed to append.
          for await _ in signal1.stream {}

          _ = try? await withCheckedThrowingContinuation { continuation in
            queue.withLockedValue {
              $0.append(continuation: continuation, waitForReady: false, id: id2)
            }

            signal2.continuation.yield()
            signal2.continuation.finish()
          }

          return id2
        }

        // Wait for both continuations to be enqueued.
        for await _ in signal2.stream {}

        // Removing an entry resumes exactly one child task, which returns its own ID.
        for id in [id1, id2] {
          let continuation = queue.withLockedValue { $0.removeEntry(withID: id) }
          continuation?.resume(throwing: AnErrorToAvoidALeak())
          let actual = await group.next()
          XCTAssertEqual(id, actual)
        }
      }
    }

    /// `removeFastFailingEntries()` removes the entries appended with
    /// `waitForReady: false` and leaves those appended with `waitForReady: true`.
    func testRemoveFastFailingEntries() async throws {
      let queue = _LockedValueBox(RequestQueue())
      let enqueued = AsyncStream.makeStream(of: Void.self)

      try await withThrowingTaskGroup(of: Void.self) { group in
        var waitForReadyIDs = [QueueEntryID]()
        var failFastIDs = [QueueEntryID]()

        for _ in 0 ..< 50 {
          waitForReadyIDs.append(QueueEntryID())
          failFastIDs.append(QueueEntryID())
        }

        for ids in [waitForReadyIDs, failFastIDs] {
          let waitForReady = ids == waitForReadyIDs
          for id in ids {
            group.addTask {
              do {
                _ = try await withCheckedThrowingContinuation { continuation in
                  queue.withLockedValue {
                    $0.append(continuation: continuation, waitForReady: waitForReady, id: id)
                  }
                  enqueued.continuation.yield()
                }
              } catch is AnErrorToAvoidALeak {
                // Expected: this is the error the test resumes every continuation with.
                ()
              }
            }
          }
        }

        // Wait for all continuations to be enqueued.
        var numberEnqueued = 0
        for await _ in enqueued.stream {
          numberEnqueued += 1
          if numberEnqueued == (waitForReadyIDs.count + failFastIDs.count) {
            enqueued.continuation.finish()
          }
        }

        // Remove all fast-failing continuations.
        let continuations = queue.withLockedValue {
          $0.removeFastFailingEntries()
        }

        for continuation in continuations {
          continuation.resume(throwing: AnErrorToAvoidALeak())
        }

        // The fast-failing entries must no longer be in the queue...
        for id in failFastIDs {
          queue.withLockedValue {
            XCTAssertNil($0.removeEntry(withID: id))
          }
        }

        // ...while every wait-for-ready entry must still be present.
        for id in waitForReadyIDs {
          let maybeContinuation = queue.withLockedValue { $0.removeEntry(withID: id) }
          let continuation = try XCTUnwrap(maybeContinuation)
          continuation.resume(throwing: AnErrorToAvoidALeak())
        }
      }
    }

    /// `removeAll()` returns every enqueued continuation and leaves the queue empty.
    func testRemoveAll() async throws {
      let queue = _LockedValueBox(RequestQueue())
      let enqueued = AsyncStream.makeStream(of: Void.self)

      await withThrowingTaskGroup(of: Void.self) { group in
        for _ in 0 ..< 10 {
          group.addTask {
            _ = try await withCheckedThrowingContinuation { continuation in
              queue.withLockedValue {
                $0.append(continuation: continuation, waitForReady: false, id: QueueEntryID())
              }

              enqueued.continuation.yield()
            }
          }
        }

        // Wait for all continuations to be enqueued.
        var numberEnqueued = 0
        for await _ in enqueued.stream {
          numberEnqueued += 1
          if numberEnqueued == 10 {
            enqueued.continuation.finish()
          }
        }

        let continuations = queue.withLockedValue { $0.removeAll() }
        XCTAssertEqual(continuations.count, 10)
        XCTAssertNil(queue.withLockedValue { $0.popFirst() })

        for continuation in continuations {
          continuation.resume(throwing: AnErrorToAvoidALeak())
        }
      }
    }
  }