diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 4218d9d60..a2e6cb83c 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -106,11 +106,13 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { std::cerr << " c - compress" << std::endl; std::cerr << " x - decompress" << std::endl; std::cerr << " s - search" << std::endl; + std::cerr << " r - JSON to IR Format" << std::endl; std::cerr << std::endl; std::cerr << "Try " << " c --help OR" << " x --help OR" - << " s --help for command-specific details." << std::endl; + << " s --help OR" + << " r --help for command-specific details." << std::endl; po::options_description visible_options; visible_options.add(general_options); @@ -125,6 +127,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { case (char)Command::Compress: case (char)Command::Extract: case (char)Command::Search: + case (char)Command::JsonToIr: m_command = (Command)command_input; break; default: @@ -727,6 +730,100 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "The --count-by-time and --count options are mutually exclusive." ); } + } else if ((char)Command::JsonToIr == command_input) { + po::options_description compression_positional_options; + // clang-format off + compression_positional_options.add_options()( + "ir-dir", + po::value(&m_archives_dir)->value_name("DIR"), + "output directory" + )( + "input-paths", + po::value>(&m_file_paths)->value_name("PATHS"), + "input paths" + ); + // clang-format on + + po::options_description compression_options("Compression options"); + std::string input_path_list_file_path; + // clang-format off + compression_options.add_options()( + "compression-level", + po::value(&m_compression_level)->value_name("LEVEL")-> + default_value(m_compression_level), + "1 (fast/low compression) to 9 (slow/high compression)." + )( + "max-document-size", + po::value(&m_max_document_size)->value_name("DOC_SIZE")-> + default_value(m_max_document_size), + "Maximum allowed size (B) for a single document before ir generation fails." + )( + "max-ir-buffer-size", + po::value(&m_max_ir_buffer_size)->value_name("BUFFER_SIZE")-> + default_value(m_max_ir_buffer_size), + "Maximum allowed size (B) for an in memory IR buffer befroe being written to file." + )( + "encoding-type", + po::value(&m_encoding_type)->value_name("ENCODING_TYPE")-> + default_value(m_encoding_type), + "4 (four byte encoding) or 8 (eight byte encoding)" + )( + "files-from,f", + po::value(&input_path_list_file_path) + ->value_name("FILE") + ->default_value(input_path_list_file_path), + "Compress files specified in FILE" + ); + // clang-format on + + po::positional_options_description positional_options; + positional_options.add("ir-dir", 1); + positional_options.add("input-paths", -1); + + po::options_description all_compression_options; + all_compression_options.add(compression_options); + all_compression_options.add(compression_positional_options); + + std::vector unrecognized_options + = po::collect_unrecognized(parsed.options, po::include_positional); + unrecognized_options.erase(unrecognized_options.begin()); + po::store( + po::command_line_parser(unrecognized_options) + .options(all_compression_options) + .positional(positional_options) + .run(), + parsed_command_line_options + ); + po::notify(parsed_command_line_options); + + if (parsed_command_line_options.count("help")) { + print_json_to_ir_usage(); + + std::cerr << "Examples:\n"; + std::cerr << " # Parse file1.json and dir1 into irs-dir\n"; + std::cerr << " " << m_program_name << " r irs-dir file1.json dir1\n"; + + po::options_description visible_options; + visible_options.add(general_options); + visible_options.add(compression_options); + std::cerr << visible_options << '\n'; + return ParsingResult::InfoCommand; + } + + if (m_archives_dir.empty()) { + throw std::invalid_argument("No IRs directory specified."); + } + + if (false == input_path_list_file_path.empty()) { + if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) { + SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path); + return ParsingResult::Failure; + } + } + + if (m_file_paths.empty()) { + throw std::invalid_argument("No input paths specified."); + } } } catch (std::exception& e) { SPDLOG_ERROR("{}", e.what()); @@ -834,6 +931,10 @@ void CommandLineArguments::print_decompression_usage() const { std::cerr << "Usage: " << m_program_name << " x [OPTIONS] ARCHIVES_DIR OUTPUT_DIR" << std::endl; } +void CommandLineArguments::print_json_to_ir_usage() const { + std::cerr << "Usage: " << m_program_name << " r [OPTIONS] IRS_DIR [FILE/DIR ...]\n"; +} + void CommandLineArguments::print_search_usage() const { std::cerr << "Usage: " << m_program_name << " s [OPTIONS] ARCHIVES_DIR KQL_QUERY" diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 47c244646..3e7cd8e65 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -26,7 +26,8 @@ class CommandLineArguments { enum class Command : char { Compress = 'c', Extract = 'x', - Search = 's' + Search = 's', + JsonToIr = 'r' }; enum class OutputHandlerType : uint8_t { @@ -65,6 +66,10 @@ class CommandLineArguments { size_t get_max_document_size() const { return m_max_document_size; } + [[nodiscard]] auto get_max_ir_buffer_size() const -> size_t { return m_max_ir_buffer_size; } + + [[nodiscard]] auto get_encoding_type() const -> int { return m_encoding_type; } + [[nodiscard]] bool print_archive_stats() const { return m_print_archive_stats; } std::string const& get_mongodb_uri() const { return m_mongodb_uri; } @@ -170,6 +175,8 @@ class CommandLineArguments { void print_decompression_usage() const; + void print_json_to_ir_usage() const; + void print_search_usage() const; // Variables @@ -192,6 +199,8 @@ class CommandLineArguments { size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB bool m_disable_log_order{false}; FileType m_file_type{FileType::Json}; + int m_encoding_type{8}; + size_t m_max_ir_buffer_size{512ULL * 1024 * 1024}; // Metadata db variables std::optional m_metadata_db_config; diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index a89c746c7..fb1b21994 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp @@ -52,6 +52,15 @@ struct JsonParserOption { std::shared_ptr metadata_db; }; +struct JsonToIrParserOption { + std::vector file_paths; + std::string irs_dir; + size_t max_document_size; + size_t max_ir_buffer_size; + int compression_level; + int encoding; +}; + class JsonParser { public: class OperationFailed : public TraceableException { diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 0f7b5643a..270ed4eef 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -11,6 +12,8 @@ #include #include +#include "../clp/ffi/ir_stream/protocol_constants.hpp" +#include "../clp/ffi/ir_stream/Serializer.hpp" #include "../clp/GlobalMySQLMetadataDB.hpp" #include "../clp/streaming_archive/ArchiveMetadata.hpp" #include "../reducer/network_utils.hpp" @@ -36,6 +39,7 @@ #include "Utils.hpp" using namespace clp_s::search; +using clp::ffi::ir_stream::Serializer; using clp_s::cArchiveFormatDevelopmentVersionFlag; using clp_s::cEpochTimeMax; using clp_s::cEpochTimeMin; @@ -50,6 +54,36 @@ namespace { */ bool compress(CommandLineArguments const& command_line_arguments); +template +auto flush_and_clear_serializer_buffer( + Serializer& serializer, + std::vector& byte_buf +) -> void; + +template +auto unpack_and_serialize_msgpack_bytes( + std::vector const& msgpack_bytes, + Serializer& serializer +) -> bool; + +/** + * Given user specified options and a file path to a JSON file calls the serailizer one each JSON + * entry to serialize into IR + * @param option + * @param path + * @return Whether serialization was successful + */ +template +auto run_serializer(clp_s::JsonToIrParserOption const& option, std::string path); + +/** + * Iterates over the input JSON files specified by the command line arguments to generate and IR + * file for each one. + * @param command_line_arguments + * @return Whether generation was successful + */ +auto generate_ir(CommandLineArguments const& command_line_arguments) -> bool; + /** * Decompresses the archive specified by the given JsonConstructorOption. * @param json_constructor_option @@ -129,6 +163,162 @@ bool compress(CommandLineArguments const& command_line_arguments) { return true; } +template +auto flush_and_clear_serializer_buffer( + Serializer& serializer, + std::vector& byte_buf +) -> void { + auto const view{serializer.get_ir_buf_view()}; + byte_buf.insert(byte_buf.cend(), view.begin(), view.end()); + serializer.clear_ir_buf(); +} + +template +auto unpack_and_serialize_msgpack_bytes( + std::vector const& msgpack_bytes, + Serializer& serializer +) -> bool { + try { + auto const msgpack_obj_handle{msgpack::unpack( + clp::size_checked_pointer_cast(msgpack_bytes.data()), + msgpack_bytes.size() + )}; + auto const msgpack_obj{msgpack_obj_handle.get()}; + if (msgpack::type::MAP != msgpack_obj.type) { + return false; + } + return serializer.serialize_msgpack_map(msgpack_obj.via.map); + } catch (std::exception const& e) { + SPDLOG_ERROR("Failed to unpack msgpack bytes: {}", e.what()); + return false; + } +} + +template +auto run_serializer(clp_s::JsonToIrParserOption const& option, std::string path) { + auto result{Serializer::create()}; + if (result.has_error()) { + SPDLOG_ERROR("Failed to create Serializer"); + return false; + } + auto& serializer{result.value()}; + std::vector ir_buf; + flush_and_clear_serializer_buffer(serializer, ir_buf); + + std::ifstream in_file; + in_file.open(path, std::ifstream::in); + + std::filesystem::path input_path{path}; + std::string filename = input_path.filename().string(); + std::string out_path = option.irs_dir + "/" + filename + ".ir"; + + clp_s::FileWriter out_file; + out_file.open(out_path, clp_s::FileWriter::OpenMode::CreateForWriting); + clp_s::ZstdCompressor zc; + try { + zc.open(out_file, option.compression_level); + } catch (clp_s::ZstdCompressor::OperationFailed& error) { + SPDLOG_ERROR("Failed to open ZSTDcompressor - {}", error.what()); + in_file.close(); + out_file.close(); + return false; + } + + std::string line = ""; + size_t total_size = 0; + + if (in_file.is_open()) { + while (getline(in_file, line)) { + try { + auto j_obj = nlohmann::json::parse(line); + if (false + == unpack_and_serialize_msgpack_bytes( + nlohmann::json::to_msgpack(j_obj), + serializer + )) + { + SPDLOG_ERROR("Failed to serialize msgpack bytes for line: {}", line); + in_file.close(); + out_file.close(); + zc.close(); + return false; + } + flush_and_clear_serializer_buffer(serializer, ir_buf); + if (ir_buf.size() >= option.max_ir_buffer_size) { + total_size = total_size + ir_buf.size(); + zc.write(reinterpret_cast(ir_buf.data()), ir_buf.size()); + zc.flush(); + ir_buf.clear(); + } + } catch (nlohmann::json::parse_error const& e) { + SPDLOG_ERROR("JSON parsing error: {}", e.what()); + in_file.close(); + out_file.close(); + zc.close(); + return false; + } catch (std::exception const& e) { + SPDLOG_ERROR("Error during serialization: {}", e.what()); + in_file.close(); + out_file.close(); + zc.close(); + return false; + } + } + total_size = total_size + ir_buf.size(); + ir_buf.push_back(clp::ffi::ir_stream::cProtocol::Eof); + zc.write(reinterpret_cast(ir_buf.data()), ir_buf.size()); + zc.flush(); + ir_buf.clear(); + in_file.close(); + zc.close(); + out_file.close(); + } + + return true; +} + +auto generate_ir(CommandLineArguments const& command_line_arguments) -> bool { + auto irs_dir = std::filesystem::path(command_line_arguments.get_archives_dir()); + + // Create output directory in case it doesn't exist + try { + std::filesystem::create_directory(irs_dir.string()); + } catch (std::exception& e) { + SPDLOG_ERROR("Failed to create archives directory {} - {}", irs_dir.string(), e.what()); + return false; + } + clp_s::JsonToIrParserOption option{}; + option.file_paths = command_line_arguments.get_file_paths(); + option.irs_dir = irs_dir.string(); + option.max_document_size = command_line_arguments.get_max_document_size(); + option.max_ir_buffer_size = command_line_arguments.get_max_ir_buffer_size(); + option.compression_level = command_line_arguments.get_compression_level(); + option.encoding = command_line_arguments.get_encoding_type(); + + if (false == clp_s::FileUtils::validate_path(option.file_paths)) { + SPDLOG_ERROR("Invalid file path(s) provided"); + return false; + } + + std::vector all_file_paths; + for (auto& file_path : option.file_paths) { + clp_s::FileUtils::find_all_files(file_path, all_file_paths); + } + + for (auto& path : all_file_paths) { + bool success; + if (option.encoding == 4) { + success = run_serializer(option, path); + } else { + success = run_serializer(option, path); + } + if (false == success) { + return false; + } + } + return true; +} + void decompress_archive(clp_s::JsonConstructorOption const& json_constructor_option) { clp_s::JsonConstructor constructor(json_constructor_option); constructor.store(); @@ -298,6 +488,10 @@ int main(int argc, char const* argv[]) { if (false == compress(command_line_arguments)) { return 1; } + } else if (CommandLineArguments::Command::JsonToIr == command_line_arguments.get_command()) { + if (false == generate_ir(command_line_arguments)) { + return 1; + } } else if (CommandLineArguments::Command::Extract == command_line_arguments.get_command()) { auto const& archives_dir = command_line_arguments.get_archives_dir(); if (false == std::filesystem::is_directory(archives_dir)) {