/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

internal import CGRPCZlib
internal import GRPCCore
internal import NIOCore

/// Namespace for the zlib-backed message compressor and decompressor.
enum Zlib {
  /// The container format used to wrap the compressed data.
  enum Method {
    case deflate
    case gzip

    /// The `windowBits` value handed to `deflateInit2`/`inflateInit2` for this method.
    ///
    /// Per the zlib manual, 15 selects the largest window with a zlib wrapper and adding 16
    /// (i.e. 31) selects a gzip wrapper instead.
    fileprivate var windowBits: Int32 {
      switch self {
      case .deflate:
        return 15
      case .gzip:
        return 31
      }
    }
  }
}

extension Zlib {
  /// Creates a new compressor for the given compression format.
  ///
  /// This compressor is only suitable for compressing whole messages at a time.
  ///
  /// - Important: ``Compressor/end()`` must be called when the compressor is not needed
  ///   anymore, to deallocate any resources allocated by `Zlib`.
  struct Compressor {
    // TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version.

    private var stream: UnsafeMutablePointer<z_stream>
    private let method: Method

    init(method: Method) {
      self.method = method
      self.stream = .allocate(capacity: 1)
      self.stream.initialize(to: z_stream())
      self.stream.deflateInit(windowBits: self.method.windowBits)
    }

    /// Compresses the data in `input` into the `output` buffer.
    ///
    /// The compression state is reset after every call, whether or not it succeeded.
    ///
    /// - Parameter input: The complete data to be compressed.
    /// - Parameter output: The `ByteBuffer` into which the compressed message should be written.
    /// - Returns: The number of bytes written into the `output` buffer.
    /// - Throws: ``ZlibError`` if zlib did not finish the stream.
    @discardableResult
    func compress(_ input: [UInt8], into output: inout ByteBuffer) throws(ZlibError) -> Int {
      defer { self.reset() }
      let upperBound = self.stream.deflateBound(inputBytes: input.count)
      return try self.stream.deflate(input, into: &output, upperBound: upperBound)
    }

    /// Resets compression state.
    private func reset() {
      do {
        try self.stream.deflateReset()
      } catch {
        // The stream could not be reset in place, so rebuild it from scratch. Only zlib's
        // internal state is released here: calling `end()` would also deallocate `self.stream`
        // and the re-initialization below would then write to freed memory.
        self.stream.deflateEnd()
        self.stream.deinitialize(count: 1)
        self.stream.initialize(to: z_stream())
        self.stream.deflateInit(windowBits: self.method.windowBits)
      }
    }

    /// Deallocates any resources allocated by Zlib.
    ///
    /// The compressor must not be used after calling this.
    func end() {
      self.stream.deflateEnd()
      self.stream.deallocate()
    }
  }
}

extension Zlib {
  /// Creates a new decompressor for the given compression format.
  ///
  /// This decompressor is only suitable for decompressing whole messages at a time.
  ///
  /// - Important: ``Decompressor/end()`` must be called when the decompressor is not needed
  ///   anymore, to deallocate any resources allocated by `Zlib`.
  struct Decompressor {
    // TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version.

    private var stream: UnsafeMutablePointer<z_stream>
    private let method: Method

    init(method: Method) {
      self.method = method
      self.stream = UnsafeMutablePointer.allocate(capacity: 1)
      self.stream.initialize(to: z_stream())
      self.stream.inflateInit(windowBits: self.method.windowBits)
    }

    /// Returns the decompressed bytes from ``input``.
    ///
    /// The decompression state is reset after every call, whether or not it succeeded.
    ///
    /// - Parameters:
    ///   - input: The buffer read compressed bytes from.
    ///   - limit: The largest size a decompressed payload may be.
    /// - Throws: `RPCError` with code `.internalError` if zlib reports a failure, or with code
    ///   `.resourceExhausted` if the payload does not fit within `limit` bytes.
    func decompress(_ input: inout ByteBuffer, limit: Int) throws -> [UInt8] {
      defer { self.reset() }
      return try self.stream.inflate(input: &input, limit: limit)
    }

