Browse Source

Merge idle and keepalive handlers (#1079)

Motivation:

Both client and the server have two handlers between the two HTTP/2
handlers which inspect various HTTP/2 frames and stream creation/close
events. Each handler has to unwrap NIOAny, which isn't special-cased for
HTTP2Frame, and cast any user inbound events to check whether they are
events they're interested in. This has is duplicated work which we can easily
avoid on such a hot path.

Modifications:

- Move the ping handler into the idle handler
- Update tests

Result:

A 4% drop in instructions in the unary_10k_small_requests benchmark.
George Barnett 5 years ago
parent
commit
fccd9fb511

+ 6 - 5
Sources/GRPC/ClientConnection.swift

@@ -415,7 +415,7 @@ extension Channel {
   ) -> EventLoopFuture<Void> {
     // We add at most 8 handlers to the pipeline.
     var handlers: [ChannelHandler] = []
-    handlers.reserveCapacity(8)
+    handlers.reserveCapacity(7)
 
     #if canImport(Network)
     // This availability guard is arguably unnecessary, but we add it anyway.
@@ -449,15 +449,16 @@ extension Channel {
     )
 
     handlers.append(NIOHTTP2Handler(mode: .client))
-    handlers.append(GRPCClientKeepaliveHandler(configuration: connectionKeepalive))
     // The multiplexer is passed through the idle handler so it is only reported on
     // successful channel activation - with happy eyeballs multiple pipelines can
     // be constructed so it's not safe to report just yet.
     handlers.append(
       GRPCIdleHandler(
-        mode: .client(connectionManager, h2Multiplexer),
-        logger: logger,
-        idleTimeout: connectionIdleTimeout
+        connectionManager: connectionManager,
+        multiplexer: h2Multiplexer,
+        idleTimeout: connectionIdleTimeout,
+        keepalive: connectionKeepalive,
+        logger: logger
       )
     )
     handlers.append(h2Multiplexer)

+ 95 - 11
Sources/GRPC/GRPCIdleHandler.swift

@@ -24,6 +24,15 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
   /// The amount of time to wait before closing the channel when there are no active streams.
   private let idleTimeout: TimeAmount
 
+  /// The ping handler.
+  private var pingHandler: PingHandler
+
+  /// The scheduled task which will close the connection after the keep-alive timeout has expired.
+  private var scheduledClose: Scheduled<Void>?
+
+  /// The scheduled task which will ping.
+  private var scheduledPing: RepeatedTask?
+
   /// The mode we're operating in.
   private let mode: Mode
 
@@ -51,17 +60,46 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
   /// The current state.
   private var stateMachine: GRPCIdleHandlerStateMachine
 
-  init(mode: Mode, logger: Logger, idleTimeout: TimeAmount) {
-    self.mode = mode
+  init(
+    connectionManager: ConnectionManager,
+    multiplexer: HTTP2StreamMultiplexer,
+    idleTimeout: TimeAmount,
+    keepalive configuration: ClientConnectionKeepalive,
+    logger: Logger
+  ) {
+    self.mode = .client(connectionManager, multiplexer)
     self.idleTimeout = idleTimeout
+    self.stateMachine = .init(role: .client, logger: logger)
+    self.pingHandler = PingHandler(
+      pingCode: 5,
+      interval: configuration.interval,
+      timeout: configuration.timeout,
+      permitWithoutCalls: configuration.permitWithoutCalls,
+      maximumPingsWithoutData: configuration.maximumPingsWithoutData,
+      minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
+    )
     self.logger = logger
+  }
 
-    switch mode {
-    case .client:
-      self.stateMachine = .init(role: .client, logger: logger)
-    case .server:
-      self.stateMachine = .init(role: .server, logger: logger)
-    }
+  init(
+    idleTimeout: TimeAmount,
+    keepalive configuration: ServerConnectionKeepalive,
+    logger: Logger
+  ) {
+    self.mode = .server
+    self.stateMachine = .init(role: .server, logger: logger)
+    self.idleTimeout = idleTimeout
+    self.pingHandler = PingHandler(
+      pingCode: 10,
+      interval: configuration.interval,
+      timeout: configuration.timeout,
+      permitWithoutCalls: configuration.permitWithoutCalls,
+      maximumPingsWithoutData: configuration.maximumPingsWithoutData,
+      minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
+      minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
+      maximumPingStrikes: configuration.maximumPingStrikes
+    )
+    self.logger = logger
   }
 
   private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
@@ -123,6 +161,47 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
     }
   }
 
+  private func handlePingAction(_ action: PingHandler.Action) {
+    switch action {
+    case .none:
+      ()
+
+    case .cancelScheduledTimeout:
+      self.scheduledClose?.cancel()
+      self.scheduledClose = nil
+
+    case let .schedulePing(delay, timeout):
+      self.schedulePing(in: delay, timeout: timeout)
+
+    case let .reply(framePayload):
+      let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
+      self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
+    }
+  }
+
+  private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
+    guard delay != .nanoseconds(.max) else {
+      return
+    }
+
+    self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
+      initialDelay: delay,
+      delay: delay
+    ) { _ in
+      self.handlePingAction(self.pingHandler.pingFired())
+      // `timeout` is less than `interval`, guaranteeing that the close task
+      // will be fired before a new ping is triggered.
+      assert(timeout < delay, "`timeout` must be less than `interval`")
+      self.scheduleClose(in: timeout)
+    }
+  }
+
+  private func scheduleClose(in timeout: TimeAmount) {
+    self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
+      self.perform(operations: self.stateMachine.shutdownNow())
+    }
+  }
+
   private func idleTimeoutFired() {
     self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
   }
