ServerConnectionManagementHandlerTests.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOEmbedded
  18. import NIOHTTP2
  19. import XCTest
  20. @testable import GRPCHTTP2Core
  21. final class ServerConnectionManagementHandlerTests: XCTestCase {
  22. func testIdleTimeoutOnNewConnection() throws {
  23. let connection = try Connection(maxIdleTime: .minutes(1))
  24. try connection.activate()
  25. // Hit the max idle time.
  26. connection.advanceTime(by: .minutes(1))
  27. // Follow the graceful shutdown flow.
  28. try self.testGracefulShutdown(connection: connection, lastStreamID: 0)
  29. // Closed because no streams were open.
  30. try connection.waitUntilClosed()
  31. }
  32. func testIdleTimerIsCancelledWhenStreamIsOpened() throws {
  33. let connection = try Connection(maxIdleTime: .minutes(1))
  34. try connection.activate()
  35. // Open a stream to cancel the idle timer and run through the max idle time.
  36. connection.streamOpened(1)
  37. connection.advanceTime(by: .minutes(1))
  38. // No GOAWAY frame means the timer was cancelled.
  39. XCTAssertNil(try connection.readFrame())
  40. }
  41. func testIdleTimerStartsWhenAllStreamsAreClosed() throws {
  42. let connection = try Connection(maxIdleTime: .minutes(1))
  43. try connection.activate()
  44. // Open a stream to cancel the idle timer and run through the max idle time.
  45. connection.streamOpened(1)
  46. connection.advanceTime(by: .minutes(1))
  47. XCTAssertNil(try connection.readFrame())
  48. // Close the stream to start the timer again.
  49. connection.streamClosed(1)
  50. connection.advanceTime(by: .minutes(1))
  51. // Follow the graceful shutdown flow.
  52. try self.testGracefulShutdown(connection: connection, lastStreamID: 1)
  53. // Closed because no streams were open.
  54. try connection.waitUntilClosed()
  55. }
  56. func testMaxAge() throws {
  57. let connection = try Connection(maxAge: .minutes(1))
  58. try connection.activate()
  59. // Open some streams.
  60. connection.streamOpened(1)
  61. connection.streamOpened(3)
  62. // Run to the max age and follow the graceful shutdown flow.
  63. connection.advanceTime(by: .minutes(1))
  64. try self.testGracefulShutdown(connection: connection, lastStreamID: 3)
  65. // Close the streams.
  66. connection.streamClosed(1)
  67. connection.streamClosed(3)
  68. // Connection will be closed now.
  69. try connection.waitUntilClosed()
  70. }
  71. func testGracefulShutdownRatchetsDownStreamID() throws {
  72. // This test uses the idle timeout to trigger graceful shutdown. The mechanism is the same
  73. // regardless of how it's triggered.
  74. let connection = try Connection(maxIdleTime: .minutes(1))
  75. try connection.activate()
  76. // Trigger the shutdown, but open a stream during shutdown.
  77. connection.advanceTime(by: .minutes(1))
  78. try self.testGracefulShutdown(
  79. connection: connection,
  80. lastStreamID: 1,
  81. streamToOpenBeforePingAck: 1
  82. )
  83. // Close the stream to trigger closing the connection.
  84. connection.streamClosed(1)
  85. try connection.waitUntilClosed()
  86. }
  87. func testGracefulShutdownGracePeriod() throws {
  88. // This test uses the idle timeout to trigger graceful shutdown. The mechanism is the same
  89. // regardless of how it's triggered.
  90. let connection = try Connection(
  91. maxIdleTime: .minutes(1),
  92. maxGraceTime: .seconds(5)
  93. )
  94. try connection.activate()
  95. // Trigger the shutdown, but open a stream during shutdown.
  96. connection.advanceTime(by: .minutes(1))
  97. try self.testGracefulShutdown(
  98. connection: connection,
  99. lastStreamID: 1,
  100. streamToOpenBeforePingAck: 1
  101. )
  102. // Wait out the grace period without closing the stream.
  103. connection.advanceTime(by: .seconds(5))
  104. try connection.waitUntilClosed()
  105. }
  106. func testKeepaliveOnNewConnection() throws {
  107. let connection = try Connection(
  108. keepaliveTime: .minutes(5),
  109. keepaliveTimeout: .seconds(5)
  110. )
  111. try connection.activate()
  112. // Wait for the keep alive timer to fire which should cause the server to send a keep
  113. // alive PING.
  114. connection.advanceTime(by: .minutes(5))
  115. let frame1 = try XCTUnwrap(connection.readFrame())
  116. XCTAssertEqual(frame1.streamID, .rootStream)
  117. try XCTAssertPing(frame1.payload) { data, ack in
  118. XCTAssertFalse(ack)
  119. // Data is opaque, send it back.
  120. try connection.ping(data: data, ack: true)
  121. }
  122. // Run past the timeout, nothing should happen.
  123. connection.advanceTime(by: .seconds(5))
  124. XCTAssertNil(try connection.readFrame())
  125. }
  126. func testKeepaliveStartsAfterReadLoop() throws {
  127. let connection = try Connection(
  128. keepaliveTime: .minutes(5),
  129. keepaliveTimeout: .seconds(5)
  130. )
  131. try connection.activate()
  132. // Write a frame into the channel _without_ calling channel read complete. This will cancel
  133. // the keep alive timer.
  134. let settings = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
  135. connection.channel.pipeline.fireChannelRead(NIOAny(settings))
  136. // Run out the keep alive timer, it shouldn't fire.
  137. connection.advanceTime(by: .minutes(5))
  138. XCTAssertNil(try connection.readFrame())
  139. // Fire channel read complete to start the keep alive timer again.
  140. connection.channel.pipeline.fireChannelReadComplete()
  141. // Now expire the keep alive timer again, we should read out a PING frame.
  142. connection.advanceTime(by: .minutes(5))
  143. let frame1 = try XCTUnwrap(connection.readFrame())
  144. XCTAssertEqual(frame1.streamID, .rootStream)
  145. XCTAssertPing(frame1.payload) { data, ack in
  146. XCTAssertFalse(ack)
  147. }
  148. }
  149. func testKeepaliveOnNewConnectionWithoutResponse() throws {
  150. let connection = try Connection(
  151. keepaliveTime: .minutes(5),
  152. keepaliveTimeout: .seconds(5)
  153. )
  154. try connection.activate()
  155. // Wait for the keep alive timer to fire which should cause the server to send a keep
  156. // alive PING.
  157. connection.advanceTime(by: .minutes(5))
  158. let frame1 = try XCTUnwrap(connection.readFrame())
  159. XCTAssertEqual(frame1.streamID, .rootStream)
  160. XCTAssertPing(frame1.payload) { data, ack in
  161. XCTAssertFalse(ack)
  162. }
  163. // We didn't ack the PING, the connection should shutdown after the timeout.
  164. connection.advanceTime(by: .seconds(5))
  165. try self.testGracefulShutdown(connection: connection, lastStreamID: 0)
  166. // Connection is closed now.
  167. try connection.waitUntilClosed()
  168. }
  169. func testClientKeepalivePolicing() throws {
  170. let connection = try Connection(
  171. allowKeepaliveWithoutCalls: true,
  172. minPingIntervalWithoutCalls: .minutes(1)
  173. )
  174. try connection.activate()
  175. // The first ping is valid, the second and third are strikes.
  176. for _ in 1 ... 3 {
  177. try connection.ping(data: HTTP2PingData(), ack: false)
  178. let frame = try XCTUnwrap(connection.readFrame())
  179. XCTAssertEqual(frame.streamID, .rootStream)
  180. XCTAssertPing(frame.payload) { data, ack in
  181. XCTAssertEqual(data, HTTP2PingData())
  182. XCTAssertTrue(ack)
  183. }
  184. }
  185. // The fourth ping is the third strike and triggers a GOAWAY.
  186. try connection.ping(data: HTTP2PingData(), ack: false)
  187. let frame = try XCTUnwrap(connection.readFrame())
  188. XCTAssertEqual(frame.streamID, .rootStream)
  189. XCTAssertGoAway(frame.payload) { streamID, error, data in
  190. XCTAssertEqual(streamID, .rootStream)
  191. XCTAssertEqual(error, .enhanceYourCalm)
  192. XCTAssertEqual(data, ByteBuffer(string: "too_many_pings"))
  193. }
  194. // The server should close the connection.
  195. try connection.waitUntilClosed()
  196. }
  197. func testClientKeepaliveWithPermissibleIntervals() throws {
  198. let connection = try Connection(
  199. allowKeepaliveWithoutCalls: true,
  200. minPingIntervalWithoutCalls: .minutes(1),
  201. manualClock: true
  202. )
  203. try connection.activate()
  204. for _ in 1 ... 100 {
  205. try connection.ping(data: HTTP2PingData(), ack: false)
  206. let frame = try XCTUnwrap(connection.readFrame())
  207. XCTAssertEqual(frame.streamID, .rootStream)
  208. XCTAssertPing(frame.payload) { data, ack in
  209. XCTAssertEqual(data, HTTP2PingData())
  210. XCTAssertTrue(ack)
  211. }
  212. // Advance by the ping interval.
  213. connection.advanceTime(by: .minutes(1))
  214. }
  215. }
  216. func testClientKeepaliveResetState() throws {
  217. let connection = try Connection(
  218. allowKeepaliveWithoutCalls: true,
  219. minPingIntervalWithoutCalls: .minutes(1)
  220. )
  221. try connection.activate()
  222. func sendThreeKeepalivePings() throws {
  223. // The first ping is valid, the second and third are strikes.
  224. for _ in 1 ... 3 {
  225. try connection.ping(data: HTTP2PingData(), ack: false)
  226. let frame = try XCTUnwrap(connection.readFrame())
  227. XCTAssertEqual(frame.streamID, .rootStream)
  228. XCTAssertPing(frame.payload) { data, ack in
  229. XCTAssertEqual(data, HTTP2PingData())
  230. XCTAssertTrue(ack)
  231. }
  232. }
  233. }
  234. try sendThreeKeepalivePings()
  235. // "send" a HEADERS frame and flush to reset keep alive state.
  236. connection.syncView.wroteHeadersFrame()
  237. connection.syncView.connectionWillFlush()
  238. // As above, the first ping is valid, the next two are strikes.
  239. try sendThreeKeepalivePings()
  240. // The next ping is the third strike and triggers a GOAWAY.
  241. try connection.ping(data: HTTP2PingData(), ack: false)
  242. let frame = try XCTUnwrap(connection.readFrame())
  243. XCTAssertEqual(frame.streamID, .rootStream)
  244. XCTAssertGoAway(frame.payload) { streamID, error, data in
  245. XCTAssertEqual(streamID, .rootStream)
  246. XCTAssertEqual(error, .enhanceYourCalm)
  247. XCTAssertEqual(data, ByteBuffer(string: "too_many_pings"))
  248. }
  249. // The server should close the connection.
  250. try connection.waitUntilClosed()
  251. }
  252. }
  253. extension ServerConnectionManagementHandlerTests {
  254. private func testGracefulShutdown(
  255. connection: Connection,
  256. lastStreamID: HTTP2StreamID,
  257. streamToOpenBeforePingAck: HTTP2StreamID? = nil
  258. ) throws {
  259. let frame1 = try XCTUnwrap(connection.readFrame())
  260. XCTAssertEqual(frame1.streamID, .rootStream)
  261. XCTAssertGoAway(frame1.payload) { streamID, errorCode, _ in
  262. XCTAssertEqual(streamID, .maxID)
  263. XCTAssertEqual(errorCode, .noError)
  264. }
  265. // Followed by a PING
  266. let frame2 = try XCTUnwrap(connection.readFrame())
  267. XCTAssertEqual(frame2.streamID, .rootStream)
  268. try XCTAssertPing(frame2.payload) { data, ack in
  269. XCTAssertFalse(ack)
  270. if let id = streamToOpenBeforePingAck {
  271. connection.streamOpened(id)
  272. }
  273. // Send the PING ACK.
  274. try connection.ping(data: data, ack: true)
  275. }
  276. // PING ACK triggers another GOAWAY.
  277. let frame3 = try XCTUnwrap(connection.readFrame())
  278. XCTAssertEqual(frame3.streamID, .rootStream)
  279. XCTAssertGoAway(frame3.payload) { streamID, errorCode, _ in
  280. XCTAssertEqual(streamID, lastStreamID)
  281. XCTAssertEqual(errorCode, .noError)
  282. }
  283. }
  284. }
  285. extension ServerConnectionManagementHandlerTests {
  286. struct Connection {
  287. let channel: EmbeddedChannel
  288. let streamDelegate: any NIOHTTP2StreamDelegate
  289. let syncView: ServerConnectionManagementHandler.SyncView
  290. var loop: EmbeddedEventLoop {
  291. self.channel.embeddedEventLoop
  292. }
  293. private let clock: ServerConnectionManagementHandler.Clock
  294. init(
  295. maxIdleTime: TimeAmount? = nil,
  296. maxAge: TimeAmount? = nil,
  297. maxGraceTime: TimeAmount? = nil,
  298. keepaliveTime: TimeAmount? = nil,
  299. keepaliveTimeout: TimeAmount? = nil,
  300. allowKeepaliveWithoutCalls: Bool = false,
  301. minPingIntervalWithoutCalls: TimeAmount = .minutes(5),
  302. manualClock: Bool = false
  303. ) throws {
  304. if manualClock {
  305. self.clock = .manual(ServerConnectionManagementHandler.Clock.Manual())
  306. } else {
  307. self.clock = .nio
  308. }
  309. let loop = EmbeddedEventLoop()
  310. let handler = ServerConnectionManagementHandler(
  311. eventLoop: loop,
  312. maxIdleTime: maxIdleTime,
  313. maxAge: maxAge,
  314. maxGraceTime: maxGraceTime,
  315. keepaliveTime: keepaliveTime,
  316. keepaliveTimeout: keepaliveTimeout,
  317. allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls,
  318. minPingIntervalWithoutCalls: minPingIntervalWithoutCalls,
  319. clock: self.clock
  320. )
  321. self.streamDelegate = handler.http2StreamDelegate
  322. self.syncView = handler.syncView
  323. self.channel = EmbeddedChannel(handler: handler, loop: loop)
  324. }
  325. func activate() throws {
  326. try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 0)).wait()
  327. }
  328. func advanceTime(by delta: TimeAmount) {
  329. switch self.clock {
  330. case .nio:
  331. ()
  332. case .manual(let clock):
  333. clock.advance(by: delta)
  334. }
  335. self.loop.advanceTime(by: delta)
  336. }
  337. func streamOpened(_ id: HTTP2StreamID) {
  338. self.streamDelegate.streamCreated(id, channel: self.channel)
  339. }
  340. func streamClosed(_ id: HTTP2StreamID) {
  341. self.streamDelegate.streamClosed(id, channel: self.channel)
  342. }
  343. func ping(data: HTTP2PingData, ack: Bool) throws {
  344. let frame = HTTP2Frame(streamID: .rootStream, payload: .ping(data, ack: ack))
  345. try self.channel.writeInbound(frame)
  346. }
  347. func readFrame() throws -> HTTP2Frame? {
  348. return try self.channel.readOutbound(as: HTTP2Frame.self)
  349. }
  350. func waitUntilClosed() throws {
  351. self.channel.embeddedEventLoop.run()
  352. try self.channel.closeFuture.wait()
  353. }
  354. }
  355. }