Jelajahi Sumber

Delay the point at which a connect 'succeeds' (#1864)

Motivation:

One issue with the connection merged in #1859 is that it considers
connect succeeded to be the point at which the TCP connection succeeds.
However, when considered as an HTTP/2 connection, this is too early: the
connection may fail a TLS handshake or the server might not be an HTTP/2
server. The connect should only be considered a success when the initial
SETTINGS frame is received from the server.

Modifications:

- Add a new connection event 'ready' and react to this appropriately in
  the connection
- Add tests

Result:

The `Connection` is only considered ready when the first SETTINGS frame
is received.
George Barnett 1 tahun lalu
induk
melakukan
2d56a49303

+ 36 - 0
Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift

@@ -31,6 +31,9 @@ public enum ClientConnectionEvent: Sendable, Hashable {
     case initiatedLocally
   }
 
+  /// The connection is now ready.
+  case ready
+
   /// The connection has started shutting down, no new streams should be created.
   case closing(CloseReason)
 }
@@ -208,6 +211,16 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
         }
       }
 
+    case .settings(.settings(_)):
+      let isInitialSettings = self.state.receivedSettings()
+
+      // The first settings frame indicates that the connection is now ready to use. The channel
+      // becoming active is insufficient as, for example, a TLS handshake may fail after
+      // establishing the TCP connection, or the server isn't configured for gRPC (or HTTP/2).
+      if isInitialSettings {
+        context.fireChannelRead(self.wrapInboundOut(.ready))
+      }
+
     default:
       ()
     }
