GRPCIdleHandler.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. import NIOTLS
  /// An inbound channel handler which observes connection-level HTTP/2 frames (GOAWAY, SETTINGS
  /// and PING) and stream lifecycle events, feeds them to `GRPCIdleHandlerStateMachine` and
  /// `PingHandler`, and then carries out whatever those two ask for: scheduling or cancelling the
  /// idle timeout, writing PING and GOAWAY frames, prodding the client's connection manager and
  /// closing the channel.
  internal final class GRPCIdleHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    /// The mode of operation: the client tracks additional connection state in the connection
    /// manager.
    internal enum Mode {
      case client(ConnectionManager, HTTP2StreamMultiplexer)
      case server

      /// The connection manager when operating as a client, `nil` when operating as a server.
      var connectionManager: ConnectionManager? {
        guard case let .client(manager, _) = self else {
          return nil
        }
        return manager
      }
    }

    /// The amount of time to wait before closing the channel when there are no active streams.
    private let idleTimeout: TimeAmount

    /// The mode we're operating in.
    private let mode: Mode

    /// The current state.
    private var stateMachine: GRPCIdleHandlerStateMachine

    /// The ping handler.
    private var pingHandler: PingHandler

    /// The scheduled task which will close the connection after the keep-alive timeout has expired.
    private var scheduledClose: Scheduled<Void>?

    /// The scheduled task which will ping.
    private var scheduledPing: RepeatedTask?

    /// The context of this handler; set in `handlerAdded` and cleared in `handlerRemoved`.
    private var context: ChannelHandlerContext?

    /// Creates an idle handler for a client connection.
    ///
    /// - Parameters:
    ///   - connectionManager: The manager to notify about connection events.
    ///   - multiplexer: The multiplexer handed to the connection manager when the channel
    ///     becomes active.
    ///   - idleTimeout: The delay used when scheduling the idle timeout task.
    ///   - configuration: The keepalive configuration used to set up the ping handler.
    ///   - logger: The logger given to the state machine.
    init(
      connectionManager: ConnectionManager,
      multiplexer: HTTP2StreamMultiplexer,
      idleTimeout: TimeAmount,
      keepalive configuration: ClientConnectionKeepalive,
      logger: Logger
    ) {
      self.idleTimeout = idleTimeout
      self.mode = .client(connectionManager, multiplexer)
      self.stateMachine = GRPCIdleHandlerStateMachine(role: .client, logger: logger)
      self.pingHandler = PingHandler(
        pingCode: 5,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
      )
    }

    /// Creates an idle handler for a server connection.
    ///
    /// - Parameters:
    ///   - idleTimeout: The delay used when scheduling the idle timeout task.
    ///   - configuration: The keepalive configuration used to set up the ping handler.
    ///   - logger: The logger given to the state machine.
    init(
      idleTimeout: TimeAmount,
      keepalive configuration: ServerConnectionKeepalive,
      logger: Logger
    ) {
      self.idleTimeout = idleTimeout
      self.mode = .server
      self.stateMachine = GRPCIdleHandlerStateMachine(role: .server, logger: logger)
      self.pingHandler = PingHandler(
        pingCode: 10,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
        minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
        maximumPingStrikes: configuration.maximumPingStrikes
      )
    }

    /// Writes and flushes a GOAWAY frame (error code `.noError`) on the root stream.
    ///
    /// Does nothing when the handler currently has no context.
    private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
      guard let context = self.context else { return }

      let goAway = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
      )
      context.writeAndFlush(self.wrapOutboundOut(goAway), promise: nil)
    }

    /// Carries out the operations requested by the state machine.
    ///
    /// The steps always run in the same order: connection manager notifications, idle task
    /// handling, GOAWAY (and optional PING) emission, then closing the channel.
    private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
      self.notifyConnectionManager(of: operations)
      self.updateIdleTask(for: operations)
      self.writeGoAway(for: operations)
      self.closeChannel(for: operations)
    }

    /// Forwards the connection manager event and any change to the maximum number of concurrent
    /// streams to the connection manager. A no-op when there is no connection manager.
    private func notifyConnectionManager(of operations: GRPCIdleHandlerStateMachine.Operations) {
      guard let manager = self.mode.connectionManager else { return }

      // Prod the connection manager.
      if let event = operations.connectionManagerEvent {
        switch event {
        case .idle:
          manager.idle()
        case .inactive:
          manager.channelInactive()
        case .ready:
          manager.ready()
        case .quiescing:
          manager.beginQuiescing()
        }
      }

      // Max concurrent streams changed.
      if let maxConcurrentStreams = operations.maxConcurrentStreamsChange {
        manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
      }
    }

    /// Cancels the idle timeout task, or schedules a new one and hands it to the state machine.
    private func updateIdleTask(for operations: GRPCIdleHandlerStateMachine.Operations) {
      guard let idleTask = operations.idleTask else { return }

      switch idleTask {
      case let .cancel(task):
        task.cancel()

      case .schedule:
        // A timeout of `.nanoseconds(.max)` means no idle task is ever scheduled.
        guard self.idleTimeout != .nanoseconds(.max), let context = self.context else { return }

        let timeoutTask = context.eventLoop.scheduleTask(in: self.idleTimeout) {
          self.idleTimeoutFired()
        }
        // Storing the task in the state machine may itself yield further operations.
        self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(timeoutTask))
      }
    }

    /// Writes a GOAWAY frame, optionally followed by a PING, and flushes, if one was requested.
    private func writeGoAway(for operations: GRPCIdleHandlerStateMachine.Operations) {
      guard let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID else { return }

      let goAway = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
      )
      self.context?.write(self.wrapOutboundOut(goAway), promise: nil)

      // We emit a ping after some GOAWAY frames.
      if operations.shouldPingAfterGoAway {
        let ping = HTTP2Frame(
          streamID: .rootStream,
          payload: .ping(self.pingHandler.pingDataGoAway, ack: false)
        )
        self.context?.write(self.wrapOutboundOut(ping), promise: nil)
      }

      self.context?.flush()
    }

    /// Closes the channel if the state machine asked for it and a context is available.
    private func closeChannel(for operations: GRPCIdleHandlerStateMachine.Operations) {
      guard operations.shouldCloseChannel, let context = self.context else { return }

      // Close on the next event-loop tick so we don't drop any events which are
      // currently being processed.
      context.eventLoop.execute {
        context.close(mode: .all, promise: nil)
      }
    }

    /// Carries out the action requested by the ping handler.
    private func handlePingAction(_ action: PingHandler.Action) {
      switch action {
      case .none, .ack:
        // `.none` requires no work, and NIO's HTTP2 handler acks for us so `.ack` is a no-op.
        break

      case .cancelScheduledTimeout:
        self.scheduledClose?.cancel()
        self.scheduledClose = nil

      case let .schedulePing(delay, timeout):
        self.schedulePing(in: delay, timeout: timeout)

      case let .reply(framePayload):
        let reply = HTTP2Frame(streamID: .rootStream, payload: framePayload)
        self.context?.writeAndFlush(self.wrapOutboundOut(reply), promise: nil)

      case .ratchetDownLastSeenStreamID:
        self.perform(operations: self.stateMachine.ratchetDownGoAwayStreamID())
      }
    }

    /// Schedules a repeating task which asks the ping handler what to do every `delay`. When the
    /// ping handler returns an action other than `.none`, that action is performed and a close is
    /// scheduled `timeout` later.
    ///
    /// Nothing is scheduled when `delay` is `.nanoseconds(.max)`.
    private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
      if delay == .nanoseconds(.max) {
        return
      }

      self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
        initialDelay: delay,
        delay: delay
      ) { _ in
        let pingAction = self.pingHandler.pingFired()
        if case .none = pingAction {
          return
        }

        self.handlePingAction(pingAction)
        // `timeout` is less than `interval`, guaranteeing that the close task
        // will be fired before a new ping is triggered.
        assert(timeout < delay, "`timeout` must be less than `interval`")
        self.scheduleClose(in: timeout)
      }
    }

    /// Schedules a task which tells the state machine to shut down now and performs the resulting
    /// operations once `timeout` has passed.
    private func scheduleClose(in timeout: TimeAmount) {
      self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
        self.perform(operations: self.stateMachine.shutdownNow())
      }
    }

    /// Called by the idle timeout task when it fires.
    private func idleTimeoutFired() {
      self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
    }

    func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    /// Reacts to stream created/closed events, quiescing requests and completed TLS handshakes.
    ///
    /// Every event is forwarded down the pipeline after it has been processed, except for
    /// `ChannelShouldQuiesceEvent` which is swallowed.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      if let created = event as? NIOHTTP2StreamCreatedEvent {
        self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
        self.handlePingAction(self.pingHandler.streamCreated())
        self.mode.connectionManager?.streamOpened()
      } else if let closed = event as? StreamClosedEvent {
        self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
        self.handlePingAction(self.pingHandler.streamClosed())
        self.mode.connectionManager?.streamClosed()
      } else if event is ChannelShouldQuiesceEvent {
        self.perform(operations: self.stateMachine.initiateGracefulShutdown())
        // Swallow this event.
        return
      } else if case let .handshakeCompleted(negotiatedProtocol) = event as? TLSUserEvent {
        let tlsVersion = try? context.channel.getTLSVersionSync()
        self.stateMachine.logger.debug(
          "TLS handshake completed",
          metadata: [
            "alpn": "\(negotiatedProtocol ?? "nil")",
            "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
          ]
        )
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Reports the error to the connection manager (if there is one) and forwards it.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
      // No state machine action here.
      self.mode.connectionManager?.channelError(error)
      context.fireErrorCaught(error)
    }

    /// Adds the local and remote addresses to the logger metadata and, on the client, tells the
    /// connection manager that the channel is active.
    func channelActive(context: ChannelHandlerContext) {
      self.stateMachine.logger.addIPAddressMetadata(
        local: context.localAddress,
        remote: context.remoteAddress
      )

      // No state machine action here; only the client has a connection manager to inform.
      if case let .client(connectionManager, multiplexer) = self.mode {
        connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
      }

      context.fireChannelActive()
    }

    /// Informs the state machine that the channel is inactive and cancels any outstanding ping
    /// and close tasks.
    func channelInactive(context: ChannelHandlerContext) {
      self.perform(operations: self.stateMachine.channelInactive())

      self.scheduledPing?.cancel()
      self.scheduledPing = nil

      self.scheduledClose?.cancel()
      self.scheduledClose = nil

      context.fireChannelInactive()
    }

    /// Inspects GOAWAY, SETTINGS and PING frames; every frame is forwarded unchanged.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let received = self.unwrapInboundIn(data)

      switch received.payload {
      case let .goAway(lastStreamID, errorCode, _):
        self.stateMachine.logger.debug(
          "received GOAWAY frame",
          metadata: [
            MetadataKey.h2GoAwayLastStreamID: "\(Int(lastStreamID))",
            MetadataKey.h2GoAwayError: "\(errorCode.networkCode)",
          ]
        )
        self.perform(operations: self.stateMachine.receiveGoAway())

      case let .settings(.settings(settings)):
        self.perform(operations: self.stateMachine.receiveSettings(settings))

      case let .ping(pingData, isAck):
        self.handlePingAction(self.pingHandler.read(pingData: pingData, ack: isAck))

      default:
        // We're not interested in other events.
        break
      }

      context.fireChannelRead(data)
    }
  }
  extension HTTP2SettingsParameter {
    /// The key to use for this settings parameter in logging metadata.
    ///
    /// The parameters listed below map to fixed `h2_settings_*` keys; any other parameter falls
    /// back to `String(describing:)`.
    internal var loggingMetadataKey: String {
      let key: String

      switch self {
      case .headerTableSize:
        key = "h2_settings_header_table_size"
      case .enablePush:
        key = "h2_settings_enable_push"
      case .maxConcurrentStreams:
        key = "h2_settings_max_concurrent_streams"
      case .initialWindowSize:
        key = "h2_settings_initial_window_size"
      case .maxFrameSize:
        key = "h2_settings_max_frame_size"
      case .maxHeaderListSize:
        key = "h2_settings_max_header_list_size"
      case .enableConnectProtocol:
        key = "h2_settings_enable_connect_protocol"
      default:
        key = String(describing: self)
      }

      return key
    }
  }