From 7f55892ebe0cdb9ceb74cdab1989d34a5b50b9e0 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Tue, 19 Nov 2024 01:30:37 -0500 Subject: [PATCH 01/12] Clang-tidy all compressor/constant files in streaming compression and unit test --- .../core/src/clp/TraceableException.hpp | 1 + .../clp/streaming_compression/Compressor.hpp | 41 ++- .../clp/streaming_compression/Constants.hpp | 1 - .../passthrough/Compressor.cpp | 15 +- .../passthrough/Compressor.hpp | 38 ++- .../streaming_compression/zstd/Compressor.cpp | 80 +++-- .../streaming_compression/zstd/Compressor.hpp | 40 ++- .../streaming_compression/zstd/Constants.hpp | 5 +- .../core/tests/test-StreamingCompression.cpp | 280 +++++------------- 9 files changed, 214 insertions(+), 287 deletions(-) diff --git a/components/core/src/clp/TraceableException.hpp b/components/core/src/clp/TraceableException.hpp index cd8e33f4b..f60273f93 100644 --- a/components/core/src/clp/TraceableException.hpp +++ b/components/core/src/clp/TraceableException.hpp @@ -39,6 +39,7 @@ class TraceableException : public std::exception { // Macros // Define a version of __FILE__ that's relative to the source directory #ifdef SOURCE_PATH_SIZE + // NOLINTNEXTLINE #define __FILENAME__ ((__FILE__) + SOURCE_PATH_SIZE) #else // We don't know the source path size, so just default to __FILE__ diff --git a/components/core/src/clp/streaming_compression/Compressor.hpp b/components/core/src/clp/streaming_compression/Compressor.hpp index 165696091..ee7846aee 100644 --- a/components/core/src/clp/streaming_compression/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/Compressor.hpp @@ -1,14 +1,20 @@ #ifndef CLP_STREAMING_COMPRESSION_COMPRESSOR_HPP #define CLP_STREAMING_COMPRESSION_COMPRESSOR_HPP -#include -#include +#include +#include + +#include "../ErrorCode.hpp" +#include "../FileWriter.hpp" #include "../TraceableException.hpp" #include "../WriterInterface.hpp" #include "Constants.hpp" namespace clp::streaming_compression { +/** + * Generic compressor interface. + */ class Compressor : public WriterInterface { public: // Types @@ -19,7 +25,7 @@ class Compressor : public WriterInterface { : TraceableException(error_code, filename, line_number) {} // Methods - char const* what() const noexcept override { + [[nodiscard]] auto what() const noexcept -> char const* override { return "streaming_compression::Compressor operation failed"; } }; @@ -30,9 +36,12 @@ class Compressor : public WriterInterface { // Destructor virtual ~Compressor() = default; - // Explicitly disable copy and move constructor/assignment + // Explicitly disable copy constructor/assignment and enable the move version Compressor(Compressor const&) = delete; - Compressor& operator=(Compressor const&) = delete; + auto operator=(Compressor const&) -> Compressor& = delete; + + Compressor(Compressor&&) noexcept = default; + auto operator=(Compressor&&) -> Compressor& = default; // Methods implementing the WriterInterface /** @@ -40,24 +49,36 @@ class Compressor : public WriterInterface { * @param pos * @return ErrorCode_Unsupported */ - ErrorCode try_seek_from_begin(size_t pos) override { return ErrorCode_Unsupported; } + [[nodiscard]] auto try_seek_from_begin([[maybe_unused]] size_t pos) -> ErrorCode override { + return ErrorCode_Unsupported; + } /** * Unsupported operation * @param pos * @return ErrorCode_Unsupported */ - ErrorCode try_seek_from_current(off_t offset) override { return ErrorCode_Unsupported; } + [[nodiscard]] auto try_seek_from_current([[maybe_unused]] off_t offset) -> ErrorCode override { + return ErrorCode_Unsupported; + } // Methods /** * Closes the compression stream */ - virtual void close() = 0; + virtual auto close() -> void = 0; + + /** + * Initializes the compression stream with the given compression level + * @param file_writer + * @param compression_level + */ + virtual auto open(FileWriter& file_writer, [[maybe_unused]] int compression_level = 0) -> void + = 0; -protected: +private: // Variables - CompressorType m_type; + CompressorType m_type{}; }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/Constants.hpp b/components/core/src/clp/streaming_compression/Constants.hpp index 4649c2e98..6c6a78bfc 100644 --- a/components/core/src/clp/streaming_compression/Constants.hpp +++ b/components/core/src/clp/streaming_compression/Constants.hpp @@ -1,7 +1,6 @@ #ifndef CLP_STREAMING_COMPRESSION_CONSTANTS_HPP #define CLP_STREAMING_COMPRESSION_CONSTANTS_HPP -#include #include namespace clp::streaming_compression { diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index 750ab48c1..112bf7d39 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -1,9 +1,12 @@ #include "Compressor.hpp" -#include "../../Defs.h" +#include + +#include "../../ErrorCode.hpp" +#include "../../TraceableException.hpp" namespace clp::streaming_compression::passthrough { -void Compressor::write(char const* data, size_t const data_length) { +auto Compressor::write(char const* data, size_t const data_length) -> void { if (nullptr == m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } @@ -19,7 +22,7 @@ void Compressor::write(char const* data, size_t const data_length) { m_compressed_stream_file_writer->write(data, data_length); } -void Compressor::flush() { +auto Compressor::flush() -> void { if (nullptr == m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } @@ -27,7 +30,7 @@ void Compressor::flush() { m_compressed_stream_file_writer->flush(); } -ErrorCode Compressor::try_get_pos(size_t& pos) const { +auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode { if (nullptr == m_compressed_stream_file_writer) { return ErrorCode_NotInit; } @@ -35,11 +38,11 @@ ErrorCode Compressor::try_get_pos(size_t& pos) const { return m_compressed_stream_file_writer->try_get_pos(pos); } -void Compressor::close() { +auto Compressor::close() -> void { m_compressed_stream_file_writer = nullptr; } -void Compressor::open(FileWriter& file_writer) { +auto Compressor::open(FileWriter& file_writer, [[maybe_unused]] int compression_level) -> void { m_compressed_stream_file_writer = &file_writer; } } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp index b3735bd1e..a24024666 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp @@ -1,9 +1,13 @@ #ifndef CLP_STREAMING_COMPRESSION_PASSTHROUGH_COMPRESSOR_HPP #define CLP_STREAMING_COMPRESSION_PASSTHROUGH_COMPRESSOR_HPP +#include + +#include "../../ErrorCode.hpp" #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" #include "../Compressor.hpp" +#include "../Constants.hpp" namespace clp::streaming_compression::passthrough { /** @@ -19,19 +23,23 @@ class Compressor : public ::clp::streaming_compression::Compressor { : TraceableException(error_code, filename, line_number) {} // Methods - char const* what() const noexcept override { + [[nodiscard]] auto what() const noexcept -> char const* override { return "streaming_compression::passthrough::Compressor operation failed"; } }; // Constructors - Compressor() - : ::clp::streaming_compression::Compressor(CompressorType::Passthrough), - m_compressed_stream_file_writer(nullptr) {} + Compressor() : ::clp::streaming_compression::Compressor{CompressorType::Passthrough} {} + + // Destructor + ~Compressor() override = default; - // Explicitly disable copy and move constructor/assignment + // Explicitly disable copy constructor/assignment and enable the move version Compressor(Compressor const&) = delete; - Compressor& operator=(Compressor const&) = delete; + auto operator=(Compressor const&) -> Compressor& = delete; + + Compressor(Compressor&&) noexcept = default; + auto operator=(Compressor&&) -> Compressor& = default; // Methods implementing the WriterInterface /** @@ -39,35 +47,37 @@ class Compressor : public ::clp::streaming_compression::Compressor { * @param data * @param data_length */ - void write(char const* data, size_t data_length) override; + auto write(char const* data, size_t data_length) -> void override; + /** * Flushes any buffered data */ - void flush() override; + auto flush() -> void override; + /** * Tries to get the current position of the write head * @param pos Position of the write head * @return ErrorCode_NotInit if the compressor is not open * @return Same as FileWriter::try_get_pos */ - ErrorCode try_get_pos(size_t& pos) const override; + [[nodiscard]] auto try_get_pos(size_t& pos) const -> ErrorCode override; // Methods implementing the Compressor interface /** * Closes the compressor */ - void close() override; + auto close() -> void override; - // Methods /** - * Initializes the compressor + * Initializes the compression stream * @param file_writer + * @param compression_level */ - void open(FileWriter& file_writer); + auto open(FileWriter& file_writer, [[maybe_unused]] int compression_level = 0) -> void override; private: // Variables - FileWriter* m_compressed_stream_file_writer; + FileWriter* m_compressed_stream_file_writer{nullptr}; }; } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index ebbf9b574..8adc47aae 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -1,14 +1,25 @@ #include "Compressor.hpp" -#include "../../Defs.h" -#include "../../spdlog_with_specializations.hpp" +#include +#include + +#include +#include + +#include "../../ErrorCode.hpp" +#include "../../FileWriter.hpp" +#include "../../TraceableException.hpp" +#include "../Compressor.hpp" +#include "../Constants.hpp" namespace clp::streaming_compression::zstd { Compressor::Compressor() - : ::clp::streaming_compression::Compressor(CompressorType::ZSTD), - m_compression_stream_contains_data(false), - m_compressed_stream_file_writer(nullptr) { - m_compression_stream = ZSTD_createCStream(); + : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, + m_compressed_stream_block{}, + m_compressed_stream_file_writer{nullptr}, + m_compression_stream_contains_data{false}, + m_compression_stream{ZSTD_createCStream()}, + m_uncompressed_stream_pos{0} { if (nullptr == m_compression_stream) { SPDLOG_ERROR("streaming_compression::zstd::Compressor: ZSTD_createCStream() error"); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); @@ -19,20 +30,20 @@ Compressor::~Compressor() { ZSTD_freeCStream(m_compression_stream); } -void Compressor::open(FileWriter& file_writer, int const compression_level) { +auto Compressor::open(FileWriter& file_writer, int const compression_level) -> void { if (nullptr != m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); } // Setup compressed stream parameters - size_t compressed_stream_block_size = ZSTD_CStreamOutSize(); - m_compressed_stream_block_buffer = std::make_unique(compressed_stream_block_size); - m_compressed_stream_block.dst = m_compressed_stream_block_buffer.get(); + auto const compressed_stream_block_size{ZSTD_CStreamOutSize()}; + m_compressed_stream_block_buffer.reserve(compressed_stream_block_size); + m_compressed_stream_block.dst = m_compressed_stream_block_buffer.data(); m_compressed_stream_block.size = compressed_stream_block_size; // Setup compression stream - auto init_result = ZSTD_initCStream(m_compression_stream, compression_level); - if (ZSTD_isError(init_result)) { + auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)}; + if (zstd_is_error(init_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_initCStream() error: {}", ZSTD_getErrorName(init_result) @@ -45,7 +56,7 @@ void Compressor::open(FileWriter& file_writer, int const compression_level) { m_uncompressed_stream_pos = 0; } -void Compressor::close() { +auto Compressor::close() -> void { if (nullptr == m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } @@ -54,7 +65,7 @@ void Compressor::close() { m_compressed_stream_file_writer = nullptr; } -void Compressor::write(char const* data, size_t data_length) { +auto Compressor::write(char const* data, size_t data_length) -> void { if (nullptr == m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } @@ -70,23 +81,23 @@ void Compressor::write(char const* data, size_t data_length) { ZSTD_inBuffer uncompressed_stream_block = {data, data_length, 0}; while (uncompressed_stream_block.pos < uncompressed_stream_block.size) { m_compressed_stream_block.pos = 0; - auto error = ZSTD_compressStream( + auto const compress_result{ZSTD_compressStream( m_compression_stream, &m_compressed_stream_block, &uncompressed_stream_block - ); - if (ZSTD_isError(error)) { + )}; + if (zstd_is_error(compress_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream() error: {}", - ZSTD_getErrorName(error) + ZSTD_getErrorName(compress_result) ); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); } - if (m_compressed_stream_block.pos) { + if (m_compressed_stream_block.pos > 0) { // Write to disk only if there is data in the compressed stream // block buffer m_compressed_stream_file_writer->write( - reinterpret_cast(m_compressed_stream_block.dst), + static_cast(m_compressed_stream_block.dst), m_compressed_stream_block.pos ); } @@ -96,14 +107,14 @@ void Compressor::write(char const* data, size_t data_length) { m_uncompressed_stream_pos += data_length; } -void Compressor::flush() { +auto Compressor::flush() -> void { if (false == m_compression_stream_contains_data) { return; } m_compressed_stream_block.pos = 0; - auto end_stream_result = ZSTD_endStream(m_compression_stream, &m_compressed_stream_block); - if (end_stream_result) { + auto const end_stream_result{ZSTD_endStream(m_compression_stream, &m_compressed_stream_block)}; + if (end_stream_result > 0) { // Note: Output buffer is large enough that it is guaranteed to have enough room to be // able to flush the entire buffer, so this can only be an error SPDLOG_ERROR( @@ -113,14 +124,14 @@ void Compressor::flush() { throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); } m_compressed_stream_file_writer->write( - reinterpret_cast(m_compressed_stream_block.dst), + static_cast(m_compressed_stream_block.dst), m_compressed_stream_block.pos ); m_compression_stream_contains_data = false; } -ErrorCode Compressor::try_get_pos(size_t& pos) const { +auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode { if (nullptr == m_compressed_stream_file_writer) { return ErrorCode_NotInit; } @@ -129,30 +140,35 @@ ErrorCode Compressor::try_get_pos(size_t& pos) const { return ErrorCode_Success; } -void Compressor::flush_without_ending_frame() { +auto Compressor::flush_without_ending_frame() -> void { if (false == m_compression_stream_contains_data) { return; } while (true) { m_compressed_stream_block.pos = 0; - auto result = ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block); - if (ZSTD_isError(result)) { + auto flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; + if (zstd_is_error(flush_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream2() error: {}", - ZSTD_getErrorName(result) + ZSTD_getErrorName(flush_result) ); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); } - if (m_compressed_stream_block.pos) { + if (m_compressed_stream_block.pos > 0) { m_compressed_stream_file_writer->write( - reinterpret_cast(m_compressed_stream_block.dst), + static_cast(m_compressed_stream_block.dst), m_compressed_stream_block.pos ); } - if (0 == result) { + if (0 == flush_result) { break; } } } + +auto Compressor::zstd_is_error(size_t size_t_function_result) -> bool { + return ZSTD_isError(size_t_function_result) != 0 + && ZSTD_error_no_error != ZSTD_getErrorCode(size_t_function_result); +} } // namespace clp::streaming_compression::zstd diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index 75971dfa8..de41709bc 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -1,12 +1,12 @@ #ifndef CLP_STREAMING_COMPRESSION_ZSTD_COMPRESSOR_HPP #define CLP_STREAMING_COMPRESSION_ZSTD_COMPRESSOR_HPP -#include -#include +#include +#include #include -#include +#include "../../ErrorCode.hpp" #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" #include "../Compressor.hpp" @@ -23,7 +23,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { : TraceableException(error_code, filename, line_number) {} // Methods - char const* what() const noexcept override { + [[nodiscard]] auto what() const noexcept -> char const* override { return "streaming_compression::zstd::Compressor operation failed"; } }; @@ -32,11 +32,14 @@ class Compressor : public ::clp::streaming_compression::Compressor { Compressor(); // Destructor - ~Compressor(); + ~Compressor() override; - // Explicitly disable copy and move constructor/assignment + // Explicitly disable copy constructor/assignment and enable the move version Compressor(Compressor const&) = delete; - Compressor& operator=(Compressor const&) = delete; + auto operator=(Compressor const&) -> Compressor& = delete; + + Compressor(Compressor&&) noexcept = default; + auto operator=(Compressor&&) -> Compressor& = default; // Methods implementing the WriterInterface /** @@ -44,11 +47,11 @@ class Compressor : public ::clp::streaming_compression::Compressor { * @param data * @param data_length */ - void write(char const* data, size_t data_length) override; + auto write(char const* data, size_t data_length) -> void override; /** * Writes any internally buffered data to file and ends the current frame */ - void flush() override; + auto flush() -> void override; /** * Tries to get the current position of the write head @@ -56,26 +59,26 @@ class Compressor : public ::clp::streaming_compression::Compressor { * @return ErrorCode_NotInit if the compressor is not open * @return ErrorCode_Success on success */ - ErrorCode try_get_pos(size_t& pos) const override; + [[nodiscard]] auto try_get_pos(size_t& pos) const -> ErrorCode override; // Methods implementing the Compressor interface /** * Closes the compressor */ - void close() override; + auto close() -> void override; - // Methods /** - * Initialize streaming compressor + * Initializes the compression stream with the given compression level * @param file_writer * @param compression_level */ - void open(FileWriter& file_writer, int compression_level = cDefaultCompressionLevel); + auto open(FileWriter& file_writer, int compression_level = cDefaultCompressionLevel) + -> void override; /** * Flushes the stream without ending the current frame */ - void flush_without_ending_frame(); + auto flush_without_ending_frame() -> void; private: // Variables @@ -86,9 +89,14 @@ class Compressor : public ::clp::streaming_compression::Compressor { bool m_compression_stream_contains_data; ZSTD_outBuffer m_compressed_stream_block; - std::unique_ptr m_compressed_stream_block_buffer; + std::vector m_compressed_stream_block_buffer; size_t m_uncompressed_stream_pos; + + /** + * Tells if a `size_t` ZStd function result is an error code and is not `ZSTD_error_no_error` + */ + [[nodiscard]] static auto zstd_is_error(size_t size_t_function_result) -> bool; }; } // namespace clp::streaming_compression::zstd diff --git a/components/core/src/clp/streaming_compression/zstd/Constants.hpp b/components/core/src/clp/streaming_compression/zstd/Constants.hpp index a0e57e3e1..153478377 100644 --- a/components/core/src/clp/streaming_compression/zstd/Constants.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Constants.hpp @@ -1,11 +1,8 @@ #ifndef CLP_STREAMING_COMPRESSION_ZSTD_CONSTANTS_HPP #define CLP_STREAMING_COMPRESSION_ZSTD_CONSTANTS_HPP -#include -#include - namespace clp::streaming_compression::zstd { -constexpr int cDefaultCompressionLevel = 3; +constexpr int cDefaultCompressionLevel{3}; } // namespace clp::streaming_compression::zstd #endif // CLP_STREAMING_COMPRESSION_ZSTD_CONSTANTS_HPP diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index 747a38a05..2849f20a9 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -1,11 +1,18 @@ +#include #include +#include #include +#include -#include +#include #include #include +#include "../src/clp/ErrorCode.hpp" +#include "../src/clp/FileWriter.hpp" #include "../src/clp/ReadOnlyMemoryMappedFile.hpp" +#include "../src/clp/streaming_compression/Compressor.hpp" +#include "../src/clp/streaming_compression/Decompressor.hpp" #include "../src/clp/streaming_compression/passthrough/Compressor.hpp" #include "../src/clp/streaming_compression/passthrough/Decompressor.hpp" #include "../src/clp/streaming_compression/zstd/Compressor.hpp" @@ -13,218 +20,83 @@ using clp::ErrorCode_Success; using clp::FileWriter; +using clp::streaming_compression::Compressor; +using clp::streaming_compression::Decompressor; + +namespace { +constexpr size_t cUncompressedDataSize{128L * 1024 * 1024}; // 128MB +constexpr auto cCompressionChunkSizes = std::to_array( + {cUncompressedDataSize / 100, + cUncompressedDataSize / 50, + cUncompressedDataSize / 25, + cUncompressedDataSize / 10, + cUncompressedDataSize / 5, + cUncompressedDataSize / 2, + cUncompressedDataSize} +); +constexpr size_t cUncompressedDataPatternPeriod = 26; // lower-case alphabet +} // namespace TEST_CASE("StreamingCompression", "[StreamingCompression]") { - // Initialize data to test compression and decompression - size_t uncompressed_data_size = 128L * 1024 * 1024; // 128MB - char* uncompressed_data = new char[uncompressed_data_size]; - for (size_t i = 0; i < uncompressed_data_size; ++i) { - uncompressed_data[i] = (char)('a' + (i % 26)); + std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; + std::vector compression_chunk_sizes{ + cCompressionChunkSizes.begin(), + cCompressionChunkSizes.end() + }; + std::unique_ptr compressor{}; + std::unique_ptr decompressor{}; + + SECTION("Initiate zstd single phase compression") { + compression_chunk_sizes.insert(compression_chunk_sizes.begin(), ZSTD_CStreamInSize()); + compressor = std::make_unique(); + decompressor = std::make_unique(); } - // Create output buffer - char* decompressed_data = new char[uncompressed_data_size]; - - SECTION("zstd single phase compression") { - // Clear output buffer - memset(decompressed_data, 0, uncompressed_data_size); - std::string compressed_file_path = "compressed_file.zstd.bin.1"; - - // Compress - FileWriter file_writer; - file_writer.open(compressed_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); - clp::streaming_compression::zstd::Compressor compressor; - compressor.open(file_writer); - compressor.write(uncompressed_data, ZSTD_CStreamInSize()); - compressor.write(uncompressed_data, uncompressed_data_size / 100); - compressor.write(uncompressed_data, uncompressed_data_size / 50); - compressor.write(uncompressed_data, uncompressed_data_size / 25); - compressor.write(uncompressed_data, uncompressed_data_size / 10); - compressor.write(uncompressed_data, uncompressed_data_size / 5); - compressor.write(uncompressed_data, uncompressed_data_size / 2); - compressor.write(uncompressed_data, uncompressed_data_size); - compressor.close(); - file_writer.close(); - - // Decompress - clp::streaming_compression::zstd::Decompressor decompressor; - REQUIRE(ErrorCode_Success == decompressor.open(compressed_file_path)); - size_t uncompressed_bytes = 0; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - ZSTD_CStreamInSize() - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, ZSTD_CStreamInSize()) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += ZSTD_CStreamInSize(); - - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 100 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 100) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 100; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 50 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 50) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 50; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 25 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 25) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 25; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 10 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 10) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 10; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 5 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 5) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 5; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 2 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 2) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 2; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size; - - // Cleanup - boost::filesystem::remove(compressed_file_path); + SECTION("Initiate passthrough compression") { + compressor = std::make_unique(); + decompressor = std::make_unique(); } - SECTION("passthrough compression") { - // Clear output buffer - memset(decompressed_data, 0, uncompressed_data_size); - std::string compressed_file_path = "compressed_file.passthrough.bin"; - - // Compress - FileWriter file_writer; - file_writer.open(compressed_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); - clp::streaming_compression::passthrough::Compressor compressor; - compressor.open(file_writer); - compressor.write(uncompressed_data, uncompressed_data_size / 100); - compressor.write(uncompressed_data, uncompressed_data_size / 50); - compressor.write(uncompressed_data, uncompressed_data_size / 25); - compressor.write(uncompressed_data, uncompressed_data_size / 10); - compressor.write(uncompressed_data, uncompressed_data_size / 5); - compressor.write(uncompressed_data, uncompressed_data_size / 2); - compressor.write(uncompressed_data, uncompressed_data_size); - compressor.close(); - file_writer.close(); - - // Decompress - // Memory map compressed file - clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{compressed_file_path}; - clp::streaming_compression::passthrough::Decompressor decompressor; - auto const compressed_file_view{memory_mapped_compressed_file.get_view()}; - decompressor.open(compressed_file_view.data(), compressed_file_view.size()); + // Initialize buffers + std::vector uncompressed_buffer{}; + uncompressed_buffer.reserve(cUncompressedDataSize); + for (size_t i{0}; i < cUncompressedDataSize; ++i) { + uncompressed_buffer.push_back((char)('a' + (i % cUncompressedDataPatternPeriod))); + } - size_t uncompressed_bytes = 0; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 100 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 100) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 100; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 50 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 50) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 50; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 25 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 25) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 25; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 10 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 10) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 10; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 5 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 5) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 5; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size / 2 - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size / 2) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size / 2; - REQUIRE(ErrorCode_Success - == decompressor.get_decompressed_stream_region( - uncompressed_bytes, - decompressed_data, - uncompressed_data_size - )); - REQUIRE(memcmp(uncompressed_data, decompressed_data, uncompressed_data_size) == 0); - memset(decompressed_data, 0, uncompressed_data_size); - uncompressed_bytes += uncompressed_data_size; + std::vector decompressed_buffer{}; + decompressed_buffer.reserve(cUncompressedDataSize); - // Cleanup - boost::filesystem::remove(compressed_file_path); + // Compress + FileWriter file_writer; + file_writer.open(compressed_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); + compressor->open(file_writer); + for (auto const chunk_size : compression_chunk_sizes) { + compressor->write(uncompressed_buffer.data(), chunk_size); + } + compressor->close(); + file_writer.close(); + + // Decompress and compare + clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{compressed_file_path}; + auto const compressed_file_view{memory_mapped_compressed_file.get_view()}; + decompressor->open(compressed_file_view.data(), compressed_file_view.size()); + + size_t uncompressed_bytes{0}; + for (auto chunk_size : compression_chunk_sizes) { + memset(decompressed_buffer.data(), 0, cUncompressedDataSize); + REQUIRE( + (ErrorCode_Success + == decompressor->get_decompressed_stream_region( + uncompressed_bytes, + decompressed_buffer.data(), + chunk_size + )) + ); + REQUIRE((memcmp(uncompressed_buffer.data(), decompressed_buffer.data(), chunk_size) == 0)); + uncompressed_bytes += chunk_size; } // Cleanup - delete[] uncompressed_data; - delete[] decompressed_data; + boost::filesystem::remove(compressed_file_path); } From 100f23037816b835e64b53e6375e2d4c7e6931a2 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Tue, 19 Nov 2024 02:12:32 -0500 Subject: [PATCH 02/12] Nit pick fixes --- .../clp/streaming_compression/Compressor.hpp | 18 ++++++++----- .../passthrough/Compressor.cpp | 3 ++- .../passthrough/Compressor.hpp | 7 ++--- .../streaming_compression/zstd/Compressor.cpp | 10 +++---- .../streaming_compression/zstd/Compressor.hpp | 27 ++++++++++++------- .../core/tests/test-StreamingCompression.cpp | 8 +++--- 6 files changed, 43 insertions(+), 30 deletions(-) diff --git a/components/core/src/clp/streaming_compression/Compressor.hpp b/components/core/src/clp/streaming_compression/Compressor.hpp index ee7846aee..602a16afc 100644 --- a/components/core/src/clp/streaming_compression/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/Compressor.hpp @@ -22,7 +22,7 @@ class Compressor : public WriterInterface { public: // Constructors OperationFailed(ErrorCode error_code, char const* const filename, int line_number) - : TraceableException(error_code, filename, line_number) {} + : TraceableException{error_code, filename, line_number} {} // Methods [[nodiscard]] auto what() const noexcept -> char const* override { @@ -31,7 +31,7 @@ class Compressor : public WriterInterface { }; // Constructor - explicit Compressor(CompressorType type) : m_type(type) {} + explicit Compressor(CompressorType type) : m_type{type} {} // Destructor virtual ~Compressor() = default; @@ -41,7 +41,7 @@ class Compressor : public WriterInterface { auto operator=(Compressor const&) -> Compressor& = delete; Compressor(Compressor&&) noexcept = default; - auto operator=(Compressor&&) -> Compressor& = default; + auto operator=(Compressor&&) noexcept -> Compressor& = default; // Methods implementing the WriterInterface /** @@ -68,17 +68,23 @@ class Compressor : public WriterInterface { */ virtual auto close() -> void = 0; + /** + * Initializes the compression stream + * @param file_writer + */ + virtual auto open(FileWriter& file_writer) -> void { open(file_writer, 0); } + /** * Initializes the compression stream with the given compression level * @param file_writer * @param compression_level */ - virtual auto open(FileWriter& file_writer, [[maybe_unused]] int compression_level = 0) -> void - = 0; + virtual auto open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void + = 0; private: // Variables - CompressorType m_type{}; + CompressorType m_type; }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index 112bf7d39..5f1914a3b 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -42,7 +42,8 @@ auto Compressor::close() -> void { m_compressed_stream_file_writer = nullptr; } -auto Compressor::open(FileWriter& file_writer, [[maybe_unused]] int compression_level) -> void { +auto Compressor::open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) + -> void { m_compressed_stream_file_writer = &file_writer; } } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp index a24024666..8674e8937 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp @@ -20,7 +20,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { public: // Constructors OperationFailed(ErrorCode error_code, char const* const filename, int line_number) - : TraceableException(error_code, filename, line_number) {} + : TraceableException{error_code, filename, line_number} {} // Methods [[nodiscard]] auto what() const noexcept -> char const* override { @@ -39,7 +39,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { auto operator=(Compressor const&) -> Compressor& = delete; Compressor(Compressor&&) noexcept = default; - auto operator=(Compressor&&) -> Compressor& = default; + auto operator=(Compressor&&) noexcept -> Compressor& = default; // Methods implementing the WriterInterface /** @@ -73,7 +73,8 @@ class Compressor : public ::clp::streaming_compression::Compressor { * @param file_writer * @param compression_level */ - auto open(FileWriter& file_writer, [[maybe_unused]] int compression_level = 0) -> void override; + auto + open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void override; private: // Variables diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index 8adc47aae..c75177201 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -15,11 +15,7 @@ namespace clp::streaming_compression::zstd { Compressor::Compressor() : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, - m_compressed_stream_block{}, - m_compressed_stream_file_writer{nullptr}, - m_compression_stream_contains_data{false}, - m_compression_stream{ZSTD_createCStream()}, - m_uncompressed_stream_pos{0} { + m_compression_stream{ZSTD_createCStream()} { if (nullptr == m_compression_stream) { SPDLOG_ERROR("streaming_compression::zstd::Compressor: ZSTD_createCStream() error"); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); @@ -37,7 +33,7 @@ auto Compressor::open(FileWriter& file_writer, int const compression_level) -> v // Setup compressed stream parameters auto const compressed_stream_block_size{ZSTD_CStreamOutSize()}; - m_compressed_stream_block_buffer.reserve(compressed_stream_block_size); + m_compressed_stream_block_buffer.resize(compressed_stream_block_size); m_compressed_stream_block.dst = m_compressed_stream_block_buffer.data(); m_compressed_stream_block.size = compressed_stream_block_size; @@ -147,7 +143,7 @@ auto Compressor::flush_without_ending_frame() -> void { while (true) { m_compressed_stream_block.pos = 0; - auto flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; + auto const flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; if (zstd_is_error(flush_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream2() error: {}", diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index de41709bc..0341c8bbf 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -20,11 +20,12 @@ class Compressor : public ::clp::streaming_compression::Compressor { public: // Constructors OperationFailed(ErrorCode error_code, char const* const filename, int line_number) - : TraceableException(error_code, filename, line_number) {} + : TraceableException{error_code, filename, line_number} {} // Methods [[nodiscard]] auto what() const noexcept -> char const* override { - return "streaming_compression::zstd::Compressor operation failed"; + return "streaming_compression::zstd::Compressor " + "operation failed"; } }; @@ -39,7 +40,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { auto operator=(Compressor const&) -> Compressor& = delete; Compressor(Compressor&&) noexcept = default; - auto operator=(Compressor&&) -> Compressor& = default; + auto operator=(Compressor&&) noexcept -> Compressor& = default; // Methods implementing the WriterInterface /** @@ -48,6 +49,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { * @param data_length */ auto write(char const* data, size_t data_length) -> void override; + /** * Writes any internally buffered data to file and ends the current frame */ @@ -67,13 +69,20 @@ class Compressor : public ::clp::streaming_compression::Compressor { */ auto close() -> void override; + /** + * Initializes the compression stream with the default compression level + * @param file_writer + */ + auto open(FileWriter& file_writer) -> void override { + this->open(file_writer, cDefaultCompressionLevel); + } + /** * Initializes the compression stream with the given compression level * @param file_writer * @param compression_level */ - auto open(FileWriter& file_writer, int compression_level = cDefaultCompressionLevel) - -> void override; + auto open(FileWriter& file_writer, int const compression_level) -> void override; /** * Flushes the stream without ending the current frame @@ -82,16 +91,16 @@ class Compressor : public ::clp::streaming_compression::Compressor { private: // Variables - FileWriter* m_compressed_stream_file_writer; + FileWriter* m_compressed_stream_file_writer{nullptr}; // Compressed stream variables ZSTD_CStream* m_compression_stream; - bool m_compression_stream_contains_data; + bool m_compression_stream_contains_data{false}; - ZSTD_outBuffer m_compressed_stream_block; + ZSTD_outBuffer m_compressed_stream_block{}; std::vector m_compressed_stream_block_buffer; - size_t m_uncompressed_stream_pos; + size_t m_uncompressed_stream_pos{0}; /** * Tells if a `size_t` ZStd function result is an error code and is not `ZSTD_error_no_error` diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index 2849f20a9..63d0fb4d8 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -59,13 +59,13 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { // Initialize buffers std::vector uncompressed_buffer{}; - uncompressed_buffer.reserve(cUncompressedDataSize); + uncompressed_buffer.resize(cUncompressedDataSize); for (size_t i{0}; i < cUncompressedDataSize; ++i) { - uncompressed_buffer.push_back((char)('a' + (i % cUncompressedDataPatternPeriod))); + uncompressed_buffer.at(i) = ((char)('a' + (i % cUncompressedDataPatternPeriod))); } std::vector decompressed_buffer{}; - decompressed_buffer.reserve(cUncompressedDataSize); + decompressed_buffer.resize(cUncompressedDataSize); // Compress FileWriter file_writer; @@ -83,7 +83,7 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { decompressor->open(compressed_file_view.data(), compressed_file_view.size()); size_t uncompressed_bytes{0}; - for (auto chunk_size : compression_chunk_sizes) { + for (auto const chunk_size : compression_chunk_sizes) { memset(decompressed_buffer.data(), 0, cUncompressedDataSize); REQUIRE( (ErrorCode_Success From 719f9eee023b59ac83ccccaec5c9a3a933c03b92 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Tue, 19 Nov 2024 02:26:50 -0500 Subject: [PATCH 03/12] Logic typo --- .../core/src/clp/streaming_compression/zstd/Compressor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index c75177201..723244b3a 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -110,7 +110,7 @@ auto Compressor::flush() -> void { m_compressed_stream_block.pos = 0; auto const end_stream_result{ZSTD_endStream(m_compression_stream, &m_compressed_stream_block)}; - if (end_stream_result > 0) { + if (zstd_is_error(end_stream_result)) { // Note: Output buffer is large enough that it is guaranteed to have enough room to be // able to flush the entire buffer, so this can only be an error SPDLOG_ERROR( From 1c66a07dab08f9dc61311521202b7475b059b987 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 21 Nov 2024 02:39:18 -0500 Subject: [PATCH 04/12] Adress review comments --- .../clp/streaming_compression/Compressor.hpp | 15 ++---- .../passthrough/Compressor.cpp | 3 +- .../passthrough/Compressor.hpp | 7 ++- .../streaming_compression/zstd/Compressor.cpp | 35 ++++++++---- .../streaming_compression/zstd/Compressor.hpp | 10 ++-- .../core/tests/test-StreamingCompression.cpp | 53 ++++++++++--------- 6 files changed, 64 insertions(+), 59 deletions(-) diff --git a/components/core/src/clp/streaming_compression/Compressor.hpp b/components/core/src/clp/streaming_compression/Compressor.hpp index 602a16afc..4963fbc13 100644 --- a/components/core/src/clp/streaming_compression/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/Compressor.hpp @@ -36,10 +36,11 @@ class Compressor : public WriterInterface { // Destructor virtual ~Compressor() = default; - // Explicitly disable copy constructor/assignment and enable the move version + // Delete copy constructor and assignment operator Compressor(Compressor const&) = delete; auto operator=(Compressor const&) -> Compressor& = delete; + // Default move constructor and assignment operator Compressor(Compressor&&) noexcept = default; auto operator=(Compressor&&) noexcept -> Compressor& = default; @@ -64,7 +65,7 @@ class Compressor : public WriterInterface { // Methods /** - * Closes the compression stream + * Closes the compressor */ virtual auto close() -> void = 0; @@ -72,15 +73,7 @@ class Compressor : public WriterInterface { * Initializes the compression stream * @param file_writer */ - virtual auto open(FileWriter& file_writer) -> void { open(file_writer, 0); } - - /** - * Initializes the compression stream with the given compression level - * @param file_writer - * @param compression_level - */ - virtual auto open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void - = 0; + virtual auto open(FileWriter& file_writer) -> void = 0; private: // Variables diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index 5f1914a3b..d4ab89dbe 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -42,8 +42,7 @@ auto Compressor::close() -> void { m_compressed_stream_file_writer = nullptr; } -auto Compressor::open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) - -> void { +auto Compressor::open(FileWriter& file_writer) -> void { m_compressed_stream_file_writer = &file_writer; } } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp index 8674e8937..a94086196 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp @@ -34,10 +34,11 @@ class Compressor : public ::clp::streaming_compression::Compressor { // Destructor ~Compressor() override = default; - // Explicitly disable copy constructor/assignment and enable the move version + // Delete copy constructor and assignment operator Compressor(Compressor const&) = delete; auto operator=(Compressor const&) -> Compressor& = delete; + // Default move constructor and assignment operator Compressor(Compressor&&) noexcept = default; auto operator=(Compressor&&) noexcept -> Compressor& = default; @@ -71,10 +72,8 @@ class Compressor : public ::clp::streaming_compression::Compressor { /** * Initializes the compression stream * @param file_writer - * @param compression_level */ - auto - open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void override; + auto open(FileWriter& file_writer) -> void override; private: // Variables diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index 723244b3a..f842ea278 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -12,6 +12,26 @@ #include "../Compressor.hpp" #include "../Constants.hpp" +namespace { +/** + * Checks if a value returned by ZStd function indicates an error code. + * + * For most ZStd functions that return `size_t` results, instead of returning a union type that can + * either be a valid result or an error code, an unanimous `size_t` type is returned. + * Usually, if the return value exceeds the maximum possible value of valid results, it is treated + * as an error code. However, the exact behavior is function-dependent, so ZStd provides: + * 1. A value checking function `ZSTD_isError` + * 2. A size_t <-> error_code_enum mapping function `ZSTD_getErrorCode`. + * See also: https://facebook.github.io/zstd/zstd_manual.html + * + * @param result A `size_t` type result returned by ZStd functions + * @return Whether the result is an error code and indicates an error has occurred + */ +auto is_error(size_t result) -> bool { + return 0 != ZSTD_isError(result) && ZSTD_error_no_error != ZSTD_getErrorCode(result); +} +} // namespace + namespace clp::streaming_compression::zstd { Compressor::Compressor() : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, @@ -26,7 +46,7 @@ Compressor::~Compressor() { ZSTD_freeCStream(m_compression_stream); } -auto Compressor::open(FileWriter& file_writer, int const compression_level) -> void { +auto Compressor::open(FileWriter& file_writer, int compression_level) -> void { if (nullptr != m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); } @@ -39,7 +59,7 @@ auto Compressor::open(FileWriter& file_writer, int const compression_level) -> v // Setup compression stream auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)}; - if (zstd_is_error(init_result)) { + if (is_error(init_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_initCStream() error: {}", ZSTD_getErrorName(init_result) @@ -82,7 +102,7 @@ auto Compressor::write(char const* data, size_t data_length) -> void { &m_compressed_stream_block, &uncompressed_stream_block )}; - if (zstd_is_error(compress_result)) { + if (is_error(compress_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream() error: {}", ZSTD_getErrorName(compress_result) @@ -110,7 +130,7 @@ auto Compressor::flush() -> void { m_compressed_stream_block.pos = 0; auto const end_stream_result{ZSTD_endStream(m_compression_stream, &m_compressed_stream_block)}; - if (zstd_is_error(end_stream_result)) { + if (is_error(end_stream_result)) { // Note: Output buffer is large enough that it is guaranteed to have enough room to be // able to flush the entire buffer, so this can only be an error SPDLOG_ERROR( @@ -144,7 +164,7 @@ auto Compressor::flush_without_ending_frame() -> void { while (true) { m_compressed_stream_block.pos = 0; auto const flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; - if (zstd_is_error(flush_result)) { + if (is_error(flush_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream2() error: {}", ZSTD_getErrorName(flush_result) @@ -162,9 +182,4 @@ auto Compressor::flush_without_ending_frame() -> void { } } } - -auto Compressor::zstd_is_error(size_t size_t_function_result) -> bool { - return ZSTD_isError(size_t_function_result) != 0 - && ZSTD_error_no_error != ZSTD_getErrorCode(size_t_function_result); -} } // namespace clp::streaming_compression::zstd diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index 0341c8bbf..32c0f2338 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -35,10 +35,11 @@ class Compressor : public ::clp::streaming_compression::Compressor { // Destructor ~Compressor() override; - // Explicitly disable copy constructor/assignment and enable the move version + // Delete copy constructor and assignment operator Compressor(Compressor const&) = delete; auto operator=(Compressor const&) -> Compressor& = delete; + // Default move constructor and assignment operator Compressor(Compressor&&) noexcept = default; auto operator=(Compressor&&) noexcept -> Compressor& = default; @@ -82,7 +83,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { * @param file_writer * @param compression_level */ - auto open(FileWriter& file_writer, int const compression_level) -> void override; + auto open(FileWriter& file_writer, int compression_level) -> void; /** * Flushes the stream without ending the current frame @@ -101,11 +102,6 @@ class Compressor : public ::clp::streaming_compression::Compressor { std::vector m_compressed_stream_block_buffer; size_t m_uncompressed_stream_pos{0}; - - /** - * Tells if a `size_t` ZStd function result is an error code and is not `ZSTD_error_no_error` - */ - [[nodiscard]] static auto zstd_is_error(size_t size_t_function_result) -> bool; }; } // namespace clp::streaming_compression::zstd diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index 63d0fb4d8..cbcb12364 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -23,28 +23,26 @@ using clp::FileWriter; using clp::streaming_compression::Compressor; using clp::streaming_compression::Decompressor; -namespace { -constexpr size_t cUncompressedDataSize{128L * 1024 * 1024}; // 128MB -constexpr auto cCompressionChunkSizes = std::to_array( - {cUncompressedDataSize / 100, - cUncompressedDataSize / 50, - cUncompressedDataSize / 25, - cUncompressedDataSize / 10, - cUncompressedDataSize / 5, - cUncompressedDataSize / 2, - cUncompressedDataSize} -); -constexpr size_t cUncompressedDataPatternPeriod = 26; // lower-case alphabet -} // namespace - TEST_CASE("StreamingCompression", "[StreamingCompression]") { + constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB + constexpr auto cCompressionChunkSizes = std::to_array( + {cBufferSize / 100, + cBufferSize / 50, + cBufferSize / 25, + cBufferSize / 10, + cBufferSize / 5, + cBufferSize / 2, + cBufferSize} + ); + constexpr size_t cAlphabetLength = 26; + std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; std::vector compression_chunk_sizes{ cCompressionChunkSizes.begin(), cCompressionChunkSizes.end() }; - std::unique_ptr compressor{}; - std::unique_ptr decompressor{}; + std::unique_ptr compressor; + std::unique_ptr decompressor; SECTION("Initiate zstd single phase compression") { compression_chunk_sizes.insert(compression_chunk_sizes.begin(), ZSTD_CStreamInSize()); @@ -59,13 +57,13 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { // Initialize buffers std::vector uncompressed_buffer{}; - uncompressed_buffer.resize(cUncompressedDataSize); - for (size_t i{0}; i < cUncompressedDataSize; ++i) { - uncompressed_buffer.at(i) = ((char)('a' + (i % cUncompressedDataPatternPeriod))); + uncompressed_buffer.resize(cBufferSize); + for (size_t i{0}; i < cBufferSize; ++i) { + uncompressed_buffer.at(i) = static_cast(('a' + (i % cAlphabetLength))); } std::vector decompressed_buffer{}; - decompressed_buffer.resize(cUncompressedDataSize); + decompressed_buffer.resize(cBufferSize); // Compress FileWriter file_writer; @@ -82,19 +80,24 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { auto const compressed_file_view{memory_mapped_compressed_file.get_view()}; decompressor->open(compressed_file_view.data(), compressed_file_view.size()); - size_t uncompressed_bytes{0}; + size_t num_uncompressed_bytes{0}; for (auto const chunk_size : compression_chunk_sizes) { - memset(decompressed_buffer.data(), 0, cUncompressedDataSize); + // Clear the buffer to ensure that we are not comparing values from a previous test + std::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0); REQUIRE( (ErrorCode_Success == decompressor->get_decompressed_stream_region( - uncompressed_bytes, + num_uncompressed_bytes, decompressed_buffer.data(), chunk_size )) ); - REQUIRE((memcmp(uncompressed_buffer.data(), decompressed_buffer.data(), chunk_size) == 0)); - uncompressed_bytes += chunk_size; + REQUIRE(std::equal( + uncompressed_buffer.begin(), + uncompressed_buffer.begin() + chunk_size, + decompressed_buffer.begin() + )); + num_uncompressed_bytes += chunk_size; } // Cleanup From 7322687e017c25e390cbfacfe885a438e28c3926 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 21 Nov 2024 13:41:36 -0500 Subject: [PATCH 05/12] Use clp::Array for runtime fix-sized arrays --- .../passthrough/Compressor.cpp | 6 ++++ .../passthrough/Compressor.hpp | 5 ++-- .../streaming_compression/zstd/Compressor.cpp | 20 ++++++++----- .../streaming_compression/zstd/Compressor.hpp | 13 +++++---- .../core/tests/test-StreamingCompression.cpp | 29 +++++++++---------- 5 files changed, 40 insertions(+), 33 deletions(-) diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index d4ab89dbe..57bf36ebb 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -4,8 +4,14 @@ #include "../../ErrorCode.hpp" #include "../../TraceableException.hpp" +#include "../Compressor.hpp" +#include "../Constants.hpp" namespace clp::streaming_compression::passthrough { +Compressor::Compressor() + : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, + m_compressed_stream_file_writer{nullptr} {} + auto Compressor::write(char const* data, size_t const data_length) -> void { if (nullptr == m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp index a94086196..9ec291c19 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp @@ -7,7 +7,6 @@ #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" #include "../Compressor.hpp" -#include "../Constants.hpp" namespace clp::streaming_compression::passthrough { /** @@ -29,7 +28,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { }; // Constructors - Compressor() : ::clp::streaming_compression::Compressor{CompressorType::Passthrough} {} + Compressor(); // Destructor ~Compressor() override = default; @@ -77,7 +76,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { private: // Variables - FileWriter* m_compressed_stream_file_writer{nullptr}; + FileWriter* m_compressed_stream_file_writer; }; } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index f842ea278..cbfabbe89 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -1,11 +1,11 @@ #include "Compressor.hpp" #include -#include #include #include +#include "../../Array.hpp" #include "../../ErrorCode.hpp" #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" @@ -35,7 +35,17 @@ auto is_error(size_t result) -> bool { namespace clp::streaming_compression::zstd { Compressor::Compressor() : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, - m_compression_stream{ZSTD_createCStream()} { + m_compressed_stream_file_writer{nullptr}, + m_compression_stream{ZSTD_createCStream()}, + m_compression_stream_contains_data{false}, + m_compressed_stream_block_size{ZSTD_CStreamOutSize()}, + m_compressed_stream_block_buffer{Array{m_compressed_stream_block_size}}, + m_compressed_stream_block{ + .dst = m_compressed_stream_block_buffer.data(), + .size = m_compressed_stream_block_size, + .pos = 0 + }, + m_uncompressed_stream_pos{0} { if (nullptr == m_compression_stream) { SPDLOG_ERROR("streaming_compression::zstd::Compressor: ZSTD_createCStream() error"); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); @@ -51,12 +61,6 @@ auto Compressor::open(FileWriter& file_writer, int compression_level) -> void { throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); } - // Setup compressed stream parameters - auto const compressed_stream_block_size{ZSTD_CStreamOutSize()}; - m_compressed_stream_block_buffer.resize(compressed_stream_block_size); - m_compressed_stream_block.dst = m_compressed_stream_block_buffer.data(); - m_compressed_stream_block.size = compressed_stream_block_size; - // Setup compression stream auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)}; if (is_error(init_result)) { diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index 32c0f2338..0908b4fcc 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -2,10 +2,10 @@ #define CLP_STREAMING_COMPRESSION_ZSTD_COMPRESSOR_HPP #include -#include #include +#include "../../Array.hpp" #include "../../ErrorCode.hpp" #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" @@ -92,16 +92,17 @@ class Compressor : public ::clp::streaming_compression::Compressor { private: // Variables - FileWriter* m_compressed_stream_file_writer{nullptr}; + FileWriter* m_compressed_stream_file_writer; // Compressed stream variables ZSTD_CStream* m_compression_stream; - bool m_compression_stream_contains_data{false}; + bool m_compression_stream_contains_data; - ZSTD_outBuffer m_compressed_stream_block{}; - std::vector m_compressed_stream_block_buffer; + size_t m_compressed_stream_block_size; + Array m_compressed_stream_block_buffer; + ZSTD_outBuffer m_compressed_stream_block; - size_t m_uncompressed_stream_pos{0}; + size_t m_uncompressed_stream_pos; }; } // namespace clp::streaming_compression::zstd diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index cbcb12364..f3083a1cc 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -1,13 +1,14 @@ +#include #include #include #include #include -#include #include #include #include +#include "../src/clp/Array.hpp" #include "../src/clp/ErrorCode.hpp" #include "../src/clp/FileWriter.hpp" #include "../src/clp/ReadOnlyMemoryMappedFile.hpp" @@ -18,12 +19,14 @@ #include "../src/clp/streaming_compression/zstd/Compressor.hpp" #include "../src/clp/streaming_compression/zstd/Decompressor.hpp" +using clp::Array; using clp::ErrorCode_Success; using clp::FileWriter; using clp::streaming_compression::Compressor; using clp::streaming_compression::Decompressor; TEST_CASE("StreamingCompression", "[StreamingCompression]") { + // Initialize constants constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB constexpr auto cCompressionChunkSizes = std::to_array( {cBufferSize / 100, @@ -35,41 +38,35 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { cBufferSize} ); constexpr size_t cAlphabetLength = 26; - std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; - std::vector compression_chunk_sizes{ - cCompressionChunkSizes.begin(), - cCompressionChunkSizes.end() - }; + + // Initialize compression devices std::unique_ptr compressor; std::unique_ptr decompressor; - SECTION("Initiate zstd single phase compression") { - compression_chunk_sizes.insert(compression_chunk_sizes.begin(), ZSTD_CStreamInSize()); + SECTION("ZStd single phase compression") { compressor = std::make_unique(); decompressor = std::make_unique(); } - SECTION("Initiate passthrough compression") { + SECTION("Passthrough compression") { compressor = std::make_unique(); decompressor = std::make_unique(); } // Initialize buffers - std::vector uncompressed_buffer{}; - uncompressed_buffer.resize(cBufferSize); + Array uncompressed_buffer{cBufferSize}; for (size_t i{0}; i < cBufferSize; ++i) { uncompressed_buffer.at(i) = static_cast(('a' + (i % cAlphabetLength))); } - std::vector decompressed_buffer{}; - decompressed_buffer.resize(cBufferSize); + Array decompressed_buffer{cBufferSize}; // Compress FileWriter file_writer; file_writer.open(compressed_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); compressor->open(file_writer); - for (auto const chunk_size : compression_chunk_sizes) { + for (auto const chunk_size : cCompressionChunkSizes) { compressor->write(uncompressed_buffer.data(), chunk_size); } compressor->close(); @@ -81,9 +78,9 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { decompressor->open(compressed_file_view.data(), compressed_file_view.size()); size_t num_uncompressed_bytes{0}; - for (auto const chunk_size : compression_chunk_sizes) { + for (auto const chunk_size : cCompressionChunkSizes) { // Clear the buffer to ensure that we are not comparing values from a previous test - std::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0); + std::ranges::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0); REQUIRE( (ErrorCode_Success == decompressor->get_decompressed_stream_region( From dec933953363d3f7e01f3b0cd21b8c0ff0741646 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 21 Nov 2024 13:47:27 -0500 Subject: [PATCH 06/12] Typo fix --- .../src/clp/streaming_compression/passthrough/Compressor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index 57bf36ebb..305c61056 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -9,7 +9,7 @@ namespace clp::streaming_compression::passthrough { Compressor::Compressor() - : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, + : ::clp::streaming_compression::Compressor{CompressorType::Passthrough}, m_compressed_stream_file_writer{nullptr} {} auto Compressor::write(char const* data, size_t const data_length) -> void { From 795c10618e86fa2f626a107238d4d959eb4e86c3 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 21 Nov 2024 23:19:09 -0500 Subject: [PATCH 07/12] Apply suggestions from code review Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com> --- components/core/src/clp/streaming_compression/Compressor.hpp | 2 +- .../core/src/clp/streaming_compression/zstd/Compressor.cpp | 2 +- components/core/tests/test-StreamingCompression.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/components/core/src/clp/streaming_compression/Compressor.hpp b/components/core/src/clp/streaming_compression/Compressor.hpp index 4963fbc13..f1bfecb4e 100644 --- a/components/core/src/clp/streaming_compression/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/Compressor.hpp @@ -13,7 +13,7 @@ namespace clp::streaming_compression { /** - * Generic compressor interface. + * Abstract compressor interface. */ class Compressor : public WriterInterface { public: diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index cbfabbe89..0ab9abe9c 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -24,7 +24,7 @@ namespace { * 2. A size_t <-> error_code_enum mapping function `ZSTD_getErrorCode`. * See also: https://facebook.github.io/zstd/zstd_manual.html * - * @param result A `size_t` type result returned by ZStd functions + * @param result A `size_t` type result returned from ZStd APIs * @return Whether the result is an error code and indicates an error has occurred */ auto is_error(size_t result) -> bool { diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index f3083a1cc..42fbb0721 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -37,7 +37,7 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { cBufferSize / 2, cBufferSize} ); - constexpr size_t cAlphabetLength = 26; + constexpr size_t cAlphabetLength{26}; std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; // Initialize compression devices From c5d551da60afd4d8f528712c478c16298a227310 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 21 Nov 2024 23:43:24 -0500 Subject: [PATCH 08/12] Address review concern --- components/core/CMakeLists.txt | 1 - components/core/src/clp/clg/CMakeLists.txt | 1 - components/core/src/clp/clo/CMakeLists.txt | 1 - components/core/src/clp/clp/CMakeLists.txt | 1 - .../clp/streaming_compression/Compressor.hpp | 7 +-- .../clp/streaming_compression/Constants.hpp | 13 ------ .../streaming_compression/Decompressor.hpp | 7 +-- .../passthrough/Compressor.cpp | 6 --- .../passthrough/Compressor.hpp | 6 +-- .../passthrough/Decompressor.hpp | 3 +- .../streaming_compression/zstd/Compressor.cpp | 45 +++---------------- .../streaming_compression/zstd/Compressor.hpp | 19 ++++---- .../zstd/Decompressor.cpp | 3 +- .../core/tests/test-StreamingCompression.cpp | 11 +++++ 14 files changed, 34 insertions(+), 90 deletions(-) delete mode 100644 components/core/src/clp/streaming_compression/Constants.hpp diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index e5c9b06c8..8d9604abd 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -460,7 +460,6 @@ set(SOURCE_FILES_unitTest src/clp/streaming_archive/writer/utils.cpp src/clp/streaming_archive/writer/utils.hpp src/clp/streaming_compression/Compressor.hpp - src/clp/streaming_compression/Constants.hpp src/clp/streaming_compression/Decompressor.hpp src/clp/streaming_compression/passthrough/Compressor.cpp src/clp/streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/clg/CMakeLists.txt b/components/core/src/clp/clg/CMakeLists.txt index c3c8e3aea..de93cb39c 100644 --- a/components/core/src/clp/clg/CMakeLists.txt +++ b/components/core/src/clp/clg/CMakeLists.txt @@ -89,7 +89,6 @@ set( ../streaming_archive/writer/File.hpp ../streaming_archive/writer/Segment.cpp ../streaming_archive/writer/Segment.hpp - ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index 39cc72b60..6355363bc 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -89,7 +89,6 @@ set( ../streaming_archive/writer/File.hpp ../streaming_archive/writer/Segment.cpp ../streaming_archive/writer/Segment.hpp - ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index eff32ce46..0ec3b7147 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -117,7 +117,6 @@ set( ../streaming_archive/writer/utils.cpp ../streaming_archive/writer/utils.hpp ../streaming_compression/Compressor.hpp - ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/streaming_compression/Compressor.hpp b/components/core/src/clp/streaming_compression/Compressor.hpp index f1bfecb4e..ac55bd270 100644 --- a/components/core/src/clp/streaming_compression/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/Compressor.hpp @@ -9,7 +9,6 @@ #include "../FileWriter.hpp" #include "../TraceableException.hpp" #include "../WriterInterface.hpp" -#include "Constants.hpp" namespace clp::streaming_compression { /** @@ -31,7 +30,7 @@ class Compressor : public WriterInterface { }; // Constructor - explicit Compressor(CompressorType type) : m_type{type} {} + Compressor() = default; // Destructor virtual ~Compressor() = default; @@ -74,10 +73,6 @@ class Compressor : public WriterInterface { * @param file_writer */ virtual auto open(FileWriter& file_writer) -> void = 0; - -private: - // Variables - CompressorType m_type; }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/Constants.hpp b/components/core/src/clp/streaming_compression/Constants.hpp deleted file mode 100644 index 6c6a78bfc..000000000 --- a/components/core/src/clp/streaming_compression/Constants.hpp +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef CLP_STREAMING_COMPRESSION_CONSTANTS_HPP -#define CLP_STREAMING_COMPRESSION_CONSTANTS_HPP - -#include - -namespace clp::streaming_compression { -enum class CompressorType : uint8_t { - ZSTD = 0x10, - Passthrough = 0xFF, -}; -} // namespace clp::streaming_compression - -#endif // CLP_STREAMING_COMPRESSION_CONSTANTS_HPP diff --git a/components/core/src/clp/streaming_compression/Decompressor.hpp b/components/core/src/clp/streaming_compression/Decompressor.hpp index 31666acd9..fa626b2bc 100644 --- a/components/core/src/clp/streaming_compression/Decompressor.hpp +++ b/components/core/src/clp/streaming_compression/Decompressor.hpp @@ -6,7 +6,6 @@ #include "../FileReader.hpp" #include "../ReaderInterface.hpp" #include "../TraceableException.hpp" -#include "Constants.hpp" namespace clp::streaming_compression { class Decompressor : public ReaderInterface { @@ -25,7 +24,7 @@ class Decompressor : public ReaderInterface { }; // Constructor - explicit Decompressor(CompressorType type) : m_compression_type(type) {} + Decompressor() = default; // Destructor ~Decompressor() = default; @@ -57,10 +56,6 @@ class Decompressor : public ReaderInterface { char* extraction_buf, size_t extraction_len ) = 0; - -protected: - // Variables - CompressorType m_compression_type; }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index 305c61056..d4ab89dbe 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -4,14 +4,8 @@ #include "../../ErrorCode.hpp" #include "../../TraceableException.hpp" -#include "../Compressor.hpp" -#include "../Constants.hpp" namespace clp::streaming_compression::passthrough { -Compressor::Compressor() - : ::clp::streaming_compression::Compressor{CompressorType::Passthrough}, - m_compressed_stream_file_writer{nullptr} {} - auto Compressor::write(char const* data, size_t const data_length) -> void { if (nullptr == m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp index 9ec291c19..f7ccd5004 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp @@ -27,8 +27,8 @@ class Compressor : public ::clp::streaming_compression::Compressor { } }; - // Constructors - Compressor(); + // Constructor + Compressor() = default; // Destructor ~Compressor() override = default; @@ -76,7 +76,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { private: // Variables - FileWriter* m_compressed_stream_file_writer; + FileWriter* m_compressed_stream_file_writer{nullptr}; }; } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp index 49501dc6e..8ad87b964 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp @@ -26,8 +26,7 @@ class Decompressor : public ::clp::streaming_compression::Decompressor { // Constructors Decompressor() - : ::clp::streaming_compression::Decompressor(CompressorType::Passthrough), - m_input_type(InputType::NotInitialized), + : m_input_type(InputType::NotInitialized), m_compressed_data_buf(nullptr), m_compressed_data_buf_len(0), m_decompressed_stream_pos(0) {} diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index 0ab9abe9c..74789f4a8 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -5,47 +5,12 @@ #include #include -#include "../../Array.hpp" #include "../../ErrorCode.hpp" #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" -#include "../Compressor.hpp" -#include "../Constants.hpp" - -namespace { -/** - * Checks if a value returned by ZStd function indicates an error code. - * - * For most ZStd functions that return `size_t` results, instead of returning a union type that can - * either be a valid result or an error code, an unanimous `size_t` type is returned. - * Usually, if the return value exceeds the maximum possible value of valid results, it is treated - * as an error code. However, the exact behavior is function-dependent, so ZStd provides: - * 1. A value checking function `ZSTD_isError` - * 2. A size_t <-> error_code_enum mapping function `ZSTD_getErrorCode`. - * See also: https://facebook.github.io/zstd/zstd_manual.html - * - * @param result A `size_t` type result returned from ZStd APIs - * @return Whether the result is an error code and indicates an error has occurred - */ -auto is_error(size_t result) -> bool { - return 0 != ZSTD_isError(result) && ZSTD_error_no_error != ZSTD_getErrorCode(result); -} -} // namespace namespace clp::streaming_compression::zstd { -Compressor::Compressor() - : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, - m_compressed_stream_file_writer{nullptr}, - m_compression_stream{ZSTD_createCStream()}, - m_compression_stream_contains_data{false}, - m_compressed_stream_block_size{ZSTD_CStreamOutSize()}, - m_compressed_stream_block_buffer{Array{m_compressed_stream_block_size}}, - m_compressed_stream_block{ - .dst = m_compressed_stream_block_buffer.data(), - .size = m_compressed_stream_block_size, - .pos = 0 - }, - m_uncompressed_stream_pos{0} { +Compressor::Compressor() { if (nullptr == m_compression_stream) { SPDLOG_ERROR("streaming_compression::zstd::Compressor: ZSTD_createCStream() error"); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); @@ -63,7 +28,7 @@ auto Compressor::open(FileWriter& file_writer, int compression_level) -> void { // Setup compression stream auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)}; - if (is_error(init_result)) { + if (ZSTD_error_no_error != ZSTD_getErrorCode(init_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_initCStream() error: {}", ZSTD_getErrorName(init_result) @@ -106,7 +71,7 @@ auto Compressor::write(char const* data, size_t data_length) -> void { &m_compressed_stream_block, &uncompressed_stream_block )}; - if (is_error(compress_result)) { + if (ZSTD_error_no_error != ZSTD_getErrorCode(compress_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream() error: {}", ZSTD_getErrorName(compress_result) @@ -134,7 +99,7 @@ auto Compressor::flush() -> void { m_compressed_stream_block.pos = 0; auto const end_stream_result{ZSTD_endStream(m_compression_stream, &m_compressed_stream_block)}; - if (is_error(end_stream_result)) { + if (ZSTD_error_no_error != ZSTD_getErrorCode(end_stream_result)) { // Note: Output buffer is large enough that it is guaranteed to have enough room to be // able to flush the entire buffer, so this can only be an error SPDLOG_ERROR( @@ -168,7 +133,7 @@ auto Compressor::flush_without_ending_frame() -> void { while (true) { m_compressed_stream_block.pos = 0; auto const flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; - if (is_error(flush_result)) { + if (ZSTD_error_no_error != ZSTD_getErrorCode(flush_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream2() error: {}", ZSTD_getErrorName(flush_result) diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index 0908b4fcc..c2736737e 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -92,17 +92,20 @@ class Compressor : public ::clp::streaming_compression::Compressor { private: // Variables - FileWriter* m_compressed_stream_file_writer; + FileWriter* m_compressed_stream_file_writer{nullptr}; // Compressed stream variables - ZSTD_CStream* m_compression_stream; - bool m_compression_stream_contains_data; - - size_t m_compressed_stream_block_size; - Array m_compressed_stream_block_buffer; - ZSTD_outBuffer m_compressed_stream_block; + ZSTD_CStream* m_compression_stream{ZSTD_createCStream()}; + bool m_compression_stream_contains_data{false}; + + Array m_compressed_stream_block_buffer{ZSTD_CStreamOutSize()}; + ZSTD_outBuffer m_compressed_stream_block{ + .dst = m_compressed_stream_block_buffer.data(), + .size = m_compressed_stream_block_buffer.size(), + .pos = 0 + }; - size_t m_uncompressed_stream_pos; + size_t m_uncompressed_stream_pos{0}; }; } // namespace clp::streaming_compression::zstd diff --git a/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp b/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp index 818379a24..c3802023c 100644 --- a/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp @@ -8,8 +8,7 @@ namespace clp::streaming_compression::zstd { Decompressor::Decompressor() - : ::clp::streaming_compression::Decompressor(CompressorType::ZSTD), - m_input_type(InputType::NotInitialized), + : m_input_type(InputType::NotInitialized), m_decompression_stream(nullptr), m_file_reader(nullptr), m_file_reader_initial_pos(0), diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index 42fbb0721..0fbae9e3a 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -97,6 +98,16 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { num_uncompressed_bytes += chunk_size; } + // Sanity check + REQUIRE( + (std::accumulate( + cCompressionChunkSizes.cbegin(), + cCompressionChunkSizes.cend(), + size_t{0} + ) + == num_uncompressed_bytes) + ); + // Cleanup boost::filesystem::remove(compressed_file_path); } From 4dc14f8f370e7b365fc1626de5aaf1942d6023b4 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Fri, 22 Nov 2024 01:45:03 -0500 Subject: [PATCH 09/12] Logging typo fix --- .../core/src/clp/streaming_compression/zstd/Compressor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index 74789f4a8..229b96582 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -135,7 +135,7 @@ auto Compressor::flush_without_ending_frame() -> void { auto const flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; if (ZSTD_error_no_error != ZSTD_getErrorCode(flush_result)) { SPDLOG_ERROR( - "streaming_compression::zstd::Compressor: ZSTD_compressStream2() error: {}", + "streaming_compression::zstd::Compressor: ZSTD_flushStream() error: {}", ZSTD_getErrorName(flush_result) ); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); From 74d62bfe53a49e169974ec353b39e9bec0f01a12 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Fri, 22 Nov 2024 18:05:40 -0500 Subject: [PATCH 10/12] Add back constants file --- components/core/CMakeLists.txt | 1 + components/core/src/clp/clg/CMakeLists.txt | 1 + components/core/src/clp/clo/CMakeLists.txt | 1 + components/core/src/clp/clp/CMakeLists.txt | 1 + .../src/clp/streaming_compression/Constants.hpp | 14 ++++++++++++++ .../src/clp/streaming_compression/Decompressor.hpp | 7 ++++++- .../passthrough/Decompressor.hpp | 3 ++- .../streaming_compression/zstd/Decompressor.cpp | 3 ++- 8 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 components/core/src/clp/streaming_compression/Constants.hpp diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 8d9604abd..e5c9b06c8 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -460,6 +460,7 @@ set(SOURCE_FILES_unitTest src/clp/streaming_archive/writer/utils.cpp src/clp/streaming_archive/writer/utils.hpp src/clp/streaming_compression/Compressor.hpp + src/clp/streaming_compression/Constants.hpp src/clp/streaming_compression/Decompressor.hpp src/clp/streaming_compression/passthrough/Compressor.cpp src/clp/streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/clg/CMakeLists.txt b/components/core/src/clp/clg/CMakeLists.txt index de93cb39c..c3c8e3aea 100644 --- a/components/core/src/clp/clg/CMakeLists.txt +++ b/components/core/src/clp/clg/CMakeLists.txt @@ -89,6 +89,7 @@ set( ../streaming_archive/writer/File.hpp ../streaming_archive/writer/Segment.cpp ../streaming_archive/writer/Segment.hpp + ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index 6355363bc..39cc72b60 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -89,6 +89,7 @@ set( ../streaming_archive/writer/File.hpp ../streaming_archive/writer/Segment.cpp ../streaming_archive/writer/Segment.hpp + ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index 0ec3b7147..eff32ce46 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -117,6 +117,7 @@ set( ../streaming_archive/writer/utils.cpp ../streaming_archive/writer/utils.hpp ../streaming_compression/Compressor.hpp + ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp diff --git a/components/core/src/clp/streaming_compression/Constants.hpp b/components/core/src/clp/streaming_compression/Constants.hpp new file mode 100644 index 000000000..4649c2e98 --- /dev/null +++ b/components/core/src/clp/streaming_compression/Constants.hpp @@ -0,0 +1,14 @@ +#ifndef CLP_STREAMING_COMPRESSION_CONSTANTS_HPP +#define CLP_STREAMING_COMPRESSION_CONSTANTS_HPP + +#include +#include + +namespace clp::streaming_compression { +enum class CompressorType : uint8_t { + ZSTD = 0x10, + Passthrough = 0xFF, +}; +} // namespace clp::streaming_compression + +#endif // CLP_STREAMING_COMPRESSION_CONSTANTS_HPP diff --git a/components/core/src/clp/streaming_compression/Decompressor.hpp b/components/core/src/clp/streaming_compression/Decompressor.hpp index fa626b2bc..31666acd9 100644 --- a/components/core/src/clp/streaming_compression/Decompressor.hpp +++ b/components/core/src/clp/streaming_compression/Decompressor.hpp @@ -6,6 +6,7 @@ #include "../FileReader.hpp" #include "../ReaderInterface.hpp" #include "../TraceableException.hpp" +#include "Constants.hpp" namespace clp::streaming_compression { class Decompressor : public ReaderInterface { @@ -24,7 +25,7 @@ class Decompressor : public ReaderInterface { }; // Constructor - Decompressor() = default; + explicit Decompressor(CompressorType type) : m_compression_type(type) {} // Destructor ~Decompressor() = default; @@ -56,6 +57,10 @@ class Decompressor : public ReaderInterface { char* extraction_buf, size_t extraction_len ) = 0; + +protected: + // Variables + CompressorType m_compression_type; }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp index 8ad87b964..49501dc6e 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Decompressor.hpp @@ -26,7 +26,8 @@ class Decompressor : public ::clp::streaming_compression::Decompressor { // Constructors Decompressor() - : m_input_type(InputType::NotInitialized), + : ::clp::streaming_compression::Decompressor(CompressorType::Passthrough), + m_input_type(InputType::NotInitialized), m_compressed_data_buf(nullptr), m_compressed_data_buf_len(0), m_decompressed_stream_pos(0) {} diff --git a/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp b/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp index c3802023c..818379a24 100644 --- a/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp @@ -8,7 +8,8 @@ namespace clp::streaming_compression::zstd { Decompressor::Decompressor() - : m_input_type(InputType::NotInitialized), + : ::clp::streaming_compression::Decompressor(CompressorType::ZSTD), + m_input_type(InputType::NotInitialized), m_decompression_stream(nullptr), m_file_reader(nullptr), m_file_reader_initial_pos(0), From 51001db62db7409d3572be62ec745580e3cd8641 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Fri, 22 Nov 2024 18:15:42 -0500 Subject: [PATCH 11/12] Final round --- .../clp/streaming_compression/zstd/Compressor.cpp | 15 ++++++++++----- .../clp/streaming_compression/zstd/Compressor.hpp | 6 +----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index 229b96582..f78b71c46 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -10,7 +10,12 @@ #include "../../TraceableException.hpp" namespace clp::streaming_compression::zstd { -Compressor::Compressor() { +Compressor::Compressor() + : m_compressed_stream_block{ + .dst = m_compressed_stream_block_buffer.data(), + .size = m_compressed_stream_block_buffer.size(), + .pos = 0 + } { if (nullptr == m_compression_stream) { SPDLOG_ERROR("streaming_compression::zstd::Compressor: ZSTD_createCStream() error"); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); @@ -28,7 +33,7 @@ auto Compressor::open(FileWriter& file_writer, int compression_level) -> void { // Setup compression stream auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)}; - if (ZSTD_error_no_error != ZSTD_getErrorCode(init_result)) { + if (0 != ZSTD_isError(init_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_initCStream() error: {}", ZSTD_getErrorName(init_result) @@ -71,7 +76,7 @@ auto Compressor::write(char const* data, size_t data_length) -> void { &m_compressed_stream_block, &uncompressed_stream_block )}; - if (ZSTD_error_no_error != ZSTD_getErrorCode(compress_result)) { + if (0 != ZSTD_isError(compress_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream() error: {}", ZSTD_getErrorName(compress_result) @@ -99,7 +104,7 @@ auto Compressor::flush() -> void { m_compressed_stream_block.pos = 0; auto const end_stream_result{ZSTD_endStream(m_compression_stream, &m_compressed_stream_block)}; - if (ZSTD_error_no_error != ZSTD_getErrorCode(end_stream_result)) { + if (0 != ZSTD_isError(end_stream_result)) { // Note: Output buffer is large enough that it is guaranteed to have enough room to be // able to flush the entire buffer, so this can only be an error SPDLOG_ERROR( @@ -133,7 +138,7 @@ auto Compressor::flush_without_ending_frame() -> void { while (true) { m_compressed_stream_block.pos = 0; auto const flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; - if (ZSTD_error_no_error != ZSTD_getErrorCode(flush_result)) { + if (0 != ZSTD_isError(flush_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_flushStream() error: {}", ZSTD_getErrorName(flush_result) diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index c2736737e..f55275f3e 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -99,11 +99,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { bool m_compression_stream_contains_data{false}; Array m_compressed_stream_block_buffer{ZSTD_CStreamOutSize()}; - ZSTD_outBuffer m_compressed_stream_block{ - .dst = m_compressed_stream_block_buffer.data(), - .size = m_compressed_stream_block_buffer.size(), - .pos = 0 - }; + ZSTD_outBuffer m_compressed_stream_block; size_t m_uncompressed_stream_pos{0}; }; From 816245c1483569bf157d06a0e3820167a1244cba Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Fri, 22 Nov 2024 18:29:26 -0500 Subject: [PATCH 12/12] Fix header --- .../core/src/clp/streaming_compression/zstd/Compressor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index f78b71c46..948ec1967 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -3,7 +3,7 @@ #include #include -#include +#include #include "../../ErrorCode.hpp" #include "../../FileWriter.hpp"