Browse Source

Add a retry throttle (#1689)

Motivation:

To implement retries and hedging, transports need to be able to throttle
attempts to svoid overloading servers. The uses a token based system
where successful requests, ones which end with non-retryable status
code, add tokens and those which fail remove tokens from the system.
Successful requests add 1 token, failed requests remove `tokenRatio`
tokens (typically less than 1).

Modification:

- Implement a retry throttle
- Add a requirement to the `ClientTransport`

Result:

Retries can be throttled.
George Barnett 2 years ago
parent
commit
f37ce30f21

+ 7 - 0
Sources/GRPCCore/Transport/ClientTransport.swift

@@ -19,6 +19,13 @@ public protocol ClientTransport: Sendable {
   associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCResponsePart
   associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCResponsePart
   associatedtype Outbound: ClosableRPCWriterProtocol<RPCRequestPart>
   associatedtype Outbound: ClosableRPCWriterProtocol<RPCRequestPart>
 
 
+  /// Returns a throttle which gRPC uses to determine whether retries can be executed.
+  ///
+  /// Client transports don't need to implement the throttle or interact with it beyond its
+  /// creation. gRPC will record the results of requests to determine whether retries can be
+  /// performed.
+  var retryThrottle: RetryThrottle { get }
+
   /// Establish and maintain a connection to the remote destination.
   /// Establish and maintain a connection to the remote destination.
   ///
   ///
   /// Maintains a long-lived connection, or set of connections, to a remote destination.
   /// Maintains a long-lived connection, or set of connections, to a remote destination.

+ 124 - 0
Sources/GRPCCore/Transport/RetryThrottle.swift

@@ -0,0 +1,124 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A throttle used to rate-limit retries and hedging attempts.
+///
+/// gRPC prevents servers from being overloaded by retries and hedging by using a token-based
+/// throttling mechanism at the transport level.
+///
+/// Each client transport maintains a throttle for the server it is connected to and gRPC records
+/// successful and failed RPC attempts. Successful attempts increment the number of tokens
+/// by ``tokenRatio`` and failed attempts decrement the available tokens by one. In the context
+/// of throttling, a failed attempt is one where the server terminates the RPC with a status code
+/// which is retryable or non fatal (as defined by ``RetryPolicy/retryableStatusCodes`` and
+/// ``HedgingPolicy/nonFatalStatusCodes``) or when the client receives a pushback response from
+/// the server.
+///
+/// See also [gRFC A6: client retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
+public struct RetryThrottle: Sendable {
+  // Note: only three figures after the decimal point from the original token ratio are used so
+  //   all computation is done a scaled number of tokens (tokens * 1000). This allows us to do all
+  //   computation in integer space.
+
+  /// The number of tokens available, multiplied by 1000.
+  private let scaledTokensAvailable: LockedValueBox<Int>
+  /// The number of tokens, multiplied by 1000.
+  private let scaledTokenRatio: Int
+  /// The maximum number of tokens, multiplied by 1000.
+  private let scaledMaximumTokens: Int
+  /// The retry threshold, multiplied by 1000. If ``scaledTokensAvailable`` is above this then
+  /// retries are permitted.
+  private let scaledRetryThreshold: Int
+
+  /// Returns the throttling token ratio.
+  ///
+  /// The number of tokens held by the throttle is incremented by this value for each successful
+  /// response. In the context of throttling, a successful response is one which:
+  /// - receives metadata from the server, or
+  /// - is terminated with a non-retryable or fatal status code.
+  ///
+  /// If the response is a pushback response then it is not considered to be successful, even if
+  /// either of the preceding conditions are met.
+  public var tokenRatio: Double {
+    Double(self.scaledTokenRatio) / 1000
+  }
+
+  /// The maximum number of tokens the throttle may hold.
+  public var maximumTokens: Int {
+    self.scaledMaximumTokens / 1000
+  }
+
+  /// The number of tokens the throttle currently has.
+  ///
+  /// If this value is less than or equal to the retry threshold (defined as `maximumTokens / 2`)
+  /// then RPCs will not be retried and hedging will be disabled.
+  public var tokens: Double {
+    self.scaledTokensAvailable.withLockedValue {
+      Double($0) / 1000
+    }
+  }
+
+  /// Returns whether retries and hedging are permitted at this time.
+  public var isRetryPermitted: Bool {
+    self.scaledTokensAvailable.withLockedValue {
+      $0 > self.scaledRetryThreshold
+    }
+  }
+
+  /// Create a new throttle.
+  ///
+  /// - Parameters:
+  ///   - maximumTokens: The maximum number of tokens available. Must be in the range `1...1000`.
+  ///   - tokenRatio: The number of tokens to increment the available tokens by for successful
+  ///       responses. See the documentation on this type for a description of what counts as a
+  ///       successful response. Note that only three decimal places are used from this value.
+  /// - Precondition: `maximumTokens` must be in the range `1...1000`.
+  /// - Precondition: `tokenRatio` must be `>= 0.001`.
+  public init(maximumTokens: Int, tokenRatio: Double) {
+    precondition(
+      (1 ... 1000).contains(maximumTokens),
+      "maximumTokens must be in the range 1...1000 (is \(maximumTokens))"
+    )
+
+    let scaledTokenRatio = Int(tokenRatio * 1000)
+    precondition(scaledTokenRatio > 0, "tokenRatio must be >= 0.001 (is \(tokenRatio))")
+
+    let scaledTokens = maximumTokens * 1000
+    self.scaledMaximumTokens = scaledTokens
+    self.scaledRetryThreshold = scaledTokens / 2
+    self.scaledTokenRatio = scaledTokenRatio
+    self.scaledTokensAvailable = LockedValueBox(scaledTokens)
+  }
+
+  /// Records a success, adding a token to the throttle.
+  @usableFromInline
+  func recordSuccess() {
+    self.scaledTokensAvailable.withLockedValue { value in
+      value = min(self.scaledMaximumTokens, value &+ self.scaledTokenRatio)
+    }
+  }
+
+  /// Records a failure, removing tokens from the throttle.
+  /// - Returns: Whether retries will now be throttled.
+  @usableFromInline
+  @discardableResult
+  func recordFailure() -> Bool {
+    self.scaledTokensAvailable.withLockedValue { value in
+      value = max(0, value &- 1000)
+      return value <= self.scaledRetryThreshold
+    }
+  }
+}

+ 98 - 0
Tests/GRPCCoreTests/Transport/RetryThrottleTests.swift

@@ -0,0 +1,98 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import XCTest
+
+@testable import GRPCCore
+
+final class RetryThrottleTests: XCTestCase {
+  func testThrottleOnInit() {
+    let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
+    // Start with max tokens, so permitted.
+    XCTAssertTrue(throttle.isRetryPermitted)
+    XCTAssertEqual(throttle.maximumTokens, 10)
+    XCTAssertEqual(throttle.tokens, 10)
+    XCTAssertEqual(throttle.tokenRatio, 0.1)
+  }
+
+  func testThrottleIgnoresMoreThanThreeDecimals() {
+    let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1239)
+    XCTAssertEqual(throttle.tokenRatio, 0.123)
+  }
+
+  func testFailureReducesTokens() {
+    let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
+    XCTAssertEqual(throttle.tokens, 10)
+    XCTAssert(throttle.isRetryPermitted)
+
+    throttle.recordFailure()
+    XCTAssertEqual(throttle.tokens, 9)
+    XCTAssert(throttle.isRetryPermitted)
+
+    throttle.recordFailure()
+    XCTAssertEqual(throttle.tokens, 8)
+    XCTAssert(throttle.isRetryPermitted)
+
+    throttle.recordFailure()
+    XCTAssertEqual(throttle.tokens, 7)
+    XCTAssert(throttle.isRetryPermitted)
+
+    throttle.recordFailure()
+    XCTAssertEqual(throttle.tokens, 6)
+    XCTAssert(throttle.isRetryPermitted)
+
+    // Drop to threshold, retries no longer allowed.
+    throttle.recordFailure()
+    XCTAssertEqual(throttle.tokens, 5)
+    XCTAssertFalse(throttle.isRetryPermitted)
+  }
+
+  func testTokensCantDropBelowZero() {
+    let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
+    for _ in 0 ..< 1000 {
+      throttle.recordFailure()
+      XCTAssertGreaterThanOrEqual(throttle.tokens, 0)
+    }
+    XCTAssertEqual(throttle.tokens, 0)
+  }
+
+  func testSuccessIncreasesTokens() {
+    let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
+
+    // Drop to zero.
+    for _ in 0 ..< 10 {
+      throttle.recordFailure()
+    }
+    XCTAssertEqual(throttle.tokens, 0)
+
+    // Start recording successes.
+    throttle.recordSuccess()
+    XCTAssertEqual(throttle.tokens, 0.1)
+
+    throttle.recordSuccess()
+    XCTAssertEqual(throttle.tokens, 0.2)
+
+    throttle.recordSuccess()
+    XCTAssertEqual(throttle.tokens, 0.3)
+  }
+
+  func testTokensCantRiseAboveMax() {
+    let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
+    XCTAssertEqual(throttle.tokens, 10)
+    throttle.recordSuccess()
+    XCTAssertEqual(throttle.tokens, 10)
+  }
+}