GRPCChannelPoolTests.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import GRPCSampleData
  20. import NIOConcurrencyHelpers
  21. import NIOCore
  22. import NIOPosix
  23. import NIOSSL
  24. import XCTest
  25. final class GRPCChannelPoolTests: GRPCTestCase {
  /// Event loop group used by both the server and the channel pool. Assigned by
  /// `configureEventLoopGroup(threads:)` and shut down in `tearDown()`.
  private var group: MultiThreadedEventLoopGroup!

  /// The server started by `startServer(withTLS:)`, if any.
  private var server: Server?

  /// The pooled channel started by `startChannel(withTLS:overrideTarget:_:)`, if any.
  private var channel: GRPCChannel?

  /// The local port of the running server, or `nil` when there is no server (or no local port).
  private var serverPort: Int? {
    guard let server = self.server else {
      return nil
    }
    return server.channel.localAddress?.port
  }

  /// A new Echo client on top of `channel`. Traps if no channel has been started.
  private var echo: Echo_EchoClient {
    let channel = self.channel!
    return Echo_EchoClient(channel: channel)
  }
  /// Closes the channel and the server (when present), shuts down the event loop group and then
  /// calls through to the superclass. Errors are recorded as test failures.
  override func tearDown() {
    // Optional chaining turns each close into a no-op when the value is `nil`, e.g. because the
    // test never created it or already closed and cleared it.
    XCTAssertNoThrow(try self.channel?.close().wait())
    XCTAssertNoThrow(try self.server?.close().wait())

    XCTAssertNoThrow(try self.group.syncShutdownGracefully())
    super.tearDown()
  }
  /// Creates the event loop group used by the test and stores it in `group`.
  ///
  /// - Parameter threads: The number of threads in the group. Defaults to `System.coreCount`.
  private func configureEventLoopGroup(threads: Int = System.coreCount) {
    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: threads)
    self.group = eventLoopGroup
  }
  /// Builds a server builder for the Echo service running on `group`.
  ///
  /// - Parameter withTLS: When `true` the builder uses NIOSSL-backed TLS with the sample server
  ///   certificate and key and trusts the sample CA; otherwise the builder is insecure.
  /// - Returns: A builder configured with the server logger and an `EchoProvider`.
  private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
    let transport: Server.Builder

    switch withTLS {
    case true:
      let secure = Server.usingTLSBackedByNIOSSL(
        on: self.group,
        certificateChain: [SampleCertificate.server.certificate],
        privateKey: SamplePrivateKey.server
      )
      transport = secure.withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))

    case false:
      transport = Server.insecure(group: self.group)
    }

    return transport
      .withLogger(self.serverLogger)
      .withServiceProviders([EchoProvider()])
  }
  /// Binds a server to an ephemeral port on "localhost" and stores it in `server`.
  ///
  /// - Parameter withTLS: Whether the server should use TLS. Defaults to `false`.
  /// - Note: A failure to bind traps (`try!`).
  private func startServer(withTLS: Bool = false) {
    let builder = self.makeServerBuilder(withTLS: withTLS)
    // Port 0 lets the OS pick a free port; tests read it back via `serverPort`.
    let bound = builder.bind(host: "localhost", port: 0)
    self.server = try! bound.wait()
  }
  /// Creates a pooled channel on `group` and stores it in `channel`.
  ///
  /// - Parameters:
  ///   - withTLS: When `true` the client uses NIOSSL-backed TLS trusting the sample CA; otherwise
  ///     plaintext. Defaults to `false`.
  ///   - targetOverride: The target to connect to. When `nil` the channel targets the running
  ///     server on "localhost"; this traps if no server has been started.
  ///   - configure: Extra configuration applied after the background activity logger is set.
  /// - Note: A failure to create the pool traps (`try!`).
  private func startChannel(
    withTLS: Bool = false,
    overrideTarget targetOverride: ConnectionTarget? = nil,
    _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  ) {
    let security: GRPCChannelPool.Configuration.TransportSecurity
    switch withTLS {
    case true:
      let tls = GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
        trustRoots: .certificates([SampleCertificate.ca.certificate])
      )
      security = .tls(tls)
    case false:
      security = .plaintext
    }

    // The right-hand side of `??` is only evaluated when there is no override, so `serverPort`
    // is only force-unwrapped when the server is actually needed.
    let target: ConnectionTarget = targetOverride ?? .hostAndPort("localhost", self.serverPort!)

    let pool = try! GRPCChannelPool.with(
      target: target,
      transportSecurity: security,
      eventLoopGroup: self.group
    ) { configuration in
      configuration.backgroundActivityLogger = self.clientLogger
      configure(&configuration)
    }

    self.channel = pool
  }
  /// Creates the event loop group, then starts a server and a pooled channel that agree on TLS.
  ///
  /// - Parameter tls: Whether both the server and the client use TLS.
  private func setUpClientAndServer(withTLS tls: Bool) {
    self.configureEventLoopGroup()
    self.startServer(withTLS: tls)

    self.startChannel(withTLS: tls) { configuration in
      // The tests fire off a batch of RPCs immediately; lift the waiter limit so none of them
      // bounce off it while the connection is still coming up.
      configuration.connectionPool.maxWaitersPerEventLoop = .max
    }
  }
  /// Starts `count` unary "Get" RPCs on the pooled channel and asserts that every one of them
  /// completes with an OK status.
  ///
  /// - Parameter count: The number of RPCs to start. A value below one starts no RPCs.
  /// - Throws: Any error thrown while waiting for the collected status futures.
  private func doTestUnaryRPCs(count: Int) throws {
    var futures: [EventLoopFuture<GRPCStatus>] = []
    futures.reserveCapacity(max(count, 0))

    // `stride` produces an empty sequence when `count < 1`; a closed range `1 ... count` would
    // trap at runtime in that case.
    for i in stride(from: 1, through: count, by: 1) {
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      let get = self.echo.get(request)
      futures.append(get.status)
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 unary RPCs through the pool with a plaintext client and server.
  func testUnaryRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestUnaryRPCs(count: rpcCount)
  }
  /// Runs 100 unary RPCs through the pool with a TLS client and server.
  func testUnaryRPCs_tls() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestUnaryRPCs(count: rpcCount)
  }
  /// Starts `count` client-streaming "Collect" RPCs, sends three copies of a request on each,
  /// ends the request stream and asserts that every RPC completes with an OK status.
  ///
  /// - Parameter count: The number of RPCs to start. A value below one starts no RPCs.
  /// - Throws: Any error thrown while waiting for the collected status futures.
  private func doTestClientStreamingRPCs(count: Int) throws {
    var futures: [EventLoopFuture<GRPCStatus>] = []
    futures.reserveCapacity(max(count, 0))

    // `stride` produces an empty sequence when `count < 1`; a closed range `1 ... count` would
    // trap at runtime in that case.
    for i in stride(from: 1, through: count, by: 1) {
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      let collect = self.echo.collect()
      collect.sendMessage(request, promise: nil)
      collect.sendMessage(request, promise: nil)
      collect.sendMessage(request, promise: nil)
      collect.sendEnd(promise: nil)
      futures.append(collect.status)
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 client-streaming RPCs through the pool with a plaintext client and server.
  func testClientStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestClientStreamingRPCs(count: rpcCount)
  }
  /// Runs 100 client-streaming RPCs through the pool with a TLS client and server (the TLS
  /// counterpart of `testClientStreamingRPCs_plaintext`).
  func testClientStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestClientStreamingRPCs(count: rpcCount)
  }
  /// Starts `count` server-streaming "Expand" RPCs, ignoring the responses, and asserts that every
  /// RPC completes with an OK status.
  ///
  /// - Parameter count: The number of RPCs to start. A value below one starts no RPCs.
  /// - Throws: Any error thrown while waiting for the collected status futures.
  private func doTestServerStreamingRPCs(count: Int) throws {
    var futures: [EventLoopFuture<GRPCStatus>] = []
    futures.reserveCapacity(max(count, 0))

    // `stride` produces an empty sequence when `count < 1`; a closed range `1 ... count` would
    // trap at runtime in that case.
    for i in stride(from: 1, through: count, by: 1) {
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      let expand = self.echo.expand(request) { _ in }
      futures.append(expand.status)
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 server-streaming RPCs through the pool with a plaintext client and server.
  func testServerStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestServerStreamingRPCs(count: rpcCount)
  }
  /// Runs 100 server-streaming RPCs through the pool with a TLS client and server (the TLS
  /// counterpart of `testServerStreamingRPCs_plaintext`).
  func testServerStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestServerStreamingRPCs(count: rpcCount)
  }
  /// Starts `count` bidirectional-streaming "Update" RPCs, sends three copies of a request on
  /// each (ignoring responses), ends the request stream and asserts that every RPC completes with
  /// an OK status.
  ///
  /// - Parameter count: The number of RPCs to start. A value below one starts no RPCs.
  /// - Throws: Any error thrown while waiting for the collected status futures.
  private func doTestBidiStreamingRPCs(count: Int) throws {
    var futures: [EventLoopFuture<GRPCStatus>] = []
    futures.reserveCapacity(max(count, 0))

    // `stride` produces an empty sequence when `count < 1`; a closed range `1 ... count` would
    // trap at runtime in that case.
    for i in stride(from: 1, through: count, by: 1) {
      let request = Echo_EchoRequest.with { $0.text = String(describing: i) }
      let update = self.echo.update { _ in }
      update.sendMessage(request, promise: nil)
      update.sendMessage(request, promise: nil)
      update.sendMessage(request, promise: nil)
      update.sendEnd(promise: nil)
      futures.append(update.status)
    }

    let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
    XCTAssert(statuses.allSatisfy { $0.isOk })
  }
  /// Runs 100 bidirectional-streaming RPCs through the pool with a plaintext client and server.
  func testBidiStreamingRPCs_plaintext() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: false)
    try self.doTestBidiStreamingRPCs(count: rpcCount)
  }
  /// Runs 100 bidirectional-streaming RPCs through the pool with a TLS client and server (the
  /// TLS counterpart of `testBidiStreamingRPCs_plaintext`).
  func testBidiStreamingRPCs() throws {
    let rpcCount = 100
    self.setUpClientAndServer(withTLS: true)
    try self.doTestBidiStreamingRPCs(count: rpcCount)
  }
  /// Queues ten RPCs on each of four event loops against a target with no server and expects
  /// every RPC to complete with a "deadline exceeded" status once the short wait time elapses.
  func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
    // 4 threads == 4 pools
    self.configureEventLoopGroup(threads: 4)

    // No server is started, so the target must be overridden (the default target would
    // force-unwrap `serverPort`).
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      // Tiny wait time for waiters.
      configuration.connectionPool.maxWaitTime = .milliseconds(50)
    }

    var pending: [EventLoopFuture<GRPCStatus>] = []
    pending.reserveCapacity(40)

    // Queue RPCs on each loop.
    for eventLoop in self.group.makeIterator() {
      let options = CallOptions(eventLoopPreference: .exact(eventLoop))
      for i in 0 ..< 10 {
        let rpc = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
        pending.append(rpc.status)
      }
    }

    let outcomes = try EventLoopFuture.whenAllComplete(pending, on: self.group.next()).wait()
    outcomes.forEach { outcome in
      outcome.assertSuccess { status in
        XCTAssertEqual(status.code, .deadlineExceeded)
      }
    }
  }
  /// Starts 40 RPCs with no event loop preference on a four-loop group and expects ten RPCs per
  /// event loop; all of them should fail as "unavailable" once the pool is shut down.
  func testRPCsAreDistributedAcrossEventLoops() throws {
    self.configureEventLoopGroup(threads: 4)

    // We don't need a server here, but we do need a different target.
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      // Increase the max wait time: we rely on the server never coming up, so the RPCs never
      // complete and streams are not returned to the pools.
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    let client = self.echo
    client.defaultCallOptions.eventLoopPreference = .indifferent

    let calls = (0 ..< 40).map { _ in client.update { _ in } }
    let callsPerLoop = Dictionary(grouping: calls) { ObjectIdentifier($0.eventLoop) }

    // 40 RPCs over 4 ELs should be 10 RPCs per EL.
    for bucket in callsPerLoop.values {
      XCTAssertEqual(bucket.count, 10)
    }

    // All RPCs are waiting for connections since we never brought up a server. Each will fail
    // when we shut down the pool.
    XCTAssertNoThrow(try self.channel?.close().wait())
    // Unset the channel to avoid shutting down again in tearDown().
    self.channel = nil

    calls.forEach { call in
      XCTAssertEqual(try call.status.wait().code, .unavailable)
    }
  }
  /// Fills the waiter queue of one event loop and expects the next RPC on that loop to fail with
  /// "resource exhausted"; RPCs with no loop preference should then avoid the loaded loop.
  func testWaiterLimitPerEventLoop() throws {
    let waiterLimit = 10

    self.configureEventLoopGroup(threads: 4)
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      configuration.connectionPool.maxWaitersPerEventLoop = waiterLimit
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    let loadedLoop = self.group.next()
    let pinned = CallOptions(eventLoopPreference: .exact(loadedLoop))

    // The first 10 will be waiting for the connection. The 11th should be failed immediately.
    let pinnedRPCs = (1 ... waiterLimit + 1).map { _ in
      self.echo.get(.with { $0.text = "" }, callOptions: pinned)
    }
    XCTAssertEqual(try pinnedRPCs.last?.status.wait().code, .resourceExhausted)

    // If we express no event loop preference then we should not get the loaded loop.
    let indifferentLoopRPCs = (1 ... 10).map { _ in
      self.echo.get(.with { $0.text = "" })
    }
    let usedLoops = indifferentLoopRPCs.map { $0.eventLoop }
    XCTAssert(usedLoops.allSatisfy { $0 !== loadedLoop })
  }
  /// With a single connection, starts 101 streaming RPCs and checks that the last one only
  /// completes after the first one has been ended, by recording the order of completion.
  func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
    self.configureEventLoopGroup(threads: 1)
    self.startServer()
    self.startChannel { configuration in
      configuration.connectionPool.connectionsPerEventLoop = 1
      configuration.connectionPool.maxWaitTime = .hours(1)
    }

    // `completed` is mutated from status callbacks and read from the test thread, so every
    // access goes through `lock`.
    let lock = Lock()
    var completed = 0

    // We need a connection to be up and running to avoid hitting the waiter limit when creating
    // a batch of RPCs in one go.
    let warmup = self.echo.get(.with { $0.text = "" })
    XCTAssert(try warmup.status.wait().isOk)

    // MAX_CONCURRENT_STREAMS should be 100, we'll create 101 RPCs, 100 of which should not have
    // to wait because there's already an active connection.
    let streams = (0 ..< 101).map { _ in self.echo.update { _ in } }
    let firstRPC = streams.first!
    let lastRPC = streams.last!

    // The first RPC should (obviously) complete first.
    firstRPC.status.whenComplete { _ in
      lock.withLock {
        XCTAssertEqual(completed, 0)
        completed += 1
      }
    }

    // The 101st RPC will complete once the first is completed (we explicitly terminate the 1st
    // RPC below).
    lastRPC.status.whenComplete { _ in
      lock.withLock {
        XCTAssertEqual(completed, 1)
        completed += 1
      }
    }

    // Still zero: the first RPC is still active.
    lock.withLockVoid { XCTAssertEqual(completed, 0) }

    // End the first RPC.
    XCTAssertNoThrow(try firstRPC.sendEnd().wait())
    XCTAssertNoThrow(try firstRPC.status.wait())
    lock.withLockVoid { XCTAssertEqual(completed, 1) }

    // End the last RPC.
    XCTAssertNoThrow(try lastRPC.sendEnd().wait())
    XCTAssertNoThrow(try lastRPC.status.wait())
    lock.withLockVoid { XCTAssertEqual(completed, 2) }

    // End the rest.
    for stream in streams.dropFirst().dropLast() {
      XCTAssertNoThrow(try stream.sendEnd().wait())
    }
  }
  /// An RPC started on a pool which has already been closed should fail as "unavailable".
  func testRPCOnShutdownPool() {
    self.configureEventLoopGroup(threads: 1)
    self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))

    // Create the client before the channel is cleared below; `echo` force-unwraps `channel`.
    let client = self.echo

    XCTAssertNoThrow(try self.channel?.close().wait())
    // Avoid shutting down again in tearDown()
    self.channel = nil

    let rpc = client.get(.with { $0.text = "" })
    XCTAssertEqual(try rpc.status.wait().code, .unavailable)
  }
  /// An RPC whose own deadline is earlier than the pool's wait time should fail with
  /// "deadline exceeded" on its own deadline.
  func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
    self.configureEventLoopGroup(threads: 1)
    self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
      configuration.connectionPool.maxWaitTime = .hours(24)
    }

    // Deadline is sooner than the 24 hour waiter time, we expect to time out sooner rather than
    // (much) later!
    let immediateDeadline = CallOptions(timeLimit: .deadline(.now()))
    let request = Echo_EchoRequest.with { $0.text = "" }
    let timedOutOnOwnDeadline = self.echo.get(request, callOptions: immediateDeadline)

    XCTAssertEqual(try timedOutOnOwnDeadline.status.wait().code, .deadlineExceeded)
  }
  /// Connects a TLS client to a plaintext server and checks that the resulting TLS error is
  /// surfaced as the cause of RPC failures, both for timed-out RPCs and for RPCs rejected because
  /// the waiter queue is full.
  func testTLSFailuresAreClearerAtTheRPCLevel() throws {
    // Mix and match TLS.
    self.configureEventLoopGroup(threads: 1)
    self.startServer(withTLS: false)
    self.startChannel(withTLS: true) { configuration in
      configuration.connectionPool.maxWaitersPerEventLoop = 10
    }

    // We can't guarantee an error happens within a certain time limit, so if we don't see what we
    // expect we'll loop until a given deadline passes.
    let testDeadline = NIODeadline.now() + .seconds(5)
    var seenError = false

    while testDeadline > .now() {
      let shortLived = CallOptions(timeLimit: .deadline(.now() + .milliseconds(50)))
      let attempt = self.echo.get(.with { $0.text = "foo" }, callOptions: shortLived)
      let attemptStatus = try attempt.status.wait()
      XCTAssertEqual(attemptStatus.code, .deadlineExceeded)

      // A missing cause, or a cause of another type, means we go around again.
      if attemptStatus.cause is NIOSSLError {
        // What we expect.
        seenError = true
        break
      }
    }
    XCTAssert(seenError)

    // Now queue up a bunch of RPCs to fill up the waiter queue. We don't care about the outcome
    // of these. (They'll fail when we tear down the pool at the end of the test.)
    for i in 0 ..< 10 {
      let options = CallOptions(timeLimit: .deadline(.distantFuture))
      _ = self.echo.get(.with { $0.text = String(describing: i) }, callOptions: options)
    }

    // Queue up one more.
    let options = CallOptions(timeLimit: .deadline(.distantFuture))
    let tooManyWaiters = self.echo.get(.with { $0.text = "foo" }, callOptions: options)
    let status = try tooManyWaiters.status.wait()
    XCTAssertEqual(status.code, .resourceExhausted)

    guard let cause = status.cause else {
      XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'")
      return
    }
    XCTAssert(cause is NIOSSLError)
  }
  363. }