ClientConnectionHandler.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  /// A lifecycle event emitted for a client's HTTP/2 connection.
  @_spi(Package)
  public enum ClientConnectionEvent: Sendable, Hashable {
    /// The connection is ready to be used.
    case ready

    /// The connection has begun shutting down; no new streams should be created on it.
    case closing(CloseReason)

    /// Describes why a connection started closing.
    public enum CloseReason: Sendable, Hashable {
      /// The server sent a GOAWAY frame to the client. The associated values are the error code
      /// carried by the frame and a message describing why the frame was sent.
      case goAway(HTTP2ErrorCode, String)

      /// The keep-alive timer fired and the subsequent wait for a response timed out.
      case keepaliveExpired

      /// The connection became idle.
      case idle

      /// The close was started by the local peer.
      case initiatedLocally
    }
  }
  /// A `ChannelHandler` which manages part of the lifecycle of a gRPC connection over HTTP/2.
  ///
  /// This handler is responsible for managing several aspects of the connection. These include:
  /// 1. Periodically sending keep alive pings to the server (if configured) and closing the
  ///    connection if necessary.
  /// 2. Closing the connection if it is idle (has no open streams) for a configured amount of time.
  /// 3. Forwarding lifecycle events to the next handler.
  ///
  /// Inbound HTTP/2 frames are consumed by this handler; the only values it passes on via
  /// `fireChannelRead` are `ClientConnectionEvent`s.
  ///
  /// Some of the behaviours are described in [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md).
  final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias InboundOut = ClientConnectionEvent

    typealias OutboundIn = Never
    typealias OutboundOut = HTTP2Frame

    /// User outbound events understood by this handler.
    enum OutboundEvent: Hashable, Sendable {
      /// Close the connection gracefully
      case closeGracefully
    }

    /// The `EventLoop` of the `Channel` this handler exists in.
    private let eventLoop: EventLoop

    /// Timer which fires when the connection has been idle (i.e. has had no open streams) for the
    /// configured maximum idle time, at which point the connection is closed. `nil` if no maximum
    /// idle time was configured.
    private var maxIdleTimer: Timer?

    /// Repeating timer which fires when a keep alive ping should be sent. `nil` if no keep alive
    /// time was configured.
    private var keepaliveTimer: Timer?

    /// Timer bounding how long the client waits for the server to acknowledge a keep alive ping.
    /// Only scheduled after a keep alive ping has been sent.
    private var keepaliveTimeoutTimer: Timer

    /// Opaque data sent in keep alive pings.
    private let keepalivePingData: HTTP2PingData

    /// The current state of the connection.
    private var state: StateMachine

    /// Whether a flush is pending.
    private var flushPending: Bool

    /// Whether `channelRead` has been called and `channelReadComplete` hasn't yet been called.
    /// Resets once `channelReadComplete` returns.
    private var inReadLoop: Bool

    /// The context of the channel this handler is in.
    private var context: ChannelHandlerContext?

    /// Creates a new handler which manages the lifecycle of a connection.
    ///
    /// - Parameters:
    ///   - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
    ///   - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
    ///       If `nil` the idle timer is never scheduled.
    ///   - keepaliveTime: The amount of time to wait before sending a keep-alive ping. If `nil`
    ///       no keep-alive pings are sent.
    ///   - keepaliveTimeout: The amount of time the server has to acknowledge a keep-alive ping
    ///       sent by the client. The connection is closed if no acknowledgement is received in
    ///       time. Defaults to 20 seconds if `nil`.
    ///   - keepaliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
    ///       in progress.
    init(
      eventLoop: EventLoop,
      maxIdleTime: TimeAmount?,
      keepaliveTime: TimeAmount?,
      keepaliveTimeout: TimeAmount?,
      keepaliveWithoutCalls: Bool
    ) {
      self.eventLoop = eventLoop
      self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
      self.keepaliveTimer = keepaliveTime.map { Timer(delay: $0, repeat: true) }
      self.keepaliveTimeoutTimer = Timer(delay: keepaliveTimeout ?? .seconds(20))
      self.keepalivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
      self.state = StateMachine(allowKeepaliveWithoutCalls: keepaliveWithoutCalls)

      self.flushPending = false
      self.inReadLoop = false
    }

    /// Stores the context so that stream delegate callbacks can reach the channel.
    func handlerAdded(context: ChannelHandlerContext) {
      assert(context.eventLoop === self.eventLoop)
      self.context = context
    }

    /// Drops the stored context.
    func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    /// Starts the keep alive and idle timers (where configured) and forwards the event.
    func channelActive(context: ChannelHandlerContext) {
      self.keepaliveTimer?.schedule(on: context.eventLoop) {
        self.keepaliveTimerFired(context: context)
      }

      self.maxIdleTimer?.schedule(on: context.eventLoop) {
        self.maxIdleTimerFired(context: context)
      }

      // Implementing this callback replaces the default forwarding behaviour, so pass the event
      // on explicitly for any handlers later in the pipeline.
      context.fireChannelActive()
    }

    /// Marks the connection as closed, completes any pending graceful-shutdown promise, stops all
    /// timers and forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
      switch self.state.closed() {
      case .none:
        ()
      case .succeed(let promise):
        promise.succeed()
      }

      // Cancel every timer: their callbacks capture this handler and the context, neither of
      // which should be kept alive (or called into) once the channel has closed.
      self.keepaliveTimer?.cancel()
      self.keepaliveTimeoutTimer.cancel()
      self.maxIdleTimer?.cancel()

      // As in `channelActive`, the event must be forwarded explicitly.
      context.fireChannelInactive()
    }

    /// Tracks stream creation and closure events, then forwards the event unchanged.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      switch event {
      case let event as NIOHTTP2StreamCreatedEvent:
        self.streamCreated(event.streamID, channel: context.channel)

      case let event as StreamClosedEvent:
        self.streamClosed(event.streamID, channel: context.channel)

      default:
        ()
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Inspects connection-level frames (GOAWAY, PING acks and SETTINGS) and emits
    /// `ClientConnectionEvent`s as appropriate. Frames themselves are not forwarded.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)
      self.inReadLoop = true

      switch frame.payload {
      case .goAway(_, let errorCode, let opaqueData):
        // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
        // closing the connection.
        switch self.state.beginGracefulShutdown(promise: nil) {
        case .sendGoAway(let close):
          // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
          let message = opaqueData.map { String(buffer: $0) } ?? ""
          context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))

          // Clients should send GOAWAYs when closing a connection.
          self.writeAndFlushGoAway(context: context, errorCode: .noError)
          if close {
            context.close(promise: nil)
          }

        case .none:
          // Already closing or closed, nothing more to do.
          ()
        }

      case .ping(let data, let ack):
        // Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
        // particular only those carrying the keep-alive data.
        if ack, data == self.keepalivePingData {
          // The server responded in time: stop the timeout and resume the keep-alive timer which
          // was cancelled when the ping was sent.
          self.keepaliveTimeoutTimer.cancel()
          self.keepaliveTimer?.schedule(on: context.eventLoop) {
            self.keepaliveTimerFired(context: context)
          }
        }

      case .settings(.settings(_)):
        let isInitialSettings = self.state.receivedSettings()

        // The first settings frame indicates that the connection is now ready to use. The channel
        // becoming active is insufficient as, for example, a TLS handshake may fail after
        // establishing the TCP connection, or the server isn't configured for gRPC (or HTTP/2).
        if isInitialSettings {
          context.fireChannelRead(self.wrapInboundOut(.ready))
        }

      default:
        ()
      }
    }

    /// Performs any flush deferred during the read loop and forwards the event.
    func channelReadComplete(context: ChannelHandlerContext) {
      // Loop rather than check once: the flag is re-read after each flush in case it was set
      // again while flushing.
      while self.flushPending {
        self.flushPending = false
        context.flush()
      }

      self.inReadLoop = false
      context.fireChannelReadComplete()
    }

    /// Handles `OutboundEvent`s; any other event is forwarded along the pipeline untouched.
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - event: The outbound event.
    ///   - promise: For `OutboundEvent.closeGracefully` the promise is handed to the state machine
    ///       and is completed when the channel becomes inactive.
    func triggerUserOutboundEvent(
      context: ChannelHandlerContext,
      event: Any,
      promise: EventLoopPromise<Void>?
    ) {
      guard let event = event as? OutboundEvent else {
        context.triggerUserOutboundEvent(event, promise: promise)
        return
      }

      switch event {
      case .closeGracefully:
        switch self.state.beginGracefulShutdown(promise: promise) {
        case .sendGoAway(let close):
          context.fireChannelRead(self.wrapInboundOut(.closing(.initiatedLocally)))
          // Clients should send GOAWAYs when closing a connection.
          self.writeAndFlushGoAway(context: context, errorCode: .noError)
          if close {
            context.close(promise: nil)
          }

        case .none:
          // Already closing or closed, nothing more to do.
          ()
        }
      }
    }
  }
  extension ClientConnectionHandler: NIOHTTP2StreamDelegate {
    /// Records that a stream was opened and stops the idle timer.
    ///
    /// NOTE(review): if the keep-alive timer was cancelled by an earlier `streamClosed` call it is
    /// not rescheduled here; confirm that is the intended behaviour.
    func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
      self.eventLoop.assertInEventLoop()

      // The connection has at least one stream now so it can't be idle.
      self.state.streamOpened(id)
      self.maxIdleTimer?.cancel()
    }

    /// Records that a stream was closed and performs whatever the state machine asks for in
    /// response. Does nothing if the handler has no stored context.
    func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
      guard let context = self.context else { return }
      self.eventLoop.assertInEventLoop()

      let action = self.state.streamClosed(id)
      switch action {
      case .none:
        ()

      case .close:
        // The connection was already closing and only waiting for its streams to finish. This
        // was the last one, so close the connection.
        context.close(promise: nil)

      case .startIdleTimer(let shouldCancelKeepalive):
        // No streams remain. Stop the keep-alive timer unless keep-alive is allowed without
        // active calls, and (re)start the idle timer.
        if shouldCancelKeepalive {
          self.keepaliveTimer?.cancel()
        }

        self.maxIdleTimer?.schedule(on: context.eventLoop) {
          self.maxIdleTimerFired(context: context)
        }
      }
    }
  }
  extension ClientConnectionHandler {
    /// Flushes immediately when outside of a read loop; otherwise records that a flush is needed
    /// so that it can be done once the read loop completes.
    private func maybeFlush(context: ChannelHandlerContext) {
      guard self.inReadLoop else {
        context.flush()
        return
      }

      self.flushPending = true
    }

    /// Called when the keep-alive timer fires: sends a keep-alive ping if the state machine
    /// permits it and starts waiting for the acknowledgement.
    private func keepaliveTimerFired(context: ChannelHandlerContext) {
      let shouldPing = self.state.sendKeepalivePing()
      if !shouldPing { return }

      // Stop the keep-alive timer while a ping is outstanding; it is rescheduled when the ping
      // is acknowledged.
      self.keepaliveTimer?.cancel()

      let pingFrame = HTTP2Frame(
        streamID: .rootStream,
        payload: .ping(self.keepalivePingData, ack: false)
      )
      context.write(self.wrapOutboundOut(pingFrame), promise: nil)
      self.maybeFlush(context: context)

      // Bound how long we wait for the acknowledgement.
      self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
        self.keepaliveTimeoutExpired(context: context)
      }
    }

    /// Called when no acknowledgement for a keep-alive ping arrived in time.
    private func keepaliveTimeoutExpired(context: ChannelHandlerContext) {
      self.closeOnTimerExpiry(
        context: context,
        reason: .keepaliveExpired,
        goAwayMessage: "keepalive_expired"
      )
    }

    /// Called when the connection has been idle for the configured maximum idle time.
    private func maxIdleTimerFired(context: ChannelHandlerContext) {
      self.closeOnTimerExpiry(context: context, reason: .idle, goAwayMessage: "idle")
    }

    /// Moves to the closing state (if the connection is still active), notifies the next handler
    /// of the reason, sends a GOAWAY carrying `goAwayMessage` and closes the channel.
    private func closeOnTimerExpiry(
      context: ChannelHandlerContext,
      reason: ClientConnectionEvent.CloseReason,
      goAwayMessage: String
    ) {
      let shouldClose = self.state.beginClosing()
      if !shouldClose { return }

      context.fireChannelRead(self.wrapInboundOut(.closing(reason)))
      self.writeAndFlushGoAway(context: context, message: goAwayMessage)
      context.close(promise: nil)
    }

    /// Writes a GOAWAY frame on the root stream and flushes it (possibly deferred, see
    /// `maybeFlush(context:)`).
    ///
    /// - Parameters:
    ///   - context: The context to write the frame on.
    ///   - errorCode: The error code to send in the frame.
    ///   - message: Optional text to send as the frame's opaque data.
    private func writeAndFlushGoAway(
      context: ChannelHandlerContext,
      errorCode: HTTP2ErrorCode = .noError,
      message: String? = nil
    ) {
      var opaqueData: ByteBuffer? = nil
      if let message = message {
        opaqueData = context.channel.allocator.buffer(string: message)
      }

      let frame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: 0, errorCode: errorCode, opaqueData: opaqueData)
      )

      context.write(self.wrapOutboundOut(frame), promise: nil)
      self.maybeFlush(context: context)
    }
  }
  extension ClientConnectionHandler {
    /// Tracks whether the connection is active, closing or closed, along with its open streams.
    struct StateMachine {
      private var state: State

      private enum State {
        /// The connection is usable.
        case active(Active)
        /// The connection is shutting down.
        case closing(Closing)
        /// The connection is closed.
        case closed

        struct Active {
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>
          /// Whether keep-alive pings may be sent when no streams are open.
          var allowKeepaliveWithoutCalls: Bool
          /// Whether a SETTINGS frame has been received yet.
          var receivedConnectionPreface: Bool

          init(allowKeepaliveWithoutCalls: Bool) {
            self.openStreams = []
            self.allowKeepaliveWithoutCalls = allowKeepaliveWithoutCalls
            self.receivedConnectionPreface = false
          }

          /// Notes that a SETTINGS frame arrived, returning `true` only for the first one.
          mutating func receivedSettings() -> Bool {
            defer { self.receivedConnectionPreface = true }
            return !self.receivedConnectionPreface
          }
        }

        struct Closing {
          /// Whether keep-alive pings may be sent when no streams are open.
          var allowKeepaliveWithoutCalls: Bool
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>
          /// A promise to complete once the connection has closed, if there is one.
          var closePromise: EventLoopPromise<Void>?

          init(from state: Active, closePromise: EventLoopPromise<Void>?) {
            self.allowKeepaliveWithoutCalls = state.allowKeepaliveWithoutCalls
            self.openStreams = state.openStreams
            self.closePromise = closePromise
          }
        }
      }

      /// Creates a state machine in the active state with no open streams.
      init(allowKeepaliveWithoutCalls: Bool) {
        let initial = State.Active(allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls)
        self.state = .active(initial)
      }

      /// Record that a SETTINGS frame was received from the remote peer.
      ///
      /// - Returns: `true` if this was the first SETTINGS frame received. Always `false` if the
      ///   connection is closing or closed.
      mutating func receivedSettings() -> Bool {
        guard case .active(var active) = self.state else {
          return false
        }

        let isFirst = active.receivedSettings()
        self.state = .active(active)
        return isFirst
      }

      /// Record that the stream with the given ID has been opened.
      ///
      /// Streams are tracked while active and while closing; this is a no-op once closed.
      mutating func streamOpened(_ id: HTTP2StreamID) {
        switch self.state {
        case .active(var active):
          let isNew = active.openStreams.insert(id).inserted
          assert(isNew, "Can't open stream \(Int(id)), it's already open")
          self.state = .active(active)

        case .closing(var closing):
          let isNew = closing.openStreams.insert(id).inserted
          assert(isNew, "Can't open stream \(Int(id)), it's already open")
          self.state = .closing(closing)

        case .closed:
          ()
        }
      }

      /// The action to take after a stream has closed.
      enum OnStreamClosed: Equatable {
        /// Start the idle timer, after which the connection should be closed gracefully.
        /// `cancelKeepalive` indicates whether the keep-alive timer should be stopped as well.
        case startIdleTimer(cancelKeepalive: Bool)
        /// Close the connection.
        case close
        /// Do nothing.
        case none
      }

      /// Record that the stream with the given ID has been closed.
      ///
      /// - Returns: The action the caller should take in response.
      mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
        switch self.state {
        case .active(var active):
          let wasOpen = active.openStreams.remove(id) != nil
          assert(wasOpen, "Can't close stream \(Int(id)), it wasn't open")
          self.state = .active(active)

          // Only the last stream closing makes the connection idle.
          guard active.openStreams.isEmpty else { return .none }
          return .startIdleTimer(cancelKeepalive: !active.allowKeepaliveWithoutCalls)

        case .closing(var closing):
          let wasOpen = closing.openStreams.remove(id) != nil
          assert(wasOpen, "Can't close stream \(Int(id)), it wasn't open")
          self.state = .closing(closing)

          // The connection can be closed as soon as the last stream is done.
          if closing.openStreams.isEmpty {
            return .close
          } else {
            return .none
          }

        case .closed:
          return .none
        }
      }

      /// Returns whether a keep alive ping should be sent to the server.
      ///
      /// A ping should be sent if any stream is open, or if none are open but keep-alive is
      /// permitted without active calls. Never send one once closed.
      mutating func sendKeepalivePing() -> Bool {
        switch self.state {
        case .active(let active):
          return active.allowKeepaliveWithoutCalls || !active.openStreams.isEmpty
        case .closing(let closing):
          return closing.allowKeepaliveWithoutCalls || !closing.openStreams.isEmpty
        case .closed:
          return false
        }
      }

      /// The action to take when a graceful shutdown is requested.
      enum OnGracefulShutDown: Equatable {
        /// Send a GOAWAY frame. The associated value is `true` if the connection should also be
        /// closed straight away (there were no open streams).
        case sendGoAway(Bool)
        /// Do nothing.
        case none
      }

      /// Begin shutting down gracefully.
      ///
      /// - Parameter promise: A promise to retain until the connection has closed. If shutdown
      ///   was already in progress it is combined with any existing promise.
      /// - Returns: The action the caller should take in response.
      mutating func beginGracefulShutdown(promise: EventLoopPromise<Void>?) -> OnGracefulShutDown {
        switch self.state {
        case .active(let active):
          // Only close immediately if there are no open streams. The client doesn't need to
          // ratchet down the last stream ID as only the client creates streams in gRPC.
          let closeNow = active.openStreams.isEmpty
          self.state = .closing(State.Closing(from: active, closePromise: promise))
          return .sendGoAway(closeNow)

        case .closing(var closing):
          closing.closePromise.setOrCascade(to: promise)
          self.state = .closing(closing)
          return .none

        case .closed:
          return .none
        }
      }

      /// Moves from active to closing.
      ///
      /// - Returns: Whether the connection should be closed; `false` if it was already closing
      ///   or closed.
      mutating func beginClosing() -> Bool {
        guard case .active(let active) = self.state else {
          return false
        }

        self.state = .closing(State.Closing(from: active, closePromise: nil))
        return true
      }

      /// The action to take once the connection has closed.
      enum OnClosed {
        /// Succeed the given promise.
        case succeed(EventLoopPromise<Void>)
        /// Do nothing.
        case none
      }

      /// Marks the state as closed.
      ///
      /// - Returns: `.succeed` with the close promise if the connection was closing and had one,
      ///   `.none` otherwise.
      mutating func closed() -> OnClosed {
        defer { self.state = .closed }

        guard case .closing(let closing) = self.state,
          let promise = closing.closePromise
        else {
          return .none
        }

        return .succeed(promise)
      }
    }
  }