Browse Source

Make testRetriesCantBeExecutedForTooManyRequestMessages more reliable (#1781)

Motivation:

The 'testRetriesCantBeExecutedForTooManyRequestMessages' test wedges
infrequently. After some diagnosis this appears to be due to the
intersection of two bugs. The first is that when yielding an element to
the sequence, if some consumers exist, of which a subset are waiting for
the next element and the remainder are not 'slow', the continuations of
the fastest consumers would not be resumed when a new element was added.

The other bug is a timing issue: when waiting for the next element a
subscriber may be told to suspend. However, the subscriber must drop and
then reacquire the lock to store the continuation. The state wasn't
re-checked on storing the continuation which opened up a window where
the element may have become present between asking for it and storing
the continuation.

Modifications:

- Don't release the lock between attempting to consume an element and
  suspending to wait for it.
- Resume waiting consumers more frequently.
- Make a few tests less flaky

Result:

Test didn't wedge in 10k iterations
George Barnett 2 years ago
parent
commit
2c27ad5123

+ 34 - 0
Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift

@@ -253,6 +253,40 @@ public struct _LockedValueBox<Value> {
   public func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
     return try self.storage.withLockedValue(mutate)
   }
+
+  /// An unsafe view over the locked value box.
+  ///
+  /// Prefer ``withLockedValue(_:)`` where possible.
+  public var unsafe: Unsafe {
+    Unsafe(storage: self.storage)
+  }
+
+  public struct Unsafe {
+    @usableFromInline
+    let storage: LockStorage<Value>
+
+    /// Manually acquire the lock.
+    @inlinable
+    public func lock() {
+      self.storage.lock()
+    }
+
+    /// Manually release the lock.
+    @inlinable
+    public func unlock() {
+      self.storage.unlock()
+    }
+
+    /// Mutate the value, assuming the lock has been acquired manually.
+    @inlinable
+    public func withValueAssumingLockIsAcquired<T>(
+      _ mutate: (inout Value) throws -> T
+    ) rethrows -> T {
+      return try self.storage.withUnsafeMutablePointerToHeader { value in
+        try mutate(&value.pointee)
+      }
+    }
+  }
 }
 
 extension _LockedValueBox: Sendable where Value: Sendable {}

+ 35 - 20
Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift

@@ -261,20 +261,26 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
   func nextElement(
     forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
   ) async throws -> Element? {
-    let onNext = self._state.withLockedValue { $0.nextElement(forSubscriber: id) }
+    return try await withTaskCancellationHandler {
+      self._state.unsafe.lock()
+      let onNext = self._state.unsafe.withValueAssumingLockIsAcquired {
+        $0.nextElement(forSubscriber: id)
+      }
 
-    switch onNext {
-    case .return(let returnAndProduceMore):
-      returnAndProduceMore.producers.resume()
-      return try returnAndProduceMore.nextResult.get()
+      switch onNext {
+      case .return(let returnAndProduceMore):
+        self._state.unsafe.unlock()
+        returnAndProduceMore.producers.resume()
+        return try returnAndProduceMore.nextResult.get()
 
-    case .suspend:
-      return try await withTaskCancellationHandler {
+      case .suspend:
         return try await withCheckedThrowingContinuation { continuation in
-          let onSetContinuation = self._state.withLockedValue { state in
+          let onSetContinuation = self._state.unsafe.withValueAssumingLockIsAcquired { state in
             state.setContinuation(continuation, forSubscription: id)
           }
 
+          self._state.unsafe.unlock()
+
           switch onSetContinuation {
           case .resume(let continuation, let result):
             continuation.resume(with: result)
@@ -282,17 +288,17 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
             ()
           }
         }
-      } onCancel: {
-        let onCancel = self._state.withLockedValue { state in
-          state.cancelSubscription(withID: id)
-        }
+      }
+    } onCancel: {
+      let onCancel = self._state.withLockedValue { state in
+        state.cancelSubscription(withID: id)
+      }
 
-        switch onCancel {
-        case .resume(let continuation, let result):
-          continuation.resume(with: result)
-        case .none:
-          ()
-        }
+      switch onCancel {
+      case .resume(let continuation, let result):
+        continuation.resume(with: result)
+      case .none:
+        ()
       }
     }
   }
@@ -572,9 +578,18 @@ struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
               self.producerToken += 1
               onYield = .suspend(token)
             } else {
-              // No consumers are slow. Remove the oldest value.
+              // No consumers are slow, some subscribers exist, a subset of which are waiting
+              // for a value. Drop the oldest value and resume the fastest consumers.
               self.elements.removeFirst()
-              onYield = .none
+              let continuations = self.subscriptions.takeContinuations().map {
+                ConsumerContinuations(continuations: $0, result: .success(element))
+              }
+
+              if let continuations = continuations {
+                onYield = .resume(continuations)
+              } else {
+                onYield = .none
+              }
             }
 
           case self.subscriptions.count: