HTTP2ToRawGRPCServerCodec.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  /// Translates between HTTP/2 frame payloads and the raw gRPC server handler interface.
  ///
  /// Inbound `HTTP2Frame.FramePayload`s are fed through an `HTTP2ToRawGRPCStateMachine`; the
  /// actions it returns are forwarded to a `GRPCServerHandlerProtocol` which is configured when the
  /// request headers are accepted. Responses come back through the `GRPCServerResponseWriter`
  /// methods (`sendMetadata`, `sendMessage`, `sendEnd`) and are written to the channel as HTTP/2
  /// headers and data payloads.
  internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
    typealias InboundIn = HTTP2Frame.FramePayload
    typealias OutboundOut = HTTP2Frame.FramePayload

    private var logger: Logger

    /// The state machine deciding what to do with each inbound frame and outbound response part.
    private var state: HTTP2ToRawGRPCStateMachine

    private let errorDelegate: ServerErrorDelegate?

    /// The context of this handler. Set in `handlerAdded(context:)` and cleared in
    /// `handlerRemoved(context:)`; it is implicitly unwrapped, so writing outside of that window
    /// traps.
    private var context: ChannelHandlerContext!

    private let servicesByName: [Substring: CallHandlerProvider]
    private let encoding: ServerMessageEncoding
    private let normalizeHeaders: Bool

    /// Passed to the state machine as the `maxLength` when reading each request message.
    private let maxReceiveMessageLength: Int

    /// The configuration state of the handler.
    private var configurationState: Configuration = .notConfigured

    /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
    /// burst of reading has completed.
    private var isReading = false

    /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
    /// then it is held until the read completes in order to elide unnecessary flushes.
    private var flushPending = false

    /// Whether a `GRPCServerHandlerProtocol` has been configured for the RPC on this stream.
    private enum Configuration {
      case notConfigured
      case configured(GRPCServerHandlerProtocol)

      /// `true` if and only if a handler is currently configured.
      var isConfigured: Bool {
        switch self {
        case .configured:
          return true
        case .notConfigured:
          return false
        }
      }

      /// Resets the state to `.notConfigured`.
      ///
      /// - Returns: The handler which was configured, or `nil` if there wasn't one.
      mutating func tearDown() -> GRPCServerHandlerProtocol? {
        switch self {
        case .notConfigured:
          return nil
        case let .configured(handler):
          self = .notConfigured
          return handler
        }
      }
    }

    init(
      servicesByName: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      errorDelegate: ServerErrorDelegate?,
      normalizeHeaders: Bool,
      maximumReceiveMessageLength: Int,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.servicesByName = servicesByName
      self.encoding = encoding
      self.normalizeHeaders = normalizeHeaders
      self.maxReceiveMessageLength = maximumReceiveMessageLength
      self.state = HTTP2ToRawGRPCStateMachine()
    }

    // MARK: - ChannelInboundHandler

    internal func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    internal func handlerRemoved(context: ChannelHandlerContext) {
      // Drop the context and any configured handler without notifying the handler.
      self.context = nil
      self.configurationState = .notConfigured
    }

    /// Passes the error to the configured handler, or closes the channel if there is no handler.
    internal func errorCaught(context: ChannelHandlerContext, error: Error) {
      switch self.configurationState {
      case .notConfigured:
        context.close(mode: .all, promise: nil)
      case let .configured(handler):
        handler.receiveError(error)
      }
    }

    /// Tears down and finishes the configured handler. The event is only forwarded along the
    /// pipeline when no handler was configured.
    internal func channelInactive(context: ChannelHandlerContext) {
      if let handler = self.configurationState.tearDown() {
        handler.finish()
      } else {
        context.fireChannelInactive()
      }
    }

    /// Feeds an inbound payload to the state machine and acts on the result. Only headers and data
    /// payloads are processed; all other payload types are ignored.
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      // Any flush requested from here on is deferred until `channelReadComplete(context:)`.
      self.isReading = true
      let payload = self.unwrapInboundIn(data)

      switch payload {
      case let .headers(payload):
        let receiveHeaders = self.state.receive(
          headers: payload.headers,
          eventLoop: context.eventLoop,
          errorDelegate: self.errorDelegate,
          remoteAddress: context.channel.remoteAddress,
          logger: self.logger,
          allocator: context.channel.allocator,
          responseWriter: self,
          closeFuture: context.channel.closeFuture,
          services: self.servicesByName,
          encoding: self.encoding,
          normalizeHeaders: self.normalizeHeaders
        )

        switch receiveHeaders {
        case let .configure(handler):
          assert(!self.configurationState.isConfigured)
          self.configurationState = .configured(handler)
          self.configured()

        case let .rejectRPC(trailers):
          assert(!self.configurationState.isConfigured)
          // We're not handling this request: write headers and end stream.
          let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
          context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
        }

      case let .data(payload):
        switch payload.data {
        case var .byteBuffer(buffer):
          let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
          switch action {
          case .tryReading:
            self.tryReadingMessage()

          case .finishHandler:
            let handler = self.configurationState.tearDown()
            handler?.finish()

          case .nothing:
            ()
          }

        case .fileRegion:
          preconditionFailure("Unexpected IOData.fileRegion")
        }

      // Ignored.
      case .alternativeService,
           .goAway,
           .origin,
           .ping,
           .priority,
           .pushPromise,
           .rstStream,
           .settings,
           .windowUpdate:
        ()
      }
    }

    /// Marks the end of a burst of reads and emits any flush which was held back while reading.
    internal func channelReadComplete(context: ChannelHandlerContext) {
      self.isReading = false

      if self.flushPending {
        // Write out responses held by the state machine before flushing.
        self.deliverPendingResponses()
        self.flushPending = false
        context.flush()
      }

      context.fireChannelReadComplete()
    }

    // MARK: - Helpers

    /// Drains the responses held by the state machine: each successful result is written to the
    /// channel as a data payload with its promise; each failed result fails its promise. Nothing
    /// is flushed here.
    private func deliverPendingResponses() {
      while let (result, promise) = self.state.nextResponse() {
        switch result {
        case let .success(buffer):
          let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
          self.context.write(self.wrapOutboundOut(payload), promise: promise)
        case let .failure(error):
          promise?.fail(error)
        }
      }
    }

    /// Returns the configured handler.
    ///
    /// This is only called when acting on a state machine result which must be delivered to a
    /// handler; having no handler at that point is a programmer error, so this traps.
    private func requireHandler() -> GRPCServerHandlerProtocol {
      switch self.configurationState {
      case let .configured(handler):
        return handler
      case .notConfigured:
        preconditionFailure(
          "The state machine produced a handler action but no handler is configured"
        )
      }
    }

    /// Called when the pipeline has finished configuring.
    private func configured() {
      switch self.state.pipelineConfigured() {
      case let .forwardHeaders(headers):
        self.requireHandler().receiveMetadata(headers)

      case let .forwardHeadersAndRead(headers):
        self.requireHandler().receiveMetadata(headers)
        self.tryReadingMessage()
      }
    }

    /// Try to read a request message from the buffer.
    private func tryReadingMessage() {
      // This while loop exists to break the recursion in `.forwardMessageThenReadNextMessage`.
      // Almost all cases return directly out of the loop.
      while true {
        let action = self.state.readNextRequest(
          maxLength: self.maxReceiveMessageLength
        )

        switch action {
        case .none:
          return

        case let .forwardMessage(buffer):
          self.requireHandler().receiveMessage(buffer)
          return

        case let .forwardMessageThenReadNextMessage(buffer):
          self.requireHandler().receiveMessage(buffer)
          continue

        case .forwardEnd:
          self.requireHandler().receiveEnd()
          return

        case let .errorCaught(error):
          self.requireHandler().receiveError(error)
          return
        }
      }
    }

    // MARK: - GRPCServerResponseWriter

    /// Writes the response headers if the state machine accepts them, otherwise fails `promise`.
    ///
    /// - Parameters:
    ///   - headers: The headers to pass to the state machine.
    ///   - flush: Whether to mark a flush point after writing the headers.
    ///   - promise: Completed by the write on success, or failed with the state machine's error.
    internal func sendMetadata(
      _ headers: HPACKHeaders,
      flush: Bool,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.state.send(headers: headers) {
      case let .success(headers):
        let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
        self.context.write(self.wrapOutboundOut(payload), promise: promise)
        if flush {
          self.markFlushPoint()
        }

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Hands a response message and its promise to the state machine. The message is not written
    /// here; pending responses are written when they are next delivered (at a flush point or
    /// before the end of the stream).
    ///
    /// - Parameters:
    ///   - buffer: The message to send.
    ///   - metadata: Supplies whether to compress the message and whether to mark a flush point.
    ///   - promise: Passed to the state machine with the message; failed here if the state machine
    ///     rejects the message.
    internal func sendMessage(
      _ buffer: ByteBuffer,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      let result = self.state.send(
        buffer: buffer,
        compress: metadata.compress,
        promise: promise
      )

      switch result {
      case .success:
        if metadata.flush {
          self.markFlushPoint()
        }

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Writes any pending responses followed by the status and trailers, ending the stream.
    ///
    /// - Parameters:
    ///   - status: The status to pass to the state machine.
    ///   - trailers: The trailers to pass to the state machine.
    ///   - promise: Completed by the trailers write, or failed with the state machine's error.
    internal func sendEnd(
      status: GRPCStatus,
      trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      // About to end the stream: send any pending responses.
      self.deliverPendingResponses()

      switch self.state.send(status: status, trailers: trailers) {
      case let .sendTrailers(trailers):
        self.sendTrailers(trailers, promise: promise)

      case let .sendTrailersAndFinish(trailers):
        self.sendTrailers(trailers, promise: promise)

        // 'finish' the handler.
        let handler = self.configurationState.tearDown()
        handler?.finish()

      case let .failure(error):
        promise?.fail(error)
      }
    }

    private func sendTrailers(_ trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
      // Always end stream for status and trailers.
      let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
      self.context.write(self.wrapOutboundOut(payload), promise: promise)

      // We'll always flush on end.
      self.markFlushPoint()
    }

    /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
    /// or emit a flush now if we are not.
    private func markFlushPoint() {
      if self.isReading {
        self.flushPending = true
      } else {
        // About to flush: send any pending responses.
        self.deliverPendingResponses()
        self.flushPending = false
        self.context.flush()
      }
    }
  }