
Add the connection backoff interop test (#557)

Motivation:

We'd like our implementation to be compliant with the gRPC
specification. Passing the interop tests is an important part of this,
yet we don't have an implementation of the connection backoff test.

Modifications:

- Implement the connection backoff interop test.
- Fix a bug where call timeouts would not be respected if the
  connection had not been established.
- Fix a bug where the connection could be re-established after calling
  shutdown.

Result:

Our connection backoff code has better test coverage and fewer bugs.
George Barnett 6 years ago
commit d2c0fce9c1
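
As a rough illustration of the first fix above (call deadlines are honoured even when a connection is never established), here is a minimal sketch using the `ReconnectService` client from the interop test models. The port is a placeholder on which nothing is listening, and the expected status mirrors what the retry leg of the interop test checks for; this sketch is not part of the change itself.

```swift
import GRPC
import GRPCInteroperabilityTestModels
import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
  try! group.syncShutdownGracefully()
}

// Nothing is listening on this (placeholder) port, so no channel ever becomes active.
// Previously the deadline was only scheduled from `channelActive`, so this call would hang.
let configuration = ClientConnection.Configuration(
  target: .hostAndPort("localhost", 59999),
  eventLoopGroup: group,
  connectionBackoff: .init()
)
let connection = ClientConnection(configuration: configuration)
let client = Grpc_Testing_ReconnectServiceServiceClient(connection: connection)

// The timeout is now armed as soon as the call is created, so the RPC terminates after roughly
// one second; the interop test expects `.deadlineExceeded` in the analogous situation.
let call = client.start(.init(), callOptions: CallOptions(timeout: try! .seconds(1)))
let status = try call.status.wait()
assert(status.code == .deadlineExceeded, "expected deadlineExceeded, got: \(status.code)")

try? connection.close().wait()
```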

+ 10 - 0
Package.swift

@@ -22,6 +22,7 @@ let package = Package(
   products: [
     .library(name: "GRPC", targets: ["GRPC"]),
     .executable(name: "InteroperabilityTestRunner", targets: ["GRPCInteroperabilityTests"]),
+    .executable(name: "ConnectionBackoffInteropTestRunner", targets: ["GRPCConnectionBackoffInteropTest"]),
     .executable(name: "PerformanceTestRunner", targets: ["GRPCPerformanceTests"]),
     .executable(name: "Echo", targets: ["Echo"]),
   ],
@@ -109,6 +110,15 @@ let package = Package(
       ]
     ),
 
+    // The connection backoff interoperability test.
+    .target(
+      name: "GRPCConnectionBackoffInteropTest",
+      dependencies: [
+        "GRPC",
+        "GRPCInteroperabilityTestModels",
+      ]
+    ),
+
     // Performance tests implementation and CLI.
     .target(
       name: "GRPCPerformanceTests",

+ 1 - 0
Sources/GRPC/ClientCalls/BaseClientCall.swift

@@ -110,6 +110,7 @@ open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
     }
 
     self.createStreamChannel()
+    self.responseHandler.scheduleTimeout(eventLoop: connection.eventLoop)
   }
 
   /// Creates and configures an HTTP/2 stream channel. The `self.subchannel` future will hold the

+ 12 - 1
Sources/GRPC/ClientConnection.swift

@@ -208,6 +208,7 @@ extension ClientConnection {
       return
     }
 
+    // If we get a channel and it closes, create a new one if necessary.
     channel.flatMap { $0.closeFuture }.whenComplete { result in
       switch result {
       case .success:
@@ -219,7 +220,13 @@ extension ClientConnection {
         )
       }
 
-      guard self.connectivity.canAttemptReconnect else { return }
+      guard self.connectivity.canAttemptReconnect else {
+        return
+      }
+
+      // Something went wrong, but we'll try to fix it, so update our state to reflect that.
+      self.connectivity.state = .transientFailure
+
       self.logger.debug("client connection channel closed, creating a new one")
       self.channel = ClientConnection.makeChannel(
         configuration: self.configuration,
@@ -260,6 +267,10 @@ extension ClientConnection {
     backoffIterator: ConnectionBackoffIterator?,
     logger: Logger
   ) -> EventLoopFuture<Channel> {
+    guard connectivity.state == .idle || connectivity.state == .transientFailure else {
+      return configuration.eventLoopGroup.next().makeFailedFuture(GRPCStatus.processingError)
+    }
+
     logger.info("attempting to connect to \(configuration.target)")
     connectivity.state = .connecting
     let timeoutAndBackoff = backoffIterator?.next()
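
A rough sketch of the behaviour the new guard is meant to enforce (the second fix in the commit message): once `close()` has completed, the connectivity state is neither `.idle` nor `.transientFailure`, so `makeChannel` refuses to build a replacement channel and the connection stays shut down. This assumes a server is listening on the placeholder port (for example the C++ control server from the README below); the port and the use of the `ReconnectService` client are illustrative only.

```swift
import GRPC
import GRPCInteroperabilityTestModels
import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
  try! group.syncShutdownGracefully()
}

// Assumes something is listening on this placeholder port so the connection becomes ready.
let connection = ClientConnection(
  configuration: .init(
    target: .hostAndPort("localhost", 8080),
    eventLoopGroup: group,
    connectionBackoff: .init()
  )
)

// Shut the connection down. Its state is no longer `.idle` or `.transientFailure`, so when the
// underlying channel's `closeFuture` fires, the guard above stops a replacement channel from
// being created: the connection stays down instead of quietly reconnecting.
try connection.close().wait()

// Any call made after shutdown should now fail rather than being transparently retried; the
// timeout is only a safety net so this sketch cannot hang.
let client = Grpc_Testing_ReconnectServiceServiceClient(connection: connection)
let status = try client.start(.init(), callOptions: CallOptions(timeout: try! .seconds(5))).status.wait()
assert(status.code != .ok, "expected the post-shutdown call to fail, got: \(status.code)")
```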

+ 32 - 12
Sources/GRPC/ClientResponseChannelHandler.swift

@@ -35,6 +35,8 @@ internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelIn
   internal var timeoutTask: Scheduled<Void>?
   internal let errorDelegate: ClientErrorDelegate?
 
+  internal var context: ChannelHandlerContext?
+
   internal enum InboundState {
     case expectingHeadersOrStatus
     case expectingMessageOrStatus
@@ -112,6 +114,7 @@ internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelIn
     }
     self.statusPromise.succeed(status)
     self.timeoutTask?.cancel()
+    self.context = nil
   }
 
   /// Observe the given error.
@@ -133,6 +136,11 @@ internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelIn
     // no-op
   }
 
+  public func handlerAdded(context: ChannelHandlerContext) {
+    // We need to hold the context in case we time out and need to close the pipeline.
+    self.context = context
+  }
+
   /// Reads inbound data.
   ///
   /// On receipt of:
@@ -208,18 +216,6 @@ internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelIn
     }
   }
 
-  public func channelActive(context: ChannelHandlerContext) {
-    if self.timeout != .infinite {
-      let timeout = self.timeout
-      self.timeoutTask = context.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
-        self?.errorCaught(
-          context: context,
-          error: GRPCError.client(.deadlineExceeded(timeout))
-        )
-      }
-    }
-  }
-
   public func channelInactive(context: ChannelHandlerContext) {
     self.inboundState = .ignore
     self.observeStatus(.init(code: .unavailable, message: nil))
@@ -231,6 +227,30 @@ internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelIn
     self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
     context.close(mode: .all, promise: nil)
   }
+
+  /// Schedules a timeout on the given event loop if the timeout is not `.infinite`.
+  /// - Parameter eventLoop: The `eventLoop` to schedule the timeout on.
+  internal func scheduleTimeout(eventLoop: EventLoop) {
+    guard self.timeout != .infinite else {
+      return
+    }
+
+    let timeout = self.timeout
+    self.timeoutTask = eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
+      self?.performTimeout(error: .client(.deadlineExceeded(timeout)))
+    }
+  }
+
+  /// Called when this call times out. Any promises which have not been fulfilled will be timed out
+  /// with status `.deadlineExceeded`. If this handler has a context associated with it then
+  /// its channel is closed.
+  ///
+  /// - Parameter error: The error to fail any promises with.
+  internal func performTimeout(error: GRPCError) {
+    self.observeError(error)
+    self.context?.close(mode: .all, promise: nil)
+    self.context = nil
+  }
 }
 
 /// A channel handler for client calls which receive a single response.
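
For reference, the knob that feeds `scheduleTimeout` is the call's timeout from `CallOptions`; a small sketch, assuming (as the guard above implies) that `.infinite` results in no deadline task being scheduled:

```swift
import GRPC

// `.infinite` means `scheduleTimeout` is a no-op: no deadline task is created and the call
// waits for the server (or a connection error) indefinitely.
let noDeadline = CallOptions(timeout: .infinite)

// A finite timeout arms a task on the connection's event loop when the call is created. If it
// fires before a status arrives, `performTimeout` fails the call with `.deadlineExceeded` and
// closes the stream's channel if one exists.
let fiveSecondDeadline = CallOptions(timeout: try! .seconds(5))
```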

+ 22 - 0
Sources/GRPCConnectionBackoffInteropTest/README.md

@@ -0,0 +1,22 @@
+# gRPC Connection Backoff Interoperability Test
+
+This module implements the gRPC connection backoff interoperability test as
+described in the [specification][interop-test].
+
+## Running the Test
+
+The C++ interoperability test server implements the required server and should
+be targeted when running this test. It is available in the main [gRPC
+repository][grpc-repo] and may be built using `bazel` (`bazel build
+test/cpp/interop:reconnect_interop_server`) or one of the other options for
+[building the C++ source][grpc-cpp-build].
+
+1. Start the server: `./path/to/server --control_port=8080 --retry_port=8081`
+1. Start the test: `swift run ConnectionBackoffInteropTestRunner 8080 8081`
+
+The test takes **approximately 10 minutes to complete** and logs are written to
+`stderr`.
+
+[interop-test]: https://github.com/grpc/grpc/blob/master/doc/connection-backoff-interop-test-description.md
+[grpc-cpp-build]: https://github.com/grpc/grpc/blob/master/BUILDING.md
+[grpc-repo]: https://github.com/grpc/grpc.git

+ 105 - 0
Sources/GRPCConnectionBackoffInteropTest/main.swift

@@ -0,0 +1,105 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import GRPC
+import GRPCInteroperabilityTestModels
+import NIO
+import Logging
+
+let args = CommandLine.arguments
+guard args.count == 3, let controlPort = Int(args[1]), let retryPort = Int(args[2]) else {
+  print("Usage: \(args[0]) <server_control_port> <server_retry_port>")
+  exit(1)
+}
+
+// Notes from the test procedure are inline.
+// See: https://github.com/grpc/grpc/blob/master/doc/connection-backoff-interop-test-description.md
+
+// MARK: - Setup
+
+// Reduce stdout noise.
+LoggingSystem.bootstrap(StreamLogHandler.standardError)
+
+let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
+defer {
+  try! group.syncShutdownGracefully()
+}
+
+// The client must connect to the control port without TLS.
+let controlConfig = ClientConnection.Configuration(
+  target: .hostAndPort("localhost", controlPort),
+  eventLoopGroup: group,
+  connectionBackoff: .init()
+)
+
+// The client must connect to the retry port with TLS.
+let retryConfig = ClientConnection.Configuration(
+  target: .hostAndPort("localhost", retryPort),
+  eventLoopGroup: group,
+  tls: .init(),
+  connectionBackoff: .init()
+)
+
+// MARK: - Test Procedure
+
+print("Starting connection backoff interoperability test...")
+
+// 1. Call 'Start' on server control port with a large deadline or no deadline, wait for it to
+//    finish and check it succeeded.
+let controlConnection = ClientConnection(configuration: controlConfig)
+let controlClient = Grpc_Testing_ReconnectServiceServiceClient(connection: controlConnection)
+print("Control 'Start' call started")
+let controlStart = controlClient.start(.init(), callOptions: .init(timeout: .infinite))
+let controlStartStatus = try controlStart.status.wait()
+assert(controlStartStatus.code == .ok, "Control Start rpc failed: \(controlStartStatus.code)")
+print("Control 'Start' call succeeded")
+
+// 2. Initiate a channel connection to server retry port, which should perform reconnections with
+//    proper backoffs. A convenient way to achieve this is to call 'Start' with a deadline of 540s.
+//    The rpc should fail with deadline exceeded.
+print("Retry 'Start' call started")
+let retryConnection = ClientConnection(configuration: retryConfig)
+let retryClient = Grpc_Testing_ReconnectServiceServiceClient(
+  connection: retryConnection,
+  defaultCallOptions: CallOptions(timeout: try! .seconds(540))
+)
+let retryStart = retryClient.start(.init())
+// We expect this to take some time!
+let retryStartStatus = try retryStart.status.wait()
+assert(retryStartStatus.code == .deadlineExceeded,
+       "Retry Start rpc status was not 'deadlineExceeded': \(retryStartStatus.code)")
+print("Retry 'Start' call terminated with expected status")
+
+// 3. Call 'Stop' on server control port and check it succeeded.
+print("Control 'Stop' call started")
+let controlStop = controlClient.stop(.init())
+let controlStopStatus = try controlStop.status.wait()
+assert(controlStopStatus.code == .ok, "Control Stop rpc failed: \(controlStopStatus.code)")
+print("Control 'Stop' call succeeded")
+
+// 4. Check the response to see whether the server thinks the backoffs passed the test.
+let controlResponse = try controlStop.response.wait()
+assert(controlResponse.passed, "TEST FAILED")
+print("TEST PASSED")
+
+// MARK: - Tear down
+
+// Close the connections.
+
+// We expect close to fail on the retry connection because the channel should never have been
+// successfully established.
+try? retryConnection.close().wait()
+try controlConnection.close().wait()

+ 18 - 5
Tests/GRPCTests/ClientConnectionBackoffTests.swift

@@ -167,32 +167,45 @@ class ClientConnectionBackoffTests: GRPCTestCase {
   }
 
   func testClientReconnectsAutomatically() throws {
+    // Wait for the server to start.
     self.server = self.makeServer()
     let server = try self.server.wait()
 
+    // Configure the client to use a short maximum backoff.
     var configuration = self.makeClientConfiguration()
     configuration.connectionBackoff!.maximumBackoff = 2.0
 
+    // Prepare the delegate so it expects the connection to hit `.ready`.
     let connectionReady = self.expectation(description: "connection ready")
-    let transientFailure = self.expectation(description: "connection transientFailure")
     self.stateDelegate.expectations[.ready] = connectionReady
-    self.stateDelegate.expectations[.transientFailure] = transientFailure
 
+    // Start the connection.
     self.client = ClientConnection(configuration: configuration)
 
-    // Once the connection is ready we can kill the server.
+    // Wait for the connection to be ready.
     self.wait(for: [connectionReady], timeout: 1.0)
     XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .ready])
 
+    // Now that we have a healthy connection, prepare for two transient failures:
+    // 1. when the server has been killed, and
+    // 2. when the client attempts to reconnect.
+    let transientFailure = self.expectation(description: "connection transientFailure")
+    transientFailure.expectedFulfillmentCount = 2
+    self.stateDelegate.expectations[.transientFailure] = transientFailure
+    self.stateDelegate.expectations[.ready] = nil
+
+    // Okay, kill the server!
     try server.close().wait()
     try self.serverGroup.syncShutdownGracefully()
     self.server = nil
     self.serverGroup = nil
 
+    // Our connection should fail now.
     self.wait(for: [transientFailure], timeout: 1.0)
-    XCTAssertEqual(self.stateDelegate.clearStates(), [.connecting, .transientFailure])
+    XCTAssertEqual(self.stateDelegate.clearStates(), [.transientFailure, .connecting, .transientFailure])
+    self.stateDelegate.expectations[.transientFailure] = nil
 
-    // Replace the ready expectation (since it's already been fulfilled).
+    // Prepare an expectation for a new healthy connection.
     let reconnectionReady = self.expectation(description: "(re)connection ready")
     self.stateDelegate.expectations[.ready] = reconnectionReady