HTTP1ToRawGRPCServerCodec.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import Foundation
  2. import NIO
  3. import NIOHTTP1
  4. import NIOFoundationCompat
  5. /// Incoming gRPC package with an unknown message type (represented by a byte buffer).
  6. public enum RawGRPCServerRequestPart {
  7. case head(HTTPRequestHead)
  8. case message(ByteBuffer)
  9. case end
  10. }
  11. /// Outgoing gRPC package with an unknown message type (represented by `Data`).
  12. public enum RawGRPCServerResponsePart {
  13. case headers(HTTPHeaders)
  14. case message(Data)
  15. case status(GRPCStatus)
  16. }
  17. /// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa.
  18. ///
  19. /// This codec allows us to use the "raw" gRPC protocol on a low level, with further handlers operationg the protocol
  20. /// on a "higher" level.
  21. ///
  22. /// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2
  23. /// primitives while providing all the functionality we need. In addition, this should make implementing gRPC-over-HTTP1
  24. /// (sometimes also called pPRC) easier in the future.
  25. ///
  26. /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
  27. public final class HTTP1ToRawGRPCServerCodec {
  28. public init() {}
  29. // 1-byte for compression flag, 4-bytes for message length.
  30. private let protobufMetadataSize = 5
  31. private var contentType: ContentType?
  32. // The following buffers use force unwrapping explicitly. With optionals, developers
  33. // are encouraged to unwrap them using guard-else statements. These don't work cleanly
  34. // with structs, since the guard-else would create a new copy of the struct, which
  35. // would then have to be re-assigned into the class variable for the changes to take effect.
  36. // By force unwrapping, we avoid those reassignments, and the code is a bit cleaner.
  37. // Buffer to store binary encoded protos as they're being received if the proto is split across
  38. // multiple buffers.
  39. private var binaryRequestBuffer: NIO.ByteBuffer!
  40. // Buffers to store text encoded protos. Only used when content-type is application/grpc-web-text.
  41. // TODO(kaipi): Extract all gRPC Web processing logic into an independent handler only added on
  42. // the HTTP1.1 pipeline, as it's starting to get in the way of readability.
  43. private var requestTextBuffer: NIO.ByteBuffer!
  44. private var responseTextBuffer: NIO.ByteBuffer!
  45. var inboundState = InboundState.expectingHeaders
  46. var outboundState = OutboundState.expectingHeaders
  47. var messageWriter = LengthPrefixedMessageWriter()
  48. var messageReader = LengthPrefixedMessageReader(mode: .server, compressionMechanism: .none)
  49. }
  50. extension HTTP1ToRawGRPCServerCodec {
  51. /// Expected content types for incoming requests.
  52. private enum ContentType: String {
  53. /// Binary encoded gRPC request.
  54. case binary = "application/grpc"
  55. /// Base64 encoded gRPC-Web request.
  56. case text = "application/grpc-web-text"
  57. /// Binary encoded gRPC-Web request.
  58. case web = "application/grpc-web"
  59. }
  60. enum InboundState {
  61. case expectingHeaders
  62. case expectingBody
  63. // ignore any additional messages; e.g. we've seen .end or we've sent an error and are waiting for the stream to close.
  64. case ignore
  65. }
  66. enum OutboundState {
  67. case expectingHeaders
  68. case expectingBodyOrStatus
  69. case ignore
  70. }
  71. }
  72. extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
  73. public typealias InboundIn = HTTPServerRequestPart
  74. public typealias InboundOut = RawGRPCServerRequestPart
  75. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  76. if case .ignore = inboundState { return }
  77. do {
  78. switch self.unwrapInboundIn(data) {
  79. case .head(let requestHead):
  80. inboundState = try processHead(context: context, requestHead: requestHead)
  81. case .body(var body):
  82. inboundState = try processBody(context: context, body: &body)
  83. case .end(let trailers):
  84. inboundState = try processEnd(context: context, trailers: trailers)
  85. }
  86. } catch {
  87. context.fireErrorCaught(error)
  88. inboundState = .ignore
  89. }
  90. }
  91. func processHead(context: ChannelHandlerContext, requestHead: HTTPRequestHead) throws -> InboundState {
  92. guard case .expectingHeaders = inboundState else {
  93. throw GRPCError.server(.invalidState("expecteded state .expectingHeaders, got \(inboundState)"))
  94. }
  95. if let contentTypeHeader = requestHead.headers["content-type"].first {
  96. contentType = ContentType(rawValue: contentTypeHeader)
  97. } else {
  98. // If the Content-Type is not present, assume the request is binary encoded gRPC.
  99. contentType = .binary
  100. }
  101. if contentType == .text {
  102. requestTextBuffer = context.channel.allocator.buffer(capacity: 0)
  103. }
  104. context.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
  105. return .expectingBody
  106. }
  107. func processBody(context: ChannelHandlerContext, body: inout ByteBuffer) throws -> InboundState {
  108. guard case .expectingBody = inboundState else {
  109. throw GRPCError.server(.invalidState("expecteded state .expectingBody, got \(inboundState)"))
  110. }
  111. // If the contentType is text, then decode the incoming bytes as base64 encoded, and append
  112. // it to the binary buffer. If the request is chunked, this section will process the text
  113. // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
  114. // where it will expect a new incoming chunk.
  115. if contentType == .text {
  116. precondition(requestTextBuffer != nil)
  117. requestTextBuffer.writeBuffer(&body)
  118. // Read in chunks of 4 bytes as base64 encoded strings will always be multiples of 4.
  119. let readyBytes = requestTextBuffer.readableBytes - (requestTextBuffer.readableBytes % 4)
  120. guard let base64Encoded = requestTextBuffer.readString(length: readyBytes),
  121. let decodedData = Data(base64Encoded: base64Encoded) else {
  122. throw GRPCError.server(.base64DecodeError)
  123. }
  124. body.writeBytes(decodedData)
  125. }
  126. self.messageReader.append(buffer: &body)
  127. while let message = try self.messageReader.nextMessage() {
  128. context.fireChannelRead(self.wrapInboundOut(.message(message)))
  129. }
  130. return .expectingBody
  131. }
  132. private func processEnd(context: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> InboundState {
  133. if let trailers = trailers {
  134. throw GRPCError.server(.invalidState("unexpected trailers received \(trailers)"))
  135. }
  136. context.fireChannelRead(self.wrapInboundOut(.end))
  137. return .ignore
  138. }
  139. }
  140. extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
  141. public typealias OutboundIn = RawGRPCServerResponsePart
  142. public typealias OutboundOut = HTTPServerResponsePart
  143. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  144. if case .ignore = outboundState { return }
  145. switch self.unwrapOutboundIn(data) {
  146. case .headers(var headers):
  147. guard case .expectingHeaders = outboundState else { return }
  148. var version = HTTPVersion(major: 2, minor: 0)
  149. if let contentType = contentType {
  150. headers.add(name: "content-type", value: contentType.rawValue)
  151. if contentType != .binary {
  152. version = .init(major: 1, minor: 1)
  153. }
  154. }
  155. if contentType == .text {
  156. responseTextBuffer = context.channel.allocator.buffer(capacity: 0)
  157. }
  158. context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise)
  159. outboundState = .expectingBodyOrStatus
  160. case .message(let messageBytes):
  161. guard case .expectingBodyOrStatus = outboundState else { return }
  162. if contentType == .text {
  163. precondition(self.responseTextBuffer != nil)
  164. // Store the response into an independent buffer. We can't return the message directly as
  165. // it needs to be aggregated with all the responses plus the trailers, in order to have
  166. // the base64 response properly encoded in a single byte stream.
  167. messageWriter.write(messageBytes, into: &self.responseTextBuffer, usingCompression: .none)
  168. // Since we stored the written data, mark the write promise as successful so that the
  169. // ServerStreaming provider continues sending the data.
  170. promise?.succeed(())
  171. } else {
  172. var responseBuffer = context.channel.allocator.buffer(capacity: LengthPrefixedMessageWriter.metadataLength)
  173. messageWriter.write(messageBytes, into: &responseBuffer, usingCompression: .none)
  174. context.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise)
  175. }
  176. outboundState = .expectingBodyOrStatus
  177. case .status(let status):
  178. // If we error before sending the initial headers (e.g. unimplemented method) then we won't have sent the request head.
  179. // NIOHTTP2 doesn't support sending a single frame as a "Trailers-Only" response so we still need to loop back and
  180. // send the request head first.
  181. if case .expectingHeaders = outboundState {
  182. self.write(context: context, data: NIOAny(RawGRPCServerResponsePart.headers(HTTPHeaders())), promise: nil)
  183. }
  184. var trailers = status.trailingMetadata
  185. trailers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue))
  186. if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
  187. trailers.add(name: GRPCHeaderName.statusMessage, value: message)
  188. }
  189. if contentType == .text {
  190. precondition(responseTextBuffer != nil)
  191. // Encode the trailers into the response byte stream as a length delimited message, as per
  192. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  193. let textTrailers = trailers.map { name, value in "\(name): \(value)" }.joined(separator: "\r\n")
  194. responseTextBuffer.writeInteger(UInt8(0x80))
  195. responseTextBuffer.writeInteger(UInt32(textTrailers.utf8.count))
  196. responseTextBuffer.writeString(textTrailers)
  197. // TODO: Binary responses that are non multiples of 3 will end = or == when encoded in
  198. // base64. Investigate whether this might have any effect on the transport mechanism and
  199. // client decoding. Initial results say that they are inocuous, but we might have to keep
  200. // an eye on this in case something trips up.
  201. if let binaryData = responseTextBuffer.readData(length: responseTextBuffer.readableBytes) {
  202. let encodedData = binaryData.base64EncodedString()
  203. responseTextBuffer.clear()
  204. responseTextBuffer.reserveCapacity(encodedData.utf8.count)
  205. responseTextBuffer.writeString(encodedData)
  206. }
  207. // After collecting all response for gRPC Web connections, send one final aggregated
  208. // response.
  209. context.write(self.wrapOutboundOut(.body(.byteBuffer(responseTextBuffer))), promise: promise)
  210. context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
  211. } else {
  212. context.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
  213. }
  214. outboundState = .ignore
  215. inboundState = .ignore
  216. }
  217. }
  218. }