ClientConnectionHandler.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  /// A lifecycle event emitted for a client's HTTP/2 connection.
  ///
  /// Events are delivered as inbound reads by `ClientConnectionHandler`.
  @_spi(Package)
  public enum ClientConnectionEvent: Sendable {
    /// Why the connection started closing.
    public enum CloseReason: Sendable {
      /// The server sent a GOAWAY frame to the client. The associated values are the error code
      /// from the frame and its opaque data decoded as a string (empty if there was none).
      case goAway(HTTP2ErrorCode, String)

      /// A keep-alive ping was sent but wasn't acknowledged before the keep-alive timeout fired.
      case keepaliveExpired

      /// The connection had no open streams for the configured maximum idle time.
      case idle

      /// The local peer asked for the connection to be closed.
      case initiatedLocally

      /// The connection was closed unexpectedly.
      ///
      /// The first value is the last error seen on the connection, if any. `isIdle` is `true` if
      /// there were no open streams when the connection closed.
      case unexpected((any Error)?, isIdle: Bool)
    }

    /// The connection is now ready to use.
    case ready

    /// The connection has started shutting down; no new streams should be created.
    case closing(CloseReason)
  }
  /// A `ChannelHandler` which manages part of the lifecycle of a gRPC connection over HTTP/2.
  ///
  /// This handler is responsible for managing several aspects of the connection. These include:
  /// 1. Periodically sending keep alive pings to the server (if configured) and closing the
  ///    connection if the server doesn't acknowledge them in time.
  /// 2. Closing the connection if it is idle (has no open streams) for a configured amount of time.
  /// 3. Forwarding lifecycle events (as `ClientConnectionEvent`s) to the next handler.
  ///
  /// Inbound HTTP/2 frames are consumed by this handler; only `ClientConnectionEvent`s are read
  /// by the handlers which follow it.
  ///
  /// Some of the behaviours are described in [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md).
  final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias InboundOut = ClientConnectionEvent

    typealias OutboundIn = Never
    typealias OutboundOut = HTTP2Frame

    /// User outbound events understood by this handler.
    enum OutboundEvent: Hashable, Sendable {
      /// Close the connection gracefully
      case closeGracefully
    }

    /// The `EventLoop` of the `Channel` this handler exists in.
    private let eventLoop: EventLoop

    /// Timer which fires when the connection has been idle (i.e. has had no open streams) for the
    /// maximum idle time, after which the connection is closed. `nil` if no maximum idle time
    /// was configured.
    private var maxIdleTimer: Timer?

    /// Repeating timer which triggers sending a keep alive ping. `nil` if keep alive pings
    /// aren't configured.
    private var keepaliveTimer: Timer?

    /// Timer bounding how long the client waits for the server to acknowledge a keep alive ping.
    /// Only scheduled after a keep alive ping has been sent.
    private var keepaliveTimeoutTimer: Timer

    /// Opaque data sent in keep alive pings. Used to recognise acks of those pings.
    private let keepalivePingData: HTTP2PingData

    /// The current state of the connection.
    private var state: StateMachine

    /// Whether a flush is pending.
    private var flushPending: Bool

    /// Whether `channelRead` has been called and `channelReadComplete` hasn't yet been called.
    /// Resets in `channelReadComplete`.
    private var inReadLoop: Bool

    /// The context of the channel this handler is in. `nil` when the handler isn't in a pipeline.
    private var context: ChannelHandlerContext?

    /// Creates a new handler which manages the lifecycle of a connection.
    ///
    /// - Parameters:
    ///   - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
    ///   - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
    ///       Pass `nil` to disable closing idle connections.
    ///   - keepaliveTime: The interval at which the client considers sending a keep-alive ping.
    ///       Pass `nil` to disable keep-alive pings.
    ///   - keepaliveTimeout: The amount of time the server has to acknowledge a keep-alive ping
    ///       sent by the client. The connection is closed if no acknowledgement is received in
    ///       time. Defaults to 20 seconds if `nil`.
    ///   - keepaliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
    ///       in progress.
    init(
      eventLoop: EventLoop,
      maxIdleTime: TimeAmount?,
      keepaliveTime: TimeAmount?,
      keepaliveTimeout: TimeAmount?,
      keepaliveWithoutCalls: Bool
    ) {
      self.eventLoop = eventLoop
      self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
      self.keepaliveTimer = keepaliveTime.map { Timer(delay: $0, repeat: true) }
      self.keepaliveTimeoutTimer = Timer(delay: keepaliveTimeout ?? .seconds(20))
      self.keepalivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
      self.state = StateMachine(allowKeepaliveWithoutCalls: keepaliveWithoutCalls)

      self.flushPending = false
      self.inReadLoop = false
    }

    /// Stores the context so that stream lifecycle callbacks can act on the channel.
    func handlerAdded(context: ChannelHandlerContext) {
      assert(context.eventLoop === self.eventLoop)
      self.context = context
    }

    /// Drops the stored context.
    func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    /// Starts the keep-alive and idle timers (where configured) and forwards the event.
    func channelActive(context: ChannelHandlerContext) {
      self.keepaliveTimer?.schedule(on: context.eventLoop) {
        self.keepaliveTimerFired(context: context)
      }

      self.maxIdleTimer?.schedule(on: context.eventLoop) {
        self.maxIdleTimerFired(context: context)
      }

      // Implementing this method replaces the default forwarding implementation, so the event
      // must be passed on explicitly or later handlers will never observe it.
      context.fireChannelActive()
    }

    /// Marks the connection as closed, notifies interested parties, stops all timers and
    /// forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
      switch self.state.closed() {
      case .none:
        ()

      case .unexpectedClose(let error, let isIdle):
        // The connection dropped without a shutdown having been started: tell the next handler.
        let event = self.wrapInboundOut(.closing(.unexpected(error, isIdle: isIdle)))
        context.fireChannelRead(event)

      case .succeed(let promise):
        // A graceful shutdown was requested with a promise; the connection is now closed.
        promise.succeed()
      }

      // Nothing should fire on a closed connection.
      self.keepaliveTimer?.cancel()
      self.keepaliveTimeoutTimer.cancel()
      self.maxIdleTimer?.cancel()

      // As in `channelActive`, the event must be forwarded explicitly.
      context.fireChannelInactive()
    }

    /// Tracks stream creation and closure events before forwarding the event.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      switch event {
      case let event as NIOHTTP2StreamCreatedEvent:
        self._streamCreated(event.streamID, channel: context.channel)

      case let event as StreamClosedEvent:
        self._streamClosed(event.streamID, channel: context.channel)

      default:
        ()
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Records the error so it can be reported if the connection closes unexpectedly, then
    /// forwards it.
    func errorCaught(context: ChannelHandlerContext, error: any Error) {
      self.state.receivedError(error)
      context.fireErrorCaught(error)
    }

    /// Handles connection-level frames: GOAWAY, PING acks and SETTINGS. Other frames are ignored.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)
      self.inReadLoop = true

      switch frame.payload {
      case .goAway(_, let errorCode, let data):
        // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
        // closing the connection.
        switch self.state.beginGracefulShutdown(promise: nil) {
        case .sendGoAway(let close):
          // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
          let message = data.map { String(buffer: $0) } ?? ""
          context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))

          // Clients should send GOAWAYs when closing a connection.
          self.writeAndFlushGoAway(context: context, errorCode: .noError)
          if close {
            context.close(promise: nil)
          }

        case .none:
          ()
        }

      case .ping(let data, let ack):
        // Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
        // particular only those carrying the keep-alive data.
        if ack, data == self.keepalivePingData {
          // The server replied in time: stop the timeout and resume the keep-alive timer.
          self.keepaliveTimeoutTimer.cancel()
          self.keepaliveTimer?.schedule(on: context.eventLoop) {
            self.keepaliveTimerFired(context: context)
          }
        }

      case .settings(.settings(_)):
        let isInitialSettings = self.state.receivedSettings()

        // The first settings frame indicates that the connection is now ready to use. The channel
        // becoming active is insufficient as, for example, a TLS handshake may fail after
        // establishing the TCP connection, or the server isn't configured for gRPC (or HTTP/2).
        if isInitialSettings {
          context.fireChannelRead(self.wrapInboundOut(.ready))
        }

      default:
        ()
      }
    }

    /// Performs any flush deferred during the read loop and forwards the event.
    func channelReadComplete(context: ChannelHandlerContext) {
      // Loop in case flushing causes another flush to be requested.
      while self.flushPending {
        self.flushPending = false
        context.flush()
      }

      self.inReadLoop = false
      context.fireChannelReadComplete()
    }

    /// Handles `OutboundEvent`s; any other event is passed along the pipeline untouched.
    func triggerUserOutboundEvent(
      context: ChannelHandlerContext,
      event: Any,
      promise: EventLoopPromise<Void>?
    ) {
      guard let outboundEvent = event as? OutboundEvent else {
        context.triggerUserOutboundEvent(event, promise: promise)
        return
      }

      switch outboundEvent {
      case .closeGracefully:
        switch self.state.beginGracefulShutdown(promise: promise) {
        case .sendGoAway(let close):
          context.fireChannelRead(self.wrapInboundOut(.closing(.initiatedLocally)))
          // The client could send a GOAWAY at this point but it's not really necessary, the server
          // can't open streams anyway, the client will just close the connection when it's done.
          if close {
            context.close(promise: nil)
          }

        case .none:
          ()
        }
      }
    }
  }
  extension ClientConnectionHandler {
    /// A stream delegate which reports stream creation and closure to a `ClientConnectionHandler`
    /// on the handler's event loop.
    struct HTTP2StreamDelegate: @unchecked Sendable, NIOHTTP2StreamDelegate {
      // @unchecked is okay: the only methods do the appropriate event-loop dance.
      private let handler: ClientConnectionHandler

      init(_ handler: ClientConnectionHandler) {
        self.handler = handler
      }

      /// Notifies the handler that a stream was created, hopping to its event loop if necessary.
      func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
        guard self.handler.eventLoop.inEventLoop else {
          self.handler.eventLoop.execute {
            self.handler._streamCreated(id, channel: channel)
          }
          return
        }

        self.handler._streamCreated(id, channel: channel)
      }

      /// Notifies the handler that a stream was closed, hopping to its event loop if necessary.
      func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
        guard self.handler.eventLoop.inEventLoop else {
          self.handler.eventLoop.execute {
            self.handler._streamClosed(id, channel: channel)
          }
          return
        }

        self.handler._streamClosed(id, channel: channel)
      }
    }

    /// A stream delegate which forwards stream lifecycle callbacks to this handler.
    var http2StreamDelegate: HTTP2StreamDelegate {
      HTTP2StreamDelegate(self)
    }

    /// Records that a stream was opened. Must be called on the handler's event loop.
    private func _streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
      self.eventLoop.assertInEventLoop()

      // An open stream means the connection is no longer idle, so stop the idle timer.
      // NOTE(review): the keep-alive timer is not re-scheduled here. If `_streamClosed` cancelled
      // it, nothing in this handler appears to start it again for new streams — confirm against
      // `Timer`'s semantics.
      self.maxIdleTimer?.cancel()
      self.state.streamOpened(id)
    }

    /// Records that a stream was closed and acts on the outcome. Must be called on the handler's
    /// event loop. Does nothing if the handler has no context.
    private func _streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
      guard let context = self.context else { return }
      self.eventLoop.assertInEventLoop()

      let action = self.state.streamClosed(id)
      switch action {
      case .none:
        ()

      case .close:
        // The connection was closing and only waiting for its streams to finish. That was the
        // last one, so close the connection now.
        context.close(promise: nil)

      case .startIdleTimer(let cancelKeepalive):
        // No streams remain: the connection is idle so (re)start the idle timer.
        self.maxIdleTimer?.schedule(on: context.eventLoop) {
          self.maxIdleTimerFired(context: context)
        }

        // The keep-alive timer is only stopped if keep-alive isn't allowed without active calls.
        if cancelKeepalive {
          self.keepaliveTimer?.cancel()
        }
      }
    }
  }
  extension ClientConnectionHandler {
    /// Flushes immediately, unless in a read loop in which case the flush is deferred until
    /// `channelReadComplete`.
    private func maybeFlush(context: ChannelHandlerContext) {
      guard self.inReadLoop else {
        context.flush()
        return
      }

      self.flushPending = true
    }

    /// Sends a keep-alive ping, if the state machine permits it, and starts waiting for the ack.
    private func keepaliveTimerFired(context: ChannelHandlerContext) {
      let shouldPing = self.state.sendKeepalivePing()
      if !shouldPing { return }

      // Pause the keep-alive timer while a ping is outstanding; it is scheduled again when the
      // ack for the ping is read.
      self.keepaliveTimer?.cancel()

      let payload: HTTP2Frame.FramePayload = .ping(self.keepalivePingData, ack: false)
      let frame = HTTP2Frame(streamID: .rootStream, payload: payload)
      context.write(self.wrapOutboundOut(frame), promise: nil)
      self.maybeFlush(context: context)

      // Bound how long to wait for the ack.
      self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
        self.keepaliveTimeoutExpired(context: context)
      }
    }

    /// Closes the connection because a keep-alive ping wasn't acknowledged in time. Does nothing
    /// if the connection is already closing or closed.
    private func keepaliveTimeoutExpired(context: ChannelHandlerContext) {
      if self.state.beginClosing() {
        context.fireChannelRead(self.wrapInboundOut(.closing(.keepaliveExpired)))
        self.writeAndFlushGoAway(context: context, message: "keepalive_expired")
        context.close(promise: nil)
      }
    }

    /// Closes the connection because it has been idle for too long. Does nothing if the
    /// connection is already closing or closed.
    private func maxIdleTimerFired(context: ChannelHandlerContext) {
      if self.state.beginClosing() {
        context.fireChannelRead(self.wrapInboundOut(.closing(.idle)))
        self.writeAndFlushGoAway(context: context, message: "idle")
        context.close(promise: nil)
      }
    }

    /// Writes a GOAWAY frame on the root stream and flushes it (possibly deferred, see
    /// `maybeFlush(context:)`).
    ///
    /// - Parameters:
    ///   - context: The context to write the frame to.
    ///   - errorCode: The error code to send in the frame, `.noError` by default.
    ///   - message: An optional message sent as the opaque data of the frame.
    private func writeAndFlushGoAway(
      context: ChannelHandlerContext,
      errorCode: HTTP2ErrorCode = .noError,
      message: String? = nil
    ) {
      var opaqueData: ByteBuffer? = nil
      if let message = message {
        opaqueData = context.channel.allocator.buffer(string: message)
      }

      let payload: HTTP2Frame.FramePayload = .goAway(
        lastStreamID: 0,
        errorCode: errorCode,
        opaqueData: opaqueData
      )
      let frame = HTTP2Frame(streamID: .rootStream, payload: payload)

      context.write(self.wrapOutboundOut(frame), promise: nil)
      self.maybeFlush(context: context)
    }
  }
  extension ClientConnectionHandler {
    /// Tracks the lifecycle of the connection: active, closing, or closed, along with the set of
    /// open streams. It holds no reference to the channel; callers act on the values it returns.
    struct StateMachine {
      private var state: State

      private enum State {
        /// The connection is usable (although the first SETTINGS frame may not have arrived).
        case active(Active)
        /// Shutdown has begun; the connection stays open until its streams have closed.
        case closing(Closing)
        /// The connection has been closed.
        case closed

        struct Active {
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>
          /// Whether keep-alive pings may be sent when there are no open streams.
          var allowKeepaliveWithoutCalls: Bool
          /// Whether a SETTINGS frame has been received from the remote peer.
          var receivedConnectionPreface: Bool
          /// The most recent error recorded for the connection, if any.
          var error: (any Error)?

          init(allowKeepaliveWithoutCalls: Bool) {
            self.openStreams = []
            self.allowKeepaliveWithoutCalls = allowKeepaliveWithoutCalls
            self.receivedConnectionPreface = false
            self.error = nil
          }

          /// Records receipt of a SETTINGS frame, returning `true` if it was the first one.
          mutating func receivedSettings() -> Bool {
            let alreadyReceived = self.receivedConnectionPreface
            self.receivedConnectionPreface = true
            return !alreadyReceived
          }
        }

        struct Closing {
          /// Whether keep-alive pings may be sent when there are no open streams.
          var allowKeepaliveWithoutCalls: Bool
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>
          /// Promise to complete once the connection has closed, if one was provided.
          var closePromise: EventLoopPromise<Void>?

          init(from state: Active, closePromise: EventLoopPromise<Void>?) {
            self.allowKeepaliveWithoutCalls = state.allowKeepaliveWithoutCalls
            self.openStreams = state.openStreams
            self.closePromise = closePromise
          }
        }
      }

      init(allowKeepaliveWithoutCalls: Bool) {
        let active = State.Active(allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls)
        self.state = .active(active)
      }

      /// Record that a SETTINGS frame was received from the remote peer.
      ///
      /// - Returns: `true` if this was the first SETTINGS frame received while active. Always
      ///   `false` once the connection is closing or closed.
      mutating func receivedSettings() -> Bool {
        guard case .active(var active) = self.state else {
          return false
        }

        let isFirst = active.receivedSettings()
        self.state = .active(active)
        return isFirst
      }

      /// Record that an error was received. Errors are only retained while active.
      mutating func receivedError(_ error: any Error) {
        if case .active(var active) = self.state {
          active.error = error
          self.state = .active(active)
        }
      }

      /// Record that the stream with the given ID has been opened.
      mutating func streamOpened(_ id: HTTP2StreamID) {
        switch self.state {
        case .closed:
          ()

        case .active(var active):
          let inserted = active.openStreams.insert(id).inserted
          assert(inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .active(active)

        case .closing(var closing):
          let inserted = closing.openStreams.insert(id).inserted
          assert(inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .closing(closing)
        }
      }

      /// The action to take after a stream has closed.
      enum OnStreamClosed: Equatable {
        /// Start the idle timer, after which the connection should be closed gracefully.
        case startIdleTimer(cancelKeepalive: Bool)
        /// Close the connection.
        case close
        /// Do nothing.
        case none
      }

      /// Record that the stream with the given ID has been closed.
      ///
      /// - Returns: What the caller should do now that the stream has closed.
      mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
        switch self.state {
        case .active(var active):
          let removed = active.openStreams.remove(id)
          assert(removed != nil, "Can't close stream \(Int(id)), it wasn't open")
          self.state = .active(active)

          // The connection only becomes idle once the last stream has closed.
          guard active.openStreams.isEmpty else {
            return OnStreamClosed.none
          }
          return .startIdleTimer(cancelKeepalive: !active.allowKeepaliveWithoutCalls)

        case .closing(var closing):
          let removed = closing.openStreams.remove(id)
          assert(removed != nil, "Can't close stream \(Int(id)), it wasn't open")
          self.state = .closing(closing)

          // When closing, the connection can be closed once the last stream has closed.
          guard closing.openStreams.isEmpty else {
            return OnStreamClosed.none
          }
          return .close

        case .closed:
          return OnStreamClosed.none
        }
      }

      /// Returns whether a keep alive ping should be sent to the server.
      ///
      /// A ping is sent if there are open streams, or if there are none and keep alive is
      /// permitted without active calls. No ping is sent once closed.
      mutating func sendKeepalivePing() -> Bool {
        switch self.state {
        case .active(let active):
          return active.allowKeepaliveWithoutCalls || !active.openStreams.isEmpty
        case .closing(let closing):
          return closing.allowKeepaliveWithoutCalls || !closing.openStreams.isEmpty
        case .closed:
          return false
        }
      }

      /// The action to take when beginning a graceful shutdown.
      enum OnGracefulShutDown: Equatable {
        /// Shutdown has started. The associated value is whether the connection should also be
        /// closed immediately.
        case sendGoAway(Bool)
        /// Do nothing.
        case none
      }

      /// Begin shutting down gracefully.
      ///
      /// - Parameter promise: A promise to complete when the connection has closed. If shutdown
      ///   is already in progress it is combined with any promise already held.
      /// - Returns: What the caller should do to start the shutdown.
      mutating func beginGracefulShutdown(promise: EventLoopPromise<Void>?) -> OnGracefulShutDown {
        switch self.state {
        case .active(let active):
          // Only close immediately if there are no open streams. The client doesn't need to
          // ratchet down the last stream ID as only the client creates streams in gRPC.
          let closeNow = active.openStreams.isEmpty
          self.state = .closing(State.Closing(from: active, closePromise: promise))
          return .sendGoAway(closeNow)

        case .closing(var closing):
          closing.closePromise.setOrCascade(to: promise)
          self.state = .closing(closing)
          return OnGracefulShutDown.none

        case .closed:
          return OnGracefulShutDown.none
        }
      }

      /// Returns whether the connection should be closed.
      ///
      /// Only an active connection transitions to closing; `false` is returned if the connection
      /// is already closing or closed.
      mutating func beginClosing() -> Bool {
        guard case .active(let active) = self.state else {
          return false
        }

        self.state = .closing(State.Closing(from: active, closePromise: nil))
        return true
      }

      /// The action to take once the connection has closed.
      enum OnClosed {
        /// Succeed the promise: the requested shutdown has completed.
        case succeed(EventLoopPromise<Void>)
        /// The connection closed without a shutdown having been started.
        case unexpectedClose((any Error)?, isIdle: Bool)
        /// Do nothing.
        case none
      }

      /// Marks the state as closed.
      ///
      /// - Returns: What the caller should do now that the connection has closed.
      mutating func closed() -> OnClosed {
        switch self.state {
        case .active(let active):
          self.state = .closed
          return .unexpectedClose(active.error, isIdle: active.openStreams.isEmpty)

        case .closing(let closing):
          self.state = .closed
          guard let promise = closing.closePromise else {
            return OnClosed.none
          }
          return .succeed(promise)

        case .closed:
          return OnClosed.none
        }
      }
    }
  }