GRPCMessageFramer.swift 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. internal import GRPCCore
  17. internal import NIOCore
  18. /// A ``GRPCMessageFramer`` helps with the framing of gRPC data frames:
  19. /// - It prepends data with the required metadata (compression flag and message length).
  20. /// - It compresses messages using the specified compression algorithm (if configured).
  21. /// - It coalesces multiple messages (appended into the `Framer` by calling ``append(_:compress:)``)
  22. /// into a single `ByteBuffer`.
  23. struct GRPCMessageFramer {
  24. /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
  25. static let metadataLength = 5
  26. /// Maximum size the `writeBuffer` can be when concatenating multiple frames.
  27. /// This limit will not be considered if only a single message/frame is written into the buffer, meaning
  28. /// frames with messages over 64KB can still be written.
  29. /// - Note: This is expressed as the power of 2 closer to 64KB (i.e., 64KiB) because `ByteBuffer`
  30. /// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
  31. static let maximumWriteBufferLength = 65_536
  32. private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise<Void>?)>
  33. private var writeBuffer: ByteBuffer
  34. /// Create a new ``GRPCMessageFramer``.
  35. init() {
  36. self.pendingMessages = OneOrManyQueue()
  37. self.writeBuffer = ByteBuffer()
  38. }
  39. /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
  40. /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
  41. mutating func append(_ bytes: [UInt8], promise: EventLoopPromise<Void>?) {
  42. self.pendingMessages.append((bytes, promise))
  43. }
  44. /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
  45. /// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`.
  46. /// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise
  47. /// they'll be framed as-is.
  48. /// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
  49. mutating func nextResult(
  50. compressor: Zlib.Compressor? = nil
  51. ) -> (result: Result<ByteBuffer, RPCError>, promise: EventLoopPromise<Void>?)? {
  52. if self.pendingMessages.isEmpty {
  53. // Nothing pending: exit early.
  54. return nil
  55. }
  56. defer {
  57. // To avoid holding an excessively large buffer, if its size is larger than
  58. // our threshold (`maximumWriteBufferLength`), then reset it to a new `ByteBuffer`.
  59. if self.writeBuffer.capacity > Self.maximumWriteBufferLength {
  60. self.writeBuffer = ByteBuffer()
  61. }
  62. }
  63. var requiredCapacity = 0
  64. for message in self.pendingMessages {
  65. requiredCapacity += message.bytes.count + Self.metadataLength
  66. }
  67. self.writeBuffer.clear(minimumCapacity: requiredCapacity)
  68. var pendingWritePromise: EventLoopPromise<Void>?
  69. while let message = self.pendingMessages.pop() {
  70. pendingWritePromise.setOrCascade(to: message.promise)
  71. do {
  72. try self.encode(message.bytes, compressor: compressor)
  73. } catch let rpcError {
  74. return (result: .failure(rpcError), promise: pendingWritePromise)
  75. }
  76. }
  77. return (result: .success(self.writeBuffer), promise: pendingWritePromise)
  78. }
  79. private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws(RPCError) {
  80. if let compressor {
  81. self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag
  82. // Write zeroes as length - we'll write the actual compressed size after compression.
  83. let lengthIndex = self.writeBuffer.writerIndex
  84. self.writeBuffer.writeInteger(UInt32(0))
  85. // Compress and overwrite the payload length field with the right length.
  86. do {
  87. let writtenBytes = try compressor.compress(message, into: &self.writeBuffer)
  88. self.writeBuffer.setInteger(UInt32(writtenBytes), at: lengthIndex)
  89. } catch let zlibError {
  90. throw RPCError(code: .internalError, message: "Compression failed", cause: zlibError)
  91. }
  92. } else {
  93. self.writeBuffer.writeMultipleIntegers(
  94. UInt8(0), // Clear compression flag
  95. UInt32(message.count) // Set message length
  96. )
  97. self.writeBuffer.writeBytes(message)
  98. }
  99. }
  100. }