|
|
@@ -75,7 +75,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
let maximumPayloadSize: Int
|
|
|
var framer: GRPCMessageFramer
|
|
|
var compressor: Zlib.Compressor?
|
|
|
- var outboundCompression: CompressionAlgorithm?
|
|
|
+ var outboundCompression: CompressionAlgorithm
|
|
|
|
|
|
// The deframer must be optional because the client will not have one configured
|
|
|
// until the server opens and sends a grpc-encoding header.
|
|
|
@@ -89,12 +89,14 @@ private enum GRPCStreamStateMachineState {
|
|
|
init(
|
|
|
previousState: ClientIdleServerIdleState,
|
|
|
compressor: Zlib.Compressor?,
|
|
|
+ outboundCompression: CompressionAlgorithm,
|
|
|
framer: GRPCMessageFramer,
|
|
|
decompressor: Zlib.Decompressor?,
|
|
|
deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
|
|
|
) {
|
|
|
self.maximumPayloadSize = previousState.maximumPayloadSize
|
|
|
self.compressor = compressor
|
|
|
+ self.outboundCompression = outboundCompression
|
|
|
self.framer = framer
|
|
|
self.decompressor = decompressor
|
|
|
self.deframer = deframer
|
|
|
@@ -105,6 +107,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
struct ClientOpenServerOpenState {
|
|
|
var framer: GRPCMessageFramer
|
|
|
var compressor: Zlib.Compressor?
|
|
|
+ var outboundCompression: CompressionAlgorithm
|
|
|
|
|
|
let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>
|
|
|
var decompressor: Zlib.Decompressor?
|
|
|
@@ -118,6 +121,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
|
|
|
self.deframer = deframer
|
|
|
self.decompressor = decompressor
|
|
|
@@ -129,6 +133,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
struct ClientOpenServerClosedState {
|
|
|
var framer: GRPCMessageFramer?
|
|
|
var compressor: Zlib.Compressor?
|
|
|
+ var outboundCompression: CompressionAlgorithm
|
|
|
|
|
|
let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
|
|
|
var decompressor: Zlib.Decompressor?
|
|
|
@@ -145,6 +150,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
init(previousState: ClientIdleServerIdleState) {
|
|
|
self.framer = nil
|
|
|
self.compressor = nil
|
|
|
+ self.outboundCompression = .none
|
|
|
self.deframer = nil
|
|
|
self.decompressor = nil
|
|
|
self.inboundMessageBuffer = .init()
|
|
|
@@ -153,6 +159,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
init(previousState: ClientOpenServerOpenState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.deframer = previousState.deframer
|
|
|
self.decompressor = previousState.decompressor
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
@@ -161,6 +168,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
init(previousState: ClientOpenServerIdleState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
// The server went directly from idle to closed - this means it sent a
|
|
|
// trailers-only response:
|
|
|
@@ -177,7 +185,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
let maximumPayloadSize: Int
|
|
|
var framer: GRPCMessageFramer
|
|
|
var compressor: Zlib.Compressor?
|
|
|
- var outboundCompression: CompressionAlgorithm?
|
|
|
+ var outboundCompression: CompressionAlgorithm
|
|
|
|
|
|
let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
|
|
|
var decompressor: Zlib.Decompressor?
|
|
|
@@ -195,9 +203,12 @@ private enum GRPCStreamStateMachineState {
|
|
|
|
|
|
if let zlibMethod = Zlib.Method(encoding: compressionAlgorithm) {
|
|
|
self.compressor = Zlib.Compressor(method: zlibMethod)
|
|
|
+ self.outboundCompression = compressionAlgorithm
|
|
|
+ } else {
|
|
|
+ self.compressor = nil
|
|
|
+ self.outboundCompression = .none
|
|
|
}
|
|
|
self.framer = GRPCMessageFramer()
|
|
|
- self.outboundCompression = compressionAlgorithm
|
|
|
// We don't need a deframer since we won't receive any messages from the
|
|
|
// client: it's closed.
|
|
|
self.deframer = nil
|
|
|
@@ -218,6 +229,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
struct ClientClosedServerOpenState {
|
|
|
var framer: GRPCMessageFramer
|
|
|
var compressor: Zlib.Compressor?
|
|
|
+ var outboundCompression: CompressionAlgorithm
|
|
|
|
|
|
let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
|
|
|
var decompressor: Zlib.Decompressor?
|
|
|
@@ -227,6 +239,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
init(previousState: ClientOpenServerOpenState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.deframer = previousState.deframer
|
|
|
self.decompressor = previousState.decompressor
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
@@ -236,6 +249,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
init(previousState: ClientClosedServerIdleState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
|
|
|
// In the case of the server, we don't need to deframe/decompress any more
|
|
|
// messages, since the client's closed.
|
|
|
@@ -252,6 +266,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
|
|
|
// In the case of the client, it will only be able to set up the deframer
|
|
|
// after it receives the chosen encoding from the server.
|
|
|
@@ -274,6 +289,7 @@ private enum GRPCStreamStateMachineState {
|
|
|
// the client.
|
|
|
var framer: GRPCMessageFramer?
|
|
|
var compressor: Zlib.Compressor?
|
|
|
+ var outboundCompression: CompressionAlgorithm
|
|
|
|
|
|
// These are already deframed, so we don't need the deframer anymore.
|
|
|
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
|
|
|
@@ -288,36 +304,42 @@ private enum GRPCStreamStateMachineState {
|
|
|
init(previousState: ClientIdleServerIdleState) {
|
|
|
self.framer = nil
|
|
|
self.compressor = nil
|
|
|
+ self.outboundCompression = .none
|
|
|
self.inboundMessageBuffer = .init()
|
|
|
}
|
|
|
|
|
|
init(previousState: ClientClosedServerOpenState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
}
|
|
|
|
|
|
init(previousState: ClientClosedServerIdleState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
}
|
|
|
|
|
|
init(previousState: ClientOpenServerIdleState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
}
|
|
|
|
|
|
init(previousState: ClientOpenServerOpenState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
}
|
|
|
|
|
|
init(previousState: ClientOpenServerClosedState) {
|
|
|
self.framer = previousState.framer
|
|
|
self.compressor = previousState.compressor
|
|
|
+ self.outboundCompression = previousState.outboundCompression
|
|
|
self.inboundMessageBuffer = previousState.inboundMessageBuffer
|
|
|
}
|
|
|
}
|
|
|
@@ -555,6 +577,7 @@ extension GRPCStreamStateMachine {
|
|
|
.init(
|
|
|
previousState: state,
|
|
|
compressor: compressor,
|
|
|
+ outboundCompression: outboundEncoding,
|
|
|
framer: GRPCMessageFramer(),
|
|
|
decompressor: nil,
|
|
|
deframer: nil
|
|
|
@@ -1019,10 +1042,6 @@ extension GRPCStreamStateMachine {
|
|
|
headers.add(outboundEncoding.name, forKey: .encoding)
|
|
|
}
|
|
|
|
|
|
- for acceptedEncoding in configuration.acceptedEncodings.elements.filter({ $0 != .none }) {
|
|
|
- headers.add(acceptedEncoding.name, forKey: .acceptEncoding)
|
|
|
- }
|
|
|
-
|
|
|
for metadataPair in customMetadata {
|
|
|
headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
|
|
|
}
|
|
|
@@ -1037,6 +1056,7 @@ extension GRPCStreamStateMachine {
|
|
|
// Server sends initial metadata
|
|
|
switch self.state {
|
|
|
case .clientOpenServerIdle(let state):
|
|
|
+ let outboundEncoding = state.outboundCompression
|
|
|
self.state = .clientOpenServerOpen(
|
|
|
.init(
|
|
|
previousState: state,
|
|
|
@@ -1048,14 +1068,15 @@ extension GRPCStreamStateMachine {
|
|
|
)
|
|
|
)
|
|
|
return self.makeResponseHeaders(
|
|
|
- outboundEncoding: state.outboundCompression,
|
|
|
+ outboundEncoding: outboundEncoding,
|
|
|
configuration: configuration,
|
|
|
customMetadata: metadata
|
|
|
)
|
|
|
case .clientClosedServerIdle(let state):
|
|
|
+ let outboundEncoding = state.outboundCompression
|
|
|
self.state = .clientClosedServerOpen(.init(previousState: state))
|
|
|
return self.makeResponseHeaders(
|
|
|
- outboundEncoding: state.outboundCompression,
|
|
|
+ outboundEncoding: outboundEncoding,
|
|
|
configuration: configuration,
|
|
|
customMetadata: metadata
|
|
|
)
|
|
|
@@ -1326,8 +1347,6 @@ extension GRPCStreamStateMachine {
|
|
|
canonicalForm: true
|
|
|
)
|
|
|
// Find the preferred encoding and use it to compress responses.
|
|
|
- // If it's identity, just skip it altogether, since we won't be
|
|
|
- // compressing.
|
|
|
for clientAdvertisedEncoding in clientAdvertisedEncodings {
|
|
|
if let algorithm = CompressionAlgorithm(name: clientAdvertisedEncoding),
|
|
|
configuration.acceptedEncodings.contains(algorithm)
|
|
|
@@ -1358,6 +1377,7 @@ extension GRPCStreamStateMachine {
|
|
|
.init(
|
|
|
previousState: state,
|
|
|
compressor: compressor,
|
|
|
+ outboundCompression: outboundEncoding,
|
|
|
framer: GRPCMessageFramer(),
|
|
|
decompressor: decompressor,
|
|
|
deframer: NIOSingleStepByteToMessageProcessor(deframer)
|