GRPCIdleHandler.swift 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. internal final class GRPCIdleHandler: ChannelInboundHandler {
  20. typealias InboundIn = HTTP2Frame
  21. typealias OutboundOut = HTTP2Frame
  22. /// The amount of time to wait before closing the channel when there are no active streams.
  23. private let idleTimeout: TimeAmount
  24. /// The ping handler.
  25. private var pingHandler: PingHandler
  26. /// The scheduled task which will close the connection after the keep-alive timeout has expired.
  27. private var scheduledClose: Scheduled<Void>?
  28. /// The scheduled task which will ping.
  29. private var scheduledPing: RepeatedTask?
  30. /// The mode we're operating in.
  31. private let mode: Mode
  32. /// A logger.
  33. private let logger: Logger
  34. private var context: ChannelHandlerContext?
  35. /// The mode of operation: the client tracks additional connection state in the connection
  36. /// manager.
  37. internal enum Mode {
  38. case client(ConnectionManager, HTTP2StreamMultiplexer)
  39. case server
  40. var connectionManager: ConnectionManager? {
  41. switch self {
  42. case let .client(manager, _):
  43. return manager
  44. case .server:
  45. return nil
  46. }
  47. }
  48. }
  49. /// The current state.
  50. private var stateMachine: GRPCIdleHandlerStateMachine
  51. init(
  52. connectionManager: ConnectionManager,
  53. multiplexer: HTTP2StreamMultiplexer,
  54. idleTimeout: TimeAmount,
  55. keepalive configuration: ClientConnectionKeepalive,
  56. logger: Logger
  57. ) {
  58. self.mode = .client(connectionManager, multiplexer)
  59. self.idleTimeout = idleTimeout
  60. self.stateMachine = .init(role: .client, logger: logger)
  61. self.pingHandler = PingHandler(
  62. pingCode: 5,
  63. interval: configuration.interval,
  64. timeout: configuration.timeout,
  65. permitWithoutCalls: configuration.permitWithoutCalls,
  66. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  67. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
  68. )
  69. self.logger = logger
  70. }
  71. init(
  72. idleTimeout: TimeAmount,
  73. keepalive configuration: ServerConnectionKeepalive,
  74. logger: Logger
  75. ) {
  76. self.mode = .server
  77. self.stateMachine = .init(role: .server, logger: logger)
  78. self.idleTimeout = idleTimeout
  79. self.pingHandler = PingHandler(
  80. pingCode: 10,
  81. interval: configuration.interval,
  82. timeout: configuration.timeout,
  83. permitWithoutCalls: configuration.permitWithoutCalls,
  84. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  85. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
  86. minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
  87. maximumPingStrikes: configuration.maximumPingStrikes
  88. )
  89. self.logger = logger
  90. }
  91. private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
  92. guard let context = self.context else {
  93. return
  94. }
  95. let frame = HTTP2Frame(
  96. streamID: .rootStream,
  97. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  98. )
  99. context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  100. }
  101. private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
  102. // Prod the connection manager.
  103. if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
  104. switch event {
  105. case .idle:
  106. manager.idle()
  107. case .inactive:
  108. manager.channelInactive()
  109. case .ready:
  110. manager.ready()
  111. case .quiescing:
  112. manager.beginQuiescing()
  113. }
  114. }
  115. // Max concurrent streams changed.
  116. if let manager = self.mode.connectionManager,
  117. let maxConcurrentStreams = operations.maxConcurrentStreamsChange {
  118. manager.monitor.maxConcurrentStreamsChanged(maxConcurrentStreams)
  119. }
  120. // Handle idle timeout creation/cancellation.
  121. if let idleTask = operations.idleTask {
  122. switch idleTask {
  123. case let .cancel(task):
  124. task.cancel()
  125. case .schedule:
  126. if self.idleTimeout != .nanoseconds(.max), let context = self.context {
  127. let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  128. self.idleTimeoutFired()
  129. }
  130. self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
  131. }
  132. }
  133. }
  134. // Send a GOAWAY frame.
  135. if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
  136. let goAwayFrame = HTTP2Frame(
  137. streamID: .rootStream,
  138. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  139. )
  140. self.context?.writeAndFlush(self.wrapOutboundOut(goAwayFrame), promise: nil)
  141. }
  142. // Close the channel, if necessary.
  143. if operations.shouldCloseChannel {
  144. self.context?.close(mode: .all, promise: nil)
  145. }
  146. }
  147. private func handlePingAction(_ action: PingHandler.Action) {
  148. switch action {
  149. case .none:
  150. ()
  151. case .cancelScheduledTimeout:
  152. self.scheduledClose?.cancel()
  153. self.scheduledClose = nil
  154. case let .schedulePing(delay, timeout):
  155. self.schedulePing(in: delay, timeout: timeout)
  156. case let .reply(framePayload):
  157. let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
  158. self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  159. }
  160. }
  161. private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
  162. guard delay != .nanoseconds(.max) else {
  163. return
  164. }
  165. self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
  166. initialDelay: delay,
  167. delay: delay
  168. ) { _ in
  169. self.handlePingAction(self.pingHandler.pingFired())
  170. // `timeout` is less than `interval`, guaranteeing that the close task
  171. // will be fired before a new ping is triggered.
  172. assert(timeout < delay, "`timeout` must be less than `interval`")
  173. self.scheduleClose(in: timeout)
  174. }
  175. }
  176. private func scheduleClose(in timeout: TimeAmount) {
  177. self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
  178. self.perform(operations: self.stateMachine.shutdownNow())
  179. }
  180. }
  181. private func idleTimeoutFired() {
  182. self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
  183. }
  184. func handlerAdded(context: ChannelHandlerContext) {
  185. self.context = context
  186. }
  187. func handlerRemoved(context: ChannelHandlerContext) {
  188. self.context = nil
  189. }
  190. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  191. if let created = event as? NIOHTTP2StreamCreatedEvent {
  192. self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
  193. self.handlePingAction(self.pingHandler.streamCreated())
  194. context.fireUserInboundEventTriggered(event)
  195. } else if let closed = event as? StreamClosedEvent {
  196. self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
  197. self.handlePingAction(self.pingHandler.streamClosed())
  198. if let manager = self.mode.connectionManager {
  199. manager.monitor.streamClosed()
  200. }
  201. context.fireUserInboundEventTriggered(event)
  202. } else if event is ChannelShouldQuiesceEvent {
  203. self.perform(operations: self.stateMachine.initiateGracefulShutdown())
  204. // Swallow this event.
  205. } else {
  206. context.fireUserInboundEventTriggered(event)
  207. }
  208. }
  209. func errorCaught(context: ChannelHandlerContext, error: Error) {
  210. // No state machine action here.
  211. self.mode.connectionManager?.channelError(error)
  212. context.fireErrorCaught(error)
  213. }
  214. func channelActive(context: ChannelHandlerContext) {
  215. // No state machine action here.
  216. switch self.mode {
  217. case let .client(connectionManager, multiplexer):
  218. connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
  219. case .server:
  220. ()
  221. }
  222. context.fireChannelActive()
  223. }
  224. func channelInactive(context: ChannelHandlerContext) {
  225. self.perform(operations: self.stateMachine.channelInactive())
  226. self.scheduledPing?.cancel()
  227. self.scheduledClose?.cancel()
  228. self.scheduledPing = nil
  229. self.scheduledClose = nil
  230. context.fireChannelInactive()
  231. }
  232. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  233. let frame = self.unwrapInboundIn(data)
  234. switch frame.payload {
  235. case .goAway:
  236. self.perform(operations: self.stateMachine.receiveGoAway())
  237. case let .settings(.settings(settings)):
  238. self.perform(operations: self.stateMachine.receiveSettings(settings))
  239. case let .ping(data, ack):
  240. self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
  241. default:
  242. // We're not interested in other events.
  243. ()
  244. }
  245. context.fireChannelRead(data)
  246. }
  247. }
  248. extension HTTP2SettingsParameter {
  249. internal var loggingMetadataKey: String {
  250. switch self {
  251. case .headerTableSize:
  252. return "h2_settings_header_table_size"
  253. case .enablePush:
  254. return "h2_settings_enable_push"
  255. case .maxConcurrentStreams:
  256. return "h2_settings_max_concurrent_streams"
  257. case .initialWindowSize:
  258. return "h2_settings_initial_window_size"
  259. case .maxFrameSize:
  260. return "h2_settings_max_frame_size"
  261. case .maxHeaderListSize:
  262. return "h2_settings_max_header_list_size"
  263. case .enableConnectProtocol:
  264. return "h2_settings_enable_connect_protocol"
  265. default:
  266. return String(describing: self)
  267. }
  268. }
  269. }