GRPCChannelPoolTests.swift 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(NIOSSL)
  17. import EchoImplementation
  18. import EchoModel
  19. import GRPC
  20. import GRPCSampleData
  21. import NIOConcurrencyHelpers
  22. import NIOCore
  23. import NIOPosix
  24. import NIOSSL
  25. import XCTest
  26. #if canImport(Network)
  27. import Network
  28. import NIOTransportServices
  29. #endif
  30. final class GRPCChannelPoolTests: GRPCTestCase {
  31. private var group: (any EventLoopGroup)!
  32. private var server: Server?
  33. private var channel: GRPCChannel?
  34. private var serverPort: Int? {
  35. return self.server?.channel.localAddress?.port
  36. }
  37. private var echo: Echo_EchoNIOClient {
  38. return Echo_EchoNIOClient(channel: self.channel!)
  39. }
  40. override func tearDown() {
  41. if let channel = self.channel {
  42. XCTAssertNoThrow(try channel.close().wait())
  43. }
  44. if let server = self.server {
  45. XCTAssertNoThrow(try server.close().wait())
  46. }
  47. XCTAssertNoThrow(try self.group.syncShutdownGracefully())
  48. super.tearDown()
  49. }
  50. private enum TestEventLoopGroupType {
  51. case multiThreadedEventLoopGroup
  52. case transportServicesEventLoopGroup
  53. }
  54. private func configureEventLoopGroup(
  55. threads: Int = System.coreCount,
  56. eventLoopGroupType: TestEventLoopGroupType = .multiThreadedEventLoopGroup
  57. ) {
  58. switch eventLoopGroupType {
  59. case .multiThreadedEventLoopGroup:
  60. self.group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
  61. case .transportServicesEventLoopGroup:
  62. #if canImport(Network)
  63. self.group = NIOTSEventLoopGroup(loopCount: threads)
  64. #else
  65. fatalError("NIOTS is not available on this platform.")
  66. #endif
  67. }
  68. }
  69. private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
  70. let builder: Server.Builder
  71. if withTLS {
  72. builder = Server.usingTLSBackedByNIOSSL(
  73. on: self.group,
  74. certificateChain: [SampleCertificate.server.certificate],
  75. privateKey: SamplePrivateKey.server
  76. ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
  77. } else {
  78. builder = Server.insecure(group: self.group)
  79. }
  80. return
  81. builder
  82. .withLogger(self.serverLogger)
  83. .withServiceProviders([EchoProvider()])
  84. }
  85. private func startServer(withTLS: Bool = false, port: Int = 0) {
  86. self.server = try! self.makeServerBuilder(withTLS: withTLS)
  87. .bind(host: "localhost", port: port)
  88. .wait()
  89. }
  90. private func startChannel(
  91. withTLS: Bool = false,
  92. overrideTarget targetOverride: ConnectionTarget? = nil,
  93. _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  94. ) {
  95. let transportSecurity: GRPCChannelPool.Configuration.TransportSecurity
  96. if withTLS {
  97. let configuration = GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
  98. trustRoots: .certificates([SampleCertificate.ca.certificate])
  99. )
  100. transportSecurity = .tls(configuration)
  101. } else {
  102. transportSecurity = .plaintext
  103. }
  104. self.channel = try! GRPCChannelPool.with(
  105. target: targetOverride ?? .hostAndPort("localhost", self.serverPort!),
  106. transportSecurity: transportSecurity,
  107. eventLoopGroup: self.group
  108. ) { configuration in
  109. configuration.backgroundActivityLogger = self.clientLogger
  110. configure(&configuration)
  111. }
  112. }
  113. private func setUpClientAndServer(
  114. withTLS tls: Bool,
  115. threads: Int = System.coreCount,
  116. eventLoopGroupType: TestEventLoopGroupType = .multiThreadedEventLoopGroup,
  117. _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  118. ) {
  119. self.configureEventLoopGroup(threads: threads, eventLoopGroupType: eventLoopGroupType)
  120. self.startServer(withTLS: tls)
  121. self.startChannel(withTLS: tls) {
  122. // We'll allow any number of waiters since we immediately fire off a bunch of RPCs and don't
  123. // want to bounce off the limit as we wait for a connection to come up.
  124. $0.connectionPool.maxWaitersPerEventLoop = .max
  125. configure(&$0)
  126. }
  127. }
  128. private func doTestUnaryRPCs(count: Int) throws {
  129. var futures: [EventLoopFuture<GRPCStatus>] = []
  130. futures.reserveCapacity(count)
  131. for i in 1 ... count {
  132. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  133. let get = self.echo.get(request)
  134. futures.append(get.status)
  135. }
  136. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  137. XCTAssert(statuses.allSatisfy { $0.isOk })
  138. }
  139. func testUnaryRPCs_plaintext() throws {
  140. self.setUpClientAndServer(withTLS: false)
  141. try self.doTestUnaryRPCs(count: 100)
  142. }
  143. func testUnaryRPCs_tls() throws {
  144. self.setUpClientAndServer(withTLS: true)
  145. try self.doTestUnaryRPCs(count: 100)
  146. }
  147. private func doTestClientStreamingRPCs(count: Int) throws {
  148. var futures: [EventLoopFuture<GRPCStatus>] = []
  149. futures.reserveCapacity(count)
  150. for i in 1 ... count {
  151. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  152. let collect = self.echo.collect()
  153. collect.sendMessage(request, promise: nil)
  154. collect.sendMessage(request, promise: nil)
  155. collect.sendMessage(request, promise: nil)
  156. collect.sendEnd(promise: nil)
  157. futures.append(collect.status)
  158. }
  159. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  160. XCTAssert(statuses.allSatisfy { $0.isOk })
  161. }
  162. func testClientStreamingRPCs_plaintext() throws {
  163. self.setUpClientAndServer(withTLS: false)
  164. try self.doTestClientStreamingRPCs(count: 100)
  165. }
  166. func testClientStreamingRPCs() throws {
  167. self.setUpClientAndServer(withTLS: true)
  168. try self.doTestClientStreamingRPCs(count: 100)
  169. }
  170. private func doTestServerStreamingRPCs(count: Int) throws {
  171. var futures: [EventLoopFuture<GRPCStatus>] = []
  172. futures.reserveCapacity(count)
  173. for i in 1 ... count {
  174. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  175. let expand = self.echo.expand(request) { _ in }
  176. futures.append(expand.status)
  177. }
  178. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  179. XCTAssert(statuses.allSatisfy { $0.isOk })
  180. }
  181. func testServerStreamingRPCs_plaintext() throws {
  182. self.setUpClientAndServer(withTLS: false)
  183. try self.doTestServerStreamingRPCs(count: 100)
  184. }
  185. func testServerStreamingRPCs() throws {
  186. self.setUpClientAndServer(withTLS: true)
  187. try self.doTestServerStreamingRPCs(count: 100)
  188. }
  189. private func doTestBidiStreamingRPCs(count: Int) throws {
  190. var futures: [EventLoopFuture<GRPCStatus>] = []
  191. futures.reserveCapacity(count)
  192. for i in 1 ... count {
  193. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  194. let update = self.echo.update { _ in }
  195. update.sendMessage(request, promise: nil)
  196. update.sendMessage(request, promise: nil)
  197. update.sendMessage(request, promise: nil)
  198. update.sendEnd(promise: nil)
  199. futures.append(update.status)
  200. }
  201. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  202. XCTAssert(statuses.allSatisfy { $0.isOk })
  203. }
  204. func testBidiStreamingRPCs_plaintext() throws {
  205. self.setUpClientAndServer(withTLS: false)
  206. try self.doTestBidiStreamingRPCs(count: 100)
  207. }
  208. func testBidiStreamingRPCs() throws {
  209. self.setUpClientAndServer(withTLS: true)
  210. try self.doTestBidiStreamingRPCs(count: 100)
  211. }
  212. func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
  213. // 4 threads == 4 pools
  214. self.configureEventLoopGroup(threads: 4)
  215. // Don't start a server; override the target (otherwise we'll fail to unwrap `serverPort`).
  216. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  217. // Tiny wait time for waiters.
  218. $0.connectionPool.maxWaitTime = .milliseconds(50)
  219. }
  220. var statuses: [EventLoopFuture<GRPCStatus>] = []
  221. statuses.reserveCapacity(40)
  222. // Queue RPCs on each loop.
  223. for eventLoop in self.group.makeIterator() {
  224. let options = CallOptions(eventLoopPreference: .exact(eventLoop))
  225. for i in 0 ..< 10 {
  226. let get = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
  227. statuses.append(get.status)
  228. }
  229. }
  230. let results = try EventLoopFuture.whenAllComplete(statuses, on: self.group.next()).wait()
  231. for result in results {
  232. result.assertSuccess {
  233. XCTAssertEqual($0.code, .deadlineExceeded)
  234. }
  235. }
  236. }
  237. func testRPCsAreDistributedAcrossEventLoops() throws {
  238. self.configureEventLoopGroup(threads: 4)
  239. // We don't need a server here, but we do need a different target
  240. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  241. // Increase the max wait time: we're relying on the server will never coming up, so the RPCs
  242. // never complete and streams are not returned back to pools.
  243. $0.connectionPool.maxWaitTime = .hours(1)
  244. }
  245. var echo = self.echo
  246. echo.defaultCallOptions.eventLoopPreference = .indifferent
  247. let rpcs = (0 ..< 40).map { _ in echo.update { _ in } }
  248. let rpcsByEventLoop = Dictionary(grouping: rpcs, by: { ObjectIdentifier($0.eventLoop) })
  249. for rpcs in rpcsByEventLoop.values {
  250. // 40 RPCs over 4 ELs should be 10 RPCs per EL.
  251. XCTAssertEqual(rpcs.count, 10)
  252. }
  253. // All RPCs are waiting for connections since we never brought up a server. Each will fail when
  254. // we shutdown the pool.
  255. XCTAssertNoThrow(try self.channel?.close().wait())
  256. // Unset the channel to avoid shutting down again in tearDown().
  257. self.channel = nil
  258. for rpc in rpcs {
  259. XCTAssertEqual(try rpc.status.wait().code, .unavailable)
  260. }
  261. }
  262. func testWaiterLimitPerEventLoop() throws {
  263. self.configureEventLoopGroup(threads: 4)
  264. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  265. $0.connectionPool.maxWaitersPerEventLoop = 10
  266. $0.connectionPool.maxWaitTime = .hours(1)
  267. }
  268. let loop = self.group.next()
  269. let options = CallOptions(eventLoopPreference: .exact(loop))
  270. // The first 10 will be waiting for the connection. The 11th should be failed immediately.
  271. let rpcs = (1 ... 11).map { _ in
  272. self.echo.get(.with { $0.text = "" }, callOptions: options)
  273. }
  274. XCTAssertEqual(try rpcs.last?.status.wait().code, .resourceExhausted)
  275. // If we express no event loop preference then we should not get the loaded loop.
  276. let indifferentLoopRPCs = (1 ... 10).map {
  277. _ in self.echo.get(.with { $0.text = "" })
  278. }
  279. XCTAssert(indifferentLoopRPCs.map { $0.eventLoop }.allSatisfy { $0 !== loop })
  280. }
  281. func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
  282. self.configureEventLoopGroup(threads: 1)
  283. self.startServer()
  284. self.startChannel {
  285. $0.connectionPool.connectionsPerEventLoop = 1
  286. $0.connectionPool.maxWaitTime = .hours(1)
  287. }
  288. let lock = NIOLock()
  289. var order = 0
  290. // We need a connection to be up and running to avoid hitting the waiter limit when creating a
  291. // batch of RPCs in one go.
  292. let warmup = self.echo.get(.with { $0.text = "" })
  293. XCTAssert(try warmup.status.wait().isOk)
  294. // MAX_CONCURRENT_STREAMS should be 100, we'll create 101 RPCs, 100 of which should not have to
  295. // wait because there's already an active connection.
  296. let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in } }
  297. // The first RPC should (obviously) complete first.
  298. rpcs.first!.status.whenComplete { _ in
  299. lock.withLock {
  300. XCTAssertEqual(order, 0)
  301. order += 1
  302. }
  303. }
  304. // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
  305. // RPC below).
  306. rpcs.last!.status.whenComplete { _ in
  307. lock.withLock {
  308. XCTAssertEqual(order, 1)
  309. order += 1
  310. }
  311. }
  312. // Still zero: the first RPC is still active.
  313. lock.withLock { XCTAssertEqual(order, 0) }
  314. // End the first RPC.
  315. XCTAssertNoThrow(try rpcs.first!.sendEnd().wait())
  316. XCTAssertNoThrow(try rpcs.first!.status.wait())
  317. lock.withLock { XCTAssertEqual(order, 1) }
  318. // End the last RPC.
  319. XCTAssertNoThrow(try rpcs.last!.sendEnd().wait())
  320. XCTAssertNoThrow(try rpcs.last!.status.wait())
  321. lock.withLock { XCTAssertEqual(order, 2) }
  322. // End the rest.
  323. for rpc in rpcs.dropFirst().dropLast() {
  324. XCTAssertNoThrow(try rpc.sendEnd().wait())
  325. }
  326. }
  327. func testRPCOnShutdownPool() {
  328. self.configureEventLoopGroup(threads: 1)
  329. self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))
  330. let echo = self.echo
  331. XCTAssertNoThrow(try self.channel?.close().wait())
  332. // Avoid shutting down again in tearDown()
  333. self.channel = nil
  334. let get = echo.get(.with { $0.text = "" })
  335. XCTAssertEqual(try get.status.wait().code, .unavailable)
  336. }
  337. func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
  338. self.configureEventLoopGroup(threads: 1)
  339. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  340. $0.connectionPool.maxWaitTime = .hours(24)
  341. }
  342. // Deadline is sooner than the 24 hour waiter time, we expect to time out sooner rather than
  343. // (much) later!
  344. let options = CallOptions(timeLimit: .deadline(.now()))
  345. let timedOutOnOwnDeadline = self.echo.get(.with { $0.text = "" }, callOptions: options)
  346. XCTAssertEqual(try timedOutOnOwnDeadline.status.wait().code, .deadlineExceeded)
  347. }
  348. func testTLSFailuresAreClearerAtTheRPCLevel() throws {
  349. // Mix and match TLS.
  350. self.configureEventLoopGroup(threads: 1)
  351. self.startServer(withTLS: false)
  352. self.startChannel(withTLS: true) {
  353. $0.connectionPool.maxWaitersPerEventLoop = 10
  354. }
  355. // We can't guarantee an error happens within a certain time limit, so if we don't see what we
  356. // expect we'll loop until a given deadline passes.
  357. let testDeadline = NIODeadline.now() + .seconds(5)
  358. var seenError = false
  359. while testDeadline > .now() {
  360. let options = CallOptions(timeLimit: .deadline(.now() + .milliseconds(50)))
  361. let get = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
  362. let status = try get.status.wait()
  363. XCTAssertEqual(status.code, .deadlineExceeded)
  364. if let cause = status.cause, cause is NIOSSLError {
  365. // What we expect.
  366. seenError = true
  367. break
  368. } else {
  369. // Try again.
  370. continue
  371. }
  372. }
  373. XCTAssert(seenError)
  374. // Now queue up a bunch of RPCs to fill up the waiter queue. We don't care about the outcome
  375. // of these. (They'll fail when we tear down the pool at the end of the test.)
  376. _ = (0 ..< 10).map { i -> UnaryCall<Echo_EchoRequest, Echo_EchoResponse> in
  377. let options = CallOptions(timeLimit: .deadline(.distantFuture))
  378. return self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
  379. }
  380. // Queue up one more.
  381. let options = CallOptions(timeLimit: .deadline(.distantFuture))
  382. let tooManyWaiters = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
  383. let status = try tooManyWaiters.status.wait()
  384. XCTAssertEqual(status.code, .resourceExhausted)
  385. if let cause = status.cause {
  386. XCTAssert(cause is NIOSSLError)
  387. } else {
  388. XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'")
  389. }
  390. }
  391. func testConnectionPoolDelegateSingleConnection() throws {
  392. let recorder = EventRecordingConnectionPoolDelegate()
  393. self.setUpClientAndServer(withTLS: false, threads: 1) {
  394. $0.delegate = recorder
  395. }
  396. let warmup = self.echo.get(.with { $0.text = "" })
  397. XCTAssertNoThrow(try warmup.status.wait())
  398. let id = try XCTUnwrap(recorder.first?.id)
  399. XCTAssertEqual(recorder.popFirst(), .connectionAdded(id))
  400. XCTAssertEqual(recorder.popFirst(), .startedConnecting(id))
  401. XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id, 100))
  402. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 1, 100))
  403. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 0, 100))
  404. let rpcs: [ClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse>] = try (1 ... 10).map { i in
  405. let rpc = self.echo.collect()
  406. XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
  407. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, i, 100))
  408. return rpc
  409. }
  410. for (i, rpc) in rpcs.enumerated() {
  411. XCTAssertNoThrow(try rpc.sendEnd().wait())
  412. XCTAssertNoThrow(try rpc.status.wait())
  413. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 10 - (i + 1), 100))
  414. }
  415. XCTAssertNoThrow(try self.channel?.close().wait())
  416. XCTAssertEqual(recorder.popFirst(), .connectionClosed(id))
  417. XCTAssertEqual(recorder.popFirst(), .connectionRemoved(id))
  418. XCTAssert(recorder.isEmpty)
  419. }
  420. func testConnectionPoolDelegateQuiescing() throws {
  421. let recorder = EventRecordingConnectionPoolDelegate()
  422. self.setUpClientAndServer(withTLS: false, threads: 1) {
  423. $0.delegate = recorder
  424. }
  425. XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())
  426. let id1 = try XCTUnwrap(recorder.first?.id)
  427. XCTAssertEqual(recorder.popFirst(), .connectionAdded(id1))
  428. XCTAssertEqual(recorder.popFirst(), .startedConnecting(id1))
  429. XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id1, 100))
  430. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
  431. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 0, 100))
  432. // Start an RPC.
  433. let rpc = self.echo.collect()
  434. XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
  435. // Complete another one to make sure the previous one is known by the server.
  436. XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())
  437. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
  438. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 2, 100))
  439. XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
  440. // Start shutting the server down.
  441. let didShutdown = self.server!.initiateGracefulShutdown()
  442. self.server = nil // Avoid shutting down again in tearDown
  443. // Pause a moment so we know the client received the GOAWAY.
  444. let sleep = self.group.any().scheduleTask(in: .milliseconds(50)) {}
  445. XCTAssertNoThrow(try sleep.futureResult.wait())
  446. XCTAssertEqual(recorder.popFirst(), .connectionQuiescing(id1))
  447. // Finish the RPC.
  448. XCTAssertNoThrow(try rpc.sendEnd().wait())
  449. XCTAssertNoThrow(try rpc.status.wait())
  450. // Server should shutdown now.
  451. XCTAssertNoThrow(try didShutdown.wait())
  452. }
  453. func testDelegateCanTellWhenFirstConnectionIsBeingEstablished() {
  454. final class State {
  455. private enum Storage {
  456. case idle
  457. case connecting
  458. case connected
  459. }
  460. private var state: Storage = .idle
  461. private let lock = NIOLock()
  462. var isConnected: Bool {
  463. return self.lock.withLock {
  464. switch self.state {
  465. case .connected:
  466. return true
  467. case .idle, .connecting:
  468. return false
  469. }
  470. }
  471. }
  472. func startedConnecting() {
  473. self.lock.withLock {
  474. switch self.state {
  475. case .idle:
  476. self.state = .connecting
  477. case .connecting, .connected:
  478. XCTFail("Invalid state \(self.state) for \(#function)")
  479. }
  480. }
  481. }
  482. func connected() {
  483. self.lock.withLock {
  484. switch self.state {
  485. case .connecting:
  486. self.state = .connected
  487. case .idle, .connected:
  488. XCTFail("Invalid state \(self.state) for \(#function)")
  489. }
  490. }
  491. }
  492. }
  493. let state = State()
  494. self.setUpClientAndServer(withTLS: false, threads: 1) {
  495. $0.delegate = IsConnectingDelegate { stateChange in
  496. switch stateChange {
  497. case .connecting:
  498. state.startedConnecting()
  499. case .connected:
  500. state.connected()
  501. }
  502. }
  503. }
  504. XCTAssertFalse(state.isConnected)
  505. let rpc = self.echo.get(.with { $0.text = "" })
  506. XCTAssertNoThrow(try rpc.status.wait())
  507. XCTAssertTrue(state.isConnected)
  508. // We should be able to do a bunch of other RPCs without the state changing (we'll XCTFail if
  509. // a state change happens).
  510. let rpcs: [EventLoopFuture<GRPCStatus>] = (0 ..< 20).map { i in
  511. let rpc = self.echo.get(.with { $0.text = "\(i)" })
  512. return rpc.status
  513. }
  514. XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(rpcs, on: self.group.any()).wait())
  515. }
  516. func testDelegateGetsCalledWithStats() throws {
  517. let recorder = EventRecordingConnectionPoolDelegate()
  518. self.configureEventLoopGroup(threads: 4)
  519. self.startServer(withTLS: false)
  520. self.startChannel(withTLS: false) {
  521. $0.statsPeriod = .milliseconds(1)
  522. $0.delegate = recorder
  523. }
  524. let scheduled = self.group.next().scheduleTask(in: .milliseconds(100)) {
  525. _ = self.channel?.close()
  526. }
  527. try scheduled.futureResult.wait()
  528. let events = recorder.removeAll()
  529. let statsEvents = events.compactMap { event in
  530. switch event {
  531. case .stats(let stats, _):
  532. return stats
  533. default:
  534. return nil
  535. }
  536. }
  537. XCTAssertGreaterThan(statsEvents.count, 0)
  538. }
  539. #if canImport(Network)
  540. func testNWParametersConfigurator() throws {
  541. let counter = NIOLockedValueBox(0)
  542. self.setUpClientAndServer(withTLS: false, eventLoopGroupType: .transportServicesEventLoopGroup)
  543. { configuration in
  544. configuration.transportServices.nwParametersConfigurator = { _ in
  545. counter.withLockedValue { $0 += 1 }
  546. }
  547. }
  548. // Execute an RPC to make sure a channel gets created/activated and the parameters configurator run.
  549. try self.doTestUnaryRPCs(count: 1)
  550. XCTAssertEqual(1, counter.withLockedValue({ $0 }))
  551. }
  552. #endif // canImport(Network)
  553. }
  554. #endif // canImport(NIOSSL)