Forráskód Böngészése

Don't trap on invalid connection state transitions (#1573)

Motivation:

The connection manager is quite aggressive about trapping if an invalid
state is hit. It's almost impossible to know when states are truly
unreachable so in most cases we should handle them as best as we can. If
we believe them to be unreachable but cannot easily prove it then we
should crash in debug mode and handle it as best as possible in release
mode.

Modifications:

- Handle various state transitions more gently in the connection
  manager.

Result:

Gentler state handling.
George Barnett 2 éve
szülő
commit
770629b044

+ 80 - 70
Sources/GRPC/ConnectionManager.swift

@@ -77,11 +77,14 @@ internal final class ConnectionManager {
     var scheduled: Scheduled<Void>
     var reason: Error
 
-    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
+    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
       self.backoffIterator = state.backoffIterator
       self.readyChannelMuxPromise = state.readyChannelMuxPromise
       self.scheduled = scheduled
-      self.reason = reason
+      self.reason = reason ?? GRPCStatus(
+        code: .unavailable,
+        message: "Unexpected connection drop"
+      )
     }
 
     init(from state: ConnectedState, scheduled: Scheduled<Void>) {
@@ -391,7 +394,7 @@ internal final class ConnectionManager {
       self.startConnecting()
       // We started connecting so we must transition to the `connecting` state.
       guard case let .connecting(connecting) = self.state else {
-        self.invalidState()
+        self.unreachableState()
       }
       multiplexer = connecting.readyChannelMuxPromise.futureResult
 
@@ -432,7 +435,7 @@ internal final class ConnectionManager {
         self.startConnecting()
         // We started connecting so we must transition to the `connecting` state.
         guard case let .connecting(connecting) = self.state else {
-          self.invalidState()
+          self.unreachableState()
         }
         return connecting.candidateMuxPromise.futureResult
       case let .connecting(state):
@@ -674,20 +677,13 @@ internal final class ConnectionManager {
     case .shutdown:
       channel.close(mode: .all, promise: nil)
 
-    // These cases are purposefully separated: some crash reporting services provide stack traces
-    // which don't include the precondition failure message (which contain the invalid state we were
-    // in). Keeping the cases separate allows us work out the state from the line number.
-    case .idle:
-      self.invalidState()
-
-    case .active:
-      self.invalidState()
-
-    case .ready:
-      self.invalidState()
-
-    case .transientFailure:
-      self.invalidState()
+    case .idle, .transientFailure:
+      // Received a channelActive when not connecting. Can happen if channelActive and
+      // channelInactive are reordered. Ignore.
+      ()
+    case .active, .ready:
+      // Received a second 'channelActive', already active so ignore.
+      ()
     }
   }
 
@@ -700,6 +696,43 @@ internal final class ConnectionManager {
     ])
 
     switch self.state {
+    // We can hit inactive in connecting if we see channelInactive before channelActive; that's not
+    // common but we should tolerate it.
+    case let .connecting(connecting):
+      // Should we try connecting again?
+      switch connecting.reconnect {
+      // No, shutdown instead.
+      case .none:
+        self.logger.debug("shutting down connection")
+
+        let error = GRPCStatus(
+          code: .unavailable,
+          message: "The connection was dropped and connection re-establishment is disabled"
+        )
+
+        let shutdownState = ShutdownState(
+          closeFuture: self.eventLoop.makeSucceededFuture(()),
+          reason: error
+        )
+
+        self.state = .shutdown(shutdownState)
+        // Shutting down, so fail the outstanding promises.
+        connecting.readyChannelMuxPromise.fail(error)
+        connecting.candidateMuxPromise.fail(error)
+
+      // Yes, after some time.
+      case let .after(delay):
+        let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
+        // Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
+        connecting.candidateMuxPromise.fail(error)
+
+        let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
+          self.startConnecting()
+        }
+        self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
+        self.state = .transientFailure(.init(from: connecting, scheduled: scheduled, reason: nil))
+      }
+
     // The channel is `active` but not `ready`. Should we try again?
     case let .active(active):
       switch active.reconnect {
@@ -766,14 +799,9 @@ internal final class ConnectionManager {
     case .shutdown:
       ()
 
-    // These cases are purposefully separated: some crash reporting services provide stack traces
-    // which don't include the precondition failure message (which contain the invalid state we were
-    // in). Keeping the cases separate allows us work out the state from the line number.
-    case .connecting:
-      self.invalidState()
-
+    // Received 'channelInactive' twice; fine, ignore.
     case .transientFailure:
-      self.invalidState()
+      ()
     }
   }
 
@@ -793,20 +821,20 @@ internal final class ConnectionManager {
     case .shutdown:
       ()
 
-    // These cases are purposefully separated: some crash reporting services provide stack traces
-    // which don't include the precondition failure message (which contain the invalid state we were
-    // in). Keeping the cases separate allows us work out the state from the line number.
-    case .idle:
-      self.invalidState()
-
-    case .transientFailure:
-      self.invalidState()
+    case .idle, .transientFailure:
+      // No connection or connection attempt exists but connection was marked as ready. This is
+      // strange. Ignore it in release mode as there's nothing to close and nowhere to fire an
+      // error to.
+      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
 
     case .connecting:
-      self.invalidState()
+      // No channel exists to receive initial HTTP/2 SETTINGS frame on... weird. Ignore in release
+      // mode.
+      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
 
     case .ready:
-      self.invalidState()
+      // Already received initial HTTP/2 SETTINGS frame; ignore in release mode.
+      assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
     }
   }
 
