GRPCChannelPoolTests.swift 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(NIOSSL)
  17. import EchoImplementation
  18. import EchoModel
  19. import GRPC
  20. import GRPCSampleData
  21. import NIOConcurrencyHelpers
  22. import NIOCore
  23. import NIOPosix
  24. import NIOSSL
  25. import XCTest
  26. final class GRPCChannelPoolTests: GRPCTestCase {
  27. private var group: MultiThreadedEventLoopGroup!
  28. private var server: Server?
  29. private var channel: GRPCChannel?
  30. private var serverPort: Int? {
  31. return self.server?.channel.localAddress?.port
  32. }
  33. private var echo: Echo_EchoNIOClient {
  34. return Echo_EchoNIOClient(channel: self.channel!)
  35. }
  36. override func tearDown() {
  37. if let channel = self.channel {
  38. XCTAssertNoThrow(try channel.close().wait())
  39. }
  40. if let server = self.server {
  41. XCTAssertNoThrow(try server.close().wait())
  42. }
  43. XCTAssertNoThrow(try self.group.syncShutdownGracefully())
  44. super.tearDown()
  45. }
  46. private func configureEventLoopGroup(threads: Int = System.coreCount) {
  47. self.group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
  48. }
  49. private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
  50. let builder: Server.Builder
  51. if withTLS {
  52. builder = Server.usingTLSBackedByNIOSSL(
  53. on: self.group,
  54. certificateChain: [SampleCertificate.server.certificate],
  55. privateKey: SamplePrivateKey.server
  56. ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
  57. } else {
  58. builder = Server.insecure(group: self.group)
  59. }
  60. return
  61. builder
  62. .withLogger(self.serverLogger)
  63. .withServiceProviders([EchoProvider()])
  64. }
  65. private func startServer(withTLS: Bool = false, port: Int = 0) {
  66. self.server = try! self.makeServerBuilder(withTLS: withTLS)
  67. .bind(host: "localhost", port: port)
  68. .wait()
  69. }
  70. private func startChannel(
  71. withTLS: Bool = false,
  72. overrideTarget targetOverride: ConnectionTarget? = nil,
  73. _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  74. ) {
  75. let transportSecurity: GRPCChannelPool.Configuration.TransportSecurity
  76. if withTLS {
  77. let configuration = GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
  78. trustRoots: .certificates([SampleCertificate.ca.certificate])
  79. )
  80. transportSecurity = .tls(configuration)
  81. } else {
  82. transportSecurity = .plaintext
  83. }
  84. self.channel = try! GRPCChannelPool.with(
  85. target: targetOverride ?? .hostAndPort("localhost", self.serverPort!),
  86. transportSecurity: transportSecurity,
  87. eventLoopGroup: self.group
  88. ) { configuration in
  89. configuration.backgroundActivityLogger = self.clientLogger
  90. configure(&configuration)
  91. }
  92. }
  93. private func setUpClientAndServer(
  94. withTLS tls: Bool,
  95. threads: Int = System.coreCount,
  96. _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  97. ) {
  98. self.configureEventLoopGroup(threads: threads)
  99. self.startServer(withTLS: tls)
  100. self.startChannel(withTLS: tls) {
  101. // We'll allow any number of waiters since we immediately fire off a bunch of RPCs and don't
  102. // want to bounce off the limit as we wait for a connection to come up.
  103. $0.connectionPool.maxWaitersPerEventLoop = .max
  104. configure(&$0)
  105. }
  106. }
  107. private func doTestUnaryRPCs(count: Int) throws {
  108. var futures: [EventLoopFuture<GRPCStatus>] = []
  109. futures.reserveCapacity(count)
  110. for i in 1 ... count {
  111. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  112. let get = self.echo.get(request)
  113. futures.append(get.status)
  114. }
  115. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  116. XCTAssert(statuses.allSatisfy { $0.isOk })
  117. }
  118. func testUnaryRPCs_plaintext() throws {
  119. self.setUpClientAndServer(withTLS: false)
  120. try self.doTestUnaryRPCs(count: 100)
  121. }
  122. func testUnaryRPCs_tls() throws {
  123. self.setUpClientAndServer(withTLS: true)
  124. try self.doTestUnaryRPCs(count: 100)
  125. }
  126. private func doTestClientStreamingRPCs(count: Int) throws {
  127. var futures: [EventLoopFuture<GRPCStatus>] = []
  128. futures.reserveCapacity(count)
  129. for i in 1 ... count {
  130. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  131. let collect = self.echo.collect()
  132. collect.sendMessage(request, promise: nil)
  133. collect.sendMessage(request, promise: nil)
  134. collect.sendMessage(request, promise: nil)
  135. collect.sendEnd(promise: nil)
  136. futures.append(collect.status)
  137. }
  138. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  139. XCTAssert(statuses.allSatisfy { $0.isOk })
  140. }
  141. func testClientStreamingRPCs_plaintext() throws {
  142. self.setUpClientAndServer(withTLS: false)
  143. try self.doTestClientStreamingRPCs(count: 100)
  144. }
  145. func testClientStreamingRPCs() throws {
  146. self.setUpClientAndServer(withTLS: true)
  147. try self.doTestClientStreamingRPCs(count: 100)
  148. }
  149. private func doTestServerStreamingRPCs(count: Int) throws {
  150. var futures: [EventLoopFuture<GRPCStatus>] = []
  151. futures.reserveCapacity(count)
  152. for i in 1 ... count {
  153. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  154. let expand = self.echo.expand(request) { _ in }
  155. futures.append(expand.status)
  156. }
  157. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  158. XCTAssert(statuses.allSatisfy { $0.isOk })
  159. }
  160. func testServerStreamingRPCs_plaintext() throws {
  161. self.setUpClientAndServer(withTLS: false)
  162. try self.doTestServerStreamingRPCs(count: 100)
  163. }
  164. func testServerStreamingRPCs() throws {
  165. self.setUpClientAndServer(withTLS: true)
  166. try self.doTestServerStreamingRPCs(count: 100)
  167. }
  168. private func doTestBidiStreamingRPCs(count: Int) throws {
  169. var futures: [EventLoopFuture<GRPCStatus>] = []
  170. futures.reserveCapacity(count)
  171. for i in 1 ... count {
  172. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  173. let update = self.echo.update { _ in }
  174. update.sendMessage(request, promise: nil)
  175. update.sendMessage(request, promise: nil)
  176. update.sendMessage(request, promise: nil)
  177. update.sendEnd(promise: nil)
  178. futures.append(update.status)
  179. }
  180. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  181. XCTAssert(statuses.allSatisfy { $0.isOk })
  182. }
  183. func testBidiStreamingRPCs_plaintext() throws {
  184. self.setUpClientAndServer(withTLS: false)
  185. try self.doTestBidiStreamingRPCs(count: 100)
  186. }
  187. func testBidiStreamingRPCs() throws {
  188. self.setUpClientAndServer(withTLS: true)
  189. try self.doTestBidiStreamingRPCs(count: 100)
  190. }
  191. func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
  192. // 4 threads == 4 pools
  193. self.configureEventLoopGroup(threads: 4)
  194. // Don't start a server; override the target (otherwise we'll fail to unwrap `serverPort`).
  195. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  196. // Tiny wait time for waiters.
  197. $0.connectionPool.maxWaitTime = .milliseconds(50)
  198. }
  199. var statuses: [EventLoopFuture<GRPCStatus>] = []
  200. statuses.reserveCapacity(40)
  201. // Queue RPCs on each loop.
  202. for eventLoop in self.group.makeIterator() {
  203. let options = CallOptions(eventLoopPreference: .exact(eventLoop))
  204. for i in 0 ..< 10 {
  205. let get = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
  206. statuses.append(get.status)
  207. }
  208. }
  209. let results = try EventLoopFuture.whenAllComplete(statuses, on: self.group.next()).wait()
  210. for result in results {
  211. result.assertSuccess {
  212. XCTAssertEqual($0.code, .deadlineExceeded)
  213. }
  214. }
  215. }
  216. func testRPCsAreDistributedAcrossEventLoops() throws {
  217. self.configureEventLoopGroup(threads: 4)
  218. // We don't need a server here, but we do need a different target
  219. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  220. // Increase the max wait time: we're relying on the server will never coming up, so the RPCs
  221. // never complete and streams are not returned back to pools.
  222. $0.connectionPool.maxWaitTime = .hours(1)
  223. }
  224. var echo = self.echo
  225. echo.defaultCallOptions.eventLoopPreference = .indifferent
  226. let rpcs = (0 ..< 40).map { _ in echo.update { _ in } }
  227. let rpcsByEventLoop = Dictionary(grouping: rpcs, by: { ObjectIdentifier($0.eventLoop) })
  228. for rpcs in rpcsByEventLoop.values {
  229. // 40 RPCs over 4 ELs should be 10 RPCs per EL.
  230. XCTAssertEqual(rpcs.count, 10)
  231. }
  232. // All RPCs are waiting for connections since we never brought up a server. Each will fail when
  233. // we shutdown the pool.
  234. XCTAssertNoThrow(try self.channel?.close().wait())
  235. // Unset the channel to avoid shutting down again in tearDown().
  236. self.channel = nil
  237. for rpc in rpcs {
  238. XCTAssertEqual(try rpc.status.wait().code, .unavailable)
  239. }
  240. }
  241. func testWaiterLimitPerEventLoop() throws {
  242. self.configureEventLoopGroup(threads: 4)
  243. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  244. $0.connectionPool.maxWaitersPerEventLoop = 10
  245. $0.connectionPool.maxWaitTime = .hours(1)
  246. }
  247. let loop = self.group.next()
  248. let options = CallOptions(eventLoopPreference: .exact(loop))
  249. // The first 10 will be waiting for the connection. The 11th should be failed immediately.
  250. let rpcs = (1 ... 11).map { _ in
  251. self.echo.get(.with { $0.text = "" }, callOptions: options)
  252. }
  253. XCTAssertEqual(try rpcs.last?.status.wait().code, .resourceExhausted)
  254. // If we express no event loop preference then we should not get the loaded loop.
  255. let indifferentLoopRPCs = (1 ... 10).map {
  256. _ in self.echo.get(.with { $0.text = "" })
  257. }
  258. XCTAssert(indifferentLoopRPCs.map { $0.eventLoop }.allSatisfy { $0 !== loop })
  259. }
  260. func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
  261. self.configureEventLoopGroup(threads: 1)
  262. self.startServer()
  263. self.startChannel {
  264. $0.connectionPool.connectionsPerEventLoop = 1
  265. $0.connectionPool.maxWaitTime = .hours(1)
  266. }
  267. let lock = NIOLock()
  268. var order = 0
  269. // We need a connection to be up and running to avoid hitting the waiter limit when creating a
  270. // batch of RPCs in one go.
  271. let warmup = self.echo.get(.with { $0.text = "" })
  272. XCTAssert(try warmup.status.wait().isOk)
  273. // MAX_CONCURRENT_STREAMS should be 100, we'll create 101 RPCs, 100 of which should not have to
  274. // wait because there's already an active connection.
  275. let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in } }
  276. // The first RPC should (obviously) complete first.
  277. rpcs.first!.status.whenComplete { _ in
  278. lock.withLock {
  279. XCTAssertEqual(order, 0)
  280. order += 1
  281. }
  282. }
  283. // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
  284. // RPC below).
  285. rpcs.last!.status.whenComplete { _ in
  286. lock.withLock {
  287. XCTAssertEqual(order, 1)
  288. order += 1
  289. }
  290. }
  291. // Still zero: the first RPC is still active.
  292. lock.withLock { XCTAssertEqual(order, 0) }
  293. // End the first RPC.
  294. XCTAssertNoThrow(try rpcs.first!.sendEnd().wait())
  295. XCTAssertNoThrow(try rpcs.first!.status.wait())
  296. lock.withLock { XCTAssertEqual(order, 1) }
  297. // End the last RPC.
  298. XCTAssertNoThrow(try rpcs.last!.sendEnd().wait())
  299. XCTAssertNoThrow(try rpcs.last!.status.wait())
  300. lock.withLock { XCTAssertEqual(order, 2) }
  301. // End the rest.
  302. for rpc in rpcs.dropFirst().dropLast() {
  303. XCTAssertNoThrow(try rpc.sendEnd().wait())
  304. }
  305. }
  306. func testRPCOnShutdownPool() {
  307. self.configureEventLoopGroup(threads: 1)
  308. self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))
  309. let echo = self.echo
  310. XCTAssertNoThrow(try self.channel?.close().wait())
  311. // Avoid shutting down again in tearDown()
  312. self.channel = nil
  313. let get = echo.get(.with { $0.text = "" })
  314. XCTAssertEqual(try get.status.wait().code, .unavailable)
  315. }
  316. func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
  317. self.configureEventLoopGroup(threads: 1)
  318. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  319. $0.connectionPool.maxWaitTime = .hours(24)
  320. }
  321. // Deadline is sooner than the 24 hour waiter time, we expect to time out sooner rather than
  322. // (much) later!
  323. let options = CallOptions(timeLimit: .deadline(.now()))
  324. let timedOutOnOwnDeadline = self.echo.get(.with { $0.text = "" }, callOptions: options)
  325. XCTAssertEqual(try timedOutOnOwnDeadline.status.wait().code, .deadlineExceeded)
  326. }
  327. func testTLSFailuresAreClearerAtTheRPCLevel() throws {
  328. // Mix and match TLS.
  329. self.configureEventLoopGroup(threads: 1)
  330. self.startServer(withTLS: false)
  331. self.startChannel(withTLS: true) {
  332. $0.connectionPool.maxWaitersPerEventLoop = 10
  333. }
  334. // We can't guarantee an error happens within a certain time limit, so if we don't see what we
  335. // expect we'll loop until a given deadline passes.
  336. let testDeadline = NIODeadline.now() + .seconds(5)
  337. var seenError = false
  338. while testDeadline > .now() {
  339. let options = CallOptions(timeLimit: .deadline(.now() + .milliseconds(50)))
  340. let get = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
  341. let status = try get.status.wait()
  342. XCTAssertEqual(status.code, .deadlineExceeded)
  343. if let cause = status.cause, cause is NIOSSLError {
  344. // What we expect.
  345. seenError = true
  346. break
  347. } else {
  348. // Try again.
  349. continue
  350. }
  351. }
  352. XCTAssert(seenError)
  353. // Now queue up a bunch of RPCs to fill up the waiter queue. We don't care about the outcome
  354. // of these. (They'll fail when we tear down the pool at the end of the test.)
  355. _ = (0 ..< 10).map { i -> UnaryCall<Echo_EchoRequest, Echo_EchoResponse> in
  356. let options = CallOptions(timeLimit: .deadline(.distantFuture))
  357. return self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
  358. }
  359. // Queue up one more.
  360. let options = CallOptions(timeLimit: .deadline(.distantFuture))
  361. let tooManyWaiters = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
  362. let status = try tooManyWaiters.status.wait()
  363. XCTAssertEqual(status.code, .resourceExhausted)
  364. if let cause = status.cause {
  365. XCTAssert(cause is NIOSSLError)
  366. } else {
  367. XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'")
  368. }
  369. }
  370. func testConnectionPoolDelegateSingleConnection() throws {
  371. let recorder = EventRecordingConnectionPoolDelegate()
  372. self.setUpClientAndServer(withTLS: false, threads: 1) {
  373. $0.delegate = recorder
  374. }
  375. let warmup = self.echo.get(.with { $0.text = "" })
  376. XCTAssertNoThrow(try warmup.status.wait())
  377. let id = try XCTUnwrap(recorder.first?.id)
  378. XCTAssertEqual(recorder.popFirst(), .connectionAdded(id))
  379. XCTAssertEqual(recorder.popFirst(), .startedConnecting(id))
  380. XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id, 100))
  381. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 1, 100))
  382. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 0, 100))
  383. let rpcs: [ClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse>] = try (1 ... 10).map { i in
  384. let rpc = self.echo.collect()
  385. XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
  386. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, i, 100))
  387. return rpc
  388. }
  389. for (i, rpc) in rpcs.enumerated() {
  390. XCTAssertNoThrow(try rpc.sendEnd().wait())
  391. XCTAssertNoThrow(try rpc.status.wait())
  392. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 10 - (i + 1), 100))
  393. }
  394. XCTAssertNoThrow(try self.channel?.close().wait())
  395. XCTAssertEqual(recorder.popFirst(), .connectionClosed(id))
  396. XCTAssertEqual(recorder.popFirst(), .connectionRemoved(id))
  397. XCTAssert(recorder.isEmpty)
  398. }
  399. func testConnectionPoolDelegateQuiescing() throws {
  400. let recorder = EventRecordingConnectionPoolDelegate()
  401. self.setUpClientAndServer(withTLS: false, threads: 1) {
  402. $0.delegate = recorder
  403. }
  404. XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())
  405. let id1 = try XCTUnwrap(recorder.first?.id)
  406. XCTAssertEqual(recorder.popFirst(), .connectionAdded(id1))
  407. XCTAssertEqual(recorder.popFirst(), .startedConnecting(id1))
  408. XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id1, 100))
  409. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
  410. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 0, 100))
  411. // Start an RPC.
  412. let rpc = self.echo.collect()
  413. XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
  414. // Complete another one to make sure the previous one is known by the server.
  415. XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())
  416. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
  417. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 2, 100))
  418. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
  419. // Start shutting the server down.
  420. let didShutdown = self.server!.initiateGracefulShutdown()
  421. self.server = nil // Avoid shutting down again in tearDown
  422. // Pause a moment so we know the client received the GOAWAY.
  423. let sleep = self.group.any().scheduleTask(in: .milliseconds(50)) {}
  424. XCTAssertNoThrow(try sleep.futureResult.wait())
  425. XCTAssertEqual(recorder.popFirst(), .connectionQuiescing(id1))
  426. // Finish the RPC.
  427. XCTAssertNoThrow(try rpc.sendEnd().wait())
  428. XCTAssertNoThrow(try rpc.status.wait())
  429. // Server should shutdown now.
  430. XCTAssertNoThrow(try didShutdown.wait())
  431. }
  432. func testDelegateCanTellWhenFirstConnectionIsBeingEstablished() {
  433. final class State {
  434. private enum Storage {
  435. case idle
  436. case connecting
  437. case connected
  438. }
  439. private var state: Storage = .idle
  440. private let lock = NIOLock()
  441. var isConnected: Bool {
  442. return self.lock.withLock {
  443. switch self.state {
  444. case .connected:
  445. return true
  446. case .idle, .connecting:
  447. return false
  448. }
  449. }
  450. }
  451. func startedConnecting() {
  452. self.lock.withLock {
  453. switch self.state {
  454. case .idle:
  455. self.state = .connecting
  456. case .connecting, .connected:
  457. XCTFail("Invalid state \(self.state) for \(#function)")
  458. }
  459. }
  460. }
  461. func connected() {
  462. self.lock.withLock {
  463. switch self.state {
  464. case .connecting:
  465. self.state = .connected
  466. case .idle, .connected:
  467. XCTFail("Invalid state \(self.state) for \(#function)")
  468. }
  469. }
  470. }
  471. }
  472. let state = State()
  473. self.setUpClientAndServer(withTLS: false, threads: 1) {
  474. $0.delegate = IsConnectingDelegate { stateChange in
  475. switch stateChange {
  476. case .connecting:
  477. state.startedConnecting()
  478. case .connected:
  479. state.connected()
  480. }
  481. }
  482. }
  483. XCTAssertFalse(state.isConnected)
  484. let rpc = self.echo.get(.with { $0.text = "" })
  485. XCTAssertNoThrow(try rpc.status.wait())
  486. XCTAssertTrue(state.isConnected)
  487. // We should be able to do a bunch of other RPCs without the state changing (we'll XCTFail if
  488. // a state change happens).
  489. let rpcs: [EventLoopFuture<GRPCStatus>] = (0 ..< 20).map { i in
  490. let rpc = self.echo.get(.with { $0.text = "\(i)" })
  491. return rpc.status
  492. }
  493. XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(rpcs, on: self.group.any()).wait())
  494. }
  495. func testDelegateGetsCalledWithStats() throws {
  496. let recorder = EventRecordingConnectionPoolDelegate()
  497. self.configureEventLoopGroup(threads: 4)
  498. self.startServer(withTLS: false)
  499. self.startChannel(withTLS: false) {
  500. $0.statsPeriod = .milliseconds(1)
  501. $0.delegate = recorder
  502. }
  503. let scheduled = self.group.next().scheduleTask(in: .milliseconds(100)) {
  504. _ = self.channel?.close()
  505. }
  506. try scheduled.futureResult.wait()
  507. let events = recorder.removeAll()
  508. let statsEvents = events.compactMap { event in
  509. switch event {
  510. case .stats(let stats, _):
  511. return stats
  512. default:
  513. return nil
  514. }
  515. }
  516. XCTAssertGreaterThan(statsEvents.count, 0)
  517. }
  518. }
  519. #endif // canImport(NIOSSL)