|
|
@@ -32,15 +32,13 @@ import Logging
|
|
|
/// [gRPC Protocol](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md)
|
|
|
public class LengthPrefixedMessageReader {
|
|
|
public typealias Mode = GRPCError.Origin
|
|
|
- let logger: Logger
|
|
|
|
|
|
/// The mechanism that messages will be compressed with.
|
|
|
public var compressionMechanism: CompressionMechanism
|
|
|
|
|
|
- public init(mode: Mode, compressionMechanism: CompressionMechanism, logger: Logger) {
|
|
|
+ public init(mode: Mode, compressionMechanism: CompressionMechanism) {
|
|
|
self.mode = mode
|
|
|
self.compressionMechanism = compressionMechanism
|
|
|
- self.logger = logger
|
|
|
}
|
|
|
|
|
|
/// The result of trying to parse a message with the bytes we currently have.
|
|
|
@@ -63,11 +61,7 @@ public class LengthPrefixedMessageReader {
|
|
|
|
|
|
private let mode: Mode
|
|
|
private var buffer: ByteBuffer!
|
|
|
- private var state: ParseState = .expectingCompressedFlag {
|
|
|
- didSet {
|
|
|
- self.logger.trace("parse state changed from \(oldValue) to \(self.state)")
|
|
|
- }
|
|
|
- }
|
|
|
+ private var state: ParseState = .expectingCompressedFlag
|
|
|
|
|
|
/// Returns the number of unprocessed bytes.
|
|
|
internal var unprocessedBytes: Int {
|
|
|
@@ -89,15 +83,12 @@ public class LengthPrefixedMessageReader {
|
|
|
guard buffer.readableBytes > 0 else {
|
|
|
return
|
|
|
}
|
|
|
- self.logger.trace("appending \(buffer.readableBytes) bytes to buffer")
|
|
|
|
|
|
if self.buffer == nil {
|
|
|
- self.logger.trace("creating new buffer from slice")
|
|
|
self.buffer = buffer.slice()
|
|
|
// mark the bytes as "read"
|
|
|
buffer.moveReaderIndex(forwardBy: buffer.readableBytes)
|
|
|
} else {
|
|
|
- self.logger.trace("copying bytes into existing buffer")
|
|
|
self.buffer.writeBuffer(&buffer)
|
|
|
}
|
|
|
}
|
|
|
@@ -117,7 +108,6 @@ public class LengthPrefixedMessageReader {
|
|
|
return try nextMessage()
|
|
|
|
|
|
case .message(let message):
|
|
|
- self.logger.trace("read length-prefixed message")
|
|
|
self.nilBufferIfPossible()
|
|
|
return message
|
|
|
}
|
|
|
@@ -128,42 +118,34 @@ public class LengthPrefixedMessageReader {
|
|
|
/// This allows the next call to `append` to avoid writing the contents of the appended buffer.
|
|
|
private func nilBufferIfPossible() {
|
|
|
if self.buffer?.readableBytes == 0 {
|
|
|
- self.logger.trace("no readable bytes; nilling-out buffer")
|
|
|
self.buffer = nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private func processNextState() throws -> ParseResult {
|
|
|
guard self.buffer != nil else {
|
|
|
- self.logger.trace("no buffer to read from")
|
|
|
return .needMoreData
|
|
|
}
|
|
|
|
|
|
switch self.state {
|
|
|
case .expectingCompressedFlag:
|
|
|
guard let compressionFlag: Int8 = self.buffer.readInteger() else {
|
|
|
- self.logger.trace("1 more byte needed to read compression flag")
|
|
|
return .needMoreData
|
|
|
}
|
|
|
- self.logger.trace("read 1 byte compression flag: \(compressionFlag)")
|
|
|
try self.handleCompressionFlag(enabled: compressionFlag != 0)
|
|
|
self.state = .expectingMessageLength
|
|
|
|
|
|
case .expectingMessageLength:
|
|
|
guard let messageLength: UInt32 = self.buffer.readInteger() else {
|
|
|
- self.logger.trace("\(4 - buffer.readableBytes) more bytes needed to read message length")
|
|
|
return .needMoreData
|
|
|
}
|
|
|
- self.logger.trace("read 4 byte message length: \(messageLength)")
|
|
|
self.state = .expectingMessage(messageLength)
|
|
|
|
|
|
case .expectingMessage(let length):
|
|
|
let signedLength: Int = numericCast(length)
|
|
|
guard let message = self.buffer.readSlice(length: signedLength) else {
|
|
|
- self.logger.trace("\(signedLength - buffer.readableBytes) more bytes needed to read message")
|
|
|
return .needMoreData
|
|
|
}
|
|
|
- self.logger.trace("read \(message.readableBytes) byte message")
|
|
|
self.state = .expectingCompressedFlag
|
|
|
return .message(message)
|
|
|
}
|
|
|
@@ -173,18 +155,14 @@ public class LengthPrefixedMessageReader {
|
|
|
|
|
|
private func handleCompressionFlag(enabled flagEnabled: Bool) throws {
|
|
|
guard flagEnabled else {
|
|
|
- self.logger.trace("compression is not enabled for this message")
|
|
|
return
|
|
|
}
|
|
|
- self.logger.trace("compression is enabled for this message")
|
|
|
|
|
|
guard self.compressionMechanism.requiresFlag else {
|
|
|
- self.logger.error("compression flag was set but '\(self.compressionMechanism)' does not require it")
|
|
|
throw GRPCError.common(.unexpectedCompression, origin: mode)
|
|
|
}
|
|
|
|
|
|
guard self.compressionMechanism.supported else {
|
|
|
- self.logger.error("compression mechanism '\(self.compressionMechanism)' is not supported")
|
|
|
throw GRPCError.common(.unsupportedCompressionMechanism(compressionMechanism.rawValue), origin: mode)
|
|
|
}
|
|
|
}
|