ServerConnectionManagementHandlerTests.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOEmbedded
  18. import NIOHTTP2
  19. import XCTest
  20. @testable import GRPCHTTP2Core
  21. final class ServerConnectionManagementHandlerTests: XCTestCase {
  22. func testIdleTimeoutOnNewConnection() throws {
  23. let connection = try Connection(maxIdleTime: .minutes(1))
  24. try connection.activate()
  25. // Hit the max idle time.
  26. connection.advanceTime(by: .minutes(1))
  27. // Follow the graceful shutdown flow.
  28. try self.testGracefulShutdown(connection: connection, lastStreamID: 0)
  29. // Closed because no streams were open.
  30. try connection.waitUntilClosed()
  31. }
  32. func testIdleTimerIsCancelledWhenStreamIsOpened() throws {
  33. let connection = try Connection(maxIdleTime: .minutes(1))
  34. try connection.activate()
  35. // Open a stream to cancel the idle timer and run through the max idle time.
  36. connection.streamOpened(1)
  37. connection.advanceTime(by: .minutes(1))
  38. // No GOAWAY frame means the timer was cancelled.
  39. XCTAssertNil(try connection.readFrame())
  40. }
  41. func testIdleTimerStartsWhenAllStreamsAreClosed() throws {
  42. let connection = try Connection(maxIdleTime: .minutes(1))
  43. try connection.activate()
  44. // Open a stream to cancel the idle timer and run through the max idle time.
  45. connection.streamOpened(1)
  46. connection.advanceTime(by: .minutes(1))
  47. XCTAssertNil(try connection.readFrame())
  48. // Close the stream to start the timer again.
  49. connection.streamClosed(1)
  50. connection.advanceTime(by: .minutes(1))
  51. // Follow the graceful shutdown flow.
  52. try self.testGracefulShutdown(connection: connection, lastStreamID: 1)
  53. // Closed because no streams were open.
  54. try connection.waitUntilClosed()
  55. }
  56. func testMaxAge() throws {
  57. let connection = try Connection(maxAge: .minutes(1))
  58. try connection.activate()
  59. // Open some streams.
  60. connection.streamOpened(1)
  61. connection.streamOpened(3)
  62. // Run to the max age and follow the graceful shutdown flow.
  63. connection.advanceTime(by: .minutes(1))
  64. try self.testGracefulShutdown(connection: connection, lastStreamID: 3)
  65. // Close the streams.
  66. connection.streamClosed(1)
  67. connection.streamClosed(3)
  68. // Connection will be closed now.
  69. try connection.waitUntilClosed()
  70. }
  71. func testGracefulShutdownRatchetsDownStreamID() throws {
  72. // This test uses the idle timeout to trigger graceful shutdown. The mechanism is the same
  73. // regardless of how it's triggered.
  74. let connection = try Connection(maxIdleTime: .minutes(1))
  75. try connection.activate()
  76. // Trigger the shutdown, but open a stream during shutdown.
  77. connection.advanceTime(by: .minutes(1))
  78. try self.testGracefulShutdown(
  79. connection: connection,
  80. lastStreamID: 1,
  81. streamToOpenBeforePingAck: 1
  82. )
  83. // Close the stream to trigger closing the connection.
  84. connection.streamClosed(1)
  85. try connection.waitUntilClosed()
  86. }
  87. func testGracefulShutdownGracePeriod() throws {
  88. // This test uses the idle timeout to trigger graceful shutdown. The mechanism is the same
  89. // regardless of how it's triggered.
  90. let connection = try Connection(
  91. maxIdleTime: .minutes(1),
  92. maxGraceTime: .seconds(5)
  93. )
  94. try connection.activate()
  95. // Trigger the shutdown, but open a stream during shutdown.
  96. connection.advanceTime(by: .minutes(1))
  97. try self.testGracefulShutdown(
  98. connection: connection,
  99. lastStreamID: 1,
  100. streamToOpenBeforePingAck: 1
  101. )
  102. // Wait out the grace period without closing the stream.
  103. connection.advanceTime(by: .seconds(5))
  104. try connection.waitUntilClosed()
  105. }
  106. func testKeepaliveOnNewConnection() throws {
  107. let connection = try Connection(
  108. keepaliveTime: .minutes(5),
  109. keepaliveTimeout: .seconds(5)
  110. )
  111. try connection.activate()
  112. // Wait for the keep alive timer to fire which should cause the server to send a keep
  113. // alive PING.
  114. connection.advanceTime(by: .minutes(5))
  115. let frame1 = try XCTUnwrap(connection.readFrame())
  116. XCTAssertEqual(frame1.streamID, .rootStream)
  117. try XCTAssertPing(frame1.payload) { data, ack in
  118. XCTAssertFalse(ack)
  119. // Data is opaque, send it back.
  120. try connection.ping(data: data, ack: true)
  121. }
  122. // Run past the timeout, nothing should happen.
  123. connection.advanceTime(by: .seconds(5))
  124. XCTAssertNil(try connection.readFrame())
  125. }
  126. func testKeepaliveStartsAfterReadLoop() throws {
  127. let connection = try Connection(
  128. keepaliveTime: .minutes(5),
  129. keepaliveTimeout: .seconds(5)
  130. )
  131. try connection.activate()
  132. // Write a frame into the channel _without_ calling channel read complete. This will cancel
  133. // the keep alive timer.
  134. let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  135. connection.channel.pipeline.fireChannelRead(NIOAny(settings))
  136. // Run out the keep alive timer, it shouldn't fire.
  137. connection.advanceTime(by: .minutes(5))
  138. XCTAssertNil(try connection.readFrame())
  139. // Fire channel read complete to start the keep alive timer again.
  140. connection.channel.pipeline.fireChannelReadComplete()
  141. // Now expire the keep alive timer again, we should read out a PING frame.
  142. connection.advanceTime(by: .minutes(5))
  143. let frame1 = try XCTUnwrap(connection.readFrame())
  144. XCTAssertEqual(frame1.streamID, .rootStream)
  145. XCTAssertPing(frame1.payload) { data, ack in
  146. XCTAssertFalse(ack)
  147. }
  148. }
  149. func testKeepaliveOnNewConnectionWithoutResponse() throws {
  150. let connection = try Connection(
  151. keepaliveTime: .minutes(5),
  152. keepaliveTimeout: .seconds(5)
  153. )
  154. try connection.activate()
  155. // Wait for the keep alive timer to fire which should cause the server to send a keep
  156. // alive PING.
  157. connection.advanceTime(by: .minutes(5))
  158. let frame1 = try XCTUnwrap(connection.readFrame())
  159. XCTAssertEqual(frame1.streamID, .rootStream)
  160. XCTAssertPing(frame1.payload) { data, ack in
  161. XCTAssertFalse(ack)
  162. }
  163. // We didn't ack the PING, the connection should shutdown after the timeout.
  164. connection.advanceTime(by: .seconds(5))
  165. try self.testGracefulShutdown(connection: connection, lastStreamID: 0)
  166. // Connection is closed now.
  167. try connection.waitUntilClosed()
  168. }
  169. func testClientKeepalivePolicing() throws {
  170. let connection = try Connection(
  171. allowKeepaliveWithoutCalls: true,
  172. minPingIntervalWithoutCalls: .minutes(1)
  173. )
  174. try connection.activate()
  175. // The first ping is valid, the second and third are strikes.
  176. for _ in 1 ... 3 {
  177. try connection.ping(data: HTTP2PingData(), ack: false)
  178. XCTAssertNil(try connection.readFrame())
  179. }
  180. // The fourth ping is the third strike and triggers a GOAWAY.
  181. try connection.ping(data: HTTP2PingData(), ack: false)
  182. let frame = try XCTUnwrap(connection.readFrame())
  183. XCTAssertEqual(frame.streamID, .rootStream)
  184. XCTAssertGoAway(frame.payload) { streamID, error, data in
  185. XCTAssertEqual(streamID, .rootStream)
  186. XCTAssertEqual(error, .enhanceYourCalm)
  187. XCTAssertEqual(data, ByteBuffer(string: "too_many_pings"))
  188. }
  189. // The server should close the connection.
  190. try connection.waitUntilClosed()
  191. }
  192. func testClientKeepaliveWithPermissibleIntervals() throws {
  193. let connection = try Connection(
  194. allowKeepaliveWithoutCalls: true,
  195. minPingIntervalWithoutCalls: .minutes(1),
  196. manualClock: true
  197. )
  198. try connection.activate()
  199. for _ in 1 ... 100 {
  200. try connection.ping(data: HTTP2PingData(), ack: false)
  201. XCTAssertNil(try connection.readFrame())
  202. // Advance by the ping interval.
  203. connection.advanceTime(by: .minutes(1))
  204. }
  205. }
  206. func testClientKeepaliveResetState() throws {
  207. let connection = try Connection(
  208. allowKeepaliveWithoutCalls: true,
  209. minPingIntervalWithoutCalls: .minutes(1)
  210. )
  211. try connection.activate()
  212. func sendThreeKeepalivePings() throws {
  213. // The first ping is valid, the second and third are strikes.
  214. for _ in 1 ... 3 {
  215. try connection.ping(data: HTTP2PingData(), ack: false)
  216. XCTAssertNil(try connection.readFrame())
  217. }
  218. }
  219. try sendThreeKeepalivePings()
  220. // "send" a HEADERS frame and flush to reset keep alive state.
  221. connection.syncView.wroteHeadersFrame()
  222. connection.syncView.connectionWillFlush()
  223. // As above, the first ping is valid, the next two are strikes.
  224. try sendThreeKeepalivePings()
  225. // The next ping is the third strike and triggers a GOAWAY.
  226. try connection.ping(data: HTTP2PingData(), ack: false)
  227. let frame = try XCTUnwrap(connection.readFrame())
  228. XCTAssertEqual(frame.streamID, .rootStream)
  229. XCTAssertGoAway(frame.payload) { streamID, error, data in
  230. XCTAssertEqual(streamID, .rootStream)
  231. XCTAssertEqual(error, .enhanceYourCalm)
  232. XCTAssertEqual(data, ByteBuffer(string: "too_many_pings"))
  233. }
  234. // The server should close the connection.
  235. try connection.waitUntilClosed()
  236. }
  237. }
  238. extension ServerConnectionManagementHandlerTests {
  239. private func testGracefulShutdown(
  240. connection: Connection,
  241. lastStreamID: HTTP2StreamID,
  242. streamToOpenBeforePingAck: HTTP2StreamID? = nil
  243. ) throws {
  244. let frame1 = try XCTUnwrap(connection.readFrame())
  245. XCTAssertEqual(frame1.streamID, .rootStream)
  246. XCTAssertGoAway(frame1.payload) { streamID, errorCode, _ in
  247. XCTAssertEqual(streamID, .maxID)
  248. XCTAssertEqual(errorCode, .noError)
  249. }
  250. // Followed by a PING
  251. let frame2 = try XCTUnwrap(connection.readFrame())
  252. XCTAssertEqual(frame2.streamID, .rootStream)
  253. try XCTAssertPing(frame2.payload) { data, ack in
  254. XCTAssertFalse(ack)
  255. if let id = streamToOpenBeforePingAck {
  256. connection.streamOpened(id)
  257. }
  258. // Send the PING ACK.
  259. try connection.ping(data: data, ack: true)
  260. }
  261. // PING ACK triggers another GOAWAY.
  262. let frame3 = try XCTUnwrap(connection.readFrame())
  263. XCTAssertEqual(frame3.streamID, .rootStream)
  264. XCTAssertGoAway(frame3.payload) { streamID, errorCode, _ in
  265. XCTAssertEqual(streamID, lastStreamID)
  266. XCTAssertEqual(errorCode, .noError)
  267. }
  268. }
  269. }
  270. extension ServerConnectionManagementHandlerTests {
  271. struct Connection {
  272. let channel: EmbeddedChannel
  273. let streamDelegate: any NIOHTTP2StreamDelegate
  274. let syncView: ServerConnectionManagementHandler.SyncView
  275. var loop: EmbeddedEventLoop {
  276. self.channel.embeddedEventLoop
  277. }
  278. private let clock: ServerConnectionManagementHandler.Clock
  279. init(
  280. maxIdleTime: TimeAmount? = nil,
  281. maxAge: TimeAmount? = nil,
  282. maxGraceTime: TimeAmount? = nil,
  283. keepaliveTime: TimeAmount? = nil,
  284. keepaliveTimeout: TimeAmount? = nil,
  285. allowKeepaliveWithoutCalls: Bool = false,
  286. minPingIntervalWithoutCalls: TimeAmount = .minutes(5),
  287. manualClock: Bool = false
  288. ) throws {
  289. if manualClock {
  290. self.clock = .manual(ServerConnectionManagementHandler.Clock.Manual())
  291. } else {
  292. self.clock = .nio
  293. }
  294. let loop = EmbeddedEventLoop()
  295. let handler = ServerConnectionManagementHandler(
  296. eventLoop: loop,
  297. maxIdleTime: maxIdleTime,
  298. maxAge: maxAge,
  299. maxGraceTime: maxGraceTime,
  300. keepaliveTime: keepaliveTime,
  301. keepaliveTimeout: keepaliveTimeout,
  302. allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls,
  303. minPingIntervalWithoutCalls: minPingIntervalWithoutCalls,
  304. requireALPN: false,
  305. clock: self.clock
  306. )
  307. self.streamDelegate = handler.http2StreamDelegate
  308. self.syncView = handler.syncView
  309. self.channel = EmbeddedChannel(handler: handler, loop: loop)
  310. }
  311. func activate() throws {
  312. try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 0)).wait()
  313. }
  314. func advanceTime(by delta: TimeAmount) {
  315. switch self.clock {
  316. case .nio:
  317. ()
  318. case .manual(let clock):
  319. clock.advance(by: delta)
  320. }
  321. self.loop.advanceTime(by: delta)
  322. }
  323. func streamOpened(_ id: HTTP2StreamID) {
  324. self.streamDelegate.streamCreated(id, channel: self.channel)
  325. }
  326. func streamClosed(_ id: HTTP2StreamID) {
  327. self.streamDelegate.streamClosed(id, channel: self.channel)
  328. }
  329. func ping(data: HTTP2PingData, ack: Bool) throws {
  330. let frame = HTTP2Frame(streamID: .rootStream, payload: .ping(data, ack: ack))
  331. try self.channel.writeInbound(frame)
  332. }
  333. func readFrame() throws -> HTTP2Frame? {
  334. return try self.channel.readOutbound(as: HTTP2Frame.self)
  335. }
  336. func waitUntilClosed() throws {
  337. self.channel.embeddedEventLoop.run()
  338. try self.channel.closeFuture.wait()
  339. }
  340. }
  341. }