- /*
- * Copyright 2021, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #if canImport(NIOSSL)
- import EchoImplementation
- import EchoModel
- import GRPC
- import GRPCSampleData
- import NIOConcurrencyHelpers
- import NIOCore
- import NIOPosix
- import NIOSSL
- import XCTest
- #if canImport(Network)
- import Network
- import NIOTransportServices
- #endif
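- // Tests for the pooled client (`GRPCChannelPool`): RPC round-trips over plaintext and
- // NIOSSL-backed TLS, waiter limits and timeouts, distribution of RPCs across event loops, and
- // connection pool delegate events (utilization, quiescing, and stats).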
- final class GRPCChannelPoolTests: GRPCTestCase {
- private var group: (any EventLoopGroup)!
- private var server: Server?
- private var channel: GRPCChannel?
- private var serverPort: Int? {
- return self.server?.channel.localAddress?.port
- }
- private var echo: Echo_EchoNIOClient {
- return Echo_EchoNIOClient(channel: self.channel!)
- }
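- // Tear down in the reverse order of creation: client channel, then server, then event loop group.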
- override func tearDown() {
- if let channel = self.channel {
- XCTAssertNoThrow(try channel.close().wait())
- }
- if let server = self.server {
- XCTAssertNoThrow(try server.close().wait())
- }
- XCTAssertNoThrow(try self.group.syncShutdownGracefully())
- super.tearDown()
- }
- private enum TestEventLoopGroupType {
- case multiThreadedEventLoopGroup
- case transportServicesEventLoopGroup
- }
- private func configureEventLoopGroup(
- threads: Int = System.coreCount,
- eventLoopGroupType: TestEventLoopGroupType = .multiThreadedEventLoopGroup
- ) {
- switch eventLoopGroupType {
- case .multiThreadedEventLoopGroup:
- self.group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
- case .transportServicesEventLoopGroup:
- #if canImport(Network)
- self.group = NIOTSEventLoopGroup(loopCount: threads)
- #else
- fatalError("NIOTS is not available on this platform.")
- #endif
- }
- }
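- // Returns an Echo server builder, optionally using NIOSSL-backed TLS with the bundled sample
- // certificate and private key.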
- private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
- let builder: Server.Builder
- if withTLS {
- builder = Server.usingTLSBackedByNIOSSL(
- on: self.group,
- certificateChain: [SampleCertificate.server.certificate],
- privateKey: SamplePrivateKey.server
- ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
- } else {
- builder = Server.insecure(group: self.group)
- }
- return builder
- .withLogger(self.serverLogger)
- .withServiceProviders([EchoProvider()])
- }
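- // Starts the server on localhost. The default port of 0 lets the OS pick a free port, which
- // tests read back via `serverPort`.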
- private func startServer(withTLS: Bool = false, port: Int = 0) {
- self.server = try! self.makeServerBuilder(withTLS: withTLS)
- .bind(host: "localhost", port: port)
- .wait()
- }
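- // Creates the pooled client channel. The target defaults to the running server but may be
- // overridden, e.g. to point at an address where nothing is listening.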
- private func startChannel(
- withTLS: Bool = false,
- overrideTarget targetOverride: ConnectionTarget? = nil,
- _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
- ) {
- let transportSecurity: GRPCChannelPool.Configuration.TransportSecurity
- if withTLS {
- let configuration = GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
- trustRoots: .certificates([SampleCertificate.ca.certificate])
- )
- transportSecurity = .tls(configuration)
- } else {
- transportSecurity = .plaintext
- }
- self.channel = try! GRPCChannelPool.with(
- target: targetOverride ?? .hostAndPort("localhost", self.serverPort!),
- transportSecurity: transportSecurity,
- eventLoopGroup: self.group
- ) { configuration in
- configuration.backgroundActivityLogger = self.clientLogger
- configure(&configuration)
- }
- }
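- // Convenience for the common case: event loop group, server, and pooled channel in one call,
- // with an effectively unlimited waiter queue so a burst of RPCs doesn't fail while the first
- // connection is established.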
- private func setUpClientAndServer(
- withTLS tls: Bool,
- threads: Int = System.coreCount,
- eventLoopGroupType: TestEventLoopGroupType = .multiThreadedEventLoopGroup,
- _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
- ) {
- self.configureEventLoopGroup(threads: threads, eventLoopGroupType: eventLoopGroupType)
- self.startServer(withTLS: tls)
- self.startChannel(withTLS: tls) {
- // We'll allow any number of waiters since we immediately fire off a bunch of RPCs and don't
- // want to bounce off the limit as we wait for a connection to come up.
- $0.connectionPool.maxWaitersPerEventLoop = .max
- configure(&$0)
- }
- }
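- // Fires `count` unary RPCs and asserts that every status is OK.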
- private func doTestUnaryRPCs(count: Int) throws {
- var futures: [EventLoopFuture<GRPCStatus>] = []
- futures.reserveCapacity(count)
- for i in 1 ... count {
- let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
- let get = self.echo.get(request)
- futures.append(get.status)
- }
- let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
- XCTAssert(statuses.allSatisfy { $0.isOk })
- }
- func testUnaryRPCs_plaintext() throws {
- self.setUpClientAndServer(withTLS: false)
- try self.doTestUnaryRPCs(count: 100)
- }
- func testUnaryRPCs_tls() throws {
- self.setUpClientAndServer(withTLS: true)
- try self.doTestUnaryRPCs(count: 100)
- }
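- // Fires `count` client-streaming RPCs, each sending three messages before half-closing, and
- // asserts that every status is OK.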
- private func doTestClientStreamingRPCs(count: Int) throws {
- var futures: [EventLoopFuture<GRPCStatus>] = []
- futures.reserveCapacity(count)
- for i in 1 ... count {
- let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
- let collect = self.echo.collect()
- collect.sendMessage(request, promise: nil)
- collect.sendMessage(request, promise: nil)
- collect.sendMessage(request, promise: nil)
- collect.sendEnd(promise: nil)
- futures.append(collect.status)
- }
- let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
- XCTAssert(statuses.allSatisfy { $0.isOk })
- }
- func testClientStreamingRPCs_plaintext() throws {
- self.setUpClientAndServer(withTLS: false)
- try self.doTestClientStreamingRPCs(count: 100)
- }
- func testClientStreamingRPCs() throws {
- self.setUpClientAndServer(withTLS: true)
- try self.doTestClientStreamingRPCs(count: 100)
- }
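- // Fires `count` server-streaming RPCs (ignoring the responses) and asserts that every status
- // is OK.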
- private func doTestServerStreamingRPCs(count: Int) throws {
- var futures: [EventLoopFuture<GRPCStatus>] = []
- futures.reserveCapacity(count)
- for i in 1 ... count {
- let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
- let expand = self.echo.expand(request) { _ in }
- futures.append(expand.status)
- }
- let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
- XCTAssert(statuses.allSatisfy { $0.isOk })
- }
- func testServerStreamingRPCs_plaintext() throws {
- self.setUpClientAndServer(withTLS: false)
- try self.doTestServerStreamingRPCs(count: 100)
- }
- func testServerStreamingRPCs() throws {
- self.setUpClientAndServer(withTLS: true)
- try self.doTestServerStreamingRPCs(count: 100)
- }
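- // Fires `count` bidirectional-streaming RPCs, each sending three messages before half-closing,
- // and asserts that every status is OK.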
- private func doTestBidiStreamingRPCs(count: Int) throws {
- var futures: [EventLoopFuture<GRPCStatus>] = []
- futures.reserveCapacity(count)
- for i in 1 ... count {
- let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
- let update = self.echo.update { _ in }
- update.sendMessage(request, promise: nil)
- update.sendMessage(request, promise: nil)
- update.sendMessage(request, promise: nil)
- update.sendEnd(promise: nil)
- futures.append(update.status)
- }
- let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
- XCTAssert(statuses.allSatisfy { $0.isOk })
- }
- func testBidiStreamingRPCs_plaintext() throws {
- self.setUpClientAndServer(withTLS: false)
- try self.doTestBidiStreamingRPCs(count: 100)
- }
- func testBidiStreamingRPCs() throws {
- self.setUpClientAndServer(withTLS: true)
- try self.doTestBidiStreamingRPCs(count: 100)
- }
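- // With no server to connect to, every waiter should hit the (tiny) max wait time and fail with
- // 'deadline exceeded'.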
- func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
- // 4 threads == 4 pools
- self.configureEventLoopGroup(threads: 4)
- // Don't start a server; override the target (otherwise we'll fail to unwrap `serverPort`).
- self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
- // Tiny wait time for waiters.
- $0.connectionPool.maxWaitTime = .milliseconds(50)
- }
- var statuses: [EventLoopFuture<GRPCStatus>] = []
- statuses.reserveCapacity(40)
- // Queue RPCs on each loop.
- for eventLoop in self.group.makeIterator() {
- let options = CallOptions(eventLoopPreference: .exact(eventLoop))
- for i in 0 ..< 10 {
- let get = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
- statuses.append(get.status)
- }
- }
- let results = try EventLoopFuture.whenAllComplete(statuses, on: self.group.next()).wait()
- for result in results {
- result.assertSuccess {
- XCTAssertEqual($0.code, .deadlineExceeded)
- }
- }
- }
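- // With no event loop preference, RPCs should be spread evenly across the per-event-loop pools.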
- func testRPCsAreDistributedAcrossEventLoops() throws {
- self.configureEventLoopGroup(threads: 4)
- // We don't need a server here, but we do need a different target (otherwise we'd force-unwrap a nil `serverPort`).
- self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
- // Increase the max wait time: we're relying on the server never coming up, so the RPCs never
- // complete and streams are never returned to the pools.
- $0.connectionPool.maxWaitTime = .hours(1)
- }
- var echo = self.echo
- echo.defaultCallOptions.eventLoopPreference = .indifferent
- let rpcs = (0 ..< 40).map { _ in echo.update { _ in } }
- let rpcsByEventLoop = Dictionary(grouping: rpcs, by: { ObjectIdentifier($0.eventLoop) })
- for rpcs in rpcsByEventLoop.values {
- // 40 RPCs over 4 ELs should be 10 RPCs per EL.
- XCTAssertEqual(rpcs.count, 10)
- }
- // All RPCs are waiting for connections since we never brought up a server. Each will fail when
- // we shut down the pool.
- XCTAssertNoThrow(try self.channel?.close().wait())
- // Unset the channel to avoid shutting down again in tearDown().
- self.channel = nil
- for rpc in rpcs {
- XCTAssertEqual(try rpc.status.wait().code, .unavailable)
- }
- }
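- // Exceeding `maxWaitersPerEventLoop` on one loop should fail the excess RPC with
- // 'resource exhausted'; loop-indifferent RPCs should then avoid the loaded loop.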
- func testWaiterLimitPerEventLoop() throws {
- self.configureEventLoopGroup(threads: 4)
- self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
- $0.connectionPool.maxWaitersPerEventLoop = 10
- $0.connectionPool.maxWaitTime = .hours(1)
- }
- let loop = self.group.next()
- let options = CallOptions(eventLoopPreference: .exact(loop))
- // The first 10 will be waiting for the connection. The 11th should be failed immediately.
- let rpcs = (1 ... 11).map { _ in
- self.echo.get(.with { $0.text = "" }, callOptions: options)
- }
- XCTAssertEqual(try rpcs.last?.status.wait().code, .resourceExhausted)
- // If we express no event loop preference then we should not get the loaded loop.
- let indifferentLoopRPCs = (1 ... 10).map { _ in
- self.echo.get(.with { $0.text = "" })
- }
- XCTAssert(indifferentLoopRPCs.map { $0.eventLoop }.allSatisfy { $0 !== loop })
- }
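- // An RPC waiting for stream capacity should start as soon as an earlier RPC finishes and frees
- // a stream on the (single) connection.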
- func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
- self.configureEventLoopGroup(threads: 1)
- self.startServer()
- self.startChannel {
- $0.connectionPool.connectionsPerEventLoop = 1
- $0.connectionPool.maxWaitTime = .hours(1)
- }
- let lock = NIOLock()
- var order = 0
- // We need a connection to be up and running to avoid hitting the waiter limit when creating a
- // batch of RPCs in one go.
- let warmup = self.echo.get(.with { $0.text = "" })
- XCTAssert(try warmup.status.wait().isOk)
- // MAX_CONCURRENT_STREAMS should be 100; we'll create 101 RPCs, 100 of which should not have to
- // wait because there's already an active connection.
- let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in } }
- // The first RPC should (obviously) complete first.
- rpcs.first!.status.whenComplete { _ in
- lock.withLock {
- XCTAssertEqual(order, 0)
- order += 1
- }
- }
- // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
- // RPC below).
- rpcs.last!.status.whenComplete { _ in
- lock.withLock {
- XCTAssertEqual(order, 1)
- order += 1
- }
- }
- // Still zero: the first RPC is still active.
- lock.withLock { XCTAssertEqual(order, 0) }
- // End the first RPC.
- XCTAssertNoThrow(try rpcs.first!.sendEnd().wait())
- XCTAssertNoThrow(try rpcs.first!.status.wait())
- lock.withLock { XCTAssertEqual(order, 1) }
- // End the last RPC.
- XCTAssertNoThrow(try rpcs.last!.sendEnd().wait())
- XCTAssertNoThrow(try rpcs.last!.status.wait())
- lock.withLock { XCTAssertEqual(order, 2) }
- // End the rest.
- for rpc in rpcs.dropFirst().dropLast() {
- XCTAssertNoThrow(try rpc.sendEnd().wait())
- }
- }
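- // An RPC made on a pool which has already been shut down should fail with 'unavailable'.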
- func testRPCOnShutdownPool() {
- self.configureEventLoopGroup(threads: 1)
- self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))
- let echo = self.echo
- XCTAssertNoThrow(try self.channel?.close().wait())
- // Avoid shutting down again in tearDown()
- self.channel = nil
- let get = echo.get(.with { $0.text = "" })
- XCTAssertEqual(try get.status.wait().code, .unavailable)
- }
- func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
- self.configureEventLoopGroup(threads: 1)
- self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
- $0.connectionPool.maxWaitTime = .hours(24)
- }
- // The deadline is sooner than the 24-hour wait time, so we expect to time out sooner rather
- // than (much) later!
- let options = CallOptions(timeLimit: .deadline(.now()))
- let timedOutOnOwnDeadline = self.echo.get(.with { $0.text = "" }, callOptions: options)
- XCTAssertEqual(try timedOutOnOwnDeadline.status.wait().code, .deadlineExceeded)
- }
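- // Mismatched TLS (client expects TLS, server is plaintext) should surface the underlying
- // NIOSSL error as the cause of failed and fast-failed RPC statuses.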
- func testTLSFailuresAreClearerAtTheRPCLevel() throws {
- // Mix and match TLS.
- self.configureEventLoopGroup(threads: 1)
- self.startServer(withTLS: false)
- self.startChannel(withTLS: true) {
- $0.connectionPool.maxWaitersPerEventLoop = 10
- }
- // We can't guarantee an error happens within a certain time limit, so if we don't see what we
- // expect we'll loop until a given deadline passes.
- let testDeadline = NIODeadline.now() + .seconds(5)
- var seenError = false
- while testDeadline > .now() {
- let options = CallOptions(timeLimit: .deadline(.now() + .milliseconds(50)))
- let get = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
- let status = try get.status.wait()
- XCTAssertEqual(status.code, .deadlineExceeded)
- if let cause = status.cause, cause is NIOSSLError {
- // What we expect.
- seenError = true
- break
- } else {
- // Try again.
- continue
- }
- }
- XCTAssert(seenError)
- // Now queue up a bunch of RPCs to fill up the waiter queue. We don't care about the outcome
- // of these. (They'll fail when we tear down the pool at the end of the test.)
- _ = (0 ..< 10).map { i -> UnaryCall<Echo_EchoRequest, Echo_EchoResponse> in
- let options = CallOptions(timeLimit: .deadline(.distantFuture))
- return self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
- }
- // Queue up one more.
- let options = CallOptions(timeLimit: .deadline(.distantFuture))
- let tooManyWaiters = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
- let status = try tooManyWaiters.status.wait()
- XCTAssertEqual(status.code, .resourceExhausted)
- if let cause = status.cause {
- XCTAssert(cause is NIOSSLError)
- } else {
- XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'")
- }
- }
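- // The delegate should observe the full lifecycle of a single connection: added, connecting,
- // connected, utilization changes as RPCs start and finish, then closed and removed.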
- func testConnectionPoolDelegateSingleConnection() throws {
- let recorder = EventRecordingConnectionPoolDelegate()
- self.setUpClientAndServer(withTLS: false, threads: 1) {
- $0.delegate = recorder
- }
- let warmup = self.echo.get(.with { $0.text = "" })
- XCTAssertNoThrow(try warmup.status.wait())
- let id = try XCTUnwrap(recorder.first?.id)
- XCTAssertEqual(recorder.popFirst(), .connectionAdded(id))
- XCTAssertEqual(recorder.popFirst(), .startedConnecting(id))
- XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id, 100))
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 1, 100))
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 0, 100))
- let rpcs: [ClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse>] = try (1 ... 10).map { i in
- let rpc = self.echo.collect()
- XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, i, 100))
- return rpc
- }
- for (i, rpc) in rpcs.enumerated() {
- XCTAssertNoThrow(try rpc.sendEnd().wait())
- XCTAssertNoThrow(try rpc.status.wait())
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 10 - (i + 1), 100))
- }
- XCTAssertNoThrow(try self.channel?.close().wait())
- XCTAssertEqual(recorder.popFirst(), .connectionClosed(id))
- XCTAssertEqual(recorder.popFirst(), .connectionRemoved(id))
- XCTAssert(recorder.isEmpty)
- }
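- // The delegate should be told the connection is quiescing when the server begins a graceful
- // shutdown while an RPC is still in flight.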
- func testConnectionPoolDelegateQuiescing() throws {
- let recorder = EventRecordingConnectionPoolDelegate()
- self.setUpClientAndServer(withTLS: false, threads: 1) {
- $0.delegate = recorder
- }
- XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())
- let id1 = try XCTUnwrap(recorder.first?.id)
- XCTAssertEqual(recorder.popFirst(), .connectionAdded(id1))
- XCTAssertEqual(recorder.popFirst(), .startedConnecting(id1))
- XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id1, 100))
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 0, 100))
- // Start an RPC.
- let rpc = self.echo.collect()
- XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
- // Complete another one to make sure the previous one is known by the server.
- XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 2, 100))
- XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
- // Start shutting the server down.
- let didShutdown = self.server!.initiateGracefulShutdown()
- self.server = nil // Avoid shutting down again in tearDown
- // Pause a moment so we know the client received the GOAWAY.
- let sleep = self.group.any().scheduleTask(in: .milliseconds(50)) {}
- XCTAssertNoThrow(try sleep.futureResult.wait())
- XCTAssertEqual(recorder.popFirst(), .connectionQuiescing(id1))
- // Finish the RPC.
- XCTAssertNoThrow(try rpc.sendEnd().wait())
- XCTAssertNoThrow(try rpc.status.wait())
- // Server should shutdown now.
- XCTAssertNoThrow(try didShutdown.wait())
- }
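- // Uses an `IsConnectingDelegate` to check that only the first RPC triggers a connection
- // attempt; subsequent RPCs should reuse the established connection without any state changes.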
- func testDelegateCanTellWhenFirstConnectionIsBeingEstablished() {
- final class State {
- private enum Storage {
- case idle
- case connecting
- case connected
- }
- private var state: Storage = .idle
- private let lock = NIOLock()
- var isConnected: Bool {
- return self.lock.withLock {
- switch self.state {
- case .connected:
- return true
- case .idle, .connecting:
- return false
- }
- }
- }
- func startedConnecting() {
- self.lock.withLock {
- switch self.state {
- case .idle:
- self.state = .connecting
- case .connecting, .connected:
- XCTFail("Invalid state \(self.state) for \(#function)")
- }
- }
- }
- func connected() {
- self.lock.withLock {
- switch self.state {
- case .connecting:
- self.state = .connected
- case .idle, .connected:
- XCTFail("Invalid state \(self.state) for \(#function)")
- }
- }
- }
- }
- let state = State()
- self.setUpClientAndServer(withTLS: false, threads: 1) {
- $0.delegate = IsConnectingDelegate { stateChange in
- switch stateChange {
- case .connecting:
- state.startedConnecting()
- case .connected:
- state.connected()
- }
- }
- }
- XCTAssertFalse(state.isConnected)
- let rpc = self.echo.get(.with { $0.text = "" })
- XCTAssertNoThrow(try rpc.status.wait())
- XCTAssertTrue(state.isConnected)
- // We should be able to do a bunch of other RPCs without the state changing (we'll XCTFail if
- // a state change happens).
- let rpcs: [EventLoopFuture<GRPCStatus>] = (0 ..< 20).map { i in
- let rpc = self.echo.get(.with { $0.text = "\(i)" })
- return rpc.status
- }
- XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(rpcs, on: self.group.any()).wait())
- }
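- // With a very short `statsPeriod`, the delegate should receive periodic stats events before
- // the channel is closed.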
- func testDelegateGetsCalledWithStats() throws {
- let recorder = EventRecordingConnectionPoolDelegate()
- self.configureEventLoopGroup(threads: 4)
- self.startServer(withTLS: false)
- self.startChannel(withTLS: false) {
- $0.statsPeriod = .milliseconds(1)
- $0.delegate = recorder
- }
- let scheduled = self.group.next().scheduleTask(in: .milliseconds(100)) {
- _ = self.channel?.close()
- }
- try scheduled.futureResult.wait()
- let events = recorder.removeAll()
- let statsEvents = events.compactMap { event in
- switch event {
- case .stats(let stats, _):
- return stats
- default:
- return nil
- }
- }
- XCTAssertGreaterThan(statsEvents.count, 0)
- }
- #if canImport(Network)
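- // The NWParameters configurator should run exactly once when the underlying Network.framework
- // connection is set up for the RPC.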
- func testNWParametersConfigurator() throws {
- let counter = NIOLockedValueBox(0)
- self.setUpClientAndServer(withTLS: false, eventLoopGroupType: .transportServicesEventLoopGroup)
- { configuration in
- configuration.transportServices.nwParametersConfigurator = { _ in
- counter.withLockedValue { $0 += 1 }
- }
- }
- // Execute an RPC to make sure a channel gets created/activated and the parameters configurator is run.
- try self.doTestUnaryRPCs(count: 1)
- XCTAssertEqual(1, counter.withLockedValue({ $0 }))
- }
- #endif // canImport(Network)
- }
- #endif // canImport(NIOSSL)