GRPCAsyncRequestStream.swift 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import NIOCore
  /// The stream of request messages sent to a gRPC server method.
  ///
  /// The stream is an `AsyncSequence`, so it is consumed by iterating over it. To make methods
  /// which accept a request stream testable, ``GRPCAsyncRequestStream/makeTestingRequestStream()``
  /// creates a stream paired with a ``Source`` that the test can use to drive it.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
    /// The NIO sequence producer type which can back this stream.
    @usableFromInline
    typealias _AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
      Element,
      Error,
      NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
      GRPCAsyncSequenceProducerDelegate
    >

    // MARK: - Testing support

    /// Drives a ``GRPCAsyncRequestStream`` from test code.
    public struct Source {
      /// The continuation of the `AsyncThrowingStream` backing the request stream under test.
      @usableFromInline
      let continuation: AsyncThrowingStream<Element, Error>.Continuation

      /// Wraps the continuation that yielded elements and completion are forwarded to.
      @inlinable
      init(continuation: AsyncThrowingStream<Element, Error>.Continuation) {
        self.continuation = continuation
      }

      /// Yields an element to the request stream.
      ///
      /// - Parameter element: The element to yield to the request stream.
      @inlinable
      public func yield(_ element: Element) {
        self.continuation.yield(element)
      }

      /// Finishes the request stream.
      @inlinable
      public func finish() {
        self.continuation.finish()
      }

      /// Finishes the request stream, optionally with an error.
      ///
      /// - Parameter error: An optional `Error` to finish the request stream with.
      @inlinable
      public func finish(throwing error: Error?) {
        self.continuation.finish(throwing: error)
      }
    }

    /// The value returned from ``GRPCAsyncRequestStream/makeTestingRequestStream()``.
    ///
    /// It bundles two properties:
    /// 1. ``stream``: the ``GRPCAsyncRequestStream`` to pass to the method under test.
    /// 2. ``source``: the handle used to drive that stream.
    public struct TestingStream {
      /// The request stream to hand to the method under test.
      public let stream: GRPCAsyncRequestStream<Element>

      /// The source which drives ``stream``.
      public let source: Source

      /// Pairs a request stream with the source driving it.
      @inlinable
      init(stream: GRPCAsyncRequestStream<Element>, source: Source) {
        self.stream = stream
        self.source = source
      }
    }

    // MARK: - Storage

    /// The concrete sequence the request stream reads its elements from.
    @usableFromInline
    enum Backing: Sendable {
      /// Backed by an `AsyncThrowingStream`; this is what the testing stream uses.
      case asyncStream(AsyncThrowingStream<Element, Error>)
      /// Backed by a NIO throwing async sequence producer.
      case throwingAsyncSequenceProducer(_AsyncSequenceProducer)
    }

    /// The sequence elements are read from.
    @usableFromInline
    let backing: Backing

    /// Creates a request stream backed by a NIO async sequence producer.
    @inlinable
    init(_ sequence: _AsyncSequenceProducer) {
      self.backing = .throwingAsyncSequenceProducer(sequence)
    }

    /// Creates a request stream backed by an `AsyncThrowingStream`.
    @inlinable
    init(_ stream: AsyncThrowingStream<Element, Error>) {
      self.backing = .asyncStream(stream)
    }

    // MARK: - Factory

    /// Creates a new testing stream.
    ///
    /// Use this when unit testing gRPC method implementations: the returned ``Source`` lets the
    /// test drive the request stream handed to the method.
    ///
    /// - Returns: A new ``TestingStream`` containing the actual ``GRPCAsyncRequestStream`` and a ``Source``.
    @inlinable
    public static func makeTestingRequestStream() -> TestingStream {
      // The continuation is only available inside the stream's build closure, so it is captured
      // into an implicitly unwrapped optional declared outside of it.
      var capturedContinuation: AsyncThrowingStream<Element, Error>.Continuation!
      let underlyingStream = AsyncThrowingStream<Element, Error> { continuation in
        capturedContinuation = continuation
      }

      return TestingStream(
        stream: Self(underlyingStream),
        source: Source(continuation: capturedContinuation)
      )
    }

    // MARK: - AsyncSequence

    /// Creates an iterator over the request messages.
    ///
    /// - Returns: An ``Iterator`` wrapping the iterator of whichever sequence backs this stream.
    @inlinable
    public func makeAsyncIterator() -> Iterator {
      let backingIterator: Iterator.BackingIterator

      switch self.backing {
      case let .throwingAsyncSequenceProducer(producer):
        backingIterator = .throwingAsyncSequenceProducer(producer.makeAsyncIterator())
      case let .asyncStream(underlyingStream):
        backingIterator = .asyncStream(underlyingStream.makeAsyncIterator())
      }

      return Iterator(backingIterator)
    }

    /// The iterator for ``GRPCAsyncRequestStream``.
    public struct Iterator: AsyncIteratorProtocol {
      /// The iterator of the sequence backing the request stream.
      @usableFromInline
      enum BackingIterator {
        case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
        case throwingAsyncSequenceProducer(_AsyncSequenceProducer.AsyncIterator)
      }

      /// The wrapped iterator which calls to `next()` are forwarded to.
      @usableFromInline
      var iterator: BackingIterator

      /// Wraps the given backing iterator.
      @usableFromInline
      init(_ iterator: BackingIterator) {
        self.iterator = iterator
      }

      /// Returns the next request message, or `nil` once the backing iterator returns `nil`.
      ///
      /// - Throws: A `GRPCStatus` with code `.cancelled` if the current task is already cancelled
      ///   when this is called; otherwise any error thrown by the backing iterator.
      @inlinable
      public mutating func next() async throws -> Element? {
        // Check for cancellation up front, before asking the backing iterator for anything.
        guard !Task.isCancelled else {
          throw GRPCStatus(code: .cancelled)
        }

        switch self.iterator {
        case let .throwingAsyncSequenceProducer(producerIterator):
          return try await producerIterator.next()

        case var .asyncStream(streamIterator):
          // The stream iterator is bound as a mutable copy, so it is stored back after a
          // successful call to keep the advanced state.
          let nextElement = try await streamIterator.next()
          self.iterator = .asyncStream(streamIterator)
          return nextElement
        }
      }
    }
  }
  // Declares `GRPCAsyncRequestStream` as `Sendable`. The struct's generic signature already
  // requires `Element` to be `Sendable`; the constraint is spelled out again on the conformance.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCAsyncRequestStream: Sendable where Element: Sendable {
  }
  141. #endif