ServerConnectionManagementHandler.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. internal import GRPCCore
  17. internal import NIOCore
  18. internal import NIOHTTP2
  19. internal import NIOTLS
  20. /// A `ChannelHandler` which manages the lifecycle of a gRPC connection over HTTP/2.
  21. ///
  22. /// This handler is responsible for managing several aspects of the connection. These include:
  23. /// 1. Handling the graceful close of connections. When gracefully closing a connection the server
  24. /// sends a GOAWAY frame with the last stream ID set to the maximum stream ID allowed followed by
  25. /// a PING frame. On receipt of the PING frame the server sends another GOAWAY frame with the
  26. /// highest ID of all streams which have been opened. After this, the handler closes the
  27. /// connection once all streams are closed.
  28. /// 2. Enforcing that graceful shutdown doesn't exceed a configured limit (if configured).
  29. /// 3. Gracefully closing the connection once it reaches the maximum configured age (if configured).
  30. /// 4. Gracefully closing the connection once it has been idle for a given period of time (if
  31. /// configured).
  32. /// 5. Periodically sending keep alive pings to the client (if configured) and closing the
  33. /// connection if necessary.
  34. /// 6. Policing pings sent by the client to ensure that the client isn't misconfigured to send
  35. /// too many pings.
  36. ///
  37. /// Some of the behaviours are described in:
  38. /// - [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md), and
  39. /// - [gRFC A9](https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md).
  final class ServerConnectionManagementHandler: ChannelDuplexHandler {
    typealias InboundIn = HTTP2Frame
    typealias InboundOut = HTTP2Frame
    typealias OutboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    /// The `EventLoop` on which the `Channel` containing this handler runs.
    private let eventLoop: any EventLoop

    /// Timer bounding how long the connection may stay idle (i.e. have no open streams). When it
    /// fires the connection is gracefully closed. `nil` if no maximum idle time was configured.
    private var maxIdleTimer: Timer?

    /// Timer bounding the total age of the connection. When it fires the connection is gracefully
    /// closed. `nil` if no maximum age was configured.
    private var maxAgeTimer: Timer?

    /// Timer bounding how long a graceful close may take; the connection is closed abruptly when
    /// it fires. It is started after the second GOAWAY frame has been sent. `nil` if no grace
    /// period was configured.
    private var maxGraceTimer: Timer?

    /// Timer controlling how long to wait before sending a keep alive ping. `nil` if keep alive
    /// pings weren't configured.
    private var keepaliveTimer: Timer?

    /// Timer bounding how long the client has to respond to a keep alive ping. It always exists
    /// but is only scheduled when a keep alive ping is sent, i.e. when `keepaliveTimer` is set.
    private var keepaliveTimeoutTimer: Timer

    /// The opaque payload carried by keep alive pings.
    private let keepalivePingData: HTTP2PingData

    /// `true` if a flush was requested during a read loop and hasn't been performed yet.
    private var flushPending: Bool

    /// `true` between the first `channelRead` and the end of the matching `channelReadComplete`.
    private var inReadLoop: Bool

    /// The context of this handler; set when the handler is added to a pipeline and cleared when
    /// it is removed.
    private var context: ChannelHandlerContext?

    /// The connection's current state.
    private var state: StateMachine

    /// The source of the current time.
    private let clock: Clock

    /// Whether ALPN is required.
    ///
    /// If it is required and the TLS handshake completes without a negotiated protocol then an
    /// error is fired through the pipeline and the channel is closed.
    private let requireALPN: Bool

    /// A source of the current time.
    ///
    /// Tests need this so that time can be controlled manually: NIO's `EmbeddedEventLoop` lets a
    /// test control its view of time (and so the events scheduled on it) but doesn't offer a way
    /// to read the current time, which is usually obtained via `NIODeadline`.
    enum Clock {
      /// Use `NIODeadline.now()`.
      case nio
      /// Use a clock which only moves when told to.
      case manual(Manual)

      /// Returns the current time according to this clock.
      func now() -> NIODeadline {
        if case .manual(let manualClock) = self {
          return manualClock.time
        }
        return .now()
      }

      /// A clock which starts at an uptime of zero nanoseconds and only advances on request.
      final class Manual {
        private(set) var time: NIODeadline

        init() {
          self.time = .uptimeNanoseconds(0)
        }

        /// Moves the clock forward by `amount`.
        func advance(by amount: TimeAmount) {
          let advanced = self.time + amount
          self.time = advanced
        }
      }
    }

    /// Tracks recently written frames; consulted on flush to decide whether the keep-alive state
    /// should be reset.
    private var frameStats: FrameStats

    struct FrameStats {
      /// Whether a HEADERS or DATA frame has been written since the last `reset()`.
      private(set) var didWriteHeadersOrData = false

      /// Records that a HEADERS frame was written.
      mutating func wroteHeaders() {
        self.didWriteHeadersOrData = true
      }

      /// Records that a DATA frame was written.
      mutating func wroteData() {
        self.didWriteHeadersOrData = true
      }

      /// Forgets about any HEADERS or DATA frames written so far.
      mutating func reset() {
        self.didWriteHeadersOrData = false
      }
    }

    /// A synchronous view over this handler.
    var syncView: SyncView {
      SyncView(self)
    }

    /// A synchronous view over this handler.
    ///
    /// Every method on this view *must* be called on the `EventLoop` of the `Channel` the handler
    /// lives in.
    struct SyncView {
      private let handler: ServerConnectionManagementHandler

      fileprivate init(_ handler: ServerConnectionManagementHandler) {
        self.handler = handler
      }

      /// Tells the handler that the connection has received a flush event.
      ///
      /// The handler can't rely on `flush(context:)` for this because of where it is expected to
      /// sit in the pipeline: after the HTTP/2 handler (i.e. closer to the application) so that it
      /// can see HTTP/2 frames. Flushes from stream channels aren't sent down the whole connection
      /// channel; they start from the point at which streams are multiplexed (the HTTP/2 handler
      /// or the HTTP/2 multiplexing handler, depending on how multiplexing is configured).
      func connectionWillFlush() {
        self.handler.eventLoop.assertInEventLoop()

        // Nothing to do unless a HEADERS or DATA frame was written since the last flush.
        guard self.handler.frameStats.didWriteHeadersOrData else { return }

        self.handler.frameStats.reset()
        self.handler.state.resetKeepaliveState()
      }

      /// Tells the handler that a HEADERS frame was written in the last write loop.
      func wroteHeadersFrame() {
        self.handler.eventLoop.assertInEventLoop()
        self.handler.frameStats.wroteHeaders()
      }

      /// Tells the handler that a DATA frame was written in the last write loop.
      func wroteDataFrame() {
        self.handler.eventLoop.assertInEventLoop()
        self.handler.frameStats.wroteData()
      }
    }

    /// Creates a new handler which manages the lifecycle of a connection.
    ///
    /// - Parameters:
    ///   - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
    ///   - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
    ///   - maxAge: The maximum amount of time a connection may exist before being gracefully
    ///       closed.
    ///   - maxGraceTime: The maximum amount of time that the connection has to close gracefully.
    ///   - keepaliveTime: The amount of time to wait after reading data before sending a
    ///       keep-alive ping.
    ///   - keepaliveTimeout: The amount of time the client has to reply after the server sends a
    ///       keep-alive ping to keep the connection open. The connection is closed if no reply
    ///       is received. Falls back to 20 seconds when `nil`.
    ///   - allowKeepaliveWithoutCalls: Whether the server allows the client to send keep-alive
    ///       pings when there are no calls in progress.
    ///   - minPingIntervalWithoutCalls: The minimum allowed interval the client is allowed to send
    ///       keep-alive pings. Pings more frequent than this interval count as 'strikes' and the
    ///       connection is closed if there are too many strikes.
    ///   - requireALPN: Whether ALPN is required. If it is and the TLS handshake completes without
    ///       negotiating a protocol, an error is fired and the channel is closed.
    ///   - clock: A clock providing the current time.
    init(
      eventLoop: any EventLoop,
      maxIdleTime: TimeAmount?,
      maxAge: TimeAmount?,
      maxGraceTime: TimeAmount?,
      keepaliveTime: TimeAmount?,
      keepaliveTimeout: TimeAmount?,
      allowKeepaliveWithoutCalls: Bool,
      minPingIntervalWithoutCalls: TimeAmount,
      requireALPN: Bool,
      clock: Clock = .nio
    ) {
      self.eventLoop = eventLoop
      self.clock = clock
      self.requireALPN = requireALPN

      self.maxIdleTimer = maxIdleTime.map { delay in Timer(delay: delay) }
      self.maxAgeTimer = maxAge.map { delay in Timer(delay: delay) }
      self.maxGraceTimer = maxGraceTime.map { delay in Timer(delay: delay) }
      self.keepaliveTimer = keepaliveTime.map { delay in Timer(delay: delay) }

      // The timeout timer is created unconditionally; it is only ever scheduled if there is a
      // keep alive timer.
      let timeout = keepaliveTimeout ?? .seconds(20)
      self.keepaliveTimeoutTimer = Timer(delay: timeout)

      // Keep alive pings carry a random payload. The GOAWAY ping uses the bitwise complement so
      // the two payloads can never be equal.
      let pingData = UInt64.random(in: .min ... .max)
      self.keepalivePingData = HTTP2PingData(withInteger: pingData)
      self.state = StateMachine(
        allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls,
        minPingReceiveIntervalWithoutCalls: minPingIntervalWithoutCalls,
        goAwayPingData: HTTP2PingData(withInteger: ~pingData)
      )

      self.frameStats = FrameStats()
      self.flushPending = false
      self.inReadLoop = false
    }

    /// Stores the context so that it is available outside of channel callbacks.
    func handlerAdded(context: ChannelHandlerContext) {
      assert(context.eventLoop === self.eventLoop)
      self.context = context
    }

    /// Drops the stored context.
    func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    /// Starts the max-age, max-idle and keep-alive timers (where configured) and forwards the
    /// event.
    func channelActive(context: ChannelHandlerContext) {
      let loopBound = LoopBoundView(handler: self, context: context)
      let loop = context.eventLoop

      self.maxAgeTimer?.schedule(on: loop) {
        loopBound.initiateGracefulShutdown()
      }

      self.maxIdleTimer?.schedule(on: loop) {
        loopBound.initiateGracefulShutdown()
      }

      self.keepaliveTimer?.schedule(on: loop) {
        loopBound.keepaliveTimerFired()
      }

      context.fireChannelActive()
    }

    /// Cancels every timer and forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
      self.maxIdleTimer?.cancel()
      self.maxAgeTimer?.cancel()
      self.maxGraceTimer?.cancel()
      self.keepaliveTimer?.cancel()
      self.keepaliveTimeoutTimer.cancel()

      context.fireChannelInactive()
    }

    /// Reacts to stream created/closed events, quiescing requests and TLS handshake completion.
    /// The event is always forwarded afterwards.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      switch event {
      case let created as NIOHTTP2StreamCreatedEvent:
        self._streamCreated(created.streamID, channel: context.channel)

      case let closed as StreamClosedEvent:
        self._streamClosed(closed.streamID, channel: context.channel)

      case is ChannelShouldQuiesceEvent:
        self.initiateGracefulShutdown(context: context)

      case TLSUserEvent.handshakeCompleted(let negotiatedProtocol):
        let isMissingRequiredALPN = self.requireALPN && negotiatedProtocol == nil
        if isMissingRequiredALPN {
          // ALPN was required but the handshake negotiated nothing: report it and close.
          let error = RPCError(
            code: .internalError,
            message: "ALPN resulted in no protocol being negotiated, but it was required."
          )
          context.fireErrorCaught(error)
          context.close(mode: .all, promise: nil)
        }

      default:
        break
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Marks the start of a read loop, stops the keep-alive timers and processes PING frames
    /// before forwarding the frame.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      self.inReadLoop = true

      // Reading anything at all shows that the connection is alive, so stop both keep-alive
      // timers.
      self.keepaliveTimer?.cancel()
      self.keepaliveTimeoutTimer.cancel()

      // Only PING frames are of interest here; every other frame is just forwarded.
      if case .ping(let pingData, let isAck) = self.unwrapInboundIn(data).payload {
        if isAck {
          self.handlePingAck(context: context, data: pingData)
        } else {
          self.handlePing(context: context, data: pingData)
        }
      }

      context.fireChannelRead(data)
    }

    /// Performs any flush deferred during the read loop, ends the read loop and restarts the
    /// keep-alive timer.
    func channelReadComplete(context: ChannelHandlerContext) {
      // Loop rather than check once: `inReadLoop` is still set here, so a flush requested while
      // flushing is deferred again and must be picked up by the next iteration.
      while self.flushPending {
        self.flushPending = false
        context.flush()
      }

      self.inReadLoop = false

      // Reading is done: start waiting to send the next keep-alive ping.
      let loopBound = LoopBoundView(handler: self, context: context)
      self.keepaliveTimer?.schedule(on: context.eventLoop) {
        loopBound.keepaliveTimerFired()
      }

      context.fireChannelReadComplete()
    }

    /// Flushes immediately, or defers the flush to the end of the read loop if one is in progress.
    func flush(context: ChannelHandlerContext) {
      self.maybeFlush(context: context)
    }
  }
  extension ServerConnectionManagementHandler {
    /// Pairs the handler with its `ChannelHandlerContext` so that scheduled callbacks can call
    /// back into the handler.
    ///
    /// This is `@unchecked Sendable`; each method asserts that it is running on the context's
    /// event loop before it touches the handler.
    struct LoopBoundView: @unchecked Sendable {
      private let wrappedHandler: ServerConnectionManagementHandler
      private let wrappedContext: ChannelHandlerContext

      init(handler: ServerConnectionManagementHandler, context: ChannelHandlerContext) {
        self.wrappedHandler = handler
        self.wrappedContext = context
      }

      /// Starts a graceful shutdown of the connection. Must be called on the event loop.
      func initiateGracefulShutdown() {
        let context = self.wrappedContext
        context.eventLoop.assertInEventLoop()
        self.wrappedHandler.initiateGracefulShutdown(context: context)
      }

      /// Tells the handler that its keep-alive timer fired. Must be called on the event loop.
      func keepaliveTimerFired() {
        let context = self.wrappedContext
        context.eventLoop.assertInEventLoop()
        self.wrappedHandler.keepaliveTimerFired(context: context)
      }
    }
  }
  extension ServerConnectionManagementHandler {
    /// Forwards stream lifecycle notifications to the handler, hopping onto the handler's event
    /// loop first when called from elsewhere.
    struct HTTP2StreamDelegate: @unchecked Sendable, NIOHTTP2StreamDelegate {
      // @unchecked is okay: both methods only touch the handler from its event loop.
      private let handler: ServerConnectionManagementHandler

      init(_ handler: ServerConnectionManagementHandler) {
        self.handler = handler
      }

      /// Notifies the handler that the stream with the given ID was created.
      func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
        let loop = self.handler.eventLoop

        guard loop.inEventLoop else {
          // Not on the handler's event loop: hop over before touching its state.
          loop.execute {
            self.handler._streamCreated(id, channel: channel)
          }
          return
        }

        self.handler._streamCreated(id, channel: channel)
      }

      /// Notifies the handler that the stream with the given ID was closed.
      func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
        let loop = self.handler.eventLoop

        guard loop.inEventLoop else {
          // Not on the handler's event loop: hop over before touching its state.
          loop.execute {
            self.handler._streamClosed(id, channel: channel)
          }
          return
        }

        self.handler._streamClosed(id, channel: channel)
      }
    }

    /// A stream delegate which reports stream creation and closure to this handler.
    var http2StreamDelegate: HTTP2StreamDelegate {
      HTTP2StreamDelegate(self)
    }

    /// Records a newly opened stream and stops the idle timer.
    private func _streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
      // A connection with an open stream isn't idle.
      self.maxIdleTimer?.cancel()
      self.state.streamOpened(id)
    }

    /// Records a closed stream and performs whatever the state machine asks for in response:
    /// restarting the idle timer, closing the connection, or nothing.
    private func _streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
      // Without a context (the handler isn't in a pipeline) the event is ignored entirely.
      guard let context = self.context else { return }

      let onClosed = self.state.streamClosed(id)
      switch onClosed {
      case .startIdleTimer:
        let view = LoopBoundView(handler: self, context: context)
        self.maxIdleTimer?.schedule(on: context.eventLoop) {
          view.initiateGracefulShutdown()
        }

      case .close:
        context.close(mode: .all, promise: nil)

      case .none:
        break
      }
    }
  }
  extension ServerConnectionManagementHandler {
    /// Flushes the channel unless a read loop is in progress, in which case the flush is recorded
    /// as pending and performed by `channelReadComplete`.
    private func maybeFlush(context: ChannelHandlerContext) {
      if self.inReadLoop {
        self.flushPending = true
      } else {
        context.flush()
      }
    }

    /// Begins gracefully shutting down the connection, if it isn't already shutting down.
    ///
    /// Must be called on the channel's event loop.
    private func initiateGracefulShutdown(context: ChannelHandlerContext) {
      context.eventLoop.assertInEventLoop()

      // Cancel any timers if initiating shutdown.
      self.maxIdleTimer?.cancel()
      self.maxAgeTimer?.cancel()
      self.keepaliveTimer?.cancel()
      self.keepaliveTimeoutTimer.cancel()

      switch self.state.startGracefulShutdown() {
      case .sendGoAwayAndPing(let pingData):
        // There's a time window between the server sending a GOAWAY frame and the client receiving
        // it. During this time the client may open new streams as it doesn't yet know about the
        // GOAWAY frame.
        //
        // The server therefore sends a GOAWAY with the last stream ID set to the maximum stream ID
        // and follows it with a PING frame. When the server receives the ack for the PING frame it
        // knows that the client has received the initial GOAWAY frame and that no more streams may
        // be opened. The server can then send an additional GOAWAY frame with a more representative
        // last stream ID.
        let goAway = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(
            lastStreamID: .maxID,
            errorCode: .noError,
            opaqueData: nil
          )
        )
        let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(pingData, ack: false))

        context.write(self.wrapOutboundOut(goAway), promise: nil)
        context.write(self.wrapOutboundOut(ping), promise: nil)
        self.maybeFlush(context: context)

      case .none:
        ()  // Already shutting down.
      }
    }

    /// Handles a PING (non-ack) frame from the client, closing the connection with
    /// `ENHANCE_YOUR_CALM` if the state machine decides the client is pinging too often.
    private func handlePing(context: ChannelHandlerContext, data: HTTP2PingData) {
      switch self.state.receivedPing(atTime: self.clock.now(), data: data) {
      case .enhanceYourCalmThenClose(let streamID):
        let goAway = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(
            lastStreamID: streamID,
            errorCode: .enhanceYourCalm,
            opaqueData: context.channel.allocator.buffer(string: "too_many_pings")
          )
        )

        context.write(self.wrapOutboundOut(goAway), promise: nil)
        // NOTE(review): this runs inside the read loop, so the flush below is deferred to
        // `channelReadComplete` while the close is issued immediately. Confirm the GOAWAY still
        // reaches the client.
        self.maybeFlush(context: context)
        context.close(promise: nil)

      case .sendAck:
        ()  // ACKs are sent by NIO's HTTP/2 handler, don't double ack.

      case .none:
        ()
      }
    }

    /// Handles a PING ack from the client. If the state machine asks for it, the follow-up
    /// GOAWAY frame is sent and the connection is either closed or given a grace period.
    private func handlePingAck(context: ChannelHandlerContext, data: HTTP2PingData) {
      switch self.state.receivedPingAck(data: data) {
      case .sendGoAway(let streamID, let close):
        let goAway = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
        )

        context.write(self.wrapOutboundOut(goAway), promise: nil)
        self.maybeFlush(context: context)

        if close {
          context.close(promise: nil)
        } else {
          // RPCs may have a grace period for finishing once the second GOAWAY frame has been sent.
          // If this is set close the connection abruptly once the grace period passes.
          let loopBound = NIOLoopBound(context, eventLoop: context.eventLoop)
          self.maxGraceTimer?.schedule(on: context.eventLoop) {
            loopBound.value.close(promise: nil)
          }
        }

      case .none:
        ()
      }
    }

    /// Sends a keep-alive PING to the client and starts the timer bounding how long the client
    /// has to respond. If that timer fires the connection is gracefully shut down.
    private func keepaliveTimerFired(context: ChannelHandlerContext) {
      let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(self.keepalivePingData, ack: false))
      // This is an outbound write, so wrap it as `OutboundOut` like every other write in this
      // handler.
      context.write(self.wrapOutboundOut(ping), promise: nil)
      self.maybeFlush(context: context)

      // Schedule a timeout on waiting for the response.
      let loopBound = LoopBoundView(handler: self, context: context)
      self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
        loopBound.initiateGracefulShutdown()
      }
    }
  }