2
0

HTTP1ToRawGRPCClientCodec.swift 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import Logging
  20. /// Outgoing gRPC package with an unknown message type (represented as the serialized protobuf message).
  21. public enum RawGRPCClientRequestPart {
  22. case head(HTTPRequestHead)
  23. case message(Data)
  24. case end
  25. }
  26. /// Incoming gRPC package with an unknown message type (represented by a byte buffer).
  27. public enum RawGRPCClientResponsePart {
  28. case headers(HTTPHeaders)
  29. case message(ByteBuffer)
  30. case statusAndTrailers(GRPCStatus, HTTPHeaders?)
  31. }
  32. /// Codec for translating HTTP/1 responses from the server into untyped gRPC packages
  33. /// and vice-versa.
  34. ///
  35. /// Most of the inbound processing is done by `LengthPrefixedMessageReader`; which
  36. /// reads length-prefxied gRPC messages into `ByteBuffer`s containing serialized
  37. /// Protobuf messages.
  38. ///
  39. /// The outbound processing transforms serialized Protobufs into length-prefixed
  40. /// gRPC messages stored in `ByteBuffer`s.
  41. ///
  42. /// See `HTTP1ToRawGRPCServerCodec` for the corresponding server codec.
  43. public final class HTTP1ToRawGRPCClientCodec {
  44. public init(logger: Logger) {
  45. self.logger = logger.addingMetadata(
  46. key: MetadataKey.channelHandler,
  47. value: "HTTP1ToRawGRPCClientCodec"
  48. )
  49. self.messageReader = LengthPrefixedMessageReader(
  50. mode: .client,
  51. compressionMechanism: .none,
  52. logger: logger
  53. )
  54. }
  55. private enum State {
  56. case expectingHeaders
  57. case expectingBodyOrTrailers
  58. case ignore
  59. }
  60. private let logger: Logger
  61. private var state: State = .expectingHeaders {
  62. didSet {
  63. self.logger.debug("read state changed from \(oldValue) to \(self.state)")
  64. }
  65. }
  66. private let messageReader: LengthPrefixedMessageReader
  67. private let messageWriter = LengthPrefixedMessageWriter(compression: .none)
  68. private var inboundCompression: CompressionMechanism = .none
  69. }
  70. extension HTTP1ToRawGRPCClientCodec: ChannelInboundHandler {
  71. public typealias InboundIn = HTTPClientResponsePart
  72. public typealias InboundOut = RawGRPCClientResponsePart
  73. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  74. if case .ignore = state {
  75. self.logger.notice("ignoring read data: \(data)")
  76. return
  77. }
  78. do {
  79. switch self.unwrapInboundIn(data) {
  80. case .head(let head):
  81. state = try processHead(context: context, head: head)
  82. case .body(var message):
  83. state = try processBody(context: context, messageBuffer: &message)
  84. case .end(let trailers):
  85. state = try processTrailers(context: context, trailers: trailers)
  86. }
  87. } catch {
  88. context.fireErrorCaught(error)
  89. state = .ignore
  90. }
  91. }
  92. /// Forwards the headers from the request head to the next handler.
  93. ///
  94. /// - note: Requires the `.expectingHeaders` state.
  95. private func processHead(context: ChannelHandlerContext, head: HTTPResponseHead) throws -> State {
  96. self.logger.debug("processing response head: \(head)")
  97. guard case .expectingHeaders = state else {
  98. self.logger.error("invalid state '\(state)' while processing response head \(head)")
  99. throw GRPCError.client(.invalidState("received headers while in state \(state)"))
  100. }
  101. // Trailers-Only response.
  102. if head.headers.contains(name: GRPCHeaderName.statusCode) {
  103. self.logger.info("found status-code in headers, processing response head as trailers")
  104. self.state = .expectingBodyOrTrailers
  105. return try self.processTrailers(context: context, trailers: head.headers)
  106. }
  107. // This should be checked *after* the trailers-only response is handled since any status code
  108. // and message we already have should take precedence over one we generate from the HTTP status
  109. // code and reason.
  110. //
  111. // Source: https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  112. guard head.status == .ok else {
  113. self.logger.warning("response head did not have 200 OK status: \(head.status)")
  114. throw GRPCError.client(.HTTPStatusNotOk(head.status))
  115. }
  116. let inboundCompression: CompressionMechanism = head.headers[GRPCHeaderName.encoding]
  117. .first
  118. .map { CompressionMechanism(rawValue: $0) ?? .unknown } ?? .none
  119. guard inboundCompression.supported else {
  120. self.logger.error("remote peer is using unsupported compression: \(inboundCompression)")
  121. throw GRPCError.client(.unsupportedCompressionMechanism(inboundCompression.rawValue))
  122. }
  123. self.logger.info("using inbound compression: \(inboundCompression)")
  124. self.messageReader.compressionMechanism = inboundCompression
  125. context.fireChannelRead(self.wrapInboundOut(.headers(head.headers)))
  126. return .expectingBodyOrTrailers
  127. }
  128. /// Processes the given buffer; if a complete message is read then it is forwarded to the
  129. /// next channel handler.
  130. ///
  131. /// - note: Requires the `.expectingBodyOrTrailers` state.
  132. private func processBody(context: ChannelHandlerContext, messageBuffer: inout ByteBuffer) throws -> State {
  133. guard case .expectingBodyOrTrailers = state else {
  134. self.logger.error("invalid state '\(state)' while processing body \(messageBuffer)")
  135. throw GRPCError.client(.invalidState("received body while in state \(state)"))
  136. }
  137. self.messageReader.append(buffer: &messageBuffer)
  138. while let message = try self.messageReader.nextMessage() {
  139. context.fireChannelRead(self.wrapInboundOut(.message(message)))
  140. }
  141. return .expectingBodyOrTrailers
  142. }
  143. /// Forwards a `GRPCStatus` to the next handler. The status and message are extracted
  144. /// from the trailers if they exist; the `.unknown` status code is used if no status exists.
  145. private func processTrailers(context: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> State {
  146. guard case .expectingBodyOrTrailers = state else {
  147. self.logger.error("invalid state '\(state)' while processing trailers \(String(describing: trailers))")
  148. throw GRPCError.client(.invalidState("received trailers while in state \(state)"))
  149. }
  150. guard let trailers = trailers else {
  151. self.logger.notice("processing trailers, but no trailers were provided")
  152. let status = GRPCStatus(code: .unknown, message: nil)
  153. context.fireChannelRead(self.wrapInboundOut(.statusAndTrailers(status, nil)))
  154. return .ignore
  155. }
  156. let status = GRPCStatus(
  157. code: self.extractStatusCode(from: trailers),
  158. message: self.extractStatusMessage(from: trailers)
  159. )
  160. context.fireChannelRead(wrapInboundOut(.statusAndTrailers(status, trailers)))
  161. return .ignore
  162. }
  163. /// Extracts a status code from the given headers, or `.unknown` if one isn't available or the
  164. /// code is not valid. If multiple values are present, the first is taken.
  165. private func extractStatusCode(from headers: HTTPHeaders) -> GRPCStatus.Code {
  166. let statusCodes = headers[GRPCHeaderName.statusCode]
  167. guard !statusCodes.isEmpty else {
  168. self.logger.warning("no status-code header")
  169. return .unknown
  170. }
  171. if statusCodes.count > 1 {
  172. self.logger.notice("multiple values for status-code header: \(statusCodes), using the first")
  173. }
  174. // We have at least one value: force unwrapping is fine.
  175. let statusCode = statusCodes.first!
  176. if let code = Int(statusCode).flatMap({ GRPCStatus.Code(rawValue: $0) }) {
  177. return code
  178. } else {
  179. self.logger.warning("no known status-code for: \(statusCode)")
  180. return .unknown
  181. }
  182. }
  183. /// Extracts a status message from the given headers, or `nil` if one isn't available. If
  184. /// multiple values are present, the first is taken.
  185. private func extractStatusMessage(from headers: HTTPHeaders) -> String? {
  186. let statusMessages = headers[GRPCHeaderName.statusMessage]
  187. guard !statusMessages.isEmpty else {
  188. self.logger.debug("no status-message header")
  189. return nil
  190. }
  191. if statusMessages.count > 1 {
  192. self.logger.notice("multiple values for status-message header: \(statusMessages), using the first")
  193. }
  194. // We have at least one value: force unwrapping is fine.
  195. let unmarshalled = statusMessages.first!
  196. self.logger.debug("unmarshalling status-message: \(unmarshalled)")
  197. return GRPCStatusMessageMarshaller.unmarshall(unmarshalled)
  198. }
  199. }
  200. extension HTTP1ToRawGRPCClientCodec: ChannelOutboundHandler {
  201. public typealias OutboundIn = RawGRPCClientRequestPart
  202. public typealias OutboundOut = HTTPClientRequestPart
  203. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  204. switch self.unwrapOutboundIn(data) {
  205. case .head(let requestHead):
  206. self.logger.debug("writing request head: \(requestHead)")
  207. context.write(self.wrapOutboundOut(.head(requestHead)), promise: promise)
  208. case .message(let message):
  209. var request = context.channel.allocator.buffer(capacity: LengthPrefixedMessageWriter.metadataLength)
  210. messageWriter.write(message, into: &request)
  211. self.logger.debug("writing length prefixed protobuf message")
  212. context.write(self.wrapOutboundOut(.body(.byteBuffer(request))), promise: promise)
  213. case .end:
  214. self.logger.debug("writing end")
  215. context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
  216. }
  217. }
  218. }