GRPCAsyncResponseStreamWriter.swift 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import NIOCore
  18. /// Writer for server-streaming RPC handlers to provide responses.
  19. ///
  20. /// To enable testability this type provides a static ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``
  21. /// method which allows you to create a stream that you can drive.
  22. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  23. public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
  24. @usableFromInline
  25. internal typealias AsyncWriter = NIOAsyncWriter<
  26. (Response, Compression),
  27. GRPCAsyncWriterSinkDelegate<(Response, Compression)>
  28. >
  29. /// An `AsyncSequence` backing a ``GRPCAsyncResponseStreamWriter`` for testing purposes.
  30. ///
  31. /// - Important: This `AsyncSequence` is never finishing.
  32. public struct ResponseStream: AsyncSequence {
  33. public typealias Element = (Response, Compression)
  34. @usableFromInline
  35. internal let stream: AsyncStream<(Response, Compression)>
  36. @usableFromInline
  37. internal let continuation: AsyncStream<(Response, Compression)>.Continuation
  38. @inlinable
  39. init(
  40. stream: AsyncStream<(Response, Compression)>,
  41. continuation: AsyncStream<(Response, Compression)>.Continuation
  42. ) {
  43. self.stream = stream
  44. self.continuation = continuation
  45. }
  46. public func makeAsyncIterator() -> AsyncIterator {
  47. AsyncIterator(iterator: self.stream.makeAsyncIterator())
  48. }
  49. /// Finishes the response stream.
  50. ///
  51. /// This is useful in tests to finish the stream after the async method finished and allows you to collect all written responses.
  52. public func finish() {
  53. self.continuation.finish()
  54. }
  55. public struct AsyncIterator: AsyncIteratorProtocol {
  56. @usableFromInline
  57. internal var iterator: AsyncStream<(Response, Compression)>.AsyncIterator
  58. @inlinable
  59. init(iterator: AsyncStream<(Response, Compression)>.AsyncIterator) {
  60. self.iterator = iterator
  61. }
  62. public mutating func next() async -> Element? {
  63. await self.iterator.next()
  64. }
  65. }
  66. }
  67. /// Simple struct for the return type of ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``.
  68. ///
  69. /// This struct contains two properties:
  70. /// 1. The ``writer`` which is the actual ``GRPCAsyncResponseStreamWriter`` and should be passed to the method under testing.
  71. /// 2. The ``stream`` which can be used to observe the written responses.
  72. public struct TestingStreamWriter {
  73. /// The actual writer.
  74. public let writer: GRPCAsyncResponseStreamWriter<Response>
  75. /// The written responses in a stream.
  76. ///
  77. /// - Important: This `AsyncSequence` is never finishing.
  78. public let stream: ResponseStream
  79. @inlinable
  80. init(writer: GRPCAsyncResponseStreamWriter<Response>, stream: ResponseStream) {
  81. self.writer = writer
  82. self.stream = stream
  83. }
  84. }
  85. @usableFromInline
  86. enum Backing: Sendable {
  87. case asyncWriter(AsyncWriter)
  88. case closure(@Sendable ((Response, Compression)) async -> Void)
  89. }
  90. @usableFromInline
  91. internal let backing: Backing
  92. @inlinable
  93. internal init(wrapping asyncWriter: AsyncWriter) {
  94. self.backing = .asyncWriter(asyncWriter)
  95. }
  96. @inlinable
  97. internal init(onWrite: @escaping @Sendable ((Response, Compression)) async -> Void) {
  98. self.backing = .closure(onWrite)
  99. }
  100. @inlinable
  101. public func send(
  102. _ response: Response,
  103. compression: Compression = .deferToCallDefault
  104. ) async throws {
  105. switch self.backing {
  106. case let .asyncWriter(writer):
  107. try await writer.yield((response, compression))
  108. case let .closure(closure):
  109. await closure((response, compression))
  110. }
  111. }
  112. @inlinable
  113. public func send<S: Sequence>(
  114. contentsOf responses: S,
  115. compression: Compression = .deferToCallDefault
  116. ) async throws where S.Element == Response {
  117. let responsesWithCompression = responses.lazy.map { ($0, compression) }
  118. switch self.backing {
  119. case let .asyncWriter(writer):
  120. try await writer.yield(contentsOf: responsesWithCompression)
  121. case let .closure(closure):
  122. for response in responsesWithCompression {
  123. await closure(response)
  124. }
  125. }
  126. }
  127. /// Creates a new `GRPCAsyncResponseStreamWriter` backed by a ``ResponseStream``.
  128. /// This is mostly useful for testing purposes where one wants to observe the written responses.
  129. ///
  130. /// - Note: For most tests it is useful to call ``ResponseStream/finish()`` after the async method under testing
  131. /// resumed. This allows you to easily collect all written responses.
  132. @inlinable
  133. public static func makeTestingResponseStreamWriter() -> TestingStreamWriter {
  134. var continuation: AsyncStream<(Response, Compression)>.Continuation!
  135. let asyncStream = AsyncStream<(Response, Compression)> { cont in
  136. continuation = cont
  137. }
  138. let writer = Self.init { [continuation] in
  139. continuation!.yield($0)
  140. }
  141. let responseStream = ResponseStream(
  142. stream: asyncStream,
  143. continuation: continuation
  144. )
  145. return TestingStreamWriter(writer: writer, stream: responseStream)
  146. }
  147. }
  148. #endif