AsyncWriter.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import NIOCore
  /// An asynchronous writer which forwards messages to a delegate.
  ///
  /// Forwarding of messages to the delegate may be paused and resumed by controlling the writability
  /// of the writer. This may be controlled by calls to ``toggleWritability()``. When the writer is
  /// paused (by becoming unwritable) calls to ``write(_:)`` may suspend. When the writer is resumed
  /// (by becoming writable) any calls which are suspended may be resumed.
  ///
  /// The writer must also be "finished" with a final value: as for writing, calls to ``finish(_:)``
  /// may suspend if the writer has been paused.
  ///
  /// The writer asserts (in debug builds) on deinitialization that it reached the completed state,
  /// i.e. that either ``finish(_:)`` delivered its value to the delegate or ``cancel()`` was called.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
    /// The type of value written via ``write(_:)``.
    @usableFromInline
    internal typealias Element = Delegate.Element

    /// The type of the final value written via ``finish(_:)``.
    @usableFromInline
    internal typealias End = Delegate.End

    /// A value pending a write, paired with the continuation of the caller waiting for that write.
    @usableFromInline
    internal struct _Pending<Value: Sendable>: Sendable {
      /// The value to hand to the delegate.
      @usableFromInline
      var value: Value

      /// The continuation to resume once the value has been written (or the write has failed).
      @usableFromInline
      var continuation: CheckedContinuation<Void, Error>

      @inlinable
      internal init(_ value: Value, continuation: CheckedContinuation<Void, Error>) {
        self.value = value
        self.continuation = continuation
      }
    }

    @usableFromInline
    typealias PendingElement = _Pending<Element>

    @usableFromInline
    typealias PendingEnd = _Pending<End>

    @usableFromInline
    internal enum _CompletionState: Sendable {
      /// Finish hasn't been called yet. May move to `pending` or `completed`.
      case incomplete
      /// Finish has been called but the writer is paused. May move to `completed`.
      case pending(PendingEnd)
      /// The completion message has been sent to the delegate. This is a terminal state.
      case completed

      /// Move from `pending` to `completed` and return the `PendingEnd`. Returns `nil` if
      /// the state was not `pending`, in which case the state is left unchanged.
      @inlinable
      mutating func completeIfPending() -> PendingEnd? {
        switch self {
        case let .pending(pending):
          self = .completed
          return pending
        case .incomplete, .completed:
          return nil
        }
      }

      /// Whether finish has already been requested, i.e. the state is anything but `incomplete`.
      @usableFromInline
      var isPendingOrCompleted: Bool {
        switch self {
        case .incomplete:
          return false
        case .pending, .completed:
          return true
        }
      }
    }

    /// The maximum number of pending elements. `pendingElements` must not grow beyond this limit.
    @usableFromInline
    internal let _maxPendingElements: Int

    /// The maximum number of writes to the delegate made in `resume` before yielding to allow other
    /// values to be queued.
    @usableFromInline
    internal let _maxWritesBeforeYield: Int

    /// Elements and continuations which have been buffered but are awaiting consumption by the
    /// delegate.
    @usableFromInline
    internal var _pendingElements: CircularBuffer<PendingElement>

    /// The completion state of the writer.
    @usableFromInline
    internal var _completionState: _CompletionState

    /// Whether the writer is paused.
    @usableFromInline
    internal var _isPaused: Bool = false

    /// The delegate to process elements. By convention we call the delegate before resuming any
    /// continuation.
    @usableFromInline
    internal let _delegate: Delegate

    /// Creates a writer which is initially writable and not yet finished.
    ///
    /// - Parameters:
    ///   - maxPendingElements: The maximum number of writes which may be queued at once; also used
    ///     as the initial capacity of the pending-element buffer.
    ///   - maxWritesBeforeYield: The number of consecutive delegate writes made while draining the
    ///     queue before the writer yields.
    ///   - delegate: The delegate which receives written elements and the final value.
    @inlinable
    internal init(
      maxPendingElements: Int = 16,
      maxWritesBeforeYield: Int = 5,
      delegate: Delegate
    ) {
      self._maxPendingElements = maxPendingElements
      self._maxWritesBeforeYield = maxWritesBeforeYield
      self._pendingElements = CircularBuffer(initialCapacity: maxPendingElements)
      self._completionState = .incomplete
      self._delegate = delegate
    }

    deinit {
      switch self._completionState {
      case .completed:
        ()
      case .incomplete, .pending:
        // Dropping the writer in either of these states means the final value never reached the
        // delegate; in the `pending` case a suspended `finish(_:)` continuation is also lost.
        assertionFailure("writer has not completed or is pending completion")
      }
    }

    /// As ``toggleWritability()`` but executed asynchronously.
    @usableFromInline
    internal nonisolated func toggleWritabilityAsynchronously() {
      Task {
        await self.toggleWritability()
      }
    }

    /// Toggles whether the writer is writable or not. The writer is initially writable.
    ///
    /// If the writer becomes writable then it may resume writes to the delegate. If it becomes
    /// unwritable then calls to `write` may suspend until the writability changes again.
    ///
    /// This API does not offer explicit control over the writability state so the caller must ensure
    /// calls to this function correspond with changes in writability. The reason for this is that the
    /// underlying type is an `actor` and updating its state is therefore asynchronous. However,
    /// this function is not called from an asynchronous context so it is not possible to `await`
    /// state updates to complete. Instead, changing the state is via a `nonisolated` function on
    /// the `actor` which spawns a new task. If this or a similar API allowed the writability to be
    /// explicitly set then calls to that API are not guaranteed to be ordered which may lead to
    /// deadlock.
    @usableFromInline
    internal func toggleWritability() async {
      if self._isPaused {
        self._isPaused = false
        await self.resumeWriting()
      } else {
        self._isPaused = true
      }
    }

    /// Drains queued elements (and then any pending end) to the delegate for as long as the writer
    /// remains unpaused, resuming the continuation of each write after the delegate has been called.
    private func resumeWriting() async {
      var writes = 0

      // `_isPaused` is re-read on every iteration: it may have changed while we were suspended in
      // `Task.yield()` below.
      while !self._isPaused {
        if let pendingElement = self._pendingElements.popFirst() {
          self._delegate.write(pendingElement.value)
          pendingElement.continuation.resume()
        } else if let pendingCompletion = self._completionState.completeIfPending() {
          // Only reached once the element queue is empty, so the end is always delivered last.
          self._delegate.writeEnd(pendingCompletion.value)
          pendingCompletion.continuation.resume()
        } else {
          break
        }

        // `writes` will never exceed `maxWritesBeforeYield` so unchecked arithmetic is okay here.
        writes &+= 1
        if writes == self._maxWritesBeforeYield {
          writes = 0
          // We yield every so often to let the delegate (i.e. 'NIO.Channel') catch up since it may
          // decide it is no longer writable.
          await Task.yield()
        }
      }
    }

    /// As ``cancel()`` but executed asynchronously.
    @usableFromInline
    internal nonisolated func cancelAsynchronously() {
      Task {
        await self.cancel()
      }
    }

    /// Cancel all pending writes.
    ///
    /// Any pending writes will be dropped and their continuations will be resumed with
    /// a `CancellationError`. Any writes after cancellation has completed will also fail.
    ///
    /// The delegate is not called as part of cancellation.
    @usableFromInline
    internal func cancel() {
      // If there's an end we should fail that last.
      let pendingEnd: PendingEnd?

      // Mark our state as completed before resuming any continuations (any future writes should fail
      // immediately).
      switch self._completionState {
      case .incomplete:
        pendingEnd = nil
        self._completionState = .completed

      case let .pending(pending):
        pendingEnd = pending
        self._completionState = .completed

      case .completed:
        pendingEnd = nil
      }

      let cancellationError = CancellationError()

      while let pending = self._pendingElements.popFirst() {
        pending.continuation.resume(throwing: cancellationError)
      }

      pendingEnd?.continuation.resume(throwing: cancellationError)
    }

    /// Write an `element`.
    ///
    /// The call may suspend if the writer is paused or if earlier writes are still queued.
    ///
    /// - Parameter element: The element to forward to the delegate.
    /// - Throws: ``GRPCAsyncWriterError`` if the writer has already been finished or too many write
    ///   tasks have been suspended; `CancellationError` if the writer is cancelled while the write
    ///   is queued.
    @inlinable
    internal func write(_ element: Element) async throws {
      try await withCheckedThrowingContinuation { continuation in
        self._write(element, continuation: continuation)
      }
    }

    /// Writes, queues or rejects `element`, resuming `continuation` immediately unless the element
    /// was queued.
    @inlinable
    internal func _write(_ element: Element, continuation: CheckedContinuation<Void, Error>) {
      // There are three outcomes of writing:
      // - write the element directly (if the writer isn't paused and no writes are pending)
      // - queue the element (the writer is paused or there are writes already pending)
      // - error (the writer is complete or the queue is full).
      if self._completionState.isPendingOrCompleted {
        continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
      } else if !self._isPaused, self._pendingElements.isEmpty {
        self._delegate.write(element)
        continuation.resume()
      } else if self._pendingElements.count < self._maxPendingElements {
        // The continuation will be resumed later.
        self._pendingElements.append(PendingElement(element, continuation: continuation))
      } else {
        continuation.resume(throwing: GRPCAsyncWriterError.tooManyPendingWrites)
      }
    }

    /// Write the final element.
    ///
    /// The call may suspend if the writer is paused or if earlier writes are still queued; the
    /// final element is delivered to the delegate only after all queued elements.
    ///
    /// - Parameter end: The final value to forward to the delegate.
    /// - Throws: ``GRPCAsyncWriterError`` if the writer has already been finished;
    ///   `CancellationError` if the writer is cancelled while the finish is pending.
    @inlinable
    internal func finish(_ end: End) async throws {
      try await withCheckedThrowingContinuation { continuation in
        self._finish(end, continuation: continuation)
      }
    }

    /// Delivers `end` to the delegate immediately if possible, otherwise records it as pending so
    /// that it is delivered once the writer resumes and the element queue has drained.
    @inlinable
    internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
      if self._completionState.isPendingOrCompleted {
        continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
      } else if !self._isPaused, self._pendingElements.isEmpty {
        self._completionState = .completed
        self._delegate.writeEnd(end)
        continuation.resume()
      } else {
        // Either we're paused or there are pending writes which must be consumed first.
        self._completionState = .pending(PendingEnd(end, continuation: continuation))
      }
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension AsyncWriter where End == Void {
    /// Finishes a writer whose final value carries no information.
    ///
    /// This is a convenience for calling ``finish(_:)`` with `()`; it suspends and throws under
    /// exactly the same conditions.
    @inlinable
    internal func finish() async throws {
      let end: Void = ()
      try await self.finish(end)
    }
  }
  /// An error thrown when a write to, or the finishing of, an asynchronous writer is rejected.
  ///
  /// The possible values are exposed as static constants rather than enum cases; the underlying
  /// representation is kept private.
  public struct GRPCAsyncWriterError: Error, Hashable {
    private let wrapped: Wrapped

    /// The underlying reason for the error.
    ///
    /// `Hashable` and `Sendable` are declared explicitly: the enclosing error must be `Sendable`
    /// (a requirement of `Error`) and a `@usableFromInline` type does not receive an implicit
    /// `Sendable` conformance.
    @usableFromInline
    internal enum Wrapped: Hashable, Sendable {
      case tooManyPendingWrites
      case alreadyFinished
    }

    @usableFromInline
    internal init(_ wrapped: Wrapped) {
      self.wrapped = wrapped
    }

    /// There are too many writes pending. This may occur when too many Tasks are writing
    /// concurrently.
    public static let tooManyPendingWrites = Self(.tooManyPendingWrites)

    /// The writer has already finished. This may occur when the RPC completes prematurely, or when
    /// a user calls finish more than once.
    public static let alreadyFinished = Self(.alreadyFinished)
  }
  /// The receiving end of an asynchronous writer.
  ///
  /// A conforming type is handed each written element and, finally, the end value. Conformers must
  /// be reference types and `Sendable`.
  @usableFromInline
  internal protocol AsyncWriterDelegate: AnyObject, Sendable {
    /// The type of each value passed to ``write(_:)``.
    associatedtype Element: Sendable

    /// The type of the final value passed to ``writeEnd(_:)``.
    associatedtype End: Sendable

    /// Consume a single written element.
    ///
    /// - Parameter element: The element being written.
    @inlinable
    func write(_ element: Element)

    /// Consume the final value.
    ///
    /// - Parameter end: The value which ends the stream of writes.
    @inlinable
    func writeEnd(_ end: End)
  }
  291. #endif // compiler(>=5.6)