GRPCIdleHandler.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. import NIOTLS
  20. import NIOTransportServices
  /// A channel handler which observes HTTP/2 frames and stream lifecycle events on a connection
  /// and turns the decisions made by `GRPCIdleHandlerStateMachine` and `PingHandler` into timers,
  /// GOAWAY/PING frames, channel closure and (for clients) connection manager callbacks.
  internal final class GRPCIdleHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    /// The amount of time to wait before closing the channel when there are no active streams.
    /// If nil, then we shouldn't schedule idle tasks.
    private let idleTimeout: TimeAmount?

    /// The maximum amount of time the connection is allowed to live before quiescing.
    private let maxAge: TimeAmount?

    /// The ping handler.
    private var pingHandler: PingHandler

    /// The scheduled task which will close the connection gently after the max connection age
    /// has been reached.
    private var scheduledMaxAgeClose: Scheduled<Void>?

    /// The scheduled task which will close the connection after the keep-alive timeout has expired.
    private var scheduledKeepAliveClose: Scheduled<Void>?

    /// The scheduled task which will ping.
    private var scheduledPing: RepeatedTask?

    /// The mode we're operating in.
    private let mode: Mode

    /// The time the handler was created.
    private let creationTime: NIODeadline

    /// Returns the age of the connection in seconds, measured from the creation of this handler
    /// and truncated to whole seconds.
    private var connectionAgeInSeconds: UInt64 {
      let now = NIODeadline.now()
      let nanoseconds = now.uptimeNanoseconds - self.creationTime.uptimeNanoseconds
      let seconds = nanoseconds / 1_000_000_000
      return seconds
    }

    /// The context of this handler; set in `handlerAdded` and cleared in `handlerRemoved`.
    private var context: ChannelHandlerContext?

    /// The mode of operation: the client tracks additional connection state in the connection
    /// manager.
    internal enum Mode {
      case client(ConnectionManager, HTTP2StreamMultiplexer)
      case server

      /// The connection manager when operating as a client, `nil` when operating as a server.
      var connectionManager: ConnectionManager? {
        switch self {
        case let .client(manager, _):
          return manager
        case .server:
          return nil
        }
      }
    }

    /// The current state.
    private var stateMachine: GRPCIdleHandlerStateMachine

    /// Creates an idle handler for a client connection.
    ///
    /// - Parameters:
    ///   - connectionManager: The manager to notify about connection state changes. Its
    ///     `idleBehavior` decides whether `idleTimeout` is honoured at all.
    ///   - multiplexer: The multiplexer handed to the connection manager when the channel becomes
    ///     active.
    ///   - idleTimeout: How long to wait without active streams before going idle; ignored when
    ///     the manager's idle behavior is `.neverGoIdle`.
    ///   - maxAge: The maximum age of the connection before it is quiesced, or `nil` for no limit.
    ///   - configuration: The client keepalive configuration used to set up the ping handler.
    ///   - logger: The logger passed to the state machine.
    init(
      connectionManager: ConnectionManager,
      multiplexer: HTTP2StreamMultiplexer,
      idleTimeout: TimeAmount,
      maxAge: TimeAmount?,
      keepalive configuration: ClientConnectionKeepalive,
      logger: Logger
    ) {
      self.mode = .client(connectionManager, multiplexer)
      switch connectionManager.idleBehavior {
      case .neverGoIdle:
        self.idleTimeout = nil
      case .closeWhenIdleTimeout:
        self.idleTimeout = idleTimeout
      }
      self.stateMachine = .init(role: .client, logger: logger)
      self.pingHandler = PingHandler(
        pingCode: 5,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
      )
      self.creationTime = .now()
      self.maxAge = maxAge
    }

    /// Creates an idle handler for a server connection. No maximum connection age is applied.
    ///
    /// - Parameters:
    ///   - idleTimeout: How long to wait without active streams before going idle.
    ///   - configuration: The server keepalive configuration used to set up the ping handler.
    ///   - logger: The logger passed to the state machine.
    init(
      idleTimeout: TimeAmount,
      keepalive configuration: ServerConnectionKeepalive,
      logger: Logger
    ) {
      self.mode = .server
      self.stateMachine = .init(role: .server, logger: logger)
      self.idleTimeout = idleTimeout
      self.pingHandler = PingHandler(
        pingCode: 10,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
        minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
        maximumPingStrikes: configuration.maximumPingStrikes
      )
      self.creationTime = .now()
      self.maxAge = nil
    }

    /// Carries out the operations requested by the state machine: notifies the connection manager,
    /// schedules or cancels the idle timeout task, sends GOAWAY (and optionally PING) frames and
    /// closes the channel.
    ///
    /// - Parameter operations: The operations returned from a state machine transition.
    private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
      // Prod the connection manager.
      if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
        switch event {
        case .idle:
          manager.idle()
        case .inactive:
          manager.channelInactive()
        case .ready:
          manager.ready()
        case .quiescing:
          manager.beginQuiescing()
        }
      }

      // Max concurrent streams changed.
      if let manager = self.mode.connectionManager,
        let maxConcurrentStreams = operations.maxConcurrentStreamsChange
      {
        manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
      }

      // Handle idle timeout creation/cancellation.
      if let idleTimeout = self.idleTimeout, let idleTask = operations.idleTask {
        switch idleTask {
        case let .cancel(task):
          self.stateMachine.logger.debug("idle timeout task cancelled")
          task.cancel()

        case .schedule:
          // A timeout of `.nanoseconds(.max)` means the idle task is never scheduled.
          if idleTimeout != .nanoseconds(.max), let context = self.context {
            self.stateMachine.logger.debug(
              "scheduling idle timeout task",
              metadata: [MetadataKey.delayMs: "\(idleTimeout.milliseconds)"]
            )
            let task = context.eventLoop.scheduleTask(in: idleTimeout) {
              self.stateMachine.logger.debug("idle timeout task fired")
              self.idleTimeoutFired()
            }
            // Hand the task to the state machine so that it can ask for it to be cancelled later.
            self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
          }
        }
      }

      // Send a GOAWAY frame.
      if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
        self.stateMachine.logger.debug(
          "sending GOAWAY frame",
          metadata: [
            MetadataKey.h2GoAwayLastStreamID: "\(Int(streamID))"
          ]
        )

        let goAwayFrame = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
        )
        self.context?.write(self.wrapOutboundOut(goAwayFrame), promise: nil)

        // We emit a ping after some GOAWAY frames.
        if operations.shouldPingAfterGoAway {
          let pingFrame = HTTP2Frame(
            streamID: .rootStream,
            payload: .ping(self.pingHandler.pingDataGoAway, ack: false)
          )
          self.context?.write(self.wrapOutboundOut(pingFrame), promise: nil)
        }

        self.context?.flush()
      }

      // Close the channel, if necessary.
      if operations.shouldCloseChannel, let context = self.context {
        // Close on the next event-loop tick so we don't drop any events which are
        // currently being processed.
        context.eventLoop.execute {
          self.stateMachine.logger.debug(
            "closing connection",
            metadata: ["connection_age_secs": .stringConvertible(self.connectionAgeInSeconds)]
          )
          context.close(mode: .all, promise: nil)
        }
      }
    }

    /// Carries out an action requested by the ping handler.
    ///
    /// - Parameter action: The action returned from the ping handler.
    private func handlePingAction(_ action: PingHandler.Action) {
      switch action {
      case .none:
        ()

      case .ack:
        // NIO's HTTP2 handler acks for us so this is a no-op. Log so it doesn't appear that we are
        // ignoring pings.
        self.stateMachine.logger.debug(
          "sending PING frame",
          metadata: [MetadataKey.h2PingAck: "true"]
        )

      case .cancelScheduledTimeout:
        self.scheduledKeepAliveClose?.cancel()
        self.scheduledKeepAliveClose = nil

      case let .schedulePing(delay, timeout):
        self.schedulePing(in: delay, timeout: timeout)

      case let .reply(framePayload):
        switch framePayload {
        case .ping(_, let ack):
          self.stateMachine.logger.debug(
            "sending PING frame",
            metadata: [MetadataKey.h2PingAck: "\(ack)"]
          )
        default:
          ()
        }
        let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
        self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)

      case .ratchetDownLastSeenStreamID:
        self.perform(operations: self.stateMachine.ratchetDownGoAwayStreamID())
      }
    }

    /// Schedules a repeating keepalive ping, replacing any previously scheduled one.
    ///
    /// - Parameters:
    ///   - delay: The initial delay and the interval between pings. A value of
    ///     `.nanoseconds(.max)` means no pings are scheduled.
    ///   - timeout: How long after a ping is sent to wait before shutting the connection down.
    private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
      guard delay != .nanoseconds(.max) else {
        return
      }

      self.stateMachine.logger.debug(
        "scheduled keepalive pings",
        metadata: [MetadataKey.intervalMs: "\(delay.milliseconds)"]
      )

      // Only the most recently scheduled task is stored, so cancel any earlier one before
      // replacing it; otherwise it would keep firing with no way of cancelling it.
      self.scheduledPing?.cancel()
      self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
        initialDelay: delay,
        delay: delay
      ) { _ in
        let action = self.pingHandler.pingFired()
        if case .none = action { return }
        self.handlePingAction(action)
        // `timeout` is less than `interval`, guaranteeing that the close task
        // will be fired before a new ping is triggered.
        assert(timeout < delay, "`timeout` must be less than `interval`")
        self.scheduleClose(in: timeout)
      }
    }

    /// Schedules a task which shuts the connection down once the keepalive timeout has elapsed.
    ///
    /// - Parameter timeout: The delay before the shutdown is triggered.
    private func scheduleClose(in timeout: TimeAmount) {
      self.scheduledKeepAliveClose = self.context?.eventLoop.scheduleTask(in: timeout) {
        self.stateMachine.logger.debug("keepalive timer expired")
        self.perform(operations: self.stateMachine.shutdownNow())
      }
    }

    /// Informs the state machine that the idle timeout task has fired.
    private func idleTimeoutFired() {
      self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
    }

    /// Cancels and clears the keepalive ping, keepalive close and max-age tasks owned by this
    /// handler. The idle timeout task is held by the state machine and is not touched here.
    private func cancelScheduledTasks() {
      self.scheduledPing?.cancel()
      self.scheduledKeepAliveClose?.cancel()
      self.scheduledMaxAgeClose?.cancel()
      self.scheduledPing = nil
      self.scheduledKeepAliveClose = nil
      self.scheduledMaxAgeClose = nil
    }

    /// Stores the context so that timers can be scheduled and frames written outside of channel
    /// callbacks.
    func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    /// Drops the stored context and cancels the timers owned by this handler.
    func handlerRemoved(context: ChannelHandlerContext) {
      // The timer closures capture `self` strongly and can no longer write frames or close the
      // channel once the context is gone, so cancel them rather than leave them running.
      self.cancelScheduledTasks()
      self.context = nil
    }

    /// Reacts to stream creation/closure, quiescing requests, TLS handshake completion and (where
    /// available) Network.framework connectivity events. Every event except
    /// `ChannelShouldQuiesceEvent` is forwarded down the pipeline.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      if let created = event as? NIOHTTP2StreamCreatedEvent {
        self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
        self.handlePingAction(self.pingHandler.streamCreated())
        self.mode.connectionManager?.streamOpened()
        context.fireUserInboundEventTriggered(event)
      } else if let closed = event as? StreamClosedEvent {
        self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
        self.handlePingAction(self.pingHandler.streamClosed())
        self.mode.connectionManager?.streamClosed()
        context.fireUserInboundEventTriggered(event)
      } else if event is ChannelShouldQuiesceEvent {
        self.perform(operations: self.stateMachine.initiateGracefulShutdown())
        // Swallow this event.
      } else if case let .handshakeCompleted(negotiatedProtocol) = event as? TLSUserEvent {
        let tlsVersion = try? context.channel.getTLSVersionSync()
        self.stateMachine.logger.debug(
          "TLS handshake completed",
          metadata: [
            "alpn": "\(negotiatedProtocol ?? "nil")",
            "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
          ]
        )
        context.fireUserInboundEventTriggered(event)
      } else {
        #if canImport(Network)
        if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
          if let waitsForConnectivity = event as? NIOTSNetworkEvents.WaitingForConnectivity {
            self.mode.connectionManager?.channelError(waitsForConnectivity.transientError)
          }
        }
        #endif

        context.fireUserInboundEventTriggered(event)
      }
    }

    /// Reports the error to the connection manager (clients only) and forwards it.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
      // No state machine action here.
      self.mode.connectionManager?.channelError(error)
      context.fireErrorCaught(error)
    }

    /// Adds address metadata to the logger, starts the max-age timer if one is configured and
    /// tells the connection manager (clients only) that the channel is active.
    func channelActive(context: ChannelHandlerContext) {
      self.stateMachine.logger.addIPAddressMetadata(
        local: context.localAddress,
        remote: context.remoteAddress
      )

      // If a max age has been set then start a timer. This will only be cancelled when it fires,
      // when the channel eventually becomes inactive or when the handler is removed.
      if let maxAge = self.maxAge {
        assert(self.scheduledMaxAgeClose == nil)
        self.scheduledMaxAgeClose = context.eventLoop.scheduleTask(in: maxAge) {
          let operations = self.stateMachine.reachedMaxAge()
          self.perform(operations: operations)
        }
      }

      // No state machine action here.
      switch self.mode {
      case let .client(connectionManager, multiplexer):
        connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
      case .server:
        ()
      }
      context.fireChannelActive()
    }

    /// Informs the state machine that the channel is inactive, cancels the timers owned by this
    /// handler and forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
      self.perform(operations: self.stateMachine.channelInactive())
      self.cancelScheduledTasks()
      context.fireChannelInactive()
    }

    /// Inspects GOAWAY, SETTINGS and PING frames before forwarding every frame unchanged.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)

      switch frame.payload {
      case let .goAway(lastStreamID, errorCode, _):
        self.stateMachine.logger.debug(
          "received GOAWAY frame",
          metadata: [
            MetadataKey.h2GoAwayLastStreamID: "\(Int(lastStreamID))",
            MetadataKey.h2GoAwayError: "\(errorCode.networkCode)",
          ]
        )
        self.perform(operations: self.stateMachine.receiveGoAway())

      case let .settings(.settings(settings)):
        self.perform(operations: self.stateMachine.receiveSettings(settings))

      case let .ping(data, ack):
        self.stateMachine.logger.debug(
          "received PING frame",
          metadata: [MetadataKey.h2PingAck: "\(ack)"]
        )
        self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))

      default:
        // We're not interested in other events.
        ()
      }

      context.fireChannelRead(data)
    }
  }
  extension HTTP2SettingsParameter {
    /// The key under which this settings parameter is recorded in logging metadata. Parameters
    /// without a dedicated key fall back to their `String(describing:)` representation.
    internal var loggingMetadataKey: String {
      let key: String

      switch self {
      case .headerTableSize:
        key = "h2_settings_header_table_size"
      case .enablePush:
        key = "h2_settings_enable_push"
      case .maxConcurrentStreams:
        key = "h2_settings_max_concurrent_streams"
      case .initialWindowSize:
        key = "h2_settings_initial_window_size"
      case .maxFrameSize:
        key = "h2_settings_max_frame_size"
      case .maxHeaderListSize:
        key = "h2_settings_max_header_list_size"
      case .enableConnectProtocol:
        key = "h2_settings_enable_connect_protocol"
      default:
        key = String(describing: self)
      }

      return key
    }
  }
  extension TimeAmount {
    /// This amount expressed in whole milliseconds; any remainder is discarded by the integer
    /// division.
    fileprivate var milliseconds: Int64 {
      let nanosecondsPerMillisecond: Int64 = 1_000_000
      return self.nanoseconds / nanosecondsPerMillisecond
    }
  }