GRPCIdleHandler.swift 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  /// A channel handler which feeds HTTP/2 connection events (stream creation and closure,
  /// GOAWAY and SETTINGS frames, channel inactivity) into a `GRPCIdleHandlerStateMachine` and
  /// carries out the operations the state machine returns: notifying the connection manager (in
  /// client mode), scheduling or cancelling the idle timeout task, sending a GOAWAY frame and
  /// closing the channel.
  internal final class GRPCIdleHandler: ChannelInboundHandler {
    typealias InboundIn = HTTP2Frame
    typealias OutboundOut = HTTP2Frame

    /// The amount of time to wait before closing the channel when there are no active streams.
    private let idleTimeout: TimeAmount

    /// The mode we're operating in.
    private let mode: Mode

    /// A logger.
    private let logger: Logger

    /// The context of this handler. Set in `handlerAdded(context:)` and cleared in
    /// `handlerRemoved(context:)`; all writes and closes go through it.
    private var context: ChannelHandlerContext?

    /// The mode of operation: the client tracks additional connection state in the connection
    /// manager.
    internal enum Mode {
      case client(ConnectionManager, HTTP2StreamMultiplexer)
      case server

      /// The connection manager when operating as a client, `nil` when operating as a server.
      var connectionManager: ConnectionManager? {
        switch self {
        case let .client(manager, _):
          return manager
        case .server:
          return nil
        }
      }
    }

    /// The current state.
    private var stateMachine: GRPCIdleHandlerStateMachine

    /// Creates an idle handler.
    ///
    /// - Parameters:
    ///   - mode: Whether the handler is operating for a client or a server. This also selects the
    ///     role the state machine is created with.
    ///   - logger: A logger, also handed to the state machine.
    ///   - idleTimeout: The delay used when the state machine asks for an idle timeout task to be
    ///     scheduled. No task is scheduled when this is `.nanoseconds(.max)`.
    init(mode: Mode, logger: Logger, idleTimeout: TimeAmount) {
      self.mode = mode
      self.idleTimeout = idleTimeout
      self.logger = logger

      switch mode {
      case .client:
        self.stateMachine = .init(role: .client, logger: logger)
      case .server:
        self.stateMachine = .init(role: .server, logger: logger)
      }
    }

    // MARK: - Carrying out state machine operations

    /// Writes and flushes a GOAWAY frame with error code `.noError` on the root stream.
    ///
    /// Does nothing if the handler currently has no context.
    ///
    /// - Parameter streamID: The last stream ID to put in the GOAWAY frame.
    private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
      guard let context = self.context else {
        return
      }

      let frame = HTTP2Frame(
        streamID: .rootStream,
        payload: .goAway(lastStreamID: streamID, errorCode: .noError, opaqueData: nil)
      )

      context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
    }

    /// Carries out the operations returned by the state machine, in a fixed order: connection
    /// manager notification, idle task handling, GOAWAY, channel close.
    ///
    /// - Parameter operations: The operations to perform.
    private func perform(operations: GRPCIdleHandlerStateMachine.Operations) {
      // Prod the connection manager. There is only a manager to prod in client mode.
      if let event = operations.connectionManagerEvent, let manager = self.mode.connectionManager {
        switch event {
        case .idle:
          manager.idle()
        case .inactive:
          manager.channelInactive()
        case .ready:
          manager.ready()
        }
      }

      // Handle idle timeout creation/cancellation.
      if let idleTask = operations.idleTask {
        switch idleTask {
        case let .cancel(task):
          task.cancel()

        case .schedule:
          // An idle timeout of `.nanoseconds(.max)` means no task is ever scheduled. Scheduling
          // is also skipped when there is no context (and therefore no event loop) to use.
          if self.idleTimeout != .nanoseconds(.max), let context = self.context {
            let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
              self.idleTimeoutFired()
            }
            // Hand the task to the state machine; doing so yields further operations which must
            // be performed as well.
            self.perform(operations: self.stateMachine.scheduledIdleTimeoutTask(task))
          }
        }
      }

      // Send a GOAWAY frame.
      if let streamID = operations.sendGoAwayWithLastPeerInitiatedStreamID {
        self.sendGoAway(lastStreamID: streamID)
      }

      // Close the channel, if necessary.
      if operations.shouldCloseChannel {
        self.context?.close(mode: .all, promise: nil)
      }
    }

    /// Called by the scheduled idle timeout task when it fires.
    private func idleTimeoutFired() {
      self.perform(operations: self.stateMachine.idleTimeoutTaskFired())
    }

    // MARK: - Handler lifecycle

    func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    // MARK: - Inbound events

    /// Informs the state machine about stream creation and closure and about requests to shut
    /// down now. All events are forwarded down the pipeline except `ConnectionIdledEvent`, which
    /// is consumed here.
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      if let created = event as? NIOHTTP2StreamCreatedEvent {
        self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID))
        context.fireUserInboundEventTriggered(event)
      } else if let closed = event as? StreamClosedEvent {
        self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
        context.fireUserInboundEventTriggered(event)
      } else if event is ConnectionIdledEvent {
        self.perform(operations: self.stateMachine.shutdownNow())
        // Swallow this event.
      } else {
        context.fireUserInboundEventTriggered(event)
      }
    }

    /// Reports the error to the connection manager (client mode only) and forwards it.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
      // No state machine action here.
      self.mode.connectionManager?.channelError(error)
      context.fireErrorCaught(error)
    }

    /// Tells the connection manager (client mode only) about the now-active channel and its
    /// multiplexer, then forwards the event.
    func channelActive(context: ChannelHandlerContext) {
      // No state machine action here.
      switch self.mode {
      case let .client(connectionManager, multiplexer):
        connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer)
      case .server:
        ()
      }
      context.fireChannelActive()
    }

    /// Informs the state machine that the channel became inactive, then forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
      self.perform(operations: self.stateMachine.channelInactive())
      context.fireChannelInactive()
    }

    /// Informs the state machine about GOAWAY and (non-ack) SETTINGS frames. Every frame is
    /// forwarded down the pipeline, whether or not it was of interest here.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let frame = self.unwrapInboundIn(data)

      switch frame.payload {
      case .goAway:
        self.perform(operations: self.stateMachine.receiveGoAway())
      case let .settings(.settings(settings)):
        self.perform(operations: self.stateMachine.receiveSettings(settings))
      default:
        // We're not interested in other events.
        ()
      }

      context.fireChannelRead(data)
    }
  }
  extension HTTP2SettingsParameter {
    /// Logging metadata keys for the settings parameters which have a dedicated key.
    private static let namedLoggingMetadataKeys: [(parameter: HTTP2SettingsParameter, key: String)] = [
      (.headerTableSize, "h2_settings_header_table_size"),
      (.enablePush, "h2_settings_enable_push"),
      (.maxConcurrentStreams, "h2_settings_max_concurrent_streams"),
      (.initialWindowSize, "h2_settings_initial_window_size"),
      (.maxFrameSize, "h2_settings_max_frame_size"),
      (.maxHeaderListSize, "h2_settings_max_header_list_size"),
      (.enableConnectProtocol, "h2_settings_enable_connect_protocol"),
    ]

    /// The key to use when attaching this settings parameter to logging metadata.
    ///
    /// Parameters with a dedicated key use it; any other parameter falls back to its
    /// `String(describing:)` representation.
    internal var loggingMetadataKey: String {
      let named = Self.namedLoggingMetadataKeys.first { entry in
        entry.parameter == self
      }

      guard let match = named else {
        return String(describing: self)
      }
      return match.key
    }
  }