|
@@ -307,8 +307,20 @@ extension GRPCChannel {
|
|
|
return .stopTrying(error)
|
|
return .stopTrying(error)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- case .failRPC:
|
|
|
|
|
- return .stopTrying(RPCError(code: .unavailable, message: "Channel isn't ready."))
|
|
|
|
|
|
|
+ case .failRPC(let reason):
|
|
|
|
|
+ let message: String
|
|
|
|
|
+ let cause: RPCError?
|
|
|
|
|
+
|
|
|
|
|
+ switch reason {
|
|
|
|
|
+ case .transientFailure(let error):
|
|
|
|
|
+ message = "Channel is in a transient failure state and 'wait for ready' isn't enabled."
|
|
|
|
|
+ cause = error
|
|
|
|
|
+ case .shutdown:
|
|
|
|
|
+ message = "Channel is shutting down."
|
|
|
|
|
+ cause = nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return .stopTrying(RPCError(code: .unavailable, message: message, cause: cause))
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -318,7 +330,9 @@ extension GRPCChannel {
|
|
|
loadBalancer: LoadBalancer
|
|
loadBalancer: LoadBalancer
|
|
|
) async -> MakeStreamResult {
|
|
) async -> MakeStreamResult {
|
|
|
guard let subchannel = loadBalancer.pickSubchannel() else {
|
|
guard let subchannel = loadBalancer.pickSubchannel() else {
|
|
|
- return .tryAgain(RPCError(code: .unavailable, message: "Channel isn't ready."))
|
|
|
|
|
|
|
+ return .tryAgain(
|
|
|
|
|
+ RPCError(code: .unavailable, message: "Channel isn't ready, no subchannel available.")
|
|
|
|
|
+ )
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
let methodConfig = self.config(forMethod: descriptor)
|
|
let methodConfig = self.config(forMethod: descriptor)
|
|
@@ -804,7 +818,12 @@ extension GRPCChannel.StateMachine {
|
|
|
let continuations = state.queue.removeFastFailingEntries()
|
|
let continuations = state.queue.removeFastFailingEntries()
|
|
|
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
|
|
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
|
|
|
continuations: continuations,
|
|
continuations: continuations,
|
|
|
- result: .failure(RPCError(code: .unavailable, message: "Channel isn't ready."))
|
|
|
|
|
|
|
+ result: .failure(
|
|
|
|
|
+ RPCError(
|
|
|
|
|
+ code: .unavailable,
|
|
|
|
|
+ message: "Subchannel shutdown and 'wait for ready' isn't enabled."
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
case .idle, .connecting:
|
|
case .idle, .connecting:
|
|
@@ -882,7 +901,12 @@ extension GRPCChannel.StateMachine {
|
|
|
/// Join the queue and wait until a load-balancer becomes ready.
|
|
/// Join the queue and wait until a load-balancer becomes ready.
|
|
|
case joinQueue
|
|
case joinQueue
|
|
|
/// Fail the stream request, the channel isn't in a suitable state.
|
|
/// Fail the stream request, the channel isn't in a suitable state.
|
|
|
- case failRPC
|
|
|
|
|
|
|
+ case failRPC(FailureReason)
|
|
|
|
|
+
|
|
|
|
|
+ enum FailureReason {
|
|
|
|
|
+ case transientFailure(RPCError)
|
|
|
|
|
+ case shutdown
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func makeStream(waitForReady: Bool) -> OnMakeStream {
|
|
func makeStream(waitForReady: Bool) -> OnMakeStream {
|
|
@@ -898,14 +922,14 @@ extension GRPCChannel.StateMachine {
|
|
|
onMakeStream = .joinQueue
|
|
onMakeStream = .joinQueue
|
|
|
case .ready:
|
|
case .ready:
|
|
|
onMakeStream = .useLoadBalancer(state.current)
|
|
onMakeStream = .useLoadBalancer(state.current)
|
|
|
- case .transientFailure:
|
|
|
|
|
- onMakeStream = waitForReady ? .joinQueue : .failRPC
|
|
|
|
|
|
|
+ case .transientFailure(let error):
|
|
|
|
|
+ onMakeStream = waitForReady ? .joinQueue : .failRPC(.transientFailure(error))
|
|
|
case .shutdown:
|
|
case .shutdown:
|
|
|
- onMakeStream = .failRPC
|
|
|
|
|
|
|
+ onMakeStream = .failRPC(.shutdown)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
case .stopping, .stopped:
|
|
case .stopping, .stopped:
|
|
|
- onMakeStream = .failRPC
|
|
|
|
|
|
|
+ onMakeStream = .failRPC(.shutdown)
|
|
|
|
|
|
|
|
case ._modifying:
|
|
case ._modifying:
|
|
|
fatalError("Invalid state")
|
|
fatalError("Invalid state")
|