Browse Source

Add connection backoff (#480)

* Add connection backoff

* Simplify ConnectionBackoff
George Barnett 6 years ago
parent
commit
cf7af941ad

+ 138 - 0
Sources/GRPC/ConnectionBackoff.swift

@@ -0,0 +1,138 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+
+/// Configuration for connection retry backoff, iterable as a sequence of timeout and backoff
+/// pairs.
+///
+/// The algorithm and the default values follow the gRPC connection backoff
+/// [documentation](https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md).
+public struct ConnectionBackoff: Sequence {
+  public typealias Iterator = ConnectionBackoffIterator
+
+  /// The backoff, in seconds, that the sequence starts from.
+  public var initialBackoff: TimeInterval
+
+  /// The upper bound, in seconds, for the backoff. The bound is applied _before_ jitter is added,
+  /// so in practice a jittered backoff can be larger than this value.
+  public var maximumBackoff: TimeInterval
+
+  /// The factor the backoff is multiplied by to produce the following backoff.
+  public var multiplier: Double
+
+  /// The jitter, expressed as a fraction of the backoff; should be between 0 and 1.
+  public var jitter: Double
+
+  /// The lower bound, in seconds, for the connection timeout.
+  public var minimumConnectionTimeout: TimeInterval
+
+  /// Creates a `ConnectionBackoff`.
+  ///
+  /// - Parameters:
+  ///   - initialBackoff: Initial backoff in seconds, defaults to 1.0.
+  ///   - maximumBackoff: Maximum backoff in seconds (prior to adding jitter), defaults to 120.0.
+  ///   - multiplier: Backoff multiplier, defaults to 1.6.
+  ///   - jitter: Backoff jitter, defaults to 0.2.
+  ///   - minimumConnectionTimeout: Minimum connection timeout in seconds, defaults to 20.0.
+  public init(
+    initialBackoff: TimeInterval = 1.0,
+    maximumBackoff: TimeInterval = 120.0,
+    multiplier: Double = 1.6,
+    jitter: Double = 0.2,
+    minimumConnectionTimeout: TimeInterval = 20.0
+  ) {
+    // Timing bounds.
+    self.initialBackoff = initialBackoff
+    self.maximumBackoff = maximumBackoff
+    self.minimumConnectionTimeout = minimumConnectionTimeout
+
+    // Growth and randomisation factors.
+    self.multiplier = multiplier
+    self.jitter = jitter
+  }
+
+  /// Makes an iterator which produces timeout and backoff pairs from a copy of this
+  /// configuration.
+  public func makeIterator() -> ConnectionBackoff.Iterator {
+    return ConnectionBackoffIterator(connectionBackoff: self)
+  }
+}
+
+/// An iterator for `ConnectionBackoff`.
+///
+/// Each element is a pair of a connection timeout and a backoff, both in seconds.
+public class ConnectionBackoffIterator: IteratorProtocol {
+  public typealias Element = (timeout: TimeInterval, backoff: TimeInterval)
+
+  /// Creates a new connection backoff iterator with the given configuration.
+  ///
+  /// - Parameter connectionBackoff: The configuration used to generate timeouts and backoffs.
+  public init(connectionBackoff: ConnectionBackoff) {
+    self.connectionBackoff = connectionBackoff
+    self.unjitteredBackoff = connectionBackoff.initialBackoff
+
+    // Since the first backoff is `initialBackoff` it must be generated here instead of
+    // by `makeNextElement`. It is capped to the maximum backoff and has no jitter applied.
+    let backoff = min(connectionBackoff.initialBackoff, connectionBackoff.maximumBackoff)
+    self.initialElement = self.makeElement(backoff: backoff)
+  }
+
+  /// The configuration being used.
+  private let connectionBackoff: ConnectionBackoff
+
+  /// The backoff in seconds, without jitter.
+  private var unjitteredBackoff: TimeInterval
+
+  /// The first element to return. Since the first backoff is defined as `initialBackoff` we can't
+  /// compute it on-the-fly. Set to `nil` once it has been returned.
+  private var initialElement: Element?
+
+  /// Whether or not we should make another element.
+  private var shouldMakeNextElement: Bool {
+    return self.unjitteredBackoff < self.connectionBackoff.maximumBackoff
+  }
+
+  /// Returns the next pair of connection timeout and backoff (in that order) to use should the
+  /// connection attempt fail.
+  ///
+  /// The iterator will stop producing values _after_ the unjittered backoff is greater than or
+  /// equal to the maximum backoff set in the configuration used to create this iterator.
+  ///
+  /// - Returns: The next timeout and backoff pair, or `nil` if no more should be produced.
+  public func next() -> Element? {
+    if let initial = self.initialElement {
+      self.initialElement = nil
+      return initial
+    } else {
+      return self.makeNextElement()
+    }
+  }
+
+  /// Produces the next element to return, or `nil` if no more elements should be made.
+  private func makeNextElement() -> Element? {
+    guard self.shouldMakeNextElement else {
+      return nil
+    }
+
+    // Grow the unjittered backoff, capping it to the configured maximum.
+    let unjittered = self.unjitteredBackoff * self.connectionBackoff.multiplier
+    self.unjitteredBackoff = min(unjittered, self.connectionBackoff.maximumBackoff)
+
+    // Jitter is applied after capping, so the returned backoff may exceed the maximum.
+    let backoff = self.jittered(value: self.unjitteredBackoff)
+    return self.makeElement(backoff: backoff)
+  }
+
+  /// Make a timeout-backoff pair from the given backoff. The timeout is the `max` of the backoff
+  /// and `connectionBackoff.minimumConnectionTimeout`.
+  private func makeElement(backoff: TimeInterval) -> Element {
+    let timeout = max(backoff, self.connectionBackoff.minimumConnectionTimeout)
+    return (timeout, backoff)
+  }
+
+  /// Adds 'jitter' to the given value.
+  ///
+  /// The result is `value` plus a random offset drawn from the closed range spanning
+  /// `-|jitter * value|` to `+|jitter * value|`. If that bound is not finite the value is
+  /// returned without jitter.
+  private func jittered(value: TimeInterval) -> TimeInterval {
+    // Use the magnitude so that a negative jitter (or value) can't produce a range whose lower
+    // bound is greater than its upper bound; forming such a range traps at runtime.
+    let spread = (self.connectionBackoff.jitter * value).magnitude
+
+    // A NaN bound can't form a range and `random(in:)` requires a finite range, so skip the
+    // jitter rather than trap.
+    guard spread.isFinite else {
+      return value
+    }
+
+    return value + TimeInterval.random(in: (-spread)...spread)
+  }
+}

+ 60 - 4
Sources/GRPC/GRPCClientConnection.swift

@@ -114,9 +114,32 @@ open class GRPCClientConnection {
   ///
   /// - Parameter configuration: The configuration to start the connection with.
   public class func start(_ configuration: Configuration) -> EventLoopFuture<GRPCClientConnection> {
-    return makeBootstrap(configuration: configuration)
-      .connect(to: configuration.target)
-      .flatMap { channel in
+    return start(configuration, backoffIterator: configuration.connectionBackoff?.makeIterator())
+  }
+
+  /// Starts a client connection using the given configuration and backoff.
+  ///
+  /// In addition to the steps taken in `start(_:)`, we _may_ additionally set a
+  /// connection timeout and schedule a retry attempt (should the connection fail) if a
+  /// `ConnectionBackoff.Iterator` is provided.
+  ///
+  /// - Parameter configuration: The configuration to start the connection with.
+  /// - Parameter backoffIterator: A `ConnectionBackoff` iterator which generates connection
+  ///     timeouts and backoffs to use when attempting to retry the connection.
+  internal class func start(
+    _ configuration: Configuration,
+    backoffIterator: ConnectionBackoff.Iterator?
+  ) -> EventLoopFuture<GRPCClientConnection> {
+    let timeoutAndBackoff = backoffIterator?.next()
+
+    var bootstrap = makeBootstrap(configuration: configuration)
+    // Set a timeout, if we have one.
+    if let timeout = timeoutAndBackoff?.timeout {
+      bootstrap = bootstrap.connectTimeout(.seconds(timeInterval: timeout))
+    }
+
+    let connection = bootstrap.connect(to: configuration.target)
+      .flatMap { channel -> EventLoopFuture<GRPCClientConnection> in
         let tlsVerified: EventLoopFuture<Void>?
         if configuration.tlsConfiguration != nil {
           tlsVerified = verifyTLS(channel: channel)
@@ -128,6 +151,22 @@ open class GRPCClientConnection {
           makeGRPCClientConnection(channel: channel, configuration: configuration)
         }
       }
+
+    guard let backoff = timeoutAndBackoff?.backoff else {
+      return connection
+    }
+
+    // If we're in error then schedule our next attempt.
+    return connection.flatMapError { error in
+      // The `futureResult` of the scheduled task is of type
+      // `EventLoopFuture<EventLoopFuture<GRPCClientConnection>>`, so we need to `flatMap` it to
+      // remove a level of indirection.
+      return connection.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
+        return start(configuration, backoffIterator: backoffIterator)
+      }.futureResult.flatMap { nextConnection in
+        return nextConnection
+      }
+    }
   }
 
   public let channel: Channel
@@ -186,6 +225,10 @@ extension GRPCClientConnection {
     /// TLS configuration for this connection. `nil` if TLS is not desired.
     public var tlsConfiguration: TLSConfiguration?
 
+    /// The connection backoff configuration. If no connection retrying is required then this should
+    /// be `nil`.
+    public var connectionBackoff: ConnectionBackoff?
+
     /// The HTTP protocol used for this connection.
     public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
       return self.tlsConfiguration == nil ? .http : .https
@@ -198,16 +241,20 @@ extension GRPCClientConnection {
     /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
     ///     on debug builds.
     /// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
+    /// - Parameter connectionBackoff: The connection backoff configuration to use, defaulting
+    ///     to `nil`.
     public init(
       target: ConnectionTarget,
       eventLoopGroup: EventLoopGroup,
       errorDelegate: ClientErrorDelegate? = DebugOnlyLoggingClientErrorDelegate.shared,
-      tlsConfiguration: TLSConfiguration? = nil
+      tlsConfiguration: TLSConfiguration? = nil,
+      connectionBackoff: ConnectionBackoff? = nil
     ) {
       self.target = target
       self.eventLoopGroup = eventLoopGroup
       self.errorDelegate = errorDelegate
       self.tlsConfiguration = tlsConfiguration
+      self.connectionBackoff = connectionBackoff
     }
   }
 
@@ -269,3 +316,12 @@ fileprivate extension Channel {
     }
   }
 }
+
+fileprivate extension TimeAmount {
+  /// Creates a new `TimeAmount` from the given time interval in seconds.
+  ///
+  /// The interval is converted to nanoseconds. Values which can't be represented as a
+  /// `TimeAmount.Value` are clamped to its bounds instead of trapping, and NaN is treated as zero.
+  ///
+  /// - Parameter timeInterval: The amount of time in seconds
+  static func seconds(timeInterval: TimeInterval) -> TimeAmount {
+    let nanoseconds = timeInterval * 1_000_000_000
+
+    // Converting NaN to an integer traps; fall back to no time at all.
+    guard !nanoseconds.isNaN else {
+      return .nanoseconds(0)
+    }
+
+    // Converting an out-of-range (or infinite) value traps, so clamp first. `>=` is used for the
+    // upper bound since, for a 64-bit `Value`, `Double(Value.max)` rounds up to a value which is
+    // itself out of range.
+    if nanoseconds >= Double(TimeAmount.Value.max) {
+      return .nanoseconds(TimeAmount.Value.max)
+    }
+
+    if nanoseconds <= Double(TimeAmount.Value.min) {
+      return .nanoseconds(TimeAmount.Value.min)
+    }
+
+    return .nanoseconds(TimeAmount.Value(nanoseconds))
+  }
+}

+ 99 - 0
Tests/GRPCTests/ClientConnectionBackoffTests.swift

@@ -0,0 +1,99 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import GRPC
+import NIO
+import XCTest
+
+/// Tests that client connections honour the configured connection backoff.
+class ClientConnectionBackoffTests: XCTestCase {
+  let port = 8080
+
+  var client: EventLoopFuture<GRPCClientConnection>!
+  var server: EventLoopFuture<GRPCServer>!
+
+  var group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
+
+  /// Closes the server (if one was started) and the client (if it connected), then shuts down
+  /// the event loop group.
+  override func tearDown() {
+    if let startedServer = self.server {
+      let serverClosed = startedServer.flatMap { grpcServer in
+        return grpcServer.channel.close()
+      }
+      XCTAssertNoThrow(try serverClosed.wait())
+    }
+
+    // We don't always expect a client (since we deliberately timeout the connection in some cases).
+    let connectedClient = try? self.client.wait()
+    if let connectedClient = connectedClient {
+      XCTAssertNoThrow(try connectedClient.channel.close().wait())
+    }
+
+    XCTAssertNoThrow(try self.group.syncShutdownGracefully())
+  }
+
+  /// Starts a server with no service providers on `localhost` at `port`.
+  func makeServer() throws -> EventLoopFuture<GRPCServer> {
+    let startedServer = try GRPCServer.start(
+      hostname: "localhost",
+      port: self.port,
+      eventLoopGroup: self.group,
+      serviceProviders: [])
+    return startedServer
+  }
+
+  /// Makes a client configuration targeting `localhost` at `port` using the default
+  /// `ConnectionBackoff`.
+  func makeClientConfiguration() -> GRPCClientConnection.Configuration {
+    return GRPCClientConnection.Configuration(
+      target: .hostAndPort("localhost", self.port),
+      eventLoopGroup: self.group,
+      connectionBackoff: ConnectionBackoff())
+  }
+
+  /// Starts a client connection with the given configuration.
+  func makeClientConnection(
+    _ configuration: GRPCClientConnection.Configuration
+  ) -> EventLoopFuture<GRPCClientConnection> {
+    return GRPCClientConnection.start(configuration)
+  }
+
+  /// Without a backoff and without a server the connection should fail with a connection error.
+  func testClientConnectionFailsWithNoBackoff() throws {
+    var withoutBackoff = self.makeClientConfiguration()
+    withoutBackoff.connectionBackoff = nil
+
+    self.client = self.makeClientConnection(withoutBackoff)
+    XCTAssertThrowsError(try self.client.wait()) { thrown in
+      XCTAssertTrue(thrown is NIOConnectionError)
+    }
+  }
+
+  /// A client started before the server should connect once the server becomes available.
+  func testClientEventuallyConnects() throws {
+    let serverStarted = self.expectation(description: "server started")
+    let clientConnected = self.expectation(description: "client connected")
+
+    // Start the client first.
+    let configuration = self.makeClientConfiguration()
+    self.client = self.makeClientConnection(configuration)
+    self.client.assertSuccess(fulfill: clientConnected)
+
+    // Sleep for a little bit to make sure we hit the backoff.
+    Thread.sleep(forTimeInterval: 0.2)
+
+    self.server = try self.makeServer()
+    self.server.assertSuccess(fulfill: serverStarted)
+
+    let inExpectedOrder = [serverStarted, clientConnected]
+    self.wait(for: inExpectedOrder, timeout: 2.0, enforceOrder: true)
+  }
+
+  /// With a small maximum backoff and no server the connection should eventually fail with a
+  /// connection error.
+  func testClientEventuallyTimesOut() throws {
+    var shortBackoff = self.makeClientConfiguration()
+    shortBackoff.connectionBackoff = ConnectionBackoff(maximumBackoff: 0.1)
+
+    self.client = self.makeClientConnection(shortBackoff)
+    XCTAssertThrowsError(try self.client.wait()) { thrown in
+      XCTAssertTrue(thrown is NIOConnectionError)
+    }
+  }
+}

+ 74 - 0
Tests/GRPCTests/ConnectionBackoffTests.swift

@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import GRPC
+import XCTest
+
+/// Tests for the values produced by iterating a `ConnectionBackoff`.
+class ConnectionBackoffTests: XCTestCase {
+  var backoff = ConnectionBackoff()
+
+  /// Returns the backoff expected for the element at `index` before jitter is applied: the
+  /// initial backoff multiplied by the multiplier `index` times, capped to the maximum backoff.
+  private func expectedUnjitteredBackoff(at index: Int) -> TimeInterval {
+    let uncapped = self.backoff.initialBackoff * pow(self.backoff.multiplier, Double(index))
+    return min(uncapped, self.backoff.maximumBackoff)
+  }
+
+  /// Checks the exact timeouts and backoffs produced by a small jitter-free configuration.
+  func testExpectedValuesWithNoJitter() {
+    self.backoff.jitter = 0.0
+    self.backoff.multiplier = 2.0
+    self.backoff.initialBackoff = 1.0
+    self.backoff.maximumBackoff = 16.0
+    self.backoff.minimumConnectionTimeout = 4.2
+
+    let timeoutAndBackoff = Array(self.backoff)
+
+    let expectedBackoff: [TimeInterval] = [1.0, 2.0, 4.0, 8.0, 16.0]
+    XCTAssertEqual(expectedBackoff, timeoutAndBackoff.map { $0.backoff })
+
+    // The timeout is never less than the minimum connection timeout.
+    let expectedTimeout: [TimeInterval] = [4.2, 4.2, 4.2, 8.0, 16.0]
+    XCTAssertEqual(expectedTimeout, timeoutAndBackoff.map { $0.timeout })
+  }
+
+  /// Without jitter each backoff should match the expected unjittered backoff.
+  func testBackoffWithNoJitter() {
+    self.backoff.jitter = 0.0
+
+    for (i, timeoutAndBackoff) in self.backoff.enumerated() {
+      let expected = self.expectedUnjitteredBackoff(at: i)
+      XCTAssertEqual(expected, timeoutAndBackoff.backoff, accuracy: 1e-6)
+    }
+  }
+
+  /// With jitter each backoff should be within `jitter * unjittered` of the unjittered backoff.
+  func testBackoffWithJitter() {
+    for (i, timeoutAndBackoff) in self.backoff.enumerated() {
+      let unjittered = self.expectedUnjitteredBackoff(at: i)
+      let halfJitterRange = self.backoff.jitter * unjittered
+      let jitteredRange = (unjittered-halfJitterRange)...(unjittered+halfJitterRange)
+      XCTAssert(jitteredRange.contains(timeoutAndBackoff.backoff))
+    }
+  }
+
+  /// Without jitter no backoff should be larger than the maximum backoff.
+  func testBackoffDoesNotExceedMaximum() {
+    // Since jitter is applied after checking against the maximum allowed backoff, the maximum
+    // backoff can still be exceeded if jitter is non-zero.
+    self.backoff.jitter = 0.0
+
+    for (_, backoff) in self.backoff {
+      XCTAssertLessThanOrEqual(backoff, self.backoff.maximumBackoff)
+    }
+  }
+
+  /// Every connection timeout should be at least the minimum connection timeout.
+  ///
+  /// The spelling of this test's name is kept as-is because the test manifest refers to it by
+  /// name.
+  func testConnectionTimeoutAlwaysGreatherThanOrEqualToMinimum() {
+    for (connectionTimeout, _) in self.backoff {
+      XCTAssertGreaterThanOrEqual(connectionTimeout, self.backoff.minimumConnectionTimeout)
+    }
+  }
+}

+ 9 - 0
Tests/GRPCTests/EventLoopFuture+Assertions.swift

@@ -65,6 +65,15 @@ extension EventLoopFuture {
       }
     }
   }
