GRPCKeepaliveHandlers.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP2
  18. /// Provides keepalive pings.
  19. ///
  20. /// The logic is determined by the gRPC keepalive
  21. /// [documentation] (https://github.com/grpc/grpc/blob/master/doc/keepalive.md).
  22. internal class GRPCClientKeepaliveHandler: ChannelInboundHandler, _ChannelKeepaliveHandler {
  23. typealias InboundIn = HTTP2Frame
  24. typealias OutboundOut = HTTP2Frame
  25. init(configuration: ClientConnectionKeepalive) {
  26. self.pingHandler = PingHandler(
  27. pingCode: 5,
  28. interval: configuration.interval,
  29. timeout: configuration.timeout,
  30. permitWithoutCalls: configuration.permitWithoutCalls,
  31. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  32. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
  33. )
  34. }
  35. /// The ping handler.
  36. var pingHandler: PingHandler
  37. /// The scheduled task which will ping.
  38. var scheduledPing: RepeatedTask?
  39. /// The scheduled task which will close the connection.
  40. var scheduledClose: Scheduled<Void>?
  41. }
  42. internal class GRPCServerKeepaliveHandler: ChannelInboundHandler, _ChannelKeepaliveHandler {
  43. typealias InboundIn = HTTP2Frame
  44. typealias OutboundOut = HTTP2Frame
  45. init(configuration: ServerConnectionKeepalive) {
  46. self.pingHandler = PingHandler(
  47. pingCode: 10,
  48. interval: configuration.interval,
  49. timeout: configuration.timeout,
  50. permitWithoutCalls: configuration.permitWithoutCalls,
  51. maximumPingsWithoutData: configuration.maximumPingsWithoutData,
  52. minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData,
  53. minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
  54. maximumPingStrikes: configuration.maximumPingStrikes
  55. )
  56. }
  57. /// The ping handler.
  58. var pingHandler: PingHandler
  59. /// The scheduled task which will ping.
  60. var scheduledPing: RepeatedTask?
  61. /// The scheduled task which will close the connection.
  62. var scheduledClose: Scheduled<Void>?
  63. }
  64. protocol _ChannelKeepaliveHandler: ChannelInboundHandler where OutboundOut == HTTP2Frame,
  65. InboundIn == HTTP2Frame {
  66. var pingHandler: PingHandler { get set }
  67. var scheduledPing: RepeatedTask? { get set }
  68. var scheduledClose: Scheduled<Void>? { get set }
  69. }
  70. extension _ChannelKeepaliveHandler {
  71. func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  72. if event is NIOHTTP2StreamCreatedEvent {
  73. self.perform(action: self.pingHandler.streamCreated(), context: context)
  74. } else if event is StreamClosedEvent {
  75. self.perform(action: self.pingHandler.streamClosed(), context: context)
  76. }
  77. context.fireUserInboundEventTriggered(event)
  78. }
  79. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  80. switch self.unwrapInboundIn(data).payload {
  81. case let .ping(pingData, ack: ack):
  82. self.perform(action: self.pingHandler.read(pingData: pingData, ack: ack), context: context)
  83. default:
  84. break
  85. }
  86. context.fireChannelRead(data)
  87. }
  88. func channelInactive(context: ChannelHandlerContext) {
  89. self.cancelScheduledPing()
  90. self.cancelScheduledTimeout()
  91. context.fireChannelInactive()
  92. }
  93. func handlerRemoved(context: ChannelHandlerContext) {
  94. self.cancelScheduledPing()
  95. self.cancelScheduledTimeout()
  96. }
  97. private func perform(action: PingHandler.Action, context: ChannelHandlerContext) {
  98. switch action {
  99. case let .schedulePing(delay, timeout):
  100. self.schedulePing(delay: delay, timeout: timeout, context: context)
  101. case .cancelScheduledTimeout:
  102. self.cancelScheduledTimeout()
  103. case let .reply(payload):
  104. self.send(payload: payload, context: context)
  105. case .none:
  106. break
  107. }
  108. }
  109. private func send(payload: HTTP2Frame.FramePayload, context: ChannelHandlerContext) {
  110. let frame = self.wrapOutboundOut(.init(streamID: .rootStream, payload: payload))
  111. context.writeAndFlush(frame, promise: nil)
  112. }
  113. private func schedulePing(delay: TimeAmount, timeout: TimeAmount,
  114. context: ChannelHandlerContext) {
  115. guard delay != .nanoseconds(Int64.max) else { return }
  116. self.scheduledPing = context.eventLoop
  117. .scheduleRepeatedTask(initialDelay: delay, delay: delay) { _ in
  118. self.perform(action: self.pingHandler.pingFired(), context: context)
  119. // `timeout` is less than `interval`, guaranteeing that the close task
  120. // will be fired before a new ping is triggered.
  121. assert(timeout < delay, "`timeout` must be less than `interval`")
  122. self.scheduleClose(timeout: timeout, context: context)
  123. }
  124. }
  125. private func scheduleClose(timeout: TimeAmount, context: ChannelHandlerContext) {
  126. self.scheduledClose = context.eventLoop.scheduleTask(in: timeout) {
  127. context.fireUserInboundEventTriggered(ConnectionIdledEvent())
  128. }
  129. }
  130. private func cancelScheduledPing() {
  131. self.scheduledPing?.cancel()
  132. self.scheduledPing = nil
  133. }
  134. private func cancelScheduledTimeout() {
  135. self.scheduledClose?.cancel()
  136. self.scheduledClose = nil
  137. }
  138. }
  139. struct PingHandler {
  140. /// Code for ping
  141. private let pingCode: UInt64
  142. /// The amount of time to wait before sending a keepalive ping.
  143. private let interval: TimeAmount
  144. /// The amount of time to wait for an acknowledgment.
  145. /// If it does not receive an acknowledgment within this time, it will close the connection
  146. private let timeout: TimeAmount
  147. /// Send keepalive pings even if there are no calls in flight.
  148. private let permitWithoutCalls: Bool
  149. /// Maximum number of pings that can be sent when there is no data/header frame to be sent.
  150. private let maximumPingsWithoutData: UInt
  151. /// If there are no data/header frames being received:
  152. /// The minimum amount of time to wait between successive pings.
  153. private let minimumSentPingIntervalWithoutData: TimeAmount
  154. /// If there are no data/header frames being sent:
  155. /// The minimum amount of time expected between receiving successive pings.
  156. /// If the time between successive pings is less than this value, then the ping will be considered a bad ping from the peer.
  157. /// Such a ping counts as a "ping strike".
  158. /// Ping strikes are only applicable to server handler
  159. private let minimumReceivedPingIntervalWithoutData: TimeAmount?
  160. /// Maximum number of bad pings that the server will tolerate before sending an HTTP2 GOAWAY frame and closing the connection.
  161. /// Setting it to `0` allows the server to accept any number of bad pings.
  162. /// Ping strikes are only applicable to server handler
  163. private let maximumPingStrikes: UInt?
  164. /// When the handler started pinging
  165. private var startedAt: NIODeadline?
  166. /// When the last ping was received
  167. private var lastReceivedPingDate: NIODeadline?
  168. /// When the last ping was sent
  169. private var lastSentPingDate: NIODeadline?
  170. /// The number of pings sent on the transport without any data
  171. private var sentPingsWithoutData = 0
  172. /// Number of strikes
  173. private var pingStrikes: UInt = 0
  174. /// The scheduled task which will close the connection.
  175. private var scheduledClose: Scheduled<Void>?
  176. /// Number of active streams
  177. private var activeStreams = 0 {
  178. didSet {
  179. if self.activeStreams > 0 {
  180. self.sentPingsWithoutData = 0
  181. }
  182. }
  183. }
  184. private static let goAwayFrame = HTTP2Frame.FramePayload.goAway(
  185. lastStreamID: .rootStream,
  186. errorCode: .enhanceYourCalm, opaqueData: nil
  187. )
  188. // For testing only
  189. var _testingOnlyNow: NIODeadline?
  190. enum Action {
  191. case none
  192. case schedulePing(delay: TimeAmount, timeout: TimeAmount)
  193. case cancelScheduledTimeout
  194. case reply(HTTP2Frame.FramePayload)
  195. }
  196. init(
  197. pingCode: UInt64,
  198. interval: TimeAmount,
  199. timeout: TimeAmount,
  200. permitWithoutCalls: Bool,
  201. maximumPingsWithoutData: UInt,
  202. minimumSentPingIntervalWithoutData: TimeAmount,
  203. minimumReceivedPingIntervalWithoutData: TimeAmount? = nil,
  204. maximumPingStrikes: UInt? = nil
  205. ) {
  206. self.pingCode = pingCode
  207. self.interval = interval
  208. self.timeout = timeout
  209. self.permitWithoutCalls = permitWithoutCalls
  210. self.maximumPingsWithoutData = maximumPingsWithoutData
  211. self.minimumSentPingIntervalWithoutData = minimumSentPingIntervalWithoutData
  212. self.minimumReceivedPingIntervalWithoutData = minimumReceivedPingIntervalWithoutData
  213. self.maximumPingStrikes = maximumPingStrikes
  214. }
  215. mutating func streamCreated() -> Action {
  216. self.activeStreams += 1
  217. if self.startedAt == nil {
  218. self.startedAt = self.now()
  219. return .schedulePing(delay: self.interval, timeout: self.timeout)
  220. } else {
  221. return .none
  222. }
  223. }
  224. mutating func streamClosed() -> Action {
  225. self.activeStreams -= 1
  226. return .none
  227. }
  228. mutating func read(pingData: HTTP2PingData, ack: Bool) -> Action {
  229. if ack {
  230. return self.handlePong(pingData)
  231. } else {
  232. return self.handlePing(pingData)
  233. }
  234. }
  235. private func handlePong(_ pingData: HTTP2PingData) -> Action {
  236. if pingData.integer == self.pingCode {
  237. return .cancelScheduledTimeout
  238. } else {
  239. return .none
  240. }
  241. }
  242. private mutating func handlePing(_ pingData: HTTP2PingData) -> Action {
  243. // Do we support ping strikes (only servers support ping strikes)?
  244. if let maximumPingStrikes = self.maximumPingStrikes {
  245. // Is this a ping strike?
  246. if self.isPingStrike {
  247. self.pingStrikes += 1
  248. // A maximum ping strike of zero indicates that we tolerate any number of strikes.
  249. if maximumPingStrikes != 0, self.pingStrikes > maximumPingStrikes {
  250. return .reply(PingHandler.goAwayFrame)
  251. } else {
  252. return .none
  253. }
  254. } else {
  255. // This is a valid ping, reset our strike count and reply with a pong.
  256. self.pingStrikes = 0
  257. self.lastReceivedPingDate = self.now()
  258. return .reply(self.generatePingFrame(code: pingData.integer, ack: true))
  259. }
  260. } else {
  261. // We don't support ping strikes. We'll just reply with a pong.
  262. //
  263. // Note: we don't need to update `pingStrikes` or `lastReceivedPingDate` as we don't
  264. // support ping strikes.
  265. return .reply(self.generatePingFrame(code: pingData.integer, ack: true))
  266. }
  267. }
  268. mutating func pingFired() -> Action {
  269. if self.shouldBlockPing {
  270. return .none
  271. } else {
  272. return .reply(self.generatePingFrame(code: self.pingCode, ack: false))
  273. }
  274. }
  275. private mutating func generatePingFrame(code: UInt64, ack: Bool) -> HTTP2Frame.FramePayload {
  276. if self.activeStreams == 0 {
  277. self.sentPingsWithoutData += 1
  278. }
  279. self.lastSentPingDate = self.now()
  280. return HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: code), ack: ack)
  281. }
  282. /// Returns true if, on receipt of a ping, the ping should be regarded as a ping strike.
  283. ///
  284. /// A ping is considered a 'strike' if:
  285. /// - There are no active streams.
  286. /// - We allow pings to be sent when there are no active streams (i.e. `self.permitWithoutCalls`).
  287. /// - The time since the last ping we received is less than the minimum allowed interval.
  288. ///
  289. /// - Precondition: Ping strikes are supported (i.e. `self.maximumPingStrikes != nil`)
  290. private var isPingStrike: Bool {
  291. assert(
  292. self.maximumPingStrikes != nil,
  293. "Ping strikes are not supported but we're checking for one"
  294. )
  295. guard self.activeStreams == 0, self.permitWithoutCalls,
  296. let lastReceivedPingDate = self.lastReceivedPingDate,
  297. let minimumReceivedPingIntervalWithoutData = self.minimumReceivedPingIntervalWithoutData
  298. else {
  299. return false
  300. }
  301. return self.now() - lastReceivedPingDate < minimumReceivedPingIntervalWithoutData
  302. }
  303. private var shouldBlockPing: Bool {
  304. // There is no active call on the transport and pings should not be sent
  305. guard self.activeStreams > 0 || self.permitWithoutCalls else {
  306. return true
  307. }
  308. // There is no active call on the transport but pings should be sent
  309. if self.activeStreams == 0, self.permitWithoutCalls {
  310. // The number of pings already sent on the transport without any data has already exceeded the limit
  311. if self.sentPingsWithoutData > self.maximumPingsWithoutData {
  312. return true
  313. }
  314. // The time elapsed since the previous ping is less than the minimum required
  315. if let lastSentPingDate = self.lastSentPingDate,
  316. self.now() - lastSentPingDate < self.minimumSentPingIntervalWithoutData {
  317. return true
  318. }
  319. return false
  320. }
  321. return false
  322. }
  323. private func now() -> NIODeadline {
  324. return self._testingOnlyNow ?? .now()
  325. }
  326. }