| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681 |
- /*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package import NIOCore
- package import NIOHTTP2
/// An event which happens on a client's HTTP/2 connection.
package enum ClientConnectionEvent: Sendable {
  /// The connection is now ready.
  case ready

  /// The connection has started shutting down, no new streams should be created.
  case closing(CloseReason)

  /// The reason a connection started closing.
  package enum CloseReason: Sendable {
    /// The server sent a GOAWAY frame to the client. Carries the error code from the frame
    /// and a message accompanying it.
    case goAway(HTTP2ErrorCode, String)

    /// The keep alive timer fired and subsequently timed out.
    case keepaliveExpired

    /// The connection became idle.
    case idle

    /// The local peer initiated the close.
    case initiatedLocally

    /// The connection was closed unexpectedly. Carries the error which was recorded before the
    /// close, if any, and whether the connection was idle at the time.
    case unexpected((any Error)?, isIdle: Bool)
  }
}
/// A `ChannelHandler` which manages part of the lifecycle of a gRPC connection over HTTP/2.
///
/// This handler is responsible for managing several aspects of the connection. These include:
/// 1. Periodically sending keep alive pings to the server (if configured) and closing the
///    connection if necessary.
/// 2. Closing the connection if it is idle (has no open streams) for a configured amount of time.
/// 3. Forwarding lifecycle events to the next handler.
///
/// Some of the behaviours are described in [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md).
package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandler {
  package typealias InboundIn = HTTP2Frame
  package typealias InboundOut = ClientConnectionEvent

  package typealias OutboundIn = Never
  package typealias OutboundOut = HTTP2Frame

  /// Events which may be sent to this handler via `triggerUserOutboundEvent`.
  package enum OutboundEvent: Hashable, Sendable {
    /// Close the connection gracefully
    case closeGracefully
  }

  /// The `EventLoop` of the `Channel` this handler exists in.
  private let eventLoop: any EventLoop

  /// Timer for the maximum amount of time the connection may be idle for. If the connection
  /// remains idle (i.e. has no open streams) for this period of time then the connection will
  /// be closed. `nil` if no maximum idle time was configured.
  private var maxIdleTimer: Timer?

  /// Timer for the amount of time to wait before sending a keep alive ping. `nil` if keep alive
  /// pings weren't configured.
  private var keepaliveTimer: Timer?

  /// Timer for the amount of time the server has to acknowledge a keep alive ping sent by the
  /// client. Only scheduled after a keep alive ping has been sent.
  private var keepaliveTimeoutTimer: Timer

  /// Opaque data sent in keep alive pings.
  private let keepalivePingData: HTTP2PingData

  /// The current state of the connection.
  private var state: StateMachine

  /// Whether a flush is pending.
  private var flushPending: Bool

  /// Whether `channelRead` has been called and `channelReadComplete` hasn't yet been called.
  /// Resets once `channelReadComplete` returns.
  private var inReadLoop: Bool

  /// The context of the channel this handler is in.
  private var context: ChannelHandlerContext?

  /// Creates a new handler which manages the lifecycle of a connection.
  ///
  /// - Parameters:
  ///   - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
  ///   - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
  ///   - keepaliveTime: The amount of time to wait before sending a keep-alive ping.
  ///   - keepaliveTimeout: The amount of time the server has to acknowledge a keep-alive ping
  ///       sent by the client. The connection is closed if no acknowledgement is received in
  ///       time. Defaults to 20 seconds if `nil`.
  ///   - keepaliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
  ///       in progress.
  package init(
    eventLoop: any EventLoop,
    maxIdleTime: TimeAmount?,
    keepaliveTime: TimeAmount?,
    keepaliveTimeout: TimeAmount?,
    keepaliveWithoutCalls: Bool
  ) {
    self.eventLoop = eventLoop
    self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
    self.keepaliveTimer = keepaliveTime.map { Timer(delay: $0, repeat: true) }
    self.keepaliveTimeoutTimer = Timer(delay: keepaliveTimeout ?? .seconds(20))
    self.keepalivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
    self.state = StateMachine(allowKeepaliveWithoutCalls: keepaliveWithoutCalls)

    self.flushPending = false
    self.inReadLoop = false
  }

  /// Stores the context so that stream lifecycle callbacks can act on the channel.
  package func handlerAdded(context: ChannelHandlerContext) {
    assert(context.eventLoop === self.eventLoop)
    self.context = context
  }

  /// Drops the stored context.
  package func handlerRemoved(context: ChannelHandlerContext) {
    self.context = nil
  }

  /// Marks the connection as closed, reports an unexpected close (or completes the pending
  /// graceful-close promise), and cancels all timers.
  package func channelInactive(context: ChannelHandlerContext) {
    switch self.state.closed() {
    case .none:
      ()

    case .unexpectedClose(let error, let isIdle):
      let event = self.wrapInboundOut(.closing(.unexpected(error, isIdle: isIdle)))
      context.fireChannelRead(event)

    case .succeed(let promise):
      promise.succeed()
    }

    // The connection is gone: no timer has any work left to do. Cancelling the idle timer as
    // well as the keep-alive timers avoids keeping the handler and context alive (via the
    // scheduled callback) until the idle timeout would have elapsed.
    self.maxIdleTimer?.cancel()
    self.keepaliveTimer?.cancel()
    self.keepaliveTimeoutTimer.cancel()
    context.fireChannelInactive()
  }

  /// Tracks stream creation and closure events before forwarding the event down the pipeline.
  package func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
    switch event {
    case let event as NIOHTTP2StreamCreatedEvent:
      self._streamCreated(event.streamID, channel: context.channel)

    case let event as StreamClosedEvent:
      self._streamClosed(event.streamID, channel: context.channel)

    default:
      ()
    }

    context.fireUserInboundEventTriggered(event)
  }

  /// Records the error and closes the channel.
  package func errorCaught(context: ChannelHandlerContext, error: any Error) {
    // Store the error and close, this will result in the final close event being fired down
    // the pipeline with an appropriate close reason and appropriate error. (This avoids
    // the async channel just throwing the error.)
    self.state.receivedError(error)
    context.close(mode: .all, promise: nil)
  }

  /// Handles connection-level frames: GOAWAY, PING acknowledgements and SETTINGS. Other frames
  /// are ignored.
  package func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    let frame = self.unwrapInboundIn(data)
    self.inReadLoop = true

    switch frame.payload {
    case .goAway(_, let errorCode, let data):
      if errorCode == .noError {
        // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
        // closing the connection.
        switch self.state.beginGracefulShutdown(promise: nil) {
        case .sendGoAway(let close):
          // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
          let message = data.map { String(buffer: $0) } ?? ""
          context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))

          // Clients should send GOAWAYs when closing a connection.
          self.writeAndFlushGoAway(context: context, errorCode: .noError)
          if close {
            context.close(promise: nil)
          }

        case .none:
          ()
        }
      } else {
        // Some error, begin closing.
        if self.state.beginClosing() {
          // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
          let message = data.map { String(buffer: $0) } ?? ""
          context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))
          context.close(promise: nil)
        }
      }

    case .ping(let data, let ack):
      // Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
      // particular only those carrying the keep-alive data.
      if ack, data == self.keepalivePingData {
        let loopBound = LoopBoundView(handler: self, context: context)
        // The server answered in time: stop the timeout and resume the keep-alive timer.
        self.keepaliveTimeoutTimer.cancel()
        self.keepaliveTimer?.schedule(on: context.eventLoop) {
          loopBound.keepaliveTimerFired()
        }
      }

    case .settings(.settings(_)):
      let isInitialSettings = self.state.receivedSettings()

      // The first settings frame indicates that the connection is now ready to use. The channel
      // becoming active is insufficient as, for example, a TLS handshake may fail after
      // establishing the TCP connection, or the server isn't configured for gRPC (or HTTP/2).
      if isInitialSettings {
        let loopBound = LoopBoundView(handler: self, context: context)
        self.keepaliveTimer?.schedule(on: context.eventLoop) {
          loopBound.keepaliveTimerFired()
        }

        self.maxIdleTimer?.schedule(on: context.eventLoop) {
          loopBound.maxIdleTimerFired()
        }

        context.fireChannelRead(self.wrapInboundOut(.ready))
      }

    default:
      ()
    }
  }

  /// Flushes any writes which were deferred during the read loop.
  package func channelReadComplete(context: ChannelHandlerContext) {
    // Flushing may re-enter this handler and request another flush, hence the loop.
    while self.flushPending {
      self.flushPending = false
      context.flush()
    }

    self.inReadLoop = false
    context.fireChannelReadComplete()
  }

  /// Handles `OutboundEvent`s; any other event is forwarded along the pipeline.
  package func triggerUserOutboundEvent(
    context: ChannelHandlerContext,
    event: Any,
    promise: EventLoopPromise<Void>?
  ) {
    if let event = event as? OutboundEvent {
      switch event {
      case .closeGracefully:
        switch self.state.beginGracefulShutdown(promise: promise) {
        case .sendGoAway(let close):
          context.fireChannelRead(self.wrapInboundOut(.closing(.initiatedLocally)))
          // The client could send a GOAWAY at this point but it's not really necessary, the server
          // can't open streams anyway, the client will just close the connection when it's done.
          if close {
            context.close(promise: nil)
          }

        case .none:
          ()
        }
      }
    } else {
      context.triggerUserOutboundEvent(event, promise: promise)
    }
  }
}
extension ClientConnectionHandler {
  /// Pairs a handler with its context so that timer callbacks can call back into the handler.
  ///
  /// Marked `@unchecked Sendable`: every method asserts that it is running on the context's
  /// event loop before touching the handler.
  struct LoopBoundView: @unchecked Sendable {
    private let target: ClientConnectionHandler
    private let targetContext: ChannelHandlerContext

    init(handler: ClientConnectionHandler, context: ChannelHandlerContext) {
      self.target = handler
      self.targetContext = context
    }

    /// Forwards the keep-alive timer firing to the handler.
    func keepaliveTimerFired() {
      self.targetContext.eventLoop.assertInEventLoop()
      self.target.keepaliveTimerFired(context: self.targetContext)
    }

    /// Forwards the keep-alive timeout expiring to the handler.
    func keepaliveTimeoutExpired() {
      self.targetContext.eventLoop.assertInEventLoop()
      self.target.keepaliveTimeoutExpired(context: self.targetContext)
    }

    /// Forwards the max idle timer firing to the handler.
    func maxIdleTimerFired() {
      self.targetContext.eventLoop.assertInEventLoop()
      self.target.maxIdleTimerFired(context: self.targetContext)
    }
  }
}
extension ClientConnectionHandler {
  /// A stream delegate which relays stream creation and closure to the handler on the handler's
  /// event loop.
  package struct HTTP2StreamDelegate: @unchecked Sendable, NIOHTTP2StreamDelegate {
    // @unchecked is okay: the only methods do the appropriate event-loop dance.
    private let handler: ClientConnectionHandler

    init(_ handler: ClientConnectionHandler) {
      self.handler = handler
    }

    /// Notifies the handler that a stream was created, hopping to its event loop if required.
    package func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
      guard self.handler.eventLoop.inEventLoop else {
        self.handler.eventLoop.execute {
          self.handler._streamCreated(id, channel: channel)
        }
        return
      }

      self.handler._streamCreated(id, channel: channel)
    }

    /// Notifies the handler that a stream was closed, hopping to its event loop if required.
    package func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
      guard self.handler.eventLoop.inEventLoop else {
        self.handler.eventLoop.execute {
          self.handler._streamClosed(id, channel: channel)
        }
        return
      }

      self.handler._streamClosed(id, channel: channel)
    }
  }

  /// A stream delegate backed by this handler.
  package var http2StreamDelegate: HTTP2StreamDelegate {
    HTTP2StreamDelegate(self)
  }

  /// Records a newly opened stream and stops the idle timer.
  private func _streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
    self.eventLoop.assertInEventLoop()

    // An open stream means the connection is no longer idle.
    self.maxIdleTimer?.cancel()
    self.state.streamOpened(id)
  }

  /// Records a closed stream and acts on the state machine's decision.
  private func _streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
    guard let context = self.context else { return }
    self.eventLoop.assertInEventLoop()

    let action = self.state.streamClosed(id)
    switch action {
    case .none:
      ()

    case .close:
      // The connection was already closing and only waiting for its streams to finish. That
      // was the last one, so close the connection now.
      context.close(promise: nil)

    case .startIdleTimer(let cancelKeepalive):
      // No streams remain: restart the idle timer. The keep-alive timer is stopped too unless
      // keep-alive is allowed while there are no active calls.
      let view = LoopBoundView(handler: self, context: context)
      self.maxIdleTimer?.schedule(on: context.eventLoop) {
        view.maxIdleTimerFired()
      }

      if cancelKeepalive {
        self.keepaliveTimer?.cancel()
      }
    }
  }
}
extension ClientConnectionHandler {
  /// Flushes immediately unless in a read loop, in which case the flush is deferred until
  /// `channelReadComplete`.
  private func maybeFlush(context: ChannelHandlerContext) {
    guard self.inReadLoop else {
      context.flush()
      return
    }

    self.flushPending = true
  }

  /// Sends a keep-alive ping (if the state allows it) and starts the timeout for its
  /// acknowledgement.
  private func keepaliveTimerFired(context: ChannelHandlerContext) {
    if !self.state.sendKeepalivePing() {
      return
    }

    // The keep-alive timer is paused while a ping is outstanding; it is resumed once the ping
    // has been acknowledged.
    self.keepaliveTimer?.cancel()

    let payload = HTTP2Frame.FramePayload.ping(self.keepalivePingData, ack: false)
    let frame = HTTP2Frame(streamID: .rootStream, payload: payload)
    context.write(self.wrapOutboundOut(frame), promise: nil)
    self.maybeFlush(context: context)

    // Start waiting for the acknowledgement.
    let view = LoopBoundView(handler: self, context: context)
    self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
      view.keepaliveTimeoutExpired()
    }
  }

  /// Closes the connection because a keep-alive ping wasn't acknowledged in time.
  private func keepaliveTimeoutExpired(context: ChannelHandlerContext) {
    self.closeNow(context: context, reason: .keepaliveExpired, goAwayMessage: "keepalive_expired")
  }

  /// Closes the connection because it has been idle for too long.
  private func maxIdleTimerFired(context: ChannelHandlerContext) {
    self.closeNow(context: context, reason: .idle, goAwayMessage: "idle")
  }

  /// Shared close sequence for timer-driven closes: if the state machine permits closing, emit
  /// the closing event, send a GOAWAY carrying `goAwayMessage`, then close the channel.
  private func closeNow(
    context: ChannelHandlerContext,
    reason: ClientConnectionEvent.CloseReason,
    goAwayMessage: String
  ) {
    guard self.state.beginClosing() else { return }

    context.fireChannelRead(self.wrapInboundOut(.closing(reason)))
    self.writeAndFlushGoAway(context: context, message: goAwayMessage)
    context.close(promise: nil)
  }

  /// Writes a GOAWAY frame on the root stream and flushes it (possibly deferred, see
  /// `maybeFlush`).
  ///
  /// - Parameters:
  ///   - context: The context to write the frame to.
  ///   - errorCode: The error code to send in the frame.
  ///   - message: An optional message to send as the frame's opaque data.
  private func writeAndFlushGoAway(
    context: ChannelHandlerContext,
    errorCode: HTTP2ErrorCode = .noError,
    message: String? = nil
  ) {
    let opaqueData: ByteBuffer?
    if let message {
      opaqueData = context.channel.allocator.buffer(string: message)
    } else {
      opaqueData = nil
    }

    let payload = HTTP2Frame.FramePayload.goAway(
      lastStreamID: 0,
      errorCode: errorCode,
      opaqueData: opaqueData
    )
    let frame = HTTP2Frame(streamID: .rootStream, payload: payload)

    context.write(self.wrapOutboundOut(frame), promise: nil)
    self.maybeFlush(context: context)
  }
}
extension ClientConnectionHandler {
  /// Tracks the lifecycle of the connection: which streams are open, whether the connection is
  /// closing, and what the handler should do in response to each event.
  struct StateMachine {
    private var state: State

    private enum State {
      /// The connection is usable.
      case active(Active)
      /// The connection is shutting down.
      case closing(Closing)
      /// The connection is closed.
      case closed
      /// Temporary placeholder held while an associated value is being mutated.
      case _modifying

      struct Active {
        var openStreams: Set<HTTP2StreamID>
        var allowKeepaliveWithoutCalls: Bool
        var receivedConnectionPreface: Bool
        var error: (any Error)?

        init(allowKeepaliveWithoutCalls: Bool) {
          self.openStreams = []
          self.allowKeepaliveWithoutCalls = allowKeepaliveWithoutCalls
          self.receivedConnectionPreface = false
          self.error = nil
        }

        /// Records a SETTINGS frame, returning `true` only for the first one.
        mutating func receivedSettings() -> Bool {
          defer { self.receivedConnectionPreface = true }
          return !self.receivedConnectionPreface
        }
      }

      struct Closing {
        var allowKeepaliveWithoutCalls: Bool
        var openStreams: Set<HTTP2StreamID>
        var closePromise: EventLoopPromise<Void>?
        var isGraceful: Bool

        init(from state: Active, isGraceful: Bool, closePromise: EventLoopPromise<Void>?) {
          self.allowKeepaliveWithoutCalls = state.allowKeepaliveWithoutCalls
          self.openStreams = state.openStreams
          self.closePromise = closePromise
          self.isGraceful = isGraceful
        }
      }
    }

    init(allowKeepaliveWithoutCalls: Bool) {
      let initial = State.Active(allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls)
      self.state = .active(initial)
    }

    /// Record that a SETTINGS frame was received from the remote peer.
    ///
    /// - Returns: `true` if this was the first SETTINGS frame received.
    mutating func receivedSettings() -> Bool {
      switch self.state {
      case ._modifying:
        preconditionFailure()

      case .closing, .closed:
        return false

      case .active(var connection):
        self.state = ._modifying
        let wasFirst = connection.receivedSettings()
        self.state = .active(connection)
        return wasFirst
      }
    }

    /// Record that an error was received. The error is only stored while the connection is
    /// active.
    mutating func receivedError(_ error: any Error) {
      switch self.state {
      case ._modifying:
        preconditionFailure()

      case .closing, .closed:
        return

      case .active(var connection):
        self.state = ._modifying
        connection.error = error
        self.state = .active(connection)
      }
    }

    /// Record that the stream with the given ID has been opened.
    mutating func streamOpened(_ id: HTTP2StreamID) {
      switch self.state {
      case .active(var connection):
        self.state = ._modifying
        let inserted = connection.openStreams.insert(id).inserted
        assert(inserted, "Can't open stream \(Int(id)), it's already open")
        self.state = .active(connection)

      case .closing(var connection):
        self.state = ._modifying
        let inserted = connection.openStreams.insert(id).inserted
        assert(inserted, "Can't open stream \(Int(id)), it's already open")
        self.state = .closing(connection)

      case .closed:
        return

      case ._modifying:
        preconditionFailure()
      }
    }

    enum OnStreamClosed: Equatable {
      /// Start the idle timer, after which the connection should be closed gracefully.
      case startIdleTimer(cancelKeepalive: Bool)
      /// Close the connection.
      case close
      /// Do nothing.
      case none
    }

    /// Record that the stream with the given ID has been closed.
    ///
    /// - Returns: The action the handler should take now that the stream is closed.
    mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
      switch self.state {
      case .active(var connection):
        self.state = ._modifying
        let removedID = connection.openStreams.remove(id)
        assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
        self.state = .active(connection)

        guard connection.openStreams.isEmpty else { return .none }
        return .startIdleTimer(cancelKeepalive: !connection.allowKeepaliveWithoutCalls)

      case .closing(var connection):
        self.state = ._modifying
        let removedID = connection.openStreams.remove(id)
        assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
        self.state = .closing(connection)

        if connection.openStreams.isEmpty {
          return .close
        } else {
          return .none
        }

      case .closed:
        return .none

      case ._modifying:
        preconditionFailure()
      }
    }

    /// Returns whether a keep alive ping should be sent to the server.
    func sendKeepalivePing() -> Bool {
      // A ping is sent when keep alive is permitted without active calls, or otherwise only
      // when at least one stream is open.
      switch self.state {
      case .active(let connection):
        return connection.allowKeepaliveWithoutCalls || !connection.openStreams.isEmpty

      case .closing(let connection):
        return connection.allowKeepaliveWithoutCalls || !connection.openStreams.isEmpty

      case .closed:
        return false

      case ._modifying:
        preconditionFailure()
      }
    }

    enum OnGracefulShutDown: Equatable {
      /// Graceful shutdown has started; the associated value is whether the connection should
      /// be closed immediately.
      case sendGoAway(Bool)
      /// Do nothing.
      case none
    }

    /// Begin shutting down gracefully.
    ///
    /// - Parameter promise: A promise to retain until the connection is closed. If the
    ///     connection is already closing it is combined with any promise already held.
    /// - Returns: The action the handler should take.
    mutating func beginGracefulShutdown(promise: EventLoopPromise<Void>?) -> OnGracefulShutDown {
      switch self.state {
      case .active(let connection):
        self.state = ._modifying
        // Only close immediately if there are no open streams. The client doesn't need to
        // ratchet down the last stream ID as only the client creates streams in gRPC.
        let closeImmediately = connection.openStreams.isEmpty
        let closing = State.Closing(from: connection, isGraceful: true, closePromise: promise)
        self.state = .closing(closing)
        return .sendGoAway(closeImmediately)

      case .closing(var connection):
        self.state = ._modifying
        connection.closePromise.setOrCascade(to: promise)
        self.state = .closing(connection)
        return .none

      case .closed:
        return .none

      case ._modifying:
        preconditionFailure()
      }
    }

    /// Returns whether the connection should be closed.
    mutating func beginClosing() -> Bool {
      switch self.state {
      case .active(let connection):
        let closing = State.Closing(from: connection, isGraceful: false, closePromise: nil)
        self.state = .closing(closing)
        return true

      case .closing(var connection):
        self.state = ._modifying
        // Only a connection which was closing gracefully needs to be forced closed; it is no
        // longer graceful from here on.
        let wasGraceful = connection.isGraceful
        connection.isGraceful = false
        self.state = .closing(connection)
        return wasGraceful

      case .closed:
        return false

      case ._modifying:
        preconditionFailure()
      }
    }

    enum OnClosed {
      /// Succeed the promise.
      case succeed(EventLoopPromise<Void>)
      /// The connection closed without first closing down; carries any recorded error and
      /// whether there were no open streams.
      case unexpectedClose((any Error)?, isIdle: Bool)
      /// Do nothing.
      case none
    }

    /// Marks the state as closed.
    mutating func closed() -> OnClosed {
      switch self.state {
      case .active(let connection):
        self.state = .closed
        return .unexpectedClose(connection.error, isIdle: connection.openStreams.isEmpty)

      case .closing(let connection):
        self.state = .closed
        if let promise = connection.closePromise {
          return .succeed(promise)
        } else {
          return .none
        }

      case .closed:
        return .none

      case ._modifying:
        preconditionFailure()
      }
    }
  }
}
|