GRPCAsyncResponseStreamWriter.swift 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. /// Writer for server-streaming RPC handlers to provide responses.
  18. ///
  19. /// To enable testability this type provides a static ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``
  20. /// method which allows you to create a stream that you can drive.
  21. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  22. public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
  23. /// An `AsyncSequence` backing a ``GRPCAsyncResponseStreamWriter`` for testing purposes.
  24. ///
  25. /// - Important: This `AsyncSequence` is never finishing.
  26. public struct ResponseStream: AsyncSequence {
  27. public typealias Element = (Response, Compression)
  28. @usableFromInline
  29. internal let stream: AsyncStream<(Response, Compression)>
  30. @usableFromInline
  31. internal let continuation: AsyncStream<(Response, Compression)>.Continuation
  32. @inlinable
  33. init(
  34. stream: AsyncStream<(Response, Compression)>,
  35. continuation: AsyncStream<(Response, Compression)>.Continuation
  36. ) {
  37. self.stream = stream
  38. self.continuation = continuation
  39. }
  40. public func makeAsyncIterator() -> AsyncIterator {
  41. AsyncIterator(iterator: self.stream.makeAsyncIterator())
  42. }
  43. /// Finishes the response stream.
  44. ///
  45. /// This is useful in tests to finish the stream after the async method finished and allows you to collect all written responses.
  46. public func finish() {
  47. self.continuation.finish()
  48. }
  49. public struct AsyncIterator: AsyncIteratorProtocol {
  50. @usableFromInline
  51. internal var iterator: AsyncStream<(Response, Compression)>.AsyncIterator
  52. @inlinable
  53. init(iterator: AsyncStream<(Response, Compression)>.AsyncIterator) {
  54. self.iterator = iterator
  55. }
  56. public mutating func next() async -> Element? {
  57. await self.iterator.next()
  58. }
  59. }
  60. }
  61. /// Simple struct for the return type of ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``.
  62. ///
  63. /// This struct contains two properties:
  64. /// 1. The ``writer`` which is the actual ``GRPCAsyncResponseStreamWriter`` and should be passed to the method under testing.
  65. /// 2. The ``stream`` which can be used to observe the written responses.
  66. public struct TestingStreamWriter {
  67. /// The actual writer.
  68. public let writer: GRPCAsyncResponseStreamWriter<Response>
  69. /// The written responses in a stream.
  70. ///
  71. /// - Important: This `AsyncSequence` is never finishing.
  72. public let stream: ResponseStream
  73. @inlinable
  74. init(writer: GRPCAsyncResponseStreamWriter<Response>, stream: ResponseStream) {
  75. self.writer = writer
  76. self.stream = stream
  77. }
  78. }
  79. @usableFromInline
  80. enum Backing: Sendable {
  81. case asyncWriter(AsyncWriter<Delegate>)
  82. case closure(@Sendable ((Response, Compression)) async -> Void)
  83. }
  84. @usableFromInline
  85. internal typealias Element = (Response, Compression)
  86. @usableFromInline
  87. internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>
  88. @usableFromInline
  89. internal let backing: Backing
  90. @inlinable
  91. internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
  92. self.backing = .asyncWriter(asyncWriter)
  93. }
  94. @inlinable
  95. internal init(onWrite: @escaping @Sendable ((Response, Compression)) async -> Void) {
  96. self.backing = .closure(onWrite)
  97. }
  98. @inlinable
  99. public func send(
  100. _ response: Response,
  101. compression: Compression = .deferToCallDefault
  102. ) async throws {
  103. switch self.backing {
  104. case let .asyncWriter(writer):
  105. try await writer.write((response, compression))
  106. case let .closure(closure):
  107. await closure((response, compression))
  108. }
  109. }
  110. /// Creates a new `GRPCAsyncResponseStreamWriter` backed by a ``ResponseStream``.
  111. /// This is mostly useful for testing purposes where one wants to observe the written responses.
  112. ///
  113. /// - Note: For most tests it is useful to call ``ResponseStream/finish()`` after the async method under testing
  114. /// resumed. This allows you to easily collect all written responses.
  115. @inlinable
  116. public static func makeTestingResponseStreamWriter() -> TestingStreamWriter {
  117. var continuation: AsyncStream<(Response, Compression)>.Continuation!
  118. let asyncStream = AsyncStream<(Response, Compression)> { cont in
  119. continuation = cont
  120. }
  121. let writer = Self.init { [continuation] in
  122. continuation!.yield($0)
  123. }
  124. let responseStream = ResponseStream(
  125. stream: asyncStream,
  126. continuation: continuation
  127. )
  128. return TestingStreamWriter(writer: writer, stream: responseStream)
  129. }
  130. }
  131. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  132. @usableFromInline
  133. internal final class AsyncResponseStreamWriterDelegate<Response: Sendable>: AsyncWriterDelegate {
  134. @usableFromInline
  135. internal typealias Element = (Response, Compression)
  136. @usableFromInline
  137. internal typealias End = GRPCStatus
  138. @usableFromInline
  139. internal let _send: @Sendable (Response, Compression) -> Void
  140. @usableFromInline
  141. internal let _finish: @Sendable (GRPCStatus) -> Void
  142. // Create a new AsyncResponseStreamWriterDelegate.
  143. //
  144. // - Important: the `send` and `finish` closures must be thread-safe.
  145. @inlinable
  146. internal init(
  147. send: @escaping @Sendable (Response, Compression) -> Void,
  148. finish: @escaping @Sendable (GRPCStatus) -> Void
  149. ) {
  150. self._send = send
  151. self._finish = finish
  152. }
  153. @inlinable
  154. internal func _send(
  155. _ response: Response,
  156. compression: Compression = .deferToCallDefault
  157. ) {
  158. self._send(response, compression)
  159. }
  160. // MARK: - AsyncWriterDelegate conformance.
  161. @inlinable
  162. internal func write(_ element: (Response, Compression)) {
  163. self._send(element.0, compression: element.1)
  164. }
  165. @inlinable
  166. internal func writeEnd(_ end: GRPCStatus) {
  167. self._finish(end)
  168. }
  169. }
  170. #endif