TestClientExample.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import NIOCore
  20. import XCTest
  21. @available(swift, deprecated: 5.6)
  22. class FakeResponseStreamExampleTests: GRPCTestCase {
  23. var client: Echo_EchoTestClient!
  24. override func setUp() {
  25. super.setUp()
  26. self.client = Echo_EchoTestClient(defaultCallOptions: self.callOptionsWithLogger)
  27. }
  28. func testUnary() {
  29. // Enqueue a Get response. This will be dequeued when the client makes the next Get RPC.
  30. //
  31. // We can also use a response stream, see 'testUnaryResponseStream'.
  32. self.client.enqueueGetResponse(.with { $0.text = "Bar!" })
  33. // Start the Get RPC.
  34. let get = self.client.get(.with { $0.text = "Foo!" })
  35. // The response stream has been consumed by the call.
  36. XCTAssertFalse(self.client.hasGetResponsesRemaining)
  37. // Check the response values:
  38. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Bar!" })
  39. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  40. }
  41. func testUnaryResponseStream() {
  42. // Enqueue a Get response stream. This will be used for the next Get RPC and we can choose when
  43. // to send responses on it.
  44. let stream = self.client.makeGetResponseStream()
  45. // Start the Get RPC.
  46. let get = self.client.get(.with { $0.text = "Foo!" })
  47. // The response stream has been consumed by the call.
  48. XCTAssertFalse(self.client.hasGetResponsesRemaining)
  49. // Now send the response on the stream we made.
  50. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = "Bar!" }))
  51. // Check the response values:
  52. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Bar!" })
  53. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  54. }
  55. func testClientStreaming() {
  56. // Enqueue a Collect response. This will be dequeued when the client makes the next Collect RPC.
  57. //
  58. // We can also use a response stream, see 'testClientStreamingResponseStream'.
  59. self.client.enqueueCollectResponse(.with { $0.text = "Foo" })
  60. // Start the Collect RPC.
  61. let collect = self.client.collect()
  62. // The response stream has been consumed by the call.
  63. XCTAssertFalse(self.client.hasCollectResponsesRemaining)
  64. XCTAssertEqual(try collect.response.wait(), .with { $0.text = "Foo" })
  65. XCTAssertTrue(try collect.status.map { $0.isOk }.wait())
  66. }
  67. func testClientStreamingResponseStream() {
  68. // Enqueue a Collect response stream. This will be used for the next Collect RPC and we can choose when
  69. // to send responses on it.
  70. let stream = self.client.makeCollectResponseStream()
  71. // Start the Collect RPC.
  72. let collect = self.client.collect()
  73. // The response stream has been consumed by the call.
  74. XCTAssertFalse(self.client.hasCollectResponsesRemaining)
  75. // Send the response on the stream we made.
  76. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = "Foo" }))
  77. XCTAssertEqual(try collect.response.wait(), .with { $0.text = "Foo" })
  78. XCTAssertTrue(try collect.status.map { $0.isOk }.wait())
  79. }
  80. func testServerStreaming() {
  81. // Enqueue some Expand responses. These will be dequeued when the client makes the next Expand RPC.
  82. //
  83. // We can also use a response stream, see 'testServerStreamingResponseStream'.
  84. let fooBarBaz = ["Foo", "Bar", "Baz"]
  85. self.client.enqueueExpandResponses(fooBarBaz.map { text in .with { $0.text = text } })
  86. // Start the 'Expand' RPC. We'll create a handler which records responses.
  87. //
  88. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  89. // executed on a different thread; for the test client the calling thread is thread is the same
  90. // as the tread on which the RPC is called, i.e. this thread.
  91. var responses: [String] = []
  92. let expand = self.client.expand(.with { $0.text = "Hello!" }) { response in
  93. responses.append(response.text)
  94. }
  95. // The response stream has been consumed by the call.
  96. XCTAssertFalse(self.client.hasExpandResponsesRemaining)
  97. XCTAssertTrue(try expand.status.map { $0.isOk }.wait())
  98. XCTAssertEqual(responses, fooBarBaz)
  99. }
  100. func testServerStreamingResponseStream() {
  101. // Enqueue an Expand response stream. This will be used for the next Expand RPC and we can choose
  102. // when to send responses on it.
  103. let stream = self.client.makeExpandResponseStream()
  104. // Start the 'Expand' RPC. We'll create a handler which records responses.
  105. //
  106. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  107. // executed on a different thread; for the test client the calling thread is thread is the same
  108. // as the tread on which the RPC is called, i.e. this thread.
  109. var responses: [String] = []
  110. let expand = self.client.expand(.with { $0.text = "Hello!" }) { response in
  111. responses.append(response.text)
  112. }
  113. // The response stream has been consumed by the call.
  114. XCTAssertFalse(self.client.hasExpandResponsesRemaining)
  115. // Send some responses.
  116. let fooBarBaz = ["Foo", "Bar", "Baz"]
  117. for message in fooBarBaz {
  118. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = message }))
  119. }
  120. // Close the stream.
  121. XCTAssertNoThrow(try stream.sendEnd())
  122. XCTAssertTrue(try expand.status.map { $0.isOk }.wait())
  123. XCTAssertEqual(responses, fooBarBaz)
  124. }
  125. func testBidirectionalStreaming() {
  126. // Enqueue some Update responses. These will be dequeued when the client makes the next Update RPC.
  127. //
  128. // We can also use a response stream, see 'testBidirectionalStreamingResponseStream'.
  129. let fooBarBaz = ["Foo", "Bar", "Baz"]
  130. self.client.enqueueUpdateResponses(fooBarBaz.map { text in .with { $0.text = text } })
  131. // Start the 'Update' RPC. We'll create a handler which records responses.
  132. //
  133. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  134. // executed on a different thread; for the test client the calling thread is thread is the same
  135. // as the tread on which the RPC is called, i.e. this thread.
  136. var responses: [String] = []
  137. let update = self.client.update { response in
  138. responses.append(response.text)
  139. }
  140. // The response stream has been consumed by the call.
  141. XCTAssertFalse(self.client.hasUpdateResponsesRemaining)
  142. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  143. XCTAssertEqual(responses, fooBarBaz)
  144. }
  145. func testBidirectionalStreamingResponseStream() {
  146. // Enqueue an Update response stream. This will be used for the next Update RPC and we can choose
  147. // when to send responses on it.
  148. let stream = self.client.makeUpdateResponseStream()
  149. // Start the 'Update' RPC. We'll create a handler which records responses.
  150. //
  151. // Note that in normal applications this wouldn't be thread-safe since the response handler is
  152. // executed on a different thread; for the test client the calling thread is thread is the same
  153. // as the tread on which the RPC is called, i.e. this thread.
  154. var responses: [String] = []
  155. let update = self.client.update { response in
  156. responses.append(response.text)
  157. }
  158. // The response stream has been consumed by the call.
  159. XCTAssertFalse(self.client.hasUpdateResponsesRemaining)
  160. // Send some responses.
  161. let fooBarBaz = ["Foo", "Bar", "Baz"]
  162. for message in fooBarBaz {
  163. XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = message }))
  164. }
  165. // Close the stream.
  166. XCTAssertNoThrow(try stream.sendEnd())
  167. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  168. XCTAssertEqual(responses, fooBarBaz)
  169. }
  170. }
  171. // These tests demonstrate the finer grained control enabled by the response streams.
  172. @available(swift, deprecated: 5.6)
  173. extension FakeResponseStreamExampleTests {
  174. func testUnaryWithTrailingMetadata() {
  175. // Create a response stream for the RPC.
  176. let getResponseStream = self.client.makeGetResponseStream()
  177. // Send the request.
  178. let get = self.client.get(.with { $0.text = "Hello!" })
  179. // Send the response as well as some trailing metadata.
  180. XCTAssertNoThrow(
  181. try getResponseStream.sendMessage(
  182. .with { $0.text = "Goodbye!" },
  183. trailingMetadata: ["bar": "baz"]
  184. )
  185. )
  186. // Check the response values:
  187. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
  188. XCTAssertEqual(try get.trailingMetadata.wait(), ["bar": "baz"])
  189. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  190. }
  191. func testUnaryError() {
  192. // Create a response stream for the RPC.
  193. let getResponseStream = self.client.makeGetResponseStream()
  194. // Send the request.
  195. let get = self.client.get(.with { $0.text = "Hello!" })
  196. // Respond with an error. We could send trailing metadata here as well.
  197. struct DummyError: Error {}
  198. XCTAssertNoThrow(try getResponseStream.sendError(DummyError()))
  199. // Check the response values:
  200. XCTAssertThrowsError(try get.response.wait()) { error in
  201. XCTAssertTrue(error is DummyError)
  202. }
  203. // We sent a dummy error; we could have sent a `GRPCStatus` error in which case we could assert
  204. // for equality here.
  205. XCTAssertFalse(try get.status.map { $0.isOk }.wait())
  206. }
  207. func testUnaryWithRequestHandler() {
  208. // Create a response stream for the RPC we want to make, we'll specify a *request* handler as well.
  209. let getResponseStream = self.client.makeGetResponseStream { requestPart in
  210. switch requestPart {
  211. case let .metadata(headers):
  212. XCTAssertTrue(headers.contains(name: "a-test-key"))
  213. case let .message(request):
  214. XCTAssertEqual(request, .with { $0.text = "Hello!" })
  215. case .end:
  216. ()
  217. }
  218. }
  219. // We'll send some custom metadata for the call as well. It will be validated above.
  220. let callOptions = CallOptions(customMetadata: ["a-test-key": "a test value"])
  221. let get = self.client.get(.with { $0.text = "Hello!" }, callOptions: callOptions)
  222. // Send the response.
  223. XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))
  224. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
  225. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  226. }
  227. func testUnaryResponseOrdering() {
  228. // Create a response stream for the RPC we want to make.
  229. let getResponseStream = self.client.makeGetResponseStream()
  230. // We can queue up the response *before* we make the RPC.
  231. XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))
  232. // Start the RPC: the response will be sent automatically.
  233. let get = self.client.get(.with { $0.text = "Hello!" })
  234. // Check the response values.
  235. XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
  236. XCTAssertTrue(try get.status.map { $0.isOk }.wait())
  237. }
  238. func testBidirectionalResponseOrdering() {
  239. // Create a response stream for the RPC we want to make.
  240. let updateResponseStream = self.client.makeUpdateResponseStream()
  241. // We can queue up responses *before* we make the RPC.
  242. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "1" }))
  243. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "2" }))
  244. // Start the RPC: the response will be sent automatically.
  245. var responses: [Echo_EchoResponse] = []
  246. let update = self.client.update { response in
  247. responses.append(response)
  248. }
  249. // We should have two responses already.
  250. XCTAssertEqual(responses.count, 2)
  251. // We can also send responses after starting the RPC.
  252. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "3" }))
  253. // And interleave requests with responses.
  254. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "a" }).wait())
  255. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "b" }).wait())
  256. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "c" }).wait())
  257. XCTAssertNoThrow(try update.sendEnd().wait())
  258. // Send the final response and close.
  259. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "4" }))
  260. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  261. // Check the response values.
  262. let expected = (1 ... 4).map { number in
  263. Echo_EchoResponse.with { $0.text = "\(number)" }
  264. }
  265. XCTAssertEqual(responses, expected)
  266. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  267. }
  268. func testBidirectionalStreamingSendAfterEnd() {
  269. // Enqueue the responses for Update.
  270. self.client.enqueueUpdateResponses([.with { $0.text = "Foo" }])
  271. // Start a call.
  272. let update = self.client.update { response in
  273. XCTAssertEqual(response, .with { $0.text = "Foo" })
  274. }
  275. // Since the RPC has already completed (the status promise has been fulfilled), send will fail.
  276. XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Kaboom!" }).wait())
  277. XCTAssertThrowsError(try update.sendEnd().wait())
  278. // The call completed *before* we tried to send "Kaboom!".
  279. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  280. }
  281. func testBidirectionalWithCustomInitialMetadata() {
  282. // Create a response stream for the RPC we want to make.
  283. let updateResponseStream = self.client.makeUpdateResponseStream()
  284. // Send back some initial metadata, response, and trailers.
  285. XCTAssertNoThrow(try updateResponseStream.sendInitialMetadata(["foo": "bar"]))
  286. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
  287. XCTAssertNoThrow(try updateResponseStream.sendEnd(trailingMetadata: ["bar": "baz"]))
  288. // Start the RPC. We only expect one response so we'll validate it in the handler.
  289. let update = self.client.update { response in
  290. XCTAssertEqual(response, .with { $0.text = "foo" })
  291. }
  292. // Check the rest of the response part values.
  293. XCTAssertEqual(try update.initialMetadata.wait(), ["foo": "bar"])
  294. XCTAssertEqual(try update.trailingMetadata.wait(), ["bar": "baz"])
  295. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  296. }
  297. func testWriteAfterEndFails() {
  298. // Create a response stream for the RPC we want to make.
  299. let updateResponseStream = self.client.makeUpdateResponseStream()
  300. // Start the RPC.
  301. let update = self.client.update { response in
  302. XCTFail("Unexpected response: \(response)")
  303. }
  304. // Send a message and end.
  305. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "1" }).wait())
  306. XCTAssertNoThrow(try update.sendEnd().wait())
  307. // Send another message, the write should fail.
  308. XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Too late!" }).wait()) { error in
  309. XCTAssertEqual(error as? ChannelError, .ioOnClosedChannel)
  310. }
  311. // Send close from the server.
  312. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  313. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  314. }
  315. func testWeGetAllRequestParts() {
  316. var requestParts: [FakeRequestPart<Echo_EchoRequest>] = []
  317. let updateResponseStream = self.client.makeUpdateResponseStream { request in
  318. requestParts.append(request)
  319. }
  320. let update = self.client.update(callOptions: CallOptions(customMetadata: ["foo": "bar"])) {
  321. XCTFail("Unexpected response: \($0)")
  322. }
  323. update.sendMessage(.with { $0.text = "foo" }, promise: nil)
  324. update.sendEnd(promise: nil)
  325. // These should be ignored since we've already sent end.
  326. update.sendMessage(.with { $0.text = "bar" }, promise: nil)
  327. update.sendEnd(promise: nil)
  328. // Check the expected request parts.
  329. XCTAssertEqual(
  330. requestParts,
  331. [
  332. .metadata(["foo": "bar"]),
  333. .message(.with { $0.text = "foo" }),
  334. .end,
  335. ]
  336. )
  337. // Send close from the server.
  338. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  339. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  340. }
  341. func testInitialMetadataIsSentAutomatically() {
  342. let updateResponseStream = self.client.makeUpdateResponseStream()
  343. let update = self.client.update { response in
  344. XCTAssertEqual(response, .with { $0.text = "foo" })
  345. }
  346. // Send a message and end. Initial metadata is explicitly not set but will be sent on our
  347. // behalf. It will be empty.
  348. XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
  349. XCTAssertNoThrow(try updateResponseStream.sendEnd())
  350. // Metadata should be empty.
  351. XCTAssertEqual(try update.initialMetadata.wait(), [:])
  352. XCTAssertTrue(try update.status.map { $0.isOk }.wait())
  353. }
  354. func testMissingResponseStream() {
  355. // If no response stream is created for a call then it will fail with status code 'unavailable'.
  356. let get = self.client.get(.with { $0.text = "Uh oh!" })
  357. XCTAssertEqual(try get.status.map { $0.code }.wait(), .unavailable)
  358. XCTAssertThrowsError(try get.response.wait()) { error in
  359. guard let status = error as? GRPCStatus else {
  360. XCTFail("Expected a GRPCStatus, had the error was: \(error)")
  361. return
  362. }
  363. XCTAssertEqual(status.code, .unavailable)
  364. }
  365. }
  366. }