| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import GRPC
- import NIOCore
- import NIOPosix
- import XCTest
- // These tests demonstrate how to use gRPC to create a service provider using your own payload type,
- // or alternatively, how to avoid deserialization and just extract the raw bytes from a payload.
- class GRPCCustomPayloadTests: GRPCTestCase {
- var group: EventLoopGroup!
- var server: Server!
- var client: GRPCAnyServiceClient!
- override func setUp() {
- super.setUp()
- self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
- self.server = try! Server.insecure(group: self.group)
- .withServiceProviders([CustomPayloadProvider()])
- .withLogger(self.serverLogger)
- .bind(host: "localhost", port: 0)
- .wait()
- let channel = ClientConnection.insecure(group: self.group)
- .withBackgroundActivityLogger(self.clientLogger)
- .connect(host: "localhost", port: self.server.channel.localAddress!.port!)
- self.client = GRPCAnyServiceClient(
- channel: channel,
- defaultCallOptions: self.callOptionsWithLogger
- )
- }
- override func tearDown() {
- XCTAssertNoThrow(try self.server.close().wait())
- XCTAssertNoThrow(try self.client.channel.close().wait())
- XCTAssertNoThrow(try self.group.syncShutdownGracefully())
- super.tearDown()
- }
- func testCustomPayload() throws {
- // This test demonstrates how to call a manually created bidirectional RPC with custom payloads.
- let statusExpectation = self.expectation(description: "status received")
- var responses: [CustomPayload] = []
- // Make a bidirectional stream using `CustomPayload` as the request and response type.
- // The service defined below is called "CustomPayload", and the method we call on it
- // is "AddOneAndReverseMessage"
- let rpc: BidirectionalStreamingCall<CustomPayload, CustomPayload> = self.client
- .makeBidirectionalStreamingCall(
- path: "/CustomPayload/AddOneAndReverseMessage",
- handler: { responses.append($0) }
- )
- // Make and send some requests:
- let requests: [CustomPayload] = [
- CustomPayload(message: "one", number: .random(in: Int64.min ..< Int64.max)),
- CustomPayload(message: "two", number: .random(in: Int64.min ..< Int64.max)),
- CustomPayload(message: "three", number: .random(in: Int64.min ..< Int64.max)),
- ]
- rpc.sendMessages(requests, promise: nil)
- rpc.sendEnd(promise: nil)
- // Wait for the RPC to finish before comparing responses.
- rpc.status.map { $0.code }.assertEqual(.ok, fulfill: statusExpectation)
- self.wait(for: [statusExpectation], timeout: 1.0)
- // Are the responses as expected?
- let expected = requests.map { request in
- CustomPayload(message: String(request.message.reversed()), number: request.number + 1)
- }
- XCTAssertEqual(responses, expected)
- }
- func testNoDeserializationOnTheClient() throws {
- // This test demonstrates how to skip the deserialization step on the client. It isn't necessary
- // to use a custom service provider to do this, although we do here.
- let statusExpectation = self.expectation(description: "status received")
- var responses: [IdentityPayload] = []
- // Here we use `IdentityPayload` for our response type: we define it below such that it does
- // not deserialize the bytes provided to it by gRPC.
- let rpc: BidirectionalStreamingCall<CustomPayload, IdentityPayload> = self.client
- .makeBidirectionalStreamingCall(
- path: "/CustomPayload/AddOneAndReverseMessage",
- handler: { responses.append($0) }
- )
- let request = CustomPayload(message: "message", number: 42)
- rpc.sendMessage(request, promise: nil)
- rpc.sendEnd(promise: nil)
- // Wait for the RPC to finish before comparing responses.
- rpc.status.map { $0.code }.assertEqual(.ok, fulfill: statusExpectation)
- self.wait(for: [statusExpectation], timeout: 1.0)
- guard var response = responses.first?.buffer else {
- XCTFail("RPC completed without a response")
- return
- }
- // We just took the raw bytes from the payload: we can still decode it because we know the
- // server returned a serialized `CustomPayload`.
- let actual = try CustomPayload(serializedByteBuffer: &response)
- XCTAssertEqual(actual.message, "egassem")
- XCTAssertEqual(actual.number, 43)
- }
- func testCustomPayloadUnary() throws {
- let rpc: UnaryCall<StringPayload, StringPayload> = self.client.makeUnaryCall(
- path: "/CustomPayload/Reverse",
- request: StringPayload(message: "foobarbaz")
- )
- XCTAssertEqual(try rpc.response.map { $0.message }.wait(), "zabraboof")
- XCTAssertEqual(try rpc.status.map { $0.code }.wait(), .ok)
- }
- func testCustomPayloadClientStreaming() throws {
- let rpc: ClientStreamingCall<StringPayload, StringPayload> = self.client
- .makeClientStreamingCall(path: "/CustomPayload/ReverseThenJoin")
- rpc.sendMessages(["foo", "bar", "baz"].map(StringPayload.init(message:)), promise: nil)
- rpc.sendEnd(promise: nil)
- XCTAssertEqual(try rpc.response.map { $0.message }.wait(), "baz bar foo")
- XCTAssertEqual(try rpc.status.map { $0.code }.wait(), .ok)
- }
- func testCustomPayloadServerStreaming() throws {
- let message = "abc"
- var expectedIterator = message.reversed().makeIterator()
- let rpc: ServerStreamingCall<StringPayload, StringPayload> = self.client
- .makeServerStreamingCall(
- path: "/CustomPayload/ReverseThenSplit",
- request: StringPayload(message: message)
- ) { response in
- if let next = expectedIterator.next() {
- XCTAssertEqual(String(next), response.message)
- } else {
- XCTFail("Unexpected message: \(response.message)")
- }
- }
- XCTAssertEqual(try rpc.status.map { $0.code }.wait(), .ok)
- }
- }
- // MARK: Custom Payload Service
- private class CustomPayloadProvider: CallHandlerProvider {
- var serviceName: Substring = "CustomPayload"
- fileprivate func reverseString(
- request: StringPayload,
- context: StatusOnlyCallContext
- ) -> EventLoopFuture<StringPayload> {
- let reversed = StringPayload(message: String(request.message.reversed()))
- return context.eventLoop.makeSucceededFuture(reversed)
- }
- fileprivate func reverseThenJoin(
- context: UnaryResponseCallContext<StringPayload>
- ) -> EventLoopFuture<(StreamEvent<StringPayload>) -> Void> {
- var messages: [String] = []
- return context.eventLoop.makeSucceededFuture({ event in
- switch event {
- case let .message(request):
- messages.append(request.message)
- case .end:
- let response = messages.reversed().joined(separator: " ")
- context.responsePromise.succeed(StringPayload(message: response))
- }
- })
- }
- fileprivate func reverseThenSplit(
- request: StringPayload,
- context: StreamingResponseCallContext<StringPayload>
- ) -> EventLoopFuture<GRPCStatus> {
- let responses = request.message.reversed().map {
- context.sendResponse(StringPayload(message: String($0)))
- }
- return EventLoopFuture.andAllSucceed(responses, on: context.eventLoop).map { .ok }
- }
- // Bidirectional RPC which returns a new `CustomPayload` for each `CustomPayload` received.
- // The returned payloads have their `message` reversed and their `number` incremented by one.
- fileprivate func addOneAndReverseMessage(
- context: StreamingResponseCallContext<CustomPayload>
- ) -> EventLoopFuture<(StreamEvent<CustomPayload>) -> Void> {
- return context.eventLoop.makeSucceededFuture({ event in
- switch event {
- case let .message(payload):
- let response = CustomPayload(
- message: String(payload.message.reversed()),
- number: payload.number + 1
- )
- _ = context.sendResponse(response)
- case .end:
- context.statusPromise.succeed(.ok)
- }
- })
- }
- func handle(method name: Substring, context: CallHandlerContext) -> GRPCServerHandlerProtocol? {
- switch name {
- case "Reverse":
- return UnaryServerHandler(
- context: context,
- requestDeserializer: GRPCPayloadDeserializer<StringPayload>(),
- responseSerializer: GRPCPayloadSerializer<StringPayload>(),
- interceptors: [],
- userFunction: self.reverseString(request:context:)
- )
- case "ReverseThenJoin":
- return ClientStreamingServerHandler(
- context: context,
- requestDeserializer: GRPCPayloadDeserializer<StringPayload>(),
- responseSerializer: GRPCPayloadSerializer<StringPayload>(),
- interceptors: [],
- observerFactory: self.reverseThenJoin(context:)
- )
- case "ReverseThenSplit":
- return ServerStreamingServerHandler(
- context: context,
- requestDeserializer: GRPCPayloadDeserializer<StringPayload>(),
- responseSerializer: GRPCPayloadSerializer<StringPayload>(),
- interceptors: [],
- userFunction: self.reverseThenSplit(request:context:)
- )
- case "AddOneAndReverseMessage":
- return BidirectionalStreamingServerHandler(
- context: context,
- requestDeserializer: GRPCPayloadDeserializer<CustomPayload>(),
- responseSerializer: GRPCPayloadSerializer<CustomPayload>(),
- interceptors: [],
- observerFactory: self.addOneAndReverseMessage(context:)
- )
- default:
- return nil
- }
- }
- }
- private struct IdentityPayload: GRPCPayload {
- var buffer: ByteBuffer
- init(serializedByteBuffer: inout ByteBuffer) throws {
- self.buffer = serializedByteBuffer
- }
- func serialize(into buffer: inout ByteBuffer) throws {
- // This will never be called, however, it could be implemented as a direct copy of the bytes
- // we hold, e.g.:
- //
- // var copy = self.buffer
- // buffer.writeBuffer(©)
- fatalError("Unimplemented")
- }
- }
- /// A toy custom payload which holds a `String` and an `Int64`.
- ///
- /// The payload is serialized as:
- /// - the `UInt32` encoded length of the message,
- /// - the UTF-8 encoded bytes of the message, and
- /// - the `Int64` bytes of the number.
- private struct CustomPayload: GRPCPayload, Equatable {
- var message: String
- var number: Int64
- init(message: String, number: Int64) {
- self.message = message
- self.number = number
- }
- init(serializedByteBuffer: inout ByteBuffer) throws {
- guard let messageLength = serializedByteBuffer.readInteger(as: UInt32.self),
- let message = serializedByteBuffer.readString(length: Int(messageLength)),
- let number = serializedByteBuffer.readInteger(as: Int64.self)
- else {
- throw GRPCError.DeserializationFailure()
- }
- self.message = message
- self.number = number
- }
- func serialize(into buffer: inout ByteBuffer) throws {
- buffer.writeInteger(UInt32(self.message.count))
- buffer.writeString(self.message)
- buffer.writeInteger(self.number)
- }
- }
- private struct StringPayload: GRPCPayload {
- var message: String
- init(message: String) {
- self.message = message
- }
- init(serializedByteBuffer: inout ByteBuffer) throws {
- self.message = serializedByteBuffer.readString(length: serializedByteBuffer.readableBytes)!
- }
- func serialize(into buffer: inout ByteBuffer) throws {
- buffer.writeString(self.message)
- }
- }
|