ClientConnectionHandler.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  /// A lifecycle event emitted for a client's HTTP/2 connection.
  enum ClientConnectionEvent: Sendable, Hashable {
    /// The connection has begun shutting down; no new streams should be created on it.
    case closing(CloseReason)

    /// Describes why a connection started closing.
    enum CloseReason: Sendable, Hashable {
      /// The server sent a GOAWAY frame to the client. Carries the error code from the frame
      /// and a message (which may be empty).
      case goAway(HTTP2ErrorCode, String)

      /// A keep-alive ping was sent and no acknowledgement arrived before the keep-alive
      /// timeout elapsed.
      case keepAliveExpired

      /// The connection became idle.
      case idle
    }
  }
  /// A `ChannelHandler` which manages part of the lifecycle of a gRPC connection over HTTP/2.
  ///
  /// This handler is responsible for managing several aspects of the connection. These include:
  /// 1. Periodically sending keep alive pings to the server (if configured) and closing the
  ///    connection if necessary.
  /// 2. Closing the connection if it is idle (has no open streams) for a configured amount of time.
  /// 3. Forwarding lifecycle events to the next handler.
  ///
  /// Some of the behaviours are described in [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md).
  final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias InboundOut = ClientConnectionEvent

    typealias OutboundIn = Never
    typealias OutboundOut = HTTP2Frame

    /// The `EventLoop` of the `Channel` this handler exists in.
    private let eventLoop: EventLoop

    /// Timer bounding how long the connection may remain idle (i.e. have no open streams). When
    /// it fires the connection is gracefully closed. `nil` if no maximum idle time was configured.
    private var maxIdleTimer: Timer?

    /// Repeating timer which triggers sending a keep alive ping. `nil` if no keep alive time
    /// was configured.
    private var keepAliveTimer: Timer?

    /// Timer bounding how long the server has to acknowledge a keep alive ping sent by this
    /// client. Only scheduled after a keep alive ping has been written.
    private var keepAliveTimeoutTimer: Timer

    /// Opaque data sent in keep alive pings. Acks carrying the same data are treated as replies
    /// to this handler's pings.
    private let keepAlivePingData: HTTP2PingData

    /// The current state of the connection.
    private var state: StateMachine

    /// Whether a flush is pending.
    private var flushPending: Bool

    /// Whether `channelRead` has been called and `channelReadComplete` hasn't yet been called.
    /// Resets once `channelReadComplete` returns.
    private var inReadLoop: Bool

    /// Creates a new handler which manages the lifecycle of a connection.
    ///
    /// - Parameters:
    ///   - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
    ///   - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
    ///       Pass `nil` to never close the connection for being idle.
    ///   - keepAliveTime: The amount of time to wait before sending a keep-alive ping. Pass `nil`
    ///       to never send keep-alive pings.
    ///   - keepAliveTimeout: The amount of time the server has to acknowledge a keep-alive ping
    ///       sent by the client. The connection is closed if no acknowledgement is received in
    ///       time. Defaults to 20 seconds if `nil`.
    ///   - keepAliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
    ///       in progress.
    init(
      eventLoop: EventLoop,
      maxIdleTime: TimeAmount?,
      keepAliveTime: TimeAmount?,
      keepAliveTimeout: TimeAmount?,
      keepAliveWithoutCalls: Bool
    ) {
      self.eventLoop = eventLoop
      self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
      self.keepAliveTimer = keepAliveTime.map { Timer(delay: $0, repeat: true) }
      self.keepAliveTimeoutTimer = Timer(delay: keepAliveTimeout ?? .seconds(20))
      self.keepAlivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
      self.state = StateMachine(allowKeepAliveWithoutCalls: keepAliveWithoutCalls)

      self.flushPending = false
      self.inReadLoop = false
    }

    /// Checks (in debug builds) that the handler was added to a channel on the expected event loop.
    func handlerAdded(context: ChannelHandlerContext) {
      assert(context.eventLoop === self.eventLoop)
    }

    /// Starts the keep alive and idle timers (where configured) once the channel becomes active.
    func channelActive(context: ChannelHandlerContext) {
      self.keepAliveTimer?.schedule(on: context.eventLoop) {
        self.keepAliveTimerFired(context: context)
      }

      self.maxIdleTimer?.schedule(on: context.eventLoop) {
        self.maxIdleTimerFired(context: context)
      }

      // Implementing this callback replaces the default (forwarding) implementation, so the
      // event must be propagated explicitly or later handlers never learn the channel is active.
      context.fireChannelActive()
    }

    /// Marks the connection as closed and stops every timer owned by this handler.
    func channelInactive(context: ChannelHandlerContext) {
      self.state.closed()

      // The channel is gone: none of the timers have anything left to act on.
      self.maxIdleTimer?.cancel()
      self.keepAliveTimer?.cancel()
      self.keepAliveTimeoutTimer.cancel()

      // As with `channelActive`, the event has to be forwarded explicitly.
      context.fireChannelInactive()
    }

    /// Tracks stream creation and closure to drive the idle timer and deferred connection
    /// closure. Every event is forwarded to the next handler.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      switch event {
      case let event as NIOHTTP2StreamCreatedEvent:
        // Stream created, so the connection isn't idle.
        self.maxIdleTimer?.cancel()
        self.state.streamOpened(event.streamID)

      case let event as StreamClosedEvent:
        switch self.state.streamClosed(event.streamID) {
        case .startIdleTimer(let cancelKeepAlive):
          // All streams are closed, restart the idle timer, and stop the keep-alive timer (it may
          // not stop if keep-alive is allowed when there are no active calls).
          self.maxIdleTimer?.schedule(on: context.eventLoop) {
            self.maxIdleTimerFired(context: context)
          }

          if cancelKeepAlive {
            self.keepAliveTimer?.cancel()
          }

        case .close:
          // Connection was closing but waiting for all streams to close. They must all be closed
          // now so close the connection.
          context.close(promise: nil)

        case .none:
          ()
        }

      default:
        ()
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Handles GOAWAY frames and keep alive ping acknowledgements; other frames are ignored.
    ///
    /// Frames are not forwarded. The only values passed to the next handler from here are
    /// `ClientConnectionEvent`s.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)
      self.inReadLoop = true

      switch frame.payload {
      case .goAway(_, let errorCode, let data):
        // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
        // closing the connection.
        switch self.state.beginGracefulShutdown() {
        case .sendGoAway(let close):
          // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
          let message = data.map { String(buffer: $0) } ?? ""
          context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))

          // Clients should send GOAWAYs when closing a connection.
          self.writeAndFlushGoAway(context: context, errorCode: .noError)
          if close {
            context.close(promise: nil)
          }

        case .none:
          ()
        }

      case .ping(let data, let ack):
        // Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
        // particular only those carrying the keep-alive data.
        if ack, data == self.keepAlivePingData {
          // The server replied in time: stop the timeout and resume the keep alive timer, which
          // was cancelled when the ping was sent.
          self.keepAliveTimeoutTimer.cancel()
          self.keepAliveTimer?.schedule(on: context.eventLoop) {
            self.keepAliveTimerFired(context: context)
          }
        }

      default:
        ()
      }
    }

    /// Performs any flush deferred during the read loop, then forwards the event.
    func channelReadComplete(context: ChannelHandlerContext) {
      // Loop rather than branch: `flushPending` is re-checked after each flush in case it was
      // set again in the meantime.
      while self.flushPending {
        self.flushPending = false
        context.flush()
      }

      self.inReadLoop = false
      context.fireChannelReadComplete()
    }
  }
  extension ClientConnectionHandler {
    /// Flushes immediately when outside of a read loop; otherwise records that a flush is
    /// required so that `channelReadComplete` can perform it.
    private func maybeFlush(context: ChannelHandlerContext) {
      guard self.inReadLoop else {
        context.flush()
        return
      }

      self.flushPending = true
    }

    /// Called when the keep alive timer fires: sends a keep alive ping if the state machine
    /// allows it and starts waiting for the acknowledgement.
    private func keepAliveTimerFired(context: ChannelHandlerContext) {
      if !self.state.sendKeepAlivePing() {
        return
      }

      // Pause the keep alive timer while a ping is outstanding; it is rescheduled when the
      // acknowledgement is read.
      self.keepAliveTimer?.cancel()

      let pingPayload: HTTP2Frame.FramePayload = .ping(self.keepAlivePingData, ack: false)
      let pingFrame = HTTP2Frame(streamID: .rootStream, payload: pingPayload)
      context.write(self.wrapOutboundOut(pingFrame), promise: nil)
      self.maybeFlush(context: context)

      // Bound how long to wait for the acknowledgement.
      self.keepAliveTimeoutTimer.schedule(on: context.eventLoop) {
        self.keepAliveTimeoutExpired(context: context)
      }
    }

    /// Called when no acknowledgement for a keep alive ping arrived in time.
    private func keepAliveTimeoutExpired(context: ChannelHandlerContext) {
      self.closeConnection(
        context: context,
        reason: .keepAliveExpired,
        goAwayMessage: "keepalive_expired"
      )
    }

    /// Called when the connection has had no open streams for the maximum idle time.
    private func maxIdleTimerFired(context: ChannelHandlerContext) {
      self.closeConnection(context: context, reason: .idle, goAwayMessage: "idle")
    }

    /// Moves the connection to the closing state and, if it wasn't already closing or closed,
    /// notifies the next handler, sends a GOAWAY carrying `goAwayMessage` and closes the channel.
    private func closeConnection(
      context: ChannelHandlerContext,
      reason: ClientConnectionEvent.CloseReason,
      goAwayMessage: String
    ) {
      guard self.state.beginClosing() else { return }

      context.fireChannelRead(self.wrapInboundOut(.closing(reason)))
      self.writeAndFlushGoAway(context: context, message: goAwayMessage)
      context.close(promise: nil)
    }

    /// Writes a GOAWAY frame on the root stream and flushes it (or defers the flush if in a
    /// read loop).
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - errorCode: The error code to send in the frame, `.noError` by default.
    ///   - message: Text to send as the frame's opaque data; no opaque data is sent if `nil`.
    private func writeAndFlushGoAway(
      context: ChannelHandlerContext,
      errorCode: HTTP2ErrorCode = .noError,
      message: String? = nil
    ) {
      let opaqueData = message.map { text in
        context.channel.allocator.buffer(string: text)
      }

      let payload: HTTP2Frame.FramePayload = .goAway(
        lastStreamID: 0,
        errorCode: errorCode,
        opaqueData: opaqueData
      )

      let frame = HTTP2Frame(streamID: .rootStream, payload: payload)
      context.write(self.wrapOutboundOut(frame), promise: nil)
      self.maybeFlush(context: context)
    }
  }
  extension ClientConnectionHandler {
    /// Tracks which streams are open on the connection and whether the connection is active,
    /// closing or closed. Holds no reference to the channel: callers act on the returned values.
    struct StateMachine {
      private var state: State

      private enum State {
        /// The connection is usable.
        case active(Active)
        /// The connection is shutting down.
        case closing(Closing)
        /// The connection is closed.
        case closed

        struct Active {
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>
          /// Whether keep alive pings may be sent when no streams are open.
          var allowKeepAliveWithoutCalls: Bool

          init(allowKeepAliveWithoutCalls: Bool) {
            self.openStreams = []
            self.allowKeepAliveWithoutCalls = allowKeepAliveWithoutCalls
          }
        }

        struct Closing {
          /// Whether keep alive pings may be sent when no streams are open.
          var allowKeepAliveWithoutCalls: Bool
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>

          /// Carries the open streams and keep alive configuration over from the active state.
          init(from state: Active) {
            self.openStreams = state.openStreams
            self.allowKeepAliveWithoutCalls = state.allowKeepAliveWithoutCalls
          }
        }
      }

      /// Creates a state machine in the active state with no open streams.
      init(allowKeepAliveWithoutCalls: Bool) {
        let active = State.Active(allowKeepAliveWithoutCalls: allowKeepAliveWithoutCalls)
        self.state = .active(active)
      }

      /// Record that the stream with the given ID has been opened.
      ///
      /// Has no effect if the connection is already closed.
      mutating func streamOpened(_ id: HTTP2StreamID) {
        switch self.state {
        case .active(var active):
          let insertion = active.openStreams.insert(id)
          assert(insertion.inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .active(active)

        case .closing(var closing):
          let insertion = closing.openStreams.insert(id)
          assert(insertion.inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .closing(closing)

        case .closed:
          return
        }
      }

      /// The action to take after a stream has closed.
      enum OnStreamClosed: Equatable {
        /// Start the idle timer, after which the connection should be closed gracefully. The
        /// keep alive timer should also be cancelled if `cancelKeepAlive` is `true`.
        case startIdleTimer(cancelKeepAlive: Bool)
        /// Close the connection.
        case close
        /// Do nothing.
        case none
      }

      /// Record that the stream with the given ID has been closed.
      ///
      /// - Returns: The action the caller should take now that the stream is closed.
      mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
        switch self.state {
        case .active(var active):
          let wasOpen = active.openStreams.remove(id) != nil
          assert(wasOpen, "Can't close stream \(Int(id)), it wasn't open")
          self.state = .active(active)

          // The last stream closing makes the connection idle.
          guard active.openStreams.isEmpty else { return .none }
          return .startIdleTimer(cancelKeepAlive: !active.allowKeepAliveWithoutCalls)

        case .closing(var closing):
          let wasOpen = closing.openStreams.remove(id) != nil
          assert(wasOpen, "Can't close stream \(Int(id)), it wasn't open")
          self.state = .closing(closing)

          // A closing connection can be closed once its last stream is gone.
          if closing.openStreams.isEmpty {
            return .close
          } else {
            return .none
          }

        case .closed:
          return .none
        }
      }

      /// Returns whether a keep alive ping should be sent to the server.
      ///
      /// A ping is sent if there are open streams, or if keep alive is permitted when there are
      /// no active calls. No ping is sent once the connection is closed.
      mutating func sendKeepAlivePing() -> Bool {
        switch self.state {
        case .active(let active):
          return active.allowKeepAliveWithoutCalls || !active.openStreams.isEmpty
        case .closing(let closing):
          return closing.allowKeepAliveWithoutCalls || !closing.openStreams.isEmpty
        case .closed:
          return false
        }
      }

      /// The action to take when graceful shutdown is requested.
      enum OnGracefulShutDown: Equatable {
        /// Send a GOAWAY frame. The associated value is `true` if the connection should also
        /// be closed straight away.
        case sendGoAway(Bool)
        /// Do nothing.
        case none
      }

      /// Starts shutting down gracefully, moving an active connection to the closing state.
      ///
      /// - Returns: The action to take; `.none` if the connection was already closing or closed.
      mutating func beginGracefulShutdown() -> OnGracefulShutDown {
        guard case .active(let active) = self.state else {
          return .none
        }

        self.state = .closing(State.Closing(from: active))

        // Only close immediately if there are no open streams. The client doesn't need to
        // ratchet down the last stream ID as only the client creates streams in gRPC.
        return .sendGoAway(active.openStreams.isEmpty)
      }

      /// Moves an active connection to the closing state.
      ///
      /// - Returns: Whether the connection should be closed; `false` if it was already closing
      ///   or closed.
      mutating func beginClosing() -> Bool {
        guard case .active(let active) = self.state else {
          return false
        }

        self.state = .closing(State.Closing(from: active))
        return true
      }

      /// Marks the state as closed.
      mutating func closed() {
        self.state = .closed
      }
    }
  }