ClientConnectionHandlerTests.swift 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @_spi(Package) @testable import GRPCHTTP2Core
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHTTP2
  20. import XCTest
  /// Tests for `ClientConnectionHandler`, driven through an `EmbeddedChannel`.
  ///
  /// Each test builds a `Connection` harness, then advances the embedded event loop's
  /// clock and/or feeds HTTP/2 frames inbound, and finally asserts on the connection
  /// events read inbound and the frames written outbound.
  final class ClientConnectionHandlerTests: XCTestCase {
    /// An idle connection with no streams emits a closing event, writes a GOAWAY and closes
    /// once the max idle time has elapsed.
    func testMaxIdleTime() throws {
      let connection = try Connection(maxIdleTime: .minutes(5))
      try connection.activate()

      // Idle with no streams open we should:
      // - read out a closing event,
      // - write a GOAWAY frame,
      // - close.
      connection.loop.advanceTime(by: .minutes(5))

      XCTAssertEqual(try connection.readEvent(), .closing(.idle))

      let frame = try XCTUnwrap(try connection.readFrame())
      XCTAssertEqual(frame.streamID, .rootStream)
      XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(error, .noError)
        XCTAssertEqual(data, ByteBuffer(string: "idle"))
      }

      try connection.waitUntilClosed()
    }

    /// The idle timer is suspended while a stream is open and restarts when the last
    /// stream closes.
    func testMaxIdleTimeWhenOpenStreams() throws {
      let connection = try Connection(maxIdleTime: .minutes(5))
      try connection.activate()

      // Open a stream, the idle timer should be cancelled.
      connection.streamOpened(1)

      // Advance by the idle time, nothing should happen.
      connection.loop.advanceTime(by: .minutes(5))
      XCTAssertNil(try connection.readEvent())
      XCTAssertNil(try connection.readFrame())

      // Close the stream, the idle timer should begin again.
      connection.streamClosed(1)
      connection.loop.advanceTime(by: .minutes(5))

      // Same expectations as `testMaxIdleTime`: a closing event, then a GOAWAY on the
      // root stream, then the channel closes.
      XCTAssertEqual(try connection.readEvent(), .closing(.idle))

      let frame = try XCTUnwrap(try connection.readFrame())
      XCTAssertEqual(frame.streamID, .rootStream)
      XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(error, .noError)
        XCTAssertEqual(data, ByteBuffer(string: "idle"))
      }

      try connection.waitUntilClosed()
    }

    /// While a stream is open a PING is written every keepalive interval; once the stream
    /// closes no further PINGs are written.
    func testKeepaliveWithOpenStreams() throws {
      let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
      try connection.activate()

      // Open a stream so keep-alive starts.
      connection.streamOpened(1)

      for _ in 0 ..< 10 {
        // Advance time, a PING should be sent, ACK it.
        connection.loop.advanceTime(by: .minutes(1))
        let frame1 = try XCTUnwrap(connection.readFrame())
        XCTAssertEqual(frame1.streamID, .rootStream)
        try XCTAssertPing(frame1.payload) { data, ack in
          XCTAssertFalse(ack)
          try connection.ping(data: data, ack: true)
        }

        // Exactly one frame per interval.
        XCTAssertNil(try connection.readFrame())
      }

      // Close the stream, keep-alive pings should stop.
      connection.streamClosed(1)
      connection.loop.advanceTime(by: .minutes(1))
      XCTAssertNil(try connection.readFrame())
    }

    /// When keepalive without calls is allowed, PINGs are written every keepalive interval
    /// even though no stream was ever opened.
    func testKeepaliveWithNoOpenStreams() throws {
      let connection = try Connection(keepaliveTime: .minutes(1), allowKeepaliveWithoutCalls: true)
      try connection.activate()

      for _ in 0 ..< 10 {
        // Advance time, a PING should be sent, ACK it.
        connection.loop.advanceTime(by: .minutes(1))
        let frame1 = try XCTUnwrap(connection.readFrame())
        XCTAssertEqual(frame1.streamID, .rootStream)
        try XCTAssertPing(frame1.payload) { data, ack in
          XCTAssertFalse(ack)
          try connection.ping(data: data, ack: true)
        }

        // Exactly one frame per interval.
        XCTAssertNil(try connection.readFrame())
      }
    }

    /// A keepalive PING which isn't acknowledged within the keepalive timeout results in a
    /// closing event, a GOAWAY and the channel closing, even with a stream still open.
    func testKeepaliveWithOpenStreamsTimingOut() throws {
      let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
      try connection.activate()

      // Open a stream so keep-alive starts.
      connection.streamOpened(1)

      // Advance time, a PING should be sent, don't ACK it.
      connection.loop.advanceTime(by: .minutes(1))
      let frame1 = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(frame1.streamID, .rootStream)
      XCTAssertPing(frame1.payload) { _, ack in
        XCTAssertFalse(ack)
      }

      // Advance time by the keep alive timeout. We should:
      // - read a connection event
      // - read out a GOAWAY frame
      // - be closed
      connection.loop.advanceTime(by: .seconds(10))

      XCTAssertEqual(try connection.readEvent(), .closing(.keepaliveExpired))

      let frame2 = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(frame2.streamID, .rootStream)
      XCTAssertGoAway(frame2.payload) { lastStreamID, error, data in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(error, .noError)
        XCTAssertEqual(data, ByteBuffer(string: "keepalive_expired"))
      }

      // Doesn't wait for streams to close: the connection is bad.
      try connection.waitUntilClosed()
    }

    /// Inbound PING frames without the ack flag don't cause the handler to write anything.
    func testPingsAreIgnored() throws {
      let connection = try Connection()
      try connection.activate()

      // PING frames without ack set should be ignored, we rely on the HTTP/2 handler replying to them.
      try connection.ping(data: HTTP2PingData(), ack: false)
      XCTAssertNil(try connection.readFrame())
    }

    /// Receiving a GOAWAY with no open streams emits a closing event carrying the error
    /// code and opaque data, then the channel closes.
    func testReceiveGoAway() throws {
      let connection = try Connection()
      try connection.activate()

      try connection.goAway(
        lastStreamID: 0,
        errorCode: .enhanceYourCalm,
        opaqueData: ByteBuffer(string: "too_many_pings")
      )

      // Should read out an event and close (because there are no open streams).
      XCTAssertEqual(
        try connection.readEvent(),
        .closing(.goAway(.enhanceYourCalm, "too_many_pings"))
      )

      try connection.waitUntilClosed()
    }

    /// Receiving a GOAWAY while streams are open emits a closing event; the channel closes
    /// once every stream has closed.
    func testReceiveGoAwayWithOpenStreams() throws {
      let connection = try Connection()
      try connection.activate()

      connection.streamOpened(1)
      connection.streamOpened(2)
      connection.streamOpened(3)

      try connection.goAway(lastStreamID: .maxID, errorCode: .noError)

      // Should read out an event.
      XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.noError, "")))

      // Close streams so the connection can close.
      connection.streamClosed(1)
      connection.streamClosed(2)
      connection.streamClosed(3)

      try connection.waitUntilClosed()
    }

    /// A locally initiated graceful close emits a closing event and completes once the
    /// open stream has closed.
    func testOutboundGracefulClose() throws {
      let connection = try Connection()
      try connection.activate()

      connection.streamOpened(1)
      let closed = connection.closeGracefully()
      XCTAssertEqual(try connection.readEvent(), .closing(.initiatedLocally))

      connection.streamClosed(1)
      try closed.wait()
    }

    /// The first SETTINGS frame received emits a ready event; later SETTINGS frames emit
    /// nothing.
    func testReceiveInitialSettings() throws {
      let connection = try Connection()
      try connection.activate()

      // Nothing yet.
      XCTAssertNil(try connection.readEvent())

      // Write the initial settings.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Receiving another settings frame should be a no-op.
      try connection.settings([])
      XCTAssertNil(try connection.readEvent())
    }
  }
  extension ClientConnectionHandlerTests {
    /// Test harness which installs a `ClientConnectionHandler` in an `EmbeddedChannel` and
    /// offers helpers for feeding frames in, reading frames and events out, and signalling
    /// stream lifecycle changes via the handler's stream delegate.
    struct Connection {
      /// The channel holding the handler under test.
      let channel: EmbeddedChannel
      /// The delegate obtained from the handler's `http2StreamDelegate`.
      let streamDelegate: any NIOHTTP2StreamDelegate

      /// The embedded event loop of `channel`; tests advance its clock to fire timers.
      var loop: EmbeddedEventLoop {
        return self.channel.embeddedEventLoop
      }

      /// Creates the handler with the given configuration and installs it in a new
      /// `EmbeddedChannel` sharing the same embedded event loop.
      ///
      /// - Parameters:
      ///   - maxIdleTime: Passed to the handler as `maxIdleTime`.
      ///   - keepaliveTime: Passed to the handler as `keepaliveTime`.
      ///   - keepaliveTimeout: Passed to the handler as `keepaliveTimeout`.
      ///   - allowKeepaliveWithoutCalls: Passed to the handler as `keepaliveWithoutCalls`.
      init(
        maxIdleTime: TimeAmount? = nil,
        keepaliveTime: TimeAmount? = nil,
        keepaliveTimeout: TimeAmount? = nil,
        allowKeepaliveWithoutCalls: Bool = false
      ) throws {
        let eventLoop = EmbeddedEventLoop()
        let connectionHandler = ClientConnectionHandler(
          eventLoop: eventLoop,
          maxIdleTime: maxIdleTime,
          keepaliveTime: keepaliveTime,
          keepaliveTimeout: keepaliveTimeout,
          keepaliveWithoutCalls: allowKeepaliveWithoutCalls
        )

        // Grab the delegate before the handler is added to the channel.
        self.streamDelegate = connectionHandler.http2StreamDelegate
        self.channel = EmbeddedChannel(handler: connectionHandler, loop: eventLoop)
      }

      /// Connects the channel to a loopback address and waits for the connect to complete.
      func activate() throws {
        let address = try SocketAddress(ipAddress: "127.0.0.1", port: 0)
        try self.channel.connect(to: address).wait()
      }

      /// Tells the stream delegate that the stream with the given ID was created.
      func streamOpened(_ id: HTTP2StreamID) {
        self.streamDelegate.streamCreated(id, channel: self.channel)
      }

      /// Tells the stream delegate that the stream with the given ID was closed.
      func streamClosed(_ id: HTTP2StreamID) {
        self.streamDelegate.streamClosed(id, channel: self.channel)
      }

      /// Writes a GOAWAY frame on the root stream into the channel's inbound side.
      func goAway(
        lastStreamID: HTTP2StreamID,
        errorCode: HTTP2ErrorCode,
        opaqueData: ByteBuffer? = nil
      ) throws {
        try self.receive(
          .goAway(lastStreamID: lastStreamID, errorCode: errorCode, opaqueData: opaqueData)
        )
      }

      /// Writes a PING frame on the root stream into the channel's inbound side.
      func ping(data: HTTP2PingData, ack: Bool) throws {
        try self.receive(.ping(data, ack: ack))
      }

      /// Writes a SETTINGS frame on the root stream into the channel's inbound side.
      func settings(_ settings: [HTTP2Setting]) throws {
        try self.receive(.settings(.settings(settings)))
      }

      /// Reads the next outbound `HTTP2Frame` from the channel, if there is one.
      func readFrame() throws -> HTTP2Frame? {
        try self.channel.readOutbound(as: HTTP2Frame.self)
      }

      /// Reads the next inbound `ClientConnectionEvent` from the channel, if there is one.
      func readEvent() throws -> ClientConnectionEvent? {
        try self.channel.readInbound(as: ClientConnectionEvent.self)
      }

      /// Runs the event loop and then waits on the channel's close future.
      func waitUntilClosed() throws {
        self.loop.run()
        try self.channel.closeFuture.wait()
      }

      /// Triggers the handler's `closeGracefully` outbound user event.
      ///
      /// - Returns: The future of the promise passed along with the event.
      func closeGracefully() -> EventLoopFuture<Void> {
        let closePromise = self.loop.makePromise(of: Void.self)
        self.channel.pipeline.triggerUserOutboundEvent(
          ClientConnectionHandler.OutboundEvent.closeGracefully,
          promise: closePromise
        )
        return closePromise.futureResult
      }

      /// Wraps `payload` in a frame on the root stream and writes it inbound.
      private func receive(_ payload: HTTP2Frame.FramePayload) throws {
        let frame = HTTP2Frame(streamID: .rootStream, payload: payload)
        try self.channel.writeInbound(frame)
      }
    }
  }