Przeglądaj źródła

Use new http/2 frame delegate (#120)

Motivation:

When the server monitors client pings it resets its stream count after
writing each HEADERS or DATA frame. At the moment this is done by stream
channels when they write a frame into the parent channel. However, this
isn't correct: the parent channel may choose to delay sending a frame or
may chunk DATA frames up to respect various limits.

Modifications:

- Use the new http/2 frame delegate which is notified when the parent
channel does the write

Result:

- https://github.com/grpc/grpc-swift-2/issues/5
George Barnett 4 miesięcy temu
rodzic
commit
170a835845

+ 1 - 1
Package.swift

@@ -43,7 +43,7 @@ let dependencies: [Package.Dependency] = [
   ),
   .package(
     url: "https://github.com/apple/swift-nio-http2.git",
-    from: "1.35.0"
+    from: "1.38.0"
   ),
   .package(
     url: "https://github.com/apple/swift-nio-transport-services.git",

+ 2 - 6
Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift

@@ -79,13 +79,10 @@ extension ChannelPipeline.SynchronousOperations {
     var http2HandlerStreamConfiguration = NIOHTTP2Handler.StreamConfiguration()
     http2HandlerStreamConfiguration.targetWindowSize = clampedTargetWindowSize
 
-    let boundConnectionManagementHandler = NIOLoopBound(
-      serverConnectionHandler.syncView,
-      eventLoop: self.eventLoop
-    )
     let streamMultiplexer = try self.configureAsyncHTTP2Pipeline(
       mode: .server,
       streamDelegate: serverConnectionHandler.http2StreamDelegate,
+      frameDelegate: serverConnectionHandler.syncView,
       configuration: NIOHTTP2Handler.Configuration(
         connection: http2HandlerConnectionConfiguration,
         stream: http2HandlerStreamConfiguration
@@ -98,8 +95,7 @@ extension ChannelPipeline.SynchronousOperations {
           acceptedEncodings: compressionConfig.enabledAlgorithms,
           maxPayloadSize: rpcConfig.maxRequestPayloadSize,
           methodDescriptorPromise: methodDescriptorPromise,
-          eventLoop: streamChannel.eventLoop,
-          connectionManagementHandler: boundConnectionManagementHandler.value
+          eventLoop: streamChannel.eventLoop
         )
         try streamChannel.pipeline.syncOperations.addHandler(streamHandler)
 

+ 13 - 10
Sources/GRPCNIOTransportCore/Server/Connection/ServerConnectionManagementHandler.swift

@@ -152,7 +152,7 @@ package final class ServerConnectionManagementHandler: ChannelDuplexHandler {
   ///
   /// Methods on this view *must* be called from the same `EventLoop` as the `Channel` in which
   /// this handler exists.
-  package struct SyncView {
+  package struct SyncView: NIOHTTP2FrameDelegate {
     private let handler: ServerConnectionManagementHandler
 
     fileprivate init(_ handler: ServerConnectionManagementHandler) {
@@ -174,16 +174,18 @@ package final class ServerConnectionManagementHandler: ChannelDuplexHandler {
       }
     }
 
-    /// Notify the handler that a HEADERS frame was written in the last write loop.
-    package func wroteHeadersFrame() {
-      self.handler.eventLoop.assertInEventLoop()
-      self.handler.frameStats.wroteHeaders()
-    }
+    /// Notify the handler that an HTTP/2 frame was written.
+    package func wroteFrame(_ frame: HTTP2Frame) {
+      // Only interested in HEADERS and DATA frames.
+      switch frame.payload {
+      case .headers:
+        self.handler.frameStats.wroteHeaders()
+      case .data:
+        self.handler.frameStats.wroteData()
+      default:
+        ()
+      }
 
-    /// Notify the handler that a DATA frame was written in the last write loop.
-    package func wroteDataFrame() {
-      self.handler.eventLoop.assertInEventLoop()
-      self.handler.frameStats.wroteData()
     }
   }
 
@@ -515,6 +517,7 @@ extension ServerConnectionManagementHandler {
         }
       }
     }
+
   }
 
   package var http2StreamDelegate: HTTP2StreamDelegate {

+ 0 - 6
Sources/GRPCNIOTransportCore/Server/GRPCServerStreamHandler.swift

@@ -43,8 +43,6 @@ package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChan
 
   private var cancellationHandle: Optional<ServerContext.RPCCancellationHandle>
 
-  package let connectionManagementHandler: ServerConnectionManagementHandler.SyncView
-
   // Existential errors unconditionally allocate, avoid this per-use allocation by doing it
   // statically.
   private static let handlerRemovedBeforeDescriptorResolved: any Error = RPCError(
@@ -58,7 +56,6 @@ package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChan
     maxPayloadSize: Int,
     methodDescriptorPromise: EventLoopPromise<MethodDescriptor>,
     eventLoop: any EventLoop,
-    connectionManagementHandler: ServerConnectionManagementHandler.SyncView,
     cancellationHandler: ServerContext.RPCCancellationHandle? = nil,
     skipStateMachineAssertions: Bool = false
   ) {
@@ -70,7 +67,6 @@ package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChan
     self.methodDescriptorPromise = methodDescriptorPromise
     self.cancellationHandle = cancellationHandler
     self.eventLoop = eventLoop
-    self.connectionManagementHandler = connectionManagementHandler
   }
 
   package func setCancellationHandle(_ handle: ServerContext.RPCCancellationHandle) {
@@ -272,7 +268,6 @@ extension GRPCServerStreamHandler {
         self.flushPending = true
         let headers = try self.stateMachine.send(metadata: metadata)
         context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise)
-        self.connectionManagementHandler.wroteHeadersFrame()
       } catch let invalidState {
         let error = RPCError(invalidState)
         promise?.fail(error)
@@ -282,7 +277,6 @@ extension GRPCServerStreamHandler {
     case .message(let message):
       do {
         try self.stateMachine.send(message: message.buffer, promise: promise)
-        self.connectionManagementHandler.wroteDataFrame()
       } catch let invalidState {
         let error = RPCError(invalidState)
         promise?.fail(error)

+ 1 - 13
Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/ConnectionTest.swift

@@ -116,24 +116,12 @@ extension ConnectionTest {
             let h2 = NIOHTTP2Handler(mode: .server)
             let mux = HTTP2StreamMultiplexer(mode: .server, channel: channel) { stream in
               let sync = stream.pipeline.syncOperations
-              let connectionManagementHandler = ServerConnectionManagementHandler(
-                eventLoop: stream.eventLoop,
-                maxIdleTime: nil,
-                maxAge: nil,
-                maxGraceTime: nil,
-                keepaliveTime: nil,
-                keepaliveTimeout: nil,
-                allowKeepaliveWithoutCalls: false,
-                minPingIntervalWithoutCalls: .minutes(5),
-                requireALPN: false
-              )
               let handler = GRPCServerStreamHandler(
                 scheme: .http,
                 acceptedEncodings: .none,
                 maxPayloadSize: .max,
                 methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
-                eventLoop: stream.eventLoop,
-                connectionManagementHandler: connectionManagementHandler.syncView
+                eventLoop: stream.eventLoop
               )
 
               return stream.eventLoop.makeCompletedFuture {

+ 1 - 13
Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/TestServer.swift

@@ -73,24 +73,12 @@ final class TestServer: Sendable {
         let sync = channel.pipeline.syncOperations
         let multiplexer = try sync.configureAsyncHTTP2Pipeline(mode: .server) { stream in
           stream.eventLoop.makeCompletedFuture {
-            let connectionManagementHandler = ServerConnectionManagementHandler(
-              eventLoop: stream.eventLoop,
-              maxIdleTime: nil,
-              maxAge: nil,
-              maxGraceTime: nil,
-              keepaliveTime: nil,
-              keepaliveTimeout: nil,
-              allowKeepaliveWithoutCalls: false,
-              minPingIntervalWithoutCalls: .minutes(5),
-              requireALPN: false
-            )
             let handler = GRPCServerStreamHandler(
               scheme: .http,
               acceptedEncodings: .all,
               maxPayloadSize: .max,
               methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
-              eventLoop: stream.eventLoop,
-              connectionManagementHandler: connectionManagementHandler.syncView
+              eventLoop: stream.eventLoop
             )
 
             try stream.pipeline.syncOperations.addHandlers(handler)

+ 9 - 1
Tests/GRPCNIOTransportCoreTests/Server/Connection/ServerConnectionManagementHandlerTests.swift

@@ -17,6 +17,7 @@
 import GRPCNIOTransportCore
 import NIOCore
 import NIOEmbedded
+import NIOHPACK
 import NIOHTTP2
 import Testing
 
@@ -285,7 +286,12 @@ struct ServerConnectionManagementHandlerTests {
     try sendThreeKeepalivePings()
 
     // "send" a HEADERS frame and flush to reset keep alive state.
-    connection.syncView.wroteHeadersFrame()
+    connection.frameDelegate.wroteFrame(
+      HTTP2Frame(
+        streamID: 1,
+        payload: .headers(.init(headers: [:]))
+      )
+    )
     connection.syncView.connectionWillFlush()
 
     // As above, the first ping is valid, the next two are strikes.
@@ -391,6 +397,7 @@ extension ServerConnectionManagementHandlerTests {
   struct Connection {
     let channel: EmbeddedChannel
     let streamDelegate: any NIOHTTP2StreamDelegate
+    let frameDelegate: any NIOHTTP2FrameDelegate
     let syncView: ServerConnectionManagementHandler.SyncView
 
     var loop: EmbeddedEventLoop {
@@ -430,6 +437,7 @@ extension ServerConnectionManagementHandlerTests {
       )
 
       self.streamDelegate = handler.http2StreamDelegate
+      self.frameDelegate = handler.syncView
       self.syncView = handler.syncView
       self.channel = EmbeddedChannel(handler: handler, loop: loop)
     }

+ 0 - 61
Tests/GRPCNIOTransportCoreTests/Server/GRPCServerStreamHandlerTests.swift

@@ -34,25 +34,12 @@ final class GRPCServerStreamHandlerTests: XCTestCase {
     descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
     disableAssertions: Bool = false
   ) -> GRPCServerStreamHandler {
-    let serverConnectionManagementHandler = ServerConnectionManagementHandler(
-      eventLoop: channel.eventLoop,
-      maxIdleTime: nil,
-      maxAge: nil,
-      maxGraceTime: nil,
-      keepaliveTime: nil,
-      keepaliveTimeout: nil,
-      allowKeepaliveWithoutCalls: false,
-      minPingIntervalWithoutCalls: .minutes(5),
-      requireALPN: false
-    )
-
     return GRPCServerStreamHandler(
       scheme: scheme,
       acceptedEncodings: acceptedEncodings,
       maxPayloadSize: maxPayloadSize,
       methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
       eventLoop: channel.eventLoop,
-      connectionManagementHandler: serverConnectionManagementHandler.syncView,
       skipStateMachineAssertions: disableAssertions
     )
   }
@@ -1042,7 +1029,6 @@ struct ServerStreamHandlerTests {
       maxPayloadSize: maxPayloadSize,
       methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
       eventLoop: channel.eventLoop,
-      connectionManagementHandler: connectionManagementHandler.syncView,
       skipStateMachineAssertions: disableAssertions
     )
 
@@ -1107,53 +1093,6 @@ struct ServerStreamHandlerTests {
     // Throwing is fine: the channel is closed abruptly, errors are expected.
     _ = try? channel.finish()
   }
-
-  @Test("Connection FrameStats are updated when writing headers or data frames")
-  @available(gRPCSwiftNIOTransport 2.0, *)
-  func connectionFrameStatsAreUpdatedAccordingly() async throws {
-    let channel = EmbeddedChannel()
-    let handlers = self.makeServerConnectionAndStreamHandlers(channel: channel)
-    try channel.pipeline.syncOperations.addHandler(handlers.streamHandler)
-
-    // We have written nothing yet, so expect FrameStats/didWriteHeadersOrData to be false
-    #expect(!handlers.connectionHandler.frameStats.didWriteHeadersOrData)
-
-    // FrameStats aren't affected by pings received
-    channel.pipeline.fireChannelRead(
-      HTTP2Frame.FramePayload.ping(.init(withInteger: 42), ack: false)
-    )
-    #expect(!handlers.connectionHandler.frameStats.didWriteHeadersOrData)
-
-    // Now write back headers and make sure FrameStats are updated accordingly:
-    // To do that, we first need to receive client's initial metadata...
-    let clientInitialMetadata: HPACKHeaders = [
-      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
-      GRPCHTTP2Keys.scheme.rawValue: "http",
-      GRPCHTTP2Keys.method.rawValue: "POST",
-      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
-      GRPCHTTP2Keys.te.rawValue: "trailers",
-    ]
-    try channel.writeInbound(
-      HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
-    )
-
-    // Now we write back server's initial metadata...
-    let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata([:])
-    try channel.writeOutbound(serverInitialMetadata)
-
-    // And this should have updated the FrameStats
-    #expect(handlers.connectionHandler.frameStats.didWriteHeadersOrData)
-
-    // Manually reset the FrameStats to make sure that writing data also updates it correctly.
-    handlers.connectionHandler.frameStats.reset()
-    #expect(!handlers.connectionHandler.frameStats.didWriteHeadersOrData)
-    try channel.writeOutbound(RPCResponsePart.message(GRPCNIOTransportBytes([42])))
-    #expect(handlers.connectionHandler.frameStats.didWriteHeadersOrData)
-
-    // Clean up.
-    // Throwing is fine: the channel is closed abruptly, errors are expected.
-    _ = try? channel.finish()
-  }
 }
 
 extension EmbeddedChannel {