ServerConnectionManagementHandler.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  18. /// A `ChannelHandler` which manages the lifecycle of a gRPC connection over HTTP/2.
  19. ///
  20. /// This handler is responsible for managing several aspects of the connection. These include:
  21. /// 1. Handling the graceful close of connections. When gracefully closing a connection the server
  22. /// sends a GOAWAY frame with the last stream ID set to the maximum stream ID allowed followed by
  23. /// a PING frame. On receipt of the PING frame the server sends another GOAWAY frame with the
  24. /// highest ID of all streams which have been opened. After this, the handler closes the
  25. /// connection once all streams are closed.
  26. /// 2. Enforcing that graceful shutdown doesn't exceed a configured limit (if configured).
  27. /// 3. Gracefully closing the connection once it reaches the maximum configured age (if configured).
  28. /// 4. Gracefully closing the connection once it has been idle for a given period of time (if
  29. /// configured).
  30. /// 5. Periodically sending keep alive pings to the client (if configured) and closing the
  31. /// connection if necessary.
  32. /// 6. Policing pings sent by the client to ensure that the client isn't misconfigured to send
  33. /// too many pings.
  34. ///
  35. /// Some of the behaviours are described in:
  36. /// - [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md), and
  37. /// - [gRFC A9](https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md).
  final class ServerConnectionManagementHandler: ChannelDuplexHandler {
    typealias InboundIn = HTTP2Frame
    typealias InboundOut = HTTP2Frame
    typealias OutboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    /// The `EventLoop` of the `Channel` this handler has been added to.
    private let eventLoop: EventLoop

    /// Timer limiting how long the connection may remain idle. It is started when the channel
    /// becomes active and whenever the state machine asks for it after a stream closes, and it is
    /// cancelled when a stream is created. When it fires a graceful shutdown is initiated.
    /// `nil` if no maximum idle time was configured.
    private var maxIdleTimer: Timer?

    /// Timer limiting the age of the connection. It is started when the channel becomes active
    /// and initiates a graceful shutdown when it fires. `nil` if no maximum age was configured.
    private var maxAgeTimer: Timer?

    /// Timer limiting how long a graceful shutdown may take; the connection is closed abruptly
    /// when it fires. It is started after the second GOAWAY frame has been written. `nil` if no
    /// grace period was configured.
    private var maxGraceTimer: Timer?

    /// Timer which, on firing, sends a keep alive ping. `nil` if keep alive isn't configured.
    private var keepAliveTimer: Timer?

    /// Timer bounding how long the client has to respond to a keep alive ping. It always exists
    /// but is only ever scheduled as a consequence of `keepAliveTimer` firing.
    private var keepAliveTimeoutTimer: Timer

    /// A restartable, cancellable, single-shot timer with a fixed delay.
    private struct Timer {
      /// How long to wait between scheduling the task and running it.
      private let delay: TimeAmount

      /// The most recently scheduled task, if any. Cleared by `cancel()`.
      private var scheduled: Scheduled<Void>?

      init(delay: TimeAmount) {
        self.delay = delay
        self.scheduled = nil
      }

      /// Schedules `task` to run on `eventLoop` after the timer's delay, cancelling whichever
      /// task was scheduled previously.
      mutating func schedule(on eventLoop: EventLoop, task: @escaping () throws -> Void) {
        if let previous = self.scheduled {
          previous.cancel()
        }
        self.scheduled = eventLoop.scheduleTask(in: self.delay, task)
      }

      /// Cancels the scheduled task, if there is one.
      mutating func cancel() {
        if let current = self.scheduled {
          current.cancel()
          self.scheduled = nil
        }
      }
    }

    /// Opaque data carried by keep alive pings sent from this handler.
    private let keepAlivePingData: HTTP2PingData

    /// Whether a flush was requested during the read loop and still needs to be performed.
    private var flushPending: Bool

    /// `true` from the first `channelRead` of a read loop until the matching
    /// `channelReadComplete` has flushed any pending writes.
    private var inReadLoop: Bool

    /// The state machine tracking the connection.
    private var state: StateMachine

    /// The source of the current time.
    private let clock: Clock

    /// A source of the current time.
    ///
    /// Tests need to control time. NIO's `EmbeddedEventLoop` lets a test drive scheduled events
    /// but doesn't expose its current time, which is normally read via `NIODeadline`. A manual
    /// clock fills that gap as it can be advanced explicitly from a test.
    enum Clock {
      case nio
      case manual(Manual)

      /// Returns the current time according to this clock.
      func now() -> NIODeadline {
        switch self {
        case .manual(let manual):
          return manual.time
        case .nio:
          return NIODeadline.now()
        }
      }

      /// A clock which starts at zero and only moves when `advance(by:)` is called.
      final class Manual {
        private(set) var time: NIODeadline

        init() {
          self.time = NIODeadline.uptimeNanoseconds(0)
        }

        /// Moves the clock forward by `amount`.
        func advance(by amount: TimeAmount) {
          let advanced = self.time + amount
          self.time = advanced
        }
      }
    }

    /// Records which frames were written recently; consulted on flush to decide whether the
    /// keep-alive state should be reset.
    private var frameStats: FrameStats

    struct FrameStats {
      private(set) var didWriteHeadersOrData: Bool = false

      /// Records that a HEADERS frame was written.
      mutating func wroteHeaders() {
        self.didWriteHeadersOrData = true
      }

      /// Records that a DATA frame was written.
      mutating func wroteData() {
        self.didWriteHeadersOrData = true
      }

      /// Forgets any recorded HEADERS or DATA writes.
      mutating func reset() {
        self.didWriteHeadersOrData = false
      }
    }

    /// A synchronous view over this handler.
    var syncView: SyncView {
      SyncView(self)
    }

    /// A synchronous view over this handler.
    ///
    /// Every method *must* be called on the `EventLoop` of the `Channel` the handler lives in;
    /// each of them asserts this.
    struct SyncView {
      private let handler: ServerConnectionManagementHandler

      fileprivate init(_ handler: ServerConnectionManagementHandler) {
        self.handler = handler
      }

      /// Tells the handler that the connection is about to flush.
      func connectionWillFlush() {
        // `flush(context:)` can't be used for this. The handler is expected to sit after the
        // HTTP/2 handler (closer to the application) so that it sees HTTP/2 frames, but flushes
        // originating from stream channels don't traverse the whole connection pipeline: they
        // start from wherever the streams are multiplexed (the HTTP/2 handler or the HTTP/2
        // multiplexing handler, depending on configuration).
        self.handler.eventLoop.assertInEventLoop()

        guard self.handler.frameStats.didWriteHeadersOrData else {
          return
        }

        self.handler.frameStats.reset()
        self.handler.state.resetKeepAliveState()
      }

      /// Tells the handler that a HEADERS frame was written in the last write loop.
      func wroteHeadersFrame() {
        self.handler.eventLoop.assertInEventLoop()
        self.handler.frameStats.wroteHeaders()
      }

      /// Tells the handler that a DATA frame was written in the last write loop.
      func wroteDataFrame() {
        self.handler.eventLoop.assertInEventLoop()
        self.handler.frameStats.wroteData()
      }
    }

    /// Creates a new handler which manages the lifecycle of a connection.
    ///
    /// - Parameters:
    ///   - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
    ///   - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
    ///   - maxAge: The maximum amount of time a connection may exist before being gracefully
    ///       closed.
    ///   - maxGraceTime: The maximum amount of time that the connection has to close gracefully.
    ///   - keepAliveTime: The amount of time to wait after reading data before sending a
    ///       keep-alive ping.
    ///   - keepAliveTimeout: The amount of time the client has to reply after the server sends a
    ///       keep-alive ping to keep the connection open. Defaults to 20 seconds when `nil`.
    ///   - allowKeepAliveWithoutCalls: Whether the server allows the client to send keep-alive
    ///       pings when there are no calls in progress.
    ///   - minPingIntervalWithoutCalls: The minimum allowed interval the client is allowed to
    ///       send keep-alive pings. Pings more frequent than this interval count as 'strikes' and
    ///       the connection is closed if there are too many strikes.
    ///   - clock: A clock providing the current time.
    init(
      eventLoop: EventLoop,
      maxIdleTime: TimeAmount?,
      maxAge: TimeAmount?,
      maxGraceTime: TimeAmount?,
      keepAliveTime: TimeAmount?,
      keepAliveTimeout: TimeAmount?,
      allowKeepAliveWithoutCalls: Bool,
      minPingIntervalWithoutCalls: TimeAmount,
      clock: Clock = .nio
    ) {
      self.eventLoop = eventLoop
      self.clock = clock

      // Optional timers only exist when the corresponding duration was configured.
      self.maxIdleTimer = maxIdleTime.map(Timer.init(delay:))
      self.maxAgeTimer = maxAge.map(Timer.init(delay:))
      self.maxGraceTimer = maxGraceTime.map(Timer.init(delay:))
      self.keepAliveTimer = keepAliveTime.map(Timer.init(delay:))

      // The timeout timer is created unconditionally; it's only scheduled if there is a keep
      // alive timer to trigger it.
      self.keepAliveTimeoutTimer = Timer(delay: keepAliveTimeout ?? .seconds(20))

      // Keep alive pings carry a random payload; the ping sent as part of graceful shutdown
      // carries its bitwise complement.
      let keepAlivePayload = UInt64.random(in: UInt64.min ... UInt64.max)
      self.keepAlivePingData = HTTP2PingData(withInteger: keepAlivePayload)
      self.state = StateMachine(
        allowKeepAliveWithoutCalls: allowKeepAliveWithoutCalls,
        minPingReceiveIntervalWithoutCalls: minPingIntervalWithoutCalls,
        goAwayPingData: HTTP2PingData(withInteger: ~keepAlivePayload)
      )

      self.frameStats = FrameStats()
      self.inReadLoop = false
      self.flushPending = false
    }

    /// Checks (in debug builds) that the handler was added to a channel on the expected loop.
    func handlerAdded(context: ChannelHandlerContext) {
      assert(context.eventLoop === self.eventLoop)
    }

    /// Starts the max-age, max-idle and keep-alive timers (those which are configured).
    func channelActive(context: ChannelHandlerContext) {
      let loop = context.eventLoop

      self.maxAgeTimer?.schedule(on: loop) {
        self.initiateGracefulShutdown(context: context)
      }

      self.maxIdleTimer?.schedule(on: loop) {
        self.initiateGracefulShutdown(context: context)
      }

      self.keepAliveTimer?.schedule(on: loop) {
        self.keepAliveTimerFired(context: context)
      }

      context.fireChannelActive()
    }

    /// Cancels every timer now that the connection is gone.
    func channelInactive(context: ChannelHandlerContext) {
      self.keepAliveTimeoutTimer.cancel()
      self.keepAliveTimer?.cancel()
      self.maxGraceTimer?.cancel()
      self.maxAgeTimer?.cancel()
      self.maxIdleTimer?.cancel()
      context.fireChannelInactive()
    }

    /// Tracks stream creation and closure; all events are forwarded unchanged.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      if let created = event as? NIOHTTP2StreamCreatedEvent {
        // An open stream means the connection is no longer idle.
        self.maxIdleTimer?.cancel()
        self.state.streamOpened(created.streamID)
      } else if let closed = event as? StreamClosedEvent {
        switch self.state.streamClosed(closed.streamID) {
        case .startIdleTimer:
          self.maxIdleTimer?.schedule(on: context.eventLoop) {
            self.initiateGracefulShutdown(context: context)
          }

        case .close:
          context.close(mode: .all, promise: nil)

        case .none:
          ()
        }
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Handles PING frames and PING acks; every frame is forwarded unchanged.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      self.inReadLoop = true

      // Reading anything at all shows the connection is alive: stop both keep-alive timers.
      self.keepAliveTimer?.cancel()
      self.keepAliveTimeoutTimer.cancel()

      // PING is the only frame type of interest, all others pass straight through.
      if case .ping(let pingData, let isAck) = self.unwrapInboundIn(data).payload {
        if isAck {
          self.handlePingAck(context: context, data: pingData)
        } else {
          self.handlePing(context: context, data: pingData)
        }
      }

      context.fireChannelRead(data)
    }

    /// Performs any flush deferred during the read loop and restarts the keep-alive timer.
    func channelReadComplete(context: ChannelHandlerContext) {
      // `inReadLoop` is still set here, so a flush requested while flushing is deferred again
      // and picked up by the next iteration.
      while self.flushPending {
        self.flushPending = false
        context.flush()
      }

      self.inReadLoop = false

      // The read loop is over: start waiting to send the next keep-alive ping.
      self.keepAliveTimer?.schedule(on: context.eventLoop) {
        self.keepAliveTimerFired(context: context)
      }

      context.fireChannelReadComplete()
    }

    /// Flushes immediately, or defers the flush to `channelReadComplete` if in a read loop.
    func flush(context: ChannelHandlerContext) {
      self.maybeFlush(context: context)
    }
  }
  extension ServerConnectionManagementHandler {
    /// Flushes the channel unless the handler is in a read loop, in which case the flush is
    /// recorded as pending and performed by `channelReadComplete`.
    private func maybeFlush(context: ChannelHandlerContext) {
      if self.inReadLoop {
        self.flushPending = true
      } else {
        context.flush()
      }
    }

    /// Starts gracefully shutting down the connection, if it isn't already shutting down.
    ///
    /// Must be called on the channel's `EventLoop`.
    private func initiateGracefulShutdown(context: ChannelHandlerContext) {
      context.eventLoop.assertInEventLoop()

      // Cancel any timers if initiating shutdown. The grace timer is deliberately left alone:
      // it is only started once the second GOAWAY has been written.
      self.maxIdleTimer?.cancel()
      self.maxAgeTimer?.cancel()
      self.keepAliveTimer?.cancel()
      self.keepAliveTimeoutTimer.cancel()

      switch self.state.startGracefulShutdown() {
      case .sendGoAwayAndPing(let pingData):
        // There's a time window between the server sending a GOAWAY frame and the client
        // receiving it. During this time the client may open new streams as it doesn't yet know
        // about the GOAWAY frame.
        //
        // The server therefore sends a GOAWAY with the last stream ID set to the maximum stream
        // ID and follows it with a PING frame. When the server receives the ack for the PING
        // frame it knows that the client has received the initial GOAWAY frame and that no more
        // streams may be opened. The server can then send an additional GOAWAY frame with a more
        // representative last stream ID.
        let goAway = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(
            lastStreamID: .maxID,
            errorCode: .noError,
            opaqueData: nil
          )
        )
        let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(pingData, ack: false))

        context.write(self.wrapOutboundOut(goAway), promise: nil)
        context.write(self.wrapOutboundOut(ping), promise: nil)
        self.maybeFlush(context: context)

      case .none:
        ()  // Already shutting down.
      }
    }

    /// Handles a PING frame (not an ack) received from the client.
    ///
    /// Depending on the state machine's verdict this either acknowledges the ping, does nothing,
    /// or sends a GOAWAY with `ENHANCE_YOUR_CALM` and closes the connection.
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - data: The opaque data carried by the received PING; echoed back in the ack.
    private func handlePing(context: ChannelHandlerContext, data: HTTP2PingData) {
      switch self.state.receivedPing(atTime: self.clock.now(), data: data) {
      case .enhanceYourCalmThenClose(let streamID):
        let goAway = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(
            lastStreamID: streamID,
            errorCode: .enhanceYourCalm,
            opaqueData: context.channel.allocator.buffer(string: "too_many_pings")
          )
        )

        context.write(self.wrapOutboundOut(goAway), promise: nil)
        // NOTE(review): this method is reached from `channelRead`, where `inReadLoop` is set, so
        // `maybeFlush` only marks the flush as pending. Confirm the GOAWAY is still delivered
        // given that the close below is issued before `channelReadComplete` flushes.
        self.maybeFlush(context: context)
        context.close(promise: nil)

      case .sendAck:
        let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(data, ack: true))
        context.write(self.wrapOutboundOut(ping), promise: nil)
        self.maybeFlush(context: context)

      case .none:
        ()
      }
    }

    /// Handles a PING ack received from the client.
    ///
    /// If the state machine asks for it, a GOAWAY carrying the provided last stream ID is
    /// written. The connection is then either closed straight away or, if a grace timer is
    /// configured, closed once that timer fires.
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - data: The opaque data carried by the received PING ack.
    private func handlePingAck(context: ChannelHandlerContext, data: HTTP2PingData) {
      switch self.state.receivedPingAck(data: data) {
      case .sendGoAway(let streamID, let close):
        let goAway = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
        )

        context.write(self.wrapOutboundOut(goAway), promise: nil)
        self.maybeFlush(context: context)

        if close {
          // NOTE(review): as in `handlePing`, the flush above is deferred while in the read loop;
          // confirm the GOAWAY is delivered before this close takes effect.
          context.close(promise: nil)
        } else {
          // RPCs may have a grace period for finishing once the second GOAWAY frame has been
          // written. If one is configured, close the connection abruptly once it passes.
          self.maxGraceTimer?.schedule(on: context.eventLoop) {
            context.close(promise: nil)
          }
        }

      case .none:
        ()
      }
    }

    /// Sends a keep alive ping and starts the timer which bounds how long the client has to
    /// respond. If that timer fires a graceful shutdown is initiated.
    private func keepAliveTimerFired(context: ChannelHandlerContext) {
      let ping = HTTP2Frame(
        streamID: .rootStream,
        payload: .ping(self.keepAlivePingData, ack: false)
      )

      // This is an outbound write so it must be wrapped as `OutboundOut`, like every other
      // write in this handler.
      context.write(self.wrapOutboundOut(ping), promise: nil)
      self.maybeFlush(context: context)

      // Schedule a timeout on waiting for the response.
      self.keepAliveTimeoutTimer.schedule(on: context.eventLoop) {
        self.initiateGracefulShutdown(context: context)
      }
    }
  }