feat(clp-s): Record log-order at compression time. #584

Merged · 13 commits · Nov 19, 2024
Changes from 3 commits
9 changes: 9 additions & 0 deletions components/core/src/clp_s/ArchiveReader.cpp
@@ -27,6 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_log_event_idx_column_id = m_schema_tree->get_internal_field_id(constants::cLogEventIdxName);

m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
}
@@ -310,6 +312,12 @@ void ArchiveReader::initialize_schema_reader(
}
BaseColumnReader* column_reader = append_reader_column(reader, column_id);

if (column_id == m_log_event_idx_column_id
&& nullptr != dynamic_cast<Int64ColumnReader*>(column_reader))
{
reader.mark_column_as_log_event_idx(static_cast<Int64ColumnReader*>(column_reader));
}

if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
{
reader.mark_column_as_timestamp(column_reader);
@@ -346,6 +354,7 @@ void ArchiveReader::close() {
m_cur_stream_id = 0;
m_stream_buffer.reset();
m_stream_buffer_size = 0ULL;
m_log_event_idx_column_id = -1;
}

std::shared_ptr<char[]> ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) {
1 change: 1 addition & 0 deletions components/core/src/clp_s/ArchiveReader.hpp
@@ -214,6 +214,7 @@ class ArchiveReader {
std::shared_ptr<char[]> m_stream_buffer{};
size_t m_stream_buffer_size{0ULL};
size_t m_cur_stream_id{0ULL};
int32_t m_log_event_idx_column_id{-1};
};
} // namespace clp_s

2 changes: 2 additions & 0 deletions components/core/src/clp_s/ArchiveWriter.cpp
@@ -68,6 +68,7 @@ void ArchiveWriter::close() {
m_encoded_message_size = 0UL;
m_uncompressed_size = 0UL;
m_compressed_size = 0UL;
m_next_log_event_id = 0;
}

void ArchiveWriter::append_message(
@@ -86,6 +87,7 @@
}

m_encoded_message_size += schema_writer->append_message(message);
++m_next_log_event_id;
}

size_t ArchiveWriter::get_data_size() {
9 changes: 8 additions & 1 deletion components/core/src/clp_s/ArchiveWriter.hpp
@@ -1,6 +1,7 @@
#ifndef CLP_S_ARCHIVEWRITER_HPP
#define CLP_S_ARCHIVEWRITER_HPP

#include <string_view>
#include <utility>

#include <boost/filesystem.hpp>
@@ -93,10 +94,15 @@ class ArchiveWriter {
* @param key
* @return the node id
*/
-int32_t add_node(int parent_node_id, NodeType type, std::string const& key) {
+int32_t add_node(int parent_node_id, NodeType type, std::string_view const key) {
return m_schema_tree.add_node(parent_node_id, type, key);
}

/**
* @return the Id the next log event should receive when appended to the archive.
*/
int64_t get_next_log_event_id() const { return m_next_log_event_id; }

/**
* Return a schema's Id and add the schema to the
* schema map if it does not already exist.
@@ -174,6 +180,7 @@ class ArchiveWriter {
size_t m_encoded_message_size{};
size_t m_uncompressed_size{};
size_t m_compressed_size{};
int64_t m_next_log_event_id{};

std::string m_id;

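To make the new counter concrete, the following is a minimal, self-contained sketch (not code from this PR) of the handshake the ArchiveWriter changes enable: a caller reads get_next_log_event_id() before appending a record, append_message() advances the counter, and close() resets it for the next archive. MiniWriter and the driver loop are hypothetical stand-ins for ArchiveWriter and JsonParser.

```cpp
#include <cstdint>

// Hypothetical stand-in for ArchiveWriter, reduced to the log-event-ID logic.
class MiniWriter {
public:
    // ID that the next appended log event will receive.
    int64_t get_next_log_event_id() const { return m_next_log_event_id; }

    void append_message() {
        // ... encode and buffer the message ...
        ++m_next_log_event_id;  // Each appended record consumes one ID
    }

    void close() {
        // ... flush and reset archive state ...
        m_next_log_event_id = 0;  // The next archive starts counting from 0 again
    }

private:
    int64_t m_next_log_event_id{0};
};

int main() {
    MiniWriter writer;
    for (int i = 0; i < 3; ++i) {
        // A parser would store this value in the record's log_event_idx field
        // before appending, yielding indices 0, 1, 2.
        int64_t idx = writer.get_next_log_event_id();
        static_cast<void>(idx);
        writer.append_message();
    }
    writer.close();
    return 0;
}
```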
4 changes: 2 additions & 2 deletions components/core/src/clp_s/CommandLineArguments.cpp
@@ -296,13 +296,13 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
decompression_options.add_options()(
"ordered",
po::bool_switch(&m_ordered_decompression),
"Enable decompression in ascending timestamp order for this archive"
"Enable decompression in log order for this archive"
)(
"ordered-chunk-size",
po::value<size_t>(&m_ordered_chunk_size)
->default_value(m_ordered_chunk_size),
"Number of records to include in each output file when decompressing records "
"in ascending timestamp order"
"in log order"
);
// clang-format on
extraction_options.add(decompression_options);
18 changes: 9 additions & 9 deletions components/core/src/clp_s/JsonConstructor.cpp
@@ -68,15 +68,15 @@ void JsonConstructor::construct_in_order() {
auto tables = m_archive_reader->read_all_tables();
using ReaderPointer = std::shared_ptr<SchemaReader>;
auto cmp = [](ReaderPointer& left, ReaderPointer& right) {
-return left->get_next_timestamp() > right->get_next_timestamp();
+return left->get_next_log_event_idx() > right->get_next_log_event_idx();
};
std::priority_queue record_queue(tables.begin(), tables.end(), cmp);
// Clear tables vector so that memory gets deallocated after we have marshalled all records for
// a given table
tables.clear();

-epochtime_t first_timestamp{0};
-epochtime_t last_timestamp{0};
+int64_t first_idx{0};
+int64_t last_idx{0};
size_t num_records_marshalled{0};
auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
FileWriter writer;
@@ -98,8 +98,8 @@
std::vector<bsoncxx::document::value> results;
auto finalize_chunk = [&](bool open_new_writer) {
writer.close();
-std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_"
-        + std::to_string(last_timestamp) + ".jsonl";
+std::string new_file_name = src_path.string() + "_" + std::to_string(first_idx) + "_"
+        + std::to_string(last_idx) + ".jsonl";
auto new_file_path = std::filesystem::path(new_file_name);
std::error_code ec;
std::filesystem::rename(src_path, new_file_path, ec);
@@ -119,11 +119,11 @@
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cBeginMsgIx,
-static_cast<int64_t>(first_timestamp)
+first_idx
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cEndMsgIx,
-static_cast<int64_t>(last_timestamp)
+last_idx
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cIsLastIrChunk,
@@ -140,9 +140,9 @@
while (false == record_queue.empty()) {
ReaderPointer next = record_queue.top();
record_queue.pop();
-last_timestamp = next->get_next_timestamp();
+last_idx = next->get_next_log_event_idx();
if (0 == num_records_marshalled) {
-first_timestamp = last_timestamp;
+first_idx = last_idx;
}
next->get_next_message(buffer);
if (false == next->done()) {
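As a reading aid for the loop above, here is a self-contained sketch of the ordering logic construct_in_order() now uses, with a hypothetical Reader standing in for SchemaReader: each table is keyed by the log_event_idx of its next record, and a greater-than comparator turns std::priority_queue into a min-heap so that popping always yields the globally smallest index.

```cpp
#include <cstdint>
#include <memory>
#include <queue>
#include <vector>

// Hypothetical stand-in for SchemaReader: a table that yields ascending
// log_event_idx values.
struct Reader {
    std::vector<int64_t> indices;
    size_t pos{0};
    int64_t get_next_log_event_idx() const { return indices[pos]; }
    bool done() const { return pos >= indices.size(); }
};

std::vector<int64_t> merge_in_log_order(std::vector<std::shared_ptr<Reader>> tables) {
    using ReaderPointer = std::shared_ptr<Reader>;
    auto cmp = [](ReaderPointer const& left, ReaderPointer const& right) {
        // Greater-than makes the max-heap behave as a min-heap on the next index
        return left->get_next_log_event_idx() > right->get_next_log_event_idx();
    };
    std::priority_queue record_queue(tables.begin(), tables.end(), cmp);
    std::vector<int64_t> merged;
    while (false == record_queue.empty()) {
        ReaderPointer next = record_queue.top();
        record_queue.pop();
        merged.push_back(next->get_next_log_event_idx());
        ++next->pos;  // Consume one record from this table
        if (false == next->done()) {
            record_queue.push(next);  // Re-queue keyed on its new next index
        }
    }
    return merged;
}

// Example: tables {0, 3, 4} and {1, 2, 5} merge to {0, 1, 2, 3, 4, 5}.
```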
2 changes: 1 addition & 1 deletion components/core/src/clp_s/JsonConstructor.hpp
@@ -66,7 +66,7 @@ class JsonConstructor {
private:
/**
* Reads all of the tables from m_archive_reader and writes all of the records
-* they contain to writer in timestamp order.
+* they contain to writer in log order.
*/
void construct_in_order();

19 changes: 18 additions & 1 deletion components/core/src/clp_s/JsonParser.cpp
@@ -462,11 +462,19 @@ bool JsonParser::parse() {
return false;
}

// Add internal log_event_idx field to record
auto log_event_idx = add_internal_field(constants::cLogEventIdxName, NodeType::Integer);
Contributor:

Can we make the order column optional, since it may affect the compression ratio a lot for some datasets? And if we do want to record log order, can we call add_internal_field only once?

Contributor (author):

If we make it optional I'd like to make it the default + force its use in the package. I talked to @kirkrodrigues about it, and the general consensus we reached is that allowing people to generate archives that don't record order now will cause problems for us down the line.

We could also do some brief experiments to see if we can reduce the overhead (e.g., by trying delta encoding or using a 32-bit field size) if that would help convince you. If those kinds of optimizations work, we can consider adding them in a later PR.

For add_internal_field, I'm calling it that way so that we don't need to tie it to every different place where we flush an archive. If you want, though, I could put it in a lambda and call it in every one of those places.

Contributor:

I agree to set it as the default in the package.

For each archive, we can add the internal subtree and the node once, storing the node ID. After that, we just call add_value. Could you clarify what different places you're referring to?

Contributor (author):

Mostly line 506 where split_archive is called.

Contributor (author):

Made this change to call add_metadata_field once before the ingestion loop and after every invocation of split_archive.

Contributor (author):

Also added a --disable-log-order option for the compression flow.
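To make the overhead discussion concrete, below is a minimal, self-contained sketch of the delta-encoding idea mentioned above as a possible future optimization; it is not part of this PR. Consecutive log events produce deltas of 1, which a general-purpose compressor handles far better than raw 64-bit absolute indices.

```cpp
#include <cstdint>
#include <vector>

// Delta-encode an ascending column of log-event indices.
std::vector<int64_t> delta_encode(std::vector<int64_t> const& indices) {
    std::vector<int64_t> deltas;
    deltas.reserve(indices.size());
    int64_t prev = 0;
    for (auto idx : indices) {
        deltas.push_back(idx - prev);  // 1 for consecutive log events
        prev = idx;
    }
    return deltas;
}

// Invert the encoding to recover the absolute indices.
std::vector<int64_t> delta_decode(std::vector<int64_t> const& deltas) {
    std::vector<int64_t> indices;
    indices.reserve(deltas.size());
    int64_t running = 0;
    for (auto delta : deltas) {
        running += delta;
        indices.push_back(running);
    }
    return indices;
}
```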

m_current_parsed_message.add_value(
log_event_idx,
m_archive_writer->get_next_log_event_id()
);
m_current_schema.insert_ordered(log_event_idx);

// Some errors from simdjson are latent until trying to access invalid JSON fields.
// Instead of checking for an error every time we access a JSON field in parse_line we
// just catch simdjson_error here instead.
try {
-parse_line(ref.value(), -1, "");
+parse_line(ref.value(), constants::cRootNodeId, constants::cRootNodeName);
} catch (simdjson::simdjson_error& error) {
SPDLOG_ERROR(
"Encountered error - {} - while trying to parse {} after parsing {} bytes",
@@ -521,6 +529,15 @@ bool JsonParser::parse() {
return true;
}

int32_t JsonParser::add_internal_field(std::string_view const field_name, NodeType type) {
auto internal_subtree_id = m_archive_writer->add_node(
constants::cRootNodeId,
NodeType::Internal,
constants::cInternalSubtreeName
);
return m_archive_writer->add_node(internal_subtree_id, type, field_name);
}
Contributor:

🛠️ Refactor suggestion

Add error handling and documentation to the new method.

The method implementation is correct but could be more robust with error handling and documentation.

Consider applying these improvements:

+/**
+ * Creates an internal field under the Internal subtree.
+ * @param field_name Name of the internal field
+ * @param type Type of the internal field
+ * @return Node ID of the created field, or -1 if creation failed
+ */
 int32_t JsonParser::add_internal_field(std::string_view const field_name, NodeType type) {
     auto internal_subtree_id = m_archive_writer->add_node(
             constants::cRootNodeId,
             NodeType::Internal,
             constants::cInternalSubtreeName
     );
+    if (internal_subtree_id < 0) {
+        SPDLOG_ERROR("Failed to create internal subtree node");
+        return -1;
+    }
-    return m_archive_writer->add_node(internal_subtree_id, type, field_name);
+    auto field_id = m_archive_writer->add_node(internal_subtree_id, type, field_name);
+    if (field_id < 0) {
+        SPDLOG_ERROR("Failed to create internal field node");
+    }
+    return field_id;
 }

void JsonParser::store() {
m_archive_writer->close();
}
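To summarize the resolution from the review thread above, the agreed call pattern can be sketched as follows. All names below are hypothetical stand-ins (FakeArchive is not the real ArchiveWriter API); the point is simply that the internal field is registered once before the ingestion loop and re-registered after every archive split, since each split starts a fresh schema tree.

```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical simulation of the agreed call pattern.
struct FakeArchive {
    int32_t next_node_id{0};
    int32_t add_node() { return next_node_id++; }  // Fresh tree per archive
};

int main() {
    FakeArchive archive;
    int32_t log_event_idx_node = archive.add_node();  // Once, before the loop
    for (int record = 0; record < 10; ++record) {
        // ... parse the record and add its values, including log_event_idx ...
        bool should_split = (record == 4);  // e.g., archive reached target size
        if (should_split) {
            archive = FakeArchive{};                  // split_archive(): new tree
            log_event_idx_node = archive.add_node();  // Re-register the field
        }
    }
    std::printf("final node id: %d\n", static_cast<int>(log_event_idx_node));
    return 0;
}
```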
9 changes: 9 additions & 0 deletions components/core/src/clp_s/JsonParser.hpp
@@ -3,6 +3,7 @@

#include <map>
#include <string>
#include <string_view>
#include <variant>
#include <vector>

@@ -94,6 +95,14 @@ class JsonParser {
*/
void split_archive();

/**
* Add an internal field to the MPT and get its Id.
*
* Note: this method should be called before parsing a record so that internal fields come first
* in each table. This isn't strictly necessary, but it is a nice convention.
*/
int32_t add_internal_field(std::string_view const field_name, NodeType type);

int m_num_messages;
std::vector<std::string> m_file_paths;

7 changes: 4 additions & 3 deletions components/core/src/clp_s/JsonSerializer.hpp
@@ -2,6 +2,7 @@
#define CLP_S_JSONSERIALIZER_HPP

#include <string>
#include <string_view>
#include <vector>

#include "ColumnReader.hpp"
@@ -66,7 +67,7 @@ class JsonSerializer {
return false;
}

-void add_special_key(std::string const& key) { m_special_keys.push_back(key); }
+void add_special_key(std::string_view const key) { m_special_keys.emplace_back(key); }

void begin_object() {
append_key();
@@ -110,13 +111,13 @@

void append_key() { append_key(m_special_keys[m_special_keys_index++]); }

-void append_key(std::string const& key) {
+void append_key(std::string_view const key) {
m_json_string += "\"";
m_json_string += key;
m_json_string += "\":";
}

-void append_value(std::string const& value) {
+void append_value(std::string_view const value) {
m_json_string += value;
m_json_string += ",";
}
2 changes: 1 addition & 1 deletion components/core/src/clp_s/ReaderUtils.cpp
@@ -18,10 +18,10 @@ std::shared_ptr<SchemaTree> ReaderUtils::read_schema_tree(std::string const& arc
throw OperationFailed(error_code, __FILENAME__, __LINE__);
}

+std::string key;
for (size_t i = 0; i < num_nodes; i++) {
int32_t parent_id;
size_t key_length;
-std::string key;
uint8_t node_type;

error_code = schema_tree_decompressor.try_read_numeric_value(parent_id);
23 changes: 16 additions & 7 deletions components/core/src/clp_s/SchemaReader.cpp
@@ -37,6 +37,13 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) {
}
}

int64_t SchemaReader::get_next_log_event_idx() const {
if (nullptr != m_log_event_idx_column) {
return std::get<int64_t>(m_log_event_idx_column->extract_value(m_cur_message));
}
return 0;
}

void SchemaReader::load(
std::shared_ptr<char[]> stream_buffer,
size_t offset,
@@ -86,7 +93,7 @@ void SchemaReader::generate_json_string() {
}
case JsonSerializer::Op::AddIntField: {
column = m_reordered_columns[column_id_index++];
auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
m_json_serializer.append_key(name);
m_json_serializer.append_value(
std::to_string(std::get<int64_t>(column->extract_value(m_cur_message)))
@@ -102,7 +109,7 @@
}
case JsonSerializer::Op::AddFloatField: {
column = m_reordered_columns[column_id_index++];
auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
m_json_serializer.append_key(name);
m_json_serializer.append_value(
std::to_string(std::get<double>(column->extract_value(m_cur_message)))
@@ -118,7 +125,7 @@
}
case JsonSerializer::Op::AddBoolField: {
column = m_reordered_columns[column_id_index++];
auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
m_json_serializer.append_key(name);
m_json_serializer.append_value(
std::get<uint8_t>(column->extract_value(m_cur_message)) != 0 ? "true"
@@ -136,7 +143,7 @@
}
case JsonSerializer::Op::AddStringField: {
column = m_reordered_columns[column_id_index++];
auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name();
m_json_serializer.append_key(name);
m_json_serializer.append_value_from_column_with_quotes(column, m_cur_message);
break;
@@ -215,9 +222,10 @@ bool SchemaReader::get_next_message(std::string& message, FilterClass* filter) {
return false;
}

-bool SchemaReader::get_next_message_with_timestamp(
+bool SchemaReader::get_next_message_with_metadata(
std::string& message,
epochtime_t& timestamp,
+int64_t& log_event_idx,
FilterClass* filter
) {
// TODO: If we already get max_num_results messages, we can skip messages
@@ -241,6 +249,7 @@
}

timestamp = m_get_timestamp();
log_event_idx = get_next_log_event_idx();

m_cur_message++;
return true;
@@ -561,7 +570,7 @@ void SchemaReader::initialize_serializer() {
// TODO: this code will have to change once we allow mixing log lines parsed by different
// parsers.
if (false == m_local_schema_tree.get_nodes().empty()) {
-generate_json_template(m_local_schema_tree.get_root_node_id());
+generate_json_template(m_local_schema_tree.get_object_subtree_node_id());
}
}

@@ -572,7 +581,7 @@
for (int32_t child_id : children_ids) {
int32_t child_global_id = m_local_id_to_global_id[child_id];
auto const& child_node = m_local_schema_tree.get_node(child_id);
std::string const& key = child_node.get_key_name();
auto key = child_node.get_key_name();
switch (child_node.get_type()) {
case NodeType::Object: {
m_json_serializer.add_op(JsonSerializer::Op::BeginObject);