@@ -138,16 +217,15 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
   func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
     if let created = event as? NIOHTTP2StreamCreatedEvent {
       self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
+      self.handlePingAction(self.pingHandler.streamCreated())
       context.fireUserInboundEventTriggered(event)
     } else if let closed = event as? StreamClosedEvent {
       self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
+      self.handlePingAction(self.pingHandler.streamClosed())
       context.fireUserInboundEventTriggered(event)
     } else if event is ChannelShouldQuiesceEvent {
       self.perform(operations: self.stateMachine.initiateGracefulShutdown())
       // Swallow this event.
-    } else if event is ConnectionIdledEvent {
-      self.perform(operations: self.stateMachine.shutdownNow())
-      // Swallow this event.
     } else {
       context.fireUserInboundEventTriggered(event)
     }
@@ -172,6 +250,10 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
 
   func channelInactive(context: ChannelHandlerContext) {
     self.perform(operations: self.stateMachine.channelInactive())
+    self.scheduledPing?.cancel()
+    self.scheduledClose?.cancel()
+    self.scheduledPing = nil
+    self.scheduledClose = nil
     context.fireChannelInactive()
   }
 
@@ -183,6 +265,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
       self.perform(operations: self.stateMachine.receiveGoAway())
     case let .settings(.settings(settings)):
       self.perform(operations: self.stateMachine.receiveSettings(settings))
+    case let .ping(data, ack):
+      self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
     default:
       // We're not interested in other events.
       ()

+ 2 - 146
Sources/GRPC/GRPCKeepaliveHandlers.swift

@@ -16,151 +16,6 @@
 import NIO
 import NIOHTTP2
 
