Browse Source

Allow users to choose a policy for starting RPCs (#860)

Motivation:

In most cases RPCs should be started once a connection has become
'ready', this allows RPCs to wait for transient network failures to be
recovered from instead of just failing. However in some contexts this is
undesirable, and failing RPCs quickly when it does not have a connection
is the right thing to do.

Modifications:

- Add a 'CallStartPolicy' which allows users RPCs to configure their
  connection such that RPCs will wait for the connection to be ready
  before starting the RPC or fail quickly if the most recent connection
  attempt failed.

Result:

Users can fail RPCs quickly if they do not wish to wait for a connection
to be eventually established.
George Barnett 5 years ago
parent
commit
2c149a2a96

+ 51 - 1
Sources/GRPC/ClientConnection.swift

@@ -72,9 +72,19 @@ import Logging
 public class ClientConnection {
   private let connectionManager: ConnectionManager
 
+  private func getChannel() -> EventLoopFuture<Channel> {
+    switch self.configuration.callStartBehavior.wrapped {
+    case .waitsForConnectivity:
+      return self.connectionManager.getChannel()
+
+    case .fastFailure:
+      return self.connectionManager.getOptimisticChannel()
+    }
+  }
+
   /// HTTP multiplexer from the `channel` handling gRPC calls.
   internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> {
-    return self.connectionManager.getChannel().flatMap {
+    return self.getChannel().flatMap {
       $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
     }
   }
@@ -266,6 +276,38 @@ public struct ConnectionTarget {
   }
 }
 
+/// The connectivity behavior to use when starting an RPC.
+public struct CallStartBehavior: Hashable {
+  internal enum Behavior: Hashable {
+    case waitsForConnectivity
+    case fastFailure
+  }
+
+  internal var wrapped: Behavior
+  private init(_ wrapped: Behavior) {
+    self.wrapped = wrapped
+  }
+
+  /// Waits for connectivity (that is, the 'ready' connectivity state) before attempting to start
+  /// an RPC. Doing so may involve multiple connection attempts.
+  ///
+  /// This is the preferred, and default, behaviour.
+  public static let waitsForConnectivity = CallStartBehavior(.waitsForConnectivity)
+
+  /// The 'fast failure' behaviour is intended for cases where users would rather their RPC failed
+  /// quickly rather than waiting for an active connection. The behaviour depends on the current
+  /// connectivity state:
+  ///
+  /// - Idle: a connection attempt will be started and the RPC will fail if that attempt fails.
+  /// - Connecting: a connection attempt is already in progress, the RPC will fail if that attempt
+  ///     fails.
+  /// - Ready: a connection is already active: the RPC will be started using that connection.
+  /// - Transient failure: the last connection or connection attempt failed and gRPC is waiting to
+  ///     connect again. The RPC will fail immediately.
+  /// - Shutdown: the connection is shutdown, the RPC will fail immediately.
+  public static let fastFailure = CallStartBehavior(.fastFailure)
+}
+
 extension ClientConnection {
   /// The configuration for a connection.
   public struct Configuration {
@@ -300,6 +342,10 @@ extension ClientConnection {
     /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
     public var connectionIdleTimeout: TimeAmount
 
+    /// The behavior used to determine when an RPC should start. That is, whether it should wait for
+    /// an active connection or fail quickly if no connection is currently available.
+    public var callStartBehavior: CallStartBehavior
+
     /// The HTTP/2 flow control target window size.
     public var httpTargetWindowSize: Int
 
@@ -321,6 +367,8 @@ extension ClientConnection {
     ///     `connectivityStateDelegate`.
     /// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
     /// - Parameter connectionBackoff: The connection backoff configuration to use.
+    /// - Parameter callStartBehavior: The behavior used to determine when a call should start in
+    ///     relation to its underlying connection. Defaults to `waitsForConnectivity`.
     /// - Parameter messageEncoding: Message compression configuration, defaults to no compression.
     /// - Parameter targetWindowSize: The HTTP/2 flow control target window size.
     public init(
@@ -332,6 +380,7 @@ extension ClientConnection {
       tls: Configuration.TLS? = nil,
       connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
       connectionIdleTimeout: TimeAmount = .minutes(5),
+      callStartBehavior: CallStartBehavior = .waitsForConnectivity,
       httpTargetWindowSize: Int = 65535
     ) {
       self.target = target
@@ -342,6 +391,7 @@ extension ClientConnection {
       self.tls = tls
       self.connectionBackoff = connectionBackoff
       self.connectionIdleTimeout = connectionIdleTimeout
+      self.callStartBehavior = callStartBehavior
       self.httpTargetWindowSize = httpTargetWindowSize
     }
   }

+ 33 - 0
Sources/GRPC/ConnectionManager.swift

@@ -254,6 +254,39 @@ internal class ConnectionManager {
     }
   }
 
+  /// Returns a future for the current channel, or future channel from the current connection
+  /// attempt, or if the state is 'idle' returns the future for the next connection attempt.
+  ///
+  /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
+  internal func getOptimisticChannel() -> EventLoopFuture<Channel> {
+    return self.eventLoop.flatSubmit {
+      switch self.state {
+      case .idle:
+        self.startConnecting()
+        // We started connecting so we must transition to the `connecting` state.
+        guard case .connecting(let connecting) = self.state else {
+          self.invalidState()
+        }
+        return connecting.candidate
+
+      case .connecting(let state):
+        return state.candidate
+
+      case .active(let state):
+        return state.candidate.eventLoop.makeSucceededFuture(state.candidate)
+
+      case .ready(let state):
+        return state.channel.eventLoop.makeSucceededFuture(state.channel)
+
+      case .transientFailure:
+        return self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)
+
+      case .shutdown:
+        return self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
+      }
+    }
+  }
+
   /// Shutdown any connection which exists. This is a request from the application.
   internal func shutdown() -> EventLoopFuture<Void> {
     return self.eventLoop.flatSubmit {

+ 11 - 0
Sources/GRPC/GRPCChannel/GRPCChannelBuilder.swift

@@ -39,6 +39,7 @@ extension ClientConnection {
     private var connectivityStateDelegate: ConnectivityStateDelegate?
     private var connectivityStateDelegateQueue: DispatchQueue?
     private var connectionIdleTimeout: TimeAmount = .minutes(5)
+    private var callStartBehavior: CallStartBehavior = .waitsForConnectivity
     private var httpTargetWindowSize: Int = 65535
 
     fileprivate init(group: EventLoopGroup) {
@@ -55,6 +56,7 @@ extension ClientConnection {
         tls: self.maybeTLS,
         connectionBackoff: self.connectionBackoffIsEnabled ? self.connectionBackoff : nil,
         connectionIdleTimeout: self.connectionIdleTimeout,
+        callStartBehavior: self.callStartBehavior,
         httpTargetWindowSize: self.httpTargetWindowSize
       )
       return ClientConnection(configuration: configuration)
@@ -155,6 +157,15 @@ extension ClientConnection.Builder {
     self.connectionIdleTimeout = timeout
     return self
   }
+
+  /// The behavior used to determine when an RPC should start. That is, whether it should wait for
+  /// an active connection or fail quickly if no connection is currently available. Calls will
+  /// use `.waitsForConnectivity` by default.
+  @discardableResult
+  public func withCallStartBehavior(_ behavior: CallStartBehavior) -> Self {
+    self.callStartBehavior = behavior
+    return self
+  }
 }
 
 extension ClientConnection.Builder {

+ 43 - 0
Tests/GRPCTests/CallStartBehaviorTests.swift

@@ -0,0 +1,43 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPC
+import EchoModel
+import NIO
+import XCTest
+
+class CallStartBehaviorTests: GRPCTestCase {
+  func testFastFailure() {
+    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    // If the policy was 'waitsForConnectivity' we'd continue attempting to connect with backoff
+    // and the RPC wouldn't complete until we call shutdown (because we're not setting a timeout).
+    let channel = ClientConnection.insecure(group: group)
+      .withCallStartBehavior(.fastFailure)
+      .connect(host: "http://unreachable.invalid", port: 0)
+    defer {
+      XCTAssertNoThrow(try channel.close().wait())
+    }
+
+    let echo = Echo_EchoClient(channel: channel)
+    let get = echo.get(.with { $0.text = "Is anyone out there?" })
+
+    XCTAssertThrowsError(try get.response.wait())
+    XCTAssertNoThrow(try get.status.wait())
+  }
+}

+ 70 - 0
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -617,6 +617,76 @@ extension ConnectionManagerTests {
       XCTAssertNoThrow(try shutdown.wait())
     }
   }
+
+  func testDoomedOptimisticChannelFromIdle() {
+    let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
+      return self.loop.makeFailedFuture(DoomedChannelError())
+    }
+    let candidate = manager.getOptimisticChannel()
+    self.loop.run()
+    XCTAssertThrowsError(try candidate.wait())
+  }
+
+  func testDoomedOptimisticChannelFromConnecting() throws {
+    let promise = self.loop.makePromise(of: Channel.self)
+    let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
+      return promise.futureResult
+    }
+
+    self.waitForStateChange(from: .idle, to: .connecting) {
+      // Trigger channel creation, and a connection attempt, we don't care about the channel.
+      _ = manager.getChannel()
+      self.loop.run()
+    }
+
+    // We're connecting: get an optimistic channel.
+    let optimisticChannel = manager.getOptimisticChannel()
+    self.loop.run()
+
+    // Fail the promise.
+    promise.fail(DoomedChannelError())
+
+    XCTAssertThrowsError(try optimisticChannel.wait())
+  }
+
+  func testOptimisticChannelFromTransientFailure() throws {
+    var configuration = self.defaultConfiguration
+    configuration.connectionBackoff = ConnectionBackoff()
+
+    let manager = ConnectionManager.testingOnly(configuration: configuration, logger: self.logger) {
+      return self.loop.makeFailedFuture(DoomedChannelError())
+    }
+
+    self.waitForStateChanges([
+      Change(from: .idle, to: .connecting),
+      Change(from: .connecting, to: .transientFailure)
+    ]) {
+      // Trigger channel creation, and a connection attempt, we don't care about the channel.
+      _ = manager.getChannel()
+      self.loop.run()
+    }
+
+    // Now we're sitting in transient failure. Get a channel optimistically.
+    let optimisticChannel = manager.getOptimisticChannel()
+    self.loop.run()
+
+    XCTAssertThrowsError(try optimisticChannel.wait())
+  }
+
+  func testOptimisticChannelFromShutdown() throws {
+    let manager = ConnectionManager.testingOnly(configuration: self.defaultConfiguration, logger: self.logger) {
+      return self.loop.makeFailedFuture(DoomedChannelError())
+    }
+
+    let shutdown = manager.shutdown()
+    self.loop.run()
+    XCTAssertNoThrow(try shutdown.wait())
+
+    // Get a channel optimistically. It'll fail, obviously.
+    let channel = manager.getOptimisticChannel()
+    self.loop.run()
+    XCTAssertThrowsError(try channel.wait())
+  }
 }
 
 internal struct Change: Hashable, CustomStringConvertible {

+ 14 - 0
Tests/GRPCTests/XCTestManifests.swift

@@ -13,6 +13,15 @@ extension AnyServiceClientTests {
     ]
 }
 
+extension CallStartBehaviorTests {
+    // DO NOT MODIFY: This is autogenerated, use:
+    //   `swift test --generate-linuxmain`
+    // to regenerate.
+    static let __allTests__CallStartBehaviorTests = [
+        ("testFastFailure", testFastFailure),
+    ]
+}
+
 extension ChannelTransportTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -143,9 +152,13 @@ extension ConnectionManagerTests {
         ("testConnectAndThenBecomeInactive", testConnectAndThenBecomeInactive),
         ("testConnectFromIdleFailsWithNoReconnect", testConnectFromIdleFailsWithNoReconnect),
         ("testConnectOnSecondAttempt", testConnectOnSecondAttempt),
+        ("testDoomedOptimisticChannelFromConnecting", testDoomedOptimisticChannelFromConnecting),
+        ("testDoomedOptimisticChannelFromIdle", testDoomedOptimisticChannelFromIdle),
         ("testGoAwayWhenReady", testGoAwayWhenReady),
         ("testIdleShutdown", testIdleShutdown),
         ("testIdleTimeoutWhenThereAreActiveStreams", testIdleTimeoutWhenThereAreActiveStreams),
+        ("testOptimisticChannelFromShutdown", testOptimisticChannelFromShutdown),
+        ("testOptimisticChannelFromTransientFailure", testOptimisticChannelFromTransientFailure),
         ("testShutdownWhileActive", testShutdownWhileActive),
         ("testShutdownWhileConnecting", testShutdownWhileConnecting),
         ("testShutdownWhileShutdown", testShutdownWhileShutdown),
@@ -781,6 +794,7 @@ extension ZlibTests {
 public func __allTests() -> [XCTestCaseEntry] {
     return [
         testCase(AnyServiceClientTests.__allTests__AnyServiceClientTests),
+        testCase(CallStartBehaviorTests.__allTests__CallStartBehaviorTests),
         testCase(ChannelTransportTests.__allTests__ChannelTransportTests),
         testCase(ClientCancellingTests.__allTests__ClientCancellingTests),
         testCase(ClientClosedChannelTests.__allTests__ClientClosedChannelTests),