Browse Source

Make idle timeout configurable (#824)

* Make idle timeout configurable

Motivation:

We shed idle connections but don't offer the ability to configure the
timeout; we should.

Modifications:

- Make server and client idle timeouts configurable
- Add integ tests to check the timeouts works

Result:

Idle timeouts are configurable, more tests.
George Barnett 5 years ago
parent
commit
0048e6b039

+ 10 - 1
Sources/GRPC/ClientConnection.swift

@@ -240,6 +240,12 @@ extension ClientConnection {
     /// The connection backoff configuration. If no connection retrying is required then this should
     /// be `nil`.
     public var connectionBackoff: ConnectionBackoff?
+
+    /// The amount of time to wait before closing the connection. The idle timeout will start only
+    /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
+    ///
+    /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
+    public var connectionIdleTimeout: TimeAmount
     
     /// The HTTP/2 flow control target window size.
     public var httpTargetWindowSize: Int
@@ -269,6 +275,7 @@ extension ClientConnection {
       connectivityStateDelegate: ConnectivityStateDelegate? = nil,
       tls: Configuration.TLS? = nil,
       connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
+      connectionIdleTimeout: TimeAmount = .minutes(5),
       httpTargetWindowSize: Int = 65535
     ) {
       self.target = target
@@ -277,6 +284,7 @@ extension ClientConnection {
       self.connectivityStateDelegate = connectivityStateDelegate
       self.tls = tls
       self.connectionBackoff = connectionBackoff
+      self.connectionIdleTimeout = connectionIdleTimeout
       self.httpTargetWindowSize = httpTargetWindowSize
     }
   }
@@ -334,6 +342,7 @@ extension Channel {
     tlsConfiguration: TLSConfiguration?,
     tlsServerHostname: String?,
     connectionManager: ConnectionManager,
+    connectionIdleTimeout: TimeAmount,
     errorDelegate: ClientErrorDelegate?,
     logger: Logger
   ) -> EventLoopFuture<Void> {
@@ -346,7 +355,7 @@ extension Channel {
     }.flatMap { _ in
       return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
         self.pipeline.addHandler(
-          GRPCIdleHandler(mode: .client(connectionManager)),
+          GRPCIdleHandler(mode: .client(connectionManager), idleTimeout: connectionIdleTimeout),
           position: .after(http2Handler)
         )
       }.flatMap {

+ 1 - 0
Sources/GRPC/ConnectionManager.swift

@@ -559,6 +559,7 @@ extension ConnectionManager {
           tlsConfiguration: configuration.tls?.configuration,
           tlsServerHostname: serverHostname,
           connectionManager: self,
+          connectionIdleTimeout: configuration.connectionIdleTimeout,
           errorDelegate: configuration.errorDelegate,
           logger: self.logger
         )

+ 12 - 0
Sources/GRPC/GRPCChannel/GRPCChannelBuilder.swift

@@ -36,6 +36,7 @@ extension ClientConnection {
     private var connectionBackoffIsEnabled = true
     private var errorDelegate: ClientErrorDelegate?
     private var connectivityStateDelegate: ConnectivityStateDelegate?
+    private var connectionIdleTimeout: TimeAmount = .minutes(5)
     private var httpTargetWindowSize: Int = 65535
 
     fileprivate init(group: EventLoopGroup) {
@@ -50,6 +51,7 @@ extension ClientConnection {
         connectivityStateDelegate: self.connectivityStateDelegate,
         tls: self.maybeTLS,
         connectionBackoff: self.connectionBackoffIsEnabled ? self.connectionBackoff : nil,
+        connectionIdleTimeout: self.connectionIdleTimeout,
         httpTargetWindowSize: self.httpTargetWindowSize
       )
       return ClientConnection(configuration: configuration)
@@ -140,6 +142,16 @@ extension ClientConnection.Builder {
     self.connectionBackoffIsEnabled = enabled
     return self
   }
+
+  /// The amount of time to wait before closing the connection. The idle timeout will start only
+  /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start. If a
+  /// connection becomes idle, starting a new RPC will automatically create a new connection.
+  /// Defaults to 5 minutes if not set.
+  @discardableResult
+  public func withConnectionIdleTimeout(_ timeout: TimeAmount) -> Self {
+    self.connectionIdleTimeout = timeout
+    return self
+  }
 }
 
 extension ClientConnection.Builder {

+ 5 - 1
Sources/GRPC/HTTPProtocolSwitcher.swift

@@ -26,6 +26,7 @@ internal class HTTPProtocolSwitcher {
   private let errorDelegate: ServerErrorDelegate?
   private let logger = Logger(subsystem: .serverChannelCall)
   private let httpTargetWindowSize: Int
+  private let idleTimeout: TimeAmount
 
   // We could receive additional data after the initial data and before configuring
   // the pipeline; buffer it and fire it down the pipeline once it is configured.
@@ -45,10 +46,12 @@ internal class HTTPProtocolSwitcher {
   init(
     errorDelegate: ServerErrorDelegate?,
     httpTargetWindowSize: Int = 65535,
+    idleTimeout: TimeAmount,
     handlersInitializer: (@escaping (Channel) -> EventLoopFuture<Void>)
   ) {
     self.errorDelegate = errorDelegate
     self.httpTargetWindowSize = httpTargetWindowSize
+    self.idleTimeout = idleTimeout
     self.handlersInitializer = handlersInitializer
   }
 }
@@ -146,7 +149,8 @@ extension HTTPProtocolSwitcher: ChannelInboundHandler, RemovableChannelHandler {
               .flatMap { self.handlersInitializer(streamChannel) }
           }.flatMap { multiplexer in
             // Add an idle handler between the two HTTP2 handlers.
-            context.channel.pipeline.addHandler(GRPCIdleHandler(mode: .server), position: .before(multiplexer))
+            let idleHandler = GRPCIdleHandler(mode: .server, idleTimeout: self.idleTimeout)
+            return context.channel.pipeline.addHandler(idleHandler, position: .before(multiplexer))
           }
           .cascade(to: pipelineConfigured)
       }

+ 8 - 1
Sources/GRPC/Server.swift

@@ -101,7 +101,8 @@ public final class Server {
       .childChannelInitializer { channel in
         let protocolSwitcher = HTTPProtocolSwitcher(
           errorDelegate: configuration.errorDelegate,
-          httpTargetWindowSize: configuration.httpTargetWindowSize
+          httpTargetWindowSize: configuration.httpTargetWindowSize,
+          idleTimeout: configuration.connectionIdleTimeout
         ) { channel -> EventLoopFuture<Void> in
           let logger = Logger(subsystem: .serverChannelCall, metadata: [MetadataKey.requestID: "\(UUID())"])
           let handler = GRPCServerRequestRoutingHandler(
@@ -189,6 +190,10 @@ extension Server {
     /// TLS configuration for this connection. `nil` if TLS is not desired.
     public var tls: TLS?
 
+    /// The amount of time to wait before closing connections. The idle timeout will start only
+    /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
+    public var connectionIdleTimeout: TimeAmount
+
     /// The compression configuration for requests and responses.
     ///
     /// If compression is enabled for the server it may be disabled for responses on any RPC by
@@ -217,6 +222,7 @@ extension Server {
       serviceProviders: [CallHandlerProvider],
       errorDelegate: ServerErrorDelegate? = LoggingServerErrorDelegate.shared,
       tls: TLS? = nil,
+      connectionIdleTimeout: TimeAmount = .minutes(5),
       messageEncoding: ServerMessageEncoding = .disabled,
       httpTargetWindowSize: Int = 65535
     ) {
@@ -225,6 +231,7 @@ extension Server {
       self.serviceProviders = serviceProviders
       self.errorDelegate = errorDelegate
       self.tls = tls
+      self.connectionIdleTimeout = connectionIdleTimeout
       self.messageEncoding = messageEncoding
       self.httpTargetWindowSize = httpTargetWindowSize
     }

+ 15 - 0
Sources/GRPC/ServerBuilder.swift

@@ -23,6 +23,7 @@ extension Server {
     private var providers: [CallHandlerProvider] = []
     private var errorDelegate: ServerErrorDelegate?
     private var messageEncoding: ServerMessageEncoding = .disabled
+    private var connectionIdleTimeout: TimeAmount = .minutes(5)
     private var httpTargetWindowSize: Int = 65535
 
     fileprivate init(group: EventLoopGroup) {
@@ -51,6 +52,7 @@ extension Server {
         serviceProviders: self.providers,
         errorDelegate: self.errorDelegate,
         tls: self.maybeTLS,
+        connectionIdleTimeout: self.connectionIdleTimeout,
         messageEncoding: self.messageEncoding,
         httpTargetWindowSize: self.httpTargetWindowSize
       )
@@ -71,15 +73,28 @@ extension Server.Builder {
 extension Server.Builder {
   /// Sets the service providers that this server should offer. Note that calling this multiple
   /// times will override any previously set providers.
+  @discardableResult
   public func withServiceProviders(_ providers: [CallHandlerProvider]) -> Self {
     self.providers = providers
     return self
   }
 }
 
+extension Server.Builder {
+  /// The amount of time to wait before closing connections. The idle timeout will start only
+  /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start. Defaults to
+  /// 5 minutes if not set.
+  @discardableResult
+  public func withConnectionIdleTimeout(_ timeout: TimeAmount) -> Self {
+    self.connectionIdleTimeout = timeout
+    return self
+  }
+}
+
 extension Server.Builder {
   /// Sets the message compression configuration. Compression is disabled if this is not configured
   /// and any RPCs using compression will not be accepted.
+  @discardableResult
   public func withMessageCompression(_ encoding: ServerMessageEncoding) -> Self {
     self.messageEncoding = encoding
     return self

+ 80 - 0
Tests/GRPCTests/GRPCIdleTests.swift

@@ -0,0 +1,80 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@testable import GRPC
+import NIO
+import EchoModel
+import EchoImplementation
+import XCTest
+
+class GRPCIdleTests: GRPCTestCase {
+  func testClientIdleTimeout() {
+    XCTAssertNoThrow(try self.doTestIdleTimeout(serverIdle: .minutes(5), clientIdle: .milliseconds(100)))
+  }
+
+  func testServerIdleTimeout() throws {
+    XCTAssertNoThrow(try self.doTestIdleTimeout(serverIdle: .milliseconds(100), clientIdle: .minutes(5)))
+  }
+
+  func doTestIdleTimeout(serverIdle: TimeAmount, clientIdle: TimeAmount) throws {
+    // Is the server idling first? This determines what state change the client should see when the
+    // idle happens.
+    let isServerIdleFirst = serverIdle < clientIdle
+
+    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    // Setup a server.
+    let server = try Server.insecure(group: group)
+      .withServiceProviders([EchoProvider()])
+      .withConnectionIdleTimeout(serverIdle)
+      .bind(host: "localhost", port: 0)
+      .wait()
+    defer {
+      XCTAssertNoThrow(try server.close().wait())
+    }
+
+    // Setup a state change recorder for the client.
+    let stateRecorder = RecordingConnectivityDelegate()
+    stateRecorder.expectChanges(3) { changes in
+      XCTAssertEqual(changes, [
+        Change(from: .idle, to: .connecting),
+        Change(from: .connecting, to: .ready),
+        Change(from: .ready, to: isServerIdleFirst ? .transientFailure : .idle)
+      ])
+    }
+
+    // Setup a connection.
+    let connection = ClientConnection.insecure(group: group)
+      .withConnectivityStateDelegate(stateRecorder)
+      .withConnectionIdleTimeout(clientIdle)
+      .connect(host: "localhost", port: server.channel.localAddress!.port!)
+    defer {
+      XCTAssertNoThrow(try connection.close().wait())
+    }
+
+    let client = Echo_EchoClient(channel: connection)
+
+    // Make a call; this will trigger channel creation.
+    let get = client.get(.with { $0.text = "ignored" })
+    let status = try get.status.wait()
+    XCTAssertEqual(status.code, .ok)
+
+    // Now wait for the state changes.
+    stateRecorder.waitForExpectedChanges(timeout: .seconds(10))
+  }
+}

+ 11 - 0
Tests/GRPCTests/XCTestManifests.swift

@@ -353,6 +353,16 @@ extension GRPCCustomPayloadTests {
     ]
 }
 
+extension GRPCIdleTests {
+    // DO NOT MODIFY: This is autogenerated, use:
+    //   `swift test --generate-linuxmain`
+    // to regenerate.
+    static let __allTests__GRPCIdleTests = [
+        ("testClientIdleTimeout", testClientIdleTimeout),
+        ("testServerIdleTimeout", testServerIdleTimeout),
+    ]
+}
+
 extension GRPCInsecureInteroperabilityTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -719,6 +729,7 @@ public func __allTests() -> [XCTestCaseEntry] {
         testCase(FunctionalTestsMutualAuthenticationNIOTS.__allTests__FunctionalTestsMutualAuthenticationNIOTS),
         testCase(GRPCClientStateMachineTests.__allTests__GRPCClientStateMachineTests),
         testCase(GRPCCustomPayloadTests.__allTests__GRPCCustomPayloadTests),
+        testCase(GRPCIdleTests.__allTests__GRPCIdleTests),
         testCase(GRPCInsecureInteroperabilityTests.__allTests__GRPCInsecureInteroperabilityTests),
         testCase(GRPCSecureInteroperabilityTests.__allTests__GRPCSecureInteroperabilityTests),
         testCase(GRPCServerRequestRoutingHandlerTests.__allTests__GRPCServerRequestRoutingHandlerTests),