/* * Copyright 2020, gRPC Authors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import GRPC import NIOCore import NIOPosix import XCTest // These tests demonstrate how to use gRPC to create a service provider using your own payload type, // or alternatively, how to avoid deserialization and just extract the raw bytes from a payload. class GRPCCustomPayloadTests: GRPCTestCase { var group: EventLoopGroup! var server: Server! var client: GRPCAnyServiceClient! override func setUp() { super.setUp() self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) self.server = try! Server.insecure(group: self.group) .withServiceProviders([CustomPayloadProvider()]) .withLogger(self.serverLogger) .bind(host: "localhost", port: 0) .wait() let channel = ClientConnection.insecure(group: self.group) .withBackgroundActivityLogger(self.clientLogger) .connect(host: "localhost", port: self.server.channel.localAddress!.port!) self.client = GRPCAnyServiceClient( channel: channel, defaultCallOptions: self.callOptionsWithLogger ) } override func tearDown() { XCTAssertNoThrow(try self.server.close().wait()) XCTAssertNoThrow(try self.client.channel.close().wait()) XCTAssertNoThrow(try self.group.syncShutdownGracefully()) super.tearDown() } func testCustomPayload() throws { // This test demonstrates how to call a manually created bidirectional RPC with custom payloads. let statusExpectation = self.expectation(description: "status received") var responses: [CustomPayload] = [] // Make a bidirectional stream using `CustomPayload` as the request and response type. // The service defined below is called "CustomPayload", and the method we call on it // is "AddOneAndReverseMessage" let rpc: BidirectionalStreamingCall = self.client .makeBidirectionalStreamingCall( path: "/CustomPayload/AddOneAndReverseMessage", handler: { responses.append($0) } ) // Make and send some requests: let requests: [CustomPayload] = [ CustomPayload(message: "one", number: .random(in: Int64.min ..< Int64.max)), CustomPayload(message: "two", number: .random(in: Int64.min ..< Int64.max)), CustomPayload(message: "three", number: .random(in: Int64.min ..< Int64.max)), ] rpc.sendMessages(requests, promise: nil) rpc.sendEnd(promise: nil) // Wait for the RPC to finish before comparing responses. rpc.status.map { $0.code }.assertEqual(.ok, fulfill: statusExpectation) self.wait(for: [statusExpectation], timeout: 1.0) // Are the responses as expected? let expected = requests.map { request in CustomPayload(message: String(request.message.reversed()), number: request.number + 1) } XCTAssertEqual(responses, expected) } func testNoDeserializationOnTheClient() throws { // This test demonstrates how to skip the deserialization step on the client. It isn't necessary // to use a custom service provider to do this, although we do here. let statusExpectation = self.expectation(description: "status received") var responses: [IdentityPayload] = [] // Here we use `IdentityPayload` for our response type: we define it below such that it does // not deserialize the bytes provided to it by gRPC. let rpc: BidirectionalStreamingCall = self.client .makeBidirectionalStreamingCall( path: "/CustomPayload/AddOneAndReverseMessage", handler: { responses.append($0) } ) let request = CustomPayload(message: "message", number: 42) rpc.sendMessage(request, promise: nil) rpc.sendEnd(promise: nil) // Wait for the RPC to finish before comparing responses. rpc.status.map { $0.code }.assertEqual(.ok, fulfill: statusExpectation) self.wait(for: [statusExpectation], timeout: 1.0) guard var response = responses.first?.buffer else { XCTFail("RPC completed without a response") return } // We just took the raw bytes from the payload: we can still decode it because we know the // server returned a serialized `CustomPayload`. let actual = try CustomPayload(serializedByteBuffer: &response) XCTAssertEqual(actual.message, "egassem") XCTAssertEqual(actual.number, 43) } func testCustomPayloadUnary() throws { let rpc: UnaryCall = self.client.makeUnaryCall( path: "/CustomPayload/Reverse", request: StringPayload(message: "foobarbaz") ) XCTAssertEqual(try rpc.response.map { $0.message }.wait(), "zabraboof") XCTAssertEqual(try rpc.status.map { $0.code }.wait(), .ok) } func testCustomPayloadClientStreaming() throws { let rpc: ClientStreamingCall = self.client .makeClientStreamingCall(path: "/CustomPayload/ReverseThenJoin") rpc.sendMessages(["foo", "bar", "baz"].map(StringPayload.init(message:)), promise: nil) rpc.sendEnd(promise: nil) XCTAssertEqual(try rpc.response.map { $0.message }.wait(), "baz bar foo") XCTAssertEqual(try rpc.status.map { $0.code }.wait(), .ok) } func testCustomPayloadServerStreaming() throws { let message = "abc" var expectedIterator = message.reversed().makeIterator() let rpc: ServerStreamingCall = self.client .makeServerStreamingCall( path: "/CustomPayload/ReverseThenSplit", request: StringPayload(message: message) ) { response in if let next = expectedIterator.next() { XCTAssertEqual(String(next), response.message) } else { XCTFail("Unexpected message: \(response.message)") } } XCTAssertEqual(try rpc.status.map { $0.code }.wait(), .ok) } } // MARK: Custom Payload Service private class CustomPayloadProvider: CallHandlerProvider { var serviceName: Substring = "CustomPayload" fileprivate func reverseString( request: StringPayload, context: StatusOnlyCallContext ) -> EventLoopFuture { let reversed = StringPayload(message: String(request.message.reversed())) return context.eventLoop.makeSucceededFuture(reversed) } fileprivate func reverseThenJoin( context: UnaryResponseCallContext ) -> EventLoopFuture<(StreamEvent) -> Void> { var messages: [String] = [] return context.eventLoop.makeSucceededFuture({ event in switch event { case let .message(request): messages.append(request.message) case .end: let response = messages.reversed().joined(separator: " ") context.responsePromise.succeed(StringPayload(message: response)) } }) } fileprivate func reverseThenSplit( request: StringPayload, context: StreamingResponseCallContext ) -> EventLoopFuture { let responses = request.message.reversed().map { context.sendResponse(StringPayload(message: String($0))) } return EventLoopFuture.andAllSucceed(responses, on: context.eventLoop).map { .ok } } // Bidirectional RPC which returns a new `CustomPayload` for each `CustomPayload` received. // The returned payloads have their `message` reversed and their `number` incremented by one. fileprivate func addOneAndReverseMessage( context: StreamingResponseCallContext ) -> EventLoopFuture<(StreamEvent) -> Void> { return context.eventLoop.makeSucceededFuture({ event in switch event { case let .message(payload): let response = CustomPayload( message: String(payload.message.reversed()), number: payload.number + 1 ) _ = context.sendResponse(response) case .end: context.statusPromise.succeed(.ok) } }) } func handle(method name: Substring, context: CallHandlerContext) -> GRPCServerHandlerProtocol? { switch name { case "Reverse": return UnaryServerHandler( context: context, requestDeserializer: GRPCPayloadDeserializer(), responseSerializer: GRPCPayloadSerializer(), interceptors: [], userFunction: self.reverseString(request:context:) ) case "ReverseThenJoin": return ClientStreamingServerHandler( context: context, requestDeserializer: GRPCPayloadDeserializer(), responseSerializer: GRPCPayloadSerializer(), interceptors: [], observerFactory: self.reverseThenJoin(context:) ) case "ReverseThenSplit": return ServerStreamingServerHandler( context: context, requestDeserializer: GRPCPayloadDeserializer(), responseSerializer: GRPCPayloadSerializer(), interceptors: [], userFunction: self.reverseThenSplit(request:context:) ) case "AddOneAndReverseMessage": return BidirectionalStreamingServerHandler( context: context, requestDeserializer: GRPCPayloadDeserializer(), responseSerializer: GRPCPayloadSerializer(), interceptors: [], observerFactory: self.addOneAndReverseMessage(context:) ) default: return nil } } } private struct IdentityPayload: GRPCPayload { var buffer: ByteBuffer init(serializedByteBuffer: inout ByteBuffer) throws { self.buffer = serializedByteBuffer } func serialize(into buffer: inout ByteBuffer) throws { // This will never be called, however, it could be implemented as a direct copy of the bytes // we hold, e.g.: // // var copy = self.buffer // buffer.writeBuffer(©) fatalError("Unimplemented") } } /// A toy custom payload which holds a `String` and an `Int64`. /// /// The payload is serialized as: /// - the `UInt32` encoded length of the message, /// - the UTF-8 encoded bytes of the message, and /// - the `Int64` bytes of the number. private struct CustomPayload: GRPCPayload, Equatable { var message: String var number: Int64 init(message: String, number: Int64) { self.message = message self.number = number } init(serializedByteBuffer: inout ByteBuffer) throws { guard let messageLength = serializedByteBuffer.readInteger(as: UInt32.self), let message = serializedByteBuffer.readString(length: Int(messageLength)), let number = serializedByteBuffer.readInteger(as: Int64.self) else { throw GRPCError.DeserializationFailure() } self.message = message self.number = number } func serialize(into buffer: inout ByteBuffer) throws { buffer.writeInteger(UInt32(self.message.count)) buffer.writeString(self.message) buffer.writeInteger(self.number) } } private struct StringPayload: GRPCPayload { var message: String init(message: String) { self.message = message } init(serializedByteBuffer: inout ByteBuffer) throws { self.message = serializedByteBuffer.readString(length: serializedByteBuffer.readableBytes)! } func serialize(into buffer: inout ByteBuffer) throws { buffer.writeString(self.message) } }