瀏覽代碼

Synthesise status when stream closes unexpectedly (#1953)

Motivation

When the stream closes unexpectedly (the H2 stream channel becomes inactive, an error is fired through the pipeline, or we receive a RST_STREAM frame), the client-side inbound sequence either finishes or throws an error.
To make it easier for higher layers to retry, we want to synthesise a Status instead.

Modifications

On the client side, write a Status when one of the aforementioned unexpected closures happens.
On the server side, make sure we're always firing an error when closing unexpectedly.
On both, make sure we transition the stream state machine to closed.

Result

Inbound sequence now returns a Status instead of finishing/throwing
Gustavo Cairo 1 年之前
父節點
當前提交
e819f390cb

+ 27 - 1
Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift

@@ -130,7 +130,10 @@ extension GRPCClientStreamHandler {
         context.fireErrorCaught(error)
       }
 
-    case .ping, .goAway, .priority, .rstStream, .settings, .pushPromise, .windowUpdate,
+    case .rstStream:
+      self.handleUnexpectedInboundClose(context: context, reason: .streamReset)
+
+    case .ping, .goAway, .priority, .settings, .pushPromise, .windowUpdate,
       .alternativeService, .origin:
       ()
     }
@@ -148,6 +151,29 @@ extension GRPCClientStreamHandler {
   func handlerRemoved(context: ChannelHandlerContext) {
     self.stateMachine.tearDown()
   }
+
+  func channelInactive(context: ChannelHandlerContext) {
+    self.handleUnexpectedInboundClose(context: context, reason: .channelInactive)
+    context.fireChannelInactive()
+  }
+
+  func errorCaught(context: ChannelHandlerContext, error: any Error) {
+    self.handleUnexpectedInboundClose(context: context, reason: .errorThrown(error))
+  }
+
+  private func handleUnexpectedInboundClose(
+    context: ChannelHandlerContext,
+    reason: GRPCStreamStateMachine.UnexpectedInboundCloseReason
+  ) {
+    switch self.stateMachine.unexpectedInboundClose(reason: reason) {
+    case .forwardStatus_clientOnly(let status):
+      context.fireChannelRead(self.wrapInboundOut(.status(status, [:])))
+    case .doNothing:
+      ()
+    case .fireError_serverOnly:
+      assertionFailure("`fireError` should only happen on the server side, never on the client.")
+    }
+  }
 }
 
 // - MARK: ChannelOutboundHandler

+ 110 - 0
Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

@@ -539,6 +539,38 @@ struct GRPCStreamStateMachine {
       state.compressor?.end()
     }
   }
+
+  enum OnUnexpectedInboundClose {
+    case forwardStatus_clientOnly(Status)
+    case fireError_serverOnly(any Error)
+    case doNothing
+
+    init(serverCloseReason: UnexpectedInboundCloseReason) {
+      switch serverCloseReason {
+      case .streamReset, .channelInactive:
+        self = .fireError_serverOnly(RPCError(serverCloseReason))
+      case .errorThrown(let error):
+        self = .fireError_serverOnly(error)
+      }
+    }
+  }
+
+  enum UnexpectedInboundCloseReason {
+    case streamReset
+    case channelInactive
+    case errorThrown(any Error)
+  }
+
+  mutating func unexpectedInboundClose(
+    reason: UnexpectedInboundCloseReason
+  ) -> OnUnexpectedInboundClose {
+    switch self.configuration {
+    case .client:
+      return self.clientUnexpectedInboundClose(reason: reason)
+    case .server:
+      return self.serverUnexpectedInboundClose(reason: reason)
+    }
+  }
 }
 
 // - MARK: Client
@@ -1044,6 +1076,35 @@ extension GRPCStreamStateMachine {
     }
     throw RPCError(code: .internalError, message: message)
   }