@@ -323,10 +336,18 @@ extension ClientConnectionHandler {
       struct Active {
         var openStreams: Set<HTTP2StreamID>
         var allowKeepaliveWithoutCalls: Bool
+        var receivedConnectionPreface: Bool
 
         init(allowKeepaliveWithoutCalls: Bool) {
           self.openStreams = []
           self.allowKeepaliveWithoutCalls = allowKeepaliveWithoutCalls
+          self.receivedConnectionPreface = false
+        }
+
+        mutating func receivedSettings() -> Bool {
+          let isFirstSettingsFrame = !self.receivedConnectionPreface
+          self.receivedConnectionPreface = true
+          return isFirstSettingsFrame
         }
       }
 
@@ -347,6 +368,21 @@ extension ClientConnectionHandler {
       self.state = .active(State.Active(allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls))
     }
 
+    /// Record that a SETTINGS frame was received from the remote peer.
+    ///
+    /// - Returns: `true` if this was the first SETTINGS frame received.
+    mutating func receivedSettings() -> Bool {
+      switch self.state {
+      case .active(var active):
+        let isFirstSettingsFrame = active.receivedSettings()
+        self.state = .active(active)
+        return isFirstSettingsFrame
+
+      case .closing, .closed:
+        return false
+      }
+    }
+
     /// Record that the stream with the given ID has been opened.
     mutating func streamOpened(_ id: HTTP2StreamID) {
       switch self.state {

+ 60 - 21
Sources/GRPCHTTP2Core/Client/Connection/Connection.swift

@@ -139,8 +139,6 @@ struct Connection: Sendable {
         state.connected(connected)
       }
 
-      self.event.continuation.yield(.connectSucceeded)
-
       await withDiscardingTaskGroup { group in
         // Add a task to run the connection and consume events.
         group.addTask {
@@ -229,11 +227,34 @@ struct Connection: Sendable {
   private func consumeConnectionEvents(
     _ connectionEvents: NIOAsyncChannelInboundStream<ClientConnectionEvent>
   ) async {
+    // The connection becomes 'ready' when the initial HTTP/2 SETTINGS frame is received.
+    // Establishing a TCP connection is insufficient as the TLS handshake may not complete or the
+    // server might not be configured for gRPC or HTTP/2.
+    //
+    // This state is tracked here so that if the connection events sequence finishes and the
+    // connection never became ready then the connection can report that the connect failed.
+    var isReady = false
+
+    func makeNeverReadyError(cause: (any Error)?) -> RPCError {
+      return RPCError(
+        code: .unavailable,
+        message: """
+          The server accepted the TCP connection but closed the connection before completing \
+          the HTTP/2 connection preface.
+          """,
+        cause: cause
+      )
+    }
+
     do {
       var channelCloseReason: ClientConnectionEvent.CloseReason?
 
       for try await connectionEvent in connectionEvents {
         switch connectionEvent {
+        case .ready:
+          isReady = true
+          self.event.continuation.yield(.connectSucceeded)
+
         case .closing(let reason):
           self.state.withLockedValue { $0.closing() }
 
@@ -256,33 +277,50 @@ struct Connection: Sendable {
         }
       }
 
-      let connectionCloseReason: Self.CloseReason
-      switch channelCloseReason {
-      case .keepaliveExpired:
-        connectionCloseReason = .keepaliveTimeout
+      let finalEvent: Event
+      if isReady {
+        let connectionCloseReason: Self.CloseReason
+        switch channelCloseReason {
+        case .keepaliveExpired:
+          connectionCloseReason = .keepaliveTimeout
 
-      case .idle:
-        // Connection became idle, that's fine.
-        connectionCloseReason = .idleTimeout
+        case .idle:
+          // Connection became idle, that's fine.
+          connectionCloseReason = .idleTimeout
 
-      case .goAway:
-        // Remote peer told us to GOAWAY.
-        connectionCloseReason = .remote
+        case .goAway:
+          // Remote peer told us to GOAWAY.
+          connectionCloseReason = .remote
 
-      case .initiatedLocally, .none:
-        // Shutdown was initiated locally.
-        connectionCloseReason = .initiatedLocally
+        case .initiatedLocally, .none:
+          // Shutdown was initiated locally.
+          connectionCloseReason = .initiatedLocally
+        }
+
+        finalEvent = .closed(connectionCloseReason)
+      } else {
+        // The connection never became ready, this therefore counts as a failed connect attempt.
+        finalEvent = .connectFailed(makeNeverReadyError(cause: nil))
       }
 
       // The connection events sequence has finished: the connection is now closed.
       self.state.withLockedValue { $0.closed() }
-      self.finishStreams(withEvent: .closed(connectionCloseReason))
+      self.finishStreams(withEvent: finalEvent)
     } catch {
-      // Any error must come from consuming the inbound channel meaning that the connection
-      // must be borked, wrap it up and close.
-      let rpcError = RPCError(code: .unavailable, message: "connection closed", cause: error)
+      let finalEvent: Event
+
+      if isReady {
+        // Any error must come from consuming the inbound channel meaning that the connection
+        // must be borked, wrap it up and close.
+        let rpcError = RPCError(code: .unavailable, message: "connection closed", cause: error)
+        finalEvent = .closed(.error(rpcError))
+      } else {
+        // The connection never became ready, this therefore counts as a failed connect attempt.
+        finalEvent = .connectFailed(makeNeverReadyError(cause: error))
+      }
+
       self.state.withLockedValue { $0.closed() }
-      self.finishStreams(withEvent: .closed(.error(rpcError)))
+      self.finishStreams(withEvent: finalEvent)
     }
   }
 
@@ -356,7 +394,8 @@ extension Connection {
   private enum State {
     /// The connection is idle or connecting.
     case notConnected
-    /// A connection has been established with the remote peer.
+    /// A TCP connection has been established with the remote peer. However, the connection may not
+    /// be ready to use yet.
     case connected(Connected)
     /// The connection has started to close. This may be initiated locally or by the remote.
     case closing

+ 21 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift

@@ -205,6 +205,22 @@ final class ClientConnectionHandlerTests: XCTestCase {
     connection.streamClosed(1)
     try closed.wait()
   }
+
+  func testReceiveInitialSettings() throws {
+    let connection = try Connection()
+    try connection.activate()
+
+    // Nothing yet.
+    XCTAssertNil(try connection.readEvent())
+
+    // Write the initial settings.
+    try connection.settings([])
+    XCTAssertEqual(try connection.readEvent(), .ready)
+
+    // Receiving another settings frame should be a no-op.
+    try connection.settings([])
+    XCTAssertNil(try connection.readEvent())
+  }
 }
 
 extension ClientConnectionHandlerTests {
@@ -268,6 +284,11 @@ extension ClientConnectionHandlerTests {
       try self.channel.writeInbound(frame)
     }
 
+    func settings(_ settings: [HTTP2Setting]) throws {
+      let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
+      try self.channel.writeInbound(frame)
+    }
+
     func readFrame() throws -> HTTP2Frame? {
       return try self.channel.readOutbound(as: HTTP2Frame.self)
     }

+ 15 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/ConnectionTests.swift

@@ -89,6 +89,21 @@ final class ConnectionTests: XCTestCase {
     }
   }
 
+  func testConnectFailsOnAcceptedThenClosedTCPConnection() async throws {
+    try await ConnectionTest.run(connector: .posix(), server: .closeOnAccept) { _, events in
+      XCTAssertEqual(events.count, 1)
+      let event = try XCTUnwrap(events.first)
+      switch event {
+      case .connectFailed(let error):
+        XCTAssert(error, as: RPCError.self) { rpcError in
+          XCTAssertEqual(rpcError.code, .unavailable)
+        }
+      default:
+        XCTFail("Expected '.connectFailed', got '\(event)'")
+      }
+    }
+  }
+
   func testMakeStreamOnActiveConnection() async throws {
     try await ConnectionTest.run(connector: .posix()) { context, event in
       switch event {

+ 36 - 22
Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/ConnectionTest.swift

@@ -30,13 +30,14 @@ enum ConnectionTest {
 
   static func run(
     connector: HTTP2Connector,
+    server mode: Server.Mode = .regular,
     handlEvents: (
       _ context: Context,
       _ event: Connection.Event
     ) async throws -> Void = { _, _ in },
-    validateEvents: (_ context: Context, _ events: [Connection.Event]) -> Void
+    validateEvents: (_ context: Context, _ events: [Connection.Event]) throws -> Void
   ) async throws {
-    let server = Server()
+    let server = Server(mode: mode)
     let address = try await server.bind()
 
     try await withThrowingTaskGroup(of: Void.self) { group in
@@ -55,7 +56,7 @@ enum ConnectionTest {
         try await handlEvents(context, event)
       }
 
-      validateEvents(context, events)
+      try validateEvents(context, events)
     }
   }
 }
@@ -67,8 +68,15 @@ extension ConnectionTest {
     private let eventLoop: any EventLoop
     private var listener: (any Channel)?
     private let client: EventLoopPromise<Channel>
+    private let mode: Mode
 
-    init() {
+    enum Mode {
+      case regular
+      case closeOnAccept
+    }
+
+    init(mode: Mode) {
+      self.mode = mode
       self.eventLoop = .singletonMultiThreadedEventLoopGroup.next()
       self.client = self.eventLoop.next().makePromise()
     }
@@ -95,26 +103,32 @@ extension ConnectionTest {
         precondition(!hasAcceptedChannel.value, "already accepted a channel")
         hasAcceptedChannel.value = true
 
-        return channel.eventLoop.makeCompletedFuture {
-          let sync = channel.pipeline.syncOperations
-          let h2 = NIOHTTP2Handler(mode: .server)
-          let mux = HTTP2StreamMultiplexer(mode: .server, channel: channel) { stream in
-            let sync = stream.pipeline.syncOperations
-            let handler = GRPCServerStreamHandler(
-              scheme: .http,
-              acceptedEncodings: .none,
-              maximumPayloadSize: .max
-            )
-
-            return stream.eventLoop.makeCompletedFuture {
-              try sync.addHandler(handler)
-              try sync.addHandler(EchoHandler())
+        switch self.mode {
+        case .closeOnAccept:
+          return channel.close()
+
+        case .regular:
+          return channel.eventLoop.makeCompletedFuture {
+            let sync = channel.pipeline.syncOperations
+            let h2 = NIOHTTP2Handler(mode: .server)
+            let mux = HTTP2StreamMultiplexer(mode: .server, channel: channel) { stream in
+              let sync = stream.pipeline.syncOperations
+              let handler = GRPCServerStreamHandler(
+                scheme: .http,
+                acceptedEncodings: .none,
+                maximumPayloadSize: .max
+              )
+
+              return stream.eventLoop.makeCompletedFuture {
+                try sync.addHandler(handler)
+                try sync.addHandler(EchoHandler())
+              }
             }
-          }
 
-          try sync.addHandler(h2)
-          try sync.addHandler(mux)
-          try sync.addHandlers(SucceedOnSettingsAck(promise: self.client))
+            try sync.addHandler(h2)
+            try sync.addHandler(mux)
+            try sync.addHandlers(SucceedOnSettingsAck(promise: self.client))
+          }
         }
       }
 

+ 8 - 0
Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift

@@ -59,3 +59,11 @@ func XCTAssertThrowsErrorAsync<T, E: Error>(
     XCTFail("Error had unexpected type '\(type(of: error))'")
   }
 }
+
+func XCTAssert<T>(_ value: Any, as type: T.Type, _ verify: (T) throws -> Void) rethrows {
+  if let value = value as? T {
+    try verify(value)
+  } else {
+    XCTFail("\(value) couldn't be cast to \(T.self)")
+  }
+}