Browse Source

Better handle DATA frames with end stream set (#1139)

Motivation:

If a peer doesn't follow spec and sends the client a DATA frame with end
stream set, the client will ignore the fact that end stream was set.
When the stream channel closes just after, the call is unaware of why it
was closed (because it hasn't received a final status) which leads to a
pretty non-descript error message.

Modifications:

Add a 'receiveEndOfResponseStream' to the client state machine which
produces a 'GRPCStatus' if the end stream is unexpected and should not
be ignored.

Result:

Calls will terminate with an 'internal error' status and a message about
a protocol violation if the server sends a data frame with end stream
set.

Resolves #1136
George Barnett 5 years ago
parent
commit
28d7266cd0

+ 10 - 0
Sources/GRPC/GRPCClientChannelHandler.swift

@@ -425,6 +425,16 @@ extension GRPCClientChannelHandler: ChannelInboundHandler {
       MetadataKey.h2EndStream: "\(content.endStream)",
     ])
 
+    self.consumeBytes(from: &buffer, context: context)
+
+    // End stream is set; we don't usually expect this but can handle it in some situations.
+    if content.endStream, let status = self.stateMachine.receiveEndOfResponseStream() {
+      self.logger.warning("Unexpected end stream set on DATA frame")
+      context.fireChannelRead(self.wrapInboundOut(.status(status)))
+    }
+  }
+
+  private func consumeBytes(from buffer: inout ByteBuffer, context: ChannelHandlerContext) {
     // Do we have bytes to read? If there are no bytes to read then we can't do anything. This may
     // happen if the end-of-stream flag is not set on the trailing headers frame (i.e. the one
     // containing the gRPC status code) and an additional empty data frame is sent with the

+ 45 - 0
Sources/GRPC/GRPCClientStateMachine.swift

@@ -346,6 +346,21 @@ struct GRPCClientStateMachine {
     }
   }
 
+  /// Receive a DATA frame with the end stream flag set. Determines whether it is safe for the
+  /// caller to ignore the end stream flag or whether a synthesised status should be forwarded.
+  ///
+  /// Receiving a DATA frame with the end stream flag set is unexpected: the specification dictates
+  /// that an RPC should be ended by the server sending the client a HEADERS frame with end stream
+  /// set. However, we will tolerate end stream on a DATA frame if we believe the RPC has already
+  /// completed (i.e. we are in the 'clientClosedServerClosed' state). In cases where we don't
+  /// expect end of stream on a DATA frame we will emit a status with a message explaining
+  /// the protocol violation.
+  mutating func receiveEndOfResponseStream() -> GRPCStatus? {
+    return self.withStateAvoidingCoWs { state in
+      state.receiveEndOfResponseStream()
+    }
+  }
+
   /// Temporarily sets `self.state` to `.modifying` before calling the provided block and setting
   /// `self.state` to the `State` modified by the block.
   ///
@@ -555,6 +570,36 @@ extension GRPCClientStateMachine.State {
     return result
   }
 
+  /// See `GRPCClientStateMachine.receiveEndOfResponseStream()`.
+  mutating func receiveEndOfResponseStream() -> GRPCStatus? {
+    let status: GRPCStatus?
+
+    switch self {
+    case .clientIdleServerIdle:
+      // Can't see end stream before writing on it.
+      preconditionFailure()
+
+    case .clientActiveServerIdle,
+         .clientActiveServerActive,
+         .clientClosedServerIdle,
+         .clientClosedServerActive:
+      self = .clientClosedServerClosed
+      status = .init(
+        code: .internalError,
+        message: "Protocol violation: received DATA frame with end stream set"
+      )
+
+    case .clientClosedServerClosed:
+      // We've already closed. Ignore this.
+      status = nil
+
+    case .modifying:
+      preconditionFailure("State left as 'modifying'")
+    }
+
+    return status
+  }
+
   /// Makes the request headers (`Request-Headers` in the specification) used to initiate an RPC
   /// call.
   ///

+ 94 - 0
Tests/GRPCTests/GRPCClientChannelHandlerTests.swift

