Browse Source

Allow the client connection handler to be closed gracefully (#1855)

Motivation:

There's currently no way to gracefully closing the client connection.

Modifications:

- Add a user outbound event to the `ClientConnectionHandler` to
  gracefully shutdown the connection.

Result:

Client connection can be instructed to close gracefully.
George Barnett 1 year ago
parent
commit
81e4529821

+ 82 - 9
Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift

@@ -26,6 +26,8 @@ enum ClientConnectionEvent: Sendable, Hashable {
     case keepAliveExpired
     /// The connection became idle.
     case idle
+    /// The local peer initiated the close.
+    case initiatedLocally
   }
 
   /// The connection has started shutting down, no new streams should be created.
@@ -48,6 +50,11 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
   typealias OutboundIn = Never
   typealias OutboundOut = HTTP2Frame
 
+  enum OutboundEvent: Hashable, Sendable {
+    /// Close the connection gracefully
+    case closeGracefully
+  }
+
   /// The `EventLoop` of the `Channel` this handler exists in.
   private let eventLoop: EventLoop
 
@@ -120,7 +127,12 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
   }
 
   func channelInactive(context: ChannelHandlerContext) {
-    self.state.closed()
+    switch self.state.closed() {
+    case .none:
+      ()
+    case .succeed(let promise):
+      promise.succeed()
+    }
     self.keepAliveTimer?.cancel()
     self.keepAliveTimeoutTimer.cancel()
   }
@@ -169,7 +181,7 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
     case .goAway(_, let errorCode, let data):
       // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
       // closing the connection.
