ClientQuiescingTests.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOPosix
  22. import XCTest
  /// Tests for gracefully shutting down ("quiescing") client channels against a local Echo server.
  ///
  /// The scenarios are run against both a single `ClientConnection` and a `GRPCChannelPool`;
  /// see `ChannelKind`.
  internal final class ClientQuiescingTests: GRPCTestCase {
    /// Event loop group used by the server and by the client channel under test.
    private var group: EventLoopGroup!
    /// The client channel under test, created by `setUpChannel(kind:)` or one of its siblings.
    private var channel: GRPCChannel!
    /// The Echo server, started in `setUp()` and closed in `tearDown()`.
    private var server: Server!
    /// Records RPC and shutdown events so tests can assert on the channel's lifecycle.
    private let tracker = RPCTracker()

    /// An Echo client wrapping the current `channel`.
    private var echo: Echo_EchoNIOClient {
      return Echo_EchoNIOClient(channel: self.channel)
    }

    override func setUp() {
      super.setUp()
      self.group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
      // Bind to port 0 so the OS assigns a free port. The client helpers read the actual port
      // from `server.channel.localAddress`, and a fixed port would crash the `try!` below
      // whenever that port is already in use (e.g. when tests run in parallel).
      self.server = try! Server.insecure(group: self.group)
        .withLogger(self.serverLogger)
        .withServiceProviders([EchoProvider()])
        .bind(host: "127.0.0.1", port: 0)
        .wait()
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.server.close().wait())
      XCTAssertNoThrow(try self.group.syncShutdownGracefully())
      // We don't shutdown the client: it will have been shutdown by the test case.
      super.tearDown()
    }

    /// Points `channel` at the test server using a single `ClientConnection`.
    private func setUpClientConnection() {
      self.channel = ClientConnection.insecure(group: self.group)
        .withBackgroundActivityLogger(self.clientLogger)
        .connect(host: "127.0.0.1", port: self.server!.channel.localAddress!.port!)
    }

    /// Points `channel` at the test server using a `GRPCChannelPool`.
    ///
    /// - Parameter useSingleEventLoop: When `true` the pool is given a single event loop from
    ///   the group rather than the whole group.
    private func setUpChannelPool(useSingleEventLoop: Bool = false) {
      // Only throws for TLS which we aren't using here.
      self.channel = try! GRPCChannelPool.with(
        target: .host("127.0.0.1", port: self.server!.channel.localAddress!.port!),
        transportSecurity: .plaintext,
        eventLoopGroup: useSingleEventLoop ? self.group.next() : self.group
      ) {
        $0.connectionPool.connectionsPerEventLoop = 1
        $0.connectionPool.maxWaitersPerEventLoop = 100
        $0.backgroundActivityLogger = self.clientLogger
      }
    }

    /// The kind of client channel a shared test body should exercise.
    private enum ChannelKind {
      /// A single `ClientConnection`.
      case single
      /// A `GRPCChannelPool`.
      case pooled
    }

    /// Creates the client channel for the given kind.
    private func setUpChannel(kind: ChannelKind) {
      switch kind {
      case .single:
        self.setUpClientConnection()
      case .pooled:
        self.setUpChannelPool()
      }
    }

    /// Starts a client-streaming 'collect' RPC on the current channel.
    ///
    /// - Parameter withTracking: When `true`, asserts that the tracker is in the active state,
    ///   records the start of the RPC, and registers a callback which records the end of the RPC
    ///   and asserts that its status is OK. Pass `false` for RPCs which are expected to fail.
    /// - Returns: The started call.
    private func startRPC(
      withTracking: Bool = true
    ) -> ClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse> {
      if withTracking {
        self.tracker.assert(.active)
        self.tracker.willStartRPC()
      }

      let collect = self.echo.collect(callOptions: self.callOptionsWithLogger)

      if withTracking {
        collect.status.whenSuccess { status in
          self.tracker.didFinishRPC()
          XCTAssert(status.isOk)
        }
      }

      return collect
    }

    /// Runs one tracked RPC to completion and asserts that it succeeded.
    private func assertConnectionEstablished() {
      self.tracker.assert(.active)
      let rpc = self.startRPC()
      XCTAssertNoThrow(try rpc.sendEnd().wait())
      XCTAssert(try rpc.status.wait().isOk)
      self.tracker.assert(.active)
    }

    /// Asks the channel to close gracefully.
    ///
    /// - Parameters:
    ///   - deadline: Deadline passed to `closeGracefully(deadline:promise:)`.
    ///   - withTracking: When `true`, the request and its completion are recorded in the tracker.
    /// - Returns: The future of the promise handed to `closeGracefully(deadline:promise:)`.
    private func gracefulShutdown(
      deadline: NIODeadline = .distantFuture,
      withTracking: Bool = true
    ) -> EventLoopFuture<Void> {
      if withTracking {
        self.tracker.willRequestGracefulShutdown()
      }

      let promise = self.group.next().makePromise(of: Void.self)
      self.channel.closeGracefully(deadline: deadline, promise: promise)

      if withTracking {
        promise.futureResult.whenComplete { _ in
          self.tracker.didShutdown()
        }
      }

      return promise.futureResult
    }
  }
  115. // MARK: - Test Helpers
  extension ClientQuiescingTests {
    /// Gracefully shuts down a channel which has not carried any RPCs.
    private func _testQuiescingWhenIdle(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      let shutdown = self.gracefulShutdown()
      XCTAssertNoThrow(try shutdown.wait())
    }

    /// Gracefully shuts down a channel whose only RPC has already completed.
    private func _testQuiescingWithNoOutstandingRPCs(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()
      let shutdown = self.gracefulShutdown()
      XCTAssertNoThrow(try shutdown.wait())
    }

    /// Requests shutdown while one RPC is open, then ends the RPC and waits for the shutdown.
    private func _testQuiescingWithOneOutstandingRPC(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()

      let openRPC = self.startRPC()
      XCTAssertNoThrow(try openRPC.sendMessage(.empty).wait())

      let shutdown = self.gracefulShutdown()
      XCTAssertNoThrow(try openRPC.sendEnd().wait())
      XCTAssertNoThrow(try shutdown.wait())
    }

    /// Requests shutdown while 50 RPCs are open; each must still accept a message and an end.
    private func _testQuiescingWithManyOutstandingRPCs(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()

      // Open the RPCs first, then send one message on each to make sure it is open.
      let openRPCs = (0 ..< 50).map { _ in self.startRPC() }
      for openRPC in openRPCs {
        XCTAssertNoThrow(try openRPC.sendMessage(.empty).wait())
      }

      // Begin the graceful shutdown.
      let shutdown = self.gracefulShutdown()

      // The RPCs opened above should keep working: send another message, then end each one.
      for openRPC in openRPCs {
        XCTAssertNoThrow(try openRPC.sendMessage(.empty).wait())
        XCTAssertNoThrow(try openRPC.sendEnd().wait())
      }

      // With every RPC finished the shutdown should now complete.
      XCTAssertNoThrow(try shutdown.wait())
    }

    /// Requests shutdown with a short deadline while an RPC is left open; the RPC should fail.
    private func _testQuiescingTimesOutAndFailsExistingRPC(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()

      // Untracked: tracking asserts a successful RPC and this one is expected to fail.
      let doomedRPC = self.startRPC(withTracking: false)
      XCTAssertNoThrow(try doomedRPC.sendMessage(.empty).wait())

      let deadline = NIODeadline.now() + .milliseconds(50)
      let shutdown = self.gracefulShutdown(deadline: deadline)
      XCTAssertNoThrow(try shutdown.wait())

      // The shutdown deadline passed with the RPC still open, so it should have failed.
      XCTAssertThrowsError(try doomedRPC.response.wait())
    }

    /// Starts a new RPC after shutdown was requested: it should fail, the existing one should not.
    private func _testStartRPCAfterQuiescing(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()

      // Open an RPC and wait for its initial metadata so we know it is running.
      let existingRPC = self.startRPC()
      XCTAssertNoThrow(try existingRPC.sendMessage(.empty).wait())
      XCTAssertNoThrow(try existingRPC.initialMetadata.wait())

      // Request the shutdown.
      let shutdown = self.gracefulShutdown()

      // An RPC started now should fail immediately.
      self.tracker.assert(.shutdownRequested)
      let lateRPC = self.startRPC(withTracking: false)
      XCTAssertThrowsError(try lateRPC.response.wait())
      XCTAssertFalse(try lateRPC.status.wait().isOk)

      // The RPC which was already open should be unaffected ...
      XCTAssertNoThrow(try existingRPC.sendMessage(.empty).wait())
      // ... and the shutdown should complete once that RPC has been ended.
      XCTAssertNoThrow(try existingRPC.sendEnd().wait())
      XCTAssertNoThrow(try shutdown.wait())
    }

    /// Starts an RPC after shutdown has completed: it should fail.
    private func _testStartRPCAfterShutdownCompletes(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()

      let shutdown = self.gracefulShutdown()
      XCTAssertNoThrow(try shutdown.wait())
      self.tracker.assert(.shutdown)

      // The channel is shut down so a new RPC should fail.
      let lateRPC = self.startRPC(withTracking: false)
      XCTAssertThrowsError(try lateRPC.response.wait())
      XCTAssertFalse(try lateRPC.status.wait().isOk)
    }

    /// Requests graceful shutdown twice; both requests should complete without error.
    private func _testInitiateShutdownTwice(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()

      let firstShutdown = self.gracefulShutdown()
      // A second request is allowed but isn't a 'normal' path, and the tracker only checks
      // 'normal' paths, so this one is not tracked.
      let secondShutdown = self.gracefulShutdown(withTracking: false)

      XCTAssertNoThrow(try firstShutdown.wait())
      XCTAssertNoThrow(try secondShutdown.wait())
    }

    /// Requests shutdown with a deadline in the past while RPCs are open; they should all fail.
    private func _testInitiateShutdownWithPastDeadline(channelKind kind: ChannelKind) {
      self.setUpChannel(kind: kind)
      self.assertConnectionEstablished()

      // Open a handful of untracked RPCs and send a message on each to make sure it is open.
      let openRPCs = (0 ..< 5).map { _ in self.startRPC(withTracking: false) }
      for openRPC in openRPCs {
        XCTAssertNoThrow(try openRPC.sendMessage(.empty).wait())
      }

      let shutdown = self.gracefulShutdown(deadline: .distantPast)
      XCTAssertNoThrow(try shutdown.wait())

      for openRPC in openRPCs {
        XCTAssertThrowsError(try openRPC.response.wait())
      }
    }
  }
  221. // MARK: - Common Tests
  extension ClientQuiescingTests {
    // Each test below forwards to the shared helper of the same name, selecting which kind of
    // channel to exercise.

    // Single `ClientConnection` variants.

    internal func testQuiescingWhenIdle_clientConnection() {
      self._testQuiescingWhenIdle(channelKind: .single)
    }

    internal func testQuiescingWithNoOutstandingRPCs_clientConnection() {
      self._testQuiescingWithNoOutstandingRPCs(channelKind: .single)
    }

    internal func testQuiescingWithOneOutstandingRPC_clientConnection() {
      self._testQuiescingWithOneOutstandingRPC(channelKind: .single)
    }

    internal func testQuiescingWithManyOutstandingRPCs_clientConnection() {
      self._testQuiescingWithManyOutstandingRPCs(channelKind: .single)
    }

    internal func testQuiescingTimesOutAndFailsExistingRPC_clientConnection() {
      self._testQuiescingTimesOutAndFailsExistingRPC(channelKind: .single)
    }

    internal func testStartRPCAfterQuiescing_clientConnection() {
      self._testStartRPCAfterQuiescing(channelKind: .single)
    }

    internal func testStartRPCAfterShutdownCompletes_clientConnection() {
      self._testStartRPCAfterShutdownCompletes(channelKind: .single)
    }

    internal func testInitiateShutdownTwice_clientConnection() {
      self._testInitiateShutdownTwice(channelKind: .single)
    }

    internal func testInitiateShutdownWithPastDeadline_clientConnection() {
      self._testInitiateShutdownWithPastDeadline(channelKind: .single)
    }

    // `GRPCChannelPool` variants.

    internal func testQuiescingWhenIdle_channelPool() {
      self._testQuiescingWhenIdle(channelKind: .pooled)
    }

    internal func testQuiescingWithNoOutstandingRPCs_channelPool() {
      self._testQuiescingWithNoOutstandingRPCs(channelKind: .pooled)
    }

    internal func testQuiescingWithOneOutstandingRPC_channelPool() {
      self._testQuiescingWithOneOutstandingRPC(channelKind: .pooled)
    }

    internal func testQuiescingWithManyOutstandingRPCs_channelPool() {
      self._testQuiescingWithManyOutstandingRPCs(channelKind: .pooled)
    }

    internal func testQuiescingTimesOutAndFailsExistingRPC_channelPool() {
      self._testQuiescingTimesOutAndFailsExistingRPC(channelKind: .pooled)
    }

    internal func testStartRPCAfterQuiescing_channelPool() {
      self._testStartRPCAfterQuiescing(channelKind: .pooled)
    }

    internal func testStartRPCAfterShutdownCompletes_channelPool() {
      self._testStartRPCAfterShutdownCompletes(channelKind: .pooled)
    }

    internal func testInitiateShutdownTwice_channelPool() {
      self._testInitiateShutdownTwice(channelKind: .pooled)
    }

    internal func testInitiateShutdownWithPastDeadline_channelPool() {
      self._testInitiateShutdownWithPastDeadline(channelKind: .pooled)
    }
  }
  278. // MARK: - Pool Specific Tests
  extension ClientQuiescingTests {
    /// Shutting down with a short deadline should fail both open RPCs and RPCs still waiting
    /// for a stream.
    internal func testQuiescingTimesOutAndFailsWaiters_channelPool() throws {
      self.setUpChannelPool(useSingleEventLoop: true)
      self.assertConnectionEstablished()

      // With a connection established, fill it with 100 RPCs (i.e. http/2 max concurrent
      // streams). None of them is expected to succeed, so tracking is disabled.
      // NOTE(review): the 'try' (and hence 'throws' on this test) looks redundant but is
      // presumably needed because the closure body contains a 'try' -- confirm before removing.
      let openRPCs: [ClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse>] = try (0 ..< 100)
        .map { _ in
          let openRPC = self.startRPC(withTracking: false)
          XCTAssertNoThrow(try openRPC.sendMessage(.empty).wait())
          return openRPC
        }

      // Start some more RPCs which will have to wait. These are expected to fail as well.
      let waitingRPCs = (0 ..< 50).map { _ in self.startRPC(withTracking: false) }

      // None of the RPCs is half closed, so they cannot complete before the deadline.
      let deadline = NIODeadline.now() + .milliseconds(50)
      let shutdown = self.gracefulShutdown(deadline: deadline)
      XCTAssertNoThrow(try shutdown.wait())

      // Every RPC, whether open or waiting, should have failed.
      for failedRPC in openRPCs + waitingRPCs {
        XCTAssertThrowsError(try failedRPC.response.wait())
      }
    }

    /// RPCs started before shutdown is requested should succeed; those started after should fail.
    internal func testQuiescingAllowsForStreamsCreatedBeforeInitiatingShutdown() {
      self.setUpChannelPool(useSingleEventLoop: true)
      self.assertConnectionEstablished()

      // A stream 'Channel' is created for each of these RPCs before shutdown is initiated, but
      // the 'HTTP2Handler' may not know about every stream by then. All of these calls should
      // nevertheless be allowed to run normally.
      let earlyRPCs = (0 ..< 100).map { _ in self.startRPC() }

      // Request the shutdown; the RPCs started above should be allowed to complete.
      let shutdown = self.gracefulShutdown()

      // Started after the shutdown request, so each of these is expected to fail.
      let lateRPCs = (0 ..< 100).map { _ in self.startRPC(withTracking: false) }

      for earlyRPC in earlyRPCs {
        XCTAssertNoThrow(try earlyRPC.sendEnd().wait())
        XCTAssertNoThrow(try earlyRPC.response.wait())
      }

      for lateRPC in lateRPCs {
        XCTAssertThrowsError(try lateRPC.sendEnd().wait())
        XCTAssertThrowsError(try lateRPC.response.wait())
      }

      XCTAssertNoThrow(try shutdown.wait())
    }
  }
  extension ClientQuiescingTests {
    /// Tracks the number of outstanding RPCs and the shutdown progress of the channel under
    /// test, failing the test when events arrive in an unexpected order.
    ///
    /// The tracker is used directly by tests and from future callbacks, so every access to
    /// `state` is made while holding `lock`.
    private final class RPCTracker {
      /// Internal state; the associated value is the number of outstanding RPCs.
      private enum _State {
        case active(Int)
        case shutdownRequested(Int)
        case shutdown
      }

      /// The state as seen by callers of `assert(_:line:)`, without the RPC count.
      internal enum State {
        case active
        case shutdownRequested
        case shutdown
      }

      private var state = _State.active(0)
      private let lock = NIOLock()

      /// Fails the test if the tracker is not in the expected state.
      ///
      /// - Parameters:
      ///   - state: The expected state.
      ///   - line: The line the failure is attributed to; defaults to the caller's line.
      internal func assert(_ state: State, line: UInt = #line) {
        self.lock.withLock {
          switch (self.state, state) {
          case (.active, .active),
               (.shutdownRequested, .shutdownRequested),
               (.shutdown, .shutdown):
            ()
          default:
            XCTFail("Expected \(state) but state is \(self.state)", line: line)
          }
        }
      }

      /// Records that an RPC is about to start. Fails the test if shutdown has completed.
      internal func willStartRPC() {
        self.lock.withLock {
          switch self.state {
          case let .active(outstandingRPCs):
            self.state = .active(outstandingRPCs + 1)

          case let .shutdownRequested(outstandingRPCs):
            // We still increment despite the shutdown having been requested since the RPC will
            // fail immediately and we'll hit 'didFinishRPC'.
            self.state = .shutdownRequested(outstandingRPCs + 1)

          case .shutdown:
            XCTFail("Will start RPC when channel has been shutdown")
          }
        }
      }

      /// Records that an RPC finished. Fails the test if no RPC was outstanding or if shutdown
      /// has already completed.
      internal func didFinishRPC() {
        self.lock.withLock {
          switch self.state {
          case let .active(outstandingRPCs):
            XCTAssertGreaterThan(outstandingRPCs, 0)
            self.state = .active(outstandingRPCs - 1)

          case let .shutdownRequested(outstandingRPCs):
            XCTAssertGreaterThan(outstandingRPCs, 0)
            self.state = .shutdownRequested(outstandingRPCs - 1)

          case .shutdown:
            XCTFail("Finished RPC after completing shutdown")
          }
        }
      }

      /// Records that a graceful shutdown is about to be requested. Fails the test if one has
      /// already been requested or completed.
      internal func willRequestGracefulShutdown() {
        self.lock.withLock {
          switch self.state {
          case let .active(outstandingRPCs):
            self.state = .shutdownRequested(outstandingRPCs)

          case .shutdownRequested, .shutdown:
            XCTFail("Shutdown has already been requested or completed")
          }
        }
      }

      /// Records that the shutdown completed. Fails the test unless a shutdown was requested
      /// and no RPCs are outstanding.
      internal func didShutdown() {
        // Take the lock like every other method: this is called from a future callback and
        // would otherwise race with the other accesses to `state`.
        self.lock.withLock {
          switch self.state {
          case let .active(outstandingRPCs):
            XCTFail("Shutdown completed but not requested with \(outstandingRPCs) outstanding RPCs")

          case let .shutdownRequested(outstandingRPCs):
            if outstandingRPCs != 0 {
              XCTFail("Shutdown completed with \(outstandingRPCs) outstanding RPCs")
            } else {
              // Expected case.
              self.state = .shutdown
            }

          case .shutdown:
            XCTFail("Already shutdown")
          }
        }
      }
    }
  }