diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 5362d32cc..7c68b301d 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -27,6 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) { m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str); m_schema_map = ReaderUtils::read_schemas(archive_path_str); + m_log_event_idx_column_id = m_schema_tree->get_metadata_field_id(constants::cLogEventIdxName); + m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile); m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile); } @@ -310,6 +312,12 @@ void ArchiveReader::initialize_schema_reader( } BaseColumnReader* column_reader = append_reader_column(reader, column_id); + if (column_id == m_log_event_idx_column_id + && nullptr != dynamic_cast(column_reader)) + { + reader.mark_column_as_log_event_idx(static_cast(column_reader)); + } + if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0) { reader.mark_column_as_timestamp(column_reader); @@ -346,6 +354,7 @@ void ArchiveReader::close() { m_cur_stream_id = 0; m_stream_buffer.reset(); m_stream_buffer_size = 0ULL; + m_log_event_idx_column_id = -1; } std::shared_ptr ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) { diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 41073ec84..6b437dfd2 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -142,6 +142,11 @@ class ArchiveReader { m_projection = projection; } + /** + * @return true if this archive has log ordering information, and false otherwise. + */ + bool has_log_order() { return m_log_event_idx_column_id >= 0; } + private: /** * Initializes a schema reader passed by reference to become a reader for a given schema. @@ -214,6 +219,7 @@ class ArchiveReader { std::shared_ptr m_stream_buffer{}; size_t m_stream_buffer_size{0ULL}; size_t m_cur_stream_id{0ULL}; + int32_t m_log_event_idx_column_id{-1}; }; } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveWriter.cpp b/components/core/src/clp_s/ArchiveWriter.cpp index 369fd79d2..7118ce88b 100644 --- a/components/core/src/clp_s/ArchiveWriter.cpp +++ b/components/core/src/clp_s/ArchiveWriter.cpp @@ -68,6 +68,7 @@ void ArchiveWriter::close() { m_encoded_message_size = 0UL; m_uncompressed_size = 0UL; m_compressed_size = 0UL; + m_next_log_event_id = 0; } void ArchiveWriter::append_message( @@ -86,6 +87,7 @@ void ArchiveWriter::append_message( } m_encoded_message_size += schema_writer->append_message(message); + ++m_next_log_event_id; } size_t ArchiveWriter::get_data_size() { diff --git a/components/core/src/clp_s/ArchiveWriter.hpp b/components/core/src/clp_s/ArchiveWriter.hpp index 7edfe4491..87e9d11e5 100644 --- a/components/core/src/clp_s/ArchiveWriter.hpp +++ b/components/core/src/clp_s/ArchiveWriter.hpp @@ -1,6 +1,7 @@ #ifndef CLP_S_ARCHIVEWRITER_HPP #define CLP_S_ARCHIVEWRITER_HPP +#include #include #include @@ -93,10 +94,15 @@ class ArchiveWriter { * @param key * @return the node id */ - int32_t add_node(int parent_node_id, NodeType type, std::string const& key) { + int32_t add_node(int parent_node_id, NodeType type, std::string_view const key) { return m_schema_tree.add_node(parent_node_id, type, key); } + /** + * @return The Id that will be assigned to the next log event when appended to the archive. + */ + int64_t get_next_log_event_id() const { return m_next_log_event_id; } + /** * Return a schema's Id and add the schema to the * schema map if it does not already exist. @@ -174,6 +180,7 @@ class ArchiveWriter { size_t m_encoded_message_size{}; size_t m_uncompressed_size{}; size_t m_compressed_size{}; + int64_t m_next_log_event_id{}; std::string m_id; diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index cf69a066c..ace505788 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -194,6 +194,10 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "structurize-arrays", po::bool_switch(&m_structurize_arrays), "Structurize arrays instead of compressing them as clp strings." + )( + "disable-log-order", + po::bool_switch(&m_disable_log_order), + "Do not record log order at ingestion time." ); // clang-format on @@ -296,13 +300,13 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { decompression_options.add_options()( "ordered", po::bool_switch(&m_ordered_decompression), - "Enable decompression in ascending timestamp order for this archive" + "Enable decompression in log order for this archive" )( "ordered-chunk-size", po::value(&m_ordered_chunk_size) ->default_value(m_ordered_chunk_size), "Number of records to include in each output file when decompressing records " - "in ascending timestamp order" + "in log order" ); // clang-format on extraction_options.add(decompression_options); diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 798e42728..8f2d79d8f 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -112,6 +112,8 @@ class CommandLineArguments { std::vector const& get_projection_columns() const { return m_projection_columns; } + bool get_record_log_order() const { return false == m_disable_log_order; } + private: // Methods /** @@ -178,6 +180,7 @@ class CommandLineArguments { bool m_ordered_decompression{false}; size_t m_ordered_chunk_size{0}; size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB + bool m_disable_log_order{false}; // Metadata db variables std::optional m_metadata_db_config; diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index 90887f1e5..0c816c5e3 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -48,7 +48,13 @@ void JsonConstructor::store() { m_archive_reader = std::make_unique(); m_archive_reader->open(m_option.archives_dir, m_option.archive_id); m_archive_reader->read_dictionaries_and_metadata(); - if (false == m_option.ordered) { + + if (m_option.ordered && false == m_archive_reader->has_log_order()) { + SPDLOG_WARN("This archive is missing ordering information and can not be decompressed in " + "log order. Falling back to out of order decompression."); + } + + if (false == m_option.ordered || false == m_archive_reader->has_log_order()) { FileWriter writer; writer.open( m_option.output_dir + "/original", @@ -68,15 +74,15 @@ void JsonConstructor::construct_in_order() { auto tables = m_archive_reader->read_all_tables(); using ReaderPointer = std::shared_ptr; auto cmp = [](ReaderPointer& left, ReaderPointer& right) { - return left->get_next_timestamp() > right->get_next_timestamp(); + return left->get_next_log_event_idx() > right->get_next_log_event_idx(); }; std::priority_queue record_queue(tables.begin(), tables.end(), cmp); // Clear tables vector so that memory gets deallocated after we have marshalled all records for // a given table tables.clear(); - epochtime_t first_timestamp{0}; - epochtime_t last_timestamp{0}; + int64_t first_idx{0}; + int64_t last_idx{0}; size_t num_records_marshalled{0}; auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id; FileWriter writer; @@ -97,9 +103,11 @@ void JsonConstructor::construct_in_order() { std::vector results; auto finalize_chunk = [&](bool open_new_writer) { + // Add one to last_idx to match clp's behaviour of having the end index be exclusive + ++last_idx; writer.close(); - std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_" - + std::to_string(last_timestamp) + ".jsonl"; + std::string new_file_name = src_path.string() + "_" + std::to_string(first_idx) + "_" + + std::to_string(last_idx) + ".jsonl"; auto new_file_path = std::filesystem::path(new_file_name); std::error_code ec; std::filesystem::rename(src_path, new_file_path, ec); @@ -119,11 +127,11 @@ void JsonConstructor::construct_in_order() { ), bsoncxx::builder::basic::kvp( constants::results_cache::decompression::cBeginMsgIx, - static_cast(first_timestamp) + first_idx ), bsoncxx::builder::basic::kvp( constants::results_cache::decompression::cEndMsgIx, - static_cast(last_timestamp) + last_idx ), bsoncxx::builder::basic::kvp( constants::results_cache::decompression::cIsLastIrChunk, @@ -140,9 +148,9 @@ void JsonConstructor::construct_in_order() { while (false == record_queue.empty()) { ReaderPointer next = record_queue.top(); record_queue.pop(); - last_timestamp = next->get_next_timestamp(); + last_idx = next->get_next_log_event_idx(); if (0 == num_records_marshalled) { - first_timestamp = last_timestamp; + first_idx = last_idx; } next->get_next_message(buffer); if (false == next->done()) { diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index f1f71f9d8..c38e6d00b 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -66,7 +66,7 @@ class JsonConstructor { private: /** * Reads all of the tables from m_archive_reader and writes all of the records - * they contain to writer in timestamp order. + * they contain to writer in log order. */ void construct_in_order(); diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index 5336c367a..9e8293510 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -15,7 +15,8 @@ JsonParser::JsonParser(JsonParserOption const& option) m_target_encoded_size(option.target_encoded_size), m_max_document_size(option.max_document_size), m_timestamp_key(option.timestamp_key), - m_structurize_arrays(option.structurize_arrays) { + m_structurize_arrays(option.structurize_arrays), + m_record_log_order(option.record_log_order) { if (false == FileUtils::validate_path(option.file_paths)) { exit(1); } @@ -447,6 +448,16 @@ bool JsonParser::parse() { m_num_messages = 0; size_t bytes_consumed_up_to_prev_archive = 0; size_t bytes_consumed_up_to_prev_record = 0; + + int32_t log_event_idx_node_id{}; + auto add_log_event_idx_node = [&]() { + if (m_record_log_order) { + log_event_idx_node_id + = add_metadata_field(constants::cLogEventIdxName, NodeType::Integer); + } + }; + add_log_event_idx_node(); + while (json_file_iterator.get_json(json_it)) { m_current_schema.clear(); @@ -467,11 +478,20 @@ bool JsonParser::parse() { return false; } + // Add log_event_idx field to metadata for record + if (m_record_log_order) { + m_current_parsed_message.add_value( + log_event_idx_node_id, + m_archive_writer->get_next_log_event_id() + ); + m_current_schema.insert_ordered(log_event_idx_node_id); + } + // Some errors from simdjson are latent until trying to access invalid JSON fields. // Instead of checking for an error every time we access a JSON field in parse_line we // just catch simdjson_error here instead. try { - parse_line(ref.value(), -1, ""); + parse_line(ref.value(), constants::cRootNodeId, constants::cRootNodeName); } catch (simdjson::simdjson_error& error) { SPDLOG_ERROR( "Encountered error - {} - while trying to parse {} after parsing {} bytes", @@ -496,6 +516,7 @@ bool JsonParser::parse() { ); bytes_consumed_up_to_prev_archive = bytes_consumed_up_to_prev_record; split_archive(); + add_log_event_idx_node(); } m_current_parsed_message.clear(); @@ -526,6 +547,15 @@ bool JsonParser::parse() { return true; } +int32_t JsonParser::add_metadata_field(std::string_view const field_name, NodeType type) { + auto metadata_subtree_id = m_archive_writer->add_node( + constants::cRootNodeId, + NodeType::Metadata, + constants::cMetadataSubtreeName + ); + return m_archive_writer->add_node(metadata_subtree_id, type, field_name); +} + void JsonParser::store() { m_archive_writer->close(); } diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index af6b024ef..d7cc5a2fe 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -30,12 +31,13 @@ struct JsonParserOption { std::vector file_paths; std::string timestamp_key; std::string archives_dir; - size_t target_encoded_size; - size_t max_document_size; - size_t min_table_size; - int compression_level; - bool print_archive_stats; - bool structurize_arrays; + size_t target_encoded_size{}; + size_t max_document_size{}; + size_t min_table_size{}; + int compression_level{}; + bool print_archive_stats{}; + bool structurize_arrays{}; + bool record_log_order{true}; std::shared_ptr metadata_db; }; @@ -94,6 +96,14 @@ class JsonParser { */ void split_archive(); + /** + * Adds an internal field to the MPT and get its Id. + * + * Note: this method should be called before parsing a record so that internal fields come first + * in each table. This isn't strictly necessary, but it is a nice convention. + */ + int32_t add_metadata_field(std::string_view const field_name, NodeType type); + int m_num_messages; std::vector m_file_paths; @@ -109,6 +119,7 @@ class JsonParser { size_t m_target_encoded_size; size_t m_max_document_size; bool m_structurize_arrays{false}; + bool m_record_log_order{true}; }; } // namespace clp_s diff --git a/components/core/src/clp_s/JsonSerializer.hpp b/components/core/src/clp_s/JsonSerializer.hpp index ff46dfa24..63f845525 100644 --- a/components/core/src/clp_s/JsonSerializer.hpp +++ b/components/core/src/clp_s/JsonSerializer.hpp @@ -2,6 +2,7 @@ #define CLP_S_JSONSERIALIZER_HPP #include +#include #include #include "ColumnReader.hpp" @@ -66,7 +67,7 @@ class JsonSerializer { return false; } - void add_special_key(std::string const& key) { m_special_keys.push_back(key); } + void add_special_key(std::string_view const key) { m_special_keys.emplace_back(key); } void begin_object() { append_key(); @@ -110,13 +111,13 @@ class JsonSerializer { void append_key() { append_key(m_special_keys[m_special_keys_index++]); } - void append_key(std::string const& key) { + void append_key(std::string_view const key) { m_json_string += "\""; m_json_string += key; m_json_string += "\":"; } - void append_value(std::string const& value) { + void append_value(std::string_view const value) { m_json_string += value; m_json_string += ","; } diff --git a/components/core/src/clp_s/ReaderUtils.cpp b/components/core/src/clp_s/ReaderUtils.cpp index 11583a60d..a2ab5a34a 100644 --- a/components/core/src/clp_s/ReaderUtils.cpp +++ b/components/core/src/clp_s/ReaderUtils.cpp @@ -18,10 +18,10 @@ std::shared_ptr ReaderUtils::read_schema_tree(std::string const& arc throw OperationFailed(error_code, __FILENAME__, __LINE__); } + std::string key; for (size_t i = 0; i < num_nodes; i++) { int32_t parent_id; size_t key_length; - std::string key; uint8_t node_type; error_code = schema_tree_decompressor.try_read_numeric_value(parent_id); diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index 35c1d0a6e..8120ab323 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -37,6 +37,13 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) { } } +int64_t SchemaReader::get_next_log_event_idx() const { + if (nullptr != m_log_event_idx_column) { + return std::get(m_log_event_idx_column->extract_value(m_cur_message)); + } + return 0; +} + void SchemaReader::load( std::shared_ptr stream_buffer, size_t offset, @@ -86,7 +93,7 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddIntField: { column = m_reordered_columns[column_id_index++]; - auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); m_json_serializer.append_value( std::to_string(std::get(column->extract_value(m_cur_message))) @@ -102,7 +109,7 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddFloatField: { column = m_reordered_columns[column_id_index++]; - auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); m_json_serializer.append_value( std::to_string(std::get(column->extract_value(m_cur_message))) @@ -118,7 +125,7 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddBoolField: { column = m_reordered_columns[column_id_index++]; - auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); m_json_serializer.append_value( std::get(column->extract_value(m_cur_message)) != 0 ? "true" @@ -136,7 +143,7 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddStringField: { column = m_reordered_columns[column_id_index++]; - auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); m_json_serializer.append_value_from_column_with_quotes(column, m_cur_message); break; @@ -215,9 +222,10 @@ bool SchemaReader::get_next_message(std::string& message, FilterClass* filter) { return false; } -bool SchemaReader::get_next_message_with_timestamp( +bool SchemaReader::get_next_message_with_metadata( std::string& message, epochtime_t& timestamp, + int64_t& log_event_idx, FilterClass* filter ) { // TODO: If we already get max_num_results messages, we can skip messages @@ -241,6 +249,7 @@ bool SchemaReader::get_next_message_with_timestamp( } timestamp = m_get_timestamp(); + log_event_idx = get_next_log_event_idx(); m_cur_message++; return true; @@ -561,7 +570,7 @@ void SchemaReader::initialize_serializer() { // TODO: this code will have to change once we allow mixing log lines parsed by different // parsers. if (false == m_local_schema_tree.get_nodes().empty()) { - generate_json_template(m_local_schema_tree.get_root_node_id()); + generate_json_template(m_local_schema_tree.get_object_subtree_node_id()); } } @@ -572,7 +581,7 @@ void SchemaReader::generate_json_template(int32_t id) { for (int32_t child_id : children_ids) { int32_t child_global_id = m_local_id_to_global_id[child_id]; auto const& child_node = m_local_schema_tree.get_node(child_id); - std::string const& key = child_node.get_key_name(); + auto key = child_node.get_key_name(); switch (child_node.get_type()) { case NodeType::Object: { m_json_serializer.add_op(JsonSerializer::Op::BeginObject); diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 08651cc39..75f9ff117 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -99,6 +99,7 @@ class SchemaReader { m_reordered_columns.clear(); m_timestamp_column = nullptr; m_get_timestamp = []() -> epochtime_t { return 0; }; + m_log_event_idx_column = nullptr; m_local_id_to_global_id.clear(); m_global_id_to_local_id.clear(); m_global_id_to_unordered_object.clear(); @@ -159,15 +160,17 @@ class SchemaReader { bool get_next_message(std::string& message, FilterClass* filter); /** - * Gets the next message matching a filter, and its timestamp + * Gets the next message matching a filter as well as its timestamp and log event index. * @param message * @param timestamp + * @param log_event_idx * @param filter * @return true if there is a next message */ - bool get_next_message_with_timestamp( + bool get_next_message_with_metadata( std::string& message, epochtime_t& timestamp, + int64_t& log_event_idx, FilterClass* filter ); @@ -183,6 +186,13 @@ class SchemaReader { */ void mark_column_as_timestamp(BaseColumnReader* column_reader); + /** + * Marks a column as the log_event_idx column. + */ + void mark_column_as_log_event_idx(Int64ColumnReader* column_reader) { + m_log_event_idx_column = column_reader; + } + int32_t get_schema_id() const { return m_schema_id; } /** @@ -197,6 +207,12 @@ class SchemaReader { */ epochtime_t get_next_timestamp() const { return m_get_timestamp(); } + /** + * @return the log_event_idx in the row pointed to by m_cur_message or 0 if there is no + * log_event_idx in this table. + */ + int64_t get_next_log_event_idx() const; + /** * @return true if all records in this table have been iterated over, false otherwise */ @@ -288,6 +304,7 @@ class SchemaReader { BaseColumnReader* m_timestamp_column; std::function m_get_timestamp; + Int64ColumnReader* m_log_event_idx_column{nullptr}; std::shared_ptr m_global_schema_tree; SchemaTree m_local_schema_tree; diff --git a/components/core/src/clp_s/SchemaTree.cpp b/components/core/src/clp_s/SchemaTree.cpp index e56168a2c..4408efc01 100644 --- a/components/core/src/clp_s/SchemaTree.cpp +++ b/components/core/src/clp_s/SchemaTree.cpp @@ -5,9 +5,8 @@ #include "ZstdCompressor.hpp" namespace clp_s { -int32_t SchemaTree::add_node(int32_t parent_node_id, NodeType type, std::string const& key) { - std::tuple node_key = {parent_node_id, key, type}; - auto node_it = m_node_map.find(node_key); +int32_t SchemaTree::add_node(int32_t parent_node_id, NodeType type, std::string_view const key) { + auto node_it = m_node_map.find({parent_node_id, key, type}); if (node_it != m_node_map.end()) { auto node_id = node_it->second; m_nodes[node_id].increase_count(); @@ -15,18 +14,42 @@ int32_t SchemaTree::add_node(int32_t parent_node_id, NodeType type, std::string } int32_t node_id = m_nodes.size(); - auto& node = m_nodes.emplace_back(parent_node_id, node_id, key, type, 0); + auto& node = m_nodes.emplace_back(parent_node_id, node_id, std::string{key}, type, 0); node.increase_count(); - if (parent_node_id >= 0) { + if (constants::cRootNodeId == parent_node_id) { + if (NodeType::Object == type) { + m_object_subtree_id = node_id; + } else if (NodeType::Metadata == type) { + m_metadata_subtree_id = node_id; + } + } + + if (constants::cRootNodeId != parent_node_id) { auto& parent_node = m_nodes[parent_node_id]; node.set_depth(parent_node.get_depth() + 1); parent_node.add_child(node_id); } - m_node_map[node_key] = node_id; + m_node_map.emplace(std::make_tuple(parent_node_id, node.get_key_name(), type), node_id); return node_id; } +int32_t SchemaTree::get_metadata_field_id(std::string_view const field_name) { + if (m_metadata_subtree_id < 0) { + return -1; + } + + auto& metadata_subtree_node = m_nodes[m_metadata_subtree_id]; + for (auto child_id : metadata_subtree_node.get_children_ids()) { + auto& child_node = m_nodes[child_id]; + if (child_node.get_key_name() == field_name) { + return child_id; + } + } + + return -1; +} + size_t SchemaTree::store(std::string const& archives_dir, int compression_level) { FileWriter schema_tree_writer; ZstdCompressor schema_tree_compressor; @@ -41,9 +64,9 @@ size_t SchemaTree::store(std::string const& archives_dir, int compression_level) for (auto const& node : m_nodes) { schema_tree_compressor.write_numeric_value(node.get_parent_id()); - std::string const& key = node.get_key_name(); + auto key = node.get_key_name(); schema_tree_compressor.write_numeric_value(key.size()); - schema_tree_compressor.write_string(key); + schema_tree_compressor.write(key.begin(), key.size()); schema_tree_compressor.write_numeric_value(node.get_type()); } diff --git a/components/core/src/clp_s/SchemaTree.hpp b/components/core/src/clp_s/SchemaTree.hpp index 05ed52935..438714fa2 100644 --- a/components/core/src/clp_s/SchemaTree.hpp +++ b/components/core/src/clp_s/SchemaTree.hpp @@ -5,11 +5,25 @@ #include #include #include +#include #include #include namespace clp_s { +/** + * This enum defines the valid MPT node types as well as the 8-bit number used to encode them. + * + * The number used to represent each node type can not change. That means that elements in this + * enum can never be reordered and that new node types always need to be added to the end of the + * enum (but before Unknown). + * + * Node types are used to help record the structure of a log record, with the exception of the + * "Metadata" node type. The "Metadata" type is a special type used by the implementation to + * demarcate data needed by the implementation that is not part of the log record. In particular, + * the implementation may create a special subtree of the MPT which contains fields used to record + * things like original log order. + */ enum class NodeType : uint8_t { Integer, Float, @@ -21,21 +35,37 @@ enum class NodeType : uint8_t { NullValue, DateString, StructuredArray, + Metadata, Unknown = std::underlying_type::type(~0ULL) }; +/** + * This class represents a single node in the SchemaTree. + * + * Note: the result of get_key_name is valid even if the original SchemaNode is later + * move-constructed. + */ class SchemaNode { public: // Constructor SchemaNode() : m_parent_id(-1), m_id(-1), m_type(NodeType::Integer), m_count(0) {} - SchemaNode(int32_t parent_id, int32_t id, std::string key_name, NodeType type, int32_t depth) + SchemaNode( + int32_t parent_id, + int32_t id, + std::string_view const key_name, + NodeType type, + int32_t depth + ) : m_parent_id(parent_id), m_id(id), - m_key_name(std::move(key_name)), + m_key_name_buf(std::make_unique(key_name.size())), + m_key_name(m_key_name_buf.get(), key_name.size()), m_type(type), m_count(0), - m_depth(depth) {} + m_depth(depth) { + memcpy(m_key_name_buf.get(), key_name.begin(), key_name.size()); + } /** * Getters @@ -48,7 +78,7 @@ class SchemaNode { NodeType get_type() const { return m_type; } - std::string const& get_key_name() const { return m_key_name; } + std::string_view const get_key_name() const { return m_key_name; } int32_t get_count() const { return m_count; } @@ -71,7 +101,10 @@ class SchemaNode { int32_t m_id; int32_t m_parent_id; std::vector m_children_ids; - std::string m_key_name; + // We use a buffer so that references to this key name are stable after this SchemaNode is move + // constructed + std::unique_ptr m_key_name_buf; + std::string_view m_key_name; NodeType m_type; int32_t m_count; int32_t m_depth{0}; @@ -81,7 +114,7 @@ class SchemaTree { public: SchemaTree() = default; - int32_t add_node(int parent_node_id, NodeType type, std::string const& key); + int32_t add_node(int parent_node_id, NodeType type, std::string_view const key); bool has_node(int32_t id) { return id < m_nodes.size() && id >= 0; } @@ -93,7 +126,25 @@ class SchemaTree { return m_nodes[id]; } - int32_t get_root_node_id() const { return m_nodes[0].get_id(); } + /** + * @return the Id of the root of the Object sub-tree that records the structure of JSON data. + * @return -1 if the Object sub-tree does not exist. + */ + int32_t get_object_subtree_node_id() const { return m_object_subtree_id; } + + /** + * Get the field Id for a specified field within the Metadata subtree. + * @param field_name + * + * @return the field Id if the field exists within the Metadata sub-tree, -1 otherwise. + */ + int32_t get_metadata_field_id(std::string_view const field_name); + + /** + * @return the Id of the root of the Metadata sub-tree. + * @return -1 if the Metadata sub-tree does not exist. + */ + int32_t get_metadata_subtree_node_id() { return m_metadata_subtree_id; } std::vector const& get_nodes() const { return m_nodes; } @@ -129,7 +180,9 @@ class SchemaTree { private: std::vector m_nodes; - absl::flat_hash_map, int32_t> m_node_map; + absl::flat_hash_map, int32_t> m_node_map; + int32_t m_object_subtree_id{-1}; + int32_t m_metadata_subtree_id{-1}; }; } // namespace clp_s diff --git a/components/core/src/clp_s/archive_constants.hpp b/components/core/src/clp_s/archive_constants.hpp index 30e2b78d5..604c97f66 100644 --- a/components/core/src/clp_s/archive_constants.hpp +++ b/components/core/src/clp_s/archive_constants.hpp @@ -1,6 +1,8 @@ #ifndef CLP_S_ARCHIVE_CONSTANTS_HPP #define CLP_S_ARCHIVE_CONSTANTS_HPP +#include + namespace clp_s::constants { // Schema files constexpr char cArchiveSchemaMapFile[] = "/schema_ids"; @@ -16,6 +18,12 @@ constexpr char cArchiveLogDictFile[] = "/log.dict"; constexpr char cArchiveTimestampDictFile[] = "/timestamp.dict"; constexpr char cArchiveVarDictFile[] = "/var.dict"; +// Schema tree constants +constexpr char cRootNodeName[] = ""; +constexpr int32_t cRootNodeId = -1; +constexpr char cMetadataSubtreeName[] = ""; +constexpr char cLogEventIdxName[] = "log_event_idx"; + namespace results_cache::decompression { constexpr char cPath[]{"path"}; constexpr char cOrigFileId[]{"orig_file_id"}; @@ -24,5 +32,13 @@ constexpr char cEndMsgIx[]{"end_msg_ix"}; constexpr char cIsLastIrChunk[]{"is_last_ir_chunk"}; } // namespace results_cache::decompression +namespace results_cache::search { +constexpr char cOrigFilePath[]{"orig_file_path"}; +constexpr char cLogEventIx[]{"log_event_ix"}; +constexpr char cTimestamp[]{"timestamp"}; +constexpr char cMessage[]{"message"}; +constexpr char cArchiveId[]{"archive_id"}; +} // namespace results_cache::search + } // namespace clp_s::constants #endif // CLP_S_ARCHIVE_CONSTANTS_HPP diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 8752384ae..941fd4366 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -96,6 +96,7 @@ bool compress(CommandLineArguments const& command_line_arguments) { option.timestamp_key = command_line_arguments.get_timestamp_key(); option.print_archive_stats = command_line_arguments.print_archive_stats(); option.structurize_arrays = command_line_arguments.get_structurize_arrays(); + option.record_log_order = command_line_arguments.get_record_log_order(); auto const& db_config_container = command_line_arguments.get_metadata_db_config(); if (db_config_container.has_value()) { diff --git a/components/core/src/clp_s/search/ColumnDescriptor.hpp b/components/core/src/clp_s/search/ColumnDescriptor.hpp index 7341ff07a..ea1cfd7ec 100644 --- a/components/core/src/clp_s/search/ColumnDescriptor.hpp +++ b/components/core/src/clp_s/search/ColumnDescriptor.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "Literal.hpp" @@ -22,7 +23,7 @@ class DescriptorToken { * wildcards * @param token the string to initialize the token from */ - explicit DescriptorToken(std::string const& token) + explicit DescriptorToken(std::string_view const token) : m_token(token), m_wildcard(false), m_regex(false) { diff --git a/components/core/src/clp_s/search/Output.cpp b/components/core/src/clp_s/search/Output.cpp index 4d36b4e29..c9954779b 100644 --- a/components/core/src/clp_s/search/Output.cpp +++ b/components/core/src/clp_s/search/Output.cpp @@ -62,6 +62,7 @@ bool Output::filter() { } } + populate_internal_columns(); populate_string_queries(top_level_expr); std::string message; @@ -92,9 +93,10 @@ bool Output::filter() { reader.initialize_filter(this); if (m_output_handler->should_output_metadata()) { - epochtime_t timestamp; - while (reader.get_next_message_with_timestamp(message, timestamp, this)) { - m_output_handler->write(message, timestamp, archive_id); + epochtime_t timestamp{}; + int64_t log_event_idx{}; + while (reader.get_next_message_with_metadata(message, timestamp, log_event_idx, this)) { + m_output_handler->write(message, timestamp, archive_id, log_event_idx); } } else { while (reader.get_next_message(message, this)) { @@ -136,6 +138,10 @@ void Output::init( for (auto column_reader : column_readers) { auto column_id = column_reader->get_id(); + if (0 != m_metadata_columns.count(column_id)) { + continue; + } + if ((0 != (m_wildcard_type_mask & node_to_literal_type(m_schema_tree->get_node(column_id).get_type()))) @@ -959,6 +965,19 @@ void Output::populate_string_queries(std::shared_ptr const& expr) { } } +void Output::populate_internal_columns() { + int32_t metadata_subtree_root_node_id = m_schema_tree->get_metadata_subtree_node_id(); + if (-1 == metadata_subtree_root_node_id) { + return; + } + + // This code assumes that the metadata subtree contains no nested structures + auto& metadata_node = m_schema_tree->get_node(metadata_subtree_root_node_id); + for (auto child_id : metadata_node.get_children_ids()) { + m_metadata_columns.insert(child_id); + } +} + void Output::populate_searched_wildcard_columns(std::shared_ptr const& expr) { if (expr->has_only_expression_operands()) { for (auto const& op : expr->get_op_list()) { @@ -975,6 +994,9 @@ void Output::populate_searched_wildcard_columns(std::shared_ptr cons if (Schema::schema_entry_is_unordered_object(node)) { continue; } + if (0 != m_metadata_columns.count(node)) { + continue; + } auto tree_node_type = m_schema_tree->get_node(node).get_type(); if (col->matches_type(node_to_literal_type(tree_node_type))) { auto literal_type = node_to_literal_type(tree_node_type); diff --git a/components/core/src/clp_s/search/Output.hpp b/components/core/src/clp_s/search/Output.hpp index ba4c4e1d7..bfd364457 100644 --- a/components/core/src/clp_s/search/Output.hpp +++ b/components/core/src/clp_s/search/Output.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -88,6 +89,7 @@ class Output : public FilterClass { std::vector m_wildcard_columns; std::map> m_wildcard_to_searched_basic_columns; LiteralTypeBitmask m_wildcard_type_mask{0}; + std::unordered_set m_metadata_columns; std::stack< std::pair, @@ -342,6 +344,11 @@ class Output : public FilterClass { */ void populate_string_queries(std::shared_ptr const& expr); + /** + * Populates the set of internal columns that get ignored during dynamic wildcard expansion. + */ + void populate_internal_columns(); + /** * Constant propagates an expression * @param expr diff --git a/components/core/src/clp_s/search/OutputHandler.cpp b/components/core/src/clp_s/search/OutputHandler.cpp index 8d6a1eaa5..13771e9b5 100644 --- a/components/core/src/clp_s/search/OutputHandler.cpp +++ b/components/core/src/clp_s/search/OutputHandler.cpp @@ -10,6 +10,7 @@ #include "../../reducer/CountOperator.hpp" #include "../../reducer/network_utils.hpp" #include "../../reducer/Record.hpp" +#include "../archive_constants.hpp" using std::string; using std::string_view; @@ -31,11 +32,12 @@ NetworkOutputHandler::NetworkOutputHandler( void NetworkOutputHandler::write( string_view message, epochtime_t timestamp, - string_view archive_id + string_view archive_id, + int64_t log_event_idx ) { static constexpr string_view cOrigFilePathPlaceholder{""}; - msgpack::type::tuple const - src(timestamp, message, cOrigFilePathPlaceholder, archive_id); + msgpack::type::tuple const + src(timestamp, message, cOrigFilePathPlaceholder, archive_id, log_event_idx); msgpack::sbuffer m; msgpack::pack(m, src); @@ -72,10 +74,26 @@ ErrorCode ResultsCacheOutputHandler::flush() { try { m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document( - bsoncxx::builder::basic::kvp("original_path", std::move(result.original_path)), - bsoncxx::builder::basic::kvp("message", std::move(result.message)), - bsoncxx::builder::basic::kvp("timestamp", result.timestamp), - bsoncxx::builder::basic::kvp("archive_id", std::move(result.archive_id)) + bsoncxx::builder::basic::kvp( + constants::results_cache::search::cOrigFilePath, + std::move(result.original_path) + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::search::cMessage, + std::move(result.message) + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::search::cTimestamp, + result.timestamp + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::search::cArchiveId, + std::move(result.archive_id) + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::search::cLogEventIx, + result.log_event_idx + ) ))); count++; @@ -103,17 +121,26 @@ ErrorCode ResultsCacheOutputHandler::flush() { void ResultsCacheOutputHandler::write( string_view message, epochtime_t timestamp, - string_view archive_id + string_view archive_id, + int64_t log_event_idx ) { if (m_latest_results.size() < m_max_num_results) { - m_latest_results.emplace( - std::make_unique(string_view{}, message, timestamp, archive_id) - ); + m_latest_results.emplace(std::make_unique( + string_view{}, + message, + timestamp, + archive_id, + log_event_idx + )); } else if (m_latest_results.top()->timestamp < timestamp) { m_latest_results.pop(); - m_latest_results.emplace( - std::make_unique(string_view{}, message, timestamp, archive_id) - ); + m_latest_results.emplace(std::make_unique( + string_view{}, + message, + timestamp, + archive_id, + log_event_idx + )); } } diff --git a/components/core/src/clp_s/search/OutputHandler.hpp b/components/core/src/clp_s/search/OutputHandler.hpp index ca12e32d3..f033d37e8 100644 --- a/components/core/src/clp_s/search/OutputHandler.hpp +++ b/components/core/src/clp_s/search/OutputHandler.hpp @@ -43,9 +43,14 @@ class OutputHandler { * @param message The message in the log event. * @param timestamp The timestamp of the log event. * @param archive_id The archive containing the log event. + * @param log_event_idx The index of the log event within an archive. */ - virtual void write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) - = 0; + virtual void write( + std::string_view message, + epochtime_t timestamp, + std::string_view archive_id, + int64_t log_event_idx + ) = 0; /** * Writes a message to the output handler. @@ -84,9 +89,13 @@ class StandardOutputHandler : public OutputHandler { : OutputHandler(should_output_metadata, true) {} // Methods inherited from OutputHandler - void - write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override { - std::cout << archive_id << ": " << timestamp << " " << message; + void write( + std::string_view message, + epochtime_t timestamp, + std::string_view archive_id, + int64_t log_event_idx + ) override { + std::cout << archive_id << ": " << log_event_idx << ": " << timestamp << " " << message; } void write(std::string_view message) override { std::cout << message; } @@ -120,10 +129,14 @@ class NetworkOutputHandler : public OutputHandler { } // Methods inherited from OutputHandler - void - write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override; + void write( + std::string_view message, + epochtime_t timestamp, + std::string_view archive_id, + int64_t log_event_idx + ) override; - void write(std::string_view message) override { write(message, 0, {}); } + void write(std::string_view message) override { write(message, 0, {}, 0); } private: std::string m_host; @@ -143,17 +156,20 @@ class ResultsCacheOutputHandler : public OutputHandler { std::string_view original_path, std::string_view message, epochtime_t timestamp, - std::string_view archive_id + std::string_view archive_id, + int64_t log_event_idx ) : original_path(original_path), message(message), timestamp(timestamp), - archive_id(archive_id) {} + archive_id(archive_id), + log_event_idx(log_event_idx) {} std::string original_path; std::string message; epochtime_t timestamp; std::string archive_id; + int64_t log_event_idx; }; struct QueryResultGreaterTimestampComparator { @@ -189,10 +205,14 @@ class ResultsCacheOutputHandler : public OutputHandler { */ ErrorCode flush() override; - void - write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override; + void write( + std::string_view message, + epochtime_t timestamp, + std::string_view archive_id, + int64_t log_event_idx + ) override; - void write(std::string_view message) override { write(message, 0, {}); } + void write(std::string_view message) override { write(message, 0, {}, 0); } private: mongocxx::client m_client; @@ -216,8 +236,12 @@ class CountOutputHandler : public OutputHandler { CountOutputHandler(int reducer_socket_fd); // Methods inherited from OutputHandler - void - write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override {} + void write( + std::string_view message, + epochtime_t timestamp, + std::string_view archive_id, + int64_t log_event_idx + ) override {} void write(std::string_view message) override; @@ -246,8 +270,12 @@ class CountByTimeOutputHandler : public OutputHandler { m_count_by_time_bucket_size{count_by_time_bucket_size} {} // Methods inherited from OutputHandler - void - write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override { + void write( + std::string_view message, + epochtime_t timestamp, + std::string_view archive_id, + int64_t log_event_idx + ) override { int64_t bucket = (timestamp / m_count_by_time_bucket_size) * m_count_by_time_bucket_size; m_bucket_counts[bucket] += 1; } diff --git a/components/core/src/clp_s/search/Projection.cpp b/components/core/src/clp_s/search/Projection.cpp index 69836e312..b1c453776 100644 --- a/components/core/src/clp_s/search/Projection.cpp +++ b/components/core/src/clp_s/search/Projection.cpp @@ -52,7 +52,7 @@ void Projection::resolve_column( * what we need. */ - auto cur_node_id = tree->get_root_node_id(); + auto cur_node_id = tree->get_object_subtree_node_id(); auto it = column->descriptor_begin(); while (it != column->descriptor_end()) { bool matched_any{false}; diff --git a/components/core/src/clp_s/search/SchemaMatch.cpp b/components/core/src/clp_s/search/SchemaMatch.cpp index 82c9af866..203634000 100644 --- a/components/core/src/clp_s/search/SchemaMatch.cpp +++ b/components/core/src/clp_s/search/SchemaMatch.cpp @@ -76,7 +76,7 @@ std::shared_ptr SchemaMatch::populate_column_mapping(std::shared_ptr auto const* node = &m_tree->get_node(node_id); auto literal_type = node_to_literal_type(node->get_type()); DescriptorList descriptors; - while (node->get_id() != m_tree->get_root_node_id()) { + while (node->get_id() != m_tree->get_object_subtree_node_id()) { // may have to explicitly mark non-regex descriptors.emplace_back(node->get_key_name()); node = &m_tree->get_node(node->get_parent_id()); @@ -111,9 +111,9 @@ bool SchemaMatch::populate_column_mapping(ColumnDescriptor* column) { return matched; } - // TODO: once we start supporting multi-rooted MPTs this (and anything that uses - // get_root_node_id, or assumes root node id is 0) will have to change - auto const& root = m_tree->get_node(m_tree->get_root_node_id()); + // TODO: Once we start supporting mixing different types of logs we will have to match against + // more than just the object subtree. + auto const& root = m_tree->get_node(m_tree->get_object_subtree_node_id()); for (int32_t child_node_id : root.get_children_ids()) { matched |= populate_column_mapping(column, child_node_id); } diff --git a/components/core/src/clp_s/search/SearchUtils.cpp b/components/core/src/clp_s/search/SearchUtils.cpp index 3f7c522cb..39ae694a0 100644 --- a/components/core/src/clp_s/search/SearchUtils.cpp +++ b/components/core/src/clp_s/search/SearchUtils.cpp @@ -34,6 +34,7 @@ LiteralType node_to_literal_type(NodeType type) { return LiteralType::NullT; case NodeType::DateString: return LiteralType::EpochDateT; + case NodeType::Metadata: case NodeType::Unknown: default: return LiteralType::UnknownT;