GRPCIdleHandler.swift 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. internal class GRPCIdleHandler: ChannelInboundHandler {
  20. typealias InboundIn = HTTP2Frame
  21. /// The amount of time to wait before closing the channel when there are no active streams.
  22. private let idleTimeout: TimeAmount
  23. /// The number of active streams.
  24. private var activeStreams = 0
  25. /// The maximum number of streams we may create concurrently on this connection.
  26. private var maxConcurrentStreams: Int?
  27. /// The scheduled task which will close the channel.
  28. private var scheduledIdle: Scheduled<Void>?
  29. /// Client and server have slightly different behaviours; track which we are following.
  30. private var mode: Mode
  31. /// A logger.
  32. private let logger: Logger
  33. /// The mode of operation: the client tracks additional connection state in the connection
  34. /// manager.
  35. internal enum Mode {
  36. case client(ConnectionManager)
  37. case server
  38. }
  39. /// The current connection state.
  40. private var state: State = .notReady
  41. private enum State {
  42. // We haven't marked the connection as "ready" yet.
  43. case notReady
  44. // The connection has been marked as "ready".
  45. case ready
  46. // We called `close` on the channel.
  47. case closed
  48. }
  49. init(mode: Mode, logger: Logger, idleTimeout: TimeAmount) {
  50. self.mode = mode
  51. self.idleTimeout = idleTimeout
  52. self.logger = logger
  53. }
  54. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  55. switch self.state {
  56. case .notReady, .ready:
  57. if let created = event as? NIOHTTP2StreamCreatedEvent {
  58. // We have a stream: don't go idle
  59. self.scheduledIdle?.cancel()
  60. self.scheduledIdle = nil
  61. self.activeStreams += 1
  62. self.logger.debug("HTTP2 stream created", metadata: [
  63. MetadataKey.h2StreamID: "\(created.streamID)",
  64. MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
  65. ])
  66. if self.activeStreams == self.maxConcurrentStreams {
  67. self.logger.warning("HTTP2 max concurrent stream limit reached", metadata: [
  68. MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
  69. ])
  70. }
  71. } else if let closed = event as? StreamClosedEvent {
  72. let droppingBelowMaxConcurrentStreamLimit = self.maxConcurrentStreams == self.activeStreams
  73. self.activeStreams -= 1
  74. self.logger.debug("HTTP2 stream closed", metadata: [
  75. MetadataKey.h2StreamID: "\(closed.streamID)",
  76. MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
  77. ])
  78. if droppingBelowMaxConcurrentStreamLimit {
  79. self.logger.notice(
  80. "HTTP2 active stream count fell below max concurrent stream limit",
  81. metadata: [MetadataKey.h2ActiveStreams: "\(self.activeStreams)"]
  82. )
  83. }
  84. // No active streams: go idle soon.
  85. if self.activeStreams == 0 {
  86. self.scheduleIdleTimeout(context: context)
  87. }
  88. } else if event is ConnectionIdledEvent {
  89. // Force idle (closing) because we received a `ConnectionIdledEvent` from a keepalive handler
  90. self.idle(context: context, force: true)
  91. }
  92. case .closed:
  93. ()
  94. }
  95. context.fireUserInboundEventTriggered(event)
  96. }
  97. func errorCaught(context: ChannelHandlerContext, error: Error) {
  98. switch (self.mode, self.state) {
  99. case let (.client(manager), .notReady),
  100. let (.client(manager), .ready):
  101. // We're most likely about to become inactive: let the manager know the reason why.
  102. manager.channelError(error)
  103. case (.client, .closed),
  104. (.server, _):
  105. ()
  106. }
  107. context.fireErrorCaught(error)
  108. }
  109. func channelActive(context: ChannelHandlerContext) {
  110. switch (self.mode, self.state) {
  111. // The client should become active: we'll only schedule the idling when the channel
  112. // becomes 'ready'.
  113. case let (.client(manager), .notReady):
  114. manager.channelActive(channel: context.channel)
  115. case (.server, .notReady),
  116. (_, .ready),
  117. (_, .closed):
  118. ()
  119. }
  120. context.fireChannelActive()
  121. }
  122. func handlerRemoved(context: ChannelHandlerContext) {
  123. self.scheduledIdle?.cancel()
  124. self.scheduledIdle = nil
  125. self.state = .closed
  126. }
  127. func channelInactive(context: ChannelHandlerContext) {
  128. self.scheduledIdle?.cancel()
  129. self.scheduledIdle = nil
  130. switch (self.mode, self.state) {
  131. case let (.client(manager), .notReady):
  132. self.state = .closed
  133. manager.channelInactive()
  134. case let (.client(manager), .ready):
  135. self.state = .closed
  136. if self.activeStreams == 0 {
  137. // We're ready and there are no active streams: we can treat this as the server idling our
  138. // connection.
  139. manager.idle()
  140. } else {
  141. manager.channelInactive()
  142. }
  143. case (.server, .notReady),
  144. (.server, .ready),
  145. (_, .closed):
  146. self.state = .closed
  147. }
  148. context.fireChannelInactive()
  149. }
  150. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  151. let frame = self.unwrapInboundIn(data)
  152. if frame.streamID == .rootStream {
  153. switch frame.payload {
  154. case let .settings(.settings(settings)):
  155. // Log any changes to HTTP/2 settings.
  156. self.logger.debug(
  157. "HTTP2 settings update",
  158. metadata: Dictionary(settings.map {
  159. ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
  160. }, uniquingKeysWith: { a, _ in a })
  161. )
  162. let maxConcurrentStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })
  163. if let maxConcurrentStreams = maxConcurrentStreams?.value {
  164. self.maxConcurrentStreams = maxConcurrentStreams
  165. }
  166. switch self.state {
  167. case .notReady:
  168. // This must be the initial settings frame, we can move to the ready state now.
  169. self.state = .ready
  170. switch self.mode {
  171. case let .client(manager):
  172. let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
  173. manager.logger.info("gRPC connection ready", metadata: [
  174. MetadataKey.remoteAddress: "\(remoteAddressDescription)",
  175. MetadataKey.eventLoop: "\(context.eventLoop)",
  176. ])
  177. // Let the manager know we're ready.
  178. manager.ready()
  179. case .server:
  180. ()
  181. }
  182. // Start the idle timeout.
  183. self.scheduleIdleTimeout(context: context)
  184. default:
  185. ()
  186. }
  187. case .goAway:
  188. switch self.state {
  189. case .ready, .notReady:
  190. self.idle(context: context)
  191. case .closed:
  192. ()
  193. }
  194. default:
  195. // Ignore all other frame types.
  196. ()
  197. }
  198. }
  199. context.fireChannelRead(data)
  200. }
  201. private func scheduleIdleTimeout(context: ChannelHandlerContext) {
  202. guard self.activeStreams == 0, self.idleTimeout.nanoseconds != .max else {
  203. return
  204. }
  205. self.scheduledIdle = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  206. self.idle(context: context)
  207. }
  208. }
  209. private func idle(context: ChannelHandlerContext, force: Bool = false) {
  210. // Don't idle if there are active streams unless we manually request
  211. // example: keepalive handler sends a `ConnectionIdledEvent` event
  212. guard self.activeStreams == 0 || force else {
  213. return
  214. }
  215. switch self.state {
  216. case .notReady, .ready:
  217. self.state = .closed
  218. switch self.mode {
  219. case let .client(manager):
  220. manager.idle()
  221. case .server:
  222. ()
  223. }
  224. self.logger.debug("Closing idle channel")
  225. context.close(mode: .all, promise: nil)
  226. // We need to guard against double closure here. We may go idle as a result of receiving a
  227. // GOAWAY frame or because our scheduled idle timeout fired.
  228. case .closed:
  229. ()
  230. }
  231. }
  232. }
  233. extension HTTP2SettingsParameter {
  234. fileprivate var loggingMetadataKey: String {
  235. switch self {
  236. case .headerTableSize:
  237. return "h2_settings_header_table_size"
  238. case .enablePush:
  239. return "h2_settings_enable_push"
  240. case .maxConcurrentStreams:
  241. return "h2_settings_max_concurrent_streams"
  242. case .initialWindowSize:
  243. return "h2_settings_initial_window_size"
  244. case .maxFrameSize:
  245. return "h2_settings_max_frame_size"
  246. case .maxHeaderListSize:
  247. return "h2_settings_max_header_list_size"
  248. case .enableConnectProtocol:
  249. return "h2_settings_enable_connect_protocol"
  250. default:
  251. return String(describing: self)
  252. }
  253. }
  254. }