UncheckedAsyncIteratorSequence.swift 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. public import Synchronization // should be @usableFromInline
  /// An `AsyncSequence` which wraps an existing async iterator.
  ///
  /// At most one iterator may be made from an instance: the first call to
  /// `makeAsyncIterator()` returns an iterator wrapping the stored base iterator and any
  /// later call traps with a fatal error.
  @usableFromInline
  final class UncheckedAsyncIteratorSequence<
    Base: AsyncIteratorProtocol
  >: AsyncSequence, @unchecked Sendable {
    // This is '@unchecked Sendable' because iterators are typically marked as not being Sendable
    // to avoid multiple iterators being created. This is done to avoid multiple concurrent consumers
    // of a single async sequence.
    //
    // However, gRPC needs to read the first message in a sequence of inbound request/response parts
    // to check how the RPC should be handled. To do this it creates an async iterator and waits for
    // the first value and then decides what to do. If it continues processing the RPC it uses this
    // wrapper type to turn the iterator back into an async sequence and then drops the iterator on
    // the floor so that there is only a single consumer of the original source.

    @usableFromInline
    typealias Element = Base.Element

    /// The base iterator.
    ///
    /// Immutable after initialization: this class never advances it, it is only copied into
    /// the single `AsyncIterator` returned from `makeAsyncIterator()`.
    @usableFromInline
    let base: Base

    /// Set to `true` when an iterator has been made.
    @usableFromInline
    let _hasMadeIterator = Atomic(false)

    /// Creates a sequence wrapping the given iterator.
    ///
    /// - Parameter base: The iterator whose remaining elements the sequence will yield.
    @inlinable
    init(_ base: Base) {
      self.base = base
    }

    /// An iterator which forwards every call to the wrapped base iterator.
    @usableFromInline
    struct AsyncIterator: AsyncIteratorProtocol {
      /// The wrapped iterator; mutated only by `next()`.
      @usableFromInline
      private(set) var base: Base

      /// Creates an iterator wrapping `base`.
      @inlinable
      init(base: Base) {
        self.base = base
      }

      /// Returns the next element of the base iterator.
      ///
      /// - Returns: Whatever the base iterator's `next()` returns.
      /// - Throws: Any error thrown by the base iterator.
      @inlinable
      mutating func next() async throws -> Element? {
        try await self.base.next()
      }
    }

    /// Makes the one and only iterator for this sequence.
    ///
    /// - Returns: An iterator wrapping the base iterator.
    /// - Important: Traps with a fatal error if called more than once on the same instance.
    @inlinable
    func makeAsyncIterator() -> AsyncIterator {
      // Atomically flip the flag from `false` to `true`. The exchange can only succeed for one
      // caller, so a failed exchange means an iterator has already been made.
      let (exchanged, _) = self._hasMadeIterator.compareExchange(
        expected: false,
        desired: true,
        ordering: .relaxed
      )

      guard exchanged else {
        fatalError("Only one iterator can be made")
      }

      return AsyncIterator(base: self.base)
    }
  }