GRPCChannelPoolTests.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import GRPCSampleData
  20. import NIO
  21. import NIOConcurrencyHelpers
  22. import XCTest
  /// Tests which drive a `GRPCChannelPool`-backed channel through the Echo client, either against
  /// an Echo server bound on localhost or against a Unix domain socket target for which these
  /// tests never start a server.
  final class GRPCChannelPoolTests: GRPCTestCase {
    // MARK: - State

    /// Event loop group shared by the server and the channel pool. Created per test by
    /// `configureEventLoopGroup(threads:)` and shut down in `tearDown()`.
    private var group: MultiThreadedEventLoopGroup!
    /// The Echo server, if the test started one.
    private var server: Server?
    /// The pooled channel under test; tests which close it themselves reset this to `nil`.
    private var channel: GRPCChannel?

    /// The port the server's channel is bound to, or `nil` if there is no server or no local address.
    private var serverPort: Int? {
      guard let server = self.server else {
        return nil
      }
      return server.channel.localAddress?.port
    }

    /// A new Echo client wrapping the current channel. Traps if no channel has been started.
    private var echo: Echo_EchoClient {
      let channel = self.channel!
      return Echo_EchoClient(channel: channel)
    }

    // MARK: - Set up and tear down

    /// Closes the channel (if still set), then the server (if started), and finally shuts down the
    /// event loop group.
    override func tearDown() {
      if let pool = self.channel {
        XCTAssertNoThrow(try pool.close().wait())
      }

      if let echoServer = self.server {
        XCTAssertNoThrow(try echoServer.close().wait())
      }

      XCTAssertNoThrow(try self.group.syncShutdownGracefully())
      super.tearDown()
    }

    /// Creates the event loop group used by the test.
    ///
    /// - Parameter threads: Number of threads in the group; defaults to `System.coreCount`.
    private func configureEventLoopGroup(threads: Int = System.coreCount) {
      let group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
      self.group = group
    }

    /// Makes a server builder which logs to `serverLogger` and serves an `EchoProvider`.
    ///
    /// - Parameter withTLS: Whether the builder should use NIOSSL-backed TLS with the sample server
    ///   certificate and key, or be insecure.
    /// - Returns: The configured builder.
    private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
      // Picks the transport flavour; logging and service providers are layered on top below.
      func makeBaseBuilder() -> Server.Builder {
        guard withTLS else {
          return Server.insecure(group: self.group)
        }

        return Server.usingTLSBackedByNIOSSL(
          on: self.group,
          certificateChain: [SampleCertificate.server.certificate],
          privateKey: SamplePrivateKey.server
        ).withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
      }

      return makeBaseBuilder()
        .withLogger(self.serverLogger)
        .withServiceProviders([EchoProvider()])
    }

    /// Binds an Echo server to "localhost" on port 0 and stores it in `server`. Traps if binding
    /// fails.
    ///
    /// - Parameter withTLS: Whether the server should use TLS.
    private func startServer(withTLS: Bool = false) {
      let binding = self.makeServerBuilder(withTLS: withTLS).bind(host: "localhost", port: 0)
      self.server = try! binding.wait()
    }

    /// Creates the channel pool and stores it in `channel`. Traps if the pool cannot be created, or
    /// if no target override is given and there is no server port to connect to.
    ///
    /// - Parameters:
    ///   - withTLS: Whether to use NIOSSL-backed TLS trusting the sample CA, or plaintext.
    ///   - targetOverride: Target to use instead of "localhost" on the server's port.
    ///   - configure: Extra configuration, applied after the background activity logger is set.
    private func startChannel(
      withTLS: Bool = false,
      overrideTarget targetOverride: ConnectionTarget? = nil,
      _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
    ) {
      let transportSecurity: GRPCChannelPool.Configuration.TransportSecurity = withTLS
        ? .tls(
          GRPCTLSConfiguration.makeClientConfigurationBackedByNIOSSL(
            trustRoots: .certificates([SampleCertificate.ca.certificate])
          )
        )
        : .plaintext

      // The server port is only unwrapped when no override was supplied.
      let target: ConnectionTarget = targetOverride ?? .hostAndPort("localhost", self.serverPort!)

      self.channel = try! GRPCChannelPool.with(
        target: target,
        transportSecurity: transportSecurity,
        eventLoopGroup: self.group
      ) { poolConfiguration in
        poolConfiguration.backgroundActivityLogger = self.clientLogger
        configure(&poolConfiguration)
      }
    }

    /// Creates the event loop group, starts a server and connects a channel pool to it.
    ///
    /// - Parameter tls: Whether both the server and the channel should use TLS.
    private func setUpClientAndServer(withTLS tls: Bool) {
      self.configureEventLoopGroup()
      self.startServer(withTLS: tls)
      self.startChannel(withTLS: tls) { configuration in
        // The tests fire off a batch of RPCs straight away; lift the waiter limit so that none of
        // them bounce off it while the connection is still coming up.
        configuration.connectionPool.maxWaitersPerEventLoop = .max
      }
    }

    /// Waits for every status future to succeed and asserts that each status is OK.
    ///
    /// - Parameter futures: The status futures of the RPCs to check.
    /// - Throws: The error of a failed future, as thrown by `wait()`.
    private func assertAllStatusesAreOK(_ futures: [EventLoopFuture<GRPCStatus>]) throws {
      let statuses = try EventLoopFuture.whenAllSucceed(futures, on: self.group.next()).wait()
      XCTAssert(statuses.allSatisfy { $0.isOk })
    }

    // MARK: - Unary

    /// Starts `count` unary "get" RPCs in one go and expects all of them to end with an OK status.
    private func doTestUnaryRPCs(count: Int) throws {
      let statuses = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
        let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
        return self.echo.get(request).status
      }

      try self.assertAllStatusesAreOK(statuses)
    }

    func testUnaryRPCs_plaintext() throws {
      self.setUpClientAndServer(withTLS: false)
      try self.doTestUnaryRPCs(count: 100)
    }

    func testUnaryRPCs_tls() throws {
      self.setUpClientAndServer(withTLS: true)
      try self.doTestUnaryRPCs(count: 100)
    }

    // MARK: - Client streaming

    /// Starts `count` client streaming "collect" RPCs, each sending three messages followed by end,
    /// and expects all of them to end with an OK status.
    private func doTestClientStreamingRPCs(count: Int) throws {
      let statuses = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
        let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
        let collect = self.echo.collect()
        for _ in 0 ..< 3 {
          collect.sendMessage(request, promise: nil)
        }
        collect.sendEnd(promise: nil)
        return collect.status
      }

      try self.assertAllStatusesAreOK(statuses)
    }

    func testClientStreamingRPCs_plaintext() throws {
      self.setUpClientAndServer(withTLS: false)
      try self.doTestClientStreamingRPCs(count: 100)
    }

    func testClientStreamingRPCs() throws {
      self.setUpClientAndServer(withTLS: true)
      try self.doTestClientStreamingRPCs(count: 100)
    }

    // MARK: - Server streaming

    /// Starts `count` server streaming "expand" RPCs, ignoring the responses, and expects all of
    /// them to end with an OK status.
    private func doTestServerStreamingRPCs(count: Int) throws {
      let statuses = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
        let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
        return self.echo.expand(request) { _ in }.status
      }

      try self.assertAllStatusesAreOK(statuses)
    }

    func testServerStreamingRPCs_plaintext() throws {
      self.setUpClientAndServer(withTLS: false)
      try self.doTestServerStreamingRPCs(count: 100)
    }

    func testServerStreamingRPCs() throws {
      self.setUpClientAndServer(withTLS: true)
      try self.doTestServerStreamingRPCs(count: 100)
    }

    // MARK: - Bidirectional streaming

    /// Starts `count` bidirectional streaming "update" RPCs, each sending three messages followed by
    /// end and ignoring the responses, and expects all of them to end with an OK status.
    private func doTestBidiStreamingRPCs(count: Int) throws {
      let statuses = (1 ... count).map { index -> EventLoopFuture<GRPCStatus> in
        let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
        let update = self.echo.update { _ in }
        for _ in 0 ..< 3 {
          update.sendMessage(request, promise: nil)
        }
        update.sendEnd(promise: nil)
        return update.status
      }

      try self.assertAllStatusesAreOK(statuses)
    }

    func testBidiStreamingRPCs_plaintext() throws {
      self.setUpClientAndServer(withTLS: false)
      try self.doTestBidiStreamingRPCs(count: 100)
    }

    func testBidiStreamingRPCs() throws {
      self.setUpClientAndServer(withTLS: true)
      try self.doTestBidiStreamingRPCs(count: 100)
    }

    // MARK: - Waiters and pool behaviour

    /// RPCs which are left waiting for a connection fail with "deadline exceeded" once the pool's
    /// maximum wait time has passed.
    func testWaitersTimeoutWhenNoConnectionCannotBeEstablished() throws {
      // Four threads, so four pools.
      self.configureEventLoopGroup(threads: 4)

      // No server is started, so the target must be overridden (the default target would fail to
      // unwrap `serverPort`).
      self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
        // Waiters should give up almost immediately.
        configuration.connectionPool.maxWaitTime = .milliseconds(50)
      }

      // Queue ten RPCs on each event loop of the group.
      let statuses = self.group.makeIterator().flatMap { eventLoop -> [EventLoopFuture<GRPCStatus>] in
        let options = CallOptions(eventLoopPreference: .exact(eventLoop))
        return (0 ..< 10).map { index -> EventLoopFuture<GRPCStatus> in
          let request = Echo_EchoRequest.with { $0.text = String(describing: index) }
          return self.echo.get(request, callOptions: options).status
        }
      }

      let outcomes = try EventLoopFuture.whenAllComplete(statuses, on: self.group.next()).wait()
      outcomes.forEach { outcome in
        outcome.assertSuccess { status in
          XCTAssertEqual(status.code, .deadlineExceeded)
        }
      }
    }

    /// RPCs made without an event loop preference are spread evenly over the event loops.
    func testRPCsAreDistributedAcrossEventLoops() throws {
      self.configureEventLoopGroup(threads: 4)

      // No server is needed, but that means the default target cannot be used.
      self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
        // Use a long wait time: this test relies on the server never coming up, so the RPCs never
        // complete and their streams are never returned to the pools.
        configuration.connectionPool.maxWaitTime = .hours(1)
      }

      let client = self.echo
      client.defaultCallOptions.eventLoopPreference = .indifferent

      let rpcs = (0 ..< 40).map { _ in client.update { _ in } }
      let rpcsGroupedByLoop = Dictionary(grouping: rpcs) { ObjectIdentifier($0.eventLoop) }

      // 40 RPCs over 4 event loops should come to 10 RPCs per loop.
      for (_, rpcsOnLoop) in rpcsGroupedByLoop {
        XCTAssertEqual(rpcsOnLoop.count, 10)
      }

      // No server was ever started, so every RPC is still waiting for a connection; shutting down
      // the pool fails each of them.
      XCTAssertNoThrow(try self.channel?.close().wait())
      // Clear the channel so that tearDown() does not shut it down a second time.
      self.channel = nil

      rpcs.forEach { rpc in
        XCTAssertEqual(try rpc.status.wait().code, .unavailable)
      }
    }

    /// An RPC which would exceed the per-event-loop waiter limit fails with "resource exhausted",
    /// and RPCs without an event loop preference avoid the loaded loop.
    func testWaiterLimitPerEventLoop() throws {
      self.configureEventLoopGroup(threads: 4)
      self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
        configuration.connectionPool.maxWaitersPerEventLoop = 10
        configuration.connectionPool.maxWaitTime = .hours(1)
      }

      let loadedLoop = self.group.next()
      let pinnedOptions = CallOptions(eventLoopPreference: .exact(loadedLoop))

      // Ten RPCs end up waiting for the connection; the eleventh should be failed immediately.
      let pinnedRPCs = (1 ... 11).map { _ in
        self.echo.get(.with { $0.text = "" }, callOptions: pinnedOptions)
      }

      XCTAssertEqual(try pinnedRPCs.last?.status.wait().code, .resourceExhausted)

      // With no event loop preference the loaded loop should not be picked.
      let unpinnedRPCs = (1 ... 10).map { _ in
        self.echo.get(.with { $0.text = "" })
      }

      XCTAssert(!unpinnedRPCs.contains(where: { $0.eventLoop === loadedLoop }))
    }

    /// An RPC waiting for stream capacity only starts once another RPC on the single connection has
    /// completed.
    func testWaitingRPCStartsWhenStreamCapacityIsAvailable() throws {
      self.configureEventLoopGroup(threads: 1)
      self.startServer()
      self.startChannel { configuration in
        configuration.connectionPool.connectionsPerEventLoop = 1
        configuration.connectionPool.maxWaitTime = .hours(1)
      }

      let lock = Lock()
      // Number of tracked RPCs which have completed so far; guarded by `lock`.
      var completed = 0

      // Bring a connection up first so that creating a batch of RPCs in one go does not hit the
      // waiter limit.
      let warmup = self.echo.get(.with { $0.text = "" })
      XCTAssert(try warmup.status.wait().isOk)

      // MAX_CONCURRENT_STREAMS should be 100. Of the 101 RPCs created here, 100 should not have to
      // wait since a connection is already active.
      let rpcs = (0 ..< 101).map { _ in self.echo.update { _ in } }
      let firstRPC = rpcs.first!
      let lastRPC = rpcs.last!

      // The first RPC must be the first of the two tracked RPCs to complete.
      firstRPC.status.whenComplete { _ in
        lock.withLock {
          XCTAssertEqual(completed, 0)
          completed += 1
        }
      }

      // The 101st RPC completes after the first, which is explicitly ended below.
      lastRPC.status.whenComplete { _ in
        lock.withLock {
          XCTAssertEqual(completed, 1)
          completed += 1
        }
      }

      // Nothing has completed yet: the first RPC is still active.
      lock.withLockVoid { XCTAssertEqual(completed, 0) }

      // Finish the first RPC.
      XCTAssertNoThrow(try firstRPC.sendEnd().wait())
      XCTAssertNoThrow(try firstRPC.status.wait())
      lock.withLockVoid { XCTAssertEqual(completed, 1) }

      // Finish the last RPC.
      XCTAssertNoThrow(try lastRPC.sendEnd().wait())
      XCTAssertNoThrow(try lastRPC.status.wait())
      lock.withLockVoid { XCTAssertEqual(completed, 2) }

      // Finish everything in between.
      for remaining in rpcs.dropFirst().dropLast() {
        XCTAssertNoThrow(try remaining.sendEnd().wait())
      }
    }

    /// An RPC started on a pool which has already been closed fails with "unavailable".
    func testRPCOnShutdownPool() {
      self.configureEventLoopGroup(threads: 1)
      self.startChannel(overrideTarget: .unixDomainSocket("/ignored"))

      // Grab the client while the channel is still set.
      let client = self.echo

      XCTAssertNoThrow(try self.channel?.close().wait())
      // Clear the channel so that tearDown() does not shut it down a second time.
      self.channel = nil

      let rpc = client.get(.with { $0.text = "" })
      XCTAssertEqual(try rpc.status.wait().code, .unavailable)
    }

    /// The call's own deadline wins when it falls before the pool's waiting deadline.
    func testCallDeadlineIsUsedIfSoonerThanWaitingDeadline() {
      self.configureEventLoopGroup(threads: 1)
      self.startChannel(overrideTarget: .unixDomainSocket("/nope")) { configuration in
        configuration.connectionPool.maxWaitTime = .hours(24)
      }

      // The call deadline falls well before the 24 hour waiter limit, so the RPC should time out
      // on its own deadline rather than (much) later.
      let immediateDeadline = CallOptions(timeLimit: .deadline(.now()))
      let rpc = self.echo.get(.with { $0.text = "" }, callOptions: immediateDeadline)
      XCTAssertEqual(try rpc.status.wait().code, .deadlineExceeded)
    }
  }