Forráskód Böngészése

Fix several bugs in GRPC stream handlers and state machine (#1844)

Gustavo Cairo 1 éve
szülő
commit
3a9b81a1c2

+ 43 - 14
Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift

@@ -67,16 +67,25 @@ extension GRPCClientStreamHandler {
       switch frameData.data {
       case .byteBuffer(let buffer):
         do {
-          try self.stateMachine.receive(buffer: buffer, endStream: endStream)
-          loop: while true {
-            switch self.stateMachine.nextInboundMessage() {
-            case .receiveMessage(let message):
-              context.fireChannelRead(self.wrapInboundOut(.message(message)))
-            case .awaitMoreMessages:
-              break loop
-            case .noMoreMessages:
-              context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
-              break loop
+          switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) {
+          case .endRPCAndForwardErrorStatus(let status):
+            context.fireChannelRead(self.wrapInboundOut(.status(status, [:])))
+            context.close(promise: nil)
+
+          case .readInbound:
+            loop: while true {
+              switch self.stateMachine.nextInboundMessage() {
+              case .receiveMessage(let message):
+                context.fireChannelRead(self.wrapInboundOut(.message(message)))
+              case .awaitMoreMessages:
+                break loop
+              case .noMoreMessages:
+                // This could only happen if the server sends a data frame with EOS
+                // set, without sending status and trailers.
+                // If this happens, we should have forwarded an error status above
+                // so we should never reach this point. Do nothing.
+                break loop
+              }
             }
           }
         } catch {
@@ -105,6 +114,7 @@ extension GRPCClientStreamHandler {
 
         case .receivedStatusAndMetadata(let status, let metadata):
           context.fireChannelRead(self.wrapInboundOut(.status(status, metadata)))
+          context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
 
         case .doNothing:
           ()
@@ -161,7 +171,29 @@ extension GRPCClientStreamHandler {
 
   func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
     switch mode {
-    case .output, .all:
+    case .input:
+      context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
+      promise?.succeed()
+
+    case .output:
+      // We flush all pending messages and update the internal state machine's
+      // state, but we don't close the outbound end of the channel, because
+      // forwarding the close in this case would cause the HTTP2 stream handler
+      // to close the whole channel (as the mode is ignored in its implementation).
+      do {
+        try self.stateMachine.closeOutbound()
+        // Force a flush by calling _flush instead of flush
+        // (otherwise, we'd skip flushing if we're in a read loop)
+        self._flush(context: context)
+        promise?.succeed()
+      } catch {
+        promise?.fail(error)
+        context.fireErrorCaught(error)
+      }
+
+    case .all:
+      // Since we're closing the whole channel here, we *do* forward the close
+      // down the pipeline.
       do {
         try self.stateMachine.closeOutbound()
         // Force a flush by calling _flush
@@ -172,9 +204,6 @@ extension GRPCClientStreamHandler {
         promise?.fail(error)
         context.fireErrorCaught(error)
       }
-
-    case .input:
-      context.close(mode: .input, promise: promise)
     }
   }
 

+ 72 - 19
Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

@@ -292,6 +292,12 @@ private enum GRPCStreamStateMachineState {
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
     }
 
+    init(previousState: ClientOpenServerOpenState) {
+      self.framer = previousState.framer
+      self.compressor = previousState.compressor
+      self.inboundMessageBuffer = previousState.inboundMessageBuffer
+    }
+
     init(previousState: ClientOpenServerClosedState) {
       self.framer = previousState.framer
       self.compressor = previousState.compressor
@@ -388,12 +394,24 @@ struct GRPCStreamStateMachine {
     }
   }
 
-  mutating func receive(buffer: ByteBuffer, endStream: Bool) throws {
+  enum OnBufferReceivedAction: Equatable {
+    case readInbound
+
+    // Client-specific actions
+
+    // This will be returned when the server sends a data frame with EOS set.
+    // This is invalid as per the protocol specification, because the server
+    // can only close by sending trailers, not by setting EOS when sending
+    // a message.
+    case endRPCAndForwardErrorStatus(Status)
+  }
+
+  mutating func receive(buffer: ByteBuffer, endStream: Bool) throws -> OnBufferReceivedAction {
     switch self.configuration {
     case .client:
-      try self.clientReceive(buffer: buffer, endStream: endStream)
+      return try self.clientReceive(buffer: buffer, endStream: endStream)
     case .server:
-      try self.serverReceive(buffer: buffer, endStream: endStream)
+      return try self.serverReceive(buffer: buffer, endStream: endStream)
     }
   }
 
@@ -729,7 +747,7 @@ extension GRPCStreamStateMachine {
     }
 
     let statusMessage =
-      metadata.firstString(forKey: .grpcStatusMessage)
+      metadata.firstString(forKey: .grpcStatusMessage, canonicalForm: false)
       .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""
 
     var convertedMetadata = Metadata(headers: metadata)
@@ -860,38 +878,68 @@ extension GRPCStreamStateMachine {
     }
   }
 
-  private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws {
+  private mutating func clientReceive(
+    buffer: ByteBuffer,
+    endStream: Bool
+  ) throws -> OnBufferReceivedAction {
     // This is a message received by the client, from the server.
     switch self.state {
     case .clientIdleServerIdle:
       try self.invalidState(
         "Cannot have received anything from server if client is not yet open."
       )
+
     case .clientOpenServerIdle, .clientClosedServerIdle:
       try self.invalidState(
         "Server cannot have sent a message before sending the initial metadata."
       )
+
     case .clientOpenServerOpen(var state):
+      if endStream {
+        // This is invalid as per the protocol specification, because the server
+        // can only close by sending trailers, not by setting EOS when sending
+        // a message.
+        self.state = .clientClosedServerClosed(.init(previousState: state))
+        return .endRPCAndForwardErrorStatus(
+          Status(
+            code: .internalError,
+            message: """
+              Server sent EOS alongside a data frame, but server is only allowed \
+              to close by sending status and trailers.
+              """
+          )
+        )
+      }
+
       try state.deframer.process(buffer: buffer) { deframedMessage in
         state.inboundMessageBuffer.append(deframedMessage)
       }
+      self.state = .clientOpenServerOpen(state)
+      return .readInbound
+
+    case .clientClosedServerOpen(var state):
       if endStream {
-        self.state = .clientOpenServerClosed(.init(previousState: state))
-      } else {
-        self.state = .clientOpenServerOpen(state)
+        self.state = .clientClosedServerClosed(.init(previousState: state))
+        return .endRPCAndForwardErrorStatus(
+          Status(
+            code: .internalError,
+            message: """
+              Server sent EOS alongside a data frame, but server is only allowed \
+              to close by sending status and trailers.
+              """
+          )
+        )
       }
-    case .clientClosedServerOpen(var state):
+
       // The client may have sent the end stream and thus it's closed,
       // but the server may still be responding.
       // The client must have a deframer set up, so force-unwrap is okay.
       try state.deframer!.process(buffer: buffer) { deframedMessage in
         state.inboundMessageBuffer.append(deframedMessage)
       }
-      if endStream {
-        self.state = .clientClosedServerClosed(.init(previousState: state))
-      } else {
-        self.state = .clientClosedServerOpen(state)
-      }
+      self.state = .clientClosedServerOpen(state)
+      return .readInbound
+
     case .clientOpenServerClosed, .clientClosedServerClosed:
       try self.invalidState(
         "Cannot have received anything from a closed server."
@@ -1314,7 +1362,10 @@ extension GRPCStreamStateMachine {
     }
   }
 
-  private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws {
+  private mutating func serverReceive(
+    buffer: ByteBuffer,
+    endStream: Bool
+  ) throws -> OnBufferReceivedAction {
     switch self.state {
     case .clientIdleServerIdle:
       try self.invalidState(
@@ -1354,6 +1405,7 @@ extension GRPCStreamStateMachine {
         "Client can't send a message if closed."
       )
     }
+    return .readInbound
   }
 
   private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
@@ -1443,10 +1495,11 @@ internal enum GRPCHTTP2Keys: String {
 }
 
 extension HPACKHeaders {
-  internal func firstString(forKey key: GRPCHTTP2Keys) -> String? {
-    self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map {
-      String($0)
-    }
+  internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? {
+    self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true })
+      .map {
+        String($0)
+      }
   }
 
   internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {

+ 16 - 10
Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift

@@ -64,16 +64,22 @@ extension GRPCServerStreamHandler {
       switch frameData.data {
       case .byteBuffer(let buffer):
         do {
-          try self.stateMachine.receive(buffer: buffer, endStream: endStream)
-          loop: while true {
-            switch self.stateMachine.nextInboundMessage() {
-            case .receiveMessage(let message):
-              context.fireChannelRead(self.wrapInboundOut(.message(message)))
-            case .awaitMoreMessages:
-              break loop
-            case .noMoreMessages:
-              context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
-              break loop
+          switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) {
+          case .endRPCAndForwardErrorStatus:
+            preconditionFailure(
+              "OnBufferReceivedAction.endRPCAndForwardErrorStatus should never be returned for the server."
+            )
+          case .readInbound:
+            loop: while true {
+              switch self.stateMachine.nextInboundMessage() {
+              case .receiveMessage(let message):
+                context.fireChannelRead(self.wrapInboundOut(.message(message)))
+              case .awaitMoreMessages:
+                break loop
+              case .noMoreMessages:
+                context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
+                break loop
+              }
             }
           }
         } catch {

+ 72 - 1
Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift

@@ -305,7 +305,10 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
     buffer.writeInteger(UInt8(0))  // not compressed
     buffer.writeInteger(UInt32(42))  // message length
     buffer.writeRepeatingByte(0, count: 42)  // message
-    let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
+    let clientDataPayload = HTTP2Frame.FramePayload.Data(
+      data: .byteBuffer(buffer),
+      endStream: false
+    )
     XCTAssertThrowsError(
       ofType: RPCError.self,
       try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
@@ -321,6 +324,74 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
     XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
   }
 
+  func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
+    let handler = GRPCClientStreamHandler(
+      methodDescriptor: .init(service: "test", method: "test"),
+      scheme: .http,
+      outboundEncoding: .identity,
+      acceptedEncodings: [],
+      maximumPayloadSize: 100,
+      skipStateMachineAssertions: true
+    )
+
+    let channel = EmbeddedChannel(handler: handler)
+
+    // Send client's initial metadata
+    XCTAssertNoThrow(
+      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
+    )
+
+    // Make sure we have sent right metadata.
+    let writtenMetadata = try channel.assertReadHeadersOutbound()
+
+    XCTAssertEqual(
+      writtenMetadata.headers,
+      [
+        GRPCHTTP2Keys.method.rawValue: "POST",
+        GRPCHTTP2Keys.scheme.rawValue: "http",
+        GRPCHTTP2Keys.path.rawValue: "test/test",
+        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+        GRPCHTTP2Keys.te.rawValue: "trailers",
+      ]
+    )
+
+    // Server sends initial metadata
+    let serverInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.status.rawValue: "200",
+      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
+    ]
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
+      )
+    )
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCResponsePart.self),
+      .metadata(Metadata(headers: serverInitialMetadata))
+    )
+
+    // Server sends message with EOS set.
+    var buffer = ByteBuffer()
+    buffer.writeInteger(UInt8(0))  // not compressed
+    buffer.writeInteger(UInt32(42))  // message length
+    buffer.writeRepeatingByte(0, count: 42)  // message
+    let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
+    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
+
+    // Make sure we got status + trailers with the right error.
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCResponsePart.self),
+      .status(
+        Status(
+          code: .internalError,
+          message:
+            "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
+        ),
+        [:]
+      )
+    )
+  }
+
   func testServerEndsStream() throws {
     let handler = GRPCClientStreamHandler(
       methodDescriptor: .init(service: "test", method: "test"),

+ 118 - 29
Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift

@@ -456,14 +456,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
       GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.internalError.rawValue),
       GRPCHTTP2Keys.grpcStatusMessage.rawValue: GRPCStatusMessageMarshaller.marshall(
-        "Some status message"
+        "Some, status, message"
       )!,
       "custom-key": "custom-value",
     ]
     let trailers = try stateMachine.receive(headers: trailersOnlyResponse, endStream: true)
     switch trailers {
     case .receivedStatusAndMetadata(let status, let metadata):
-      XCTAssertEqual(status, Status(code: .internalError, message: "Some status message"))
+      XCTAssertEqual(status, Status(code: .internalError, message: "Some, status, message"))
       XCTAssertEqual(
         metadata,
         [
@@ -599,8 +599,22 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     for targetState in [TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen] {
       var stateMachine = self.makeClientStateMachine(targetState: targetState)
 
-      XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: false))
-      XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true))
+      XCTAssertEqual(
+        try stateMachine.receive(buffer: .init(), endStream: false),
+        .readInbound
+      )
+      XCTAssertEqual(
+        try stateMachine.receive(buffer: .init(), endStream: true),
+        .endRPCAndForwardErrorStatus(
+          Status(
+            code: .internalError,
+            message: """
+              Server sent EOS alongside a data frame, but server is only allowed \
+              to close by sending status and trailers.
+              """
+          )
+        )
+      )
     }
   }
 
@@ -776,7 +790,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42]))
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
@@ -790,7 +807,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
     let originalMessage = [UInt8]([42, 42, 43, 43])
     let receivedBytes = try self.frameMessage(originalMessage, compress: true)
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage))
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
@@ -804,7 +824,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     // Close server
     XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true))
