ClientConnectionHandler.swift 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package import NIOCore
  17. package import NIOHTTP2
  /// A lifecycle event emitted for a client's HTTP/2 connection.
  ///
  /// Values of this type are what `ClientConnectionHandler` fires down the pipeline as its
  /// inbound output.
  package enum ClientConnectionEvent: Sendable {
    /// The connection is ready to be used.
    case ready

    /// The connection has begun shutting down; no new streams should be created on it.
    /// The associated value explains why.
    case closing(CloseReason)

    /// Why a connection started closing.
    package enum CloseReason: Sendable {
      /// The server sent a GOAWAY frame. Carries the frame's error code and a message (which may
      /// be empty).
      case goAway(HTTP2ErrorCode, String)

      /// A keep alive ping was sent and the wait for its acknowledgement timed out.
      case keepaliveExpired

      /// The connection became idle.
      case idle

      /// The close was initiated by the local peer.
      case initiatedLocally

      /// The connection closed unexpectedly. Carries the error which caused the close, if there
      /// was one, and whether the connection was idle at the time.
      case unexpected((any Error)?, isIdle: Bool)
    }
  }
  /// A `ChannelHandler` which manages part of the lifecycle of a gRPC connection over HTTP/2.
  ///
  /// This handler is responsible for managing several aspects of the connection. These include:
  /// 1. Periodically sending keep alive pings to the server (if configured) and closing the
  ///    connection if necessary.
  /// 2. Closing the connection if it is idle (has no open streams) for a configured amount of time.
  /// 3. Forwarding lifecycle events to the next handler.
  ///
  /// Inbound HTTP/2 frames are consumed by this handler: only `ClientConnectionEvent`s are fired
  /// to the next inbound handler.
  ///
  /// Some of the behaviours are described in [gRFC A8](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md).
  package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandler {
    package typealias InboundIn = HTTP2Frame
    package typealias InboundOut = ClientConnectionEvent

    package typealias OutboundIn = Never
    package typealias OutboundOut = HTTP2Frame

    package enum OutboundEvent: Hashable, Sendable {
      /// Close the connection gracefully
      case closeGracefully
    }

    /// The `EventLoop` of the `Channel` this handler exists in.
    private let eventLoop: any EventLoop

    /// Timer bounding how long the connection may be idle for. If the connection remains idle
    /// (i.e. has no open streams) for this period of time then the connection will be closed.
    /// `nil` if no maximum idle time was configured.
    private var maxIdleTimer: Timer?

    /// Timer for the amount of time to wait before sending a keep alive ping. `nil` if keep alive
    /// pings aren't configured.
    private var keepaliveTimer: Timer?

    /// Timer for the amount of time the server has to acknowledge a keep alive ping sent by the
    /// client. It is only scheduled after a keep alive ping has been written.
    private var keepaliveTimeoutTimer: Timer

    /// Opaque data sent in keep alive pings.
    private let keepalivePingData: HTTP2PingData

    /// The current state of the connection.
    private var state: StateMachine

    /// Whether a flush is pending.
    private var flushPending: Bool

    /// Whether `channelRead` has been called and `channelReadComplete` hasn't yet been called.
    /// Resets once `channelReadComplete` returns.
    private var inReadLoop: Bool

    /// The context of the channel this handler is in.
    private var context: ChannelHandlerContext?

    /// Creates a new handler which manages the lifecycle of a connection.
    ///
    /// - Parameters:
    ///   - eventLoop: The `EventLoop` of the `Channel` this handler is placed in.
    ///   - maxIdleTime: The maximum amount time a connection may be idle for before being closed.
    ///   - keepaliveTime: The amount of time to wait after reading data before sending a keep-alive
    ///       ping.
    ///   - keepaliveTimeout: The amount of time the server has to acknowledge a keep-alive ping
    ///       sent by the client. The connection is closed if no acknowledgement is received in
    ///       time. Defaults to 20 seconds if `nil`.
    ///   - keepaliveWithoutCalls: Whether the client sends keep-alive pings when there are no calls
    ///       in progress.
    package init(
      eventLoop: any EventLoop,
      maxIdleTime: TimeAmount?,
      keepaliveTime: TimeAmount?,
      keepaliveTimeout: TimeAmount?,
      keepaliveWithoutCalls: Bool
    ) {
      self.eventLoop = eventLoop
      self.maxIdleTimer = maxIdleTime.map { Timer(delay: $0) }
      self.keepaliveTimer = keepaliveTime.map { Timer(delay: $0, repeat: true) }
      self.keepaliveTimeoutTimer = Timer(delay: keepaliveTimeout ?? .seconds(20))
      self.keepalivePingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
      self.state = StateMachine(allowKeepaliveWithoutCalls: keepaliveWithoutCalls)

      self.flushPending = false
      self.inReadLoop = false
    }

    /// Stores the context so that stream lifecycle callbacks can act on the channel.
    package func handlerAdded(context: ChannelHandlerContext) {
      assert(context.eventLoop === self.eventLoop)
      self.context = context
    }

    /// Drops the stored context.
    package func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    /// Marks the connection as closed, emits the final close event (or completes the graceful
    /// shutdown promise), stops all timers and forwards the event.
    package func channelInactive(context: ChannelHandlerContext) {
      switch self.state.closed() {
      case .none:
        ()

      case .unexpectedClose(let error, let isIdle):
        let event = self.wrapInboundOut(.closing(.unexpected(error, isIdle: isIdle)))
        context.fireChannelRead(event)

      case .succeed(let promise):
        promise.succeed()
      }

      // The channel is gone so no timer has any work left to do. Cancel all of them, including
      // the idle timer, so that a pending timer doesn't keep the handler and its context alive
      // until it eventually fires.
      self.keepaliveTimer?.cancel()
      self.keepaliveTimeoutTimer.cancel()
      self.maxIdleTimer?.cancel()
      context.fireChannelInactive()
    }

    /// Tracks stream creation and closure from HTTP/2 user events, then forwards the event.
    package func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      switch event {
      case let event as NIOHTTP2StreamCreatedEvent:
        self._streamCreated(event.streamID, channel: context.channel)

      case let event as StreamClosedEvent:
        self._streamClosed(event.streamID, channel: context.channel)

      default:
        ()
      }

      context.fireUserInboundEventTriggered(event)
    }

    package func errorCaught(context: ChannelHandlerContext, error: any Error) {
      // Store the error and close, this will result in the final close event being fired down
      // the pipeline with an appropriate close reason and appropriate error. (This avoids
      // the async channel just throwing the error.)
      self.state.receivedError(error)
      context.close(mode: .all, promise: nil)
    }

    /// Inspects connection-level frames (GOAWAY, PING acks and SETTINGS) and translates them into
    /// state changes, timer updates and `ClientConnectionEvent`s. All other frames are ignored.
    package func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)
      self.inReadLoop = true

      switch frame.payload {
      case .goAway(_, let errorCode, let data):
        if errorCode == .noError {
          // Receiving a GOAWAY frame means we need to stop creating streams immediately and start
          // closing the connection.
          switch self.state.beginGracefulShutdown(promise: nil) {
          case .sendGoAway(let close):
            // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
            let message = data.map { String(buffer: $0) } ?? ""
            context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))

            // Clients should send GOAWAYs when closing a connection.
            self.writeAndFlushGoAway(context: context, errorCode: .noError)
            if close {
              context.close(promise: nil)
            }

          case .none:
            ()
          }
        } else {
          // Some error, begin closing.
          if self.state.beginClosing() {
            // gRPC servers may indicate why the GOAWAY was sent in the opaque data.
            let message = data.map { String(buffer: $0) } ?? ""
            context.fireChannelRead(self.wrapInboundOut(.closing(.goAway(errorCode, message))))
            context.close(promise: nil)
          }
        }

      case .ping(let data, let ack):
        // Pings are ack'd by the HTTP/2 handler so we only pay attention to acks here, and in
        // particular only those carrying the keep-alive data.
        if ack, data == self.keepalivePingData {
          let loopBound = LoopBoundView(handler: self, context: context)
          // The server answered in time: stop waiting for the ack and resume the keep alive
          // timer (it was cancelled when the ping was sent).
          self.keepaliveTimeoutTimer.cancel()
          self.keepaliveTimer?.schedule(on: context.eventLoop) {
            loopBound.keepaliveTimerFired()
          }
        }

      case .settings(.settings(_)):
        let isInitialSettings = self.state.receivedSettings()

        // The first settings frame indicates that the connection is now ready to use. The channel
        // becoming active is insufficient as, for example, a TLS handshake may fail after
        // establishing the TCP connection, or the server isn't configured for gRPC (or HTTP/2).
        if isInitialSettings {
          let loopBound = LoopBoundView(handler: self, context: context)
          self.keepaliveTimer?.schedule(on: context.eventLoop) {
            loopBound.keepaliveTimerFired()
          }

          self.maxIdleTimer?.schedule(on: context.eventLoop) {
            loopBound.maxIdleTimerFired()
          }

          context.fireChannelRead(self.wrapInboundOut(.ready))
        }

      default:
        ()
      }
    }

    /// Performs any flush which was deferred during the read loop, then forwards the event.
    package func channelReadComplete(context: ChannelHandlerContext) {
      // Loop rather than check once: 'flushPending' is re-read after each flush in case it was
      // set again while flushing.
      while self.flushPending {
        self.flushPending = false
        context.flush()
      }

      self.inReadLoop = false
      context.fireChannelReadComplete()
    }

    /// Handles `OutboundEvent`s; any other event is passed along the pipeline untouched.
    ///
    /// For `.closeGracefully` the promise is handed to the state machine, which hands it back to
    /// be succeeded when the channel becomes inactive.
    package func triggerUserOutboundEvent(
      context: ChannelHandlerContext,
      event: Any,
      promise: EventLoopPromise<Void>?
    ) {
      if let event = event as? OutboundEvent {
        switch event {
        case .closeGracefully:
          switch self.state.beginGracefulShutdown(promise: promise) {
          case .sendGoAway(let close):
            context.fireChannelRead(self.wrapInboundOut(.closing(.initiatedLocally)))
            // The client could send a GOAWAY at this point but it's not really necessary, the server
            // can't open streams anyway, the client will just close the connection when it's done.
            if close {
              context.close(promise: nil)
            }

          case .none:
            ()
          }
        }
      } else {
        context.triggerUserOutboundEvent(event, promise: promise)
      }
    }
  }
  extension ClientConnectionHandler {
    /// Bundles the handler with its `ChannelHandlerContext` so that timer callbacks can call back
    /// into the handler.
    ///
    /// Every entry point asserts that it is running on the context's event loop before it
    /// forwards to the handler.
    struct LoopBoundView: @unchecked Sendable {
      private let handler: ClientConnectionHandler
      private let context: ChannelHandlerContext

      init(handler: ClientConnectionHandler, context: ChannelHandlerContext) {
        self.handler = handler
        self.context = context
      }

      /// Returns the wrapped context after asserting the caller is on its event loop.
      private func contextOnEventLoop() -> ChannelHandlerContext {
        context.eventLoop.assertInEventLoop()
        return context
      }

      /// Tells the handler that the keep alive timer fired.
      func keepaliveTimerFired() {
        let ctx = contextOnEventLoop()
        handler.keepaliveTimerFired(context: ctx)
      }

      /// Tells the handler that the keep alive timeout expired.
      func keepaliveTimeoutExpired() {
        let ctx = contextOnEventLoop()
        handler.keepaliveTimeoutExpired(context: ctx)
      }

      /// Tells the handler that the max idle timer fired.
      func maxIdleTimerFired() {
        let ctx = contextOnEventLoop()
        handler.maxIdleTimerFired(context: ctx)
      }
    }
  }
  extension ClientConnectionHandler {
    /// An `NIOHTTP2StreamDelegate` which relays stream creation and closure to the handler,
    /// hopping onto the handler's event loop when called from elsewhere.
    package struct HTTP2StreamDelegate: @unchecked Sendable, NIOHTTP2StreamDelegate {
      // @unchecked is okay: each method only touches the handler from its event loop.
      private let handler: ClientConnectionHandler

      init(_ handler: ClientConnectionHandler) {
        self.handler = handler
      }

      /// Records the new stream, immediately if already on the handler's event loop.
      package func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
        guard self.handler.eventLoop.inEventLoop else {
          self.handler.eventLoop.execute {
            self.handler._streamCreated(id, channel: channel)
          }
          return
        }

        self.handler._streamCreated(id, channel: channel)
      }

      /// Records the closed stream, immediately if already on the handler's event loop.
      package func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
        guard self.handler.eventLoop.inEventLoop else {
          self.handler.eventLoop.execute {
            self.handler._streamClosed(id, channel: channel)
          }
          return
        }

        self.handler._streamClosed(id, channel: channel)
      }
    }

    /// A stream delegate backed by this handler.
    package var http2StreamDelegate: HTTP2StreamDelegate {
      HTTP2StreamDelegate(self)
    }

    /// Notes that a stream was opened. Must be called on the handler's event loop.
    private func _streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
      self.eventLoop.assertInEventLoop()

      // A connection with an open stream isn't idle, so the idle timer must not run.
      self.maxIdleTimer?.cancel()
      self.state.streamOpened(id)
    }

    /// Notes that a stream was closed and performs whatever the state machine asks for in
    /// response. Does nothing if the handler currently has no context.
    private func _streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
      guard let context = self.context else { return }
      self.eventLoop.assertInEventLoop()

      let action = self.state.streamClosed(id)
      switch action {
      case .none:
        ()

      case .close:
        // The connection was closing and only waiting for its remaining streams; the last one
        // just closed, so the connection can be closed now.
        context.close(promise: nil)

      case .startIdleTimer(let cancelKeepalive):
        // No streams remain: start the idle timer again. The keep-alive timer is only stopped
        // when keep-alive isn't permitted without active calls.
        let view = LoopBoundView(handler: self, context: context)
        self.maxIdleTimer?.schedule(on: context.eventLoop) {
          view.maxIdleTimerFired()
        }

        if cancelKeepalive {
          self.keepaliveTimer?.cancel()
        }
      }
    }
  }
  extension ClientConnectionHandler {
    /// Flushes now, unless a read loop is in progress, in which case the flush is deferred until
    /// `channelReadComplete`.
    private func maybeFlush(context: ChannelHandlerContext) {
      guard self.inReadLoop else {
        context.flush()
        return
      }

      self.flushPending = true
    }

    /// Sends a keep alive ping, if the state machine permits one, and starts waiting for its
    /// acknowledgement.
    private func keepaliveTimerFired(context: ChannelHandlerContext) {
      if !self.state.sendKeepalivePing() {
        return
      }

      // The keep alive timer is stopped while a ping is outstanding; it is scheduled again when
      // the acknowledgement arrives.
      self.keepaliveTimer?.cancel()

      let payload: HTTP2Frame.FramePayload = .ping(self.keepalivePingData, ack: false)
      let frame = HTTP2Frame(streamID: .rootStream, payload: payload)
      context.write(self.wrapOutboundOut(frame), promise: nil)
      self.maybeFlush(context: context)

      // Bound how long the acknowledgement may take.
      let view = LoopBoundView(handler: self, context: context)
      self.keepaliveTimeoutTimer.schedule(on: context.eventLoop) {
        view.keepaliveTimeoutExpired()
      }
    }

    /// Closes the connection because a keep alive ping wasn't acknowledged in time.
    private func keepaliveTimeoutExpired(context: ChannelHandlerContext) {
      if self.state.beginClosing() {
        let event = ClientConnectionEvent.closing(.keepaliveExpired)
        context.fireChannelRead(self.wrapInboundOut(event))
        self.writeAndFlushGoAway(context: context, message: "keepalive_expired")
        context.close(promise: nil)
      }
    }

    /// Closes the connection because it has been idle for too long.
    private func maxIdleTimerFired(context: ChannelHandlerContext) {
      if self.state.beginClosing() {
        let event = ClientConnectionEvent.closing(.idle)
        context.fireChannelRead(self.wrapInboundOut(event))
        self.writeAndFlushGoAway(context: context, message: "idle")
        context.close(promise: nil)
      }
    }

    /// Writes a GOAWAY frame on the root stream and flushes it (possibly deferred, see
    /// `maybeFlush(context:)`).
    ///
    /// - Parameters:
    ///   - context: The context to write the frame on.
    ///   - errorCode: The error code to send, `.noError` by default.
    ///   - message: Optional text to send as the frame's opaque data.
    private func writeAndFlushGoAway(
      context: ChannelHandlerContext,
      errorCode: HTTP2ErrorCode = .noError,
      message: String? = nil
    ) {
      let opaqueData = message.map { context.channel.allocator.buffer(string: $0) }
      let payload: HTTP2Frame.FramePayload = .goAway(
        lastStreamID: 0,
        errorCode: errorCode,
        opaqueData: opaqueData
      )
      let frame = HTTP2Frame(streamID: .rootStream, payload: payload)

      context.write(self.wrapOutboundOut(frame), promise: nil)
      self.maybeFlush(context: context)
    }
  }
  extension ClientConnectionHandler {
    /// Tracks whether the connection is active, closing or closed, along with the set of open
    /// streams, and tells the handler what to do in response to connection events.
    struct StateMachine {
      private var state: State

      private enum State {
        /// The connection is usable.
        case active(Active)
        /// The connection is shutting down.
        case closing(Closing)
        /// The connection is closed.
        case closed
        /// Placeholder held while an associated value is mutated; never observed between calls.
        case _modifying

        struct Active {
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>
          /// Whether keep alive pings may be sent while no streams are open.
          var allowKeepaliveWithoutCalls: Bool
          /// Whether a SETTINGS frame has been received yet.
          var receivedConnectionPreface: Bool
          /// The last error recorded while active, if any.
          var error: (any Error)?

          init(allowKeepaliveWithoutCalls: Bool) {
            self.openStreams = []
            self.allowKeepaliveWithoutCalls = allowKeepaliveWithoutCalls
            self.receivedConnectionPreface = false
            self.error = nil
          }

          /// Marks the connection preface as received.
          ///
          /// - Returns: `true` if it hadn't been received before this call.
          mutating func receivedSettings() -> Bool {
            let alreadyReceived = self.receivedConnectionPreface
            self.receivedConnectionPreface = true
            return !alreadyReceived
          }
        }

        struct Closing {
          /// Whether keep alive pings may be sent while no streams are open.
          var allowKeepaliveWithoutCalls: Bool
          /// IDs of the streams which are currently open.
          var openStreams: Set<HTTP2StreamID>
          /// Promise to complete once the connection has closed, if any.
          var closePromise: EventLoopPromise<Void>?
          /// Whether the shutdown is graceful.
          var isGraceful: Bool

          init(from state: Active, isGraceful: Bool, closePromise: EventLoopPromise<Void>?) {
            self.openStreams = state.openStreams
            self.isGraceful = isGraceful
            self.allowKeepaliveWithoutCalls = state.allowKeepaliveWithoutCalls
            self.closePromise = closePromise
          }
        }
      }

      init(allowKeepaliveWithoutCalls: Bool) {
        let active = State.Active(allowKeepaliveWithoutCalls: allowKeepaliveWithoutCalls)
        self.state = .active(active)
      }

      /// Record that a SETTINGS frame was received from the remote peer.
      ///
      /// - Returns: `true` if this was the first SETTINGS frame received. Always `false` once the
      ///   connection is closing or closed.
      mutating func receivedSettings() -> Bool {
        if case ._modifying = self.state {
          preconditionFailure()
        }

        guard case .active(var active) = self.state else {
          return false
        }

        self.state = ._modifying
        let wasFirst = active.receivedSettings()
        self.state = .active(active)
        return wasFirst
      }

      /// Record that an error was received. Errors are only stored while the connection is
      /// active.
      mutating func receivedError(_ error: any Error) {
        if case ._modifying = self.state {
          preconditionFailure()
        }

        guard case .active(var active) = self.state else {
          return
        }

        self.state = ._modifying
        active.error = error
        self.state = .active(active)
      }

      /// Record that the stream with the given ID has been opened.
      mutating func streamOpened(_ id: HTTP2StreamID) {
        switch self.state {
        case .active(var active):
          self.state = ._modifying
          let inserted = active.openStreams.insert(id).inserted
          assert(inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .active(active)

        case .closing(var closing):
          self.state = ._modifying
          let inserted = closing.openStreams.insert(id).inserted
          assert(inserted, "Can't open stream \(Int(id)), it's already open")
          self.state = .closing(closing)

        case .closed:
          ()

        case ._modifying:
          preconditionFailure()
        }
      }

      enum OnStreamClosed: Equatable {
        /// Start the idle timer, after which the connection should be closed gracefully.
        case startIdleTimer(cancelKeepalive: Bool)
        /// Close the connection.
        case close
        /// Do nothing.
        case none
      }

      /// Record that the stream with the given ID has been closed.
      ///
      /// - Returns: The action the handler should take now that the stream has closed.
      mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
        switch self.state {
        case .active(var active):
          self.state = ._modifying
          let removed = active.openStreams.remove(id)
          assert(removed != nil, "Can't close stream \(Int(id)), it wasn't open")
          let noStreamsRemain = active.openStreams.isEmpty
          let keepaliveWithoutCalls = active.allowKeepaliveWithoutCalls
          self.state = .active(active)

          if noStreamsRemain {
            return .startIdleTimer(cancelKeepalive: !keepaliveWithoutCalls)
          } else {
            return .none
          }

        case .closing(var closing):
          self.state = ._modifying
          let removed = closing.openStreams.remove(id)
          assert(removed != nil, "Can't close stream \(Int(id)), it wasn't open")
          let noStreamsRemain = closing.openStreams.isEmpty
          self.state = .closing(closing)

          if noStreamsRemain {
            return .close
          } else {
            return .none
          }

        case .closed:
          return .none

        case ._modifying:
          preconditionFailure()
        }
      }

      /// Returns whether a keep alive ping should be sent to the server.
      ///
      /// A ping is sent when at least one stream is open, or when keep alive is permitted without
      /// active calls. No ping is sent once the connection is closed.
      func sendKeepalivePing() -> Bool {
        switch self.state {
        case .active(let active):
          return active.allowKeepaliveWithoutCalls || !active.openStreams.isEmpty

        case .closing(let closing):
          return closing.allowKeepaliveWithoutCalls || !closing.openStreams.isEmpty

        case .closed:
          return false

        case ._modifying:
          preconditionFailure()
        }
      }

      enum OnGracefulShutDown: Equatable {
        /// Graceful shutdown has started. The associated value is `true` if the connection
        /// should be closed straight away, i.e. there were no open streams.
        case sendGoAway(Bool)
        /// Do nothing.
        case none
      }

      /// Begin shutting down gracefully.
      ///
      /// - Parameter promise: A promise to keep hold of until the connection has closed. If the
      ///   connection is already closing it is combined with any promise already held.
      /// - Returns: The action the handler should take.
      mutating func beginGracefulShutdown(promise: EventLoopPromise<Void>?) -> OnGracefulShutDown {
        switch self.state {
        case .active(let active):
          self.state = ._modifying
          // Only close immediately if there are no open streams. The client doesn't need to
          // ratchet down the last stream ID as only the client creates streams in gRPC.
          let closeNow = active.openStreams.isEmpty
          self.state = .closing(State.Closing(from: active, isGraceful: true, closePromise: promise))
          return .sendGoAway(closeNow)

        case .closing(var closing):
          self.state = ._modifying
          closing.closePromise.setOrCascade(to: promise)
          self.state = .closing(closing)
          return .none

        case .closed:
          return .none

        case ._modifying:
          preconditionFailure()
        }
      }

      /// Begin closing without waiting for open streams.
      ///
      /// - Returns: Whether the connection should be closed: `true` when active or when a
      ///   graceful shutdown was in progress, `false` otherwise.
      mutating func beginClosing() -> Bool {
        switch self.state {
        case .active(let active):
          self.state = .closing(State.Closing(from: active, isGraceful: false, closePromise: nil))
          return true

        case .closing(var closing):
          self.state = ._modifying
          let wasGraceful = closing.isGraceful
          closing.isGraceful = false
          self.state = .closing(closing)
          return wasGraceful

        case .closed:
          return false

        case ._modifying:
          preconditionFailure()
        }
      }

      enum OnClosed {
        /// Succeed the promise held for the close.
        case succeed(EventLoopPromise<Void>)
        /// The connection closed while still active; carries any recorded error and whether
        /// there were no open streams at the time.
        case unexpectedClose((any Error)?, isIdle: Bool)
        /// Do nothing.
        case none
      }

      /// Marks the state as closed.
      ///
      /// - Returns: The action the handler should take now that the connection has closed.
      mutating func closed() -> OnClosed {
        // Every path which returns leaves the state machine closed.
        defer { self.state = .closed }

        switch self.state {
        case .active(let active):
          return .unexpectedClose(active.error, isIdle: active.openStreams.isEmpty)

        case .closing(let closing):
          if let promise = closing.closePromise {
            return .succeed(promise)
          } else {
            return .none
          }

        case .closed:
          return .none

        case ._modifying:
          preconditionFailure()
        }
      }
    }
  }