| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Logging
- import NIO
- import NIOHPACK
- import NIOHTTP2
struct HTTP2ToRawGRPCStateMachine {
  /// The state the machine is currently in.
  private var state: State

  /// Runs `body` against the current state while `self.state` temporarily holds `._modifying`,
  /// then stores the (possibly modified) state back into `self.state`.
  ///
  /// `State` keeps its data as enum associated values, so mutating it while `self.state` still
  /// refers to the same heap allocated data would trigger a copy on write. Moving the value out
  /// for the duration of `body` avoids holding that extra reference, and therefore the copy.
  ///
  /// - Parameter body: A closure which may mutate the state and returns the action to take.
  /// - Returns: The action returned by `body`.
  private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
    // Move the real state into a local, leaving the placeholder behind.
    var current = State._modifying
    swap(&self.state, &current)

    let action = body(&current)

    // Put the updated state back; `current` now holds the placeholder again.
    swap(&self.state, &current)
    return action
  }

  /// Creates a state machine in the 'request idle, response idle' state.
  ///
  /// - Parameters:
  ///   - services: The service providers, keyed by service name.
  ///   - encoding: The message encoding configuration for the server.
  ///   - normalizeHeaders: Whether user-provided metadata should be normalized. Defaults to
  ///     `true`.
  internal init(
    services: [Substring: CallHandlerProvider],
    encoding: ServerMessageEncoding,
    normalizeHeaders: Bool = true
  ) {
    self.state = .requestIdleResponseIdle(
      RequestIdleResponseIdleState(
        services: services,
        encoding: encoding,
        normalizeHeaders: normalizeHeaders
      )
    )
  }
}
extension HTTP2ToRawGRPCStateMachine {
  /// The states a stream may be in. Each case is named for the state of the request stream
  /// followed by the state of the response stream.
  enum State {
    /// Neither peer has done anything with the stream yet.
    case requestIdleResponseIdle(RequestIdleResponseIdleState)

    /// Valid request headers have been received; nothing has been sent in response.
    case requestOpenResponseIdle(RequestOpenResponseIdleState)

    /// Valid request headers and request(s) have been received; response headers have been sent.
    case requestOpenResponseOpen(RequestOpenResponseOpenState)

    /// The request stream has closed; nothing has been sent in response.
    case requestClosedResponseIdle(RequestClosedResponseIdleState)

    /// The request stream has closed; response headers have been sent.
    case requestClosedResponseOpen(RequestClosedResponseOpenState)

    /// Both streams are closed. This is the terminal state.
    case requestClosedResponseClosed

    /// A placeholder, not a real state. See `withStateAvoidingCoWs`.
    case _modifying
  }

  /// Data held before any request headers have been received.
  struct RequestIdleResponseIdleState {
    /// Service providers keyed by the name of the service they provide.
    var services: [Substring: CallHandlerProvider]

    /// The server's message encoding configuration.
    var encoding: ServerMessageEncoding

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool
  }

  /// Data held once request headers have been accepted and before anything has been sent back.
  struct RequestOpenResponseIdleState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept-encoding value to send in the response headers, telling the client which
    /// message encodings the server supports.
    var acceptEncoding: String?

    /// The message encoding value to send in the response headers, naming the encoding which
    /// will be used for responses.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Whether the pipeline has been configured yet.
    var configurationState: ConfigurationState
  }

  /// Data held once the request stream has closed and before anything has been sent back.
  struct RequestClosedResponseIdleState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept-encoding value to send in the response headers, telling the client which
    /// message encodings the server supports.
    var acceptEncoding: String?

    /// The message encoding value to send in the response headers, naming the encoding which
    /// will be used for responses.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Whether the pipeline has been configured yet.
    var configurationState: ConfigurationState

    /// Carries every field over from the 'request open, response idle' state.
    init(from state: RequestOpenResponseIdleState) {
      self.reader = state.reader
      self.writer = state.writer
      self.contentType = state.contentType
      self.acceptEncoding = state.acceptEncoding
      self.responseEncoding = state.responseEncoding
      self.normalizeHeaders = state.normalizeHeaders
      self.configurationState = state.configurationState
    }
  }

  /// Data held while the request stream is open and response headers have been sent.
  struct RequestOpenResponseOpenState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Keeps the reader, writer and normalization flag; the header related fields are dropped.
    init(from state: RequestOpenResponseIdleState) {
      self.reader = state.reader
      self.writer = state.writer
      self.normalizeHeaders = state.normalizeHeaders
    }
  }

  /// Data held once the request stream has closed and response headers have been sent.
  struct RequestClosedResponseOpenState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Transition taken when the request stream closes after response headers were sent.
    init(from state: RequestOpenResponseOpenState) {
      self.reader = state.reader
      self.writer = state.writer
      self.normalizeHeaders = state.normalizeHeaders
    }

    /// Transition taken when response headers are sent after the request stream closed.
    init(from state: RequestClosedResponseIdleState) {
      self.reader = state.reader
      self.writer = state.writer
      self.normalizeHeaders = state.normalizeHeaders
    }
  }

  /// Tracks whether the pipeline has been configured.
  enum ConfigurationState {
    /// Configuration is in progress. The associated headers are handed back by `configured()`.
    /// Any message data will be buffered into an appropriate message reader.
    case configuring(HPACKHeaders)

    /// Configuration has finished.
    case configured

    /// `true` only in the `.configured` state.
    var isConfigured: Bool {
      if case .configured = self {
        return true
      }
      return false
    }

    /// Marks configuration as complete.
    ///
    /// Traps if the state is already `.configured`.
    ///
    /// - Returns: The headers held by the `.configuring` state.
    mutating func configured() -> HPACKHeaders {
      guard case let .configuring(headers) = self else {
        preconditionFailure("Invalid state: already configured")
      }
      self = .configured
      return headers
    }
  }
}
extension HTTP2ToRawGRPCStateMachine {
  /// What the caller should do after interacting with the state machine.
  enum Action {
    /// Nothing needs to be done.
    case none

