CoalescingLengthPrefixedMessageWriterTests.swift 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. * Copyright 2022, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOEmbedded
  18. import XCTest
  19. @testable import GRPC
  /// Exercises `CoalescingLengthPrefixedMessageWriter` by appending messages and
  /// inspecting the buffers (and promises) subsequently returned by `next()`.
  internal final class CoalescingLengthPrefixedMessageWriterTests: GRPCTestCase {
    /// Event loop used to create the promises handed to the writer.
    private let loop = EmbeddedEventLoop()

    // MARK: - Helpers

    /// Creates a writer using the given compression (none by default) and a fresh allocator.
    private func makeWriter(
      compression: CompressionAlgorithm? = nil
    ) -> CoalescingLengthPrefixedMessageWriter {
      return CoalescingLengthPrefixedMessageWriter(compression: compression, allocator: .init())
    }

    /// Returns a new promise from `loop` when `needed` is `true`, otherwise `nil`.
    private func makePromiseIfNeeded(_ needed: Bool) -> EventLoopPromise<Void>? {
      guard needed else {
        return nil
      }
      return self.loop.makePromise(of: Void.self)
    }

    /// Reads one message from the front of `buffer` and asserts that it is an uncompressed
    /// copy of `ByteBuffer.smallEnoughToCoalesce`.
    ///
    /// - Throws: If no message header could be read from `buffer`.
    private func expectSmallMessage(
      in buffer: inout ByteBuffer,
      file: StaticString = #filePath,
      line: UInt = #line
    ) throws {
      let header = buffer.readMessageHeader()
      let (isCompressed, length) = try XCTUnwrap(header, file: file, line: line)
      XCTAssertFalse(isCompressed, file: file, line: line)
      XCTAssertEqual(
        Int(length),
        ByteBuffer.smallEnoughToCoalesce.readableBytes,
        file: file,
        line: line
      )
      XCTAssertEqual(
        buffer.readSlice(length: Int(length)),
        ByteBuffer.smallEnoughToCoalesce,
        file: file,
        line: line
      )
    }

    /// Asserts that `buffer` holds exactly one message header, and nothing else, describing
    /// an uncompressed message as long as `ByteBuffer.tooBigToCoalesce`.
    ///
    /// - Throws: If no message header could be read from `buffer`.
    private func expectLargeMessageHeaderOnly(
      in buffer: ByteBuffer,
      file: StaticString = #filePath,
      line: UInt = #line
    ) throws {
      var remaining = buffer
      let header = remaining.readMessageHeader()
      let (isCompressed, length) = try XCTUnwrap(header, file: file, line: line)
      XCTAssertFalse(isCompressed, file: file, line: line)
      XCTAssertEqual(
        Int(length),
        ByteBuffer.tooBigToCoalesce.readableBytes,
        file: file,
        line: line
      )
      XCTAssertEqual(remaining.readableBytes, 0, file: file, line: line)
    }

    /// Asserts that `promise` is present exactly when `expected` is `true`, then succeeds it
    /// (if present) so that it isn't leaked.
    private func verifyAndComplete(
      _ promise: EventLoopPromise<Void>?,
      expected: Bool,
      file: StaticString = #filePath,
      line: UInt = #line
    ) {
      XCTAssertEqual(promise != nil, expected, file: file, line: line)
      promise?.succeed(())
    }

    // MARK: - Parameterised test bodies

    /// Appends one small message and checks that it comes back as a single framed buffer.
    private func checkSingleSmallWrite(withPromise: Bool) throws {
      var writer = self.makeWriter()
      writer.append(
        buffer: .smallEnoughToCoalesce,
        compress: false,
        promise: self.makePromiseIfNeeded(withPromise)
      )

      let (outcome, writePromise) = try XCTUnwrap(writer.next())
      try outcome.assertValue { written in
        var remaining = written
        try self.expectSmallMessage(in: &remaining)
        XCTAssertEqual(remaining.readableBytes, 0)
      }

      // The writer should have nothing further to hand out.
      XCTAssertNil(writer.next())
      self.verifyAndComplete(writePromise, expected: withPromise)
    }

    /// Appends many small messages and checks that they all come back in a single buffer.
    private func checkMultipleSmallWrites(withPromise: Bool) throws {
      let messageCount = 100
      var writer = self.makeWriter()

      for _ in 1 ... messageCount {
        writer.append(
          buffer: .smallEnoughToCoalesce,
          compress: false,
          promise: self.makePromiseIfNeeded(withPromise)
        )
      }

      let (outcome, writePromise) = try XCTUnwrap(writer.next())
      try outcome.assertValue { written in
        var remaining = written
        // Every appended message should be present, back to back.
        for _ in 1 ... messageCount {
          try self.expectSmallMessage(in: &remaining)
        }
        XCTAssertEqual(remaining.readableBytes, 0)
      }

      // The writer should have nothing further to hand out.
      XCTAssertNil(writer.next())
      self.verifyAndComplete(writePromise, expected: withPromise)
    }

    // MARK: - Tests

    func testSingleSmallWriteWithPromise() throws {
      try self.checkSingleSmallWrite(withPromise: true)
    }

    func testSingleSmallWriteWithoutPromise() throws {
      try self.checkSingleSmallWrite(withPromise: false)
    }

    func testMultipleSmallWriteWithPromise() throws {
      try self.checkMultipleSmallWrites(withPromise: true)
    }

    func testMultipleSmallWriteWithoutPromise() throws {
      try self.checkMultipleSmallWrites(withPromise: false)
    }

    func testSingleLargeMessage() throws {
      var writer = self.makeWriter()
      writer.append(buffer: .tooBigToCoalesce, compress: false, promise: nil)

      // The header is expected in a buffer of its own ...
      let (headerOutcome, headerPromise) = try XCTUnwrap(writer.next())
      XCTAssertNil(headerPromise)
      try headerOutcome.assertValue { header in
        try self.expectLargeMessageHeaderOnly(in: header)
      }

      // ... followed by the untouched message.
      let (payloadOutcome, payloadPromise) = try XCTUnwrap(writer.next())
      XCTAssertNil(payloadPromise)
      payloadOutcome.assertValue { payload in
        XCTAssertEqual(payload, ByteBuffer.tooBigToCoalesce)
      }

      XCTAssertNil(writer.next())
    }

    func testMessagesBeforeLargeAreCoalesced() throws {
      var writer = self.makeWriter()
      // The two small messages should share one buffer; the large message should then
      // arrive as two further buffers (header, then message).
      let smallCount = 2
      for _ in 1 ... smallCount {
        writer.append(buffer: .smallEnoughToCoalesce, compress: false, promise: nil)
      }
      writer.append(buffer: .tooBigToCoalesce, compress: false, promise: nil)

      let (coalescedOutcome, _) = try XCTUnwrap(writer.next())
      try coalescedOutcome.assertValue { coalesced in
        var remaining = coalesced
        for _ in 1 ... smallCount {
          try self.expectSmallMessage(in: &remaining)
        }
        XCTAssertEqual(remaining.readableBytes, 0)
      }

      let (headerOutcome, _) = try XCTUnwrap(writer.next())
      try headerOutcome.assertValue { header in
        try self.expectLargeMessageHeaderOnly(in: header)
      }

      let (payloadOutcome, _) = try XCTUnwrap(writer.next())
      payloadOutcome.assertValue { payload in
        XCTAssertEqual(payload, ByteBuffer.tooBigToCoalesce)
      }

      XCTAssertNil(writer.next())
    }

    func testCompressedMessagesAreAlwaysCoalesced() throws {
      var writer = self.makeWriter(compression: .gzip)
      writer.append(buffer: .smallEnoughToCoalesce, compress: false, promise: nil)
      writer.append(buffer: .tooBigToCoalesce, compress: true, promise: nil)

      let (outcome, _) = try XCTUnwrap(writer.next())
      try outcome.assertValue { written in
        var remaining = written

        // First message: uncompressed, so its content can be checked exactly.
        try self.expectSmallMessage(in: &remaining)

        // Second message: compressed, so neither its length nor its content is known up
        // front; the declared length must simply account for every remaining byte.
        let header = remaining.readMessageHeader()
        let (isCompressed, length) = try XCTUnwrap(header)
        XCTAssertTrue(isCompressed)
        XCTAssertEqual(Int(length), remaining.readableBytes)
      }

      XCTAssertNil(writer.next())
    }
  }
  extension Result {
    /// Runs `body` with the success value, or records a test failure if this is a `.failure`.
    ///
    /// `file` and `line` default to the call site so that an unexpected failure is
    /// attributed to the test which called this helper rather than to the helper itself.
    ///
    /// - Parameters:
    ///   - file: The file to attribute an unexpected failure to.
    ///   - line: The line to attribute an unexpected failure to.
    ///   - body: Assertions to run against the success value. Not called on `.failure`.
    /// - Throws: Rethrows any error thrown by `body`.
    func assertValue(
      file: StaticString = #filePath,
      line: UInt = #line,
      _ body: (Success) throws -> Void
    ) rethrows {
      switch self {
      case let .success(value):
        try body(value)
      case let .failure(error):
        XCTFail("Unexpected failure: \(error)", file: file, line: line)
      }
    }
  }
  extension ByteBuffer {
    /// A 128-byte buffer in which every byte is `42`.
    fileprivate static let smallEnoughToCoalesce = ByteBuffer(repeating: 42, count: 128)

    /// A buffer of `42`s which is one byte longer than
    /// `CoalescingLengthPrefixedMessageWriter.singleBufferSizeLimit`.
    fileprivate static let tooBigToCoalesce: ByteBuffer = {
      let byteCount = CoalescingLengthPrefixedMessageWriter.singleBufferSizeLimit + 1
      return ByteBuffer(repeating: 42, count: byteCount)
    }()

    /// Reads a message header, a `UInt8` flag followed by a `UInt32` length, from the buffer.
    ///
    /// - Returns: A tuple of whether the flag was non-zero (i.e. the message is marked as
    ///   compressed) and the message length, or `nil` if the two integers could not be read.
    mutating func readMessageHeader() -> (Bool, UInt32)? {
      guard let (flag, length) = self.readMultipleIntegers(as: (UInt8, UInt32).self) else {
        return nil
      }
      return (flag != 0, length)
    }
  }