@@ -834,17 +862,14 @@ internal final class ConnectionManager {
       // 'channelInactive()'.
       ()
 
-    // These cases are purposefully separated: some crash reporting services provide stack traces
-    // which don't include the precondition failure message (which contain the invalid state we were
-    // in). Keeping the cases separate allows us work out the state from the line number.
-    case .idle:
-      self.invalidState()
+    case .idle, .transientFailure:
+      // There's no connection to idle; ignore.
+      ()
 
     case .connecting:
-      self.invalidState()
-
-    case .transientFailure:
-      self.invalidState()
+      // The idle watchdog is started when the connection is active, this shouldn't happen
+      // in the connecting state. Ignore it in release mode.
+      assertionFailure("tried to idle a connection in the \(self.state.label) state")
     }
   }
 
@@ -908,22 +933,10 @@ extension ConnectionManager {
     case .shutdown:
       ()
 
-    // We can't fail to connect if we aren't trying.
-    //
-    // These cases are purposefully separated: some crash reporting services provide stack traces
-    // which don't include the precondition failure message (which contain the invalid state we were
-    // in). Keeping the cases separate allows us work out the state from the line number.
-    case .idle:
-      self.invalidState()
-
-    case .active:
-      self.invalidState()
-
-    case .ready:
-      self.invalidState()
-
-    case .transientFailure:
-      self.invalidState()
+    // Connection attempt failed, but no connection attempt is in progress.
+    case .idle, .active, .ready, .transientFailure:
+      // Nothing we can do other than ignore in release mode.
+      assertionFailure("connect promise failed in \(self.state.label) state")
     }
   }
 }
@@ -951,17 +964,14 @@ extension ConnectionManager {
     case .shutdown:
       ()
 
-    // These cases are purposefully separated: some crash reporting services provide stack traces
-    // which don't include the precondition failure message (which contain the invalid state we were
-    // in). Keeping the cases separate allows us work out the state from the line number.
+    // We only call startConnecting() if the connection does not exist and after checking what the
+    // current state is, so none of these states should be reachable.
     case .connecting:
-      self.invalidState()
-
+      self.unreachableState()
     case .active:
-      self.invalidState()
-
+      self.unreachableState()
     case .ready:
-      self.invalidState()
+      self.unreachableState()
     }
   }
 
@@ -1066,11 +1076,11 @@ extension ConnectionManager {
 }
 
 extension ConnectionManager {
-  private func invalidState(
+  private func unreachableState(
     function: StaticString = #function,
     file: StaticString = #fileID,
     line: UInt = #line
   ) -> Never {
-    preconditionFailure("Invalid state \(self.state) for \(function)", file: file, line: line)
+    fatalError("Invalid state \(self.state) for \(function)", file: file, line: line)
   }
 }

+ 113 - 0
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -239,6 +239,119 @@ extension ConnectionManagerTests {
     }
   }
 
