Browse Source

Allow the server to be gracefully shutdown. (#1076)

Motivation:

It should be possible to shutdown a server in a way which allows
existing connection to run to completion.

Modifications:

- Add new API to allow servers to be initiate a graceful shutdown.
- Add new API to the client connectivity delegate so that users can be
  informed that the connection is going away
- Fixed a bug in the idle state machine that sent a GOAWAY frame too soon
  in response to receiving GOAWAY frame

Result:

Servers can be shutdown gracefully
George Barnett 5 years ago
parent
commit
59294d95b0

+ 3 - 0
Package.swift

@@ -33,6 +33,8 @@ let package = Package(
     .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
     // Support for Network.framework where possible.
     .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.6.0"),
+    // Extra NIO stuff; quiescing helpers.
+    .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.4.0"),
 
     // Official SwiftProtobuf library, for [de]serializing data to send on the wire.
     .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.9.0"),
@@ -50,6 +52,7 @@ let package = Package(
         "NIOTransportServices",
         "NIOHTTP1",
         "NIOHTTP2",
+        "NIOExtras",
         "NIOSSL",
         "CGRPCZlib",
         "SwiftProtobuf",

+ 3 - 3
Sources/GRPC/CallHandlers/_BaseCallHandler.swift

@@ -666,9 +666,9 @@ extension _BaseCallHandler {
 
     switch part {
     case let .metadata(headers):
-      // Only flush if we're streaming responses, if we're not streaming responses then we'll wait
-      // for the response and end before emitting the flush.
-      flush = self.callType.isStreamingResponses
+      // Only flush if we're not unary: if we're unary we'll wait for the response and end before
+      // emitting the flush.
+      flush = self.callType != .unary
       context.write(self.wrapOutboundOut(.metadata(headers)), promise: promise)
 
     case let .message(message, metadata):

+ 5 - 0
Sources/GRPC/ConnectionManager.swift

@@ -676,6 +676,11 @@ internal class ConnectionManager {
       self.invalidState()
     }
   }
+
+  /// The connection has started quiescing: notify the connectivity monitor of this.
+  internal func beginQuiescing() {
+    self.monitor.beginQuiescing()
+  }
 }
 
 extension ConnectionManager {

+ 21 - 0
Sources/GRPC/ConnectivityState.swift

@@ -53,6 +53,19 @@ public protocol ConnectivityStateDelegate: AnyObject {
   /// - Parameter oldState: The old connectivity state.
   /// - Parameter newState: The new connectivity state.
   func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState)
+
+  /// Called when the connection has started quiescing, that is, the connection is going away but
+  /// existing RPCs may continue to run.
+  ///
+  /// - Important: When this is called no new RPCs may be created until the connectivity state
+  ///   changes to 'idle' (the connection successfully quiesced) or 'transientFailure' (the
+  ///   connection was closed before quiescing completed). Starting RPCs before these state changes
+  ///   will lead to a connection error and the immediate failure of any outstanding RPCs.
+  func connectionStartedQuiescing()
+}
+
+extension ConnectivityStateDelegate {
+  public func connectionStartedQuiescing() {}
 }
 
 public class ConnectivityStateMonitor {
@@ -118,4 +131,12 @@ public class ConnectivityStateMonitor {
       }
     }
   }
+
+  internal func beginQuiescing() {
+    self.delegateCallbackQueue.async {
+      if let delegate = self.delegate {
+        delegate.connectionStartedQuiescing()
+      }
+    }
+  }
 }

+ 5 - 0
Sources/GRPC/GRPCIdleHandler.swift

@@ -87,6 +87,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
         manager.channelInactive()
       case .ready:
         manager.ready()
+      case .quiescing:
+        manager.beginQuiescing()
       }
     }
 
