GRPCIdleHandler.swift 7.4 KB


  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  /// A channel handler which watches HTTP/2 stream activity on a connection and closes the channel
  /// once it has had no active streams for `idleTimeout`.
  ///
  /// The handler:
  /// - counts streams using `NIOHTTP2StreamCreatedEvent` and `StreamClosedEvent` user events,
  /// - marks the connection as "ready" when the first SETTINGS frame arrives on the root stream,
  /// - closes the channel when the idle timeout fires, when a GOAWAY frame is received while there
  ///   are no active streams, or when a `ConnectionIdledEvent` is received (unconditionally).
  ///
  /// In client mode the associated `ConnectionManager` is additionally told about the channel
  /// becoming active, ready, idle and inactive.
  internal class GRPCIdleHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame

    /// The amount of time to wait before closing the channel when there are no active streams.
    /// A timeout whose `nanoseconds` is `.max` disables the idle timer.
    private let idleTimeout: TimeAmount

    /// The number of active streams.
    private var activeStreams = 0

    /// The scheduled task which will close the channel, if one is currently pending.
    private var scheduledIdle: Scheduled<Void>?

    /// Client and server have slightly different behaviours; track which we are following.
    private let mode: Mode

    /// A logger.
    private let logger: Logger

    /// The mode of operation: the client tracks additional connection state in the connection
    /// manager.
    internal enum Mode {
      case client(ConnectionManager)
      case server
    }

    /// The current connection state.
    private var state: State = .notReady

    private enum State {
      // We haven't marked the connection as "ready" yet.
      case notReady
      // The connection has been marked as "ready".
      case ready
      // We called `close` on the channel, the channel became inactive, or the handler was removed.
      case closed
    }

    /// Creates an idle handler.
    ///
    /// - Parameters:
    ///   - mode: Whether the handler is running on a client (with its connection manager) or a
    ///     server.
    ///   - logger: The logger used for stream and idle diagnostics.
    ///   - idleTimeout: How long the connection may have no active streams before it is closed.
    init(mode: Mode, logger: Logger, idleTimeout: TimeAmount) {
      self.mode = mode
      self.idleTimeout = idleTimeout
      self.logger = logger
    }

    /// Tracks stream creation and closure and reacts to forced-idle requests.
    ///
    /// Events are ignored once the handler is in the `.closed` state, but they are always
    /// forwarded down the pipeline.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      switch self.state {
      case .notReady, .ready:
        if let created = event as? NIOHTTP2StreamCreatedEvent {
          // We have a stream: don't go idle
          self.cancelScheduledIdle()
          self.activeStreams += 1

          self.logger.debug("HTTP2 stream created", metadata: [
            MetadataKey.h2StreamID: "\(created.streamID)",
            MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
          ])
        } else if let closed = event as? StreamClosedEvent {
          self.activeStreams -= 1

          self.logger.debug("HTTP2 stream closed", metadata: [
            MetadataKey.h2StreamID: "\(closed.streamID)",
            MetadataKey.h2ActiveStreams: "\(self.activeStreams)",
          ])

          // No active streams: go idle soon.
          if self.activeStreams == 0 {
            self.scheduleIdleTimeout(context: context)
          }
        } else if event is ConnectionIdledEvent {
          // Force idle (closing) because we received a `ConnectionIdledEvent` from a keepalive handler
          self.idle(context: context, force: true)
        }

      case .closed:
        ()
      }

      context.fireUserInboundEventTriggered(event)
    }

    /// Tells the connection manager (client mode, not yet ready) that the channel is active, then
    /// forwards the event.
    func channelActive(context: ChannelHandlerContext) {
      switch (self.mode, self.state) {
      // The client should become active: we'll only schedule the idling when the channel
      // becomes 'ready'.
      case let (.client(manager), .notReady):
        manager.channelActive(channel: context.channel)

      case (.server, .notReady),
           (_, .ready),
           (_, .closed):
        ()
      }

      context.fireChannelActive()
    }

    /// Cancels any pending idle task and marks the handler as closed so that later events are
    /// ignored.
    func handlerRemoved(context: ChannelHandlerContext) {
      self.cancelScheduledIdle()
      self.state = .closed
    }

    /// Cancels any pending idle task, moves to `.closed` and, in client mode, informs the
    /// connection manager why the channel went away.
    func channelInactive(context: ChannelHandlerContext) {
      self.cancelScheduledIdle()

      switch (self.mode, self.state) {
      case let (.client(manager), .notReady):
        self.state = .closed
        manager.channelInactive()

      case let (.client(manager), .ready):
        self.state = .closed

        if self.activeStreams == 0 {
          // We're ready and there are no active streams: we can treat this as the server idling our
          // connection.
          manager.idle()
        } else {
          manager.channelInactive()
        }

      case (.server, .notReady),
           (.server, .ready),
           (_, .closed):
        self.state = .closed
      }

      context.fireChannelInactive()
    }

    /// Inspects frames on the root stream: the first SETTINGS frame marks the connection as ready
    /// and starts the idle timer; a GOAWAY frame attempts to idle the connection. All frames are
    /// forwarded unchanged.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)

      if frame.streamID == .rootStream {
        switch (self.state, frame.payload) {
        // We only care about SETTINGS as long as we are in state `.notReady`.
        case let (.notReady, .settings(content)):
          self.state = .ready

          switch self.mode {
          case let .client(manager):
            let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
            manager.logger.info("gRPC connection ready", metadata: [
              MetadataKey.remoteAddress: "\(remoteAddressDescription)",
              MetadataKey.eventLoop: "\(context.eventLoop)",
            ])

            // Let the manager know we're ready.
            manager.ready()

          case .server:
            ()
          }

          // Only a SETTINGS frame carrying values (as opposed to an ACK) has anything to log.
          if case let .settings(settings) = content {
            self.logger.debug(
              "received initial HTTP2 settings",
              metadata: Dictionary(settings.map {
                ("\($0.parameter.loggingMetadataKey)", "\($0.value)")
              }, uniquingKeysWith: { a, _ in a })
            )
          }

          // Start the idle timeout.
          self.scheduleIdleTimeout(context: context)

        case (.notReady, .goAway),
             (.ready, .goAway):
          // Not forced: this is a no-op while streams are still active.
          self.idle(context: context)

        default:
          ()
        }
      }

      context.fireChannelRead(data)
    }

    /// Cancels the pending idle task, if any, and forgets it.
    private func cancelScheduledIdle() {
      self.scheduledIdle?.cancel()
      self.scheduledIdle = nil
    }

    /// Schedules the task which idles the connection after `idleTimeout`.
    ///
    /// Nothing is scheduled if there are active streams or if the timeout is effectively infinite
    /// (`nanoseconds == .max`).
    private func scheduleIdleTimeout(context: ChannelHandlerContext) {
      guard self.activeStreams == 0, self.idleTimeout.nanoseconds != .max else {
        return
      }

      // Cancel any task which is still pending before replacing it. Otherwise the handle to the
      // older task is lost: it can no longer be cancelled when a stream is created and may fire
      // later, closing the connection before a full idle period has elapsed.
      self.cancelScheduledIdle()

      self.scheduledIdle = context.eventLoop.scheduleTask(in: self.idleTimeout) {
        self.idle(context: context)
      }
    }

    /// Closes the channel because the connection is idle.
    ///
    /// - Parameters:
    ///   - context: The context used to close the channel.
    ///   - force: When `true` the channel is closed even if there are active streams.
    private func idle(context: ChannelHandlerContext, force: Bool = false) {
      // Don't idle if there are active streams unless we manually request
      // example: keepalive handler sends a `ConnectionIdledEvent` event
      guard self.activeStreams == 0 || force else {
        return
      }

      switch self.state {
      case .notReady, .ready:
        self.state = .closed

        switch self.mode {
        case let .client(manager):
          manager.idle()
        case .server:
          ()
        }

        self.logger.debug("Closing idle channel")
        context.close(mode: .all, promise: nil)

      // We need to guard against double closure here. We may go idle as a result of receiving a
      // GOAWAY frame or because our scheduled idle timeout fired.
      case .closed:
        ()
      }
    }
  }
  fileprivate extension HTTP2SettingsParameter {
    /// The key under which this settings parameter is recorded in logging metadata.
    ///
    /// The parameters matched below map to fixed `h2_settings_*` keys; any other parameter falls
    /// back to its `String(describing:)` representation.
    var loggingMetadataKey: String {
      let key: String

      switch self {
      case .headerTableSize: key = "h2_settings_header_table_size"
      case .enablePush: key = "h2_settings_enable_push"
      case .maxConcurrentStreams: key = "h2_settings_max_concurrent_streams"
      case .initialWindowSize: key = "h2_settings_initial_window_size"
      case .maxFrameSize: key = "h2_settings_max_frame_size"
      case .maxHeaderListSize: key = "h2_settings_max_header_list_size"
      case .enableConnectProtocol: key = "h2_settings_enable_connect_protocol"
      default: key = String(describing: self)
      }

      return key
    }
  }