+  /// Checks that the connection manager tolerates `channelInactive` being fired before
+  /// `channelActive` while it is still connecting: the manager is expected to move from
+  /// `.connecting` to `.shutdown`, and a late `channelActive` must then be ignored.
+  ///
+  /// NOTE(review): the test name implies the default configuration used by
+  /// `makeConnectionManager` has reconnection disabled -- confirm against the helper.
+  func testChannelInactiveBeforeActiveWithNoReconnect() throws {
+    let channel = EmbeddedChannel(loop: self.loop)
+    let channelPromise = self.loop.makePromise(of: Channel.self)
+
+    // The manager's connect closure hands back a future we complete manually below.
+    let manager = self.makeConnectionManager { _, _ in
+      return channelPromise.futureResult
+    }
+
+    // Start the connection.
+    self.waitForStateChange(from: .idle, to: .connecting) {
+      // Triggers the connect.
+      _ = manager.getHTTP2Multiplexer()
+      self.loop.run()
+    }
+
+    // Install the idle handler so that pipeline events fired below reach the manager.
+    try channel.pipeline.syncOperations.addHandler(
+      GRPCIdleHandler(
+        connectionManager: manager,
+        multiplexer: HTTP2StreamMultiplexer(
+          mode: .client,
+          channel: channel,
+          inboundStreamInitializer: nil
+        ),
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
+      )
+    )
+    channelPromise.succeed(channel)
+    // Deliberately fire 'channelInactive' before 'channelActive' (the wrong way around). The
+    // manager should tolerate this and shut down rather than trap.
+    self.waitForStateChange(from: .connecting, to: .shutdown) {
+      channel.pipeline.fireChannelInactive()
+    }
+
+    // Should be ignored.
+    channel.pipeline.fireChannelActive()
+  }
+
+  /// Checks that the connection manager tolerates `channelInactive` being fired before
+  /// `channelActive` while connecting when a connection backoff is configured: the manager is
+  /// expected to move from `.connecting` to `.transientFailure`, ignore the late `channelActive`,
+  /// start a second attempt once the one second backoff elapses, and become `.ready` on the
+  /// second channel after it goes active and receives its initial SETTINGS frame.
+  func testChannelInactiveBeforeActiveWillReconnect() throws {
+    // Two channels/promises: one per connection attempt. Channels, promises and futures are all
+    // consumed from the back so that they stay paired up.
+    var channels = [EmbeddedChannel(loop: self.loop), EmbeddedChannel(loop: self.loop)]
+    var channelPromises: [EventLoopPromise<Channel>] = [self.loop.makePromise(),
+                                                        self.loop.makePromise()]
+    var channelFutures = channelPromises.map { $0.futureResult }
+
+    var configuration = self.defaultConfiguration
+    configuration.connectionBackoff = .oneSecondFixed
+
+    // Each connection attempt takes the next future from the back of the list.
+    let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
+      return channelFutures.removeLast()
+    }
+
+    // Start the connection.
+    self.waitForStateChange(from: .idle, to: .connecting) {
+      // Triggers the connect.
+      _ = manager.getHTTP2Multiplexer()
+      self.loop.run()
+    }
+
+    // Setup the channel.
+    let channel1 = channels.removeLast()
+    let channel1Promise = channelPromises.removeLast()
+
+    try channel1.pipeline.syncOperations.addHandler(
+      GRPCIdleHandler(
+        connectionManager: manager,
+        multiplexer: HTTP2StreamMultiplexer(
+          mode: .client,
+          channel: channel1,
+          inboundStreamInitializer: nil
+        ),
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
+      )
+    )
+    channel1Promise.succeed(channel1)
+    // Deliberately fire 'channelInactive' before 'channelActive' (the wrong way around). The
+    // manager should tolerate this and schedule another attempt rather than trap.
+    self.waitForStateChange(from: .connecting, to: .transientFailure) {
+      channel1.pipeline.fireChannelInactive()
+    }
+
+    // The late 'channelActive' for the first channel should be ignored.
+    channel1.pipeline.fireChannelActive()
+
+    // Start the next attempt.
+    self.waitForStateChange(from: .transientFailure, to: .connecting) {
+      self.loop.advanceTime(by: .seconds(1))
+    }
+
+    let channel2 = channels.removeLast()
+    let channel2Promise = channelPromises.removeLast()
+    try channel2.pipeline.syncOperations.addHandler(
+      GRPCIdleHandler(
+        connectionManager: manager,
+        // The multiplexer must belong to the channel whose pipeline it is installed in.
+        multiplexer: HTTP2StreamMultiplexer(
+          mode: .client,
+          channel: channel2,
+          inboundStreamInitializer: nil
+        ),
+        idleTimeout: .minutes(5),
+        keepalive: .init(),
+        logger: self.logger
+      )
+    )
+
+    channel2Promise.succeed(channel2)
+
+    // This time the events arrive in the expected order: active, then the initial SETTINGS frame.
+    try self.waitForStateChange(from: .connecting, to: .ready) {
+      channel2.pipeline.fireChannelActive()
+      let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
+      XCTAssertNoThrow(try channel2.writeInbound(frame))
+    }
+  }
+
   func testIdleTimeoutWhenThereAreActiveStreams() throws {
     let channelPromise = self.loop.makePromise(of: Channel.self)
     let manager = self.makeConnectionManager { _, _ in