    /// Resets decompression state.
    private func reset() {
      do {
        try self.stream.inflateReset()
      } catch {
        // The stream could not be reset in place, so rebuild it from scratch. Only zlib's
        // internal state is released here: calling `end()` would also deallocate `self.stream`
        // and the re-initialization below would then write to freed memory.
        self.stream.inflateEnd()
        self.stream.deinitialize(count: 1)
        self.stream.initialize(to: z_stream())
        self.stream.inflateInit(windowBits: self.method.windowBits)
      }
    }

    /// Deallocates any resources allocated by Zlib.
    ///
    /// The decompressor must not be used after calling this.
    func end() {
      self.stream.inflateEnd()
      self.stream.deallocate()
    }
  }
}

/// An error reported by zlib.
struct ZlibError: Error, Hashable {
  /// Error code returned from Zlib.
  var code: Int

  /// Error message produced by Zlib.
  var message: String

  init(code: Int, message: String) {
    self.code = code
    self.message = message
  }
}

extension UnsafeMutablePointer<z_stream> {
  /// Initializes the stream for decompression using zlib's default allocator.
  func inflateInit(windowBits: Int32) {
    self.pointee.zfree = nil
    self.pointee.zalloc = nil
    self.pointee.opaque = nil

    let rc = CGRPCZlib_inflateInit2(self, windowBits)
    // Possible return codes:
    // - Z_OK
    // - Z_MEM_ERROR: not enough memory
    //
    // If we can't allocate memory then we can't progress anyway so not throwing an error here is
    // okay.
    precondition(rc == Z_OK, "inflateInit2 failed with error (\(rc)) \(self.lastError ?? "")")
  }

  /// Resets the decompression state so the stream can be used for another message.
  ///
  /// - Throws: ``ZlibError`` if zlib reports `Z_STREAM_ERROR`.
  func inflateReset() throws {
    let rc = CGRPCZlib_inflateReset(self)

    // Possible return codes:
    // - Z_OK
    // - Z_STREAM_ERROR: the source stream state was inconsistent.
    switch rc {
    case Z_OK:
      ()
    case Z_STREAM_ERROR:
      throw ZlibError(code: Int(rc), message: self.lastError ?? "")
    default:
      preconditionFailure("inflateReset returned unexpected code (\(rc))")
    }
  }

  /// Releases zlib's decompression state. The return code is deliberately ignored.
  func inflateEnd() {
    _ = CGRPCZlib_inflateEnd(self)
  }

  /// Initializes the stream for compression using zlib's default allocator.
  func deflateInit(windowBits: Int32) {
    self.pointee.zfree = nil
    self.pointee.zalloc = nil
    self.pointee.opaque = nil

    let rc = CGRPCZlib_deflateInit2(
      self,
      Z_DEFAULT_COMPRESSION,  // compression level
      Z_DEFLATED,  // compression method (this must be Z_DEFLATED)
      windowBits,  // window size, i.e. deflate/gzip
      8,  // memory level (this is the default value in the docs)
      Z_DEFAULT_STRATEGY  // compression strategy
    )

    // Possible return codes:
    // - Z_OK
    // - Z_MEM_ERROR: not enough memory
    // - Z_STREAM_ERROR: a parameter was invalid
    //
    // If we can't allocate memory then we can't progress anyway, and we control the parameters
    // so not throwing an error here is okay.
    precondition(rc == Z_OK, "deflateInit2 failed with error (\(rc)) \(self.lastError ?? "")")
  }

  /// Resets the compression state so the stream can be used for another message.
  ///
  /// - Throws: ``ZlibError`` if zlib reports `Z_STREAM_ERROR`.
  func deflateReset() throws {
    let rc = CGRPCZlib_deflateReset(self)

    // Possible return codes:
    // - Z_OK
    // - Z_STREAM_ERROR: the source stream state was inconsistent.
    switch rc {
    case Z_OK:
      ()
    case Z_STREAM_ERROR:
      throw ZlibError(code: Int(rc), message: self.lastError ?? "")
    default:
      preconditionFailure("deflateReset returned unexpected code (\(rc))")
    }
  }

  /// Releases zlib's compression state. The return code is deliberately ignored.
  func deflateEnd() {
    _ = CGRPCZlib_deflateEnd(self)
  }

  /// Returns zlib's upper bound on the compressed size of `inputBytes` bytes of input.
  func deflateBound(inputBytes: Int) -> Int {
    let bound = CGRPCZlib_deflateBound(self, UInt(inputBytes))
    return Int(bound)
  }

