| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import struct Foundation.Data
- import NIO
- import NIOHPACK
- import NIOHTTP1
- import NIOHTTP2
- /// A codec for translating between gRPC Web (as HTTP/1) and HTTP/2 frame payloads.
- internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
- internal typealias InboundIn = HTTPServerRequestPart
- internal typealias InboundOut = HTTP2Frame.FramePayload
- internal typealias OutboundIn = HTTP2Frame.FramePayload
- internal typealias OutboundOut = HTTPServerResponsePart
- private var stateMachine: StateMachine
- /// Create a gRPC Web to server HTTP/2 codec.
- ///
- /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
- /// request headers.
- init(scheme: String) {
- self.stateMachine = StateMachine(scheme: scheme)
- }
- internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
- let action = self.stateMachine.processInbound(
- serverRequestPart: self.unwrapInboundIn(data),
- allocator: context.channel.allocator
- )
- self.act(on: action, context: context)
- }
- internal func write(
- context: ChannelHandlerContext,
- data: NIOAny,
- promise: EventLoopPromise<Void>?
- ) {
- let action = self.stateMachine.processOutbound(
- framePayload: self.unwrapOutboundIn(data),
- promise: promise,
- allocator: context.channel.allocator
- )
- self.act(on: action, context: context)
- }
- /// Acts on an action returned by the state machine.
- private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
- switch action {
- case .none:
- ()
- case let .fireChannelRead(payload):
- context.fireChannelRead(self.wrapInboundOut(payload))
- case let .write(part1, part2, promise):
- if let part2 = part2 {
- context.write(self.wrapOutboundOut(part1), promise: nil)
- context.write(self.wrapOutboundOut(part2), promise: promise)
- } else {
- context.write(self.wrapOutboundOut(part1), promise: promise)
- }
- case let .completePromise(promise, result):
- promise?.completeWith(result)
- }
- }
- }
- extension GRPCWebToHTTP2ServerCodec {
- struct StateMachine {
- /// The current state.
- private var state: State
- fileprivate init(scheme: String) {
- self.state = .idle(scheme: scheme)
- }
- private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
- var state: State = ._modifying
- swap(&self.state, &state)
- defer {
- swap(&self.state, &state)
- }
- return body(&state)
- }
- /// Process the inbound `HTTPServerRequestPart`.
- fileprivate mutating func processInbound(
- serverRequestPart: HTTPServerRequestPart,
- allocator: ByteBufferAllocator
- ) -> Action {
- return self.withStateAvoidingCoWs { state in
- state.processInbound(serverRequestPart: serverRequestPart, allocator: allocator)
- }
- }
- /// Process the outbound `HTTP2Frame.FramePayload`.
- fileprivate mutating func processOutbound(
- framePayload: HTTP2Frame.FramePayload,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator
- ) -> Action {
- return self.withStateAvoidingCoWs { state in
- state.processOutbound(framePayload: framePayload, promise: promise, allocator: allocator)
- }
- }
- /// An action to take as a result of interaction with the state machine.
- fileprivate enum Action {
- case none
- case fireChannelRead(HTTP2Frame.FramePayload)
- case write(HTTPServerResponsePart, HTTPServerResponsePart?, EventLoopPromise<Void>?)
- case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
- }
- fileprivate enum State {
- /// Idle; nothing has been received or sent. The only valid transition is to 'open' when
- /// receiving request headers.
- case idle(scheme: String)
- /// Open; the request headers have been received and we have not sent the end of the response
- /// stream.
- case open(OpenState)
- /// Closed; the response stream (and therefore the request stream) has been closed.
- case closed
- /// Not a real state.
- case _modifying
- }
- fileprivate struct OpenState {
- /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text
- /// is being used, `nil` otherwise.
- var requestBuffer: ByteBuffer?
- /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
- /// otherwise.
- var responseBuffer: CircularBuffer<ByteBuffer>?
- /// True if the end of the request stream has been received.
- var requestEndSeen: Bool
- /// True if the response headers have been sent.
- var responseHeadersSent: Bool
- init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
- self.requestEndSeen = false
- self.responseHeadersSent = false
- if isTextEncoded {
- self.requestBuffer = allocator.buffer(capacity: 0)
- self.responseBuffer = CircularBuffer()
- } else {
- self.requestBuffer = nil
- self.responseBuffer = nil
- }
- }
- }
- }
- }
- extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
- fileprivate mutating func processInbound(
- serverRequestPart: HTTPServerRequestPart,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch serverRequestPart {
- case let .head(head):
- return self.processRequestHead(head, allocator: allocator)
- case var .body(buffer):
- return self.processRequestBody(&buffer)
- case .end:
- return self.processRequestEnd(allocator: allocator)
- }
- }
- fileprivate mutating func processOutbound(
- framePayload: HTTP2Frame.FramePayload,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch framePayload {
- case let .headers(payload):
- return self.processResponseHeaders(payload, promise: promise, allocator: allocator)
- case let .data(payload):
- return self.processResponseData(payload, promise: promise)
- case .priority,
- .rstStream,
- .settings,
- .pushPromise,
- .ping,
- .goAway,
- .windowUpdate,
- .alternativeService,
- .origin:
- preconditionFailure("Unsupported frame payload")
- }
- }
- }
- // MARK: - Inbound
- extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
- private mutating func processRequestHead(
- _ head: HTTPRequestHead,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case let .idle(scheme):
- let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
- // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
- // allocate a second headers block to use the normalization provided by NIO HTTP/2.
- //
- // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
- // extra copy.
- var headers = HPACKHeaders()
- headers.reserveCapacity(normalized.count + 4)
- headers.add(name: ":path", value: head.uri)
- headers.add(name: ":method", value: head.method.rawValue)
- headers.add(name: ":scheme", value: scheme)
- if let host = head.headers.first(name: "host") {
- headers.add(name: ":authority", value: host)
- }
- headers.add(contentsOf: normalized)
- // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
- // that will be done at the HTTP/2 level.
- let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
- let isWebText = contentType == .some(.webTextProtobuf)
- self = .open(.init(isTextEncoded: isWebText, allocator: allocator))
- return .fireChannelRead(.headers(.init(headers: headers)))
- case .open, .closed:
- preconditionFailure("Invalid state: already received request head")
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private mutating func processRequestBody(
- _ buffer: inout ByteBuffer
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case var .open(state):
- assert(!state.requestEndSeen, "Invalid state: request stream closed")
- if state.requestBuffer == nil {
- // We're not dealing with gRPC Web Text: just forward the buffer.
- return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
- }
- if state.requestBuffer!.readableBytes == 0 {
- state.requestBuffer = buffer
- } else {
- state.requestBuffer!.writeBuffer(&buffer)
- }
- let readableBytes = state.requestBuffer!.readableBytes
- // The length of base64 encoded data must be a multiple of 4.
- let bytesToRead = readableBytes - (readableBytes % 4)
- let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
- if bytesToRead > 0,
- let base64Encoded = state.requestBuffer!.readString(length: bytesToRead),
- let base64Decoded = Data(base64Encoded: base64Encoded) {
- // Recycle the input buffer and restore the request buffer.
- buffer.clear()
- buffer.writeContiguousBytes(base64Decoded)
- action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
- } else {
- action = .none
- }
- self = .open(state)
- return action
- case .closed:
- return .none
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private mutating func processRequestEnd(
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case var .open(state):
- assert(!state.requestEndSeen, "Invalid state: already seen end stream ")
- state.requestEndSeen = true
- self = .open(state)
- // Send an empty DATA frame with the end stream flag set.
- let empty = allocator.buffer(capacity: 0)
- return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
- case .closed:
- return .none
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- }
- // MARK: - Outbound
- extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
- private mutating func processResponseHeaders(
- _ payload: HTTP2Frame.FramePayload.Headers,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case var .open(state):
- let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
- if state.responseHeadersSent {
- // Headers have been sent, these must be trailers, so end stream must be set.
- assert(payload.endStream)
- if var responseBuffer = state.responseBuffer {
- // We have a response buffer; we're doing gRPC Web Text. Nil out the buffer to avoid CoWs.
- state.responseBuffer = nil
- let buffer = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
- &responseBuffer,
- trailers: payload.headers,
- allocator: allocator
- )
- self = .closed
- action = .write(.body(.byteBuffer(buffer)), .end(nil), promise)
- } else {
- // No response buffer; plain gRPC Web.
- let trailers = HTTPHeaders(hpackHeaders: payload.headers)
- self = .closed
- action = .write(.end(trailers), nil, promise)
- }
- } else if payload.endStream {
- // Headers haven't been sent yet and end stream is set: this is a trailers only response
- // so we need to send 'end' as well.
- let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(hpackHeaders: payload.headers)
- self = .closed
- action = .write(.head(head), .end(nil), promise)
- } else {
- // Headers haven't been sent, end stream isn't set. Just send response head.
- state.responseHeadersSent = true
- let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(hpackHeaders: payload.headers)
- self = .open(state)
- action = .write(.head(head), nil, promise)
- }
- return action
- case .closed:
- return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private mutating func processResponseData(
- _ payload: HTTP2Frame.FramePayload.Data,
- promise: EventLoopPromise<Void>?
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case var .open(state):
- if state.responseBuffer == nil {
- // Not gRPC Web Text; just write the body.
- return .write(.body(payload.data), nil, promise)
- } else {
- switch payload.data {
- case let .byteBuffer(buffer):
- // '!' is fine, we checked above.
- state.responseBuffer!.append(buffer)
- case .fileRegion:
- preconditionFailure("Unexpected IOData.fileRegion")
- }
- self = .open(state)
- // The response is buffered, we can consider it dealt with.
- return .completePromise(promise, .success(()))
- }
- case .closed:
- return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- }
- // MARK: - Helpers
- extension GRPCWebToHTTP2ServerCodec {
- private static func makeResponseHead(hpackHeaders: HPACKHeaders) -> HTTPResponseHead {
- let headers = HTTPHeaders(hpackHeaders: hpackHeaders)
- // Grab the status, if this is missing we've messed up in another handler.
- guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else {
- preconditionFailure("Invalid state: missing ':status' pseudo header")
- }
- return HTTPResponseHead(
- version: .init(major: 1, minor: 1),
- status: .init(statusCode: statusCode),
- headers: headers
- )
- }
- private static func formatTrailers(
- _ trailers: HPACKHeaders,
- allocator: ByteBufferAllocator
- ) -> ByteBuffer {
- // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
- let encodedTrailers = trailers.map { name, value, _ in
- "\(name): \(value)"
- }.joined(separator: "\r\n")
- var buffer = allocator.buffer(capacity: 5 + encodedTrailers.utf8.count)
- // Uncompressed trailer byte.
- buffer.writeInteger(UInt8(0x80))
- // Length.
- buffer.writeInteger(UInt32(encodedTrailers.utf8.count))
- // Uncompressed trailers.
- buffer.writeString(encodedTrailers)
- return buffer
- }
- private static func encodeResponsesAndTrailers(
- _ responses: inout CircularBuffer<ByteBuffer>,
- trailers: HPACKHeaders,
- allocator: ByteBufferAllocator
- ) -> ByteBuffer {
- // We need to encode the trailers along with any responses we're holding.
- responses.append(self.formatTrailers(trailers, allocator: allocator))
- let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +)
- // '!' is fine: responses isn't empty, we just appended the trailers.
- var buffer = responses.popFirst()!
- // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth
- // but this is fine for now.
- var accumulatedData = buffer.readData(length: buffer.readableBytes)!
- accumulatedData.reserveCapacity(capacity)
- while let buffer = responses.popFirst() {
- accumulatedData.append(contentsOf: buffer.readableBytesView)
- }
- // We can reuse the popped buffer.
- let base64Encoded = accumulatedData.base64EncodedString()
- buffer.clear(minimumCapacity: base64Encoded.utf8.count)
- buffer.writeString(base64Encoded)
- return buffer
- }
- }
- extension HTTPHeaders {
- fileprivate init(hpackHeaders headers: HPACKHeaders) {
- self.init()
- self.reserveCapacity(headers.count)
- // Pseudo-headers are at the start of the block, so drop them and then add the remaining.
- let regularHeaders = headers.drop { name, _, _ in
- name.utf8.first == .some(UInt8(ascii: ":"))
- }.lazy.map { name, value, _ in
- (name, value)
- }
- self.add(contentsOf: regularHeaders)
- }
- }
|