2
0

ServerConnectionManagementHandler+StateMachine.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  extension ServerConnectionManagementHandler {
    /// Tracks the state of a TCP connection at the server.
    ///
    /// Graceful shutdown is a two-step process:
    /// 1. A first GOAWAY (with the maximum last stream ID) is sent, followed by a PING.
    /// 2. When that PING is acknowledged, a second GOAWAY is sent with the real last stream ID.
    ///
    /// The state machine drives this process and also polices keep alive pings sent by the client.
    struct StateMachine {
      /// The state the connection is currently in.
      private var state: State

      /// Opaque payload of the PING frame sent to the client immediately after the first GOAWAY
      /// frame of a graceful shutdown. An ack carrying this payload prompts the second GOAWAY.
      private let goAwayPingData: HTTP2PingData

      /// Creates a state machine in the active state with no open streams.
      ///
      /// - Parameters:
      ///   - allowKeepAliveWithoutCalls: Whether the client may send keep alive pings at
      ///     `minPingReceiveIntervalWithoutCalls` while no streams are open. When `false`, pings
      ///     received while no streams are open must be at least two hours apart.
      ///   - minPingReceiveIntervalWithoutCalls: The minimum interval between acceptable keep alive
      ///     pings. Despite the name, it applies whenever streams are open. It also applies while
      ///     none are open if `allowKeepAliveWithoutCalls` is `true`.
      ///   - goAwayPingData: Opaque data sent to the client in a PING frame when the server
      ///     initiates graceful shutdown. Defaults to a random value.
      init(
        allowKeepAliveWithoutCalls: Bool,
        minPingReceiveIntervalWithoutCalls: TimeAmount,
        goAwayPingData: HTTP2PingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
      ) {
        self.goAwayPingData = goAwayPingData
        self.state = .active(
          State.Active(
            keepAlive: KeepAlive(
              allowWithoutCalls: allowKeepAliveWithoutCalls,
              minPingReceiveIntervalWithoutCalls: minPingReceiveIntervalWithoutCalls
            )
          )
        )
      }

      /// Records that the stream with the given ID has been opened.
      ///
      /// The ID becomes the most recently opened stream ID. Opening an already-open stream trips an
      /// `assert`. Calls are ignored once the connection is closed.
      mutating func streamOpened(_ id: HTTP2StreamID) {
        switch self.state {
        case .active(var active):
          active.lastStreamID = id
          Self.insertOpenStream(id, into: &active.openStreams)
          self.state = .active(active)

        case .closing(var closing):
          closing.lastStreamID = id
          Self.insertOpenStream(id, into: &closing.openStreams)
          self.state = .closing(closing)

        case .closed:
          break
        }
      }

      enum OnStreamClosed: Equatable {
        /// Start the idle timer, after which the connection should be closed gracefully.
        case startIdleTimer
        /// Close the connection.
        case close
        /// Do nothing.
        case none
      }

      /// Records that the stream with the given ID has been closed.
      ///
      /// Closing a stream which isn't open trips an `assert`.
      ///
      /// - Returns: What the caller should do next:
      ///   - `.startIdleTimer` when an active connection has no streams left open.
      ///   - `.close` when a closing connection has sent its second GOAWAY and has no streams left
      ///     open.
      ///   - `.none` otherwise.
      mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
        switch self.state {
        case .active(var active):
          Self.removeOpenStream(id, from: &active.openStreams)
          self.state = .active(active)
          return active.openStreams.isEmpty ? .startIdleTimer : .none

        case .closing(var closing):
          Self.removeOpenStream(id, from: &closing.openStreams)
          self.state = .closing(closing)
          // Before the second GOAWAY has been sent it isn't safe to close just because nothing is
          // open: the client may have opened a stream the server doesn't know about yet.
          if closing.sentSecondGoAway && closing.openStreams.isEmpty {
            return .close
          } else {
            return .none
          }

        case .closed:
          return .none
        }
      }

      enum OnPing: Equatable {
        /// Send a GOAWAY frame with the code "enhance your calm" and immediately close the connection.
        case enhanceYourCalmThenClose(HTTP2StreamID)
        /// Acknowledge the ping.
        case sendAck
        /// Ignore the ping.
        case none
      }

      /// Handles a PING frame (without the ack flag) received from the client.
      ///
      /// If the keep alive policy decides the client has sent too many pings, the connection moves
      /// to the closed state. Otherwise the ping should be acknowledged. Pings are ignored once the
      /// connection is closed.
      ///
      /// - Parameters:
      ///   - time: The time at which the ping was received.
      ///   - data: The data sent with the ping (not inspected by this method).
      mutating func receivedPing(atTime time: NIODeadline, data: HTTP2PingData) -> OnPing {
        switch self.state {
        case .active(var active):
          let hasOpenStreams = !active.openStreams.isEmpty
          if active.keepAlive.receivedPing(atTime: time, hasOpenStreams: hasOpenStreams) {
            self.state = .closed
            return .enhanceYourCalmThenClose(active.lastStreamID)
          }
          self.state = .active(active)
          return .sendAck

        case .closing(var closing):
          let hasOpenStreams = !closing.openStreams.isEmpty
          if closing.keepAlive.receivedPing(atTime: time, hasOpenStreams: hasOpenStreams) {
            self.state = .closed
            return .enhanceYourCalmThenClose(closing.lastStreamID)
          }
          self.state = .closing(closing)
          return .sendAck

        case .closed:
          return .none
        }
      }

      enum OnPingAck: Equatable {
        /// Send a GOAWAY frame with no error and the given last stream ID, optionally closing the
        /// connection immediately afterwards.
        case sendGoAway(lastStreamID: HTTP2StreamID, close: Bool)
        /// Ignore the ack.
        case none
      }

      /// Handles a PING frame with the 'ack' flag set.
      ///
      /// The ack only matters while closing, before the second GOAWAY has been sent, and when its
      /// data matches the graceful shutdown PING. In that case the second GOAWAY should be sent,
      /// carrying the most recently opened stream ID. The connection is closed straight after if no
      /// streams are open; otherwise it waits for the remaining streams to close.
      mutating func receivedPingAck(data: HTTP2PingData) -> OnPingAck {
        switch self.state {
        case .closing(var closing) where !closing.sentSecondGoAway && data == self.goAwayPingData:
          closing.sentSecondGoAway = true
          let nothingOpen = closing.openStreams.isEmpty
          self.state = nothingOpen ? .closed : .closing(closing)
          return .sendGoAway(lastStreamID: closing.lastStreamID, close: nothingOpen)

        case .active, .closing, .closed:
          return .none
        }
      }

      enum OnStartGracefulShutdown: Equatable {
        /// Initiate graceful shutdown by sending a GOAWAY frame with the last stream ID set as the max
        /// stream ID and no error. Follow it immediately with a PING frame with the given data.
        case sendGoAwayAndPing(HTTP2PingData)
        /// Ignore the request to start graceful shutdown.
        case none
      }

      /// Requests that the connection begins graceful shutdown.
      ///
      /// Only an active connection can start shutting down. Requests made while already closing or
      /// closed are ignored.
      mutating func startGracefulShutdown() -> OnStartGracefulShutdown {
        switch self.state {
        case .active(let active):
          self.state = .closing(State.Closing(from: active))
          return .sendGoAwayAndPing(self.goAwayPingData)

        case .closing, .closed:
          return .none
        }
      }

      /// Clears the ping strikes and last valid ping time of the keep alive policer.
      mutating func resetKeepAliveState() {
        switch self.state {
        case .active(var active):
          active.keepAlive.reset()
          self.state = .active(active)

        case .closing(var closing):
          closing.keepAlive.reset()
          self.state = .closing(closing)

        case .closed:
          break
        }
      }

      /// Unconditionally moves the state machine to the closed state.
      mutating func markClosed() {
        self.state = .closed
      }

      /// Adds `id` to `openStreams`, asserting that it wasn't already present.
      private static func insertOpenStream(
        _ id: HTTP2StreamID,
        into openStreams: inout Set<HTTP2StreamID>
      ) {
        let (inserted, _) = openStreams.insert(id)
        assert(inserted, "Can't open stream \(Int(id)), it's already open")
      }

      /// Removes `id` from `openStreams`, asserting that it was present.
      private static func removeOpenStream(
        _ id: HTTP2StreamID,
        from openStreams: inout Set<HTTP2StreamID>
      ) {
        let removedID = openStreams.remove(id)
        assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
      }
    }
  }
  extension ServerConnectionManagementHandler.StateMachine {
    /// Polices how often the client sends keep alive pings.
    fileprivate struct KeepAlive {
      /// Whether pings received while no streams are open are held to
      /// `minPingReceiveIntervalWithoutCalls`. When `false`, a two hour interval applies instead.
      private let allowWithoutCalls: Bool

      /// The minimum spacing between acceptable pings. Despite the name, it applies whenever
      /// streams are open. It also applies while none are open if `allowWithoutCalls` is `true`.
      private let minPingReceiveIntervalWithoutCalls: TimeAmount

      /// The number of "bad" pings tolerated. The ping that pushes `pingStrikes` above this value
      /// is reported as too many.
      private let maxPingStrikes = 2

      /// How many pings have arrived sooner than permitted since the last reset. This can be
      /// reset when the server sends DATA or HEADERS frames.
      ///
      /// Pings are occasionally used for purposes other than keep alive, so a low number of
      /// strikes is expected and okay.
      private var pingStrikes = 0

      /// When the most recent acceptable ping arrived, or `nil` if none has arrived since the last
      /// reset.
      ///
      /// Note: `distantPast` isn't used to indicate no previous valid ping. On Linux, `NIODeadline`
      /// uses the monotonic clock, whose starting point is undefined and in some cases isn't that
      /// distant.
      private var lastValidPingTime: NIODeadline? = nil

      init(allowWithoutCalls: Bool, minPingReceiveIntervalWithoutCalls: TimeAmount) {
        self.allowWithoutCalls = allowWithoutCalls
        self.minPingReceiveIntervalWithoutCalls = minPingReceiveIntervalWithoutCalls
      }

      /// Clears the ping strikes and forgets the time of the last valid ping.
      mutating func reset() {
        self.pingStrikes = 0
        self.lastValidPingTime = nil
      }

      /// Records a ping received at `time` and returns whether the client has sent too many pings.
      mutating func receivedPing(atTime time: NIODeadline, hasOpenStreams: Bool) -> Bool {
        // With no open streams and keep alive without calls disallowed, pings must be at least two
        // hours apart. This comes from gRFC A8:
        // https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
        let minimumGap: TimeAmount =
          (hasOpenStreams || self.allowWithoutCalls)
          ? self.minPingReceiveIntervalWithoutCalls
          : .hours(2)

        // The first ping since creation or a reset is always acceptable. So is any ping arriving at
        // least `minimumGap` after the previous acceptable one.
        guard let previous = self.lastValidPingTime, previous + minimumGap > time else {
          self.lastValidPingTime = time
          return false
        }

        // The ping arrived too early, so it earns a strike.
        self.pingStrikes += 1
        return self.pingStrikes > self.maxPingStrikes
      }
    }
  }
  extension ServerConnectionManagementHandler.StateMachine {
    /// The lifecycle states of a server connection.
    fileprivate enum State {
      /// The connection is active.
      struct Active {
        /// IDs of the streams which are currently open.
        var openStreams: Set<HTTP2StreamID> = []
        /// The ID of the most recently opened stream. It stays `.rootStream` (zero) until a stream
        /// has been opened.
        var lastStreamID: HTTP2StreamID = .rootStream
        /// Keep alive policing state.
        var keepAlive: KeepAlive

        init(keepAlive: KeepAlive) {
          self.keepAlive = keepAlive
        }
      }

      /// The connection is closing gracefully. An initial GOAWAY frame has been sent with the last
      /// stream ID set to the maximum.
      struct Closing {
        /// IDs of the streams which are currently open.
        var openStreams: Set<HTTP2StreamID>
        /// The ID of the most recently opened stream. It is zero (`.rootStream`) if no stream has
        /// been opened yet.
        var lastStreamID: HTTP2StreamID
        /// Keep alive policing state, carried over from the active state.
        var keepAlive: KeepAlive
        /// Whether the second GOAWAY frame, with a lower last stream ID, has been sent.
        var sentSecondGoAway = false

        init(from active: Active) {
          self.openStreams = active.openStreams
          self.lastStreamID = active.lastStreamID
          self.keepAlive = active.keepAlive
        }
      }

      case active(Active)
      case closing(Closing)
      case closed
    }
  }