Browse Source

Add an internal pausable writer (#1245)

Motivation:

Async RPCs which stream outbound messages (i.e. requests on the client,
responses on the server) will be provided with a means to asynchronously
send messages. This object should suspend if the underlying channel is
not currently writable.

Modifications:

Add an 'AsyncWriter' as the underlying type for these objects which
suspends if the writer is 'paused' and resumes when the writer is
resumed.

Result:

We have a writer capable of suspending writes.
George Barnett 4 years ago
parent
commit
0e28b93aea

+ 312 - 0
Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift

@@ -0,0 +1,312 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+import NIOCore
+
+/// An asynchronous writer which forwards messages to a delegate.
+///
+/// Forwarding of messages to the delegate may be paused and resumed by controlling the writability
+/// of the writer. This may be controlled by calls to ``toggleWritability()``. When the writer is
+/// paused (by becoming unwritable) calls to ``write(_:)`` may suspend. When the writer is resumed
+/// (by becoming writable) any calls which are suspended may be resumed.
+///
+/// The writer must also be "finished" with a final value: as for writing, calls to ``finish(_:)``
+/// may suspend if the writer has been paused.
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+@usableFromInline
+internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
+  @usableFromInline
+  internal typealias Element = Delegate.Element
+
+  @usableFromInline
+  internal typealias End = Delegate.End
+
+  /// A value pending a write.
+  @usableFromInline
+  internal struct _Pending<Value> {
+    @usableFromInline
+    var value: Value
+
+    @usableFromInline
+    var continuation: CheckedContinuation<Void, Error>
+
+    @inlinable
+    internal init(_ value: Value, continuation: CheckedContinuation<Void, Error>) {
+      self.value = value
+      self.continuation = continuation
+    }
+  }
+
+  @usableFromInline
+  typealias PendingElement = _Pending<Element>
+
+  @usableFromInline
+  typealias PendingEnd = _Pending<End>
+
+  @usableFromInline
+  internal enum _CompletionState {
+    /// Finish hasn't been called yet. May move to `pending` or `completed`.
+    case incomplete
+    /// Finish has been called but the writer is paused. May move to `completed`.
+    case pending(PendingEnd)
+    /// The completion message has been sent to the delegate. This is a terminal state.
+    case completed
+
+    /// Move from `pending` to `completed` and return the `PendingCompletion`. Returns `nil` if
+    /// the state was not `pending`.
+    @inlinable
+    mutating func completeIfPending() -> PendingEnd? {
+      switch self {
+      case let .pending(pending):
+        self = .completed
+        return pending
+      case .incomplete, .completed:
+        return nil
+      }
+    }
+
+    @usableFromInline
+    var isPendingOrCompleted: Bool {
+      switch self {
+      case .incomplete:
+        return false
+      case .pending, .completed:
+        return true
+      }
+    }
+  }
+
+  /// The maximum number of pending elements. `pendingElements` must not grow beyond this limit.
+  @usableFromInline
+  internal let _maxPendingElements: Int
+
+  /// The maximum number of writes to the delegate made in `resume` before yielding to allow other
+  /// values to be queued.
+  @usableFromInline
+  internal let _maxWritesBeforeYield: Int
+
+  /// Elements and continuations which have been buffered but are awaiting consumption by the
+  /// delegate.
+  @usableFromInline
+  internal var _pendingElements: CircularBuffer<PendingElement>
+
+  /// The completion state of the writer.
+  @usableFromInline
+  internal var _completionState: _CompletionState
+
+  /// Whether the writer is paused.
+  @usableFromInline
+  internal var _isPaused: Bool = false
+
+  /// The delegate to process elements. By convention we call the delegate before resuming any
+  /// continuation.
+  @usableFromInline
+  internal let _delegate: Delegate
+
+  @inlinable
+  internal init(
+    maxPendingElements: Int = 16,
+    maxWritesBeforeYield: Int = 5,
+    delegate: Delegate
+  ) {
+    self._maxPendingElements = maxPendingElements
+    self._maxWritesBeforeYield = maxWritesBeforeYield
+    self._pendingElements = CircularBuffer(initialCapacity: maxPendingElements)
+    self._completionState = .incomplete
+    self._delegate = delegate
+  }
+
+  deinit {
+    switch self._completionState {
+    case .completed:
+      ()
+    case .incomplete, .pending:
+      assertionFailure("writer has not completed is pending completion")
+    }
+  }
+
+  /// As ``toggleWritability()`` but executed asynchronously.
+  @usableFromInline
+  internal nonisolated func toggleWritabilityAsynchronously() {
+    Task {
+      await self.toggleWritability()
+    }
+  }
+
+  /// Toggles whether the writer is writable or not. The writer is initially writable.
+  ///
+  /// If the writer becomes writable then it may resume writes to the delegate. If it becomes
+  /// unwritable then calls to `write` may suspend until the writability changes again.
+  ///
+  /// This API does not offer explicit control over the writability state so the caller must ensure
+  /// calls to this function correspond with changes in writability. The reason for this is that the
+  /// underlying type is an `actor` and updating its state is therefore asynchronous. However,
+  /// this functions is not called from an asynchronous context so it is not possible to `await`
+  /// state updates to complete. Instead, changing the state is via a `nonisolated` function on
+  /// the `actor` which spawns a new task. If this or a similar API allowed the writability to be
+  /// explicitly set then calls to that API are not guaranteed to be ordered which may lead to
+  /// deadlock.
+  @usableFromInline
+  internal func toggleWritability() async {
+    if self._isPaused {
+      self._isPaused = false
+      await self.resumeWriting()
+    } else {
+      self._isPaused = true
+    }
+  }
+
+  private func resumeWriting() async {
+    var writes = 0
+
+    while !self._isPaused {
+      if let pendingElement = self._pendingElements.popFirst() {
+        self._delegate.write(pendingElement.value)
+        pendingElement.continuation.resume()
+      } else if let pendingCompletion = self._completionState.completeIfPending() {
+        self._delegate.writeEnd(pendingCompletion.value)
+        pendingCompletion.continuation.resume()
+      } else {
+        break
+      }
+
+      // `writes` will never exceed `maxWritesBeforeYield` so unchecked arithmetic is okay here.
+      writes &+= 1
+      if writes == self._maxWritesBeforeYield {
+        writes = 0
+        // We yield every so often to let the delegate (i.e. 'NIO.Channel') catch up since it may
+        // decide it is no longer writable.
+        await Task.yield()
+      }
+    }
+  }
+
+  /// As ``cancel()`` but executed asynchronously.
+  @usableFromInline
+  internal nonisolated func cancelAsynchronously() {
+    Task {
+      await self.cancel()
+    }
+  }
+
+  /// Cancel all pending writes.
+  ///
+  /// Any pending writes will be dropped and their continuations will be resumed with
+  /// a `CancellationError`. Any writes after cancellation has completed will also fail.
+  @usableFromInline
+  internal func cancel() {
+    // If there's an end we should fail that last.
+    let pendingEnd: PendingEnd?
+
+    // Mark our state as completed before resuming any continuations (any future writes should fail
+    // immediately).
+    switch self._completionState {
+    case .incomplete:
+      pendingEnd = nil
+      self._completionState = .completed
+
+    case let .pending(pending):
+      pendingEnd = pending
+      self._completionState = .completed
+
+    case .completed:
+      pendingEnd = nil
+    }
+
+    let cancellationError = CancellationError()
+
+    while let pending = self._pendingElements.popFirst() {
+      pending.continuation.resume(throwing: cancellationError)
+    }
+
+    pendingEnd?.continuation.resume(throwing: cancellationError)
+  }
+
+  /// Write an `element`.
+  ///
+  /// The call may be suspend if the writer is paused.
+  ///
+  /// Throws: ``AsyncWriterError`` if the writer has already been finished or too many write tasks
+  ///   have been suspended.
+  @inlinable
+  internal func write(_ element: Element) async throws {
+    try await withCheckedThrowingContinuation { continuation in
+      self._write(element, continuation: continuation)
+    }
+  }
+
+  @inlinable
+  internal func _write(_ element: Element, continuation: CheckedContinuation<Void, Error>) {
+    // There are three outcomes of writing:
+    // - write the element directly (if the writer isn't paused and no writes are pending)
+    // - queue the element (the writer is paused or there are writes already pending)
+    // - error (the writer is complete or the queue is full).
+
+    if self._completionState.isPendingOrCompleted {
+      continuation.resume(throwing: AsyncWriterError.alreadyFinished)
+    } else if !self._isPaused, self._pendingElements.isEmpty {
+      self._delegate.write(element)
+      continuation.resume()
+    } else if self._pendingElements.count < self._maxPendingElements {
+      // The continuation will be resumed later.
+      self._pendingElements.append(PendingElement(element, continuation: continuation))
+    } else {
+      continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites)
+    }
+  }
+
+  /// Write the final element
+  @inlinable
+  internal func finish(_ end: End) async throws {
+    try await withCheckedThrowingContinuation { continuation in
+      self._finish(end, continuation: continuation)
+    }
+  }
+
+  @inlinable
+  internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
+    if self._completionState.isPendingOrCompleted {
+      continuation.resume(throwing: AsyncWriterError.alreadyFinished)
+    } else if !self._isPaused, self._pendingElements.isEmpty {
+      self._completionState = .completed
+      self._delegate.writeEnd(end)
+      continuation.resume()
+    } else {
+      // Either we're paused or there are pending writes which must be consumed first.
+      self._completionState = .pending(PendingEnd(end, continuation: continuation))
+    }
+  }
+}
+
+@usableFromInline
+internal enum AsyncWriterError: Error, Hashable {
+  case tooManyPendingWrites
+  case alreadyFinished
+}
+
+@usableFromInline
+internal protocol AsyncWriterDelegate: AnyObject {
+  associatedtype Element
+  associatedtype End
+
+  @inlinable
+  func write(_ element: Element)
+
+  @inlinable
+  func writeEnd(_ end: End)
+}
+
+#endif // compiler(>=5.5)

