ClientConnectionHandler.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  18. /// An event which happens on a client's HTTP/2 connection.
  19. enum ClientConnectionEvent: Sendable, Hashable {
  20. enum CloseReason: Sendable, Hashable {
  21. /// The server sent a GOAWAY frame to the client.
  22. case goAway(HTTP2ErrorCode, String)
  23. /// The keep alive timer fired and subsequently timed out.
  24. case keepaliveExpired
  25. /// The connection became idle.
  26. case idle
  27. /// The local peer initiated the close.
  28. case initiatedLocally
  29. }
  30. /// The connection has started shutting down, no new streams should be created.
  31. case closing(CloseReason)
  32. }
  33. /// A `ChannelHandler` which manages part of the lifecycle of a gRPC connection over HTTP/2.
  34. ///
  35. /// This handler is responsible for managing several aspects of the connection. These include:
  36. /// 1. Periodically sending keep alive pings to the server (if configured) and closing the
  37. /// connection if necessary.
  38. /// 2. Closing the connection if it is idle (has no open streams) for a configured amount of time.
  39. /// 3. Forwarding lifecycle events to the next handler.
  40. ///
  41. /// Some of the behaviours are described in [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md).
  42. final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandler {
  43. typealias InboundIn = HTTP2Frame
  44. typealias InboundOut = ClientConnectionEvent
  45. typealias OutboundIn = Never
  46. typealias OutboundOut = HTTP2Frame
  47. enum OutboundEvent: Hashable, Sendable {
  48. /// Close the connection gracefully
  49. case closeGracefully
  50. }
  51. /// The `EventLoop` of the `Channel` this handler exists in.
  52. private let eventLoop: EventLoop
  53. /// The maximum amount of time the connection may be idle for. If the connection remains idle
  54. /// (i.e. has no open streams) for this period of time then the connection will be gracefully
  55. /// closed.
  56. private var maxIdleTimer: Timer?
  57. /// The amount of time to wait before sending a keep alive ping.
  58. private var keepaliveTimer: Timer?
  59. /// The amount of time the client has to reply after sending a keep alive ping. Only used if
  60. /// `keepaliveTimer` is set.
  61. private var keepaliveTimeoutTimer: Timer
  62. /// Opaque data sent in keep alive pings.
  63. private let keepalivePingData: HTTP2PingData
  64. /// The current state of the connection.
  65. private var state: StateMachine
  66. /// Whether a flush is pending.
  67. private var flushPending: Bool
  68. /// Whether `channelRead` has been called and `channelReadComplete` hasn't yet been called.
  69. /// Resets once `channelReadComplete` returns.
  70. private var inReadLoop: Bool
  71. /// Creates a new handler which manages the lifecycle of a connection.
  72. ///
  73. /// - Parameters:
  74. /// - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
  75. /// - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
  76. /// - keepaliveTime: The amount of time to wait after reading data before sending a keep-alive
  77. /// ping.
  78. /// - keepaliveTimeout: The amount of time the client has to reply after the server sends a
  79. /// keep-alive ping to keep the connection open. The connection is closed if no reply
  80. /// is received.
  81. /// - keepaliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
  82. /// in progress.
  83. init(
  84. eventLoop: EventLoop,
  85. maxIdleTime: TimeAmount?,
  86. keepaliveTime: TimeAmount?,
  87. keepaliveTimeout: TimeAmount?,
  88. keepaliveWithoutCalls: Bool
  89. ) {
  90. self.eventLoop = eventLoop
  91. self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
  92. self.keepaliveTimer = keepaliveTime.map { Timer(delay: $0, repeat: true) }
  93. self.keepaliveTimeoutTimer = Timer(delay: keepaliveTimeout ?? .seconds(20))
  94. self.keepalivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
  95. self.state = StateMachine(allowKeepaliveWithoutCalls: keepaliveWithoutCalls)
  96. self.flushPending = false
  97. self.inReadLoop = false
  98. }
  99. func handlerAdded(context: ChannelHandlerContext) {
  100. assert(context.eventLoop === self.eventLoop)
  101. }
  102. func channelActive(context: ChannelHandlerContext) {
  103. self.keepaliveTimer?.schedule(on: context.eventLoop) {
  104. self.keepaliveTimerFired(context: context)
  105. }
  106. self.maxIdleTimer?.schedule(on: context.eventLoop) {
  107. self.maxIdleTimerFired(context: context)
  108. }
  109. }
  110. func channelInactive(context: ChannelHandlerContext) {
  111. switch self.state.closed() {
  112. case .none:
  113. ()
  114. case .succeed(let promise):
  115. promise.succeed()
  116. }
  117. self.keepaliveTimer?.cancel()
  118. self.keepaliveTimeoutTimer.cancel()
  119. }
  120. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  121. switch event {
  122. case let event as NIOHTTP2StreamCreatedEvent:
  123. // Stream created, so the connection isn't idle.
  124. self.maxIdleTimer?.cancel()
  125. self.state.streamOpened(event.streamID)
  126. case let event as StreamClosedEvent:
  127. switch self.state.streamClosed(event.streamID) {
  128. case .startIdleTimer(let cancelKeepalive):
  129. // All streams are closed, restart the idle timer, and stop the keep-alive timer (it may
  130. // not stop if keep-alive is allowed when there are no active calls).
  131. self.maxIdleTimer?.schedule(on: context.eventLoop) {
  132. self.maxIdleTimerFired(context: context)
  133. }
  134. if cancelKeepalive {
  135. self.keepaliveTimer?.cancel()
  136. }
  137. case .close:
  138. // Connection was closing but waiting for all streams to close. They must all be closed
  139. // now so close the connection.
  140. context.close(promise: nil)
  141. case .none:
  142. ()
  143. }
  144. default:
  145. ()
  146. }
  147. context.fireUserInboundEventTriggered(event)
  148. }
  149. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  150. let frame = self.unwrapInboundIn(data)
  151. self.inReadLoop = true
  152. switch frame.payload {
  153. case .goAway(_, let errorCode, let data):
  154. // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
  155. // closing the connection.
  156. switch self.state.beginGracefulShutdown(promise: nil) {
  157. case .sendGoAway(let close):
  158. // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
  159. let message = data.map { String(buffer: $0) } ?? ""
  160. context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))
  161. // Clients should send GOAWAYs when closing a connection.
  162. self.writeAndFlushGoAway(context: context, errorCode: .noError)
  163. if close {
  164. context.close(promise: nil)
  165. }
  166. case .none:
  167. ()
  168. }
  169. case .ping(let data, let ack):
  170. // Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
  171. // particular only those carrying the keep-alive data.
  172. if ack, data == self.keepalivePingData {
  173. self.keepaliveTimeoutTimer.cancel()
  174. self.keepaliveTimer?.schedule(on: context.eventLoop) {
  175. self.keepaliveTimerFired(context: context)
  176. }
  177. }
  178. default:
  179. ()
  180. }
  181. }
  182. func channelReadComplete(context: ChannelHandlerContext) {
  183. while self.flushPending {
  184. self.flushPending = false
  185. context.flush()
  186. }
  187. self.inReadLoop = false
  188. context.fireChannelReadComplete()
  189. }
  190. func triggerUserOutboundEvent(
  191. context: ChannelHandlerContext,
  192. event: Any,
  193. promise: EventLoopPromise<Void>?
  194. ) {
  195. if let event = event as? OutboundEvent {
  196. switch event {
  197. case .closeGracefully:
  198. switch self.state.beginGracefulShutdown(promise: promise) {
  199. case .sendGoAway(let close):
  200. context.fireChannelRead(self.wrapInboundOut(.closing(.initiatedLocally)))
  201. // Clients should send GOAWAYs when closing a connection.
  202. self.writeAndFlushGoAway(context: context, errorCode: .noError)
  203. if close {
  204. context.close(promise: nil)
  205. }
  206. case .none:
  207. ()
  208. }
  209. }
  210. } else {
  211. context.triggerUserOutboundEvent(event, promise: promise)
  212. }
  213. }
  214. }
  215. extension ClientConnectionHandler {
  216. private func maybeFlush(context: ChannelHandlerContext) {
  217. if self.inReadLoop {
  218. self.flushPending = true
  219. } else {
  220. context.flush()
  221. }
  222. }
  223. private func keepaliveTimerFired(context: ChannelHandlerContext) {
  224. guard self.state.sendKeepalivePing() else { return }
  225. // Cancel the keep alive timer when the client sends a ping. The timer is resumed when the ping
  226. // is acknowledged.
  227. self.keepaliveTimer?.cancel()
  228. let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(self.keepalivePingData, ack: false))
  229. context.write(self.wrapOutboundOut(ping), promise: nil)
  230. self.maybeFlush(context: context)
  231. // Schedule a timeout on waiting for the response.
  232. self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
  233. self.keepaliveTimeoutExpired(context: context)
  234. }
  235. }
  236. private func keepaliveTimeoutExpired(context: ChannelHandlerContext) {
  237. guard self.state.beginClosing() else { return }
  238. context.fireChannelRead(self.wrapInboundOut(.closing(.keepaliveExpired)))
  239. self.writeAndFlushGoAway(context: context, message: "keepalive_expired")
  240. context.close(promise: nil)
  241. }
  242. private func maxIdleTimerFired(context: ChannelHandlerContext) {
  243. guard self.state.beginClosing() else { return }
  244. context.fireChannelRead(self.wrapInboundOut(.closing(.idle)))
  245. self.writeAndFlushGoAway(context: context, message: "idle")
  246. context.close(promise: nil)
  247. }
  248. private func writeAndFlushGoAway(
  249. context: ChannelHandlerContext,
  250. errorCode: HTTP2ErrorCode = .noError,
  251. message: String? = nil
  252. ) {
  253. let goAway = HTTP2Frame(
  254. streamID: .rootStream,
  255. payload: .goAway(
  256. lastStreamID: 0,
  257. errorCode: errorCode,
  258. opaqueData: message.map { context.channel.allocator.buffer(string: $0) }
  259. )
  260. )
  261. context.write(self.wrapOutboundOut(goAway), promise: nil)
  262. self.maybeFlush(context: context)
  263. }
  264. }
  265. extension ClientConnectionHandler {
  266. struct StateMachine {
  267. private var state: State
  268. private enum State {
  269. case active(Active)
  270. case closing(Closing)
  271. case closed
  272. struct Active {
  273. var openStreams: Set<HTTP2StreamID>
  274. var allowKeepaliveWithoutCalls: Bool
  275. init(allowKeepaliveWithoutCalls: Bool) {
  276. self.openStreams = []
  277. self.allowKeepaliveWithoutCalls = allowKeepaliveWithoutCalls
  278. }
  279. }
  280. struct Closing {
  281. var allowKeepaliveWithoutCalls: Bool
  282. var openStreams: Set<HTTP2StreamID>
  283. var closePromise: Optional<EventLoopPromise<Void>>
  284. init(from state: Active, closePromise: EventLoopPromise<Void>?) {
  285. self.openStreams = state.openStreams
  286. self.allowKeepaliveWithoutCalls = state.allowKeepaliveWithoutCalls
  287. self.closePromise = closePromise
  288. }
  289. }
  290. }
  291. init(allowKeepaliveWithoutCalls: Bool) {
  292. self.state = .active(State.Active(allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls))
  293. }
  294. /// Record that the stream with the given ID has been opened.
  295. mutating func streamOpened(_ id: HTTP2StreamID) {
  296. switch self.state {
  297. case .active(var state):
  298. let (inserted, _) = state.openStreams.insert(id)
  299. assert(inserted, "Can't open stream \(Int(id)), it's already open")
  300. self.state = .active(state)
  301. case .closing(var state):
  302. let (inserted, _) = state.openStreams.insert(id)
  303. assert(inserted, "Can't open stream \(Int(id)), it's already open")
  304. self.state = .closing(state)
  305. case .closed:
  306. ()
  307. }
  308. }
  309. enum OnStreamClosed: Equatable {
  310. /// Start the idle timer, after which the connection should be closed gracefully.
  311. case startIdleTimer(cancelKeepalive: Bool)
  312. /// Close the connection.
  313. case close
  314. /// Do nothing.
  315. case none
  316. }
  317. /// Record that the stream with the given ID has been closed.
  318. mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
  319. let onStreamClosed: OnStreamClosed
  320. switch self.state {
  321. case .active(var state):
  322. let removedID = state.openStreams.remove(id)
  323. assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
  324. if state.openStreams.isEmpty {
  325. onStreamClosed = .startIdleTimer(cancelKeepalive: !state.allowKeepaliveWithoutCalls)
  326. } else {
  327. onStreamClosed = .none
  328. }
  329. self.state = .active(state)
  330. case .closing(var state):
  331. let removedID = state.openStreams.remove(id)
  332. assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
  333. onStreamClosed = state.openStreams.isEmpty ? .close : .none
  334. self.state = .closing(state)
  335. case .closed:
  336. onStreamClosed = .none
  337. }
  338. return onStreamClosed
  339. }
  340. /// Returns whether a keep alive ping should be sent to the server.
  341. mutating func sendKeepalivePing() -> Bool {
  342. let sendKeepalivePing: Bool
  343. // Only send a ping if there are open streams or there are no open streams and keep alive
  344. // is permitted when there are no active calls.
  345. switch self.state {
  346. case .active(let state):
  347. sendKeepalivePing = !state.openStreams.isEmpty || state.allowKeepaliveWithoutCalls
  348. case .closing(let state):
  349. sendKeepalivePing = !state.openStreams.isEmpty || state.allowKeepaliveWithoutCalls
  350. case .closed:
  351. sendKeepalivePing = false
  352. }
  353. return sendKeepalivePing
  354. }
  355. enum OnGracefulShutDown: Equatable {
  356. case sendGoAway(Bool)
  357. case none
  358. }
  359. mutating func beginGracefulShutdown(promise: EventLoopPromise<Void>?) -> OnGracefulShutDown {
  360. let onGracefulShutdown: OnGracefulShutDown
  361. switch self.state {
  362. case .active(let state):
  363. // Only close immediately if there are no open streams. The client doesn't need to
  364. // ratchet down the last stream ID as only the client creates streams in gRPC.
  365. let close = state.openStreams.isEmpty
  366. onGracefulShutdown = .sendGoAway(close)
  367. self.state = .closing(State.Closing(from: state, closePromise: promise))
  368. case .closing(var state):
  369. state.closePromise.setOrCascade(to: promise)
  370. self.state = .closing(state)
  371. onGracefulShutdown = .none
  372. case .closed:
  373. onGracefulShutdown = .none
  374. }
  375. return onGracefulShutdown
  376. }
  377. /// Returns whether the connection should be closed.
  378. mutating func beginClosing() -> Bool {
  379. switch self.state {
  380. case .active(let active):
  381. self.state = .closing(State.Closing(from: active, closePromise: nil))
  382. return true
  383. case .closing, .closed:
  384. return false
  385. }
  386. }
  387. enum OnClosed {
  388. case succeed(EventLoopPromise<Void>)
  389. case none
  390. }
  391. /// Marks the state as closed.
  392. mutating func closed() -> OnClosed {
  393. switch self.state {
  394. case .active, .closed:
  395. self.state = .closed
  396. return .none
  397. case .closing(let closing):
  398. self.state = .closed
  399. return closing.closePromise.map { .succeed($0) } ?? .none
  400. }
  401. }
  402. }
  403. }
  404. extension Optional {
  405. // TODO: replace with https://github.com/apple/swift-nio/pull/2697
  406. mutating func setOrCascade<Value>(
  407. to promise: EventLoopPromise<Value>?
  408. ) where Wrapped == EventLoopPromise<Value> {
  409. guard let promise = promise else { return }
  410. switch self {
  411. case .none:
  412. self = .some(promise)
  413. case .some(let existing):
  414. existing.futureResult.cascade(to: promise)
  415. }
  416. }
  417. }