+
+  private mutating func clientUnexpectedInboundClose(
+    reason: UnexpectedInboundCloseReason
+  ) -> OnUnexpectedInboundClose {
+    switch self.state {
+    case .clientIdleServerIdle(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return .forwardStatus_clientOnly(Status(RPCError(reason)))
+
+    case .clientOpenServerIdle(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return .forwardStatus_clientOnly(Status(RPCError(reason)))
+
+    case .clientClosedServerIdle(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return .forwardStatus_clientOnly(Status(RPCError(reason)))
+
+    case .clientOpenServerOpen(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return .forwardStatus_clientOnly(Status(RPCError(reason)))
+
+    case .clientClosedServerOpen(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return .forwardStatus_clientOnly(Status(RPCError(reason)))
+
+    case .clientOpenServerClosed, .clientClosedServerClosed:
+      return .doNothing
+    }
+  }
 }
 
 // - MARK: Server
@@ -1469,6 +1530,31 @@ extension GRPCStreamStateMachine {
       return .awaitMoreMessages
     }
   }
+
+  private mutating func serverUnexpectedInboundClose(
+    reason: UnexpectedInboundCloseReason
+  ) -> OnUnexpectedInboundClose {
+    switch self.state {
+    case .clientIdleServerIdle(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return OnUnexpectedInboundClose(serverCloseReason: reason)
+
+    case .clientOpenServerIdle(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return OnUnexpectedInboundClose(serverCloseReason: reason)
+
+    case .clientOpenServerOpen(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return OnUnexpectedInboundClose(serverCloseReason: reason)
+
+    case .clientOpenServerClosed(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
+      return OnUnexpectedInboundClose(serverCloseReason: reason)
+
+    case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
+      return .doNothing
+    }
+  }
 }
 
 extension MethodDescriptor {
@@ -1612,3 +1698,27 @@ extension MethodDescriptor {
     return "/\(self.service)/\(self.method)"
   }
 }
+
+extension RPCError {
+  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+  fileprivate init(_ reason: GRPCStreamStateMachine.UnexpectedInboundCloseReason) {
+    switch reason {
+    case .streamReset:
+      self = RPCError(
+        code: .unavailable,
+        message: "Stream unexpectedly closed: a RST_STREAM frame was received."
+      )
+    case .channelInactive:
+      self = RPCError(code: .unavailable, message: "Stream unexpectedly closed.")
+    case .errorThrown:
+      self = RPCError(code: .unavailable, message: "Stream unexpectedly closed with error.")
+    }
+  }
+}
+
+extension Status {
+  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+  fileprivate init(_ error: RPCError) {
+    self = Status(code: Status.Code(error.code), message: error.message)
+  }
+}

+ 29 - 1
Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift

@@ -139,7 +139,10 @@ extension GRPCServerStreamHandler {
         context.fireErrorCaught(error)
       }
 
-    case .ping, .goAway, .priority, .rstStream, .settings, .pushPromise, .windowUpdate,
+    case .rstStream:
+      self.handleUnexpectedInboundClose(context: context, reason: .streamReset)
+
+    case .ping, .goAway, .priority, .settings, .pushPromise, .windowUpdate,
       .alternativeService, .origin:
       ()
     }
@@ -163,6 +166,31 @@ extension GRPCServerStreamHandler {
       )
     )
   }
+
+  func channelInactive(context: ChannelHandlerContext) {
+    self.handleUnexpectedInboundClose(context: context, reason: .channelInactive)
+    context.fireChannelInactive()
+  }
+
+  func errorCaught(context: ChannelHandlerContext, error: any Error) {
+    self.handleUnexpectedInboundClose(context: context, reason: .errorThrown(error))
+  }
+
+  private func handleUnexpectedInboundClose(
+    context: ChannelHandlerContext,
+    reason: GRPCStreamStateMachine.UnexpectedInboundCloseReason
+  ) {
+    switch self.stateMachine.unexpectedInboundClose(reason: reason) {
+    case .fireError_serverOnly(let wrappedError):
+      context.fireErrorCaught(wrappedError)
+    case .doNothing:
+      ()
+    case .forwardStatus_clientOnly:
+      assertionFailure(
+        "`forwardStatus` should only happen on the client side, never on the server."
+      )
+    }
+  }
 }
 
 // - MARK: ChannelOutboundHandler

+ 153 - 3
Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift

@@ -40,9 +40,10 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
     let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
       .ping(.init(), ack: false),
       .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
-      // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
-      // initialiser is internal, so I can't create one of these frames.
-      .rstStream(.cancel),
+      // TODO: uncomment when it's possible to build a `StreamPriorityData`.
+      // .priority(
+      //   HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
+      // ),
       .settings(.ack),
       .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
       .windowUpdate(windowSizeIncrement: 4),
@@ -764,6 +765,155 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
     )
     XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
   }