+
+  /// Registers callbacks which fulfill an expectation when the future succeeds and record a
+  /// test failure when the future fails.
+  ///
+  /// The expectation is only fulfilled on success.
+  ///
+  /// - Parameter expectation: The expectation to fulfill.
+  /// - Parameter file: The file to attribute a failure to, defaulting to the caller's file.
+  /// - Parameter line: The line to attribute a failure to, defaulting to the caller's line.
+  func assertSuccess(fulfill expectation: XCTestExpectation, file: StaticString = #file, line: UInt = #line) {
+    self.whenSuccess { _ in
+      expectation.fulfill()
+    }
+
+    // Report the error at the call site instead of silently ignoring it.
+    self.whenFailure { error in
+      XCTFail("Unexpected error: \(error)", file: file, line: line)
+    }
+  }
 }
 
 extension EventLoopFuture {

+ 26 - 0
Tests/GRPCTests/XCTestManifests.swift

@@ -40,6 +40,17 @@ extension ClientClosedChannelTests {
     ]
 }
 
+extension ClientConnectionBackoffTests {
+    // DO NOT MODIFY: This is autogenerated, use:
+    //   `swift test --generate-linuxmain`
+    // to regenerate.
+    static let __allTests__ClientConnectionBackoffTests = [
+        ("testClientConnectionFailsWithNoBackoff", testClientConnectionFailsWithNoBackoff),
+        ("testClientEventuallyConnects", testClientEventuallyConnects),
+        ("testClientEventuallyTimesOut", testClientEventuallyTimesOut),
+    ]
+}
+
 extension ClientTLSFailureTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -77,6 +88,19 @@ extension ClientTimeoutTests {
     ]
 }
 
