|
|
@@ -53,139 +53,209 @@ import NIOTLS
|
|
|
/// delegate associated with this connection (see `DelegatingErrorHandler`).
|
|
|
///
|
|
|
/// See `BaseClientCall` for a description of the remainder of the client pipeline.
|
|
|
-open class ClientConnection {
|
|
|
- /// Makes and configures a `ClientBootstrap` using the provided configuration.
|
|
|
- ///
|
|
|
- /// Enables `SO_REUSEADDR` and `TCP_NODELAY` and configures the `channelInitializer` to use the
|
|
|
- /// handlers detailed in the documentation for `ClientConnection`.
|
|
|
- ///
|
|
|
- /// - Parameter configuration: The configuration to prepare the bootstrap with.
|
|
|
- public class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
|
|
|
- let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
|
|
|
- // Enable SO_REUSEADDR and TCP_NODELAY.
|
|
|
- .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
|
|
- .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
|
|
|
- .channelInitializer { channel in
|
|
|
- let tlsConfigured = configuration.tlsConfiguration.map { tlsConfiguration in
|
|
|
- channel.configureTLS(tlsConfiguration, errorDelegate: configuration.errorDelegate)
|
|
|
- }
|
|
|
+public class ClientConnection {
|
|
|
+ /// The configuration this connection was created using.
|
|
|
+ internal let configuration: ClientConnection.Configuration
|
|
|
|
|
|
- return (tlsConfigured ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
|
|
|
- channel.configureHTTP2Pipeline(mode: .client)
|
|
|
- }.flatMap { _ in
|
|
|
- let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
|
|
|
- return channel.pipeline.addHandler(errorHandler)
|
|
|
- }
|
|
|
- }
|
|
|
+ /// The channel which will handle gRPC calls.
|
|
|
+ internal var channel: EventLoopFuture<Channel>
|
|
|
|
|
|
- return bootstrap
|
|
|
- }
|
|
|
+ /// HTTP multiplexer from the `channel` handling gRPC calls.
|
|
|
+ internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
|
|
|
|
|
|
- /// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
|
|
|
- ///
|
|
|
- /// - Parameter channel: The channel to verify successful TLS setup on.
|
|
|
- public class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
|
|
|
- return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
|
|
|
- $0.verification
|
|
|
+ /// A monitor for the connectivity state.
|
|
|
+ public let connectivity: ConnectivityStateMonitor
|
|
|
+
|
|
|
+ /// Creates a new connection from the given configuration.
|
|
|
+ public init(configuration: ClientConnection.Configuration) {
|
|
|
+ let monitor = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
|
|
|
+ let channel = ClientConnection.makeChannel(
|
|
|
+ configuration: configuration,
|
|
|
+ connectivityMonitor: monitor
|
|
|
+ )
|
|
|
+
|
|
|
+ self.channel = channel
|
|
|
+ self.multiplexer = channel.flatMap {
|
|
|
+ $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
|
|
|
+ }
|
|
|
+ self.connectivity = monitor
|
|
|
+ self.configuration = configuration
|
|
|
+
|
|
|
+ self.channel.whenSuccess { _ in
|
|
|
+ self.connectivity.state = .ready
|
|
|
}
|
|
|
+ self.replaceChannelAndMultiplexerOnClose(channel: channel)
|
|
|
}
|
|
|
|
|
|
- /// Makes a `ClientConnection` from the given channel and configuration.
|
|
|
- ///
|
|
|
- /// - Parameter channel: The channel to use for the connection.
|
|
|
- /// - Parameter configuration: The configuration used to create the channel.
|
|
|
- public class func makeClientConnection(
|
|
|
- channel: Channel,
|
|
|
- configuration: Configuration
|
|
|
- ) -> EventLoopFuture<ClientConnection> {
|
|
|
- return channel.pipeline.handler(type: HTTP2StreamMultiplexer.self).map { multiplexer in
|
|
|
- ClientConnection(channel: channel, multiplexer: multiplexer, configuration: configuration)
|
|
|
+ /// Registers a callback on the `closeFuture` of the given channel to replace this class's
|
|
|
+ /// channel and multiplexer.
|
|
|
+ private func replaceChannelAndMultiplexerOnClose(channel: EventLoopFuture<Channel>) {
|
|
|
+ channel.always { result in
|
|
|
+ // If we failed to get a channel then we've exhausted our backoff; we should `.shutdown`.
|
|
|
+ if case .failure = result {
|
|
|
+ self.connectivity.state = .shutdown
|
|
|
+ }
|
|
|
+ }.flatMap {
|
|
|
+ $0.closeFuture
|
|
|
+ }.whenComplete { _ in
|
|
|
+ // `.shutdown` is terminal so don't attempt a reconnection.
|
|
|
+ guard self.connectivity.state != .shutdown else {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ let newChannel = ClientConnection.makeChannel(
|
|
|
+ configuration: self.configuration,
|
|
|
+ connectivityMonitor: self.connectivity
|
|
|
+ )
|
|
|
+
|
|
|
+ self.channel = newChannel
|
|
|
+ self.multiplexer = newChannel.flatMap {
|
|
|
+ $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Change the state if the connection was successful.
|
|
|
+ newChannel.whenSuccess { _ in
|
|
|
+ self.connectivity.state = .ready
|
|
|
+ }
|
|
|
+ self.replaceChannelAndMultiplexerOnClose(channel: newChannel)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// Starts a client connection using the given configuration.
|
|
|
- ///
|
|
|
- /// This involves: creating a `ClientBootstrap`, connecting to a target, verifying that the TLS
|
|
|
- /// handshake was successful (if TLS was configured) and creating the `ClientConnection`.
|
|
|
- /// See the individual functions for more information:
|
|
|
- /// - `makeBootstrap(configuration:)`,
|
|
|
- /// - `verifyTLS(channel:)`, and
|
|
|
- /// - `makeClientConnection(channel:configuration:)`.
|
|
|
- ///
|
|
|
- /// - Parameter configuration: The configuration to start the connection with.
|
|
|
- public class func start(_ configuration: Configuration) -> EventLoopFuture<ClientConnection> {
|
|
|
- return start(configuration, backoffIterator: configuration.connectionBackoff?.makeIterator())
|
|
|
+ /// The `EventLoop` this connection is using.
|
|
|
+ public var eventLoop: EventLoop {
|
|
|
+ return self.channel.eventLoop
|
|
|
}
|
|
|
|
|
|
- /// Starts a client connection using the given configuration and backoff.
|
|
|
+ /// Closes the connection to the server.
|
|
|
+ public func close() -> EventLoopFuture<Void> {
|
|
|
+ if self.connectivity.state == .shutdown {
|
|
|
+ // We're already shutdown or in the process of shutting down.
|
|
|
+ return channel.flatMap { $0.closeFuture }
|
|
|
+ } else {
|
|
|
+ self.connectivity.state = .shutdown
|
|
|
+ return channel.flatMap { $0.close() }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+extension ClientConnection {
|
|
|
+ /// Creates a `Channel` using the given configuration.
|
|
|
///
|
|
|
- /// In addition to the steps taken in `start(configuration:)`, we _may_ additionally set a
|
|
|
- /// connection timeout and schedule a retry attempt (should the connection fail) if a
|
|
|
+ /// This involves: creating a `ClientBootstrap`, connecting to a target and verifying that the TLS
|
|
|
+ /// handshake was successful (if TLS was configured). We _may_ additiionally set a connection
|
|
|
+ /// timeout and schedule a retry attempt (should the connection fail) if a
|
|
|
/// `ConnectionBackoff.Iterator` is provided.
|
|
|
///
|
|
|
+ /// See the individual functions for more information:
|
|
|
+ /// - `makeBootstrap(configuration:)`, and
|
|
|
+ /// - `verifyTLS(channel:)`.
|
|
|
+ ///
|
|
|
/// - Parameter configuration: The configuration to start the connection with.
|
|
|
- /// - Parameter backoffIterator: A `ConnectionBackoff` iterator which generates connection
|
|
|
- /// timeouts and backoffs to use when attempting to retry the connection.
|
|
|
- internal class func start(
|
|
|
- _ configuration: Configuration,
|
|
|
+ /// - Parameter connectivityMonitor: A connectivity state monitor.
|
|
|
+ /// - Parameter backoffIterator: An `Iterator` for `ConnectionBackoff` providing a sequence of
|
|
|
+ /// connection timeouts and backoff to use when attempting to create a connection.
|
|
|
+ private class func makeChannel(
|
|
|
+ configuration: ClientConnection.Configuration,
|
|
|
+ connectivityMonitor: ConnectivityStateMonitor,
|
|
|
backoffIterator: ConnectionBackoff.Iterator?
|
|
|
- ) -> EventLoopFuture<ClientConnection> {
|
|
|
+ ) -> EventLoopFuture<Channel> {
|
|
|
+ connectivityMonitor.state = .connecting
|
|
|
let timeoutAndBackoff = backoffIterator?.next()
|
|
|
+ var bootstrap = ClientConnection.makeBootstrap(configuration: configuration)
|
|
|
|
|
|
- var bootstrap = makeBootstrap(configuration: configuration)
|
|
|
// Set a timeout, if we have one.
|
|
|
if let timeout = timeoutAndBackoff?.timeout {
|
|
|
bootstrap = bootstrap.connectTimeout(.seconds(timeInterval: timeout))
|
|
|
}
|
|
|
|
|
|
- let connection = bootstrap.connect(to: configuration.target)
|
|
|
- .flatMap { channel -> EventLoopFuture<ClientConnection> in
|
|
|
- let tlsVerified: EventLoopFuture<Void>?
|
|
|
- if configuration.tlsConfiguration != nil {
|
|
|
- tlsVerified = verifyTLS(channel: channel)
|
|
|
- } else {
|
|
|
- tlsVerified = nil
|
|
|
- }
|
|
|
-
|
|
|
- return (tlsVerified ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
|
|
|
- makeClientConnection(channel: channel, configuration: configuration)
|
|
|
- }
|
|
|
+ let channel = bootstrap.connect(to: configuration.target).flatMap { channel -> EventLoopFuture<Channel> in
|
|
|
+ if configuration.tlsConfiguration != nil {
|
|
|
+ return ClientConnection.verifyTLS(channel: channel).map { channel }
|
|
|
+ } else {
|
|
|
+ return channel.eventLoop.makeSucceededFuture(channel)
|
|
|
}
|
|
|
+ }.always { result in
|
|
|
+ switch result {
|
|
|
+ case .success:
|
|
|
+ // Update the state once the channel has been assigned, when it may be used for making
|
|
|
+ // RPCs.
|
|
|
+ break
|
|
|
+
|
|
|
+ case .failure:
|
|
|
+ // We might try again in a moment.
|
|
|
+ connectivityMonitor.state = timeoutAndBackoff == nil ? .shutdown : .transientFailure
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
guard let backoff = timeoutAndBackoff?.backoff else {
|
|
|
- return connection
|
|
|
+ return channel
|
|
|
}
|
|
|
|
|
|
// If we're in error then schedule our next attempt.
|
|
|
- return connection.flatMapError { error in
|
|
|
+ return channel.flatMapError { error in
|
|
|
// The `futureResult` of the scheduled task is of type
|
|
|
// `EventLoopFuture<EventLoopFuture<ClientConnection>>`, so we need to `flatMap` it to
|
|
|
// remove a level of indirection.
|
|
|
- return connection.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
|
|
|
- return start(configuration, backoffIterator: backoffIterator)
|
|
|
+ return channel.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
|
|
|
+ return makeChannel(
|
|
|
+ configuration: configuration,
|
|
|
+ connectivityMonitor: connectivityMonitor,
|
|
|
+ backoffIterator: backoffIterator
|
|
|
+ )
|
|
|
}.futureResult.flatMap { nextConnection in
|
|
|
return nextConnection
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public let channel: Channel
|
|
|
- public let multiplexer: HTTP2StreamMultiplexer
|
|
|
- public let configuration: Configuration
|
|
|
-
|
|
|
- init(channel: Channel, multiplexer: HTTP2StreamMultiplexer, configuration: Configuration) {
|
|
|
- self.channel = channel
|
|
|
- self.multiplexer = multiplexer
|
|
|
- self.configuration = configuration
|
|
|
+ /// Creates a `Channel` using the given configuration amd state connectivity monitor.
|
|
|
+ ///
|
|
|
+ /// See `makeChannel(configuration:connectivityMonitor:backoffIterator:)`.
|
|
|
+ private class func makeChannel(
|
|
|
+ configuration: ClientConnection.Configuration,
|
|
|
+ connectivityMonitor: ConnectivityStateMonitor
|
|
|
+ ) -> EventLoopFuture<Channel> {
|
|
|
+ return makeChannel(
|
|
|
+ configuration: configuration,
|
|
|
+ connectivityMonitor: connectivityMonitor,
|
|
|
+ backoffIterator: configuration.connectionBackoff?.makeIterator()
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
- /// Fired when the client shuts down.
|
|
|
- public var onClose: EventLoopFuture<Void> {
|
|
|
- return channel.closeFuture
|
|
|
+ /// Makes and configures a `ClientBootstrap` using the provided configuration.
|
|
|
+ ///
|
|
|
+ /// Enables `SO_REUSEADDR` and `TCP_NODELAY` and configures the `channelInitializer` to use the
|
|
|
+ /// handlers detailed in the documentation for `ClientConnection`.
|
|
|
+ ///
|
|
|
+ /// - Parameter configuration: The configuration to prepare the bootstrap with.
|
|
|
+ private class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
|
|
|
+ let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
|
|
|
+ // Enable SO_REUSEADDR and TCP_NODELAY.
|
|
|
+ .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
|
|
+ .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
|
|
|
+ .channelInitializer { channel in
|
|
|
+ let tlsConfigured = configuration.tlsConfiguration.map { tlsConfiguration in
|
|
|
+ channel.configureTLS(tlsConfiguration, errorDelegate: configuration.errorDelegate)
|
|
|
+ }
|
|
|
+
|
|
|
+ return (tlsConfigured ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
|
|
|
+ channel.configureHTTP2Pipeline(mode: .client)
|
|
|
+ }.flatMap { _ in
|
|
|
+ let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
|
|
|
+ return channel.pipeline.addHandler(errorHandler)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return bootstrap
|
|
|
}
|
|
|
|
|
|
- public func close() -> EventLoopFuture<Void> {
|
|
|
- return channel.close(mode: .all)
|
|
|
+ /// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
|
|
|
+ ///
|
|
|
+ /// - Parameter channel: The channel to verify successful TLS setup on.
|
|
|
+ private class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
|
|
|
+ return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
|
|
|
+ $0.verification
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -222,6 +292,9 @@ extension ClientConnection {
|
|
|
/// cycle.
|
|
|
public var errorDelegate: ClientErrorDelegate?
|
|
|
|
|
|
+ /// A delegate which is called when the connectivity state is changed.
|
|
|
+ public var connectivityStateDelegate: ConnectivityStateDelegate?
|
|
|
+
|
|
|
/// TLS configuration for this connection. `nil` if TLS is not desired.
|
|
|
public var tlsConfiguration: TLSConfiguration?
|
|
|
|
|
|
@@ -240,6 +313,7 @@ extension ClientConnection {
|
|
|
/// - Parameter eventLoopGroup: The event loop group to run the connection on.
|
|
|
/// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
|
|
|
/// on debug builds.
|
|
|
+ /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
|
|
|
/// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
|
|
|
/// - Parameter connectionBackoff: The connection backoff configuration to use, defaulting
|
|
|
/// to `nil`.
|
|
|
@@ -247,12 +321,14 @@ extension ClientConnection {
|
|
|
target: ConnectionTarget,
|
|
|
eventLoopGroup: EventLoopGroup,
|
|
|
errorDelegate: ClientErrorDelegate? = DebugOnlyLoggingClientErrorDelegate.shared,
|
|
|
+ connectivityStateDelegate: ConnectivityStateDelegate? = nil,
|
|
|
tlsConfiguration: TLSConfiguration? = nil,
|
|
|
connectionBackoff: ConnectionBackoff? = nil
|
|
|
) {
|
|
|
self.target = target
|
|
|
self.eventLoopGroup = eventLoopGroup
|
|
|
self.errorDelegate = errorDelegate
|
|
|
+ self.connectivityStateDelegate = connectivityStateDelegate
|
|
|
self.tlsConfiguration = tlsConfiguration
|
|
|
self.connectionBackoff = connectionBackoff
|
|
|
}
|
|
|
@@ -309,8 +385,7 @@ fileprivate extension Channel {
|
|
|
context: configuration.sslContext,
|
|
|
serverHostname: configuration.hostnameOverride)
|
|
|
|
|
|
- let verificationHandler = TLSVerificationHandler(errorDelegate: errorDelegate)
|
|
|
- return self.pipeline.addHandlers(sslClientHandler, verificationHandler)
|
|
|
+ return self.pipeline.addHandlers(sslClientHandler, TLSVerificationHandler())
|
|
|
} catch {
|
|
|
return self.eventLoop.makeFailedFuture(error)
|
|
|
}
|