@@ -821,7 +844,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     // Close client
     XCTAssertNoThrow(try stateMachine.closeOutbound())
@@ -840,7 +866,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     // Close server
     XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true))
@@ -906,8 +935,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
     let secondResponseBytes = [UInt8]([8, 9, 10])
     let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
-    try stateMachine.receive(buffer: firstResponse, endStream: false)
-    try stateMachine.receive(buffer: secondResponse, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: firstResponse, endStream: false),
+      .readInbound
+    )
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: secondResponse, endStream: false),
+      .readInbound
+    )
 
     // Make sure messages have arrived
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes))
@@ -988,8 +1023,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
     let secondResponseBytes = [UInt8]([8, 9, 10])
     let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
-    try stateMachine.receive(buffer: firstResponse, endStream: false)
-    try stateMachine.receive(buffer: secondResponse, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: firstResponse, endStream: false),
+      .readInbound
+    )
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: secondResponse, endStream: false),
+      .readInbound
+    )
 
     // Make sure messages have arrived
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes))
@@ -1069,8 +1110,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
     let secondResponseBytes = [UInt8]([8, 9, 10])
     let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
-    try stateMachine.receive(buffer: firstResponse, endStream: false)
-    try stateMachine.receive(buffer: secondResponse, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: firstResponse, endStream: false),
+      .readInbound
+    )
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: secondResponse, endStream: false),
+      .readInbound
+    )
 
     // Make sure messages have arrived
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes))
@@ -2061,7 +2108,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42]))
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
@@ -2076,7 +2126,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let originalMessage = [UInt8]([42, 42, 43, 43])
     let receivedBytes = try self.frameMessage(originalMessage, compress: true)
 
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage))
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
@@ -2090,7 +2143,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     // Close server
     XCTAssertNoThrow(
@@ -2117,7 +2173,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     // Close client
     XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true))