  /// Points `next_in`/`avail_in` at `buffer`, or clears them if the buffer has no base address.
  func setNextInputBuffer(_ buffer: UnsafeMutableBufferPointer<UInt8>) {
    if let baseAddress = buffer.baseAddress {
      self.pointee.next_in = baseAddress
      self.pointee.avail_in = UInt32(buffer.count)
    } else {
      self.pointee.next_in = nil
      self.pointee.avail_in = 0
    }
  }

  /// Points `next_in`/`avail_in` at `buffer`, or clears them if it is `nil` or has no base
  /// address.
  func setNextInputBuffer(_ buffer: UnsafeMutableRawBufferPointer?) {
    if let buffer = buffer, let baseAddress = buffer.baseAddress {
      self.pointee.next_in = CGRPCZlib_castVoidToBytefPointer(baseAddress)
      self.pointee.avail_in = UInt32(buffer.count)
    } else {
      self.pointee.next_in = nil
      self.pointee.avail_in = 0
    }
  }

  /// Points `next_out`/`avail_out` at `buffer`, or clears them if the buffer has no base address.
  func setNextOutputBuffer(_ buffer: UnsafeMutableBufferPointer<UInt8>) {
    if let baseAddress = buffer.baseAddress {
      self.pointee.next_out = baseAddress
      self.pointee.avail_out = UInt32(buffer.count)
    } else {
      self.pointee.next_out = nil
      self.pointee.avail_out = 0
    }
  }

  /// Points `next_out`/`avail_out` at `buffer`, or clears them if it is `nil` or has no base
  /// address.
  func setNextOutputBuffer(_ buffer: UnsafeMutableRawBufferPointer?) {
    if let buffer = buffer, let baseAddress = buffer.baseAddress {
      self.pointee.next_out = CGRPCZlib_castVoidToBytefPointer(baseAddress)
      self.pointee.avail_out = UInt32(buffer.count)
    } else {
      self.pointee.next_out = nil
      self.pointee.avail_out = 0
    }
  }

  /// Number of bytes available to read from `self.nextInputBuffer`. See also:
  /// `z_stream.avail_in`.
  var availableInputBytes: Int {
    get {
      Int(self.pointee.avail_in)
    }
    set {
      self.pointee.avail_in = UInt32(newValue)
    }
  }

  /// The remaining writable space in `nextOutputBuffer`. See also: `z_stream.avail_out`.
  var availableOutputBytes: Int {
    get {
      Int(self.pointee.avail_out)
    }
    set {
      self.pointee.avail_out = UInt32(newValue)
    }
  }

  /// The total number of bytes written to the output buffer. See also: `z_stream.total_out`.
  var totalOutputBytes: Int {
    Int(self.pointee.total_out)
  }

  /// The last error message that zlib wrote. No message is guaranteed on error, however, `nil` is
  /// guaranteed if there is no error. See also `z_stream.msg`.
  var lastError: String? {
    self.pointee.msg.map { String(cString: $0) }
  }

