GRPCMessageDecoder.swift 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. internal import GRPCCore
  17. package import NIOCore
  18. /// A ``GRPCMessageDecoder`` helps with the deframing of gRPC data frames:
  19. /// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length
  20. /// - It reads and decompresses the payload, if compressed
  21. /// - It helps put together frames that have been split across multiple `ByteBuffers` by the underlying transport
  22. struct GRPCMessageDecoder: NIOSingleStepByteToMessageDecoder {
  23. /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
  24. static let metadataLength = 5
  25. typealias InboundOut = [UInt8]
  26. private let decompressor: Zlib.Decompressor?
  27. private let maxPayloadSize: Int
  28. /// Create a new ``GRPCMessageDeframer``.
  29. /// - Parameters:
  30. /// - maxPayloadSize: The maximum size a message payload can be.
  31. /// - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages.
  32. /// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean
  33. /// up any resources allocated by `Zlib`.
  34. init(
  35. maxPayloadSize: Int,
  36. decompressor: Zlib.Decompressor? = nil
  37. ) {
  38. self.maxPayloadSize = maxPayloadSize
  39. self.decompressor = decompressor
  40. }
  41. mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
  42. guard buffer.readableBytes >= Self.metadataLength else {
  43. // If we cannot read enough bytes to cover the metadata's length, then we
  44. // need to wait for more bytes to become available to us.
  45. return nil
  46. }
  47. // Store the current reader index in case we don't yet have enough
  48. // bytes in the buffer to decode a full frame, and need to reset it.
  49. // The force-unwraps for the compression flag and message length are safe,
  50. // because we've checked just above that we've got at least enough bytes to
  51. // read all of the metadata.
  52. let originalReaderIndex = buffer.readerIndex
  53. let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1
  54. let messageLength = buffer.readInteger(as: UInt32.self)!
  55. if messageLength > self.maxPayloadSize {
  56. throw RPCError(
  57. code: .resourceExhausted,
  58. message: """
  59. Message has exceeded the configured maximum payload size \
  60. (max: \(self.maxPayloadSize), actual: \(messageLength))
  61. """
  62. )
  63. }
  64. guard var message = buffer.readSlice(length: Int(messageLength)) else {
  65. // `ByteBuffer/readSlice(length:)` returns nil when there are not enough
  66. // bytes to read the requested length. This can happen if we don't yet have
  67. // enough bytes buffered to read the full message payload.
  68. // By reading the metadata though, we have already moved the reader index,
  69. // so we must reset it to its previous, original position for now,
  70. // and return. We'll try decoding again, once more bytes become available
  71. // in our buffer.
  72. buffer.moveReaderIndex(to: originalReaderIndex)
  73. return nil
  74. }
  75. if isMessageCompressed {
  76. guard let decompressor = self.decompressor else {
  77. // We cannot decompress the payload - throw an error.
  78. throw RPCError(
  79. code: .internalError,
  80. message: "Received a compressed message payload, but no decompressor has been configured."
  81. )
  82. }
  83. return try decompressor.decompress(&message, limit: self.maxPayloadSize)
  84. } else {
  85. return Array(buffer: message)
  86. }
  87. }
  88. mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
  89. try self.decode(buffer: &buffer)
  90. }
  91. }
  92. package struct GRPCMessageDeframer {
  93. private var decoder: GRPCMessageDecoder
  94. private var buffer: Optional<ByteBuffer>
  95. package var _readerIndex: Int? {
  96. self.buffer?.readerIndex
  97. }
  98. init(maxPayloadSize: Int, decompressor: Zlib.Decompressor?) {
  99. self.decoder = GRPCMessageDecoder(
  100. maxPayloadSize: maxPayloadSize,
  101. decompressor: decompressor
  102. )
  103. self.buffer = nil
  104. }
  105. package init(maxPayloadSize: Int) {
  106. self.decoder = GRPCMessageDecoder(maxPayloadSize: maxPayloadSize, decompressor: nil)
  107. self.buffer = nil
  108. }
  109. package mutating func append(_ buffer: ByteBuffer) {
  110. if self.buffer == nil || self.buffer!.readableBytes == 0 {
  111. self.buffer = buffer
  112. } else {
  113. // Avoid having too many read bytes in the buffer which can lead to the buffer growing much
  114. // larger than is necessary.
  115. let readerIndex = self.buffer!.readerIndex
  116. if readerIndex > 1024 && readerIndex > (self.buffer!.capacity / 2) {
  117. self.buffer!.discardReadBytes()
  118. }
  119. self.buffer!.writeImmutableBuffer(buffer)
  120. }
  121. }
  122. package mutating func decodeNext() throws -> [UInt8]? {
  123. guard (self.buffer?.readableBytes ?? 0) > 0 else { return nil }
  124. // Above checks mean this is both non-nil and non-empty.
  125. let message = try self.decoder.decode(buffer: &self.buffer!)
  126. return message
  127. }
  128. }
  129. extension GRPCMessageDeframer {
  130. mutating func decode(into queue: inout OneOrManyQueue<[UInt8]>) throws {
  131. while let next = try self.decodeNext() {
  132. queue.append(next)
  133. }
  134. }
  135. }