GRPCIdleHandler.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. import NIOTLS
  20. import NIOTransportServices
  /// A channel handler which watches HTTP/2 frames and user events on a connection and uses them to
  /// drive a `GRPCIdleHandlerStateMachine` and a `PingHandler`.
  ///
  /// Based on the operations those return, the handler schedules and cancels the idle timeout,
  /// schedules keepalive pings and their timeout, writes GOAWAY and PING frames, closes the channel
  /// and, when operating as a client, informs the `ConnectionManager` of connectivity changes.
  internal final class GRPCIdleHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    /// The amount of time to wait before closing the channel when there are no active streams.
    /// If nil, then we shouldn't schedule idle tasks.
    private let idleTimeout: TimeAmount?

    /// The ping handler.
    private var pingHandler: PingHandler

    /// The scheduled task which will close the connection after the keep-alive timeout has expired.
    private var scheduledClose: Scheduled<Void>?

    /// The scheduled task which will ping.
    private var scheduledPing: RepeatedTask?

    /// The mode we're operating in.
    private let mode: Mode

    /// The time the handler was created.
    private let creationTime: NIODeadline

    /// Returns the age of the connection in seconds.
    ///
    /// The age is measured from `creationTime` (set in `init`) and truncated to whole seconds.
    private var connectionAgeInSeconds: UInt64 {
      let now = NIODeadline.now()
      let elapsedNanoseconds = now.uptimeNanoseconds - self.creationTime.uptimeNanoseconds
      return elapsedNanoseconds / 1_000_000_000
    }

    /// The context of this handler; set in `handlerAdded` and cleared in `handlerRemoved`.
    private var context: ChannelHandlerContext?

    /// The mode of operation: the client tracks additional connection state in the connection
    /// manager.
    internal enum Mode {
      case client(ConnectionManager, HTTP2StreamMultiplexer)
      case server

      /// The connection manager when operating as a client, `nil` when operating as a server.
      var connectionManager: ConnectionManager? {
        switch self {
        case let .client(manager, _):
          return manager
        case .server:
          return nil
        }
      }
    }

    /// The current state.
    private var stateMachine: GRPCIdleHandlerStateMachine

    /// Creates an idle handler for a client connection.
    ///
    /// - Parameters:
    ///   - connectionManager: The manager to notify about connectivity changes. Its `idleBehavior`
    ///     decides whether `idleTimeout` is honoured at all.
    ///   - multiplexer: The multiplexer handed to the connection manager when the channel becomes
    ///     active.
    ///   - idleTimeout: How long to wait without active streams before idling the connection. Only
    ///     used when the manager's idle behavior is `.closeWhenIdleTimeout`.
    ///   - configuration: Keepalive settings used to configure the ping handler.
    ///   - logger: The logger given to the state machine.
    init(
      connectionManager: ConnectionManager,
      multiplexer: HTTP2StreamMultiplexer,
      idleTimeout: TimeAmount,
      keepalive configuration: ClientConnectionKeepalive,
      logger: Logger
    ) {
      self.mode = .client(connectionManager, multiplexer)

      switch connectionManager.idleBehavior {
      case .neverGoIdle:
        // No timeout means idle tasks are never scheduled.
        self.idleTimeout = nil
      case .closeWhenIdleTimeout:
        self.idleTimeout = idleTimeout
      }

      self.stateMachine = .init(role: .client, logger: logger)
      self.pingHandler = PingHandler(
        pingCode: 5,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
      )
      self.creationTime = .now()
    }

    /// Creates an idle handler for a server connection.
    ///
    /// - Parameters:
    ///   - idleTimeout: How long to wait without active streams before idling the connection.
    ///   - configuration: Keepalive settings used to configure the ping handler.
    ///   - logger: The logger given to the state machine.
    init(
      idleTimeout: TimeAmount,
      keepalive configuration: ServerConnectionKeepalive,
      logger: Logger
    ) {
      self.mode = .server
      self.stateMachine = .init(role: .server, logger: logger)
      self.idleTimeout = idleTimeout
      self.pingHandler = PingHandler(
        pingCode: 10,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
        minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
        maximumPingStrikes: configuration.maximumPingStrikes
      )
      self.creationTime = .now()
    }

    /// Carries out the operations requested by the state machine.
    ///
    /// The operations are applied in a fixed order: connection manager event, max concurrent
    /// streams change, idle task scheduling/cancellation, GOAWAY (optionally followed by a PING),
    /// and finally closing the channel.
    ///
    /// - Parameter operations: The operations returned from a state machine transition.
    private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
      // Prod the connection manager.
      if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
        switch event {
        case .idle:
          manager.idle()
        case .inactive:
          manager.channelInactive()
        case .ready:
          manager.ready()
        case .quiescing:
          manager.beginQuiescing()
        }
      }

      // Max concurrent streams changed.
      if let manager = self.mode.connectionManager,
        let maxConcurrentStreams = operations.maxConcurrentStreamsChange
      {
        manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
      }

      // Handle idle timeout creation/cancellation.
      if let idleTimeout = self.idleTimeout, let idleTask = operations.idleTask {
        switch idleTask {
        case let .cancel(task):
          self.stateMachine.logger.debug("idle timeout task cancelled")
          task.cancel()

        case .schedule:
          // A timeout of `.nanoseconds(.max)` is treated as "never": nothing is scheduled.
          if idleTimeout != .nanoseconds(.max), let context = self.context {
            self.stateMachine.logger.debug(
              "scheduling idle timeout task",
              metadata: [MetadataKey.delayMs: "\(idleTimeout.milliseconds)"]
            )

            let task = context.eventLoop.scheduleTask(in: idleTimeout) {
              self.stateMachine.logger.debug("idle timeout task fired")
              self.idleTimeoutFired()
            }

            // Hand the task to the state machine so that it can ask for it to be cancelled later.
            self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
          }
        }
      }

      // Send a GOAWAY frame.
      if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
        self.stateMachine.logger.debug(
          "sending GOAWAY frame",
          metadata: [
            MetadataKey.h2GoAwayLastStreamID: "\(Int(streamID))"
          ]
        )

        let goAwayFrame = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
        )
        self.context?.write(self.wrapOutboundOut(goAwayFrame), promise: nil)

        // We emit a ping after some GOAWAY frames.
        if operations.shouldPingAfterGoAway {
          let pingFrame = HTTP2Frame(
            streamID: .rootStream,
            payload: .ping(self.pingHandler.pingDataGoAway, ack: false)
          )
          self.context?.write(self.wrapOutboundOut(pingFrame), promise: nil)
        }

        // Flush once so the GOAWAY and any trailing PING go out together.
        self.context?.flush()
      }

      // Close the channel, if necessary.
      if operations.shouldCloseChannel, let context = self.context {
        // Close on the next event-loop tick so we don't drop any events which are
        // currently being processed.
        context.eventLoop.execute {
          self.stateMachine.logger.debug(
            "closing connection",
            metadata: ["connection_age_secs": .stringConvertible(self.connectionAgeInSeconds)]
          )
          context.close(mode: .all, promise: nil)
        }
      }
    }

    /// Carries out the action requested by the ping handler.
    ///
    /// - Parameter action: The action returned from a `PingHandler` call.
    private func handlePingAction(_ action: PingHandler.Action) {
      switch action {
      case .none:
        ()

      case .ack:
        // NIO's HTTP2 handler acks for us so this is a no-op. Log so it doesn't appear that we are
        // ignoring pings.
        self.stateMachine.logger.debug(
          "sending PING frame",
          metadata: [MetadataKey.h2PingAck: "true"]
        )

      case .cancelScheduledTimeout:
        self.scheduledClose?.cancel()
        self.scheduledClose = nil

      case let .schedulePing(delay, timeout):
        self.schedulePing(in: delay, timeout: timeout)

      case let .reply(framePayload):
        // Only PING payloads are logged; any payload is still written below.
        switch framePayload {
        case .ping(_, let ack):
          self.stateMachine.logger.debug(
            "sending PING frame",
            metadata: [MetadataKey.h2PingAck: "\(ack)"]
          )
        default:
          ()
        }
        let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
        self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)

      case .ratchetDownLastSeenStreamID:
        self.perform(operations: self.stateMachine.ratchetDownGoAwayStreamID())
      }
    }

    /// Schedules a repeating keepalive ping, replacing any previously scheduled one.
    ///
    /// - Parameters:
    ///   - delay: The initial delay and the interval between pings. A value of
    ///     `.nanoseconds(.max)` disables scheduling.
    ///   - timeout: How long after a ping is sent before the close task fires. Must be less than
    ///     `delay`.
    private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
      guard delay != .nanoseconds(.max) else {
        return
      }

      self.stateMachine.logger.debug(
        "scheduled keepalive pings",
        metadata: [MetadataKey.intervalMs: "\(delay.milliseconds)"]
      )

      // Cancel any task we already hold: overwriting the reference without cancelling would leave
      // the old repeated task firing (and retaining this handler) with no way to stop it.
      self.scheduledPing?.cancel()
      self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
        initialDelay: delay,
        delay: delay
      ) { _ in
        let action = self.pingHandler.pingFired()
        if case .none = action { return }
        self.handlePingAction(action)
        // `timeout` is less than `interval`, guaranteeing that the close task
        // will be fired before a new ping is triggered.
        assert(timeout < delay, "`timeout` must be less than `interval`")
        self.scheduleClose(in: timeout)
      }
    }

    /// Schedules a task which shuts the connection down once the keepalive timeout expires.
    ///
    /// The task is stored in `scheduledClose` so that it can be cancelled by a
    /// `.cancelScheduledTimeout` ping action.
    ///
    /// - Parameter timeout: The delay before the connection is shut down.
    private func scheduleClose(in timeout: TimeAmount) {
      self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
        self.stateMachine.logger.debug("keepalive timer expired")
        self.perform(operations: self.stateMachine.shutdownNow())
      }
    }

    /// Cancels and forgets the keepalive ping task and the keepalive timeout task, if any.
    private func cancelKeepaliveTasks() {
      self.scheduledPing?.cancel()
      self.scheduledClose?.cancel()
      self.scheduledPing = nil
      self.scheduledClose = nil
    }

    /// Informs the state machine that the idle timeout task fired and performs the result.
    private func idleTimeoutFired() {
      self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
    }

    /// Stores the context so that tasks can be scheduled and frames written outside of callbacks.
    func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    /// Drops the stored context and stops the keepalive timers.
    ///
    /// The timers capture `self` strongly and can no longer write to the channel once the context
    /// is gone, so leaving them scheduled would only keep the handler alive and firing.
    func handlerRemoved(context: ChannelHandlerContext) {
      self.cancelKeepaliveTasks()
      self.context = nil
    }

    /// Reacts to stream lifecycle, quiescing, TLS and (where available) Network.framework events.
    ///
    /// Every event other than `ChannelShouldQuiesceEvent` is forwarded down the pipeline.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      if let created = event as? NIOHTTP2StreamCreatedEvent {
        self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
        self.handlePingAction(self.pingHandler.streamCreated())
        self.mode.connectionManager?.streamOpened()
        context.fireUserInboundEventTriggered(event)
      } else if let closed = event as? StreamClosedEvent {
        self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
        self.handlePingAction(self.pingHandler.streamClosed())
        self.mode.connectionManager?.streamClosed()
        context.fireUserInboundEventTriggered(event)
      } else if event is ChannelShouldQuiesceEvent {
        self.perform(operations: self.stateMachine.initiateGracefulShutdown())
        // Swallow this event.
      } else if case let .handshakeCompleted(negotiatedProtocol) = event as? TLSUserEvent {
        // Best effort: the TLS version is only used for logging, so a failure to read it is ignored.
        let tlsVersion = try? context.channel.getTLSVersionSync()
        self.stateMachine.logger.debug(
          "TLS handshake completed",
          metadata: [
            "alpn": "\(negotiatedProtocol ?? "nil")",
            "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
          ]
        )
        context.fireUserInboundEventTriggered(event)
      } else {
        #if canImport(Network)
        if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
          if let waitsForConnectivity = event as? NIOTSNetworkEvents.WaitingForConnectivity {
            // Surface the transient error to the connection manager (client mode only).
            self.mode.connectionManager?.channelError(waitsForConnectivity.transientError)
          }
        }
        #endif

        context.fireUserInboundEventTriggered(event)
      }
    }

    /// Reports the error to the connection manager (client mode only) and forwards it.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
      // No state machine action here.
      self.mode.connectionManager?.channelError(error)
      context.fireErrorCaught(error)
    }

    /// Adds address metadata to the logger, tells the connection manager (client mode only) that
    /// the channel is active, and forwards the event.
    func channelActive(context: ChannelHandlerContext) {
      self.stateMachine.logger.addIPAddressMetadata(
        local: context.localAddress,
        remote: context.remoteAddress
      )

      // No state machine action here.
      switch self.mode {
      case let .client(connectionManager, multiplexer):
        connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
      case .server:
        ()
      }
      context.fireChannelActive()
    }

    /// Informs the state machine that the channel is inactive, stops the keepalive timers and
    /// forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
      self.perform(operations: self.stateMachine.channelInactive())
      self.cancelKeepaliveTasks()
      context.fireChannelInactive()
    }

    /// Inspects GOAWAY, SETTINGS and PING frames on their way through; all frames are forwarded.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)

      switch frame.payload {
      case let .goAway(lastStreamID, errorCode, _):
        self.stateMachine.logger.debug(
          "received GOAWAY frame",
          metadata: [
            MetadataKey.h2GoAwayLastStreamID: "\(Int(lastStreamID))",
            MetadataKey.h2GoAwayError: "\(errorCode.networkCode)",
          ]
        )
        self.perform(operations: self.stateMachine.receiveGoAway())

      case let .settings(.settings(settings)):
        self.perform(operations: self.stateMachine.receiveSettings(settings))

      case let .ping(data, ack):
        self.stateMachine.logger.debug(
          "received PING frame",
          metadata: [MetadataKey.h2PingAck: "\(ack)"]
        )
        self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))

      default:
        // We're not interested in other events.
        ()
      }

      context.fireChannelRead(data)
    }
  }
  extension HTTP2SettingsParameter {
    /// The key under which this settings parameter is recorded in logging metadata.
    ///
    /// The parameters listed below map to fixed `h2_settings_*` keys; any other parameter falls
    /// back to its `String(describing:)` representation.
    internal var loggingMetadataKey: String {
      let key: String

      switch self {
      case .headerTableSize:
        key = "h2_settings_header_table_size"
      case .enablePush:
        key = "h2_settings_enable_push"
      case .maxConcurrentStreams:
        key = "h2_settings_max_concurrent_streams"
      case .initialWindowSize:
        key = "h2_settings_initial_window_size"
      case .maxFrameSize:
        key = "h2_settings_max_frame_size"
      case .maxHeaderListSize:
        key = "h2_settings_max_header_list_size"
      case .enableConnectProtocol:
        key = "h2_settings_enable_connect_protocol"
      default:
        // Parameters without a dedicated key are described generically.
        key = String(describing: self)
      }

      return key
    }
  }
  extension TimeAmount {
    /// This amount expressed in milliseconds, using integer division of the nanosecond value.
    fileprivate var milliseconds: Int64 {
      let nanosecondsPerMillisecond: Int64 = 1_000_000
      return self.nanoseconds / nanosecondsPerMillisecond
    }
  }