HTTP1ToGRPCServerCodec.swift 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOFoundationCompat
  20. import Logging
  /// An incoming gRPC request part whose messages have a fixed payload type.
  ///
  /// - Important: This is **NOT** part of the public API.
  public enum _GRPCServerRequestPart<RequestPayload: GRPCPayload> {
    /// The HTTP request head which started the RPC.
    case head(HTTPRequestHead)

    /// A single deserialized request message.
    case message(RequestPayload)

    /// The end of the request stream.
    case end
  }
  /// An outgoing gRPC response part whose messages have a fixed payload type.
  ///
  /// - Important: This is **NOT** part of the public API.
  public enum _GRPCServerResponsePart<ResponsePayload: GRPCPayload> {
    /// The initial response headers.
    case headers(HTTPHeaders)

    /// A single response message, wrapped in a `_MessageContext`.
    case message(_MessageContext<ResponsePayload>)

    /// The final status of the RPC together with any trailers to send alongside it.
    case statusAndTrailers(GRPCStatus, HTTPHeaders)
  }
  /// A channel handler which translates HTTP1 request parts into gRPC request parts and gRPC
  /// response parts into HTTP1 response parts.
  ///
  /// HTTP1 (rather than HTTP2) primitives are used because they are easier to work with than raw
  /// HTTP2 primitives while still providing all the functionality we need. They also allow us to
  /// support gRPC-Web (gRPC over HTTP1).
  ///
  /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
  public final class HTTP1ToGRPCServerCodec<Request: GRPCPayload, Response: GRPCPayload> {
    // MARK: Configuration and logging

    /// The message encoding (compression) configuration of the server.
    private let encoding: Server.Configuration.MessageEncoding
    private let logger: Logger
    /// Logger used for the "rpc call started"/"rpc call finished" access log entries.
    private let accessLog: Logger
    /// Started when the request head is processed; consumed when the status is written.
    private var stopwatch: Stopwatch?

    // MARK: Values derived from the request head

    /// The content type of the RPC, set while processing the request head.
    private var contentType: ContentType?
    /// The value of the 'grpc-accept-encoding' header to send back to the client, if any.
    private var acceptEncodingHeader: String? = nil
    /// The value of the 'grpc-encoding' header to send back to the client, if any.
    private var responseEncodingHeader: String? = nil

    // MARK: Buffers

    // The following buffers use force unwrapping explicitly. With optionals, developers
    // are encouraged to unwrap them using guard-else statements. These don't work cleanly
    // with structs, since the guard-else would create a new copy of the struct, which
    // would then have to be re-assigned into the class variable for the changes to take effect.
    // By force unwrapping, we avoid those reassignments, and the code is a bit cleaner.

    // Buffer to store binary encoded protos as they're being received if the proto is split across
    // multiple buffers.
    private var binaryRequestBuffer: NIO.ByteBuffer!

    // Buffers to store text encoded protos. Only used when content-type is application/grpc-web-text.
    // TODO(kaipi): Extract all gRPC Web processing logic into an independent handler only added on
    // the HTTP1.1 pipeline, as it's starting to get in the way of readability.
    private var requestTextBuffer: NIO.ByteBuffer!
    private var responseTextBuffer: NIO.ByteBuffer!

    // MARK: State

    /// The state of the inbound (request) side; every change of value is logged at debug level.
    var inboundState: InboundState = .expectingHeaders {
      willSet {
        guard newValue != self.inboundState else { return }
        self.logger.debug(
          "inbound state changed",
          metadata: ["old_state": "\(self.inboundState)", "new_state": "\(newValue)"]
        )
      }
    }

    /// The state of the outbound (response) side; every change of value is logged at debug level.
    var outboundState: OutboundState = .expectingHeaders {
      willSet {
        guard newValue != self.outboundState else { return }
        self.logger.debug(
          "outbound state changed",
          metadata: ["old_state": "\(self.outboundState)", "new_state": "\(newValue)"]
        )
      }
    }

    /// Reads length-prefixed messages from the request body.
    var messageReader: LengthPrefixedMessageReader
    /// Writes length-prefixed messages into the response body.
    var messageWriter: LengthPrefixedMessageWriter

    // MARK: Initialization

    /// Creates a codec.
    ///
    /// - Parameters:
    ///   - encoding: The message encoding configuration of the server.
    ///   - logger: The logger to use; its request ID metadata is copied onto the access logger.
    public init(encoding: Server.Configuration.MessageEncoding, logger: Logger) {
      self.encoding = encoding
      self.logger = logger
      self.messageReader = LengthPrefixedMessageReader()
      self.messageWriter = LengthPrefixedMessageWriter()

      // The access log shares the request ID of the regular logger.
      var requestScopedAccessLog = Logger(subsystem: .serverAccess)
      requestScopedAccessLog[metadataKey: MetadataKey.requestID] = logger[metadataKey: MetadataKey.requestID]
      self.accessLog = requestScopedAccessLog
    }
  }
  extension HTTP1ToGRPCServerCodec {
    /// The state of the inbound (request) side of the codec.
    enum InboundState {
      /// Waiting for the request head.
      case expectingHeaders

      /// The request head has been processed; waiting for body parts or the end of the stream.
      case expectingBody

      /// Ignore any additional messages; e.g. we've seen .end or we've sent an error and are
      /// waiting for the stream to close.
      case ignore
    }

    /// The state of the outbound (response) side of the codec.
    enum OutboundState {
      /// Waiting for the response headers to be written.
      case expectingHeaders

      /// The response headers have been written; waiting for messages or the final status.
      case expectingBodyOrStatus

      /// Nothing more may be written; e.g. the status has been sent or serialization failed.
      case ignore
    }
  }
  extension HTTP1ToGRPCServerCodec: ChannelInboundHandler {
    public typealias InboundIn = HTTPServerRequestPart
    public typealias InboundOut = _GRPCServerRequestPart<Request>

    /// Translates an incoming HTTP request part into zero or more gRPC request parts.
    ///
    /// Parts received while the inbound state is `.ignore` are logged and dropped. Any error thrown
    /// while processing a part is fired down the pipeline and moves the inbound state to `.ignore`.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      guard self.inboundState != .ignore else {
        self.logger.notice("ignoring read data", metadata: ["data": "\(data)"])
        return
      }

      do {
        let nextState: InboundState
        switch self.unwrapInboundIn(data) {
        case .head(let requestHead):
          nextState = try self.processHead(context: context, requestHead: requestHead)
        case .body(var body):
          nextState = try self.processBody(context: context, body: &body)
        case .end(let trailers):
          nextState = try self.processEnd(context: context, trailers: trailers)
        }
        self.inboundState = nextState
      } catch {
        context.fireErrorCaught(error)
        self.inboundState = .ignore
      }
    }

    /// Processes the request head: records the content type, configures the message reader and
    /// writer for the negotiated compression and forwards the head down the pipeline.
    ///
    /// If the request uses an unsupported 'grpc-encoding' then the RPC is failed immediately with
    /// an `.unimplemented` status.
    ///
    /// - Returns: The next inbound state.
    /// - Throws: `GRPCError.InvalidState` if the codec was not expecting a request head.
    func processHead(context: ChannelHandlerContext, requestHead: HTTPRequestHead) throws -> InboundState {
      self.logger.debug("processing request head", metadata: ["head": "\(requestHead)"])

      guard self.inboundState == .expectingHeaders else {
        self.logger.error(
          "invalid state while processing request head",
          metadata: ["state": "\(inboundState)", "head": "\(requestHead)"]
        )
        throw GRPCError.InvalidState("expected state .expectingHeaders, got \(inboundState)").captureContext()
      }

      self.stopwatch = .start()
      self.accessLog.debug("rpc call started", metadata: [
        "path": "\(requestHead.uri)",
        "method": "\(requestHead.method)",
        "version": "\(requestHead.version)"
      ])

      let declaredContentType = requestHead.headers
        .first(name: GRPCHeaderName.contentType)
        .flatMap(ContentType.init)
      if declaredContentType == nil {
        self.logger.debug("no 'content-type' header, assuming content type is 'application/grpc'")
      }
      // If the Content-Type is not present, assume the request is binary encoded gRPC.
      self.contentType = declaredContentType ?? .protobuf

      if self.contentType == .webTextProtobuf {
        self.requestTextBuffer = context.channel.allocator.buffer(capacity: 0)
      }

      // What compression was used for sending requests?
      if let encodingHeader = requestHead.headers.first(name: GRPCHeaderName.encoding) {
        switch self.validate(requestEncoding: encodingHeader) {
        case .unsupported:
          // We don't support this encoding, we must let the client know what we do support.
          self.acceptEncodingHeader = self.makeAcceptEncodingHeader()

          let message: String
          let headers: HTTPHeaders
          if let advertised = self.acceptEncodingHeader {
            message = "'\(encodingHeader)' compression is not supported, supported: \(advertised)"
            headers = [GRPCHeaderName.acceptEncoding: advertised]
          } else {
            message = "'\(encodingHeader)' compression is not supported"
            headers = .init()
          }

          // Fast-fail the RPC. The status is written just before returning, and any following
          // inbound messages are ignored.
          let status = GRPCStatus(code: .unimplemented, message: message)
          self.write(context: context, data: NIOAny(OutboundIn.statusAndTrailers(status, headers)), promise: nil)
          self.flush(context: context)
          return .ignore

        case .supported(let algorithm):
          self.messageReader = LengthPrefixedMessageReader(compression: algorithm)

        case .supportedButNotDisclosed(let algorithm):
          self.messageReader = LengthPrefixedMessageReader(compression: algorithm)
          // From: https://github.com/grpc/grpc/blob/master/doc/compression.md
          //
          //   Note that a peer MAY choose to not disclose all the encodings it supports. However, if
          //   it receives a message compressed in an undisclosed but supported encoding, it MUST
          //   include said encoding in the response's grpc-accept-encoding header.
          self.acceptEncodingHeader = self.makeAcceptEncodingHeader(includeExtra: algorithm)
        }
      } else {
        self.messageReader = LengthPrefixedMessageReader(compression: .none)
        self.acceptEncodingHeader = nil
      }

      // What compression should we use for writing responses?
      let clientAcceptableEncoding = requestHead.headers[canonicalForm: GRPCHeaderName.acceptEncoding]
      if let responseEncoding = self.selectResponseEncoding(from: clientAcceptableEncoding) {
        self.messageWriter = LengthPrefixedMessageWriter(compression: responseEncoding)
        self.responseEncodingHeader = responseEncoding.name
      } else {
        self.messageWriter = LengthPrefixedMessageWriter(compression: .none)
        self.responseEncodingHeader = nil
      }

      context.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
      return .expectingBody
    }

    /// Processes a chunk of the request body, forwarding every complete message it yields.
    ///
    /// - Returns: The next inbound state; `.ignore` if reading or deserializing a message failed.
    /// - Throws: `GRPCError.InvalidState` if the codec was not expecting a body, or
    ///   `GRPCError.Base64DecodeError` if a gRPC-Web text body could not be base64 decoded.
    func processBody(context: ChannelHandlerContext, body: inout ByteBuffer) throws -> InboundState {
      self.logger.debug("processing body: \(body)")

      guard self.inboundState == .expectingBody else {
        self.logger.error(
          "invalid state while processing body",
          metadata: ["state": "\(inboundState)", "body": "\(body)"]
        )
        throw GRPCError.InvalidState("expected state .expectingBody, got \(inboundState)").captureContext()
      }

      // If the contentType is text, then decode the incoming bytes as base64 encoded, and append
      // it to the binary buffer. If the request is chunked, this section will process the text
      // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
      // where it will expect a new incoming chunk.
      if self.contentType == .webTextProtobuf {
        precondition(self.requestTextBuffer != nil)
        self.requestTextBuffer.writeBuffer(&body)

        // Read in chunks of 4 bytes as base64 encoded strings will always be multiples of 4.
        let pendingBytes = self.requestTextBuffer.readableBytes
        let decodableLength = pendingBytes - (pendingBytes % 4)

        guard let encodedText = self.requestTextBuffer.readString(length: decodableLength),
              let decodedBytes = Data(base64Encoded: encodedText) else {
          throw GRPCError.Base64DecodeError().captureContext()
        }

        body.writeBytes(decodedBytes)
      }

      self.messageReader.append(buffer: &body)

      // Collect every complete message before forwarding any of them: if one fails to be read or
      // deserialized then none are forwarded.
      var decodedRequests: [Request] = []
      do {
        while var messageBuffer = try self.messageReader.nextMessage() {
          let request = try Request(serializedByteBuffer: &messageBuffer)
          decodedRequests.append(request)
        }
      } catch {
        context.fireErrorCaught(GRPCError.DeserializationFailure().captureContext())
        return .ignore
      }

      for request in decodedRequests {
        context.fireChannelRead(self.wrapInboundOut(.message(request)))
      }

      return .expectingBody
    }

    /// Processes the end of the request stream by forwarding `.end` down the pipeline.
    ///
    /// - Returns: `.ignore`, since nothing more is expected from the client.
    /// - Throws: `GRPCError.InvalidState` if the client sent trailers.
    private func processEnd(context: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> InboundState {
      self.logger.debug("processing end")

      if let unexpectedTrailers = trailers {
        self.logger.error(
          "unexpected trailers when processing stream end",
          metadata: ["trailers": "\(unexpectedTrailers)"]
        )
        throw GRPCError.InvalidState("unexpected trailers received").captureContext()
      }

      context.fireChannelRead(self.wrapInboundOut(.end))
      return .ignore
    }
  }
  extension HTTP1ToGRPCServerCodec: ChannelOutboundHandler {
    public typealias OutboundIn = _GRPCServerResponsePart<Response>
    public typealias OutboundOut = HTTPServerResponsePart

    /// Translates an outgoing gRPC response part into HTTP response parts.
    ///
    /// - `.headers` are written as the HTTP response head, with the content type and any
    ///   negotiated 'grpc-encoding'/'grpc-accept-encoding' headers added.
    /// - `.message` is length-prefixed and written as a body part. For gRPC-Web text responses
    ///   the message is buffered instead and sent, base64 encoded, together with the trailers.
    /// - `.statusAndTrailers` ends the response. The response head is written first if it has not
    ///   been sent yet.
    ///
    /// The `promise` is always completed: it is failed with `GRPCError.InvalidState` if the part
    /// is not valid in the current outbound state, and with `GRPCError.SerializationFailure` if a
    /// message could not be serialized.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      if case .ignore = self.outboundState {
        self.logger.notice("ignoring written data: \(data)")
        promise?.fail(GRPCError.InvalidState("rpc has already finished").captureContext())
        return
      }

      switch self.unwrapOutboundIn(data) {
      case .headers(var headers):
        guard case .expectingHeaders = self.outboundState else {
          self.logger.error(
            "invalid state while writing headers",
            metadata: ["state": "\(self.outboundState)", "headers": "\(headers)"]
          )
          // Complete the promise rather than dropping it, otherwise the caller would wait forever.
          promise?.fail(
            GRPCError.InvalidState("expected state .expectingHeaders, got \(self.outboundState)").captureContext()
          )
          return
        }

        var version = HTTPVersion(major: 2, minor: 0)
        if let contentType = self.contentType {
          headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
          // Any content type other than plain protobuf is answered as HTTP/1.1.
          if contentType != .protobuf {
            version = .init(major: 1, minor: 1)
          }
        }

        if self.contentType == .webTextProtobuf {
          self.responseTextBuffer = context.channel.allocator.buffer(capacity: 0)
        }

        // Are we compressing responses?
        if let responseEncoding = self.responseEncodingHeader {
          headers.add(name: GRPCHeaderName.encoding, value: responseEncoding)
        }

        // The client may have sent us a message using an encoding we didn't advertise; we'll send
        // an accept-encoding header back if that's the case.
        if let acceptEncoding = self.acceptEncodingHeader {
          headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding)
        }

        let responseHead = HTTPResponseHead(version: version, status: .ok, headers: headers)
        context.write(self.wrapOutboundOut(.head(responseHead)), promise: promise)
        self.outboundState = .expectingBodyOrStatus

      case .message(let messageContext):
        guard case .expectingBodyOrStatus = self.outboundState else {
          self.logger.error("invalid state while writing message", metadata: ["state": "\(self.outboundState)"])
          // Complete the promise rather than dropping it, otherwise the caller would wait forever.
          promise?.fail(
            GRPCError.InvalidState("expected state .expectingBodyOrStatus, got \(self.outboundState)").captureContext()
          )
          return
        }

        do {
          if self.contentType == .webTextProtobuf {
            // Store the response into an independent buffer. We can't return the message directly as
            // it needs to be aggregated with all the responses plus the trailers, in order to have
            // the base64 response properly encoded in a single byte stream.
            precondition(self.responseTextBuffer != nil)
            try self.messageWriter.write(
              messageContext.message,
              into: &self.responseTextBuffer,
              compressed: messageContext.compressed
            )

            // Since we stored the written data, mark the write promise as successful so that the
            // ServerStreaming provider continues sending the data.
            promise?.succeed(())
          } else {
            var lengthPrefixedMessageBuffer = context.channel.allocator.buffer(capacity: 0)
            try self.messageWriter.write(
              messageContext.message,
              into: &lengthPrefixedMessageBuffer,
              compressed: messageContext.compressed
            )
            context.write(self.wrapOutboundOut(.body(.byteBuffer(lengthPrefixedMessageBuffer))), promise: promise)
          }
        } catch {
          let error = GRPCError.SerializationFailure().captureContext()
          promise?.fail(error)
          context.fireErrorCaught(error)
          self.outboundState = .ignore
          return
        }

        self.outboundState = .expectingBodyOrStatus

      case let .statusAndTrailers(status, trailers):
        // If we error before sending the initial headers then we won't have sent the response head.
        // NIOHTTP2 doesn't support sending a single frame as a "Trailers-Only" response so we still
        // need to loop back and send the response head first.
        if case .expectingHeaders = self.outboundState {
          self.write(context: context, data: NIOAny(OutboundIn.headers(HTTPHeaders())), promise: nil)
        }

        var trailers = trailers
        trailers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue))
        if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
          trailers.add(name: GRPCHeaderName.statusMessage, value: message)
        }

        if self.contentType == .webTextProtobuf {
          precondition(self.responseTextBuffer != nil)

          // Encode the trailers into the response byte stream as a length delimited message, as per
          // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
          let textTrailers = trailers.map { name, value in "\(name): \(value)" }.joined(separator: "\r\n")
          self.responseTextBuffer.writeInteger(UInt8(0x80))
          self.responseTextBuffer.writeInteger(UInt32(textTrailers.utf8.count))
          self.responseTextBuffer.writeString(textTrailers)

          // TODO: Binary responses that are non multiples of 3 will end = or == when encoded in
          // base64. Investigate whether this might have any effect on the transport mechanism and
          // client decoding. Initial results say that they are innocuous, but we might have to keep
          // an eye on this in case something trips up.
          if let binaryData = self.responseTextBuffer.readData(length: self.responseTextBuffer.readableBytes) {
            let encodedData = binaryData.base64EncodedString()
            self.responseTextBuffer.clear()
            self.responseTextBuffer.reserveCapacity(encodedData.utf8.count)
            self.responseTextBuffer.writeString(encodedData)
          }

          // After collecting all response for gRPC Web connections, send one final aggregated
          // response. Only the final `.end` write carries the promise: a promise must be completed
          // exactly once, and it should reflect the whole response having been written.
          context.write(self.wrapOutboundOut(.body(.byteBuffer(self.responseTextBuffer))), promise: nil)
          context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
        } else {
          context.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
        }

        // Log the call duration and status
        if let stopwatch = self.stopwatch {
          self.stopwatch = nil
          let millis = stopwatch.elapsedMillis()

          self.accessLog.debug("rpc call finished", metadata: [
            "duration_ms": "\(millis)",
            "status_code": "\(status.code.rawValue)"
          ])
        }

        // The RPC is over: nothing more may be written, and anything else received is ignored.
        self.outboundState = .ignore
        self.inboundState = .ignore
      }
    }
  }
  private extension HTTP1ToGRPCServerCodec {
    /// The outcome of validating the 'grpc-encoding' header of a request.
    enum RequestEncodingValidation {
      /// The compression is not supported. The RPC should fail with an appropriate status and include
      /// our supported algorithms in the trailers.
      case unsupported

      /// Compression is supported.
      case supported(CompressionAlgorithm)

      /// Compression is supported but we did not disclose our support for it. We should continue but
      /// also send the acceptable compression methods (including the encoding the client specified)
      /// in the initial response metadata.
      case supportedButNotDisclosed(CompressionAlgorithm)
    }

    /// Validates the value of the 'grpc-encoding' header against compression algorithms supported and
    /// advertised by this peer.
    ///
    /// - Parameter requestEncoding: The value of the 'grpc-encoding' header.
    /// - Returns: `.unsupported` if the value does not name a known algorithm, `.supported` if the
    ///   algorithm is one of the enabled encodings, and `.supportedButNotDisclosed` otherwise.
    func validate(requestEncoding: String) -> RequestEncodingValidation {
      guard let algorithm = CompressionAlgorithm(rawValue: requestEncoding) else {
        return .unsupported
      }

      let isAdvertised = self.encoding.enabled.contains(algorithm)
      return isAdvertised ? .supported(algorithm) : .supportedButNotDisclosed(algorithm)
    }

    /// Makes a 'grpc-accept-encoding' header from the advertised encodings and an additional value
    /// if one is specified.
    ///
    /// - Parameter extra: An additional algorithm to append after the advertised encodings.
    /// - Returns: The comma separated algorithm names, or `nil` if there are none to list.
    func makeAcceptEncodingHeader(includeExtra extra: CompressionAlgorithm? = nil) -> String? {
      var names = self.encoding.enabled.map { $0.name }
      if let extra = extra {
        names.append(extra.name)
      }

      guard !names.isEmpty else {
        return nil
      }
      return names.joined(separator: ",")
    }

    /// Selects an appropriate response encoding from the list of encodings sent to us by the client.
    /// Returns `nil` if there were no appropriate algorithms, in which case the server will send
    /// messages uncompressed.
    ///
    /// - Parameter acceptableEncoding: The encodings the client accepts, in the order received.
    func selectResponseEncoding(from acceptableEncoding: [Substring]) -> CompressionAlgorithm? {
      // The first value which is both a known algorithm and enabled on this server wins.
      for candidate in acceptableEncoding {
        if let algorithm = CompressionAlgorithm(rawValue: String(candidate)),
           self.encoding.enabled.contains(algorithm) {
          return algorithm
        }
      }
      return nil
    }
  }