GRPCIdleHandler.swift 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. internal class GRPCIdleHandler: ChannelInboundHandler {
  20. typealias InboundIn = HTTP2Frame
  21. /// The amount of time to wait before closing the channel when there are no active streams.
  22. private let idleTimeout: TimeAmount
  23. /// The number of active streams.
  24. private var activeStreams = 0
  25. /// The scheduled task which will close the channel.
  26. private var scheduledIdle: Scheduled<Void>?
  27. /// Client and server have slightly different behaviours; track which we are following.
  28. private var mode: Mode
  29. /// A logger.
  30. private let logger: Logger
  31. /// The mode of operation: the client tracks additional connection state in the connection
  32. /// manager.
  33. internal enum Mode {
  34. case client(ConnectionManager)
  35. case server
  36. }
  37. /// The current connection state.
  38. private var state: State = .notReady
  39. private enum State {
  40. // We haven't marked the connection as "ready" yet.
  41. case notReady
  42. // The connection has been marked as "ready".
  43. case ready
  44. // We called `close` on the channel.
  45. case closed
  46. }
  47. init(mode: Mode, logger: Logger, idleTimeout: TimeAmount = .minutes(5)) {
  48. self.mode = mode
  49. self.idleTimeout = idleTimeout
  50. self.logger = logger
  51. }
  52. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  53. switch self.state {
  54. case .notReady, .ready:
  55. if let created = event as? NIOHTTP2StreamCreatedEvent {
  56. // We have a stream: don't go idle
  57. self.scheduledIdle?.cancel()
  58. self.scheduledIdle = nil
  59. self.activeStreams += 1
  60. self.logger.debug("HTTP2 stream created", metadata: [
  61. MetadataKey.h2StreamID: "\(created.streamID)",
  62. MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
  63. ])
  64. } else if let closed = event as? StreamClosedEvent {
  65. self.activeStreams -= 1
  66. self.logger.debug("HTTP2 stream closed", metadata: [
  67. MetadataKey.h2StreamID: "\(closed.streamID)",
  68. MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
  69. ])
  70. // No active streams: go idle soon.
  71. if self.activeStreams == 0 {
  72. self.scheduleIdleTimeout(context: context)
  73. }
  74. } else if event is ConnectionIdledEvent {
  75. // Force idle (closing) because we received a `ConnectionIdledEvent` from a keepalive handler
  76. self.idle(context: context, force: true)
  77. }
  78. case .closed:
  79. ()
  80. }
  81. context.fireUserInboundEventTriggered(event)
  82. }
  83. func channelActive(context: ChannelHandlerContext) {
  84. switch (self.mode, self.state) {
  85. // The client should become active: we'll only schedule the idling when the channel
  86. // becomes 'ready'.
  87. case let (.client(manager), .notReady):
  88. manager.channelActive(channel: context.channel)
  89. case (.server, .notReady),
  90. (_, .ready),
  91. (_, .closed):
  92. ()
  93. }
  94. context.fireChannelActive()
  95. }
  96. func handlerRemoved(context: ChannelHandlerContext) {
  97. self.scheduledIdle?.cancel()
  98. }
  99. func channelInactive(context: ChannelHandlerContext) {
  100. self.scheduledIdle?.cancel()
  101. self.scheduledIdle = nil
  102. switch (self.mode, self.state) {
  103. case let (.client(manager), .notReady),
  104. let (.client(manager), .ready):
  105. manager.channelInactive()
  106. case (.server, .notReady),
  107. (.server, .ready),
  108. (_, .closed):
  109. ()
  110. }
  111. context.fireChannelInactive()
  112. }
  113. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  114. let frame = self.unwrapInboundIn(data)
  115. if frame.streamID == .rootStream {
  116. switch (self.state, frame.payload) {
  117. // We only care about SETTINGS as long as we are in state `.notReady`.
  118. case let (.notReady, .settings(content)):
  119. self.state = .ready
  120. switch self.mode {
  121. case let .client(manager):
  122. let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
  123. manager.logger.info("gRPC connection ready", metadata: [
  124. MetadataKey.remoteAddress: "\(remoteAddressDescription)",
  125. MetadataKey.eventLoop: "\(context.eventLoop)",
  126. ])
  127. // Let the manager know we're ready.
  128. manager.ready()
  129. case .server:
  130. ()
  131. }
  132. if case let .settings(settings) = content {
  133. self.logger.debug(
  134. "received initial HTTP2 settings",
  135. metadata: Dictionary(settings.map {
  136. ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
  137. }, uniquingKeysWith: { a, _ in a })
  138. )
  139. }
  140. // Start the idle timeout.
  141. self.scheduleIdleTimeout(context: context)
  142. case (.notReady, .goAway),
  143. (.ready, .goAway):
  144. self.idle(context: context)
  145. default:
  146. ()
  147. }
  148. }
  149. context.fireChannelRead(data)
  150. }
  151. private func scheduleIdleTimeout(context: ChannelHandlerContext) {
  152. guard self.activeStreams == 0 else {
  153. return
  154. }
  155. self.scheduledIdle = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  156. self.idle(context: context)
  157. }
  158. }
  159. private func idle(context: ChannelHandlerContext, force: Bool = false) {
  160. // Don't idle if there are active streams unless we manually request
  161. // example: keepalive handler sends a `ConnectionIdledEvent` event
  162. guard self.activeStreams == 0 || force else {
  163. return
  164. }
  165. switch self.state {
  166. case .notReady, .ready:
  167. self.state = .closed
  168. switch self.mode {
  169. case let .client(manager):
  170. manager.idle()
  171. case .server:
  172. ()
  173. }
  174. self.logger.debug("Closing idle channel")
  175. context.close(mode: .all, promise: nil)
  176. // We need to guard against double closure here. We may go idle as a result of receiving a
  177. // GOAWAY frame or because our scheduled idle timeout fired.
  178. case .closed:
  179. ()
  180. }
  181. }
  182. }
  183. extension HTTP2SettingsParameter {
  184. fileprivate var loggingMetadataKey: String {
  185. switch self {
  186. case .headerTableSize:
  187. return "h2_settings_header_table_size"
  188. case .enablePush:
  189. return "h2_settings_enable_push"
  190. case .maxConcurrentStreams:
  191. return "h2_settings_max_concurrent_streams"
  192. case .initialWindowSize:
  193. return "h2_settings_initial_window_size"
  194. case .maxFrameSize:
  195. return "h2_settings_max_frame_size"
  196. case .maxHeaderListSize:
  197. return "h2_settings_max_header_list_size"
  198. case .enableConnectProtocol:
  199. return "h2_settings_enable_connect_protocol"
  200. default:
  201. return String(describing: self)
  202. }
  203. }
  204. }