GRPCChannelPoolTests.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import GRPCSampleData
  20. import NIO
  21. import NIOConcurrencyHelpers
  22. import NIOSSL
  23. import XCTest
  24. final class GRPCChannelPoolTests: GRPCTestCase {
  25. private var group: MultiThreadedEventLoopGroup!
  26. private var server: Server?
  27. private var channel: GRPCChannel?
  28. private var serverPort: Int? {
  29. return self.server?.channel.localAddress?.port
  30. }
  31. private var echo: Echo_EchoClient {
  32. return Echo_EchoClient(channel: self.channel!)
  33. }
  34. override func tearDown() {
  35. if let channel = self.channel {
  36. XCTAssertNoThrow(try channel.close().wait())
  37. }
  38. if let server = self.server {
  39. XCTAssertNoThrow(try server.close().wait())
  40. }
  41. XCTAssertNoThrow(try self.group.syncShutdownGracefully())
  42. super.tearDown()
  43. }
  44. private func configureEventLoopGroup(threads: Int = System.coreCount) {
  45. self.group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
  46. }
  47. private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
  48. let builder: Server.Builder
  49. if withTLS {
  50. builder = Server.usingTLSBackedByNIOSSL(
  51. on: self.group,
  52. certificateChain: [SampleCertificate.server.certificate],
  53. privateKey: SamplePrivateKey.server
  54. ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
  55. } else {
  56. builder = Server.insecure(group: self.group)
  57. }
  58. return builder
  59. .withLogger(self.serverLogger)
  60. .withServiceProviders([EchoProvider()])
  61. }
  62. private func startServer(withTLS: Bool = false) {
  63. self.server = try! self.makeServerBuilder(withTLS: withTLS)
  64. .bind(host: "localhost", port: 0)
  65. .wait()
  66. }
  67. private func startChannel(
  68. withTLS: Bool = false,
  69. overrideTarget targetOverride: ConnectionTarget? = nil,
  70. _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  71. ) {
  72. let transportSecurity: GRPCChannelPool.Configuration.TransportSecurity
  73. if withTLS {
  74. let configuration = GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
  75. trustRoots: .certificates([SampleCertificate.ca.certificate])
  76. )
  77. transportSecurity = .tls(configuration)
  78. } else {
  79. transportSecurity = .plaintext
  80. }
  81. self.channel = try! GRPCChannelPool.with(
  82. target: targetOverride ?? .hostAndPort("localhost", self.serverPort!),
  83. transportSecurity: transportSecurity,
  84. eventLoopGroup: self.group
  85. ) { configuration in
  86. configuration.backgroundActivityLogger = self.clientLogger
  87. configure(&configuration)
  88. }
  89. }
  90. private func setUpClientAndServer(withTLS tls: Bool) {
  91. self.configureEventLoopGroup()
  92. self.startServer(withTLS: tls)
  93. self.startChannel(withTLS: tls) {
  94. // We'll allow any number of waiters since we immediately fire off a bunch of RPCs and don't
  95. // want to bounce off the limit as we wait for a connection to come up.
  96. $0.connectionPool.maxWaitersPerEventLoop = .max
  97. }
  98. }
  99. private func doTestUnaryRPCs(count: Int) throws {
  100. var futures: [EventLoopFuture<GRPCStatus>] = []
  101. futures.reserveCapacity(count)
  102. for i in 1 ... count {
  103. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  104. let get = self.echo.get(request)
  105. futures.append(get.status)
  106. }
  107. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  108. XCTAssert(statuses.allSatisfy { $0.isOk })
  109. }
  110. func testUnaryRPCs_plaintext() throws {
  111. self.setUpClientAndServer(withTLS: false)
  112. try self.doTestUnaryRPCs(count: 100)
  113. }
  114. func testUnaryRPCs_tls() throws {
  115. self.setUpClientAndServer(withTLS: true)
  116. try self.doTestUnaryRPCs(count: 100)
  117. }
  118. private func doTestClientStreamingRPCs(count: Int) throws {
  119. var futures: [EventLoopFuture<GRPCStatus>] = []
  120. futures.reserveCapacity(count)
  121. for i in 1 ... count {
  122. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  123. let collect = self.echo.collect()
  124. collect.sendMessage(request, promise: nil)
  125. collect.sendMessage(request, promise: nil)
  126. collect.sendMessage(request, promise: nil)
  127. collect.sendEnd(promise: nil)
  128. futures.append(collect.status)
  129. }
  130. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  131. XCTAssert(statuses.allSatisfy { $0.isOk })
  132. }
  133. func testClientStreamingRPCs_plaintext() throws {
  134. self.setUpClientAndServer(withTLS: false)
  135. try self.doTestClientStreamingRPCs(count: 100)
  136. }
  137. func testClientStreamingRPCs() throws {
  138. self.setUpClientAndServer(withTLS: true)
  139. try self.doTestClientStreamingRPCs(count: 100)
  140. }
  141. private func doTestServerStreamingRPCs(count: Int) throws {
  142. var futures: [EventLoopFuture<GRPCStatus>] = []
  143. futures.reserveCapacity(count)
  144. for i in 1 ... count {
  145. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  146. let expand = self.echo.expand(request) { _ in }
  147. futures.append(expand.status)
  148. }
  149. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  150. XCTAssert(statuses.allSatisfy { $0.isOk })
  151. }
  152. func testServerStreamingRPCs_plaintext() throws {
  153. self.setUpClientAndServer(withTLS: false)
  154. try self.doTestServerStreamingRPCs(count: 100)
  155. }
  156. func testServerStreamingRPCs() throws {
  157. self.setUpClientAndServer(withTLS: true)
  158. try self.doTestServerStreamingRPCs(count: 100)
  159. }
  160. private func doTestBidiStreamingRPCs(count: Int) throws {
  161. var futures: [EventLoopFuture<GRPCStatus>] = []
  162. futures.reserveCapacity(count)
  163. for i in 1 ... count {
  164. let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
  165. let update = self.echo.update { _ in }
  166. update.sendMessage(request, promise: nil)
  167. update.sendMessage(request, promise: nil)
  168. update.sendMessage(request, promise: nil)
  169. update.sendEnd(promise: nil)
  170. futures.append(update.status)
  171. }
  172. let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
  173. XCTAssert(statuses.allSatisfy { $0.isOk })
  174. }
  175. func testBidiStreamingRPCs_plaintext() throws {
  176. self.setUpClientAndServer(withTLS: false)
  177. try self.doTestBidiStreamingRPCs(count: 100)
  178. }
  179. func testBidiStreamingRPCs() throws {
  180. self.setUpClientAndServer(withTLS: true)
  181. try self.doTestBidiStreamingRPCs(count: 100)
  182. }
  183. func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
  184. // 4 threads == 4 pools
  185. self.configureEventLoopGroup(threads: 4)
  186. // Don't start a server; override the target (otherwise we'll fail to unwrap `serverPort`).
  187. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  188. // Tiny wait time for waiters.
  189. $0.connectionPool.maxWaitTime = .milliseconds(50)
  190. }
  191. var statuses: [EventLoopFuture<GRPCStatus>] = []
  192. statuses.reserveCapacity(40)
  193. // Queue RPCs on each loop.
  194. for eventLoop in self.group.makeIterator() {
  195. let options = CallOptions(eventLoopPreference: .exact(eventLoop))
  196. for i in 0 ..< 10 {
  197. let get = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
  198. statuses.append(get.status)
  199. }
  200. }
  201. let results = try EventLoopFuture.whenAllComplete(statuses, on: self.group.next()).wait()
  202. for result in results {
  203. result.assertSuccess {
  204. XCTAssertEqual($0.code, .deadlineExceeded)
  205. }
  206. }
  207. }
  208. func testRPCsAreDistributedAcrossEventLoops() throws {
  209. self.configureEventLoopGroup(threads: 4)
  210. // We don't need a server here, but we do need a different target
  211. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  212. // Increase the max wait time: we're relying on the server will never coming up, so the RPCs
  213. // never complete and streams are not returned back to pools.
  214. $0.connectionPool.maxWaitTime = .hours(1)
  215. }
  216. let echo = self.echo
  217. echo.defaultCallOptions.eventLoopPreference = .indifferent
  218. let rpcs = (0 ..< 40).map { _ in echo.update { _ in } }
  219. let rpcsByEventLoop = Dictionary(grouping: rpcs, by: { ObjectIdentifier($0.eventLoop) })
  220. for rpcs in rpcsByEventLoop.values {
  221. // 40 RPCs over 4 ELs should be 10 RPCs per EL.
  222. XCTAssertEqual(rpcs.count, 10)
  223. }
  224. // All RPCs are waiting for connections since we never brought up a server. Each will fail when
  225. // we shutdown the pool.
  226. XCTAssertNoThrow(try self.channel?.close().wait())
  227. // Unset the channel to avoid shutting down again in tearDown().
  228. self.channel = nil
  229. for rpc in rpcs {
  230. XCTAssertEqual(try rpc.status.wait().code, .unavailable)
  231. }
  232. }
  233. func testWaiterLimitPerEventLoop() throws {
  234. self.configureEventLoopGroup(threads: 4)
  235. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  236. $0.connectionPool.maxWaitersPerEventLoop = 10
  237. $0.connectionPool.maxWaitTime = .hours(1)
  238. }
  239. let loop = self.group.next()
  240. let options = CallOptions(eventLoopPreference: .exact(loop))
  241. // The first 10 will be waiting for the connection. The 11th should be failed immediately.
  242. let rpcs = (1 ... 11).map { _ in
  243. self.echo.get(.with { $0.text = "" }, callOptions: options)
  244. }
  245. XCTAssertEqual(try rpcs.last?.status.wait().code, .resourceExhausted)
  246. // If we express no event loop preference then we should not get the loaded loop.
  247. let indifferentLoopRPCs = (1 ... 10).map {
  248. _ in echo.get(.with { $0.text = "" })
  249. }
  250. XCTAssert(indifferentLoopRPCs.map { $0.eventLoop }.allSatisfy { $0 !== loop })
  251. }
  252. func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
  253. self.configureEventLoopGroup(threads: 1)
  254. self.startServer()
  255. self.startChannel {
  256. $0.connectionPool.connectionsPerEventLoop = 1
  257. $0.connectionPool.maxWaitTime = .hours(1)
  258. }
  259. let lock = Lock()
  260. var order = 0
  261. // We need a connection to be up and running to avoid hitting the waiter limit when creating a
  262. // batch of RPCs in one go.
  263. let warmup = self.echo.get(.with { $0.text = "" })
  264. XCTAssert(try warmup.status.wait().isOk)
  265. // MAX_CONCURRENT_STREAMS should be 100, we'll create 101 RPCs, 100 of which should not have to
  266. // wait because there's already an active connection.
  267. let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in }}
  268. // The first RPC should (obviously) complete first.
  269. rpcs.first!.status.whenComplete { _ in
  270. lock.withLock {
  271. XCTAssertEqual(order, 0)
  272. order += 1
  273. }
  274. }
  275. // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
  276. // RPC below).
  277. rpcs.last!.status.whenComplete { _ in
  278. lock.withLock {
  279. XCTAssertEqual(order, 1)
  280. order += 1
  281. }
  282. }
  283. // Still zero: the first RPC is still active.
  284. lock.withLockVoid { XCTAssertEqual(order, 0) }
  285. // End the first RPC.
  286. XCTAssertNoThrow(try rpcs.first!.sendEnd().wait())
  287. XCTAssertNoThrow(try rpcs.first!.status.wait())
  288. lock.withLockVoid { XCTAssertEqual(order, 1) }
  289. // End the last RPC.
  290. XCTAssertNoThrow(try rpcs.last!.sendEnd().wait())
  291. XCTAssertNoThrow(try rpcs.last!.status.wait())
  292. lock.withLockVoid { XCTAssertEqual(order, 2) }
  293. // End the rest.
  294. for rpc in rpcs.dropFirst().dropLast() {
  295. XCTAssertNoThrow(try rpc.sendEnd().wait())
  296. }
  297. }
  298. func testRPCOnShutdownPool() {
  299. self.configureEventLoopGroup(threads: 1)
  300. self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))
  301. let echo = self.echo
  302. XCTAssertNoThrow(try self.channel?.close().wait())
  303. // Avoid shutting down again in tearDown()
  304. self.channel = nil
  305. let get = echo.get(.with { $0.text = "" })
  306. XCTAssertEqual(try get.status.wait().code, .unavailable)
  307. }
  308. func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
  309. self.configureEventLoopGroup(threads: 1)
  310. self.startChannel(overrideTarget: .unixDomainSocket("/nope")) {
  311. $0.connectionPool.maxWaitTime = .hours(24)
  312. }
  313. // Deadline is sooner than the 24 hour waiter time, we expect to time out sooner rather than
  314. // (much) later!
  315. let options = CallOptions(timeLimit: .deadline(.now()))
  316. let timedOutOnOwnDeadline = self.echo.get(.with { $0.text = "" }, callOptions: options)
  317. XCTAssertEqual(try timedOutOnOwnDeadline.status.wait().code, .deadlineExceeded)
  318. }
  319. func testTLSFailuresAreClearerAtTheRPCLevel() throws {
  320. // Mix and match TLS.
  321. self.configureEventLoopGroup(threads: 1)
  322. self.startServer(withTLS: false)
  323. self.startChannel(withTLS: true) {
  324. $0.connectionPool.maxWaitersPerEventLoop = 10
  325. }
  326. // We can't guarantee an error happens within a certain time limit, so if we don't see what we
  327. // expect we'll loop until a given deadline passes.
  328. let testDeadline = NIODeadline.now() + .seconds(5)
  329. var seenError = false
  330. while testDeadline > .now() {
  331. let options = CallOptions(timeLimit: .deadline(.now() + .milliseconds(50)))
  332. let get = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
  333. let status = try get.status.wait()
  334. XCTAssertEqual(status.code, .deadlineExceeded)
  335. if let cause = status.cause, cause is NIOSSLError {
  336. // What we expect.
  337. seenError = true
  338. break
  339. } else {
  340. // Try again.
  341. continue
  342. }
  343. }
  344. XCTAssert(seenError)
  345. // Now queue up a bunch of RPCs to fill up the waiter queue. We don't care about the outcome
  346. // of these. (They'll fail when we tear down the pool at the end of the test.)
  347. _ = (0 ..< 10).map { i -> UnaryCall<Echo_EchoRequest, Echo_EchoResponse> in
  348. let options = CallOptions(timeLimit: .deadline(.distantFuture))
  349. return self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
  350. }
  351. // Queue up one more.
  352. let options = CallOptions(timeLimit: .deadline(.distantFuture))
  353. let tooManyWaiters = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
  354. let status = try tooManyWaiters.status.wait()
  355. XCTAssertEqual(status.code, .resourceExhausted)
  356. if let cause = status.cause {
  357. XCTAssert(cause is NIOSSLError)
  358. } else {
  359. XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'")
  360. }
  361. }
  362. }