2
0

GRPCIdleHandler.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. import NIOTLS
  20. import NIOTransportServices
  21. internal final class GRPCIdleHandler: ChannelInboundHandler {
  22. typealias InboundIn = HTTP2Frame
  23. typealias OutboundOut = HTTP2Frame
  24. /// The amount of time to wait before closing the channel when there are no active streams.
  25. private let idleTimeout: TimeAmount
  26. /// The ping handler.
  27. private var pingHandler: PingHandler
  28. /// The scheduled task which will close the connection after the keep-alive timeout has expired.
  29. private var scheduledClose: Scheduled<Void>?
  30. /// The scheduled task which will ping.
  31. private var scheduledPing: RepeatedTask?
  32. /// The mode we're operating in.
  33. private let mode: Mode
  34. private var context: ChannelHandlerContext?
  35. /// The mode of operation: the client tracks additional connection state in the connection
  36. /// manager.
  37. internal enum Mode {
  38. case client(ConnectionManager, HTTP2StreamMultiplexer)
  39. case server
  40. var connectionManager: ConnectionManager? {
  41. switch self {
  42. case let .client(manager, _):
  43. return manager
  44. case .server:
  45. return nil
  46. }
  47. }
  48. }
  49. /// The current state.
  50. private var stateMachine: GRPCIdleHandlerStateMachine
  51. init(
  52. connectionManager: ConnectionManager,
  53. multiplexer: HTTP2StreamMultiplexer,
  54. idleTimeout: TimeAmount,
  55. keepalive configuration: ClientConnectionKeepalive,
  56. logger: Logger
  57. ) {
  58. self.mode = .client(connectionManager, multiplexer)
  59. self.idleTimeout = idleTimeout
  60. self.stateMachine = .init(role: .client, logger: logger)
  61. self.pingHandler = PingHandler(
  62. pingCode: 5,
  63. interval: configuration.interval,
  64. timeout: configuration.timeout,
  65. permitWithoutCalls: configuration.permitWithoutCalls,
  66. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  67. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
  68. )
  69. }
  70. init(
  71. idleTimeout: TimeAmount,
  72. keepalive configuration: ServerConnectionKeepalive,
  73. logger: Logger
  74. ) {
  75. self.mode = .server
  76. self.stateMachine = .init(role: .server, logger: logger)
  77. self.idleTimeout = idleTimeout
  78. self.pingHandler = PingHandler(
  79. pingCode: 10,
  80. interval: configuration.interval,
  81. timeout: configuration.timeout,
  82. permitWithoutCalls: configuration.permitWithoutCalls,
  83. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  84. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
  85. minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
  86. maximumPingStrikes: configuration.maximumPingStrikes
  87. )
  88. }
  89. private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
  90. // Prod the connection manager.
  91. if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
  92. switch event {
  93. case .idle:
  94. manager.idle()
  95. case .inactive:
  96. manager.channelInactive()
  97. case .ready:
  98. manager.ready()
  99. case .quiescing:
  100. manager.beginQuiescing()
  101. }
  102. }
  103. // Max concurrent streams changed.
  104. if let manager = self.mode.connectionManager,
  105. let maxConcurrentStreams = operations.maxConcurrentStreamsChange
  106. {
  107. manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
  108. }
  109. // Handle idle timeout creation/cancellation.
  110. if let idleTask = operations.idleTask {
  111. switch idleTask {
  112. case let .cancel(task):
  113. self.stateMachine.logger.debug("idle timeout task cancelled")
  114. task.cancel()
  115. case .schedule:
  116. if self.idleTimeout != .nanoseconds(.max), let context = self.context {
  117. self.stateMachine.logger.debug(
  118. "scheduling idle timeout task",
  119. metadata: [MetadataKey.delayMs: "\(self.idleTimeout.milliseconds)"]
  120. )
  121. let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  122. self.stateMachine.logger.debug("idle timeout task fired")
  123. self.idleTimeoutFired()
  124. }
  125. self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
  126. }
  127. }
  128. }
  129. // Send a GOAWAY frame.
  130. if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
  131. self.stateMachine.logger.debug(
  132. "sending GOAWAY frame",
  133. metadata: [
  134. MetadataKey.h2GoAwayLastStreamID: "\(Int(streamID))"
  135. ]
  136. )
  137. let goAwayFrame = HTTP2Frame(
  138. streamID: .rootStream,
  139. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  140. )
  141. self.context?.write(self.wrapOutboundOut(goAwayFrame), promise: nil)
  142. // We emit a ping after some GOAWAY frames.
  143. if operations.shouldPingAfterGoAway {
  144. let pingFrame = HTTP2Frame(
  145. streamID: .rootStream,
  146. payload: .ping(self.pingHandler.pingDataGoAway, ack: false)
  147. )
  148. self.context?.write(self.wrapOutboundOut(pingFrame), promise: nil)
  149. }
  150. self.context?.flush()
  151. }
  152. // Close the channel, if necessary.
  153. if operations.shouldCloseChannel, let context = self.context {
  154. // Close on the next event-loop tick so we don't drop any events which are
  155. // currently being processed.
  156. context.eventLoop.execute {
  157. self.stateMachine.logger.debug("closing connection")
  158. context.close(mode: .all, promise: nil)
  159. }
  160. }
  161. }
  162. private func handlePingAction(_ action: PingHandler.Action) {
  163. switch action {
  164. case .none:
  165. ()
  166. case .ack:
  167. // NIO's HTTP2 handler acks for us so this is a no-op. Log so it doesn't appear that we are
  168. // ignoring pings.
  169. self.stateMachine.logger.debug(
  170. "sending PING frame",
  171. metadata: [MetadataKey.h2PingAck: "true"]
  172. )
  173. case .cancelScheduledTimeout:
  174. self.scheduledClose?.cancel()
  175. self.scheduledClose = nil
  176. case let .schedulePing(delay, timeout):
  177. self.schedulePing(in: delay, timeout: timeout)
  178. case let .reply(framePayload):
  179. switch framePayload {
  180. case .ping(_, let ack):
  181. self.stateMachine.logger.debug(
  182. "sending PING frame",
  183. metadata: [MetadataKey.h2PingAck: "\(ack)"]
  184. )
  185. default:
  186. ()
  187. }
  188. let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
  189. self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  190. case .ratchetDownLastSeenStreamID:
  191. self.perform(operations: self.stateMachine.ratchetDownGoAwayStreamID())
  192. }
  193. }
  194. private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
  195. guard delay != .nanoseconds(.max) else {
  196. return
  197. }
  198. self.stateMachine.logger.debug(
  199. "scheduled keepalive pings",
  200. metadata: [MetadataKey.intervalMs: "\(delay.milliseconds)"]
  201. )
  202. self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
  203. initialDelay: delay,
  204. delay: delay
  205. ) { _ in
  206. let action = self.pingHandler.pingFired()
  207. if case .none = action { return }
  208. self.handlePingAction(action)
  209. // `timeout` is less than `interval`, guaranteeing that the close task
  210. // will be fired before a new ping is triggered.
  211. assert(timeout < delay, "`timeout` must be less than `interval`")
  212. self.scheduleClose(in: timeout)
  213. }
  214. }
  215. private func scheduleClose(in timeout: TimeAmount) {
  216. self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
  217. self.stateMachine.logger.debug("keepalive timer expired")
  218. self.perform(operations: self.stateMachine.shutdownNow())
  219. }
  220. }
  221. private func idleTimeoutFired() {
  222. self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
  223. }
  224. func handlerAdded(context: ChannelHandlerContext) {
  225. self.context = context
  226. }
  227. func handlerRemoved(context: ChannelHandlerContext) {
  228. self.context = nil
  229. }
  230. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  231. if let created = event as? NIOHTTP2StreamCreatedEvent {
  232. self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
  233. self.handlePingAction(self.pingHandler.streamCreated())
  234. self.mode.connectionManager?.streamOpened()
  235. context.fireUserInboundEventTriggered(event)
  236. } else if let closed = event as? StreamClosedEvent {
  237. self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
  238. self.handlePingAction(self.pingHandler.streamClosed())
  239. self.mode.connectionManager?.streamClosed()
  240. context.fireUserInboundEventTriggered(event)
  241. } else if event is ChannelShouldQuiesceEvent {
  242. self.perform(operations: self.stateMachine.initiateGracefulShutdown())
  243. // Swallow this event.
  244. } else if case let .handshakeCompleted(negotiatedProtocol) = event as? TLSUserEvent {
  245. let tlsVersion = try? context.channel.getTLSVersionSync()
  246. self.stateMachine.logger.debug(
  247. "TLS handshake completed",
  248. metadata: [
  249. "alpn": "\(negotiatedProtocol ?? "nil")",
  250. "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
  251. ]
  252. )
  253. context.fireUserInboundEventTriggered(event)
  254. } else {
  255. #if canImport(Network)
  256. if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
  257. if let waitsForConnectivity = event as? NIOTSNetworkEvents.WaitingForConnectivity {
  258. self.mode.connectionManager?.channelError(waitsForConnectivity.transientError)
  259. }
  260. }
  261. #endif
  262. context.fireUserInboundEventTriggered(event)
  263. }
  264. }
  265. func errorCaught(context: ChannelHandlerContext, error: Error) {
  266. // No state machine action here.
  267. self.mode.connectionManager?.channelError(error)
  268. context.fireErrorCaught(error)
  269. }
  270. func channelActive(context: ChannelHandlerContext) {
  271. self.stateMachine.logger.addIPAddressMetadata(
  272. local: context.localAddress,
  273. remote: context.remoteAddress
  274. )
  275. // No state machine action here.
  276. switch self.mode {
  277. case let .client(connectionManager, multiplexer):
  278. connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
  279. case .server:
  280. ()
  281. }
  282. context.fireChannelActive()
  283. }
  284. func channelInactive(context: ChannelHandlerContext) {
  285. self.perform(operations: self.stateMachine.channelInactive())
  286. self.scheduledPing?.cancel()
  287. self.scheduledClose?.cancel()
  288. self.scheduledPing = nil
  289. self.scheduledClose = nil
  290. context.fireChannelInactive()
  291. }
  292. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  293. let frame = self.unwrapInboundIn(data)
  294. switch frame.payload {
  295. case let .goAway(lastStreamID, errorCode, _):
  296. self.stateMachine.logger.debug(
  297. "received GOAWAY frame",
  298. metadata: [
  299. MetadataKey.h2GoAwayLastStreamID: "\(Int(lastStreamID))",
  300. MetadataKey.h2GoAwayError: "\(errorCode.networkCode)",
  301. ]
  302. )
  303. self.perform(operations: self.stateMachine.receiveGoAway())
  304. case let .settings(.settings(settings)):
  305. self.perform(operations: self.stateMachine.receiveSettings(settings))
  306. case let .ping(data, ack):
  307. self.stateMachine.logger.debug(
  308. "received PING frame",
  309. metadata: [MetadataKey.h2PingAck: "\(ack)"]
  310. )
  311. self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
  312. default:
  313. // We're not interested in other events.
  314. ()
  315. }
  316. context.fireChannelRead(data)
  317. }
  318. }
  319. extension HTTP2SettingsParameter {
  320. internal var loggingMetadataKey: String {
  321. switch self {
  322. case .headerTableSize:
  323. return "h2_settings_header_table_size"
  324. case .enablePush:
  325. return "h2_settings_enable_push"
  326. case .maxConcurrentStreams:
  327. return "h2_settings_max_concurrent_streams"
  328. case .initialWindowSize:
  329. return "h2_settings_initial_window_size"
  330. case .maxFrameSize:
  331. return "h2_settings_max_frame_size"
  332. case .maxHeaderListSize:
  333. return "h2_settings_max_header_list_size"
  334. case .enableConnectProtocol:
  335. return "h2_settings_enable_connect_protocol"
  336. default:
  337. return String(describing: self)
  338. }
  339. }
  340. }
  341. extension TimeAmount {
  342. fileprivate var milliseconds: Int64 {
  343. self.nanoseconds / 1_000_000
  344. }
  345. }