HTTP2ToRawGRPCServerCodec.swift 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
  21. typealias InboundIn = HTTP2Frame.FramePayload
  22. typealias OutboundOut = HTTP2Frame.FramePayload
  23. private var logger: Logger
  24. private var state: HTTP2ToRawGRPCStateMachine
  25. private let errorDelegate: ServerErrorDelegate?
  26. private var context: ChannelHandlerContext!
  27. /// The mode we're operating in.
  28. private var mode: Mode = .notConfigured
  29. /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
  30. /// burst of reading has completed.
  31. private var isReading = false
  32. /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
  33. /// then it is held until the read completes in order to elide unnecessary flushes.
  34. private var flushPending = false
  35. private enum Mode {
  36. case notConfigured
  37. case handler(GRPCServerHandlerProtocol)
  38. }
  39. init(
  40. servicesByName: [Substring: CallHandlerProvider],
  41. encoding: ServerMessageEncoding,
  42. errorDelegate: ServerErrorDelegate?,
  43. normalizeHeaders: Bool,
  44. logger: Logger
  45. ) {
  46. self.logger = logger
  47. self.errorDelegate = errorDelegate
  48. self.state = HTTP2ToRawGRPCStateMachine(
  49. services: servicesByName,
  50. encoding: encoding,
  51. normalizeHeaders: normalizeHeaders
  52. )
  53. }
  54. internal func handlerAdded(context: ChannelHandlerContext) {
  55. self.context = context
  56. }
  57. internal func handlerRemoved(context: ChannelHandlerContext) {
  58. self.context = nil
  59. self.mode = .notConfigured
  60. }
  61. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  62. switch self.mode {
  63. case .notConfigured:
  64. context.close(mode: .all, promise: nil)
  65. case let .handler(hander):
  66. hander.receiveError(error)
  67. }
  68. }
  69. internal func channelInactive(context: ChannelHandlerContext) {
  70. switch self.mode {
  71. case .notConfigured:
  72. context.fireChannelInactive()
  73. case let .handler(handler):
  74. handler.finish()
  75. }
  76. }
  77. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  78. self.isReading = true
  79. let payload = self.unwrapInboundIn(data)
  80. switch payload {
  81. case let .headers(payload):
  82. let receiveHeaders = self.state.receive(
  83. headers: payload.headers,
  84. eventLoop: context.eventLoop,
  85. errorDelegate: self.errorDelegate,
  86. remoteAddress: context.channel.remoteAddress,
  87. logger: self.logger,
  88. allocator: context.channel.allocator,
  89. responseWriter: self,
  90. closeFuture: context.channel.closeFuture
  91. )
  92. switch receiveHeaders {
  93. case let .configure(handler):
  94. self.mode = .handler(handler)
  95. self.configured()
  96. case let .rejectRPC(trailers):
  97. // We're not handling this request: write headers and end stream.
  98. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  99. context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
  100. }
  101. case let .data(payload):
  102. switch payload.data {
  103. case var .byteBuffer(buffer):
  104. let tryToRead = self.state.receive(buffer: &buffer, endStream: payload.endStream)
  105. if tryToRead {
  106. self.tryReadingMessage()
  107. }
  108. case .fileRegion:
  109. preconditionFailure("Unexpected IOData.fileRegion")
  110. }
  111. // Ignored.
  112. case .alternativeService,
  113. .goAway,
  114. .origin,
  115. .ping,
  116. .priority,
  117. .pushPromise,
  118. .rstStream,
  119. .settings,
  120. .windowUpdate:
  121. ()
  122. }
  123. }
  124. internal func channelReadComplete(context: ChannelHandlerContext) {
  125. self.isReading = false
  126. if self.flushPending {
  127. self.flushPending = false
  128. context.flush()
  129. }
  130. context.fireChannelReadComplete()
  131. }
  132. /// Called when the pipeline has finished configuring.
  133. private func configured() {
  134. switch self.state.pipelineConfigured() {
  135. case let .forwardHeaders(headers):
  136. switch self.mode {
  137. case .notConfigured:
  138. preconditionFailure()
  139. case let .handler(handler):
  140. handler.receiveMetadata(headers)
  141. }
  142. case let .forwardHeadersAndRead(headers):
  143. switch self.mode {
  144. case .notConfigured:
  145. preconditionFailure()
  146. case let .handler(handler):
  147. handler.receiveMetadata(headers)
  148. }
  149. self.tryReadingMessage()
  150. }
  151. }
  152. /// Try to read a request message from the buffer.
  153. private func tryReadingMessage() {
  154. let action = self.state.readNextRequest()
  155. switch action {
  156. case .none:
  157. ()
  158. case let .forwardMessage(buffer):
  159. switch self.mode {
  160. case .notConfigured:
  161. preconditionFailure()
  162. case let .handler(handler):
  163. handler.receiveMessage(buffer)
  164. }
  165. case let .forwardMessageAndEnd(buffer):
  166. switch self.mode {
  167. case .notConfigured:
  168. preconditionFailure()
  169. case let .handler(handler):
  170. handler.receiveMessage(buffer)
  171. handler.receiveEnd()
  172. }
  173. case let .forwardMessageThenReadNextMessage(buffer):
  174. switch self.mode {
  175. case .notConfigured:
  176. preconditionFailure()
  177. case let .handler(handler):
  178. handler.receiveMessage(buffer)
  179. }
  180. self.tryReadingMessage()
  181. case .forwardEnd:
  182. switch self.mode {
  183. case .notConfigured:
  184. preconditionFailure()
  185. case let .handler(handler):
  186. handler.receiveEnd()
  187. }
  188. case let .errorCaught(error):
  189. switch self.mode {
  190. case .notConfigured:
  191. preconditionFailure()
  192. case let .handler(handler):
  193. handler.receiveError(error)
  194. }
  195. }
  196. }
  197. internal func sendMetadata(
  198. _ headers: HPACKHeaders,
  199. flush: Bool,
  200. promise: EventLoopPromise<Void>?
  201. ) {
  202. switch self.state.send(headers: headers) {
  203. case let .success(headers):
  204. let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
  205. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  206. if flush {
  207. self.markFlushPoint()
  208. }
  209. case let .failure(error):
  210. promise?.fail(error)
  211. }
  212. }
  213. internal func sendMessage(
  214. _ buffer: ByteBuffer,
  215. metadata: MessageMetadata,
  216. promise: EventLoopPromise<Void>?
  217. ) {
  218. let writeBuffer = self.state.send(
  219. buffer: buffer,
  220. allocator: self.context.channel.allocator,
  221. compress: metadata.compress
  222. )
  223. switch writeBuffer {
  224. case let .success(buffer):
  225. let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
  226. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  227. if metadata.flush {
  228. self.markFlushPoint()
  229. }
  230. case let .failure(error):
  231. promise?.fail(error)
  232. }
  233. }
  234. internal func sendEnd(
  235. status: GRPCStatus,
  236. trailers: HPACKHeaders,
  237. promise: EventLoopPromise<Void>?
  238. ) {
  239. switch self.state.send(status: status, trailers: trailers) {
  240. case let .success(trailers):
  241. // Always end stream for status and trailers.
  242. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
  243. self.context.write(self.wrapOutboundOut(payload), promise: promise)
  244. // We'll always flush on end.
  245. self.markFlushPoint()
  246. case let .failure(error):
  247. promise?.fail(error)
  248. }
  249. }
  250. /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
  251. /// or emit a flush now if we are not.
  252. private func markFlushPoint() {
  253. if self.isReading {
  254. self.flushPending = true
  255. } else {
  256. self.flushPending = false
  257. self.context.flush()
  258. }
  259. }
  260. }