Procházet zdrojové kódy

Server transport changes (#1891)

Gustavo Cairo před 1 rokem
rodič
revize
1bdb831b95

+ 1 - 1
Sources/GRPCHTTP2Core/Client/Connection/Connection.swift

@@ -408,7 +408,7 @@ extension Connection {
       /// Multiplexer for creating HTTP/2 streams.
       var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
       /// Whether the connection is plaintext, `false` implies TLS is being used.
-      var scheme: Scheme
+      var scheme: GRPCStreamStateMachineConfiguration.Scheme
 
       init(_ connection: HTTP2Connection) {
         self.channel = connection.channel

+ 2 - 2
Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift

@@ -33,7 +33,7 @@ final class GRPCClientStreamHandler: ChannelDuplexHandler {
 
   init(
     methodDescriptor: MethodDescriptor,
-    scheme: Scheme,
+    scheme: GRPCStreamStateMachineConfiguration.Scheme,
     outboundEncoding: CompressionAlgorithm,
     acceptedEncodings: CompressionAlgorithmSet,
     maximumPayloadSize: Int,
@@ -103,7 +103,7 @@ extension GRPCClientStreamHandler {
           endStream: headers.endStream
         )
         switch action {
-        case .receivedMetadata(let metadata):
+        case .receivedMetadata(let metadata, _):
           context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
 
         case .rejectRPC:

+ 25 - 15
Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

@@ -19,15 +19,15 @@ import NIOCore
 import NIOHPACK
 import NIOHTTP1
 
-enum Scheme: String {
-  case http
-  case https
-}
-
 enum GRPCStreamStateMachineConfiguration {
   case client(ClientConfiguration)
   case server(ServerConfiguration)
 
+  enum Scheme: String {
+    case http
+    case https
+  }
+
   struct ClientConfiguration {
     var methodDescriptor: MethodDescriptor
     var scheme: Scheme
@@ -384,7 +384,7 @@ struct GRPCStreamStateMachine {
   }
 
   enum OnMetadataReceived: Equatable {
-    case receivedMetadata(Metadata)
+    case receivedMetadata(Metadata, MethodDescriptor?)
 
     // Client-specific actions
     case receivedStatusAndMetadata(status: Status, metadata: Metadata)
@@ -505,7 +505,7 @@ struct GRPCStreamStateMachine {
 extension GRPCStreamStateMachine {
   private func makeClientHeaders(
     methodDescriptor: MethodDescriptor,
-    scheme: Scheme,
+    scheme: GRPCStreamStateMachineConfiguration.Scheme,
     outboundEncoding: CompressionAlgorithm?,
     acceptedEncodings: CompressionAlgorithmSet,
     customMetadata: Metadata
@@ -817,7 +817,7 @@ extension GRPCStreamStateMachine {
               decompressor: decompressor
             )
           )
-          return .receivedMetadata(Metadata(headers: headers))
+          return .receivedMetadata(Metadata(headers: headers), nil)
         }
       }
 
@@ -857,7 +857,7 @@ extension GRPCStreamStateMachine {
               decompressionAlgorithm: inboundEncoding
             )
           )
-          return .receivedMetadata(Metadata(headers: headers))
+          return .receivedMetadata(Metadata(headers: headers), nil)
         }
       }
 
@@ -1211,21 +1211,31 @@ extension GRPCStreamStateMachine {
         return .rejectRPC(trailers: trailers)
       }
 
-      let path = headers.firstString(forKey: .path)
-        .flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
-      if path == nil {
+      guard let pathHeader = headers.firstString(forKey: .path) else {
         return self.closeServerAndBuildRejectRPCAction(
           currentState: state,
           endStream: endStream,
           rejectWithStatus: Status(
-            code: .unimplemented,
+            code: .invalidArgument,
             message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
           )
         )
       }
 
+      guard let path = MethodDescriptor(fullyQualifiedMethod: pathHeader) else {
+        return self.closeServerAndBuildRejectRPCAction(
+          currentState: state,
+          endStream: endStream,
+          rejectWithStatus: Status(
+            code: .unimplemented,
+            message:
+              "The given \(GRPCHTTP2Keys.path.rawValue) (\(pathHeader)) does not correspond to a valid method."
+          )
+        )
+      }
+
       let scheme = headers.firstString(forKey: .scheme)
-        .flatMap { Scheme(rawValue: $0) }
+        .flatMap { GRPCStreamStateMachineConfiguration.Scheme(rawValue: $0) }
       if scheme == nil {
         return self.closeServerAndBuildRejectRPCAction(
           currentState: state,
@@ -1355,7 +1365,7 @@ extension GRPCStreamStateMachine {
         )
       }
 
-      return .receivedMetadata(Metadata(headers: headers))
+      return .receivedMetadata(Metadata(headers: headers), path)
     case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
       try self.invalidState(
         "Client shouldn't have sent metadata twice."

+ 3 - 0
Sources/GRPCHTTP2Core/Server/Connection/ServerConnectionManagementHandler.swift

@@ -279,6 +279,9 @@ final class ServerConnectionManagementHandler: ChannelDuplexHandler {
     case let event as StreamClosedEvent:
       self.streamClosed(event.streamID, channel: context.channel)
 
+    case is ChannelShouldQuiesceEvent:
+      self.initiateGracefulShutdown(context: context)
+
     default:
       ()
     }

+ 24 - 3
Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift

@@ -37,10 +37,13 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler {
   private var pendingTrailers:
     (trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise<Void>?)?
 
+  private let methodDescriptorPromise: EventLoopPromise<MethodDescriptor>
+
   init(
-    scheme: Scheme,
+    scheme: GRPCStreamStateMachineConfiguration.Scheme,
     acceptedEncodings: CompressionAlgorithmSet,
     maximumPayloadSize: Int,
+    methodDescriptorPromise: EventLoopPromise<MethodDescriptor>,
     skipStateMachineAssertions: Bool = false
   ) {
     self.stateMachine = .init(
@@ -48,6 +51,7 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler {
       maximumPayloadSize: maximumPayloadSize,
       skipAssertions: skipStateMachineAssertions
     )
+    self.methodDescriptorPromise = methodDescriptorPromise
   }
 }
 
@@ -97,11 +101,22 @@ extension GRPCServerStreamHandler {
           endStream: headers.endStream
         )
         switch action {
-        case .receivedMetadata(let metadata):
-          context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
+        case .receivedMetadata(let metadata, let methodDescriptor):
+          if let methodDescriptor = methodDescriptor {
+            self.methodDescriptorPromise.succeed(methodDescriptor)
+            context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
+          } else {
+            assertionFailure("Method descriptor should have been present if we received metadata.")
+          }
 
         case .rejectRPC(let trailers):
           self.flushPending = true
+          self.methodDescriptorPromise.fail(
+            RPCError(
+              code: .unavailable,
+              message: "RPC was rejected."
+            )
+          )
           let response = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
           context.write(self.wrapOutboundOut(response), promise: nil)
 
@@ -135,6 +150,12 @@ extension GRPCServerStreamHandler {
 
   func handlerRemoved(context: ChannelHandlerContext) {
     self.stateMachine.tearDown()
+    self.methodDescriptorPromise.fail(
+      RPCError(
+        code: .unavailable,
+        message: "RPC stream was closed before we got any Metadata."
+      )
+    )
   }
 }
 

+ 2 - 1
Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/ConnectionTest.swift

@@ -116,7 +116,8 @@ extension ConnectionTest {
               let handler = GRPCServerStreamHandler(
                 scheme: .http,
                 acceptedEncodings: .none,
-                maximumPayloadSize: .max
+                maximumPayloadSize: .max,
+                methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
               )
 
               return stream.eventLoop.makeCompletedFuture {

+ 2 - 1
Tests/GRPCHTTP2CoreTests/Client/Connection/Utilities/TestServer.swift

@@ -73,7 +73,8 @@ final class TestServer: Sendable {
             let handler = GRPCServerStreamHandler(
               scheme: .http,
               acceptedEncodings: .all,
-              maximumPayloadSize: .max
+              maximumPayloadSize: .max,
+              methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
             )
 
             try stream.pipeline.syncOperations.addHandlers(handler)

+ 72 - 22
Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift

@@ -66,6 +66,10 @@ extension HPACKHeaders {
     GRPCHTTP2Keys.path.rawValue: "test/test",
     GRPCHTTP2Keys.contentType.rawValue: "invalid/invalid",
   ]
+  fileprivate static let receivedWithInvalidPath: Self = [
+    GRPCHTTP2Keys.path.rawValue: "someinvalidpath",
+    GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+  ]
   fileprivate static let receivedWithoutEndpoint: Self = [
     GRPCHTTP2Keys.contentType.rawValue: "application/grpc"
   ]
@@ -449,7 +453,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
         "custom": "123",
       ]
       expectedMetadata.addBinary([42, 43, 44], forKey: "custom-bin")
-      XCTAssertEqual(action, .receivedMetadata(expectedMetadata))
+      XCTAssertEqual(action, .receivedMetadata(expectedMetadata, nil))
     }
   }
 
@@ -1002,11 +1006,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(
       serverInitialHeadersAction,
-      .receivedMetadata([
-        ":status": "200",
-        "content-type": "application/grpc",
-        "grpc-accept-encoding": "deflate",
-      ])
+      .receivedMetadata(
+        [
+          ":status": "200",
+          "content-type": "application/grpc",
+          "grpc-accept-encoding": "deflate",
+        ],
+        nil
+      )
     )
 
     // Client sends messages
@@ -1102,11 +1109,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(
       serverInitialHeadersAction,
-      .receivedMetadata([
-        ":status": "200",
-        "content-type": "application/grpc",
-        "grpc-accept-encoding": "deflate",
-      ])
+      .receivedMetadata(
+        [
+          ":status": "200",
+          "content-type": "application/grpc",
+          "grpc-accept-encoding": "deflate",
+        ],
+        nil
+      )
     )
 
     // Server sends response
@@ -1186,11 +1196,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(
       serverInitialHeadersAction,
-      .receivedMetadata([
-        ":status": "200",
-        "content-type": "application/grpc",
-        "grpc-accept-encoding": "deflate",
-      ])
+      .receivedMetadata(
+        [
+          ":status": "200",
+          "content-type": "application/grpc",
+          "grpc-accept-encoding": "deflate",
+        ],
+        nil
+      )
     )
 
     // Client closes
@@ -1631,7 +1644,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let action = try stateMachine.receive(headers: .clientInitialMetadata, endStream: false)
     XCTAssertEqual(
       action,
-      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
+      .receivedMetadata(
+        Metadata(headers: .clientInitialMetadata),
+        MethodDescriptor(fullyQualifiedMethod: "test/test")
+      )
     )
   }
 
@@ -1641,7 +1657,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let action = try stateMachine.receive(headers: .clientInitialMetadata, endStream: true)
     XCTAssertEqual(
       action,
-      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
+      .receivedMetadata(
+        Metadata(headers: .clientInitialMetadata),
+        MethodDescriptor(fullyQualifiedMethod: "test/test")
+      )
     )
   }
 
@@ -1687,13 +1706,35 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
         [
           ":status": "200",
           "content-type": "application/grpc",
-          "grpc-status": "12",
+          "grpc-status": String(Status.Code.invalidArgument.rawValue),
           "grpc-status-message": "No :path header has been set.",
         ]
       )
     }
   }
 
+  func testReceiveMetadataWhenClientIdleAndServerIdle_InvalidPath() throws {
+    var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
+
+    let action = try stateMachine.receive(
+      headers: .receivedWithInvalidPath,
+      endStream: false
+    )
+
+    self.assertRejectedRPC(action) { trailers in
+      XCTAssertEqual(
+        trailers,
+        [
+          ":status": "200",
+          "content-type": "application/grpc",
+          "grpc-status": String(Status.Code.unimplemented.rawValue),
+          "grpc-status-message":
+            "The given :path (someinvalidpath) does not correspond to a valid method.",
+        ]
+      )
+    }
+  }
+
   func testReceiveMetadataWhenClientIdleAndServerIdle_MissingTE() throws {
     var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
 
@@ -2376,7 +2417,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(
       receiveMetadataAction,
-      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
+      .receivedMetadata(
+        Metadata(headers: .clientInitialMetadata),
+        MethodDescriptor(fullyQualifiedMethod: "test/test")
+      )
     )
 
     // Server sends initial metadata
@@ -2470,7 +2514,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(
       receiveMetadataAction,
-      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
+      .receivedMetadata(
+        Metadata(headers: .clientInitialMetadata),
+        MethodDescriptor(fullyQualifiedMethod: "test/test")
+      )
     )
 
     // Client sends messages
@@ -2547,7 +2594,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(
       receiveMetadataAction,
-      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
+      .receivedMetadata(
+        Metadata(headers: .clientInitialMetadata),
+        MethodDescriptor(fullyQualifiedMethod: "test/test")
+      )
     )
 
     // Client sends messages

+ 153 - 40
Tests/GRPCHTTP2CoreTests/Server/GRPCServerStreamHandlerTests.swift

@@ -26,13 +26,14 @@ import XCTest
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 final class GRPCServerStreamHandlerTests: XCTestCase {
   func testH2FramesAreIgnored() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 1
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
       .ping(.init(), ack: false),
@@ -54,13 +55,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 1
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata without content-type
     let clientInitialMetadata: HPACKHeaders = [
@@ -83,13 +85,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 1
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata without :method
     let clientInitialMetadata: HPACKHeaders = [
@@ -121,13 +124,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 1
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata without :scheme
     let clientInitialMetadata: HPACKHeaders = [
@@ -159,13 +163,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 1
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata without :path
     let clientInitialMetadata: HPACKHeaders = [
@@ -188,7 +193,7 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
       [
         GRPCHTTP2Keys.status.rawValue: "200",
         GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
-        GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
+        GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
         GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
       ]
     )
@@ -196,13 +201,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 1
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata without TE
     let clientInitialMetadata: HPACKHeaders = [
@@ -234,13 +240,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testNotAcceptedEncodingResultsInRejectedRPC() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 100
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata
     let clientInitialMetadata: HPACKHeaders = [
@@ -275,13 +282,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testOverMaximumPayloadSize() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 1
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata
     let clientInitialMetadata: HPACKHeaders = [
@@ -346,14 +354,15 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testClientEndsStream() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 100,
+      maximumPayloadSize: 1,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
       skipStateMachineAssertions: true
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata with end stream set
     let clientInitialMetadata: HPACKHeaders = [
@@ -411,14 +420,15 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testNormalFlow() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 100,
+      maximumPayloadSize: 42,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
       skipStateMachineAssertions: true
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata
     let clientInitialMetadata: HPACKHeaders = [
@@ -520,13 +530,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testReceiveMessageSplitAcrossMultipleBuffers() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 100
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata
     let clientInitialMetadata: HPACKHeaders = [
@@ -615,13 +626,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testSendMultipleMessagesInSingleBuffer() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 100
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata
     let clientInitialMetadata: HPACKHeaders = [
@@ -692,13 +704,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
   }
 
   func testMessageAndStatusAreNotReordered() throws {
+    let channel = EmbeddedChannel()
     let handler = GRPCServerStreamHandler(
       scheme: .http,
       acceptedEncodings: [],
-      maximumPayloadSize: 100
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
     )
-
-    let channel = EmbeddedChannel(handler: handler)
+    try channel.pipeline.syncOperations.addHandler(handler)
 
     // Receive client's initial metadata
     let clientInitialMetadata: HPACKHeaders = [
@@ -770,6 +783,103 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
     // Make sure we get nothing else.
     XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
   }
+
+  func testMethodDescriptorPromiseSucceeds() throws {
+    let channel = EmbeddedChannel()
+    let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
+    let handler = GRPCServerStreamHandler(
+      scheme: .http,
+      acceptedEncodings: [],
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: promise,
+      skipStateMachineAssertions: true
+    )
+    try channel.pipeline.syncOperations.addHandler(handler)
+
+    // Receive client's initial metadata
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
+      )
+    )
+
+    // Make sure we haven't sent back an error response, and that we read the initial metadata
+    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
+    XCTAssertEqual(
+      try channel.readInbound(as: RPCRequestPart.self),
+      RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
+    )
+
+    XCTAssertEqual(
+      try promise.futureResult.wait(),
+      MethodDescriptor(fullyQualifiedMethod: "SomeService/SomeMethod")
+    )
+  }
+
+  func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
+    let channel = EmbeddedChannel()
+    let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
+    let handler = GRPCServerStreamHandler(
+      scheme: .http,
+      acceptedEncodings: [],
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: promise,
+      skipStateMachineAssertions: true
+    )
+    try channel.pipeline.syncOperations.addHandler(handler)
+
+    try channel.pipeline.syncOperations.removeHandler(handler).wait()
+
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try promise.futureResult.wait()
+    ) { error in
+      XCTAssertEqual(error.code, .unavailable)
+      XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
+    }
+  }
+
+  func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
+    let channel = EmbeddedChannel()
+    let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
+    let handler = GRPCServerStreamHandler(
+      scheme: .http,
+      acceptedEncodings: [],
+      maximumPayloadSize: 100,
+      methodDescriptorPromise: promise,
+      skipStateMachineAssertions: true
+    )
+    try channel.pipeline.syncOperations.addHandler(handler)
+
+    // Receive client's initial metadata
+    let clientInitialMetadata: HPACKHeaders = [
+      GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
+      GRPCHTTP2Keys.scheme.rawValue: "http",
+      GRPCHTTP2Keys.method.rawValue: "POST",
+      GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
+      GRPCHTTP2Keys.te.rawValue: "trailers",
+    ]
+    XCTAssertNoThrow(
+      try channel.writeInbound(
+        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
+      )
+    )
+
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try promise.futureResult.wait()
+    ) { error in
+      XCTAssertEqual(error.code, .unavailable)
+      XCTAssertEqual(error.message, "RPC was rejected.")
+    }
+  }
 }
 
 extension EmbeddedChannel {
@@ -799,3 +909,6 @@ extension EmbeddedChannel {
 private enum TestError: Error {
   case assertionFailure(String)
 }
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+extension GRPCServerStreamHandler: RemovableChannelHandler {}