Browse Source

Don't ack pings twice (#1534)

Motivation:

gRPC Swift is emitting two acks per ping. NIOHTTP2 is emitting one and
the keepalive handler is emitting the other.

Modifications:

- Don't emit ping acks from the keep alive handler; just let the H2
  handler do it.

Result:

- No unnecessary ping acks are emitted.
- Resolves #1520
George Barnett 3 years ago
parent
commit
9fc6ead754

+ 4 - 0
Sources/GRPC/GRPCIdleHandler.swift

@@ -184,6 +184,10 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
     case .none:
       ()
 
+    case .ack:
+      // NIO's HTTP2 handler acks for us so this is a no-op.
+      ()
+
     case .cancelScheduledTimeout:
       self.scheduledClose?.cancel()
       self.scheduledClose = nil

+ 6 - 6
Sources/GRPC/GRPCKeepaliveHandlers.swift

@@ -90,6 +90,7 @@ struct PingHandler {
 
   enum Action {
     case none
+    case ack
     case schedulePing(delay: TimeAmount, timeout: TimeAmount)
     case cancelScheduledTimeout
     case reply(HTTP2Frame.FramePayload)
@@ -170,14 +171,14 @@ struct PingHandler {
         // This is a valid ping, reset our strike count and reply with a pong.
         self.pingStrikes = 0
         self.lastReceivedPingDate = self.now()
-        return .reply(self.generatePingFrame(data: pingData, ack: true))
+        return .ack
       }
     } else {
       // We don't support ping strikes. We'll just reply with a pong.
       //
       // Note: we don't need to update `pingStrikes` or `lastReceivedPingDate` as we don't
       // support ping strikes.
-      return .reply(self.generatePingFrame(data: pingData, ack: true))
+      return .ack
     }
   }
 
@@ -185,20 +186,19 @@ struct PingHandler {
     if self.shouldBlockPing {
       return .none
     } else {
-      return .reply(self.generatePingFrame(data: self.pingData, ack: false))
+      return .reply(self.generatePingFrame(data: self.pingData))
     }
   }
 
   private mutating func generatePingFrame(
-    data: HTTP2PingData,
-    ack: Bool
+    data: HTTP2PingData
   ) -> HTTP2Frame.FramePayload {
     if self.activeStreams == 0 {
       self.sentPingsWithoutData += 1
     }
 
     self.lastSentPingDate = self.now()
-    return HTTP2Frame.FramePayload.ping(data, ack: ack)
+    return HTTP2Frame.FramePayload.ping(data, ack: false)
   }
 
   /// Returns true if, on receipt of a ping, the ping should be regarded as a ping strike.

+ 96 - 28
Tests/GRPCTests/GRPCPingHandlerTests.swift

@@ -15,6 +15,7 @@
  */
 @testable import GRPC
 import NIOCore
+import NIOEmbedded
 import NIOHTTP2
 import XCTest
 
@@ -249,24 +250,15 @@ class GRPCPingHandlerTests: GRPCTestCase {
       pingData: HTTP2PingData(withInteger: 1),
       ack: false
     )
-    XCTAssertEqual(
-      response,
-      .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
-    )
+    XCTAssertEqual(response, .ack)
 
     // Received another ping, response should be a pong (ping strikes not in effect)
     response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
-    XCTAssertEqual(
-      response,
-      .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
-    )
+    XCTAssertEqual(response, .ack)
 
     // Received another ping, response should be a pong (ping strikes not in effect)
     response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
-    XCTAssertEqual(
-      response,
-      .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
-    )
+    XCTAssertEqual(response, .ack)
   }
 
   func testPingWithoutDataResultsInPongForClient() {
@@ -274,10 +266,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
     self.setupPingHandler(permitWithoutCalls: false)
 
     let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
-    XCTAssertEqual(
-      action,
-      .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
-    )
+    XCTAssertEqual(action, .ack)
   }
 
   func testPingWithoutDataResultsInPongForServer() {
@@ -291,10 +280,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
     )
 
     let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