    /// Configure the pipeline with the given call handler.
    case configure(GRPCCallHandler)

    /// An error was caught; fire it down the pipeline.
    case errorCaught(Error)

    /// Pass the request headers on to the next handler.
    case forwardHeaders(HPACKHeaders)

    /// Pass the buffer on to the next handler.
    case forwardMessage(ByteBuffer)

    /// Pass the buffer on to the next handler, followed by end of stream.
    case forwardMessageAndEnd(ByteBuffer)

    /// Pass the request headers on to the next handler, then try to read request messages.
    case forwardHeadersThenReadNextMessage(HPACKHeaders)

    /// Pass the buffer on to the next handler, then try to read request messages.
    case forwardMessageThenReadNextMessage(ByteBuffer)

    /// Pass end of stream on to the next handler.
    case forwardEnd

    /// Try to read a request message.
    case readNextRequest

    /// Write the frame to the channel. `flush` requests an extra flush, i.e. when the state
    /// machine is turning a request around rather than processing a response part.
    case write(HTTP2Frame.FramePayload, EventLoopPromise<Void>?, flush: Bool)

    /// Complete the promise with the given result.
    case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
  }

  /// The outcome of a transition: the state to move to and the action to perform.
  struct StateAndAction {
    /// The state to move to.
    var state: State

    /// The action the caller should perform.
    var action: Action
  }
}
- // MARK: Receive Headers
- // This is the only state in which we can receive headers.
extension HTTP2ToRawGRPCStateMachine.RequestIdleResponseIdleState {
  /// Validates the request headers and, if a handler exists for the requested method, moves to
  /// the 'request open, response idle' state.
  ///
  /// - Parameters:
  ///   - headers: The request headers sent by the client.
  ///   - eventLoop: The event loop handed to the call handler context.
  ///   - errorDelegate: An optional error delegate handed to the call handler context.
  ///   - remoteAddress: The remote address handed to the call handler context.
  ///   - logger: The logger handed to the call handler context.
  /// - Returns: The next state and either a `.configure` action carrying the call handler, or a
  ///   `.write` action which rejects the request and closes the stream.
  func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    // A missing or invalid content type means we must close.
    guard let contentType = self.extractContentType(from: headers) else {
      return self.unsupportedContentType()
    }

    // Work out how request messages are encoded and build a matching reader. We may also have a
    // list of acceptable request encodings to send back.
    let reader: LengthPrefixedMessageReader
    let acceptableRequestEncoding: String?

    switch self.extractRequestEncoding(from: headers) {
    case let .invalid(status, advertisedEncoding):
      return self.invalidRequestEncoding(
        status: status,
        acceptableRequestEncoding: advertisedEncoding,
        contentType: contentType
      )

    case let .valid(messageReader, acceptEncodingHeader):
      reader = messageReader
      acceptableRequestEncoding = acceptEncodingHeader
    }

    // Decide which encoding to use for response messages.
    let (writer, responseEncoding) = self.extractResponseEncoding(from: headers)

    // Resolve the path to a service provider.
    guard let path = headers.first(name: ":path") else {
      return self.methodNotImplemented("", contentType: contentType)
    }

    guard let callPath = CallPath(requestURI: path),
      let service = self.services[Substring(callPath.service)] else {
      return self.methodNotImplemented(path, contentType: contentType)
    }

    // The context bundles what is needed to create the handler; some of it is exposed to
    // service providers.
    let context = CallHandlerContext(
      errorDelegate: errorDelegate,
      logger: logger,
      encoding: self.encoding,
      eventLoop: eventLoop,
      path: path,
      remoteAddress: remoteAddress
    )

    // The service exists; it may still not provide this particular method.
    guard let handler = service.handleMethod(
      Substring(callPath.method),
      callHandlerContext: context
    ) else {
      return self.methodNotImplemented(path, contentType: contentType)
    }

