|
|
@@ -26,6 +26,9 @@ internal class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
/// The number of active streams.
|
|
|
private var activeStreams = 0
|
|
|
|
|
|
+ /// The maximum number of streams we may create concurrently on this connection.
|
|
|
+ private var maxConcurrentStreams: Int?
|
|
|
+
|
|
|
/// The scheduled task which will close the channel.
|
|
|
private var scheduledIdle: Scheduled<Void>?
|
|
|
|
|
|
@@ -75,7 +78,14 @@ internal class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
MetadataKey.h2StreamID: "\(created.streamID)",
|
|
|
MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
|
|
|
])
|
|
|
+
|
|
|
+ if self.activeStreams == self.maxConcurrentStreams {
|
|
|
+ self.logger.warning("HTTP2 max concurrent stream limit reached", metadata: [
|
|
|
+ MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
|
|
|
+ ])
|
|
|
+ }
|
|
|
} else if let closed = event as? StreamClosedEvent {
|
|
|
+ let droppingBelowMaxConcurrentStreamLimit = self.maxConcurrentStreams == self.activeStreams
|
|
|
self.activeStreams -= 1
|
|
|
|
|
|
self.logger.debug("HTTP2 stream closed", metadata: [
|
|
|
@@ -83,6 +93,13 @@ internal class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
|
|
|
])
|
|
|
|
|
|
+ if droppingBelowMaxConcurrentStreamLimit {
|
|
|
+ self.logger.notice(
|
|
|
+ "HTTP2 active stream count fell below max concurrent stream limit",
|
|
|
+ metadata: [MetadataKey.h2ActiveStreams: "\(self.activeStreams)"]
|
|
|
+ )
|
|
|
+ }
|
|
|
+
|
|
|
// No active streams: go idle soon.
|
|
|
if self.activeStreams == 0 {
|
|
|
self.scheduleIdleTimeout(context: context)
|
|
|
@@ -154,43 +171,58 @@ internal class GRPCIdleHandler: ChannelInboundHandler {
|
|
|
let frame = self.unwrapInboundIn(data)
|
|
|
|
|
|
if frame.streamID == .rootStream {
|
|
|
- switch (self.state, frame.payload) {
|
|
|
- // We only care about SETTINGS as long as we are in state `.notReady`.
|
|
|
- case let (.notReady, .settings(content)):
|
|
|
- self.state = .ready
|
|
|
-
|
|
|
- switch self.mode {
|
|
|
- case let .client(manager):
|
|
|
- let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
|
|
|
- manager.logger.info("gRPC connection ready", metadata: [
|
|
|
- MetadataKey.remoteAddress: "\(remoteAddressDescription)",
|
|
|
- MetadataKey.eventLoop: "\(context.eventLoop)",
|
|
|
- ])
|
|
|
+ switch frame.payload {
|
|
|
+ case let .settings(.settings(settings)):
|
|
|
+ // Log any changes to HTTP/2 settings.
|
|
|
+ self.logger.debug(
|
|
|
+ "HTTP2 settings update",
|
|
|
+ metadata: Dictionary(settings.map {
|
|
|
+ ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
|
|
|
+ }, uniquingKeysWith: { a, _ in a })
|
|
|
+ )
|
|
|
+
|
|
|
+ let maxConcurrentStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })
|
|
|
+ if let maxConcurrentStreams = maxConcurrentStreams?.value {
|
|
|
+ self.maxConcurrentStreams = maxConcurrentStreams
|
|
|
+ }
|
|
|
|
|
|
- // Let the manager know we're ready.
|
|
|
- manager.ready()
|
|
|
+ switch self.state {
|
|
|
+ case .notReady:
|
|
|
+ // This must be the initial settings frame, we can move to the ready state now.
|
|
|
+ self.state = .ready
|
|
|
|
|
|
- case .server:
|
|
|
+ switch self.mode {
|
|
|
+ case let .client(manager):
|
|
|
+ let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
|
|
|
+ manager.logger.info("gRPC connection ready", metadata: [
|
|
|
+ MetadataKey.remoteAddress: "\(remoteAddressDescription)",
|
|
|
+ MetadataKey.eventLoop: "\(context.eventLoop)",
|
|
|
+ ])
|
|
|
+
|
|
|
+ // Let the manager know we're ready.
|
|
|
+ manager.ready()
|
|
|
+
|
|
|
+ case .server:
|
|
|
+ ()
|
|
|
+ }
|
|
|
+
|
|
|
+ // Start the idle timeout.
|
|
|
+ self.scheduleIdleTimeout(context: context)
|
|
|
+
|
|
|
+ default:
|
|
|
()
|
|
|
}
|
|
|
|
|
|
- if case let .settings(settings) = content {
|
|
|
- self.logger.debug(
|
|
|
- "received initial HTTP2 settings",
|
|
|
- metadata: Dictionary(settings.map {
|
|
|
- ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
|
|
|
- }, uniquingKeysWith: { a, _ in a })
|
|
|
- )
|
|
|
+ case .goAway:
|
|
|
+ switch self.state {
|
|
|
+ case .ready, .notReady:
|
|
|
+ self.idle(context: context)
|
|
|
+ case .closed:
|
|
|
+ ()
|
|
|
}
|
|
|
|
|
|
- // Start the idle timeout.
|
|
|
- self.scheduleIdleTimeout(context: context)
|
|
|
-
|
|
|
- case (.notReady, .goAway),
|
|
|
- (.ready, .goAway):
|
|
|
- self.idle(context: context)
|
|
|
-
|
|
|
default:
|
|
|
+ // Ignore all other frame types.
|
|
|
()
|
|
|
}
|
|
|
}
|