|
|
@@ -140,6 +140,23 @@ internal class ConnectionManager {
|
|
|
|
|
|
/// We never want another `Channel`: this state is terminal.
|
|
|
case shutdown(ShutdownState)
|
|
|
+
|
|
|
+ fileprivate var label: String {
|
|
|
+ switch self {
|
|
|
+ case .idle:
|
|
|
+ return "idle"
|
|
|
+ case .connecting:
|
|
|
+ return "connecting"
|
|
|
+ case .active:
|
|
|
+ return "active"
|
|
|
+ case .ready:
|
|
|
+ return "ready"
|
|
|
+ case .transientFailure:
|
|
|
+ return "transientFailure"
|
|
|
+ case .shutdown:
|
|
|
+ return "shutdown"
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private var state: State {
|
|
|
@@ -240,6 +257,8 @@ internal class ConnectionManager {
|
|
|
/// Returns a future for a connected channel.
|
|
|
internal func getChannel() -> EventLoopFuture<Channel> {
|
|
|
return self.eventLoop.flatSubmit {
|
|
|
+ let channel: EventLoopFuture<Channel>
|
|
|
+
|
|
|
switch self.state {
|
|
|
case .idle:
|
|
|
self.startConnecting()
|
|
|
@@ -247,23 +266,29 @@ internal class ConnectionManager {
|
|
|
guard case let .connecting(connecting) = self.state else {
|
|
|
self.invalidState()
|
|
|
}
|
|
|
- return connecting.readyChannelPromise.futureResult
|
|
|
+ channel = connecting.readyChannelPromise.futureResult
|
|
|
|
|
|
case let .connecting(state):
|
|
|
- return state.readyChannelPromise.futureResult
|
|
|
+ channel = state.readyChannelPromise.futureResult
|
|
|
|
|
|
case let .active(state):
|
|
|
- return state.readyChannelPromise.futureResult
|
|
|
+ channel = state.readyChannelPromise.futureResult
|
|
|
|
|
|
case let .ready(state):
|
|
|
- return state.channel.eventLoop.makeSucceededFuture(state.channel)
|
|
|
+ channel = state.channel.eventLoop.makeSucceededFuture(state.channel)
|
|
|
|
|
|
case let .transientFailure(state):
|
|
|
- return state.readyChannelPromise.futureResult
|
|
|
+ channel = state.readyChannelPromise.futureResult
|
|
|
|
|
|
case .shutdown:
|
|
|
- return self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
|
|
|
+ channel = self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
|
|
|
}
|
|
|
+
|
|
|
+ self.logger.debug("vending channel future", metadata: [
|
|
|
+ "connectivity_state": "\(self.state.label)",
|
|
|
+ ])
|
|
|
+
|
|
|
+ return channel
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -273,6 +298,8 @@ internal class ConnectionManager {
|
|
|
/// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
|
|
|
internal func getOptimisticChannel() -> EventLoopFuture<Channel> {
|
|
|
return self.eventLoop.flatSubmit {
|
|
|
+ let channel: EventLoopFuture<Channel>
|
|
|
+
|
|
|
switch self.state {
|
|
|
case .idle:
|
|
|
self.startConnecting()
|
|
|
@@ -280,29 +307,38 @@ internal class ConnectionManager {
|
|
|
guard case let .connecting(connecting) = self.state else {
|
|
|
self.invalidState()
|
|
|
}
|
|
|
- return connecting.candidate
|
|
|
+ channel = connecting.candidate
|
|
|
|
|
|
case let .connecting(state):
|
|
|
- return state.candidate
|
|
|
+ channel = state.candidate
|
|
|
|
|
|
case let .active(state):
|
|
|
- return state.candidate.eventLoop.makeSucceededFuture(state.candidate)
|
|
|
+ channel = state.candidate.eventLoop.makeSucceededFuture(state.candidate)
|
|
|
|
|
|
case let .ready(state):
|
|
|
- return state.channel.eventLoop.makeSucceededFuture(state.channel)
|
|
|
+ channel = state.channel.eventLoop.makeSucceededFuture(state.channel)
|
|
|
|
|
|
case .transientFailure:
|
|
|
- return self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)
|
|
|
+ channel = self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)
|
|
|
|
|
|
case .shutdown:
|
|
|
- return self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
|
|
|
+ channel = self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
|
|
|
}
|
|
|
+
|
|
|
+ self.logger.debug("vending fast-failing channel future", metadata: [
|
|
|
+ "connectivity_state": "\(self.state.label)",
|
|
|
+ ])
|
|
|
+
|
|
|
+ return channel
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/// Shutdown any connection which exists. This is a request from the application.
|
|
|
internal func shutdown() -> EventLoopFuture<Void> {
|
|
|
return self.eventLoop.flatSubmit {
|
|
|
+ self.logger.debug("shutting down connection", metadata: [
|
|
|
+ "connectivity_state": "\(self.state.label)",
|
|
|
+ ])
|
|
|
let shutdown: ShutdownState
|
|
|
|
|
|
switch self.state {
|
|
|
@@ -375,6 +411,9 @@ internal class ConnectionManager {
|
|
|
/// The connecting channel became `active`. Must be called on the `EventLoop`.
|
|
|
internal func channelActive(channel: Channel) {
|
|
|
self.eventLoop.preconditionInEventLoop()
|
|
|
+ self.logger.debug("activating connection", metadata: [
|
|
|
+ "connectivity_state": "\(self.state.label)",
|
|
|
+ ])
|
|
|
|
|
|
switch self.state {
|
|
|
case let .connecting(connecting):
|
|
|
@@ -393,6 +432,9 @@ internal class ConnectionManager {
|
|
|
/// Must be called on the `EventLoop`.
|
|
|
internal func channelInactive() {
|
|
|
self.eventLoop.preconditionInEventLoop()
|
|
|
+ self.logger.debug("deactivating connection", metadata: [
|
|
|
+ "connectivity_state": "\(self.state.label)",
|
|
|
+ ])
|
|
|
|
|
|
switch self.state {
|
|
|
// The channel is `active` but not `ready`. Should we try again?
|
|
|
@@ -400,6 +442,7 @@ internal class ConnectionManager {
|
|
|
switch active.reconnect {
|
|
|
// No, shutdown instead.
|
|
|
case .none:
|
|
|
+ self.logger.debug("shutting down connection")
|
|
|
self.state = .shutdown(ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(())))
|
|
|
active.readyChannelPromise.fail(GRPCStatus(code: .unavailable, message: nil))
|
|
|
|
|
|
@@ -408,6 +451,7 @@ internal class ConnectionManager {
|
|
|
let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
|
|
|
self.startConnecting()
|
|
|
}
|
|
|
+ self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
|
|
|
self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
|
|
|
}
|
|
|
|
|
|
@@ -416,12 +460,14 @@ internal class ConnectionManager {
|
|
|
case let .ready(ready):
|
|
|
// No, no backoff is configured.
|
|
|
if ready.configuration.connectionBackoff == nil {
|
|
|
+ self.logger.debug("shutting down connection, no reconnect configured/remaining")
|
|
|
self.state = .shutdown(ShutdownState(closeFuture: ready.channel.closeFuture))
|
|
|
} else {
|
|
|
// Yes, start connecting now. We should go via `transientFailure`, however.
|
|
|
let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
|
|
|
self.startConnecting()
|
|
|
}
|
|
|
+ self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
|
|
|
self.state = .transientFailure(TransientFailureState(from: ready, scheduled: scheduled))
|
|
|
}
|
|
|
|
|
|
@@ -442,6 +488,9 @@ internal class ConnectionManager {
|
|
|
/// called on the `EventLoop`.
|
|
|
internal func ready() {
|
|
|
self.eventLoop.preconditionInEventLoop()
|
|
|
+ self.logger.debug("connection ready", metadata: [
|
|
|
+ "connectivity_state": "\(self.state.label)",
|
|
|
+ ])
|
|
|
|
|
|
switch self.state {
|
|
|
case let .active(connected):
|
|
|
@@ -460,6 +509,9 @@ internal class ConnectionManager {
|
|
|
/// the `EventLoop`.
|
|
|
internal func idle() {
|
|
|
self.eventLoop.preconditionInEventLoop()
|
|
|
+ self.logger.debug("idling connection", metadata: [
|
|
|
+ "connectivity_state": "\(self.state.label)",
|
|
|
+ ])
|
|
|
|
|
|
switch self.state {
|
|
|
case let .ready(state):
|
|
|
@@ -482,11 +534,13 @@ extension ConnectionManager {
|
|
|
switch connecting.reconnect {
|
|
|
// No, shutdown.
|
|
|
case .none:
|
|
|
+ self.logger.debug("shutting down connection, no reconnect configured/remaining")
|
|
|
connecting.readyChannelPromise.fail(error)
|
|
|
self.state = .shutdown(ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(())))
|
|
|
|
|
|
// Yes, after a delay.
|
|
|
case let .after(delay):
|
|
|
+ self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
|
|
|
let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
|
|
|
self.startConnecting()
|
|
|
}
|