2
0

TestClientExample.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import EchoImplementation
  18. import GRPC
  19. import NIO
  20. import XCTest
  /// Worked examples of driving each Echo RPC (Get, Collect, Expand, Update) through the test
  /// client, either by enqueuing canned responses up front or by creating a response stream and
  /// sending responses on it by hand.
  class FakeResponseStreamExampleTests: GRPCTestCase {
    /// The test client exercised by every example; rebuilt in `setUp()`.
    var client: Echo_EchoTestClient!

    override func setUp() {
      super.setUp()
      self.client = Echo_EchoTestClient(defaultCallOptions: self.callOptionsWithLogger)
    }

    /// Unary RPC answered by a response enqueued ahead of time.
    func testUnary() {
      // Queue a Get response; the client dequeues it when it makes its next Get RPC.
      //
      // A response stream can be used instead, see 'testUnaryResponseStream'.
      let reply = Echo_EchoResponse.with { $0.text = "Bar!" }
      self.client.enqueueGetResponse(reply)

      // Make the Get RPC.
      let request = Echo_EchoRequest.with { $0.text = "Foo!" }
      let call = self.client.get(request)

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasGetResponsesRemaining)

      // Validate the response and the final status.
      XCTAssertEqual(try call.response.wait(), reply)
      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
    }

    /// Unary RPC answered by hand via a response stream.
    func testUnaryResponseStream() {
      // Create a Get response stream. The next Get RPC will use it, and we decide when responses
      // are sent on it.
      let responseStream = self.client.makeGetResponseStream()

      // Make the Get RPC.
      let call = self.client.get(.with { $0.text = "Foo!" })

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasGetResponsesRemaining)

      // Only now do we send the response on the stream created above.
      let reply = Echo_EchoResponse.with { $0.text = "Bar!" }
      XCTAssertNoThrow(try responseStream.sendMessage(reply))

      // Validate the response and the final status.
      XCTAssertEqual(try call.response.wait(), reply)
      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
    }

    /// Client-streaming RPC answered by a response enqueued ahead of time.
    func testClientStreaming() {
      // Queue a Collect response; the client dequeues it when it makes its next Collect RPC.
      //
      // A response stream can be used instead, see 'testClientStreamingResponseStream'.
      let reply = Echo_EchoResponse.with { $0.text = "Foo" }
      self.client.enqueueCollectResponse(reply)

      // Make the Collect RPC.
      let call = self.client.collect()

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasCollectResponsesRemaining)

      XCTAssertEqual(try call.response.wait(), reply)
      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
    }

    /// Client-streaming RPC answered by hand via a response stream.
    func testClientStreamingResponseStream() {
      // Create a Collect response stream. The next Collect RPC will use it, and we decide when
      // responses are sent on it.
      let responseStream = self.client.makeCollectResponseStream()

      // Make the Collect RPC.
      let call = self.client.collect()

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasCollectResponsesRemaining)

      // Send the response on the stream created above.
      let reply = Echo_EchoResponse.with { $0.text = "Foo" }
      XCTAssertNoThrow(try responseStream.sendMessage(reply))

      XCTAssertEqual(try call.response.wait(), reply)
      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
    }

    /// Server-streaming RPC answered by responses enqueued ahead of time.
    func testServerStreaming() {
      // Queue some Expand responses; the client dequeues them when it makes its next Expand RPC.
      //
      // A response stream can be used instead, see 'testServerStreamingResponseStream'.
      let expectedTexts = ["Foo", "Bar", "Baz"]
      let replies = expectedTexts.map { text in
        Echo_EchoResponse.with { $0.text = text }
      }
      self.client.enqueueExpandResponses(replies)

      // Make the 'Expand' RPC with a handler that records each response.
      //
      // In a normal application this would not be thread-safe, because the response handler is
      // executed on a different thread. With the test client the handler runs on the thread that
      // starts the RPC, i.e. this thread.
      var received: [String] = []
      let call = self.client.expand(.with { $0.text = "Hello!" }) {
        received.append($0.text)
      }

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasExpandResponsesRemaining)

      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
      XCTAssertEqual(received, expectedTexts)
    }

    /// Server-streaming RPC answered by hand via a response stream.
    func testServerStreamingResponseStream() {
      // Create an Expand response stream. The next Expand RPC will use it, and we decide when
      // responses are sent on it.
      let responseStream = self.client.makeExpandResponseStream()

      // Make the 'Expand' RPC with a handler that records each response.
      //
      // In a normal application this would not be thread-safe, because the response handler is
      // executed on a different thread. With the test client the handler runs on the thread that
      // starts the RPC, i.e. this thread.
      var received: [String] = []
      let call = self.client.expand(.with { $0.text = "Hello!" }) {
        received.append($0.text)
      }

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasExpandResponsesRemaining)

      // Send the responses one at a time.
      let expectedTexts = ["Foo", "Bar", "Baz"]
      expectedTexts.forEach { text in
        XCTAssertNoThrow(try responseStream.sendMessage(.with { $0.text = text }))
      }

      // Close the stream.
      XCTAssertNoThrow(try responseStream.sendEnd())

      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
      XCTAssertEqual(received, expectedTexts)
    }

    /// Bidirectional-streaming RPC answered by responses enqueued ahead of time.
    func testBidirectionalStreaming() {
      // Queue some Update responses; the client dequeues them when it makes its next Update RPC.
      //
      // A response stream can be used instead, see 'testBidirectionalStreamingResponseStream'.
      let expectedTexts = ["Foo", "Bar", "Baz"]
      let replies = expectedTexts.map { text in
        Echo_EchoResponse.with { $0.text = text }
      }
      self.client.enqueueUpdateResponses(replies)

      // Make the 'Update' RPC with a handler that records each response.
      //
      // In a normal application this would not be thread-safe, because the response handler is
      // executed on a different thread. With the test client the handler runs on the thread that
      // starts the RPC, i.e. this thread.
      var received: [String] = []
      let call = self.client.update {
        received.append($0.text)
      }

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasUpdateResponsesRemaining)

      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
      XCTAssertEqual(received, expectedTexts)
    }

    /// Bidirectional-streaming RPC answered by hand via a response stream.
    func testBidirectionalStreamingResponseStream() {
      // Create an Update response stream. The next Update RPC will use it, and we decide when
      // responses are sent on it.
      let responseStream = self.client.makeUpdateResponseStream()

      // Make the 'Update' RPC with a handler that records each response.
      //
      // In a normal application this would not be thread-safe, because the response handler is
      // executed on a different thread. With the test client the handler runs on the thread that
      // starts the RPC, i.e. this thread.
      var received: [String] = []
      let call = self.client.update {
        received.append($0.text)
      }

      // Starting the call consumed the response stream.
      XCTAssertFalse(self.client.hasUpdateResponsesRemaining)

      // Send the responses one at a time.
      let expectedTexts = ["Foo", "Bar", "Baz"]
      expectedTexts.forEach { text in
        XCTAssertNoThrow(try responseStream.sendMessage(.with { $0.text = text }))
      }

      // Close the stream.
      XCTAssertNoThrow(try responseStream.sendEnd())

      XCTAssertTrue(try call.status.map { $0.isOk }.wait())
      XCTAssertEqual(received, expectedTexts)
    }
  }
  170. // These tests demonstrate the finer-grained control enabled by the response streams.
  extension FakeResponseStreamExampleTests {
    /// A unary response may be sent together with trailing metadata, which the call exposes.
    func testUnaryWithTrailingMetadata() {
      // Create a response stream for the RPC.
      let getResponseStream = self.client.makeGetResponseStream()

      // Send the request.
      let get = self.client.get(.with { $0.text = "Hello!" })

      // Send the response as well as some trailing metadata.
      XCTAssertNoThrow(
        try getResponseStream.sendMessage(
          .with { $0.text = "Goodbye!" },
          trailingMetadata: ["bar": "baz"]
        )
      )

      // Check the response values:
      XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
      XCTAssertEqual(try get.trailingMetadata.wait(), ["bar": "baz"])
      XCTAssertTrue(try get.status.map { $0.isOk }.wait())
    }

    /// A unary call can be failed with an arbitrary error instead of a response.
    func testUnaryError() {
      // Create a response stream for the RPC.
      let getResponseStream = self.client.makeGetResponseStream()

      // Send the request.
      let get = self.client.get(.with { $0.text = "Hello!" })

      // Respond with an error. We could send trailing metadata here as well.
      struct DummyError: Error {}
      XCTAssertNoThrow(try getResponseStream.sendError(DummyError()))

      // Check the response values:
      XCTAssertThrowsError(try get.response.wait()) { error in
        XCTAssertTrue(error is DummyError)
      }

      // We sent a dummy error; we could have sent a `GRPCStatus` error in which case we could
      // assert for equality here.
      XCTAssertFalse(try get.status.map { $0.isOk }.wait())
    }

    /// A *request* handler attached to the response stream can inspect what the client sends.
    func testUnaryWithRequestHandler() {
      // Create a response stream for the RPC we want to make, we'll specify a *request* handler
      // as well.
      let getResponseStream = self.client.makeGetResponseStream { requestPart in
        switch requestPart {
        case .metadata(let headers):
          XCTAssertTrue(headers.contains(name: "a-test-key"))
        case .message(let request):
          XCTAssertEqual(request, .with { $0.text = "Hello!" })
        case .end:
          // Nothing to validate for the end of the request stream.
          break
        }
      }

      // We'll send some custom metadata for the call as well. It will be validated above.
      let callOptions = CallOptions(customMetadata: ["a-test-key": "a test value"])
      let get = self.client.get(.with { $0.text = "Hello!" }, callOptions: callOptions)

      // Send the response.
      XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))
      XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
      XCTAssertTrue(try get.status.map { $0.isOk }.wait())
    }

    /// A unary response may be queued on the stream before the RPC is even started.
    func testUnaryResponseOrdering() {
      // Create a response stream for the RPC we want to make.
      let getResponseStream = self.client.makeGetResponseStream()

      // We can queue up the response *before* we make the RPC.
      XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))

      // Start the RPC: the response will be sent automatically.
      let get = self.client.get(.with { $0.text = "Hello!" })

      // Check the response values.
      XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
      XCTAssertTrue(try get.status.map { $0.isOk }.wait())
    }

    /// Streaming responses may be sent before the RPC starts, after it starts, and interleaved
    /// with requests; they are delivered to the handler in the order they were sent.
    func testBidirectionalResponseOrdering() {
      // Create a response stream for the RPC we want to make.
      let updateResponseStream = self.client.makeUpdateResponseStream()

      // We can queue up responses *before* we make the RPC.
      XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "1" }))
      XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "2" }))

      // Start the RPC: the response will be sent automatically.
      var responses: [Echo_EchoResponse] = []
      let update = self.client.update { response in
        responses.append(response)
      }

      // We should have two responses already.
      XCTAssertEqual(responses.count, 2)

      // We can also send responses after starting the RPC.
      XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "3" }))

      // And interleave requests with responses.
      XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "a" }).wait())
      XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "b" }).wait())
      XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "c" }).wait())
      XCTAssertNoThrow(try update.sendEnd().wait())

      // Send the final response and close.
      XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "4" }))
      XCTAssertNoThrow(try updateResponseStream.sendEnd())

      // Check the response values.
      let expected = (1...4).map { number in
        Echo_EchoResponse.with { $0.text = "\(number)" }
      }
      XCTAssertEqual(responses, expected)
      XCTAssertTrue(try update.status.map { $0.isOk }.wait())
    }

    /// With enqueued responses the RPC completes as soon as it starts, so later sends fail.
    func testBidirectionalStreamingSendAfterEnd() {
      // Enqueue the responses for Update.
      self.client.enqueueUpdateResponses([.with { $0.text = "Foo" }])

      // Start a call.
      let update = self.client.update { response in
        XCTAssertEqual(response, .with { $0.text = "Foo" })
      }

      // Since the RPC has already completed (the status promise has been fulfilled), send will
      // fail.
      XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Kaboom!" }).wait())
      XCTAssertThrowsError(try update.sendEnd().wait())

      // The call completed *before* we tried to send "Kaboom!".
      XCTAssertTrue(try update.status.map { $0.isOk }.wait())
    }

    /// Initial metadata, a message and trailing metadata sent on the stream all reach the call.
    func testBidirectionalWithCustomInitialMetadata() {
      // Create a response stream for the RPC we want to make.
      let updateResponseStream = self.client.makeUpdateResponseStream()

      // Send back some initial metadata, response, and trailers.
      XCTAssertNoThrow(try updateResponseStream.sendInitialMetadata(["foo": "bar"]))
      XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
      XCTAssertNoThrow(try updateResponseStream.sendEnd(trailingMetadata: ["bar": "baz"]))

      // Start the RPC. We only expect one response so we'll validate it in the handler.
      let update = self.client.update { response in
        XCTAssertEqual(response, .with { $0.text = "foo" })
      }

      // Check the rest of the response part values.
      XCTAssertEqual(try update.initialMetadata.wait(), ["foo": "bar"])
      XCTAssertEqual(try update.trailingMetadata.wait(), ["bar": "baz"])
      XCTAssertTrue(try update.status.map { $0.isOk }.wait())
    }

    /// A request written after the client has sent end fails with `ioOnClosedChannel`.
    func testWriteAfterEndFails() {
      // Create a response stream for the RPC we want to make.
      let updateResponseStream = self.client.makeUpdateResponseStream()

      // Start the RPC.
      let update = self.client.update { response in
        XCTFail("Unexpected response: \(response)")
      }

      // Send a message and end.
      XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "1" }).wait())
      XCTAssertNoThrow(try update.sendEnd().wait())

      // Send another message, the write should fail.
      XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Too late!" }).wait()) { error in
        XCTAssertEqual(error as? ChannelError, .ioOnClosedChannel)
      }

      // Send close from the server.
      XCTAssertNoThrow(try updateResponseStream.sendEnd())
      XCTAssertTrue(try update.status.map { $0.isOk }.wait())
    }

    /// The request handler sees metadata, message and end; parts sent after end are dropped.
    func testWeGetAllRequestParts() {
      var requestParts: [FakeRequestPart<Echo_EchoRequest>] = []
      let updateResponseStream = self.client.makeUpdateResponseStream { request in
        requestParts.append(request)
      }

      let update = self.client.update(callOptions: CallOptions(customMetadata: ["foo": "bar"])) {
        XCTFail("Unexpected response: \($0)")
      }

      update.sendMessage(.with { $0.text = "foo" }, promise: nil)
      update.sendEnd(promise: nil)

      // These should be ignored since we've already sent end.
      update.sendMessage(.with { $0.text = "bar" }, promise: nil)
      update.sendEnd(promise: nil)

      // Check the expected request parts.
      XCTAssertEqual(requestParts, [
        .metadata(["foo": "bar"]),
        .message(.with { $0.text = "foo" }),
        .end
      ])

      // Send close from the server.
      XCTAssertNoThrow(try updateResponseStream.sendEnd())
      XCTAssertTrue(try update.status.map { $0.isOk }.wait())
    }

    /// If initial metadata is never sent explicitly, the call still receives it, empty.
    func testInitialMetadataIsSentAutomatically() {
      let updateResponseStream = self.client.makeUpdateResponseStream()
      let update = self.client.update { response in
        XCTAssertEqual(response, .with { $0.text = "foo" })
      }

      // Send a message and end. Initial metadata is explicitly not set but will be sent on our
      // behalf. It will be empty.
      XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
      XCTAssertNoThrow(try updateResponseStream.sendEnd())

      // Metadata should be empty.
      XCTAssertEqual(try update.initialMetadata.wait(), [:])
      XCTAssertTrue(try update.status.map { $0.isOk }.wait())
    }

    /// A call made without any response stream fails with status code 'unavailable'.
    func testMissingResponseStream() {
      // If no response stream is created for a call then it will fail with status code
      // 'unavailable'.
      let get = self.client.get(.with { $0.text = "Uh oh!" })
      XCTAssertEqual(try get.status.map { $0.code }.wait(), .unavailable)

      // The response future fails with that same status.
      XCTAssertThrowsError(try get.response.wait()) { error in
        guard let status = error as? GRPCStatus else {
          XCTFail("Expected a GRPCStatus, but the error was: \(error)")
          return
        }
        XCTAssertEqual(status.code, .unavailable)
      }
    }
  }