-/// Provides keepalive pings.
-///
-/// The logic is determined by the gRPC keepalive
-/// [documentation] (https://github.com/grpc/grpc/blob/master/doc/keepalive.md).
-internal class GRPCClientKeepaliveHandler: ChannelInboundHandler, _ChannelKeepaliveHandler {
-  typealias InboundIn = HTTP2Frame
-  typealias OutboundOut = HTTP2Frame
-
-  init(configuration: ClientConnectionKeepalive) {
-    self.pingHandler = PingHandler(
-      pingCode: 5,
-      interval: configuration.interval,
-      timeout: configuration.timeout,
-      permitWithoutCalls: configuration.permitWithoutCalls,
-      maximumPingsWithoutData: configuration.maximumPingsWithoutData,
-      minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
-    )
-  }
-
-  /// The ping handler.
-  var pingHandler: PingHandler
-
-  /// The scheduled task which will ping.
-  var scheduledPing: RepeatedTask?
-
-  /// The scheduled task which will close the connection.
-  var scheduledClose: Scheduled<Void>?
-}
-
-internal class GRPCServerKeepaliveHandler: ChannelInboundHandler, _ChannelKeepaliveHandler {
-  typealias InboundIn = HTTP2Frame
-  typealias OutboundOut = HTTP2Frame
-
-  init(configuration: ServerConnectionKeepalive) {
-    self.pingHandler = PingHandler(
-      pingCode: 10,
-      interval: configuration.interval,
-      timeout: configuration.timeout,
-      permitWithoutCalls: configuration.permitWithoutCalls,
-      maximumPingsWithoutData: configuration.maximumPingsWithoutData,
-      minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
-      minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
-      maximumPingStrikes: configuration.maximumPingStrikes
-    )
-  }
-
-  /// The ping handler.
-  var pingHandler: PingHandler
-
-  /// The scheduled task which will ping.
-  var scheduledPing: RepeatedTask?
-
-  /// The scheduled task which will close the connection.
-  var scheduledClose: Scheduled<Void>?
-}
-
-protocol _ChannelKeepaliveHandler: ChannelInboundHandler where OutboundOut == HTTP2Frame,
-  InboundIn == HTTP2Frame {
-  var pingHandler: PingHandler { get set }
-  var scheduledPing: RepeatedTask? { get set }
-  var scheduledClose: Scheduled<Void>? { get set }
-}
-
-extension _ChannelKeepaliveHandler {
-  func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
-    if event is NIOHTTP2StreamCreatedEvent {
-      self.perform(action: self.pingHandler.streamCreated(), context: context)
-    } else if event is StreamClosedEvent {
-      self.perform(action: self.pingHandler.streamClosed(), context: context)
-    }
-
-    context.fireUserInboundEventTriggered(event)
-  }
-
-  func channelRead(context: ChannelHandlerContext, data: NIOAny) {
-    switch self.unwrapInboundIn(data).payload {
-    case let .ping(pingData, ack: ack):
-      self.perform(action: self.pingHandler.read(pingData: pingData, ack: ack), context: context)
-    default:
-      break
-    }
-
-    context.fireChannelRead(data)
-  }
-
-  func channelInactive(context: ChannelHandlerContext) {
-    self.cancelScheduledPing()
-    self.cancelScheduledTimeout()
-    context.fireChannelInactive()
-  }
-
-  func handlerRemoved(context: ChannelHandlerContext) {
-    self.cancelScheduledPing()
-    self.cancelScheduledTimeout()
-  }
-
-  private func perform(action: PingHandler.Action, context: ChannelHandlerContext) {
-    switch action {
-    case let .schedulePing(delay, timeout):
-      self.schedulePing(delay: delay, timeout: timeout, context: context)
-    case .cancelScheduledTimeout:
-      self.cancelScheduledTimeout()
-    case let .reply(payload):
-      self.send(payload: payload, context: context)
-    case .none:
-      break
-    }
-  }
-
-  private func send(payload: HTTP2Frame.FramePayload, context: ChannelHandlerContext) {
-    let frame = self.wrapOutboundOut(.init(streamID: .rootStream, payload: payload))
-    context.writeAndFlush(frame, promise: nil)
-  }
-
-  private func schedulePing(delay: TimeAmount, timeout: TimeAmount,
-                            context: ChannelHandlerContext) {
-    guard delay != .nanoseconds(Int64.max) else { return }
-
-    self.scheduledPing = context.eventLoop
-      .scheduleRepeatedTask(initialDelay: delay, delay: delay) { _ in
-        self.perform(action: self.pingHandler.pingFired(), context: context)
-        // `timeout` is less than `interval`, guaranteeing that the close task
-        // will be fired before a new ping is triggered.
-        assert(timeout < delay, "`timeout` must be less than `interval`")
-        self.scheduleClose(timeout: timeout, context: context)
-      }
-  }
-
-  private func scheduleClose(timeout: TimeAmount, context: ChannelHandlerContext) {
-    self.scheduledClose = context.eventLoop.scheduleTask(in: timeout) {
-      context.fireUserInboundEventTriggered(ConnectionIdledEvent())
-    }
-  }
-
-  private func cancelScheduledPing() {
-    self.scheduledPing?.cancel()
-    self.scheduledPing = nil
-  }
-
-  private func cancelScheduledTimeout() {
-    self.scheduledClose?.cancel()
-    self.scheduledClose = nil
-  }
-}
-
 struct PingHandler {
   /// Code for ping
   private let pingCode: UInt64
@@ -223,7 +78,8 @@ struct PingHandler {
 
   private static let goAwayFrame = HTTP2Frame.FramePayload.goAway(
     lastStreamID: .rootStream,
-    errorCode: .enhanceYourCalm, opaqueData: nil
+    errorCode: .enhanceYourCalm,
+    opaqueData: nil
   )
 
   // For testing only

+ 4 - 10
Sources/GRPC/GRPCServerPipelineConfigurator.swift

@@ -67,17 +67,12 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
     self.configuration = configuration
   }
 
-  /// Makes a gRPC Server keepalive handler.
-  private func makeKeepaliveHandler() -> GRPCServerKeepaliveHandler {
-    return .init(configuration: self.configuration.connectionKeepalive)
-  }
-
   /// Makes a gRPC idle handler for the server..
   private func makeIdleHandler() -> GRPCIdleHandler {
     return .init(
-      mode: .server,
-      logger: self.configuration.logger,
-      idleTimeout: self.configuration.connectionIdleTimeout
+      idleTimeout: self.configuration.connectionIdleTimeout,
+      keepalive: self.configuration.connectionKeepalive,
+      logger: self.configuration.logger
     )
   }
 
@@ -141,9 +136,8 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
     // We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
     // to then insert our keepalive and idle handlers between. We can just add everything together.
     var handlers: [ChannelHandler] = []
-    handlers.reserveCapacity(4)
+    handlers.reserveCapacity(3)
     handlers.append(self.makeHTTP2Handler())
-    handlers.append(self.makeKeepaliveHandler())
     handlers.append(self.makeIdleHandler())
     handlers.append(self.makeHTTP2Multiplexer(for: context.channel))
 

+ 60 - 40
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -135,9 +135,11 @@ extension ConnectionManagerTests {
     )
     try channel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     channelPromise.succeed(channel)
@@ -187,9 +189,11 @@ extension ConnectionManagerTests {
     )
     try channel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     channelPromise.succeed(channel)
@@ -246,9 +250,11 @@ extension ConnectionManagerTests {
     )
     try channel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
 
@@ -319,9 +325,11 @@ extension ConnectionManagerTests {
     )
     try channel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     channelPromise.succeed(channel)
@@ -389,9 +397,11 @@ extension ConnectionManagerTests {
     )
     try channel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     channelPromise.succeed(channel)
@@ -505,9 +515,11 @@ extension ConnectionManagerTests {
     )
     try channel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     channelPromise.succeed(channel)
@@ -578,9 +590,11 @@ extension ConnectionManagerTests {
     )
     try firstChannel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
 
@@ -652,9 +666,11 @@ extension ConnectionManagerTests {
     )
     try firstChannel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, firstH2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: firstH2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     firstChannelPromise.succeed(firstChannel)
@@ -698,9 +714,11 @@ extension ConnectionManagerTests {
     )
     try secondChannel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, secondH2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: secondH2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     secondChannelPromise.succeed(secondChannel)
@@ -748,9 +766,11 @@ extension ConnectionManagerTests {
     )
     try channel.pipeline.addHandler(
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: self.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       )
     ).wait()
     channelPromise.succeed(channel)
