HTTP2ToRawGRPCServerCodec.swift 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServerResponseWriter {
  21. typealias InboundIn = HTTP2Frame.FramePayload
  22. typealias InboundOut = GRPCServerRequestPart<ByteBuffer>
  23. typealias OutboundOut = HTTP2Frame.FramePayload
  24. typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>
  25. private var logger: Logger
  26. private var state: HTTP2ToRawGRPCStateMachine
  27. private let errorDelegate: ServerErrorDelegate?
  28. private var context: ChannelHandlerContext!
  29. /// The mode we're operating in.
  30. private var mode: Mode = .notConfigured
  31. /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
  32. /// burst of reading has completed.
  33. private var isReading = false
  34. /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
  35. /// then it is held until the read completes in order to elide unnecessary flushes.
  36. private var flushPending = false
  37. private enum Mode {
  38. case notConfigured
  39. case legacy
  40. case handler(GRPCServerHandlerProtocol)
  41. }
  42. init(
  43. servicesByName: [Substring: CallHandlerProvider],
  44. encoding: ServerMessageEncoding,
  45. errorDelegate: ServerErrorDelegate?,
  46. normalizeHeaders: Bool,
  47. logger: Logger
  48. ) {
  49. self.logger = logger
  50. self.errorDelegate = errorDelegate
  51. self.state = HTTP2ToRawGRPCStateMachine(
  52. services: servicesByName,
  53. encoding: encoding,
  54. normalizeHeaders: normalizeHeaders
  55. )
  56. }
  57. internal func handlerAdded(context: ChannelHandlerContext) {
  58. self.context = context
  59. }
  60. internal func handlerRemoved(context: ChannelHandlerContext) {
  61. self.context = nil
  62. }
  63. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  64. switch self.mode {
  65. case .notConfigured:
  66. context.close(mode: .all, promise: nil)
  67. case .legacy:
  68. context.fireErrorCaught(error)
  69. case let .handler(hander):
  70. hander.receiveError(error)
  71. }
  72. }
  73. internal func channelInactive(context: ChannelHandlerContext) {
  74. switch self.mode {
  75. case .notConfigured, .legacy:
  76. context.fireChannelInactive()
  77. case let .handler(handler):
  78. handler.finish()
  79. }
  80. }
  81. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  82. self.isReading = true
  83. let payload = self.unwrapInboundIn(data)
  84. switch payload {
  85. case let .headers(payload):
  86. let receiveHeaders = self.state.receive(
  87. headers: payload.headers,
  88. eventLoop: context.eventLoop,
  89. errorDelegate: self.errorDelegate,
  90. remoteAddress: context.channel.remoteAddress,
  91. logger: self.logger,
  92. allocator: context.channel.allocator,
  93. responseWriter: self
  94. )
  95. switch receiveHeaders {
  96. case let .configureLegacy(handler):
  97. self.mode = .legacy
  98. context.channel.pipeline.addHandler(handler).whenSuccess {
  99. self.configured()
  100. }
  101. case let .configure(handler):
  102. self.mode = .handler(handler)
  103. self.configured()
  104. case let .rejectRPC(trailers):
  105. // We're not handling this request: write headers and end stream.
  106. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  107. context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
  108. }
  109. case let .data(payload):
  110. switch payload.data {
  111. case var .byteBuffer(buffer):
  112. let tryToRead = self.state.receive(buffer: &buffer, endStream: payload.endStream)
  113. if tryToRead {
  114. self.tryReadingMessage()
  115. }
  116. case .fileRegion:
  117. preconditionFailure("Unexpected IOData.fileRegion")
  118. }
  119. // Ignored.
  120. case .alternativeService,
  121. .goAway,
  122. .origin,
  123. .ping,
  124. .priority,
  125. .pushPromise,
  126. .rstStream,
  127. .settings,
  128. .windowUpdate:
  129. ()
  130. }
  131. }
  132. internal func channelReadComplete(context: ChannelHandlerContext) {
  133. self.isReading = false
  134. if self.flushPending {
  135. self.flushPending = false
  136. context.flush()
  137. }
  138. context.fireChannelReadComplete()
  139. }
  140. internal func write(
  141. context: ChannelHandlerContext,
  142. data: NIOAny,
  143. promise: EventLoopPromise<Void>?
  144. ) {
  145. let responsePart = self.unwrapOutboundIn(data)
  146. switch responsePart {
  147. case let .metadata(headers):
  148. // We're in 'write' so we're using the old type of RPC handler which emits its own flushes,
  149. // no need to emit an extra one.
  150. self.sendMetadata(headers, flush: false, promise: promise)
  151. case let .message(buffer, metadata):
  152. self.sendMessage(buffer, metadata: metadata, promise: promise)
  153. case let .end(status, trailers):
  154. self.sendEnd(status: status, trailers: trailers, promise: promise)
  155. }
  156. }
  157. internal func flush(context: ChannelHandlerContext) {
  158. self.markFlushPoint()
  159. }
  160. /// Called when the pipeline has finished configuring.
  161. private func configured() {
  162. switch self.state.pipelineConfigured() {
  163. case let .forwardHeaders(headers):
  164. switch self.mode {
  165. case .notConfigured:
  166. preconditionFailure()
  167. case .legacy:
  168. self.context.fireChannelRead(self.wrapInboundOut(.metadata(headers)))
  169. case let .handler(handler):
  170. handler.receiveMetadata(headers)
  171. }
  172. case let .forwardHeadersAndRead(headers):
  173. switch self.mode {
  174. case .notConfigured:
  175. preconditionFailure()
  176. case .legacy:
  177. self.context.fireChannelRead(self.wrapInboundOut(.metadata(headers)))
  178. case let .handler(handler):
  179. handler.receiveMetadata(headers)
  180. }
  181. self.tryReadingMessage()
  182. }
  183. }
  184. /// Try to read a request message from the buffer.
  185. private func tryReadingMessage() {
  186. let action = self.state.readNextRequest()
  187. switch action {
  188. case .none:
  189. ()
  190. case let .forwardMessage(buffer):
  191. switch self.mode {
  192. case .notConfigured:
  193. preconditionFailure()
  194. case .legacy:
  195. self.context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
  196. case let .handler(handler):
  197. handler.receiveMessage(buffer)
  198. }
  199. case let .forwardMessageAndEnd(buffer):
  200. switch self.mode {
  201. case .notConfigured:
  202. preconditionFailure()
  203. case .legacy:
  204. self.context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
  205. self.context.fireChannelRead(self.wrapInboundOut(.end))
  206. case let .handler(handler):
  207. handler.receiveMessage(buffer)
  208. handler.receiveEnd()
  209. }
  210. case let .forwardMessageThenReadNextMessage(buffer):
  211. switch self.mode {
  212. case .notConfigured:
  213. preconditionFailure()
  214. case .legacy:
  215. self.context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
  216. case let .handler(handler):
  217. handler.receiveMessage(buffer)
  218. }
  219. self.tryReadingMessage()
  220. case .forwardEnd:
  221. switch self.mode {
  222. case .notConfigured:
  223. preconditionFailure()
  224. case .legacy:
  225. self.context.fireChannelRead(self.wrapInboundOut(.end))
  226. case let .handler(handler):
  227. handler.receiveEnd()
  228. }
  229. case let .errorCaught(error):
  230. switch self.mode {
  231. case .notConfigured:
  232. preconditionFailure()
  233. case .legacy:
  234. self.context.fireErrorCaught(error)
  235. case let .handler(handler):
  236. handler.receiveError(error)
  237. }
  238. }
  239. }
  240. internal func sendMetadata(
  241. _ headers: HPACKHeaders,
  242. flush: Bool,
  243. promise: EventLoopPromise<Void>?
  244. ) {
  245. switch self.state.send(headers: headers) {
  246. case let .success(headers):
  247. let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
  248. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  249. if flush {
  250. self.markFlushPoint()
  251. }
  252. case let .failure(error):
  253. promise?.fail(error)
  254. }
  255. }
  256. internal func sendMessage(
  257. _ buffer: ByteBuffer,
  258. metadata: MessageMetadata,
  259. promise: EventLoopPromise<Void>?
  260. ) {
  261. let writeBuffer = self.state.send(
  262. buffer: buffer,
  263. allocator: self.context.channel.allocator,
  264. compress: metadata.compress
  265. )
  266. switch writeBuffer {
  267. case let .success(buffer):
  268. let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
  269. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  270. if metadata.flush {
  271. self.markFlushPoint()
  272. }
  273. case let .failure(error):
  274. promise?.fail(error)
  275. }
  276. }
  277. internal func sendEnd(
  278. status: GRPCStatus,
  279. trailers: HPACKHeaders,
  280. promise: EventLoopPromise<Void>?
  281. ) {
  282. switch self.state.send(status: status, trailers: trailers) {
  283. case let .success(trailers):
  284. // Always end stream for status and trailers.
  285. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  286. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  287. // We'll always flush on end.
  288. self.markFlushPoint()
  289. case let .failure(error):
  290. promise?.fail(error)
  291. }
  292. }
  293. /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
  294. /// or emit a flush now if we are not.
  295. private func markFlushPoint() {
  296. if self.isReading {
  297. self.flushPending = true
  298. } else {
  299. self.flushPending = false
  300. self.context.flush()
  301. }
  302. }
  303. }