GRPCMessageDecoderTests.swift 7.6 KB

  /*
   * Copyright 2024, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOTestUtils
  19. import XCTest
  20. @testable import GRPCNIOTransportCore
  /// Tests for `GRPCMessageDecoder`.
  ///
  /// The hand-built frames used below are written as a one-byte flag, a `UInt32`
  /// length and then the payload bytes. The tests cover plain decoding, decoding
  /// with a `Zlib.Decompressor`, and enforcement of `maxPayloadSize`.
  final class GRPCMessageDecoderTests: XCTestCase {
    /// Two frames with a zero flag byte decode to their respective payloads.
    func testReadMultipleMessagesWithoutCompression() throws {
      let firstMessage = {
        var buffer = ByteBuffer()
        buffer.writeInteger(UInt8(0))
        buffer.writeInteger(UInt32(16))
        buffer.writeRepeatingByte(42, count: 16)
        return buffer
      }()

      let secondMessage = {
        var buffer = ByteBuffer()
        buffer.writeInteger(UInt8(0))
        buffer.writeInteger(UInt32(8))
        buffer.writeRepeatingByte(43, count: 8)
        return buffer
      }()

      try ByteToMessageDecoderVerifier.verifyDecoder(
        inputOutputPairs: [
          (firstMessage, [Array(repeating: UInt8(42), count: 16)]),
          (secondMessage, [Array(repeating: UInt8(43), count: 8)]),
        ]
      ) {
        GRPCMessageDecoder(maxPayloadSize: .max)
      }
    }

    /// A frame declaring (and carrying) 101 payload bytes is rejected with
    /// `.resourceExhausted` when the limit is 100.
    func testReadMessageOverSizeLimitWithoutCompression() throws {
      let deframer = GRPCMessageDecoder(maxPayloadSize: 100)
      let processor = NIOSingleStepByteToMessageProcessor(deframer)

      var buffer = ByteBuffer()
      buffer.writeInteger(UInt8(0))
      buffer.writeInteger(UInt32(101))
      buffer.writeRepeatingByte(42, count: 101)

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try processor.process(buffer: buffer) { _ in
          XCTFail("No message should be produced.")
        }
      ) { error in
        XCTAssertEqual(error.code, .resourceExhausted)
        XCTAssertEqual(
          error.message,
          "Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
        )
      }
    }

    /// The size limit is applied to the declared length alone: the same error is
    /// expected even though no payload bytes follow the length field.
    func testReadMessageOverSizeLimitButWithoutActualMessageBytes() throws {
      let deframer = GRPCMessageDecoder(maxPayloadSize: 100)
      let processor = NIOSingleStepByteToMessageProcessor(deframer)

      var buffer = ByteBuffer()
      buffer.writeInteger(UInt8(0))
      // Set the message length field to be over the maximum payload size, but
      // don't write the actual message bytes. This is to ensure that the payload
      // size limit is enforced _before_ the payload is actually read.
      buffer.writeInteger(UInt32(101))

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try processor.process(buffer: buffer) { _ in
          XCTFail("No message should be produced.")
        }
      ) { error in
        XCTAssertEqual(error.code, .resourceExhausted)
        XCTAssertEqual(
          error.message,
          "Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
        )
      }
    }

    /// A frame whose flag byte is 1 fails with `.internalError` when the decoder
    /// was created without a decompressor.
    func testCompressedMessageWithoutConfiguringDecompressor() throws {
      let deframer = GRPCMessageDecoder(maxPayloadSize: 100)
      let processor = NIOSingleStepByteToMessageProcessor(deframer)

      var buffer = ByteBuffer()
      buffer.writeInteger(UInt8(1))
      buffer.writeInteger(UInt32(10))
      buffer.writeRepeatingByte(42, count: 10)

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try processor.process(buffer: buffer) { _ in
          XCTFail("No message should be produced.")
        }
      ) { error in
        XCTAssertEqual(error.code, .internalError)
        XCTAssertEqual(
          error.message,
          "Received a compressed message payload, but no decompressor has been configured."
        )
      }
    }

    /// Shared body for the compression round-trip tests: frames two messages with
    /// `GRPCMessageFramer` using `method`, then checks that a decoder configured
    /// with a matching decompressor yields the original payloads.
    ///
    /// - Parameter method: The zlib method used by both compressor and decompressor.
    private func testReadMultipleMessagesWithCompression(method: Zlib.Method) throws {
      let decompressor = Zlib.Decompressor(method: method)
      let compressor = Zlib.Compressor(method: method)
      var framer = GRPCMessageFramer()
      defer {
        decompressor.end()
        compressor.end()
      }

      // Unwrap with XCTUnwrap rather than `!` so that a missing frame fails this
      // test instead of trapping and taking down the whole test run.
      framer.append(Array(repeating: 42, count: 100), promise: nil)
      let firstMessage = try XCTUnwrap(
        framer.next(compressor: compressor),
        "Expected the framer to produce the first message."
      )

      framer.append(Array(repeating: 43, count: 110), promise: nil)
      let secondMessage = try XCTUnwrap(
        framer.next(compressor: compressor),
        "Expected the framer to produce the second message."
      )

      try ByteToMessageDecoderVerifier.verifyDecoder(
        inputOutputPairs: [
          (firstMessage.bytes, [Array(repeating: 42, count: 100)]),
          (secondMessage.bytes, [Array(repeating: 43, count: 110)]),
        ]
      ) {
        GRPCMessageDecoder(maxPayloadSize: 1000, decompressor: decompressor)
      }
    }

    /// Round-trips two messages using the `.deflate` method.
    func testReadMultipleMessagesWithDeflateCompression() throws {
      try self.testReadMultipleMessagesWithCompression(method: .deflate)
    }

    /// Round-trips two messages using the `.gzip` method.
    func testReadMultipleMessagesWithGZIPCompression() throws {
      try self.testReadMultipleMessagesWithCompression(method: .gzip)
    }

    /// With a limit of one byte, a gzip-framed message is rejected with
    /// `.resourceExhausted`; the reported size is the framed length minus
    /// `GRPCMessageDecoder.metadataLength`.
    func testReadCompressedMessageOverSizeLimitBeforeDecompressing() throws {
      let deframer = GRPCMessageDecoder(maxPayloadSize: 1)
      let processor = NIOSingleStepByteToMessageProcessor(deframer)
      let compressor = Zlib.Compressor(method: .gzip)
      var framer = GRPCMessageFramer()
      defer {
        compressor.end()
      }

      framer.append(Array(repeating: 42, count: 100), promise: nil)
      // XCTUnwrap turns an unexpected `nil` into a test failure rather than a crash.
      let framedMessage = try XCTUnwrap(
        framer.next(compressor: compressor),
        "Expected the framer to produce a message."
      )

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try processor.process(buffer: framedMessage.bytes) { _ in
          XCTFail("No message should be produced.")
        }
      ) { error in
        XCTAssertEqual(error.code, .resourceExhausted)
        XCTAssertEqual(
          error.message,
          """
          Message has exceeded the configured maximum payload size \
          (max: 1, actual: \(framedMessage.bytes.readableBytes - GRPCMessageDecoder.metadataLength))
          """
        )
      }
    }

    /// Shared body for the "too large after decompression" tests: a 101-byte
    /// payload is compressed and framed, then decoded with a limit of 100.
    ///
    /// The expected error is the decompression one, so the compressed frame
    /// presumably fits within the limit and only the inflated size exceeds it.
    ///
    /// - Parameter method: The zlib method used by both compressor and decompressor.
    private func testReadDecompressedMessageOverSizeLimit(method: Zlib.Method) throws {
      let decompressor = Zlib.Decompressor(method: method)
      let deframer = GRPCMessageDecoder(maxPayloadSize: 100, decompressor: decompressor)
      let processor = NIOSingleStepByteToMessageProcessor(deframer)
      let compressor = Zlib.Compressor(method: method)
      var framer = GRPCMessageFramer()
      defer {
        decompressor.end()
        compressor.end()
      }

      framer.append(Array(repeating: 42, count: 101), promise: nil)
      // XCTUnwrap turns an unexpected `nil` into a test failure rather than a crash.
      let framedMessage = try XCTUnwrap(
        framer.next(compressor: compressor),
        "Expected the framer to produce a message."
      )

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try processor.process(buffer: framedMessage.bytes) { _ in
          XCTFail("No message should be produced.")
        }
      ) { error in
        XCTAssertEqual(error.code, .resourceExhausted)
        XCTAssertEqual(error.message, "Message is too large to decompress.")
      }
    }

    /// Runs the decompressed-size-limit check with the `.deflate` method.
    func testReadDecompressedMessageOverSizeLimitWithDeflateCompression() throws {
      try self.testReadDecompressedMessageOverSizeLimit(method: .deflate)
    }

    /// Runs the decompressed-size-limit check with the `.gzip` method.
    func testReadDecompressedMessageOverSizeLimitWithGZIPCompression() throws {
      try self.testReadDecompressedMessageOverSizeLimit(method: .gzip)
    }
  }
  extension GRPCMessageFramer {
    /// Test convenience that exposes `nextResult(compressor:)` as a throwing call.
    ///
    /// - Parameter compressor: Optional compressor forwarded to `nextResult(compressor:)`.
    /// - Returns: The framed bytes together with the associated promise, or `nil`
    ///   when `nextResult(compressor:)` returns `nil`.
    /// - Throws: The `RPCError` carried by a failed result. The associated promise,
    ///   if there is one, is failed with the same error before it is thrown.
    mutating func next(
      compressor: Zlib.Compressor? = nil
    ) throws(RPCError) -> (bytes: ByteBuffer, promise: EventLoopPromise<Void>?)? {
      guard let (outcome, writePromise) = self.nextResult(compressor: compressor) else {
        return nil
      }

      switch outcome {
      case .failure(let framingError):
        writePromise?.fail(framingError)
        throw framingError
      case .success(let framedBytes):
        return (bytes: framedBytes, promise: writePromise)
      }
    }
  }