GRPCIdleHandler.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. import NIOTLS
  20. internal final class GRPCIdleHandler: ChannelInboundHandler {
  21. typealias InboundIn = HTTP2Frame
  22. typealias OutboundOut = HTTP2Frame
  23. /// The amount of time to wait before closing the channel when there are no active streams.
  24. private let idleTimeout: TimeAmount
  25. /// The ping handler.
  26. private var pingHandler: PingHandler
  27. /// The scheduled task which will close the connection after the keep-alive timeout has expired.
  28. private var scheduledClose: Scheduled<Void>?
  29. /// The scheduled task which will ping.
  30. private var scheduledPing: RepeatedTask?
  31. /// The mode we're operating in.
  32. ///
  33. /// This is a `var` to allow the client configuration state to be updated.
  34. private var mode: Mode
  35. private var context: ChannelHandlerContext?
  36. /// Keeps track of the client configuration state.
  37. /// We need two levels of configuration to break the dependency cycle with the stream multiplexer.
  38. internal enum ClientConfigurationState {
  39. case partial(ConnectionManager)
  40. case complete(ConnectionManager, NIOHTTP2Handler.StreamMultiplexer)
  41. case deinitialized
  42. mutating func setMultiplexer(_ multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
  43. switch self {
  44. case let .partial(connectionManager):
  45. self = .complete(connectionManager, multiplexer)
  46. case .complete:
  47. preconditionFailure("Setting the multiplexer twice is not supported.")
  48. case .deinitialized:
  49. preconditionFailure(
  50. "Setting the multiplexer after removing from a channel is not supported."
  51. )
  52. }
  53. }
  54. }
  55. /// The mode of operation: the client tracks additional connection state in the connection
  56. /// manager.
  57. internal enum Mode {
  58. case client(ClientConfigurationState)
  59. case server
  60. mutating func setMultiplexer(_ multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
  61. switch self {
  62. case var .client(clientConfigurationState):
  63. clientConfigurationState.setMultiplexer(multiplexer)
  64. self = .client(clientConfigurationState)
  65. case .server:
  66. preconditionFailure("Setting the multiplexer in server mode is not supported.")
  67. }
  68. }
  69. var connectionManager: ConnectionManager? {
  70. switch self {
  71. case let .client(configurationState):
  72. switch configurationState {
  73. case let .complete(connectionManager, _):
  74. return connectionManager
  75. case let .partial(connectionManager):
  76. return connectionManager
  77. case .deinitialized:
  78. return nil
  79. }
  80. case .server:
  81. return nil
  82. }
  83. }
  84. mutating func deinitialize() {
  85. switch self {
  86. case .client:
  87. self = .client(.deinitialized)
  88. case .server:
  89. break // nothing to drop
  90. }
  91. }
  92. }
  93. /// The current state.
  94. private var stateMachine: GRPCIdleHandlerStateMachine
  95. init(
  96. connectionManager: ConnectionManager,
  97. idleTimeout: TimeAmount,
  98. keepalive configuration: ClientConnectionKeepalive,
  99. logger: Logger
  100. ) {
  101. self.mode = .client(.partial(connectionManager))
  102. self.idleTimeout = idleTimeout
  103. self.stateMachine = .init(role: .client, logger: logger)
  104. self.pingHandler = PingHandler(
  105. pingCode: 5,
  106. interval: configuration.interval,
  107. timeout: configuration.timeout,
  108. permitWithoutCalls: configuration.permitWithoutCalls,
  109. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  110. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
  111. )
  112. }
  113. init(
  114. idleTimeout: TimeAmount,
  115. keepalive configuration: ServerConnectionKeepalive,
  116. logger: Logger
  117. ) {
  118. self.mode = .server
  119. self.stateMachine = .init(role: .server, logger: logger)
  120. self.idleTimeout = idleTimeout
  121. self.pingHandler = PingHandler(
  122. pingCode: 10,
  123. interval: configuration.interval,
  124. timeout: configuration.timeout,
  125. permitWithoutCalls: configuration.permitWithoutCalls,
  126. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  127. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
  128. minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
  129. maximumPingStrikes: configuration.maximumPingStrikes
  130. )
  131. }
  132. internal func setMultiplexer(_ multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
  133. self.mode.setMultiplexer(multiplexer)
  134. }
  135. private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
  136. guard let context = self.context else {
  137. return
  138. }
  139. let frame = HTTP2Frame(
  140. streamID: .rootStream,
  141. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  142. )
  143. context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  144. }
  145. private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
  146. // Prod the connection manager.
  147. if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
  148. switch event {
  149. case .idle:
  150. manager.idle()
  151. case .inactive:
  152. manager.channelInactive()
  153. case .ready:
  154. manager.ready()
  155. case .quiescing:
  156. manager.beginQuiescing()
  157. }
  158. }
  159. // Max concurrent streams changed.
  160. if let manager = self.mode.connectionManager,
  161. let maxConcurrentStreams = operations.maxConcurrentStreamsChange {
  162. manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
  163. }
  164. // Handle idle timeout creation/cancellation.
  165. if let idleTask = operations.idleTask {
  166. switch idleTask {
  167. case let .cancel(task):
  168. task.cancel()
  169. case .schedule:
  170. if self.idleTimeout != .nanoseconds(.max), let context = self.context {
  171. let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  172. self.idleTimeoutFired()
  173. }
  174. self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
  175. }
  176. }
  177. }
  178. // Send a GOAWAY frame.
  179. if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
  180. let goAwayFrame = HTTP2Frame(
  181. streamID: .rootStream,
  182. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  183. )
  184. self.context?.write(self.wrapOutboundOut(goAwayFrame), promise: nil)
  185. // We emit a ping after some GOAWAY frames.
  186. if operations.shouldPingAfterGoAway {
  187. let pingFrame = HTTP2Frame(
  188. streamID: .rootStream,
  189. payload: .ping(self.pingHandler.pingDataGoAway, ack: false)
  190. )
  191. self.context?.write(self.wrapOutboundOut(pingFrame), promise: nil)
  192. }
  193. self.context?.flush()
  194. }
  195. // Close the channel, if necessary.
  196. if operations.shouldCloseChannel, let context = self.context {
  197. // Close on the next event-loop tick so we don't drop any events which are
  198. // currently being processed.
  199. context.eventLoop.execute {
  200. context.close(mode: .all, promise: nil)
  201. }
  202. }
  203. }
  204. private func handlePingAction(_ action: PingHandler.Action) {
  205. switch action {
  206. case .none:
  207. ()
  208. case .ack:
  209. // NIO's HTTP2 handler acks for us so this is a no-op.
  210. ()
  211. case .cancelScheduledTimeout:
  212. self.scheduledClose?.cancel()
  213. self.scheduledClose = nil
  214. case let .schedulePing(delay, timeout):
  215. self.schedulePing(in: delay, timeout: timeout)
  216. case let .reply(framePayload):
  217. let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
  218. self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  219. case .ratchetDownLastSeenStreamID:
  220. self.perform(operations: self.stateMachine.ratchetDownGoAwayStreamID())
  221. }
  222. }
  223. private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
  224. guard delay != .nanoseconds(.max) else {
  225. return
  226. }
  227. self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
  228. initialDelay: delay,
  229. delay: delay
  230. ) { _ in
  231. self.handlePingAction(self.pingHandler.pingFired())
  232. // `timeout` is less than `interval`, guaranteeing that the close task
  233. // will be fired before a new ping is triggered.
  234. assert(timeout < delay, "`timeout` must be less than `interval`")
  235. self.scheduleClose(in: timeout)
  236. }
  237. }
  238. private func scheduleClose(in timeout: TimeAmount) {
  239. self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
  240. self.perform(operations: self.stateMachine.shutdownNow())
  241. }
  242. }
  243. private func idleTimeoutFired() {
  244. self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
  245. }
  246. func handlerAdded(context: ChannelHandlerContext) {
  247. self.context = context
  248. }
  249. func handlerRemoved(context: ChannelHandlerContext) {
  250. self.context = nil
  251. self.mode.deinitialize()
  252. }
  253. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  254. if event is ChannelShouldQuiesceEvent {
  255. self.perform(operations: self.stateMachine.initiateGracefulShutdown())
  256. // Swallow this event.
  257. } else if case let .handshakeCompleted(negotiatedProtocol) = event as? TLSUserEvent {
  258. let tlsVersion = try? context.channel.getTLSVersionSync()
  259. self.stateMachine.logger.debug("TLS handshake completed", metadata: [
  260. "alpn": "\(negotiatedProtocol ?? "nil")",
  261. "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
  262. ])
  263. context.fireUserInboundEventTriggered(event)
  264. } else {
  265. context.fireUserInboundEventTriggered(event)
  266. }
  267. }
  268. func errorCaught(context: ChannelHandlerContext, error: Error) {
  269. // No state machine action here.
  270. self.mode.connectionManager?.channelError(error)
  271. context.fireErrorCaught(error)
  272. }
  273. func channelActive(context: ChannelHandlerContext) {
  274. self.stateMachine.logger.addIPAddressMetadata(
  275. local: context.localAddress,
  276. remote: context.remoteAddress
  277. )
  278. // No state machine action here.
  279. switch self.mode {
  280. case let .client(configurationState):
  281. switch configurationState {
  282. case let .complete(connectionManager, multiplexer):
  283. connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
  284. case .partial:
  285. preconditionFailure("not yet initialised")
  286. case .deinitialized:
  287. preconditionFailure("removed from channel")
  288. }
  289. case .server:
  290. ()
  291. }
  292. context.fireChannelActive()
  293. }
  294. func channelInactive(context: ChannelHandlerContext) {
  295. self.perform(operations: self.stateMachine.channelInactive())
  296. self.scheduledPing?.cancel()
  297. self.scheduledClose?.cancel()
  298. self.scheduledPing = nil
  299. self.scheduledClose = nil
  300. context.fireChannelInactive()
  301. }
  302. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  303. let frame = self.unwrapInboundIn(data)
  304. switch frame.payload {
  305. case let .goAway(lastStreamID, errorCode, _):
  306. self.stateMachine.logger.debug("received GOAWAY frame", metadata: [
  307. MetadataKey.h2GoAwayLastStreamID: "\(Int(lastStreamID))",
  308. MetadataKey.h2GoAwayError: "\(errorCode.networkCode)",
  309. ])
  310. self.perform(operations: self.stateMachine.receiveGoAway())
  311. case let .settings(.settings(settings)):
  312. self.perform(operations: self.stateMachine.receiveSettings(settings))
  313. case let .ping(data, ack):
  314. self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
  315. default:
  316. // We're not interested in other events.
  317. ()
  318. }
  319. context.fireChannelRead(data)
  320. }
  321. }
  322. extension GRPCIdleHandler: NIOHTTP2StreamDelegate {
  323. func streamCreated(_ id: NIOHTTP2.HTTP2StreamID, channel: NIOCore.Channel) {
  324. self.perform(operations: self.stateMachine.streamCreated(withID: id))
  325. self.handlePingAction(self.pingHandler.streamCreated())
  326. self.mode.connectionManager?.streamOpened()
  327. }
  328. func streamClosed(_ id: NIOHTTP2.HTTP2StreamID, channel: NIOCore.Channel) {
  329. self.perform(operations: self.stateMachine.streamClosed(withID: id))
  330. self.handlePingAction(self.pingHandler.streamClosed())
  331. self.mode.connectionManager?.streamClosed()
  332. }
  333. }
  334. extension HTTP2SettingsParameter {
  335. internal var loggingMetadataKey: String {
  336. switch self {
  337. case .headerTableSize:
  338. return "h2_settings_header_table_size"
  339. case .enablePush:
  340. return "h2_settings_enable_push"
  341. case .maxConcurrentStreams:
  342. return "h2_settings_max_concurrent_streams"
  343. case .initialWindowSize:
  344. return "h2_settings_initial_window_size"
  345. case .maxFrameSize:
  346. return "h2_settings_max_frame_size"
  347. case .maxHeaderListSize:
  348. return "h2_settings_max_header_list_size"
  349. case .enableConnectProtocol:
  350. return "h2_settings_enable_connect_protocol"
  351. default:
  352. return String(describing: self)
  353. }
  354. }
  355. }