소스 검색

Add `GRPCMessageDeframer` (#1776)

Gustavo Cairo 1 년 전
부모
커밋
a6802a80fa
3개의 변경된 파일324개의 추가작업 그리고 0개의 파일을 삭제
  1. 2 0
      Package.swift
  2. 103 0
      Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift
  3. 219 0
      Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift

+ 2 - 0
Package.swift

@@ -128,6 +128,7 @@ extension Target.Dependency {
     name: "NIOTransportServices",
     package: "swift-nio-transport-services"
   )
+  static let nioTestUtils: Self = .product(name: "NIOTestUtils", package: "swift-nio")
   static let logging: Self = .product(name: "Logging", package: "swift-log")
   static let protobuf: Self = .product(name: "SwiftProtobuf", package: "swift-protobuf")
   static let protobufPluginLibrary: Self = .product(
@@ -312,6 +313,7 @@ extension Target {
       .nioCore,
       .nioHTTP2,
       .nioEmbedded,
+      .nioTestUtils,
     ]
   )
   

+ 103 - 0
Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift

@@ -0,0 +1,103 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+import NIOCore
+
+/// A ``GRPCMessageDeframer`` helps with the deframing of gRPC data frames:
+/// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length
+/// - It reads and decompresses the payload, if compressed
+/// - It helps put together frames that have been split across multiple `ByteBuffers` by the underlying transport
+struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder {
+  /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
+  static let metadataLength = 5
+  static let defaultMaximumPayloadSize = Int.max
+
+  typealias InboundOut = [UInt8]
+
+  private let decompressor: Zlib.Decompressor?
+  private let maximumPayloadSize: Int
+
+  /// Create a new ``GRPCMessageDeframer``.
+  /// - Parameters:
+  ///   - maximumPayloadSize: The maximum size a message payload can be.
+  ///   - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages.
+  /// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean
+  /// up any resources allocated by `Zlib`.
+  init(
+    maximumPayloadSize: Int = Self.defaultMaximumPayloadSize,
+    decompressor: Zlib.Decompressor? = nil
+  ) {
+    self.maximumPayloadSize = maximumPayloadSize
+    self.decompressor = decompressor
+  }
+
+  mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
+    guard buffer.readableBytes >= Self.metadataLength else {
+      // If we cannot read enough bytes to cover the metadata's length, then we
+      // need to wait for more bytes to become available to us.
+      return nil
+    }
+
+    // Store the current reader index in case we don't yet have enough
+    // bytes in the buffer to decode a full frame, and need to reset it.
+    // The force-unwraps for the compression flag and message length are safe,
+    // because we've checked just above that we've got at least enough bytes to
+    // read all of the metadata.
+    let originalReaderIndex = buffer.readerIndex
+    let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1
+    let messageLength = buffer.readInteger(as: UInt32.self)!
+
+    if messageLength > self.maximumPayloadSize {
+      throw RPCError(
+        code: .resourceExhausted,
+        message: """
+          Message has exceeded the configured maximum payload size \
+          (max: \(self.maximumPayloadSize), actual: \(messageLength))
+          """
+      )
+    }
+
+    guard var message = buffer.readSlice(length: Int(messageLength)) else {
+      // `ByteBuffer/readSlice(length:)` returns nil when there are not enough
+      // bytes to read the requested length. This can happen if we don't yet have
+      // enough bytes buffered to read the full message payload.
+      // By reading the metadata though, we have already moved the reader index,
+      // so we must reset it to its previous, original position for now,
+      // and return. We'll try decoding again, once more bytes become available
+      // in our buffer.
+      buffer.moveReaderIndex(to: originalReaderIndex)
+      return nil
+    }
+
+    if isMessageCompressed {
+      guard let decompressor = self.decompressor else {
+        // We cannot decompress the payload - throw an error.
+        throw RPCError(
+          code: .internalError,
+          message: "Received a compressed message payload, but no decompressor has been configured."
+        )
+      }
+      return try decompressor.decompress(&message, limit: self.maximumPayloadSize)
+    } else {
+      return Array(buffer: message)
+    }
+  }
+
+  mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
+    try self.decode(buffer: &buffer)
+  }
+}

+ 219 - 0
Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift

