| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- /*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- @_spi(Package) @testable import GRPCHTTP2Core
- import NIOCore
- import NIOEmbedded
- import NIOHTTP2
- import XCTest
/// Tests for `ClientConnectionHandler`.
///
/// Each test drives the handler through the `Connection` helper declared in the extension
/// of this class, which wraps an `EmbeddedChannel`: time is advanced manually on the embedded
/// event loop, inbound frames are injected, and outbound frames and connection events are read
/// back synchronously.
final class ClientConnectionHandlerTests: XCTestCase {
  /// With no streams open, reaching the max idle time should emit a closing event, write a
  /// GOAWAY frame carrying "idle" and close the channel.
  func testMaxIdleTime() throws {
    let connection = try Connection(maxIdleTime: .minutes(5))
    try connection.activate()

    // Idle with no streams open we should:
    // - read out a closing event,
    // - write a GOAWAY frame,
    // - close.
    connection.loop.advanceTime(by: .minutes(5))

    XCTAssertEqual(try connection.readEvent(), .closing(.idle))

    let frame = try XCTUnwrap(try connection.readFrame())
    XCTAssertEqual(frame.streamID, .rootStream)
    XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
      XCTAssertEqual(lastStreamID, .rootStream)
      XCTAssertEqual(error, .noError)
      XCTAssertEqual(data, ByteBuffer(string: "idle"))
    }

    try connection.waitUntilClosed()
  }

  /// An open stream should suppress the idle timeout; once the stream closes the timeout should
  /// apply again and close the connection in the same way as `testMaxIdleTime`.
  func testMaxIdleTimeWhenOpenStreams() throws {
    let connection = try Connection(maxIdleTime: .minutes(5))
    try connection.activate()

    // Open a stream, the idle timer should be cancelled.
    connection.streamOpened(1)

    // Advance by the idle time, nothing should happen.
    connection.loop.advanceTime(by: .minutes(5))
    XCTAssertNil(try connection.readEvent())
    XCTAssertNil(try connection.readFrame())

    // Close the stream, the idle timer should begin again.
    connection.streamClosed(1)
    connection.loop.advanceTime(by: .minutes(5))

    // The same observable sequence as the plain idle case is expected: a closing event
    // followed by a GOAWAY on the root stream.
    XCTAssertEqual(try connection.readEvent(), .closing(.idle))

    let frame = try XCTUnwrap(try connection.readFrame())
    XCTAssertEqual(frame.streamID, .rootStream)
    XCTAssertGoAway(frame.payload) { lastStreamID, error, data in
      XCTAssertEqual(lastStreamID, .rootStream)
      XCTAssertEqual(error, .noError)
      XCTAssertEqual(data, ByteBuffer(string: "idle"))
    }

    try connection.waitUntilClosed()
  }

  /// While a stream is open a PING should be written every keepalive interval; once the stream
  /// closes no further PINGs should be written.
  func testKeepaliveWithOpenStreams() throws {
    let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
    try connection.activate()

    // Open a stream so keep-alive starts.
    connection.streamOpened(1)

    for _ in 0 ..< 10 {
      // Advance time, a PING should be sent, ACK it.
      connection.loop.advanceTime(by: .minutes(1))
      let pingFrame = try XCTUnwrap(try connection.readFrame())
      XCTAssertEqual(pingFrame.streamID, .rootStream)
      try XCTAssertPing(pingFrame.payload) { data, ack in
        XCTAssertFalse(ack)
        try connection.ping(data: data, ack: true)
      }

      // Exactly one frame per interval.
      XCTAssertNil(try connection.readFrame())
    }

    // Close the stream, keep-alive pings should stop.
    connection.streamClosed(1)
    connection.loop.advanceTime(by: .minutes(1))
    XCTAssertNil(try connection.readFrame())
  }

  /// When keepalive without calls is allowed, PINGs should be written every keepalive interval
  /// even though no stream was ever opened.
  func testKeepaliveWithNoOpenStreams() throws {
    let connection = try Connection(keepaliveTime: .minutes(1), allowKeepaliveWithoutCalls: true)
    try connection.activate()

    for _ in 0 ..< 10 {
      // Advance time, a PING should be sent, ACK it.
      connection.loop.advanceTime(by: .minutes(1))
      let pingFrame = try XCTUnwrap(try connection.readFrame())
      XCTAssertEqual(pingFrame.streamID, .rootStream)
      try XCTAssertPing(pingFrame.payload) { data, ack in
        XCTAssertFalse(ack)
        try connection.ping(data: data, ack: true)
      }

      // Exactly one frame per interval.
      XCTAssertNil(try connection.readFrame())
    }
  }

  /// If a keepalive PING isn't acknowledged within the keepalive timeout the connection should
  /// emit a closing event, write a GOAWAY carrying "keepalive_expired" and close without waiting
  /// for the open stream.
  func testKeepaliveWithOpenStreamsTimingOut() throws {
    let connection = try Connection(keepaliveTime: .minutes(1), keepaliveTimeout: .seconds(10))
    try connection.activate()

    // Open a stream so keep-alive starts.
    connection.streamOpened(1)

    // Advance time, a PING should be sent, don't ACK it.
    connection.loop.advanceTime(by: .minutes(1))
    let pingFrame = try XCTUnwrap(try connection.readFrame())
    XCTAssertEqual(pingFrame.streamID, .rootStream)
    XCTAssertPing(pingFrame.payload) { _, ack in
      XCTAssertFalse(ack)
    }

    // Advance time by the keep alive timeout. We should:
    // - read a connection event
    // - read out a GOAWAY frame
    // - be closed
    connection.loop.advanceTime(by: .seconds(10))

    XCTAssertEqual(try connection.readEvent(), .closing(.keepaliveExpired))

    let goAwayFrame = try XCTUnwrap(try connection.readFrame())
    XCTAssertEqual(goAwayFrame.streamID, .rootStream)
    XCTAssertGoAway(goAwayFrame.payload) { lastStreamID, error, data in
      XCTAssertEqual(lastStreamID, .rootStream)
      XCTAssertEqual(error, .noError)
      XCTAssertEqual(data, ByteBuffer(string: "keepalive_expired"))
    }

    // Doesn't wait for streams to close: the connection is bad.
    try connection.waitUntilClosed()
  }

  /// An inbound PING without the ack flag should not cause the handler to write anything.
  func testPingsAreIgnored() throws {
    let connection = try Connection()
    try connection.activate()

    // PING frames without ack set should be ignored, we rely on the HTTP/2 handler replying to them.
    try connection.ping(data: HTTP2PingData(), ack: false)
    XCTAssertNil(try connection.readFrame())
  }

  /// Receiving a GOAWAY with no open streams should surface a closing event carrying the error
  /// code and message, then close the channel.
  func testReceiveGoAway() throws {
    let connection = try Connection()
    try connection.activate()

    try connection.goAway(
      lastStreamID: 0,
      errorCode: .enhanceYourCalm,
      opaqueData: ByteBuffer(string: "too_many_pings")
    )

    // Should read out an event and close (because there are no open streams).
    XCTAssertEqual(
      try connection.readEvent(),
      .closing(.goAway(.enhanceYourCalm, "too_many_pings"))
    )
    try connection.waitUntilClosed()
  }

  /// Receiving a GOAWAY while streams are open should surface a closing event; the channel
  /// should close once those streams have closed.
  func testReceiveGoAwayWithOpenStreams() throws {
    let connection = try Connection()
    try connection.activate()

    connection.streamOpened(1)
    connection.streamOpened(2)
    connection.streamOpened(3)

    try connection.goAway(lastStreamID: .maxID, errorCode: .noError)

    // Should read out an event.
    XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.noError, "")))

    // Close streams so the connection can close.
    connection.streamClosed(1)
    connection.streamClosed(2)
    connection.streamClosed(3)
    try connection.waitUntilClosed()
  }

  /// A locally requested graceful close should surface a closing event and complete after the
  /// open stream has closed.
  func testOutboundGracefulClose() throws {
    let connection = try Connection()
    try connection.activate()

    connection.streamOpened(1)
    let closed = connection.closeGracefully()
    XCTAssertEqual(try connection.readEvent(), .closing(.initiatedLocally))

    connection.streamClosed(1)
    try closed.wait()
  }

  /// The first inbound SETTINGS frame should surface a ready event; later SETTINGS frames should
  /// not produce further events.
  func testReceiveInitialSettings() throws {
    let connection = try Connection()
    try connection.activate()

    // Nothing yet.
    XCTAssertNil(try connection.readEvent())

    // Write the initial settings.
    try connection.settings([])
    XCTAssertEqual(try connection.readEvent(), .ready)

    // Receiving another settings frame should be a no-op.
    try connection.settings([])
    XCTAssertNil(try connection.readEvent())
  }
}
extension ClientConnectionHandlerTests {
  /// Test harness which installs a `ClientConnectionHandler` in an `EmbeddedChannel` and exposes
  /// helpers for injecting inbound frames, signalling stream lifecycle changes and reading back
  /// whatever the handler wrote or emitted.
  struct Connection {
    /// The embedded channel whose pipeline contains the handler under test.
    let channel: EmbeddedChannel
    /// The handler's stream delegate, used to report streams being created and closed.
    let streamDelegate: any NIOHTTP2StreamDelegate

    /// The embedded event loop driving `channel`; advance its time to fire timers.
    var loop: EmbeddedEventLoop {
      self.channel.embeddedEventLoop
    }

    /// Creates the handler with the given configuration and installs it in a new embedded
    /// channel running on a new embedded event loop.
    ///
    /// - Parameters:
    ///   - maxIdleTime: Forwarded to the handler's `maxIdleTime`.
    ///   - keepaliveTime: Forwarded to the handler's `keepaliveTime`.
    ///   - keepaliveTimeout: Forwarded to the handler's `keepaliveTimeout`.
    ///   - allowKeepaliveWithoutCalls: Forwarded to the handler's `keepaliveWithoutCalls`.
    init(
      maxIdleTime: TimeAmount? = nil,
      keepaliveTime: TimeAmount? = nil,
      keepaliveTimeout: TimeAmount? = nil,
      allowKeepaliveWithoutCalls: Bool = false
    ) throws {
      let eventLoop = EmbeddedEventLoop()
      let connectionHandler = ClientConnectionHandler(
        eventLoop: eventLoop,
        maxIdleTime: maxIdleTime,
        keepaliveTime: keepaliveTime,
        keepaliveTimeout: keepaliveTimeout,
        keepaliveWithoutCalls: allowKeepaliveWithoutCalls
      )

      self.streamDelegate = connectionHandler.http2StreamDelegate
      self.channel = EmbeddedChannel(handler: connectionHandler, loop: eventLoop)
    }

    /// Connects the embedded channel to a loopback address and waits for the connect to finish.
    func activate() throws {
      let address = try SocketAddress(ipAddress: "127.0.0.1", port: 0)
      try self.channel.connect(to: address).wait()
    }

    /// Tells the stream delegate that the stream with the given ID was created.
    func streamOpened(_ id: HTTP2StreamID) {
      self.streamDelegate.streamCreated(id, channel: self.channel)
    }

    /// Tells the stream delegate that the stream with the given ID was closed.
    func streamClosed(_ id: HTTP2StreamID) {
      self.streamDelegate.streamClosed(id, channel: self.channel)
    }

    /// Injects an inbound GOAWAY frame on the root stream.
    func goAway(
      lastStreamID: HTTP2StreamID,
      errorCode: HTTP2ErrorCode,
      opaqueData: ByteBuffer? = nil
    ) throws {
      try self.receive(
        .goAway(lastStreamID: lastStreamID, errorCode: errorCode, opaqueData: opaqueData)
      )
    }

    /// Injects an inbound PING frame on the root stream.
    func ping(data: HTTP2PingData, ack: Bool) throws {
      try self.receive(.ping(data, ack: ack))
    }

    /// Injects an inbound SETTINGS frame on the root stream.
    func settings(_ settings: [HTTP2Setting]) throws {
      try self.receive(.settings(.settings(settings)))
    }

    /// Returns the next frame written outbound by the pipeline, or `nil` if there is none.
    func readFrame() throws -> HTTP2Frame? {
      try self.channel.readOutbound(as: HTTP2Frame.self)
    }

    /// Returns the next connection event fired inbound by the pipeline, or `nil` if there is none.
    func readEvent() throws -> ClientConnectionEvent? {
      try self.channel.readInbound(as: ClientConnectionEvent.self)
    }

    /// Runs any pending work on the event loop and then waits on the channel's close future.
    func waitUntilClosed() throws {
      self.loop.run()
      try self.channel.closeFuture.wait()
    }

    /// Triggers the handler's `closeGracefully` outbound user event.
    ///
    /// - Returns: The future of the promise passed along with the event.
    func closeGracefully() -> EventLoopFuture<Void> {
      let closePromise = self.loop.makePromise(of: Void.self)
      self.channel.pipeline.triggerUserOutboundEvent(
        ClientConnectionHandler.OutboundEvent.closeGracefully,
        promise: closePromise
      )
      return closePromise.futureResult
    }

    /// Wraps `payload` in a frame addressed to the root stream and writes it inbound.
    private func receive(_ payload: HTTP2Frame.FramePayload) throws {
      let frame = HTTP2Frame(streamID: .rootStream, payload: payload)
      try self.channel.writeInbound(frame)
    }
  }
}
|