Browse Source

Fix bad transition when server sends invalid headers in stream state machine (#88)

While triaging
https://github.com/grpc/grpc-swift-nio-transport/issues/83 we discovered
an issue with the stream state machine, where we would not transition
the server to `closed` when receiving invalid headers. This is not
correct, because at this point the server is in an unrecoverable state
from the client's perspective, and we should ignore any subsequent
messages.

This PR fixes this transition, which was causing an "invalid state"
error (or runtime crash in debug mode). It now drops any further
messages from the server on the floor, instead of throwing/sending
another error message back to the client.

I've also fixed a problem with receiving headers with a 1xx status code:
they should be ignored, but we were transitioning to closed states
instead.
Gus Cairo 9 months ago
parent
commit
699ebb8899

+ 32 - 32
Sources/GRPCNIOTransportCore/GRPCStreamStateMachine.swift

@@ -831,6 +831,7 @@ extension GRPCStreamStateMachine {
   private enum ServerHeadersValidationResult {
     case valid
     case invalid(OnMetadataReceived)
+    case skip
   }
 
   private mutating func clientValidateHeadersReceivedFromServer(
@@ -863,7 +864,7 @@ extension GRPCStreamStateMachine {
         // For 1xx status codes, the entire header should be skipped and a
         // subsequent header should be read.
         // See https://github.com/grpc/grpc/blob/7f664c69b2a636386fbf95c16bc78c559734ce0f/doc/http-grpc-status-mapping.md
-        return .invalid(.doNothing)
+        return .skip
       }
 
       // Forward the mapped status code.
@@ -966,22 +967,26 @@ extension GRPCStreamStateMachine {
     switch self.state {
     case .clientOpenServerIdle(let state):
       switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
-      case (.invalid(let action), true):
-        // The headers are invalid, but the server signalled that it was
-        // closing the stream, so close both client and server.
+      case (.skip, _):
+        // Headers should be ignored, so do nothing for now.
+        return .doNothing
+
+      case (.invalid(let action), _):
+        // The received headers are invalid, so we can't do anything other than assume this server
+        // is not behaving correctly: transition both client and server to closed.
         self.state = .clientClosedServerClosed(.init(previousState: state))
         return action
-      case (.invalid(let action), false):
-        self.state = .clientClosedServerIdle(.init(previousState: state))
-        return action
+
       case (.valid, true):
         // This is a trailers-only response: close server.
         self.state = .clientOpenServerClosed(.init(previousState: state))
         return try self.validateTrailers(headers)
+
       case (.valid, false):
         switch self.processInboundEncoding(headers: headers, configuration: configuration) {
         case .error(let failure):
           return failure
+
         case .success(let inboundEncoding):
           let decompressor = Zlib.Method(encoding: inboundEncoding)
             .flatMap { Zlib.Decompressor(method: $0) }
@@ -1013,18 +1018,21 @@ extension GRPCStreamStateMachine {
 
     case .clientClosedServerIdle(let state):
       switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
-      case (.invalid(let action), true):
-        // The headers are invalid, but the server signalled that it was
-        // closing the stream, so close the server side too.
+      case (.skip, _):
+        // Headers should be ignored, so do nothing for now.
+        return .doNothing
+
+      case (.invalid(let action), _):
+        // The received headers are invalid, so we can't do anything other than assume this server
+        // is not behaving correctly: transition both client and server to closed.
         self.state = .clientClosedServerClosed(.init(previousState: state))
         return action
-      case (.invalid(let action), false):
-        // Client is already closed, so we don't need to update our state.
-        return action
+
       case (.valid, true):
         // This is a trailers-only response: close server.
         self.state = .clientClosedServerClosed(.init(previousState: state))
         return try self.validateTrailers(headers)
+
       case (.valid, false):
         switch self.processInboundEncoding(headers: headers, configuration: configuration) {
         case .error(let failure):
@@ -1051,27 +1059,15 @@ extension GRPCStreamStateMachine {
       }
       return try self.validateTrailers(headers)
 
-    case .clientClosedServerClosed:
-      // We could end up here if we received a grpc-status header in a previous
-      // frame (which would have already close the server) and then we receive
-      // an empty frame with EOS set.
-      // We wouldn't want to throw in that scenario, so we just ignore it.
-      // Note that we don't want to ignore it if EOS is not set here though, as
-      // then it would be an invalid payload.
-      if !endStream || headers.count > 0 {
-        try self.invalidState(
-          "Server is closed, nothing could have been sent."
-        )
-      }
+    case .clientOpenServerClosed, .clientClosedServerClosed:
+      // We've transitioned the server to closed: drop any other incoming headers.
       return .doNothing
+
     case .clientIdleServerIdle:
       try self.invalidState(
         "Server cannot have sent metadata if the client is idle."
       )
-    case .clientOpenServerClosed:
-      try self.invalidState(
-        "Server is closed, nothing could have been sent."
-      )
+
     case ._modifying:
       preconditionFailure()
     }
@@ -1153,9 +1149,13 @@ extension GRPCStreamStateMachine {
       }
 
     case .clientOpenServerClosed, .clientClosedServerClosed:
-      try self.invalidState(
-        "Cannot have received anything from a closed server."
-      )
+      // If the server is closed, it's because it actually closed, or because the client
+      // transitioned it to a close state because it returned invalid headers.
+      // In either case, we will have already surfaced a status + trailers response to the client,
+      // so if we receive further packages, we should just drop them on the floor,
+      // as there's nothing for us to do with them.
+      return .doNothing
+
     case ._modifying:
       preconditionFailure()
     }

+ 41 - 10
Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift

@@ -111,18 +111,36 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
     XCTAssertNoThrow(try channel.writeOutbound(request))
 
     // Receive server's initial metadata with 1xx status
-    let serverInitialMetadata: HPACKHeaders = [
+    let invalidServerInitialMetadata: HPACKHeaders = [
       GRPCHTTP2Keys.status.rawValue: "104",
       GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
     ]
 
     XCTAssertNoThrow(
       try channel.writeInbound(
-        HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
+        HTTP2Frame.FramePayload.headers(.init(headers: invalidServerInitialMetadata))
       )
     )
 
     XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
+
+    // We are still expecting the correct headers after getting a 1xx response, so make sure we
+    // don't fail if we get the metadata twice.
+    let validServerInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.status.rawValue: "200",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      "some-custom-header": "some-custom-value",
+    ]
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: validServerInitialMetadata))
+      )
+    )
+
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
+      RPCResponsePart.metadata(Metadata(headers: validServerInitialMetadata))
+    )
   }
 
   func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
