HTTP2ToRawGRPCServerCodec.swift 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
  21. typealias InboundIn = HTTP2Frame.FramePayload
  22. typealias OutboundOut = HTTP2Frame.FramePayload
  23. private var logger: Logger
  24. private var state: HTTP2ToRawGRPCStateMachine
  25. private let errorDelegate: ServerErrorDelegate?
  26. private var context: ChannelHandlerContext!
  27. private let servicesByName: [Substring: CallHandlerProvider]
  28. private let encoding: ServerMessageEncoding
  29. private let normalizeHeaders: Bool
  30. /// The configuration state of the handler.
  31. private var configurationState: Configuration = .notConfigured
  32. /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
  33. /// burst of reading has completed.
  34. private var isReading = false
  35. /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
  36. /// then it is held until the read completes in order to elide unnecessary flushes.
  37. private var flushPending = false
  38. private enum Configuration {
  39. case notConfigured
  40. case configured(GRPCServerHandlerProtocol)
  41. var isConfigured: Bool {
  42. switch self {
  43. case .configured:
  44. return true
  45. case .notConfigured:
  46. return false
  47. }
  48. }
  49. mutating func tearDown() -> GRPCServerHandlerProtocol? {
  50. switch self {
  51. case .notConfigured:
  52. return nil
  53. case let .configured(handler):
  54. self = .notConfigured
  55. return handler
  56. }
  57. }
  58. }
  59. init(
  60. servicesByName: [Substring: CallHandlerProvider],
  61. encoding: ServerMessageEncoding,
  62. errorDelegate: ServerErrorDelegate?,
  63. normalizeHeaders: Bool,
  64. logger: Logger
  65. ) {
  66. self.logger = logger
  67. self.errorDelegate = errorDelegate
  68. self.servicesByName = servicesByName
  69. self.encoding = encoding
  70. self.normalizeHeaders = normalizeHeaders
  71. self.state = HTTP2ToRawGRPCStateMachine()
  72. }
  73. internal func handlerAdded(context: ChannelHandlerContext) {
  74. self.context = context
  75. }
  76. internal func handlerRemoved(context: ChannelHandlerContext) {
  77. self.context = nil
  78. self.configurationState = .notConfigured
  79. }
  80. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  81. switch self.configurationState {
  82. case .notConfigured:
  83. context.close(mode: .all, promise: nil)
  84. case let .configured(hander):
  85. hander.receiveError(error)
  86. }
  87. }
  88. internal func channelInactive(context: ChannelHandlerContext) {
  89. if let handler = self.configurationState.tearDown() {
  90. handler.finish()
  91. } else {
  92. context.fireChannelInactive()
  93. }
  94. }
  95. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  96. self.isReading = true
  97. let payload = self.unwrapInboundIn(data)
  98. switch payload {
  99. case let .headers(payload):
  100. let receiveHeaders = self.state.receive(
  101. headers: payload.headers,
  102. eventLoop: context.eventLoop,
  103. errorDelegate: self.errorDelegate,
  104. remoteAddress: context.channel.remoteAddress,
  105. logger: self.logger,
  106. allocator: context.channel.allocator,
  107. responseWriter: self,
  108. closeFuture: context.channel.closeFuture,
  109. services: self.servicesByName,
  110. encoding: self.encoding,
  111. normalizeHeaders: self.normalizeHeaders
  112. )
  113. switch receiveHeaders {
  114. case let .configure(handler):
  115. assert(!self.configurationState.isConfigured)
  116. self.configurationState = .configured(handler)
  117. self.configured()
  118. case let .rejectRPC(trailers):
  119. assert(!self.configurationState.isConfigured)
  120. // We're not handling this request: write headers and end stream.
  121. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  122. context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
  123. }
  124. case let .data(payload):
  125. switch payload.data {
  126. case var .byteBuffer(buffer):
  127. let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
  128. switch action {
  129. case .tryReading:
  130. self.tryReadingMessage()
  131. case .finishHandler:
  132. let handler = self.configurationState.tearDown()
  133. handler?.finish()
  134. case .nothing:
  135. ()
  136. }
  137. case .fileRegion:
  138. preconditionFailure("Unexpected IOData.fileRegion")
  139. }
  140. // Ignored.
  141. case .alternativeService,
  142. .goAway,
  143. .origin,
  144. .ping,
  145. .priority,
  146. .pushPromise,
  147. .rstStream,
  148. .settings,
  149. .windowUpdate:
  150. ()
  151. }
  152. }
  153. internal func channelReadComplete(context: ChannelHandlerContext) {
  154. self.isReading = false
  155. if self.flushPending {
  156. self.flushPending = false
  157. context.flush()
  158. }
  159. context.fireChannelReadComplete()
  160. }
  161. /// Called when the pipeline has finished configuring.
  162. private func configured() {
  163. switch self.state.pipelineConfigured() {
  164. case let .forwardHeaders(headers):
  165. switch self.configurationState {
  166. case .notConfigured:
  167. preconditionFailure()
  168. case let .configured(handler):
  169. handler.receiveMetadata(headers)
  170. }
  171. case let .forwardHeadersAndRead(headers):
  172. switch self.configurationState {
  173. case .notConfigured:
  174. preconditionFailure()
  175. case let .configured(handler):
  176. handler.receiveMetadata(headers)
  177. }
  178. self.tryReadingMessage()
  179. }
  180. }
  181. /// Try to read a request message from the buffer.
  182. private func tryReadingMessage() {
  183. let action = self.state.readNextRequest()
  184. switch action {
  185. case .none:
  186. ()
  187. case let .forwardMessage(buffer):
  188. switch self.configurationState {
  189. case .notConfigured:
  190. preconditionFailure()
  191. case let .configured(handler):
  192. handler.receiveMessage(buffer)
  193. }
  194. case let .forwardMessageThenReadNextMessage(buffer):
  195. switch self.configurationState {
  196. case .notConfigured:
  197. preconditionFailure()
  198. case let .configured(handler):
  199. handler.receiveMessage(buffer)
  200. }
  201. self.tryReadingMessage()
  202. case .forwardEnd:
  203. switch self.configurationState {
  204. case .notConfigured:
  205. preconditionFailure()
  206. case let .configured(handler):
  207. handler.receiveEnd()
  208. }
  209. case let .errorCaught(error):
  210. switch self.configurationState {
  211. case .notConfigured:
  212. preconditionFailure()
  213. case let .configured(handler):
  214. handler.receiveError(error)
  215. }
  216. }
  217. }
  218. internal func sendMetadata(
  219. _ headers: HPACKHeaders,
  220. flush: Bool,
  221. promise: EventLoopPromise<Void>?
  222. ) {
  223. switch self.state.send(headers: headers) {
  224. case let .success(headers):
  225. let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
  226. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  227. if flush {
  228. self.markFlushPoint()
  229. }
  230. case let .failure(error):
  231. promise?.fail(error)
  232. }
  233. }
  234. internal func sendMessage(
  235. _ buffer: ByteBuffer,
  236. metadata: MessageMetadata,
  237. promise: EventLoopPromise<Void>?
  238. ) {
  239. let writeBuffer = self.state.send(
  240. buffer: buffer,
  241. allocator: self.context.channel.allocator,
  242. compress: metadata.compress
  243. )
  244. switch writeBuffer {
  245. case let .success(buffer):
  246. let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
  247. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  248. if metadata.flush {
  249. self.markFlushPoint()
  250. }
  251. case let .failure(error):
  252. promise?.fail(error)
  253. }
  254. }
  255. internal func sendEnd(
  256. status: GRPCStatus,
  257. trailers: HPACKHeaders,
  258. promise: EventLoopPromise<Void>?
  259. ) {
  260. switch self.state.send(status: status, trailers: trailers) {
  261. case let .sendTrailers(trailers):
  262. self.sendTrailers(trailers, promise: promise)
  263. case let .sendTrailersAndFinish(trailers):
  264. self.sendTrailers(trailers, promise: promise)
  265. // 'finish' the handler.
  266. let handler = self.configurationState.tearDown()
  267. handler?.finish()
  268. case let .failure(error):
  269. promise?.fail(error)
  270. }
  271. }
  272. private func sendTrailers(_ trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  273. // Always end stream for status and trailers.
  274. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  275. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  276. // We'll always flush on end.
  277. self.markFlushPoint()
  278. }
  279. /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
  280. /// or emit a flush now if we are not.
  281. private func markFlushPoint() {
  282. if self.isReading {
  283. self.flushPending = true
  284. } else {
  285. self.flushPending = false
  286. self.context.flush()
  287. }
  288. }
  289. }