+
+  func testUnexpectedStreamClose_ErrorFired() throws {
+    let handler = GRPCClientStreamHandler(
+      methodDescriptor: .init(service: "test", method: "test"),
+      scheme: .http,
+      outboundEncoding: .none,
+      acceptedEncodings: [],
+      maximumPayloadSize: 1,
+      skipStateMachineAssertions: true
+    )
+
+    let channel = EmbeddedChannel(handler: handler)
+
+    // Write client's initial metadata
+    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "/test/test",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
+    XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
+
+    // An error is fired down the pipeline
+    let thrownError = ChannelError.connectTimeout(.milliseconds(100))
+    channel.pipeline.fireErrorCaught(thrownError)
+
+    // The client receives a status explaining the stream was closed because of the thrown error.
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCResponsePart.self),
+      .status(
+        .init(
+          code: .unavailable,
+          message: "Stream unexpectedly closed with error."
+        ),
+        [:]
+      )
+    )
+
+    // We should now be closed: check we can't write anymore.
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
+    ) { error in
+      XCTAssertEqual(error.code, .internalError)
+      XCTAssertEqual(error.message, "Client is closed: can't send metadata.")
+    }
+  }
+
+  func testUnexpectedStreamClose_ChannelInactive() throws {
+    let handler = GRPCClientStreamHandler(
+      methodDescriptor: .init(service: "test", method: "test"),
+      scheme: .http,
+      outboundEncoding: .none,
+      acceptedEncodings: [],
+      maximumPayloadSize: 1,
+      skipStateMachineAssertions: true
+    )
+
+    let channel = EmbeddedChannel(handler: handler)
+
+    // Write client's initial metadata
+    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "/test/test",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
+    XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
+
+    // Channel becomes inactive
+    channel.pipeline.fireChannelInactive()
+
+    // The client receives a status explaining the stream was closed.
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCResponsePart.self),
+      .status(
+        .init(code: .unavailable, message: "Stream unexpectedly closed."),
+        [:]
+      )
+    )
+
+    // We should now be closed: check we can't write anymore.
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
+    ) { error in
+      XCTAssertEqual(error.code, .internalError)
+      XCTAssertEqual(error.message, "Client is closed: can't send metadata.")
+    }
+  }
+
+  func testUnexpectedStreamClose_ResetStreamFrame() throws {
+    let handler = GRPCClientStreamHandler(
+      methodDescriptor: .init(service: "test", method: "test"),
+      scheme: .http,
+      outboundEncoding: .none,
+      acceptedEncodings: [],
+      maximumPayloadSize: 1,
+      skipStateMachineAssertions: true
+    )
+
+    let channel = EmbeddedChannel(handler: handler)
+
+    // Write client's initial metadata
+    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "/test/test",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
+    XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
+
+    // Receive RST_STREAM
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.rstStream(.internalError)
+      )
+    )
+
+    // The client receives a status explaining RST_STREAM was sent.
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCResponsePart.self),
+      .status(
+        .init(
+          code: .unavailable,
+          message: "Stream unexpectedly closed: a RST_STREAM frame was received."
+        ),
+        [:]
+      )
+    )
+
+    // We should now be closed: check we can't write anymore.
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
+    ) { error in
+      XCTAssertEqual(error.code, .internalError)
+      XCTAssertEqual(error.message, "Client is closed: can't send metadata.")
+    }
+  }
 }
 
 extension EmbeddedChannel {

+ 165 - 0
Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift

@@ -976,6 +976,90 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 
+  // - MARK: Unexpected close
+
+  func testUnexpectedCloseWhenServerIdleOrOpen() throws {
+    let thrownError = RPCError(code: .deadlineExceeded, message: "Test error")
+    let reasonAndExpectedStatusPairs = [
+      (
+        GRPCStreamStateMachine.UnexpectedInboundCloseReason.channelInactive,
+        Status(code: .unavailable, message: "Stream unexpectedly closed.")
+      ),
+      (
+        GRPCStreamStateMachine.UnexpectedInboundCloseReason.streamReset,
+        Status(
+          code: .unavailable,
+          message: "Stream unexpectedly closed: a RST_STREAM frame was received."
+        )
+      ),
+      (
+        GRPCStreamStateMachine.UnexpectedInboundCloseReason.errorThrown(thrownError),
+        Status(
+          code: .unavailable,
+          message: "Stream unexpectedly closed with error."
+        )
+      ),
+    ]
+    let states = [
+      TargetStateMachineState.clientIdleServerIdle,
+      .clientOpenServerIdle,
+      .clientOpenServerOpen,
+      .clientClosedServerIdle,
+      .clientClosedServerOpen,
+    ]
+
+    for state in states {
+      for (closeReason, expectedStatus) in reasonAndExpectedStatusPairs {
+        var stateMachine = self.makeClientStateMachine(targetState: state)
+        var action = stateMachine.unexpectedInboundClose(reason: closeReason)
+
+        guard case .forwardStatus_clientOnly(let status) = action else {
+          XCTFail("Should have been `fireError` but was `\(action)` (state: \(state)).")
+          return
+        }
+        XCTAssertEqual(status, expectedStatus)
+
+        // Calling unexpectedInboundClose again should return `doNothing` because
+        // we're already closed.
+        action = stateMachine.unexpectedInboundClose(reason: closeReason)
+        guard case .doNothing = action else {
+          XCTFail("Should have been `doNothing` but was `\(action)` (state: \(state)).")
+          return
+        }
+      }
+    }
+  }
+
+  func testUnexpectedCloseWhenServerClosed() throws {
+    let closeReasons = [
+      GRPCStreamStateMachine.UnexpectedInboundCloseReason.channelInactive,
+      .streamReset,
+      .errorThrown(RPCError(code: .deadlineExceeded, message: "Test error")),
+    ]
+    let states = [
+      TargetStateMachineState.clientOpenServerClosed,
+      .clientClosedServerClosed,
+    ]
+
+    for state in states {
+      for closeReason in closeReasons {
+        var stateMachine = self.makeClientStateMachine(targetState: state)
+        var action = stateMachine.unexpectedInboundClose(reason: closeReason)
+        guard case .doNothing = action else {
+          XCTFail("Should have been `doNothing` but was `\(action)` (state: \(state)).")
+          return
+        }
+
+        // Calling unexpectedInboundClose again should return `doNothing` again.
+        action = stateMachine.unexpectedInboundClose(reason: closeReason)
+        guard case .doNothing = action else {
+          XCTFail("Should have been `doNothing` but was `\(action)` (state: \(state)).")
+          return
+        }
+      }
+    }
+  }
+
   // - MARK: Common paths
 
   func testNormalFlow() throws {
@@ -2426,6 +2510,87 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 
+  // - MARK: Unexpected close
+
+  func testUnexpectedCloseWhenClientIdleOrOpen() throws {
+    let reasonAndExpectedErrorPairs = [
+      (
+        GRPCStreamStateMachine.UnexpectedInboundCloseReason.channelInactive,
+        RPCError(code: .unavailable, message: "Stream unexpectedly closed.")
+      ),
+      (
+        GRPCStreamStateMachine.UnexpectedInboundCloseReason.streamReset,
+        RPCError(
+          code: .unavailable,
+          message: "Stream unexpectedly closed: a RST_STREAM frame was received."
+        )
+      ),
+      (
+        GRPCStreamStateMachine.UnexpectedInboundCloseReason.errorThrown(
+          RPCError(code: .deadlineExceeded, message: "Test error")
+        ),
+        RPCError(code: .deadlineExceeded, message: "Test error")
+      ),
+    ]
+    let states = [
+      TargetStateMachineState.clientIdleServerIdle,
+      .clientOpenServerIdle,
+      .clientOpenServerOpen,
+      .clientOpenServerClosed,
+    ]
+
+    for state in states {
+      for (closeReason, expectedError) in reasonAndExpectedErrorPairs {
+        var stateMachine = self.makeServerStateMachine(targetState: state)
+        var action = stateMachine.unexpectedInboundClose(reason: closeReason)
+        guard case .fireError_serverOnly(let error) = action else {
+          XCTFail("Should have been `fireError` but was `\(action)` (state: \(state)).")
+          return
+        }
+        XCTAssertEqual(error as? RPCError, expectedError)
+
+        // Calling unexpectedInboundClose again should return `doNothing` because
+        // we're already closed.
+        action = stateMachine.unexpectedInboundClose(reason: closeReason)
+        guard case .doNothing = action else {
+          XCTFail("Should have been `doNothing` but was `\(action)` (state: \(state)).")
+          return
+        }
+      }
+    }
+  }
+
+  func testUnexpectedCloseWhenClientClosed() throws {
+    let closeReasons = [
+      GRPCStreamStateMachine.UnexpectedInboundCloseReason.channelInactive,
+      .streamReset,
+      .errorThrown(RPCError(code: .deadlineExceeded, message: "Test error")),
+    ]
+    let states = [
+      TargetStateMachineState.clientClosedServerIdle,
+      .clientClosedServerOpen,
+      .clientClosedServerClosed,
+    ]
+
+    for state in states {
+      for closeReason in closeReasons {
+        var stateMachine = self.makeServerStateMachine(targetState: state)
+        var action = stateMachine.unexpectedInboundClose(reason: closeReason)
+        guard case .doNothing = action else {
+          XCTFail("Should have been `doNothing` but was `\(action)` (state: \(state)).")
+          return
+        }
+
+        // Calling unexpectedInboundClose again should return `doNothing` again.
+        action = stateMachine.unexpectedInboundClose(reason: closeReason)
+        guard case .doNothing = action else {
+          XCTFail("Should have been `doNothing` but was `\(action)` (state: \(state)).")
+          return
+        }
+      }
+    }
+  }
+
   // - MARK: Common paths
 
   func testNormalFlow() throws {

+ 178 - 4
Tests/GRPCHTTP2CoreTests/Server/GRPCServerStreamHandlerTests.swift

@@ -38,9 +38,10 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
     let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
       .ping(.init(), ack: false),
       .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
-      // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
-      // initialiser is internal, so I can't create one of these frames.
-      .rstStream(.cancel),
+      // TODO: uncomment when it's possible to build a `StreamPriorityData`.
+      // .priority(
+      //   HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
+      // ),
       .settings(.ack),
       .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
       .windowUpdate(windowSizeIncrement: 4),
@@ -646,7 +647,15 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
     XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
 
     // Receive them again. Should be a protocol violation.
-    try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
+      )
+    ) { error in
+      XCTAssertEqual(error.code, .unavailable)
+      XCTAssertEqual(error.message, "Stream unexpectedly closed.")
+    }
     let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
 
     switch payload {
@@ -912,6 +921,171 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
       XCTAssertEqual(error.message, "RPC was rejected.")
     }
   }
