feat(clp): Add the write path for single-file archives. #646

Open
wants to merge 17 commits into main
3 changes: 3 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -505,6 +505,9 @@ set(SOURCE_FILES_unitTest
src/clp/streaming_archive/reader/Segment.hpp
src/clp/streaming_archive/reader/SegmentManager.cpp
src/clp/streaming_archive/reader/SegmentManager.hpp
src/clp/streaming_archive/single_file_archive/Defs.hpp
src/clp/streaming_archive/single_file_archive/writer.cpp
src/clp/streaming_archive/single_file_archive/writer.hpp
Comment on lines +508 to +510
Contributor

💡 Codebase verification

Missing test coverage for the new single-file archive implementation

The new single-file archive files have been properly added to the CMake configuration, but I don't see any corresponding test files for this functionality. While there are several test files in the codebase:

  • There's only one test file that mentions "Archive": test-clp_s-end_to_end.cpp
  • None of the existing test files appear to specifically cover the new single-file archive implementation

Recommended actions:

  • Create unit tests for the new single-file archive writer implementation
  • Consider adding tests in these areas:
    • Basic writer functionality in writer.cpp
    • Edge cases and error handling
    • Integration with the existing archive system
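
As a starting point for the "basic writer functionality" item, a test could at least confirm that an output file begins with the single-file archive magic number. The following is a minimal sketch only: the magic bytes are copied from the PR's Defs.hpp, while `starts_with_sfa_magic` is a hypothetical helper that does not exist in the PR, and how the bytes are read back from disk is left out.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Values copied from the PR's Defs.hpp.
constexpr std::size_t cNumMagicNumberChars{4};
constexpr std::array<std::uint8_t, cNumMagicNumberChars> cUnstructuredSfaMagicNumber{
        'Y', 'C', 'L', 'P'
};

// Hypothetical helper (not part of the PR): returns true if the buffer is long
// enough and its first four bytes equal the single-file archive magic number.
[[nodiscard]] inline auto starts_with_sfa_magic(std::vector<std::uint8_t> const& bytes) -> bool {
    if (bytes.size() < cNumMagicNumberChars) {
        return false;
    }
    for (std::size_t i{0}; i < cNumMagicNumberChars; ++i) {
        if (bytes[i] != cUnstructuredSfaMagicNumber[i]) {
            return false;
        }
    }
    return true;
}
```

A real test would feed this helper the first bytes of a file produced by the writer; truncated or mismatching input should be rejected.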
🔗 Analysis chain

LGTM! The new single-file archive source files are properly integrated.

The files are correctly placed in a dedicated directory and follow the project's naming conventions.

Let's verify the test coverage for these new files:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for corresponding test files and test coverage for the new SFA implementation.

# Look for test files related to single-file archive
rg -l "single_file_archive" tests/

# Check for test cases using the new writer
rg -A 5 "TEST.*single.*file.*archive" tests/

Length of output: 175


Script:

#!/bin/bash
# Let's first find the test directory location
fd -t d "test" .

# Then search for single file archive related files and tests
rg -l "single_file_archive" .

# Look for test cases specifically
rg -A 5 "TEST.*single.*file.*archive" .

Length of output: 1270


Script:

#!/bin/bash
# Let's check the test files in the core component
fd -t f ".*test.*\.(cpp|hpp)$" components/core/tests/

# Look for any test files that might be testing the archive functionality
rg -l "Archive" components/core/tests/

# Check if there are any test files that include "writer" in their name or content
rg -l "writer" components/core/tests/

Length of output: 1828

src/clp/streaming_archive/writer/Archive.cpp
src/clp/streaming_archive/writer/Archive.hpp
src/clp/streaming_archive/writer/File.cpp
3 changes: 3 additions & 0 deletions components/core/src/clp/clp/CMakeLists.txt
@@ -108,6 +108,9 @@ set(
../streaming_archive/reader/Segment.hpp
../streaming_archive/reader/SegmentManager.cpp
../streaming_archive/reader/SegmentManager.hpp
../streaming_archive/single_file_archive/Defs.hpp
../streaming_archive/single_file_archive/writer.cpp
../streaming_archive/single_file_archive/writer.hpp
../streaming_archive/writer/Archive.cpp
../streaming_archive/writer/Archive.hpp
../streaming_archive/writer/File.cpp
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.cpp
@@ -373,6 +373,10 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
->default_value(m_schema_file_path),
"Path to a schema file. If not specified, heuristics are used to determine "
"dictionary variables. See README-Schema.md for details."
)(
"single-file-archive",
po::bool_switch(&m_single_file_archive),
"Output archive as a single-file"
Contributor

Suggested change
"Output archive as a single-file"
"Output archive as a single-file archive"

Also, should we support an option that lets the user specify the output filename?

);

po::options_description all_compression_options;
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.hpp
@@ -23,6 +23,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
explicit CommandLineArguments(std::string const& program_name)
: CommandLineArgumentsBase(program_name),
m_show_progress(false),
m_single_file_archive(false),
m_sort_input_files(true),
m_print_archive_stats_progress(false),
m_target_segment_uncompressed_size(1L * 1024 * 1024 * 1024),
@@ -45,6 +46,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {

bool show_progress() const { return m_show_progress; }

bool get_use_single_file_archive() const { return m_single_file_archive; }
Contributor

nit: we could name this "use_single_file_archive", or maybe even "single_file_archive", to match the getters for the other booleans.


bool sort_input_files() const { return m_sort_input_files; }

bool print_archive_stats_progress() const { return m_print_archive_stats_progress; }
@@ -92,6 +95,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
std::string m_output_dir;
std::string m_schema_file_path;
bool m_show_progress;
bool m_single_file_archive;
bool m_print_archive_stats_progress;
size_t m_target_encoded_file_size;
size_t m_target_segment_uncompressed_size;
12 changes: 9 additions & 3 deletions components/core/src/clp/clp/FileCompressor.cpp
@@ -243,7 +243,9 @@ void FileCompressor::parse_and_encode_with_heuristic(

// Parse content from file
while (m_message_parser.parse_next_message(true, reader, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
Contributor

Just thinking: would it be cleaner to add a new method to archive_writer, named something like "should_split", and move this if condition into it?

The same if statement is now duplicated in multiple places, which is inefficient and error-prone, since a single change has to be applied at every one of them.
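
The suggestion above could look roughly like the sketch below. `ArchiveWriterSketch` is a hypothetical stand-in for the real archive writer class and models only the two getters used in the duplicated condition; `should_split` and its parameter name are likewise assumptions, not code from the PR.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the archive writer; only the state needed by the
// split decision is modelled here.
class ArchiveWriterSketch {
public:
    ArchiveWriterSketch(std::size_t data_size_of_dictionaries, bool use_single_file_archive)
            : m_data_size_of_dictionaries{data_size_of_dictionaries},
              m_use_single_file_archive{use_single_file_archive} {}

    [[nodiscard]] auto get_data_size_of_dictionaries() const -> std::size_t {
        return m_data_size_of_dictionaries;
    }

    [[nodiscard]] auto get_use_single_file_archive() const -> bool {
        return m_use_single_file_archive;
    }

    // Hypothetical method: a single-file archive is never split; otherwise the
    // archive is split once the dictionaries reach the target size.
    [[nodiscard]] auto should_split(std::size_t target_data_size_of_dicts) const -> bool {
        return false == m_use_single_file_archive
               && m_data_size_of_dictionaries >= target_data_size_of_dicts;
    }

private:
    std::size_t m_data_size_of_dictionaries;
    bool m_use_single_file_archive;
};
```

Each call site would then reduce to `if (archive_writer.should_split(target_data_size_of_dicts))`, so a future change to the split policy touches one method rather than every caller.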

&& false == archive_writer.get_use_single_file_archive())
{
split_file_and_archive(
archive_user_config,
path_for_compression,
@@ -337,7 +339,9 @@ bool FileCompressor::try_compressing_as_archive(
parent_directories.emplace(file_parent_path);
}

if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}

@@ -537,7 +541,9 @@ std::error_code FileCompressor::compress_ir_stream_by_encoding(
}

// Split archive/encoded file if necessary before writing the new event
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive.get_use_single_file_archive())
{
split_file_and_archive(
archive_user_config,
path,
9 changes: 7 additions & 2 deletions components/core/src/clp/clp/compression.cpp
@@ -107,6 +107,7 @@ bool compress(
archive_user_config.global_metadata_db = global_metadata_db.get();
archive_user_config.print_archive_stats_progress
= command_line_args.print_archive_stats_progress();
archive_user_config.use_single_file_archive = command_line_args.get_use_single_file_archive();

// Open Archive
streaming_archive::writer::Archive archive_writer;
@@ -135,7 +136,9 @@ bool compress(
);
}
for (auto it = files_to_compress.cbegin(); it != files_to_compress.cend(); ++it) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
Contributor

ditto

&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}
if (false
@@ -163,7 +166,9 @@ bool compress(
file_group_id_comparator);
// Compress grouped files
for (auto const& file_to_compress : grouped_files_to_compress) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}
if (false
22 changes: 22 additions & 0 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.hpp
@@ -4,11 +4,15 @@
#include <cstdint>

#include "../Defs.h"
#include "../ffi/encoding_methods.hpp"
#include "../FileReader.hpp"
#include "../FileWriter.hpp"
#include "Constants.hpp"

namespace clp::streaming_archive {

static constexpr std::string_view cCompressionTypeZstd = "ZSTD";

/**
* A class to encapsulate metadata directly relating to an archive.
*/
@@ -79,6 +83,18 @@ class ArchiveMetadata {

[[nodiscard]] auto get_end_timestamp() const { return m_end_timestamp; }

[[nodiscard]] auto get_variable_encoding_methods_version() const -> std::string const& {
return m_variable_encoding_methods_version;
}

[[nodiscard]] auto get_variables_schema_version() const -> std::string const& {
return m_variables_schema_version;
}

[[nodiscard]] auto get_compression_type() const -> std::string const& {
return m_compression_type;
}

/**
* Expands the archive's time range to encompass the given time range
* @param begin_timestamp
@@ -102,6 +118,12 @@ class ArchiveMetadata {
// The size of the archive
uint64_t m_compressed_size{0};
uint64_t m_dynamic_compressed_size{0};
// TODO: The following fields are used in single-file archive; however, they are not
// currently part of multi-file archive metadata. Modifying multi-file archive metadata
// disk format is potentially a breaking change and not currently required.
std::string m_variable_encoding_methods_version{ffi::cVariableEncodingMethodsVersion};
std::string m_variables_schema_version{ffi::cVariablesSchemaVersion};
std::string m_compression_type{cCompressionTypeZstd};
};
} // namespace clp::streaming_archive

@@ -0,0 +1,84 @@
#ifndef CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
#define CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP

#include <array>
#include <cstdint>
#include <string>
#include <string_view>
#include <vector>

#include "../../Defs.h"
#include "../Constants.hpp"
#include "msgpack.hpp"

namespace clp::streaming_archive::single_file_archive {

using single_file_archive_format_version_t = uint32_t;

// Single file archive version.
constexpr uint8_t cArchiveMajorVersion{0};
constexpr uint8_t cArchiveMinorVersion{1};
constexpr uint16_t cArchivePatchVersion{1};
constexpr single_file_archive_format_version_t cArchiveVersion{
cArchiveMajorVersion << 24 | cArchiveMinorVersion << 16 | cArchivePatchVersion
};

static constexpr size_t cNumMagicNumberChars{4};
static constexpr std::array<uint8_t, cNumMagicNumberChars>
cUnstructuredSfaMagicNumber{'Y', 'C', 'L', 'P'};
static constexpr std::string_view cUnstructuredSfaExtension{".clp"};
static constexpr size_t cFileSizeWarningThreshold{100L * 1024 * 1024};

static constexpr size_t cNumStaticFiles{5};
constexpr std::array<char const*, cNumStaticFiles> cStaticArchiveFileNames{
cMetadataDBFileName,
cLogTypeDictFilename,
cLogTypeSegmentIndexFilename,
cVarDictFilename,
cVarSegmentIndexFilename
};

static constexpr size_t cNumUnused{6};

struct __attribute__((packed)) SingleFileArchiveHeader {
std::array<uint8_t, cNumMagicNumberChars> magic;
single_file_archive_format_version_t version;
uint64_t metadata_size;
std::array<uint64_t, cNumUnused> unused;
};

struct FileInfo {
std::string n;
uint64_t o;
MSGPACK_DEFINE_MAP(n, o);
};

struct MultiFileArchiveMetadata {
Contributor

@LinZhihao-723 @kirkrodrigues
My understanding is that we should use a class instead of a struct, even when it only holds data. Can you confirm?

Member

https://google.github.io/styleguide/cppguide.html#Structs_vs._Classes

Use a struct only for passive objects that carry data; everything else is a class.

The struct must not have invariants that imply relationships between different fields, since direct user access to those fields may break those invariants.

Do any of the fields in this struct have relationships between each other?

Contributor Author

@davemarco Jan 8, 2025

The relationships between the fields are weak; however, one could argue that there is a relationship between the uncompressed and compressed sizes.

One could also argue that the ArchiveMetadata class, where the values come from, should be responsible for enforcing the invariant, and not this struct, which is just a temporary container for msgpack serialization.

Also note that class members are prefixed with "m_", which would complicate msgpack serialization, since by default it just uses the variable names as keys.

Lastly, I took this from the clp-s code since I thought it was more elegant than the nlohmann json class serialization interface.
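
The division of responsibility described above can be sketched without msgpack. Everything here is illustrative: `ArchiveMetadataSketch` and `MetadataSnapshot` are hypothetical names, `epochtime_t` is assumed to be a signed 64-bit integer, and `increment_sizes` and `to_snapshot` are assumed helpers rather than methods from the PR. The class owns all mutation of the fields, while the struct is a passive copy whose unprefixed field names could serve directly as serialization keys.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

// Assumption for this sketch: timestamps are signed 64-bit integers.
using epochtime_t = std::int64_t;

// Passive data holder: no "m_" prefix, so a name-based serializer could use
// the field names as map keys without any renaming.
struct MetadataSnapshot {
    epochtime_t begin_timestamp;
    epochtime_t end_timestamp;
    std::uint64_t uncompressed_size;
    std::uint64_t compressed_size;
};

// Hypothetical stand-in for ArchiveMetadata: the only place the values change.
class ArchiveMetadataSketch {
public:
    // Widens the stored time range so that it covers [begin, end].
    auto expand_time_range(epochtime_t begin, epochtime_t end) -> void {
        m_begin_timestamp = std::min(m_begin_timestamp, begin);
        m_end_timestamp = std::max(m_end_timestamp, end);
    }

    auto increment_sizes(std::uint64_t uncompressed, std::uint64_t compressed) -> void {
        m_uncompressed_size += uncompressed;
        m_compressed_size += compressed;
    }

    // Copies the current values into the passive struct for serialization.
    [[nodiscard]] auto to_snapshot() const -> MetadataSnapshot {
        return {m_begin_timestamp, m_end_timestamp, m_uncompressed_size, m_compressed_size};
    }

private:
    epochtime_t m_begin_timestamp{std::numeric_limits<epochtime_t>::max()};
    epochtime_t m_end_timestamp{std::numeric_limits<epochtime_t>::min()};
    std::uint64_t m_uncompressed_size{0};
    std::uint64_t m_compressed_size{0};
};
```

With this split, any invariant lives in the class, and the struct exists only for the short window between reading the values and writing them out.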

archive_format_version_t archive_format_version;
std::string variable_encoding_methods_version;
std::string variables_schema_version;
std::string compression_type;
std::string creator_id;
epochtime_t begin_timestamp;
epochtime_t end_timestamp;
uint64_t uncompressed_size;
uint64_t compressed_size;
MSGPACK_DEFINE_MAP(
archive_format_version,
variable_encoding_methods_version,
variables_schema_version,
compression_type,
creator_id,
begin_timestamp,
end_timestamp,
uncompressed_size,
compressed_size
);
};

struct SingleFileArchiveMetadata {
std::vector<FileInfo> archive_files;
MultiFileArchiveMetadata archive_metadata;
uint64_t num_segments;
MSGPACK_DEFINE_MAP(archive_files, archive_metadata, num_segments);
};
} // namespace clp::streaming_archive::single_file_archive

#endif // CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
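
For reference, the version packing and header layout defined in Defs.hpp work out as follows. The constants and the header struct are copied from the diff above; the three `get_*_version` helpers are hypothetical additions for illustration, and the `packed` attribute assumes GCC or Clang. With major 0, minor 1, and patch 1, the packed version is 0x00010001, and the header occupies 4 + 4 + 8 + 6 * 8 = 64 bytes.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

using single_file_archive_format_version_t = std::uint32_t;

// Copied from the PR: major in bits 31-24, minor in bits 23-16, patch in bits 15-0.
constexpr std::uint8_t cArchiveMajorVersion{0};
constexpr std::uint8_t cArchiveMinorVersion{1};
constexpr std::uint16_t cArchivePatchVersion{1};
constexpr single_file_archive_format_version_t cArchiveVersion{
        cArchiveMajorVersion << 24 | cArchiveMinorVersion << 16 | cArchivePatchVersion
};
static_assert(0x0001'0001 == cArchiveVersion);

// Hypothetical helpers (not in the PR) that undo the packing.
constexpr auto get_major_version(single_file_archive_format_version_t v) -> std::uint8_t {
    return static_cast<std::uint8_t>(v >> 24);
}

constexpr auto get_minor_version(single_file_archive_format_version_t v) -> std::uint8_t {
    return static_cast<std::uint8_t>((v >> 16) & 0xFF);
}

constexpr auto get_patch_version(single_file_archive_format_version_t v) -> std::uint16_t {
    return static_cast<std::uint16_t>(v & 0xFFFF);
}

constexpr std::size_t cNumMagicNumberChars{4};
constexpr std::size_t cNumUnused{6};

// Copied from the PR; the attribute is a GCC/Clang extension.
struct __attribute__((packed)) SingleFileArchiveHeader {
    std::array<std::uint8_t, cNumMagicNumberChars> magic;
    single_file_archive_format_version_t version;
    std::uint64_t metadata_size;
    std::array<std::uint64_t, cNumUnused> unused;
};
static_assert(64 == sizeof(SingleFileArchiveHeader));
```

A reader could apply the same helpers to decide whether an archive's format version is one it understands before parsing the metadata that follows the header.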