GRPCChannelPoolTests.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(NIOSSL)
  17. import EchoImplementation
  18. import EchoModel
  19. import GRPC
  20. import GRPCSampleData
  21. import NIOConcurrencyHelpers
  22. import NIOCore
  23. import NIOPosix
  24. import NIOSSL
  25. import XCTest
  26. final class GRPCChannelPoolTests: GRPCTestCase {
  /// Event loop group used by the server and the channel pool. Assigned by
  /// `configureEventLoopGroup(threads:)` and shut down in `tearDown()`.
  private var group: MultiThreadedEventLoopGroup!

  /// The server under test, if the test started one.
  private var server: Server?

  /// The channel under test, if the test started one.
  private var channel: GRPCChannel?

  /// The port of the server's local address, if there is a server and it has one.
  private var serverPort: Int? {
    guard let server = self.server else {
      return nil
    }
    return server.channel.localAddress?.port
  }

  /// A new Echo client wrapping the current channel. Traps if no channel is set.
  private var echo: Echo_EchoClient {
    let channel = self.channel!
    return Echo_EchoClient(channel: channel)
  }
  /// Closes the channel and the server when they are set, then shuts down the event loop group.
  override func tearDown() {
    // Optional chaining turns each close into a no-op when the test never created the resource
    // or has already closed it and set it to `nil`.
    XCTAssertNoThrow(try self.channel?.close().wait())
    XCTAssertNoThrow(try self.server?.close().wait())

    XCTAssertNoThrow(try self.group.syncShutdownGracefully())
    super.tearDown()
  }
  /// Creates the event loop group for the test and stores it in `group`.
  ///
  /// - Parameter threads: Number of threads in the group; defaults to `System.coreCount`.
  private func configureEventLoopGroup(threads: Int = System.coreCount) {
    let group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
    self.group = group
  }
  /// Makes a server builder on the test's event loop group, configured with the server logger
  /// and an `EchoProvider`.
  ///
  /// - Parameter withTLS: Whether to build a TLS server (backed by NIOSSL, using the sample
  ///   certificates) or an insecure one.
  /// - Returns: The configured builder; the caller is responsible for binding it.
  private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
    // Configuration applied to both kinds of builder.
    func applyingCommonConfiguration(to builder: Server.Builder) -> Server.Builder {
      return builder
        .withLogger(self.serverLogger)
        .withServiceProviders([EchoProvider()])
    }

    guard withTLS else {
      return applyingCommonConfiguration(to: Server.insecure(group: self.group))
    }

    let secureBuilder = Server.usingTLSBackedByNIOSSL(
      on: self.group,
      certificateChain: [SampleCertificate.server.certificate],
      privateKey: SamplePrivateKey.server
    ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))

    return applyingCommonConfiguration(to: secureBuilder)
  }
  /// Binds a server to "localhost" on port 0 and stores it in `server`.
  ///
  /// - Parameter withTLS: Whether the server should use TLS; defaults to `false`.
  private func startServer(withTLS: Bool = false) {
    let builder = self.makeServerBuilder(withTLS: withTLS)
    let bound = builder.bind(host: "localhost", port: 0)
    // `try!`: a server which fails to bind traps here rather than being reported as a failure.
    self.server = try! bound.wait()
  }
  /// Creates a pooled channel on the test's event loop group and stores it in `channel`.
  ///
  /// - Parameters:
  ///   - withTLS: Whether to connect using TLS (trusting the sample CA certificate) or plaintext.
  ///   - targetOverride: Target to connect to. When `nil` the target is "localhost" on the
  ///     server's port, which traps if `serverPort` is `nil`.
  ///   - configure: Additional configuration, applied after the background activity logger has
  ///     been set.
  private func startChannel(
    withTLS: Bool = false,
    overrideTarget targetOverride: ConnectionTarget? = nil,
    _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  ) {
    let security: GRPCChannelPool.Configuration.TransportSecurity
    switch withTLS {
    case true:
      security = .tls(
        GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
          trustRoots: .certificates([SampleCertificate.ca.certificate])
        )
      )
    case false:
      security = .plaintext
    }

    // The server's port is only unwrapped when no override was provided.
    let target: ConnectionTarget = targetOverride ?? .hostAndPort("localhost", self.serverPort!)

    self.channel = try! GRPCChannelPool.with(
      target: target,
      transportSecurity: security,
      eventLoopGroup: self.group
    ) { poolConfiguration in
      poolConfiguration.backgroundActivityLogger = self.clientLogger
      configure(&poolConfiguration)
    }
  }
  /// Creates the event loop group, then starts a server and a channel which both use (or both
  /// don't use) TLS.
  ///
  /// - Parameter tls: Whether the server and the channel should use TLS.
  private func setUpClientAndServer(withTLS tls: Bool) {
    self.configureEventLoopGroup()
    self.startServer(withTLS: tls)
    self.startChannel(withTLS: tls) { configuration in
      // Allow any number of waiters: the tests fire off a batch of RPCs immediately and should
      // not bounce off the waiter limit while the connection is still coming up.
      configuration.connectionPool.maxWaitersPerEventLoop = .max
    }
  }
  /// Starts `count` unary "get" RPCs and asserts that every one of them completes with an OK
  /// status.
  ///
  /// - Parameter count: Number of RPCs to start; must be at least 1.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestUnaryRPCs(count: Int) throws {
    let statusFutures = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
      return self.echo.get(request).status
    }

    let allStatuses = EventLoopFuture.whenAllSucceed(statusFutures, on: self.group.next())
    let statuses = try allStatuses.wait()
    XCTAssert(statuses.allSatisfy { status in status.isOk })
  }
  /// 100 unary RPCs over a plaintext connection.
  func testUnaryRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestUnaryRPCs(count: rpcCount)
  }
  /// 100 unary RPCs over a TLS connection.
  func testUnaryRPCs_tls() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestUnaryRPCs(count: rpcCount)
  }
  /// Starts `count` client streaming "collect" RPCs, sends three messages followed by end on
  /// each, and asserts that every one of them completes with an OK status.
  ///
  /// - Parameter count: Number of RPCs to start; must be at least 1.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestClientStreamingRPCs(count: Int) throws {
    let statusFutures = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
      let call = self.echo.collect()
      for _ in 0 ..< 3 {
        call.sendMessage(request, promise: nil)
      }
      call.sendEnd(promise: nil)
      return call.status
    }

    let allStatuses = EventLoopFuture.whenAllSucceed(statusFutures, on: self.group.next())
    let statuses = try allStatuses.wait()
    XCTAssert(statuses.allSatisfy { status in status.isOk })
  }
  /// 100 client streaming RPCs over a plaintext connection.
  func testClientStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestClientStreamingRPCs(count: rpcCount)
  }
  /// 100 client streaming RPCs over a TLS connection (the TLS counterpart of
  /// `testClientStreamingRPCs_plaintext`).
  func testClientStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestClientStreamingRPCs(count: rpcCount)
  }
  /// Starts `count` server streaming "expand" RPCs, ignoring their responses, and asserts that
  /// every one of them completes with an OK status.
  ///
  /// - Parameter count: Number of RPCs to start; must be at least 1.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestServerStreamingRPCs(count: Int) throws {
    let statusFutures = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
      return self.echo.expand(request) { _ in }.status
    }

    let allStatuses = EventLoopFuture.whenAllSucceed(statusFutures, on: self.group.next())
    let statuses = try allStatuses.wait()
    XCTAssert(statuses.allSatisfy { status in status.isOk })
  }
  /// 100 server streaming RPCs over a plaintext connection.
  func testServerStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestServerStreamingRPCs(count: rpcCount)
  }
  /// 100 server streaming RPCs over a TLS connection (the TLS counterpart of
  /// `testServerStreamingRPCs_plaintext`).
  func testServerStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestServerStreamingRPCs(count: rpcCount)
  }
  /// Starts `count` bidirectional streaming "update" RPCs, sends three messages followed by end
  /// on each (ignoring responses), and asserts that every one of them completes with an OK
  /// status.
  ///
  /// - Parameter count: Number of RPCs to start; must be at least 1.
  /// - Throws: Any error thrown while waiting for the statuses.
  private func doTestBidiStreamingRPCs(count: Int) throws {
    let statusFutures = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
      let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
      let call = self.echo.update { _ in }
      for _ in 0 ..< 3 {
        call.sendMessage(request, promise: nil)
      }
      call.sendEnd(promise: nil)
      return call.status
    }

    let allStatuses = EventLoopFuture.whenAllSucceed(statusFutures, on: self.group.next())
    let statuses = try allStatuses.wait()
    XCTAssert(statuses.allSatisfy { status in status.isOk })
  }
  /// 100 bidirectional streaming RPCs over a plaintext connection.
  func testBidiStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestBidiStreamingRPCs(count: rpcCount)
  }
  /// 100 bidirectional streaming RPCs over a TLS connection (the TLS counterpart of
  /// `testBidiStreamingRPCs_plaintext`).
  func testBidiStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestBidiStreamingRPCs(count: rpcCount)
  }
  /// RPCs queued on every event loop should fail with "deadline exceeded" once the (tiny) max
  /// wait time passes, because no connection can be established.
  func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
    // 4 threads == 4 pools
    self.configureEventLoopGroup(threads: 4)

    // No server is started, so the target must be overridden (otherwise unwrapping `serverPort`
    // would fail).
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      // Tiny wait time for waiters.
      configuration.connectionPool.maxWaitTime = .milliseconds(50)
    }

    var pendingStatuses: [EventLoopFuture<GRPCStatus>] = []
    pendingStatuses.reserveCapacity(40)

    // Queue 10 RPCs on each loop.
    for eventLoop in self.group.makeIterator() {
      let options = CallOptions(eventLoopPreference: .exact(eventLoop))
      let queued = (0 ..< 10).map { index -> EventLoopFuture<GRPCStatus> in
        let get = self.echo.get(
          .with { $0.text = String(describing: index) },
          callOptions: options
        )
        return get.status
      }
      pendingStatuses.append(contentsOf: queued)
    }

    let allComplete = EventLoopFuture.whenAllComplete(pendingStatuses, on: self.group.next())
    let results = try allComplete.wait()
    results.forEach { result in
      result.assertSuccess { status in
        XCTAssertEqual(status.code, .deadlineExceeded)
      }
    }
  }
  /// RPCs started with no event loop preference should be spread evenly over the event loops.
  func testRPCsAreDistributedAcrossEventLoops() throws {
    self.configureEventLoopGroup(threads: 4)

    // We don't need a server here, but we do need a different target.
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      // Increase the max wait time: we rely on the server never coming up, so the RPCs never
      // complete and streams are never returned to the pools.
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    let echo = self.echo
    echo.defaultCallOptions.eventLoopPreference = .indifferent

    let rpcs = (0 ..< 40).map { _ in echo.update { _ in } }

    // Tally the RPCs by the identity of the event loop each one was given.
    var rpcCountByEventLoop: [ObjectIdentifier: Int] = [:]
    for rpc in rpcs {
      rpcCountByEventLoop[ObjectIdentifier(rpc.eventLoop), default: 0] += 1
    }

    for rpcCount in rpcCountByEventLoop.values {
      // 40 RPCs over 4 ELs should be 10 RPCs per EL.
      XCTAssertEqual(rpcCount, 10)
    }

    // All RPCs are waiting for connections since we never brought up a server. Each will fail
    // when we shut down the pool.
    XCTAssertNoThrow(try self.channel?.close().wait())
    // Unset the channel to avoid shutting down again in tearDown().
    self.channel = nil

    rpcs.forEach { rpc in
      XCTAssertEqual(try rpc.status.wait().code, .unavailable)
    }
  }
  /// An RPC beyond the per-event-loop waiter limit should fail with "resource exhausted", and
  /// RPCs with no event loop preference should not be given the loop which is at its limit.
  func testWaiterLimitPerEventLoop() throws {
    self.configureEventLoopGroup(threads: 4)
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      configuration.connectionPool.maxWaitersPerEventLoop = 10
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    let loadedLoop = self.group.next()
    let pinnedOptions = CallOptions(eventLoopPreference: .exact(loadedLoop))

    // The first 10 will be waiting for the connection. The 11th should be failed immediately.
    let pinnedRPCs = (1 ... 11).map { _ in
      self.echo.get(.with { $0.text = "" }, callOptions: pinnedOptions)
    }
    XCTAssertEqual(try pinnedRPCs.last?.status.wait().code, .resourceExhausted)

    // If we express no event loop preference then we should not get the loaded loop.
    let indifferentLoopRPCs = (1 ... 10).map { _ in
      self.echo.get(.with { $0.text = "" })
    }
    let anyOnLoadedLoop = indifferentLoopRPCs.contains { rpc in rpc.eventLoop === loadedLoop }
    XCTAssert(!anyOnLoadedLoop)
  }
  /// With a single connection, an RPC beyond the connection's stream capacity should wait and
  /// only be able to complete after an earlier RPC has completed.
  func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
    self.configureEventLoopGroup(threads: 1)
    self.startServer()
    self.startChannel { configuration in
      configuration.connectionPool.connectionsPerEventLoop = 1
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    // `completed` counts the observed completions; it is only touched while holding `lock`
    // because the completion callbacks and the test body both access it.
    let lock = Lock()
    var completed = 0

    // We need a connection to be up and running to avoid hitting the waiter limit when creating
    // a batch of RPCs in one go.
    let warmup = self.echo.get(.with { $0.text = "" })
    XCTAssert(try warmup.status.wait().isOk)

    // MAX_CONCURRENT_STREAMS should be 100, we'll create 101 RPCs, 100 of which should not have
    // to wait because there's already an active connection.
    let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in } }
    let firstRPC = rpcs.first!
    let lastRPC = rpcs.last!

    // The first RPC should (obviously) complete first.
    firstRPC.status.whenComplete { _ in
      lock.withLock {
        XCTAssertEqual(completed, 0)
        completed += 1
      }
    }

    // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
    // RPC below).
    lastRPC.status.whenComplete { _ in
      lock.withLock {
        XCTAssertEqual(completed, 1)
        completed += 1
      }
    }

    // Still zero: the first RPC is still active.
    lock.withLockVoid { XCTAssertEqual(completed, 0) }

    // End the first RPC.
    XCTAssertNoThrow(try firstRPC.sendEnd().wait())
    XCTAssertNoThrow(try firstRPC.status.wait())
    lock.withLockVoid { XCTAssertEqual(completed, 1) }

    // End the last RPC.
    XCTAssertNoThrow(try lastRPC.sendEnd().wait())
    XCTAssertNoThrow(try lastRPC.status.wait())
    lock.withLockVoid { XCTAssertEqual(completed, 2) }

    // End the rest.
    rpcs.dropFirst().dropLast().forEach { rpc in
      XCTAssertNoThrow(try rpc.sendEnd().wait())
    }
  }
  /// An RPC started on a pool which has already been closed should fail with "unavailable".
  func testRPCOnShutdownPool() {
    self.configureEventLoopGroup(threads: 1)
    self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))

    // Create the client before the channel is unset below: `echo` unwraps `channel`.
    let client = self.echo

    XCTAssertNoThrow(try self.channel?.close().wait())
    // Avoid shutting down again in tearDown()
    self.channel = nil

    let rpc = client.get(.with { $0.text = "" })
    let status = rpc.status
    XCTAssertEqual(try status.wait().code, .unavailable)
  }
  /// An RPC whose own deadline is sooner than the pool's max wait time should fail with
  /// "deadline exceeded" at its own deadline.
  func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
    self.configureEventLoopGroup(threads: 1)
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      configuration.connectionPool.maxWaitTime = .hours(24)
    }

    // The deadline is sooner than the 24 hour waiter time, so we expect to time out sooner
    // rather than (much) later!
    let immediateDeadline = CallOptions(timeLimit: .deadline(.now()))
    let timedOutOnOwnDeadline = self.echo.get(
      .with { $0.text = "" },
      callOptions: immediateDeadline
    )

    let status = timedOutOnOwnDeadline.status
    XCTAssertEqual(try status.wait().code, .deadlineExceeded)
  }
  /// When the connection cannot be established because of a TLS failure, RPC statuses should
  /// carry the underlying `NIOSSLError` as their cause, both when an RPC times out and when it
  /// is rejected because there are too many waiters.
  func testTLSFailuresAreClearerAtTheRPCLevel() throws {
    // Mix and match TLS: a plaintext server and a TLS client.
    self.configureEventLoopGroup(threads: 1)
    self.startServer(withTLS: false)
    self.startChannel(withTLS: true) {
      $0.connectionPool.maxWaitersPerEventLoop = 10
    }

    // We can't guarantee an error happens within a certain time limit, so if we don't see what
    // we expect we'll keep trying until a given deadline passes.
    let testDeadline = NIODeadline.now() + .seconds(5)
    var seenError = false

    while !seenError, testDeadline > .now() {
      let options = CallOptions(timeLimit: .deadline(.now() + .milliseconds(50)))
      let get = self.echo.get(.with { $0.text = "foo" }, callOptions: options)

      let status = try get.status.wait()
      XCTAssertEqual(status.code, .deadlineExceeded)

      // What we expect is a cause which is an `NIOSSLError`; anything else means try again.
      seenError = status.cause is NIOSSLError
    }

    XCTAssert(seenError)

    let noDeadline = CallOptions(timeLimit: .deadline(.distantFuture))

    // Now queue up a bunch of RPCs to fill up the waiter queue. We don't care about the outcome
    // of these. (They'll fail when we tear down the pool at the end of the test.)
    for i in 0 ..< 10 {
      _ = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: noDeadline)
    }

    // Queue up one more.
    let tooManyWaiters = self.echo.get(.with { $0.text = "foo" }, callOptions: noDeadline)

    let status = try tooManyWaiters.status.wait()
    XCTAssertEqual(status.code, .resourceExhausted)

    if let cause = status.cause {
      XCTAssert(cause is NIOSSLError)
    } else {
      XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'")
    }
  }
  364. }
  365. #endif // canImport(NIOSSL)