HTTP1ToGRPCServerCodec.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOFoundationCompat
  20. import Logging
  21. /// Incoming gRPC package with a fixed message type.
  22. ///
  23. /// - Important: This is **NOT** part of the public API.
  24. public enum _GRPCServerRequestPart<RequestPayload: GRPCPayload> {
  25. case head(HTTPRequestHead)
  26. case message(RequestPayload)
  27. case end
  28. }
  29. /// Outgoing gRPC package with a fixed message type.
  30. ///
  31. /// - Important: This is **NOT** part of the public API.
  32. public enum _GRPCServerResponsePart<ResponsePayload: GRPCPayload> {
  33. case headers(HTTPHeaders)
  34. case message(ResponsePayload)
  35. case statusAndTrailers(GRPCStatus, HTTPHeaders)
  36. }
  37. /// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa.
  38. ///
  39. /// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2
  40. /// primitives while providing all the functionality we need. In addition, it allows us to support
  41. /// gRPC-Web (gRPC over HTTP1).
  42. ///
  43. /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
  44. public final class HTTP1ToGRPCServerCodec<Request: GRPCPayload, Response: GRPCPayload> {
  45. public init(logger: Logger) {
  46. self.logger = logger
  47. var accessLog = Logger(subsystem: .serverAccess)
  48. accessLog[metadataKey: MetadataKey.requestID] = logger[metadataKey: MetadataKey.requestID]
  49. self.accessLog = accessLog
  50. self.messageReader = LengthPrefixedMessageReader()
  51. self.messageWriter = LengthPrefixedMessageWriter()
  52. }
  53. private var contentType: ContentType?
  54. private let logger: Logger
  55. private let accessLog: Logger
  56. private var stopwatch: Stopwatch?
  57. // The following buffers use force unwrapping explicitly. With optionals, developers
  58. // are encouraged to unwrap them using guard-else statements. These don't work cleanly
  59. // with structs, since the guard-else would create a new copy of the struct, which
  60. // would then have to be re-assigned into the class variable for the changes to take effect.
  61. // By force unwrapping, we avoid those reassignments, and the code is a bit cleaner.
  62. // Buffer to store binary encoded protos as they're being received if the proto is split across
  63. // multiple buffers.
  64. private var binaryRequestBuffer: NIO.ByteBuffer!
  65. // Buffers to store text encoded protos. Only used when content-type is application/grpc-web-text.
  66. // TODO(kaipi): Extract all gRPC Web processing logic into an independent handler only added on
  67. // the HTTP1.1 pipeline, as it's starting to get in the way of readability.
  68. private var requestTextBuffer: NIO.ByteBuffer!
  69. private var responseTextBuffer: NIO.ByteBuffer!
  70. var inboundState = InboundState.expectingHeaders {
  71. willSet {
  72. guard newValue != self.inboundState else { return }
  73. self.logger.debug("inbound state changed", metadata: ["old_state": "\(self.inboundState)", "new_state": "\(newValue)"])
  74. }
  75. }
  76. var outboundState = OutboundState.expectingHeaders {
  77. willSet {
  78. guard newValue != self.outboundState else { return }
  79. self.logger.debug("outbound state changed", metadata: ["old_state": "\(self.outboundState)", "new_state": "\(newValue)"])
  80. }
  81. }
  82. var messageReader: LengthPrefixedMessageReader
  83. var messageWriter: LengthPrefixedMessageWriter
  84. }
  85. extension HTTP1ToGRPCServerCodec {
  86. enum InboundState {
  87. case expectingHeaders
  88. case expectingBody
  89. // ignore any additional messages; e.g. we've seen .end or we've sent an error and are waiting for the stream to close.
  90. case ignore
  91. }
  92. enum OutboundState {
  93. case expectingHeaders
  94. case expectingBodyOrStatus
  95. case ignore
  96. }
  97. }
  98. extension HTTP1ToGRPCServerCodec: ChannelInboundHandler {
  99. public typealias InboundIn = HTTPServerRequestPart
  100. public typealias InboundOut = _GRPCServerRequestPart<Request>
  101. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  102. if case .ignore = inboundState {
  103. self.logger.notice("ignoring read data", metadata: ["data": "\(data)"])
  104. return
  105. }
  106. do {
  107. switch self.unwrapInboundIn(data) {
  108. case .head(let requestHead):
  109. inboundState = try processHead(context: context, requestHead: requestHead)
  110. case .body(var body):
  111. inboundState = try processBody(context: context, body: &body)
  112. case .end(let trailers):
  113. inboundState = try processEnd(context: context, trailers: trailers)
  114. }
  115. } catch {
  116. context.fireErrorCaught(error)
  117. inboundState = .ignore
  118. }
  119. }
  120. func processHead(context: ChannelHandlerContext, requestHead: HTTPRequestHead) throws -> InboundState {
  121. self.logger.debug("processing request head", metadata: ["head": "\(requestHead)"])
  122. guard case .expectingHeaders = inboundState else {
  123. self.logger.error("invalid state while processing request head",
  124. metadata: ["state": "\(inboundState)", "head": "\(requestHead)"])
  125. throw GRPCError.InvalidState("expected state .expectingHeaders, got \(inboundState)").captureContext()
  126. }
  127. self.stopwatch = .start()
  128. self.accessLog.debug("rpc call started", metadata: [
  129. "path": "\(requestHead.uri)",
  130. "method": "\(requestHead.method)",
  131. "version": "\(requestHead.version)"
  132. ])
  133. if let contentType = requestHead.headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init) {
  134. self.contentType = contentType
  135. } else {
  136. self.logger.debug("no 'content-type' header, assuming content type is 'application/grpc'")
  137. // If the Content-Type is not present, assume the request is binary encoded gRPC.
  138. self.contentType = .protobuf
  139. }
  140. if self.contentType == .webTextProtobuf {
  141. requestTextBuffer = context.channel.allocator.buffer(capacity: 0)
  142. }
  143. context.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
  144. return .expectingBody
  145. }
  146. func processBody(context: ChannelHandlerContext, body: inout ByteBuffer) throws -> InboundState {
  147. self.logger.debug("processing body: \(body)")
  148. guard case .expectingBody = inboundState else {
  149. self.logger.error("invalid state while processing body",
  150. metadata: ["state": "\(inboundState)", "body": "\(body)"])
  151. throw GRPCError.InvalidState("expected state .expectingBody, got \(inboundState)").captureContext()
  152. }
  153. // If the contentType is text, then decode the incoming bytes as base64 encoded, and append
  154. // it to the binary buffer. If the request is chunked, this section will process the text
  155. // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
  156. // where it will expect a new incoming chunk.
  157. if self.contentType == .webTextProtobuf {
  158. precondition(requestTextBuffer != nil)
  159. requestTextBuffer.writeBuffer(&body)
  160. // Read in chunks of 4 bytes as base64 encoded strings will always be multiples of 4.
  161. let readyBytes = requestTextBuffer.readableBytes - (requestTextBuffer.readableBytes % 4)
  162. guard let base64Encoded = requestTextBuffer.readString(length: readyBytes),
  163. let decodedData = Data(base64Encoded: base64Encoded) else {
  164. throw GRPCError.Base64DecodeError().captureContext()
  165. }
  166. body.writeBytes(decodedData)
  167. }
  168. self.messageReader.append(buffer: &body)
  169. var requests: [Request] = []
  170. do {
  171. while var buffer = try self.messageReader.nextMessage() {
  172. requests.append(try Request(serializedByteBuffer: &buffer))
  173. }
  174. } catch {
  175. context.fireErrorCaught(GRPCError.DeserializationFailure().captureContext())
  176. return .ignore
  177. }
  178. requests.forEach {
  179. context.fireChannelRead(self.wrapInboundOut(.message($0)))
  180. }
  181. return .expectingBody
  182. }
  183. private func processEnd(context: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> InboundState {
  184. self.logger.debug("processing end")
  185. if let trailers = trailers {
  186. self.logger.error("unexpected trailers when processing stream end", metadata: ["trailers": "\(trailers)"])
  187. throw GRPCError.InvalidState("unexpected trailers received").captureContext()
  188. }
  189. context.fireChannelRead(self.wrapInboundOut(.end))
  190. return .ignore
  191. }
  192. }
  193. extension HTTP1ToGRPCServerCodec: ChannelOutboundHandler {
  194. public typealias OutboundIn = _GRPCServerResponsePart<Response>
  195. public typealias OutboundOut = HTTPServerResponsePart
  196. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  197. if case .ignore = self.outboundState {
  198. self.logger.notice("ignoring written data: \(data)")
  199. promise?.fail(GRPCError.InvalidState("rpc has already finished").captureContext())
  200. return
  201. }
  202. switch self.unwrapOutboundIn(data) {
  203. case .headers(var headers):
  204. guard case .expectingHeaders = self.outboundState else {
  205. self.logger.error("invalid state while writing headers",
  206. metadata: ["state": "\(self.outboundState)", "headers": "\(headers)"])
  207. return
  208. }
  209. var version = HTTPVersion(major: 2, minor: 0)
  210. if let contentType = self.contentType {
  211. headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
  212. if contentType != .protobuf {
  213. version = .init(major: 1, minor: 1)
  214. }
  215. }
  216. if self.contentType == .webTextProtobuf {
  217. responseTextBuffer = context.channel.allocator.buffer(capacity: 0)
  218. }
  219. context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise)
  220. self.outboundState = .expectingBodyOrStatus
  221. case .message(let message):
  222. guard case .expectingBodyOrStatus = self.outboundState else {
  223. self.logger.error("invalid state while writing message", metadata: ["state": "\(self.outboundState)"])
  224. return
  225. }
  226. do {
  227. if contentType == .webTextProtobuf {
  228. // Store the response into an independent buffer. We can't return the message directly as
  229. // it needs to be aggregated with all the responses plus the trailers, in order to have
  230. // the base64 response properly encoded in a single byte stream.
  231. precondition(self.responseTextBuffer != nil)
  232. try self.messageWriter.write(message, into: &self.responseTextBuffer)
  233. // Since we stored the written data, mark the write promise as successful so that the
  234. // ServerStreaming provider continues sending the data.
  235. promise?.succeed(())
  236. } else {
  237. var lengthPrefixedMessageBuffer = context.channel.allocator.buffer(capacity: 0)
  238. try self.messageWriter.write(message, into: &lengthPrefixedMessageBuffer)
  239. context.write(self.wrapOutboundOut(.body(.byteBuffer(lengthPrefixedMessageBuffer))), promise: promise)
  240. }
  241. } catch {
  242. let error = GRPCError.SerializationFailure().captureContext()
  243. promise?.fail(error)
  244. context.fireErrorCaught(error)
  245. self.outboundState = .ignore
  246. return
  247. }
  248. self.outboundState = .expectingBodyOrStatus
  249. case let .statusAndTrailers(status, trailers):
  250. // If we error before sending the initial headers then we won't have sent the request head.
  251. // NIOHTTP2 doesn't support sending a single frame as a "Trailers-Only" response so we still
  252. // need to loop back and send the request head first.
  253. if case .expectingHeaders = self.outboundState {
  254. self.write(context: context, data: NIOAny(OutboundIn.headers(HTTPHeaders())), promise: nil)
  255. }
  256. var trailers = trailers
  257. trailers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue))
  258. if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
  259. trailers.add(name: GRPCHeaderName.statusMessage, value: message)
  260. }
  261. if contentType == .webTextProtobuf {
  262. precondition(responseTextBuffer != nil)
  263. // Encode the trailers into the response byte stream as a length delimited message, as per
  264. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  265. let textTrailers = trailers.map { name, value in "\(name): \(value)" }.joined(separator: "\r\n")
  266. responseTextBuffer.writeInteger(UInt8(0x80))
  267. responseTextBuffer.writeInteger(UInt32(textTrailers.utf8.count))
  268. responseTextBuffer.writeString(textTrailers)
  269. // TODO: Binary responses that are non multiples of 3 will end = or == when encoded in
  270. // base64. Investigate whether this might have any effect on the transport mechanism and
  271. // client decoding. Initial results say that they are inocuous, but we might have to keep
  272. // an eye on this in case something trips up.
  273. if let binaryData = responseTextBuffer.readData(length: responseTextBuffer.readableBytes) {
  274. let encodedData = binaryData.base64EncodedString()
  275. responseTextBuffer.clear()
  276. responseTextBuffer.reserveCapacity(encodedData.utf8.count)
  277. responseTextBuffer.writeString(encodedData)
  278. }
  279. // After collecting all response for gRPC Web connections, send one final aggregated
  280. // response.
  281. context.write(self.wrapOutboundOut(.body(.byteBuffer(responseTextBuffer))), promise: promise)
  282. context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
  283. } else {
  284. context.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
  285. }
  286. // Log the call duration and status
  287. if let stopwatch = self.stopwatch {
  288. self.stopwatch = nil
  289. let millis = stopwatch.elapsedMillis()
  290. self.accessLog.debug("rpc call finished", metadata: [
  291. "duration_ms": "\(millis)",
  292. "status_code": "\(status.code.rawValue)"
  293. ])
  294. }
  295. self.outboundState = .ignore
  296. self.inboundState = .ignore
  297. }
  298. }
  299. }