HTTP1ToRawGRPCServerCodec.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOFoundationCompat
  20. import Logging
  21. /// Incoming gRPC package with an unknown message type (represented by a byte buffer).
  22. public enum RawGRPCServerRequestPart {
  23. case head(HTTPRequestHead)
  24. case message(ByteBuffer)
  25. case end
  26. }
  27. /// Outgoing gRPC package with an unknown message type (represented by `Data`).
  28. public enum RawGRPCServerResponsePart {
  29. case headers(HTTPHeaders)
  30. case message(Data)
  31. case statusAndTrailers(GRPCStatus, HTTPHeaders)
  32. }
  33. /// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa.
  34. ///
  35. /// This codec allows us to use the "raw" gRPC protocol on a low level, with further handlers operationg the protocol
  36. /// on a "higher" level.
  37. ///
  38. /// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2
  39. /// primitives while providing all the functionality we need. In addition, this should make implementing gRPC-over-HTTP1
  40. /// (sometimes also called pPRC) easier in the future.
  41. ///
  42. /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
  43. public final class HTTP1ToRawGRPCServerCodec {
  44. public init(logger: Logger) {
  45. self.logger = logger.addingMetadata(key: MetadataKey.channelHandler, value: "HTTP1ToRawGRPCServerCodec")
  46. var accessLog = Logger(subsystem: .serverAccess)
  47. accessLog[metadataKey: MetadataKey.requestID] = logger[metadataKey: MetadataKey.requestID]
  48. self.accessLog = accessLog
  49. self.messageReader = LengthPrefixedMessageReader(
  50. mode: .server,
  51. compressionMechanism: .none,
  52. logger: logger
  53. )
  54. }
  55. // 1-byte for compression flag, 4-bytes for message length.
  56. private let protobufMetadataSize = 5
  57. private var contentType: ContentType?
  58. private let logger: Logger
  59. private let accessLog: Logger
  60. private var stopwatch: Stopwatch?
  61. // The following buffers use force unwrapping explicitly. With optionals, developers
  62. // are encouraged to unwrap them using guard-else statements. These don't work cleanly
  63. // with structs, since the guard-else would create a new copy of the struct, which
  64. // would then have to be re-assigned into the class variable for the changes to take effect.
  65. // By force unwrapping, we avoid those reassignments, and the code is a bit cleaner.
  66. // Buffer to store binary encoded protos as they're being received if the proto is split across
  67. // multiple buffers.
  68. private var binaryRequestBuffer: NIO.ByteBuffer!
  69. // Buffers to store text encoded protos. Only used when content-type is application/grpc-web-text.
  70. // TODO(kaipi): Extract all gRPC Web processing logic into an independent handler only added on
  71. // the HTTP1.1 pipeline, as it's starting to get in the way of readability.
  72. private var requestTextBuffer: NIO.ByteBuffer!
  73. private var responseTextBuffer: NIO.ByteBuffer!
  74. var inboundState = InboundState.expectingHeaders {
  75. willSet {
  76. guard newValue != self.inboundState else { return }
  77. self.logger.info("inbound state changed from \(self.inboundState) to \(newValue)")
  78. }
  79. }
  80. var outboundState = OutboundState.expectingHeaders {
  81. willSet {
  82. guard newValue != self.outboundState else { return }
  83. self.logger.info("outbound state changed from \(self.outboundState) to \(newValue)")
  84. }
  85. }
  86. var messageWriter = LengthPrefixedMessageWriter()
  87. var messageReader: LengthPrefixedMessageReader
  88. }
  89. extension HTTP1ToRawGRPCServerCodec {
  90. /// Expected content types for incoming requests.
  91. private enum ContentType: String {
  92. /// Binary encoded gRPC request.
  93. case binary = "application/grpc"
  94. /// Base64 encoded gRPC-Web request.
  95. case text = "application/grpc-web-text"
  96. /// Binary encoded gRPC-Web request.
  97. case web = "application/grpc-web"
  98. }
  99. enum InboundState {
  100. case expectingHeaders
  101. case expectingBody
  102. // ignore any additional messages; e.g. we've seen .end or we've sent an error and are waiting for the stream to close.
  103. case ignore
  104. }
  105. enum OutboundState {
  106. case expectingHeaders
  107. case expectingBodyOrStatus
  108. case ignore
  109. }
  110. }
  111. extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
  112. public typealias InboundIn = HTTPServerRequestPart
  113. public typealias InboundOut = RawGRPCServerRequestPart
  114. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  115. if case .ignore = inboundState {
  116. self.logger.notice("ignoring read data: \(data)")
  117. return
  118. }
  119. do {
  120. switch self.unwrapInboundIn(data) {
  121. case .head(let requestHead):
  122. inboundState = try processHead(context: context, requestHead: requestHead)
  123. case .body(var body):
  124. inboundState = try processBody(context: context, body: &body)
  125. case .end(let trailers):
  126. inboundState = try processEnd(context: context, trailers: trailers)
  127. }
  128. } catch {
  129. context.fireErrorCaught(error)
  130. inboundState = .ignore
  131. }
  132. }
  133. func processHead(context: ChannelHandlerContext, requestHead: HTTPRequestHead) throws -> InboundState {
  134. self.logger.debug("processing request head", metadata: ["head": "\(requestHead)"])
  135. guard case .expectingHeaders = inboundState else {
  136. self.logger.error("invalid state '\(inboundState)' while processing request head", metadata: ["head": "\(requestHead)"])
  137. throw GRPCError.server(.invalidState("expecteded state .expectingHeaders, got \(inboundState)"))
  138. }
  139. self.stopwatch = .start()
  140. self.accessLog.info("rpc call started", metadata: [
  141. "path": "\(requestHead.uri)",
  142. "method": "\(requestHead.method)",
  143. "version": "\(requestHead.version)"
  144. ])
  145. if let contentTypeHeader = requestHead.headers["content-type"].first {
  146. self.contentType = ContentType(rawValue: contentTypeHeader)
  147. } else {
  148. self.logger.debug("no 'content-type' header, assuming content type is 'application/grpc'")
  149. // If the Content-Type is not present, assume the request is binary encoded gRPC.
  150. self.contentType = .binary
  151. }
  152. if self.contentType == .text {
  153. requestTextBuffer = context.channel.allocator.buffer(capacity: 0)
  154. }
  155. context.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
  156. return .expectingBody
  157. }
  158. func processBody(context: ChannelHandlerContext, body: inout ByteBuffer) throws -> InboundState {
  159. self.logger.debug("processing body: \(body)")
  160. guard case .expectingBody = inboundState else {
  161. self.logger.error("invalid state '\(inboundState)' while processing body", metadata: ["body": "\(body)"])
  162. throw GRPCError.server(.invalidState("expecteded state .expectingBody, got \(inboundState)"))
  163. }
  164. // If the contentType is text, then decode the incoming bytes as base64 encoded, and append
  165. // it to the binary buffer. If the request is chunked, this section will process the text
  166. // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
  167. // where it will expect a new incoming chunk.
  168. if self.contentType == .text {
  169. precondition(requestTextBuffer != nil)
  170. requestTextBuffer.writeBuffer(&body)
  171. // Read in chunks of 4 bytes as base64 encoded strings will always be multiples of 4.
  172. let readyBytes = requestTextBuffer.readableBytes - (requestTextBuffer.readableBytes % 4)
  173. guard let base64Encoded = requestTextBuffer.readString(length: readyBytes),
  174. let decodedData = Data(base64Encoded: base64Encoded) else {
  175. throw GRPCError.server(.base64DecodeError)
  176. }
  177. body.writeBytes(decodedData)
  178. }
  179. self.messageReader.append(buffer: &body)
  180. while let message = try self.messageReader.nextMessage() {
  181. context.fireChannelRead(self.wrapInboundOut(.message(message)))
  182. }
  183. return .expectingBody
  184. }
  185. private func processEnd(context: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> InboundState {
  186. self.logger.debug("processing end")
  187. if let trailers = trailers {
  188. self.logger.error("unexpected trailers when processing stream end", metadata: ["trailers": "\(trailers)"])
  189. throw GRPCError.server(.invalidState("unexpected trailers received \(trailers)"))
  190. }
  191. context.fireChannelRead(self.wrapInboundOut(.end))
  192. return .ignore
  193. }
  194. }
  195. extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
  196. public typealias OutboundIn = RawGRPCServerResponsePart
  197. public typealias OutboundOut = HTTPServerResponsePart
  198. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  199. if case .ignore = self.outboundState {
  200. self.logger.notice("ignoring written data: \(data)")
  201. promise?.fail(GRPCServerError.serverNotWritable)
  202. return
  203. }
  204. switch self.unwrapOutboundIn(data) {
  205. case .headers(var headers):
  206. guard case .expectingHeaders = self.outboundState else {
  207. self.logger.error("invalid state '\(self.outboundState)' while writing headers", metadata: ["headers": "\(headers)"])
  208. return
  209. }
  210. var version = HTTPVersion(major: 2, minor: 0)
  211. if let contentType = self.contentType {
  212. headers.add(name: "content-type", value: contentType.rawValue)
  213. if contentType != .binary {
  214. version = .init(major: 1, minor: 1)
  215. }
  216. }
  217. if self.contentType == .text {
  218. responseTextBuffer = context.channel.allocator.buffer(capacity: 0)
  219. }
  220. context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise)
  221. self.outboundState = .expectingBodyOrStatus
  222. case .message(let messageBytes):
  223. guard case .expectingBodyOrStatus = self.outboundState else {
  224. self.logger.error("invalid state '\(self.outboundState)' while writing message", metadata: ["message": "\(messageBytes)"])
  225. return
  226. }
  227. if contentType == .text {
  228. precondition(self.responseTextBuffer != nil)
  229. // Store the response into an independent buffer. We can't return the message directly as
  230. // it needs to be aggregated with all the responses plus the trailers, in order to have
  231. // the base64 response properly encoded in a single byte stream.
  232. messageWriter.write(messageBytes, into: &self.responseTextBuffer, usingCompression: .none)
  233. // Since we stored the written data, mark the write promise as successful so that the
  234. // ServerStreaming provider continues sending the data.
  235. promise?.succeed(())
  236. } else {
  237. var responseBuffer = context.channel.allocator.buffer(capacity: LengthPrefixedMessageWriter.metadataLength)
  238. messageWriter.write(messageBytes, into: &responseBuffer, usingCompression: .none)
  239. context.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise)
  240. }
  241. self.outboundState = .expectingBodyOrStatus
  242. case let .statusAndTrailers(status, trailers):
  243. // If we error before sending the initial headers (e.g. unimplemented method) then we won't have sent the request head.
  244. // NIOHTTP2 doesn't support sending a single frame as a "Trailers-Only" response so we still need to loop back and
  245. // send the request head first.
  246. if case .expectingHeaders = self.outboundState {
  247. self.write(context: context, data: NIOAny(RawGRPCServerResponsePart.headers(HTTPHeaders())), promise: nil)
  248. }
  249. var trailers = trailers
  250. trailers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue))
  251. if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
  252. trailers.add(name: GRPCHeaderName.statusMessage, value: message)
  253. }
  254. if contentType == .text {
  255. precondition(responseTextBuffer != nil)
  256. // Encode the trailers into the response byte stream as a length delimited message, as per
  257. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  258. let textTrailers = trailers.map { name, value in "\(name): \(value)" }.joined(separator: "\r\n")
  259. responseTextBuffer.writeInteger(UInt8(0x80))
  260. responseTextBuffer.writeInteger(UInt32(textTrailers.utf8.count))
  261. responseTextBuffer.writeString(textTrailers)
  262. // TODO: Binary responses that are non multiples of 3 will end = or == when encoded in
  263. // base64. Investigate whether this might have any effect on the transport mechanism and
  264. // client decoding. Initial results say that they are inocuous, but we might have to keep
  265. // an eye on this in case something trips up.
  266. if let binaryData = responseTextBuffer.readData(length: responseTextBuffer.readableBytes) {
  267. let encodedData = binaryData.base64EncodedString()
  268. responseTextBuffer.clear()
  269. responseTextBuffer.reserveCapacity(encodedData.utf8.count)
  270. responseTextBuffer.writeString(encodedData)
  271. }
  272. // After collecting all response for gRPC Web connections, send one final aggregated
  273. // response.
  274. context.write(self.wrapOutboundOut(.body(.byteBuffer(responseTextBuffer))), promise: promise)
  275. context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
  276. } else {
  277. context.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
  278. }
  279. // Log the call duration and status
  280. if let stopwatch = self.stopwatch {
  281. self.stopwatch = nil
  282. let millis = stopwatch.elapsedMillis()
  283. self.accessLog.info("rpc call finished", metadata: [
  284. "duration_ms": "\(millis)",
  285. "status_code": "\(status.code.rawValue)"
  286. ])
  287. }
  288. self.outboundState = .ignore
  289. self.inboundState = .ignore
  290. }
  291. }
  292. }