| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- /*
- * Copyright 2024-2025, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import GRPCCore
- import GRPCInProcessTransport
- import Testing
- @Suite("InProcess transport")
- struct InProcessTransportTests {
- private static let cancellationModes = ["await-cancelled", "with-cancellation-handler"]
- @available(gRPCSwift 2.0, *)
- private func withTestServerAndClient(
- execute: (
- GRPCServer<InProcessTransport.Server>,
- GRPCClient<InProcessTransport.Client>
- ) async throws -> Void
- ) async throws {
- try await withThrowingDiscardingTaskGroup { group in
- let inProcess = InProcessTransport()
- let server = GRPCServer(transport: inProcess.server, services: [TestService()])
- group.addTask {
- try await server.serve()
- }
- let client = GRPCClient(transport: inProcess.client)
- group.addTask {
- try await client.runConnections()
- }
- try await execute(server, client)
- }
- }
- @Test("RPC cancelled by graceful shutdown", arguments: Self.cancellationModes)
- @available(gRPCSwift 2.0, *)
- func cancelledByGracefulShutdown(mode: String) async throws {
- try await self.withTestServerAndClient { server, client in
- try await client.serverStreaming(
- request: ClientRequest(message: mode),
- descriptor: .testCancellation,
- serializer: UTF8Serializer(),
- deserializer: UTF8Deserializer(),
- options: .defaults
- ) { response in
- // Got initial metadata, begin shutdown to cancel the RPC.
- server.beginGracefulShutdown()
- // Now wait for the response.
- let messages = try await response.messages.reduce(into: []) { $0.append($1) }
- #expect(messages == ["isCancelled=true"])
- }
- // Finally, shutdown the client so its runConnections() method returns.
- client.beginGracefulShutdown()
- }
- }
- @Test("Peer info")
- @available(gRPCSwift 2.0, *)
- func peerInfo() async throws {
- try await self.withTestServerAndClient { server, client in
- defer {
- client.beginGracefulShutdown()
- server.beginGracefulShutdown()
- }
- let peerInfo = try await client.unary(
- request: ClientRequest(message: ()),
- descriptor: .peerInfo,
- serializer: VoidSerializer(),
- deserializer: JSONDeserializer<PeerInfo>(),
- options: .defaults
- ) {
- try $0.message
- }
- #expect(peerInfo.local == peerInfo.remote)
- }
- }
- }
- @available(gRPCSwift 2.0, *)
- private struct TestService: RegistrableRPCService {
- func cancellation(
- request: ServerRequest<String>,
- context: ServerContext
- ) async throws -> StreamingServerResponse<String> {
- switch request.message {
- case "await-cancelled":
- return StreamingServerResponse { body in
- try await context.cancellation.cancelled
- try await body.write("isCancelled=\(context.cancellation.isCancelled)")
- return [:]
- }
- case "with-cancellation-handler":
- let signal = AsyncStream.makeStream(of: Void.self)
- return StreamingServerResponse { body in
- try await withRPCCancellationHandler {
- for await _ in signal.stream {}
- try await body.write("isCancelled=\(context.cancellation.isCancelled)")
- return [:]
- } onCancelRPC: {
- signal.continuation.finish()
- }
- }
- default:
- throw RPCError(code: .invalidArgument, message: "Invalid argument '\(request.message)'")
- }
- }
- func peerInfo(
- request: ServerRequest<Void>,
- context: ServerContext
- ) async throws -> ServerResponse<PeerInfo> {
- let peerInfo = PeerInfo(local: context.localPeer, remote: context.remotePeer)
- return ServerResponse(message: peerInfo)
- }
- func registerMethods<Transport: ServerTransport>(with router: inout RPCRouter<Transport>) {
- router.registerHandler(
- forMethod: .testCancellation,
- deserializer: UTF8Deserializer(),
- serializer: UTF8Serializer(),
- handler: {
- try await self.cancellation(request: ServerRequest(stream: $0), context: $1)
- }
- )
- router.registerHandler(
- forMethod: .peerInfo,
- deserializer: VoidDeserializer(),
- serializer: JSONSerializer<PeerInfo>(),
- handler: {
- let response = try await self.peerInfo(
- request: ServerRequest<Void>(stream: $0),
- context: $1
- )
- return StreamingServerResponse(single: response)
- }
- )
- }
- }
- @available(gRPCSwift 2.0, *)
- extension MethodDescriptor {
- fileprivate static let testCancellation = Self(
- fullyQualifiedService: "test",
- method: "cancellation"
- )
- fileprivate static let peerInfo = Self(
- fullyQualifiedService: "test",
- method: "peerInfo"
- )
- }
- private struct PeerInfo: Codable {
- var local: String
- var remote: String
- }
- @available(gRPCSwift 2.0, *)
- private struct UTF8Serializer: MessageSerializer {
- func serialize<Bytes: GRPCContiguousBytes>(_ message: String) throws -> Bytes {
- Bytes(message.utf8)
- }
- }
- @available(gRPCSwift 2.0, *)
- private struct UTF8Deserializer: MessageDeserializer {
- func deserialize<Bytes: GRPCContiguousBytes>(_ serializedMessageBytes: Bytes) throws -> String {
- serializedMessageBytes.withUnsafeBytes {
- String(decoding: $0, as: UTF8.self)
- }
- }
- }
- @available(gRPCSwift 2.0, *)
- private struct VoidSerializer: MessageSerializer {
- func serialize<Bytes: GRPCContiguousBytes>(_ message: Void) throws -> Bytes {
- Bytes(repeating: 0, count: 0)
- }
- }
- @available(gRPCSwift 2.0, *)
- private struct VoidDeserializer: MessageDeserializer {
- func deserialize<Bytes: GRPCContiguousBytes>(_ serializedMessageBytes: Bytes) throws {
- }
- }
|