-    XCTAssertEqual(
-      action,
-      .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
-    )
+    XCTAssertEqual(action, .ack)
   }
 
   func testPingStrikesOnServer() {
@@ -312,10 +298,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
       pingData: HTTP2PingData(withInteger: 1),
       ack: false
     )
-    XCTAssertEqual(
-      response,
-      .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
-    )
+    XCTAssertEqual(response, .ack)
 
     // Received another ping, which is invalid (ping strike), response should be no action
     response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
@@ -326,10 +309,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
 
     // Received another ping, which is valid now, response should be a pong
     response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
-    XCTAssertEqual(
-      response,
-      .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
-    )
+    XCTAssertEqual(response, .ack)
 
     // Received another ping, which is invalid (ping strike), response should be no action
     response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
@@ -381,6 +361,8 @@ extension PingHandler.Action: Equatable {
     switch (lhs, rhs) {
     case (.none, .none):
       return true
+    case (.ack, .ack):
+      return true
     case (let .schedulePing(lhsDelay, lhsTimeout), let .schedulePing(rhsDelay, rhsTimeout)):
       return lhsDelay == rhsDelay && lhsTimeout == rhsTimeout
     case (.cancelScheduledTimeout, .cancelScheduledTimeout):
@@ -401,3 +383,89 @@ extension PingHandler.Action: Equatable {
     }
   }
 }
+
+extension GRPCPingHandlerTests {
+  func testSingleAckIsEmittedOnPing() throws {
+    let client = EmbeddedChannel()
+    let _ = try client.configureHTTP2Pipeline(mode: .client) { _ in
+      fatalError("Unexpected inbound stream")
+    }.wait()
+
+    let server = EmbeddedChannel()
+    let serverMux = try server.configureHTTP2Pipeline(mode: .server) { _ in
+      fatalError("Unexpected inbound stream")
+    }.wait()
+
+    let idleHandler = GRPCIdleHandler(
+      idleTimeout: .minutes(5),
+      keepalive: .init(),
+      logger: self.serverLogger
+    )
+    try server.pipeline.syncOperations.addHandler(idleHandler, position: .before(serverMux))
+    try server.connect(to: .init(unixDomainSocketPath: "/ignored")).wait()
+    try client.connect(to: .init(unixDomainSocketPath: "/ignored")).wait()
+
+    func interact(client: EmbeddedChannel, server: EmbeddedChannel) throws {
+      var didRead = true
+      while didRead {
+        didRead = false
+
+        if let data = try client.readOutbound(as: ByteBuffer.self) {
+          didRead = true
+          try server.writeInbound(data)
+        }
+
+        if let data = try server.readOutbound(as: ByteBuffer.self) {
+          didRead = true
+          try client.writeInbound(data)
+        }
+      }
+    }
+
+    try interact(client: client, server: server)
+
+    // Settings.
+    let f1 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
+    f1.payload.assertSettings(ack: false)
+
+    // Settings ack.
+    let f2 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
+    f2.payload.assertSettings(ack: true)
+
+    // Send a ping.
+    let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(.init(withInteger: 42), ack: false))
+    try client.writeOutbound(ping)
+    try interact(client: client, server: server)
+
+    // Ping ack.
+    let f3 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
+    f3.payload.assertPing(ack: true)
+
+    XCTAssertNil(try client.readInbound(as: HTTP2Frame.self))
+  }
+}
+
+extension HTTP2Frame.FramePayload {
+  func assertSettings(ack: Bool, file: StaticString = #file, line: UInt = #line) {
+    switch self {
+    case let .settings(settings):
+      switch settings {
+      case .ack:
+        XCTAssertTrue(ack, file: file, line: line)
+      case .settings:
+        XCTAssertFalse(ack, file: file, line: line)
+      }
+    default:
+      XCTFail("Expected .settings got \(self)", file: file, line: line)
+    }
+  }
+
+  func assertPing(ack: Bool, file: StaticString = #file, line: UInt = #line) {
+    switch self {
+    case let .ping(_, ack: pingAck):
+      XCTAssertEqual(pingAck, ack, file: file, line: line)
+    default:
+      XCTFail("Expected .ping got \(self)", file: file, line: line)
+    }
+  }
+}