Przeglądaj źródła

Better handle client connectivity state. (#510)

* Better handle client connectivity state.

Motivation:

Pull #506 highlighted some issues with client reconnectivity, namely
a race between an automatic reconnection and the client closing the
connection. In this case the delegate would report changes of state
after the terminal shutdown state.

Modifications:

- State changes are now thread safe.
- Record when a user initiates shutdown, ignore state changes beyond
  this point and close any channels created after this point.

Result:

More correct channel reconnectivity.

* Fix a couple of code smells and a typo
George Barnett 6 lat temu
rodzic
commit
7c304edb9b
2 zmienionych plików z 199 dodań i 135 usunięć
  1. 141 125
      Sources/GRPC/ClientConnection.swift
  2. 58 10
      Sources/GRPC/ConnectivityState.swift

+ 141 - 125
Sources/GRPC/ClientConnection.swift

@@ -54,71 +54,45 @@ import NIOTLS
 ///
 /// See `BaseClientCall` for a description of the remainder of the client pipeline.
 public class ClientConnection {
-  /// The configuration this connection was created using.
-  internal let configuration: ClientConnection.Configuration
-
   /// The channel which will handle gRPC calls.
-  internal var channel: EventLoopFuture<Channel>
+  internal var channel: EventLoopFuture<Channel> {
+    willSet {
+      self.willSetChannel(to: newValue)
+    }
+    didSet {
+      self.didSetChannel(to: self.channel)
+    }
+  }
 
   /// HTTP multiplexer from the `channel` handling gRPC calls.
   internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
 
+  /// The configuration for this client.
+  internal let configuration: Configuration
+
   /// A monitor for the connectivity state.
   public let connectivity: ConnectivityStateMonitor
 
   /// Creates a new connection from the given configuration.
-  public init(configuration: ClientConnection.Configuration) {
-    let monitor = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
-    let channel = ClientConnection.makeChannel(
-      configuration: configuration,
-      connectivityMonitor: monitor
-    )
-
-    self.channel = channel
-    self.multiplexer = channel.flatMap {
-      $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
-    }
-    self.connectivity = monitor
+  public init(configuration: Configuration) {
     self.configuration = configuration
+    self.connectivity = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
+
+    // We need to initialize `multiplexer` before we can call `willSetChannel` (which will then
+    // assign `multiplexer` to one from the created `Channel`'s pipeline).
+    let eventLoop = configuration.eventLoopGroup.next()
+    let unavailable = GRPCStatus(code: .unavailable, message: nil)
+    self.multiplexer = eventLoop.makeFailedFuture(unavailable)
+
+    self.channel = ClientConnection.makeChannel(
+      configuration: self.configuration,
+      connectivity: self.connectivity,
+      backoffIterator: self.configuration.connectionBackoff?.makeIterator()
+    )
 
-    self.channel.whenSuccess { _ in
-      self.connectivity.state = .ready
-    }
-    self.replaceChannelAndMultiplexerOnClose(channel: channel)
-  }
-
-  /// Registers a callback on the `closeFuture` of the given channel to replace this class's
-  /// channel and multiplexer.
-  private func replaceChannelAndMultiplexerOnClose(channel: EventLoopFuture<Channel>) {
-    channel.always { result in
-      // If we failed to get a channel then we've exhausted our backoff; we should `.shutdown`.
-      if case .failure = result {
-        self.connectivity.state = .shutdown
-      }
-    }.flatMap {
-      $0.closeFuture
-    }.whenComplete { _ in
-      // `.shutdown` is terminal so don't attempt a reconnection.
-      guard self.connectivity.state != .shutdown else {
-        return
-      }
-
-      let newChannel = ClientConnection.makeChannel(
-        configuration: self.configuration,
-        connectivityMonitor: self.connectivity
-      )
-
-      self.channel = newChannel
-      self.multiplexer = newChannel.flatMap {
-        $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
-      }
-
-      // Change the state if the connection was successful.
-      newChannel.whenSuccess { _ in
-        self.connectivity.state = .ready
-      }
-      self.replaceChannelAndMultiplexerOnClose(channel: newChannel)
-    }
+    // `willSet` and `didSet` are not called on initialization, so call them explicitly now.
+    self.willSetChannel(to: channel)
+    self.didSetChannel(to: channel)
   }
 
   /// The `EventLoop` this connection is using.
@@ -132,97 +106,131 @@ public class ClientConnection {
       // We're already shutdown or in the process of shutting down.
       return channel.flatMap { $0.closeFuture }
     } else {
-      self.connectivity.state = .shutdown
+      self.connectivity.initiateUserShutdown()
       return channel.flatMap { $0.close() }
     }
   }
 }
 
+// MARK: - Channel creation
+
 extension ClientConnection {
-  /// Creates a `Channel` using the given configuration.
+  /// Register a callback on the close future of the given `channel` to replace the channel (if
+  /// possible) and also replace the `multiplexer` with that from the new channel.
   ///
-  /// This involves: creating a `ClientBootstrap`, connecting to a target and verifying that the TLS
-  /// handshake was successful (if TLS was configured). We _may_ additiionally set a connection
-  /// timeout and schedule a retry attempt (should the connection fail) if a
-  /// `ConnectionBackoff.Iterator` is provided.
+  /// - Parameter channel: The channel that will be set.
+  private func willSetChannel(to channel: EventLoopFuture<Channel>) {
+    // If we're about to set the channel and the user has initiated a shutdown (i.e. while the new
+    // channel was being created) then it is no longer needed.
+    guard !self.connectivity.userHasInitiatedShutdown else {
+      channel.whenSuccess { channel in
+        channel.close(mode: .all, promise: nil)
+      }
+      return
+    }
+
+    channel.flatMap { $0.closeFuture }.whenComplete { _ in
+      guard self.connectivity.canAttemptReconnect else { return }
+      self.channel = ClientConnection.makeChannel(
+        configuration: self.configuration,
+        connectivity: self.connectivity,
+        backoffIterator: self.configuration.connectionBackoff?.makeIterator()
+      )
+    }
+
+    self.multiplexer = channel.flatMap {
+      $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
+    }
+  }
+
+  /// Register a callback on the given `channel` to update the connectivity state.
+  ///
+  /// - Parameter channel: The channel that was set.
+  private func didSetChannel(to channel: EventLoopFuture<Channel>) {
+    channel.whenComplete { result in
+      switch result {
+      case .success:
+        self.connectivity.state = .ready
+
+      case .failure:
+        self.connectivity.state = .shutdown
+      }
+    }
+  }
+
+  /// Attempts to create a new `Channel` using the given configuration.
   ///
-  /// See the individual functions for more information:
-  ///  - `makeBootstrap(configuration:)`, and
-  ///  - `verifyTLS(channel:)`.
+  /// This involves: creating a `ClientBootstrapProtocol`, connecting to a target and verifying that
+  /// the TLS handshake was successful (if TLS was configured). We _may_ additionally set a
+  /// connection timeout and schedule a retry attempt (should the connection fail) if a
+  /// `ConnectionBackoffIterator` is provided.
   ///
   /// - Parameter configuration: The configuration to start the connection with.
-  /// - Parameter connectivityMonitor: A connectivity state monitor.
+  /// - Parameter connectivity: A connectivity state monitor.
   /// - Parameter backoffIterator: An `Iterator` for `ConnectionBackoff` providing a sequence of
   ///     connection timeouts and backoff to use when attempting to create a connection.
   private class func makeChannel(
-    configuration: ClientConnection.Configuration,
-    connectivityMonitor: ConnectivityStateMonitor,
-    backoffIterator: ConnectionBackoff.Iterator?
+    configuration: Configuration,
+    connectivity: ConnectivityStateMonitor,
+    backoffIterator: ConnectionBackoffIterator?
   ) -> EventLoopFuture<Channel> {
-    // We could have been shutdown by the user, avoid a connection attempt if this is the case.
-    guard connectivityMonitor.state != .shutdown else {
-      return configuration.eventLoopGroup.next().makeFailedFuture(GRPCStatus.processingError)
-    }
-
-    connectivityMonitor.state = .connecting
+    connectivity.state = .connecting
     let timeoutAndBackoff = backoffIterator?.next()
-    var bootstrap = ClientConnection.makeBootstrap(configuration: configuration)
 
-    // Set a timeout, if we have one.
-    if let timeout = timeoutAndBackoff?.timeout {
-      bootstrap = bootstrap.connectTimeout(.seconds(timeInterval: timeout))
-    }
+    let bootstrap = self.makeBootstrap(
+      configuration: configuration,
+      group: configuration.eventLoopGroup,
+      timeout: timeoutAndBackoff?.timeout
+    )
 
     let channel = bootstrap.connect(to: configuration.target).flatMap { channel -> EventLoopFuture<Channel> in
       if configuration.tlsConfiguration != nil {
-        return ClientConnection.verifyTLS(channel: channel).map { channel }
+        return channel.verifyTLS().map { channel }
       } else {
         return channel.eventLoop.makeSucceededFuture(channel)
       }
     }
 
-    channel.whenFailure { _ in
-      // We could have been shutdown by the user whilst we were connecting. If we were then avoid
-      // the this extra state transition.
-      if connectivityMonitor.state != .shutdown {
-        // We might try again in a moment.
-        connectivityMonitor.state = timeoutAndBackoff == nil ? .shutdown : .transientFailure
-      }
-    }
-
+    // If we don't have backoff then we can't retry, just return the `channel` no matter what
+    // state we are in.
     guard let backoff = timeoutAndBackoff?.backoff else {
       return channel
     }
 
-    // If we're in error then schedule our next attempt.
-    return channel.flatMapError { error in
-      // The `futureResult` of the scheduled task is of type
-      // `EventLoopFuture<EventLoopFuture<ClientConnection>>`, so we need to `flatMap` it to
-      // remove a level of indirection.
-      return channel.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
-        return makeChannel(
-          configuration: configuration,
-          connectivityMonitor: connectivityMonitor,
-          backoffIterator: backoffIterator
-        )
-      }.futureResult.flatMap { nextConnection in
-        return nextConnection
-      }
+    // If our connection attempt was unsuccessful, schedule another attempt in some time.
+    return channel.flatMapError { _ in
+      // We will try to connect again: the failure is transient.
+      connectivity.state = .transientFailure
+      return ClientConnection.scheduleReconnectAttempt(
+        in: backoff,
+        on: channel.eventLoop,
+        configuration: configuration,
+        connectivity: connectivity,
+        backoffIterator: backoffIterator
+      )
     }
   }
 
-  /// Creates a `Channel` using the given configuration amd state connectivity monitor.
-  ///
-  /// See `makeChannel(configuration:connectivityMonitor:backoffIterator:)`.
-  private class func makeChannel(
-    configuration: ClientConnection.Configuration,
-    connectivityMonitor: ConnectivityStateMonitor
+  /// Schedule an attempt to make a channel in `timeout` seconds on the given `eventLoop`.
+  private class func scheduleReconnectAttempt(
+    in timeout: TimeInterval,
+    on eventLoop: EventLoop,
+    configuration: Configuration,
+    connectivity: ConnectivityStateMonitor,
+    backoffIterator: ConnectionBackoffIterator?
   ) -> EventLoopFuture<Channel> {
-    return makeChannel(
-      configuration: configuration,
-      connectivityMonitor: connectivityMonitor,
-      backoffIterator: configuration.connectionBackoff?.makeIterator()
-    )
+    // The `futureResult` of the scheduled task is of type
+    // `EventLoopFuture<EventLoopFuture<Channel>>`, so we need to `flatMap` it to
+    // remove a level of indirection.
+    return eventLoop.scheduleTask(in: .seconds(timeInterval: timeout)) {
+      ClientConnection.makeChannel(
+        configuration: configuration,
+        connectivity: connectivity,
+        backoffIterator: backoffIterator
+      )
+    }.futureResult.flatMap { channel in
+      channel
+    }
   }
 
   /// Makes and configures a `ClientBootstrap` using the provided configuration.
@@ -231,8 +239,14 @@ extension ClientConnection {
   /// handlers detailed in the documentation for `ClientConnection`.
   ///
   /// - Parameter configuration: The configuration to prepare the bootstrap with.
-  private class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
-    let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
+  /// - Parameter group: The `EventLoopGroup` to use for the bootstrap.
+  /// - Parameter timeout: The connection timeout in seconds. 
+  private class func makeBootstrap(
+    configuration: Configuration,
+    group: EventLoopGroup,
+    timeout: TimeInterval?
+  ) -> ClientBootstrapProtocol {
+    let bootstrap = GRPCNIO.makeClientBootstrap(group: group)
       // Enable SO_REUSEADDR and TCP_NODELAY.
       .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
       .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
@@ -247,17 +261,12 @@ extension ClientConnection {
           let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
           return channel.pipeline.addHandler(errorHandler)
         }
-      }
-
-    return bootstrap
-  }
+    }
 
-  /// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
-  ///
-  /// - Parameter channel: The channel to verify successful TLS setup on.
-  private class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
-    return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
-      $0.verification
+    if let timeout = timeout {
+      return bootstrap.connectTimeout(.seconds(timeInterval: timeout))
+    } else {
+      return bootstrap
     }
   }
 }
