HTTP2ToRawGRPCServerCodec.swift 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
  21. typealias InboundIn = HTTP2Frame.FramePayload
  22. typealias OutboundOut = HTTP2Frame.FramePayload
  23. private var logger: Logger
  24. private var state: HTTP2ToRawGRPCStateMachine
  25. private let errorDelegate: ServerErrorDelegate?
  26. private var context: ChannelHandlerContext!
  27. private let servicesByName: [Substring: CallHandlerProvider]
  28. private let encoding: ServerMessageEncoding
  29. private let normalizeHeaders: Bool
  30. private let maxReceiveMessageLength: Int
  31. /// The configuration state of the handler.
  32. private var configurationState: Configuration = .notConfigured
  33. /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
  34. /// burst of reading has completed.
  35. private var isReading = false
  36. /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
  37. /// then it is held until the read completes in order to elide unnecessary flushes.
  38. private var flushPending = false
  39. private enum Configuration {
  40. case notConfigured
  41. case configured(GRPCServerHandlerProtocol)
  42. var isConfigured: Bool {
  43. switch self {
  44. case .configured:
  45. return true
  46. case .notConfigured:
  47. return false
  48. }
  49. }
  50. mutating func tearDown() -> GRPCServerHandlerProtocol? {
  51. switch self {
  52. case .notConfigured:
  53. return nil
  54. case let .configured(handler):
  55. self = .notConfigured
  56. return handler
  57. }
  58. }
  59. }
  60. init(
  61. servicesByName: [Substring: CallHandlerProvider],
  62. encoding: ServerMessageEncoding,
  63. errorDelegate: ServerErrorDelegate?,
  64. normalizeHeaders: Bool,
  65. maximumReceiveMessageLength: Int,
  66. logger: Logger
  67. ) {
  68. self.logger = logger
  69. self.errorDelegate = errorDelegate
  70. self.servicesByName = servicesByName
  71. self.encoding = encoding
  72. self.normalizeHeaders = normalizeHeaders
  73. self.maxReceiveMessageLength = maximumReceiveMessageLength
  74. self.state = HTTP2ToRawGRPCStateMachine()
  75. }
  76. internal func handlerAdded(context: ChannelHandlerContext) {
  77. self.context = context
  78. }
  79. internal func handlerRemoved(context: ChannelHandlerContext) {
  80. self.context = nil
  81. self.configurationState = .notConfigured
  82. }
  83. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  84. switch self.configurationState {
  85. case .notConfigured:
  86. context.close(mode: .all, promise: nil)
  87. case let .configured(hander):
  88. hander.receiveError(error)
  89. }
  90. }
  91. internal func channelInactive(context: ChannelHandlerContext) {
  92. if let handler = self.configurationState.tearDown() {
  93. handler.finish()
  94. } else {
  95. context.fireChannelInactive()
  96. }
  97. }
  98. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  99. self.isReading = true
  100. let payload = self.unwrapInboundIn(data)
  101. switch payload {
  102. case let .headers(payload):
  103. let receiveHeaders = self.state.receive(
  104. headers: payload.headers,
  105. eventLoop: context.eventLoop,
  106. errorDelegate: self.errorDelegate,
  107. remoteAddress: context.channel.remoteAddress,
  108. logger: self.logger,
  109. allocator: context.channel.allocator,
  110. responseWriter: self,
  111. closeFuture: context.channel.closeFuture,
  112. services: self.servicesByName,
  113. encoding: self.encoding,
  114. normalizeHeaders: self.normalizeHeaders
  115. )
  116. switch receiveHeaders {
  117. case let .configure(handler):
  118. assert(!self.configurationState.isConfigured)
  119. self.configurationState = .configured(handler)
  120. self.configured()
  121. case let .rejectRPC(trailers):
  122. assert(!self.configurationState.isConfigured)
  123. // We're not handling this request: write headers and end stream.
  124. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  125. context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
  126. }
  127. case let .data(payload):
  128. switch payload.data {
  129. case var .byteBuffer(buffer):
  130. let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
  131. switch action {
  132. case .tryReading:
  133. self.tryReadingMessage()
  134. case .finishHandler:
  135. let handler = self.configurationState.tearDown()
  136. handler?.finish()
  137. case .nothing:
  138. ()
  139. }
  140. case .fileRegion:
  141. preconditionFailure("Unexpected IOData.fileRegion")
  142. }
  143. // Ignored.
  144. case .alternativeService,
  145. .goAway,
  146. .origin,
  147. .ping,
  148. .priority,
  149. .pushPromise,
  150. .rstStream,
  151. .settings,
  152. .windowUpdate:
  153. ()
  154. }
  155. }
  156. internal func channelReadComplete(context: ChannelHandlerContext) {
  157. self.isReading = false
  158. if self.flushPending {
  159. self.flushPending = false
  160. context.flush()
  161. }
  162. context.fireChannelReadComplete()
  163. }
  164. /// Called when the pipeline has finished configuring.
  165. private func configured() {
  166. switch self.state.pipelineConfigured() {
  167. case let .forwardHeaders(headers):
  168. switch self.configurationState {
  169. case .notConfigured:
  170. preconditionFailure()
  171. case let .configured(handler):
  172. handler.receiveMetadata(headers)
  173. }
  174. case let .forwardHeadersAndRead(headers):
  175. switch self.configurationState {
  176. case .notConfigured:
  177. preconditionFailure()
  178. case let .configured(handler):
  179. handler.receiveMetadata(headers)
  180. }
  181. self.tryReadingMessage()
  182. }
  183. }
  184. /// Try to read a request message from the buffer.
  185. private func tryReadingMessage() {
  186. // This while loop exists to break the recursion in `.forwardMessageThenReadNextMessage`.
  187. // Almost all cases return directly out of the loop.
  188. while true {
  189. let action = self.state.readNextRequest(
  190. maxLength: self.maxReceiveMessageLength
  191. )
  192. switch action {
  193. case .none:
  194. return
  195. case let .forwardMessage(buffer):
  196. switch self.configurationState {
  197. case .notConfigured:
  198. preconditionFailure()
  199. case let .configured(handler):
  200. handler.receiveMessage(buffer)
  201. }
  202. return
  203. case let .forwardMessageThenReadNextMessage(buffer):
  204. switch self.configurationState {
  205. case .notConfigured:
  206. preconditionFailure()
  207. case let .configured(handler):
  208. handler.receiveMessage(buffer)
  209. }
  210. continue
  211. case .forwardEnd:
  212. switch self.configurationState {
  213. case .notConfigured:
  214. preconditionFailure()
  215. case let .configured(handler):
  216. handler.receiveEnd()
  217. }
  218. return
  219. case let .errorCaught(error):
  220. switch self.configurationState {
  221. case .notConfigured:
  222. preconditionFailure()
  223. case let .configured(handler):
  224. handler.receiveError(error)
  225. }
  226. return
  227. }
  228. }
  229. }
  230. internal func sendMetadata(
  231. _ headers: HPACKHeaders,
  232. flush: Bool,
  233. promise: EventLoopPromise<Void>?
  234. ) {
  235. switch self.state.send(headers: headers) {
  236. case let .success(headers):
  237. let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
  238. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  239. if flush {
  240. self.markFlushPoint()
  241. }
  242. case let .failure(error):
  243. promise?.fail(error)
  244. }
  245. }
  246. internal func sendMessage(
  247. _ buffer: ByteBuffer,
  248. metadata: MessageMetadata,
  249. promise: EventLoopPromise<Void>?
  250. ) {
  251. let writeBuffer = self.state.send(
  252. buffer: buffer,
  253. allocator: self.context.channel.allocator,
  254. compress: metadata.compress
  255. )
  256. switch writeBuffer {
  257. case let .success(buffer):
  258. let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
  259. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  260. if metadata.flush {
  261. self.markFlushPoint()
  262. }
  263. case let .failure(error):
  264. promise?.fail(error)
  265. }
  266. }
  267. internal func sendEnd(
  268. status: GRPCStatus,
  269. trailers: HPACKHeaders,
  270. promise: EventLoopPromise<Void>?
  271. ) {
  272. switch self.state.send(status: status, trailers: trailers) {
  273. case let .sendTrailers(trailers):
  274. self.sendTrailers(trailers, promise: promise)
  275. case let .sendTrailersAndFinish(trailers):
  276. self.sendTrailers(trailers, promise: promise)
  277. // 'finish' the handler.
  278. let handler = self.configurationState.tearDown()
  279. handler?.finish()
  280. case let .failure(error):
  281. promise?.fail(error)
  282. }
  283. }
  284. private func sendTrailers(_ trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  285. // Always end stream for status and trailers.
  286. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  287. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  288. // We'll always flush on end.
  289. self.markFlushPoint()
  290. }
  291. /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
  292. /// or emit a flush now if we are not.
  293. private func markFlushPoint() {
  294. if self.isReading {
  295. self.flushPending = true
  296. } else {
  297. self.flushPending = false
  298. self.context.flush()
  299. }
  300. }
  301. }