+ 294 - 0
Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift

@@ -0,0 +1,294 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+@testable import GRPC
+import NIOConcurrencyHelpers
+import XCTest
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+internal class AsyncWriterTests: GRPCTestCase {
+  func testSingleWriterHappyPath() {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      try await writer.write("jimmy")
+      XCTAssertEqual(delegate.elements, ["jimmy"])
+
+      try await writer.write("jab")
+      XCTAssertEqual(delegate.elements, ["jimmy", "jab"])
+
+      try await writer.finish(99)
+      XCTAssertEqual(delegate.end, 99)
+    }
+  }
+
+  func testPauseAndResumeWrites() {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      // pause
+      await writer.toggleWritability()
+
+      async let written1: Void = writer.write("wunch")
+      XCTAssert(delegate.elements.isEmpty)
+
+      // resume
+      await writer.toggleWritability()
+      try await written1
+      XCTAssertEqual(delegate.elements, ["wunch"])
+
+      try await writer.finish(0)
+      XCTAssertEqual(delegate.end, 0)
+    }
+  }
+
+  func testTooManyWrites() throws {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      // Zero pending elements means that any write when paused will trigger an error.
+      let writer = AsyncWriter(maxPendingElements: 0, delegate: delegate)
+
+      // pause
+      await writer.toggleWritability()
+
+      await XCTAssertThrowsError(try await writer.write("pontiac")) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .tooManyPendingWrites)
+      }
+
+      // resume (we must finish the writer.)
+      await writer.toggleWritability()
+      try await writer.finish(0)
+      XCTAssertEqual(delegate.end, 0)
+      XCTAssertTrue(delegate.elements.isEmpty)
+    }
+  }
+
+  func testWriteAfterFinish() throws {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      try await writer.finish(0)
+      XCTAssertEqual(delegate.end, 0)
+
+      await XCTAssertThrowsError(try await writer.write("cheddar")) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+      }
+
+      XCTAssertTrue(delegate.elements.isEmpty)
+    }
+  }
+
+  func testTooManyCallsToFinish() throws {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      try await writer.finish(0)
+      XCTAssertEqual(delegate.end, 0)
+
+      await XCTAssertThrowsError(try await writer.finish(1)) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+      }
+
+      // Still 0.
+      XCTAssertEqual(delegate.end, 0)
+    }
+  }
+
+  func testCallToFinishWhilePending() throws {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      // Pause.
+      await writer.toggleWritability()
+
+      async let finished: Void = writer.finish(42)
+      XCTAssertNil(delegate.end)
+
+      // Resume.
+      await writer.toggleWritability()
+      try await finished
+
+      XCTAssertEqual(delegate.end, 42)
+    }
+  }
+
+  func testTooManyCallsToFinishWhilePending() throws {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      // Pause.
+      await writer.toggleWritability()
+
+      // We want to test that when a finish has suspended that another task calling finish results
+      // in an `AsyncWriterError.alreadyFinished` error.
+      //
+      // It's hard to achieve this reliably in an obvious way because we can't guarantee the
+      // ordering of `Task`s or when they will be suspended during `finish`. However, by pausing the
+      // writer and calling finish in two separate tasks we guarantee that one will run first and
+      // suspend (because the writer is paused) and the other will throw an error. When one throws
+      // an error it can resume the writer allowing the other task to resume successfully.
+      await withThrowingTaskGroup(of: Void.self) { group in
+        group.addTask {
+          do {
+            try await writer.finish(1)
+          } catch {
+            XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+            // Resume.
+            await writer.toggleWritability()
+          }
+        }
+
+        group.addTask {
+          do {
+            try await writer.finish(2)
+          } catch {
+            XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+            // Resume.
+            await writer.toggleWritability()
+          }
+        }
+      }
+
+      // We should definitely be finished by this point.
+      await XCTAssertThrowsError(try await writer.finish(3)) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+      }
+    }
+  }
+
+  func testCancellationForPendingWrite() {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      // Pause.
+      await writer.toggleWritability()
+
+      async let pendingWrite: Void = writer.write("foo")
+
+      await writer.cancel()
+
+      do {
+        try await pendingWrite
+        XCTFail("Expected to throw an error.")
+      } catch is CancellationError {
+        // Cancellation is fine: we cancelled while the write was pending.
+        ()
+      } catch let error as AsyncWriterError {
+        // Already finish is also fine: we cancelled before the write was enqueued.
+        XCTAssertEqual(error, .alreadyFinished)
+      } catch {
+        XCTFail("Unexpected error: \(error)")
+      }
+
+      await XCTAssertThrowsError(try await writer.write("bar")) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+      }
+
+      XCTAssertTrue(delegate.elements.isEmpty)
+      XCTAssertNil(delegate.end)
+    }
+  }
+
+  func testCancellationForPendingFinish() {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      // Pause.
+      await writer.toggleWritability()
+
+      async let pendingWrite: Void = writer.finish(42)
+
+      await writer.cancel()
+
+      do {
+        try await pendingWrite
+        XCTFail("Expected to throw an error.")
+      } catch is CancellationError {
+        // Cancellation is fine: we cancelled while the write was pending.
+        ()
+      } catch let error as AsyncWriterError {
+        // Already finish is also fine: we cancelled before the write was enqueued.
+        XCTAssertEqual(error, .alreadyFinished)
+      } catch {
+        XCTFail("Unexpected error: \(error)")
+      }
+
+      await XCTAssertThrowsError(try await writer.finish(42)) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+      }
+
+      XCTAssertTrue(delegate.elements.isEmpty)
+      XCTAssertNil(delegate.end)
+    }
+  }
+
+  func testMultipleCancellations() {
+    XCTAsyncTest {
+      let delegate = CollectingDelegate<String, Int>()
+      let writer = AsyncWriter(delegate: delegate)
+
+      await writer.cancel()
+      await XCTAssertThrowsError(try await writer.write("1")) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+      }
+
+      // Fine, no need to throw. Nothing should change.
+      await writer.cancel()
+      await XCTAssertThrowsError(try await writer.write("2")) { error in
+        XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished)
+      }
+
+      XCTAssertTrue(delegate.elements.isEmpty)
+      XCTAssertNil(delegate.end)
+    }
+  }
+}
+
+fileprivate final class CollectingDelegate<Element, End>: AsyncWriterDelegate {
+  private let lock = Lock()
+  private var _elements: [Element] = []
+  private var _end: End?
+
+  internal var elements: [Element] {
+    return self.lock.withLock { self._elements }
+  }
+
+  internal var end: End? {
+    return self.lock.withLock { self._end }
+  }
+
+  internal func write(_ element: Element) {
+    self.lock.withLockVoid {
+      self._elements.append(element)
+    }
+  }
+
+  internal func writeEnd(_ end: End) {
+    self.lock.withLockVoid {
+      self._end = end
+    }
+  }
+}
+
+#endif // compiler(>=5.5)