Explorar o código

Delay marking connection as .ready until receiving initial settings frame (#529)

Motivation:

It is possible for a connection to be immediately closed by the remote
after a successful TLS handshake. In this case, the connection would be
marked as ready and any backoff would be reset. That is, a reconnection
attempt would be made immediately and the cycle would repeat. The gRPC
core lib suggests resetting backoff after the initial settings frame has
been made.

Modifications:

Add a handler which on observing the initial settings frame:
1. marks the connectivity as ready and,
2. removes itself from the pipeline.

The client connection diagram was also updated to reflect this.

Result:

The connectivity state will only be marked as ready once the settings
frame has been received from the peer.
George Barnett %!s(int64=6) %!d(string=hai) anos
pai
achega
c256b4a71f
Modificáronse 2 ficheiros con 80 adicións e 16 borrados
  1. 30 16
      Sources/GRPC/ClientConnection.swift
  2. 50 0
      Sources/GRPC/SettingsObservingHandler.swift

+ 30 - 16
Sources/GRPC/ClientConnection.swift

@@ -27,9 +27,20 @@ import Logging
 /// The connection is initially setup with a handler to verify that TLS was established
 /// successfully (assuming TLS is being used).
 ///
-///                          ▲                       |
-///                HTTP2Frame│                       │HTTP2Frame
-///                        ┌─┴───────────────────────▼─┐
+///               ┌──────────────────────────┐
+///               │  DelegatingErrorHandler  │
+///               └──────────▲───────────────┘
+///                HTTP2Frame│
+///               ┌──────────┴───────────────┐
+///               │ SettingsObservingHandler │
+///               └──────────▲───────────────┘
+///                HTTP2Frame│
+///                          │                ⠇ ⠇   ⠇ ⠇
+///                          │               ┌┴─▼┐ ┌┴─▼┐
+///                          │               │   | │   | HTTP/2 streams
+///                          │               └▲─┬┘ └▲─┬┘
+///                          │                │ │   │ │ HTTP2Frame
+///                        ┌─┴────── ─────────┴─▼───┴─▼┐
 ///                        │   HTTP2StreamMultiplexer  |
 ///                        └─▲───────────────────────┬─┘
 ///                HTTP2Frame│                       │HTTP2Frame
@@ -49,11 +60,13 @@ import Logging
 ///
 /// The `TLSVerificationHandler` observes the outcome of the SSL handshake and determines
 /// whether a `ClientConnection` should be returned to the user. In either eventuality, the
-/// handler removes itself from the pipeline once TLS has been verified. There is also a delegated
-/// error handler after the `HTTPStreamMultiplexer` in the main channel which uses the error
-/// delegate associated with this connection (see `DelegatingErrorHandler`).
+/// handler removes itself from the pipeline once TLS has been verified. There is also a handler
+/// after the multiplexer for observing the initial settings frame, after which it determines that
+/// the connection state is `.ready` and removes itself from the channel. Finally there is a
+/// delegated error handler which uses the error delegate associated with this connection
+/// (see `DelegatingErrorHandler`).
 ///
-/// See `BaseClientCall` for a description of the remainder of the client pipeline.
+/// See `BaseClientCall` for a description of the pipelines assoicated with each HTTP/2 stream.
 public class ClientConnection {
   internal let logger: Logger
   /// The UUID of this connection, used for logging.
@@ -144,6 +157,7 @@ public class ClientConnection {
       channel,
       tls: configuration.tls?.configuration,
       serverHostname: configuration.tls?.hostnameOverride ?? configuration.target.host,
+      connectivityMonitor: self.connectivity,
       errorDelegate: configuration.errorDelegate
     ).flatMap {
       channel.connect(to: socketAddress)
@@ -224,14 +238,8 @@ extension ClientConnection {
   ///
   /// - Parameter channel: The channel that was set.
   private func didSetChannel(to channel: EventLoopFuture<Channel>) {
-    channel.whenComplete { result in
-      switch result {
-      case .success:
-        self.connectivity.state = .ready
-
-      case .failure:
-        self.connectivity.state = .shutdown
-      }
+    channel.whenFailure { _ in
+      self.connectivity.state = .shutdown
     }
   }
 
@@ -260,6 +268,7 @@ extension ClientConnection {
       configuration: configuration,
       group: configuration.eventLoopGroup,
       timeout: timeoutAndBackoff?.timeout,
+      connectivityMonitor: connectivity,
       logger: logger
     )
 
@@ -327,10 +336,12 @@ extension ClientConnection {
   /// - Parameter configuration: The configuration to prepare the bootstrap with.
   /// - Parameter group: The `EventLoopGroup` to use for the bootstrap.
   /// - Parameter timeout: The connection timeout in seconds.
+  /// - Parameter connectivityMonitor: The connectivity state monitor for the created channel.
   private class func makeBootstrap(
     configuration: Configuration,
     group: EventLoopGroup,
     timeout: TimeInterval?,
+    connectivityMonitor: ConnectivityStateMonitor,
     logger: Logger
   ) -> ClientBootstrapProtocol {
     // Provide a server hostname if we're using TLS. Prefer the override.
@@ -354,6 +365,7 @@ extension ClientConnection {
           channel,
           tls: configuration.tls?.configuration,
           serverHostname: serverHostname,
+          connectivityMonitor: connectivityMonitor,
           errorDelegate: configuration.errorDelegate
         )
       }
@@ -376,6 +388,7 @@ extension ClientConnection {
     _ channel: Channel,
     tls: TLSConfiguration?,
     serverHostname: String?,
+    connectivityMonitor: ConnectivityStateMonitor,
     errorDelegate: ClientErrorDelegate?
   ) -> EventLoopFuture<Void> {
     let tlsConfigured = tls.map {
@@ -385,8 +398,9 @@ extension ClientConnection {
     return (tlsConfigured ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
       channel.configureHTTP2Pipeline(mode: .client)
     }.flatMap { _ in
+      let settingsObserver = InitialSettingsObservingHandler(connectivityStateMonitor: connectivityMonitor)
       let errorHandler = DelegatingErrorHandler(delegate: errorDelegate)
-      return channel.pipeline.addHandler(errorHandler)
+      return channel.pipeline.addHandlers(settingsObserver, errorHandler)
     }
   }
 }

+ 50 - 0
Sources/GRPC/SettingsObservingHandler.swift

@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import NIO
+import NIOHTTP2
+import Logging
+
+/// The purpose of this channel handler is to observe the initial settings frame on the root stream.
+/// This is an indication that the connection has become `.ready`. When this happens this handler
+/// will remove itself from the pipeline.
+class InitialSettingsObservingHandler: ChannelInboundHandler, RemovableChannelHandler {
+  typealias InboundIn = HTTP2Frame
+  typealias InboundOut = HTTP2Frame
+
+  private let connectivityStateMonitor: ConnectivityStateMonitor
+  private let logger = Logger(subsystem: .clientChannel)
+
+  init(connectivityStateMonitor: ConnectivityStateMonitor) {
+    self.connectivityStateMonitor = connectivityStateMonitor
+  }
+
+  func channelRead(context: ChannelHandlerContext, data: NIOAny) {
+    let frame = self.unwrapInboundIn(data)
+
+    if frame.streamID == .rootStream, case .settings(.settings) = frame.payload {
+      self.logger.info("observed initial settings frame on the root stream")
+      self.connectivityStateMonitor.state = .ready
+
+      // We're no longer needed at this point, remove ourselves from the pipeline.
+      self.logger.debug("removing 'InitialSettingsObservingHandler' from the channel")
+      context.pipeline.removeHandler(self, promise: nil)
+    }
+
+    // We should always forward the frame.
+    context.fireChannelRead(data)
+  }
+}