HTTP1ToGRPCServerCodec.swift 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOFoundationCompat
  20. import Logging
  21. import SwiftProtobuf
  /// One part of an inbound gRPC request whose message payload has the type `Request`.
  ///
  /// - Important: This is **NOT** part of the public API.
  public enum _GRPCServerRequestPart<Request> {
    /// The HTTP request head which opened the RPC.
    case head(HTTPRequestHead)

    /// A single request message.
    case message(Request)

    /// The request stream has ended.
    case end
  }

  /// A request part whose message payload is a `ByteBuffer`.
  public typealias _RawGRPCServerRequestPart = _GRPCServerRequestPart<ByteBuffer>
  /// One part of an outbound gRPC response whose message payload has the type `Response`.
  ///
  /// - Important: This is **NOT** part of the public API.
  public enum _GRPCServerResponsePart<Response> {
    /// The initial response headers.
    case headers(HTTPHeaders)

    /// A single response message together with its message context.
    case message(_MessageContext<Response>)

    /// The final status of the RPC and any trailing headers.
    case statusAndTrailers(GRPCStatus, HTTPHeaders)
  }

  /// A response part whose message payload is a `ByteBuffer`.
  public typealias _RawGRPCServerResponsePart = _GRPCServerResponsePart<ByteBuffer>
  /// A channel handler which translates between HTTP1 data types and gRPC packets.
  ///
  /// HTTP1 primitives are used (instead of HTTP2 ones) because they are easier to work with than
  /// raw HTTP2 primitives while still providing all the functionality we need. Using them also
  /// allows gRPC-Web (gRPC over HTTP1) to be supported.
  ///
  /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
  public final class HTTP1ToGRPCServerCodec {
    // MARK: Configuration

    private let encoding: ServerMessageEncoding
    private let encodingHeaderValidator: MessageEncodingHeaderValidator
    private let logger: Logger

    // MARK: Per-RPC state

    /// The content type of the request; set while processing the request head.
    private var contentType: ContentType?
    /// Value for the accept-encoding header added to the response headers, if any.
    private var acceptEncodingHeader: String? = nil
    /// Value for the encoding header added to the response headers, if responses are compressed.
    private var responseEncodingHeader: String? = nil
    /// Started when the request head is processed; used to log the call duration.
    private var stopwatch: Stopwatch?

    // MARK: Buffers

    // The buffers below are deliberately implicitly unwrapped. Unwrapping an optional struct with
    // guard-else yields a copy, which would then have to be assigned back to the property for any
    // mutation to take effect. Force unwrapping avoids those reassignments and keeps the code a
    // bit cleaner.

    // Holds binary encoded protos as they are being received when a proto is split across
    // multiple buffers.
    private var binaryRequestBuffer: NIO.ByteBuffer!

    // Hold text encoded protos. Only used when content-type is application/grpc-web-text.
    // TODO(kaipi): Extract all gRPC Web processing logic into an independent handler only added on
    // the HTTP1.1 pipeline, as it's starting to get in the way of readability.
    private var requestTextBuffer: NIO.ByteBuffer!
    private var responseTextBuffers: CircularBuffer<ByteBuffer>?

    // MARK: Message framing

    var messageReader: LengthPrefixedMessageReader
    var messageWriter: LengthPrefixedMessageWriter

    // MARK: State machines

    /// State of the inbound (request) side. Each actual change of state is logged at debug level.
    var inboundState = InboundState.expectingHeaders {
      willSet {
        if newValue != self.inboundState {
          self.logger.debug("inbound state changed", metadata: [
            "old_state": "\(self.inboundState)",
            "new_state": "\(newValue)"
          ])
        }
      }
    }

    /// State of the outbound (response) side. Each actual change of state is logged at debug level.
    var outboundState = OutboundState.expectingHeaders {
      willSet {
        if newValue != self.outboundState {
          self.logger.debug("outbound state changed", metadata: [
            "old_state": "\(self.outboundState)",
            "new_state": "\(newValue)"
          ])
        }
      }
    }

    /// Creates a codec.
    ///
    /// - Parameters:
    ///   - encoding: The server's message encoding configuration; also used to build the
    ///     validator for the request encoding header.
    ///   - logger: The logger used by this handler.
    public init(encoding: ServerMessageEncoding, logger: Logger) {
      self.logger = logger
      self.encoding = encoding
      self.encodingHeaderValidator = MessageEncodingHeaderValidator(encoding: encoding)
      self.messageWriter = LengthPrefixedMessageWriter()
      self.messageReader = LengthPrefixedMessageReader()
    }
  }
  // MARK: - States

  extension HTTP1ToGRPCServerCodec {
    /// Progress of the inbound (request) half of the RPC.
    enum InboundState {
      /// The request head has not been received yet.
      case expectingHeaders

      /// The request head has been processed; body parts or the end of the stream come next.
      case expectingBody

      /// Ignore any additional messages; e.g. we've seen .end or we've sent an error and are
      /// waiting for the stream to close.
      case ignore
    }

    /// Progress of the outbound (response) half of the RPC.
    enum OutboundState {
      /// The response headers have not been written yet.
      case expectingHeaders

      /// The response headers have been written; messages or the final status come next.
      case expectingBodyOrStatus

      /// Further writes are rejected; this state is entered once the final status has been
      /// written or a message could not be serialized.
      case ignore
    }
  }
  // MARK: - ChannelInboundHandler

  extension HTTP1ToGRPCServerCodec: ChannelInboundHandler {
    public typealias InboundIn = HTTPServerRequestPart
    public typealias InboundOut = _RawGRPCServerRequestPart

    /// Translates an inbound HTTP request part into zero or more gRPC request parts.
    ///
    /// Parts are dropped (and logged) once the inbound state is `.ignore`. Any error thrown while
    /// processing a part is fired down the pipeline and moves the inbound state to `.ignore`.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      if case .ignore = self.inboundState {
        self.logger.notice("ignoring read data", metadata: ["data": "\(data)"])
        return
      }

      let nextState: InboundState
      do {
        switch self.unwrapInboundIn(data) {
        case .head(let requestHead):
          nextState = try self.processHead(context: context, requestHead: requestHead)
        case .body(var body):
          nextState = try self.processBody(context: context, body: &body)
        case .end(let trailers):
          nextState = try self.processEnd(context: context, trailers: trailers)
        }
      } catch {
        context.fireErrorCaught(error)
        self.inboundState = .ignore
        return
      }

      // Processing a part fires reads down the pipeline, and a handler may respond by writing
      // the final status before control returns here. That write moves the inbound state to
      // `.ignore`; it must not be overwritten with the state computed above, otherwise parts for
      // an already finished RPC would be processed again.
      if case .ignore = self.inboundState {
        return
      }
      self.inboundState = nextState
    }

    /// Processes the request head: records the content type, configures message (de)compression
    /// and forwards the head down the pipeline.
    ///
    /// If the request's message encoding is not supported an 'unimplemented' status is written
    /// and flushed, and nothing is forwarded.
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - requestHead: The received request head.
    /// - Returns: `.expectingBody` if the head was forwarded, `.ignore` if the RPC was failed
    ///   because of an unsupported message encoding.
    /// - Throws: `GRPCError.InvalidState` if the inbound state is not `.expectingHeaders`.
    func processHead(context: ChannelHandlerContext, requestHead: HTTPRequestHead) throws -> InboundState {
      self.logger.debug("processing request head", metadata: ["head": "\(requestHead)"])
      guard case .expectingHeaders = inboundState else {
        self.logger.error("invalid state while processing request head",
                          metadata: ["state": "\(inboundState)", "head": "\(requestHead)"])
        throw GRPCError.InvalidState("expected state .expectingHeaders, got \(inboundState)").captureContext()
      }

      self.stopwatch = .start()
      self.logger.debug("rpc call started", metadata: [
        "path": "\(requestHead.uri)",
        "method": "\(requestHead.method)",
        "version": "\(requestHead.version)"
      ])

      if let contentType = requestHead.headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init) {
        self.contentType = contentType
      } else {
        self.logger.debug("no 'content-type' header, assuming content type is 'application/grpc'")
        // If the Content-Type is not present, assume the request is binary encoded gRPC.
        self.contentType = .protobuf
      }

      if self.contentType == .webTextProtobuf {
        requestTextBuffer = context.channel.allocator.buffer(capacity: 0)
      }

      // What compression was used for sending requests?
      let encodingHeader = requestHead.headers.first(name: GRPCHeaderName.encoding)
      switch self.encodingHeaderValidator.validate(requestEncoding: encodingHeader) {
      case let .supported(algorithm, limit, acceptableEncoding):
        self.messageReader = LengthPrefixedMessageReader(compression: algorithm, decompressionLimit: limit)
        if acceptableEncoding.isEmpty {
          self.acceptEncodingHeader = nil
        } else {
          self.acceptEncodingHeader = acceptableEncoding.joined(separator: ",")
        }

      case .noCompression:
        self.messageReader = LengthPrefixedMessageReader()
        self.acceptEncodingHeader = nil

      case let .unsupported(header, acceptableEncoding):
        let message: String
        let headers: HTTPHeaders
        if acceptableEncoding.isEmpty {
          message = "compression is not supported"
          headers = .init()
        } else {
          let advertised = acceptableEncoding.joined(separator: ",")
          message = "'\(header)' compression is not supported, supported: \(advertised)"
          headers = [GRPCHeaderName.acceptEncoding: advertised]
        }

        let status = GRPCStatus(code: .unimplemented, message: message)
        // The status is routed through this handler's own `write` so that the response head is
        // emitted first and both state machines are moved to `.ignore`.
        defer {
          self.write(context: context, data: NIOAny(OutboundIn.statusAndTrailers(status, headers)), promise: nil)
          self.flush(context: context)
        }
        // We're about to fast-fail, so ignore any following inbound messages.
        return .ignore
      }

      // What compression should we use for writing responses?
      let clientAcceptableEncoding = requestHead.headers[canonicalForm: GRPCHeaderName.acceptEncoding]
      if let responseEncoding = self.selectResponseEncoding(from: clientAcceptableEncoding) {
        self.messageWriter = LengthPrefixedMessageWriter(compression: responseEncoding)
        self.responseEncodingHeader = responseEncoding.name
      } else {
        self.messageWriter = LengthPrefixedMessageWriter(compression: .none)
        self.responseEncodingHeader = nil
      }

      context.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
      return .expectingBody
    }

    /// Processes a chunk of the request body and forwards every complete message it yields.
    ///
    /// For 'application/grpc-web-text' requests the chunk is base64 decoded first. Messages are
    /// only forwarded if all messages available in the reader were read without an error.
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - body: The received bytes. The buffer is consumed; for text requests it is also used to
    ///     hold the decoded bytes.
    /// - Returns: `.expectingBody`, or `.ignore` if reading a message failed (in which case an
    ///   error has been fired down the pipeline).
    /// - Throws: `GRPCError.InvalidState` if the inbound state is not `.expectingBody`, or
    ///   `GRPCError.Base64DecodeError` if a text request could not be decoded.
    func processBody(context: ChannelHandlerContext, body: inout ByteBuffer) throws -> InboundState {
      self.logger.debug("processing body: \(body)")
      guard case .expectingBody = inboundState else {
        self.logger.error("invalid state while processing body",
                          metadata: ["state": "\(inboundState)", "body": "\(body)"])
        throw GRPCError.InvalidState("expected state .expectingBody, got \(inboundState)").captureContext()
      }

      // If the contentType is text, then decode the incoming bytes as base64 encoded, and append
      // it to the binary buffer. If the request is chunked, this section will process the text
      // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
      // where it will expect a new incoming chunk.
      if self.contentType == .webTextProtobuf {
        precondition(requestTextBuffer != nil)
        // This consumes `body`: afterwards it has no readable bytes, so the decoded bytes written
        // below are all the reader will see.
        requestTextBuffer.writeBuffer(&body)

        // Read in chunks of 4 bytes as base64 encoded strings will always be multiples of 4.
        let readyBytes = requestTextBuffer.readableBytes - (requestTextBuffer.readableBytes % 4)
        guard let base64Encoded = requestTextBuffer.readString(length: readyBytes),
          let decodedData = Data(base64Encoded: base64Encoded) else {
            throw GRPCError.Base64DecodeError().captureContext()
        }

        // Reclaim the space used by the text which has just been decoded. Without this the buffer
        // keeps every byte ever received and grows with the total size of the request.
        requestTextBuffer.discardReadBytes()

        body.writeBytes(decodedData)
      }

      self.messageReader.append(buffer: &body)

      var requests: [ByteBuffer] = []
      do {
        while let buffer = try self.messageReader.nextMessage() {
          requests.append(buffer)
        }
      } catch let grpcError as GRPCError.WithContext {
        context.fireErrorCaught(grpcError)
        return .ignore
      } catch {
        context.fireErrorCaught(GRPCError.DeserializationFailure().captureContext())
        return .ignore
      }

      requests.forEach {
        context.fireChannelRead(self.wrapInboundOut(.message($0)))
      }

      return .expectingBody
    }

    /// Processes the end of the request stream and forwards `.end` down the pipeline.
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - trailers: Trailers received with the end of the stream; none are expected.
    /// - Returns: `.ignore`, since no further inbound parts are expected.
    /// - Throws: `GRPCError.InvalidState` if trailers were received.
    private func processEnd(context: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> InboundState {
      self.logger.debug("processing end")
      if let trailers = trailers {
        self.logger.error("unexpected trailers when processing stream end", metadata: ["trailers": "\(trailers)"])
        throw GRPCError.InvalidState("unexpected trailers received").captureContext()
      }

      context.fireChannelRead(self.wrapInboundOut(.end))
      return .ignore
    }
  }
  // MARK: - ChannelOutboundHandler

  extension HTTP1ToGRPCServerCodec: ChannelOutboundHandler {
    public typealias OutboundIn = _RawGRPCServerResponsePart
    public typealias OutboundOut = HTTPServerResponsePart

    /// Translates an outbound gRPC response part into HTTP response parts.
    ///
    /// - `.headers` becomes the response head.
    /// - `.message` is length-prefixed (and possibly compressed) and written as a body part. For
    ///   'application/grpc-web-text' it is stored until the status is written instead.
    /// - `.statusAndTrailers` ends the response. For 'application/grpc-web-text' the stored
    ///   messages and the trailers are base64 encoded into a single body part.
    ///
    /// A part written in a state which does not allow it is dropped and `promise` is failed with
    /// `GRPCError.InvalidState`.
    ///
    /// - Parameters:
    ///   - context: The context of this handler.
    ///   - data: The `OutboundIn` part to write.
    ///   - promise: Completed with the outcome of the write.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      if case .ignore = self.outboundState {
        self.logger.notice("ignoring written data: \(data)")
        promise?.fail(GRPCError.InvalidState("rpc has already finished").captureContext())
        return
      }

      switch self.unwrapOutboundIn(data) {
      case .headers(var headers):
        guard case .expectingHeaders = self.outboundState else {
          self.logger.error("invalid state while writing headers",
                            metadata: ["state": "\(self.outboundState)", "headers": "\(headers)"])
          // The part is dropped; fail the promise so that it is not leaked and the caller finds
          // out that nothing was written.
          promise?.fail(
            GRPCError.InvalidState("expected state .expectingHeaders, got \(self.outboundState)").captureContext()
          )
          return
        }

        // Anything other than plain gRPC is served as HTTP/1.1.
        var version = HTTPVersion(major: 2, minor: 0)
        if let contentType = self.contentType {
          headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
          if contentType != .protobuf {
            version = .init(major: 1, minor: 1)
          }
        }

        // Are we compressing responses?
        if let responseEncoding = self.responseEncodingHeader {
          headers.add(name: GRPCHeaderName.encoding, value: responseEncoding)
        }

        // The client may have sent us a message using an encoding we didn't advertise; we'll send
        // an accept-encoding header back if that's the case.
        if let acceptEncoding = self.acceptEncodingHeader {
          headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding)
        }

        context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise)
        self.outboundState = .expectingBodyOrStatus

      case .message(let messageContext):
        guard case .expectingBodyOrStatus = self.outboundState else {
          self.logger.error("invalid state while writing message", metadata: ["state": "\(self.outboundState)"])
          // As for headers: the message is dropped, so don't leave the promise unfulfilled.
          promise?.fail(
            GRPCError.InvalidState("expected state .expectingBodyOrStatus, got \(self.outboundState)").captureContext()
          )
          return
        }

        let messageBuffer: ByteBuffer
        do {
          messageBuffer = try self.messageWriter.write(
            buffer: messageContext.message,
            allocator: context.channel.allocator,
            compressed: messageContext.compressed
          )
        } catch {
          let error = GRPCError.SerializationFailure().captureContext()
          promise?.fail(error)
          context.fireErrorCaught(error)
          self.outboundState = .ignore
          return
        }

        if contentType == .webTextProtobuf {
          // Store the response into an independent buffer. We can't return the message directly as
          // it needs to be aggregated with all the responses plus the trailers, in order to have
          // the base64 response properly encoded in a single byte stream.
          self.appendResponseText(messageBuffer)

          // Since we stored the written data, mark the write promise as successful so that the
          // ServerStreaming provider continues sending the data.
          promise?.succeed(())
        } else {
          context.write(self.wrapOutboundOut(.body(.byteBuffer(messageBuffer))), promise: promise)
        }

        self.outboundState = .expectingBodyOrStatus

      case let .statusAndTrailers(status, trailers):
        // If we error before sending the initial headers then we won't have sent the request head.
        // NIOHTTP2 doesn't support sending a single frame as a "Trailers-Only" response so we still
        // need to loop back and send the request head first.
        if case .expectingHeaders = self.outboundState {
          self.write(context: context, data: NIOAny(OutboundIn.headers(HTTPHeaders())), promise: nil)
        }

        var trailers = trailers
        trailers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue))
        if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
          trailers.add(name: GRPCHeaderName.statusMessage, value: message)
        }

        if contentType == .webTextProtobuf {
          // Encode the trailers into the response byte stream as a length delimited message, as per
          // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
          let textTrailers = trailers.map { name, value in "\(name): \(value)" }.joined(separator: "\r\n")
          // One marker byte (0x80) and a four byte length precede the trailers text.
          var trailersBuffer = context.channel.allocator.buffer(capacity: 5 + textTrailers.utf8.count)
          trailersBuffer.writeInteger(UInt8(0x80))
          trailersBuffer.writeInteger(UInt32(textTrailers.utf8.count))
          trailersBuffer.writeString(textTrailers)
          self.appendResponseText(trailersBuffer)

          // This code can only be called on the grpc-web path, so we know the response text buffers
          // must be non-nil and must contain at least one element.
          guard var buffers = self.responseTextBuffers else {
            preconditionFailure("Building web response text but responseTextBuffers are nil")
          }

          // Avoid a CoW
          self.responseTextBuffers = nil

          var responseTextBuffer = buffers.popFirst()!

          // Read the data from the first buffer.
          var accumulatedData = responseTextBuffer.readData(length: responseTextBuffer.readableBytes)!

          // Reserve enough capacity and append the remaining buffers.
          let requiredExtraCapacity = buffers.lazy.map { $0.readableBytes }.reduce(0, +)
          accumulatedData.reserveCapacity(accumulatedData.count + requiredExtraCapacity)
          while let buffer = buffers.popFirst() {
            accumulatedData.append(contentsOf: buffer.readableBytesView)
          }

          // Restore the (now empty) buffers.
          self.responseTextBuffers = buffers

          // TODO: Binary responses that are non multiples of 3 will end = or == when encoded in
          // base64. Investigate whether this might have any effect on the transport mechanism and
          // client decoding. Initial results say that they are innocuous, but we might have to keep
          // an eye on this in case something trips up.
          let encodedData = accumulatedData.base64EncodedString()

          // Reuse our first buffer.
          responseTextBuffer.clear(minimumCapacity: UInt32(encodedData.utf8.count))
          responseTextBuffer.writeString(encodedData)

          // After collecting all response for gRPC Web connections, send one final aggregated
          // response. Only the final `.end` write carries the promise: attaching it to the body
          // write as well would complete it as soon as the body was written, before the response
          // has actually ended.
          context.write(self.wrapOutboundOut(.body(.byteBuffer(responseTextBuffer))), promise: nil)
          context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
        } else {
          context.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
        }

        // Log the call duration and status
        if let stopwatch = self.stopwatch {
          self.stopwatch = nil
          let millis = stopwatch.elapsedMillis()

          self.logger.debug("rpc call finished", metadata: [
            "duration_ms": "\(millis)",
            "status_code": "\(status.code.rawValue)"
          ])
        }

        // The RPC is over: reject further writes and drop further reads.
        self.outboundState = .ignore
        self.inboundState = .ignore
      }
    }

    /// Stores `buffer` so that it can be sent as part of the aggregated grpc-web-text response,
    /// creating the storage on first use.
    private func appendResponseText(_ buffer: ByteBuffer) {
      if self.responseTextBuffers == nil {
        self.responseTextBuffers = CircularBuffer()
      }
      self.responseTextBuffers!.append(buffer)
    }
  }
  fileprivate extension HTTP1ToGRPCServerCodec {
    /// Picks the compression algorithm to use for response messages.
    ///
    /// The client's encodings are considered in the order given; the first one which names a
    /// known algorithm that is also enabled in the server configuration is chosen.
    ///
    /// - Parameter acceptableEncoding: The encodings sent to us by the client.
    /// - Returns: The selected algorithm, or `nil` if compression is not enabled or none of the
    ///   encodings is appropriate, in which case the server will send messages uncompressed.
    func selectResponseEncoding(from acceptableEncoding: [Substring]) -> CompressionAlgorithm? {
      guard case .enabled(let configuration) = self.encoding else {
        return nil
      }

      for candidate in acceptableEncoding {
        guard let algorithm = CompressionAlgorithm(rawValue: String(candidate)) else {
          continue
        }
        if configuration.enabledAlgorithms.contains(algorithm) {
          return algorithm
        }
      }

      return nil
    }
  }
  /// Checks the message encoding requested by a client against the server's configuration.
  struct MessageEncodingHeaderValidator {
    var encoding: ServerMessageEncoding

    enum ValidationResult {
      /// The requested compression is supported.
      case supported(algorithm: CompressionAlgorithm, decompressionLimit: DecompressionLimit, acceptEncoding: [String])

      /// The `requestEncoding` is not supported; `acceptEncoding` contains all algorithms we do
      /// support.
      case unsupported(requestEncoding: String, acceptEncoding: [String])

      /// No compression was requested.
      case noCompression
    }

    /// Validates the value of the 'grpc-encoding' header against compression algorithms supported
    /// and advertised by this peer.
    ///
    /// - Parameter requestEncoding: The value of the 'grpc-encoding' header.
    /// - Returns: `.noCompression` if there was no header, `.unsupported` if compression is
    ///   disabled or the header does not name a known algorithm, and `.supported` otherwise.
    func validate(requestEncoding: String?) -> ValidationResult {
      // The client didn't send a message encoding header.
      guard let header = requestEncoding else {
        return .noCompression
      }

      switch self.encoding {
      case .disabled:
        // Compression is disabled and the client sent a message encoding header. We clearly don't
        // support this. Note this is different to the supported but not advertised case since we
        // have explicitly not enabled compression.
        return .unsupported(requestEncoding: header, acceptEncoding: [])

      case .enabled(let configuration):
        // Compression is enabled and the client sent a message encoding header. Do we support it?
        guard let algorithm = CompressionAlgorithm(rawValue: header) else {
          let advertised = configuration.enabledAlgorithms.map { $0.name }
          return .unsupported(requestEncoding: header, acceptEncoding: advertised)
        }

        let acceptEncoding: [String]
        if configuration.enabledAlgorithms.contains(algorithm) {
          acceptEncoding = []
        } else {
          // From: https://github.com/grpc/grpc/blob/master/doc/compression.md
          //
          // Note that a peer MAY choose to not disclose all the encodings it supports. However, if
          // it receives a message compressed in an undisclosed but supported encoding, it MUST
          // include said encoding in the response's grpc-accept-encoding header.
          acceptEncoding = configuration.enabledAlgorithms.map { $0.name } + [header]
        }

        return .supported(
          algorithm: algorithm,
          decompressionLimit: configuration.decompressionLimit,
          acceptEncoding: acceptEncoding
        )
      }
    }
  }