2
0

GRPCChannelPoolTests.swift 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(NIOSSL)
  17. import EchoImplementation
  18. import EchoModel
  19. import GRPC
  20. import GRPCSampleData
  21. import NIOConcurrencyHelpers
  22. import NIOCore
  23. import NIOPosix
  24. import NIOSSL
  25. import XCTest
  26. final class GRPCChannelPoolTests: GRPCTestCase {
  /// Event loop group used by both the server and the channel pool. Assigned by
  /// `configureEventLoopGroup(threads:)` and shut down in `tearDown()`.
  private var group: MultiThreadedEventLoopGroup!
  /// The server under test, if one has been started.
  private var server: Server?
  /// The pooled channel under test, if one has been started.
  private var channel: GRPCChannel?

  /// The local port of the server's channel, or `nil` when there is no server or no local address.
  private var serverPort: Int? {
    guard let server = self.server else {
      return nil
    }
    return server.channel.localAddress?.port
  }

  /// A new Echo client wrapping the current channel. Traps if no channel has been started.
  private var echo: Echo_EchoNIOClient {
    let channel = self.channel!
    return Echo_EchoNIOClient(channel: channel)
  }
  /// Closes the channel, then the server, then shuts down the event loop group. Any error thrown
  /// while doing so is recorded as a test failure.
  override func tearDown() {
    // Optional chaining turns these into no-ops when the channel or server is `nil`, e.g. because
    // it was never started or a test already closed it and unset the property.
    XCTAssertNoThrow(try self.channel?.close().wait())
    XCTAssertNoThrow(try self.server?.close().wait())

    XCTAssertNoThrow(try self.group.syncShutdownGracefully())
    super.tearDown()
  }
  /// Creates the event loop group used by the test.
  ///
  /// - Parameter threads: Number of threads for the group; defaults to `System.coreCount`.
  private func configureEventLoopGroup(threads: Int = System.coreCount) {
    let group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
    self.group = group
  }
  /// Makes a server builder which serves `EchoProvider` and logs to `serverLogger`.
  ///
  /// - Parameter withTLS: When `true` the builder uses NIOSSL-backed TLS with the sample server
  ///   certificate and private key; otherwise the builder is insecure.
  /// - Returns: A configured builder which has not yet been bound.
  private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
    // Configuration shared by the TLS and plaintext builders.
    func applyingCommonConfiguration(to builder: Server.Builder) -> Server.Builder {
      return builder
        .withLogger(self.serverLogger)
        .withServiceProviders([EchoProvider()])
    }

    guard withTLS else {
      return applyingCommonConfiguration(to: Server.insecure(group: self.group))
    }

    let secureBuilder = Server.usingTLSBackedByNIOSSL(
      on: self.group,
      certificateChain: [SampleCertificate.server.certificate],
      privateKey: SamplePrivateKey.server
    ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
    return applyingCommonConfiguration(to: secureBuilder)
  }
  /// Binds a server to "localhost" and stores it in `self.server`. Traps if binding fails.
  ///
  /// - Parameters:
  ///   - withTLS: Whether the server should use TLS. Defaults to `false`.
  ///   - port: The port to bind to. Defaults to `0`.
  private func startServer(withTLS: Bool = false, port: Int = 0) {
    let builder = self.makeServerBuilder(withTLS: withTLS)
    let bound = builder.bind(host: "localhost", port: port)
    self.server = try! bound.wait()
  }
  /// Creates a pooled channel and stores it in `self.channel`. Traps if the pool cannot be created.
  ///
  /// - Parameters:
  ///   - withTLS: Whether the channel should use TLS (trusting the sample CA certificate) rather
  ///     than plaintext. Defaults to `false`.
  ///   - targetOverride: The target to connect to. When `nil` the channel targets "localhost" on
  ///     the server's port, which traps if no server has been started.
  ///   - configure: Additional configuration, applied after the background activity logger has
  ///     been set to `clientLogger`.
  private func startChannel(
    withTLS: Bool = false,
    overrideTarget targetOverride: ConnectionTarget? = nil,
    _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  ) {
    let transportSecurity: GRPCChannelPool.Configuration.TransportSecurity = withTLS
      ? .tls(
        GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
          trustRoots: .certificates([SampleCertificate.ca.certificate])
        )
      )
      : .plaintext

    // The right-hand side of `??` is only evaluated (and `serverPort` only unwrapped) when no
    // override was provided.
    let target: ConnectionTarget = targetOverride ?? .hostAndPort("localhost", self.serverPort!)

    self.channel = try! GRPCChannelPool.with(
      target: target,
      transportSecurity: transportSecurity,
      eventLoopGroup: self.group
    ) { poolConfiguration in
      poolConfiguration.backgroundActivityLogger = self.clientLogger
      configure(&poolConfiguration)
    }
  }
  /// Creates the event loop group, starts a server and then starts a channel connected to it.
  ///
  /// - Parameters:
  ///   - tls: Whether both the server and the channel should use TLS.
  ///   - threads: Number of threads in the event loop group. Defaults to `System.coreCount`.
  ///   - configure: Additional channel pool configuration, applied after the waiter limit below.
  private func setUpClientAndServer(
    withTLS tls: Bool,
    threads: Int = System.coreCount,
    _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  ) {
    self.configureEventLoopGroup(threads: threads)
    self.startServer(withTLS: tls)
    self.startChannel(withTLS: tls) { configuration in
      // Tests fire off a batch of RPCs straight away; lift the waiter limit so that they don't
      // bounce off it while the connection is still coming up.
      configuration.connectionPool.maxWaitersPerEventLoop = .max
      configure(&configuration)
    }
  }
  /// Starts `count` unary `get` RPCs at once and asserts that every one completes with an OK
  /// status.
  ///
  /// - Parameter count: The number of RPCs to start. Values less than one start no RPCs.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestUnaryRPCs(count: Int) throws {
    // `stride` yields an empty sequence when `count < 1`, whereas `1 ... count` would trap.
    let futures = stride(from: 1, through: count, by: 1).map { i -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      return self.echo.get(request).status
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 unary RPCs over a plaintext connection.
  func testUnaryRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestUnaryRPCs(count: rpcCount)
  }
  /// Runs 100 unary RPCs over a TLS connection.
  func testUnaryRPCs_tls() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestUnaryRPCs(count: rpcCount)
  }
  /// Starts `count` client streaming `collect` RPCs at once, sends three messages followed by end
  /// on each, and asserts that every one completes with an OK status.
  ///
  /// - Parameter count: The number of RPCs to start. Values less than one start no RPCs.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestClientStreamingRPCs(count: Int) throws {
    // `stride` yields an empty sequence when `count < 1`, whereas `1 ... count` would trap.
    let futures = stride(from: 1, through: count, by: 1).map { i -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      let collect = self.echo.collect()
      for _ in 0 ..< 3 {
        collect.sendMessage(request, promise: nil)
      }
      collect.sendEnd(promise: nil)
      return collect.status
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 client streaming RPCs over a plaintext connection.
  func testClientStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestClientStreamingRPCs(count: rpcCount)
  }
  /// Runs 100 client streaming RPCs over a TLS connection.
  func testClientStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestClientStreamingRPCs(count: rpcCount)
  }
  /// Starts `count` server streaming `expand` RPCs at once, ignoring the responses, and asserts
  /// that every one completes with an OK status.
  ///
  /// - Parameter count: The number of RPCs to start. Values less than one start no RPCs.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestServerStreamingRPCs(count: Int) throws {
    // `stride` yields an empty sequence when `count < 1`, whereas `1 ... count` would trap.
    let futures = stride(from: 1, through: count, by: 1).map { i -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      let expand = self.echo.expand(request) { _ in }
      return expand.status
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 server streaming RPCs over a plaintext connection.
  func testServerStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestServerStreamingRPCs(count: rpcCount)
  }
  /// Runs 100 server streaming RPCs over a TLS connection.
  func testServerStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestServerStreamingRPCs(count: rpcCount)
  }
  /// Starts `count` bidirectional streaming `update` RPCs at once, sends three messages followed by
  /// end on each (ignoring the responses), and asserts that every one completes with an OK status.
  ///
  /// - Parameter count: The number of RPCs to start. Values less than one start no RPCs.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestBidiStreamingRPCs(count: Int) throws {
    // `stride` yields an empty sequence when `count < 1`, whereas `1 ... count` would trap.
    let futures = stride(from: 1, through: count, by: 1).map { i -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      let update = self.echo.update { _ in }
      for _ in 0 ..< 3 {
        update.sendMessage(request, promise: nil)
      }
      update.sendEnd(promise: nil)
      return update.status
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 bidirectional streaming RPCs over a plaintext connection.
  func testBidiStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestBidiStreamingRPCs(count: rpcCount)
  }
  /// Runs 100 bidirectional streaming RPCs over a TLS connection.
  func testBidiStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestBidiStreamingRPCs(count: rpcCount)
  }
  /// Checks that RPCs queued on each event loop complete with `.deadlineExceeded` when the pool is
  /// configured with a short maximum wait time and no server is ever started.
  func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
    // 4 threads == 4 pools
    self.configureEventLoopGroup(threads: 4)

    // No server is started, so the target must be overridden (otherwise unwrapping `serverPort`
    // would fail).
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      // Tiny wait time for waiters.
      configuration.connectionPool.maxWaitTime = .milliseconds(50)
    }

    // Queue ten RPCs on each loop.
    let statuses = self.group.makeIterator().flatMap { eventLoop -> [EventLoopFuture<GRPCStatus>] in
      let options = CallOptions(eventLoopPreference: .exact(eventLoop))
      return (0 ..< 10).map { i -> EventLoopFuture<GRPCStatus> in
        let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
        return self.echo.get(request, callOptions: options).status
      }
    }

    let results = try EventLoopFuture.whenAllComplete(statuses, on: self.group.next()).wait()
    for result in results {
      result.assertSuccess { status in
        XCTAssertEqual(status.code, .deadlineExceeded)
      }
    }
  }
  /// Checks that 40 RPCs with no event loop preference are spread evenly over a four-thread group,
  /// and that they fail with `.unavailable` once the pool is closed.
  func testRPCsAreDistributedAcrossEventLoops() throws {
    self.configureEventLoopGroup(threads: 4)

    // We don't need a server here, but we do need a different target.
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      // Increase the max wait time: we're relying on the server never coming up, so the RPCs
      // never complete and streams are not returned to the pools.
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    var echo = self.echo
    echo.defaultCallOptions.eventLoopPreference = .indifferent
    let rpcs = (0 ..< 40).map { _ in echo.update { _ in } }

    // Tally the RPCs per event loop, keyed by the identity of the loop.
    var rpcCountByEventLoop: [ObjectIdentifier: Int] = [:]
    for rpc in rpcs {
      rpcCountByEventLoop[ObjectIdentifier(rpc.eventLoop), default: 0] += 1
    }

    // 40 RPCs over 4 ELs should be 10 RPCs per EL.
    for rpcCount in rpcCountByEventLoop.values {
      XCTAssertEqual(rpcCount, 10)
    }

    // All RPCs are waiting for connections since we never brought up a server. Each will fail when
    // we shut down the pool.
    XCTAssertNoThrow(try self.channel?.close().wait())
    // Unset the channel to avoid shutting down again in tearDown().
    self.channel = nil

    for waitingRPC in rpcs {
      XCTAssertEqual(try waitingRPC.status.wait().code, .unavailable)
    }
  }
  /// Checks that an event loop's waiter limit is enforced (`.resourceExhausted`) and that RPCs
  /// with no event loop preference are not placed on the loop whose waiter queue is full.
  func testWaiterLimitPerEventLoop() throws {
    self.configureEventLoopGroup(threads: 4)
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      configuration.connectionPool.maxWaitersPerEventLoop = 10
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    let loadedLoop = self.group.next()
    let options = CallOptions(eventLoopPreference: .exact(loadedLoop))

    // The first 10 will be waiting for the connection. The 11th should be failed immediately.
    let rpcs = (1 ... 11).map { _ in
      self.echo.get(.with { $0.text = "" }, callOptions: options)
    }
    XCTAssertEqual(try rpcs.last?.status.wait().code, .resourceExhausted)

    // If we express no event loop preference then we should not get the loaded loop.
    let indifferentLoopRPCs = (1 ... 10).map { _ in
      self.echo.get(.with { $0.text = "" })
    }
    XCTAssert(indifferentLoopRPCs.allSatisfy { $0.eventLoop !== loadedLoop })
  }
  /// Checks the order in which the first and last of 101 RPCs complete when both are ended
  /// explicitly, first then last, on a pool with one thread and one connection per event loop.
  func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
    self.configureEventLoopGroup(threads: 1)
    self.startServer()
    self.startChannel { configuration in
      configuration.connectionPool.connectionsPerEventLoop = 1
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    // Number of observed completions; guarded by `lock` as it is also written from callbacks.
    let lock = NIOLock()
    var order = 0

    // We need a connection to be up and running to avoid hitting the waiter limit when creating a
    // batch of RPCs in one go.
    let warmup = self.echo.get(.with { $0.text = "" })
    XCTAssert(try warmup.status.wait().isOk)

    // MAX_CONCURRENT_STREAMS should be 100, we'll create 101 RPCs, 100 of which should not have to
    // wait because there's already an active connection.
    let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in } }
    let firstRPC = rpcs.first!
    let lastRPC = rpcs.last!

    // The first RPC should (obviously) complete first.
    firstRPC.status.whenComplete { _ in
      lock.withLock {
        XCTAssertEqual(order, 0)
        order += 1
      }
    }

    // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
    // RPC below).
    lastRPC.status.whenComplete { _ in
      lock.withLock {
        XCTAssertEqual(order, 1)
        order += 1
      }
    }

    // Still zero: the first RPC is still active.
    lock.withLock { XCTAssertEqual(order, 0) }

    // End the first RPC.
    XCTAssertNoThrow(try firstRPC.sendEnd().wait())
    XCTAssertNoThrow(try firstRPC.status.wait())
    lock.withLock { XCTAssertEqual(order, 1) }

    // End the last RPC.
    XCTAssertNoThrow(try lastRPC.sendEnd().wait())
    XCTAssertNoThrow(try lastRPC.status.wait())
    lock.withLock { XCTAssertEqual(order, 2) }

    // End the rest.
    for remainingRPC in rpcs.dropFirst().dropLast() {
      XCTAssertNoThrow(try remainingRPC.sendEnd().wait())
    }
  }
  /// Checks that an RPC started on an already closed pool fails with `.unavailable`.
  func testRPCOnShutdownPool() {
    self.configureEventLoopGroup(threads: 1)
    self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))

    // Create the client first: the `echo` property needs `self.channel`, which is unset below.
    let client = self.echo

    XCTAssertNoThrow(try self.channel?.close().wait())
    // Avoid shutting down again in tearDown()
    self.channel = nil

    let rpcOnClosedPool = client.get(.with { $0.text = "" })
    XCTAssertEqual(try rpcOnClosedPool.status.wait().code, .unavailable)
  }
  /// Checks that an RPC whose own deadline is sooner than the pool's maximum wait time fails with
  /// `.deadlineExceeded`.
  func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
    self.configureEventLoopGroup(threads: 1)
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      configuration.connectionPool.maxWaitTime = .hours(24)
    }

    // The call's deadline is sooner than the 24 hour wait time, so we expect to time out sooner
    // rather than (much) later!
    let immediateDeadline = CallOptions(timeLimit: .deadline(.now()))
    let timedOutOnOwnDeadline = self.echo.get(
      .with { $0.text = "" },
      callOptions: immediateDeadline
    )
    XCTAssertEqual(try timedOutOnOwnDeadline.status.wait().code, .deadlineExceeded)
  }
  /// Checks that when a TLS client talks to a plaintext server, the statuses of failed RPCs carry
  /// an `NIOSSLError` as their cause, both for RPCs which time out and for an RPC rejected because
  /// the waiter queue is full.
  func testTLSFailuresAreClearerAtTheRPCLevel() throws {
    // Mix and match TLS.
    self.configureEventLoopGroup(threads: 1)
    self.startServer(withTLS: false)
    self.startChannel(withTLS: true) { configuration in
      configuration.connectionPool.maxWaitersPerEventLoop = 10
    }

    // We can't guarantee an error happens within a certain time limit, so keep making RPCs until
    // we see what we expect or a given deadline passes.
    let testDeadline = NIODeadline.now() + .seconds(5)
    var seenError = false

    while !seenError, testDeadline > .now() {
      let options = CallOptions(timeLimit: .deadline(.now() + .milliseconds(50)))
      let get = self.echo.get(.with { $0.text = "foo" }, callOptions: options)

      let status = try get.status.wait()
      XCTAssertEqual(status.code, .deadlineExceeded)

      // An `NIOSSLError` cause is what we expect; anything else means we try again.
      if let cause = status.cause, cause is NIOSSLError {
        seenError = true
      }
    }

    XCTAssert(seenError)

    // Now queue up a bunch of RPCs to fill up the waiter queue. We don't care about the outcome
    // of these. (They'll fail when we tear down the pool at the end of the test.)
    for i in 0 ..< 10 {
      let options = CallOptions(timeLimit: .deadline(.distantFuture))
      _ = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
    }

    // Queue up one more.
    let options = CallOptions(timeLimit: .deadline(.distantFuture))
    let tooManyWaiters = self.echo.get(.with { $0.text = "foo" }, callOptions: options)

    let status = try tooManyWaiters.status.wait()
    XCTAssertEqual(status.code, .resourceExhausted)

    guard let cause = status.cause else {
      XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'")
      return
    }
    XCTAssert(cause is NIOSSLError)
  }
  /// Checks the exact sequence of delegate events for a single connection: added, connecting,
  /// connected, utilization changes as RPCs start and finish, then closed and removed.
  func testConnectionPoolDelegateSingleConnection() async throws {
    let (delegate, stream) = AsyncEventStreamConnectionPoolDelegate.makeDelegateAndAsyncStream()
    self.setUpClientAndServer(withTLS: false, threads: 1) { configuration in
      configuration.delegate = delegate
    }

    // A single RPC to bring the connection up.
    XCTAssertNoThrow(try self.echo.get(.with { $0.text = "" }).status.wait())

    var events = stream.makeAsyncIterator()

    // The first event tells us the ID of the connection; every later event should carry it.
    let added = await events.next()
    let id = try XCTUnwrap(added?.id)
    XCTAssertEqual(added, .connectionAdded(id))

    let connecting = await events.next()
    XCTAssertEqual(connecting, .startedConnecting(id))

    let connected = await events.next()
    XCTAssertEqual(connected, .connectSucceeded(id, 100))

    // The warm-up RPC starting and then finishing.
    let warmupStarted = await events.next()
    XCTAssertEqual(warmupStarted, .connectionUtilizationChanged(id, 1, 100))

    let warmupFinished = await events.next()
    XCTAssertEqual(warmupFinished, .connectionUtilizationChanged(id, 0, 100))

    // Start ten RPCs, one after another, and leave them open.
    var rpcs: [ClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse>] = []
    for _ in 1 ... 10 {
      let rpc = self.echo.collect()
      XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
      rpcs.append(rpc)
    }

    // Utilization should have gone up by one for each RPC started.
    for activeRPCs in 1 ... rpcs.count {
      let event = await events.next()
      XCTAssertEqual(event, .connectionUtilizationChanged(id, activeRPCs, 100))
    }

    // Utilization should drop by one as each RPC is finished.
    var remainingRPCs = rpcs.count
    for rpc in rpcs {
      XCTAssertNoThrow(try rpc.sendEnd().wait())
      XCTAssertNoThrow(try rpc.status.wait())
      remainingRPCs -= 1

      let event = await events.next()
      XCTAssertEqual(event, .connectionUtilizationChanged(id, remainingRPCs, 100))
    }

    XCTAssertNoThrow(try self.channel?.close().wait())

    let closed = await events.next()
    XCTAssertEqual(closed, .connectionClosed(id))

    let removed = await events.next()
    XCTAssertEqual(removed, .connectionRemoved(id))
  }
  /// Checks that the delegate is told a connection is quiescing when the server starts a graceful
  /// shutdown while an RPC is still open, and that the shutdown finishes once that RPC ends.
  func testConnectionPoolDelegateQuiescing() async throws {
    let (delegate, stream) = AsyncEventStreamConnectionPoolDelegate.makeDelegateAndAsyncStream()
    self.setUpClientAndServer(withTLS: false, threads: 1) { configuration in
      configuration.delegate = delegate
    }

    // A single RPC to bring the connection up.
    XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())

    var events = stream.makeAsyncIterator()

    // The first event tells us the ID of the connection; every later event should carry it.
    let added = await events.next()
    let id1 = try XCTUnwrap(added?.id)
    XCTAssertEqual(added, .connectionAdded(id1))

    let connecting = await events.next()
    XCTAssertEqual(connecting, .startedConnecting(id1))

    let connected = await events.next()
    XCTAssertEqual(connected, .connectSucceeded(id1, 100))

    // The first RPC starting and then finishing.
    let firstRPCStarted = await events.next()
    XCTAssertEqual(firstRPCStarted, .connectionUtilizationChanged(id1, 1, 100))

    let firstRPCFinished = await events.next()
    XCTAssertEqual(firstRPCFinished, .connectionUtilizationChanged(id1, 0, 100))

    // Start an RPC.
    let rpc = self.echo.collect()
    XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
    // Complete another one to make sure the previous one is known by the server.
    XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())

    let openRPCStarted = await events.next()
    XCTAssertEqual(openRPCStarted, .connectionUtilizationChanged(id1, 1, 100))

    let otherRPCStarted = await events.next()
    XCTAssertEqual(otherRPCStarted, .connectionUtilizationChanged(id1, 2, 100))

    let otherRPCFinished = await events.next()
    XCTAssertEqual(otherRPCFinished, .connectionUtilizationChanged(id1, 1, 100))

    // Start shutting the server down.
    let didShutdown = self.server!.initiateGracefulShutdown()
    self.server = nil // Avoid shutting down again in tearDown

    // Pause a moment so we know the client received the GOAWAY.
    let pause = self.group.any().scheduleTask(in: .milliseconds(50)) {}
    XCTAssertNoThrow(try pause.futureResult.wait())

    let quiescing = await events.next()
    XCTAssertEqual(quiescing, .connectionQuiescing(id1))

    // Finish the RPC.
    XCTAssertNoThrow(try rpc.sendEnd().wait())
    XCTAssertNoThrow(try rpc.status.wait())

    // Server should shutdown now.
    XCTAssertNoThrow(try didShutdown.wait())
  }
  /// Checks that an `IsConnectingDelegate` reports "connecting" and then "connected" exactly once
  /// each, in that order, when the first RPC is made, and reports nothing more for later RPCs.
  func testDelegateCanTellWhenFirstConnectionIsBeingEstablished() {
    /// Lock-protected state machine which records a test failure on any unexpected transition.
    final class State {
      private enum _State {
        case idle
        case connecting
        case connected
      }

      private var state: _State = .idle
      private let lock = NIOLock()

      /// Whether the "connected" state has been reached.
      var isConnected: Bool {
        return self.lock.withLock { () -> Bool in
          if case .connected = self.state {
            return true
          }
          return false
        }
      }

      /// Moves from idle to connecting; fails the test from any other state.
      func startedConnecting() {
        self.lock.withLock {
          guard case .idle = self.state else {
            XCTFail("Invalid state \(self.state) for \(#function)")
            return
          }
          self.state = .connecting
        }
      }

      /// Moves from connecting to connected; fails the test from any other state.
      func connected() {
        self.lock.withLock {
          guard case .connecting = self.state else {
            XCTFail("Invalid state \(self.state) for \(#function)")
            return
          }
          self.state = .connected
        }
      }
    }

    let state = State()

    self.setUpClientAndServer(withTLS: false, threads: 1) { configuration in
      configuration.delegate = IsConnectingDelegate { stateChange in
        switch stateChange {
        case .connecting:
          state.startedConnecting()
        case .connected:
          state.connected()
        }
      }
    }

    // Nothing has asked for a connection yet.
    XCTAssertFalse(state.isConnected)

    let firstRPC = self.echo.get(.with { $0.text = "" })
    XCTAssertNoThrow(try firstRPC.status.wait())
    XCTAssertTrue(state.isConnected)

    // We should be able to do a bunch of other RPCs without the state changing (we'll XCTFail if
    // a state change happens).
    let statuses: [EventLoopFuture<GRPCStatus>] = (0 ..< 20).map { index in
      self.echo.get(.with { $0.text = "\(index)" }).status
    }

    XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(statuses, on: self.group.any()).wait())
  }
  515. }
  516. #endif // canImport(NIOSSL)