/*
 * Copyright 2020, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
- import EchoImplementation
- import EchoModel
- import GRPC
- import NIO
- import XCTest
/// Examples of faking responses with the generated `Echo_EchoTestClient`.
///
/// Each RPC type (Get, Collect, Expand, Update) is shown twice: once with responses enqueued up
/// front and once with a response stream on which the test chooses when to send.
class FakeResponseStreamExampleTests: GRPCTestCase {
  /// The test client used by every example; recreated in `setUp()` for each test.
  var client: Echo_EchoTestClient!

  override func setUp() {
    super.setUp()
    self.client = Echo_EchoTestClient(defaultCallOptions: self.callOptionsWithLogger)
  }

  override func tearDown() {
    // Drop the client once the test has finished rather than keeping it alive on the test case
    // instance.
    self.client = nil
    super.tearDown()
  }

  func testUnary() {
    // Enqueue a Get response. This will be dequeued when the client makes the next Get RPC.
    //
    // We can also use a response stream, see 'testUnaryResponseStream'.
    self.client.enqueueGetResponse(.with { $0.text = "Bar!" })

    // Start the Get RPC.
    let get = self.client.get(.with { $0.text = "Foo!" })

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasGetResponsesRemaining)

    // Check the response values. Comparing the status code (rather than a boolean) means a
    // failure reports the code that was actually received.
    XCTAssertEqual(try get.response.wait(), .with { $0.text = "Bar!" })
    XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
  }

  func testUnaryResponseStream() {
    // Enqueue a Get response stream. This will be used for the next Get RPC and we can choose when
    // to send responses on it.
    let stream = self.client.makeGetResponseStream()

    // Start the Get RPC.
    let get = self.client.get(.with { $0.text = "Foo!" })

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasGetResponsesRemaining)

    // Now send the response on the stream we made.
    XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = "Bar!" }))

    // Check the response values:
    XCTAssertEqual(try get.response.wait(), .with { $0.text = "Bar!" })
    XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
  }

  func testClientStreaming() {
    // Enqueue a Collect response. This will be dequeued when the client makes the next Collect RPC.
    //
    // We can also use a response stream, see 'testClientStreamingResponseStream'.
    self.client.enqueueCollectResponse(.with { $0.text = "Foo" })

    // Start the Collect RPC.
    let collect = self.client.collect()

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasCollectResponsesRemaining)

    XCTAssertEqual(try collect.response.wait(), .with { $0.text = "Foo" })
    XCTAssertEqual(try collect.status.map { $0.code }.wait(), .ok)
  }

  func testClientStreamingResponseStream() {
    // Enqueue a Collect response stream. This will be used for the next Collect RPC and we can
    // choose when to send responses on it.
    let stream = self.client.makeCollectResponseStream()

    // Start the Collect RPC.
    let collect = self.client.collect()

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasCollectResponsesRemaining)

    // Send the response on the stream we made.
    XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = "Foo" }))

    XCTAssertEqual(try collect.response.wait(), .with { $0.text = "Foo" })
    XCTAssertEqual(try collect.status.map { $0.code }.wait(), .ok)
  }

  func testServerStreaming() {
    // Enqueue some Expand responses. These will be dequeued when the client makes the next Expand
    // RPC.
    //
    // We can also use a response stream, see 'testServerStreamingResponseStream'.
    let fooBarBaz = ["Foo", "Bar", "Baz"]
    self.client.enqueueExpandResponses(fooBarBaz.map { text in .with { $0.text = text } })

    // Start the 'Expand' RPC. We'll create a handler which records responses.
    //
    // Note that in normal applications this wouldn't be thread-safe since the response handler is
    // executed on a different thread; for the test client the response handler is executed on the
    // same thread as the one on which the RPC is called, i.e. this thread.
    var responses: [String] = []
    let expand = self.client.expand(.with { $0.text = "Hello!" }) { response in
      responses.append(response.text)
    }

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasExpandResponsesRemaining)

    XCTAssertEqual(try expand.status.map { $0.code }.wait(), .ok)
    XCTAssertEqual(responses, fooBarBaz)
  }

  func testServerStreamingResponseStream() {
    // Enqueue an Expand response stream. This will be used for the next Expand RPC and we can
    // choose when to send responses on it.
    let stream = self.client.makeExpandResponseStream()

    // Start the 'Expand' RPC. We'll create a handler which records responses.
    //
    // Note that in normal applications this wouldn't be thread-safe since the response handler is
    // executed on a different thread; for the test client the response handler is executed on the
    // same thread as the one on which the RPC is called, i.e. this thread.
    var responses: [String] = []
    let expand = self.client.expand(.with { $0.text = "Hello!" }) { response in
      responses.append(response.text)
    }

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasExpandResponsesRemaining)

    // Send some responses.
    let fooBarBaz = ["Foo", "Bar", "Baz"]
    for message in fooBarBaz {
      XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = message }))
    }

    // Close the stream.
    XCTAssertNoThrow(try stream.sendEnd())

    XCTAssertEqual(try expand.status.map { $0.code }.wait(), .ok)
    XCTAssertEqual(responses, fooBarBaz)
  }

  func testBidirectionalStreaming() {
    // Enqueue some Update responses. These will be dequeued when the client makes the next Update
    // RPC.
    //
    // We can also use a response stream, see 'testBidirectionalStreamingResponseStream'.
    let fooBarBaz = ["Foo", "Bar", "Baz"]
    self.client.enqueueUpdateResponses(fooBarBaz.map { text in .with { $0.text = text } })

    // Start the 'Update' RPC. We'll create a handler which records responses.
    //
    // Note that in normal applications this wouldn't be thread-safe since the response handler is
    // executed on a different thread; for the test client the response handler is executed on the
    // same thread as the one on which the RPC is called, i.e. this thread.
    var responses: [String] = []
    let update = self.client.update { response in
      responses.append(response.text)
    }

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasUpdateResponsesRemaining)

    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
    XCTAssertEqual(responses, fooBarBaz)
  }

  func testBidirectionalStreamingResponseStream() {
    // Enqueue an Update response stream. This will be used for the next Update RPC and we can
    // choose when to send responses on it.
    let stream = self.client.makeUpdateResponseStream()

    // Start the 'Update' RPC. We'll create a handler which records responses.
    //
    // Note that in normal applications this wouldn't be thread-safe since the response handler is
    // executed on a different thread; for the test client the response handler is executed on the
    // same thread as the one on which the RPC is called, i.e. this thread.
    var responses: [String] = []
    let update = self.client.update { response in
      responses.append(response.text)
    }

    // The response stream has been consumed by the call.
    XCTAssertFalse(self.client.hasUpdateResponsesRemaining)

    // Send some responses.
    let fooBarBaz = ["Foo", "Bar", "Baz"]
    for message in fooBarBaz {
      XCTAssertNoThrow(try stream.sendMessage(.with { $0.text = message }))
    }

    // Close the stream.
    XCTAssertNoThrow(try stream.sendEnd())

    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
    XCTAssertEqual(responses, fooBarBaz)
  }
}
// These tests demonstrate the finer grained control enabled by the response streams.
extension FakeResponseStreamExampleTests {
  func testUnaryWithTrailingMetadata() {
    // Create a response stream for the RPC.
    let getResponseStream = self.client.makeGetResponseStream()

    // Send the request.
    let get = self.client.get(.with { $0.text = "Hello!" })

    // Send the response as well as some trailing metadata.
    XCTAssertNoThrow(
      try getResponseStream.sendMessage(
        .with { $0.text = "Goodbye!" },
        trailingMetadata: ["bar": "baz"]
      )
    )

    // Check the response values. Comparing the status code (rather than a boolean) means a
    // failure reports the code that was actually received.
    XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
    XCTAssertEqual(try get.trailingMetadata.wait(), ["bar": "baz"])
    XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
  }

  func testUnaryError() {
    // Create a response stream for the RPC.
    let getResponseStream = self.client.makeGetResponseStream()

    // Send the request.
    let get = self.client.get(.with { $0.text = "Hello!" })

    // Respond with an error. We could send trailing metadata here as well.
    struct DummyError: Error {}
    XCTAssertNoThrow(try getResponseStream.sendError(DummyError()))

    // Check the response values:
    XCTAssertThrowsError(try get.response.wait()) { error in
      XCTAssertTrue(error is DummyError)
    }

    // We sent a dummy error; we could have sent a `GRPCStatus` error in which case we could assert
    // for equality here.
    XCTAssertNotEqual(try get.status.map { $0.code }.wait(), .ok)
  }

  func testUnaryWithRequestHandler() {
    // Create a response stream for the RPC we want to make, we'll specify a *request* handler as
    // well.
    let getResponseStream = self.client.makeGetResponseStream { requestPart in
      switch requestPart {
      case let .metadata(headers):
        XCTAssertTrue(headers.contains(name: "a-test-key"))

      case let .message(request):
        XCTAssertEqual(request, .with { $0.text = "Hello!" })

      case .end:
        // Nothing to validate for the end of the request stream.
        break
      }
    }

    // We'll send some custom metadata for the call as well. It will be validated above.
    let callOptions = CallOptions(customMetadata: ["a-test-key": "a test value"])
    let get = self.client.get(.with { $0.text = "Hello!" }, callOptions: callOptions)

    // Send the response.
    XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))

    XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
    XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
  }

  func testUnaryResponseOrdering() {
    // Create a response stream for the RPC we want to make.
    let getResponseStream = self.client.makeGetResponseStream()

    // We can queue up the response *before* we make the RPC.
    XCTAssertNoThrow(try getResponseStream.sendMessage(.with { $0.text = "Goodbye!" }))

    // Start the RPC: the response will be sent automatically.
    let get = self.client.get(.with { $0.text = "Hello!" })

    // Check the response values.
    XCTAssertEqual(try get.response.wait(), .with { $0.text = "Goodbye!" })
    XCTAssertEqual(try get.status.map { $0.code }.wait(), .ok)
  }

  func testBidirectionalResponseOrdering() {
    // Create a response stream for the RPC we want to make.
    let updateResponseStream = self.client.makeUpdateResponseStream()

    // We can queue up responses *before* we make the RPC.
    XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "1" }))
    XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "2" }))

    // Start the RPC: the responses will be sent automatically.
    var responses: [Echo_EchoResponse] = []
    let update = self.client.update { response in
      responses.append(response)
    }

    // We should have two responses already.
    XCTAssertEqual(responses.count, 2)

    // We can also send responses after starting the RPC.
    XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "3" }))

    // And interleave requests with responses.
    XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "a" }).wait())
    XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "b" }).wait())
    XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "c" }).wait())
    XCTAssertNoThrow(try update.sendEnd().wait())

    // Send the final response and close.
    XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "4" }))
    XCTAssertNoThrow(try updateResponseStream.sendEnd())

    // Check the response values.
    let expected = (1 ... 4).map { number in
      Echo_EchoResponse.with { $0.text = "\(number)" }
    }
    XCTAssertEqual(responses, expected)
    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
  }

  func testBidirectionalStreamingSendAfterEnd() {
    // Enqueue the responses for Update.
    self.client.enqueueUpdateResponses([.with { $0.text = "Foo" }])

    // Start a call.
    let update = self.client.update { response in
      XCTAssertEqual(response, .with { $0.text = "Foo" })
    }

    // Since the RPC has already completed (the status promise has been fulfilled), send will fail.
    XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Kaboom!" }).wait())
    XCTAssertThrowsError(try update.sendEnd().wait())

    // The call completed *before* we tried to send "Kaboom!".
    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
  }

  func testBidirectionalWithCustomInitialMetadata() {
    // Create a response stream for the RPC we want to make.
    let updateResponseStream = self.client.makeUpdateResponseStream()

    // Send back some initial metadata, response, and trailers.
    XCTAssertNoThrow(try updateResponseStream.sendInitialMetadata(["foo": "bar"]))
    XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
    XCTAssertNoThrow(try updateResponseStream.sendEnd(trailingMetadata: ["bar": "baz"]))

    // Start the RPC. We only expect one response so we'll validate it in the handler.
    let update = self.client.update { response in
      XCTAssertEqual(response, .with { $0.text = "foo" })
    }

    // Check the rest of the response part values.
    XCTAssertEqual(try update.initialMetadata.wait(), ["foo": "bar"])
    XCTAssertEqual(try update.trailingMetadata.wait(), ["bar": "baz"])
    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
  }

  func testWriteAfterEndFails() {
    // Create a response stream for the RPC we want to make.
    let updateResponseStream = self.client.makeUpdateResponseStream()

    // Start the RPC.
    let update = self.client.update { response in
      XCTFail("Unexpected response: \(response)")
    }

    // Send a message and end.
    XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "1" }).wait())
    XCTAssertNoThrow(try update.sendEnd().wait())

    // Send another message, the write should fail.
    XCTAssertThrowsError(try update.sendMessage(.with { $0.text = "Too late!" }).wait()) { error in
      XCTAssertEqual(error as? ChannelError, .ioOnClosedChannel)
    }

    // Send close from the server.
    XCTAssertNoThrow(try updateResponseStream.sendEnd())
    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
  }

  func testWeGetAllRequestParts() {
    // Record every request part handed to the response stream's request handler.
    var requestParts: [FakeRequestPart<Echo_EchoRequest>] = []
    let updateResponseStream = self.client.makeUpdateResponseStream { request in
      requestParts.append(request)
    }

    let update = self.client.update(callOptions: CallOptions(customMetadata: ["foo": "bar"])) {
      XCTFail("Unexpected response: \($0)")
    }

    update.sendMessage(.with { $0.text = "foo" }, promise: nil)
    update.sendEnd(promise: nil)

    // These should be ignored since we've already sent end.
    update.sendMessage(.with { $0.text = "bar" }, promise: nil)
    update.sendEnd(promise: nil)

    // Check the expected request parts.
    XCTAssertEqual(requestParts, [
      .metadata(["foo": "bar"]),
      .message(.with { $0.text = "foo" }),
      .end,
    ])

    // Send close from the server.
    XCTAssertNoThrow(try updateResponseStream.sendEnd())
    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
  }

  func testInitialMetadataIsSentAutomatically() {
    let updateResponseStream = self.client.makeUpdateResponseStream()
    let update = self.client.update { response in
      XCTAssertEqual(response, .with { $0.text = "foo" })
    }

    // Send a message and end. Initial metadata is explicitly not set but will be sent on our
    // behalf. It will be empty.
    XCTAssertNoThrow(try updateResponseStream.sendMessage(.with { $0.text = "foo" }))
    XCTAssertNoThrow(try updateResponseStream.sendEnd())

    // Metadata should be empty.
    XCTAssertEqual(try update.initialMetadata.wait(), [:])
    XCTAssertEqual(try update.status.map { $0.code }.wait(), .ok)
  }

  func testMissingResponseStream() {
    // If no response stream is created for a call then it will fail with status code 'unavailable'.
    let get = self.client.get(.with { $0.text = "Uh oh!" })

    XCTAssertEqual(try get.status.map { $0.code }.wait(), .unavailable)
    XCTAssertThrowsError(try get.response.wait()) { error in
      guard let status = error as? GRPCStatus else {
        XCTFail("Expected a GRPCStatus, but the error was: \(error)")
        return
      }
      XCTAssertEqual(status.code, .unavailable)
    }
  }
}
|