GRPCMessageFramer.swift 3.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. /// A ``GRPCMessageFramer`` helps with the framing of gRPC data frames:
  18. /// - It prepends data with the required metadata (compression flag and message length).
  19. /// - It compresses messages using the specified compression algorithm (if configured).
  20. /// - It coalesces multiple messages (appended into the `Framer` by calling ``append(_:compress:)``)
  21. /// into a single `ByteBuffer`.
  22. struct GRPCMessageFramer {
  23. /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
  24. static let metadataLength = 5
  25. /// Maximum size the `writeBuffer` can be when concatenating multiple frames.
  26. /// This limit will not be considered if only a single message/frame is written into the buffer, meaning
  27. /// frames with messages over 64KB can still be written.
  28. /// - Note: This is expressed as the power of 2 closer to 64KB (i.e., 64KiB) because `ByteBuffer`
  29. /// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
  30. static let maximumWriteBufferLength = 65_536
  31. private var pendingMessages: OneOrManyQueue<PendingMessage>
  32. private struct PendingMessage {
  33. let bytes: [UInt8]
  34. let compress: Bool
  35. }
  36. private var writeBuffer: ByteBuffer
  37. init() {
  38. self.pendingMessages = OneOrManyQueue()
  39. self.writeBuffer = ByteBuffer()
  40. }
  41. /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
  42. /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
  43. /// If `compress` is true, then the given bytes will be compressed using the configured compression algorithm.
  44. mutating func append(_ bytes: [UInt8], compress: Bool) {
  45. self.pendingMessages.append(PendingMessage(bytes: bytes, compress: compress))
  46. }
  47. /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
  48. /// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`.
  49. /// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
  50. mutating func next() throws -> ByteBuffer? {
  51. if self.pendingMessages.isEmpty {
  52. // Nothing pending: exit early.
  53. return nil
  54. }
  55. defer {
  56. // To avoid holding an excessively large buffer, if its size is larger than
  57. // our threshold (`maximumWriteBufferLength`), then reset it to a new `ByteBuffer`.
  58. if self.writeBuffer.capacity > Self.maximumWriteBufferLength {
  59. self.writeBuffer = ByteBuffer()
  60. }
  61. }
  62. var requiredCapacity = 0
  63. for message in self.pendingMessages {
  64. requiredCapacity += message.bytes.count + Self.metadataLength
  65. }
  66. self.writeBuffer.clear(minimumCapacity: requiredCapacity)
  67. while let message = self.pendingMessages.pop() {
  68. try self.encode(message)
  69. }
  70. return self.writeBuffer
  71. }
  72. private mutating func encode(_ message: PendingMessage) throws {
  73. if message.compress {
  74. self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag
  75. // TODO: compress message and write the compressed message length + bytes
  76. } else {
  77. self.writeBuffer.writeMultipleIntegers(
  78. UInt8(0), // Clear compression flag
  79. UInt32(message.bytes.count) // Set message length
  80. )
  81. self.writeBuffer.writeBytes(message.bytes)
  82. }
  83. }
  84. }