RequestQueue.swift 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. internal import DequeModule
  /// A first-in-first-out queue of continuations, each awaiting a `LoadBalancer` (or an error).
  ///
  /// The queue never resumes a continuation itself: every removal method hands the removed
  /// continuations back to the caller. Entries can be removed from the front (`popFirst()`),
  /// by ID (`removeEntry(withID:)`), or in bulk (`removeAll()`, `removeFastFailingEntries()`).
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  struct RequestQueue {
    typealias Continuation = CheckedContinuation<LoadBalancer, any Error>

    /// A queued continuation together with the `waitForReady` value it was appended with.
    private struct QueueEntry {
      var continuation: Continuation
      var waitForReady: Bool
    }

    /// IDs of entries in the order they should be processed.
    ///
    /// If an ID is popped from the queue but isn't present in `entriesByID` then it must've
    /// been removed directly by its ID, this is fine: such stale IDs are skipped.
    ///
    /// Invariant: every key of `entriesByID` is also present in `ids`.
    private var ids: Deque<QueueEntryID>

    /// Entries keyed by their ID.
    private var entriesByID: [QueueEntryID: QueueEntry]

    /// Create an empty queue.
    init() {
      self.ids = []
      self.entriesByID = [:]
    }

    /// Remove the first continuation from the queue.
    ///
    /// - Returns: The continuation of the oldest entry still in the queue, or `nil` if the
    ///   queue holds no entries.
    mutating func popFirst() -> Continuation? {
      while let id = self.ids.popFirst() {
        // An ID without an entry was removed via `removeEntry(withID:)`; skip it.
        if let waiter = self.entriesByID.removeValue(forKey: id) {
          return waiter.continuation
        }
      }

      // No IDs left, so there must be no entries left either.
      assert(self.entriesByID.isEmpty)
      return nil
    }

    /// Append a continuation to the queue.
    ///
    /// - Parameters:
    ///   - continuation: The continuation to append.
    ///   - waitForReady: Whether the request associated with the continuation is willing to wait
    ///       for the channel to become ready.
    ///   - id: The unique ID of the queue entry. Reusing the ID of an entry which is still queued
    ///       is a programmer error and trips an assertion in debug builds.
    mutating func append(continuation: Continuation, waitForReady: Bool, id: QueueEntryID) {
      let entry = QueueEntry(continuation: continuation, waitForReady: waitForReady)
      let removed = self.entriesByID.updateValue(entry, forKey: id)
      assert(removed == nil, "id '\(id)' reused")
      self.ids.append(id)
    }

    /// Remove the waiter with the given ID, if it exists.
    ///
    /// - Parameter id: The ID the entry was appended with.
    /// - Returns: The continuation of the removed entry, or `nil` if no entry has that ID.
    mutating func removeEntry(withID id: QueueEntryID) -> Continuation? {
      guard let waiter = self.entriesByID.removeValue(forKey: id) else {
        return nil
      }

      // The ID itself is deliberately left in `ids` and skipped when it is later encountered.
      // Once the last entry is gone, however, every remaining ID is stale, so drop them all:
      // this stops `ids` growing without bound when entries are only ever removed by ID.
      if self.entriesByID.isEmpty {
        self.ids.removeAll(keepingCapacity: true)
      }

      return waiter.continuation
    }

    /// Remove all waiters, returning their continuations.
    ///
    /// - Returns: The continuations of all queued entries, in the order they were appended.
    mutating func removeAll() -> [Continuation] {
      var continuations = [Continuation]()
      continuations.reserveCapacity(self.entriesByID.count)

      // Walk the IDs rather than the dictionary's values so that the result is in queue order
      // (dictionary iteration order is unspecified). Removing each entry as it is visited
      // guarantees a continuation is returned at most once.
      for id in self.ids {
        if let waiter = self.entriesByID.removeValue(forKey: id) {
          continuations.append(waiter.continuation)
        }
      }

      // Every entry's ID is in `ids` so nothing should be left over. If that invariant is ever
      // broken, still hand back the stragglers rather than silently dropping a continuation.
      assert(self.entriesByID.isEmpty)
      continuations.append(contentsOf: self.entriesByID.values.map { $0.continuation })

      self.ids.removeAll(keepingCapacity: true)
      self.entriesByID.removeAll(keepingCapacity: true)
      return continuations
    }

    /// Remove all entries which were appended to the queue with a value of `false`
    /// for `waitForReady`.
    ///
    /// Entries appended with `waitForReady` set to `true` stay in the queue in their original
    /// order. Stale IDs are discarded as a side effect.
    ///
    /// - Returns: The continuations of the removed entries, in the order they were appended.
    mutating func removeFastFailingEntries() -> [Continuation] {
      var removed = [Continuation]()
      var remainingIDs = Deque<QueueEntryID>()
      var remainingEntriesByID = [QueueEntryID: QueueEntry]()

      // Drain the queue in order, sorting each live entry into "keep" or "remove".
      while let id = self.ids.popFirst() {
        guard let waiter = self.entriesByID.removeValue(forKey: id) else { continue }

        if waiter.waitForReady {
          remainingEntriesByID[id] = waiter
          remainingIDs.append(id)
        } else {
          removed.append(waiter.continuation)
        }
      }

      // All entries have been moved out, so the old storage must now be empty.
      assert(self.entriesByID.isEmpty)
      self.entriesByID = remainingEntriesByID
      self.ids = remainingIDs
      return removed
    }
  }