GRPCIdleHandler.swift 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. internal final class GRPCIdleHandler: ChannelInboundHandler {
  20. typealias InboundIn = HTTP2Frame
  21. typealias OutboundOut = HTTP2Frame
  22. /// The amount of time to wait before closing the channel when there are no active streams.
  23. private let idleTimeout: TimeAmount
  24. /// The ping handler.
  25. private var pingHandler: PingHandler
  26. /// The scheduled task which will close the connection after the keep-alive timeout has expired.
  27. private var scheduledClose: Scheduled<Void>?
  28. /// The scheduled task which will ping.
  29. private var scheduledPing: RepeatedTask?
  30. /// The mode we're operating in.
  31. private let mode: Mode
  32. /// A logger.
  33. private let logger: Logger
  34. private var context: ChannelHandlerContext?
  35. /// The mode of operation: the client tracks additional connection state in the connection
  36. /// manager.
  37. internal enum Mode {
  38. case client(ConnectionManager, HTTP2StreamMultiplexer)
  39. case server
  40. var connectionManager: ConnectionManager? {
  41. switch self {
  42. case let .client(manager, _):
  43. return manager
  44. case .server:
  45. return nil
  46. }
  47. }
  48. }
  49. /// The current state.
  50. private var stateMachine: GRPCIdleHandlerStateMachine
  51. init(
  52. connectionManager: ConnectionManager,
  53. multiplexer: HTTP2StreamMultiplexer,
  54. idleTimeout: TimeAmount,
  55. keepalive configuration: ClientConnectionKeepalive,
  56. logger: Logger
  57. ) {
  58. self.mode = .client(connectionManager, multiplexer)
  59. self.idleTimeout = idleTimeout
  60. self.stateMachine = .init(role: .client, logger: logger)
  61. self.pingHandler = PingHandler(
  62. pingCode: 5,
  63. interval: configuration.interval,
  64. timeout: configuration.timeout,
  65. permitWithoutCalls: configuration.permitWithoutCalls,
  66. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  67. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
  68. )
  69. self.logger = logger
  70. }
  71. init(
  72. idleTimeout: TimeAmount,
  73. keepalive configuration: ServerConnectionKeepalive,
  74. logger: Logger
  75. ) {
  76. self.mode = .server
  77. self.stateMachine = .init(role: .server, logger: logger)
  78. self.idleTimeout = idleTimeout
  79. self.pingHandler = PingHandler(
  80. pingCode: 10,
  81. interval: configuration.interval,
  82. timeout: configuration.timeout,
  83. permitWithoutCalls: configuration.permitWithoutCalls,
  84. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  85. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
  86. minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
  87. maximumPingStrikes: configuration.maximumPingStrikes
  88. )
  89. self.logger = logger
  90. }
  91. private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
  92. guard let context = self.context else {
  93. return
  94. }
  95. let frame = HTTP2Frame(
  96. streamID: .rootStream,
  97. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  98. )
  99. context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  100. }
  101. private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
  102. // Prod the connection manager.
  103. if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
  104. switch event {
  105. case .idle:
  106. manager.idle()
  107. case .inactive:
  108. manager.channelInactive()
  109. case .ready:
  110. manager.ready()
  111. case .quiescing:
  112. manager.beginQuiescing()
  113. }
  114. }
  115. // Handle idle timeout creation/cancellation.
  116. if let idleTask = operations.idleTask {
  117. switch idleTask {
  118. case let .cancel(task):
  119. task.cancel()
  120. case .schedule:
  121. if self.idleTimeout != .nanoseconds(.max), let context = self.context {
  122. let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  123. self.idleTimeoutFired()
  124. }
  125. self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
  126. }
  127. }
  128. }
  129. // Send a GOAWAY frame.
  130. if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
  131. let goAwayFrame = HTTP2Frame(
  132. streamID: .rootStream,
  133. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  134. )
  135. self.context?.writeAndFlush(self.wrapOutboundOut(goAwayFrame), promise: nil)
  136. }
  137. // Close the channel, if necessary.
  138. if operations.shouldCloseChannel {
  139. self.context?.close(mode: .all, promise: nil)
  140. }
  141. }
  142. private func handlePingAction(_ action: PingHandler.Action) {
  143. switch action {
  144. case .none:
  145. ()
  146. case .cancelScheduledTimeout:
  147. self.scheduledClose?.cancel()
  148. self.scheduledClose = nil
  149. case let .schedulePing(delay, timeout):
  150. self.schedulePing(in: delay, timeout: timeout)
  151. case let .reply(framePayload):
  152. let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
  153. self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  154. }
  155. }
  156. private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
  157. guard delay != .nanoseconds(.max) else {
  158. return
  159. }
  160. self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
  161. initialDelay: delay,
  162. delay: delay
  163. ) { _ in
  164. self.handlePingAction(self.pingHandler.pingFired())
  165. // `timeout` is less than `interval`, guaranteeing that the close task
  166. // will be fired before a new ping is triggered.
  167. assert(timeout < delay, "`timeout` must be less than `interval`")
  168. self.scheduleClose(in: timeout)
  169. }
  170. }
  171. private func scheduleClose(in timeout: TimeAmount) {
  172. self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
  173. self.perform(operations: self.stateMachine.shutdownNow())
  174. }
  175. }
  176. private func idleTimeoutFired() {
  177. self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
  178. }
  179. func handlerAdded(context: ChannelHandlerContext) {
  180. self.context = context
  181. }
  182. func handlerRemoved(context: ChannelHandlerContext) {
  183. self.context = nil
  184. }
  185. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  186. if let created = event as? NIOHTTP2StreamCreatedEvent {
  187. self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
  188. self.handlePingAction(self.pingHandler.streamCreated())
  189. context.fireUserInboundEventTriggered(event)
  190. } else if let closed = event as? StreamClosedEvent {
  191. self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
  192. self.handlePingAction(self.pingHandler.streamClosed())
  193. context.fireUserInboundEventTriggered(event)
  194. } else if event is ChannelShouldQuiesceEvent {
  195. self.perform(operations: self.stateMachine.initiateGracefulShutdown())
  196. // Swallow this event.
  197. } else {
  198. context.fireUserInboundEventTriggered(event)
  199. }
  200. }
  201. func errorCaught(context: ChannelHandlerContext, error: Error) {
  202. // No state machine action here.
  203. self.mode.connectionManager?.channelError(error)
  204. context.fireErrorCaught(error)
  205. }
  206. func channelActive(context: ChannelHandlerContext) {
  207. // No state machine action here.
  208. switch self.mode {
  209. case let .client(connectionManager, multiplexer):
  210. connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
  211. case .server:
  212. ()
  213. }
  214. context.fireChannelActive()
  215. }
  216. func channelInactive(context: ChannelHandlerContext) {
  217. self.perform(operations: self.stateMachine.channelInactive())
  218. self.scheduledPing?.cancel()
  219. self.scheduledClose?.cancel()
  220. self.scheduledPing = nil
  221. self.scheduledClose = nil
  222. context.fireChannelInactive()
  223. }
  224. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  225. let frame = self.unwrapInboundIn(data)
  226. switch frame.payload {
  227. case .goAway:
  228. self.perform(operations: self.stateMachine.receiveGoAway())
  229. case let .settings(.settings(settings)):
  230. self.perform(operations: self.stateMachine.receiveSettings(settings))
  231. case let .ping(data, ack):
  232. self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
  233. default:
  234. // We're not interested in other events.
  235. ()
  236. }
  237. context.fireChannelRead(data)
  238. }
  239. }
  240. extension HTTP2SettingsParameter {
  241. internal var loggingMetadataKey: String {
  242. switch self {
  243. case .headerTableSize:
  244. return "h2_settings_header_table_size"
  245. case .enablePush:
  246. return "h2_settings_enable_push"
  247. case .maxConcurrentStreams:
  248. return "h2_settings_max_concurrent_streams"
  249. case .initialWindowSize:
  250. return "h2_settings_initial_window_size"
  251. case .maxFrameSize:
  252. return "h2_settings_max_frame_size"
  253. case .maxHeaderListSize:
  254. return "h2_settings_max_header_list_size"
  255. case .enableConnectProtocol:
  256. return "h2_settings_enable_connect_protocol"
  257. default:
  258. return String(describing: self)
  259. }
  260. }
  261. }