GRPCIdleHandler.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. import NIOTLS
  /// A channel handler which observes connection-level HTTP/2 events (stream creation and
  /// closure, GOAWAY, SETTINGS and PING frames) and turns them into idle-timeout and keepalive
  /// behaviour.
  ///
  /// Decisions are delegated to `GRPCIdleHandlerStateMachine` and `PingHandler`; this handler
  /// applies the results they return: scheduling and cancelling tasks, writing GOAWAY/PING
  /// frames, notifying the connection manager (client mode only) and closing the channel.
  internal final class GRPCIdleHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    /// The amount of time to wait before closing the channel when there are no active streams.
    private let idleTimeout: TimeAmount

    /// The ping handler.
    private var pingHandler: PingHandler

    /// The scheduled task which will close the connection after the keep-alive timeout has expired.
    private var scheduledClose: Scheduled<Void>?

    /// The scheduled task which will ping.
    private var scheduledPing: RepeatedTask?

    /// The mode we're operating in.
    private let mode: Mode

    /// The context of this handler; set in `handlerAdded` and cleared in `handlerRemoved`.
    private var context: ChannelHandlerContext?

    /// The mode of operation: the client tracks additional connection state in the connection
    /// manager.
    internal enum Mode {
      case client(ConnectionManager, HTTP2StreamMultiplexer)
      case server

      /// The connection manager, or `nil` when operating as a server.
      var connectionManager: ConnectionManager? {
        switch self {
        case let .client(manager, _):
          return manager
        case .server:
          return nil
        }
      }
    }

    /// The current state.
    private var stateMachine: GRPCIdleHandlerStateMachine

    /// Creates an idle handler operating in client mode.
    ///
    /// - Parameters:
    ///   - connectionManager: The manager to notify about connection state changes.
    ///   - multiplexer: The multiplexer handed to the connection manager when the channel
    ///     becomes active.
    ///   - idleTimeout: How long to wait before idling the connection; a value of
    ///     `.nanoseconds(.max)` disables the idle timeout task.
    ///   - configuration: Keepalive settings used to configure the ping handler.
    ///   - logger: The logger passed to the state machine.
    init(
      connectionManager: ConnectionManager,
      multiplexer: HTTP2StreamMultiplexer,
      idleTimeout: TimeAmount,
      keepalive configuration: ClientConnectionKeepalive,
      logger: Logger
    ) {
      self.mode = .client(connectionManager, multiplexer)
      self.idleTimeout = idleTimeout
      self.stateMachine = .init(role: .client, logger: logger)
      self.pingHandler = PingHandler(
        pingCode: 5,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
      )
    }

    /// Creates an idle handler operating in server mode.
    ///
    /// - Parameters:
    ///   - idleTimeout: How long to wait before idling the connection; a value of
    ///     `.nanoseconds(.max)` disables the idle timeout task.
    ///   - configuration: Keepalive settings used to configure the ping handler.
    ///   - logger: The logger passed to the state machine.
    init(
      idleTimeout: TimeAmount,
      keepalive configuration: ServerConnectionKeepalive,
      logger: Logger
    ) {
      self.mode = .server
      self.stateMachine = .init(role: .server, logger: logger)
      self.idleTimeout = idleTimeout
      self.pingHandler = PingHandler(
        pingCode: 10,
        interval: configuration.interval,
        timeout: configuration.timeout,
        permitWithoutCalls: configuration.permitWithoutCalls,
        maximumPingsWithoutData: configuration.maximumPingsWithoutData,
        minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
        minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
        maximumPingStrikes: configuration.maximumPingStrikes
      )
    }

    /// Writes and flushes a GOAWAY frame (error code `.noError`) on the root stream.
    ///
    /// Does nothing if the handler has no context.
    ///
    /// - Parameter streamID: The last stream ID to advertise in the GOAWAY frame.
    private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
      guard let context = self.context else {
        return
      }

      let frame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
      )

      context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
    }

    /// Applies the side effects requested by the state machine.
    ///
    /// The steps are applied in a fixed order: connection manager notification, max concurrent
    /// streams update, idle task handling, GOAWAY (and optional PING) emission and finally
    /// closing the channel.
    ///
    /// - Parameter operations: The operations returned from a state machine transition.
    private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
      // Prod the connection manager.
      if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
        switch event {
        case .idle:
          manager.idle()
        case .inactive:
          manager.channelInactive()
        case .ready:
          manager.ready()
        case .quiescing:
          manager.beginQuiescing()
        }
      }

      // Max concurrent streams changed.
      if let manager = self.mode.connectionManager,
        let maxConcurrentStreams = operations.maxConcurrentStreamsChange {
        manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
      }

      // Handle idle timeout creation/cancellation.
      if let idleTask = operations.idleTask {
        switch idleTask {
        case let .cancel(task):
          task.cancel()

        case .schedule:
          // An idle timeout of `.nanoseconds(.max)` means no idle task is ever scheduled.
          if self.idleTimeout != .nanoseconds(.max), let context = self.context {
            let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
              self.idleTimeoutFired()
            }
            // Hand the task to the state machine and apply whatever it asks for in return.
            self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
          }
        }
      }

      // Send a GOAWAY frame.
      if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
        let goAwayFrame = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
        )
        self.context?.write(self.wrapOutboundOut(goAwayFrame), promise: nil)

        // We emit a ping after some GOAWAY frames.
        if operations.shouldPingAfterGoAway {
          let pingFrame = HTTP2Frame(
            streamID: .rootStream,
            payload: .ping(self.pingHandler.pingDataGoAway, ack: false)
          )
          self.context?.write(self.wrapOutboundOut(pingFrame), promise: nil)
        }

        // Flush once so the GOAWAY and any PING go out together.
        self.context?.flush()
      }

      // Close the channel, if necessary.
      if operations.shouldCloseChannel, let context = self.context {
        // Close on the next event-loop tick so we don't drop any events which are
        // currently being processed.
        context.eventLoop.execute {
          context.close(mode: .all, promise: nil)
        }
      }
    }

    /// Applies an action returned by the ping handler.
    ///
    /// - Parameter action: The action to apply.
    private func handlePingAction(_ action: PingHandler.Action) {
      switch action {
      case .none:
        ()

      case .cancelScheduledTimeout:
        self.scheduledClose?.cancel()
        self.scheduledClose = nil

      case let .schedulePing(delay, timeout):
        self.schedulePing(in: delay, timeout: timeout)

      case let .reply(framePayload):
        let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
        self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)

      case .ratchetDownLastSeenStreamID:
        self.perform(operations: self.stateMachine.ratchetDownGoAwayStreamID())
      }
    }

    /// Schedules a repeating keepalive task.
    ///
    /// Each time the task fires the ping handler is asked for an action. If it produces one,
    /// the action is applied and a close task is scheduled to run after `timeout`; that close
    /// task is cancelled by a `.cancelScheduledTimeout` action or when the channel becomes
    /// inactive.
    ///
    /// - Parameters:
    ///   - delay: The initial delay and the interval between firings. A value of
    ///     `.nanoseconds(.max)` disables keepalive pings altogether.
    ///   - timeout: How long to wait after a ping before shutting the connection down.
    private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
      guard delay != .nanoseconds(.max) else {
        return
      }

      self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
        initialDelay: delay,
        delay: delay
      ) { _ in
        let action = self.pingHandler.pingFired()

        // If the ping handler chose not to act then no ping was written, so nothing would
        // cancel a close task. Arming the timeout here would shut down a healthy connection.
        if case .none = action {
          return
        }

        self.handlePingAction(action)

        // `timeout` is less than `interval`, guaranteeing that the close task
        // will be fired before a new ping is triggered.
        assert(timeout < delay, "`timeout` must be less than `interval`")
        self.scheduleClose(in: timeout)
      }
    }

    /// Schedules a task which asks the state machine to shut down immediately.
    ///
    /// - Parameter timeout: The delay before the shutdown is triggered.
    private func scheduleClose(in timeout: TimeAmount) {
      self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
        self.perform(operations: self.stateMachine.shutdownNow())
      }
    }

    /// Invoked by the idle task; forwards the event to the state machine.
    private func idleTimeoutFired() {
      self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
    }

    /// Stores the context so frames can be written and tasks scheduled outside of callbacks.
    func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    /// Drops the stored context.
    func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    /// Handles stream lifecycle, quiescing and TLS handshake events.
    ///
    /// Every event is forwarded down the pipeline except `ChannelShouldQuiesceEvent`, which is
    /// consumed here after initiating a graceful shutdown.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      if let created = event as? NIOHTTP2StreamCreatedEvent {
        self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
        self.handlePingAction(self.pingHandler.streamCreated())
        context.fireUserInboundEventTriggered(event)
      } else if let closed = event as? StreamClosedEvent {
        self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
        self.handlePingAction(self.pingHandler.streamClosed())
        self.mode.connectionManager?.streamClosed()
        context.fireUserInboundEventTriggered(event)
      } else if event is ChannelShouldQuiesceEvent {
        self.perform(operations: self.stateMachine.initiateGracefulShutdown())
        // Swallow this event.
      } else if case let .handshakeCompleted(negotiatedProtocol) = event as? TLSUserEvent {
        // Logging only: a failure to read the TLS version is reported as "nil".
        let tlsVersion = try? context.channel.getTLSVersionSync()
        self.stateMachine.logger.debug("TLS handshake completed", metadata: [
          "alpn": "\(negotiatedProtocol ?? "nil")",
          "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
        ])
        context.fireUserInboundEventTriggered(event)
      } else {
        context.fireUserInboundEventTriggered(event)
      }
    }

    /// Reports the error to the connection manager (client mode) and forwards it.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
      // No state machine action here.
      self.mode.connectionManager?.channelError(error)
      context.fireErrorCaught(error)
    }

    /// Records the channel's addresses as logger metadata and, in client mode, tells the
    /// connection manager that the channel is active.
    func channelActive(context: ChannelHandlerContext) {
      self.stateMachine.logger.addIPAddressMetadata(
        local: context.localAddress,
        remote: context.remoteAddress
      )

      // No state machine action here.
      switch self.mode {
      case let .client(connectionManager, multiplexer):
        connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
      case .server:
        ()
      }
      context.fireChannelActive()
    }

    /// Notifies the state machine and cancels any outstanding ping and close tasks.
    func channelInactive(context: ChannelHandlerContext) {
      self.perform(operations: self.stateMachine.channelInactive())
      self.scheduledPing?.cancel()
      self.scheduledClose?.cancel()
      self.scheduledPing = nil
      self.scheduledClose = nil
      context.fireChannelInactive()
    }

    /// Inspects GOAWAY, SETTINGS and PING frames before forwarding every frame unchanged.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)

      switch frame.payload {
      case let .goAway(lastStreamID, errorCode, _):
        self.stateMachine.logger.debug("received GOAWAY frame", metadata: [
          MetadataKey.h2GoAwayLastStreamID: "\(Int(lastStreamID))",
          MetadataKey.h2GoAwayError: "\(errorCode.networkCode)",
        ])
        self.perform(operations: self.stateMachine.receiveGoAway())

      case let .settings(.settings(settings)):
        self.perform(operations: self.stateMachine.receiveSettings(settings))

      case let .ping(data, ack):
        self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))

      default:
        // We're not interested in other events.
        ()
      }

      context.fireChannelRead(data)
    }
  }
  extension HTTP2SettingsParameter {
    /// The key used for this setting in logging metadata.
    ///
    /// The parameters listed below map to a fixed `h2_settings_*` key; any other parameter
    /// falls back to `String(describing:)`.
    internal var loggingMetadataKey: String {
      let knownKeys: [(HTTP2SettingsParameter, String)] = [
        (.headerTableSize, "h2_settings_header_table_size"),
        (.enablePush, "h2_settings_enable_push"),
        (.maxConcurrentStreams, "h2_settings_max_concurrent_streams"),
        (.initialWindowSize, "h2_settings_initial_window_size"),
        (.maxFrameSize, "h2_settings_max_frame_size"),
        (.maxHeaderListSize, "h2_settings_max_header_list_size"),
        (.enableConnectProtocol, "h2_settings_enable_connect_protocol"),
      ]

      // Entries are checked in the same order as the cases they replace.
      for (parameter, key) in knownKeys where parameter == self {
        return key
      }

      return String(describing: self)
    }
  }