Skip to content

Commit

Permalink
Add option to disable log order, and respond to other review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 committed Nov 18, 2024
1 parent 2d1b76f commit e69a34a
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 15 deletions.
6 changes: 5 additions & 1 deletion components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,15 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
)(
"print-archive-stats",
po::bool_switch(&m_print_archive_stats),
"Print statistics (json) about the archive after it's compressed."
"Print statistics (json) about the archive after it's compressed."
)(
"structurize-arrays",
po::bool_switch(&m_structurize_arrays),
"Structurize arrays instead of compressing them as clp strings."
)(
"disable-log-order",
po::bool_switch(&m_no_record_log_order),
"Do not record log order at ingestion time."
);
// clang-format on

Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class CommandLineArguments {

/**
 * @return A const reference to the stored projection columns (`m_projection_columns`).
 */
std::vector<std::string> const& get_projection_columns() const { return m_projection_columns; }

/**
 * The stored flag has the opposite sense ("no record"), because it is bound to the
 * `disable-log-order` switch; this getter inverts it for callers.
 * @return true if log order should be recorded, i.e. `m_no_record_log_order` is false.
 */
bool get_record_log_order() const { return false == m_no_record_log_order; }

private:
// Methods
/**
Expand Down Expand Up @@ -178,6 +180,7 @@ class CommandLineArguments {
bool m_ordered_decompression{false};
size_t m_ordered_chunk_size{0};
size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB
bool m_no_record_log_order{false};

// Metadata db variables
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
Expand Down
27 changes: 20 additions & 7 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ JsonParser::JsonParser(JsonParserOption const& option)
m_target_encoded_size(option.target_encoded_size),
m_max_document_size(option.max_document_size),
m_timestamp_key(option.timestamp_key),
m_structurize_arrays(option.structurize_arrays) {
m_structurize_arrays(option.structurize_arrays),
m_record_log_order(option.record_log_order) {
if (false == FileUtils::validate_path(option.file_paths)) {
exit(1);
}
Expand Down Expand Up @@ -447,6 +448,16 @@ bool JsonParser::parse() {
m_num_messages = 0;
size_t bytes_consumed_up_to_prev_archive = 0;
size_t bytes_consumed_up_to_prev_record = 0;

int32_t log_event_idx_node_id{};
auto add_log_event_idx_node = [&]() {
if (m_record_log_order) {
log_event_idx_node_id
= add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
}
};
add_log_event_idx_node();

while (json_file_iterator.get_json(json_it)) {
m_current_schema.clear();

Expand All @@ -468,12 +479,13 @@ bool JsonParser::parse() {
}

// Add log_event_idx field to metadata for record
auto log_event_idx = add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
m_current_parsed_message.add_value(
log_event_idx,
m_archive_writer->get_next_log_event_id()
);
m_current_schema.insert_ordered(log_event_idx);
if (m_record_log_order) {
m_current_parsed_message.add_value(
log_event_idx_node_id,
m_archive_writer->get_next_log_event_id()
);
m_current_schema.insert_ordered(log_event_idx_node_id);
}

// Some errors from simdjson are latent until trying to access invalid JSON fields.
// Instead of checking for an error every time we access a JSON field in parse_line we
Expand Down Expand Up @@ -504,6 +516,7 @@ bool JsonParser::parse() {
);
bytes_consumed_up_to_prev_archive = bytes_consumed_up_to_prev_record;
split_archive();
add_log_event_idx_node();
}

m_current_parsed_message.clear();
Expand Down
14 changes: 8 additions & 6 deletions components/core/src/clp_s/JsonParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ struct JsonParserOption {
std::vector<std::string> file_paths;
std::string timestamp_key;
std::string archives_dir;
size_t target_encoded_size;
size_t max_document_size;
size_t min_table_size;
int compression_level;
bool print_archive_stats;
bool structurize_arrays;
size_t target_encoded_size{};
size_t max_document_size{};
size_t min_table_size{};
int compression_level{};
bool print_archive_stats{};
bool structurize_arrays{};
bool record_log_order{true};
std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db;
};

Expand Down Expand Up @@ -118,6 +119,7 @@ class JsonParser {
size_t m_target_encoded_size;
size_t m_max_document_size;
bool m_structurize_arrays{false};
bool m_record_log_order{true};
};
} // namespace clp_s

Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp_s/SchemaTree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class SchemaTree {
}

/**
* @return the Id of the root of the Object sub-tree.
* @return the Id of the root of the Object sub-tree that records the structure of JSON data.
* @return -1 if the Object sub-tree does not exist.
*/
int32_t get_object_subtree_node_id() const { return m_object_subtree_id; }
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ bool compress(CommandLineArguments const& command_line_arguments) {
option.timestamp_key = command_line_arguments.get_timestamp_key();
option.print_archive_stats = command_line_arguments.print_archive_stats();
option.structurize_arrays = command_line_arguments.get_structurize_arrays();
option.record_log_order = command_line_arguments.get_record_log_order();

auto const& db_config_container = command_line_arguments.get_metadata_db_config();
if (db_config_container.has_value()) {
Expand Down

0 comments on commit e69a34a

Please sign in to comment.