RequestQueueTests.swift 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import Synchronization
  18. import XCTest
  19. @testable import GRPCHTTP2Core
  20. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  21. final class RequestQueueTests: XCTestCase {
  22. struct AnErrorToAvoidALeak: Error {}
  23. func testPopFirstEmpty() {
  24. var queue = RequestQueue()
  25. XCTAssertNil(queue.popFirst())
  26. }
  27. func testPopFirstNonEmpty() async {
  28. _ = try? await withCheckedThrowingContinuation { continuation in
  29. var queue = RequestQueue()
  30. let id = QueueEntryID()
  31. queue.append(continuation: continuation, waitForReady: false, id: id)
  32. guard let popped = queue.popFirst() else {
  33. return XCTFail("Missing continuation")
  34. }
  35. XCTAssertNil(queue.popFirst())
  36. popped.resume(throwing: AnErrorToAvoidALeak())
  37. }
  38. }
  39. func testPopFirstMultiple() async {
  40. await withTaskGroup(of: QueueEntryID.self) { group in
  41. let queue = SharedRequestQueue()
  42. let signal1 = AsyncStream.makeStream(of: Void.self)
  43. let signal2 = AsyncStream.makeStream(of: Void.self)
  44. let id1 = QueueEntryID()
  45. let id2 = QueueEntryID()
  46. group.addTask {
  47. _ = try? await withCheckedThrowingContinuation { continuation in
  48. queue.withQueue {
  49. $0.append(continuation: continuation, waitForReady: false, id: id1)
  50. }
  51. signal1.continuation.yield()
  52. signal1.continuation.finish()
  53. }
  54. return id1
  55. }
  56. group.addTask {
  57. // Wait until instructed to append.
  58. for await _ in signal1.stream {}
  59. _ = try? await withCheckedThrowingContinuation { continuation in
  60. queue.withQueue {
  61. $0.append(continuation: continuation, waitForReady: false, id: id2)
  62. }
  63. signal2.continuation.yield()
  64. signal2.continuation.finish()
  65. }
  66. return id2
  67. }
  68. // Wait for both continuations to be enqueued.
  69. for await _ in signal2.stream {}
  70. for id in [id1, id2] {
  71. let continuation = queue.withQueue { $0.popFirst() }
  72. continuation?.resume(throwing: AnErrorToAvoidALeak())
  73. let actual = await group.next()
  74. XCTAssertEqual(id, actual)
  75. }
  76. }
  77. }
  78. func testRemoveEntryByID() async {
  79. _ = try? await withCheckedThrowingContinuation { continuation in
  80. var queue = RequestQueue()
  81. let id = QueueEntryID()
  82. queue.append(continuation: continuation, waitForReady: false, id: id)
  83. guard let popped = queue.removeEntry(withID: id) else {
  84. return XCTFail("Missing continuation")
  85. }
  86. XCTAssertNil(queue.removeEntry(withID: id))
  87. popped.resume(throwing: AnErrorToAvoidALeak())
  88. }
  89. }
  90. func testRemoveEntryByIDMultiple() async {
  91. await withTaskGroup(of: QueueEntryID.self) { group in
  92. let queue = SharedRequestQueue()
  93. let signal1 = AsyncStream.makeStream(of: Void.self)
  94. let signal2 = AsyncStream.makeStream(of: Void.self)
  95. let id1 = QueueEntryID()
  96. let id2 = QueueEntryID()
  97. group.addTask {
  98. _ = try? await withCheckedThrowingContinuation { continuation in
  99. queue.withQueue {
  100. $0.append(continuation: continuation, waitForReady: false, id: id1)
  101. }
  102. signal1.continuation.yield()
  103. signal1.continuation.finish()
  104. }
  105. return id1
  106. }
  107. group.addTask {
  108. // Wait until instructed to append.
  109. for await _ in signal1.stream {}
  110. _ = try? await withCheckedThrowingContinuation { continuation in
  111. queue.withQueue {
  112. $0.append(continuation: continuation, waitForReady: false, id: id2)
  113. }
  114. signal2.continuation.yield()
  115. signal2.continuation.finish()
  116. }
  117. return id2
  118. }
  119. // Wait for both continuations to be enqueued.
  120. for await _ in signal2.stream {}
  121. for id in [id1, id2] {
  122. let continuation = queue.withQueue { $0.removeEntry(withID: id) }
  123. continuation?.resume(throwing: AnErrorToAvoidALeak())
  124. let actual = await group.next()
  125. XCTAssertEqual(id, actual)
  126. }
  127. }
  128. }
  129. func testRemoveFastFailingEntries() async throws {
  130. let queue = SharedRequestQueue()
  131. let enqueued = AsyncStream.makeStream(of: Void.self)
  132. try await withThrowingTaskGroup(of: Void.self) { group in
  133. var waitForReadyIDs = [QueueEntryID]()
  134. var failFastIDs = [QueueEntryID]()
  135. for _ in 0 ..< 50 {
  136. waitForReadyIDs.append(QueueEntryID())
  137. failFastIDs.append(QueueEntryID())
  138. }
  139. for ids in [waitForReadyIDs, failFastIDs] {
  140. let waitForReady = ids == waitForReadyIDs
  141. for id in ids {
  142. group.addTask {
  143. do {
  144. _ = try await withCheckedThrowingContinuation { continuation in
  145. queue.withQueue {
  146. $0.append(continuation: continuation, waitForReady: waitForReady, id: id)
  147. }
  148. enqueued.continuation.yield()
  149. }
  150. } catch is AnErrorToAvoidALeak {
  151. ()
  152. }
  153. }
  154. }
  155. }
  156. // Wait for all continuations to be enqueued.
  157. var numberEnqueued = 0
  158. for await _ in enqueued.stream {
  159. numberEnqueued += 1
  160. if numberEnqueued == (waitForReadyIDs.count + failFastIDs.count) {
  161. enqueued.continuation.finish()
  162. }
  163. }
  164. // Remove all fast-failing continuations.
  165. let continuations = queue.withQueue {
  166. $0.removeFastFailingEntries()
  167. }
  168. for continuation in continuations {
  169. continuation.resume(throwing: AnErrorToAvoidALeak())
  170. }
  171. for id in failFastIDs {
  172. queue.withQueue {
  173. XCTAssertNil($0.removeEntry(withID: id))
  174. }
  175. }
  176. for id in waitForReadyIDs {
  177. let maybeContinuation = queue.withQueue { $0.removeEntry(withID: id) }
  178. let continuation = try XCTUnwrap(maybeContinuation)
  179. continuation.resume(throwing: AnErrorToAvoidALeak())
  180. }
  181. }
  182. }
  183. func testRemoveAll() async throws {
  184. let queue = SharedRequestQueue()
  185. let enqueued = AsyncStream.makeStream(of: Void.self)
  186. await withThrowingTaskGroup(of: Void.self) { group in
  187. for _ in 0 ..< 10 {
  188. group.addTask {
  189. _ = try await withCheckedThrowingContinuation { continuation in
  190. queue.withQueue {
  191. $0.append(continuation: continuation, waitForReady: false, id: QueueEntryID())
  192. }
  193. enqueued.continuation.yield()
  194. }
  195. }
  196. }
  197. // Wait for all continuations to be enqueued.
  198. var numberEnqueued = 0
  199. for await _ in enqueued.stream {
  200. numberEnqueued += 1
  201. if numberEnqueued == 10 {
  202. enqueued.continuation.finish()
  203. }
  204. }
  205. let continuations = queue.withQueue { $0.removeAll() }
  206. XCTAssertEqual(continuations.count, 10)
  207. XCTAssertNil(queue.withQueue { $0.popFirst() })
  208. for continuation in continuations {
  209. continuation.resume(throwing: AnErrorToAvoidALeak())
  210. }
  211. }
  212. }
  213. final class SharedRequestQueue: Sendable {
  214. private let protectedQueue: Mutex<RequestQueue>
  215. init() {
  216. self.protectedQueue = Mutex(RequestQueue())
  217. }
  218. func withQueue<T>(_ body: @Sendable (inout RequestQueue) throws -> T) rethrows -> T {
  219. try self.protectedQueue.withLock {
  220. try body(&$0)
  221. }
  222. }
  223. }
  224. }