2
0

GRPCIdleHandler.swift 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHTTP2
  19. internal final class GRPCIdleHandler: ChannelInboundHandler {
  20. typealias InboundIn = HTTP2Frame
  21. typealias OutboundOut = HTTP2Frame
  22. /// The amount of time to wait before closing the channel when there are no active streams.
  23. private let idleTimeout: TimeAmount
  24. /// The ping handler.
  25. private var pingHandler: PingHandler
  26. /// The scheduled task which will close the connection after the keep-alive timeout has expired.
  27. private var scheduledClose: Scheduled<Void>?
  28. /// The scheduled task which will ping.
  29. private var scheduledPing: RepeatedTask?
  30. /// The mode we're operating in.
  31. private let mode: Mode
  32. private var context: ChannelHandlerContext?
  33. /// The mode of operation: the client tracks additional connection state in the connection
  34. /// manager.
  35. internal enum Mode {
  36. case client(ConnectionManager, HTTP2StreamMultiplexer)
  37. case server
  38. var connectionManager: ConnectionManager? {
  39. switch self {
  40. case let .client(manager, _):
  41. return manager
  42. case .server:
  43. return nil
  44. }
  45. }
  46. }
  47. /// The current state.
  48. private var stateMachine: GRPCIdleHandlerStateMachine
  49. init(
  50. connectionManager: ConnectionManager,
  51. multiplexer: HTTP2StreamMultiplexer,
  52. idleTimeout: TimeAmount,
  53. keepalive configuration: ClientConnectionKeepalive,
  54. logger: Logger
  55. ) {
  56. self.mode = .client(connectionManager, multiplexer)
  57. self.idleTimeout = idleTimeout
  58. self.stateMachine = .init(role: .client, logger: logger)
  59. self.pingHandler = PingHandler(
  60. pingCode: 5,
  61. interval: configuration.interval,
  62. timeout: configuration.timeout,
  63. permitWithoutCalls: configuration.permitWithoutCalls,
  64. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  65. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
  66. )
  67. }
  68. init(
  69. idleTimeout: TimeAmount,
  70. keepalive configuration: ServerConnectionKeepalive,
  71. logger: Logger
  72. ) {
  73. self.mode = .server
  74. self.stateMachine = .init(role: .server, logger: logger)
  75. self.idleTimeout = idleTimeout
  76. self.pingHandler = PingHandler(
  77. pingCode: 10,
  78. interval: configuration.interval,
  79. timeout: configuration.timeout,
  80. permitWithoutCalls: configuration.permitWithoutCalls,
  81. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  82. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
  83. minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
  84. maximumPingStrikes: configuration.maximumPingStrikes
  85. )
  86. }
  87. private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
  88. guard let context = self.context else {
  89. return
  90. }
  91. let frame = HTTP2Frame(
  92. streamID: .rootStream,
  93. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  94. )
  95. context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  96. }
  97. private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
  98. // Prod the connection manager.
  99. if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
  100. switch event {
  101. case .idle:
  102. manager.idle()
  103. case .inactive:
  104. manager.channelInactive()
  105. case .ready:
  106. manager.ready()
  107. case .quiescing:
  108. manager.beginQuiescing()
  109. }
  110. }
  111. // Max concurrent streams changed.
  112. if let manager = self.mode.connectionManager,
  113. let maxConcurrentStreams = operations.maxConcurrentStreamsChange {
  114. manager.maxConcurrentStreamsChanged(maxConcurrentStreams)
  115. }
  116. // Handle idle timeout creation/cancellation.
  117. if let idleTask = operations.idleTask {
  118. switch idleTask {
  119. case let .cancel(task):
  120. task.cancel()
  121. case .schedule:
  122. if self.idleTimeout != .nanoseconds(.max), let context = self.context {
  123. let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  124. self.idleTimeoutFired()
  125. }
  126. self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
  127. }
  128. }
  129. }
  130. // Send a GOAWAY frame.
  131. if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
  132. let goAwayFrame = HTTP2Frame(
  133. streamID: .rootStream,
  134. payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
  135. )
  136. self.context?.writeAndFlush(self.wrapOutboundOut(goAwayFrame), promise: nil)
  137. }
  138. // Close the channel, if necessary.
  139. if operations.shouldCloseChannel {
  140. self.context?.close(mode: .all, promise: nil)
  141. }
  142. }
  143. private func handlePingAction(_ action: PingHandler.Action) {
  144. switch action {
  145. case .none:
  146. ()
  147. case .cancelScheduledTimeout:
  148. self.scheduledClose?.cancel()
  149. self.scheduledClose = nil
  150. case let .schedulePing(delay, timeout):
  151. self.schedulePing(in: delay, timeout: timeout)
  152. case let .reply(framePayload):
  153. let frame = HTTP2Frame(streamID: .rootStream, payload: framePayload)
  154. self.context?.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
  155. }
  156. }
  157. private func schedulePing(in delay: TimeAmount, timeout: TimeAmount) {
  158. guard delay != .nanoseconds(.max) else {
  159. return
  160. }
  161. self.scheduledPing = self.context?.eventLoop.scheduleRepeatedTask(
  162. initialDelay: delay,
  163. delay: delay
  164. ) { _ in
  165. self.handlePingAction(self.pingHandler.pingFired())
  166. // `timeout` is less than `interval`, guaranteeing that the close task
  167. // will be fired before a new ping is triggered.
  168. assert(timeout < delay, "`timeout` must be less than `interval`")
  169. self.scheduleClose(in: timeout)
  170. }
  171. }
  172. private func scheduleClose(in timeout: TimeAmount) {
  173. self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) {
  174. self.perform(operations: self.stateMachine.shutdownNow())
  175. }
  176. }
  177. private func idleTimeoutFired() {
  178. self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
  179. }
  180. func handlerAdded(context: ChannelHandlerContext) {
  181. self.context = context
  182. }
  183. func handlerRemoved(context: ChannelHandlerContext) {
  184. self.context = nil
  185. }
  186. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  187. if let created = event as? NIOHTTP2StreamCreatedEvent {
  188. self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
  189. self.handlePingAction(self.pingHandler.streamCreated())
  190. context.fireUserInboundEventTriggered(event)
  191. } else if let closed = event as? StreamClosedEvent {
  192. self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
  193. self.handlePingAction(self.pingHandler.streamClosed())
  194. self.mode.connectionManager?.streamClosed()
  195. context.fireUserInboundEventTriggered(event)
  196. } else if event is ChannelShouldQuiesceEvent {
  197. self.perform(operations: self.stateMachine.initiateGracefulShutdown())
  198. // Swallow this event.
  199. } else {
  200. context.fireUserInboundEventTriggered(event)
  201. }
  202. }
  203. func errorCaught(context: ChannelHandlerContext, error: Error) {
  204. // No state machine action here.
  205. self.mode.connectionManager?.channelError(error)
  206. context.fireErrorCaught(error)
  207. }
  208. func channelActive(context: ChannelHandlerContext) {
  209. self.stateMachine.logger.addIPAddressMetadata(
  210. local: context.localAddress,
  211. remote: context.remoteAddress
  212. )
  213. // No state machine action here.
  214. switch self.mode {
  215. case let .client(connectionManager, multiplexer):
  216. connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
  217. case .server:
  218. ()
  219. }
  220. context.fireChannelActive()
  221. }
  222. func channelInactive(context: ChannelHandlerContext) {
  223. self.perform(operations: self.stateMachine.channelInactive())
  224. self.scheduledPing?.cancel()
  225. self.scheduledClose?.cancel()
  226. self.scheduledPing = nil
  227. self.scheduledClose = nil
  228. context.fireChannelInactive()
  229. }
  230. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  231. let frame = self.unwrapInboundIn(data)
  232. switch frame.payload {
  233. case .goAway:
  234. self.perform(operations: self.stateMachine.receiveGoAway())
  235. case let .settings(.settings(settings)):
  236. self.perform(operations: self.stateMachine.receiveSettings(settings))
  237. case let .ping(data, ack):
  238. self.handlePingAction(self.pingHandler.read(pingData: data, ack: ack))
  239. default:
  240. // We're not interested in other events.
  241. ()
  242. }
  243. context.fireChannelRead(data)
  244. }
  245. }
  246. extension HTTP2SettingsParameter {
  247. internal var loggingMetadataKey: String {
  248. switch self {
  249. case .headerTableSize:
  250. return "h2_settings_header_table_size"
  251. case .enablePush:
  252. return "h2_settings_enable_push"
  253. case .maxConcurrentStreams:
  254. return "h2_settings_max_concurrent_streams"
  255. case .initialWindowSize:
  256. return "h2_settings_initial_window_size"
  257. case .maxFrameSize:
  258. return "h2_settings_max_frame_size"
  259. case .maxHeaderListSize:
  260. return "h2_settings_max_header_list_size"
  261. case .enableConnectProtocol:
  262. return "h2_settings_enable_connect_protocol"
  263. default:
  264. return String(describing: self)
  265. }
  266. }
  267. }