  /// Decompresses the readable bytes of `input` and returns the decompressed payload.
  ///
  /// The output buffer starts at twice the input size (capped at `limit`) and is doubled, up to
  /// `limit`, each time zlib runs out of room.
  ///
  /// - Parameters:
  ///   - input: The buffer to read compressed bytes from; consumed bytes are moved past.
  ///   - limit: The largest size the decompressed payload may be.
  /// - Throws: `RPCError` with code `.internalError` if zlib reports a failure, or with code
  ///   `.resourceExhausted` if the output buffer cannot grow any further.
  func inflate(input: inout ByteBuffer, limit: Int) throws -> [UInt8] {
    return try input.readWithUnsafeMutableReadableBytes { inputPointer in
      self.setNextInputBuffer(inputPointer)
      defer {
        // Don't leave pointers to buffers which are about to go out of scope in the stream.
        self.setNextInputBuffer(nil)
        self.setNextOutputBuffer(nil)
      }

      // Assume the output will be twice as large as the input.
      var output = [UInt8](repeating: 0, count: min(inputPointer.count * 2, limit))
      var offset = 0

      while true {
        let (finished, written) = try output[offset...].withUnsafeMutableBytes { outPointer in
          self.setNextOutputBuffer(outPointer)

          let finished: Bool

          // Possible return codes:
          // - Z_OK: some progress has been made
          // - Z_STREAM_END: the end of the compressed data has been reached and all uncompressed
          //   output has been produced
          // - Z_NEED_DICT: a preset dictionary is needed at this point
          // - Z_DATA_ERROR: the input data was corrupted
          // - Z_STREAM_ERROR: the stream structure was inconsistent
          // - Z_MEM_ERROR there was not enough memory
          // - Z_BUF_ERROR if no progress was possible or if there was not enough room in the
          //   output buffer when Z_FINISH is used.
          //
          // Note that Z_OK is not okay here since we always flush with Z_FINISH and therefore
          // use Z_STREAM_END as our success criteria.
          let rc = CGRPCZlib_inflate(self, Z_FINISH)
          switch rc {
          case Z_STREAM_END:
            finished = true
          case Z_BUF_ERROR:
            finished = false
          default:
            throw RPCError(
              code: .internalError,
              message: "Decompression error",
              cause: ZlibError(code: Int(rc), message: self.lastError ?? "")
            )
          }

          let size = outPointer.count - self.availableOutputBytes
          return (finished, size)
        }

        if finished {
          // Trim the zero padding which zlib didn't write into.
          output.removeLast(output.count - self.totalOutputBytes)
          let bytesRead = inputPointer.count - self.availableInputBytes
          return (bytesRead, output)
        } else {
          offset += written
          let newSize = min(output.count * 2, limit)
          if newSize == output.count {
            throw RPCError(code: .resourceExhausted, message: "Message is too large to decompress.")
          } else {
            output.append(contentsOf: repeatElement(0, count: newSize - output.count))
          }
        }
      }
    }
  }

  /// Compresses all of `input` in a single `Z_FINISH` pass and appends the result to `output`.
  ///
  /// - Parameters:
  ///   - input: The complete data to compress.
  ///   - output: The buffer the compressed bytes are written into.
  ///   - upperBound: The minimum number of writable bytes to make available in `output`; callers
  ///     are expected to pass the value returned by `deflateBound(inputBytes:)`.
  /// - Returns: The number of bytes written to `output`.
  /// - Throws: ``ZlibError`` if zlib returns anything other than `Z_STREAM_END`.
  func deflate(
    _ input: [UInt8],
    into output: inout ByteBuffer,
    upperBound: Int
  ) throws(ZlibError) -> Int {
    defer {
      // Don't leave pointers to buffers which are about to go out of scope in the stream.
      self.setNextInputBuffer(nil)
      self.setNextOutputBuffer(nil)
    }

    do {
      // zlib takes a mutable input pointer, so work on a local copy of the array.
      var input = input
      return try input.withUnsafeMutableBytes { input in
        self.setNextInputBuffer(input)

        return try output.writeWithUnsafeMutableBytes(minimumWritableBytes: upperBound) { output in
          self.setNextOutputBuffer(output)

          let rc = CGRPCZlib_deflate(self, Z_FINISH)

          // Possible return codes:
          // - Z_OK: some progress has been made
          // - Z_STREAM_END: all input has been consumed and all output has been produced (only
          //   when flush is set to Z_FINISH)
          // - Z_STREAM_ERROR: the stream state was inconsistent
          // - Z_BUF_ERROR: no progress is possible
          //
          // The documentation notes that Z_BUF_ERROR is not fatal, and deflate() can be called
          // again with more input and more output space to continue compressing. However, we
          // call `deflateBound()` before `deflate()` which guarantees that the output size will
          // not be larger than the value returned by `deflateBound()` if `Z_FINISH` flush is
          // used. As such, the only acceptable outcome is `Z_STREAM_END`.
          guard rc == Z_STREAM_END else {
            throw ZlibError(code: Int(rc), message: self.lastError ?? "")
          }

          return output.count - self.availableOutputBytes
        }
      }
    } catch let error as ZlibError {
      throw error
    } catch {
      // Shouldn't happen as 'withUnsafeMutableBytes' and 'writeWithUnsafeMutableBytes' are
      // marked 'rethrows' (but don't support typed throws, yet) and the closure only throws
      // a 'ZlibError' which is handled above.
      fatalError("Unexpected error of type \(type(of: error))")
    }
  }
}