Browse Source

Add a server-idle handler (#818)

Motivation:

Connections should be dropped to save resources if there are not RPCs
for an idle timeout. This was added for clients in #798; this adds
similart functionality to the server.

Modifications:

- Rename a `ClientConnectivityHandler` to `GRPCIdleHandler`
- Add a 'mode' to the idle handler

Note: timeout configuration will be added for client and server in a
followup PR.

Result:

- Server's will drop connections if the client isn't doing anything.
- Resolves #706
George Barnett 5 years ago
parent
commit
4dd5ee2b6e

+ 1 - 1
Sources/GRPC/ClientConnection.swift

@@ -346,7 +346,7 @@ extension Channel {
     }.flatMap { _ in
       return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
         self.pipeline.addHandler(
-          ClientConnectivityHandler(connectionManager: connectionManager),
+          GRPCIdleHandler(mode: .client(connectionManager)),
           position: .after(http2Handler)
         )
       }.flatMap {

+ 58 - 21
Sources/GRPC/ClientConnectivityHandler.swift → Sources/GRPC/GRPCIdleHandler.swift

@@ -16,14 +16,29 @@
 import NIO
 import NIOHTTP2
 
-internal class ClientConnectivityHandler: ChannelInboundHandler {
+internal class GRPCIdleHandler: ChannelInboundHandler {
   typealias InboundIn = HTTP2Frame
 
-  private var connectionManager: ConnectionManager
+  /// The amount of time to wait before closing the channel when there are no active streams.
   private let idleTimeout: TimeAmount
 
+  /// The number of active streams.
   private var activeStreams = 0
+
+  /// The scheduled task which will close the channel.
   private var scheduledIdle: Scheduled<Void>? = nil
+
+  /// Client and server have slightly different behaviours; track which we are following.
+  private var mode: Mode
+
+  /// The mode of operation: the client tracks additional connection state in the connection
+  /// manager.
+  internal enum Mode {
+    case client(ConnectionManager)
+    case server
+  }
+
+  /// The current connection state.
   private var state: State = .notReady
 
   private enum State {
@@ -37,8 +52,8 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
     case closed
   }
 
-  init(connectionManager: ConnectionManager, idleTimeout: TimeAmount = .minutes(5)) {
-    self.connectionManager = connectionManager
+  init(mode: Mode, idleTimeout: TimeAmount = .minutes(5)) {
+    self.mode = mode
     self.idleTimeout = idleTimeout
   }
 
@@ -67,10 +82,15 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
   }
 
   func channelActive(context: ChannelHandlerContext) {
-    switch self.state {
-    case .notReady:
-      self.connectionManager.channelActive(channel: context.channel)
-    case .ready, .closed:
+    switch (self.mode, self.state) {
+    // The client should become active: we'll only schedule the idling when the channel
+    // becomes 'ready'.
+    case (.client(let manager), .notReady):
+      manager.channelActive(channel: context.channel)
+
+    case (.server, .notReady),
+         (_, .ready),
+         (_, .closed):
       ()
     }
 
@@ -81,11 +101,16 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
     self.scheduledIdle?.cancel()
     self.scheduledIdle = nil
 
-    switch self.state {
-    case .notReady, .ready:
-      self.connectionManager.channelInactive()
-    case .closed:
+    switch (self.mode, self.state) {
+    case (.client(let manager), .notReady),
+         (.client(let manager), .ready):
+      manager.channelInactive()
+
+    case (.server, .notReady),
+         (.server, .ready),
+         (_, .closed):
       ()
+
     }
 
     context.fireChannelInactive()
@@ -100,18 +125,24 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
       case (.notReady, .settings):
         self.state = .ready
 
-        let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
-        self.connectionManager.logger.info("gRPC connection ready", metadata: [
-          "remote_address": "\(remoteAddressDescription)",
-          "event_loop": "\(context.eventLoop)"
-        ])
+        switch self.mode {
+        case .client(let manager):
+          let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
+          manager.logger.info("gRPC connection ready", metadata: [
+            "remote_address": "\(remoteAddressDescription)",
+            "event_loop": "\(context.eventLoop)"
+          ])
+
+          // Let the manager know we're ready.
+          manager.ready()
+
+        case .server:
+          ()
+        }
 
         // Start the idle timeout.
         self.scheduleIdleTimeout(context: context)
 
-        // Let the manager know we're ready.
-        self.connectionManager.ready()
-
       case (.notReady, .goAway),
            (.ready, .goAway):
         self.idle(context: context)
@@ -140,7 +171,13 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
     }
 
     self.state = .closed
-    self.connectionManager.idle()
+    switch self.mode {
+    case .client(let manager):
+      manager.idle()
+    case .server:
+      ()
+    }
+
     context.close(mode: .all, promise: nil)
   }
 }

+ 3 - 1
Sources/GRPC/HTTPProtocolSwitcher.swift

@@ -144,8 +144,10 @@ extension HTTPProtocolSwitcher: ChannelInboundHandler, RemovableChannelHandler {
         ) { (streamChannel, streamID) in
             streamChannel.pipeline.addHandler(HTTP2ToHTTP1ServerCodec(streamID: streamID, normalizeHTTPHeaders: true))
               .flatMap { self.handlersInitializer(streamChannel) }
+          }.flatMap { multiplexer in
+            // Add an idle handler between the two HTTP2 handlers.
+            context.channel.pipeline.addHandler(GRPCIdleHandler(mode: .server), position: .before(multiplexer))
           }
-          .map { _ in }
           .cascade(to: pipelineConfigured)
       }
 

+ 10 - 10
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -116,7 +116,7 @@ extension ConnectionManagerTests {
 
     // Setup the real channel and activate it.
     let channel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(channel)
@@ -150,7 +150,7 @@ extension ConnectionManagerTests {
 
     // Setup the channel.
     let channel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(channel)
@@ -193,7 +193,7 @@ extension ConnectionManagerTests {
 
     // Setup the channel.
     let channel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(channel)
@@ -249,7 +249,7 @@ extension ConnectionManagerTests {
 
     // Setup the channel.
     let channel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(channel)
@@ -305,7 +305,7 @@ extension ConnectionManagerTests {
 
     // Setup the actual channel and complete the promise.
     let channel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(channel)
@@ -401,7 +401,7 @@ extension ConnectionManagerTests {
 
     // Prepare the channel
     let channel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(channel)
@@ -461,7 +461,7 @@ extension ConnectionManagerTests {
 
     // Prepare the channel
     let firstChannel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(firstChannel)
@@ -521,7 +521,7 @@ extension ConnectionManagerTests {
 
     // Prepare the first channel
     let firstChannel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     firstChannelPromise.succeed(firstChannel)
@@ -548,7 +548,7 @@ extension ConnectionManagerTests {
 
     // Prepare the second channel
     let secondChannel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     secondChannelPromise.succeed(secondChannel)
@@ -582,7 +582,7 @@ extension ConnectionManagerTests {
 
     // Setup the channel.
     let channel = EmbeddedChannel(
-      handler: ClientConnectivityHandler(connectionManager: manager),
+      handler: GRPCIdleHandler(mode: .client(manager)),
       loop: self.loop
     )
     channelPromise.succeed(channel)