TestClientExample.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import NIO
  20. import XCTest
  21. class FakeResponseStreamExampleTests: GRPCTestCase {
  22. var client: Echo_EchoTestClient!
  23. override func setUp() {
  24. super.setUp()
  25. self.client = Echo_EchoTestClient(defaultCallOptions: self.callOptionsWithLogger)
  26. }
  27. func testUnary() {
  28. // Enqueue a Get response. This will be dequeued when the client makes the next Get RPC.
  29. //
  30. // We can also use a response stream, see 'testUnaryResponseStream'.
  31. self.client.enqueueGetResponse(.with { $0.text = "Bar!" })
  32. // Start the Get RPC.
  33. let get = self.client.get(.with { $0.text = "Foo!" })
  34. // The response stream has been consumed by the call.
  35. XCTAssertFalse(self.client.hasGetResponsesRemaining)
  36. // Check the response values:
  37. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Bar!" })
  38. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  39. }
  40. func testUnaryResponseStream() {
  41. // Enqueue a Get response stream. This will be used for the next Get RPC and we can choose when
  42. // to send responses on it.
  43. let stream = self.client.makeGetResponseStream()
  44. // Start the Get RPC.
  45. let get = self.client.get(.with { $0.text = "Foo!" })
  46. // The response stream has been consumed by the call.
  47. XCTAssertFalse(self.client.hasGetResponsesRemaining)
  48. // Now send the response on the stream we made.
  49. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = "Bar!" }))
  50. // Check the response values:
  51. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Bar!" })
  52. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  53. }
  54. func testClientStreaming() {
  55. // Enqueue a Collect response. This will be dequeued when the client makes the next Collect RPC.
  56. //
  57. // We can also use a response stream, see 'testClientStreamingResponseStream'.
  58. self.client.enqueueCollectResponse(.with { $0.text = "Foo" })
  59. // Start the Collect RPC.
  60. let collect = self.client.collect()
  61. // The response stream has been consumed by the call.
  62. XCTAssertFalse(self.client.hasCollectResponsesRemaining)
  63. XCTAssertEqual(try collect.response.wait(), .with { $0.text = "Foo" })
  64. XCTAssertTrue(try collect.status.map { $0.isOk }.wait())
  65. }
  66. func testClientStreamingResponseStream() {
  67. // Enqueue a Collect response stream. This will be used for the next Collect RPC and we can choose when
  68. // to send responses on it.
  69. let stream = self.client.makeCollectResponseStream()
  70. // Start the Collect RPC.
  71. let collect = self.client.collect()
  72. // The response stream has been consumed by the call.
  73. XCTAssertFalse(self.client.hasCollectResponsesRemaining)
  74. // Send the response on the stream we made.
  75. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = "Foo" }))
  76. XCTAssertEqual(try collect.response.wait(), .with { $0.text = "Foo" })
  77. XCTAssertTrue(try collect.status.map { $0.isOk }.wait())
  78. }
  79. func testServerStreaming() {
  80. // Enqueue some Expand responses. These will be dequeued when the client makes the next Expand RPC.
  81. //
  82. // We can also use a response stream, see 'testServerStreamingResponseStream'.
  83. let fooBarBaz = ["Foo", "Bar", "Baz"]
  84. self.client.enqueueExpandResponses(fooBarBaz.map { text in .with { $0.text = text } })
  85. // Start the 'Expand' RPC. We'll create a handler which records responses.
  86. //
  87. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  88. // executed on a different thread; for the test client the calling thread is thread is the same
  89. // as the tread on which the RPC is called, i.e. this thread.
  90. var responses: [String] = []
  91. let expand = self.client.expand(.with { $0.text = "Hello!" }) { response in
  92. responses.append(response.text)
  93. }
  94. // The response stream has been consumed by the call.
  95. XCTAssertFalse(self.client.hasExpandResponsesRemaining)
  96. XCTAssertTrue(try expand.status.map { $0.isOk }.wait())
  97. XCTAssertEqual(responses, fooBarBaz)
  98. }
  99. func testServerStreamingResponseStream() {
  100. // Enqueue an Expand response stream. This will be used for the next Expand RPC and we can choose
  101. // when to send responses on it.
  102. let stream = self.client.makeExpandResponseStream()
  103. // Start the 'Expand' RPC. We'll create a handler which records responses.
  104. //
  105. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  106. // executed on a different thread; for the test client the calling thread is thread is the same
  107. // as the tread on which the RPC is called, i.e. this thread.
  108. var responses: [String] = []
  109. let expand = self.client.expand(.with { $0.text = "Hello!" }) { response in
  110. responses.append(response.text)
  111. }
  112. // The response stream has been consumed by the call.
  113. XCTAssertFalse(self.client.hasExpandResponsesRemaining)
  114. // Send some responses.
  115. let fooBarBaz = ["Foo", "Bar", "Baz"]
  116. for message in fooBarBaz {
  117. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = message }))
  118. }
  119. // Close the stream.
  120. XCTAssertNoThrow(try stream.sendEnd())
  121. XCTAssertTrue(try expand.status.map { $0.isOk }.wait())
  122. XCTAssertEqual(responses, fooBarBaz)
  123. }
  124. func testBidirectionalStreaming() {
  125. // Enqueue some Update responses. These will be dequeued when the client makes the next Update RPC.
  126. //
  127. // We can also use a response stream, see 'testBidirectionalStreamingResponseStream'.
  128. let fooBarBaz = ["Foo", "Bar", "Baz"]
  129. self.client.enqueueUpdateResponses(fooBarBaz.map { text in .with { $0.text = text } })
  130. // Start the 'Update' RPC. We'll create a handler which records responses.
  131. //
  132. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  133. // executed on a different thread; for the test client the calling thread is thread is the same
  134. // as the tread on which the RPC is called, i.e. this thread.
  135. var responses: [String] = []
  136. let update = self.client.update { response in
  137. responses.append(response.text)
  138. }
  139. // The response stream has been consumed by the call.
  140. XCTAssertFalse(self.client.hasUpdateResponsesRemaining)
  141. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  142. XCTAssertEqual(responses, fooBarBaz)
  143. }
  144. func testBidirectionalStreamingResponseStream() {
  145. // Enqueue an Update response stream. This will be used for the next Update RPC and we can choose
  146. // when to send responses on it.
  147. let stream = self.client.makeUpdateResponseStream()
  148. // Start the 'Update' RPC. We'll create a handler which records responses.
  149. //
  150. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  151. // executed on a different thread; for the test client the calling thread is thread is the same
  152. // as the tread on which the RPC is called, i.e. this thread.
  153. var responses: [String] = []
  154. let update = self.client.update { response in
  155. responses.append(response.text)
  156. }
  157. // The response stream has been consumed by the call.
  158. XCTAssertFalse(self.client.hasUpdateResponsesRemaining)
  159. // Send some responses.
  160. let fooBarBaz = ["Foo", "Bar", "Baz"]
  161. for message in fooBarBaz {
  162. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = message }))
  163. }
  164. // Close the stream.
  165. XCTAssertNoThrow(try stream.sendEnd())
  166. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  167. XCTAssertEqual(responses, fooBarBaz)
  168. }
  169. }
  170. // These tests demonstrate the finer grained control enabled by the response streams.
  171. extension FakeResponseStreamExampleTests {
  172. func testUnaryWithTrailingMetadata() {
  173. // Create a response stream for the RPC.
  174. let getResponseStream = self.client.makeGetResponseStream()
  175. // Send the request.
  176. let get = self.client.get(.with { $0.text = "Hello!" })
  177. // Send the response as well as some trailing metadata.
  178. XCTAssertNoThrow(try getResponseStream.sendMessage(
  179. .with { $0.text = "Goodbye!" },
  180. trailingMetadata: ["bar": "baz"]
  181. ))
  182. // Check the response values:
  183. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
  184. XCTAssertEqual(try get.trailingMetadata.wait(), ["bar": "baz"])
  185. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  186. }
  187. func testUnaryError() {
  188. // Create a response stream for the RPC.
  189. let getResponseStream = self.client.makeGetResponseStream()
  190. // Send the request.
  191. let get = self.client.get(.with { $0.text = "Hello!" })
  192. // Respond with an error. We could send trailing metadata here as well.
  193. struct DummyError: Error {}
  194. XCTAssertNoThrow(try getResponseStream.sendError(DummyError()))
  195. // Check the response values:
  196. XCTAssertThrowsError(try get.response.wait()) { error in
  197. XCTAssertTrue(error is DummyError)
  198. }
  199. // We sent a dummy error; we could have sent a `GRPCStatus` error in which case we could assert
  200. // for equality here.
  201. XCTAssertFalse(try get.status.map { $0.isOk }.wait())
  202. }
  203. func testUnaryWithRequestHandler() {
  204. // Create a response stream for the RPC we want to make, we'll specify a *request* handler as well.
  205. let getResponseStream = self.client.makeGetResponseStream { requestPart in
  206. switch requestPart {
  207. case let .metadata(headers):
  208. XCTAssertTrue(headers.contains(name: "a-test-key"))
  209. case let .message(request):
  210. XCTAssertEqual(request, .with { $0.text = "Hello!" })
  211. case .end:
  212. ()
  213. }
  214. }
  215. // We'll send some custom metadata for the call as well. It will be validated above.
  216. let callOptions = CallOptions(customMetadata: ["a-test-key": "a test value"])
  217. let get = self.client.get(.with { $0.text = "Hello!" }, callOptions: callOptions)
  218. // Send the response.
  219. XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))
  220. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
  221. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  222. }
  223. func testUnaryResponseOrdering() {
  224. // Create a response stream for the RPC we want to make.
  225. let getResponseStream = self.client.makeGetResponseStream()
  226. // We can queue up the response *before* we make the RPC.
  227. XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))
  228. // Start the RPC: the response will be sent automatically.
  229. let get = self.client.get(.with { $0.text = "Hello!" })
  230. // Check the response values.
  231. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
  232. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  233. }
  234. func testBidirectionalResponseOrdering() {
  235. // Create a response stream for the RPC we want to make.
  236. let updateResponseStream = self.client.makeUpdateResponseStream()
  237. // We can queue up responses *before* we make the RPC.
  238. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "1" }))
  239. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "2" }))
  240. // Start the RPC: the response will be sent automatically.
  241. var responses: [Echo_EchoResponse] = []
  242. let update = self.client.update { response in
  243. responses.append(response)
  244. }
  245. // We should have two responses already.
  246. XCTAssertEqual(responses.count, 2)
  247. // We can also send responses after starting the RPC.
  248. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "3" }))
  249. // And interleave requests with responses.
  250. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "a" }).wait())
  251. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "b" }).wait())
  252. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "c" }).wait())
  253. XCTAssertNoThrow(try update.sendEnd().wait())
  254. // Send the final response and close.
  255. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "4" }))
  256. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  257. // Check the response values.
  258. let expected = (1 ... 4).map { number in
  259. Echo_EchoResponse.with { $0.text = "\(number)" }
  260. }
  261. XCTAssertEqual(responses, expected)
  262. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  263. }
  264. func testBidirectionalStreamingSendAfterEnd() {
  265. // Enqueue the responses for Update.
  266. self.client.enqueueUpdateResponses([.with { $0.text = "Foo" }])
  267. // Start a call.
  268. let update = self.client.update { response in
  269. XCTAssertEqual(response, .with { $0.text = "Foo" })
  270. }
  271. // Since the RPC has already completed (the status promise has been fulfilled), send will fail.
  272. XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Kaboom!" }).wait())
  273. XCTAssertThrowsError(try update.sendEnd().wait())
  274. // The call completed *before* we tried to send "Kaboom!".
  275. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  276. }
  277. func testBidirectionalWithCustomInitialMetadata() {
  278. // Create a response stream for the RPC we want to make.
  279. let updateResponseStream = self.client.makeUpdateResponseStream()
  280. // Send back some initial metadata, response, and trailers.
  281. XCTAssertNoThrow(try updateResponseStream.sendInitialMetadata(["foo": "bar"]))
  282. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
  283. XCTAssertNoThrow(try updateResponseStream.sendEnd(trailingMetadata: ["bar": "baz"]))
  284. // Start the RPC. We only expect one response so we'll validate it in the handler.
  285. let update = self.client.update { response in
  286. XCTAssertEqual(response, .with { $0.text = "foo" })
  287. }
  288. // Check the rest of the response part values.
  289. XCTAssertEqual(try update.initialMetadata.wait(), ["foo": "bar"])
  290. XCTAssertEqual(try update.trailingMetadata.wait(), ["bar": "baz"])
  291. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  292. }
  293. func testWriteAfterEndFails() {
  294. // Create a response stream for the RPC we want to make.
  295. let updateResponseStream = self.client.makeUpdateResponseStream()
  296. // Start the RPC.
  297. let update = self.client.update { response in
  298. XCTFail("Unexpected response: \(response)")
  299. }
  300. // Send a message and end.
  301. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "1" }).wait())
  302. XCTAssertNoThrow(try update.sendEnd().wait())
  303. // Send another message, the write should fail.
  304. XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Too late!" }).wait()) { error in
  305. XCTAssertEqual(error as? ChannelError, .ioOnClosedChannel)
  306. }
  307. // Send close from the server.
  308. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  309. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  310. }
  311. func testWeGetAllRequestParts() {
  312. var requestParts: [FakeRequestPart<Echo_EchoRequest>] = []
  313. let updateResponseStream = self.client.makeUpdateResponseStream { request in
  314. requestParts.append(request)
  315. }
  316. let update = self.client.update(callOptions: CallOptions(customMetadata: ["foo": "bar"])) {
  317. XCTFail("Unexpected response: \($0)")
  318. }
  319. update.sendMessage(.with { $0.text = "foo" }, promise: nil)
  320. update.sendEnd(promise: nil)
  321. // These should be ignored since we've already sent end.
  322. update.sendMessage(.with { $0.text = "bar" }, promise: nil)
  323. update.sendEnd(promise: nil)
  324. // Check the expected request parts.
  325. XCTAssertEqual(requestParts, [
  326. .metadata(["foo": "bar"]),
  327. .message(.with { $0.text = "foo" }),
  328. .end,
  329. ])
  330. // Send close from the server.
  331. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  332. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  333. }
  334. func testInitialMetadataIsSentAutomatically() {
  335. let updateResponseStream = self.client.makeUpdateResponseStream()
  336. let update = self.client.update { response in
  337. XCTAssertEqual(response, .with { $0.text = "foo" })
  338. }
  339. // Send a message and end. Initial metadata is explicitly not set but will be sent on our
  340. // behalf. It will be empty.
  341. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
  342. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  343. // Metadata should be empty.
  344. XCTAssertEqual(try update.initialMetadata.wait(), [:])
  345. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  346. }
  347. func testMissingResponseStream() {
  348. // If no response stream is created for a call then it will fail with status code 'unavailable'.
  349. let get = self.client.get(.with { $0.text = "Uh oh!" })
  350. XCTAssertEqual(try get.status.map { $0.code }.wait(), .unavailable)
  351. XCTAssertThrowsError(try get.response.wait()) { error in
  352. guard let status = error as? GRPCStatus else {
  353. XCTFail("Expected a GRPCStatus, had the error was: \(error)")
  354. return
  355. }
  356. XCTAssertEqual(status.code, .unavailable)
  357. }
  358. }
  359. }