// HTTP1ToRawGRPCServerCodec.swift
  1. import Foundation
  2. import NIO
  3. import NIOHTTP1
  4. /// Incoming gRPC package with an unknown message type (represented by a byte buffer).
  5. public enum RawGRPCServerRequestPart {
  6. case head(HTTPRequestHead)
  7. case message(ByteBuffer)
  8. case end
  9. }
  10. /// Outgoing gRPC package with an unknown message type (represented by a byte buffer).
  11. public enum RawGRPCServerResponsePart {
  12. case headers(HTTPHeaders)
  13. case message(ByteBuffer)
  14. case status(GRPCStatus)
  15. }
  16. /// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa.
  17. ///
  18. /// This codec allows us to use the "raw" gRPC protocol on a low level, with further handlers operationg the protocol
  19. /// on a "higher" level.
  20. ///
  21. /// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2
  22. /// primitives while providing all the functionality we need. In addition, this should make implementing gRPC-over-HTTP1
  23. /// (sometimes also called pPRC) easier in the future.
  24. ///
  25. /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
  26. public final class HTTP1ToRawGRPCServerCodec {
  27. private enum State {
  28. case expectingHeaders
  29. case expectingCompressedFlag
  30. case expectingMessageLength
  31. case receivedMessageLength(UInt32)
  32. var expectingBody: Bool {
  33. switch self {
  34. case .expectingHeaders: return false
  35. case .expectingCompressedFlag, .expectingMessageLength, .receivedMessageLength: return true
  36. }
  37. }
  38. }
  39. private var state = State.expectingHeaders
  40. private var buffer: NIO.ByteBuffer?
  41. }
  42. extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
  43. public typealias InboundIn = HTTPServerRequestPart
  44. public typealias InboundOut = RawGRPCServerRequestPart
  45. public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
  46. switch self.unwrapInboundIn(data) {
  47. case .head(let requestHead):
  48. guard case .expectingHeaders = state
  49. else { preconditionFailure("received headers while in state \(state)") }
  50. state = .expectingCompressedFlag
  51. buffer = ctx.channel.allocator.buffer(capacity: 5)
  52. ctx.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
  53. case .body(var body):
  54. guard var buffer = buffer
  55. else { preconditionFailure("buffer not initialized") }
  56. assert(state.expectingBody, "received body while in state \(state)")
  57. buffer.write(buffer: &body)
  58. // Iterate over all available incoming data, trying to read length-delimited messages.
  59. // Each message has the following format:
  60. // - 1 byte "compressed" flag (currently always zero, as we do not support for compression)
  61. // - 4 byte signed-integer payload length (N)
  62. // - N bytes payload (normally a valid wire-format protocol buffer)
  63. requestProcessing: while true {
  64. switch state {
  65. case .expectingHeaders: preconditionFailure("unexpected state \(state)")
  66. case .expectingCompressedFlag:
  67. guard let compressionFlag: Int8 = buffer.readInteger() else { break requestProcessing }
  68. //! FIXME: Avoid crashing here and instead drop the connection.
  69. precondition(compressionFlag == 0, "unexpected compression flag \(compressionFlag); compression is not supported and we did not indicate support for it")
  70. state = .expectingMessageLength
  71. case .expectingMessageLength:
  72. guard let messageLength: UInt32 = buffer.readInteger() else { break requestProcessing }
  73. state = .receivedMessageLength(messageLength)
  74. case .receivedMessageLength(let messageLength):
  75. guard let messageBytes = buffer.readBytes(length: numericCast(messageLength)) else { break }
  76. //! FIXME: Use a slice of this buffer instead of copying to a new buffer.
  77. var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.count)
  78. responseBuffer.write(bytes: messageBytes)
  79. ctx.fireChannelRead(self.wrapInboundOut(.message(responseBuffer)))
  80. //! FIXME: Call buffer.discardReadBytes() here?
  81. //! ALTERNATIVE: Check if the buffer has no further data right now, then clear it.
  82. state = .expectingCompressedFlag
  83. }
  84. }
  85. case .end(let trailers):
  86. if let trailers = trailers {
  87. //! FIXME: Better handle this error.
  88. print("unexpected trailers received: \(trailers)")
  89. return
  90. }
  91. ctx.fireChannelRead(self.wrapInboundOut(.end))
  92. }
  93. }
  94. }
  95. extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
  96. public typealias OutboundIn = RawGRPCServerResponsePart
  97. public typealias OutboundOut = HTTPServerResponsePart
  98. public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  99. let responsePart = self.unwrapOutboundIn(data)
  100. switch responsePart {
  101. case .headers(let headers):
  102. //! FIXME: Should return a different version if we want to support pPRC.
  103. ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: .init(major: 2, minor: 0), status: .ok, headers: headers))), promise: promise)
  104. case .message(var messageBytes):
  105. // Write out a length-delimited message payload. See `channelRead` fpor the corresponding format.
  106. var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.readableBytes + 5)
  107. responseBuffer.write(integer: Int8(0)) // Compression flag: no compression
  108. responseBuffer.write(integer: UInt32(messageBytes.readableBytes))
  109. responseBuffer.write(buffer: &messageBytes)
  110. ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise)
  111. case .status(let status):
  112. var trailers = status.trailingMetadata
  113. trailers.add(name: "grpc-status", value: String(describing: status.code.rawValue))
  114. trailers.add(name: "grpc-message", value: status.message)
  115. ctx.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
  116. }
  117. }
  118. }