|
|
@@ -198,9 +198,10 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
// Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
|
|
|
// particular only those carrying the keep-alive data.
|
|
|
if ack, data == self.keepalivePingData {
|
|
|
+ let loopBound = LoopBoundView(handler: self, context: context)
|
|
|
self.keepaliveTimeoutTimer.cancel()
|
|
|
self.keepaliveTimer?.schedule(on: context.eventLoop) {
|
|
|
- self.keepaliveTimerFired(context: context)
|
|
|
+ loopBound.keepaliveTimerFired()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -211,12 +212,13 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
// becoming active is insufficient as, for example, a TLS handshake may fail after
|
|
|
// establishing the TCP connection, or the server isn't configured for gRPC (or HTTP/2).
|
|
|
if isInitialSettings {
|
|
|
+ let loopBound = LoopBoundView(handler: self, context: context)
|
|
|
self.keepaliveTimer?.schedule(on: context.eventLoop) {
|
|
|
- self.keepaliveTimerFired(context: context)
|
|
|
+ loopBound.keepaliveTimerFired()
|
|
|
}
|
|
|
|
|
|
self.maxIdleTimer?.schedule(on: context.eventLoop) {
|
|
|
- self.maxIdleTimerFired(context: context)
|
|
|
+ loopBound.maxIdleTimerFired()
|
|
|
}
|
|
|
|
|
|
context.fireChannelRead(self.wrapInboundOut(.ready))
|
|
|
@@ -264,6 +266,33 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+extension ClientConnectionHandler {
|
|
|
+ struct LoopBoundView: @unchecked Sendable {
|
|
|
+ private let handler: ClientConnectionHandler
|
|
|
+ private let context: ChannelHandlerContext
|
|
|
+
|
|
|
+ init(handler: ClientConnectionHandler, context: ChannelHandlerContext) {
|
|
|
+ self.handler = handler
|
|
|
+ self.context = context
|
|
|
+ }
|
|
|
+
|
|
|
+ func keepaliveTimerFired() {
|
|
|
+ self.context.eventLoop.assertInEventLoop()
|
|
|
+ self.handler.keepaliveTimerFired(context: self.context)
|
|
|
+ }
|
|
|
+
|
|
|
+ func keepaliveTimeoutExpired() {
|
|
|
+ self.context.eventLoop.assertInEventLoop()
|
|
|
+ self.handler.keepaliveTimeoutExpired(context: self.context)
|
|
|
+ }
|
|
|
+
|
|
|
+ func maxIdleTimerFired() {
|
|
|
+ self.context.eventLoop.assertInEventLoop()
|
|
|
+ self.handler.maxIdleTimerFired(context: self.context)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
extension ClientConnectionHandler {
|
|
|
struct HTTP2StreamDelegate: @unchecked Sendable, NIOHTTP2StreamDelegate {
|
|
|
// @unchecked is okay: the only methods do the appropriate event-loop dance.
|
|
|
@@ -315,8 +344,9 @@ extension ClientConnectionHandler {
|
|
|
case .startIdleTimer(let cancelKeepalive):
|
|
|
// All streams are closed, restart the idle timer, and stop the keep-alive timer (it may
|
|
|
// not stop if keep-alive is allowed when there are no active calls).
|
|
|
+ let loopBound = LoopBoundView(handler: self, context: context)
|
|
|
self.maxIdleTimer?.schedule(on: context.eventLoop) {
|
|
|
- self.maxIdleTimerFired(context: context)
|
|
|
+ loopBound.maxIdleTimerFired()
|
|
|
}
|
|
|
|
|
|
if cancelKeepalive {
|
|
|
@@ -355,8 +385,9 @@ extension ClientConnectionHandler {
|
|
|
self.maybeFlush(context: context)
|
|
|
|
|
|
// Schedule a timeout on waiting for the response.
|
|
|
+ let loopBound = LoopBoundView(handler: self, context: context)
|
|
|
self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
|
|
|
- self.keepaliveTimeoutExpired(context: context)
|
|
|
+ loopBound.keepaliveTimeoutExpired()
|
|
|
}
|
|
|
}
|
|
|
|