+
+  func testUnexpectedStreamClose_ErrorFired() throws {
+    let channel = EmbeddedChannel()
+    let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
+    let handler = GRPCServerStreamHandler(
+      scheme: .http,
+      acceptedEncodings: [],
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: promise,
+      skipStateMachineAssertions: true
+    )
+    try channel.pipeline.syncOperations.addHandler(handler)
+
+    // Receive client's initial metadata
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
+      )
+    )
+
+    // Make sure we haven't sent back an error response, and that we read the initial metadata
+    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCRequestPart.self),
+      RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
+    )
+
+    // An error is fired down the pipeline
+    let thrownError = ChannelError.connectTimeout(.milliseconds(100))
+    channel.pipeline.fireErrorCaught(thrownError)
+
+    // The server handler simply forwards the error.
+    XCTAssertThrowsError(
+      ofType: type(of: thrownError),
+      try channel.throwIfErrorCaught()
+    ) { error in
+      XCTAssertEqual(error, thrownError)
+    }
+
+    // We should now be closed: check we can't write anymore.
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
+    ) { error in
+      XCTAssertEqual(error.code, .internalError)
+      XCTAssertEqual(error.message, "Server cannot send metadata if closed.")
+    }
+  }
+
+  func testUnexpectedStreamClose_ChannelInactive() throws {
+    let channel = EmbeddedChannel()
+    let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
+    let handler = GRPCServerStreamHandler(
+      scheme: .http,
+      acceptedEncodings: [],
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: promise,
+      skipStateMachineAssertions: true
+    )
+    try channel.pipeline.syncOperations.addHandler(handler)
+
+    // Receive client's initial metadata
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
+      )
+    )
+
+    // Make sure we haven't sent back an error response, and that we read the initial metadata
+    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCRequestPart.self),
+      RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
+    )
+
+    // Channel becomes inactive
+    channel.pipeline.fireChannelInactive()
+
+    // The server handler fires an error
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.throwIfErrorCaught()
+    ) { error in
+      XCTAssertEqual(error.code, .unavailable)
+      XCTAssertEqual(error.message, "Stream unexpectedly closed.")
+    }
+
+    // We should now be closed: check we can't write anymore.
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
+    ) { error in
+      XCTAssertEqual(error.code, .internalError)
+      XCTAssertEqual(error.message, "Server cannot send metadata if closed.")
+    }
+  }
+
+  func testUnexpectedStreamClose_ResetStreamFrame() throws {
+    let channel = EmbeddedChannel()
+    let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
+    let handler = GRPCServerStreamHandler(
+      scheme: .http,
+      acceptedEncodings: [],
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: promise,
+      skipStateMachineAssertions: true
+    )
+    try channel.pipeline.syncOperations.addHandler(handler)
+
+    // Receive client's initial metadata
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
+      )
+    )
+
+    // Make sure we haven't sent back an error response, and that we read the initial metadata
+    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCRequestPart.self),
+      RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
+    )
+
+    // We receive RST_STREAM frame
+    // Assert the server handler fires an error
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.rstStream(.internalError)
+      )
+    ) { error in
+      XCTAssertEqual(error.code, .unavailable)
+      XCTAssertEqual(error.message, "Stream unexpectedly closed: a RST_STREAM frame was received.")
+    }
+
+    // We should now be closed: check we can't write anymore.
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
+    ) { error in
+      XCTAssertEqual(error.code, .internalError)
+      XCTAssertEqual(error.message, "Server cannot send metadata if closed.")
+    }
+  }
 }
 
 extension EmbeddedChannel {