| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
/*
 * Copyright 2023, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
- import GRPCCore
- import GRPCInProcessTransport
- import XCTest
/// Tests covering the connection lifecycle, stream handling and per-method configuration
/// lookup of `InProcessClientTransport`.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
final class InProcessClientTransportTests: XCTestCase {
  struct FailTest: Error {}

  // MARK: - Connecting

  /// Two concurrent `connect()` calls on the same client: the first child task to finish is
  /// expected to do so by throwing an `RPCError` with code `.failedPrecondition`.
  func testConnectWhenConnected() async {
    let client = makeClient()

    await withThrowingTaskGroup(of: Void.self) { group in
      group.addTask {
        try await client.connect()
      }

      group.addTask {
        try await client.connect()
      }

      await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
        try await group.next()
      } errorHandler: { error in
        XCTAssertEqual(error.code, .failedPrecondition)
      }

      // The other `connect()` call is still pending; cancel it so the group can finish.
      group.cancelAll()
    }
  }

  /// `connect()` after `beginGracefulShutdown()` is expected to throw `.failedPrecondition`.
  func testConnectWhenClosed() async {
    let client = makeClient()

    client.beginGracefulShutdown()

    await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
      try await client.connect()
    } errorHandler: { error in
      XCTAssertEqual(error.code, .failedPrecondition)
    }
  }

  /// After the task running `connect()` has been cancelled, a further `connect()` call is
  /// expected to throw `.failedPrecondition`.
  func testConnectWhenClosedAfterCancellation() async throws {
    let client = makeClient()

    try await withThrowingTaskGroup(of: Void.self) { group in
      group.addTask {
        try await client.connect()
      }
      group.addTask {
        // Gives the `connect()` task time to start before the group is cancelled.
        try await Task.sleep(for: .milliseconds(100))
      }

      try await group.next()
      group.cancelAll()

      await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
        try await client.connect()
      } errorHandler: { error in
        XCTAssertEqual(error.code, .failedPrecondition)
      }
    }
  }

  // MARK: - Closing

  /// Shutting down a client that never connected must not fail.
  func testCloseWhenUnconnected() {
    let client = makeClient()

    XCTAssertNoThrow(client.beginGracefulShutdown())
  }

  /// Shutting down an already shut down client must not fail.
  func testCloseWhenClosed() {
    let client = makeClient()
    client.beginGracefulShutdown()

    XCTAssertNoThrow(client.beginGracefulShutdown())
  }

  /// Connects, begins a graceful shutdown and verifies that `connect()` finishes without
  /// throwing.
  func testConnectSuccessfullyAndThenClose() async throws {
    let client = makeClient()

    try await withThrowingTaskGroup(of: Void.self) { group in
      group.addTask {
        try await client.connect()
      }
      group.addTask {
        // Gives the `connect()` task time to start before shutting down.
        try await Task.sleep(for: .milliseconds(100))
      }

      try await group.next()
      client.beginGracefulShutdown()

      // Consume the remaining child task explicitly: results that are never awaited are
      // discarded when the group's scope exits, which would hide an error thrown by
      // `connect()`.
      try await group.waitForAll()
    }
  }

  // MARK: - Streams

  /// Opens a stream before the client is connected and checks that it is serviced once
  /// `connect()` is eventually called.
  func testOpenStreamWhenUnconnected() async throws {
    let client = makeClient()

    try await withThrowingTaskGroup(of: Void.self) { group in
      group.addTask {
        try await client.withStream(
          descriptor: .init(service: "test", method: "test"),
          options: .defaults
        ) { _ in
          // Once the pending stream is opened, close the client to new connections,
          // so that, once this closure is executed and this stream is closed,
          // the client will return from `connect()`.
          client.beginGracefulShutdown()
        }
      }

      group.addTask {
        // Add a sleep to make sure connection happens after `withStream` has been called,
        // to test pending streams are handled correctly.
        try await Task.sleep(for: .milliseconds(100))
        try await client.connect()
      }

      try await group.waitForAll()
    }
  }

  /// `withStream` after `beginGracefulShutdown()` is expected to throw `.failedPrecondition`.
  func testOpenStreamWhenClosed() async {
    let client = makeClient()

    client.beginGracefulShutdown()

    await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
      try await client.withStream(
        descriptor: .init(service: "test", method: "test"),
        options: .defaults
      ) { _ in }
    } errorHandler: { error in
      XCTAssertEqual(error.code, .failedPrecondition)
    }
  }

  /// Exchanges one message in each direction between the client and a listening server,
  /// then begins a graceful shutdown of the client.
  func testOpenStreamSuccessfullyAndThenClose() async throws {
    let server = InProcessServerTransport()
    let client = makeClient(server: server)

    try await withThrowingTaskGroup(of: Void.self) { group in
      group.addTask {
        try await client.connect()
      }

      // Client side: send `[1]`, close the outbound half and expect `[42]` back.
      group.addTask {
        try await client.withStream(
          descriptor: .init(service: "test", method: "test"),
          options: .defaults
        ) { stream in
          try await stream.outbound.write(.message([1]))
          stream.outbound.finish()
          let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) }

          XCTAssertEqual(receivedMessages, [.message([42])])
        }
      }

      // Server side: drain the request, reply with `[42]` and expect to have received `[1]`.
      group.addTask {
        try await server.listen { stream in
          let receivedMessages = try? await stream.inbound.reduce(into: []) { $0.append($1) }
          try? await stream.outbound.write(RPCResponsePart.message([42]))
          stream.outbound.finish()

          XCTAssertEqual(receivedMessages, [.message([1])])
        }
      }

      group.addTask {
        try await Task.sleep(for: .milliseconds(100))
        client.beginGracefulShutdown()
      }

      // Wait for the first child task to finish, then cancel whatever is still running.
      try await group.next()
      group.cancelAll()
    }
  }

  // MARK: - Configuration

  /// Checks that `configuration(forMethod:)` returns the default (empty service/method name)
  /// configuration for a method without a specific entry, and the specific entry once one
  /// has been added to the service config.
  func testExecutionConfiguration() {
    let policy = HedgingPolicy(
      maximumAttempts: 10,
      hedgingDelay: .seconds(1),
      nonFatalStatusCodes: []
    )
    var serviceConfig = ServiceConfig(
      methodConfig: [
        MethodConfig(
          names: [
            MethodConfig.Name(service: "", method: "")
          ],
          executionPolicy: .hedge(policy)
        )
      ]
    )

    var client = InProcessClientTransport(
      server: InProcessServerTransport(),
      serviceConfig: serviceConfig
    )

    // Only the default entry exists, so it is expected for any method.
    let firstDescriptor = MethodDescriptor(service: "test", method: "first")
    XCTAssertEqual(
      client.configuration(forMethod: firstDescriptor),
      serviceConfig.methodConfig.first
    )

    let retryPolicy = RetryPolicy(
      maximumAttempts: 10,
      initialBackoff: .seconds(1),
      maximumBackoff: .seconds(1),
      backoffMultiplier: 1.0,
      retryableStatusCodes: [.unavailable]
    )
    let overrideConfiguration = MethodConfig(
      names: [MethodConfig.Name(service: "test", method: "second")],
      executionPolicy: .retry(retryPolicy)
    )
    serviceConfig.methodConfig.append(overrideConfiguration)

    // The transport is re-created so that it is built from the updated service config.
    client = InProcessClientTransport(
      server: InProcessServerTransport(),
      serviceConfig: serviceConfig
    )

    // "test/first" still resolves to the default entry, "test/second" to the override.
    let secondDescriptor = MethodDescriptor(service: "test", method: "second")
    XCTAssertEqual(
      client.configuration(forMethod: firstDescriptor),
      serviceConfig.methodConfig.first
    )
    XCTAssertEqual(
      client.configuration(forMethod: secondDescriptor),
      serviceConfig.methodConfig.last
    )
  }

  /// Opens two streams, begins a graceful shutdown while both are still running and verifies
  /// that the streams and `connect()` all finish without throwing.
  func testOpenMultipleStreamsThenClose() async throws {
    let server = InProcessServerTransport()
    let client = makeClient(server: server)

    try await withThrowingTaskGroup(of: Void.self) { group in
      group.addTask {
        try await client.connect()
      }

      group.addTask {
        try await client.withStream(
          descriptor: .init(service: "test", method: "test"),
          options: .defaults
        ) { _ in
          // Keep the stream open past the point at which shutdown begins.
          try await Task.sleep(for: .milliseconds(100))
        }
      }

      group.addTask {
        try await client.withStream(
          descriptor: .init(service: "test", method: "test"),
          options: .defaults
        ) { _ in
          // Keep the stream open past the point at which shutdown begins.
          try await Task.sleep(for: .milliseconds(100))
        }
      }

      group.addTask {
        try await Task.sleep(for: .milliseconds(50))
        client.beginGracefulShutdown()
      }

      // Await every child task: errors from tasks that are never awaited are discarded
      // when the group's scope exits, so awaiting only the first result would let a
      // failing stream or `connect()` go unnoticed.
      try await group.waitForAll()
    }
  }

  // MARK: - Helpers

  /// Creates a client transport for `server` whose service config holds a single default
  /// method config (empty service and method name) using a retry policy.
  ///
  /// - Parameter server: The server transport to attach the client to. Defaults to a new
  ///   `InProcessServerTransport`.
  /// - Returns: A new `InProcessClientTransport`.
  func makeClient(
    server: InProcessServerTransport = InProcessServerTransport()
  ) -> InProcessClientTransport {
    let defaultPolicy = RetryPolicy(
      maximumAttempts: 10,
      initialBackoff: .seconds(1),
      maximumBackoff: .seconds(1),
      backoffMultiplier: 1.0,
      retryableStatusCodes: [.unavailable]
    )

    let serviceConfig = ServiceConfig(
      methodConfig: [
        MethodConfig(
          names: [MethodConfig.Name(service: "", method: "")],
          executionPolicy: .retry(defaultPolicy)
        )
      ]
    )

    return InProcessClientTransport(
      server: server,
      serviceConfig: serviceConfig
    )
  }
}
|