GRPCAsyncRequestStream.swift 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  /// A type for the stream of request messages sent to a gRPC server method.
  ///
  /// To enable testability this type provides a static ``GRPCAsyncRequestStream/makeTestingRequestStream()``
  /// method which allows you to create a stream that you can drive.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
    /// The NIO-based producer type that can back a request stream.
    @usableFromInline
    internal typealias _AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
      Element,
      Error,
      NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
      GRPCAsyncSequenceProducerDelegate
    >

    /// A source used for driving a ``GRPCAsyncRequestStream`` during tests.
    ///
    /// Every operation is forwarded to the continuation of the `AsyncThrowingStream`
    /// backing the testing stream.
    public struct Source {
      /// The continuation all operations are forwarded to.
      @usableFromInline
      internal let continuation: AsyncThrowingStream<Element, Error>.Continuation

      @inlinable
      init(continuation: AsyncThrowingStream<Element, Error>.Continuation) {
        self.continuation = continuation
      }

      /// Yields the element to the request stream.
      ///
      /// - Parameter element: The element to yield to the request stream.
      @inlinable
      public func yield(_ element: Element) {
        // The yield result is intentionally not surfaced to the caller.
        _ = self.continuation.yield(element)
      }

      /// Finishes the request stream without an error.
      @inlinable
      public func finish() {
        self.continuation.finish(throwing: nil)
      }

      /// Finishes the request stream.
      ///
      /// - Parameter error: An optional `Error` to finish the request stream with.
      @inlinable
      public func finish(throwing error: Error?) {
        self.continuation.finish(throwing: error)
      }
    }

    /// Simple struct for the return type of ``GRPCAsyncRequestStream/makeTestingRequestStream()``.
    ///
    /// This struct contains two properties:
    /// 1. The ``stream`` which is the actual ``GRPCAsyncRequestStream`` and should be passed to the method under testing.
    /// 2. The ``source`` which can be used to drive the stream.
    public struct TestingStream {
      /// The actual stream.
      public let stream: GRPCAsyncRequestStream<Element>

      /// The source used to drive the stream.
      public let source: Source

      @inlinable
      init(stream: GRPCAsyncRequestStream<Element>, source: Source) {
        self.stream = stream
        self.source = source
      }
    }

    /// The concrete sequence a request stream reads its elements from.
    @usableFromInline
    enum Backing: Sendable {
      /// Backed by an `AsyncThrowingStream`; this is what the testing stream uses.
      case asyncStream(AsyncThrowingStream<Element, Error>)
      /// Backed by a NIO throwing async sequence producer.
      case throwingAsyncSequenceProducer(_AsyncSequenceProducer)
    }

    /// The sequence elements are read from.
    @usableFromInline
    internal let backing: Backing

    /// Creates a request stream backed by a NIO async sequence producer.
    @inlinable
    internal init(_ sequence: _AsyncSequenceProducer) {
      self.backing = .throwingAsyncSequenceProducer(sequence)
    }

    /// Creates a request stream backed by an `AsyncThrowingStream`.
    @inlinable
    internal init(_ stream: AsyncThrowingStream<Element, Error>) {
      self.backing = .asyncStream(stream)
    }

    /// Creates a new testing stream.
    ///
    /// This is useful for writing unit tests for your gRPC method implementations since it allows you to drive the stream passed
    /// to your method.
    ///
    /// - Returns: A new ``TestingStream`` containing the actual ``GRPCAsyncRequestStream`` and a ``Source``.
    @inlinable
    public static func makeTestingRequestStream() -> TestingStream {
      // The continuation is only handed out inside the stream's build closure, so it is
      // captured into a local which must have been set by the time it is read below.
      var capturedContinuation: AsyncThrowingStream<Element, Error>.Continuation!
      let backingStream = AsyncThrowingStream<Element, Error> { continuation in
        capturedContinuation = continuation
      }

      return TestingStream(
        stream: Self(backingStream),
        source: Source(continuation: capturedContinuation)
      )
    }

    /// Creates an iterator over the backing sequence.
    @inlinable
    public func makeAsyncIterator() -> Iterator {
      let backingIterator: Iterator.BackingIterator
      switch self.backing {
      case .asyncStream(let stream):
        backingIterator = .asyncStream(stream.makeAsyncIterator())
      case .throwingAsyncSequenceProducer(let producer):
        backingIterator = .throwingAsyncSequenceProducer(producer.makeAsyncIterator())
      }
      return Iterator(backingIterator)
    }

    /// The iterator of a ``GRPCAsyncRequestStream``; wraps the iterator of the backing sequence.
    public struct Iterator: AsyncIteratorProtocol {
      /// The iterator of whichever sequence backs the request stream.
      @usableFromInline
      enum BackingIterator {
        case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
        case throwingAsyncSequenceProducer(_AsyncSequenceProducer.AsyncIterator)
      }

      @usableFromInline
      internal var iterator: BackingIterator

      @usableFromInline
      internal init(_ iterator: BackingIterator) {
        self.iterator = iterator
      }

      /// Returns the next element of the request stream, or `nil` once the backing iterator is exhausted.
      ///
      /// - Throws: A `GRPCStatus` with code `.cancelled` if the current task is already cancelled when
      ///   this is called; otherwise any error thrown by the backing iterator.
      @inlinable
      public mutating func next() async throws -> Element? {
        guard !Task.isCancelled else {
          throw GRPCStatus(code: .cancelled)
        }

        switch self.iterator {
        case .asyncStream(var streamIterator):
          // Advance a local copy and write it back only once `next()` has returned without throwing.
          let nextElement = try await streamIterator.next()
          self.iterator = .asyncStream(streamIterator)
          return nextElement
        case .throwingAsyncSequenceProducer(let producerIterator):
          return try await producerIterator.next()
        }
      }
    }
  }
  // Marks the request stream as `Sendable` so that it may be passed across concurrency domains.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCAsyncRequestStream: Sendable
  where Element: Sendable {}