HTTP2ToRawGRPCServerCodec.swift 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// A duplex channel handler which translates between HTTP/2 frame payloads and raw
  /// (`ByteBuffer`-based) gRPC server request and response parts.
  ///
  /// Inbound `HTTP2Frame.FramePayload`s are fed into an `HTTP2ToRawGRPCStateMachine`; the actions it
  /// returns are forwarded either along the `ChannelPipeline` (`.legacy` mode) or directly to a
  /// `GRPCServerHandlerProtocol` (`.handler` mode). Response parts, whether written through the
  /// pipeline or sent via the `GRPCServerResponseWriter` methods, are passed through the state
  /// machine and written out as HTTP/2 frame payloads.
  internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServerResponseWriter {
    typealias InboundIn = HTTP2Frame.FramePayload
    typealias InboundOut = GRPCServerRequestPart<ByteBuffer>

    typealias OutboundOut = HTTP2Frame.FramePayload
    typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>

    private var logger: Logger
    private var state: HTTP2ToRawGRPCStateMachine
    private let errorDelegate: ServerErrorDelegate?

    /// The context of this handler; set in `handlerAdded(context:)` and cleared in
    /// `handlerRemoved(context:)`. It is `nil` whenever the handler is not part of a pipeline, so
    /// every use must tolerate its absence.
    private var context: ChannelHandlerContext?

    /// The mode we're operating in.
    private var mode: Mode = .notConfigured

    /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
    /// burst of reading has completed.
    private var isReading = false

    /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
    /// then it is held until the read completes in order to elide unnecessary flushes.
    private var flushPending = false

    private enum Mode {
      /// No request headers have been accepted yet.
      case notConfigured
      /// Request parts are forwarded along the pipeline to a handler added after this one.
      case legacy
      /// Request parts are delivered directly to the associated handler.
      case handler(GRPCServerHandlerProtocol)
    }

    init(
      servicesByName: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      errorDelegate: ServerErrorDelegate?,
      normalizeHeaders: Bool,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.state = HTTP2ToRawGRPCStateMachine(
        services: servicesByName,
        encoding: encoding,
        normalizeHeaders: normalizeHeaders
      )
    }

    // MARK: - Handler lifecycle

    internal func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    internal func handlerRemoved(context: ChannelHandlerContext) {
      self.context = nil
    }

    // MARK: - Inbound

    /// Routes an error according to the current mode: closes the channel if nothing has been
    /// configured yet, forwards it along the pipeline in `.legacy` mode, or hands it to the
    /// handler in `.handler` mode.
    internal func errorCaught(context: ChannelHandlerContext, error: Error) {
      switch self.mode {
      case .notConfigured:
        context.close(mode: .all, promise: nil)
      case .legacy:
        context.fireErrorCaught(error)
      case let .handler(handler):
        handler.receiveError(error)
      }
    }

    /// Forwards the event along the pipeline unless a handler has been configured, in which case
    /// the handler is told to finish and the event is not forwarded any further.
    internal func channelInactive(context: ChannelHandlerContext) {
      switch self.mode {
      case .notConfigured, .legacy:
        context.fireChannelInactive()
      case let .handler(handler):
        handler.finish()
      }
    }

    /// Feeds request headers and data into the state machine and acts on the result. All other
    /// frame types are ignored.
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      self.isReading = true

      switch self.unwrapInboundIn(data) {
      case let .headers(payload):
        let receiveHeaders = self.state.receive(
          headers: payload.headers,
          eventLoop: context.eventLoop,
          errorDelegate: self.errorDelegate,
          remoteAddress: context.channel.remoteAddress,
          logger: self.logger,
          allocator: context.channel.allocator,
          responseWriter: self
        )

        switch receiveHeaders {
        case let .configureLegacy(handler):
          self.mode = .legacy
          context.channel.pipeline.addHandler(handler).whenComplete { result in
            switch result {
            case .success:
              self.configured()
            case .failure:
              // The RPC can never be served without its handler in the pipeline. Close the
              // stream rather than leaving it unconfigured and idle.
              context.close(mode: .all, promise: nil)
            }
          }

        case let .configure(handler):
          self.mode = .handler(handler)
          self.configured()

        case let .rejectRPC(trailers):
          // We're not handling this request: write headers and end stream.
          let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
          context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
        }

      case let .data(payload):
        switch payload.data {
        case var .byteBuffer(buffer):
          if self.state.receive(buffer: &buffer, endStream: payload.endStream) {
            self.tryReadingMessage()
          }
        case .fileRegion:
          preconditionFailure("Unexpected IOData.fileRegion")
        }

      // Ignored.
      case .alternativeService,
           .goAway,
           .origin,
           .ping,
           .priority,
           .pushPromise,
           .rstStream,
           .settings,
           .windowUpdate:
        ()
      }
    }

    /// Marks the end of a burst of reads and emits any flush which was held back while reading.
    internal func channelReadComplete(context: ChannelHandlerContext) {
      self.isReading = false

      if self.flushPending {
        self.flushPending = false
        context.flush()
      }

      context.fireChannelReadComplete()
    }

    // MARK: - Outbound

    /// Dispatches a response part written through the pipeline to the matching `send` method.
    internal func write(
      context: ChannelHandlerContext,
      data: NIOAny,
      promise: EventLoopPromise<Void>?
    ) {
      let responsePart = self.unwrapOutboundIn(data)

      switch responsePart {
      case let .metadata(headers):
        self.sendMetadata(headers, promise: promise)
      case let .message(buffer, metadata):
        self.sendMessage(buffer, metadata: metadata, promise: promise)
      case let .end(status, trailers):
        self.sendEnd(status: status, trailers: trailers, promise: promise)
      }
    }

    /// Flushes immediately, or defers the flush to `channelReadComplete(context:)` when a read is
    /// in progress.
    internal func flush(context: ChannelHandlerContext) {
      self.flushOrDefer(context: context)
    }

    /// Flushes `context` now unless we are reading, in which case the flush is recorded and emitted
    /// when the read completes.
    private func flushOrDefer(context: ChannelHandlerContext) {
      if self.isReading {
        self.flushPending = true
      } else {
        context.flush()
      }
    }

    // MARK: - Forwarding request parts

    /// Called when the pipeline has finished configuring.
    private func configured() {
      switch self.state.pipelineConfigured() {
      case let .forwardHeaders(headers):
        self.forwardMetadata(headers)

      case let .forwardHeadersAndRead(headers):
        self.forwardMetadata(headers)
        self.tryReadingMessage()
      }
    }

    /// Try to read request messages from the buffer, forwarding each one as it is read.
    private func tryReadingMessage() {
      // A loop rather than recursion: `.forwardMessageThenReadNextMessage` may be returned once per
      // buffered message, and the stack depth must not grow with the number of messages. Every
      // other action returns out of the loop.
      while true {
        switch self.state.readNextRequest() {
        case .none:
          return

        case let .forwardMessage(buffer):
          self.forwardMessage(buffer)
          return

        case let .forwardMessageAndEnd(buffer):
          self.forwardMessage(buffer)
          self.forwardEnd()
          return

        case let .forwardMessageThenReadNextMessage(buffer):
          self.forwardMessage(buffer)
          continue

        case .forwardEnd:
          self.forwardEnd()
          return

        case let .errorCaught(error):
          self.forwardError(error)
          return
        }
      }
    }

    /// Forwards request headers to the pipeline or the handler, depending on the mode. In `.legacy`
    /// mode the headers are dropped if this handler is no longer in a pipeline.
    private func forwardMetadata(_ headers: HPACKHeaders) {
      switch self.mode {
      case .notConfigured:
        preconditionFailure()
      case .legacy:
        self.context?.fireChannelRead(self.wrapInboundOut(.metadata(headers)))
      case let .handler(handler):
        handler.receiveMetadata(headers)
      }
    }

    /// Forwards a request message to the pipeline or the handler, depending on the mode. In
    /// `.legacy` mode the message is dropped if this handler is no longer in a pipeline.
    private func forwardMessage(_ buffer: ByteBuffer) {
      switch self.mode {
      case .notConfigured:
        preconditionFailure()
      case .legacy:
        self.context?.fireChannelRead(self.wrapInboundOut(.message(buffer)))
      case let .handler(handler):
        handler.receiveMessage(buffer)
      }
    }

    /// Forwards the end of the request stream to the pipeline or the handler, depending on the
    /// mode. In `.legacy` mode the event is dropped if this handler is no longer in a pipeline.
    private func forwardEnd() {
      switch self.mode {
      case .notConfigured:
        preconditionFailure()
      case .legacy:
        self.context?.fireChannelRead(self.wrapInboundOut(.end))
      case let .handler(handler):
        handler.receiveEnd()
      }
    }

    /// Forwards an error raised while reading a request to the pipeline or the handler, depending
    /// on the mode. In `.legacy` mode the error is dropped if this handler is no longer in a
    /// pipeline.
    private func forwardError(_ error: Error) {
      switch self.mode {
      case .notConfigured:
        preconditionFailure()
      case .legacy:
        self.context?.fireErrorCaught(error)
      case let .handler(handler):
        handler.receiveError(error)
      }
    }

    // MARK: - GRPCServerResponseWriter

    /// Sends response headers.
    ///
    /// - Parameters:
    ///   - headers: The headers to pass through the state machine and write.
    ///   - promise: Completed with the result of the write. Failed with the state machine's error
    ///     if it rejects the headers, or with `ChannelError.ioOnClosedChannel` if this handler is
    ///     no longer in a pipeline.
    internal func sendMetadata(
      _ headers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      guard let context = self.context else {
        promise?.fail(ChannelError.ioOnClosedChannel)
        return
      }

      switch self.state.send(headers: headers) {
      case let .success(headers):
        let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
        self.writeResponse(payload, context: context, promise: promise)

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Sends a response message.
    ///
    /// - Parameters:
    ///   - buffer: The message to pass through the state machine and write.
    ///   - metadata: Metadata for the message; its `compress` flag is passed to the state machine.
    ///   - promise: Completed with the result of the write. Failed with the state machine's error
    ///     if it rejects the message, or with `ChannelError.ioOnClosedChannel` if this handler is
    ///     no longer in a pipeline.
    internal func sendMessage(
      _ buffer: ByteBuffer,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      guard let context = self.context else {
        promise?.fail(ChannelError.ioOnClosedChannel)
        return
      }

      let writeBuffer = self.state.send(
        buffer: buffer,
        allocator: context.channel.allocator,
        compress: metadata.compress
      )

      switch writeBuffer {
      case let .success(buffer):
        let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
        self.writeResponse(payload, context: context, promise: promise)

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Sends the status and trailers, ending the response stream.
    ///
    /// - Parameters:
    ///   - status: The final status of the RPC.
    ///   - trailers: Trailers to send with the status.
    ///   - promise: Completed with the result of the write. Failed with the state machine's error
    ///     if it rejects the status and trailers, or with `ChannelError.ioOnClosedChannel` if this
    ///     handler is no longer in a pipeline.
    internal func sendEnd(
      status: GRPCStatus,
      trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      guard let context = self.context else {
        promise?.fail(ChannelError.ioOnClosedChannel)
        return
      }

      switch self.state.send(status: status, trailers: trailers) {
      case let .success(trailers):
        // Always end stream for status and trailers.
        let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
        self.writeResponse(payload, context: context, promise: promise)

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Writes a response frame payload and then flushes, or defers the flush if we are reading.
    private func writeResponse(
      _ payload: HTTP2Frame.FramePayload,
      context: ChannelHandlerContext,
      promise: EventLoopPromise<Void>?
    ) {
      context.write(self.wrapOutboundOut(payload), promise: promise)
      self.flushOrDefer(context: context)
    }
  }