|
|
@@ -23,7 +23,7 @@ enum ClientConnectionEvent: Sendable, Hashable {
|
|
|
/// The server sent a GOAWAY frame to the client.
|
|
|
case goAway(HTTP2ErrorCode, String)
|
|
|
/// The keep alive timer fired and subsequently timed out.
|
|
|
- case keepAliveExpired
|
|
|
+ case keepaliveExpired
|
|
|
/// The connection became idle.
|
|
|
case idle
|
|
|
/// The local peer initiated the close.
|
|
|
@@ -64,14 +64,14 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
private var maxIdleTimer: Timer?
|
|
|
|
|
|
/// The amount of time to wait before sending a keep alive ping.
|
|
|
- private var keepAliveTimer: Timer?
|
|
|
+ private var keepaliveTimer: Timer?
|
|
|
|
|
|
/// The amount of time the client has to reply after sending a keep alive ping. Only used if
|
|
|
- /// `keepAliveTimer` is set.
|
|
|
- private var keepAliveTimeoutTimer: Timer
|
|
|
+ /// `keepaliveTimer` is set.
|
|
|
+ private var keepaliveTimeoutTimer: Timer
|
|
|
|
|
|
/// Opaque data sent in keep alive pings.
|
|
|
- private let keepAlivePingData: HTTP2PingData
|
|
|
+ private let keepalivePingData: HTTP2PingData
|
|
|
|
|
|
/// The current state of the connection.
|
|
|
private var state: StateMachine
|
|
|
@@ -87,26 +87,26 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
/// - Parameters:
|
|
|
/// - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
|
|
|
/// - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
|
|
|
- /// - keepAliveTime: The amount of time to wait after reading data before sending a keep-alive
|
|
|
+ /// - keepaliveTime: The amount of time to wait after reading data before sending a keep-alive
|
|
|
/// ping.
|
|
|
- /// - keepAliveTimeout: The amount of time the client has to reply after the server sends a
|
|
|
+ /// - keepaliveTimeout: The amount of time the client has to reply after the server sends a
|
|
|
/// keep-alive ping to keep the connection open. The connection is closed if no reply
|
|
|
/// is received.
|
|
|
- /// - keepAliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
|
|
|
+ /// - keepaliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
|
|
|
/// in progress.
|
|
|
init(
|
|
|
eventLoop: EventLoop,
|
|
|
maxIdleTime: TimeAmount?,
|
|
|
- keepAliveTime: TimeAmount?,
|
|
|
- keepAliveTimeout: TimeAmount?,
|
|
|
- keepAliveWithoutCalls: Bool
|
|
|
+ keepaliveTime: TimeAmount?,
|
|
|
+ keepaliveTimeout: TimeAmount?,
|
|
|
+ keepaliveWithoutCalls: Bool
|
|
|
) {
|
|
|
self.eventLoop = eventLoop
|
|
|
self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
|
|
|
- self.keepAliveTimer = keepAliveTime.map { Timer(delay: $0, repeat: true) }
|
|
|
- self.keepAliveTimeoutTimer = Timer(delay: keepAliveTimeout ?? .seconds(20))
|
|
|
- self.keepAlivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
|
|
|
- self.state = StateMachine(allowKeepAliveWithoutCalls: keepAliveWithoutCalls)
|
|
|
+ self.keepaliveTimer = keepaliveTime.map { Timer(delay: $0, repeat: true) }
|
|
|
+ self.keepaliveTimeoutTimer = Timer(delay: keepaliveTimeout ?? .seconds(20))
|
|
|
+ self.keepalivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
|
|
|
+ self.state = StateMachine(allowKeepaliveWithoutCalls: keepaliveWithoutCalls)
|
|
|
|
|
|
self.flushPending = false
|
|
|
self.inReadLoop = false
|
|
|
@@ -117,8 +117,8 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
}
|
|
|
|
|
|
func channelActive(context: ChannelHandlerContext) {
|
|
|
- self.keepAliveTimer?.schedule(on: context.eventLoop) {
|
|
|
- self.keepAliveTimerFired(context: context)
|
|
|
+ self.keepaliveTimer?.schedule(on: context.eventLoop) {
|
|
|
+ self.keepaliveTimerFired(context: context)
|
|
|
}
|
|
|
|
|
|
self.maxIdleTimer?.schedule(on: context.eventLoop) {
|
|
|
@@ -133,8 +133,8 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
case .succeed(let promise):
|
|
|
promise.succeed()
|
|
|
}
|
|
|
- self.keepAliveTimer?.cancel()
|
|
|
- self.keepAliveTimeoutTimer.cancel()
|
|
|
+ self.keepaliveTimer?.cancel()
|
|
|
+ self.keepaliveTimeoutTimer.cancel()
|
|
|
}
|
|
|
|
|
|
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
|
|
|
@@ -146,15 +146,15 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
|
|
|
case let event as StreamClosedEvent:
|
|
|
switch self.state.streamClosed(event.streamID) {
|
|
|
- case .startIdleTimer(let cancelKeepAlive):
|
|
|
+ case .startIdleTimer(let cancelKeepalive):
|
|
|
// All streams are closed, restart the idle timer, and stop the keep-alive timer (it may
|
|
|
// not stop if keep-alive is allowed when there are no active calls).
|
|
|
self.maxIdleTimer?.schedule(on: context.eventLoop) {
|
|
|
self.maxIdleTimerFired(context: context)
|
|
|
}
|
|
|
|
|
|
- if cancelKeepAlive {
|
|
|
- self.keepAliveTimer?.cancel()
|
|
|
+ if cancelKeepalive {
|
|
|
+ self.keepaliveTimer?.cancel()
|
|
|
}
|
|
|
|
|
|
case .close:
|
|
|
@@ -200,10 +200,10 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
case .ping(let data, let ack):
|
|
|
// Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
|
|
|
// particular only those carrying the keep-alive data.
|
|
|
- if ack, data == self.keepAlivePingData {
|
|
|
- self.keepAliveTimeoutTimer.cancel()
|
|
|
- self.keepAliveTimer?.schedule(on: context.eventLoop) {
|
|
|
- self.keepAliveTimerFired(context: context)
|
|
|
+ if ack, data == self.keepalivePingData {
|
|
|
+ self.keepaliveTimeoutTimer.cancel()
|
|
|
+ self.keepaliveTimer?.schedule(on: context.eventLoop) {
|
|
|
+ self.keepaliveTimerFired(context: context)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -258,27 +258,27 @@ extension ClientConnectionHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func keepAliveTimerFired(context: ChannelHandlerContext) {
|
|
|
- guard self.state.sendKeepAlivePing() else { return }
|
|
|
+ private func keepaliveTimerFired(context: ChannelHandlerContext) {
|
|
|
+ guard self.state.sendKeepalivePing() else { return }
|
|
|
|
|
|
// Cancel the keep alive timer when the client sends a ping. The timer is resumed when the ping
|
|
|
// is acknowledged.
|
|
|
- self.keepAliveTimer?.cancel()
|
|
|
+ self.keepaliveTimer?.cancel()
|
|
|
|
|
|
- let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(self.keepAlivePingData, ack: false))
|
|
|
+ let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(self.keepalivePingData, ack: false))
|
|
|
context.write(self.wrapOutboundOut(ping), promise: nil)
|
|
|
self.maybeFlush(context: context)
|
|
|
|
|
|
// Schedule a timeout on waiting for the response.
|
|
|
- self.keepAliveTimeoutTimer.schedule(on: context.eventLoop) {
|
|
|
- self.keepAliveTimeoutExpired(context: context)
|
|
|
+ self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
|
|
|
+ self.keepaliveTimeoutExpired(context: context)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func keepAliveTimeoutExpired(context: ChannelHandlerContext) {
|
|
|
+ private func keepaliveTimeoutExpired(context: ChannelHandlerContext) {
|
|
|
guard self.state.beginClosing() else { return }
|
|
|
|
|
|
- context.fireChannelRead(self.wrapInboundOut(.closing(.keepAliveExpired)))
|
|
|
+ context.fireChannelRead(self.wrapInboundOut(.closing(.keepaliveExpired)))
|
|
|
self.writeAndFlushGoAway(context: context, message: "keepalive_expired")
|
|
|
context.close(promise: nil)
|
|
|
}
|
|
|
@@ -321,29 +321,29 @@ extension ClientConnectionHandler {
|
|
|
|
|
|
struct Active {
|
|
|
var openStreams: Set<HTTP2StreamID>
|
|
|
- var allowKeepAliveWithoutCalls: Bool
|
|
|
+ var allowKeepaliveWithoutCalls: Bool
|
|
|
|
|
|
- init(allowKeepAliveWithoutCalls: Bool) {
|
|
|
+ init(allowKeepaliveWithoutCalls: Bool) {
|
|
|
self.openStreams = []
|
|
|
- self.allowKeepAliveWithoutCalls = allowKeepAliveWithoutCalls
|
|
|
+ self.allowKeepaliveWithoutCalls = allowKeepaliveWithoutCalls
|
|
|
}
|
|
|
}
|
|
|
|
|
|
struct Closing {
|
|
|
- var allowKeepAliveWithoutCalls: Bool
|
|
|
+ var allowKeepaliveWithoutCalls: Bool
|
|
|
var openStreams: Set<HTTP2StreamID>
|
|
|
var closePromise: Optional<EventLoopPromise<Void>>
|
|
|
|
|
|
init(from state: Active, closePromise: EventLoopPromise<Void>?) {
|
|
|
self.openStreams = state.openStreams
|
|
|
- self.allowKeepAliveWithoutCalls = state.allowKeepAliveWithoutCalls
|
|
|
+ self.allowKeepaliveWithoutCalls = state.allowKeepaliveWithoutCalls
|
|
|
self.closePromise = closePromise
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- init(allowKeepAliveWithoutCalls: Bool) {
|
|
|
- self.state = .active(State.Active(allowKeepAliveWithoutCalls: allowKeepAliveWithoutCalls))
|
|
|
+ init(allowKeepaliveWithoutCalls: Bool) {
|
|
|
+ self.state = .active(State.Active(allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls))
|
|
|
}
|
|
|
|
|
|
/// Record that the stream with the given ID has been opened.
|
|
|
@@ -366,7 +366,7 @@ extension ClientConnectionHandler {
|
|
|
|
|
|
enum OnStreamClosed: Equatable {
|
|
|
/// Start the idle timer, after which the connection should be closed gracefully.
|
|
|
- case startIdleTimer(cancelKeepAlive: Bool)
|
|
|
+ case startIdleTimer(cancelKeepalive: Bool)
|
|
|
/// Close the connection.
|
|
|
case close
|
|
|
/// Do nothing.
|
|
|
@@ -382,7 +382,7 @@ extension ClientConnectionHandler {
|
|
|
let removedID = state.openStreams.remove(id)
|
|
|
assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
|
|
|
if state.openStreams.isEmpty {
|
|
|
- onStreamClosed = .startIdleTimer(cancelKeepAlive: !state.allowKeepAliveWithoutCalls)
|
|
|
+ onStreamClosed = .startIdleTimer(cancelKeepalive: !state.allowKeepaliveWithoutCalls)
|
|
|
} else {
|
|
|
onStreamClosed = .none
|
|
|
}
|
|
|
@@ -402,21 +402,21 @@ extension ClientConnectionHandler {
|
|
|
}
|
|
|
|
|
|
/// Returns whether a keep alive ping should be sent to the server.
|
|
|
- mutating func sendKeepAlivePing() -> Bool {
|
|
|
- let sendKeepAlivePing: Bool
|
|
|
+ mutating func sendKeepalivePing() -> Bool {
|
|
|
+ let sendKeepalivePing: Bool
|
|
|
|
|
|
// Only send a ping if there are open streams or there are no open streams and keep alive
|
|
|
// is permitted when there are no active calls.
|
|
|
switch self.state {
|
|
|
case .active(let state):
|
|
|
- sendKeepAlivePing = !state.openStreams.isEmpty || state.allowKeepAliveWithoutCalls
|
|
|
+ sendKeepalivePing = !state.openStreams.isEmpty || state.allowKeepaliveWithoutCalls
|
|
|
case .closing(let state):
|
|
|
- sendKeepAlivePing = !state.openStreams.isEmpty || state.allowKeepAliveWithoutCalls
|
|
|
+ sendKeepalivePing = !state.openStreams.isEmpty || state.allowKeepaliveWithoutCalls
|
|
|
case .closed:
|
|
|
- sendKeepAlivePing = false
|
|
|
+ sendKeepalivePing = false
|
|
|
}
|
|
|
|
|
|
- return sendKeepAlivePing
|
|
|
+ return sendKeepalivePing
|
|
|
}
|
|
|
|
|
|
enum OnGracefulShutDown: Equatable {
|