MaxAgeTests.swift 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * Copyright 2025, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOPosix
  22. import XCTest
  23. final class MaxAgeTests: XCTestCase {
  24. private func withEchoClient(
  25. group: any EventLoopGroup,
  26. configure: (inout GRPCChannelPool.Configuration) -> Void,
  27. test: (Echo_EchoNIOClient) throws -> Void
  28. ) throws {
  29. let eventLoop = MultiThreadedEventLoopGroup.singleton.next()
  30. let server = try Server.insecure(group: group)
  31. .withServiceProviders([EchoProvider()])
  32. .bind(host: "127.0.0.1", port: 0)
  33. .wait()
  34. defer {
  35. try? server.close().wait()
  36. }
  37. let port = server.channel.localAddress!.port!
  38. let pool = try GRPCChannelPool.with(
  39. target: .host("127.0.0.1", port: port),
  40. transportSecurity: .plaintext,
  41. eventLoopGroup: eventLoop,
  42. configure
  43. )
  44. defer {
  45. try? pool.close().wait()
  46. }
  47. try test(Echo_EchoNIOClient(channel: pool))
  48. }
  49. func testMaxAgeIsRespected() throws {
  50. // Verifies that the max-age config is respected by using the connection pool delegate to
  51. // start new RPCs when each connection closes (which close by aging out). It'll also record
  52. // various events that happen as part of the lifecycle of each connection.
  53. // The pool creates one sub-pool per event loop. Use a single loop to simplify connection
  54. // counting.
  55. let eventLoop = MultiThreadedEventLoopGroup.singleton.next()
  56. let done = eventLoop.makePromise(of: [RPCOnConnectionClosedDelegate.Event].self)
  57. let iterations = 2
  58. let delegate = RPCOnConnectionClosedDelegate(iterations: iterations, done: done)
  59. // This needs to be relatively short so the test doesn't take too long but not so short that
  60. // the connection is closed before it's actually used.
  61. let maxConnectionAge: TimeAmount = .milliseconds(50)
  62. try withEchoClient(group: eventLoop) { config in
  63. config.maxConnectionAge = maxConnectionAge
  64. config.delegate = delegate
  65. } test: { echo in
  66. // This creates a retain cycle (delegate → echo → channel → delegate), break it when the
  67. // test is done.
  68. delegate.setEcho(echo)
  69. defer { delegate.setEcho(nil) }
  70. let startTime = NIODeadline.now()
  71. // Do an RPC to kick things off.
  72. let rpc = try echo.get(.with { $0.text = "hello" }).response.wait()
  73. XCTAssertEqual(rpc.text, "Swift echo get: hello")
  74. // Wait for the delegate to finish driving the RPCs.
  75. let events = try done.futureResult.wait()
  76. let endTime = NIODeadline.now()
  77. // Add an iteration as one is done by the test (as opposed to the delegate). Each iteration
  78. // has three events: connected, quiescing, closed.
  79. XCTAssertEqual(events.count, (iterations + 1) * 3)
  80. // Check each triplet is as expected: connected, quiescing, then closed.
  81. for startIndex in stride(from: events.startIndex, to: events.endIndex, by: 3) {
  82. switch (events[startIndex], events[startIndex + 1], events[startIndex + 2]) {
  83. case (.connectSucceeded(let id1), .connectionQuiescing(let id2), .connectionClosed(let id3)):
  84. XCTAssertEqual(id1, id2)
  85. XCTAssertEqual(id2, id3)
  86. default:
  87. XCTFail("Invalid event triplet: \(events[startIndex ... startIndex + 2])")
  88. }
  89. }
  90. // Check the duration was in the right ballpark.
  91. let duration = (endTime - startTime)
  92. let minDuration = iterations * maxConnectionAge
  93. XCTAssertGreaterThanOrEqual(duration, minDuration)
  94. // Allow a few seconds of slack for max duration as some CI systems can be slow.
  95. let maxDuration = iterations * maxConnectionAge + .seconds(5)
  96. XCTAssertLessThanOrEqual(duration, maxDuration)
  97. }
  98. }
  99. private final class RPCOnConnectionClosedDelegate: GRPCConnectionPoolDelegate {
  100. enum Event: Sendable, Hashable {
  101. case connectSucceeded(GRPCConnectionID)
  102. case connectionQuiescing(GRPCConnectionID)
  103. case connectionClosed(GRPCConnectionID)
  104. }
  105. private struct State {
  106. var events: [Event] = []
  107. var echo: Echo_EchoNIOClient? = nil
  108. var iterations: Int
  109. }
  110. private let state: NIOLockedValueBox<State>
  111. private let done: EventLoopPromise<[Event]>
  112. func setEcho(_ echo: Echo_EchoNIOClient?) {
  113. self.state.withLockedValue { state in
  114. state.echo = echo
  115. }
  116. }
  117. init(iterations: Int, done: EventLoopPromise<[Event]>) {
  118. self.state = NIOLockedValueBox(State(iterations: iterations))
  119. self.done = done
  120. }
  121. func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
  122. self.state.withLockedValue { state in
  123. state.events.append(.connectSucceeded(id))
  124. }
  125. }
  126. func connectionQuiescing(id: GRPCConnectionID) {
  127. self.state.withLockedValue { state in
  128. state.events.append(.connectionQuiescing(id))
  129. }
  130. }
  131. func connectionClosed(id: GRPCConnectionID, error: (any Error)?) {
  132. enum Action {
  133. case doNextRPC(Echo_EchoNIOClient)
  134. case done([Event])
  135. }
  136. let action: Action = self.state.withLockedValue { state in
  137. state.events.append(.connectionClosed(id))
  138. if state.iterations > 0 {
  139. state.iterations -= 1
  140. return .doNextRPC(state.echo!)
  141. } else {
  142. return .done(state.events)
  143. }
  144. }
  145. switch action {
  146. case .doNextRPC(let echo):
  147. // Start an RPC to trigger a connect. The result doesn't matter:
  148. _ = echo.get(.with { $0.text = "hello" })
  149. case .done(let events):
  150. self.done.succeed(events)
  151. }
  152. }
  153. func connectionAdded(id: GRPCConnectionID) {}
  154. func connectionRemoved(id: GRPCConnectionID) {}
  155. func startedConnecting(id: GRPCConnectionID) {}
  156. func connectFailed(id: GRPCConnectionID, error: any Error) {}
  157. func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {
  158. }
  159. }
  160. func testRPCContinuesAfterQuiescing() throws {
  161. // Check that an in-flight RPC can continue to run after the connection is quiescing as a result
  162. // of aging out.
  163. // The pool creates one sub-pool per event loop. Use a single loop to simplify connection
  164. // counting.
  165. let eventLoop = MultiThreadedEventLoopGroup.singleton.next()
  166. let isQuiescing = eventLoop.makePromise(of: Void.self)
  167. try withEchoClient(group: eventLoop) { config in
  168. config.maxConnectionAge = .milliseconds(50)
  169. config.delegate = SucceedOnQuiescing(promise: isQuiescing)
  170. } test: { echo in
  171. // Send an initial message.
  172. let rpc = echo.collect()
  173. try rpc.sendMessage(.with { $0.text = "1" }).wait()
  174. // Wait for the connection to quiesce.
  175. try isQuiescing.futureResult.wait()
  176. // Send a few more messages then end.
  177. try rpc.sendMessage(.with { $0.text = "2" }).wait()
  178. try rpc.sendMessage(.with { $0.text = "3" }).wait()
  179. try rpc.sendEnd().wait()
  180. let response = try rpc.response.wait()
  181. XCTAssertEqual(response.text, "Swift echo collect: 1 2 3")
  182. }
  183. }
  184. final class SucceedOnQuiescing: GRPCConnectionPoolDelegate {
  185. private let quiescingPromise: EventLoopPromise<Void>
  186. init(promise: EventLoopPromise<Void>) {
  187. self.quiescingPromise = promise
  188. }
  189. func connectionQuiescing(id: GRPCConnectionID) {
  190. self.quiescingPromise.succeed()
  191. }
  192. func connectionAdded(id: GRPCConnectionID) {}
  193. func connectionRemoved(id: GRPCConnectionID) {}
  194. func startedConnecting(id: GRPCConnectionID) {}
  195. func connectFailed(id: GRPCConnectionID, error: any Error) {}
  196. func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {}
  197. func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {
  198. }
  199. func connectionClosed(id: GRPCConnectionID, error: (any Error)?) {}
  200. }
  201. }