@@ -0,0 +1,219 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+import NIOCore
+import NIOTestUtils
+import XCTest
+
+@testable import GRPCHTTP2Core
+
+final class GRPCMessageDeframerTests: XCTestCase {
+  func testReadMultipleMessagesWithoutCompression() throws {
+    let firstMessage = {
+      var buffer = ByteBuffer()
+      buffer.writeInteger(UInt8(0))
+      buffer.writeInteger(UInt32(16))
+      buffer.writeRepeatingByte(42, count: 16)
+      return buffer
+    }()
+
+    let secondMessage = {
+      var buffer = ByteBuffer()
+      buffer.writeInteger(UInt8(0))
+      buffer.writeInteger(UInt32(8))
+      buffer.writeRepeatingByte(43, count: 8)
+      return buffer
+    }()
+
+    try ByteToMessageDecoderVerifier.verifyDecoder(
+      inputOutputPairs: [
+        (firstMessage, [Array(repeating: UInt8(42), count: 16)]),
+        (secondMessage, [Array(repeating: UInt8(43), count: 8)]),
+      ]) {
+        GRPCMessageDeframer()
+      }
+  }
+
+  func testReadMessageOverSizeLimitWithoutCompression() throws {
+    let deframer = GRPCMessageDeframer(maximumPayloadSize: 100)
+    let processor = NIOSingleStepByteToMessageProcessor(deframer)
+
+    var buffer = ByteBuffer()
+    buffer.writeInteger(UInt8(0))
+    buffer.writeInteger(UInt32(101))
+    buffer.writeRepeatingByte(42, count: 101)
+
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try processor.process(buffer: buffer) { _ in
+        XCTFail("No message should be produced.")
+      }
+    ) { error in
+      XCTAssertEqual(error.code, .resourceExhausted)
+      XCTAssertEqual(
+        error.message,
+        "Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
+      )
+    }
+  }
+
+  func testReadMessageOverSizeLimitButWithoutActualMessageBytes() throws {
+    let deframer = GRPCMessageDeframer(maximumPayloadSize: 100)
+    let processor = NIOSingleStepByteToMessageProcessor(deframer)
+
+    var buffer = ByteBuffer()
+    buffer.writeInteger(UInt8(0))
+    // Set the message length field to be over the maximum payload size, but
+    // don't write the actual message bytes. This is to ensure that the payload
+    // size limit is enforced _before_ the payload is actually read.
+    buffer.writeInteger(UInt32(101))
+
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try processor.process(buffer: buffer) { _ in
+        XCTFail("No message should be produced.")
+      }
+    ) { error in
+      XCTAssertEqual(error.code, .resourceExhausted)
+      XCTAssertEqual(
+        error.message,
+        "Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
+      )
+    }
+  }
+
+  func testCompressedMessageWithoutConfiguringDecompressor() throws {
+    let deframer = GRPCMessageDeframer(maximumPayloadSize: 100)
+    let processor = NIOSingleStepByteToMessageProcessor(deframer)
+
+    var buffer = ByteBuffer()
+    buffer.writeInteger(UInt8(1))
+    buffer.writeInteger(UInt32(10))
+    buffer.writeRepeatingByte(42, count: 10)
+
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try processor.process(buffer: buffer) { _ in
+        XCTFail("No message should be produced.")
+      }
+    ) { error in
+      XCTAssertEqual(error.code, .internalError)
+      XCTAssertEqual(
+        error.message,
+        "Received a compressed message payload, but no decompressor has been configured."
+      )
+    }
+  }
+
+  private func testReadMultipleMessagesWithCompression(method: Zlib.Method) throws {
+    let decompressor = Zlib.Decompressor(method: method)
+    let compressor = Zlib.Compressor(method: method)
+    var framer = GRPCMessageFramer()
+    defer {
+      decompressor.end()
+      compressor.end()
+    }
+
+    let firstMessage = try {
+      framer.append(Array(repeating: 42, count: 100))
+      return try framer.next(compressor: compressor)!
+    }()
+
+    let secondMessage = try {
+      framer.append(Array(repeating: 43, count: 110))
+      return try framer.next(compressor: compressor)!
+    }()
+
+    try ByteToMessageDecoderVerifier.verifyDecoder(
+      inputOutputPairs: [
+        (firstMessage, [Array(repeating: 42, count: 100)]),
+        (secondMessage, [Array(repeating: 43, count: 110)]),
+      ]) {
+        GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor)
+      }
+  }
+
+  func testReadMultipleMessagesWithDeflateCompression() throws {
+    try self.testReadMultipleMessagesWithCompression(method: .deflate)
+  }
+
+  func testReadMultipleMessagesWithGZIPCompression() throws {
+    try self.testReadMultipleMessagesWithCompression(method: .gzip)
+  }
+
+  func testReadCompressedMessageOverSizeLimitBeforeDecompressing() throws {
+    let deframer = GRPCMessageDeframer(maximumPayloadSize: 1)
+    let processor = NIOSingleStepByteToMessageProcessor(deframer)
+    let compressor = Zlib.Compressor(method: .gzip)
+    var framer = GRPCMessageFramer()
+    defer {
+      compressor.end()
+    }
+
+    framer.append(Array(repeating: 42, count: 100))
+    let framedMessage = try framer.next(compressor: compressor)!
+
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try processor.process(buffer: framedMessage) { _ in
+        XCTFail("No message should be produced.")
+      }
+    ) { error in
+      XCTAssertEqual(error.code, .resourceExhausted)
+      XCTAssertEqual(
+        error.message,
+        """
+        Message has exceeded the configured maximum payload size \
+        (max: 1, actual: \(framedMessage.readableBytes - GRPCMessageDeframer.metadataLength))
+        """
+      )
+    }
+  }
+
+  private func testReadDecompressedMessageOverSizeLimit(method: Zlib.Method) throws {
+    let decompressor = Zlib.Decompressor(method: method)
+    let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor)
+    let processor = NIOSingleStepByteToMessageProcessor(deframer)
+    let compressor = Zlib.Compressor(method: method)
+    var framer = GRPCMessageFramer()
+    defer {
+      decompressor.end()
+      compressor.end()
+    }
+
+    framer.append(Array(repeating: 42, count: 101))
+    let framedMessage = try framer.next(compressor: compressor)!
+
+    XCTAssertThrowsError(
+      ofType: RPCError.self,
+      try processor.process(buffer: framedMessage) { _ in
+        XCTFail("No message should be produced.")
+      }
+    ) { error in
+      XCTAssertEqual(error.code, .resourceExhausted)
+      XCTAssertEqual(error.message, "Message is too large to decompress.")
+    }
+  }
+
+  func testReadDecompressedMessageOverSizeLimitWithDeflateCompression() throws {
+    try self.testReadDecompressedMessageOverSizeLimit(method: .deflate)
+  }
+
+  func testReadDecompressedMessageOverSizeLimitWithGZIPCompression() throws {
+    try self.testReadDecompressedMessageOverSizeLimit(method: .gzip)
+  }
+}