@@ -0,0 +1,94 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@testable import GRPC
+import NIO
+import NIOHPACK
+import NIOHTTP2
+import XCTest
+
+class GRPCClientChannelHandlerTests: GRPCTestCase {
+  private func makeRequestHead() -> _GRPCRequestHead {
+    return _GRPCRequestHead(
+      method: "POST",
+      scheme: "https",
+      path: "/foo/bar",
+      host: "localhost",
+      deadline: .distantFuture,
+      customMetadata: [:],
+      encoding: .disabled
+    )
+  }
+
+  func doTestDataFrameWithEndStream(dataContainsMessage: Bool) throws {
+    let handler = GRPCClientChannelHandler(
+      callType: .unary,
+      logger: self.clientLogger
+    )
+
+    let channel = EmbeddedChannel(handler: handler)
+
+    // Write request head.
+    let head = self.makeRequestHead()
+    XCTAssertNoThrow(try channel.writeOutbound(_RawGRPCClientRequestPart.head(head)))
+    // Read out a frame payload.
+    XCTAssertNotNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
+
+    // Respond with headers.
+    let headers: HPACKHeaders = [":status": "200", "content-type": "application/grpc"]
+    let headersPayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
+    XCTAssertNoThrow(try channel.writeInbound(headersPayload))
+    // Read them out the other side.
+    XCTAssertNotNil(try channel.readInbound(as: _RawGRPCClientResponsePart.self))
+
+    // Respond with DATA and end stream.
+    var buffer = ByteBuffer()
+
+    // Write a message, if we need to.
+    if dataContainsMessage {
+      buffer.writeInteger(UInt8(0)) // not compressed
+      buffer.writeInteger(UInt32(42)) // message length
+      buffer.writeRepeatingByte(0, count: 42) // message
+    }
+
+    let dataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
+    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(dataPayload)))
+
+    if dataContainsMessage {
+      // Read the message out the other side.
+      XCTAssertNotNil(try channel.readInbound(as: _RawGRPCClientResponsePart.self))
+    }
+
+    // We should also generate a status since end stream was set.
+    if let part = try channel.readInbound(as: _RawGRPCClientResponsePart.self) {
+      switch part {
+      case .initialMetadata, .message, .trailingMetadata:
+        XCTFail("Unexpected response part")
+      case .status:
+        () // Expected
+      }
+    } else {
+      XCTFail("Expected to read another response part")
+    }
+  }
+
+  func testDataFrameWithEndStream() throws {
+    try self.doTestDataFrameWithEndStream(dataContainsMessage: true)
+  }
+
+  func testEmptyDataFrameWithEndStream() throws {
+    try self.doTestDataFrameWithEndStream(dataContainsMessage: false)
+  }
+}

+ 39 - 0
Tests/GRPCTests/GRPCClientStateMachineTests.swift

@@ -471,6 +471,45 @@ extension GRPCClientStateMachineTests {
       expected: .invalidState
     )
   }
+
+  private func doTestReceiveEndStreamOnDataWhenActive(_ state: StateMachine.State) throws {
+    var stateMachine = self.makeStateMachine(state)
+    let status = try assertNotNil(stateMachine.receiveEndOfResponseStream())
+    XCTAssertEqual(status.code, .internalError)
+  }
+
+  func testReceiveEndStreamOnDataClientActiveServerIdle() throws {
+    try self.doTestReceiveEndStreamOnDataWhenActive(
+      .clientActiveServerIdle(
+        writeState: .one(),
+        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
+      )
+    )
+  }
+
+  func testReceiveEndStreamOnDataClientClosedServerIdle() throws {
+    try self.doTestReceiveEndStreamOnDataWhenActive(
+      .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
+    )
+  }
+
+  func testReceiveEndStreamOnDataClientActiveServerActive() throws {
+    try self.doTestReceiveEndStreamOnDataWhenActive(
+      .clientActiveServerActive(writeState: .one(), readState: .one())
+    )
+  }
+
+  func testReceiveEndStreamOnDataClientClosedServerActive() throws {
+    try self.doTestReceiveEndStreamOnDataWhenActive(
+      .clientClosedServerActive(readState: .one())
+    )
+  }
+
+  func testReceiveEndStreamOnDataWhenClosed() {
+    var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
+    // Already closed, end stream is ignored.
+    XCTAssertNil(stateMachine.receiveEndOfResponseStream())
+  }
 }
 
 // MARK: - Basic RPC flow.