
Provide more specific error messages for failing channels (#984)

Motivation:

If a connection attempt fails, the reason for the failure should be
propagated to the RPCs.

Modifications:

- Store errors in the active and ready states so that they can be used
  as contextual information when requesting a channel from the transient
  failure or shutdown states

Result:

- Connection level errors get propagated to RPCs made on that same
  Channel
George Barnett 5 years ago
Parent
commit
7aac780666
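
For context before the per-file diffs: a hedged sketch of the behaviour this change produces for callers. It is not part of the commit; it borrows names such as Echo_EchoClient and the .fastFailure call-start behaviour from the tests added below, and the exact message depends on whatever caused the connection to fail.

import EchoModel
import GRPC
import NIO

// A minimal sketch: an RPC started while the connection is failing now carries the
// underlying connection error in its status instead of a bare 'unavailable'.
func checkStatusOfFailingRPC() throws {
  let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  defer { try? group.syncShutdownGracefully() }

  // 'unreachable.invalid' can never resolve, so the connection attempt fails.
  let connection = ClientConnection.insecure(group: group)
    .withCallStartBehavior(.fastFailure)
    .connect(host: "unreachable.invalid", port: 0)
  defer { try? connection.close().wait() }

  let echo = Echo_EchoClient(channel: connection)
  let get = echo.get(.with { $0.text = "hello" })

  let status = try get.status.wait()
  // Before this change: code == .unavailable, message == nil.
  // After this change: code == .unavailable and the message describes the connection failure.
  print(status.code, status.message ?? "<no message>")
}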

+ 25 - 1
Sources/GRPC/ClientCalls/ClientCallTransport.swift

@@ -138,7 +138,12 @@ internal class ChannelTransport<Request, Response> {
     // If the channel creation fails we need to error the call. Note that we receive an
     // 'activation' from the channel instead of relying on the success of the future.
     channelPromise.futureResult.whenFailure { error in
-      self.handleError(error, promise: nil)
+      if error is GRPCStatus || error is GRPCStatusTransformable {
+        self.handleError(error, promise: nil)
+      } else {
+        // Fallback to something which will mark the RPC as 'unavailable'.
+        self.handleError(ConnectionFailure(reason: error), promise: nil)
+      }
     }
 
     // Schedule the timeout.
@@ -696,3 +701,22 @@ extension _GRPCClientResponsePart {
     }
   }
 }
+
+// A wrapper for connection errors: we need to be able to preserve the underlying error as
+// well as extract a 'GRPCStatus' with code '.unavailable'.
+private struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
+  /// The reason the connection failed.
+  var reason: Error
+
+  init(reason: Error) {
+    self.reason = reason
+  }
+
+  var description: String {
+    return String(describing: self.reason)
+  }
+
+  func makeGRPCStatus() -> GRPCStatus {
+    return GRPCStatus(code: .unavailable, message: String(describing: self.reason))
+  }
+}
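
ConnectionFailure is private to this file, so here is a small standalone sketch of the dispatch performed in the first hunk: errors which already are (or can become) a GRPCStatus are handled as before, and anything else is wrapped so the RPC still fails as 'unavailable' with a useful message. SomeTransportError and statusForChannelFailure are hypothetical names introduced only for illustration.

import GRPC

// Hypothetical error standing in for whatever the transport reports when a connection fails.
struct SomeTransportError: Error {}

// Sketch of the selection logic above: pass through status-like errors, wrap the rest.
func statusForChannelFailure(_ error: Error) -> GRPCStatus {
  if let status = error as? GRPCStatus {
    return status
  } else if let transformable = error as? GRPCStatusTransformable {
    return transformable.makeGRPCStatus()
  } else {
    // Mirrors the 'ConnectionFailure' fallback: '.unavailable' plus the error's description.
    return GRPCStatus(code: .unavailable, message: String(describing: error))
  }
}

// statusForChannelFailure(SomeTransportError()).code == .unavailable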

+ 98 - 20
Sources/GRPC/ConnectionManager.swift

@@ -44,6 +44,7 @@ internal class ConnectionManager {
 
     var readyChannelPromise: EventLoopPromise<Channel>
     var candidate: Channel
+    var error: Error?
 
     init(from state: ConnectingState, candidate: Channel) {
       self.configuration = state.configuration
@@ -57,6 +58,7 @@ internal class ConnectionManager {
   internal struct ReadyState {
     var configuration: ClientConnection.Configuration
     var channel: Channel
+    var error: Error?
 
     init(from state: ConnectedState) {
       self.configuration = state.configuration
@@ -69,12 +71,14 @@ internal class ConnectionManager {
     var backoffIterator: ConnectionBackoffIterator?
     var readyChannelPromise: EventLoopPromise<Channel>
     var scheduled: Scheduled<Void>
+    var reason: Error?
 
-    init(from state: ConnectingState, scheduled: Scheduled<Void>) {
+    init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
       self.configuration = state.configuration
       self.backoffIterator = state.backoffIterator
       self.readyChannelPromise = state.readyChannelPromise
       self.scheduled = scheduled
+      self.reason = reason
     }
 
     init(from state: ConnectedState, scheduled: Scheduled<Void>) {
@@ -82,6 +86,7 @@ internal class ConnectionManager {
       self.backoffIterator = state.backoffIterator
       self.readyChannelPromise = state.readyChannelPromise
       self.scheduled = scheduled
+      self.reason = state.error
     }
 
     init(from state: ReadyState, scheduled: Scheduled<Void>) {
@@ -89,11 +94,27 @@ internal class ConnectionManager {
       self.backoffIterator = state.configuration.connectionBackoff?.makeIterator()
       self.readyChannelPromise = state.channel.eventLoop.makePromise()
       self.scheduled = scheduled
+      self.reason = state.error
     }
   }
 
   internal struct ShutdownState {
     var closeFuture: EventLoopFuture<Void>
+    /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
+    /// this error.
+    var reason: Error
+
+    init(closeFuture: EventLoopFuture<Void>, reason: Error) {
+      self.closeFuture = closeFuture
+      self.reason = reason
+    }
+
+    static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
+      return ShutdownState(
+        closeFuture: closeFuture,
+        reason: GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
+      )
+    }
   }
 
   internal enum State {
@@ -287,8 +308,8 @@ internal class ConnectionManager {
       case let .transientFailure(state):
         channel = state.readyChannelPromise.futureResult
 
-      case .shutdown:
-        channel = self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
+      case let .shutdown(state):
+        channel = self.eventLoop.makeFailedFuture(state.reason)
       }
 
       self.logger.debug("vending channel future", metadata: [
@@ -325,11 +346,16 @@ internal class ConnectionManager {
       case let .ready(state):
         channel = state.channel.eventLoop.makeSucceededFuture(state.channel)
 
-      case .transientFailure:
-        channel = self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)
+      case let .transientFailure(state):
+        // Provide the reason we failed transiently, if we can.
+        let error = state.reason ?? GRPCStatus(
+          code: .unavailable,
+          message: "Connection requested while backing off"
+        )
+        channel = self.eventLoop.makeFailedFuture(error)
 
-      case .shutdown:
-        channel = self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
+      case let .shutdown(state):
+        channel = self.eventLoop.makeFailedFuture(state.reason)
       }
 
       self.logger.debug("vending fast-failing channel future", metadata: [
@@ -351,14 +377,14 @@ internal class ConnectionManager {
       switch self.state {
       // We don't have a channel and we don't want one, easy!
       case .idle:
-        shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
+        shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
         self.state = .shutdown(shutdown)
 
       // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
       // the shutdown future and deal with any fallout from the connecting channel without the
       // application knowing.
       case let .connecting(state):
-        shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
+        shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
         self.state = .shutdown(shutdown)
 
         // Fail the ready channel promise: we're shutting down so even if we manage to successfully
@@ -372,7 +398,7 @@ internal class ConnectionManager {
       // We have an active channel but the application doesn't know about it yet. We'll do the same
       // as for `.connecting`.
       case let .active(state):
-        shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
+        shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
         self.state = .shutdown(shutdown)
 
         // Fail the ready channel promise: we're shutting down so even if we manage to successfully
@@ -384,7 +410,7 @@ internal class ConnectionManager {
       // The channel is up and running: the application could be using it. We can close it and
       // return the `closeFuture`.
       case let .ready(state):
-        shutdown = ShutdownState(closeFuture: state.channel.closeFuture)
+        shutdown = .shutdownByUser(closeFuture: state.channel.closeFuture)
         self.state = .shutdown(shutdown)
 
         // We have a channel, close it.
@@ -394,15 +420,16 @@ internal class ConnectionManager {
       // do the same but also cancel any scheduled connection attempts and deal with any fallout
       // if we cancelled too late.
       case let .transientFailure(state):
+        shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
+        self.state = .shutdown(shutdown)
+
         // Stop the creation of a new channel, if we can. If we can't then the task to
         // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
         state.scheduled.cancel()
-        shutdown = ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()))
-        self.state = .shutdown(shutdown)
 
         // Fail the ready channel promise: we're shutting down so even if we manage to successfully
         // connect the application shouldn't have access to the channel.
-        state.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))
+        state.readyChannelPromise.fail(shutdown.reason)
 
       // We're already shutdown; nothing to do.
       case let .shutdown(state):
@@ -415,6 +442,35 @@ internal class ConnectionManager {
 
   // MARK: - State changes from the channel handler.
 
+  /// The channel caught an error. Hold on to it until the channel becomes inactive; it may provide
+  /// some context.
+  internal func channelError(_ error: Error) {
+    self.eventLoop.preconditionInEventLoop()
+
+    switch self.state {
+    // These cases are purposefully separated: some crash reporting services provide stack traces
+    // which don't include the precondition failure message (which contains the invalid state we were
+    // in). Keeping the cases separate allows us to work out the state from the line number.
+    case .idle:
+      self.invalidState()
+
+    case .connecting:
+      self.invalidState()
+
+    case var .active(state):
+      state.error = error
+      self.state = .active(state)
+
+    case var .ready(state):
+      state.error = error
+      self.state = .ready(state)
+
+    // If we're already in one of these states, then additional errors aren't helpful to us.
+    case .transientFailure, .shutdown:
+      ()
+    }
+  }
+
   /// The connecting channel became `active`. Must be called on the `EventLoop`.
   internal func channelActive(channel: Channel) {
     self.eventLoop.preconditionInEventLoop()
@@ -462,8 +518,19 @@ internal class ConnectionManager {
       // No, shutdown instead.
       case .none:
         self.logger.debug("shutting down connection")
-        self.state = .shutdown(ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(())))
-        active.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))
+
+        let error = GRPCStatus(
+          code: .unavailable,
+          message: "The connection was dropped and connection re-establishment is disabled"
+        )
+
+        let shutdownState = ShutdownState(
+          closeFuture: self.eventLoop.makeSucceededFuture(()),
+          reason: error
+        )
+
+        self.state = .shutdown(shutdownState)
+        active.readyChannelPromise.fail(error)
 
       // Yes, after some time.
       case let .after(delay):
@@ -480,7 +547,15 @@ internal class ConnectionManager {
       // No, no backoff is configured.
       if ready.configuration.connectionBackoff == nil {
         self.logger.debug("shutting down connection, no reconnect configured/remaining")
-        self.state = .shutdown(ShutdownState(closeFuture: ready.channel.closeFuture))
+        self.state = .shutdown(
+          ShutdownState(
+            closeFuture: ready.channel.closeFuture,
+            reason: GRPCStatus(
+              code: .unavailable,
+              message: "The connection was dropped and a reconnect was not configured"
+            )
+          )
+        )
       } else {
         // Yes, start connecting now. We should go via `transientFailure`, however.
         let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
@@ -593,8 +668,10 @@ extension ConnectionManager {
       // No, shutdown.
       case .none:
         self.logger.debug("shutting down connection, no reconnect configured/remaining")
+        self.state = .shutdown(
+          ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: error)
+        )
         connecting.readyChannelPromise.fail(error)
-        self.state = .shutdown(ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(())))
 
       // Yes, after a delay.
       case let .after(delay):
@@ -602,8 +679,9 @@ extension ConnectionManager {
         let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
           self.startConnecting()
         }
-        self
-          .state = .transientFailure(TransientFailureState(from: connecting, scheduled: scheduled))
+        self.state = .transientFailure(
+          TransientFailureState(from: connecting, scheduled: scheduled, reason: error)
+        )
       }
 
     // The application must have called shutdown while we were trying to establish a connection
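
Taken together, these hunks thread a connection error through the state machine: channelError(_:) stores it while the channel is active or ready, the transition into transient failure or shutdown carries it along as the 'reason', and a later channel request fails with that reason. A pared-down sketch of that shape follows; SimplifiedState and fastFailingChannel are invented for illustration, and the real states carry far more (configuration, promises, backoff).

import GRPC
import NIO

// Invented, pared-down state enum; the real states also hold configuration, promises, etc.
enum SimplifiedState {
  case active(Channel, error: Error?)    // channelError(_:) stores errors here...
  case ready(Channel, error: Error?)     // ...and here, while the channel is usable.
  case transientFailure(reason: Error?)  // on channel inactivity the stored error becomes 'reason'.
  case shutdown(reason: Error)           // shutdown always records why.
}

// Requesting a channel in a failed state now fails the future with the stored reason, so the
// RPC's status message describes the original connection failure.
func fastFailingChannel(
  for state: SimplifiedState,
  on eventLoop: EventLoop
) -> EventLoopFuture<Channel> {
  switch state {
  case let .active(channel, _), let .ready(channel, _):
    return eventLoop.makeSucceededFuture(channel)
  case let .transientFailure(reason):
    let error = reason ?? GRPCStatus(
      code: .unavailable,
      message: "Connection requested while backing off"
    )
    return eventLoop.makeFailedFuture(error)
  case let .shutdown(reason):
    return eventLoop.makeFailedFuture(reason)
  }
}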

+ 15 - 0
Sources/GRPC/GRPCIdleHandler.swift

@@ -116,6 +116,21 @@ internal class GRPCIdleHandler: ChannelInboundHandler {
     context.fireUserInboundEventTriggered(event)
   }
 
+  func errorCaught(context: ChannelHandlerContext, error: Error) {
+    switch (self.mode, self.state) {
+    case let (.client(manager), .notReady),
+         let (.client(manager), .ready):
+      // We're most likely about to become inactive: let the manager know the reason why.
+      manager.channelError(error)
+
+    case (.client, .closed),
+         (.server, _):
+      ()
+    }
+
+    context.fireErrorCaught(error)
+  }
+
   func channelActive(context: ChannelHandlerContext) {
     switch (self.mode, self.state) {
     // The client should become active: we'll only schedule the idling when the channel

+ 33 - 0
Tests/GRPCTests/ChannelTransportTests.swift

@@ -51,6 +51,24 @@ class ChannelTransportTests: GRPCTestCase {
     return transport
   }
 
+  private func makeTransport(
+    on eventLoop: EventLoop,
+    container: ResponsePartContainer<Response>,
+    deadline: NIODeadline = .distantFuture,
+    channelProvider: @escaping (ChannelTransport<Request, Response>, EventLoopPromise<Channel>)
+      -> Void
+  ) -> ChannelTransport<Request, Response> {
+    let transport = ChannelTransport<Request, Response>(
+      eventLoop: eventLoop,
+      responseContainer: container,
+      timeLimit: .deadline(deadline),
+      errorDelegate: nil,
+      logger: self.logger,
+      channelProvider: channelProvider
+    )
+    return transport
+  }
+
   private func makeRequestHead() -> _GRPCRequestHead {
     return _GRPCRequestHead(
       method: "POST",
@@ -357,6 +375,21 @@ class ChannelTransportTests: GRPCTestCase {
     XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
   }
 
+  func testChannelFutureError() throws {
+    let channel = EmbeddedChannel()
+    let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) {
+      XCTFail("No response expected but got: \($0)")
+    }
+
+    struct DoomedChannelError: Error {}
+    let transport = self.makeTransport(on: channel.eventLoop, container: container) { _, promise in
+      promise.fail(DoomedChannelError())
+    }
+
+    let status = try transport.responseContainer.lazyStatusPromise.getFutureResult().wait()
+    XCTAssertEqual(status, GRPCStatus(code: .unavailable, message: "\(DoomedChannelError())"))
+  }
+
   // MARK: - Test Transport after Shutdown
 
   func testOutboundMethodsAfterShutdown() throws {

+ 63 - 0
Tests/GRPCTests/ConnectionFailingTests.swift

@@ -0,0 +1,63 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import EchoModel
+import GRPC
+import NIO
+import XCTest
+
+class ConnectionFailingTests: GRPCTestCase {
+  func testStartRPCWhenChannelIsInTransientFailure() throws {
+    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
+    defer {
+      XCTAssertNoThrow(try group.syncShutdownGracefully())
+    }
+
+    let waiter = RecordingConnectivityDelegate()
+    let connection = ClientConnection.insecure(group: group)
+      // We want to make sure we sit in transient failure for a long time.
+      .withConnectionBackoff(initial: .hours(24))
+      .withCallStartBehavior(.fastFailure)
+      .withConnectivityStateDelegate(waiter)
+      .connect(host: "unreachable.invalid", port: 0)
+    defer {
+      XCTAssertNoThrow(try connection.close().wait())
+    }
+
+    let echo = Echo_EchoClient(channel: connection)
+
+    // Set our expectation.
+    waiter.expectChanges(2) { changes in
+      XCTAssertEqual(changes[0], Change(from: .idle, to: .connecting))
+      XCTAssertEqual(changes[1], Change(from: .connecting, to: .transientFailure))
+    }
+
+    // This will trigger a connection attempt and subsequently fail.
+    _ = echo.get(.with { $0.text = "cheddar" })
+
+    // Wait for the changes.
+    waiter.waitForExpectedChanges(timeout: .seconds(10))
+
+    // Okay, now let's try another RPC. It should fail immediately with the connection error.
+    let get = echo.get(.with { $0.text = "comté" })
+    XCTAssertThrowsError(try get.response.wait())
+    let status = try get.status.wait()
+    XCTAssertEqual(status.code, .unavailable)
+    // We can't say too much about the message here. It should contain details about the transient
+    // failure error.
+    XCTAssertNotNil(status.message)
+    XCTAssertTrue(status.message?.contains("unreachable.invalid") ?? false)
+  }
+}

+ 4 - 1
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import EchoModel
 @testable import GRPC
 import NIO
 import NIOHTTP2
@@ -793,7 +794,9 @@ extension ConnectionManagerTests {
     let optimisticChannel = manager.getOptimisticChannel()
     self.loop.run()
 
-    XCTAssertThrowsError(try optimisticChannel.wait())
+    XCTAssertThrowsError(try optimisticChannel.wait()) { error in
+      XCTAssertTrue(error is DoomedChannelError)
+    }
   }
 
   func testOptimisticChannelFromShutdown() throws {

+ 11 - 0
Tests/GRPCTests/XCTestManifests.swift

@@ -31,6 +31,7 @@ extension ChannelTransportTests {
         ("testBufferedWritesAreFailedOnClose", testBufferedWritesAreFailedOnClose),
         ("testChannelBecomesInactive", testChannelBecomesInactive),
         ("testChannelError", testChannelError),
+        ("testChannelFutureError", testChannelFutureError),
         ("testErrorsAreNotAlwaysStatus", testErrorsAreNotAlwaysStatus),
         ("testInboundMethodsAfterShutdown", testInboundMethodsAfterShutdown),
         ("testOutboundMethodsAfterShutdown", testOutboundMethodsAfterShutdown),
@@ -143,6 +144,15 @@ extension ConnectionBackoffTests {
     ]
 }
 
+extension ConnectionFailingTests {
+    // DO NOT MODIFY: This is autogenerated, use:
+    //   `swift test --generate-linuxmain`
+    // to regenerate.
+    static let __allTests__ConnectionFailingTests = [
+        ("testStartRPCWhenChannelIsInTransientFailure", testStartRPCWhenChannelIsInTransientFailure),
+    ]
+}
+
 extension ConnectionManagerTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -960,6 +970,7 @@ public func __allTests() -> [XCTestCaseEntry] {
         testCase(ClientThrowingWhenServerReturningErrorTests.__allTests__ClientThrowingWhenServerReturningErrorTests),
         testCase(ClientTimeoutTests.__allTests__ClientTimeoutTests),
         testCase(ConnectionBackoffTests.__allTests__ConnectionBackoffTests),
+        testCase(ConnectionFailingTests.__allTests__ConnectionFailingTests),
         testCase(ConnectionManagerTests.__allTests__ConnectionManagerTests),
         testCase(ConnectivityStateMonitorTests.__allTests__ConnectivityStateMonitorTests),
         testCase(DebugChannelInitializerTests.__allTests__DebugChannelInitializerTests),