+extension ConnectionBackoffTests {
+    // DO NOT MODIFY: This is autogenerated, use:
+    //   `swift test --generate-linuxmain`
+    // to regenerate.
+    static let __allTests__ConnectionBackoffTests = [
+        ("testBackoffDoesNotExceedMaximum", testBackoffDoesNotExceedMaximum),
+        ("testBackoffWithJitter", testBackoffWithJitter),
+        ("testBackoffWithNoJitter", testBackoffWithNoJitter),
+        ("testConnectionTimeoutAlwaysGreatherThanOrEqualToMinimum", testConnectionTimeoutAlwaysGreatherThanOrEqualToMinimum),
+        ("testExpectedValuesWithNoJitter", testExpectedValuesWithNoJitter),
+    ]
+}
+
 extension FunctionalTestsAnonymousClient {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -322,9 +346,11 @@ public func __allTests() -> [XCTestCaseEntry] {
         testCase(AnyServiceClientTests.__allTests__AnyServiceClientTests),
         testCase(ClientCancellingTests.__allTests__ClientCancellingTests),
         testCase(ClientClosedChannelTests.__allTests__ClientClosedChannelTests),
+        testCase(ClientConnectionBackoffTests.__allTests__ClientConnectionBackoffTests),
         testCase(ClientTLSFailureTests.__allTests__ClientTLSFailureTests),
         testCase(ClientThrowingWhenServerReturningErrorTests.__allTests__ClientThrowingWhenServerReturningErrorTests),
         testCase(ClientTimeoutTests.__allTests__ClientTimeoutTests),
+        testCase(ConnectionBackoffTests.__allTests__ConnectionBackoffTests),
         testCase(FunctionalTestsAnonymousClient.__allTests__FunctionalTestsAnonymousClient),
         testCase(FunctionalTestsInsecureTransport.__allTests__FunctionalTestsInsecureTransport),
         testCase(FunctionalTestsMutualAuthentication.__allTests__FunctionalTestsMutualAuthentication),