@@ -905,9 +925,11 @@ extension ConnectionManagerTests {
     )
     XCTAssertNoThrow(try channel.pipeline.addHandlers([
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: manager.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       ),
     ]).wait())
     channelPromise.succeed(channel)
@@ -931,10 +953,6 @@ extension ConnectionManagerTests {
       self.loop.run()
       XCTAssertNoThrow(try shutdown.wait())
     }
-
-    // Fire a connection idled event, i.e. keepalive timeout has fired. This should be a no-op.
-    // Previously this would hit a precondition failure.
-    channel.pipeline.fireUserInboundEventTriggered(ConnectionIdledEvent())
   }
 
   func testCloseWithoutActiveRPCs() throws {
@@ -965,9 +983,11 @@ extension ConnectionManagerTests {
     )
     XCTAssertNoThrow(try channel.pipeline.addHandlers([
       GRPCIdleHandler(
-        mode: .client(manager, h2mux),
-        logger: manager.logger,
-        idleTimeout: .minutes(5)
+        connectionManager: manager,
+        multiplexer: h2mux,
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
       ),
     ]).wait())
     channelPromise.succeed(channel)

+ 1 - 4
Sources/GRPC/ConnectionIdledEvent.swift → Tests/LinuxMain.swift

@@ -13,7 +13,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import NIOHTTP2
-
-/// A `ConnectionIdledEvent` is fired whenever a connection has expired (keepalive).
-struct ConnectionIdledEvent: Hashable {}
+fatalError("Use --enable-test-discovery to run tests on Linux")