GRPCIdleHandler.swift 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  /// A channel handler which tracks HTTP/2 stream activity on a connection and closes the channel
  /// once it has gone idle.
  ///
  /// The handler counts streams using `NIOHTTP2StreamCreatedEvent` and `StreamClosedEvent` user
  /// events. When the count is zero it schedules a task which closes the channel after
  /// `idleTimeout`; creating a stream cancels that task. The channel is also closed when a GOAWAY
  /// frame arrives while there are no active streams, or unconditionally when a
  /// `ConnectionIdledEvent` is received.
  ///
  /// In `.client` mode, state changes (active, ready, idle, inactive) are reported to the
  /// associated `ConnectionManager`.
  internal class GRPCIdleHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame

    /// The amount of time to wait before closing the channel when there are no active streams.
    /// No idle task is scheduled when the timeout's nanosecond count is `.max`.
    private let idleTimeout: TimeAmount

    /// The number of active streams.
    private var activeStreams = 0

    /// The maximum number of streams we may create concurrently on this connection. This is `nil`
    /// until a SETTINGS frame carrying the value has been read.
    private var maxConcurrentStreams: Int?

    /// The scheduled task which will close the channel.
    private var scheduledIdle: Scheduled<Void>?

    /// Client and server have slightly different behaviours; track which we are following.
    private var mode: Mode

    /// A logger.
    private let logger: Logger

    /// The mode of operation: the client tracks additional connection state in the connection
    /// manager.
    internal enum Mode {
      case client(ConnectionManager)
      case server
    }

    /// The current connection state.
    private var state: State = .notReady

    private enum State {
      // We haven't marked the connection as "ready" yet.
      case notReady

      // The connection has been marked as "ready".
      case ready

      // We called `close` on the channel.
      case closed
    }

    /// Creates an idle handler.
    ///
    /// - Parameters:
    ///   - mode: Whether the handler runs on a client (with its connection manager) or a server.
    ///   - logger: The logger used for stream and settings diagnostics.
    ///   - idleTimeout: How long the connection may have no active streams before it is closed.
    init(mode: Mode, logger: Logger, idleTimeout: TimeAmount) {
      self.mode = mode
      self.idleTimeout = idleTimeout
      self.logger = logger
    }

    /// Updates the active stream count from stream lifecycle events and handles forced idling.
    ///
    /// Events are ignored once the handler has closed the channel, but are always forwarded to
    /// the next handler.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      switch self.state {
      case .notReady, .ready:
        if let created = event as? NIOHTTP2StreamCreatedEvent {
          // We have a stream: don't go idle
          self.cancelIdleTimeout()
          self.activeStreams += 1

          self.logger.debug("HTTP2 stream created", metadata: [
            MetadataKey.h2StreamID: "\(created.streamID)",
            MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
          ])

          if self.activeStreams == self.maxConcurrentStreams {
            self.logger.warning("HTTP2 max concurrent stream limit reached", metadata: [
              MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
            ])
          }
        } else if let closed = event as? StreamClosedEvent {
          // Capture this before decrementing: we were at the limit and are about to drop below it.
          let droppingBelowMaxConcurrentStreamLimit = self.maxConcurrentStreams == self.activeStreams
          self.activeStreams -= 1

          self.logger.debug("HTTP2 stream closed", metadata: [
            MetadataKey.h2StreamID: "\(closed.streamID)",
            MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
          ])

          if droppingBelowMaxConcurrentStreamLimit {
            self.logger.notice(
              "HTTP2 active stream count fell below max concurrent stream limit",
              metadata: [MetadataKey.h2ActiveStreams: "\(self.activeStreams)"]
            )
          }

          // No active streams: go idle soon.
          if self.activeStreams == 0 {
            self.scheduleIdleTimeout(context: context)
          }
        } else if event is ConnectionIdledEvent {
          // Force idle (closing) because we received a `ConnectionIdledEvent` from a keepalive handler
          self.idle(context: context, force: true)
        }

      case .closed:
        ()
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Tells the connection manager (client mode only) that the channel became active, then
    /// forwards the event.
    func channelActive(context: ChannelHandlerContext) {
      switch (self.mode, self.state) {
      // The client should become active: we'll only schedule the idling when the channel
      // becomes 'ready'.
      case let (.client(manager), .notReady):
        manager.channelActive(channel: context.channel)

      case (.server, .notReady),
           (_, .ready),
           (_, .closed):
        ()
      }

      context.fireChannelActive()
    }

    /// Cancels any pending idle task and marks the handler as closed.
    func handlerRemoved(context: ChannelHandlerContext) {
      self.cancelIdleTimeout()
      self.state = .closed
    }

    /// Cancels any pending idle task, moves to the closed state and, in client mode, tells the
    /// connection manager why the channel went away before forwarding the event.
    func channelInactive(context: ChannelHandlerContext) {
      self.cancelIdleTimeout()

      switch (self.mode, self.state) {
      case let (.client(manager), .notReady):
        self.state = .closed
        manager.channelInactive()

      case let (.client(manager), .ready):
        self.state = .closed

        if self.activeStreams == 0 {
          // We're ready and there are no active streams: we can treat this as the server idling our
          // connection.
          manager.idle()
        } else {
          manager.channelInactive()
        }

      case (.server, .notReady),
           (.server, .ready),
           (_, .closed):
        self.state = .closed
      }

      context.fireChannelInactive()
    }

    /// Inspects frames on the root stream: SETTINGS frames update the known stream limit and mark
    /// the connection as ready, GOAWAY frames attempt to idle the connection. All frames are
    /// forwarded to the next handler.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)

      if frame.streamID == .rootStream {
        switch frame.payload {
        case let .settings(.settings(settings)):
          // Log any changes to HTTP/2 settings.
          self.logger.debug(
            "HTTP2 settings update",
            metadata: Dictionary(settings.map {
              ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
            }, uniquingKeysWith: { a, _ in a })
          )

          // If the parameter is repeated within the frame, the last occurrence is the one we keep.
          let maxConcurrentStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })
          if let maxConcurrentStreams = maxConcurrentStreams?.value {
            self.maxConcurrentStreams = maxConcurrentStreams
          }

          switch self.state {
          case .notReady:
            // This must be the initial settings frame, we can move to the ready state now.
            self.state = .ready

            switch self.mode {
            case let .client(manager):
              let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
              manager.logger.info("gRPC connection ready", metadata: [
                MetadataKey.remoteAddress: "\(remoteAddressDescription)",
                MetadataKey.eventLoop: "\(context.eventLoop)",
              ])

              // Let the manager know we're ready.
              manager.ready()

            case .server:
              ()
            }

            // Start the idle timeout.
            self.scheduleIdleTimeout(context: context)

          // Listed explicitly (rather than `default`) so that adding a state is a compile error
          // here. Later settings frames need no state change.
          case .ready, .closed:
            ()
          }

        case .goAway:
          switch self.state {
          case .ready, .notReady:
            // This only closes the channel if there are no active streams.
            self.idle(context: context)

          case .closed:
            ()
          }

        default:
          // Ignore all other frame types.
          ()
        }
      }

      context.fireChannelRead(data)
    }

    /// Cancels the pending idle task, if there is one, and forgets it.
    private func cancelIdleTimeout() {
      self.scheduledIdle?.cancel()
      self.scheduledIdle = nil
    }

    /// Schedules a task to idle the connection after `idleTimeout`.
    ///
    /// Nothing is scheduled if there are active streams or if the timeout's nanosecond count is
    /// `.max`.
    private func scheduleIdleTimeout(context: ChannelHandlerContext) {
      guard self.activeStreams == 0, self.idleTimeout.nanoseconds != .max else {
        return
      }

      // Make sure at most one idle task is ever outstanding. Overwriting `scheduledIdle` without
      // cancelling would leave the earlier task running with nothing able to cancel it, so it
      // could close the connection before a full idle period has elapsed.
      self.cancelIdleTimeout()

      self.scheduledIdle = context.eventLoop.scheduleTask(in: self.idleTimeout) {
        self.idle(context: context)
      }
    }

    /// Closes the channel and, in client mode, tells the connection manager that we idled.
    ///
    /// - Parameters:
    ///   - context: The context used to close the channel.
    ///   - force: When `true` the channel is closed even if there are active streams.
    private func idle(context: ChannelHandlerContext, force: Bool = false) {
      // Don't idle if there are active streams unless we manually request
      // example: keepalive handler sends a `ConnectionIdledEvent` event
      guard self.activeStreams == 0 || force else {
        return
      }

      switch self.state {
      case .notReady, .ready:
        self.state = .closed

        switch self.mode {
        case let .client(manager):
          manager.idle()
        case .server:
          ()
        }

        self.logger.debug("Closing idle channel")
        context.close(mode: .all, promise: nil)

      // We need to guard against double closure here. We may go idle as a result of receiving a
      // GOAWAY frame or because our scheduled idle timeout fired.
      case .closed:
        ()
      }
    }
  }
  extension HTTP2SettingsParameter {
    /// The metadata key under which the value of this HTTP/2 settings parameter is logged.
    ///
    /// Parameters listed in the table below have a dedicated key; any other parameter falls back
    /// to `String(describing:)`.
    fileprivate var loggingMetadataKey: String {
      let dedicatedKeys: [(parameter: HTTP2SettingsParameter, key: String)] = [
        (.headerTableSize, "h2_settings_header_table_size"),
        (.enablePush, "h2_settings_enable_push"),
        (.maxConcurrentStreams, "h2_settings_max_concurrent_streams"),
        (.initialWindowSize, "h2_settings_initial_window_size"),
        (.maxFrameSize, "h2_settings_max_frame_size"),
        (.maxHeaderListSize, "h2_settings_max_header_list_size"),
        (.enableConnectProtocol, "h2_settings_enable_connect_protocol"),
      ]

      if let match = dedicatedKeys.first(where: { $0.parameter == self }) {
        return match.key
      }

      // Not a parameter we have a dedicated key for.
      return String(describing: self)
    }
  }