|
|
@@ -414,6 +414,7 @@ struct GRPCStreamStateMachine {
|
|
|
|
|
|
// Server-specific actions
|
|
|
case rejectRPC(trailers: HPACKHeaders)
|
|
|
+ case protocolViolation
|
|
|
}
|
|
|
|
|
|
mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
|
|
|
@@ -435,6 +436,7 @@ struct GRPCStreamStateMachine {
|
|
|
|
|
|
enum OnBufferReceivedAction: Equatable {
|
|
|
case readInbound
|
|
|
+ case doNothing
|
|
|
|
|
|
// Client-specific actions
|
|
|
|
|
|
@@ -1114,46 +1116,6 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func makeTrailers(
|
|
|
- status: Status,
|
|
|
- customMetadata: Metadata?,
|
|
|
- trailersOnly: Bool
|
|
|
- ) -> HPACKHeaders {
|
|
|
- // Trailers always contain the grpc-status header, and optionally,
|
|
|
- // grpc-message, and custom metadata.
|
|
|
- // If it's a trailers-only response, they will also contain :status and
|
|
|
- // content-type.
|
|
|
- var headers = HPACKHeaders()
|
|
|
- let customMetadataCount = customMetadata?.count ?? 0
|
|
|
- if trailersOnly {
|
|
|
- // Reserve 4 for capacity: 3 for the required headers, and 1 for the
|
|
|
- // optional status message.
|
|
|
- headers.reserveCapacity(4 + customMetadataCount)
|
|
|
- headers.add("200", forKey: .status)
|
|
|
- headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
|
|
|
- } else {
|
|
|
- // Reserve 2 for capacity: one for the required grpc-status, and
|
|
|
- // one for the optional message.
|
|
|
- headers.reserveCapacity(2 + customMetadataCount)
|
|
|
- }
|
|
|
-
|
|
|
- headers.add(String(status.code.rawValue), forKey: .grpcStatus)
|
|
|
-
|
|
|
- if !status.message.isEmpty {
|
|
|
- if let percentEncodedMessage = GRPCStatusMessageMarshaller.marshall(status.message) {
|
|
|
- headers.add(percentEncodedMessage, forKey: .grpcStatusMessage)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if let customMetadata {
|
|
|
- for metadataPair in customMetadata {
|
|
|
- headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return headers
|
|
|
- }
|
|
|
-
|
|
|
private mutating func serverSend(
|
|
|
status: Status,
|
|
|
customMetadata: Metadata
|
|
|
@@ -1162,32 +1124,16 @@ extension GRPCStreamStateMachine {
|
|
|
switch self.state {
|
|
|
case .clientOpenServerOpen(let state):
|
|
|
self.state = .clientOpenServerClosed(.init(previousState: state))
|
|
|
- return self.makeTrailers(
|
|
|
- status: status,
|
|
|
- customMetadata: customMetadata,
|
|
|
- trailersOnly: false
|
|
|
- )
|
|
|
+ return .trailers(status: status, metadata: customMetadata)
|
|
|
case .clientClosedServerOpen(let state):
|
|
|
self.state = .clientClosedServerClosed(.init(previousState: state))
|
|
|
- return self.makeTrailers(
|
|
|
- status: status,
|
|
|
- customMetadata: customMetadata,
|
|
|
- trailersOnly: false
|
|
|
- )
|
|
|
+ return .trailers(status: status, metadata: customMetadata)
|
|
|
case .clientOpenServerIdle(let state):
|
|
|
self.state = .clientOpenServerClosed(.init(previousState: state))
|
|
|
- return self.makeTrailers(
|
|
|
- status: status,
|
|
|
- customMetadata: customMetadata,
|
|
|
- trailersOnly: true
|
|
|
- )
|
|
|
+ return .trailersOnly(status: status, metadata: customMetadata)
|
|
|
case .clientClosedServerIdle(let state):
|
|
|
self.state = .clientClosedServerClosed(.init(previousState: state))
|
|
|
- return self.makeTrailers(
|
|
|
- status: status,
|
|
|
- customMetadata: customMetadata,
|
|
|
- trailersOnly: true
|
|
|
- )
|
|
|
+ return .trailersOnly(status: status, metadata: customMetadata)
|
|
|
case .clientIdleServerIdle:
|
|
|
try self.invalidState(
|
|
|
"Server can't send status if client is idle."
|
|
|
@@ -1199,26 +1145,22 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- mutating private func closeServerAndBuildRejectRPCAction(
|
|
|
- currentState: GRPCStreamStateMachineState.ClientIdleServerIdleState,
|
|
|
- endStream: Bool,
|
|
|
- rejectWithStatus status: Status
|
|
|
- ) -> OnMetadataReceived {
|
|
|
- if endStream {
|
|
|
- self.state = .clientClosedServerClosed(.init(previousState: currentState))
|
|
|
- } else {
|
|
|
- self.state = .clientOpenServerClosed(.init(previousState: currentState))
|
|
|
- }
|
|
|
-
|
|
|
- let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
|
|
|
- return .rejectRPC(trailers: trailers)
|
|
|
- }
|
|
|
-
|
|
|
private mutating func serverReceive(
|
|
|
headers: HPACKHeaders,
|
|
|
endStream: Bool,
|
|
|
configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
|
|
|
) throws -> OnMetadataReceived {
|
|
|
+ func closeServer(
|
|
|
+ from state: GRPCStreamStateMachineState.ClientIdleServerIdleState,
|
|
|
+ endStream: Bool
|
|
|
+ ) -> GRPCStreamStateMachineState {
|
|
|
+ if endStream {
|
|
|
+ return .clientClosedServerClosed(.init(previousState: state))
|
|
|
+ } else {
|
|
|
+ return .clientOpenServerClosed(.init(previousState: state))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
switch self.state {
|
|
|
case .clientIdleServerIdle(let state):
|
|
|
let contentType = headers.firstString(forKey: .contentType)
|
|
|
@@ -1233,10 +1175,9 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
|
|
|
|
guard let pathHeader = headers.firstString(forKey: .path) else {
|
|
|
- return self.closeServerAndBuildRejectRPCAction(
|
|
|
- currentState: state,
|
|
|
- endStream: endStream,
|
|
|
- rejectWithStatus: Status(
|
|
|
+ self.state = closeServer(from: state, endStream: endStream)
|
|
|
+ return .rejectRPC(
|
|
|
+ trailers: .trailersOnly(
|
|
|
code: .invalidArgument,
|
|
|
message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
|
|
|
)
|
|
|
@@ -1244,10 +1185,9 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
|
|
|
|
guard let path = MethodDescriptor(path: pathHeader) else {
|
|
|
- return self.closeServerAndBuildRejectRPCAction(
|
|
|
- currentState: state,
|
|
|
- endStream: endStream,
|
|
|
- rejectWithStatus: Status(
|
|
|
+ self.state = closeServer(from: state, endStream: endStream)
|
|
|
+ return .rejectRPC(
|
|
|
+ trailers: .trailersOnly(
|
|
|
code: .unimplemented,
|
|
|
message:
|
|
|
"The given \(GRPCHTTP2Keys.path.rawValue) (\(pathHeader)) does not correspond to a valid method."
|
|
|
@@ -1258,10 +1198,9 @@ extension GRPCStreamStateMachine {
|
|
|
let scheme = headers.firstString(forKey: .scheme)
|
|
|
.flatMap { GRPCStreamStateMachineConfiguration.Scheme(rawValue: $0) }
|
|
|
if scheme == nil {
|
|
|
- return self.closeServerAndBuildRejectRPCAction(
|
|
|
- currentState: state,
|
|
|
- endStream: endStream,
|
|
|
- rejectWithStatus: Status(
|
|
|
+ self.state = closeServer(from: state, endStream: endStream)
|
|
|
+ return .rejectRPC(
|
|
|
+ trailers: .trailersOnly(
|
|
|
code: .invalidArgument,
|
|
|
message: ":scheme header must be present and one of \"http\" or \"https\"."
|
|
|
)
|
|
|
@@ -1269,10 +1208,9 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
|
|
|
|
guard let method = headers.firstString(forKey: .method), method == "POST" else {
|
|
|
- return self.closeServerAndBuildRejectRPCAction(
|
|
|
- currentState: state,
|
|
|
- endStream: endStream,
|
|
|
- rejectWithStatus: Status(
|
|
|
+ self.state = closeServer(from: state, endStream: endStream)
|
|
|
+ return .rejectRPC(
|
|
|
+ trailers: .trailersOnly(
|
|
|
code: .invalidArgument,
|
|
|
message: ":method header is expected to be present and have a value of \"POST\"."
|
|
|
)
|
|
|
@@ -1280,10 +1218,9 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
|
|
|
|
guard let te = headers.firstString(forKey: .te), te == "trailers" else {
|
|
|
- return self.closeServerAndBuildRejectRPCAction(
|
|
|
- currentState: state,
|
|
|
- endStream: endStream,
|
|
|
- rejectWithStatus: Status(
|
|
|
+ self.state = closeServer(from: state, endStream: endStream)
|
|
|
+ return .rejectRPC(
|
|
|
+ trailers: .trailersOnly(
|
|
|
code: .invalidArgument,
|
|
|
message: "\"te\" header is expected to be present and have a value of \"trailers\"."
|
|
|
)
|
|
|
@@ -1300,36 +1237,31 @@ extension GRPCStreamStateMachine {
|
|
|
var encodingValuesIterator = encodingValues.makeIterator()
|
|
|
if let rawEncoding = encodingValuesIterator.next() {
|
|
|
guard encodingValuesIterator.next() == nil else {
|
|
|
- let status = Status(
|
|
|
- code: .internalError,
|
|
|
- message: "\(GRPCHTTP2Keys.encoding) must contain no more than one value."
|
|
|
+ self.state = closeServer(from: state, endStream: endStream)
|
|
|
+ return .rejectRPC(
|
|
|
+ trailers: .trailersOnly(
|
|
|
+ code: .internalError,
|
|
|
+ message: "\(GRPCHTTP2Keys.encoding) must contain no more than one value."
|
|
|
+ )
|
|
|
)
|
|
|
- let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
|
|
|
- return .rejectRPC(trailers: trailers)
|
|
|
}
|
|
|
|
|
|
guard let clientEncoding = CompressionAlgorithm(name: rawEncoding),
|
|
|
configuration.acceptedEncodings.contains(clientEncoding)
|
|
|
else {
|
|
|
- let statusMessage = """
|
|
|
- \(rawEncoding) compression is not supported; \
|
|
|
- supported algorithms are listed in grpc-accept-encoding
|
|
|
- """
|
|
|
+ self.state = closeServer(from: state, endStream: endStream)
|
|
|
+ var trailers = HPACKHeaders.trailersOnly(
|
|
|
+ code: .unimplemented,
|
|
|
+ message: """
|
|
|
+ \(rawEncoding) compression is not supported; \
|
|
|
+ supported algorithms are listed in grpc-accept-encoding
|
|
|
+ """
|
|
|
+ )
|
|
|
|
|
|
- var customMetadata = Metadata()
|
|
|
- customMetadata.reserveCapacity(configuration.acceptedEncodings.count)
|
|
|
for acceptedEncoding in configuration.acceptedEncodings.elements {
|
|
|
- customMetadata.addString(
|
|
|
- acceptedEncoding.name,
|
|
|
- forKey: GRPCHTTP2Keys.acceptEncoding.rawValue
|
|
|
- )
|
|
|
+ trailers.add(name: GRPCHTTP2Keys.acceptEncoding.rawValue, value: acceptedEncoding.name)
|
|
|
}
|
|
|
|
|
|
- let trailers = self.makeTrailers(
|
|
|
- status: Status(code: .unimplemented, message: statusMessage),
|
|
|
- customMetadata: customMetadata,
|
|
|
- trailersOnly: true
|
|
|
- )
|
|
|
return .rejectRPC(trailers: trailers)
|
|
|
}
|
|
|
|
|
|
@@ -1386,14 +1318,13 @@ extension GRPCStreamStateMachine {
|
|
|
}
|
|
|
|
|
|
return .receivedMetadata(Metadata(headers: headers), path)
|
|
|
+
|
|
|
case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
|
|
|
- try self.invalidState(
|
|
|
- "Client shouldn't have sent metadata twice."
|
|
|
- )
|
|
|
+ // Metadata has already been received, should only be sent once by clients.
|
|
|
+ return .protocolViolation
|
|
|
+
|
|
|
case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
|
|
|
- try self.invalidState(
|
|
|
- "Client can't have sent metadata if closed."
|
|
|
- )
|
|
|
+ try self.invalidState("Client can't have sent metadata if closed.")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1401,11 +1332,12 @@ extension GRPCStreamStateMachine {
|
|
|
buffer: ByteBuffer,
|
|
|
endStream: Bool
|
|
|
) throws -> OnBufferReceivedAction {
|
|
|
+ let action: OnBufferReceivedAction
|
|
|
+
|
|
|
switch self.state {
|
|
|
case .clientIdleServerIdle:
|
|
|
- try self.invalidState(
|
|
|
- "Can't have received a message if client is idle."
|
|
|
- )
|
|
|
+ try self.invalidState("Can't have received a message if client is idle.")
|
|
|
+
|
|
|
case .clientOpenServerIdle(var state):
|
|
|
// Deframer must be present on the server side, as we know the decompression
|
|
|
// algorithm from the moment the client opens.
|
|
|
@@ -1418,6 +1350,9 @@ extension GRPCStreamStateMachine {
|
|
|
} else {
|
|
|
self.state = .clientOpenServerIdle(state)
|
|
|
}
|
|
|
+
|
|
|
+ action = .readInbound
|
|
|
+
|
|
|
case .clientOpenServerOpen(var state):
|
|
|
try state.deframer.process(buffer: buffer) { deframedMessage in
|
|
|
state.inboundMessageBuffer.append(deframedMessage)
|
|
|
@@ -1428,6 +1363,9 @@ extension GRPCStreamStateMachine {
|
|
|
} else {
|
|
|
self.state = .clientOpenServerOpen(state)
|
|
|
}
|
|
|
+
|
|
|
+ action = .readInbound
|
|
|
+
|
|
|
case .clientOpenServerClosed(let state):
|
|
|
// Client is not done sending request, but server has already closed.
|
|
|
// Ignore the rest of the request: do nothing, unless endStream is set,
|
|
|
@@ -1435,12 +1373,14 @@ extension GRPCStreamStateMachine {
|
|
|
if endStream {
|
|
|
self.state = .clientClosedServerClosed(.init(previousState: state))
|
|
|
}
|
|
|
+
|
|
|
+ action = .doNothing
|
|
|
+
|
|
|
case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
|
|
|
- try self.invalidState(
|
|
|
- "Client can't send a message if closed."
|
|
|
- )
|
|
|
+ try self.invalidState("Client can't send a message if closed.")
|
|
|
}
|
|
|
- return .readInbound
|
|
|
+
|
|
|
+ return action
|
|
|
}
|
|
|
|
|
|
private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
|
|
|
@@ -1482,24 +1422,26 @@ extension GRPCStreamStateMachine {
|
|
|
let request = state.inboundMessageBuffer.pop()
|
|
|
self.state = .clientOpenServerIdle(state)
|
|
|
return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
|
|
|
+
|
|
|
case .clientOpenServerOpen(var state):
|
|
|
let request = state.inboundMessageBuffer.pop()
|
|
|
self.state = .clientOpenServerOpen(state)
|
|
|
return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
|
|
|
- case .clientOpenServerClosed(var state):
|
|
|
+
|
|
|
+ case .clientClosedServerIdle(var state):
|
|
|
let request = state.inboundMessageBuffer.pop()
|
|
|
- self.state = .clientOpenServerClosed(state)
|
|
|
- return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
|
|
|
+ self.state = .clientClosedServerIdle(state)
|
|
|
+ return request.map { .receiveMessage($0) } ?? .noMoreMessages
|
|
|
+
|
|
|
case .clientClosedServerOpen(var state):
|
|
|
let request = state.inboundMessageBuffer.pop()
|
|
|
self.state = .clientClosedServerOpen(state)
|
|
|
return request.map { .receiveMessage($0) } ?? .noMoreMessages
|
|
|
- case .clientClosedServerClosed(var state):
|
|
|
- let request = state.inboundMessageBuffer.pop()
|
|
|
- self.state = .clientClosedServerClosed(state)
|
|
|
- return request.map { .receiveMessage($0) } ?? .noMoreMessages
|
|
|
- case .clientClosedServerIdle:
|
|
|
+
|
|
|
+ case .clientOpenServerClosed, .clientClosedServerClosed:
|
|
|
+ // Server has closed, no need to read.
|
|
|
return .noMoreMessages
|
|
|
+
|
|
|
case .clientIdleServerIdle:
|
|
|
return .awaitMoreMessages
|
|
|
}
|
|
|
@@ -1546,6 +1488,45 @@ extension HPACKHeaders {
|
|
|
internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
|
|
|
self.add(name: key.rawValue, value: value)
|
|
|
}
|
|
|
+
|
|
|
+ static func trailersOnly(code: Status.Code, message: String, metadata: Metadata = [:]) -> Self {
|
|
|
+ return .trailersOnly(status: Status(code: code, message: message), metadata: metadata)
|
|
|
+ }
|
|
|
+
|
|
|
+ static func trailersOnly(status: Status, metadata: Metadata = [:]) -> Self {
|
|
|
+ return .makeTrailers(isTrailersOnly: true, status: status, metadata: metadata)
|
|
|
+ }
|
|
|
+
|
|
|
+ static func trailers(status: Status, metadata: Metadata = [:]) -> Self {
|
|
|
+ return .makeTrailers(isTrailersOnly: false, status: status, metadata: metadata)
|
|
|
+ }
|
|
|
+
|
|
|
+ private static func makeTrailers(
|
|
|
+ isTrailersOnly: Bool,
|
|
|
+ status: Status,
|
|
|
+ metadata: Metadata
|
|
|
+ ) -> Self {
|
|
|
+ var trailers = HPACKHeaders()
|
|
|
+
|
|
|
+ if isTrailersOnly {
|
|
|
+ trailers.reserveCapacity(4 + metadata.count)
|
|
|
+ trailers.add("200", forKey: .status)
|
|
|
+ trailers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
|
|
|
+ } else {
|
|
|
+ trailers.reserveCapacity(2 + metadata.count)
|
|
|
+ }
|
|
|
+
|
|
|
+ trailers.add(String(status.code.rawValue), forKey: .grpcStatus)
|
|
|
+ if !status.message.isEmpty, let encoded = GRPCStatusMessageMarshaller.marshall(status.message) {
|
|
|
+ trailers.add(encoded, forKey: .grpcStatusMessage)
|
|
|
+ }
|
|
|
+
|
|
|
+ for (key, value) in metadata {
|
|
|
+ trailers.add(name: key, value: value.encoded())
|
|
|
+ }
|
|
|
+
|
|
|
+ return trailers
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
extension Zlib.Method {
|