GRPCMessageDecoder.swift 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. internal import GRPCCore
  17. package import NIOCore
  18. /// A ``GRPCMessageDecoder`` helps with the deframing of gRPC data frames:
  19. /// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length
  20. /// - It reads and decompresses the payload, if compressed
  21. /// - It helps put together frames that have been split across multiple `ByteBuffers` by the underlying transport
  22. @available(gRPCSwiftNIOTransport 1.0, *)
  23. struct GRPCMessageDecoder: NIOSingleStepByteToMessageDecoder {
  24. /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
  25. static let metadataLength = 5
  26. typealias InboundOut = ByteBuffer
  27. private let decompressor: Zlib.Decompressor?
  28. private let maxPayloadSize: Int
  29. /// Create a new ``GRPCMessageDeframer``.
  30. /// - Parameters:
  31. /// - maxPayloadSize: The maximum size a message payload can be.
  32. /// - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages.
  33. /// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean
  34. /// up any resources allocated by `Zlib`.
  35. init(
  36. maxPayloadSize: Int,
  37. decompressor: Zlib.Decompressor? = nil
  38. ) {
  39. self.maxPayloadSize = maxPayloadSize
  40. self.decompressor = decompressor
  41. }
  42. mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
  43. guard buffer.readableBytes >= Self.metadataLength else {
  44. // If we cannot read enough bytes to cover the metadata's length, then we
  45. // need to wait for more bytes to become available to us.
  46. return nil
  47. }
  48. // Store the current reader index in case we don't yet have enough
  49. // bytes in the buffer to decode a full frame, and need to reset it.
  50. // The force-unwraps for the compression flag and message length are safe,
  51. // because we've checked just above that we've got at least enough bytes to
  52. // read all of the metadata.
  53. let originalReaderIndex = buffer.readerIndex
  54. let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1
  55. let messageLength = buffer.readInteger(as: UInt32.self)!
  56. if messageLength > self.maxPayloadSize {
  57. throw RPCError(
  58. code: .resourceExhausted,
  59. message: """
  60. Message has exceeded the configured maximum payload size \
  61. (max: \(self.maxPayloadSize), actual: \(messageLength))
  62. """
  63. )
  64. }
  65. guard var message = buffer.readSlice(length: Int(messageLength)) else {
  66. // `ByteBuffer/readSlice(length:)` returns nil when there are not enough
  67. // bytes to read the requested length. This can happen if we don't yet have
  68. // enough bytes buffered to read the full message payload.
  69. // By reading the metadata though, we have already moved the reader index,
  70. // so we must reset it to its previous, original position for now,
  71. // and return. We'll try decoding again, once more bytes become available
  72. // in our buffer.
  73. buffer.moveReaderIndex(to: originalReaderIndex)
  74. return nil
  75. }
  76. if isMessageCompressed {
  77. guard let decompressor = self.decompressor else {
  78. // We cannot decompress the payload - throw an error.
  79. throw RPCError(
  80. code: .internalError,
  81. message: "Received a compressed message payload, but no decompressor has been configured."
  82. )
  83. }
  84. return try decompressor.decompress(&message, limit: self.maxPayloadSize)
  85. } else {
  86. return message
  87. }
  88. }
  89. mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
  90. try self.decode(buffer: &buffer)
  91. }
  92. }
  93. @available(gRPCSwiftNIOTransport 1.0, *)
  94. package struct GRPCMessageDeframer {
  95. private var decoder: GRPCMessageDecoder
  96. private var buffer: Optional<ByteBuffer>
  97. package var _readerIndex: Int? {
  98. self.buffer?.readerIndex
  99. }
  100. init(maxPayloadSize: Int, decompressor: Zlib.Decompressor?) {
  101. self.decoder = GRPCMessageDecoder(
  102. maxPayloadSize: maxPayloadSize,
  103. decompressor: decompressor
  104. )
  105. self.buffer = nil
  106. }
  107. package init(maxPayloadSize: Int) {
  108. self.decoder = GRPCMessageDecoder(maxPayloadSize: maxPayloadSize, decompressor: nil)
  109. self.buffer = nil
  110. }
  111. package mutating func append(_ buffer: ByteBuffer) {
  112. if self.buffer == nil || self.buffer!.readableBytes == 0 {
  113. self.buffer = buffer
  114. } else {
  115. // Avoid having too many read bytes in the buffer which can lead to the buffer growing much
  116. // larger than is necessary.
  117. let readerIndex = self.buffer!.readerIndex
  118. if readerIndex > 1024 && readerIndex > (self.buffer!.capacity / 2) {
  119. self.buffer!.discardReadBytes()
  120. }
  121. self.buffer!.writeImmutableBuffer(buffer)
  122. }
  123. }
  124. package mutating func decodeNext() throws -> ByteBuffer? {
  125. guard (self.buffer?.readableBytes ?? 0) > 0 else { return nil }
  126. // Above checks mean this is both non-nil and non-empty.
  127. let message = try self.decoder.decode(buffer: &self.buffer!)
  128. return message
  129. }
  130. }
  131. @available(gRPCSwiftNIOTransport 1.0, *)
  132. extension GRPCMessageDeframer {
  133. mutating func decode(into queue: inout OneOrManyQueue<ByteBuffer>) throws {
  134. while let next = try self.decodeNext() {
  135. queue.append(next)
  136. }
  137. }
  138. }