ClientConnectionHandlerTests.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. @_spi(Package) @testable import GRPCHTTP2Core
  18. import NIOCore
  19. import NIOEmbedded
  20. import NIOHTTP2
  21. import XCTest
  /// Tests for `ClientConnectionHandler`.
  ///
  /// Each test drives the handler through the `Connection` helper, which hosts it in an
  /// `EmbeddedChannel`: time is advanced manually on the embedded event loop, frames written by the
  /// handler are read from the outbound side and connection events from the inbound side.
  final class ClientConnectionHandlerTests: XCTestCase {
    /// With no streams open, reaching the max idle time must emit a closing event, write a GOAWAY
    /// carrying "idle" and close the channel.
    func testMaxIdleTime() throws {
      let connection = try Connection(maxIdleTime: .minutes(5))
      try connection.activate()

      // Idle with no streams open we should:
      // - read out a closing event,
      // - write a GOAWAY frame,
      // - close.
      connection.loop.advanceTime(by: .minutes(5))

      XCTAssertEqual(try connection.readEvent(), .closing(.idle))

      let frame = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(frame.streamID, .rootStream)
      XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(error, .noError)
        XCTAssertEqual(data, ByteBuffer(string: "idle"))
      }

      try connection.waitUntilClosed()
    }

    /// An open stream must suppress the idle timeout; once the stream closes the idle timeout
    /// applies again and the connection closes exactly as in `testMaxIdleTime`.
    func testMaxIdleTimeWhenOpenStreams() throws {
      let connection = try Connection(maxIdleTime: .minutes(5))
      try connection.activate()

      // Open a stream, the idle timer should be cancelled.
      connection.streamOpened(1)

      // Advance by the idle time, nothing should happen.
      connection.loop.advanceTime(by: .minutes(5))
      XCTAssertNil(try connection.readEvent())
      XCTAssertNil(try connection.readFrame())

      // Close the stream, the idle timer should begin again.
      connection.streamClosed(1)
      connection.loop.advanceTime(by: .minutes(5))

      // Same expectations as the plain idle case: the closing event must be surfaced before the
      // GOAWAY is read, and the GOAWAY must be sent on the root stream.
      XCTAssertEqual(try connection.readEvent(), .closing(.idle))

      let frame = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(frame.streamID, .rootStream)
      XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(error, .noError)
        XCTAssertEqual(data, ByteBuffer(string: "idle"))
      }

      try connection.waitUntilClosed()
    }

    /// While a stream is open a PING must be written every keepalive interval; after the stream
    /// closes no further PINGs are expected.
    func testKeepaliveWithOpenStreams() throws {
      let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
      try connection.activate()

      // Open a stream so keep-alive starts.
      connection.streamOpened(1)

      for _ in 0 ..< 10 {
        // Advance time, a PING should be sent, ACK it.
        connection.loop.advanceTime(by: .minutes(1))
        let pingFrame = try XCTUnwrap(connection.readFrame())
        XCTAssertEqual(pingFrame.streamID, .rootStream)
        try XCTAssertPing(pingFrame.payload) { data, ack in
          XCTAssertFalse(ack)
          try connection.ping(data: data, ack: true)
        }

        // The ACK must not trigger any further writes.
        XCTAssertNil(try connection.readFrame())
      }

      // Close the stream, keep-alive pings should stop.
      connection.streamClosed(1)
      connection.loop.advanceTime(by: .minutes(1))
      XCTAssertNil(try connection.readFrame())
    }

    /// When keepalive without calls is allowed, PINGs must be written every keepalive interval even
    /// though no stream was ever opened.
    func testKeepaliveWithNoOpenStreams() throws {
      let connection = try Connection(keepaliveTime: .minutes(1), allowKeepaliveWithoutCalls: true)
      try connection.activate()

      for _ in 0 ..< 10 {
        // Advance time, a PING should be sent, ACK it.
        connection.loop.advanceTime(by: .minutes(1))
        let pingFrame = try XCTUnwrap(connection.readFrame())
        XCTAssertEqual(pingFrame.streamID, .rootStream)
        try XCTAssertPing(pingFrame.payload) { data, ack in
          XCTAssertFalse(ack)
          try connection.ping(data: data, ack: true)
        }

        // The ACK must not trigger any further writes.
        XCTAssertNil(try connection.readFrame())
      }
    }

    /// A keepalive PING which isn't acknowledged within the keepalive timeout must result in a
    /// closing event, a GOAWAY carrying "keepalive_expired" and the channel closing, even though a
    /// stream is still open.
    func testKeepaliveWithOpenStreamsTimingOut() throws {
      let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
      try connection.activate()

      // Open a stream so keep-alive starts.
      connection.streamOpened(1)

      // Advance time, a PING should be sent, don't ACK it.
      connection.loop.advanceTime(by: .minutes(1))
      let pingFrame = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(pingFrame.streamID, .rootStream)
      XCTAssertPing(pingFrame.payload) { _, ack in
        XCTAssertFalse(ack)
      }

      // Advance time by the keep alive timeout. We should:
      // - read a connection event
      // - read out a GOAWAY frame
      // - be closed
      connection.loop.advanceTime(by: .seconds(10))

      XCTAssertEqual(try connection.readEvent(), .closing(.keepaliveExpired))

      let goAwayFrame = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(goAwayFrame.streamID, .rootStream)
      XCTAssertGoAway(goAwayFrame.payload) { lastStreamID, error, data in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(error, .noError)
        XCTAssertEqual(data, ByteBuffer(string: "keepalive_expired"))
      }

      // Doesn't wait for streams to close: the connection is bad.
      try connection.waitUntilClosed()
    }

    /// Inbound PINGs without the ACK flag must not cause the handler to write anything.
    func testPingsAreIgnored() throws {
      let connection = try Connection()
      try connection.activate()

      // PING frames without ack set should be ignored, we rely on the HTTP/2 handler replying to them.
      try connection.ping(data: HTTP2PingData(), ack: false)
      XCTAssertNil(try connection.readFrame())
    }

    /// Receiving a GOAWAY with no open streams must emit a closing event carrying the error code and
    /// opaque data, then close the channel.
    func testReceiveGoAway() throws {
      let connection = try Connection()
      try connection.activate()

      try connection.goAway(
        lastStreamID: 0,
        errorCode: .enhanceYourCalm,
        opaqueData: ByteBuffer(string: "too_many_pings")
      )

      // Should read out an event and close (because there are no open streams).
      XCTAssertEqual(
        try connection.readEvent(),
        .closing(.goAway(.enhanceYourCalm, "too_many_pings"))
      )

      try connection.waitUntilClosed()
    }

    /// Receiving a GOAWAY while streams are open must emit the closing event; the channel is
    /// expected to close once those streams have been closed.
    func testReceiveGoAwayWithOpenStreams() throws {
      let connection = try Connection()
      try connection.activate()

      connection.streamOpened(1)
      connection.streamOpened(2)
      connection.streamOpened(3)

      try connection.goAway(lastStreamID: .maxID, errorCode: .noError)

      // Should read out an event.
      XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.noError, "")))

      // Close streams so the connection can close.
      connection.streamClosed(1)
      connection.streamClosed(2)
      connection.streamClosed(3)

      try connection.waitUntilClosed()
    }

    /// A locally initiated graceful close must emit a closing event; the returned future is waited
    /// on after the open stream has been closed.
    func testOutboundGracefulClose() throws {
      let connection = try Connection()
      try connection.activate()

      connection.streamOpened(1)
      let closed = connection.closeGracefully()
      XCTAssertEqual(try connection.readEvent(), .closing(.initiatedLocally))

      connection.streamClosed(1)
      try closed.wait()
    }

    /// The first SETTINGS frame must produce a `.ready` event; later SETTINGS frames must not
    /// produce any event.
    func testReceiveInitialSettings() throws {
      let connection = try Connection()
      try connection.activate()

      // Nothing yet.
      XCTAssertNil(try connection.readEvent())

      // Write the initial settings.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Receiving another settings frame should be a no-op.
      try connection.settings([])
      XCTAssertNil(try connection.readEvent())
    }

    /// An error followed by a close while no streams are open must be reported as an unexpected
    /// close carrying that error and `isIdle: true`.
    func testReceiveErrorWhenIdle() throws {
      let connection = try Connection()
      try connection.activate()

      // Write the initial settings.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Write an error and close.
      let error = RPCError(code: .aborted, message: "")
      connection.channel.pipeline.fireErrorCaught(error)
      connection.channel.close(mode: .all, promise: nil)

      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(error, isIdle: true)))
    }

    /// An error followed by a close while a stream is open must be reported as an unexpected close
    /// carrying that error and `isIdle: false`.
    func testReceiveErrorWhenStreamsAreOpen() throws {
      let connection = try Connection()
      try connection.activate()

      // Write the initial settings.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Open a stream.
      connection.streamOpened(1)

      // Write an error and close.
      let error = RPCError(code: .aborted, message: "")
      connection.channel.pipeline.fireErrorCaught(error)
      connection.channel.close(mode: .all, promise: nil)

      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(error, isIdle: false)))
    }

    /// A close without a preceding error while no streams are open must be reported as an
    /// unexpected close with no error and `isIdle: true`.
    func testUnexpectedCloseWhenIdle() throws {
      let connection = try Connection()
      try connection.activate()

      // Write the initial settings.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      connection.channel.close(mode: .all, promise: nil)
      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(nil, isIdle: true)))
    }

    /// A close without a preceding error while a stream is open must be reported as an unexpected
    /// close with no error and `isIdle: false`.
    func testUnexpectedCloseWhenStreamsAreOpen() throws {
      let connection = try Connection()
      try connection.activate()

      // Write the initial settings.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      connection.streamOpened(1)
      connection.channel.close(mode: .all, promise: nil)
      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(nil, isIdle: false)))
    }
  }
  extension ClientConnectionHandlerTests {
    /// Test harness which hosts a `ClientConnectionHandler` in an `EmbeddedChannel` and exposes
    /// small helpers for feeding it frames and stream callbacks and for reading back what it emits.
    struct Connection {
      /// The embedded channel whose pipeline contains the handler under test.
      let channel: EmbeddedChannel
      /// The handler's HTTP/2 stream delegate, used to report streams being created and closed.
      let streamDelegate: any NIOHTTP2StreamDelegate

      /// The embedded event loop backing `channel`; tests advance time on it manually.
      var loop: EmbeddedEventLoop {
        return self.channel.embeddedEventLoop
      }

      /// Creates a handler with the given configuration and installs it in a new embedded channel.
      ///
      /// All arguments are forwarded unchanged to `ClientConnectionHandler`.
      init(
        maxIdleTime: TimeAmount? = nil,
        keepaliveTime: TimeAmount? = nil,
        keepaliveTimeout: TimeAmount? = nil,
        allowKeepaliveWithoutCalls: Bool = false
      ) throws {
        let eventLoop = EmbeddedEventLoop()
        let connectionHandler = ClientConnectionHandler(
          eventLoop: eventLoop,
          maxIdleTime: maxIdleTime,
          keepaliveTime: keepaliveTime,
          keepaliveTimeout: keepaliveTimeout,
          keepaliveWithoutCalls: allowKeepaliveWithoutCalls
        )

        self.streamDelegate = connectionHandler.http2StreamDelegate
        self.channel = EmbeddedChannel(handler: connectionHandler, loop: eventLoop)
      }

      /// Connects the channel to `127.0.0.1:0` and waits for the connect to complete.
      func activate() throws {
        let address = try SocketAddress(ipAddress: "127.0.0.1", port: 0)
        try self.channel.connect(to: address).wait()
      }

      /// Tells the stream delegate that the stream with the given ID was created on `channel`.
      func streamOpened(_ id: HTTP2StreamID) {
        self.streamDelegate.streamCreated(id, channel: self.channel)
      }

      /// Tells the stream delegate that the stream with the given ID was closed on `channel`.
      func streamClosed(_ id: HTTP2StreamID) {
        self.streamDelegate.streamClosed(id, channel: self.channel)
      }

      /// Writes an inbound GOAWAY frame on the root stream.
      func goAway(
        lastStreamID: HTTP2StreamID,
        errorCode: HTTP2ErrorCode,
        opaqueData: ByteBuffer? = nil
      ) throws {
        try self.writeInboundOnRootStream(
          .goAway(lastStreamID: lastStreamID, errorCode: errorCode, opaqueData: opaqueData)
        )
      }

      /// Writes an inbound PING frame on the root stream.
      func ping(data: HTTP2PingData, ack: Bool) throws {
        try self.writeInboundOnRootStream(.ping(data, ack: ack))
      }

      /// Writes an inbound SETTINGS frame on the root stream.
      func settings(_ settings: [HTTP2Setting]) throws {
        try self.writeInboundOnRootStream(.settings(.settings(settings)))
      }

      /// Reads the next `HTTP2Frame` from the channel's outbound buffer, if there is one.
      func readFrame() throws -> HTTP2Frame? {
        try self.channel.readOutbound(as: HTTP2Frame.self)
      }

      /// Reads the next `ClientConnectionEvent` from the channel's inbound buffer, if there is one.
      func readEvent() throws -> ClientConnectionEvent? {
        try self.channel.readInbound(as: ClientConnectionEvent.self)
      }

      /// Runs the event loop, then waits on the channel's close future.
      func waitUntilClosed() throws {
        self.loop.run()
        try self.channel.closeFuture.wait()
      }

      /// Triggers the handler's `closeGracefully` outbound user event.
      ///
      /// - Returns: The future of the promise passed along with the event.
      func closeGracefully() -> EventLoopFuture<Void> {
        let closePromise = self.loop.makePromise(of: Void.self)
        self.channel.pipeline.triggerUserOutboundEvent(
          ClientConnectionHandler.OutboundEvent.closeGracefully,
          promise: closePromise
        )
        return closePromise.futureResult
      }

      /// Wraps `payload` in a frame addressed to the root stream and writes it inbound.
      private func writeInboundOnRootStream(_ payload: HTTP2Frame.FramePayload) throws {
        let frame = HTTP2Frame(streamID: .rootStream, payload: payload)
        try self.channel.writeInbound(frame)
      }
    }
  }