GRPCAsyncRequestStream.swift 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  /// The stream of request messages sent to a gRPC server method.
  ///
  /// The stream is an `AsyncSequence`; request messages are obtained by iterating it.
  ///
  /// For unit tests, ``GRPCAsyncRequestStream/makeTestingRequestStream()`` returns a stream
  /// together with a ``Source`` which the test can use to drive that stream.
  ///
  /// - Note: The underlying sequence is kept behind an internal enum rather than being exposed
  ///   directly, so that the implementation can be swapped for something else in the future.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
    /// Drives a ``GRPCAsyncRequestStream`` during tests.
    ///
    /// Every operation is forwarded to the continuation of the `AsyncThrowingStream` which backs
    /// the stream under test.
    public struct Source {
      @usableFromInline
      let continuation: AsyncThrowingStream<Element, Error>.Continuation

      @inlinable
      init(continuation: AsyncThrowingStream<Element, Error>.Continuation) {
        self.continuation = continuation
      }

      /// Yields an element to the request stream.
      ///
      /// - Parameter element: The element to yield to the request stream.
      @inlinable
      public func yield(_ element: Element) {
        continuation.yield(element)
      }

      /// Finishes the request stream without an error.
      @inlinable
      public func finish() {
        continuation.finish()
      }

      /// Finishes the request stream, optionally with an error.
      ///
      /// - Parameter error: An optional `Error` to finish the request stream with.
      @inlinable
      public func finish(throwing error: Error?) {
        continuation.finish(throwing: error)
      }
    }

    /// The value returned from ``GRPCAsyncRequestStream/makeTestingRequestStream()``.
    ///
    /// It bundles two properties:
    /// 1. ``stream``: the ``GRPCAsyncRequestStream`` to pass to the method under test.
    /// 2. ``source``: the handle used to drive that stream.
    public struct TestingStream {
      /// The stream to hand to the method under test.
      public let stream: GRPCAsyncRequestStream<Element>

      /// The source used to drive ``stream``.
      public let source: Source

      @inlinable
      init(stream: GRPCAsyncRequestStream<Element>, source: Source) {
        self.stream = stream
        self.source = source
      }
    }

    /// The concrete sequence the request stream reads from.
    @usableFromInline
    enum Backing: Sendable {
      case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>)
      case asyncStream(AsyncThrowingStream<Element, Error>)
    }

    @usableFromInline
    let backing: Backing

    /// Creates a request stream backed by a `PassthroughMessageSequence`.
    @inlinable
    init(_ messages: PassthroughMessageSequence<Element, Error>) {
      self.backing = .passthroughMessageSequence(messages)
    }

    /// Creates a request stream backed by an `AsyncThrowingStream`.
    @inlinable
    init(_ asyncStream: AsyncThrowingStream<Element, Error>) {
      self.backing = .asyncStream(asyncStream)
    }

    /// Creates a new testing stream.
    ///
    /// This is useful when unit testing gRPC method implementations: the returned ``Source``
    /// lets the test drive the stream which is passed to the method.
    ///
    /// - Returns: A new ``TestingStream`` containing the actual ``GRPCAsyncRequestStream`` and
    ///   a ``Source``.
    @inlinable
    public static func makeTestingRequestStream() -> TestingStream {
      // The continuation is assigned inside the stream's build closure and then force-unwrapped
      // (implicitly) when the `Source` is created from it below.
      var captured: AsyncThrowingStream<Element, Error>.Continuation!
      let asyncStream = AsyncThrowingStream<Element, Error> { continuation in
        captured = continuation
      }

      return TestingStream(
        stream: Self(asyncStream),
        source: Source(continuation: captured)
      )
    }

    /// Creates an iterator over the backing sequence.
    @inlinable
    public func makeAsyncIterator() -> Iterator {
      let backingIterator: Iterator.BackingIterator

      switch self.backing {
      case .passthroughMessageSequence(let messages):
        backingIterator = .passthroughMessageSequence(messages.makeAsyncIterator())
      case .asyncStream(let asyncStream):
        backingIterator = .asyncStream(asyncStream.makeAsyncIterator())
      }

      return Iterator(backingIterator)
    }

    /// The iterator for a ``GRPCAsyncRequestStream``.
    public struct Iterator: AsyncIteratorProtocol {
      /// The iterator of whichever sequence backs the request stream.
      @usableFromInline
      enum BackingIterator {
        case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>.Iterator)
        case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
      }

      @usableFromInline
      var iterator: BackingIterator

      @usableFromInline
      init(_ backingIterator: BackingIterator) {
        self.iterator = backingIterator
      }

      /// Returns the next element from the backing iterator.
      ///
      /// - Returns: The value returned by the backing iterator's `next()`.
      /// - Throws: Any error thrown by the backing iterator's `next()`.
      @inlinable
      public mutating func next() async throws -> Element? {
        switch self.iterator {
        case .passthroughMessageSequence(let messageIterator):
          // Bound immutably: `next()` is called on the extracted iterator and nothing is
          // written back.
          return try await messageIterator.next()

        case .asyncStream(var streamIterator):
          // Advance a local copy, then store it back. The write-back is only reached when
          // `next()` returns without throwing.
          let nextElement = try await streamIterator.next()
          self.iterator = .asyncStream(streamIterator)
          return nextElement
        }
      }
    }
  }
  /// Marks ``GRPCAsyncRequestStream`` as `Sendable`.
  ///
  /// No conditional clause is needed: the struct's generic signature already requires
  /// `Element: Sendable`.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCAsyncRequestStream: Sendable {}
  135. #endif