Ver código fonte

Better closing control over GOAWAYs (#1984)

Motivation:

The v2 client treats all GOAWAY frames equally and, if not already
closing, will stop creating streams on that connection. This is okay in
the 'normal' case where the error code is NO_ERROR. However if there's a
protocol violation or some other connection level error the connection
should be closed.

Modifications:

- Have the closing state record whether it's closing gracefully allowing
  for closes to be 'upgraded' (from graceful to not-so-graceful).
- Check whether the error code in the GOAWAY frame is NO_ERROR and take
  the non-graceful path if the code isn't NO_ERROR.
- Add missing 'fireChannelInactive'
- Add tests

Result:

Client will close shut down on a protocol error
George Barnett 1 ano atrás
pai
commit
c671f832c6

+ 37 - 18
Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift

@@ -144,6 +144,7 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
 
     self.keepaliveTimer?.cancel()
     self.keepaliveTimeoutTimer.cancel()
+    context.fireChannelInactive()
   }
 
   package func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
@@ -175,22 +176,32 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
 
     switch frame.payload {
     case .goAway(_, let errorCode, let data):
-      // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
-      // closing the connection.
-      switch self.state.beginGracefulShutdown(promise: nil) {
-      case .sendGoAway(let close):
-        // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
-        let message = data.map { String(buffer: $0) } ?? ""
-        context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))
-
-        // Clients should send GOAWAYs when closing a connection.
-        self.writeAndFlushGoAway(context: context, errorCode: .noError)
-        if close {
+      if errorCode == .noError {
+        // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
+        // closing the connection.
+        switch self.state.beginGracefulShutdown(promise: nil) {
+        case .sendGoAway(let close):
+          // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
+          let message = data.map { String(buffer: $0) } ?? ""
+          context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))
+
+          // Clients should send GOAWAYs when closing a connection.
+          self.writeAndFlushGoAway(context: context, errorCode: .noError)
+          if close {
+            context.close(promise: nil)
+          }
+
+        case .none:
+          ()
+        }
+      } else {
+        // Some error, begin closing.
+        if self.state.beginClosing() {
+          // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
+          let message = data.map { String(buffer: $0) } ?? ""
+          context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))
           context.close(promise: nil)
         }
-
-      case .none:
-        ()
       }
 
     case .ping(let data, let ack):
@@ -459,9 +470,11 @@ extension ClientConnectionHandler {
         var allowKeepaliveWithoutCalls: Bool
         var openStreams: Set<HTTP2StreamID>
         var closePromise: Optional<EventLoopPromise<Void>>
+        var isGraceful: Bool
 
-        init(from state: Active, closePromise: EventLoopPromise<Void>?) {
+        init(from state: Active, isGraceful: Bool, closePromise: EventLoopPromise<Void>?) {
           self.openStreams = state.openStreams
+          self.isGraceful = isGraceful
           self.allowKeepaliveWithoutCalls = state.allowKeepaliveWithoutCalls
           self.closePromise = closePromise
         }
@@ -605,7 +618,7 @@ extension ClientConnectionHandler {
         // ratchet down the last stream ID as only the client creates streams in gRPC.
         let close = state.openStreams.isEmpty
         onGracefulShutdown = .sendGoAway(close)
-        self.state = .closing(State.Closing(from: state, closePromise: promise))
+        self.state = .closing(State.Closing(from: state, isGraceful: true, closePromise: promise))
 
       case .closing(var state):
         self.state = ._modifying
@@ -627,9 +640,15 @@ extension ClientConnectionHandler {
     mutating func beginClosing() -> Bool {
       switch self.state {
       case .active(let active):
-        self.state = .closing(State.Closing(from: active, closePromise: nil))
+        self.state = .closing(State.Closing(from: active, isGraceful: false, closePromise: nil))
         return true
-      case .closing, .closed:
+      case .closing(var state):
+        self.state = ._modifying
+        let forceShutdown = state.isGraceful
+        state.isGraceful = false
+        self.state = .closing(state)
+        return forceShutdown
+      case .closed:
         return false
       case ._modifying:
         preconditionFailure()

+ 8 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerStateMachineTests.swift

@@ -44,6 +44,13 @@ final class ClientConnectionHandlerStateMachineTests: XCTestCase {
     XCTAssertEqual(state.streamClosed(1), .close)
   }
 
+  func testCloseWhenAlreadyClosingGracefully() {
+    var state = self.makeStateMachine()
+    state.streamOpened(1)
+    XCTAssertEqual(state.beginGracefulShutdown(promise: nil), .sendGoAway(false))
+    XCTAssertTrue(state.beginClosing())
+  }
+
   func testOpenAndCloseStreamWhenClosed() {
     var state = self.makeStateMachine()
     _ = state.closed()
@@ -104,4 +111,5 @@ final class ClientConnectionHandlerStateMachineTests: XCTestCase {
     // Close immediately, not streams are open.
     XCTAssertEqual(state.beginGracefulShutdown(promise: nil), .sendGoAway(true))
   }
+
 }

+ 20 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift

@@ -212,6 +212,26 @@ final class ClientConnectionHandlerTests: XCTestCase {
     try connection.waitUntilClosed()
   }
 
+  func testGoAwayWithNoErrorThenGoAwayWithProtocolError() throws {
+    let connection = try Connection()
+    try connection.activate()
+
+    connection.streamOpened(1)
+    connection.streamOpened(2)
+    connection.streamOpened(3)
+
+    try connection.goAway(lastStreamID: .maxID, errorCode: .noError)
+    // Should read out an event.
+    XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.noError, "")))
+
+    // Upgrade the close from graceful to 'error'.
+    try connection.goAway(lastStreamID: .maxID, errorCode: .protocolError)
+    // Should read out an event and the connection will be closed without waiting for notification
+    // from existing streams.
+    XCTAssertEqual(try connection.readEvent(), .closing(.goAway(.protocolError, "")))
+    try connection.waitUntilClosed()
+  }
+
   func testOutboundGracefulClose() throws {
     let connection = try Connection()
     try connection.activate()