|
@@ -110,7 +110,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
struct ClientOpenServerClosedState {
|
|
struct ClientOpenServerClosedState {
|
|
|
- var framer: GRPCMessageFramer
|
|
|
|
|
|
|
+ var framer: GRPCMessageFramer?
|
|
|
var compressor: Zlib.Compressor?
|
|
var compressor: Zlib.Compressor?
|
|
|
|
|
|
|
|
let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
|
|
let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
|
|
@@ -118,6 +118,21 @@ private enum GRPCStreamStateMachineState {
|
|
|
|
|
|
|
|
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
|
|
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
|
|
|
|
|
|
|
|
|
|
+ // This transition should only happen on the server-side when, upon receiving
|
|
|
|
|
+ // initial client metadata, some of the headers are invalid and we must reject
|
|
|
|
|
+ // the RPC.
|
|
|
|
|
+ // We will mark the client as open (because it sent initial metadata albeit
|
|
|
|
|
+ // invalid) but we'll close the server, meaning all future messages sent from
|
|
|
|
|
+ // the client will be ignored. Because of this, we won't need to frame or
|
|
|
|
|
+ // deframe any messages, as we won't be reading or writing any messages.
|
|
|
|
|
+ init(previousState: ClientIdleServerIdleState) {
|
|
|
|
|
+ self.framer = nil
|
|
|
|
|
+ self.compressor = nil
|
|
|
|
|
+ self.deframer = nil
|
|
|
|
|
+ self.decompressor = nil
|
|
|
|
|
+ self.inboundMessageBuffer = .init()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
init(previousState: ClientOpenServerOpenState) {
|
|
init(previousState: ClientOpenServerOpenState) {
|
|
|
self.framer = previousState.framer
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
self.compressor = previousState.compressor
|
|
@@ -240,12 +255,25 @@ private enum GRPCStreamStateMachineState {
|
|
|
// We still need the framer and compressor in case the server has closed
|
|
// We still need the framer and compressor in case the server has closed
|
|
|
// but its buffer is not yet empty and still needs to send messages out to
|
|
// but its buffer is not yet empty and still needs to send messages out to
|
|
|
// the client.
|
|
// the client.
|
|
|
- var framer: GRPCMessageFramer
|
|
|
|
|
|
|
+ var framer: GRPCMessageFramer?
|
|
|
var compressor: Zlib.Compressor?
|
|
var compressor: Zlib.Compressor?
|
|
|
|
|
|
|
|
// These are already deframed, so we don't need the deframer anymore.
|
|
// These are already deframed, so we don't need the deframer anymore.
|
|
|
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
|
|
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
|
|
|
|
|
|
|
|
|
|
+ // This transition should only happen on the server-side when, upon receiving
|
|
|
|
|
+ // initial client metadata, some of the headers are invalid and we must reject
|
|
|
|
|
+ // the RPC.
|
|
|
|
|
+ // We will mark the client as closed (because it set the EOS flag, even if
|
|
|
|
|
+ // the initial metadata was invalid) and we'll close the server too.
|
|
|
|
|
+ // Because of this, we won't need to frame any messages, as we
|
|
|
|
|
+ // won't be writing any messages.
|
|
|
|
|
+ init(previousState: ClientIdleServerIdleState) {
|
|
|
|
|
+ self.framer = nil
|
|
|
|
|
+ self.compressor = nil
|
|
|
|
|
+ self.inboundMessageBuffer = .init()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
init(previousState: ClientClosedServerOpenState) {
|
|
init(previousState: ClientClosedServerOpenState) {
|
|
|
self.framer = previousState.framer
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
self.compressor = previousState.compressor
|
|
@@ -1062,6 +1090,21 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ mutating private func closeServerAndBuildRejectRPCAction(
|
|
|
|
|
+ currentState: GRPCStreamStateMachineState.ClientIdleServerIdleState,
|
|
|
|
|
+ endStream: Bool,
|
|
|
|
|
+ rejectWithStatus status: Status
|
|
|
|
|
+ ) -> OnMetadataReceived {
|
|
|
|
|
+ if endStream {
|
|
|
|
|
+ self.state = .clientClosedServerClosed(.init(previousState: currentState))
|
|
|
|
|
+ } else {
|
|
|
|
|
+ self.state = .clientOpenServerClosed(.init(previousState: currentState))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
|
|
|
|
|
+ return .rejectRPC(trailers: trailers)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
private mutating func serverReceive(
|
|
private mutating func serverReceive(
|
|
|
metadata: HPACKHeaders,
|
|
metadata: HPACKHeaders,
|
|
|
endStream: Bool,
|
|
endStream: Bool,
|
|
@@ -1071,7 +1114,9 @@ extension GRPCStreamStateMachine {
|
|
|
case .clientIdleServerIdle(let state):
|
|
case .clientIdleServerIdle(let state):
|
|
|
let contentType = metadata.firstString(forKey: .contentType)
|
|
let contentType = metadata.firstString(forKey: .contentType)
|
|
|
.flatMap { ContentType(value: $0) }
|
|
.flatMap { ContentType(value: $0) }
|
|
|
- guard contentType != nil else {
|
|
|
|
|
|
|
+ if contentType == nil {
|
|
|
|
|
+ self.state = .clientOpenServerClosed(.init(previousState: state))
|
|
|
|
|
+
|
|
|
// Respond with HTTP-level Unsupported Media Type status code.
|
|
// Respond with HTTP-level Unsupported Media Type status code.
|
|
|
var trailers = HPACKHeaders()
|
|
var trailers = HPACKHeaders()
|
|
|
trailers.add("415", forKey: .status)
|
|
trailers.add("415", forKey: .status)
|
|
@@ -1080,13 +1125,50 @@ extension GRPCStreamStateMachine {
|
|
|
|
|
|
|
|
let path = metadata.firstString(forKey: .path)
|
|
let path = metadata.firstString(forKey: .path)
|
|
|
.flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
|
|
.flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
|
|
|
- guard path != nil else {
|
|
|
|
|
- let status = Status(
|
|
|
|
|
- code: .unimplemented,
|
|
|
|
|
- message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
|
|
|
|
|
|
|
+ if path == nil {
|
|
|
|
|
+ return self.closeServerAndBuildRejectRPCAction(
|
|
|
|
|
+ currentState: state,
|
|
|
|
|
+ endStream: endStream,
|
|
|
|
|
+ rejectWithStatus: Status(
|
|
|
|
|
+ code: .unimplemented,
|
|
|
|
|
+ message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let scheme = metadata.firstString(forKey: .scheme)
|
|
|
|
|
+ .flatMap { Scheme(rawValue: $0) }
|
|
|
|
|
+ if scheme == nil {
|
|
|
|
|
+ return self.closeServerAndBuildRejectRPCAction(
|
|
|
|
|
+ currentState: state,
|
|
|
|
|
+ endStream: endStream,
|
|
|
|
|
+ rejectWithStatus: Status(
|
|
|
|
|
+ code: .invalidArgument,
|
|
|
|
|
+ message: ":scheme header must be present and one of \"http\" or \"https\"."
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ guard let method = metadata.firstString(forKey: .method), method == "POST" else {
|
|
|
|
|
+ return self.closeServerAndBuildRejectRPCAction(
|
|
|
|
|
+ currentState: state,
|
|
|
|
|
+ endStream: endStream,
|
|
|
|
|
+ rejectWithStatus: Status(
|
|
|
|
|
+ code: .invalidArgument,
|
|
|
|
|
+ message: ":method header is expected to be present and have a value of \"POST\"."
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ guard let te = metadata.firstString(forKey: .te), te == "trailers" else {
|
|
|
|
|
+ return self.closeServerAndBuildRejectRPCAction(
|
|
|
|
|
+ currentState: state,
|
|
|
|
|
+ endStream: endStream,
|
|
|
|
|
+ rejectWithStatus: Status(
|
|
|
|
|
+ code: .invalidArgument,
|
|
|
|
|
+ message: "\"te\" header is expected to be present and have a value of \"trailers\"."
|
|
|
|
|
+ )
|
|
|
)
|
|
)
|
|
|
- let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
|
|
|
|
|
- return .rejectRPC(trailers: trailers)
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func isIdentityOrCompatibleEncoding(_ clientEncoding: CompressionAlgorithm) -> Bool {
|
|
func isIdentityOrCompatibleEncoding(_ clientEncoding: CompressionAlgorithm) -> Bool {
|
|
@@ -1265,7 +1347,7 @@ extension GRPCStreamStateMachine {
|
|
|
self.state = .clientClosedServerOpen(state)
|
|
self.state = .clientClosedServerOpen(state)
|
|
|
return response.map { .sendMessage($0) } ?? .awaitMoreMessages
|
|
return response.map { .sendMessage($0) } ?? .awaitMoreMessages
|
|
|
case .clientOpenServerClosed(var state):
|
|
case .clientOpenServerClosed(var state):
|
|
|
- let response = try state.framer.next(compressor: state.compressor)
|
|
|
|
|
|
|
+ let response = try state.framer?.next(compressor: state.compressor)
|
|
|
self.state = .clientOpenServerClosed(state)
|
|
self.state = .clientOpenServerClosed(state)
|
|
|
if let response {
|
|
if let response {
|
|
|
return .sendMessage(response)
|
|
return .sendMessage(response)
|
|
@@ -1273,7 +1355,7 @@ extension GRPCStreamStateMachine {
|
|
|
return .noMoreMessages
|
|
return .noMoreMessages
|
|
|
}
|
|
}
|
|
|
case .clientClosedServerClosed(var state):
|
|
case .clientClosedServerClosed(var state):
|
|
|
- let response = try state.framer.next(compressor: state.compressor)
|
|
|
|
|
|
|
+ let response = try state.framer?.next(compressor: state.compressor)
|
|
|
self.state = .clientClosedServerClosed(state)
|
|
self.state = .clientClosedServerClosed(state)
|
|
|
if let response {
|
|
if let response {
|
|
|
return .sendMessage(response)
|
|
return .sendMessage(response)
|