-      switch self.state.beginGracefulShutdown() {
+      switch self.state.beginGracefulShutdown(promise: nil) {
       case .sendGoAway(let close):
         // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
         let message = data.map { String(buffer: $0) } ?? ""
@@ -209,6 +221,32 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
     self.inReadLoop = false
     context.fireChannelReadComplete()
   }
+
+  func triggerUserOutboundEvent(
+    context: ChannelHandlerContext,
+    event: Any,
+    promise: EventLoopPromise<Void>?
+  ) {
+    if let event = event as? OutboundEvent {
+      switch event {
+      case .closeGracefully:
+        switch self.state.beginGracefulShutdown(promise: promise) {
+        case .sendGoAway(let close):
+          context.fireChannelRead(self.wrapInboundOut(.closing(.initiatedLocally)))
+          // Clients should send GOAWAYs when closing a connection.
+          self.writeAndFlushGoAway(context: context, errorCode: .noError)
+          if close {
+            context.close(promise: nil)
+          }
+
+        case .none:
+          ()
+        }
+      }
+    } else {
+      context.triggerUserOutboundEvent(event, promise: promise)
+    }
+  }
 }
 
 extension ClientConnectionHandler {
@@ -294,10 +332,12 @@ extension ClientConnectionHandler {
       struct Closing {
         var allowKeepAliveWithoutCalls: Bool
         var openStreams: Set<HTTP2StreamID>
+        var closePromise: Optional<EventLoopPromise<Void>>
 
-        init(from state: Active) {
+        init(from state: Active, closePromise: EventLoopPromise<Void>?) {
           self.openStreams = state.openStreams
           self.allowKeepAliveWithoutCalls = state.allowKeepAliveWithoutCalls
+          self.closePromise = closePromise
         }
       }
     }
@@ -384,7 +424,7 @@ extension ClientConnectionHandler {
       case none
     }
 
-    mutating func beginGracefulShutdown() -> OnGracefulShutDown {
+    mutating func beginGracefulShutdown(promise: EventLoopPromise<Void>?) -> OnGracefulShutDown {
       let onGracefulShutdown: OnGracefulShutDown
 
       switch self.state {
@@ -393,9 +433,14 @@ extension ClientConnectionHandler {
         // ratchet down the last stream ID as only the client creates streams in gRPC.
         let close = state.openStreams.isEmpty
         onGracefulShutdown = .sendGoAway(close)
-        self.state = .closing(State.Closing(from: state))
+        self.state = .closing(State.Closing(from: state, closePromise: promise))
 
-      case .closing, .closed:
+      case .closing(var state):
+        state.closePromise.setOrCascade(to: promise)
+        self.state = .closing(state)
+        onGracefulShutdown = .none
+
+      case .closed:
         onGracefulShutdown = .none
       }
 
@@ -406,16 +451,44 @@ extension ClientConnectionHandler {
     mutating func beginClosing() -> Bool {
       switch self.state {
       case .active(let active):
-        self.state = .closing(State.Closing(from: active))
+        self.state = .closing(State.Closing(from: active, closePromise: nil))
         return true
       case .closing, .closed:
         return false
       }
     }
 
+    enum OnClosed {
+      case succeed(EventLoopPromise<Void>)
+      case none
+    }
+
     /// Marks the state as closed.
-    mutating func closed() {
-      self.state = .closed
+    mutating func closed() -> OnClosed {
+      switch self.state {
+      case .active, .closed:
+        self.state = .closed
+        return .none
+      case .closing(let closing):
+        self.state = .closed
+        return closing.closePromise.map { .succeed($0) } ?? .none
+      }
+    }
+  }
+}
+
+extension Optional {
+  // TODO: replace with https://github.com/apple/swift-nio/pull/2697
+  mutating func setOrCascade<Value>(
+    to promise: EventLoopPromise<Value>?
+  ) where Wrapped == EventLoopPromise<Value> {
+    guard let promise = promise else { return }
+
+    switch self {
+    case .none:
+      self = .some(promise)
+    case .some(let existing):
+      existing.futureResult.cascade(to: promise)
     }
   }
 }

+ 4 - 4
Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerStateMachineTests.swift

@@ -46,7 +46,7 @@ final class ClientConnectionHandlerStateMachineTests: XCTestCase {
 
   func testOpenAndCloseStreamWhenClosed() {
     var state = self.makeStateMachine()
-    state.closed()
+    _ = state.closed()
     state.streamOpened(1)
     XCTAssertEqual(state.streamClosed(1), .none)
   }
@@ -88,7 +88,7 @@ final class ClientConnectionHandlerStateMachineTests: XCTestCase {
 
   func testSendKeepAlivePingWhenClosed() {
     var state = self.makeStateMachine(keepAliveWithoutCalls: true)
-    state.closed()
+    _ = state.closed()
     XCTAssertFalse(state.sendKeepAlivePing())
   }
 
@@ -96,12 +96,12 @@ final class ClientConnectionHandlerStateMachineTests: XCTestCase {
     var state = self.makeStateMachine()
     state.streamOpened(1)
     // Close is false as streams are still open.
-    XCTAssertEqual(state.beginGracefulShutdown(), .sendGoAway(false))
+    XCTAssertEqual(state.beginGracefulShutdown(promise: nil), .sendGoAway(false))
   }
 
   func testBeginGracefulShutdownWhenNoStreamsAreOpen() {
     var state = self.makeStateMachine()
     // Close immediately, not streams are open.
-    XCTAssertEqual(state.beginGracefulShutdown(), .sendGoAway(true))
+    XCTAssertEqual(state.beginGracefulShutdown(promise: nil), .sendGoAway(true))
   }
 }

+ 18 - 0
Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift

@@ -195,6 +195,17 @@ final class ClientConnectionHandlerTests: XCTestCase {
     connection.streamClosed(3)
     try connection.waitUntilClosed()
   }
+
+  func testOutboundGracefulClose() throws {
+    let connection = try Connection()
+    try connection.activate()
+
+    connection.streamOpened(1)
+    let closed = connection.closeGracefully()
+    XCTAssertEqual(try connection.readEvent(), .closing(.initiatedLocally))
+    connection.streamClosed(1)
+    try closed.wait()
+  }
 }
 
 extension ClientConnectionHandlerTests {
@@ -270,5 +281,12 @@ extension ClientConnectionHandlerTests {
       self.channel.embeddedEventLoop.run()
       try self.channel.closeFuture.wait()
     }
+
+    func closeGracefully() -> EventLoopFuture<Void> {
+      let promise = self.channel.embeddedEventLoop.makePromise(of: Void.self)
+      let event = ClientConnectionHandler.OutboundEvent.closeGracefully
+      self.channel.pipeline.triggerUserOutboundEvent(event, promise: promise)
+      return promise.futureResult
+    }
   }
 }