HTTP2ToRawGRPCServerCodec.swift 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// Bridges inbound HTTP/2 frame payloads to a `GRPCServerHandlerProtocol` and writes the
  /// responses produced by that handler back to the channel as HTTP/2 frame payloads.
  ///
  /// Request headers are fed to an `HTTP2ToRawGRPCStateMachine` which either yields a handler for
  /// the RPC (after which request parts are forwarded to it) or yields trailers with which the RPC
  /// is rejected. The codec acts as the handler's `GRPCServerResponseWriter`.
  internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
    typealias InboundIn = HTTP2Frame.FramePayload
    typealias OutboundOut = HTTP2Frame.FramePayload

    private var logger: Logger
    private var state: HTTP2ToRawGRPCStateMachine
    private let errorDelegate: ServerErrorDelegate?

    /// The context of this handler. Set in `handlerAdded(context:)` and reset to `nil` in
    /// `handlerRemoved(context:)`; the response writer methods therefore check it before use.
    private var context: ChannelHandlerContext!

    /// The mode we're operating in.
    private var mode: Mode = .notConfigured

    /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
    /// burst of reading has completed.
    private var isReading = false

    /// Indicates whether a flush event is pending. If a flush is received while `isReading` is
    /// `true` then it is held until the read completes in order to elide unnecessary flushes.
    private var flushPending = false

    private enum Mode {
      /// No handler has been provided by the state machine yet (or the codec has been removed
      /// from the pipeline).
      case notConfigured
      /// Request parts are forwarded to the associated handler.
      case handler(GRPCServerHandlerProtocol)
    }

    /// Creates a codec.
    ///
    /// - Parameters:
    ///   - servicesByName: Passed to the state machine as its `services`.
    ///   - encoding: Passed to the state machine as its `encoding`.
    ///   - errorDelegate: Passed to the state machine whenever request headers are received.
    ///   - normalizeHeaders: Passed to the state machine as its `normalizeHeaders`.
    ///   - logger: Passed to the state machine whenever request headers are received.
    init(
      servicesByName: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      errorDelegate: ServerErrorDelegate?,
      normalizeHeaders: Bool,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.state = HTTP2ToRawGRPCStateMachine(
        services: servicesByName,
        encoding: encoding,
        normalizeHeaders: normalizeHeaders
      )
    }

    // MARK: - ChannelInboundHandler

    internal func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    internal func handlerRemoved(context: ChannelHandlerContext) {
      // Drop the references to the context and the handler.
      self.context = nil
      self.mode = .notConfigured
    }

    /// Forwards the error to the RPC handler if there is one, otherwise closes the channel.
    internal func errorCaught(context: ChannelHandlerContext, error: Error) {
      switch self.mode {
      case .notConfigured:
        context.close(mode: .all, promise: nil)
      case let .handler(handler):
        handler.receiveError(error)
      }
    }

    /// Tells the RPC handler to finish if there is one, otherwise forwards the event.
    internal func channelInactive(context: ChannelHandlerContext) {
      switch self.mode {
      case .notConfigured:
        context.fireChannelInactive()
      case let .handler(handler):
        handler.finish()
      }
    }

    /// Processes a single inbound frame payload; only `.headers` and `.data` are acted upon.
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      self.isReading = true

      switch self.unwrapInboundIn(data) {
      case let .headers(headersPayload):
        let receiveHeaders = self.state.receive(
          headers: headersPayload.headers,
          eventLoop: context.eventLoop,
          errorDelegate: self.errorDelegate,
          remoteAddress: context.channel.remoteAddress,
          logger: self.logger,
          allocator: context.channel.allocator,
          responseWriter: self
        )

        switch receiveHeaders {
        case let .configure(handler):
          self.mode = .handler(handler)
          self.configured()

        case let .rejectRPC(trailers):
          // We're not handling this request: write headers and end stream.
          let rejection = HTTP2Frame.FramePayload.headers(
            .init(headers: trailers, endStream: true)
          )
          context.writeAndFlush(self.wrapOutboundOut(rejection), promise: nil)
        }

      case let .data(dataPayload):
        switch dataPayload.data {
        case var .byteBuffer(buffer):
          let tryToRead = self.state.receive(buffer: &buffer, endStream: dataPayload.endStream)
          if tryToRead {
            self.tryReadingMessage()
          }

        case .fileRegion:
          preconditionFailure("Unexpected IOData.fileRegion")
        }

      // Ignored.
      case .alternativeService,
           .goAway,
           .origin,
           .ping,
           .priority,
           .pushPromise,
           .rstStream,
           .settings,
           .windowUpdate:
        ()
      }
    }

    /// Marks the end of a burst of reads and emits any flush held back while reading.
    internal func channelReadComplete(context: ChannelHandlerContext) {
      self.isReading = false

      if self.flushPending {
        self.flushPending = false
        context.flush()
      }

      context.fireChannelReadComplete()
    }

    // MARK: - Request forwarding

    /// Returns the handler for the RPC.
    ///
    /// - Precondition: `mode` is `.handler`; being asked to forward a request part while in
    ///   `.notConfigured` traps.
    private func requireHandler() -> GRPCServerHandlerProtocol {
      switch self.mode {
      case .notConfigured:
        preconditionFailure("Attempted to forward a request part without a configured handler")
      case let .handler(handler):
        return handler
      }
    }

    /// Called when the pipeline has finished configuring.
    private func configured() {
      switch self.state.pipelineConfigured() {
      case let .forwardHeaders(headers):
        self.requireHandler().receiveMetadata(headers)

      case let .forwardHeadersAndRead(headers):
        self.requireHandler().receiveMetadata(headers)
        self.tryReadingMessage()
      }
    }

    /// Try to read request messages from the buffer, forwarding each to the handler.
    private func tryReadingMessage() {
      // Loop rather than recurse for `.forwardMessageThenReadNextMessage` so that the stack depth
      // does not grow with the number of buffered messages. Every other action leaves the loop.
      while true {
        switch self.state.readNextRequest() {
        case .none:
          return

        case let .forwardMessage(buffer):
          self.requireHandler().receiveMessage(buffer)
          return

        case let .forwardMessageAndEnd(buffer):
          let handler = self.requireHandler()
          handler.receiveMessage(buffer)
          handler.receiveEnd()
          return

        case let .forwardMessageThenReadNextMessage(buffer):
          self.requireHandler().receiveMessage(buffer)
          continue

        case .forwardEnd:
          self.requireHandler().receiveEnd()
          return

        case let .errorCaught(error):
          self.requireHandler().receiveError(error)
          return
        }
      }
    }

    // MARK: - GRPCServerResponseWriter

    /// Writes response headers to the channel.
    ///
    /// - Parameters:
    ///   - headers: The headers to pass through the state machine before writing.
    ///   - flush: Whether a flush point should be marked after the write.
    ///   - promise: Completed with the result of the write, or failed if the state machine rejects
    ///     the headers or the codec is no longer in a pipeline.
    internal func sendMetadata(
      _ headers: HPACKHeaders,
      flush: Bool,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.state.send(headers: headers) {
      case let .success(headers):
        guard let context = self.context else {
          // The codec was removed from the pipeline: fail the write instead of trapping on `nil`.
          promise?.fail(ChannelError.ioOnClosedChannel)
          return
        }

        let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
        context.write(self.wrapOutboundOut(payload), promise: promise)
        if flush {
          self.markFlushPoint()
        }

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Writes a response message to the channel.
    ///
    /// - Parameters:
    ///   - buffer: The message to pass through the state machine before writing.
    ///   - metadata: Provides the `compress` and `flush` options for this message.
    ///   - promise: Completed with the result of the write, or failed if the state machine rejects
    ///     the message or the codec is no longer in a pipeline.
    internal func sendMessage(
      _ buffer: ByteBuffer,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      // The channel's allocator is needed by the state machine, so check the context up front.
      guard let context = self.context else {
        promise?.fail(ChannelError.ioOnClosedChannel)
        return
      }

      let writeBuffer = self.state.send(
        buffer: buffer,
        allocator: context.channel.allocator,
        compress: metadata.compress
      )

      switch writeBuffer {
      case let .success(buffer):
        let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
        context.write(self.wrapOutboundOut(payload), promise: promise)
        if metadata.flush {
          self.markFlushPoint()
        }

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Writes the status and trailers to the channel, ending the stream.
    ///
    /// - Parameters:
    ///   - status: The status to pass through the state machine.
    ///   - trailers: The trailers to pass through the state machine.
    ///   - promise: Completed with the result of the write, or failed if the state machine rejects
    ///     the status and trailers or the codec is no longer in a pipeline.
    internal func sendEnd(
      status: GRPCStatus,
      trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.state.send(status: status, trailers: trailers) {
      case let .success(trailers):
        guard let context = self.context else {
          // The codec was removed from the pipeline: fail the write instead of trapping on `nil`.
          promise?.fail(ChannelError.ioOnClosedChannel)
          return
        }

        // Always end stream for status and trailers.
        let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
        context.write(self.wrapOutboundOut(payload), promise: promise)
        // We'll always flush on end.
        self.markFlushPoint()

      case let .failure(error):
        promise?.fail(error)
      }
    }

    /// Mark a flush as pending - to be emitted once the read completes - if we're currently
    /// reading, or emit a flush now if we are not.
    private func markFlushPoint() {
      if self.isReading {
        self.flushPending = true
      } else {
        self.flushPending = false
        // Optional chaining: nothing to flush if the codec has been removed from the pipeline.
        self.context?.flush()
      }
    }
  }