Explorar el Código

Discard excess read bytes in LengthPrefixedMessageReader. (#781)

It is possible, in some use-cases, for LengthPrefixedMessageReader to
build up a buffer with loads of "free" space at the front that is not
usable, but that cannot be freed due to incoming messages arriving
sufficiently quickly.

We should, from time to time, try to discard those messages by
compacting the buffer. It's hard to have a good heuristic here, so this
patch represents a first-pass: if there's more than 1kB of usable space
in the buffer that we could free up with compaction, and if that space
is larger than the usable bytes in the buffer, we should try to obtain
it.

This effectively doubles the size of the buffer "for free", which is
what a resizing would do anyway. It also makes a subsequent resizing
cheaper, as we no longer need to copy the leading bytes to preserve the
reader index.
Cory Benfield hace 5 años
padre
commit
d975125c8b

+ 16 - 1
Sources/GRPC/LengthPrefixedMessageReader.swift

@@ -78,6 +78,11 @@ internal struct LengthPrefixedMessageReader {
     return self.buffer.map { $0.readableBytes } ?? 0
   }
 
+  /// Returns the number of bytes that have been consumed and not discarded.
+  internal var _consumedNonDiscardedBytes: Int {
+    return self.buffer.map { $0.readerIndex } ?? 0
+  }
+
   /// Whether the reader is mid-way through reading a message.
   internal var isReading: Bool {
     switch self.state {
@@ -127,8 +132,18 @@ internal struct LengthPrefixedMessageReader {
   ///
   /// This allows the next call to `append` to avoid writing the contents of the appended buffer.
   private mutating func nilBufferIfPossible() {
-    if self.buffer?.readableBytes == 0 {
+    let readableBytes = self.buffer?.readableBytes ?? 0
+    let readerIndex = self.buffer?.readerIndex ?? 0
+    let capacity = self.buffer?.capacity ?? 0
+
+    if readableBytes == 0 {
       self.buffer = nil
+    } else if readerIndex > 1024 && readerIndex > (capacity / 2) {
+      // A rough-heuristic: if there is a kilobyte of read data, and there is more data that
+      // has been read than there is space in the rest of the buffer, we'll try to discard some
+      // read bytes here. We're trying to avoid doing this if there is loads of writable bytes that
+      // we'll have to shift.
+      self.buffer?.discardReadBytes()
     }
   }
 

+ 24 - 0
Tests/GRPCTests/LengthPrefixedMessageReaderTests.swift

@@ -236,4 +236,28 @@ class LengthPrefixedMessageReaderTests: GRPCTestCase {
 
     XCTAssertEqual(0, buffer.readableBytes)
   }
+
+  func testExcessiveBytesAreDiscarded() throws {
+    // We're going to use a 1kB message here for ease of testing.
+    let message = Array(repeating: UInt8(0), count: 1024)
+    let largeMessage: [UInt8] = [
+        0x00,                       // 1-byte compression flag
+        0x00, 0x00, 0x04, 0x00,     // 4-byte message length (1024)
+    ] + message
+    var buffer = byteBuffer(withBytes: largeMessage)
+    buffer.writeBytes(largeMessage)
+    buffer.writeBytes(largeMessage)
+    reader.append(buffer: &buffer)
+
+    XCTAssertEqual(reader.unprocessedBytes, (1024 + 5) * 3)
+    XCTAssertEqual(reader._consumedNonDiscardedBytes, 0)
+
+    self.assertMessagesEqual(expected: message, actual: try reader.nextMessage())
+    XCTAssertEqual(reader.unprocessedBytes, (1024 + 5) * 2)
+    XCTAssertEqual(reader._consumedNonDiscardedBytes, 1024 + 5)
+
+    self.assertMessagesEqual(expected: message, actual: try reader.nextMessage())
+    XCTAssertEqual(reader.unprocessedBytes, 1024 + 5)
+    XCTAssertEqual(reader._consumedNonDiscardedBytes, 0)
+  }
 }

+ 1 - 0
Tests/GRPCTests/XCTestManifests.swift

@@ -499,6 +499,7 @@ extension LengthPrefixedMessageReaderTests {
     // to regenerate.
     static let __allTests__LengthPrefixedMessageReaderTests = [
         ("testAppendReadsAllBytes", testAppendReadsAllBytes),
+        ("testExcessiveBytesAreDiscarded", testExcessiveBytesAreDiscarded),
         ("testNextMessageDeliveredAcrossMultipleByteBuffers", testNextMessageDeliveredAcrossMultipleByteBuffers),
         ("testNextMessageDoesNotThrowWhenCompressionFlagIsExpectedButNotSet", testNextMessageDoesNotThrowWhenCompressionFlagIsExpectedButNotSet),
         ("testNextMessageReturnsMessageForZeroLengthMessage", testNextMessageReturnsMessageForZeroLengthMessage),