// ServerConnectionManagementHandler+StateMachine.swift (13 KB)
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  extension ServerConnectionManagementHandler {
    /// Tracks the state of TCP connections at the server.
    ///
    /// The state machine manages the state for the graceful shutdown procedure as well as policing
    /// client-side keep alive.
    ///
    /// Most mutating methods copy the current state's payload out of `self.state`, park
    /// `self.state` at `._modifying` while the copy is updated, and then store the result back.
    /// Methods which switch over the state trap if they find `._modifying`, as that indicates a
    /// programmer error.
    struct StateMachine {
      /// Current state.
      private var state: State

      /// Opaque data sent to the client in a PING frame after emitting the first GOAWAY frame
      /// as part of graceful shutdown.
      private let goAwayPingData: HTTP2PingData

      /// Create a new state machine.
      ///
      /// - Parameters:
      ///   - allowKeepaliveWithoutCalls: Whether the client is permitted to send keep alive pings
      ///       when there are no active calls.
      ///   - minPingReceiveIntervalWithoutCalls: The minimum time interval required between keep
      ///       alive pings. It applies while streams are open. It also applies when no streams are
      ///       open if `allowKeepaliveWithoutCalls` is `true`; otherwise a two hour interval is
      ///       enforced when no streams are open.
      ///   - goAwayPingData: Opaque data sent to the client in a PING frame when the server
      ///       initiates graceful shutdown. Defaults to a random value.
      init(
        allowKeepaliveWithoutCalls: Bool,
        minPingReceiveIntervalWithoutCalls: TimeAmount,
        goAwayPingData: HTTP2PingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
      ) {
        let keepalive = Keepalive(
          allowWithoutCalls: allowKeepaliveWithoutCalls,
          minPingReceiveIntervalWithoutCalls: minPingReceiveIntervalWithoutCalls
        )

        self.state = .active(State.Active(keepalive: keepalive))
        self.goAwayPingData = goAwayPingData
      }

      /// Record that the stream with the given ID has been opened.
      ///
      /// The ID becomes the last stream ID and is added to the set of open streams. This has no
      /// effect once the connection is closed.
      mutating func streamOpened(_ id: HTTP2StreamID) {
        switch self.state {
        case .active(var state):
          self.state = ._modifying
          state.lastStreamID = id
          let (inserted, _) = state.openStreams.insert(id)
          assert(inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .active(state)

        case .closing(var state):
          self.state = ._modifying
          state.lastStreamID = id
          let (inserted, _) = state.openStreams.insert(id)
          assert(inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .closing(state)

        case .closed:
          ()

        case ._modifying:
          preconditionFailure()
        }
      }

      enum OnStreamClosed: Equatable {
        /// Start the idle timer, after which the connection should be closed gracefully.
        case startIdleTimer
        /// Close the connection.
        case close
        /// Do nothing.
        case none
      }

      /// Record that the stream with the given ID has been closed.
      ///
      /// - Returns: `.startIdleTimer` if the connection is active and no streams remain open.
      ///     `.close` if graceful shutdown has sent its second GOAWAY and no streams remain open.
      ///     `.none` otherwise.
      mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
        let onStreamClosed: OnStreamClosed

        switch self.state {
        case .active(var state):
          self.state = ._modifying
          let removedID = state.openStreams.remove(id)
          assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
          onStreamClosed = state.openStreams.isEmpty ? .startIdleTimer : .none
          self.state = .active(state)

        case .closing(var state):
          self.state = ._modifying
          let removedID = state.openStreams.remove(id)
          assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
          // If the second GOAWAY hasn't been sent it isn't safe to close if there are no open
          // streams: the client may have opened a stream which the server doesn't know about yet.
          let canClose = state.sentSecondGoAway && state.openStreams.isEmpty
          onStreamClosed = canClose ? .close : .none
          self.state = .closing(state)

        case .closed:
          onStreamClosed = .none

        case ._modifying:
          preconditionFailure()
        }

        return onStreamClosed
      }

      enum OnPing: Equatable {
        /// Send a GOAWAY frame with the code "enhance your calm" and immediately close the connection.
        case enhanceYourCalmThenClose(HTTP2StreamID)
        /// Acknowledge the ping.
        case sendAck
        /// Ignore the ping.
        case none
      }

      /// Received a ping with the given data.
      ///
      /// If keep alive policing decides the client has sent too many pings, the state moves to
      /// `.closed`.
      ///
      /// - Parameters:
      ///   - time: The time at which the ping was received.
      ///   - data: The data sent with the ping.
      /// - Returns: The action to take in response to the ping.
      mutating func receivedPing(atTime time: NIODeadline, data: HTTP2PingData) -> OnPing {
        let onPing: OnPing

        switch self.state {
        case .active(var state):
          self.state = ._modifying
          let tooManyPings = state.keepalive.receivedPing(
            atTime: time,
            hasOpenStreams: !state.openStreams.isEmpty
          )

          if tooManyPings {
            onPing = .enhanceYourCalmThenClose(state.lastStreamID)
            self.state = .closed
          } else {
            onPing = .sendAck
            self.state = .active(state)
          }

        case .closing(var state):
          self.state = ._modifying
          let tooManyPings = state.keepalive.receivedPing(
            atTime: time,
            hasOpenStreams: !state.openStreams.isEmpty
          )

          if tooManyPings {
            onPing = .enhanceYourCalmThenClose(state.lastStreamID)
            self.state = .closed
          } else {
            onPing = .sendAck
            self.state = .closing(state)
          }

        case .closed:
          onPing = .none

        case ._modifying:
          preconditionFailure()
        }

        return onPing
      }

      enum OnPingAck: Equatable {
        /// Send a GOAWAY frame with no error and the given last stream ID, optionally closing the
        /// connection immediately afterwards.
        case sendGoAway(lastStreamID: HTTP2StreamID, close: Bool)
        /// Ignore the ack.
        case none
      }

      /// Received a PING frame with the 'ack' flag set.
      ///
      /// Only meaningful while closing. If the ack carries the graceful shutdown ping data and the
      /// second GOAWAY hasn't been sent yet, the second GOAWAY is requested. When no streams are
      /// open the connection may be closed straight away, and the state moves to `.closed`.
      mutating func receivedPingAck(data: HTTP2PingData) -> OnPingAck {
        let onPingAck: OnPingAck

        switch self.state {
        case .closing(var state):
          self.state = ._modifying

          // If only one GOAWAY has been sent and the data matches the data from the GOAWAY ping then
          // the server should send another GOAWAY ratcheting down the last stream ID. If no streams
          // are open then the server can close the connection immediately after, otherwise it must
          // wait until all streams are closed.
          if !state.sentSecondGoAway, data == self.goAwayPingData {
            state.sentSecondGoAway = true
            let closeNow = state.openStreams.isEmpty
            onPingAck = .sendGoAway(lastStreamID: state.lastStreamID, close: closeNow)
            // Assign the next state exactly once per branch. A trailing unconditional assignment
            // here would overwrite `.closed` with `.closing`.
            self.state = closeNow ? .closed : .closing(state)
          } else {
            onPingAck = .none
            self.state = .closing(state)
          }

        case .active, .closed:
          onPingAck = .none

        case ._modifying:
          preconditionFailure()
        }

        return onPingAck
      }

      enum OnStartGracefulShutdown: Equatable {
        /// Initiate graceful shutdown by sending a GOAWAY frame with the last stream ID set as the max
        /// stream ID and no error. Follow it immediately with a PING frame with the given data.
        case sendGoAwayAndPing(HTTP2PingData)
        /// Ignore the request to start graceful shutdown.
        case none
      }

      /// Request that the connection begins graceful shutdown.
      ///
      /// Moves an active connection to `.closing`. The request is ignored if the connection is
      /// already closing or closed.
      mutating func startGracefulShutdown() -> OnStartGracefulShutdown {
        let onStartGracefulShutdown: OnStartGracefulShutdown

        switch self.state {
        case .active(let state):
          self.state = .closing(State.Closing(from: state))
          onStartGracefulShutdown = .sendGoAwayAndPing(self.goAwayPingData)

        case .closing, .closed:
          onStartGracefulShutdown = .none

        case ._modifying:
          preconditionFailure()
        }

        return onStartGracefulShutdown
      }

      /// Reset the state of keep-alive policing, clearing ping strikes and the last valid ping time.
      mutating func resetKeepaliveState() {
        switch self.state {
        case .active(var state):
          self.state = ._modifying
          state.keepalive.reset()
          self.state = .active(state)

        case .closing(var state):
          self.state = ._modifying
          state.keepalive.reset()
          self.state = .closing(state)

        case .closed:
          ()

        case ._modifying:
          preconditionFailure()
        }
      }

      /// Marks the state as closed, regardless of the current state.
      mutating func markClosed() {
        self.state = .closed
      }
    }
  }
  extension ServerConnectionManagementHandler.StateMachine {
    /// Polices keep alive PING frames sent by the client.
    ///
    /// A ping is "valid" when no previous valid ping is recorded, or when at least the permitted
    /// interval has elapsed since the previous valid ping. Every other ping earns the client a
    /// strike. Once the strike count exceeds `maxPingStrikes`, the client has sent too many pings.
    fileprivate struct Keepalive {
      /// Whether the client may send keep alive pings while no streams are open.
      private let allowWithoutCalls: Bool

      /// The minimum interval between valid pings. It applies while streams are open, and also
      /// while none are open if `allowWithoutCalls` is `true`.
      private let minPingReceiveIntervalWithoutCalls: TimeAmount

      /// How many "bad" pings are tolerated before the client is deemed to have sent too many.
      private let maxPingStrikes: Int = 2

      /// The number of "bad" pings received so far. This can be reset when the server sends
      /// DATA or HEADERS frames.
      ///
      /// Pings are occasionally used for purposes other than keep alive, so a small number of
      /// strikes is expected and tolerated.
      private var pingStrikes: Int = 0

      /// When the most recent valid ping arrived, or `nil` if there has been none since creation
      /// or the last reset.
      ///
      /// `distantPast` is deliberately not used as a sentinel. `NIODeadline` uses the monotonic
      /// clock on Linux, whose starting point is undefined and isn't always far in the past.
      private var lastValidPingTime: NIODeadline? = nil

      init(allowWithoutCalls: Bool, minPingReceiveIntervalWithoutCalls: TimeAmount) {
        self.allowWithoutCalls = allowWithoutCalls
        self.minPingReceiveIntervalWithoutCalls = minPingReceiveIntervalWithoutCalls
      }

      /// Forget all ping strikes and the time of the last valid ping.
      mutating func reset() {
        self.pingStrikes = 0
        self.lastValidPingTime = nil
      }

      /// Records a ping received at `time`.
      ///
      /// - Returns: `true` if the client has now sent too many pings.
      mutating func receivedPing(atTime time: NIODeadline, hasOpenStreams: Bool) -> Bool {
        // With no open streams, and keep alive without calls disallowed, pings must be at least
        // two hours apart.
        //
        // This comes from gRFC A8: https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
        let permittedInterval: TimeAmount =
          (hasOpenStreams || self.allowWithoutCalls)
          ? self.minPingReceiveIntervalWithoutCalls
          : .hours(2)

        // Too soon after the previous valid ping: count a strike. A ping with no predecessor is
        // always valid.
        if let previous = self.lastValidPingTime, previous + permittedInterval > time {
          self.pingStrikes += 1
          return self.pingStrikes > self.maxPingStrikes
        }

        self.lastValidPingTime = time
        return false
      }
    }
  }
  extension ServerConnectionManagementHandler.StateMachine {
    /// The states of a server connection.
    fileprivate enum State {
      /// The connection is active.
      case active(Active)
      /// The connection is closing gracefully.
      case closing(Closing)
      /// The connection is closed; the state machine ignores further events.
      case closed
      /// Placeholder stored while another state's payload is being mutated.
      case _modifying

      /// Payload of `.active`.
      struct Active {
        /// The IDs of the currently open streams.
        var openStreams: Set<HTTP2StreamID> = []
        /// The ID of the most recently opened stream (`.rootStream`, i.e. zero, until one is opened).
        var lastStreamID: HTTP2StreamID = .rootStream
        /// Keep alive policing state.
        var keepalive: Keepalive

        init(keepalive: Keepalive) {
          self.keepalive = keepalive
        }
      }

      /// Payload of `.closing`: an initial GOAWAY frame has been sent with the last stream ID
      /// set to max.
      struct Closing {
        /// The IDs of the currently open streams.
        var openStreams: Set<HTTP2StreamID>
        /// The ID of the most recently opened stream (`.rootStream`, i.e. zero, if none was opened).
        var lastStreamID: HTTP2StreamID
        /// Keep alive policing state, carried over from the active state.
        var keepalive: Keepalive
        /// Whether the second GOAWAY frame, with a lower last stream ID, has been sent.
        var sentSecondGoAway: Bool = false

        /// Begins closing, inheriting the streams and keep alive state of `active`.
        init(from active: Active) {
          self.openStreams = active.openStreams
          self.lastStreamID = active.lastStreamID
          self.keepalive = active.keepalive
        }
      }
    }
  }
  331. case _modifying
  332. }
  333. }