|
|
@@ -121,6 +121,12 @@ extension HPACKHeaders {
|
|
|
GRPCHTTP2Keys.encoding.rawValue: "deflate",
|
|
|
GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
|
|
|
]
|
|
|
+ fileprivate static let serverInitialMetadataWithGZIPCompression: Self = [
|
|
|
+ GRPCHTTP2Keys.status.rawValue: "200",
|
|
|
+ GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
|
|
|
+ GRPCHTTP2Keys.encoding.rawValue: "gzip",
|
|
|
+ GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
|
|
|
+ ]
|
|
|
fileprivate static let serverTrailers: Self = [
|
|
|
GRPCHTTP2Keys.status.rawValue: "200",
|
|
|
GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
|
|
|
@@ -331,6 +337,93 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ func testReceiveInitialMetadataWhenServerIdle_ClientUnsupportedEncoding() throws {
|
|
|
+ // Create client with deflate compression enabled
|
|
|
+ var stateMachine = self.makeClientStateMachine(
|
|
|
+ targetState: .clientOpenServerIdle,
|
|
|
+ compressionEnabled: true
|
|
|
+ )
|
|
|
+
|
|
|
+ // Try opening server with gzip compression, which client does not support.
|
|
|
+ let action = try stateMachine.receive(
|
|
|
+ headers: .serverInitialMetadataWithGZIPCompression,
|
|
|
+ endStream: false
|
|
|
+ )
|
|
|
+
|
|
|
+ XCTAssertEqual(
|
|
|
+ action,
|
|
|
+ .receivedStatusAndMetadata(
|
|
|
+ status: Status(
|
|
|
+ code: .internalError,
|
|
|
+ message:
|
|
|
+ "The server picked a compression algorithm ('gzip') the client does not know about."
|
|
|
+ ),
|
|
|
+ metadata: [
|
|
|
+ ":status": "200",
|
|
|
+ "content-type": "application/grpc",
|
|
|
+ "grpc-encoding": "gzip",
|
|
|
+ "grpc-accept-encoding": "deflate",
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ }
|
|
|
+
|
|
|
+ func testReceiveMessage_ClientCompressionEnabled() throws {
|
|
|
+ // Enable deflate compression on client
|
|
|
+ var stateMachine = self.makeClientStateMachine(
|
|
|
+ targetState: .clientOpenServerOpen,
|
|
|
+ compressionEnabled: true
|
|
|
+ )
|
|
|
+
|
|
|
+ let originalMessage = [UInt8]([42, 42, 43, 43])
|
|
|
+
|
|
|
+ // Receiving uncompressed message should still work.
|
|
|
+ let receivedUncompressedBytes = try self.frameMessage(originalMessage, compression: .none)
|
|
|
+ XCTAssertNoThrow(try stateMachine.receive(buffer: receivedUncompressedBytes, endStream: false))
|
|
|
+ var receivedAction = stateMachine.nextInboundMessage()
|
|
|
+ switch receivedAction {
|
|
|
+ case .noMoreMessages, .awaitMoreMessages:
|
|
|
+ XCTFail("Should have received message")
|
|
|
+ case .receiveMessage(let receivedMessaged):
|
|
|
+ XCTAssertEqual(originalMessage, receivedMessaged)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Receiving compressed message with deflate should work
|
|
|
+ let receivedDeflateCompressedBytes = try self.frameMessage(
|
|
|
+ originalMessage,
|
|
|
+ compression: .deflate
|
|
|
+ )
|
|
|
+ XCTAssertNoThrow(
|
|
|
+ try stateMachine.receive(buffer: receivedDeflateCompressedBytes, endStream: false)
|
|
|
+ )
|
|
|
+ receivedAction = stateMachine.nextInboundMessage()
|
|
|
+ switch receivedAction {
|
|
|
+ case .noMoreMessages, .awaitMoreMessages:
|
|
|
+ XCTFail("Should have received message")
|
|
|
+ case .receiveMessage(let receivedMessaged):
|
|
|
+ XCTAssertEqual(originalMessage, receivedMessaged)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Receiving compressed message with gzip (unsupported) should throw error
|
|
|
+ let receivedGZIPCompressedBytes = try self.frameMessage(originalMessage, compression: .gzip)
|
|
|
+ XCTAssertThrowsError(
|
|
|
+ ofType: RPCError.self,
|
|
|
+ try stateMachine.receive(buffer: receivedGZIPCompressedBytes, endStream: false)
|
|
|
+ ) { error in
|
|
|
+ XCTAssertEqual(error.code, .internalError)
|
|
|
+ XCTAssertEqual(error.message, "Decompression error")
|
|
|
+ }
|
|
|
+ receivedAction = stateMachine.nextInboundMessage()
|
|
|
+ switch receivedAction {
|
|
|
+ case .awaitMoreMessages:
|
|
|
+ ()
|
|
|
+ case .noMoreMessages:
|
|
|
+ XCTFail("Should be awaiting for more messages")
|
|
|
+ case .receiveMessage:
|
|
|
+ XCTFail("Should not have received message")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
func testReceiveInitialMetadataWhenServerIdle() throws {
|
|
|
for targetState in [
|
|
|
TargetStateMachineState.clientOpenServerIdle, .clientClosedServerIdle,
|
|
|
@@ -681,7 +774,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))
|
|
|
|
|
|
let request = try stateMachine.nextOutboundFrame()
|
|
|
- let framedMessage = try self.frameMessage(originalMessage, compress: true)
|
|
|
+ let framedMessage = try self.frameMessage(originalMessage, compression: .deflate)
|
|
|
XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil))
|
|
|
}
|
|
|
|
|
|
@@ -697,7 +790,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))
|
|
|
|
|
|
let request = try stateMachine.nextOutboundFrame()
|
|
|
- let framedMessage = try self.frameMessage(originalMessage, compress: true)
|
|
|
+ let framedMessage = try self.frameMessage(originalMessage, compression: .deflate)
|
|
|
XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil))
|
|
|
}
|
|
|
|
|
|
@@ -806,7 +899,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
)
|
|
|
|
|
|
let originalMessage = [UInt8]([42, 42, 43, 43])
|
|
|
- let receivedBytes = try self.frameMessage(originalMessage, compress: true)
|
|
|
+ let receivedBytes = try self.frameMessage(originalMessage, compression: .deflate)
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.receive(buffer: receivedBytes, endStream: false),
|
|
|
.readInbound
|
|
|
@@ -920,7 +1013,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
|
|
|
|
|
|
let message = [UInt8]([1, 2, 3, 4])
|
|
|
- let framedMessage = try self.frameMessage(message, compress: false)
|
|
|
+ let framedMessage = try self.frameMessage(message, compression: .none)
|
|
|
try stateMachine.send(message: message, promise: nil)
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.nextOutboundFrame(),
|
|
|
@@ -932,9 +1025,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
|
|
|
|
|
|
let firstResponseBytes = [UInt8]([5, 6, 7])
|
|
|
- let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
|
|
|
+ let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none)
|
|
|
let secondResponseBytes = [UInt8]([8, 9, 10])
|
|
|
- let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
|
|
|
+ let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none)
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.receive(buffer: firstResponse, endStream: false),
|
|
|
.readInbound
|
|
|
@@ -993,7 +1086,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
|
|
|
|
|
|
let message = [UInt8]([1, 2, 3, 4])
|
|
|
- let framedMessage = try self.frameMessage(message, compress: false)
|
|
|
+ let framedMessage = try self.frameMessage(message, compression: .none)
|
|
|
XCTAssertNoThrow(try stateMachine.send(message: message, promise: nil))
|
|
|
XCTAssertNoThrow(try stateMachine.closeOutbound())
|
|
|
XCTAssertEqual(
|
|
|
@@ -1020,9 +1113,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
|
|
|
|
|
|
let firstResponseBytes = [UInt8]([5, 6, 7])
|
|
|
- let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
|
|
|
+ let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none)
|
|
|
let secondResponseBytes = [UInt8]([8, 9, 10])
|
|
|
- let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
|
|
|
+ let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none)
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.receive(buffer: firstResponse, endStream: false),
|
|
|
.readInbound
|
|
|
@@ -1078,7 +1171,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
|
|
|
|
|
|
let message = [UInt8]([1, 2, 3, 4])
|
|
|
- let framedMessage = try self.frameMessage(message, compress: false)
|
|
|
+ let framedMessage = try self.frameMessage(message, compression: .none)
|
|
|
try stateMachine.send(message: message, promise: nil)
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.nextOutboundFrame(),
|
|
|
@@ -1107,9 +1200,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
|
|
|
XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
|
|
|
|
|
|
let firstResponseBytes = [UInt8]([5, 6, 7])
|
|
|
- let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
|
|
|
+ let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none)
|
|
|
let secondResponseBytes = [UInt8]([8, 9, 10])
|
|
|
- let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
|
|
|
+ let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none)
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.receive(buffer: firstResponse, endStream: false),
|
|
|
.readInbound
|
|
|
@@ -1757,9 +1850,6 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- //TODO: add more encoding-related validation tests (for both client and server)
|
|
|
- // and message encoding tests
|
|
|
-
|
|
|
func testReceiveMetadataWhenClientOpenAndServerIdle() throws {
|
|
|
var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)
|
|
|
|
|
|
@@ -1881,6 +1971,62 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ func testReceiveMessage_ServerCompressionEnabled() throws {
|
|
|
+ // Enable deflate compression on server
|
|
|
+ var stateMachine = self.makeServerStateMachine(
|
|
|
+ targetState: .clientOpenServerOpen,
|
|
|
+ compressionEnabled: true
|
|
|
+ )
|
|
|
+
|
|
|
+ let originalMessage = [UInt8]([42, 42, 43, 43])
|
|
|
+
|
|
|
+ // Receiving uncompressed message should still work.
|
|
|
+ let receivedUncompressedBytes = try self.frameMessage(originalMessage, compression: .none)
|
|
|
+ XCTAssertNoThrow(try stateMachine.receive(buffer: receivedUncompressedBytes, endStream: false))
|
|
|
+ var receivedAction = stateMachine.nextInboundMessage()
|
|
|
+ switch receivedAction {
|
|
|
+ case .noMoreMessages, .awaitMoreMessages:
|
|
|
+ XCTFail("Should have received message")
|
|
|
+ case .receiveMessage(let receivedMessaged):
|
|
|
+ XCTAssertEqual(originalMessage, receivedMessaged)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Receiving compressed message with deflate should work
|
|
|
+ let receivedDeflateCompressedBytes = try self.frameMessage(
|
|
|
+ originalMessage,
|
|
|
+ compression: .deflate
|
|
|
+ )
|
|
|
+ XCTAssertNoThrow(
|
|
|
+ try stateMachine.receive(buffer: receivedDeflateCompressedBytes, endStream: false)
|
|
|
+ )
|
|
|
+ receivedAction = stateMachine.nextInboundMessage()
|
|
|
+ switch receivedAction {
|
|
|
+ case .noMoreMessages, .awaitMoreMessages:
|
|
|
+ XCTFail("Should have received message")
|
|
|
+ case .receiveMessage(let receivedMessaged):
|
|
|
+ XCTAssertEqual(originalMessage, receivedMessaged)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Receiving compressed message with gzip (unsupported) should throw error
|
|
|
+ let receivedGZIPCompressedBytes = try self.frameMessage(originalMessage, compression: .gzip)
|
|
|
+ XCTAssertThrowsError(
|
|
|
+ ofType: RPCError.self,
|
|
|
+ try stateMachine.receive(buffer: receivedGZIPCompressedBytes, endStream: false)
|
|
|
+ ) { error in
|
|
|
+ XCTAssertEqual(error.code, .internalError)
|
|
|
+ XCTAssertEqual(error.message, "Decompression error")
|
|
|
+ }
|
|
|
+ receivedAction = stateMachine.nextInboundMessage()
|
|
|
+ switch receivedAction {
|
|
|
+ case .awaitMoreMessages:
|
|
|
+ ()
|
|
|
+ case .noMoreMessages:
|
|
|
+ XCTFail("Should be awaiting for more messages")
|
|
|
+ case .receiveMessage:
|
|
|
+ XCTFail("Should not have received message")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
func testReceiveMessageWhenClientOpenAndServerClosed() {
|
|
|
var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed)
|
|
|
|
|
|
@@ -1993,7 +2139,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))
|
|
|
|
|
|
let response = try stateMachine.nextOutboundFrame()
|
|
|
- let framedMessage = try self.frameMessage(originalMessage, compress: true)
|
|
|
+ let framedMessage = try self.frameMessage(originalMessage, compression: .deflate)
|
|
|
XCTAssertEqual(response, .sendFrame(frame: framedMessage, promise: nil))
|
|
|
}
|
|
|
|
|
|
@@ -2125,7 +2271,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
)
|
|
|
|
|
|
let originalMessage = [UInt8]([42, 42, 43, 43])
|
|
|
- let receivedBytes = try self.frameMessage(originalMessage, compress: true)
|
|
|
+ let receivedBytes = try self.frameMessage(originalMessage, compression: .deflate)
|
|
|
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.receive(buffer: receivedBytes, endStream: false),
|
|
|
@@ -2247,7 +2393,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
|
|
|
// Client sends messages
|
|
|
let deframedMessage = [UInt8]([1, 2, 3, 4])
|
|
|
- let completeMessage = try self.frameMessage(deframedMessage, compress: false)
|
|
|
+ let completeMessage = try self.frameMessage(deframedMessage, compression: .none)
|
|
|
// Split message into two parts to make sure the stitching together of the frames works well
|
|
|
let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
|
|
|
let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
|
|
|
@@ -2276,7 +2422,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
try stateMachine.send(message: secondResponse, promise: secondPromise)
|
|
|
|
|
|
// Make sure messages are outbound
|
|
|
- let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
|
|
|
+ let framedMessages = try self.frameMessages(
|
|
|
+ [firstResponse, secondResponse],
|
|
|
+ compression: .none
|
|
|
+ )
|
|
|
|
|
|
guard
|
|
|
case .sendFrame(let nextOutboundByteBuffer, let nextOutboundPromise) =
|
|
|
@@ -2326,7 +2475,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
|
|
|
// Client sends messages
|
|
|
let deframedMessage = [UInt8]([1, 2, 3, 4])
|
|
|
- let completeMessage = try self.frameMessage(deframedMessage, compress: false)
|
|
|
+ let completeMessage = try self.frameMessage(deframedMessage, compression: .none)
|
|
|
// Split message into two parts to make sure the stitching together of the frames works well
|
|
|
let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
|
|
|
let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
|
|
|
@@ -2368,7 +2517,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
try stateMachine.send(message: secondResponse, promise: nil)
|
|
|
|
|
|
// Make sure messages are outbound
|
|
|
- let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
|
|
|
+ let framedMessages = try self.frameMessages(
|
|
|
+ [firstResponse, secondResponse],
|
|
|
+ compression: .none
|
|
|
+ )
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.nextOutboundFrame(),
|
|
|
.sendFrame(frame: framedMessages, promise: nil)
|
|
|
@@ -2400,7 +2552,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
|
|
|
// Client sends messages
|
|
|
let deframedMessage = [UInt8]([1, 2, 3, 4])
|
|
|
- let completeMessage = try self.frameMessage(deframedMessage, compress: false)
|
|
|
+ let completeMessage = try self.frameMessage(deframedMessage, compression: .none)
|
|
|
// Split message into two parts to make sure the stitching together of the frames works well
|
|
|
let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
|
|
|
let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
|
|
|
@@ -2442,7 +2594,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
|
|
|
try stateMachine.send(message: secondResponse, promise: nil)
|
|
|
|
|
|
// Make sure messages are outbound
|
|
|
- let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
|
|
|
+ let framedMessages = try self.frameMessages(
|
|
|
+ [firstResponse, secondResponse],
|
|
|
+ compression: .none
|
|
|
+ )
|
|
|
XCTAssertEqual(
|
|
|
try stateMachine.nextOutboundFrame(),
|
|
|
.sendFrame(frame: framedMessages, promise: nil)
|
|
|
@@ -2473,16 +2628,20 @@ extension XCTestCase {
|
|
|
try expression(trailers)
|
|
|
}
|
|
|
|
|
|
- func frameMessage(_ message: [UInt8], compress: Bool) throws -> ByteBuffer {
|
|
|
- try frameMessages([message], compress: compress)
|
|
|
+ func frameMessage(_ message: [UInt8], compression: CompressionAlgorithm) throws -> ByteBuffer {
|
|
|
+ try frameMessages([message], compression: compression)
|
|
|
}
|
|
|
|
|
|
- func frameMessages(_ messages: [[UInt8]], compress: Bool) throws -> ByteBuffer {
|
|
|
+ func frameMessages(_ messages: [[UInt8]], compression: CompressionAlgorithm) throws -> ByteBuffer
|
|
|
+ {
|
|
|
var framer = GRPCMessageFramer()
|
|
|
let compressor: Zlib.Compressor? = {
|
|
|
- if compress {
|
|
|
+ switch compression {
|
|
|
+ case .deflate:
|
|
|
return Zlib.Compressor(method: .deflate)
|
|
|
- } else {
|
|
|
+ case .gzip:
|
|
|
+ return Zlib.Compressor(method: .gzip)
|
|
|
+ default:
|
|
|
return nil
|
|
|
}
|
|
|
}()
|