ClientConnectionHandlerTests.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. @_spi(Package) @testable import GRPCHTTP2Core
  18. import NIOCore
  19. import NIOEmbedded
  20. import NIOHTTP2
  21. import XCTest
  22. final class ClientConnectionHandlerTests: XCTestCase {
  23. func testMaxIdleTime() throws {
  24. let connection = try Connection(maxIdleTime: .minutes(5))
  25. try connection.activate()
  26. // Write the initial settings to ready the connection.
  27. try connection.settings([])
  28. XCTAssertEqual(try connection.readEvent(), .ready)
  29. // Idle with no streams open we should:
  30. // - read out a closing event,
  31. // - write a GOAWAY frame,
  32. // - close.
  33. connection.loop.advanceTime(by: .minutes(5))
  34. XCTAssertEqual(try connection.readEvent(), .closing(.idle))
  35. let frame = try XCTUnwrap(try connection.readFrame())
  36. XCTAssertEqual(frame.streamID, .rootStream)
  37. XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
  38. XCTAssertEqual(lastStreamID, .rootStream)
  39. XCTAssertEqual(error, .noError)
  40. XCTAssertEqual(data, ByteBuffer(string: "idle"))
  41. }
  42. try connection.waitUntilClosed()
  43. }
  44. func testMaxIdleTimeWhenOpenStreams() throws {
  45. let connection = try Connection(maxIdleTime: .minutes(5))
  46. try connection.activate()
  47. // Open a stream, the idle timer should be cancelled.
  48. connection.streamOpened(1)
  49. // Advance by the idle time, nothing should happen.
  50. connection.loop.advanceTime(by: .minutes(5))
  51. XCTAssertNil(try connection.readEvent())
  52. XCTAssertNil(try connection.readFrame())
  53. // Close the stream, the idle timer should begin again.
  54. connection.streamClosed(1)
  55. connection.loop.advanceTime(by: .minutes(5))
  56. let frame = try XCTUnwrap(try connection.readFrame())
  57. XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
  58. XCTAssertEqual(lastStreamID, .rootStream)
  59. XCTAssertEqual(error, .noError)
  60. XCTAssertEqual(data, ByteBuffer(string: "idle"))
  61. }
  62. try connection.waitUntilClosed()
  63. }
  64. func testKeepaliveWithOpenStreams() throws {
  65. let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
  66. try connection.activate()
  67. // Write the initial settings to ready the connection.
  68. try connection.settings([])
  69. XCTAssertEqual(try connection.readEvent(), .ready)
  70. // Open a stream so keep-alive starts.
  71. connection.streamOpened(1)
  72. for _ in 0 ..< 10 {
  73. // Advance time, a PING should be sent, ACK it.
  74. connection.loop.advanceTime(by: .minutes(1))
  75. let frame1 = try XCTUnwrap(connection.readFrame())
  76. XCTAssertEqual(frame1.streamID, .rootStream)
  77. try XCTAssertPing(frame1.payload) { data, ack in
  78. XCTAssertFalse(ack)
  79. try connection.ping(data: data, ack: true)
  80. }
  81. XCTAssertNil(try connection.readFrame())
  82. }
  83. // Close the stream, keep-alive pings should stop.
  84. connection.streamClosed(1)
  85. connection.loop.advanceTime(by: .minutes(1))
  86. XCTAssertNil(try connection.readFrame())
  87. }
  88. func testKeepaliveWithNoOpenStreams() throws {
  89. let connection = try Connection(keepaliveTime: .minutes(1), allowKeepaliveWithoutCalls: true)
  90. try connection.activate()
  91. // Write the initial settings to ready the connection.
  92. try connection.settings([])
  93. XCTAssertEqual(try connection.readEvent(), .ready)
  94. for _ in 0 ..< 10 {
  95. // Advance time, a PING should be sent, ACK it.
  96. connection.loop.advanceTime(by: .minutes(1))
  97. let frame1 = try XCTUnwrap(connection.readFrame())
  98. XCTAssertEqual(frame1.streamID, .rootStream)
  99. try XCTAssertPing(frame1.payload) { data, ack in
  100. XCTAssertFalse(ack)
  101. try connection.ping(data: data, ack: true)
  102. }
  103. XCTAssertNil(try connection.readFrame())
  104. }
  105. }
  106. func testKeepaliveWithOpenStreamsTimingOut() throws {
  107. let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
  108. try connection.activate()
  109. // Write the initial settings to ready the connection.
  110. try connection.settings([])
  111. XCTAssertEqual(try connection.readEvent(), .ready)
  112. // Open a stream so keep-alive starts.
  113. connection.streamOpened(1)
  114. // Advance time, a PING should be sent, don't ACK it.
  115. connection.loop.advanceTime(by: .minutes(1))
  116. let frame1 = try XCTUnwrap(connection.readFrame())
  117. XCTAssertEqual(frame1.streamID, .rootStream)
  118. XCTAssertPing(frame1.payload) { _, ack in
  119. XCTAssertFalse(ack)
  120. }
  121. // Advance time by the keep alive timeout. We should:
  122. // - read a connection event
  123. // - read out a GOAWAY frame
  124. // - be closed
  125. connection.loop.advanceTime(by: .seconds(10))
  126. XCTAssertEqual(try connection.readEvent(), .closing(.keepaliveExpired))
  127. let frame2 = try XCTUnwrap(connection.readFrame())
  128. XCTAssertEqual(frame2.streamID, .rootStream)
  129. XCTAssertGoAway(frame2.payload) { lastStreamID, error, data in
  130. XCTAssertEqual(lastStreamID, .rootStream)
  131. XCTAssertEqual(error, .noError)
  132. XCTAssertEqual(data, ByteBuffer(string: "keepalive_expired"))
  133. }
  134. // Doesn't wait for streams to close: the connection is bad.
  135. try connection.waitUntilClosed()
  136. }
  137. func testPingsAreIgnored() throws {
  138. let connection = try Connection()
  139. try connection.activate()
  140. // PING frames without ack set should be ignored, we rely on the HTTP/2 handler replying to them.
  141. try connection.ping(data: HTTP2PingData(), ack: false)
  142. XCTAssertNil(try connection.readFrame())
  143. }
  144. func testReceiveGoAway() throws {
  145. let connection = try Connection()
  146. try connection.activate()
  147. try connection.goAway(
  148. lastStreamID: 0,
  149. errorCode: .enhanceYourCalm,
  150. opaqueData: ByteBuffer(string: "too_many_pings")
  151. )
  152. // Should read out an event and close (because there are no open streams).
  153. XCTAssertEqual(
  154. try connection.readEvent(),
  155. .closing(.goAway(.enhanceYourCalm, "too_many_pings"))
  156. )
  157. try connection.waitUntilClosed()
  158. }
  159. func testReceiveGoAwayWithOpenStreams() throws {
  160. let connection = try Connection()
  161. try connection.activate()
  162. connection.streamOpened(1)
  163. connection.streamOpened(2)
  164. connection.streamOpened(3)
  165. try connection.goAway(lastStreamID: .maxID, errorCode: .noError)
  166. // Should read out an event.
  167. XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.noError, "")))
  168. // Close streams so the connection can close.
  169. connection.streamClosed(1)
  170. connection.streamClosed(2)
  171. connection.streamClosed(3)
  172. try connection.waitUntilClosed()
  173. }
  174. func testOutboundGracefulClose() throws {
  175. let connection = try Connection()
  176. try connection.activate()
  177. connection.streamOpened(1)
  178. let closed = connection.closeGracefully()
  179. XCTAssertEqual(try connection.readEvent(), .closing(.initiatedLocally))
  180. connection.streamClosed(1)
  181. try closed.wait()
  182. }
  183. func testReceiveInitialSettings() throws {
  184. let connection = try Connection()
  185. try connection.activate()
  186. // Nothing yet.
  187. XCTAssertNil(try connection.readEvent())
  188. // Write the initial settings.
  189. try connection.settings([])
  190. XCTAssertEqual(try connection.readEvent(), .ready)
  191. // Receiving another settings frame should be a no-op.
  192. try connection.settings([])
  193. XCTAssertNil(try connection.readEvent())
  194. }
  195. func testReceiveErrorWhenIdle() throws {
  196. let connection = try Connection()
  197. try connection.activate()
  198. // Write the initial settings.
  199. try connection.settings([])
  200. XCTAssertEqual(try connection.readEvent(), .ready)
  201. // Write an error and close.
  202. let error = RPCError(code: .aborted, message: "")
  203. connection.channel.pipeline.fireErrorCaught(error)
  204. connection.channel.close(mode: .all, promise: nil)
  205. XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(error, isIdle: true)))
  206. }
  207. func testReceiveErrorWhenStreamsAreOpen() throws {
  208. let connection = try Connection()
  209. try connection.activate()
  210. // Write the initial settings.
  211. try connection.settings([])
  212. XCTAssertEqual(try connection.readEvent(), .ready)
  213. // Open a stream.
  214. connection.streamOpened(1)
  215. // Write an error and close.
  216. let error = RPCError(code: .aborted, message: "")
  217. connection.channel.pipeline.fireErrorCaught(error)
  218. connection.channel.close(mode: .all, promise: nil)
  219. XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(error, isIdle: false)))
  220. }
  221. func testUnexpectedCloseWhenIdle() throws {
  222. let connection = try Connection()
  223. try connection.activate()
  224. // Write the initial settings.
  225. try connection.settings([])
  226. XCTAssertEqual(try connection.readEvent(), .ready)
  227. connection.channel.close(mode: .all, promise: nil)
  228. XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(nil, isIdle: true)))
  229. }
  230. func testUnexpectedCloseWhenStreamsAreOpen() throws {
  231. let connection = try Connection()
  232. try connection.activate()
  233. // Write the initial settings.
  234. try connection.settings([])
  235. XCTAssertEqual(try connection.readEvent(), .ready)
  236. connection.streamOpened(1)
  237. connection.channel.close(mode: .all, promise: nil)
  238. XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(nil, isIdle: false)))
  239. }
  240. }
  241. extension ClientConnectionHandlerTests {
  242. struct Connection {
  243. let channel: EmbeddedChannel
  244. let streamDelegate: any NIOHTTP2StreamDelegate
  245. var loop: EmbeddedEventLoop {
  246. self.channel.embeddedEventLoop
  247. }
  248. init(
  249. maxIdleTime: TimeAmount? = nil,
  250. keepaliveTime: TimeAmount? = nil,
  251. keepaliveTimeout: TimeAmount? = nil,
  252. allowKeepaliveWithoutCalls: Bool = false
  253. ) throws {
  254. let loop = EmbeddedEventLoop()
  255. let handler = ClientConnectionHandler(
  256. eventLoop: loop,
  257. maxIdleTime: maxIdleTime,
  258. keepaliveTime: keepaliveTime,
  259. keepaliveTimeout: keepaliveTimeout,
  260. keepaliveWithoutCalls: allowKeepaliveWithoutCalls
  261. )
  262. self.streamDelegate = handler.http2StreamDelegate
  263. self.channel = EmbeddedChannel(handler: handler, loop: loop)
  264. }
  265. func activate() throws {
  266. try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 0)).wait()
  267. }
  268. func streamOpened(_ id: HTTP2StreamID) {
  269. self.streamDelegate.streamCreated(id, channel: self.channel)
  270. }
  271. func streamClosed(_ id: HTTP2StreamID) {
  272. self.streamDelegate.streamClosed(id, channel: self.channel)
  273. }
  274. func goAway(
  275. lastStreamID: HTTP2StreamID,
  276. errorCode: HTTP2ErrorCode,
  277. opaqueData: ByteBuffer? = nil
  278. ) throws {
  279. let frame = HTTP2Frame(
  280. streamID: .rootStream,
  281. payload: .goAway(lastStreamID: lastStreamID, errorCode: errorCode, opaqueData: opaqueData)
  282. )
  283. try self.channel.writeInbound(frame)
  284. }
  285. func ping(data: HTTP2PingData, ack: Bool) throws {
  286. let frame = HTTP2Frame(streamID: .rootStream, payload: .ping(data, ack: ack))
  287. try self.channel.writeInbound(frame)
  288. }
  289. func settings(_ settings: [HTTP2Setting]) throws {
  290. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
  291. try self.channel.writeInbound(frame)
  292. }
  293. func readFrame() throws -> HTTP2Frame? {
  294. return try self.channel.readOutbound(as: HTTP2Frame.self)
  295. }
  296. func readEvent() throws -> ClientConnectionEvent? {
  297. return try self.channel.readInbound(as: ClientConnectionEvent.self)
  298. }
  299. func waitUntilClosed() throws {
  300. self.channel.embeddedEventLoop.run()
  301. try self.channel.closeFuture.wait()
  302. }
  303. func closeGracefully() -> EventLoopFuture<Void> {
  304. let promise = self.channel.embeddedEventLoop.makePromise(of: Void.self)
  305. let event = ClientConnectionHandler.OutboundEvent.closeGracefully
  306. self.channel.pipeline.triggerUserOutboundEvent(event, promise: promise)
  307. return promise.futureResult
  308. }
  309. }
  310. }