HTTP1ToRawGRPCServerCodec.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. import Foundation
  2. import NIO
  3. import NIOHTTP1
  4. import NIOFoundationCompat
  5. /// Incoming gRPC package with an unknown message type (represented by a byte buffer).
  6. public enum RawGRPCServerRequestPart {
  7. case head(HTTPRequestHead)
  8. case message(ByteBuffer)
  9. case end
  10. }
  11. /// Outgoing gRPC package with an unknown message type (represented by a byte buffer).
  12. public enum RawGRPCServerResponsePart {
  13. case headers(HTTPHeaders)
  14. case message(ByteBuffer)
  15. case status(GRPCStatus)
  16. }
  17. /// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa.
  18. ///
  19. /// This codec allows us to use the "raw" gRPC protocol on a low level, with further handlers operationg the protocol
  20. /// on a "higher" level.
  21. ///
  22. /// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2
  23. /// primitives while providing all the functionality we need. In addition, this should make implementing gRPC-over-HTTP1
  24. /// (sometimes also called pPRC) easier in the future.
  25. ///
  26. /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
  27. public final class HTTP1ToRawGRPCServerCodec {
  28. /// Expected content types for incoming requests.
  29. private enum ContentType: String {
  30. /// Binary encoded gRPC request.
  31. case binary = "application/grpc"
  32. /// Base64 encoded gRPC-Web request.
  33. case text = "application/grpc-web-text"
  34. /// Binary encoded gRPC-Web request.
  35. case web = "application/grpc-web"
  36. }
  37. private enum State {
  38. case expectingHeaders
  39. case expectingCompressedFlag
  40. case expectingMessageLength
  41. case receivedMessageLength(UInt32)
  42. var expectingBody: Bool {
  43. switch self {
  44. case .expectingHeaders: return false
  45. case .expectingCompressedFlag, .expectingMessageLength, .receivedMessageLength: return true
  46. }
  47. }
  48. }
  49. private var state = State.expectingHeaders
  50. private var contentType: ContentType?
  51. // The following buffers use force unwrapping explicitly. With optionals, developers
  52. // are encouraged to unwrap them using guard-else statements. These don't work cleanly
  53. // with structs, since the guard-else would create a new copy of the struct, which
  54. // would then have to be re-assigned into the class variable for the changes to take effect.
  55. // By force unwrapping, we avoid those reassignments, and the code is a bit cleaner.
  56. // Buffer to store binary encoded protos as they're being received.
  57. private var binaryRequestBuffer: NIO.ByteBuffer!
  58. // Buffers to store text encoded protos. Only used when content-type is application/grpc-web-text.
  59. // TODO(kaipi): Extract all gRPC Web processing logic into an independent handler only added on
  60. // the HTTP1.1 pipeline, as it's starting to get in the way of readability.
  61. private var requestTextBuffer: NIO.ByteBuffer!
  62. private var responseTextBuffer: NIO.ByteBuffer!
  63. }
  64. extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
  65. public typealias InboundIn = HTTPServerRequestPart
  66. public typealias InboundOut = RawGRPCServerRequestPart
  67. public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
  68. switch self.unwrapInboundIn(data) {
  69. case .head(let requestHead):
  70. guard case .expectingHeaders = state
  71. else { preconditionFailure("received headers while in state \(state)") }
  72. state = .expectingCompressedFlag
  73. binaryRequestBuffer = ctx.channel.allocator.buffer(capacity: 5)
  74. if let contentTypeHeader = requestHead.headers["content-type"].first {
  75. contentType = ContentType(rawValue: contentTypeHeader)
  76. } else {
  77. // If the Content-Type is not present, assume the request is binary encoded gRPC.
  78. contentType = .binary
  79. }
  80. if contentType == .text {
  81. requestTextBuffer = ctx.channel.allocator.buffer(capacity: 0)
  82. }
  83. ctx.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
  84. case .body(var body):
  85. precondition(binaryRequestBuffer != nil, "buffer not initialized")
  86. assert(state.expectingBody, "received body while in state \(state)")
  87. // If the contentType is text, then decode the incoming bytes as base64 encoded, and append
  88. // it to the binary buffer. If the request is chunked, this section will process the text
  89. // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
  90. // where it will expect a new incoming chunk.
  91. if contentType == .text {
  92. precondition(requestTextBuffer != nil)
  93. requestTextBuffer.write(buffer: &body)
  94. // Read in chunks of 4 bytes as base64 encoded strings will always be multiples of 4.
  95. let readyBytes = requestTextBuffer.readableBytes - (requestTextBuffer.readableBytes % 4)
  96. guard let base64Encoded = requestTextBuffer.readString(length:readyBytes),
  97. let decodedData = Data(base64Encoded: base64Encoded) else {
  98. //! FIXME: Improve error handling when the message couldn't be decoded as base64.
  99. ctx.close(mode: .all, promise: nil)
  100. return
  101. }
  102. binaryRequestBuffer.write(bytes: decodedData)
  103. } else {
  104. binaryRequestBuffer.write(buffer: &body)
  105. }
  106. // Iterate over all available incoming data, trying to read length-delimited messages.
  107. // Each message has the following format:
  108. // - 1 byte "compressed" flag (currently always zero, as we do not support for compression)
  109. // - 4 byte signed-integer payload length (N)
  110. // - N bytes payload (normally a valid wire-format protocol buffer)
  111. requestProcessing: while true {
  112. switch state {
  113. case .expectingHeaders: preconditionFailure("unexpected state \(state)")
  114. case .expectingCompressedFlag:
  115. guard let compressionFlag: Int8 = binaryRequestBuffer.readInteger() else { break requestProcessing }
  116. //! FIXME: Avoid crashing here and instead drop the connection.
  117. precondition(compressionFlag == 0, "unexpected compression flag \(compressionFlag); compression is not supported and we did not indicate support for it")
  118. state = .expectingMessageLength
  119. case .expectingMessageLength:
  120. guard let messageLength: UInt32 = binaryRequestBuffer.readInteger() else { break requestProcessing }
  121. state = .receivedMessageLength(messageLength)
  122. case .receivedMessageLength(let messageLength):
  123. guard let messageBytes = binaryRequestBuffer.readBytes(length: numericCast(messageLength)) else { break }
  124. //! FIXME: Use a slice of this buffer instead of copying to a new buffer.
  125. var messageBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.count)
  126. messageBuffer.write(bytes: messageBytes)
  127. ctx.fireChannelRead(self.wrapInboundOut(.message(messageBuffer)))
  128. //! FIXME: Call buffer.discardReadBytes() here?
  129. //! ALTERNATIVE: Check if the buffer has no further data right now, then clear it.
  130. state = .expectingCompressedFlag
  131. }
  132. }
  133. case .end(let trailers):
  134. if let trailers = trailers {
  135. //! FIXME: Better handle this error.
  136. print("unexpected trailers received: \(trailers)")
  137. return
  138. }
  139. ctx.fireChannelRead(self.wrapInboundOut(.end))
  140. }
  141. }
  142. }
  143. extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
  144. public typealias OutboundIn = RawGRPCServerResponsePart
  145. public typealias OutboundOut = HTTPServerResponsePart
  146. public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  147. let responsePart = self.unwrapOutboundIn(data)
  148. switch responsePart {
  149. case .headers:
  150. var headers = HTTPHeaders()
  151. var version = HTTPVersion(major: 2, minor: 0)
  152. if let contentType = contentType {
  153. headers.add(name: "content-type", value: contentType.rawValue)
  154. if contentType != .binary {
  155. version = .init(major: 1, minor: 1)
  156. }
  157. }
  158. if contentType == .text {
  159. responseTextBuffer = ctx.channel.allocator.buffer(capacity: 0)
  160. }
  161. ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise)
  162. case .message(var messageBytes):
  163. // Write out a length-delimited message payload. See `channelRead` fpor the corresponding format.
  164. var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.readableBytes + 5)
  165. responseBuffer.write(integer: Int8(0)) // Compression flag: no compression
  166. responseBuffer.write(integer: UInt32(messageBytes.readableBytes))
  167. responseBuffer.write(buffer: &messageBytes)
  168. if contentType == .text {
  169. precondition(responseTextBuffer != nil)
  170. // Store the response into an independent buffer. We can't return the message directly as
  171. // it needs to be aggregated with all the responses plus the trailers, in order to have
  172. // the base64 response properly encoded in a single byte stream.
  173. responseTextBuffer!.write(buffer: &responseBuffer)
  174. // Since we stored the written data, mark the write promise as successful so that the
  175. // ServerStreaming provider continues sending the data.
  176. promise?.succeed(result: Void())
  177. } else {
  178. ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise)
  179. }
  180. case .status(let status):
  181. var trailers = status.trailingMetadata
  182. trailers.add(name: "grpc-status", value: String(describing: status.code.rawValue))
  183. trailers.add(name: "grpc-message", value: status.message)
  184. if contentType == .text {
  185. precondition(responseTextBuffer != nil)
  186. // Encode the trailers into the response byte stream as a length delimited message, as per
  187. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  188. let textTrailers = trailers.map { name, value in "\(name): \(value)" }.joined(separator: "\r\n")
  189. responseTextBuffer.write(integer: UInt8(0x80))
  190. responseTextBuffer.write(integer: UInt32(textTrailers.utf8.count))
  191. responseTextBuffer.write(string: textTrailers)
  192. // TODO: Binary responses that are non multiples of 3 will end = or == when encoded in
  193. // base64. Investigate whether this might have any effect on the transport mechanism and
  194. // client decoding. Initial results say that they are inocuous, but we might have to keep
  195. // an eye on this in case something trips up.
  196. if let binaryData = responseTextBuffer.readData(length: responseTextBuffer.readableBytes) {
  197. let encodedData = binaryData.base64EncodedString()
  198. responseTextBuffer.clear()
  199. responseTextBuffer.reserveCapacity(encodedData.utf8.count)
  200. responseTextBuffer.write(string: encodedData)
  201. }
  202. // After collecting all response for gRPC Web connections, send one final aggregated
  203. // response.
  204. ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseTextBuffer))), promise: promise)
  205. ctx.write(self.wrapOutboundOut(.end(nil)), promise: promise)
  206. } else {
  207. ctx.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
  208. }
  209. }
  210. }
  211. }