GRPCIdleHandler.swift 7.1 KB


  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. internal class GRPCIdleHandler: ChannelInboundHandler {
  20. typealias InboundIn = HTTP2Frame
  21. /// The amount of time to wait before closing the channel when there are no active streams.
  22. private let idleTimeout: TimeAmount
  23. /// The number of active streams.
  24. private var activeStreams = 0
  25. /// The scheduled task which will close the channel.
  26. private var scheduledIdle: Scheduled<Void>?
  27. /// Client and server have slightly different behaviours; track which we are following.
  28. private var mode: Mode
  29. /// A logger.
  30. private let logger: Logger
  31. /// The mode of operation: the client tracks additional connection state in the connection
  32. /// manager.
  33. internal enum Mode {
  34. case client(ConnectionManager)
  35. case server
  36. }
  37. /// The current connection state.
  38. private var state: State = .notReady
  39. private enum State {
  40. // We haven't marked the connection as "ready" yet.
  41. case notReady
  42. // The connection has been marked as "ready".
  43. case ready
  44. // We called `close` on the channel.
  45. case closed
  46. }
  47. init(mode: Mode, logger: Logger, idleTimeout: TimeAmount = .minutes(5)) {
  48. self.mode = mode
  49. self.idleTimeout = idleTimeout
  50. self.logger = logger
  51. }
  52. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  53. switch self.state {
  54. case .notReady, .ready:
  55. if let created = event as? NIOHTTP2StreamCreatedEvent {
  56. // We have a stream: don't go idle
  57. self.scheduledIdle?.cancel()
  58. self.scheduledIdle = nil
  59. self.activeStreams += 1
  60. self.logger.debug("HTTP2 stream created", metadata: [
  61. MetadataKey.h2StreamID: "\(created.streamID)",
  62. MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
  63. ])
  64. } else if let closed = event as? StreamClosedEvent {
  65. self.activeStreams -= 1
  66. self.logger.debug("HTTP2 stream closed", metadata: [
  67. MetadataKey.h2StreamID: "\(closed.streamID)",
  68. MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
  69. ])
  70. // No active streams: go idle soon.
  71. if self.activeStreams == 0 {
  72. self.scheduleIdleTimeout(context: context)
  73. }
  74. } else if event is ConnectionIdledEvent {
  75. // Force idle (closing) because we received a `ConnectionIdledEvent` from a keepalive handler
  76. self.idle(context: context, force: true)
  77. }
  78. case .closed:
  79. ()
  80. }
  81. context.fireUserInboundEventTriggered(event)
  82. }
  83. func channelActive(context: ChannelHandlerContext) {
  84. switch (self.mode, self.state) {
  85. // The client should become active: we'll only schedule the idling when the channel
  86. // becomes 'ready'.
  87. case let (.client(manager), .notReady):
  88. manager.channelActive(channel: context.channel)
  89. case (.server, .notReady),
  90. (_, .ready),
  91. (_, .closed):
  92. ()
  93. }
  94. context.fireChannelActive()
  95. }
  96. func handlerRemoved(context: ChannelHandlerContext) {
  97. self.scheduledIdle?.cancel()
  98. self.scheduledIdle = nil
  99. self.state = .closed
  100. }
  101. func channelInactive(context: ChannelHandlerContext) {
  102. self.scheduledIdle?.cancel()
  103. self.scheduledIdle = nil
  104. switch (self.mode, self.state) {
  105. case let (.client(manager), .notReady),
  106. let (.client(manager), .ready):
  107. self.state = .closed
  108. manager.channelInactive()
  109. case (.server, .notReady),
  110. (.server, .ready),
  111. (_, .closed):
  112. self.state = .closed
  113. }
  114. context.fireChannelInactive()
  115. }
  116. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  117. let frame = self.unwrapInboundIn(data)
  118. if frame.streamID == .rootStream {
  119. switch (self.state, frame.payload) {
  120. // We only care about SETTINGS as long as we are in state `.notReady`.
  121. case let (.notReady, .settings(content)):
  122. self.state = .ready
  123. switch self.mode {
  124. case let .client(manager):
  125. let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
  126. manager.logger.info("gRPC connection ready", metadata: [
  127. MetadataKey.remoteAddress: "\(remoteAddressDescription)",
  128. MetadataKey.eventLoop: "\(context.eventLoop)",
  129. ])
  130. // Let the manager know we're ready.
  131. manager.ready()
  132. case .server:
  133. ()
  134. }
  135. if case let .settings(settings) = content {
  136. self.logger.debug(
  137. "received initial HTTP2 settings",
  138. metadata: Dictionary(settings.map {
  139. ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
  140. }, uniquingKeysWith: { a, _ in a })
  141. )
  142. }
  143. // Start the idle timeout.
  144. self.scheduleIdleTimeout(context: context)
  145. case (.notReady, .goAway),
  146. (.ready, .goAway):
  147. self.idle(context: context)
  148. default:
  149. ()
  150. }
  151. }
  152. context.fireChannelRead(data)
  153. }
  154. private func scheduleIdleTimeout(context: ChannelHandlerContext) {
  155. guard self.activeStreams == 0 else {
  156. return
  157. }
  158. self.scheduledIdle = context.eventLoop.scheduleTask(in: self.idleTimeout) {
  159. self.idle(context: context)
  160. }
  161. }
  162. private func idle(context: ChannelHandlerContext, force: Bool = false) {
  163. // Don't idle if there are active streams unless we manually request
  164. // example: keepalive handler sends a `ConnectionIdledEvent` event
  165. guard self.activeStreams == 0 || force else {
  166. return
  167. }
  168. switch self.state {
  169. case .notReady, .ready:
  170. self.state = .closed
  171. switch self.mode {
  172. case let .client(manager):
  173. manager.idle()
  174. case .server:
  175. ()
  176. }
  177. self.logger.debug("Closing idle channel")
  178. context.close(mode: .all, promise: nil)
  179. // We need to guard against double closure here. We may go idle as a result of receiving a
  180. // GOAWAY frame or because our scheduled idle timeout fired.
  181. case .closed:
  182. ()
  183. }
  184. }
  185. }
  186. extension HTTP2SettingsParameter {
  187. fileprivate var loggingMetadataKey: String {
  188. switch self {
  189. case .headerTableSize:
  190. return "h2_settings_header_table_size"
  191. case .enablePush:
  192. return "h2_settings_enable_push"
  193. case .maxConcurrentStreams:
  194. return "h2_settings_max_concurrent_streams"
  195. case .initialWindowSize:
  196. return "h2_settings_initial_window_size"
  197. case .maxFrameSize:
  198. return "h2_settings_max_frame_size"
  199. case .maxHeaderListSize:
  200. return "h2_settings_max_header_list_size"
  201. case .enableConnectProtocol:
  202. return "h2_settings_enable_connect_protocol"
  203. default:
  204. return String(describing: self)
  205. }
  206. }
  207. }