| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Foundation
- import Logging
- import NIOConcurrencyHelpers
- import NIOCore
- import NIOHTTP2
- // Unchecked because mutable state is always accessed and modified on a particular event loop.
- // APIs which _may_ be called from different threads execute onto the correct event loop first.
- // APIs which _must_ be called from an exact event loop have preconditions checking that the correct
- // event loop is being used.
- @usableFromInline
- internal final class ConnectionManager: @unchecked Sendable {
- /// Whether the connection managed by this manager should be allowed to go idle and be closed, or
- /// if it should remain open indefinitely even when there are no active streams.
- internal enum IdleBehavior {
- case closeWhenIdleTimeout
- case neverGoIdle
- }
- internal enum Reconnect {
- case none
- case after(TimeInterval)
- }
- internal struct ConnectingState {
- var backoffIterator: ConnectionBackoffIterator?
- var reconnect: Reconnect
- var connectError: Error?
- var candidate: EventLoopFuture<Channel>
- var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
- var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
- }
- internal struct ConnectedState {
- var backoffIterator: ConnectionBackoffIterator?
- var reconnect: Reconnect
- var candidate: Channel
- var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
- var multiplexer: HTTP2StreamMultiplexer
- var error: Error?
- init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
- self.backoffIterator = state.backoffIterator
- self.reconnect = state.reconnect
- self.candidate = candidate
- self.readyChannelMuxPromise = state.readyChannelMuxPromise
- self.multiplexer = multiplexer
- }
- }
- internal struct ReadyState {
- var channel: Channel
- var multiplexer: HTTP2StreamMultiplexer
- var error: Error?
- init(from state: ConnectedState) {
- self.channel = state.candidate
- self.multiplexer = state.multiplexer
- }
- }
- internal struct TransientFailureState {
- var backoffIterator: ConnectionBackoffIterator?
- var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
- var scheduled: Scheduled<Void>
- var reason: Error
- init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
- self.backoffIterator = state.backoffIterator
- self.readyChannelMuxPromise = state.readyChannelMuxPromise
- self.scheduled = scheduled
- self.reason =
- reason
- ?? GRPCStatus(
- code: .unavailable,
- message: "Unexpected connection drop"
- )
- }
- init(from state: ConnectedState, scheduled: Scheduled<Void>) {
- self.backoffIterator = state.backoffIterator
- self.readyChannelMuxPromise = state.readyChannelMuxPromise
- self.scheduled = scheduled
- self.reason =
- state.error
- ?? GRPCStatus(
- code: .unavailable,
- message: "Unexpected connection drop"
- )
- }
- init(
- from state: ReadyState,
- scheduled: Scheduled<Void>,
- backoffIterator: ConnectionBackoffIterator?
- ) {
- self.backoffIterator = backoffIterator
- self.readyChannelMuxPromise = state.channel.eventLoop.makePromise()
- self.scheduled = scheduled
- self.reason =
- state.error
- ?? GRPCStatus(
- code: .unavailable,
- message: "Unexpected connection drop"
- )
- }
- }
- internal struct ShutdownState {
- var closeFuture: EventLoopFuture<Void>
- /// The reason we are shutdown. Any requests for a `Channel` in this state will be failed with
- /// this error.
- var reason: Error
- init(closeFuture: EventLoopFuture<Void>, reason: Error) {
- self.closeFuture = closeFuture
- self.reason = reason
- }
- static func shutdownByUser(closeFuture: EventLoopFuture<Void>) -> ShutdownState {
- return ShutdownState(
- closeFuture: closeFuture,
- reason: GRPCStatus(code: .unavailable, message: "Connection was shutdown by the user")
- )
- }
- }
- internal enum State {
- /// No `Channel` is required.
- ///
- /// Valid next states:
- /// - `connecting`
- /// - `shutdown`
- case idle(lastError: Error?)
- /// We're actively trying to establish a connection.
- ///
- /// Valid next states:
- /// - `active`
- /// - `transientFailure` (if our attempt fails and we're going to try again)
- /// - `shutdown`
- case connecting(ConnectingState)
- /// We've established a `Channel`, it might not be suitable (TLS handshake may fail, etc.).
- /// Our signal to be 'ready' is the initial HTTP/2 SETTINGS frame.
- ///
- /// Valid next states:
- /// - `ready`
- /// - `transientFailure` (if we our handshake fails or other error happens and we can attempt
- /// to re-establish the connection)
- /// - `shutdown`
- case active(ConnectedState)
- /// We have an active `Channel` which has seen the initial HTTP/2 SETTINGS frame. We can use
- /// the channel for making RPCs.
- ///
- /// Valid next states:
- /// - `idle` (we're not serving any RPCs, we can drop the connection for now)
- /// - `transientFailure` (we encountered an error and will re-establish the connection)
- /// - `shutdown`
- case ready(ReadyState)
- /// A `Channel` is desired, we'll attempt to create one in the future.
- ///
- /// Valid next states:
- /// - `connecting`
- /// - `shutdown`
- case transientFailure(TransientFailureState)
- /// We never want another `Channel`: this state is terminal.
- case shutdown(ShutdownState)
- fileprivate var label: String {
- switch self {
- case .idle:
- return "idle"
- case .connecting:
- return "connecting"
- case .active:
- return "active"
- case .ready:
- return "ready"
- case .transientFailure:
- return "transientFailure"
- case .shutdown:
- return "shutdown"
- }
- }
- }
- /// The last 'external' state we are in, a subset of the internal state.
- private var externalState: _ConnectivityState = .idle(nil)
- /// Update the external state, potentially notifying a delegate about the change.
- private func updateExternalState(to nextState: _ConnectivityState) {
- if !self.externalState.isSameState(as: nextState) {
- let oldState = self.externalState
- self.externalState = nextState
- self.connectivityDelegate?.connectionStateDidChange(self, from: oldState, to: nextState)
- }
- }
- /// Our current state.
- private var state: State {
- didSet {
- switch self.state {
- case let .idle(error):
- self.updateExternalState(to: .idle(error))
- self.updateConnectionID()
- case .connecting:
- self.updateExternalState(to: .connecting)
- // This is an internal state.
- case .active:
- ()
- case .ready:
- self.updateExternalState(to: .ready)
- case let .transientFailure(state):
- self.updateExternalState(to: .transientFailure(state.reason))
- self.updateConnectionID()
- case .shutdown:
- self.updateExternalState(to: .shutdown)
- }
- }
- }
- /// Returns whether the state is 'idle'.
- private var isIdle: Bool {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case .idle:
- return true
- case .connecting, .transientFailure, .active, .ready, .shutdown:
- return false
- }
- }
- /// Returns whether the state is 'connecting'.
- private var isConnecting: Bool {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case .connecting:
- return true
- case .idle, .transientFailure, .active, .ready, .shutdown:
- return false
- }
- }
- /// Returns whether the state is 'ready'.
- private var isReady: Bool {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case .ready:
- return true
- case .idle, .active, .connecting, .transientFailure, .shutdown:
- return false
- }
- }
- /// Returns whether the state is 'ready'.
- private var isTransientFailure: Bool {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case .transientFailure:
- return true
- case .idle, .connecting, .active, .ready, .shutdown:
- return false
- }
- }
- /// Returns whether the state is 'shutdown'.
- private var isShutdown: Bool {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case .shutdown:
- return true
- case .idle, .connecting, .transientFailure, .active, .ready:
- return false
- }
- }
- /// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
- private var multiplexer: HTTP2StreamMultiplexer? {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case let .ready(state):
- return state.multiplexer
- case .idle, .connecting, .transientFailure, .active, .shutdown:
- return nil
- }
- }
- /// The `EventLoop` that the managed connection will run on.
- internal let eventLoop: EventLoop
- /// A delegate for connectivity changes. Executed on the `EventLoop`.
- private var connectivityDelegate: ConnectionManagerConnectivityDelegate?
- /// A delegate for HTTP/2 connection changes. Executed on the `EventLoop`.
- private var http2Delegate: ConnectionManagerHTTP2Delegate?
- /// An `EventLoopFuture<Channel>` provider.
- private let channelProvider: ConnectionManagerChannelProvider
- /// The behavior for starting a call, i.e. how patient is the caller when asking for a
- /// multiplexer.
- private let callStartBehavior: CallStartBehavior.Behavior
- /// The configuration to use when backing off between connection attempts, if reconnection
- /// attempts should be made at all.
- private let connectionBackoff: ConnectionBackoff?
- /// Whether this connection should be allowed to go idle (and thus be closed when the idle timer fires).
- internal let idleBehavior: IdleBehavior
- /// A logger.
- internal var logger: Logger
- internal let id: ConnectionManagerID
- private var channelNumber: UInt64
- private var channelNumberLock = NIOLock()
- private var _connectionIDAndNumber: String {
- return "\(self.id)/\(self.channelNumber)"
- }
- private var connectionIDAndNumber: String {
- return self.channelNumberLock.withLock {
- return self._connectionIDAndNumber
- }
- }
- private func updateConnectionID() {
- self.channelNumberLock.withLock {
- self.channelNumber &+= 1
- self.logger[metadataKey: MetadataKey.connectionID] = "\(self._connectionIDAndNumber)"
- }
- }
- internal func appendMetadata(to logger: inout Logger) {
- logger[metadataKey: MetadataKey.connectionID] = "\(self.connectionIDAndNumber)"
- }
- internal convenience init(
- configuration: ClientConnection.Configuration,
- channelProvider: ConnectionManagerChannelProvider? = nil,
- connectivityDelegate: ConnectionManagerConnectivityDelegate?,
- idleBehavior: IdleBehavior,
- logger: Logger
- ) {
- self.init(
- eventLoop: configuration.eventLoopGroup.next(),
- channelProvider: channelProvider ?? DefaultChannelProvider(configuration: configuration),
- callStartBehavior: configuration.callStartBehavior.wrapped,
- idleBehavior: idleBehavior,
- connectionBackoff: configuration.connectionBackoff,
- connectivityDelegate: connectivityDelegate,
- http2Delegate: nil,
- logger: logger
- )
- }
- internal init(
- eventLoop: EventLoop,
- channelProvider: ConnectionManagerChannelProvider,
- callStartBehavior: CallStartBehavior.Behavior,
- idleBehavior: IdleBehavior,
- connectionBackoff: ConnectionBackoff?,
- connectivityDelegate: ConnectionManagerConnectivityDelegate?,
- http2Delegate: ConnectionManagerHTTP2Delegate?,
- logger: Logger
- ) {
- // Setup the logger.
- var logger = logger
- let connectionID = ConnectionManagerID()
- let channelNumber: UInt64 = 0
- logger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)"
- self.eventLoop = eventLoop
- self.state = .idle(lastError: nil)
- self.channelProvider = channelProvider
- self.callStartBehavior = callStartBehavior
- self.connectionBackoff = connectionBackoff
- self.connectivityDelegate = connectivityDelegate
- self.http2Delegate = http2Delegate
- self.idleBehavior = idleBehavior
- self.id = connectionID
- self.channelNumber = channelNumber
- self.logger = logger
- }
- /// Get the multiplexer from the underlying channel handling gRPC calls.
- /// if the `ConnectionManager` was configured to be `fastFailure` this will have
- /// one chance to connect - if not reconnections are managed here.
- internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
- func getHTTP2Multiplexer0() -> EventLoopFuture<HTTP2StreamMultiplexer> {
- switch self.callStartBehavior {
- case .waitsForConnectivity:
- return self.getHTTP2MultiplexerPatient()
- case .fastFailure:
- return self.getHTTP2MultiplexerOptimistic()
- }
- }
- if self.eventLoop.inEventLoop {
- return getHTTP2Multiplexer0()
- } else {
- return self.eventLoop.flatSubmit {
- getHTTP2Multiplexer0()
- }
- }
- }
- /// Returns a future for the multiplexer which succeeded when the channel is connected.
- /// Reconnects are handled if necessary.
- private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
- let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
- switch self.state {
- case .idle:
- self.startConnecting()
- // We started connecting so we must transition to the `connecting` state.
- guard case let .connecting(connecting) = self.state else {
- self.unreachableState()
- }
- multiplexer = connecting.readyChannelMuxPromise.futureResult
- case let .connecting(state):
- multiplexer = state.readyChannelMuxPromise.futureResult
- case let .active(state):
- multiplexer = state.readyChannelMuxPromise.futureResult
- case let .ready(state):
- multiplexer = self.eventLoop.makeSucceededFuture(state.multiplexer)
- case let .transientFailure(state):
- multiplexer = state.readyChannelMuxPromise.futureResult
- case let .shutdown(state):
- multiplexer = self.eventLoop.makeFailedFuture(state.reason)
- }
- self.logger.debug(
- "vending multiplexer future",
- metadata: [
- "connectivity_state": "\(self.state.label)"
- ]
- )
- return multiplexer
- }
- /// Returns a future for the current HTTP/2 stream multiplexer, or future HTTP/2 stream multiplexer from the current connection
- /// attempt, or if the state is 'idle' returns the future for the next connection attempt.
- ///
- /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
- private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
- // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
- self.eventLoop.preconditionInEventLoop()
- let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = { () in
- switch self.state {
- case .idle:
- self.startConnecting()
- // We started connecting so we must transition to the `connecting` state.
- guard case let .connecting(connecting) = self.state else {
- self.unreachableState()
- }
- return connecting.candidateMuxPromise.futureResult
- case let .connecting(state):
- return state.candidateMuxPromise.futureResult
- case let .active(active):
- return self.eventLoop.makeSucceededFuture(active.multiplexer)
- case let .ready(ready):
- return self.eventLoop.makeSucceededFuture(ready.multiplexer)
- case let .transientFailure(state):
- return self.eventLoop.makeFailedFuture(state.reason)
- case let .shutdown(state):
- return self.eventLoop.makeFailedFuture(state.reason)
- }
- }()
- self.logger.debug(
- "vending fast-failing multiplexer future",
- metadata: [
- "connectivity_state": "\(self.state.label)"
- ]
- )
- return muxFuture
- }
- @usableFromInline
- internal enum ShutdownMode {
- /// Closes the underlying channel without waiting for existing RPCs to complete.
- case forceful
- /// Allows running RPCs to run their course before closing the underlying channel. No new
- /// streams may be created.
- case graceful(NIODeadline)
- }
- /// Shutdown the underlying connection.
- ///
- /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
- internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
- let promise = self.eventLoop.makePromise(of: Void.self)
- self.shutdown(mode: mode, promise: promise)
- return promise.futureResult
- }
- /// Shutdown the underlying connection.
- ///
- /// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
- internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
- if self.eventLoop.inEventLoop {
- self._shutdown(mode: mode, promise: promise)
- } else {
- self.eventLoop.execute {
- self._shutdown(mode: mode, promise: promise)
- }
- }
- }
- private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
- self.logger.debug(
- "shutting down connection",
- metadata: [
- "connectivity_state": "\(self.state.label)",
- "shutdown.mode": "\(mode)",
- ]
- )
- switch self.state {
- // We don't have a channel and we don't want one, easy!
- case .idle:
- let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
- self.state = .shutdown(shutdown)
- promise.succeed(())
- // We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
- // the shutdown future and deal with any fallout from the connecting channel without the
- // application knowing.
- case let .connecting(state):
- let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
- self.state = .shutdown(shutdown)
- // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
- // connect the application shouldn't have access to the channel or multiplexer.
- state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
- state.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
- // Complete the shutdown promise when the connection attempt has completed.
- state.candidate.whenComplete {
- switch $0 {
- case let .success(channel):
- // In case we do successfully connect, close on the next loop tick. When connecting a
- // channel NIO will complete the promise for the channel before firing channel active.
- // That means we may close and fire inactive before active which HTTP/2 will be unhappy
- // about.
- self.eventLoop.execute {
- channel.close(mode: .all, promise: nil)
- promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())
- }
- case .failure:
- // We failed to connect, that's fine we still shutdown successfully.
- promise.succeed(())
- }
- }
- // We have an active channel but the application doesn't know about it yet. We'll do the same
- // as for `.connecting`.
- case let .active(state):
- let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
- self.state = .shutdown(shutdown)
- // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
- // connect the application shouldn't have access to the channel or multiplexer.
- state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
- // We have a channel, close it. We only create streams in the ready state so there's no need
- // to quiesce here.
- state.candidate.close(mode: .all, promise: nil)
- promise.completeWith(state.candidate.closeFuture.recoveringFromUncleanShutdown())
- // The channel is up and running: the application could be using it. We can close it and
- // return the `closeFuture`.
- case let .ready(state):
- let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
- self.state = .shutdown(shutdown)
- switch mode {
- case .forceful:
- // We have a channel, close it.
- state.channel.close(mode: .all, promise: nil)
- case let .graceful(deadline):
- // If we don't close by the deadline forcibly close the channel.
- let scheduledForceClose = state.channel.eventLoop.scheduleTask(deadline: deadline) {
- self.logger.info("shutdown timer expired, forcibly closing connection")
- state.channel.close(mode: .all, promise: nil)
- }
- // Cancel the force close if we close normally first.
- state.channel.closeFuture.whenComplete { _ in
- scheduledForceClose.cancel()
- }
- // Tell the channel to quiesce. It will be picked up by the idle handler which will close
- // the channel when all streams have been closed.
- state.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
- }
- // Complete the promise when we eventually close.
- promise.completeWith(state.channel.closeFuture.recoveringFromUncleanShutdown())
- // Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
- // do the same but also cancel any scheduled connection attempts and deal with any fallout
- // if we cancelled too late.
- case let .transientFailure(state):
- let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
- self.state = .shutdown(shutdown)
- // Stop the creation of a new channel, if we can. If we can't then the task to
- // `startConnecting()` will see our new `shutdown` state and ignore the request to connect.
- state.scheduled.cancel()
- // Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
- // connect the application shouldn't should have access to the channel.
- state.readyChannelMuxPromise.fail(shutdown.reason)
- // No active channel, so complete the shutdown promise now.
- promise.succeed(())
- // We're already shutdown; there's nothing to do.
- case let .shutdown(state):
- promise.completeWith(state.closeFuture)
- }
- }
- /// Registers a callback which fires when the current active connection is closed.
- ///
- /// If there is a connection, the callback will be invoked with `true` when the connection is
- /// closed. Otherwise the callback is invoked with `false`.
- internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
- if self.eventLoop.inEventLoop {
- self._onCurrentConnectionClose(onClose)
- } else {
- self.eventLoop.execute {
- self._onCurrentConnectionClose(onClose)
- }
- }
- }
- private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case let .ready(state):
- state.channel.closeFuture.whenComplete { _ in onClose(true) }
- case .idle, .connecting, .active, .transientFailure, .shutdown:
- onClose(false)
- }
- }
- // MARK: - State changes from the channel handler.
- /// The channel caught an error. Hold on to it until the channel becomes inactive, it may provide
- /// some context.
- internal func channelError(_ error: Error) {
- self.eventLoop.preconditionInEventLoop()
- switch self.state {
- // Hitting an error in idle is a surprise, but not really something we do anything about. Either the
- // error is channel fatal, in which case we'll see channelInactive soon (acceptable), or it's not,
- // and future I/O will either fail fast or work. In either case, all we do is log this and move on.
- case .idle:
- self.logger.warning(
- "ignoring unexpected error in idle",
- metadata: [
- MetadataKey.error: "\(error)"
- ]
- )
- case .connecting(var state):
- // Record the error, the channel promise will notify the manager of any error which occurs
- // while connecting. It may be overridden by this error if it contains more relevant
- // information
- if state.connectError == nil {
- state.connectError = error
- self.state = .connecting(state)
- // The pool is only notified of connection errors when the connection transitions to the
- // transient failure state. However, in some cases (i.e. with NIOTS), errors can be thrown
- // during the connect but before the connect times out.
- //
- // This opens up a period of time where you can start a call and have it fail with
- // deadline exceeded (because no connection was available within the configured max
- // wait time for the pool) but without any diagnostic information. The information is
- // available but it hasn't been made available to the pool at that point in time.
- //
- // The delegate can't easily be modified (it's public API) and a new API doesn't make all
- // that much sense so we elect to check whether the delegate is the pool and call it
- // directly.
- if let pool = self.connectivityDelegate as? ConnectionPool {
- pool.sync.updateMostRecentError(error)
- }
- }
- case var .active(state):
- state.error = error
- self.state = .active(state)
- case var .ready(state):
- state.error = error
- self.state = .ready(state)
- // If we've already in one of these states, then additional errors aren't helpful to us.
- case .transientFailure, .shutdown:
- ()
- }
- }
- /// The connecting channel became `active`. Must be called on the `EventLoop`.
- internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
- self.eventLoop.preconditionInEventLoop()
- self.logger.debug(
- "activating connection",
- metadata: [
- "connectivity_state": "\(self.state.label)"
- ]
- )
- switch self.state {
- case let .connecting(connecting):
- let connected = ConnectedState(from: connecting, candidate: channel, multiplexer: multiplexer)
- self.state = .active(connected)
- // Optimistic connections are happy this this level of setup.
- connecting.candidateMuxPromise.succeed(multiplexer)
- // Application called shutdown before the channel become active; we should close it.
- case .shutdown:
- channel.close(mode: .all, promise: nil)
- case .idle, .transientFailure:
- // Received a channelActive when not connecting. Can happen if channelActive and
- // channelInactive are reordered. Ignore.
- ()
- case .active, .ready:
- // Received a second 'channelActive', already active so ignore.
- ()
- }
- }
- /// An established channel (i.e. `active` or `ready`) has become inactive: should we reconnect?
- /// Must be called on the `EventLoop`.
- internal func channelInactive() {
- self.eventLoop.preconditionInEventLoop()
- self.logger.debug(
- "deactivating connection",
- metadata: [
- "connectivity_state": "\(self.state.label)"
- ]
- )
- switch self.state {
- // We can hit inactive in connecting if we see channelInactive before channelActive; that's not
- // common but we should tolerate it.
- case let .connecting(connecting):
- // Should we try connecting again?
- switch connecting.reconnect {
- // No, shutdown instead.
- case .none:
- self.logger.debug("shutting down connection")
- let error = GRPCStatus(
- code: .unavailable,
- message: "The connection was dropped and connection re-establishment is disabled"
- )
- let shutdownState = ShutdownState(
- closeFuture: self.eventLoop.makeSucceededFuture(()),
- reason: error
- )
- self.state = .shutdown(shutdownState)
- // Shutting down, so fail the outstanding promises.
- connecting.readyChannelMuxPromise.fail(error)
- connecting.candidateMuxPromise.fail(error)
- // Yes, after some time.
- case let .after(delay):
- let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
- // Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
- connecting.candidateMuxPromise.fail(error)
- let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
- self.startConnecting()
- }
- self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
- self.state = .transientFailure(.init(from: connecting, scheduled: scheduled, reason: nil))
- }
- // The channel is `active` but not `ready`. Should we try again?
- case let .active(active):
- switch active.reconnect {
- // No, shutdown instead.
- case .none:
- self.logger.debug("shutting down connection")
- let error = GRPCStatus(
- code: .unavailable,
- message: "The connection was dropped and connection re-establishment is disabled"
- )
- let shutdownState = ShutdownState(
- closeFuture: self.eventLoop.makeSucceededFuture(()),
- reason: error
- )
- self.state = .shutdown(shutdownState)
- active.readyChannelMuxPromise.fail(error)
- // Yes, after some time.
- case let .after(delay):
- let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
- self.startConnecting()
- }
- self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
- self.state = .transientFailure(TransientFailureState(from: active, scheduled: scheduled))
- }
- // The channel was ready and working fine but something went wrong. Should we try to replace
- // the channel?
- case let .ready(ready):
- // No, no backoff is configured.
- if self.connectionBackoff == nil {
- self.logger.debug("shutting down connection, no reconnect configured/remaining")
- self.state = .shutdown(
- ShutdownState(
- closeFuture: ready.channel.closeFuture,
- reason: GRPCStatus(
- code: .unavailable,
- message: "The connection was dropped and a reconnect was not configured"
- )
- )
- )
- } else {
- // Yes, start connecting now. We should go via `transientFailure`, however.
- let scheduled = self.eventLoop.scheduleTask(in: .nanoseconds(0)) {
- self.startConnecting()
- }
- self.logger.debug("scheduling connection attempt", metadata: ["delay": "0"])
- let backoffIterator = self.connectionBackoff?.makeIterator()
- self.state = .transientFailure(
- TransientFailureState(
- from: ready,
- scheduled: scheduled,
- backoffIterator: backoffIterator
- )
- )
- }
- // This is fine: we expect the channel to become inactive after becoming idle.
- case .idle:
- ()
- // We're already shutdown, that's fine.
- case .shutdown:
- ()
- // Received 'channelInactive' twice; fine, ignore.
- case .transientFailure:
- ()
- }
- }
- /// The channel has become ready, that is, it has seen the initial HTTP/2 SETTINGS frame. Must be
- /// called on the `EventLoop`.
- internal func ready() {
- self.eventLoop.preconditionInEventLoop()
- self.logger.debug(
- "connection ready",
- metadata: [
- "connectivity_state": "\(self.state.label)"
- ]
- )
- switch self.state {
- case let .active(connected):
- self.state = .ready(ReadyState(from: connected))
- connected.readyChannelMuxPromise.succeed(connected.multiplexer)
- case .shutdown:
- ()
- case .idle, .transientFailure:
- // No connection or connection attempt exists but connection was marked as ready. This is
- // strange. Ignore it in release mode as there's nothing to close and nowehere to fire an
- // error to.
- assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
- case .connecting:
- // No channel exists to receive initial HTTP/2 SETTINGS frame on... weird. Ignore in release
- // mode.
- assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
- case .ready:
- // Already received initial HTTP/2 SETTINGS frame; ignore in release mode.
- assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
- }
- }
- /// No active RPCs are happening on 'ready' channel: close the channel for now. Must be called on
- /// the `EventLoop`.
- internal func idle() {
- self.eventLoop.preconditionInEventLoop()
- self.logger.debug(
- "idling connection",
- metadata: [
- "connectivity_state": "\(self.state.label)"
- ]
- )
- switch self.state {
- case let .active(state):
- // This state is reachable if the keepalive timer fires before we reach the ready state.
- self.state = .idle(lastError: state.error)
- state.readyChannelMuxPromise
- .fail(GRPCStatus(code: .unavailable, message: "Idled before reaching ready state"))
- case let .ready(state):
- self.state = .idle(lastError: state.error)
- case .shutdown:
- // This is expected when the connection is closed by the user: when the channel becomes
- // inactive and there are no outstanding RPCs, 'idle()' will be called instead of
- // 'channelInactive()'.
- ()
- case .idle, .transientFailure:
- // There's no connection to idle; ignore.
- ()
- case .connecting:
- // The idle watchdog is started when the connection is active, this shouldn't happen
- // in the connecting state. Ignore it in release mode.
- assertionFailure("tried to idle a connection in the \(self.state.label) state")
- }
- }
- internal func streamOpened() {
- self.eventLoop.assertInEventLoop()
- self.http2Delegate?.streamOpened(self)
- }
- internal func streamClosed() {
- self.eventLoop.assertInEventLoop()
- self.http2Delegate?.streamClosed(self)
- }
- internal func maxConcurrentStreamsChanged(_ maxConcurrentStreams: Int) {
- self.eventLoop.assertInEventLoop()
- self.http2Delegate?.receivedSettingsMaxConcurrentStreams(
- self,
- maxConcurrentStreams: maxConcurrentStreams
- )
- }
- /// The connection has started quiescing: notify the connectivity monitor of this.
- internal func beginQuiescing() {
- self.eventLoop.assertInEventLoop()
- self.connectivityDelegate?.connectionIsQuiescing(self)
- }
- }
- extension ConnectionManager {
- // A connection attempt failed; we never established a connection.
- private func connectionFailed(withError error: Error) {
- self.eventLoop.preconditionInEventLoop()
- switch self.state {
- case let .connecting(connecting):
- let reportedError: Error
- switch error as? ChannelError {
- case .some(.connectTimeout):
- // A more relevant error may have been caught earlier. Use that in preference to the
- // timeout as it'll likely be more useful.
- reportedError = connecting.connectError ?? error
- default:
- reportedError = error
- }
- // Should we reconnect?
- switch connecting.reconnect {
- // No, shutdown.
- case .none:
- self.logger.debug("shutting down connection, no reconnect configured/remaining")
- self.state = .shutdown(
- ShutdownState(closeFuture: self.eventLoop.makeSucceededFuture(()), reason: reportedError)
- )
- connecting.readyChannelMuxPromise.fail(reportedError)
- connecting.candidateMuxPromise.fail(reportedError)
- // Yes, after a delay.
- case let .after(delay):
- self.logger.debug("scheduling connection attempt", metadata: ["delay": "\(delay)"])
- let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
- self.startConnecting()
- }
- self.state = .transientFailure(
- TransientFailureState(from: connecting, scheduled: scheduled, reason: reportedError)
- )
- // Candidate mux users are not willing to wait.
- connecting.candidateMuxPromise.fail(reportedError)
- }
- // The application must have called shutdown while we were trying to establish a connection
- // which was doomed to fail anyway. That's fine, we can ignore this.
- case .shutdown:
- ()
- // Connection attempt failed, but no connection attempt is in progress.
- case .idle, .active, .ready, .transientFailure:
- // Nothing we can do other than ignore in release mode.
- assertionFailure("connect promise failed in \(self.state.label) state")
- }
- }
- }
- extension ConnectionManager {
- // Start establishing a connection: we can only do this from the `idle` and `transientFailure`
- // states. Must be called on the `EventLoop`.
- private func startConnecting() {
- self.eventLoop.assertInEventLoop()
- switch self.state {
- case .idle:
- let iterator = self.connectionBackoff?.makeIterator()
- // The iterator produces the connect timeout and the backoff to use for the next attempt. This
- // is unfortunate if retries is set to none because we need to connect timeout but not the
- // backoff yet the iterator will not return a value to us. To workaround this we grab the
- // connect timeout and override it.
- let connectTimeoutOverride: TimeAmount?
- if let backoff = self.connectionBackoff, backoff.retries == .none {
- connectTimeoutOverride = .seconds(timeInterval: backoff.minimumConnectionTimeout)
- } else {
- connectTimeoutOverride = nil
- }
- self.startConnecting(
- backoffIterator: iterator,
- muxPromise: self.eventLoop.makePromise(),
- connectTimeoutOverride: connectTimeoutOverride
- )
- case let .transientFailure(pending):
- self.startConnecting(
- backoffIterator: pending.backoffIterator,
- muxPromise: pending.readyChannelMuxPromise
- )
- // We shutdown before a scheduled connection attempt had started.
- case .shutdown:
- ()
- // We only call startConnecting() if the connection does not exist and after checking what the
- // current state is, so none of these states should be reachable.
- case .connecting:
- self.unreachableState()
- case .active:
- self.unreachableState()
- case .ready:
- self.unreachableState()
- }
- }
- private func startConnecting(
- backoffIterator: ConnectionBackoffIterator?,
- muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>,
- connectTimeoutOverride: TimeAmount? = nil
- ) {
- let timeoutAndBackoff = backoffIterator?.next()
- // We're already on the event loop: submit the connect so it starts after we've made the
- // state change to `.connecting`.
- self.eventLoop.assertInEventLoop()
- let candidate: EventLoopFuture<Channel> = self.eventLoop.flatSubmit {
- let connectTimeout: TimeAmount?
- if let connectTimeoutOverride = connectTimeoutOverride {
- connectTimeout = connectTimeoutOverride
- } else {
- connectTimeout = timeoutAndBackoff.map { TimeAmount.seconds(timeInterval: $0.timeout) }
- }
- let channel: EventLoopFuture<Channel> = self.channelProvider.makeChannel(
- managedBy: self,
- onEventLoop: self.eventLoop,
- connectTimeout: connectTimeout,
- logger: self.logger
- )
- channel.whenFailure { error in
- self.connectionFailed(withError: error)
- }
- return channel
- }
- // Should we reconnect if the candidate channel fails?
- let reconnect: Reconnect = timeoutAndBackoff.map { .after($0.backoff) } ?? .none
- let connecting = ConnectingState(
- backoffIterator: backoffIterator,
- reconnect: reconnect,
- candidate: candidate,
- readyChannelMuxPromise: muxPromise,
- candidateMuxPromise: self.eventLoop.makePromise()
- )
- self.state = .connecting(connecting)
- }
- }
- extension ConnectionManager {
- /// Returns a synchronous view of the connection manager; each operation requires the caller to be
- /// executing on the same `EventLoop` as the connection manager.
- internal var sync: Sync {
- return Sync(self)
- }
- internal struct Sync {
- private let manager: ConnectionManager
- fileprivate init(_ manager: ConnectionManager) {
- self.manager = manager
- }
- /// A delegate for connectivity changes.
- internal var connectivityDelegate: ConnectionManagerConnectivityDelegate? {
- get {
- self.manager.eventLoop.assertInEventLoop()
- return self.manager.connectivityDelegate
- }
- nonmutating set {
- self.manager.eventLoop.assertInEventLoop()
- self.manager.connectivityDelegate = newValue
- }
- }
- /// A delegate for HTTP/2 connection changes.
- internal var http2Delegate: ConnectionManagerHTTP2Delegate? {
- get {
- self.manager.eventLoop.assertInEventLoop()
- return self.manager.http2Delegate
- }
- nonmutating set {
- self.manager.eventLoop.assertInEventLoop()
- self.manager.http2Delegate = newValue
- }
- }
- /// Returns `true` if the connection is in the idle state.
- internal var isIdle: Bool {
- return self.manager.isIdle
- }
- /// Returns `true` if the connection is in the connecting state.
- internal var isConnecting: Bool {
- return self.manager.isConnecting
- }
- /// Returns `true` if the connection is in the ready state.
- internal var isReady: Bool {
- return self.manager.isReady
- }
- /// Returns `true` if the connection is in the transient failure state.
- internal var isTransientFailure: Bool {
- return self.manager.isTransientFailure
- }
- /// Returns `true` if the connection is in the shutdown state.
- internal var isShutdown: Bool {
- return self.manager.isShutdown
- }
- /// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is any
- /// other state.
- internal var multiplexer: HTTP2StreamMultiplexer? {
- return self.manager.multiplexer
- }
- // Start establishing a connection. Must only be called when `isIdle` is `true`.
- internal func startConnecting() {
- self.manager.startConnecting()
- }
- }
- }
- extension ConnectionManager {
- private func unreachableState(
- function: StaticString = #function,
- file: StaticString = #fileID,
- line: UInt = #line
- ) -> Never {
- fatalError("Invalid state \(self.state) for \(function)", file: file, line: line)
- }
- }
|