GRPCIdleHandlerStateMachine.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. /// Holds state for the 'GRPCIdleHandler', this isn't really just the idleness of the connection,
  20. /// it also holds state relevant to quiescing the connection as well as logging some HTTP/2 specific
  21. /// information (like stream creation/close events and changes to settings which can be useful when
  22. /// debugging live systems). Much of this information around the connection state is also used to
  23. /// inform the client connection manager since that's strongly tied to various channel and HTTP/2
  24. /// events.
  25. struct GRPCIdleHandlerStateMachine {
  26. /// Our role in the connection.
  27. enum Role {
  28. case server
  29. case client
  30. }
  31. /// The 'operating' state of the connection. This is the primary state we expect to be in: the
  32. /// connection is up and running and there are expected to be active RPCs, although this is by no
  33. /// means a requirement. Some of the situations in which there may be no active RPCs are:
  34. ///
  35. /// 1. Before the connection is 'ready' (that is, seen the first SETTINGS frame),
  36. /// 2. After the connection has dropped to zero active streams and before the idle timeout task
  37. /// has been scheduled.
  38. /// 3. When the connection has zero active streams and the connection was configured without an
  39. /// idle timeout.
  40. fileprivate struct Operating: CanOpenStreams, CanCloseStreams {
  41. /// Our role in the connection.
  42. var role: Role
  43. /// The number of open stream.
  44. var openStreams: Int
  45. /// The last stream ID initiated by the remote peer.
  46. var lastPeerInitiatedStreamID: HTTP2StreamID
  47. /// The maximum number of concurrent streams we are allowed to operate.
  48. var maxConcurrentStreams: Int
  49. /// We keep track of whether we've seen a SETTINGS frame. We expect to see one after the
  50. /// connection preface (RFC 7540 § 3.5). This is primarily for the benefit of the client which
  51. /// determines a connection to be 'ready' once it has seen the first SETTINGS frame. We also
  52. /// won't set an idle timeout until this becomes true.
  53. var hasSeenSettings: Bool
  54. fileprivate init(role: Role) {
  55. self.role = role
  56. self.openStreams = 0
  57. self.lastPeerInitiatedStreamID = .rootStream
  58. // Assumed until we know better.
  59. self.maxConcurrentStreams = 100
  60. self.hasSeenSettings = false
  61. }
  62. fileprivate init(fromWaitingToIdle state: WaitingToIdle) {
  63. self.role = state.role
  64. self.openStreams = 0
  65. self.lastPeerInitiatedStreamID = state.lastPeerInitiatedStreamID
  66. self.maxConcurrentStreams = state.maxConcurrentStreams
  67. // We won't transition to 'WaitingToIdle' unless we've seen a SETTINGS frame.
  68. self.hasSeenSettings = true
  69. }
  70. }
  71. /// The waiting-to-idle state is used when the connection has become 'ready', has no active
  72. /// RPCs and an idle timeout task has been scheduled. In this state, the connection will be closed
  73. /// once the idle is fired. The task will be cancelled on the creation of a stream.
  74. fileprivate struct WaitingToIdle {
  75. /// Our role in the connection.
  76. var role: Role
  77. /// The last stream ID initiated by the remote peer.
  78. var lastPeerInitiatedStreamID: HTTP2StreamID
  79. /// The maximum number of concurrent streams we are allowed to operate.
  80. var maxConcurrentStreams: Int
  81. /// A task which, when fired, will idle the connection.
  82. var idleTask: Scheduled<Void>
  83. fileprivate init(fromOperating state: Operating, idleTask: Scheduled<Void>) {
  84. // We won't transition to this state unless we've seen a SETTINGS frame.
  85. assert(state.hasSeenSettings)
  86. self.role = state.role
  87. self.lastPeerInitiatedStreamID = state.lastPeerInitiatedStreamID
  88. self.maxConcurrentStreams = state.maxConcurrentStreams
  89. self.idleTask = idleTask
  90. }
  91. }
  92. /// The quiescing state is entered only from the operating state. It may be entered if we receive
  93. /// a GOAWAY frame (the remote peer initiated the quiescing) or we initiate graceful shutdown
  94. /// locally.
  95. fileprivate struct Quiescing: TracksOpenStreams, CanCloseStreams {
  96. /// Our role in the connection.
  97. var role: Role
  98. /// The number of open stream.
  99. var openStreams: Int
  100. /// The last stream ID initiated by the remote peer.
  101. var lastPeerInitiatedStreamID: HTTP2StreamID
  102. /// The maximum number of concurrent streams we are allowed to operate.
  103. var maxConcurrentStreams: Int
  104. /// Whether this peer initiated shutting down.
  105. var initiatedByUs: Bool
  106. fileprivate init(fromOperating state: Operating, initiatedByUs: Bool) {
  107. // If we didn't initiate shutdown, the remote peer must have done so by sending a GOAWAY frame
  108. // in which case we must have seen a SETTINGS frame.
  109. assert(initiatedByUs || state.hasSeenSettings)
  110. self.role = state.role
  111. self.initiatedByUs = initiatedByUs
  112. self.openStreams = state.openStreams
  113. self.lastPeerInitiatedStreamID = state.lastPeerInitiatedStreamID
  114. self.maxConcurrentStreams = state.maxConcurrentStreams
  115. }
  116. }
  117. /// The closing state is entered when one of the previous states initiates a connection closure.
  118. /// From this state the only possible transition is to the closed state.
  119. fileprivate struct Closing {
  120. /// Our role in the connection.
  121. var role: Role
  122. /// Should the client connection manager receive an idle event when we close? (If not then it
  123. /// will attempt to establish a new connection immediately.)
  124. var shouldIdle: Bool
  125. fileprivate init(fromOperating state: Operating) {
  126. self.role = state.role
  127. // Idle if there are no open streams and we've seen the first SETTINGS frame.
  128. self.shouldIdle = !state.hasOpenStreams && state.hasSeenSettings
  129. }
  130. fileprivate init(fromQuiescing state: Quiescing) {
  131. self.role = state.role
  132. // If we initiated the quiescing then we shouldn't go idle (we want to shutdown instead).
  133. self.shouldIdle = !state.initiatedByUs
  134. }
  135. fileprivate init(fromWaitingToIdle state: WaitingToIdle, shouldIdle: Bool = true) {
  136. self.role = state.role
  137. self.shouldIdle = shouldIdle
  138. }
  139. }
  140. fileprivate enum State {
  141. case operating(Operating)
  142. case waitingToIdle(WaitingToIdle)
  143. case quiescing(Quiescing)
  144. case closing(Closing)
  145. case closed
  146. }
  147. /// The set of operations that should be performed as a result of interaction with the state
  148. /// machine.
  149. struct Operations {
  150. /// An event to notify the connection manager about.
  151. private(set) var connectionManagerEvent: ConnectionManagerEvent?
  152. /// The value of HTTP/2 SETTINGS_MAX_CONCURRENT_STREAMS changed.
  153. private(set) var maxConcurrentStreamsChange: Int?
  154. /// An idle task, either scheduling or cancelling an idle timeout.
  155. private(set) var idleTask: IdleTask?
  156. /// Send a GOAWAY frame with the last peer initiated stream ID set to this value.
  157. private(set) var sendGoAwayWithLastPeerInitiatedStreamID: HTTP2StreamID?
  158. /// Whether the channel should be closed.
  159. private(set) var shouldCloseChannel: Bool
  160. /// Whether a ping should be sent after a GOAWAY frame.
  161. private(set) var shouldPingAfterGoAway: Bool
  162. fileprivate static let none = Operations()
  163. fileprivate mutating func sendGoAwayFrame(
  164. lastPeerInitiatedStreamID streamID: HTTP2StreamID,
  165. followWithPing: Bool = false
  166. ) {
  167. self.sendGoAwayWithLastPeerInitiatedStreamID = streamID
  168. self.shouldPingAfterGoAway = followWithPing
  169. }
  170. fileprivate mutating func cancelIdleTask(_ task: Scheduled<Void>) {
  171. self.idleTask = .cancel(task)
  172. }
  173. fileprivate mutating func scheduleIdleTask() {
  174. self.idleTask = .schedule
  175. }
  176. fileprivate mutating func closeChannel() {
  177. self.shouldCloseChannel = true
  178. }
  179. fileprivate mutating func notifyConnectionManager(about event: ConnectionManagerEvent) {
  180. self.connectionManagerEvent = event
  181. }
  182. fileprivate mutating func maxConcurrentStreamsChanged(_ newValue: Int) {
  183. self.maxConcurrentStreamsChange = newValue
  184. }
  185. private init() {
  186. self.connectionManagerEvent = nil
  187. self.idleTask = nil
  188. self.sendGoAwayWithLastPeerInitiatedStreamID = nil
  189. self.shouldCloseChannel = false
  190. self.shouldPingAfterGoAway = false
  191. }
  192. }
  193. /// An event to notify the 'ConnectionManager' about.
  194. enum ConnectionManagerEvent {
  195. case inactive
  196. case idle
  197. case ready
  198. case quiescing
  199. }
  200. enum IdleTask {
  201. case schedule
  202. case cancel(Scheduled<Void>)
  203. }
  204. /// The current state.
  205. private var state: State
  206. /// A logger.
  207. internal var logger: Logger
  208. /// Create a new state machine.
  209. init(role: Role, logger: Logger) {
  210. self.state = .operating(.init(role: role))
  211. self.logger = logger
  212. }
  213. // MARK: Stream Events
  214. /// An HTTP/2 stream was created.
  215. mutating func streamCreated(withID streamID: HTTP2StreamID) -> Operations {
  216. var operations: Operations = .none
  217. switch self.state {
  218. case var .operating(state):
  219. // Create the stream.
  220. state.streamCreated(streamID, logger: self.logger)
  221. self.state = .operating(state)
  222. case let .waitingToIdle(state):
  223. var operating = Operating(fromWaitingToIdle: state)
  224. operating.streamCreated(streamID, logger: self.logger)
  225. self.state = .operating(operating)
  226. operations.cancelIdleTask(state.idleTask)
  227. case var .quiescing(state):
  228. switch state.role {
  229. case .client where streamID.isServerInitiated:
  230. state.lastPeerInitiatedStreamID = streamID
  231. case .server where streamID.isClientInitiated:
  232. state.lastPeerInitiatedStreamID = streamID
  233. default:
  234. ()
  235. }
  236. state.openStreams += 1
  237. self.state = .quiescing(state)
  238. case .closing, .closed:
  239. ()
  240. }
  241. return operations
  242. }
  243. /// An HTTP/2 stream was closed.
  244. mutating func streamClosed(withID streamID: HTTP2StreamID) -> Operations {
  245. var operations: Operations = .none
  246. switch self.state {
  247. case var .operating(state):
  248. state.streamClosed(streamID, logger: self.logger)
  249. if state.hasSeenSettings, !state.hasOpenStreams {
  250. operations.scheduleIdleTask()
  251. }
  252. self.state = .operating(state)
  253. case .waitingToIdle:
  254. // If we're waiting to idle then there can't be any streams open which can be closed.
  255. preconditionFailure()
  256. case var .quiescing(state):
  257. state.streamClosed(streamID, logger: self.logger)
  258. if state.hasOpenStreams {
  259. self.state = .quiescing(state)
  260. } else {
  261. self.state = .closing(.init(fromQuiescing: state))
  262. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  263. operations.closeChannel()
  264. }
  265. case .closing, .closed:
  266. ()
  267. }
  268. return operations
  269. }
  270. // MARK: - Idle Events
  271. /// The given task was scheduled to idle the connection.
  272. mutating func scheduledIdleTimeoutTask(_ task: Scheduled<Void>) -> Operations {
  273. var operations: Operations = .none
  274. switch self.state {
  275. case let .operating(state):
  276. if state.hasOpenStreams {
  277. operations.cancelIdleTask(task)
  278. } else {
  279. self.state = .waitingToIdle(.init(fromOperating: state, idleTask: task))
  280. }
  281. case .waitingToIdle:
  282. // There's already an idle task.
  283. preconditionFailure()
  284. case .quiescing, .closing, .closed:
  285. operations.cancelIdleTask(task)
  286. }
  287. return operations
  288. }
  289. /// The idle timeout task fired, the connection should be idled.
  290. mutating func idleTimeoutTaskFired() -> Operations {
  291. var operations: Operations = .none
  292. switch self.state {
  293. case let .waitingToIdle(state):
  294. self.state = .closing(.init(fromWaitingToIdle: state))
  295. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  296. operations.closeChannel()
  297. // We're either operating on streams, streams are going away, or the connection is going away
  298. // so we don't need to idle the connection.
  299. case .operating, .quiescing, .closing, .closed:
  300. ()
  301. }
  302. return operations
  303. }
  304. // MARK: - Shutdown Events
  305. /// Close the connection, this can be caused as a result of a keepalive timeout (i.e. the server
  306. /// has become unresponsive), we'll bin this connection as a result.
  307. mutating func shutdownNow() -> Operations {
  308. var operations = Operations.none
  309. switch self.state {
  310. case let .operating(state):
  311. var closing = Closing(fromOperating: state)
  312. closing.shouldIdle = false
  313. self.state = .closing(closing)
  314. operations.closeChannel()
  315. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  316. case let .waitingToIdle(state):
  317. // Don't idle.
  318. self.state = .closing(Closing(fromWaitingToIdle: state, shouldIdle: false))
  319. operations.closeChannel()
  320. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  321. operations.cancelIdleTask(state.idleTask)
  322. case let .quiescing(state):
  323. self.state = .closing(Closing(fromQuiescing: state))
  324. // We've already sent a GOAWAY frame if we're in this state, just close.
  325. operations.closeChannel()
  326. case .closing, .closed:
  327. ()
  328. }
  329. return operations
  330. }
  331. /// Initiate a graceful shutdown of this connection, that is, begin quiescing.
  332. mutating func initiateGracefulShutdown() -> Operations {
  333. var operations: Operations = .none
  334. switch self.state {
  335. case let .operating(state):
  336. operations.notifyConnectionManager(about: .quiescing)
  337. if state.hasOpenStreams {
  338. // There are open streams: send a GOAWAY frame and wait for the stream count to reach zero.
  339. //
  340. // It's okay if we haven't seen a SETTINGS frame at this point; we've initiated the shutdown
  341. // so making a connection is ready isn't necessary.
  342. // TODO: we should ratchet down the last initiated stream after 1-RTT.
  343. //
  344. // As a client we will just stop initiating streams.
  345. if state.role == .server {
  346. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  347. }
  348. self.state = .quiescing(.init(fromOperating: state, initiatedByUs: true))
  349. } else {
  350. // No open streams: send a GOAWAY frame and close the channel.
  351. self.state = .closing(.init(fromOperating: state))
  352. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  353. operations.closeChannel()
  354. }
  355. case let .waitingToIdle(state):
  356. // There can't be any open streams, but we have a few loose ends to clear up: we need to
  357. // cancel the idle timeout, send a GOAWAY frame and then close. We don't want to idle from the
  358. // closing state: we want to shutdown instead.
  359. self.state = .closing(.init(fromWaitingToIdle: state, shouldIdle: false))
  360. operations.cancelIdleTask(state.idleTask)
  361. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  362. operations.closeChannel()
  363. case var .quiescing(state):
  364. // We're already quiescing: either the remote initiated it or we're initiating it more than
  365. // once. Set ourselves as the initiator to ensure we don't idle when we eventually close, this
  366. // is important for the client: if the server initiated this then we establish a new
  367. // connection when we close, unless we also initiated shutdown.
  368. state.initiatedByUs = true
  369. self.state = .quiescing(state)
  370. case var .closing(state):
  371. // We've already called 'close()', make sure we don't go idle.
  372. state.shouldIdle = false
  373. self.state = .closing(state)
  374. case .closed:
  375. ()
  376. }
  377. return operations
  378. }
  379. /// The connection has reached it's max allowable age. Let existing RPCs continue, but don't
  380. /// allow any new ones.
  381. mutating func reachedMaxAge() -> Operations {
  382. // Treat this as if the other side sent us a GOAWAY: gently shutdown the connection.
  383. self.receiveGoAway()
  384. }
  385. /// We've received a GOAWAY frame from the remote peer. Either the remote peer wants to close the
  386. /// connection or they're responding to us shutting down the connection.
  387. mutating func receiveGoAway() -> Operations {
  388. var operations: Operations = .none
  389. switch self.state {
  390. case let .operating(state):
  391. // A SETTINGS frame MUST follow the connection preface. (RFC 7540 § 3.5)
  392. assert(state.hasSeenSettings)
  393. operations.notifyConnectionManager(about: .quiescing)
  394. if state.hasOpenStreams {
  395. switch state.role {
  396. case .client:
  397. // The server sent us a GOAWAY we'll just stop opening new streams and will send a GOAWAY
  398. // frame before we close later.
  399. ()
  400. case .server:
  401. // Client sent us a GOAWAY frame; we'll let the streams drain and then close. We'll tell
  402. // the client that we're going away and send them a ping. When we receive the pong we will
  403. // send another GOAWAY frame with a lower stream ID. In this case, the pong acts as an ack
  404. // for the GOAWAY.
  405. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: .maxID, followWithPing: true)
  406. }
  407. self.state = .quiescing(.init(fromOperating: state, initiatedByUs: false))
  408. } else {
  409. // No open streams, we can close as well.
  410. self.state = .closing(.init(fromOperating: state))
  411. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  412. operations.closeChannel()
  413. }
  414. case let .waitingToIdle(state):
  415. // There can't be any open streams, but we have a few loose ends to clear up: we need to
  416. // cancel the idle timeout, send a GOAWAY frame and then close.
  417. // We should also notify the connection manager that quiescing is happening.
  418. self.state = .closing(.init(fromWaitingToIdle: state))
  419. operations.notifyConnectionManager(about: .quiescing)
  420. operations.cancelIdleTask(state.idleTask)
  421. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: state.lastPeerInitiatedStreamID)
  422. operations.closeChannel()
  423. case .quiescing:
  424. // We're already quiescing, this changes nothing.
  425. ()
  426. case .closing, .closed:
  427. // We're already closing/closed (so must have emitted a GOAWAY frame already). Ignore this.
  428. ()
  429. }
  430. return operations
  431. }
  432. mutating func ratchetDownGoAwayStreamID() -> Operations {
  433. var operations: Operations = .none
  434. switch self.state {
  435. case let .quiescing(state):
  436. let streamID = state.lastPeerInitiatedStreamID
  437. operations.sendGoAwayFrame(lastPeerInitiatedStreamID: streamID)
  438. case .operating, .waitingToIdle, .closing, .closed:
  439. // We can only need to ratchet down the stream ID if we're already quiescing.
  440. ()
  441. }
  442. return operations
  443. }
  444. mutating func receiveSettings(_ settings: HTTP2Settings) -> Operations {
  445. // Log the change in settings.
  446. self.logger.debug(
  447. "HTTP2 settings update",
  448. metadata: Dictionary(
  449. settings.map {
  450. ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
  451. },
  452. uniquingKeysWith: { a, _ in a }
  453. )
  454. )
  455. var operations: Operations = .none
  456. switch self.state {
  457. case var .operating(state):
  458. let hasSeenSettingsPreviously = state.hasSeenSettings
  459. // If we hadn't previously seen settings then we need to notify the client connection manager
  460. // that we're now ready.
  461. if !hasSeenSettingsPreviously {
  462. operations.notifyConnectionManager(about: .ready)
  463. state.hasSeenSettings = true
  464. // Now that we know the connection is ready, we may want to start an idle timeout as well.
  465. if !state.hasOpenStreams {
  466. operations.scheduleIdleTask()
  467. }
  468. }
  469. // Update max concurrent streams.
  470. if let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value {
  471. operations.maxConcurrentStreamsChanged(maxStreams)
  472. state.maxConcurrentStreams = maxStreams
  473. } else if !hasSeenSettingsPreviously {
  474. // We hadn't seen settings before now and max concurrent streams wasn't set we should assume
  475. // the default and emit an update.
  476. operations.maxConcurrentStreamsChanged(100)
  477. state.maxConcurrentStreams = 100
  478. }
  479. self.state = .operating(state)
  480. case var .waitingToIdle(state):
  481. // Update max concurrent streams.
  482. if let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value {
  483. operations.maxConcurrentStreamsChanged(maxStreams)
  484. state.maxConcurrentStreams = maxStreams
  485. }
  486. self.state = .waitingToIdle(state)
  487. case .quiescing, .closing, .closed:
  488. ()
  489. }
  490. return operations
  491. }
  492. // MARK: - Channel Events
  493. // (Other channel events aren't included here as they don't impact the state machine.)
  494. /// 'channelActive' was called in the idle handler holding this state machine.
  495. mutating func channelInactive() -> Operations {
  496. var operations: Operations = .none
  497. switch self.state {
  498. case let .operating(state):
  499. self.state = .closed
  500. // We unexpectedly became inactive.
  501. if !state.hasSeenSettings || state.hasOpenStreams {
  502. // Haven't seen settings, or we've seen settings and there are open streams.
  503. operations.notifyConnectionManager(about: .inactive)
  504. } else {
  505. // Have seen settings and there are no open streams.
  506. operations.notifyConnectionManager(about: .idle)
  507. }
  508. case let .waitingToIdle(state):
  509. self.state = .closed
  510. // We were going to idle anyway.
  511. operations.notifyConnectionManager(about: .idle)
  512. operations.cancelIdleTask(state.idleTask)
  513. case let .quiescing(state):
  514. self.state = .closed
  515. if state.initiatedByUs || state.hasOpenStreams {
  516. operations.notifyConnectionManager(about: .inactive)
  517. } else {
  518. operations.notifyConnectionManager(about: .idle)
  519. }
  520. case let .closing(state):
  521. self.state = .closed
  522. if state.shouldIdle {
  523. operations.notifyConnectionManager(about: .idle)
  524. } else {
  525. operations.notifyConnectionManager(about: .inactive)
  526. }
  527. case .closed:
  528. ()
  529. }
  530. return operations
  531. }
  532. }
  533. // MARK: - Helper Protocols
  534. private protocol TracksOpenStreams {
  535. /// The number of open streams.
  536. var openStreams: Int { get set }
  537. }
  538. extension TracksOpenStreams {
  539. /// Whether any streams are open.
  540. fileprivate var hasOpenStreams: Bool {
  541. return self.openStreams != 0
  542. }
  543. }
  544. private protocol CanOpenStreams: TracksOpenStreams {
  545. /// The role of this peer in the connection.
  546. var role: GRPCIdleHandlerStateMachine.Role { get }
  547. /// The ID of the stream most recently initiated by the remote peer.
  548. var lastPeerInitiatedStreamID: HTTP2StreamID { get set }
  549. /// The maximum number of concurrent streams.
  550. var maxConcurrentStreams: Int { get set }
  551. mutating func streamCreated(_ streamID: HTTP2StreamID, logger: Logger)
  552. }
  553. extension CanOpenStreams {
  554. fileprivate mutating func streamCreated(_ streamID: HTTP2StreamID, logger: Logger) {
  555. self.openStreams += 1
  556. switch self.role {
  557. case .client where streamID.isServerInitiated:
  558. self.lastPeerInitiatedStreamID = streamID
  559. case .server where streamID.isClientInitiated:
  560. self.lastPeerInitiatedStreamID = streamID
  561. default:
  562. ()
  563. }
  564. logger.debug(
  565. "HTTP2 stream created",
  566. metadata: [
  567. MetadataKey.h2StreamID: "\(streamID)",
  568. MetadataKey.h2ActiveStreams: "\(self.openStreams)",
  569. ]
  570. )
  571. if self.openStreams == self.maxConcurrentStreams {
  572. logger.warning(
  573. "HTTP2 max concurrent stream limit reached",
  574. metadata: [
  575. MetadataKey.h2ActiveStreams: "\(self.openStreams)"
  576. ]
  577. )
  578. }
  579. }
  580. }
  581. private protocol CanCloseStreams: TracksOpenStreams {
  582. /// Notes that a stream has closed.
  583. mutating func streamClosed(_ streamID: HTTP2StreamID, logger: Logger)
  584. }
  585. extension CanCloseStreams {
  586. fileprivate mutating func streamClosed(_ streamID: HTTP2StreamID, logger: Logger) {
  587. self.openStreams -= 1
  588. logger.debug(
  589. "HTTP2 stream closed",
  590. metadata: [
  591. MetadataKey.h2StreamID: "\(streamID)",
  592. MetadataKey.h2ActiveStreams: "\(self.openStreams)",
  593. ]
  594. )
  595. }
  596. }