@@ -2136,7 +2195,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ])
-    try stateMachine.receive(buffer: receivedBytes, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: receivedBytes, endStream: false),
+      .readInbound
+    )
 
     // Close server
     XCTAssertNoThrow(
@@ -2189,9 +2251,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
     let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
 
-    try stateMachine.receive(buffer: firstMessage, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: firstMessage, endStream: false),
+      .readInbound
+    )
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
-    try stateMachine.receive(buffer: secondMessage, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: secondMessage, endStream: false),
+      .readInbound
+    )
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage))
 
     // Server sends response
@@ -2226,7 +2294,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     try secondPromise.futureResult.assertSuccess().wait()
 
     // Client sends end
-    try stateMachine.receive(buffer: ByteBuffer(), endStream: true)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: ByteBuffer(), endStream: true),
+      .readInbound
+    )
 
     // Server ends
     let response = try stateMachine.send(
@@ -2259,13 +2330,22 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
     let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
 
-    try stateMachine.receive(buffer: firstMessage, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: firstMessage, endStream: false),
+      .readInbound
+    )
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
-    try stateMachine.receive(buffer: secondMessage, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: secondMessage, endStream: false),
+      .readInbound
+    )
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage))
 
     // Client sends end
-    try stateMachine.receive(buffer: ByteBuffer(), endStream: true)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: ByteBuffer(), endStream: true),
+      .readInbound
+    )
 
     // Server sends initial metadata
     let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"]))
@@ -2324,9 +2404,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
     let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
 
-    try stateMachine.receive(buffer: firstMessage, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: firstMessage, endStream: false),
+      .readInbound
+    )
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
-    try stateMachine.receive(buffer: secondMessage, endStream: false)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: secondMessage, endStream: false),
+      .readInbound
+    )
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage))
 
     // Server sends initial metadata
@@ -2342,7 +2428,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     )
 
     // Client sends end
-    try stateMachine.receive(buffer: ByteBuffer(), endStream: true)
+    XCTAssertEqual(
+      try stateMachine.receive(buffer: ByteBuffer(), endStream: true),
+      .readInbound
+    )
 
     // Server sends response
     let firstResponse = [UInt8]([5, 6, 7])