GRPCIdleHandler.swift 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. internal final class GRPCIdleHandler: ChannelInboundHandler {
  20. typealias InboundIn = HTTP2Frame
  21. typealias OutboundOut = HTTP2Frame
  22. /// The amount of time to wait before closing the channel when there are no active streams.
  23. private let idleTimeout: TimeAmount
  24. /// The ping handler.
  25. private var pingHandler: PingHandler
  26. /// The scheduled task which will close the connection after the keep-alive timeout has expired.
  27. private var scheduledClose: Scheduled<Void>?
  28. /// The scheduled task which will ping.
  29. private var scheduledPing: RepeatedTask?
  30. /// The mode we're operating in.
  31. private let mode: Mode
  32. private var context: ChannelHandlerContext?
  33. /// The mode of operation: the client tracks additional connection state in the connection
  34. /// manager.
  35. internal enum Mode {
  36. case client(ConnectionManager, HTTP2StreamMultiplexer)
  37. case server
  38. var connectionManager: ConnectionManager? {
  39. switch self {
  40. case let .client(manager, _):
  41. return manager
  42. case .server:
  43. return nil
  44. }
  45. }
  46. }
  47. /// The current state.
  48. private var stateMachine: GRPCIdleHandlerStateMachine
  49. init(
  50. connectionManager: ConnectionManager,
  51. multiplexer: HTTP2StreamMultiplexer,
  52. idleTimeout: TimeAmount,
  53. keepalive configuration: ClientConnectionKeepalive,
  54. logger: Logger
  55. ) {
  56. self.mode = .client(connectionManager, multiplexer)
  57. self.idleTimeout = idleTimeout
  58. self.stateMachine = .init(role: .client, logger: logger)
  59. self.pingHandler = PingHandler(
  60. pingCode: 5,
  61. interval: configuration.interval,
  62. timeout: configuration.timeout,
  63. permitWithoutCalls: configuration.permitWithoutCalls,
  64. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  65. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
  66. )
  67. }
  68. init(
  69. idleTimeout: TimeAmount,
  70. keepalive configuration: ServerConnectionKeepalive,
  71. logger: Logger
  72. ) {
  73. self.mode = .server
  74. self.stateMachine = .init(role: .server, logger: logger)
  75. self.idleTimeout = idleTimeout
  76. self.pingHandler = PingHandler(
  77. pingCode: 10,
  78. interval: configuration.interval,
  79. timeout: configuration.timeout,
  80. permitWithoutCalls: configuration.permitWithoutCalls,
  81. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  82. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
  83. minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
  84. maximumPingStrikes: configuration.maximumPingStrikes
  85. )
  86. }
  87. private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
  88. guard let context = self.context else {
  89. return
  90. }
  91. let frame = HTTP2Frame(
  92. streamID: .rootStream,
  93. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  94. )
  95. context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  96. }
  97. private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
  98. // Prod the connection manager.
  99. if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
  100. switch event {
  101. case .idle:
  102. manager.idle()
  103. case .inactive:
  104. manager.channelInactive()
  105. case .ready:
  106. manager.ready()
  107. case .quiescing:
  108. manager.beginQuiescing()
  109. }
  110. }
  111. // Max concurrent streams changed.
  112. if let manager = self.mode.connectionManager,
  113. let maxConcurrentStreams = operations.maxConcurrentStreamsChange {
  114. manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
  115. }
  116. // Handle idle timeout creation/cancellation.
  117. if let idleTask = operations.idleTask {
  118. switch idleTask {
  119. case let .cancel(task):
  120. task.cancel()
  121. case .schedule:
  122. if self.idleTimeout != .nanoseconds(.max), let context = self.context {
  123. let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  124. self.idleTimeoutFired()
  125. }
  126. self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
  127. }
  128. }
  129. }
  130. // Send a GOAWAY frame.
  131. if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
  132. let goAwayFrame = HTTP2Frame(
  133. streamID: .rootStream,
  134. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  135. )
  136. self.context?.writeAndFlush(self.wrapOutboundOut(goAwayFrame), promise: nil)
  137. }
  138. // Close the channel, if necessary.
  139. if operations.shouldCloseChannel, let context = self.context {
  140. // Close on the next event-loop tick so we don't drop any events which are
  141. // currently being processed.
  142. context.eventLoop.execute {
  143. context.close(mode: .all, promise: nil)
  144. }
  145. }
  146. }
  147. private func handlePingAction(_ action: PingHandler.Action) {
  148. switch action {
  149. case .none:
  150. ()
  151. case .cancelScheduledTimeout:
  152. self.scheduledClose?.cancel()
  153. self.scheduledClose = nil
  154. case let .schedulePing(delay, timeout):
  155. self.schedulePing(in: delay, timeout: timeout)
  156. case let .reply(framePayload):
  157. let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
  158. self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  159. }
  160. }
  161. private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
  162. guard delay != .nanoseconds(.max) else {
  163. return
  164. }
  165. self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
  166. initialDelay: delay,
  167. delay: delay
  168. ) { _ in
  169. self.handlePingAction(self.pingHandler.pingFired())
  170. // `timeout` is less than `interval`, guaranteeing that the close task
  171. // will be fired before a new ping is triggered.
  172. assert(timeout < delay, "`timeout` must be less than `interval`")
  173. self.scheduleClose(in: timeout)
  174. }
  175. }
  176. private func scheduleClose(in timeout: TimeAmount) {
  177. self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
  178. self.perform(operations: self.stateMachine.shutdownNow())
  179. }
  180. }
  181. private func idleTimeoutFired() {
  182. self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
  183. }
  184. func handlerAdded(context: ChannelHandlerContext) {
  185. self.context = context
  186. }
  187. func handlerRemoved(context: ChannelHandlerContext) {
  188. self.context = nil
  189. }
  190. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  191. if let created = event as? NIOHTTP2StreamCreatedEvent {
  192. self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
  193. self.handlePingAction(self.pingHandler.streamCreated())
  194. context.fireUserInboundEventTriggered(event)
  195. } else if let closed = event as? StreamClosedEvent {
  196. self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
  197. self.handlePingAction(self.pingHandler.streamClosed())
  198. self.mode.connectionManager?.streamClosed()
  199. context.fireUserInboundEventTriggered(event)
  200. } else if event is ChannelShouldQuiesceEvent {
  201. self.perform(operations: self.stateMachine.initiateGracefulShutdown())
  202. // Swallow this event.
  203. } else {
  204. context.fireUserInboundEventTriggered(event)
  205. }
  206. }
  207. func errorCaught(context: ChannelHandlerContext, error: Error) {
  208. // No state machine action here.
  209. self.mode.connectionManager?.channelError(error)
  210. context.fireErrorCaught(error)
  211. }
  212. func channelActive(context: ChannelHandlerContext) {
  213. self.stateMachine.logger.addIPAddressMetadata(
  214. local: context.localAddress,
  215. remote: context.remoteAddress
  216. )
  217. // No state machine action here.
  218. switch self.mode {
  219. case let .client(connectionManager, multiplexer):
  220. connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
  221. case .server:
  222. ()
  223. }
  224. context.fireChannelActive()
  225. }
  226. func channelInactive(context: ChannelHandlerContext) {
  227. self.perform(operations: self.stateMachine.channelInactive())
  228. self.scheduledPing?.cancel()
  229. self.scheduledClose?.cancel()
  230. self.scheduledPing = nil
  231. self.scheduledClose = nil
  232. context.fireChannelInactive()
  233. }
  234. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  235. let frame = self.unwrapInboundIn(data)
  236. switch frame.payload {
  237. case .goAway:
  238. self.perform(operations: self.stateMachine.receiveGoAway())
  239. case let .settings(.settings(settings)):
  240. self.perform(operations: self.stateMachine.receiveSettings(settings))
  241. case let .ping(data, ack):
  242. self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
  243. default:
  244. // We're not interested in other events.
  245. ()
  246. }
  247. context.fireChannelRead(data)
  248. }
  249. }
  250. extension HTTP2SettingsParameter {
  251. internal var loggingMetadataKey: String {
  252. switch self {
  253. case .headerTableSize:
  254. return "h2_settings_header_table_size"
  255. case .enablePush:
  256. return "h2_settings_enable_push"
  257. case .maxConcurrentStreams:
  258. return "h2_settings_max_concurrent_streams"
  259. case .initialWindowSize:
  260. return "h2_settings_initial_window_size"
  261. case .maxFrameSize:
  262. return "h2_settings_max_frame_size"
  263. case .maxHeaderListSize:
  264. return "h2_settings_max_header_list_size"
  265. case .enableConnectProtocol:
  266. return "h2_settings_enable_connect_protocol"
  267. default:
  268. return String(describing: self)
  269. }
  270. }
  271. }