    // Everything checks out: hold on to the headers until the pipeline has been configured.
    let nextState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState(
      reader: reader,
      writer: writer,
      contentType: contentType,
      acceptEncoding: acceptableRequestEncoding,
      responseEncoding: responseEncoding,
      normalizeHeaders: self.normalizeHeaders,
      configurationState: .configuring(headers)
    )

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestOpenResponseIdle(nextState),
      action: .configure(handler)
    )
  }

  /// Moves to the terminal state and writes `trailers` as a headers frame with end stream set,
  /// requesting a flush.
  private func closeStream(
    sendingTrailers trailers: HPACKHeaders
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    return .init(
      state: .requestClosedResponseClosed,
      action: .write(.headers(.init(headers: trailers, endStream: true)), nil, flush: true)
    )
  }

  /// The 'content-type' is not supported; close with status code 415.
  private func unsupportedContentType() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
    //
    //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
    //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
    //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
    //   successful.
    return self.closeStream(sendingTrailers: HPACKHeaders([(":status", "415")]))
  }

  /// The RPC method is not implemented; close with an 'unimplemented' status naming the path.
  private func methodNotImplemented(
    _ path: String,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let status = GRPCStatus(code: .unimplemented, message: "'\(path)' is not implemented")
    let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: nil,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
    return self.closeStream(sendingTrailers: trailers)
  }

  /// The request message encoding chosen by the client is not supported; close with the given
  /// status, advertising the acceptable encodings (if any).
  private func invalidRequestEncoding(
    status: GRPCStatus,
    acceptableRequestEncoding: String?,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: acceptableRequestEncoding,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
    return self.closeStream(sendingTrailers: trailers)
  }

  /// Makes a `GRPCStatus` and an accept-encoding header value suitable for returning to the
  /// client when the request message encoding is not supported.
  ///
  /// - Parameters:
  ///   - encoding: The unsupported request message encoding sent by the client.
  ///   - advertisedEncoding: The request message encodings the server advertises. May be empty.
  /// - Returns: The status to close the RPC with and the accept-encoding value, which is `nil`
  ///   when nothing is advertised.
  private func makeStatusAndTrailersForUnsupportedEncoding(
    _ encoding: String,
    advertisedEncoding: [String]
  ) -> (GRPCStatus, acceptEncoding: String?) {
    guard !advertisedEncoding.isEmpty else {
      // No compression is supported at all, so there is nothing to advertise.
      let status = GRPCStatus(code: .unimplemented, message: "compression is not supported")
      return (status, acceptEncoding: nil)
    }

    // Tell the client which encodings we advertise. (This may be a subset of the encodings we
    // actually support.)
    let status = GRPCStatus(
      code: .unimplemented,
      message: "\(encoding) compression is not supported, supported algorithms are " +
        "listed in '\(GRPCHeaderName.acceptEncoding)'"
    )
    return (status, acceptEncoding: advertisedEncoding.joined(separator: ","))
  }

  /// Extracts and validates the 'content-type' sent by the client.
  ///
  /// - Parameter headers: The headers to extract the 'content-type' from.
  /// - Returns: The content type, or `nil` if the header is missing or `ContentType` could not
  ///   be created from its value.
  private func extractContentType(from headers: HPACKHeaders) -> ContentType? {
    let rawContentType = headers.first(name: GRPCHeaderName.contentType)
    return rawContentType.flatMap(ContentType.init)
  }

  /// The outcome of validating the request message encoding header.
  private enum RequestEncodingValidation {
    /// The encoding is acceptable; requests should be read with the given reader.
    case valid(messageReader: LengthPrefixedMessageReader, acceptEncoding: String?)

    /// The encoding is not acceptable; the RPC should be terminated with this status.
    case invalid(status: GRPCStatus, acceptEncoding: String?)
  }

  /// Extracts and validates the request message encoding header.
  ///
  /// - Parameter headers: The headers to extract the message encoding header from.
  /// - Returns: `.valid` with a message reader suitable for decoding requests and an optional
  ///   accept-encoding response header value, or `.invalid` with the status and optional
  ///   accept-encoding value to close the RPC with.
  private func extractRequestEncoding(from headers: HPACKHeaders) -> RequestEncodingValidation {
    let encodings = headers[canonicalForm: GRPCHeaderName.encoding]

    // At most one encoding may be specified.
    guard encodings.count <= 1 else {
      let received = encodings.joined(separator: ", ")
      let status = GRPCStatus(
        code: .invalidArgument,
        message: "'\(GRPCHeaderName.encoding)' must contain no more than one value but was '\(received)'"
      )
      return .invalid(status: status, acceptEncoding: nil)
    }

    let validator = MessageEncodingHeaderValidator(encoding: self.encoding)

    switch validator.validate(requestEncoding: encodings.first) {
    case .noCompression:
      // No message encoding header was sent, so requests are not compressed.
      return .valid(messageReader: LengthPrefixedMessageReader(), acceptEncoding: nil)

    case let .supported(algorithm, decompressionLimit, acceptEncoding):
      // The request message encoding is one we support.
      let messageReader = LengthPrefixedMessageReader(
        compression: algorithm,
        decompressionLimit: decompressionLimit
      )
      return .valid(
        messageReader: messageReader,
        acceptEncoding: acceptEncoding.isEmpty ? nil : acceptEncoding.joined(separator: ",")
      )

    case let .unsupported(encoding, acceptable):
      // The request message encoding is not one we support.
      let (status, acceptEncoding) = self.makeStatusAndTrailersForUnsupportedEncoding(
        encoding,
        advertisedEncoding: acceptable
      )
      return .invalid(status: status, acceptEncoding: acceptEncoding)
    }
  }

  /// Picks a message encoding for responses based on what the client accepts and what the
  /// server has enabled.
  ///
  /// - Parameter headers: The headers to extract the acceptable response message encodings
  ///   from.
  /// - Returns: A message writer and the response encoding header value to send back to the
  ///   client, which is `nil` when responses will not be compressed.
  private func extractResponseEncoding(
    from headers: HPACKHeaders
  ) -> (LengthPrefixedMessageWriter, String?) {
    guard case let .enabled(configuration) = self.encoding else {
      // Compression isn't enabled on the server.
      return (LengthPrefixedMessageWriter(compression: .none), nil)
    }

    // The encodings the client will accept for response messages.
    let acceptedByClient = headers[canonicalForm: GRPCHeaderName.acceptEncoding]

    // Take the first algorithm we recognise and have enabled; if there isn't one then
    // responses are left uncompressed.
    let algorithm = acceptedByClient.lazy
      .compactMap { CompressionAlgorithm(rawValue: $0) }
      .first(where: { configuration.enabledAlgorithms.contains($0) })

    return (LengthPrefixedMessageWriter(compression: algorithm), algorithm?.name)
  }
}
- // MARK: - Receive Data
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Buffers request bytes and decides whether they can be read yet.
  ///
  /// - Parameters:
  ///   - buffer: The bytes received; they are appended to the message reader.
  ///   - endStream: Whether the request stream has ended.
  /// - Returns: The next state (request closed if `endStream`, otherwise unchanged) and
  ///   `.readNextRequest` if the pipeline is configured, `.none` otherwise.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    // Always buffer the bytes, whatever happens next.
    self.reader.append(buffer: &buffer)

    // There is no point reading a message until the pipeline is configured since there would be
    // nowhere to deliver it. When configured and at end of stream, the end is sent as a result
    // of draining the reader in the next state.
    let action: HTTP2ToRawGRPCStateMachine.Action
    if self.configurationState.isConfigured {
      action = .readNextRequest
    } else {
      action = .none
    }

    // End of stream closes the request side; otherwise we stay where we are.
    let nextState: HTTP2ToRawGRPCStateMachine.State
    if endStream {
      nextState = .requestClosedResponseIdle(.init(from: self))
    } else {
      nextState = .requestOpenResponseIdle(self)
    }

    return .init(state: nextState, action: action)
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Buffers request bytes and asks for the next request message to be read.
  ///
  /// - Parameters:
  ///   - buffer: The bytes received; they are appended to the message reader.
  ///   - endStream: Whether the request stream has ended.
  /// - Returns: The next state (request closed if `endStream`, otherwise unchanged) and a
  ///   `.readNextRequest` action.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    self.reader.append(buffer: &buffer)

    guard endStream else {
      return .init(state: .requestOpenResponseOpen(self), action: .readNextRequest)
    }

    // The request stream has ended, so move to the closed state. Any end of request stream
    // events will happen as a result of reading from that state.
    return .init(
      state: .requestClosedResponseOpen(.init(from: self)),
      action: .readNextRequest
    )
  }
}
- // MARK: - Send Headers
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Builds the response headers and opens the response stream.
  ///
  /// - Parameters:
  ///   - userProvidedHeaders: Headers supplied by the user to include in the response headers.
  ///   - promise: A promise passed along with the write action.
  /// - Returns: The 'request open, response open' state and an action writing the headers
  ///   frame without an extra flush.
  func send(
    headers userProvidedHeaders: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
      contentType: self.contentType,
      responseEncoding: self.responseEncoding,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedHeaders,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    let nextState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState(from: self)
    let payload = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestOpenResponseOpen(nextState),
      action: .write(payload, promise, flush: false)
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Builds the response headers and opens the response stream; the request stream is already
  /// closed.
  ///
  /// - Parameters:
  ///   - userProvidedHeaders: Headers supplied by the user to include in the response headers.
  ///   - promise: A promise passed along with the write action.
  /// - Returns: The 'request closed, response open' state and an action writing the headers
  ///   frame without an extra flush.
  func send(
    headers userProvidedHeaders: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
      contentType: self.contentType,
      responseEncoding: self.responseEncoding,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedHeaders,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    let nextState = HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState(from: self)
    let payload = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestClosedResponseOpen(nextState),
      action: .write(payload, promise, flush: false)
    )
  }
}
- // MARK: - Send Data
extension HTTP2ToRawGRPCStateMachine {
  /// Length prefixes `buffer` using `writer` and wraps the result in a data frame write.
  ///
  /// - Parameters:
  ///   - buffer: The message to write.
  ///   - compress: Passed to the writer as its `compressed` argument.
  ///   - allocator: The allocator handed to the writer.
  ///   - promise: A promise to attach to the write, or to fail if the writer throws.
  ///   - writer: The length prefixed message writer to use.
  /// - Returns: A `.write` action (without an extra flush) on success, or `.completePromise`
  ///   carrying the failure if the writer throws.
  static func writeGRPCFramedMessage(
    _ buffer: ByteBuffer,
    compress: Bool,
    allocator: ByteBufferAllocator,
    promise: EventLoopPromise<Void>?,
    writer: LengthPrefixedMessageWriter
  ) -> Action {
    let framed: ByteBuffer
    do {
      framed = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
    } catch {
      // Nothing will be written, so the promise is failed instead.
      return .completePromise(promise, .failure(error))
    }

    return .write(.data(.init(data: .byteBuffer(framed))), promise, flush: false)
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Frames a response message for writing. The state does not change.
  ///
  /// - Parameters:
  ///   - buffer: The response message.
  ///   - allocator: The allocator handed to the message writer.
  ///   - compress: Passed to the message writer as its `compressed` argument.
  ///   - promise: A promise to attach to the write.
  /// - Returns: The 'request open, response open' state and the action produced by
  ///   `writeGRPCFramedMessage`.
  func send(
    buffer: ByteBuffer,
    allocator: ByteBufferAllocator,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let writeAction = HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
      buffer,
      compress: compress,
      allocator: allocator,
      promise: promise,
      writer: self.writer
    )

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestOpenResponseOpen(self),
      action: writeAction
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Frames a response message for writing. The state does not change.
  ///
  /// - Parameters:
  ///   - buffer: The response message.
  ///   - allocator: The allocator handed to the message writer.
  ///   - compress: Passed to the message writer as its `compressed` argument.
  ///   - promise: A promise to attach to the write.
  /// - Returns: The 'request closed, response open' state and the action produced by
  ///   `writeGRPCFramedMessage`.
  func send(
    buffer: ByteBuffer,
    allocator: ByteBufferAllocator,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let writeAction = HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
      buffer,
      compress: compress,
      allocator: allocator,
      promise: promise,
      writer: self.writer
    )

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestClosedResponseOpen(self),
      action: writeAction
    )
  }
}
- // MARK: - Send End
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Ends the RPC before any response headers were sent, using a 'trailers only' response.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user to include in the response.
  ///   - promise: A promise to attach to the write.
  /// - Returns: The terminal state and an action writing the trailers with end stream set and
  ///   no extra flush.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: self.contentType,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    let payload = HTTP2Frame.FramePayload.headers(
      .init(headers: trailersOnly, endStream: true)
    )

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestClosedResponseClosed,
      action: .write(payload, promise, flush: false)
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Ends the RPC before any response headers were sent, using a 'trailers only' response.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user to include in the response.
  ///   - promise: A promise to attach to the write.
  /// - Returns: The terminal state and an action writing the trailers with end stream set and
  ///   no extra flush.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: self.contentType,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    let payload = HTTP2Frame.FramePayload.headers(
      .init(headers: trailersOnly, endStream: true)
    )

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestClosedResponseClosed,
      action: .write(payload, promise, flush: false)
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Ends the RPC by sending response trailers; response headers have already been sent.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user to include in the response.
  ///   - promise: A promise to attach to the write.
  /// - Returns: The terminal state and an action writing the trailers with end stream set and
  ///   no extra flush.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
      for: status,
      userProvidedHeaders: userProvidedTrailers,
      // Honor the configured normalization setting carried by this state rather than always
      // normalizing, so trailers are treated the same way as headers and trailers-only
      // responses.
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    return .init(
      state: .requestClosedResponseClosed,
      action: .write(.headers(.init(headers: trailers, endStream: true)), promise, flush: false)
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Ends the RPC by sending response trailers; response headers have already been sent and the
  /// request stream is still open.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user to include in the response.
  ///   - promise: A promise to attach to the write.
  /// - Returns: The terminal state and an action writing the trailers with end stream set and
  ///   no extra flush.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
      for: status,
      userProvidedHeaders: userProvidedTrailers,
      // Honor the configured normalization setting carried by this state rather than always
      // normalizing, so trailers are treated the same way as headers and trailers-only
      // responses.
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    return .init(
      state: .requestClosedResponseClosed,
      action: .write(.headers(.init(headers: trailers, endStream: true)), promise, flush: false)
    )
  }
}
- // MARK: - Pipeline Configured
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Marks the pipeline as configured and releases the request headers which were held back
  /// during configuration.
  ///
  /// Traps if the pipeline was already configured.
  ///
  /// - Returns: The (unchanged) 'request open, response idle' state and an action forwarding
  ///   the headers, followed by a read if the reader already holds bytes.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let requestHeaders = self.configurationState.configured()

    let action: HTTP2ToRawGRPCStateMachine.Action
    if self.reader.unprocessedBytes == 0 {
      // The reader is empty, so only the metadata needs to be sent on.
      action = .forwardHeaders(requestHeaders)
    } else {
      // Bytes arrived while configuring: try to read them once the metadata has been sent on.
      action = .forwardHeadersThenReadNextMessage(requestHeaders)
    }

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestOpenResponseIdle(self),
      action: action
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Marks the pipeline as configured and releases the request headers which were held back
  /// during configuration.
  ///
  /// Traps if the pipeline was already configured.
  ///
  /// - Returns: The (unchanged) 'request closed, response idle' state and an action forwarding
  ///   the headers and then reading.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    let requestHeaders = self.configurationState.configured()

    // The request stream is already closed, so forward the headers and start reading.
    let action = HTTP2ToRawGRPCStateMachine.Action
      .forwardHeadersThenReadNextMessage(requestHeaders)

    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestClosedResponseIdle(self),
      action: action
    )
  }
}
- // MARK: - Read Next Request
extension HTTP2ToRawGRPCStateMachine {
  /// Tries to read one request message from `reader` and works out what to do with it.
  ///
  /// - Parameters:
  ///   - reader: The message reader to read from.
  ///   - requestStreamClosed: Whether the request stream has closed, i.e. no more bytes will
  ///     arrive.
  /// - Returns: The action to take: forward a message (optionally followed by another read or
  ///   by end of stream), forward end of stream, do nothing, or surface an error thrown by the
  ///   reader.
  static func read(
    from reader: inout LengthPrefixedMessageReader,
    requestStreamClosed: Bool
  ) -> HTTP2ToRawGRPCStateMachine.Action {
    do {
      if let message = try reader.nextMessage() {
        if reader.unprocessedBytes != 0 {
          // More bytes are buffered: forward this message and keep reading.
          return .forwardMessageThenReadNextMessage(message)
        }

        // Nothing is left in the reader. If we're closed then we're done: forward the message
        // and end of stream. If we're not closed we could still receive more bytes (or end
        // stream) so just forward the message.
        if requestStreamClosed {
          return .forwardMessageAndEnd(message)
        } else {
          return .forwardMessage(message)
        }
      }

      // No message could be read. If we're closed then no more bytes can arrive, so forward
      // end of stream. If we're not closed then more bytes may arrive later and there is
      // nothing to do for now.
      if requestStreamClosed {
        return .forwardEnd
      } else {
        return .none
      }
    } catch {
      return .errorCaught(error)
    }
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Tries to read the next request message while the request stream is open and the response
  /// stream is idle.
  ///
  /// - Returns: The `requestOpenResponseIdle` state (holding the advanced reader) and the action
  ///   produced by `HTTP2ToRawGRPCStateMachine.read(from:requestStreamClosed:)`.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    // The request stream is still open in this state.
    let readResult = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: false
    )
    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestOpenResponseIdle(self),
      action: readResult
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Tries to read the next request message while both the request and response streams are open.
  ///
  /// - Returns: The `requestOpenResponseOpen` state (holding the advanced reader) and the action
  ///   produced by `HTTP2ToRawGRPCStateMachine.read(from:requestStreamClosed:)`.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    // The request stream is still open in this state.
    let readResult = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: false
    )
    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestOpenResponseOpen(self),
      action: readResult
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Tries to read the next request message after the request stream has closed and before the
  /// response stream has been opened.
  ///
  /// - Returns: The `requestClosedResponseIdle` state (holding the advanced reader) and the action
  ///   produced by `HTTP2ToRawGRPCStateMachine.read(from:requestStreamClosed:)`.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    // The request stream is already closed in this state.
    let readResult = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: true
    )
    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestClosedResponseIdle(self),
      action: readResult
    )
  }
}
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Tries to read the next request message after the request stream has closed while the
  /// response stream is open.
  ///
  /// - Returns: The `requestClosedResponseOpen` state (holding the advanced reader) and the action
  ///   produced by `HTTP2ToRawGRPCStateMachine.read(from:requestStreamClosed:)`.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
    // The request stream is already closed in this state.
    let readResult = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: true
    )
    return HTTP2ToRawGRPCStateMachine.StateAndAction(
      state: .requestClosedResponseOpen(self),
      action: readResult
    )
  }
}
- // MARK: - Top Level State Changes
// Each entry point below hands the current state to `withStateAvoidingCoWs` and invokes the
// matching handler on it; the handler's action is returned to the caller unchanged.
extension HTTP2ToRawGRPCStateMachine {
  /// Receive request headers.
  ///
  /// - Returns: The action produced by the current state's `receive(headers:...)` handler.
  mutating func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger
  ) -> Action {
    return self.withStateAvoidingCoWs {
      $0.receive(
        headers: headers,
        eventLoop: eventLoop,
        errorDelegate: errorDelegate,
        remoteAddress: remoteAddress,
        logger: logger
      )
    }
  }

  /// Receive request buffer.
  ///
  /// - Parameters:
  ///   - buffer: The received bytes, passed by reference to the state's handler.
  ///   - endStream: Whether this buffer was accompanied by the end of the request stream.
  /// - Returns: The action produced by the current state's `receive(buffer:endStream:)` handler.
  mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> Action {
    return self.withStateAvoidingCoWs {
      $0.receive(buffer: &buffer, endStream: endStream)
    }
  }

  /// Send response headers.
  ///
  /// - Returns: The action produced by the current state's `send(headers:promise:)` handler.
  mutating func send(headers: HPACKHeaders, promise: EventLoopPromise<Void>?) -> Action {
    return self.withStateAvoidingCoWs {
      $0.send(headers: headers, promise: promise)
    }
  }

  /// Send a response buffer.
  ///
  /// - Returns: The action produced by the current state's
  ///   `send(buffer:allocator:compress:promise:)` handler.
  mutating func send(
    buffer: ByteBuffer,
    allocator: ByteBufferAllocator,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) -> Action {
    return self.withStateAvoidingCoWs {
      $0.send(buffer: buffer, allocator: allocator, compress: compress, promise: promise)
    }
  }

  /// Send status and trailers.
  ///
  /// - Returns: The action produced by the current state's `send(status:trailers:promise:)`
  ///   handler.
  mutating func send(
    status: GRPCStatus,
    trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> Action {
    return self.withStateAvoidingCoWs {
      $0.send(status: status, trailers: trailers, promise: promise)
    }
  }

  /// The pipeline has been configured with a service provider.
  ///
  /// - Returns: The action produced by the current state's `pipelineConfigured()` handler.
  mutating func pipelineConfigured() -> Action {
    return self.withStateAvoidingCoWs {
      $0.pipelineConfigured()
    }
  }

  /// Try to read a request message.
  ///
  /// - Returns: The action produced by the current state's `readNextRequest()` handler.
  mutating func readNextRequest() -> Action {
    return self.withStateAvoidingCoWs {
      $0.readNextRequest()
    }
  }
}
// Per-state dispatch: every method switches over the current state, delegates to the associated
// state value where the event is valid, stores the resulting state back into `self`, and returns
// the resulting action. Events which are invalid for a state either trap or fail the promise.
extension HTTP2ToRawGRPCStateMachine.State {
  /// Handles the pipeline having been configured.
  ///
  /// Valid only once request headers have been received and before the response stream has been
  /// opened; every other state traps.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.Action {
    switch self {
    case .requestOpenResponseIdle(var openIdle):
      let next = openIdle.pipelineConfigured()
      self = next.state
      return next.action

    case .requestClosedResponseIdle(var closedIdle):
      let next = closedIdle.pipelineConfigured()
      self = next.state
      return next.action

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: pipeline configured before receiving request headers")

    case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
      preconditionFailure("Invalid state: response stream opened before pipeline was configured")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }

