| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import NIOCore
- import NIOHPACK
- import NIOHTTP1
- import NIOHTTP2
- import struct Foundation.Data
- /// A codec for translating between gRPC Web (as HTTP/1) and HTTP/2 frame payloads.
- internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
- internal typealias InboundIn = HTTPServerRequestPart
- internal typealias InboundOut = HTTP2Frame.FramePayload
- internal typealias OutboundIn = HTTP2Frame.FramePayload
- internal typealias OutboundOut = HTTPServerResponsePart
- private var stateMachine: StateMachine
- /// Create a gRPC Web to server HTTP/2 codec.
- ///
- /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
- /// request headers.
- init(scheme: String) {
- self.stateMachine = StateMachine(scheme: scheme)
- }
- internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
- let action = self.stateMachine.processInbound(
- serverRequestPart: self.unwrapInboundIn(data),
- allocator: context.channel.allocator
- )
- self.act(on: action, context: context)
- }
- internal func write(
- context: ChannelHandlerContext,
- data: NIOAny,
- promise: EventLoopPromise<Void>?
- ) {
- let action = self.stateMachine.processOutbound(
- framePayload: self.unwrapOutboundIn(data),
- promise: promise,
- allocator: context.channel.allocator
- )
- self.act(on: action, context: context)
- }
- /// Acts on an action returned by the state machine.
- private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
- switch action {
- case .none:
- ()
- case let .fireChannelRead(payload):
- context.fireChannelRead(self.wrapInboundOut(payload))
- case let .write(write):
- if let additionalPart = write.additionalPart {
- context.write(self.wrapOutboundOut(write.part), promise: nil)
- context.write(self.wrapOutboundOut(additionalPart), promise: write.promise)
- } else {
- context.write(self.wrapOutboundOut(write.part), promise: write.promise)
- }
- if write.closeChannel {
- context.close(mode: .all, promise: nil)
- }
- case let .completePromise(promise, result):
- promise?.completeWith(result)
- }
- }
- }
- extension GRPCWebToHTTP2ServerCodec {
- internal struct StateMachine {
- /// The current state.
- private var state: State
- private let scheme: String
- internal init(scheme: String) {
- self.state = .idle
- self.scheme = scheme
- }
- /// Process the inbound `HTTPServerRequestPart`.
- internal mutating func processInbound(
- serverRequestPart: HTTPServerRequestPart,
- allocator: ByteBufferAllocator
- ) -> Action {
- return self.state.processInbound(
- serverRequestPart: serverRequestPart,
- scheme: self.scheme,
- allocator: allocator
- )
- }
- /// Process the outbound `HTTP2Frame.FramePayload`.
- internal mutating func processOutbound(
- framePayload: HTTP2Frame.FramePayload,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator
- ) -> Action {
- return self.state.processOutbound(
- framePayload: framePayload,
- promise: promise,
- allocator: allocator
- )
- }
- /// An action to take as a result of interaction with the state machine.
- internal enum Action {
- case none
- case fireChannelRead(HTTP2Frame.FramePayload)
- case write(Write)
- case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
- internal struct Write {
- internal var part: HTTPServerResponsePart
- internal var additionalPart: HTTPServerResponsePart?
- internal var promise: EventLoopPromise<Void>?
- internal var closeChannel: Bool
- internal init(
- part: HTTPServerResponsePart,
- additionalPart: HTTPServerResponsePart? = nil,
- promise: EventLoopPromise<Void>?,
- closeChannel: Bool
- ) {
- self.part = part
- self.additionalPart = additionalPart
- self.promise = promise
- self.closeChannel = closeChannel
- }
- }
- }
- fileprivate enum State {
- /// Idle; nothing has been received or sent. The only valid transition is to 'fullyOpen' when
- /// receiving request headers.
- case idle
- /// Received request headers. Waiting for the end of request and response streams.
- case fullyOpen(InboundState, OutboundState)
- /// The server has closed the response stream, we may receive other request parts from the client.
- case clientOpenServerClosed(InboundState)
- /// The client has sent everything, the server still needs to close the response stream.
- case clientClosedServerOpen(OutboundState)
- /// Not a real state.
- case _modifying
- private var isModifying: Bool {
- switch self {
- case ._modifying:
- return true
- case .idle, .fullyOpen, .clientClosedServerOpen, .clientOpenServerClosed:
- return false
- }
- }
- private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
- self = ._modifying
- defer {
- assert(!self.isModifying)
- }
- return body(&self)
- }
- }
- fileprivate struct InboundState {
- /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text
- /// is being used, `nil` otherwise.
- var requestBuffer: ByteBuffer?
- init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
- self.requestBuffer = isTextEncoded ? allocator.buffer(capacity: 0) : nil
- }
- }
- fileprivate struct OutboundState {
- /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
- /// otherwise.
- var responseBuffer: CircularBuffer<ByteBuffer>?
- /// True if the response headers have been sent.
- var responseHeadersSent: Bool
- /// True if the server should close the connection when this request is done.
- var closeConnection: Bool
- init(isTextEncoded: Bool, closeConnection: Bool) {
- self.responseHeadersSent = false
- self.responseBuffer = isTextEncoded ? CircularBuffer() : nil
- self.closeConnection = closeConnection
- }
- }
- }
- }
- extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
- fileprivate mutating func processInbound(
- serverRequestPart: HTTPServerRequestPart,
- scheme: String,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch serverRequestPart {
- case let .head(head):
- return self.processRequestHead(head, scheme: scheme, allocator: allocator)
- case var .body(buffer):
- return self.processRequestBody(&buffer)
- case .end:
- return self.processRequestEnd(allocator: allocator)
- }
- }
- fileprivate mutating func processOutbound(
- framePayload: HTTP2Frame.FramePayload,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch framePayload {
- case let .headers(payload):
- return self.processResponseHeaders(payload, promise: promise, allocator: allocator)
- case let .data(payload):
- return self.processResponseData(payload, promise: promise)
- case .priority,
- .rstStream,
- .settings,
- .pushPromise,
- .ping,
- .goAway,
- .windowUpdate,
- .alternativeService,
- .origin:
- preconditionFailure("Unsupported frame payload")
- }
- }
- }
- // MARK: - Inbound
- extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
- private mutating func processRequestHead(
- _ head: HTTPRequestHead,
- scheme: String,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- return self.withStateAvoidingCoWs { state in
- let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
- // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
- // allocate a second headers block to use the normalization provided by NIO HTTP/2.
- //
- // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
- // extra copy.
- var headers = HPACKHeaders()
- headers.reserveCapacity(normalized.count + 4)
- headers.add(name: ":path", value: head.uri)
- headers.add(name: ":method", value: head.method.rawValue)
- headers.add(name: ":scheme", value: scheme)
- if let host = head.headers.first(name: "host") {
- headers.add(name: ":authority", value: host)
- }
- headers.add(contentsOf: normalized)
- // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
- // that will be done at the HTTP/2 level.
- let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
- let isWebText = contentType == .some(.webTextProtobuf)
- let closeConnection = head.headers[canonicalForm: "connection"].contains("close")
- state = .fullyOpen(
- .init(isTextEncoded: isWebText, allocator: allocator),
- .init(isTextEncoded: isWebText, closeConnection: closeConnection)
- )
- return .fireChannelRead(.headers(.init(headers: headers)))
- }
- case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
- preconditionFailure("Invalid state: already received request head")
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private mutating func processRequestBody(
- _ buffer: inout ByteBuffer
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case .fullyOpen(var inbound, let outbound):
- return self.withStateAvoidingCoWs { state in
- let action = inbound.processInboundData(buffer: &buffer)
- state = .fullyOpen(inbound, outbound)
- return action
- }
- case var .clientOpenServerClosed(inbound):
- // The server is already done, but it's not our place to drop the request.
- return self.withStateAvoidingCoWs { state in
- let action = inbound.processInboundData(buffer: &buffer)
- state = .clientOpenServerClosed(inbound)
- return action
- }
- case .clientClosedServerOpen:
- preconditionFailure("End of request stream already received")
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private mutating func processRequestEnd(
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case let .fullyOpen(_, outbound):
- return self.withStateAvoidingCoWs { state in
- // We're done with inbound state.
- state = .clientClosedServerOpen(outbound)
- // Send an empty DATA frame with the end stream flag set.
- let empty = allocator.buffer(capacity: 0)
- return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
- }
- case .clientClosedServerOpen:
- preconditionFailure("End of request stream already received")
- case .clientOpenServerClosed:
- return self.withStateAvoidingCoWs { state in
- // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
- // it's necessary to communicate to the other peers that the response is done.
- state = .idle
- // Send an empty DATA frame with the end stream flag set.
- let empty = allocator.buffer(capacity: 0)
- return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
- }
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- }
- // MARK: - Outbound
- extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
- private mutating func processResponseTrailers(
- _ trailers: HPACKHeaders,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case .fullyOpen(let inbound, var outbound):
- return self.withStateAvoidingCoWs { state in
- // Double check these are trailers.
- assert(outbound.responseHeadersSent)
- // We haven't seen the end of the request stream yet.
- state = .clientOpenServerClosed(inbound)
- // Avoid CoW-ing the buffers.
- let responseBuffers = outbound.responseBuffer
- outbound.responseBuffer = nil
- return Self.processTrailers(
- responseBuffers: responseBuffers,
- trailers: trailers,
- promise: promise,
- allocator: allocator,
- closeChannel: outbound.closeConnection
- )
- }
- case var .clientClosedServerOpen(state):
- return self.withStateAvoidingCoWs { nextState in
- // Client is closed and now so is the server.
- nextState = .idle
- // Avoid CoW-ing the buffers.
- let responseBuffers = state.responseBuffer
- state.responseBuffer = nil
- return Self.processTrailers(
- responseBuffers: responseBuffers,
- trailers: trailers,
- promise: promise,
- allocator: allocator,
- closeChannel: state.closeConnection
- )
- }
- case .clientOpenServerClosed:
- preconditionFailure("Already seen end of response stream")
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private static func processTrailers(
- responseBuffers: CircularBuffer<ByteBuffer>?,
- trailers: HPACKHeaders,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator,
- closeChannel: Bool
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- if var responseBuffers = responseBuffers {
- let buffer = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
- &responseBuffers,
- trailers: trailers,
- allocator: allocator
- )
- return .write(
- .init(
- part: .body(.byteBuffer(buffer)),
- additionalPart: .end(nil),
- promise: promise,
- closeChannel: closeChannel
- )
- )
- } else {
- // No response buffer; plain gRPC Web. Trailers are encoded into the body as a regular
- // length-prefixed message.
- let buffer = GRPCWebToHTTP2ServerCodec.formatTrailers(trailers, allocator: allocator)
- return .write(
- .init(
- part: .body(.byteBuffer(buffer)),
- additionalPart: .end(nil),
- promise: promise,
- closeChannel: closeChannel
- )
- )
- }
- }
- private mutating func processResponseTrailersOnly(
- _ trailers: HPACKHeaders,
- promise: EventLoopPromise<Void>?
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case let .fullyOpen(inbound, outbound):
- return self.withStateAvoidingCoWs { state in
- // We still haven't seen the end of the request stream.
- state = .clientOpenServerClosed(inbound)
- let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
- hpackHeaders: trailers,
- closeConnection: outbound.closeConnection
- )
- return .write(
- .init(
- part: .head(head),
- additionalPart: .end(nil),
- promise: promise,
- closeChannel: outbound.closeConnection
- )
- )
- }
- case let .clientClosedServerOpen(outbound):
- return self.withStateAvoidingCoWs { state in
- // We're done, back to idle.
- state = .idle
- let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
- hpackHeaders: trailers,
- closeConnection: outbound.closeConnection
- )
- return .write(
- .init(
- part: .head(head),
- additionalPart: .end(nil),
- promise: promise,
- closeChannel: outbound.closeConnection
- )
- )
- }
- case .clientOpenServerClosed:
- preconditionFailure("Already seen end of response stream")
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private mutating func processResponseHeaders(
- _ headers: HPACKHeaders,
- promise: EventLoopPromise<Void>?
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case .fullyOpen(let inbound, var outbound):
- return self.withStateAvoidingCoWs { state in
- outbound.responseHeadersSent = true
- state = .fullyOpen(inbound, outbound)
- let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
- hpackHeaders: headers,
- closeConnection: outbound.closeConnection
- )
- return .write(.init(part: .head(head), promise: promise, closeChannel: false))
- }
- case var .clientClosedServerOpen(outbound):
- return self.withStateAvoidingCoWs { state in
- outbound.responseHeadersSent = true
- state = .clientClosedServerOpen(outbound)
- let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
- hpackHeaders: headers,
- closeConnection: outbound.closeConnection
- )
- return .write(.init(part: .head(head), promise: promise, closeChannel: false))
- }
- case .clientOpenServerClosed:
- preconditionFailure("Already seen end of response stream")
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private mutating func processResponseHeaders(
- _ payload: HTTP2Frame.FramePayload.Headers,
- promise: EventLoopPromise<Void>?,
- allocator: ByteBufferAllocator
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case let .fullyOpen(_, outbound),
- let .clientClosedServerOpen(outbound):
- if outbound.responseHeadersSent {
- // Headers have been sent, these must be trailers, so end stream must be set.
- assert(payload.endStream)
- return self.processResponseTrailers(payload.headers, promise: promise, allocator: allocator)
- } else if payload.endStream {
- // Headers haven't been sent yet and end stream is set: this is a trailers only response
- // so we need to send 'end' as well.
- return self.processResponseTrailersOnly(payload.headers, promise: promise)
- } else {
- return self.processResponseHeaders(payload.headers, promise: promise)
- }
- case .clientOpenServerClosed:
- // We've already sent end.
- return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- private static func processResponseData(
- _ payload: HTTP2Frame.FramePayload.Data,
- promise: EventLoopPromise<Void>?,
- state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- if state.responseBuffer == nil {
- // Not gRPC Web Text; just write the body.
- return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false))
- } else {
- switch payload.data {
- case let .byteBuffer(buffer):
- // '!' is fine, we checked above.
- state.responseBuffer!.append(buffer)
- case .fileRegion:
- preconditionFailure("Unexpected IOData.fileRegion")
- }
- // The response is buffered, we can consider it dealt with.
- return .completePromise(promise, .success(()))
- }
- }
- private mutating func processResponseData(
- _ payload: HTTP2Frame.FramePayload.Data,
- promise: EventLoopPromise<Void>?
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- switch self {
- case .idle:
- preconditionFailure("Invalid state: haven't received request head")
- case .fullyOpen(let inbound, var outbound):
- return self.withStateAvoidingCoWs { state in
- let action = Self.processResponseData(payload, promise: promise, state: &outbound)
- state = .fullyOpen(inbound, outbound)
- return action
- }
- case var .clientClosedServerOpen(outbound):
- return self.withStateAvoidingCoWs { state in
- let action = Self.processResponseData(payload, promise: promise, state: &outbound)
- state = .clientClosedServerOpen(outbound)
- return action
- }
- case .clientOpenServerClosed:
- return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
- case ._modifying:
- preconditionFailure("Left in modifying state")
- }
- }
- }
- // MARK: - Helpers
- extension GRPCWebToHTTP2ServerCodec {
- private static func makeResponseHead(
- hpackHeaders: HPACKHeaders,
- closeConnection: Bool
- ) -> HTTPResponseHead {
- var headers = HTTPHeaders(hpackHeaders: hpackHeaders)
- if closeConnection {
- headers.add(name: "connection", value: "close")
- }
- // Grab the status, if this is missing we've messed up in another handler.
- guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else {
- preconditionFailure("Invalid state: missing ':status' pseudo header")
- }
- return HTTPResponseHead(
- version: .init(major: 1, minor: 1),
- status: .init(statusCode: statusCode),
- headers: headers
- )
- }
- private static func formatTrailers(
- _ trailers: HPACKHeaders,
- allocator: ByteBufferAllocator
- ) -> ByteBuffer {
- // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
- let length = trailers.reduce(0) { partial, trailer in
- // +4 for: ":", " ", "\r", "\n"
- return partial + trailer.name.utf8.count + trailer.value.utf8.count + 4
- }
- var buffer = allocator.buffer(capacity: 5 + length)
- // Uncompressed trailer byte.
- buffer.writeInteger(UInt8(0x80))
- // Length.
- let lengthIndex = buffer.writerIndex
- buffer.writeInteger(UInt32(0))
- var bytesWritten = 0
- for (name, value, _) in trailers {
- bytesWritten += buffer.writeString(name)
- bytesWritten += buffer.writeString(": ")
- bytesWritten += buffer.writeString(value)
- bytesWritten += buffer.writeString("\r\n")
- }
- buffer.setInteger(UInt32(bytesWritten), at: lengthIndex)
- return buffer
- }
- private static func encodeResponsesAndTrailers(
- _ responses: inout CircularBuffer<ByteBuffer>,
- trailers: HPACKHeaders,
- allocator: ByteBufferAllocator
- ) -> ByteBuffer {
- // We need to encode the trailers along with any responses we're holding.
- responses.append(self.formatTrailers(trailers, allocator: allocator))
- let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +)
- // '!' is fine: responses isn't empty, we just appended the trailers.
- var buffer = responses.popFirst()!
- // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth
- // but this is fine for now.
- var accumulatedData = buffer.readData(length: buffer.readableBytes)!
- accumulatedData.reserveCapacity(capacity)
- while let buffer = responses.popFirst() {
- accumulatedData.append(contentsOf: buffer.readableBytesView)
- }
- // We can reuse the popped buffer.
- let base64Encoded = accumulatedData.base64EncodedString()
- buffer.clear(minimumCapacity: base64Encoded.utf8.count)
- buffer.writeString(base64Encoded)
- return buffer
- }
- }
- extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState {
- fileprivate mutating func processInboundData(
- buffer: inout ByteBuffer
- ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
- if self.requestBuffer == nil {
- // We're not dealing with gRPC Web Text: just forward the buffer.
- return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
- }
- if self.requestBuffer!.readableBytes == 0 {
- self.requestBuffer = buffer
- } else {
- self.requestBuffer!.writeBuffer(&buffer)
- }
- let readableBytes = self.requestBuffer!.readableBytes
- // The length of base64 encoded data must be a multiple of 4.
- let bytesToRead = readableBytes - (readableBytes % 4)
- let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
- if bytesToRead > 0,
- let base64Encoded = self.requestBuffer!.readString(length: bytesToRead),
- let base64Decoded = Data(base64Encoded: base64Encoded)
- {
- // Recycle the input buffer and restore the request buffer.
- buffer.clear()
- buffer.writeContiguousBytes(base64Decoded)
- action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
- } else {
- action = .none
- }
- return action
- }
- }
- extension HTTPHeaders {
- fileprivate init(hpackHeaders headers: HPACKHeaders) {
- self.init()
- self.reserveCapacity(headers.count)
- // Pseudo-headers are at the start of the block, so drop them and then add the remaining.
- let regularHeaders = headers.drop { name, _, _ in
- name.utf8.first == .some(UInt8(ascii: ":"))
- }.lazy.map { name, value, _ in
- (name, value)
- }
- self.add(contentsOf: regularHeaders)
- }
- }
|