@@ -161,6 +179,16 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
         Metadata(headers: serverInitialMetadata)
       )
     )
+
+    // We should not throw if the server sends another message:
+    // we should drop it, since the server is now closed.
+    var buffer = ByteBuffer()
+    buffer.writeInteger(UInt8(0))  // not compressed
+    buffer.writeInteger(UInt32(42))  // message length
+    buffer.writeRepeatingByte(0, count: 42)  // message
+    let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
+    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
+    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
   }
 
   func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
@@ -448,19 +476,22 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
       )
     )
 
-    // We should throw if the server sends another message, since it's closed the stream already.
+    // We should not throw if the server sends another message:
+    // we should drop it, since the server is now closed.
     var buffer = ByteBuffer()
     buffer.writeInteger(UInt8(0))  // not compressed
     buffer.writeInteger(UInt32(42))  // message length
     buffer.writeRepeatingByte(0, count: 42)  // message
     let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
-    XCTAssertThrowsError(
-      ofType: RPCError.self,
-      try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
-    ) { error in
-      XCTAssertEqual(error.code, .internalError)
-      XCTAssertEqual(error.message, "Invalid state")
-    }
+    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
+    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
+
+    // We should also not throw if the server sends trailers again.
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata, endStream: true))
+      )
+    )
   }
 
   func testNormalFlow() throws {

+ 18 - 20
Tests/GRPCNIOTransportCoreTests/GRPCStreamStateMachineTests.swift

@@ -335,6 +335,12 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
           metadata: [":status": "300"]
         )
       )
+
+      // Further attempts from the server to send messages to the client will simply be dropped.
+      XCTAssertEqual(
+        try stateMachine.receive(buffer: .init(), endStream: false),
+        .doNothing
+      )
     }
   }
 
@@ -512,12 +518,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     for targetState in [TargetStateMachineState.clientOpenServerClosed, .clientClosedServerClosed] {
       var stateMachine = self.makeClientStateMachine(targetState: targetState)
 
-      XCTAssertThrowsError(
-        ofType: GRPCStreamStateMachine.InvalidState.self,
-        try stateMachine.receive(headers: .init(), endStream: false)
-      ) { error in
-        XCTAssertEqual(error.message, "Server is closed, nothing could have been sent.")
-      }
+      // We should not throw if the server sends metadata after it's been transitioned to close:
+      // we should just drop these packages.
+      XCTAssertNoThrow(try stateMachine.receive(headers: .init(), endStream: false))
     }
   }
 
@@ -600,13 +603,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
   func testReceiveEndTrailerWhenClientOpenAndServerClosed() {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerClosed)
 
-    // Receive another end trailer
-    XCTAssertThrowsError(
-      ofType: GRPCStreamStateMachine.InvalidState.self,
-      try stateMachine.receive(headers: .init(), endStream: true)
-    ) { error in
-      XCTAssertEqual(error.message, "Server is closed, nothing could have been sent.")
-    }
+    // We should not throw if the server sends trailers after it's been transitioned to close:
+    // we should just drop these packages.
+    XCTAssertNoThrow(try stateMachine.receive(headers: .init(), endStream: true))
   }
 
   func testReceiveEndTrailerWhenClientClosedAndServerIdle() throws {
@@ -703,16 +702,15 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     }
   }
 
-  func testReceiveMessageWhenServerClosed() {
+  func testReceiveMessageWhenServerClosed() throws {
     for targetState in [TargetStateMachineState.clientOpenServerClosed, .clientClosedServerClosed] {
       var stateMachine = self.makeClientStateMachine(targetState: targetState)
 
-      XCTAssertThrowsError(
-        ofType: GRPCStreamStateMachine.InvalidState.self,
-        try stateMachine.receive(buffer: .init(), endStream: false)
-      ) { error in
-        XCTAssertEqual(error.message, "Cannot have received anything from a closed server.")
-      }
+      // We should drop the messages if we receive anything once the server's closed.
+      XCTAssertEqual(
+        try stateMachine.receive(buffer: .init(), endStream: false),
+        .doNothing
+      )
     }
   }