  /// Receive request headers.
  ///
  /// Headers may only be received while both streams are idle; every other state traps.
  mutating func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger
  ) -> HTTP2ToRawGRPCStateMachine.Action {
    switch self {
    case .requestIdleResponseIdle(let idle):
      // The one and only state in which headers are expected.
      let next = idle.receive(
        headers: headers,
        eventLoop: eventLoop,
        errorDelegate: errorDelegate,
        remoteAddress: remoteAddress,
        logger: logger
      )
      self = next.state
      return next.action

    case .requestOpenResponseIdle,
         .requestOpenResponseOpen,
         .requestClosedResponseIdle,
         .requestClosedResponseOpen,
         .requestClosedResponseClosed:
      // Headers have already been received by the time we reach any of these states.
      preconditionFailure("Invalid state")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }

  /// Receive a buffer from the client.
  ///
  /// Valid while the request stream is open. Once both streams are closed the buffer is ignored
  /// and `.none` is returned. All other states trap.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.Action {
    switch self {
    case .requestOpenResponseIdle(var openIdle):
      let next = openIdle.receive(buffer: &buffer, endStream: endStream)
      self = next.state
      return next.action

    case .requestOpenResponseOpen(var openOpen):
      let next = openOpen.receive(buffer: &buffer, endStream: endStream)
      self = next.state
      return next.action

    case .requestClosedResponseClosed:
      // Tolerated: we may have closed before the client sent its end of stream.
      return .none

    case .requestIdleResponseIdle:
      // Not allowed: the request headers must arrive before any request bytes.
      preconditionFailure("Invalid state")

    case .requestClosedResponseIdle, .requestClosedResponseOpen:
      preconditionFailure("Invalid state: the request stream is already closed")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }

  /// Try to read the next request message.
  ///
  /// Delegates to the current state in every state where request headers have been received and
  /// the response is not yet closed; returns `.none` once both streams are closed and traps if no
  /// request headers have been received.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.Action {
    switch self {
    case .requestOpenResponseIdle(var openIdle):
      let next = openIdle.readNextRequest()
      self = next.state
      return next.action

    case .requestOpenResponseOpen(var openOpen):
      let next = openOpen.readNextRequest()
      self = next.state
      return next.action

    case .requestClosedResponseIdle(var closedIdle):
      let next = closedIdle.readNextRequest()
      self = next.state
      return next.action

    case .requestClosedResponseOpen(var closedOpen):
      let next = closedOpen.readNextRequest()
      self = next.state
      return next.action

    case .requestClosedResponseClosed:
      return .none

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }

  /// Send response headers.
  ///
  /// Valid while the response stream is idle and request headers have been received. If the
  /// response stream is already open or closed the promise is failed with
  /// `GRPCError.AlreadyComplete`. Traps if no request headers have been received.
  mutating func send(
    headers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.Action {
    switch self {
    case .requestOpenResponseIdle(let openIdle):
      let next = openIdle.send(headers: headers, promise: promise)
      self = next.state
      return next.action

    case .requestClosedResponseIdle(let closedIdle):
      let next = closedIdle.send(headers: headers, promise: promise)
      self = next.state
      return next.action

    case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream isn't open")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }

  /// Send a response buffer.
  ///
  /// Valid while the response stream is open. If response headers have not been sent yet the
  /// promise is failed with `GRPCError.InvalidState`; if the response is already closed it is
  /// failed with `GRPCError.AlreadyComplete`. Traps if no request headers have been received.
  mutating func send(
    buffer: ByteBuffer,
    allocator: ByteBufferAllocator,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.Action {
    switch self {
    case .requestOpenResponseOpen(let openOpen):
      let next = openOpen.send(
        buffer: buffer,
        allocator: allocator,
        compress: compress,
        promise: promise
      )
      self = next.state
      return next.action

    case .requestClosedResponseOpen(let closedOpen):
      let next = closedOpen.send(
        buffer: buffer,
        allocator: allocator,
        compress: compress,
        promise: promise
      )
      self = next.state
      return next.action

    case .requestOpenResponseIdle, .requestClosedResponseIdle:
      let headersNotSent = GRPCError.InvalidState(
        "Response headers must be sent before response message"
      )
      return .completePromise(promise, .failure(headersNotSent))

    case .requestClosedResponseClosed:
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream is still closed")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }

  /// Send status and trailers.
  ///
  /// Valid in every state where request headers have been received and the response is not yet
  /// closed. If the response is already closed the promise is failed with
  /// `GRPCError.AlreadyComplete`. Traps if no request headers have been received.
  mutating func send(
    status: GRPCStatus,
    trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> HTTP2ToRawGRPCStateMachine.Action {
    switch self {
    case .requestOpenResponseIdle(let openIdle):
      let next = openIdle.send(status: status, trailers: trailers, promise: promise)
      self = next.state
      return next.action

    case .requestClosedResponseIdle(let closedIdle):
      let next = closedIdle.send(status: status, trailers: trailers, promise: promise)
      self = next.state
      return next.action

    case .requestOpenResponseOpen(let openOpen):
      let next = openOpen.send(status: status, trailers: trailers, promise: promise)
      self = next.state
      return next.action

    case .requestClosedResponseOpen(let closedOpen):
      let next = closedOpen.send(status: status, trailers: trailers, promise: promise)
      self = next.state
      return next.action

    case .requestClosedResponseClosed:
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream is still closed")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
}
- // MARK: - Helpers
extension HTTP2ToRawGRPCStateMachine {
  /// Builds the initial response headers.
  ///
  /// The result contains, in order: `:status` (always "200"), the content type, the response
  /// encoding (if any), the acceptable request encoding (if any), and then the user provided
  /// headers.
  ///
  /// - Parameters:
  ///   - contentType: The content type whose canonical value is sent back.
  ///   - responseEncoding: The encoding used for responses, if any.
  ///   - acceptableRequestEncoding: The value to advertise as acceptable request encodings, if any.
  ///   - userProvidedHeaders: Additional headers supplied by the user.
  ///   - normalizeUserProvidedHeaders: Whether the names of user provided headers are lowercased.
  /// - Returns: The assembled response headers.
  static func makeResponseHeaders(
    contentType: ContentType,
    responseEncoding: String?,
    acceptableRequestEncoding: String?,
    userProvidedHeaders: HPACKHeaders,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var responseHeaders = HPACKHeaders()
    // ':status' and 'content-type' are always present; 'grpc-encoding' and 'grpc-accept-encoding'
    // may be as well, hence 4 on top of whatever the user provided.
    responseHeaders.reserveCapacity(4 + userProvidedHeaders.count)

    responseHeaders.add(name: ":status", value: "200")
    responseHeaders.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)

    if let encoding = responseEncoding {
      responseHeaders.add(name: GRPCHeaderName.encoding, value: encoding)
    }

    if let acceptable = acceptableRequestEncoding {
      responseHeaders.add(name: GRPCHeaderName.acceptEncoding, value: acceptable)
    }

    // User provided headers go last, normalized if requested.
    responseHeaders.add(
      contentsOf: userProvidedHeaders,
      normalize: normalizeUserProvidedHeaders
    )

    return responseHeaders
  }

  /// Builds a "trailers only" response: a single header block carrying both the response headers
  /// and the status.
  ///
  /// The result contains, in order: `:status` (always "200"), the content type, the status code,
  /// the marshalled status message (if any), the acceptable request encoding (if any), and then
  /// the user provided headers (if any).
  ///
  /// - Parameters:
  ///   - status: The status to report.
  ///   - contentType: The content type whose canonical value is sent back.
  ///   - acceptableRequestEncoding: The value to advertise as acceptable request encodings, if any.
  ///   - userProvidedHeaders: Additional headers supplied by the user, if any.
  ///   - normalizeUserProvidedHeaders: Whether the names of user provided headers are lowercased.
  /// - Returns: The assembled headers.
  static func makeResponseTrailersOnly(
    for status: GRPCStatus,
    contentType: ContentType,
    acceptableRequestEncoding: String?,
    userProvidedHeaders: HPACKHeaders?,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var trailersOnly = HPACKHeaders()
    // ':status', 'content-type' and 'grpc-status' are always present; 'grpc-message' and
    // 'grpc-accept-encoding' may be as well, hence 5 on top of whatever the user provided.
    trailersOnly.reserveCapacity(5 + (userProvidedHeaders?.count ?? 0))

    // The required entries.
    trailersOnly.add(name: ":status", value: "200")
    trailersOnly.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
    trailersOnly.add(
      name: GRPCHeaderName.statusCode,
      value: String(describing: status.code.rawValue)
    )

    let marshalledMessage = status.message.flatMap { GRPCStatusMessageMarshaller.marshall($0) }
    if let marshalledMessage = marshalledMessage {
      trailersOnly.add(name: GRPCHeaderName.statusMessage, value: marshalledMessage)
    }

    // This may be included if the requested encoding was not valid.
    if let acceptable = acceptableRequestEncoding {
      trailersOnly.add(name: GRPCHeaderName.acceptEncoding, value: acceptable)
    }

    if let extraHeaders = userProvidedHeaders {
      trailersOnly.add(contentsOf: extraHeaders, normalize: normalizeUserProvidedHeaders)
    }

    return trailersOnly
  }

  /// Builds the response trailers sent at the end of a response stream.
  ///
  /// The result contains, in order: the status code, the marshalled status message (if any), and
  /// then the user provided trailers.
  ///
  /// - Parameters:
  ///   - status: The status to report.
  ///   - userProvidedHeaders: Additional trailers supplied by the user.
  ///   - normalizeUserProvidedHeaders: Whether the names of user provided trailers are lowercased.
  /// - Returns: The assembled trailers.
  static func makeResponseTrailers(
    for status: GRPCStatus,
    userProvidedHeaders: HPACKHeaders,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var responseTrailers = HPACKHeaders()
    // 'grpc-status' is always present and 'grpc-message' may be, hence 2 on top of whatever the
    // user provided.
    responseTrailers.reserveCapacity(2 + userProvidedHeaders.count)

    // Status code.
    responseTrailers.add(
      name: GRPCHeaderName.statusCode,
      value: String(describing: status.code.rawValue)
    )

    // Status message, if present.
    let marshalledMessage = status.message.flatMap { GRPCStatusMessageMarshaller.marshall($0) }
    if let marshalledMessage = marshalledMessage {
      responseTrailers.add(name: GRPCHeaderName.statusMessage, value: marshalledMessage)
    }

    // User provided trailers.
    responseTrailers.add(
      contentsOf: userProvidedHeaders,
      normalize: normalizeUserProvidedHeaders
    )

    return responseTrailers
  }
}
private extension HPACKHeaders {
  /// Appends every header in `other` to these headers.
  ///
  /// - Parameters:
  ///   - other: The headers to append.
  ///   - normalize: If `true` each header name is lowercased before being appended; values and
  ///     indexing are carried over unchanged. If `false` the headers are appended as they are.
  mutating func add(contentsOf other: HPACKHeaders, normalize: Bool) {
    guard normalize else {
      self.add(contentsOf: other)
      return
    }

    // Lowercase names lazily, as each header is appended.
    let lowercasedNames = other.lazy.map { headerName, headerValue, indexing in
      (name: headerName.lowercased(), value: headerValue, indexable: indexing)
    }
    self.add(contentsOf: lowercasedNames)
  }
}
|