@@ -140,6 +142,9 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
     } else if let closed = event as? StreamClosedEvent {
       self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
       context.fireUserInboundEventTriggered(event)
+    } else if event is ChannelShouldQuiesceEvent {
+      self.perform(operations: self.stateMachine.initiateGracefulShutdown())
+      // Swallow this event.
     } else if event is ConnectionIdledEvent {
       self.perform(operations: self.stateMachine.shutdownNow())
       // Swallow this event.

+ 9 - 3
Sources/GRPC/GRPCIdleHandlerStateMachine.swift

@@ -114,6 +114,9 @@ struct GRPCIdleHandlerStateMachine {
     /// The number of open stream.
     var openStreams: Int
 
+    /// The last stream ID initiated by the remote peer.
+    var lastPeerInitiatedStreamID: HTTP2StreamID
+
     /// The maximum number of concurrent streams we are allowed to operate.
     var maxConcurrentStreams: Int
 
@@ -127,6 +130,7 @@ struct GRPCIdleHandlerStateMachine {
       self.role = state.role
       self.initiatedByUs = initiatedByUs
       self.openStreams = state.openStreams
+      self.lastPeerInitiatedStreamID = state.lastPeerInitiatedStreamID
       self.maxConcurrentStreams = state.maxConcurrentStreams
     }
   }
@@ -217,6 +221,7 @@ struct GRPCIdleHandlerStateMachine {
     case inactive
     case idle
     case ready
+    case quiescing
   }
 
   enum IdleTask {
@@ -290,6 +295,7 @@ struct GRPCIdleHandlerStateMachine {
         self.state = .quiescing(state)
       } else {
         self.state = .closing(.init(fromQuiescing: state))
+        operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
         operations.closeChannel()
       }
 
@@ -392,6 +398,7 @@ struct GRPCIdleHandlerStateMachine {
         //
         // It's okay if we haven't seen a SETTINGS frame at this point; we've initiated the shutdown
         // so making a connection is ready isn't necessary.
+        operations.notifyConnectionManager(about: .quiescing)
         self.state = .quiescing(.init(fromOperating: state, initiatedByUs: true))
       } else {
         // No open streams: send a GOAWAY frame and close the channel.
@@ -438,14 +445,13 @@ struct GRPCIdleHandlerStateMachine {
       // A SETTINGS frame MUST follow the connection preface. (RFC 7540 § 3.5)
       assert(state.hasSeenSettings)
 
-      // Send a GOAWAY frame in response.
-      operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
-
       if state.hasOpenStreams {
+        operations.notifyConnectionManager(about: .quiescing)
         self.state = .quiescing(.init(fromOperating: state, initiatedByUs: false))
       } else {
         // No open streams, we can close as well.
         self.state = .closing(.init(fromOperating: state))
+        operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
         operations.closeChannel()
       }
 

+ 38 - 3
Sources/GRPC/Server.swift

@@ -16,6 +16,7 @@
 import Foundation
 import Logging
 import NIO
+import NIOExtras
 import NIOHTTP1
 import NIOHTTP2
 import NIOSSL
@@ -144,18 +145,33 @@ public final class Server {
   /// Starts a server with the given configuration. See `Server.Configuration` for the options
   /// available to configure the server.
   public static func start(configuration: Configuration) -> EventLoopFuture<Server> {
+    let quiescingHelper = ServerQuiescingHelper(group: configuration.eventLoopGroup)
+
     return self.makeBootstrap(configuration: configuration)
+      .serverChannelInitializer { channel in
+        channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel))
+      }
       .bind(to: configuration.target)
       .map { channel in
-        Server(channel: channel, errorDelegate: configuration.errorDelegate)
+        Server(
+          channel: channel,
+          quiescingHelper: quiescingHelper,
+          errorDelegate: configuration.errorDelegate
+        )
       }
   }
 
   public let channel: Channel
+  private let quiescingHelper: ServerQuiescingHelper
   private var errorDelegate: ServerErrorDelegate?
 
-  private init(channel: Channel, errorDelegate: ServerErrorDelegate?) {
+  private init(
+    channel: Channel,
+    quiescingHelper: ServerQuiescingHelper,
+    errorDelegate: ServerErrorDelegate?
+  ) {
     self.channel = channel
+    self.quiescingHelper = quiescingHelper
 
     // Maintain a strong reference to ensure it lives as long as the server.
     self.errorDelegate = errorDelegate
@@ -177,7 +193,26 @@ public final class Server {
     return self.channel.closeFuture
   }
 
-  /// Shut down the server; this should be called to avoid leaking resources.
+  /// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
+  /// connections will be rejected.
+  public func initiateGracefulShutdown(promise: EventLoopPromise<Void>?) {
+    self.quiescingHelper.initiateShutdown(promise: promise)
+  }
+
+  /// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
+  /// connections will be rejected.
+  public func initiateGracefulShutdown() -> EventLoopFuture<Void> {
+    let promise = self.channel.eventLoop.makePromise(of: Void.self)
+    self.initiateGracefulShutdown(promise: promise)
+    return promise.futureResult
+  }
+
+  /// Shutdown the server immediately. Active RPCs and connections will be terminated.
+  public func close(promise: EventLoopPromise<Void>?) {
+    self.channel.close(mode: .all, promise: promise)
+  }
+
+  /// Shutdown the server immediately. Active RPCs and connections will be terminated.
   public func close() -> EventLoopFuture<Void> {
     return self.channel.close(mode: .all)
   }

+ 18 - 0
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -1007,6 +1007,8 @@ internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
   private let semaphore = DispatchSemaphore(value: 0)
   private var expectation: Expectation = .noExpectation
 
+  private let quiescingSemaphore = DispatchSemaphore(value: 0)
+
   private enum Expectation {
     /// We have no expectation of any changes. We'll just ignore any changes.
     case noExpectation
@@ -1062,6 +1064,12 @@ internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
     }
   }
 
+  func connectionStartedQuiescing() {
+    self.serialQueue.async {
+      self.quiescingSemaphore.signal()
+    }
+  }
+
   func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
     self.serialQueue.async {
       self.expectation = .some(count: count, recorded: [], verify)
@@ -1090,6 +1098,16 @@ internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
       )
     }
   }
+
+  func waitForQuiescing(timeout: DispatchTimeInterval) {
+    let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
+    switch result {
+    case .success:
+      ()
+    case .timedOut:
+      XCTFail("Timed out waiting for connection to start quiescing")
+    }
+  }
 }
 
 private extension ConnectionBackoff {

+ 2 - 2
Tests/GRPCTests/GRPCIdleHandlerStateMachineTests.swift

@@ -111,7 +111,7 @@ class GRPCIdleHandlerStateMachineTests: GRPCTestCase {
     // (3) Peer initiates shutdown, streams are open.
     do {
       let op2 = stateMachine.receiveGoAway()
-      op2.assertGoAway(streamID: .rootStream)
+      op2.assertNoGoAway()
       op2.assertShouldNotClose()
 
       // We become inactive.
@@ -271,7 +271,7 @@ class GRPCIdleHandlerStateMachineTests: GRPCTestCase {
 
     // Receive a GOAWAY.
     let op2 = stateMachine.receiveGoAway()
-    op2.assertGoAway(streamID: .rootStream)
+    op2.assertNoGoAway()
 
     // Initiate shutdown from our side: we've already sent GOAWAY and have a stream open, we don't
     // need to do anything.

+ 90 - 0
Tests/GRPCTests/ServerQuiescingTests.swift

@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import EchoImplementation
+import EchoModel
+import GRPC
+import NIO
+import XCTest
+
+class ServerQuiescingTests: GRPCTestCase {
+  func testServerQuiescing() throws {
+    let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
+    defer {
+      assertThat(try group.syncShutdownGracefully(), .doesNotThrow())
+    }
+
+    let server = try Server.insecure(group: group)
+      .withLogger(self.serverLogger)
+      .withServiceProviders([EchoProvider()])
+      .bind(host: "127.0.0.1", port: 0)
+      .wait()
+
+    let connectivityStateDelegate = RecordingConnectivityDelegate()
+    let connection = ClientConnection.insecure(group: group)
+      .withBackgroundActivityLogger(self.clientLogger)
+      .withErrorDelegate(LoggingClientErrorDelegate())
+      .withConnectivityStateDelegate(connectivityStateDelegate)
+      .connect(host: "127.0.0.1", port: server.channel.localAddress!.port!)
+    defer {
+      assertThat(try connection.close().wait(), .doesNotThrow())
+    }
+
+    let echo = Echo_EchoClient(channel: connection)
+
+    // Expect the connection to setup as normal.
+    connectivityStateDelegate.expectChanges(2) { changes in
+      XCTAssertEqual(changes[0], Change(from: .idle, to: .connecting))
+      XCTAssertEqual(changes[1], Change(from: .connecting, to: .ready))
+    }
+
+    // Fire up a handful of client streaming RPCs, this will start the connection.
+    let rpcs = (0 ..< 5).map { _ in
+      echo.collect()
+    }
+
+    // Wait for the connectivity changes.
+    connectivityStateDelegate.waitForExpectedChanges(timeout: .seconds(5))
+
+    // Wait for the response metadata so both peers know about all RPCs.
+    for rpc in rpcs {
+      assertThat(try rpc.initialMetadata.wait(), .doesNotThrow())
+    }
+
+    // Start shutting down the server.
+    let serverShutdown = server.initiateGracefulShutdown()
+
+    // We should observe that we're quiescing now: this is a signal to not start any new RPCs.
+    connectivityStateDelegate.waitForQuiescing(timeout: .seconds(5))
+
+    // Queue up the expected change back to idle (i.e. when the connection is quiesced).
+    connectivityStateDelegate.expectChange {
+      XCTAssertEqual($0, Change(from: .ready, to: .idle))
+    }
+
+    // Finish each RPC.
+    for (index, rpc) in rpcs.enumerated() {
+      assertThat(try rpc.sendMessage(.with { $0.text = "\(index)" }).wait(), .doesNotThrow())
+      assertThat(try rpc.sendEnd().wait(), .doesNotThrow())
+      assertThat(try rpc.response.wait(), .is(.with { $0.text = "Swift echo collect: \(index)" }))
+    }
+
+    // All RPCs are done, the connection should drop back to idle.
+    connectivityStateDelegate.waitForExpectedChanges(timeout: .seconds(5))
+
+    // The server should be shutdown now.
+    assertThat(try serverShutdown.wait(), .doesNotThrow())
+  }
+}

+ 1 - 0
scripts/build_podspecs.py

@@ -152,6 +152,7 @@ def process_package(string):
     pod_mappings = {
         'swift-log': 'Logging',
         'swift-nio': 'SwiftNIO',
+        'swift-nio-extras': 'SwiftNIOExtras',
         'swift-nio-http2': 'SwiftNIOHTTP2',
         'swift-nio-ssl': 'SwiftNIOSSL',
         'swift-nio-transport-services': 'SwiftNIOTransportServices',