ClientConnectionHandlerTests.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import GRPCCore
  17. import GRPCHTTP2Core
  18. import NIOCore
  19. import NIOEmbedded
  20. import NIOHTTP2
  21. import XCTest
  /// Exercises `ClientConnectionHandler` inside an `EmbeddedChannel` via the `Connection` test
  /// harness: frames are written inbound, and outbound frames / inbound connection events are
  /// read back and asserted on.
  final class ClientConnectionHandlerTests: XCTestCase {
    /// With no streams open, the max idle time elapsing yields `.closing(.idle)`, a GOAWAY
    /// carrying "idle", and then the channel closes.
    func testMaxIdleTime() throws {
      let connection = try Connection(maxIdleTime: .minutes(5))
      try connection.activate()

      // The first inbound SETTINGS frame is what makes the connection ready.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // No streams are open. Once the idle period has passed we expect, in order:
      //   1. a closing event,
      //   2. a GOAWAY frame,
      //   3. the channel to close.
      connection.loop.advanceTime(by: .minutes(5))
      XCTAssertEqual(try connection.readEvent(), .closing(.idle))

      let goAway = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(goAway.streamID, .rootStream)
      XCTAssertGoAway(goAway.payload) { lastStreamID, errorCode, opaqueData in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(errorCode, .noError)
        XCTAssertEqual(opaqueData, ByteBuffer(string: "idle"))
      }

      try connection.waitUntilClosed()
    }

    /// While a stream is open, the idle period elapsing produces neither an event nor a frame.
    /// After the stream closes, a further idle period produces a GOAWAY carrying "idle" and the
    /// channel closes.
    func testMaxIdleTimeWhenOpenStreams() throws {
      let connection = try Connection(maxIdleTime: .minutes(5))
      try connection.activate()

      // Opening a stream is expected to cancel the idle timer.
      connection.streamOpened(1)

      // A full idle period passes with the stream open: nothing must be emitted.
      connection.loop.advanceTime(by: .minutes(5))
      XCTAssertNil(try connection.readEvent())
      XCTAssertNil(try connection.readFrame())

      // Closing the stream is expected to restart the idle timer.
      connection.streamClosed(1)
      connection.loop.advanceTime(by: .minutes(5))

      let goAway = try XCTUnwrap(connection.readFrame())
      XCTAssertGoAway(goAway.payload) { lastStreamID, errorCode, opaqueData in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(errorCode, .noError)
        XCTAssertEqual(opaqueData, ByteBuffer(string: "idle"))
      }

      try connection.waitUntilClosed()
    }

    /// With a stream open, one PING is written per keepalive interval for as long as each PING
    /// is acknowledged; after the stream closes no further PING is written.
    func testKeepaliveWithOpenStreams() throws {
      let connection = try Connection(
        keepaliveTime: .minutes(1),
        keepaliveTimeout: .seconds(10)
      )
      try connection.activate()

      // The first inbound SETTINGS frame is what makes the connection ready.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Keep-alive only starts once a stream is open.
      connection.streamOpened(1)

      for _ in 1 ... 10 {
        // Each interval should produce exactly one un-acked PING, which we acknowledge.
        connection.loop.advanceTime(by: .minutes(1))
        let pingFrame = try XCTUnwrap(connection.readFrame())
        XCTAssertEqual(pingFrame.streamID, .rootStream)
        try XCTAssertPing(pingFrame.payload) { pingData, isAck in
          XCTAssertFalse(isAck)
          try connection.ping(data: pingData, ack: true)
        }

        XCTAssertNil(try connection.readFrame())
      }

      // With the stream closed, another interval must not produce a PING.
      connection.streamClosed(1)
      connection.loop.advanceTime(by: .minutes(1))
      XCTAssertNil(try connection.readFrame())
    }

    /// When keepalive is allowed without calls, one PING is written per keepalive interval even
    /// though no stream is open.
    func testKeepaliveWithNoOpenStreams() throws {
      let connection = try Connection(
        keepaliveTime: .minutes(1),
        allowKeepaliveWithoutCalls: true
      )
      try connection.activate()

      // The first inbound SETTINGS frame is what makes the connection ready.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      for _ in 1 ... 10 {
        // Each interval should produce exactly one un-acked PING, which we acknowledge.
        connection.loop.advanceTime(by: .minutes(1))
        let pingFrame = try XCTUnwrap(connection.readFrame())
        XCTAssertEqual(pingFrame.streamID, .rootStream)
        try XCTAssertPing(pingFrame.payload) { pingData, isAck in
          XCTAssertFalse(isAck)
          try connection.ping(data: pingData, ack: true)
        }

        XCTAssertNil(try connection.readFrame())
      }
    }

    /// A keepalive PING that is never acknowledged results, once the keepalive timeout passes,
    /// in `.closing(.keepaliveExpired)`, a GOAWAY carrying "keepalive_expired", and the channel
    /// closing even though a stream is still open.
    func testKeepaliveWithOpenStreamsTimingOut() throws {
      let connection = try Connection(
        keepaliveTime: .minutes(1),
        keepaliveTimeout: .seconds(10)
      )
      try connection.activate()

      // The first inbound SETTINGS frame is what makes the connection ready.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Keep-alive only starts once a stream is open.
      connection.streamOpened(1)

      // One interval later a PING goes out; deliberately leave it unacknowledged.
      connection.loop.advanceTime(by: .minutes(1))
      let pingFrame = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(pingFrame.streamID, .rootStream)
      XCTAssertPing(pingFrame.payload) { _, isAck in
        XCTAssertFalse(isAck)
      }

      // After the keepalive timeout we expect, in order:
      //   1. a connection event,
      //   2. a GOAWAY frame,
      //   3. the channel to be closed.
      connection.loop.advanceTime(by: .seconds(10))
      XCTAssertEqual(try connection.readEvent(), .closing(.keepaliveExpired))

      let goAway = try XCTUnwrap(connection.readFrame())
      XCTAssertEqual(goAway.streamID, .rootStream)
      XCTAssertGoAway(goAway.payload) { lastStreamID, errorCode, opaqueData in
        XCTAssertEqual(lastStreamID, .rootStream)
        XCTAssertEqual(errorCode, .noError)
        XCTAssertEqual(opaqueData, ByteBuffer(string: "keepalive_expired"))
      }

      // The connection is considered bad, so closing does not wait for the open stream.
      try connection.waitUntilClosed()
    }

    /// An inbound PING without the ACK flag causes no outbound frame from this handler.
    func testPingsAreIgnored() throws {
      let connection = try Connection()
      try connection.activate()

      // Replying to un-acked PINGs is left to the HTTP/2 handler, so nothing should be written here.
      try connection.ping(data: HTTP2PingData(), ack: false)
      XCTAssertNil(try connection.readFrame())
    }

    /// With no open streams, an inbound GOAWAY surfaces as a closing event carrying its error
    /// code and message, and the channel closes.
    func testReceiveGoAway() throws {
      let connection = try Connection()
      try connection.activate()

      let reason = ByteBuffer(string: "too_many_pings")
      try connection.goAway(lastStreamID: 0, errorCode: .enhanceYourCalm, opaqueData: reason)

      // There are no open streams, so the event is followed directly by the close.
      XCTAssertEqual(
        try connection.readEvent(),
        .closing(.goAway(.enhanceYourCalm, "too_many_pings"))
      )
      try connection.waitUntilClosed()
    }

    /// With streams open, an inbound GOAWAY surfaces as a closing event, and the channel closes
    /// after the open streams have been closed.
    func testReceiveGoAwayWithOpenStreams() throws {
      let connection = try Connection()
      try connection.activate()

      let streamIDs: [HTTP2StreamID] = [1, 2, 3]
      for streamID in streamIDs {
        connection.streamOpened(streamID)
      }

      try connection.goAway(lastStreamID: .maxID, errorCode: .noError)

      // The GOAWAY should be reported as an event.
      XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.noError, "")))

      // Close every stream so that the connection is able to close.
      for streamID in streamIDs {
        connection.streamClosed(streamID)
      }

      try connection.waitUntilClosed()
    }

    /// A GOAWAY with no error followed by a GOAWAY with a protocol error produces a closing
    /// event for each, and the channel closes while streams are still open.
    func testGoAwayWithNoErrorThenGoAwayWithProtocolError() throws {
      let connection = try Connection()
      try connection.activate()

      let streamIDs: [HTTP2StreamID] = [1, 2, 3]
      for streamID in streamIDs {
        connection.streamOpened(streamID)
      }

      try connection.goAway(lastStreamID: .maxID, errorCode: .noError)

      // The first GOAWAY should be reported as an event.
      XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.noError, "")))

      // A second GOAWAY with an error code upgrades the graceful close to an 'error' close.
      try connection.goAway(lastStreamID: .maxID, errorCode: .protocolError)

      // A new event is reported and the connection closes without waiting to be notified that
      // the existing streams have closed.
      XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.protocolError, "")))
      try connection.waitUntilClosed()
    }

    /// A locally requested graceful close emits `.closing(.initiatedLocally)` and its future
    /// completes after the open stream has been closed.
    func testOutboundGracefulClose() throws {
      let connection = try Connection()
      try connection.activate()

      connection.streamOpened(1)
      let closeFuture = connection.closeGracefully()
      XCTAssertEqual(try connection.readEvent(), .closing(.initiatedLocally))

      connection.streamClosed(1)
      try closeFuture.wait()
    }

    /// Only the first inbound SETTINGS frame produces a `.ready` event; a later SETTINGS frame
    /// produces no event.
    func testReceiveInitialSettings() throws {
      let connection = try Connection()
      try connection.activate()

      // No event before any SETTINGS frame has been received.
      XCTAssertNil(try connection.readEvent())

      // The initial SETTINGS frame readies the connection.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // A subsequent SETTINGS frame should be a no-op.
      try connection.settings([])
      XCTAssertNil(try connection.readEvent())
    }

    /// An error fired through the pipeline followed by a channel close, with no streams open, is
    /// reported as `.closing(.unexpected(error, isIdle: true))`.
    func testReceiveErrorWhenIdle() throws {
      let connection = try Connection()
      try connection.activate()

      // The initial SETTINGS frame readies the connection.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Fire an error through the pipeline, then close the channel.
      let aborted = RPCError(code: .aborted, message: "")
      connection.channel.pipeline.fireErrorCaught(aborted)
      connection.channel.close(mode: .all, promise: nil)

      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(aborted, isIdle: true)))
    }

    /// An error fired through the pipeline followed by a channel close, with a stream open, is
    /// reported as `.closing(.unexpected(error, isIdle: false))`.
    func testReceiveErrorWhenStreamsAreOpen() throws {
      let connection = try Connection()
      try connection.activate()

      // The initial SETTINGS frame readies the connection.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      // Have one stream open when the error arrives.
      connection.streamOpened(1)

      // Fire an error through the pipeline, then close the channel.
      let aborted = RPCError(code: .aborted, message: "")
      connection.channel.pipeline.fireErrorCaught(aborted)
      connection.channel.close(mode: .all, promise: nil)

      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(aborted, isIdle: false)))
    }

    /// A channel close with no preceding error and no streams open is reported as
    /// `.closing(.unexpected(nil, isIdle: true))`.
    func testUnexpectedCloseWhenIdle() throws {
      let connection = try Connection()
      try connection.activate()

      // The initial SETTINGS frame readies the connection.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      connection.channel.close(mode: .all, promise: nil)
      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(nil, isIdle: true)))
    }

    /// A channel close with no preceding error while a stream is open is reported as
    /// `.closing(.unexpected(nil, isIdle: false))`.
    func testUnexpectedCloseWhenStreamsAreOpen() throws {
      let connection = try Connection()
      try connection.activate()

      // The initial SETTINGS frame readies the connection.
      try connection.settings([])
      XCTAssertEqual(try connection.readEvent(), .ready)

      connection.streamOpened(1)
      connection.channel.close(mode: .all, promise: nil)
      XCTAssertEqual(try connection.readEvent(), .closing(.unexpected(nil, isIdle: false)))
    }
  }
  extension ClientConnectionHandlerTests {
    /// Test harness that installs a `ClientConnectionHandler` in an `EmbeddedChannel` and offers
    /// helpers for writing frames inbound, reading frames and events back out, and notifying the
    /// handler's stream delegate about streams opening and closing.
    struct Connection {
      /// The embedded channel created with the handler under test.
      let channel: EmbeddedChannel
      /// The HTTP/2 stream delegate obtained from the handler under test.
      let streamDelegate: any NIOHTTP2StreamDelegate

      /// The embedded event loop backing `channel`.
      var loop: EmbeddedEventLoop {
        self.channel.embeddedEventLoop
      }

      /// Creates a `ClientConnectionHandler` on a fresh `EmbeddedEventLoop` and wraps it in a
      /// new `EmbeddedChannel` on that same loop.
      ///
      /// - Parameters:
      ///   - maxIdleTime: Passed to the handler as `maxIdleTime`.
      ///   - keepaliveTime: Passed to the handler as `keepaliveTime`.
      ///   - keepaliveTimeout: Passed to the handler as `keepaliveTimeout`.
      ///   - allowKeepaliveWithoutCalls: Passed to the handler as `keepaliveWithoutCalls`.
      init(
        maxIdleTime: TimeAmount? = nil,
        keepaliveTime: TimeAmount? = nil,
        keepaliveTimeout: TimeAmount? = nil,
        allowKeepaliveWithoutCalls: Bool = false
      ) throws {
        let eventLoop = EmbeddedEventLoop()
        let connectionHandler = ClientConnectionHandler(
          eventLoop: eventLoop,
          maxIdleTime: maxIdleTime,
          keepaliveTime: keepaliveTime,
          keepaliveTimeout: keepaliveTimeout,
          keepaliveWithoutCalls: allowKeepaliveWithoutCalls
        )

        self.streamDelegate = connectionHandler.http2StreamDelegate
        self.channel = EmbeddedChannel(handler: connectionHandler, loop: eventLoop)
      }

      /// Connects the embedded channel to `127.0.0.1:0` and waits for the connect to complete.
      func activate() throws {
        let address = try SocketAddress(ipAddress: "127.0.0.1", port: 0)
        try self.channel.connect(to: address).wait()
      }

      /// Tells the stream delegate that the stream with the given ID was created.
      func streamOpened(_ id: HTTP2StreamID) {
        self.streamDelegate.streamCreated(id, channel: self.channel)
      }

      /// Tells the stream delegate that the stream with the given ID was closed.
      func streamClosed(_ id: HTTP2StreamID) {
        self.streamDelegate.streamClosed(id, channel: self.channel)
      }

      /// Writes a GOAWAY frame on the root stream into the channel as inbound data.
      ///
      /// - Parameters:
      ///   - lastStreamID: The last stream ID to put in the frame.
      ///   - errorCode: The error code to put in the frame.
      ///   - opaqueData: Optional opaque data to attach to the frame.
      func goAway(
        lastStreamID: HTTP2StreamID,
        errorCode: HTTP2ErrorCode,
        opaqueData: ByteBuffer? = nil
      ) throws {
        let payload: HTTP2Frame.FramePayload = .goAway(
          lastStreamID: lastStreamID,
          errorCode: errorCode,
          opaqueData: opaqueData
        )
        try self.channel.writeInbound(HTTP2Frame(streamID: .rootStream, payload: payload))
      }

      /// Writes a PING frame on the root stream into the channel as inbound data.
      ///
      /// - Parameters:
      ///   - data: The PING payload.
      ///   - ack: Whether the frame has the ACK flag set.
      func ping(data: HTTP2PingData, ack: Bool) throws {
        let payload: HTTP2Frame.FramePayload = .ping(data, ack: ack)
        try self.channel.writeInbound(HTTP2Frame(streamID: .rootStream, payload: payload))
      }

      /// Writes a SETTINGS frame (not an ACK) carrying `settings` on the root stream into the
      /// channel as inbound data.
      func settings(_ settings: [HTTP2Setting]) throws {
        let payload: HTTP2Frame.FramePayload = .settings(.settings(settings))
        try self.channel.writeInbound(HTTP2Frame(streamID: .rootStream, payload: payload))
      }

      /// Reads the next outbound `HTTP2Frame` from the channel, or `nil` if there is none.
      func readFrame() throws -> HTTP2Frame? {
        try self.channel.readOutbound(as: HTTP2Frame.self)
      }

      /// Reads the next inbound `ClientConnectionEvent` from the channel, or `nil` if there is
      /// none.
      func readEvent() throws -> ClientConnectionEvent? {
        try self.channel.readInbound(as: ClientConnectionEvent.self)
      }

      /// Runs the embedded event loop and then waits on the channel's close future.
      func waitUntilClosed() throws {
        self.loop.run()
        try self.channel.closeFuture.wait()
      }

      /// Triggers the handler's `closeGracefully` user outbound event.
      ///
      /// - Returns: The future of the promise passed along with the event.
      func closeGracefully() -> EventLoopFuture<Void> {
        let closed = self.loop.makePromise(of: Void.self)
        self.channel.pipeline.triggerUserOutboundEvent(
          ClientConnectionHandler.OutboundEvent.closeGracefully,
          promise: closed
        )
        return closed.futureResult
      }
    }
  }