GRPCMessageDecoderTests.swift 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOTestUtils
  19. import XCTest
  20. @testable import GRPCNIOTransportCore
  21. @available(gRPCSwiftNIOTransport 2.0, *)
  22. final class GRPCMessageDecoderTests: XCTestCase {
  23. func testReadMultipleMessagesWithoutCompression() throws {
  24. let firstMessage = {
  25. var buffer = ByteBuffer()
  26. buffer.writeInteger(UInt8(0))
  27. buffer.writeInteger(UInt32(16))
  28. buffer.writeRepeatingByte(42, count: 16)
  29. return buffer
  30. }()
  31. let secondMessage = {
  32. var buffer = ByteBuffer()
  33. buffer.writeInteger(UInt8(0))
  34. buffer.writeInteger(UInt32(8))
  35. buffer.writeRepeatingByte(43, count: 8)
  36. return buffer
  37. }()
  38. try ByteToMessageDecoderVerifier.verifyDecoder(
  39. inputOutputPairs: [
  40. (firstMessage, [ByteBuffer(repeating: UInt8(42), count: 16)]),
  41. (secondMessage, [ByteBuffer(repeating: UInt8(43), count: 8)]),
  42. ]) {
  43. GRPCMessageDecoder(maxPayloadSize: .max)
  44. }
  45. }
  46. func testReadMessageOverSizeLimitWithoutCompression() throws {
  47. let deframer = GRPCMessageDecoder(maxPayloadSize: 100)
  48. let processor = NIOSingleStepByteToMessageProcessor(deframer)
  49. var buffer = ByteBuffer()
  50. buffer.writeInteger(UInt8(0))
  51. buffer.writeInteger(UInt32(101))
  52. buffer.writeRepeatingByte(42, count: 101)
  53. XCTAssertThrowsError(
  54. ofType: RPCError.self,
  55. try processor.process(buffer: buffer) { _ in
  56. XCTFail("No message should be produced.")
  57. }
  58. ) { error in
  59. XCTAssertEqual(error.code, .resourceExhausted)
  60. XCTAssertEqual(
  61. error.message,
  62. "Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
  63. )
  64. }
  65. }
  66. func testReadMessageOverSizeLimitButWithoutActualMessageBytes() throws {
  67. let deframer = GRPCMessageDecoder(maxPayloadSize: 100)
  68. let processor = NIOSingleStepByteToMessageProcessor(deframer)
  69. var buffer = ByteBuffer()
  70. buffer.writeInteger(UInt8(0))
  71. // Set the message length field to be over the maximum payload size, but
  72. // don't write the actual message bytes. This is to ensure that the payload
  73. // size limit is enforced _before_ the payload is actually read.
  74. buffer.writeInteger(UInt32(101))
  75. XCTAssertThrowsError(
  76. ofType: RPCError.self,
  77. try processor.process(buffer: buffer) { _ in
  78. XCTFail("No message should be produced.")
  79. }
  80. ) { error in
  81. XCTAssertEqual(error.code, .resourceExhausted)
  82. XCTAssertEqual(
  83. error.message,
  84. "Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
  85. )
  86. }
  87. }
  88. func testCompressedMessageWithoutConfiguringDecompressor() throws {
  89. let deframer = GRPCMessageDecoder(maxPayloadSize: 100)
  90. let processor = NIOSingleStepByteToMessageProcessor(deframer)
  91. var buffer = ByteBuffer()
  92. buffer.writeInteger(UInt8(1))
  93. buffer.writeInteger(UInt32(10))
  94. buffer.writeRepeatingByte(42, count: 10)
  95. XCTAssertThrowsError(
  96. ofType: RPCError.self,
  97. try processor.process(buffer: buffer) { _ in
  98. XCTFail("No message should be produced.")
  99. }
  100. ) { error in
  101. XCTAssertEqual(error.code, .internalError)
  102. XCTAssertEqual(
  103. error.message,
  104. "Received a compressed message payload, but no decompressor has been configured."
  105. )
  106. }
  107. }
  108. private func testReadMultipleMessagesWithCompression(method: Zlib.Method) throws {
  109. let decompressor = Zlib.Decompressor(method: method)
  110. let compressor = Zlib.Compressor(method: method)
  111. var framer = GRPCMessageFramer()
  112. defer {
  113. decompressor.end()
  114. compressor.end()
  115. }
  116. let firstMessage = try {
  117. framer.append(ByteBuffer(repeating: 42, count: 100), promise: nil)
  118. return try framer.next(compressor: compressor)!
  119. }()
  120. let secondMessage = try {
  121. framer.append(ByteBuffer(repeating: 43, count: 110), promise: nil)
  122. return try framer.next(compressor: compressor)!
  123. }()
  124. try ByteToMessageDecoderVerifier.verifyDecoder(
  125. inputOutputPairs: [
  126. (firstMessage.bytes, [ByteBuffer(repeating: 42, count: 100)]),
  127. (secondMessage.bytes, [ByteBuffer(repeating: 43, count: 110)]),
  128. ]) {
  129. GRPCMessageDecoder(maxPayloadSize: 1000, decompressor: decompressor)
  130. }
  131. }
  132. func testReadMultipleMessagesWithDeflateCompression() throws {
  133. try self.testReadMultipleMessagesWithCompression(method: .deflate)
  134. }
  135. func testReadMultipleMessagesWithGZIPCompression() throws {
  136. try self.testReadMultipleMessagesWithCompression(method: .gzip)
  137. }
  138. func testReadCompressedMessageOverSizeLimitBeforeDecompressing() throws {
  139. let deframer = GRPCMessageDecoder(maxPayloadSize: 1)
  140. let processor = NIOSingleStepByteToMessageProcessor(deframer)
  141. let compressor = Zlib.Compressor(method: .gzip)
  142. var framer = GRPCMessageFramer()
  143. defer {
  144. compressor.end()
  145. }
  146. framer.append(ByteBuffer(repeating: 42, count: 100), promise: nil)
  147. let framedMessage = try framer.next(compressor: compressor)!
  148. XCTAssertThrowsError(
  149. ofType: RPCError.self,
  150. try processor.process(buffer: framedMessage.bytes) { _ in
  151. XCTFail("No message should be produced.")
  152. }
  153. ) { error in
  154. XCTAssertEqual(error.code, .resourceExhausted)
  155. XCTAssertEqual(
  156. error.message,
  157. """
  158. Message has exceeded the configured maximum payload size \
  159. (max: 1, actual: \(framedMessage.bytes.readableBytes - GRPCMessageDecoder.metadataLength))
  160. """
  161. )
  162. }
  163. }
  164. private func testReadDecompressedMessageOverSizeLimit(method: Zlib.Method) throws {
  165. let decompressor = Zlib.Decompressor(method: method)
  166. let deframer = GRPCMessageDecoder(maxPayloadSize: 100, decompressor: decompressor)
  167. let processor = NIOSingleStepByteToMessageProcessor(deframer)
  168. let compressor = Zlib.Compressor(method: method)
  169. var framer = GRPCMessageFramer()
  170. defer {
  171. decompressor.end()
  172. compressor.end()
  173. }
  174. framer.append(ByteBuffer(repeating: 42, count: 101), promise: nil)
  175. let framedMessage = try framer.next(compressor: compressor)!
  176. XCTAssertThrowsError(
  177. ofType: RPCError.self,
  178. try processor.process(buffer: framedMessage.bytes) { _ in
  179. XCTFail("No message should be produced.")
  180. }
  181. ) { error in
  182. XCTAssertEqual(error.code, .resourceExhausted)
  183. XCTAssertEqual(error.message, "Message is too large to decompress.")
  184. }
  185. }
  186. func testReadDecompressedMessageOverSizeLimitWithDeflateCompression() throws {
  187. try self.testReadDecompressedMessageOverSizeLimit(method: .deflate)
  188. }
  189. func testReadDecompressedMessageOverSizeLimitWithGZIPCompression() throws {
  190. try self.testReadDecompressedMessageOverSizeLimit(method: .gzip)
  191. }
  192. }
  193. @available(gRPCSwiftNIOTransport 2.0, *)
  194. extension GRPCMessageFramer {
  195. mutating func next(
  196. compressor: Zlib.Compressor? = nil
  197. ) throws(RPCError) -> (bytes: ByteBuffer, promise: EventLoopPromise<Void>?)? {
  198. if let (result, promise) = self.nextResult(compressor: compressor) {
  199. switch result {
  200. case .success(let buffer):
  201. return (bytes: buffer, promise: promise)
  202. case .failure(let error):
  203. promise?.fail(error)
  204. throw error
  205. }
  206. } else {
  207. return nil
  208. }
  209. }
  210. }