@@ -393,6 +402,13 @@ fileprivate extension Channel {
       return self.eventLoop.makeFailedFuture(error)
     }
   }
+
+  /// Returns the `verification` future from the `TLSVerificationHandler` in this channel's pipeline.
+  func verifyTLS() -> EventLoopFuture<Void> {
+    return self.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
+      $0.verification
+    }
+  }
 }
 
 fileprivate extension TimeAmount {

+ 58 - 10
Sources/GRPC/ConnectivityState.swift

@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 import Foundation
+import NIOConcurrencyHelpers
 
 /// The connectivity state of a client connection. Note that this is heavily lifted from the gRPC
 /// documentation: https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md.
@@ -53,25 +54,72 @@ public protocol ConnectivityStateDelegate: class {
 }
 
 public class ConnectivityStateMonitor {
-  public typealias Callback = () -> Void
-
   /// A delegate to call when the connectivity state changes.
   public var delegate: ConnectivityStateDelegate?
 
+  private let lock = Lock()
+  private var _state: ConnectivityState = .idle
+  private var _userInitiatedShutdown = false
+
+  /// Creates a new connectivity state monitor.
+  ///
+  /// - Parameter delegate: A delegate to call when the connectivity state changes.
+  public init(delegate: ConnectivityStateDelegate?) {
+    self.delegate = delegate
+  }
+
   /// The current state of connectivity.
   public internal(set) var state: ConnectivityState {
-    didSet {
-      if oldValue != self.state {
-        self.delegate?.connectivityStateDidChange(from: oldValue, to: self.state)
+    get {
+      return self.lock.withLock {
+        self._state
+      }
+    }
+    set {
+      self.lock.withLockVoid {
+        self.setNewState(to: newValue)
       }
     }
   }
 
-  /// Creates a new connectivity state monitor.
+  /// Updates `_state` to `newValue`.
   ///
-  /// - Parameter delegate: A delegate to call when the connectivity state changes.
-  public init(delegate: ConnectivityStateDelegate?) {
-    self.delegate = delegate
-    self.state = .idle
+  /// If the user has initiated shutdown then state updates are _ignored_. This may happen if the
+  /// connection is being established as the user initiates shutdown.
+  ///
+  /// - Important: This is **not** thread safe.
+  private func setNewState(to newValue: ConnectivityState) {
+    if self._userInitiatedShutdown {
+      return
+    }
+
+    let oldValue = self._state
+    if oldValue != newValue {
+      self._state = newValue
+      self.delegate?.connectivityStateDidChange(from: oldValue, to: newValue)
+    }
+  }
+
+  /// Initiates a user shutdown.
+  func initiateUserShutdown() {
+    self.lock.withLockVoid {
+      self.setNewState(to: .shutdown)
+      self._userInitiatedShutdown = true
+    }
+  }
+
+  /// Whether the user has initiated a shutdown or not.
+  var userHasInitiatedShutdown: Bool {
+    return self.lock.withLock {
+      return self._userInitiatedShutdown
+    }
+  }
+
+  /// Whether we can attempt a reconnection, that is the user has not initiated a shutdown and we
+  /// are in the `.ready` state.
+  var canAttemptReconnect: Bool {
+    return self.lock.withLock {
+      return !self._userInitiatedShutdown && self._state == .ready
+    }
   }
 }