| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658 |
- /*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Logging
- import NIOCore
- import NIOHPACK
- import NIOHTTP1
- import NIOHTTP2
- import SwiftProtobuf
- /// A gRPC client request message part.
- ///
- /// - Important: This is **NOT** part of the public API. It is declared as
- /// `public` because it is used within performance tests.
- public enum _GRPCClientRequestPart<Request> {
- /// The 'head' of the request, that is, information about the initiation of the RPC.
- case head(_GRPCRequestHead)
- /// A deserialized request message to send to the server.
- case message(_MessageContext<Request>)
- /// Indicates that the client does not intend to send any further messages.
- case end
- }
- /// As `_GRPCClientRequestPart` but messages are serialized.
- /// - Important: This is **NOT** part of the public API.
- public typealias _RawGRPCClientRequestPart = _GRPCClientRequestPart<ByteBuffer>
- /// A gRPC client response message part.
- ///
- /// - Important: This is **NOT** part of the public API.
- public enum _GRPCClientResponsePart<Response> {
- /// Metadata received as the server acknowledges the RPC.
- case initialMetadata(HPACKHeaders)
- /// A deserialized response message received from the server.
- case message(_MessageContext<Response>)
- /// The metadata received at the end of the RPC.
- case trailingMetadata(HPACKHeaders)
- /// The final status of the RPC.
- case status(GRPCStatus)
- }
- /// As `_GRPCClientResponsePart` but messages are serialized.
- /// - Important: This is **NOT** part of the public API.
- public typealias _RawGRPCClientResponsePart = _GRPCClientResponsePart<ByteBuffer>
- /// - Important: This is **NOT** part of the public API. It is declared as
- /// `public` because it is used within performance tests.
- public struct _GRPCRequestHead {
- private final class _Storage {
- var method: String
- var scheme: String
- var path: String
- var host: String
- var deadline: NIODeadline
- var encoding: ClientMessageEncoding
- init(
- method: String,
- scheme: String,
- path: String,
- host: String,
- deadline: NIODeadline,
- encoding: ClientMessageEncoding
- ) {
- self.method = method
- self.scheme = scheme
- self.path = path
- self.host = host
- self.deadline = deadline
- self.encoding = encoding
- }
- func copy() -> _Storage {
- return .init(
- method: self.method,
- scheme: self.scheme,
- path: self.path,
- host: self.host,
- deadline: self.deadline,
- encoding: self.encoding
- )
- }
- }
- private var _storage: _Storage
- // Don't put this in storage: it would CoW for every mutation.
- internal var customMetadata: HPACKHeaders
- internal var method: String {
- get {
- return self._storage.method
- }
- set {
- if !isKnownUniquelyReferenced(&self._storage) {
- self._storage = self._storage.copy()
- }
- self._storage.method = newValue
- }
- }
- internal var scheme: String {
- get {
- return self._storage.scheme
- }
- set {
- if !isKnownUniquelyReferenced(&self._storage) {
- self._storage = self._storage.copy()
- }
- self._storage.scheme = newValue
- }
- }
- internal var path: String {
- get {
- return self._storage.path
- }
- set {
- if !isKnownUniquelyReferenced(&self._storage) {
- self._storage = self._storage.copy()
- }
- self._storage.path = newValue
- }
- }
- internal var host: String {
- get {
- return self._storage.host
- }
- set {
- if !isKnownUniquelyReferenced(&self._storage) {
- self._storage = self._storage.copy()
- }
- self._storage.host = newValue
- }
- }
- internal var deadline: NIODeadline {
- get {
- return self._storage.deadline
- }
- set {
- if !isKnownUniquelyReferenced(&self._storage) {
- self._storage = self._storage.copy()
- }
- self._storage.deadline = newValue
- }
- }
- internal var encoding: ClientMessageEncoding {
- get {
- return self._storage.encoding
- }
- set {
- if !isKnownUniquelyReferenced(&self._storage) {
- self._storage = self._storage.copy()
- }
- self._storage.encoding = newValue
- }
- }
- public init(
- method: String,
- scheme: String,
- path: String,
- host: String,
- deadline: NIODeadline,
- customMetadata: HPACKHeaders,
- encoding: ClientMessageEncoding
- ) {
- self._storage = .init(
- method: method,
- scheme: scheme,
- path: path,
- host: host,
- deadline: deadline,
- encoding: encoding
- )
- self.customMetadata = customMetadata
- }
- }
- extension _GRPCRequestHead {
- internal init(
- scheme: String,
- path: String,
- host: String,
- options: CallOptions,
- requestID: String?
- ) {
- let metadata: HPACKHeaders
- if let requestID = requestID, let requestIDHeader = options.requestIDHeader {
- var customMetadata = options.customMetadata
- customMetadata.add(name: requestIDHeader, value: requestID)
- metadata = customMetadata
- } else {
- metadata = options.customMetadata
- }
- self = _GRPCRequestHead(
- method: options.cacheable ? "GET" : "POST",
- scheme: scheme,
- path: path,
- host: host,
- deadline: options.timeLimit.makeDeadline(),
- customMetadata: metadata,
- encoding: options.messageEncoding
- )
- }
- }
- /// The type of gRPC call.
- public enum GRPCCallType: Hashable, Sendable {
- /// Unary: a single request and a single response.
- case unary
- /// Client streaming: many requests and a single response.
- case clientStreaming
- /// Server streaming: a single request and many responses.
- case serverStreaming
- /// Bidirectional streaming: many request and many responses.
- case bidirectionalStreaming
- public var isStreamingRequests: Bool {
- switch self {
- case .clientStreaming, .bidirectionalStreaming:
- return true
- case .unary, .serverStreaming:
- return false
- }
- }
- public var isStreamingResponses: Bool {
- switch self {
- case .serverStreaming, .bidirectionalStreaming:
- return true
- case .unary, .clientStreaming:
- return false
- }
- }
- }
- // MARK: - GRPCClientChannelHandler
- /// A channel handler for gRPC clients which translates HTTP/2 frames into gRPC messages.
- ///
- /// This channel handler should typically be used in conjunction with another handler which
- /// reads the parsed `GRPCClientResponsePart<Response>` messages and surfaces them to the caller
- /// in some fashion. Note that for unary and client streaming RPCs this handler will only emit at
- /// most one response message.
- ///
- /// This handler relies heavily on the `GRPCClientStateMachine` to manage the state of the request
- /// and response streams, which share a single HTTP/2 stream for transport.
- ///
- /// Typical usage of this handler is with a `HTTP2StreamMultiplexer` from SwiftNIO HTTP2:
- ///
- /// ```
- /// let multiplexer: HTTP2StreamMultiplexer = // ...
- /// multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
- /// let clientChannelHandler = GRPCClientChannelHandler<Request, Response>(
- /// streamID: streamID,
- /// callType: callType,
- /// logger: logger
- /// )
- /// return channel.pipeline.addHandler(clientChannelHandler)
- /// }
- /// ```
- internal final class GRPCClientChannelHandler {
- private let logger: Logger
- private var stateMachine: GRPCClientStateMachine
- private let maximumReceiveMessageLength: Int
- /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
- ///
- /// - Parameters:
- /// - callType: Type of RPC call being made.
- /// - maximumReceiveMessageLength: Maximum allowed length in bytes of a received message.
- /// - logger: Logger.
- internal init(
- callType: GRPCCallType,
- maximumReceiveMessageLength: Int,
- logger: Logger
- ) {
- self.logger = logger
- self.maximumReceiveMessageLength = maximumReceiveMessageLength
- switch callType {
- case .unary:
- self.stateMachine = .init(requestArity: .one, responseArity: .one)
- case .clientStreaming:
- self.stateMachine = .init(requestArity: .many, responseArity: .one)
- case .serverStreaming:
- self.stateMachine = .init(requestArity: .one, responseArity: .many)
- case .bidirectionalStreaming:
- self.stateMachine = .init(requestArity: .many, responseArity: .many)
- }
- }
- }
- // MARK: - GRPCClientChannelHandler: Inbound
- extension GRPCClientChannelHandler: ChannelInboundHandler {
- internal typealias InboundIn = HTTP2Frame.FramePayload
- internal typealias InboundOut = _RawGRPCClientResponsePart
- internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
- let payload = self.unwrapInboundIn(data)
- switch payload {
- case let .headers(content):
- self.readHeaders(content: content, context: context)
- case let .data(content):
- self.readData(content: content, context: context)
- // We don't need to handle other frame type, just drop them instead.
- default:
- // TODO: synthesise a more precise `GRPCStatus` from RST_STREAM frames in accordance
- // with: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
- break
- }
- }
- /// Read the content from an HTTP/2 HEADERS frame received from the server.
- ///
- /// We can receive headers in two cases:
- /// - when the RPC is being acknowledged, and
- /// - when the RPC is being terminated.
- ///
- /// It is also possible for the RPC to be acknowledged and terminated at the same time, the
- /// specification refers to this as a "Trailers-Only" response.
- ///
- /// - Parameter content: Content of the headers frame.
- /// - Parameter context: Channel handler context.
- private func readHeaders(
- content: HTTP2Frame.FramePayload.Headers,
- context: ChannelHandlerContext
- ) {
- self.logger.trace(
- "received HTTP2 frame",
- metadata: [
- MetadataKey.h2Payload: "HEADERS",
- MetadataKey.h2Headers: "\(content.headers)",
- MetadataKey.h2EndStream: "\(content.endStream)",
- ]
- )
- // In the case of a "Trailers-Only" response there's no guarantee that end-of-stream will be set
- // on the headers frame: end stream may be sent on an empty data frame as well. If the headers
- // contain a gRPC status code then they must be for a "Trailers-Only" response.
- if content.endStream || content.headers.contains(name: GRPCHeaderName.statusCode) {
- // We have the headers, pass them to the next handler:
- context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(content.headers)))
- // Are they valid headers?
- let result = self.stateMachine.receiveEndOfResponseStream(content.headers)
- .mapError { error -> GRPCError.WithContext in
- // The headers aren't valid so let's figure out a reasonable error to forward:
- switch error {
- case let .invalidContentType(contentType):
- return GRPCError.InvalidContentType(contentType).captureContext()
- case let .invalidHTTPStatus(status):
- return GRPCError.InvalidHTTPStatus(status).captureContext()
- case .invalidState:
- return GRPCError.InvalidState("parsing end-of-stream trailers").captureContext()
- }
- }
- // Okay, what should we tell the next handler?
- switch result {
- case let .success(status):
- context.fireChannelRead(self.wrapInboundOut(.status(status)))
- case let .failure(error):
- context.fireErrorCaught(error)
- }
- } else {
- // "Normal" response headers, but are they valid?
- let result = self.stateMachine.receiveResponseHeaders(content.headers)
- .mapError { error -> GRPCError.WithContext in
- // The headers aren't valid so let's figure out a reasonable error to forward:
- switch error {
- case let .invalidContentType(contentType):
- return GRPCError.InvalidContentType(contentType).captureContext()
- case let .invalidHTTPStatus(status):
- return GRPCError.InvalidHTTPStatus(status).captureContext()
- case .unsupportedMessageEncoding:
- return GRPCError.CompressionUnsupported().captureContext()
- case .invalidState:
- return GRPCError.InvalidState("parsing headers").captureContext()
- }
- }
- // Okay, what should we tell the next handler?
- switch result {
- case .success:
- context.fireChannelRead(self.wrapInboundOut(.initialMetadata(content.headers)))
- case let .failure(error):
- context.fireErrorCaught(error)
- }
- }
- }
- /// Reads the content from an HTTP/2 DATA frame received from the server and buffers the bytes
- /// necessary to deserialize a message (or messages).
- ///
- /// - Parameter content: Content of the data frame.
- /// - Parameter context: Channel handler context.
- private func readData(content: HTTP2Frame.FramePayload.Data, context: ChannelHandlerContext) {
- // Note: this is replicated from NIO's HTTP2ToHTTP1ClientCodec.
- guard case var .byteBuffer(buffer) = content.data else {
- preconditionFailure("Received DATA frame with non-ByteBuffer IOData")
- }
- self.logger.trace(
- "received HTTP2 frame",
- metadata: [
- MetadataKey.h2Payload: "DATA",
- MetadataKey.h2DataBytes: "\(content.data.readableBytes)",
- MetadataKey.h2EndStream: "\(content.endStream)",
- ]
- )
- self.consumeBytes(from: &buffer, context: context)
- // End stream is set; we don't usually expect this but can handle it in some situations.
- if content.endStream, let status = self.stateMachine.receiveEndOfResponseStream() {
- self.logger.warning("Unexpected end stream set on DATA frame")
- context.fireChannelRead(self.wrapInboundOut(.status(status)))
- }
- }
- private func consumeBytes(from buffer: inout ByteBuffer, context: ChannelHandlerContext) {
- // Do we have bytes to read? If there are no bytes to read then we can't do anything. This may
- // happen if the end-of-stream flag is not set on the trailing headers frame (i.e. the one
- // containing the gRPC status code) and an additional empty data frame is sent with the
- // end-of-stream flag set.
- guard buffer.readableBytes > 0 else {
- return
- }
- // Feed the buffer into the state machine.
- let result = self.stateMachine.receiveResponseBuffer(
- &buffer,
- maxMessageLength: self.maximumReceiveMessageLength
- ).mapError { error -> GRPCError.WithContext in
- switch error {
- case .cardinalityViolation:
- return GRPCError.StreamCardinalityViolation.response.captureContext()
- case .deserializationFailed, .leftOverBytes:
- return GRPCError.DeserializationFailure().captureContext()
- case let .decompressionLimitExceeded(compressedSize):
- return GRPCError.DecompressionLimitExceeded(compressedSize: compressedSize)
- .captureContext()
- case let .lengthExceedsLimit(underlyingError):
- return underlyingError.captureContext()
- case .invalidState:
- return GRPCError.InvalidState("parsing data as a response message").captureContext()
- }
- }
- // Did we get any messages?
- switch result {
- case let .success(messages):
- // Awesome: we got some messages. The state machine guarantees we only get at most a single
- // message for unary and client-streaming RPCs.
- for message in messages {
- // Note: `compressed: false` is currently just a placeholder. This is fine since the message
- // context is not currently exposed to the user. If we implement interceptors for the client
- // and decide to surface this information then we'll need to extract that information from
- // the message reader.
- context.fireChannelRead(self.wrapInboundOut(.message(.init(message, compressed: false))))
- }
- case let .failure(error):
- context.fireErrorCaught(error)
- }
- }
- }
- // MARK: - GRPCClientChannelHandler: Outbound
- extension GRPCClientChannelHandler: ChannelOutboundHandler {
- internal typealias OutboundIn = _RawGRPCClientRequestPart
- internal typealias OutboundOut = HTTP2Frame.FramePayload
- internal func write(
- context: ChannelHandlerContext,
- data: NIOAny,
- promise: EventLoopPromise<Void>?
- ) {
- switch self.unwrapOutboundIn(data) {
- case let .head(requestHead):
- // Feed the request into the state machine:
- switch self.stateMachine.sendRequestHeaders(
- requestHead: requestHead,
- allocator: context.channel.allocator
- ) {
- case let .success(headers):
- // We're clear to write some headers. Create an appropriate frame and write it.
- let framePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
- self.logger.trace(
- "writing HTTP2 frame",
- metadata: [
- MetadataKey.h2Payload: "HEADERS",
- MetadataKey.h2Headers: "\(headers)",
- MetadataKey.h2EndStream: "false",
- ]
- )
- context.write(self.wrapOutboundOut(framePayload), promise: promise)
- case let .failure(sendRequestHeadersError):
- switch sendRequestHeadersError {
- case .invalidState:
- // This is bad: we need to trigger an error and close the channel.
- promise?.fail(sendRequestHeadersError)
- context.fireErrorCaught(GRPCError.InvalidState("unable to initiate RPC").captureContext())
- }
- }
- case let .message(request):
- // Feed the request message into the state machine:
- let result = self.stateMachine.sendRequest(
- request.message,
- compressed: request.compressed,
- promise: promise
- )
- switch result {
- case .success:
- ()
- case let .failure(writeError):
- switch writeError {
- case .cardinalityViolation:
- // This is fine: we can ignore the request. The RPC can continue as if nothing went wrong.
- promise?.fail(writeError)
- case .serializationFailed:
- // This is bad: we need to trigger an error and close the channel.
- promise?.fail(writeError)
- context.fireErrorCaught(GRPCError.SerializationFailure().captureContext())
- case .invalidState:
- promise?.fail(writeError)
- context
- .fireErrorCaught(GRPCError.InvalidState("unable to write message").captureContext())
- }
- }
- case .end:
- // About to send end: write any outbound messages first.
- while let (result, promise) = self.stateMachine.nextRequest() {
- switch result {
- case let .success(buffer):
- let framePayload: HTTP2Frame.FramePayload = .data(
- .init(data: .byteBuffer(buffer), endStream: false)
- )
- self.logger.trace(
- "writing HTTP2 frame",
- metadata: [
- MetadataKey.h2Payload: "DATA",
- MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
- MetadataKey.h2EndStream: "false",
- ]
- )
- context.write(self.wrapOutboundOut(framePayload), promise: promise)
- case let .failure(error):
- context.fireErrorCaught(error)
- promise?.fail(error)
- return
- }
- }
- // Okay: can we close the request stream?
- switch self.stateMachine.sendEndOfRequestStream() {
- case .success:
- // We can. Send an empty DATA frame with end-stream set.
- let empty = context.channel.allocator.buffer(capacity: 0)
- let framePayload: HTTP2Frame.FramePayload = .data(
- .init(data: .byteBuffer(empty), endStream: true)
- )
- self.logger.trace(
- "writing HTTP2 frame",
- metadata: [
- MetadataKey.h2Payload: "DATA",
- MetadataKey.h2DataBytes: "0",
- MetadataKey.h2EndStream: "true",
- ]
- )
- context.write(self.wrapOutboundOut(framePayload), promise: promise)
- case let .failure(error):
- // Why can't we close the request stream?
- switch error {
- case .alreadyClosed:
- // This is fine: we can just ignore it. The RPC can continue as if nothing went wrong.
- promise?.fail(error)
- case .invalidState:
- // This is bad: we need to trigger an error and close the channel.
- promise?.fail(error)
- context
- .fireErrorCaught(
- GRPCError.InvalidState("unable to close request stream")
- .captureContext()
- )
- }
- }
- }
- }
- func flush(context: ChannelHandlerContext) {
- // Drain any requests.
- while let (result, promise) = self.stateMachine.nextRequest() {
- switch result {
- case let .success(buffer):
- let framePayload: HTTP2Frame.FramePayload = .data(
- .init(data: .byteBuffer(buffer), endStream: false)
- )
- self.logger.trace(
- "writing HTTP2 frame",
- metadata: [
- MetadataKey.h2Payload: "DATA",
- MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
- MetadataKey.h2EndStream: "false",
- ]
- )
- context.write(self.wrapOutboundOut(framePayload), promise: promise)
- case let .failure(error):
- context.fireErrorCaught(error)
- promise?.fail(error)
- return
- }
- }
- context.flush()
- }
- }
|