|
|
@@ -98,19 +98,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
)
|
|
|
}
|
|
|
|
|
|
- private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
|
|
|
- guard let context = self.context else {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- let frame = HTTP2Frame(
|
|
|
- streamID: .rootStream,
|
|
|
- payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
|
|
|
- )
|
|
|
-
|
|
|
- context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
|
|
|
- }
|
|
|
-
|
|
|
private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
|
|
|
// Prod the connection manager.
|
|
|
if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
|
|
|
@@ -137,11 +124,17 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
if let idleTask = operations.idleTask {
|
|
|
switch idleTask {
|
|
|
case let .cancel(task):
|
|
|
+ self.stateMachine.logger.debug("idle timeout task cancelled")
|
|
|
task.cancel()
|
|
|
|
|
|
case .schedule:
|
|
|
if self.idleTimeout != .nanoseconds(.max), let context = self.context {
|
|
|
+ self.stateMachine.logger.debug(
|
|
|
+ "scheduling idle timeout task",
|
|
|
+ metadata: [MetadataKey.delayMs: "\(self.idleTimeout.milliseconds)"]
|
|
|
+ )
|
|
|
let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
|
|
|
+ self.stateMachine.logger.debug("idle timeout task fired")
|
|
|
self.idleTimeoutFired()
|
|
|
}
|
|
|
self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
|
|
|
@@ -151,6 +144,13 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
|
|
|
// Send a GOAWAY frame.
|
|
|
if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
|
|
|
+ self.stateMachine.logger.debug(
|
|
|
+ "sending GOAWAY frame",
|
|
|
+ metadata: [
|
|
|
+ MetadataKey.h2GoAwayLastStreamID: "\(Int(streamID))"
|
|
|
+ ]
|
|
|
+ )
|
|
|
+
|
|
|
let goAwayFrame = HTTP2Frame(
|
|
|
streamID: .rootStream,
|
|
|
payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
|
|
|
@@ -175,6 +175,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
// Close on the next event-loop tick so we don't drop any events which are
|
|
|
// currently being processed.
|
|
|
context.eventLoop.execute {
|
|
|
+ self.stateMachine.logger.debug("closing connection")
|
|
|
context.close(mode: .all, promise: nil)
|
|
|
}
|
|
|
}
|
|
|
@@ -186,8 +187,12 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
()
|
|
|
|
|
|
case .ack:
|
|
|
- // NIO's HTTP2 handler acks for us so this is a no-op.
|
|
|
- ()
|
|
|
+ // NIO's HTTP2 handler acks for us so this is a no-op. Log so it doesn't appear that we are
|
|
|
+ // ignoring pings.
|
|
|
+ self.stateMachine.logger.debug(
|
|
|
+ "sending PING frame",
|
|
|
+ metadata: [MetadataKey.h2PingAck: "true"]
|
|
|
+ )
|
|
|
|
|
|
case .cancelScheduledTimeout:
|
|
|
self.scheduledClose?.cancel()
|
|
|
@@ -197,6 +202,15 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
self.schedulePing(in: delay, timeout: timeout)
|
|
|
|
|
|
case let .reply(framePayload):
|
|
|
+ switch framePayload {
|
|
|
+ case .ping(_, let ack):
|
|
|
+ self.stateMachine.logger.debug(
|
|
|
+ "sending PING frame",
|
|
|
+ metadata: [MetadataKey.h2PingAck: "\(ack)"]
|
|
|
+ )
|
|
|
+ default:
|
|
|
+ ()
|
|
|
+ }
|
|
|
let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
|
|
|
self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
|
|
|
|
|
|
@@ -210,6 +224,11 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+ self.stateMachine.logger.debug(
|
|
|
+ "scheduled keepalive pings",
|
|
|
+ metadata: [MetadataKey.intervalMs: "\(delay.milliseconds)"]
|
|
|
+ )
|
|
|
+
|
|
|
self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
|
|
|
initialDelay: delay,
|
|
|
delay: delay
|
|
|
@@ -226,6 +245,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
|
|
|
private func scheduleClose(in timeout: TimeAmount) {
|
|
|
self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
|
|
|
+ self.stateMachine.logger.debug("keepalive timer expired")
|
|
|
self.perform(operations: self.stateMachine.shutdownNow())
|
|
|
}
|
|
|
}
|
|
|
@@ -318,6 +338,10 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
case let .settings(.settings(settings)):
|
|
|
self.perform(operations: self.stateMachine.receiveSettings(settings))
|
|
|
case let .ping(data, ack):
|
|
|
+ self.stateMachine.logger.debug(
|
|
|
+ "received PING frame",
|
|
|
+ metadata: [MetadataKey.h2PingAck: "\(ack)"]
|
|
|
+ )
|
|
|
self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
|
|
|
default:
|
|
|
// We're not interested in other events.
|
|
|
@@ -350,3 +374,9 @@ extension HTTP2SettingsParameter {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+extension TimeAmount {
|
|
|
+ fileprivate var milliseconds: Int64 {
|
|
|
+ self.nanoseconds / 1_000_000
|
|
|
+ }
|
|
|
+}
|