// _FakeResponseStream.swift
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHPACK
  /// A request part handed to the request handler of a fake response stream.
  public enum FakeRequestPart<Request: GRPCPayload> {
    /// Request metadata, carried as HPACK headers.
    case metadata(HPACKHeaders)

    /// A single request message.
    case message(Request)

    /// The end of the request stream; carries no payload.
    case end
  }
  /// An error thrown when sending a part on a fake response stream would have resulted in a
  /// protocol violation, such as sending initial metadata multiple times or sending messages
  /// after the stream has closed.
  public struct FakeResponseProtocolViolation: Error, Hashable {
    /// A description of why sending the part would have been a protocol violation.
    public var reason: String

    /// Creates a violation carrying the given explanation.
    ///
    /// - Parameter reason: Why sending the part would have been a protocol violation.
    init(_ reason: String) {
      self.reason = reason
    }
  }
  /// A fake response stream into which users may inject response parts for use in unit tests.
  ///
  /// This class is not meant to be used directly; interact with it via one of its subclasses,
  /// `FakeUnaryResponse` or `FakeStreamingResponse`.
  public class _FakeResponseStream<Request: GRPCPayload, Response: GRPCPayload> {
    /// Whether the stream has been activated yet.
    private enum ActiveState {
      case inactive
      case active
    }

    /// How far through the response the sender has progressed.
    private enum SendState {
      /// Nothing has been sent. Initial metadata or a message moves us to `sending`; trailing
      /// metadata moves us to `closing`.
      case idle

      /// Messages are being sent. More messages may follow; trailing metadata moves us to
      /// `closing`.
      case sending

      /// Trailing metadata has been sent. Only a status or an error may follow, which closes
      /// the stream.
      case closing

      /// The stream is closed; nothing more can be sent.
      case closed
    }

    /// Something which can be delivered to the client: a response part or an error.
    private enum StreamEvent {
      case responsePart(_GRPCClientResponsePart<Response>)
      case error(Error)
    }

    /// The outcome of validating an event against the current send state.
    private enum Validation {
      /// Sending the part is valid.
      case valid

      /// Sending the part is valid, provided it is sent after the given part.
      case validIfSentAfter(_ part: StreamEvent)

      /// Sending the part would be a protocol violation.
      case invalid(reason: String)
    }

    /// The channel to use for communication.
    internal let channel: EmbeddedChannel

    /// Holds events which were sent before the stream was activated.
    private var responseBuffer = CircularBuffer<StreamEvent>()

    /// Whether the stream is active; it starts out inactive.
    private var activeState: ActiveState = .inactive

    /// The state of sending response parts; it starts out idle.
    private var sendState: SendState = .idle

    /// Creates a fake response stream backed by an embedded channel.
    ///
    /// - Parameter requestHandler: A closure handed to the `WriteCapturingHandler` installed in
    ///   the channel; it receives `FakeRequestPart`s.
    internal init(requestHandler: @escaping (FakeRequestPart<Request>) -> ()) {
      self.channel = EmbeddedChannel(handler: WriteCapturingHandler(requestHandler: requestHandler))
    }

    /// Activates the stream: fires 'channel active' on the channel and then delivers any events
    /// which were buffered while the stream was inactive. Does nothing if already active.
    internal func activate() {
      guard case .inactive = self.activeState else {
        // Already active; nothing to do.
        return
      }

      // Activate the channel. This will allow any request parts to be sent.
      self.channel.pipeline.fireChannelActive()

      // Deliver buffered events oldest-first. We remain `.inactive` while draining, so anything
      // sent in the meantime lands in the buffer and is picked up by this loop.
      while !self.responseBuffer.isEmpty {
        let buffered = self.responseBuffer.removeFirst()
        self.write(buffered)
      }

      // Now we're active.
      self.activeState = .active
    }

    /// Writes or buffers the response part, depending on whether the stream is active.
    ///
    /// - Parameter part: The response part to send.
    /// - Throws: `FakeResponseProtocolViolation` if sending the part is not valid in the
    ///   current send state.
    internal func _sendResponsePart(_ part: _GRPCClientResponsePart<Response>) throws {
      try self.send(.responsePart(part))
    }

    /// Writes or buffers the error, depending on whether the stream is active.
    ///
    /// - Parameter error: The error to send.
    /// - Throws: `FakeResponseProtocolViolation` if sending an error is not valid in the
    ///   current send state.
    internal func _sendError(_ error: Error) throws {
      try self.send(.error(error))
    }

    /// Validates the event and, if it is acceptable, writes or buffers it (preceded by any part
    /// which validation says must come first).
    private func send(_ event: StreamEvent) throws {
      let verdict = self.validate(event)

      if case .invalid(let reason) = verdict {
        throw FakeResponseProtocolViolation(reason)
      }

      if case .validIfSentAfter(let prerequisite) = verdict {
        self.writeOrBuffer(prerequisite)
      }

      self.writeOrBuffer(event)
    }

    /// Validates an event the user wants to send on the stream, advancing `sendState` when the
    /// event is accepted.
    private func validate(_ event: StreamEvent) -> Validation {
      switch event {
      case .responsePart(.initialMetadata):
        // We can only send initial metadata from '.idle'.
        guard case .idle = self.sendState else {
          return .invalid(reason: "Initial metadata has already been sent")
        }
        self.sendState = .sending
        return .valid

      case .responsePart(.message):
        switch self.sendState {
        case .idle:
          // This is fine: we don't force the user to specify initial metadata so we send some
          // on their behalf.
          self.sendState = .sending
          return .validIfSentAfter(.responsePart(.initialMetadata([:])))
        case .sending:
          return .valid
        case .closing, .closed:
          // We can't send messages once we're closing or closed.
          return .invalid(reason: "Messages can't be sent after the stream has been closed")
        }

      case .responsePart(.trailingMetadata):
        switch self.sendState {
        case .idle, .sending:
          self.sendState = .closing
          return .valid
        case .closing, .closed:
          // We're already closing or closed.
          return .invalid(reason: "Trailing metadata can't be sent after the stream has been closed")
        }

      case .responsePart(.status), .error:
        // We can only error/close if we're closing (i.e. have already sent trailers which we
        // enforce from the API in the subclasses).
        guard case .closing = self.sendState else {
          return .invalid(reason: "Status/error can only be sent after trailing metadata has been sent")
        }
        self.sendState = .closed
        return .valid
      }
    }

    /// Writes the event straight away if the stream is active, otherwise buffers it until
    /// `activate()` is called.
    private func writeOrBuffer(_ event: StreamEvent) {
      switch self.activeState {
      case .active:
        self.write(event)
      case .inactive:
        self.responseBuffer.append(event)
      }
    }

    /// Delivers the event to the channel: response parts are written inbound, errors are fired
    /// through the pipeline.
    private func write(_ event: StreamEvent) {
      switch event {
      case .responsePart(let part):
        // We tolerate errors here: an error will be thrown if the write results in an error
        // which isn't caught in the channel. Errors in the channel get funnelled into the
        // transport held by the actual call object and handled there.
        _ = try? self.channel.writeInbound(part)
      case .error(let failure):
        self.channel.pipeline.fireErrorCaught(failure)
      }
    }
  }
  // MARK: - Unary Response

  /// A fake unary response to be used with a generated test client.
  ///
  /// Users typically create fake responses via helper methods on their generated test clients
  /// corresponding to the RPC which they intend to test.
  ///
  /// For unary responses users may call one of two functions for each RPC:
  /// - `sendMessage(_:initialMetadata:trailingMetadata:status:)`, or
  /// - `sendError(_:trailingMetadata:)`
  ///
  /// `sendMessage` sends a normal unary response with the provided message and allows the caller
  /// to also specify initial metadata, trailing metadata and the status. Both metadata arguments
  /// are empty by default and the status defaults to one with an 'ok' status code.
  ///
  /// `sendError` may be used to terminate an RPC without providing a response. As for
  /// `sendMessage`, the `trailingMetadata` defaults to being empty.
  public class FakeUnaryResponse<Request: GRPCPayload, Response: GRPCPayload>: _FakeResponseStream<Request, Response> {
    /// Creates a fake unary response.
    ///
    /// - Parameter requestHandler: A closure which receives request parts. By default it
    ///   does nothing.
    public override init(requestHandler: @escaping (FakeRequestPart<Request>) -> () = { _ in }) {
      super.init(requestHandler: requestHandler)
    }

    /// Send a response message to the client.
    ///
    /// The initial metadata, message, trailing metadata and status are sent in that order.
    ///
    /// - Parameters:
    ///   - response: The message to send.
    ///   - initialMetadata: The initial metadata to send. By default the metadata will be empty.
    ///   - trailingMetadata: The trailing metadata to send. By default the metadata will be empty.
    ///   - status: The status to send. By default this has an '.ok' status code.
    /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC
    ///   protocol, e.g. sending messages after the RPC has ended.
    public func sendMessage(
      _ response: Response,
      initialMetadata: HPACKHeaders = [:],
      trailingMetadata: HPACKHeaders = [:],
      status: GRPCStatus = .ok
    ) throws {
      let orderedParts: [_GRPCClientResponsePart<Response>] = [
        .initialMetadata(initialMetadata),
        .message(.init(response, compressed: false)),
        .trailingMetadata(trailingMetadata),
        .status(status),
      ]

      // Stop at the first part which is rejected; earlier parts have already been sent.
      for part in orderedParts {
        try self._sendResponsePart(part)
      }
    }

    /// Send an error to the client.
    ///
    /// The trailing metadata is sent first, followed by the error.
    ///
    /// - Parameters:
    ///   - error: The error to send.
    ///   - trailingMetadata: The trailing metadata to send. By default the metadata will be empty.
    /// - Throws: FakeResponseProtocolViolation if sending the error would violate the gRPC
    ///   protocol, e.g. erroring after the RPC has already completed.
    public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws {
      let trailers = _GRPCClientResponsePart<Response>.trailingMetadata(trailingMetadata)
      try self._sendResponsePart(trailers)
      try self._sendError(error)
    }
  }
  // MARK: - Streaming Response

  /// A fake streaming response to be used with a generated test client.
  ///
  /// Users typically create fake responses via helper methods on their generated test clients
  /// corresponding to the RPC which they intend to test.
  ///
  /// For streaming responses users have a number of methods available to them:
  /// - `sendInitialMetadata(_:)`
  /// - `sendMessage(_:)`
  /// - `sendEnd(status:trailingMetadata:)`
  /// - `sendError(_:trailingMetadata:)`
  ///
  /// `sendInitialMetadata` may be called to send initial metadata to the client, however, it
  /// must be called first in order for the metadata to be sent. If it is not called, empty
  /// metadata will be sent automatically if necessary.
  ///
  /// `sendMessage` may be called to send a response message on the stream. This may be called
  /// multiple times. Calling it after `sendEnd` or `sendError` throws a
  /// `FakeResponseProtocolViolation`.
  ///
  /// `sendEnd` indicates that the response stream has closed. It - or `sendError` - must be
  /// called once. The `status` defaults to a value with the `ok` code and `trailingMetadata` is
  /// empty by default.
  ///
  /// `sendError` may be called at any point before the stream has ended to indicate an error on
  /// the response stream. Like `sendEnd`, `trailingMetadata` is empty by default.
  public class FakeStreamingResponse<Request: GRPCPayload, Response: GRPCPayload>: _FakeResponseStream<Request, Response> {
    /// Creates a fake streaming response.
    ///
    /// - Parameter requestHandler: A closure which receives request parts. By default it
    ///   does nothing.
    public override init(requestHandler: @escaping (FakeRequestPart<Request>) -> () = { _ in }) {
      super.init(requestHandler: requestHandler)
    }

    /// Send initial metadata to the client.
    ///
    /// Note that calling this function is not required; empty initial metadata will be sent
    /// automatically if necessary.
    ///
    /// - Parameter metadata: The metadata to send
    /// - Throws: FakeResponseProtocolViolation if sending initial metadata would violate the gRPC
    ///   protocol, e.g. sending metadata too many times, or out of order.
    public func sendInitialMetadata(_ metadata: HPACKHeaders) throws {
      try self._sendResponsePart(.initialMetadata(metadata))
    }

    /// Send a response message to the client.
    ///
    /// - Parameter response: The response to send.
    /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC
    ///   protocol, e.g. sending messages after the RPC has ended.
    public func sendMessage(_ response: Response) throws {
      try self._sendResponsePart(.message(.init(response, compressed: false)))
    }

    /// Send the RPC status and trailing metadata to the client.
    ///
    /// The trailing metadata is sent first, followed by the status.
    ///
    /// - Parameters:
    ///   - status: The status to send. By default the status code will be '.ok'.
    ///   - trailingMetadata: The trailing metadata to send. Empty by default.
    /// - Throws: FakeResponseProtocolViolation if ending the RPC would violate the gRPC
    ///   protocol, e.g. sending end after the RPC has already completed.
    public func sendEnd(status: GRPCStatus = .ok, trailingMetadata: HPACKHeaders = [:]) throws {
      try self.sendTrailers(trailingMetadata)
      try self._sendResponsePart(.status(status))
    }

    /// Send an error to the client.
    ///
    /// The trailing metadata is sent first, followed by the error.
    ///
    /// - Parameters:
    ///   - error: The error to send.
    ///   - trailingMetadata: The trailing metadata to send. By default the metadata will be empty.
    /// - Throws: FakeResponseProtocolViolation if sending the error would violate the gRPC
    ///   protocol, e.g. erroring after the RPC has already completed.
    public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws {
      try self.sendTrailers(trailingMetadata)
      try self._sendError(error)
    }

    /// Sends the trailing metadata which must precede a status or an error.
    private func sendTrailers(_ trailers: HPACKHeaders) throws {
      try self._sendResponsePart(.trailingMetadata(trailers))
    }
  }