Browse Source

Remove unused code (#1549)

Motivation:

We added a coalescing message framer in #1539 and #1546. The old framer
is no longer used.

Modification:

- Remote the now unused message writer and tests

Result:

Less unused code.
George Barnett 2 years ago
parent
commit
71f87ccd83

+ 0 - 133
Sources/GRPC/LengthPrefixedMessageWriter.swift

@@ -1,133 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import NIOCore
-import NIOHPACK
-
-internal struct LengthPrefixedMessageWriter {
-  static let metadataLength = 5
-
-  /// The compression algorithm to use, if one should be used.
-  let compression: CompressionAlgorithm?
-  private let compressor: Zlib.Deflate?
-
-  /// Whether the compression message flag should be set.
-  private var shouldSetCompressionFlag: Bool {
-    return self.compression != nil
-  }
-
-  /// A scratch buffer that we encode messages into: if the buffer isn't held elsewhere then we
-  /// can avoid having to allocate a new one.
-  private var scratch: ByteBuffer
-
-  init(compression: CompressionAlgorithm? = nil, allocator: ByteBufferAllocator) {
-    self.compression = compression
-    self.scratch = allocator.buffer(capacity: 0)
-
-    switch self.compression?.algorithm {
-    case .none, .some(.identity):
-      self.compressor = nil
-    case .some(.deflate):
-      self.compressor = Zlib.Deflate(format: .deflate)
-    case .some(.gzip):
-      self.compressor = Zlib.Deflate(format: .gzip)
-    }
-  }
-
-  private mutating func compress(
-    buffer: ByteBuffer,
-    using compressor: Zlib.Deflate
-  ) throws -> ByteBuffer {
-    // The compressor will allocate the correct size. For now the leading 5 bytes will do.
-    self.scratch.clear(minimumCapacity: 5)
-    // Set the compression byte.
-    self.scratch.writeInteger(UInt8(1))
-    // Set the length to zero; we'll write the actual value in a moment.
-    let payloadSizeIndex = self.scratch.writerIndex
-    self.scratch.writeInteger(UInt32(0))
-
-    let bytesWritten: Int
-
-    do {
-      var buffer = buffer
-      bytesWritten = try compressor.deflate(&buffer, into: &self.scratch)
-    } catch {
-      throw error
-    }
-
-    // Now fill in the message length.
-    self.scratch.writePayloadLength(UInt32(bytesWritten), at: payloadSizeIndex)
-
-    // Finally, the compression context should be reset between messages.
-    compressor.reset()
-
-    return self.scratch
-  }
-
-  /// Writes the readable bytes of `buffer` as a gRPC length-prefixed message.
-  ///
-  /// - Parameters:
-  ///   - buffer: The bytes to compress and length-prefix.
-  ///   - compressed: Whether the bytes should be compressed. This is ignored if not compression
-  ///     mechanism was configured on this writer.
-  /// - Returns: A buffer containing the length prefixed bytes.
-  mutating func write(
-    buffer: ByteBuffer,
-    compressed: Bool = true
-  ) throws -> (ByteBuffer, ByteBuffer?) {
-    if compressed, let compressor = self.compressor {
-      let compressedAndFramedPayload = try self.compress(buffer: buffer, using: compressor)
-      return (compressedAndFramedPayload, nil)
-    } else if buffer.readableBytes > Self.singleBufferSizeLimit {
-      // Buffer is larger than the limit for emitting a single buffer: create a second buffer
-      // containing just the message header.
-      self.scratch.clear(minimumCapacity: 5)
-      self.scratch.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
-      return (self.scratch, buffer)
-    } else {
-      // We're not compressing and the message is within our single buffer size limit.
-      self.scratch.clear(minimumCapacity: 5 &+ buffer.readableBytes)
-      self.scratch.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
-      self.scratch.writeImmutableBuffer(buffer)
-      return (self.scratch, nil)
-    }
-  }
-
-  /// Message size above which we emit two buffers: one containing the header and one with the
-  /// actual message bytes. At or below the limit we copy the message into a new buffer containing
-  /// both the header and the message.
-  ///
-  /// Using two buffers avoids expensive copies of large messages. For smaller messages the copy
-  /// is cheaper than the additional allocations and overhead required to send an extra HTTP/2 DATA
-  /// frame.
-  ///
-  /// The value of 8192 was chosen empirically. We subtract the length of the message header
-  /// as `ByteBuffer` reserve capacity in powers of two and want to avoid overallocating.
-  private static let singleBufferSizeLimit = 8192 - 5
-}
-
-extension ByteBuffer {
-  @discardableResult
-  mutating func writePayloadLength(_ length: UInt32, at index: Int) -> Int {
-    let writerIndex = self.writerIndex
-    defer {
-      self.moveWriterIndex(to: writerIndex)
-    }
-
-    self.moveWriterIndex(to: index)
-    return self.writeInteger(length)
-  }
-}

+ 39 - 50
Tests/GRPCTests/GRPCClientStateMachineTests.swift

@@ -38,16 +38,21 @@ class GRPCClientStateMachineTests: GRPCTestCase {
   func writeMessage(_ message: String) throws -> ByteBuffer {
     let buffer = self.allocator.buffer(string: message)
 
-    var writer = LengthPrefixedMessageWriter(compression: .none, allocator: .init())
-    var (buffer1, buffer2) = try writer.write(
-      buffer: buffer,
-      compressed: false
-    )
-
-    if var buffer2 = buffer2 {
-      buffer1.writeBuffer(&buffer2)
+    var writer = CoalescingLengthPrefixedMessageWriter(compression: .none, allocator: .init())
+    writer.append(buffer: buffer, compress: false, promise: nil)
+
+    var result: ByteBuffer?
+    while let next = writer.next() {
+      switch next.0 {
+      case let .success(buffer):
+        result.setOrWriteImmutableBuffer(buffer)
+      case let .failure(error):
+        throw error
+      }
     }
-    return buffer1
+
+    // We wrote a message, we must get at least one buffer out (or throw).
+    return result!
   }
 
   /// Writes a message into the given `buffer`.
@@ -1119,6 +1124,14 @@ extension GRPCClientStateMachineTests {
 class ReadStateTests: GRPCTestCase {
   var allocator = ByteBufferAllocator()
 
+  func writeMessage(_ message: String) -> ByteBuffer {
+    var buffer = self.allocator.buffer(capacity: 5 + message.utf8.count)
+    buffer.writeInteger(UInt8(0))
+    buffer.writeInteger(UInt32(message.utf8.count))
+    buffer.writeBytes(message.utf8)
+    return buffer
+  }
+
   func testReadWhenNoExpectedMessages() {
     var state: ReadState = .notReading
     var buffer = self.allocator.buffer(capacity: 0)
@@ -1129,17 +1142,13 @@ class ReadStateTests: GRPCTestCase {
   }
 
   func testReadWithLeftOverBytesForOneExpectedMessage() throws {
-    // Write a message into the buffer:
-    let message = ByteBuffer(string: "Hello!")
-    var writer = LengthPrefixedMessageWriter(compression: .none)
-    var buffers = try writer.write(buffer: message)
-    XCTAssertNil(buffers.1)
+    var buffer = self.writeMessage("Hello!")
     // And some extra junk bytes:
     let bytes: [UInt8] = [0x00]
-    buffers.0.writeBytes(bytes)
+    buffer.writeBytes(bytes)
 
     var state: ReadState = .one()
-    state.readMessages(&buffers.0, maxLength: .max).assertFailure {
+    state.readMessages(&buffer, maxLength: .max).assertFailure {
       XCTAssertEqual($0, .leftOverBytes)
     }
     state.assertNotReading()
@@ -1147,31 +1156,22 @@ class ReadStateTests: GRPCTestCase {
 
   func testReadTooManyMessagesForOneExpectedMessages() throws {
     // Write a message into the buffer twice:
-    let message = ByteBuffer(string: "Hello!")
-    var writer = LengthPrefixedMessageWriter(compression: .none)
-    var buffers1 = try writer.write(buffer: message)
-    var buffers2 = try writer.write(buffer: message)
-    XCTAssertNil(buffers1.1)
-    XCTAssertNil(buffers2.1)
-    buffers1.0.writeBuffer(&buffers2.0)
+    var buffer1 = self.writeMessage("Hello!")
+    let buffer2 = buffer1
+    buffer1.writeImmutableBuffer(buffer2)
 
     var state: ReadState = .one()
-    state.readMessages(&buffers1.0, maxLength: .max).assertFailure {
+    state.readMessages(&buffer1, maxLength: .max).assertFailure {
       XCTAssertEqual($0, .cardinalityViolation)
     }
     state.assertNotReading()
   }
 
   func testReadOneMessageForOneExpectedMessages() throws {
-    // Write a message into the buffer twice:
-    let message = ByteBuffer(string: "Hello!")
-    var writer = LengthPrefixedMessageWriter(compression: .none)
-    var (buffer, other) = try writer.write(buffer: message)
-    XCTAssertNil(other)
-
+    var buffer = self.writeMessage("Hello!")
     var state: ReadState = .one()
     state.readMessages(&buffer, maxLength: .max).assertSuccess {
-      XCTAssertEqual($0, [message])
+      XCTAssertEqual($0, [ByteBuffer(string: "Hello!")])
     }
 
     // We shouldn't be able to read anymore.
@@ -1179,15 +1179,10 @@ class ReadStateTests: GRPCTestCase {
   }
 
   func testReadOneMessageForManyExpectedMessages() throws {
-    // Write a message into the buffer twice:
-    let message = ByteBuffer(string: "Hello!")
-    var writer = LengthPrefixedMessageWriter(compression: .none)
-    var (buffer, other) = try writer.write(buffer: message)
-    XCTAssertNil(other)
-
+    var buffer = self.writeMessage("Hello!")
     var state: ReadState = .many()
     state.readMessages(&buffer, maxLength: .max).assertSuccess {
-      XCTAssertEqual($0, [message])
+      XCTAssertEqual($0, [ByteBuffer(string: "Hello!")])
     }
 
     // We should still be able to read.
@@ -1195,20 +1190,14 @@ class ReadStateTests: GRPCTestCase {
   }
 
   func testReadManyMessagesForManyExpectedMessages() throws {
-    // Write a message into the buffer twice:
-    let message = ByteBuffer(string: "Hello!")
-    var writer = LengthPrefixedMessageWriter(compression: .none)
-
-    var (first, _) = try writer.write(buffer: message)
-    var (second, _) = try writer.write(buffer: message)
-    var (third, _) = try writer.write(buffer: message)
-
-    first.writeBuffer(&second)
-    first.writeBuffer(&third)
+    let lengthPrefixed = self.writeMessage("Hello!")
+    var buffer = lengthPrefixed
+    buffer.writeImmutableBuffer(lengthPrefixed)
+    buffer.writeImmutableBuffer(lengthPrefixed)
 
     var state: ReadState = .many()
-    state.readMessages(&first, maxLength: .max).assertSuccess {
-      XCTAssertEqual($0, [message, message, message])
+    state.readMessages(&buffer, maxLength: .max).assertSuccess {
+      XCTAssertEqual($0, Array(repeating: ByteBuffer(string: "Hello!"), count: 3))
     }
 
     // We should still be able to read.

+ 0 - 109
Tests/GRPCTests/LengthPrefixedMessageWriterTests.swift

@@ -1,109 +0,0 @@
-/*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@testable import GRPC
-import NIOCore
-import XCTest
-
-class LengthPrefixedMessageWriterTests: GRPCTestCase {
-  func testWriteBytesWithNoLeadingSpaceOrCompression() throws {
-    var writer = LengthPrefixedMessageWriter()
-    let allocator = ByteBufferAllocator()
-    let buffer = allocator.buffer(bytes: [1, 2, 3])
-
-    var (prefixed, other) = try writer.write(buffer: buffer)
-    XCTAssertNil(other)
-    XCTAssertEqual(prefixed.readInteger(as: UInt8.self), 0)
-    XCTAssertEqual(prefixed.readInteger(as: UInt32.self), 3)
-    XCTAssertEqual(prefixed.readBytes(length: 3), [1, 2, 3])
-    XCTAssertEqual(prefixed.readableBytes, 0)
-  }
-
-  func testWriteBytesWithLeadingSpaceAndNoCompression() throws {
-    var writer = LengthPrefixedMessageWriter()
-    let allocator = ByteBufferAllocator()
-
-    var buffer = allocator.buffer(bytes: Array(repeating: 0, count: 5) + [1, 2, 3])
-    buffer.moveReaderIndex(forwardBy: 5)
-
-    var (prefixed, other) = try writer.write(buffer: buffer)
-    XCTAssertNil(other)
-    XCTAssertEqual(prefixed.readInteger(as: UInt8.self), 0)
-    XCTAssertEqual(prefixed.readInteger(as: UInt32.self), 3)
-    XCTAssertEqual(prefixed.readBytes(length: 3), [1, 2, 3])
-    XCTAssertEqual(prefixed.readableBytes, 0)
-  }
-
-  func testWriteBytesWithNoLeadingSpaceAndCompression() throws {
-    var writer = LengthPrefixedMessageWriter(compression: .gzip)
-    let allocator = ByteBufferAllocator()
-
-    let buffer = allocator.buffer(bytes: [1, 2, 3])
-    var (prefixed, other) = try writer.write(buffer: buffer)
-    XCTAssertNil(other)
-
-    XCTAssertEqual(prefixed.readInteger(as: UInt8.self), 1)
-    let size = prefixed.readInteger(as: UInt32.self)!
-    XCTAssertGreaterThanOrEqual(size, 0)
-    XCTAssertNotNil(prefixed.readBytes(length: Int(size)))
-    XCTAssertEqual(prefixed.readableBytes, 0)
-  }
-
-  func testWriteBytesWithLeadingSpaceAndCompression() throws {
-    var writer = LengthPrefixedMessageWriter(compression: .gzip)
-    let allocator = ByteBufferAllocator()
-
-    var buffer = allocator.buffer(bytes: Array(repeating: 0, count: 5) + [1, 2, 3])
-    buffer.moveReaderIndex(forwardBy: 5)
-    var (prefixed, other) = try writer.write(buffer: buffer)
-    XCTAssertNil(other)
-
-    XCTAssertEqual(prefixed.readInteger(as: UInt8.self), 1)
-    let size = prefixed.readInteger(as: UInt32.self)!
-    XCTAssertGreaterThanOrEqual(size, 0)
-    XCTAssertNotNil(prefixed.readBytes(length: Int(size)))
-    XCTAssertEqual(prefixed.readableBytes, 0)
-  }
-
-  func testLargeCompressedPayloadEmitsOneBuffer() throws {
-    var writer = LengthPrefixedMessageWriter(compression: .gzip)
-    let allocator = ByteBufferAllocator()
-    let message = ByteBuffer(repeating: 0, count: 16 * 1024 * 1024)
-
-    var (lengthPrefixed, other) = try writer.write(buffer: message)
-    XCTAssertNil(other)
-    XCTAssertEqual(lengthPrefixed.readInteger(as: UInt8.self), 1)
-    let length = lengthPrefixed.readInteger(as: UInt32.self)
-    XCTAssertEqual(length, UInt32(lengthPrefixed.readableBytes))
-  }
-
-  func testLargeUncompressedPayloadEmitsTwoBuffers() throws {
-    var writer = LengthPrefixedMessageWriter(compression: .none)
-    let allocator = ByteBufferAllocator()
-    let message = ByteBuffer(repeating: 0, count: 16 * 1024 * 1024)
-
-    var (header, payload) = try writer.write(buffer: message)
-    XCTAssertEqual(header.readInteger(as: UInt8.self), 0)
-    XCTAssertEqual(header.readInteger(as: UInt32.self), UInt32(message.readableBytes))
-    XCTAssertEqual(header.readableBytes, 0)
-    XCTAssertEqual(payload, message)
-  }
-}
-
-extension LengthPrefixedMessageWriter {
-  init(compression: CompressionAlgorithm? = nil) {
-    self.init(compression: compression, allocator: .init())
-  }
-}