GRPCMessageDeframer.swift 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. /// A ``GRPCMessageDeframer`` helps with the deframing of gRPC data frames:
  19. /// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length
  20. /// - It reads and decompresses the payload, if compressed
  21. /// - It helps put together frames that have been split across multiple `ByteBuffers` by the underlying transport
  22. struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder {
  23. /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
  24. static let metadataLength = 5
  25. static let defaultMaximumPayloadSize = Int.max
  26. typealias InboundOut = [UInt8]
  27. private let decompressor: Zlib.Decompressor?
  28. private let maximumPayloadSize: Int
  29. /// Create a new ``GRPCMessageDeframer``.
  30. /// - Parameters:
  31. /// - maximumPayloadSize: The maximum size a message payload can be.
  32. /// - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages.
  33. /// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean
  34. /// up any resources allocated by `Zlib`.
  35. init(
  36. maximumPayloadSize: Int = Self.defaultMaximumPayloadSize,
  37. decompressor: Zlib.Decompressor? = nil
  38. ) {
  39. self.maximumPayloadSize = maximumPayloadSize
  40. self.decompressor = decompressor
  41. }
  42. mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
  43. guard buffer.readableBytes >= Self.metadataLength else {
  44. // If we cannot read enough bytes to cover the metadata's length, then we
  45. // need to wait for more bytes to become available to us.
  46. return nil
  47. }
  48. // Store the current reader index in case we don't yet have enough
  49. // bytes in the buffer to decode a full frame, and need to reset it.
  50. // The force-unwraps for the compression flag and message length are safe,
  51. // because we've checked just above that we've got at least enough bytes to
  52. // read all of the metadata.
  53. let originalReaderIndex = buffer.readerIndex
  54. let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1
  55. let messageLength = buffer.readInteger(as: UInt32.self)!
  56. if messageLength > self.maximumPayloadSize {
  57. throw RPCError(
  58. code: .resourceExhausted,
  59. message: """
  60. Message has exceeded the configured maximum payload size \
  61. (max: \(self.maximumPayloadSize), actual: \(messageLength))
  62. """
  63. )
  64. }
  65. guard var message = buffer.readSlice(length: Int(messageLength)) else {
  66. // `ByteBuffer/readSlice(length:)` returns nil when there are not enough
  67. // bytes to read the requested length. This can happen if we don't yet have
  68. // enough bytes buffered to read the full message payload.
  69. // By reading the metadata though, we have already moved the reader index,
  70. // so we must reset it to its previous, original position for now,
  71. // and return. We'll try decoding again, once more bytes become available
  72. // in our buffer.
  73. buffer.moveReaderIndex(to: originalReaderIndex)
  74. return nil
  75. }
  76. if isMessageCompressed {
  77. guard let decompressor = self.decompressor else {
  78. // We cannot decompress the payload - throw an error.
  79. throw RPCError(
  80. code: .internalError,
  81. message: "Received a compressed message payload, but no decompressor has been configured."
  82. )
  83. }
  84. return try decompressor.decompress(&message, limit: self.maximumPayloadSize)
  85. } else {
  86. return Array(buffer: message)
  87. }
  88. }
  89. mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
  90. try self.decode(buffer: &buffer)
  91. }
  92. }