Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-s): Add the write path for single-file archives. #563

Merged
merged 21 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3d4a2e5
Add a write path of the single-file archive format
wraymo Oct 21, 2024
402aef3
fix lint errors
wraymo Oct 21, 2024
7792844
Fix several bugs and spec mismatches
gibber9809 Oct 29, 2024
45e6ab5
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Oct 29, 2024
a09ab6b
Clean up writing code for timestamp dictionary
gibber9809 Nov 5, 2024
53434a7
Improve error handling
gibber9809 Nov 5, 2024
3d51d22
Add clear method to TimestampDictionaryWriter
gibber9809 Nov 6, 2024
5b67e39
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Nov 6, 2024
eaa0982
Fix bug where size of timestamp dictionary section was missing
gibber9809 Nov 7, 2024
5f774c3
Revert change accidentally pulled into this PR
gibber9809 Nov 7, 2024
b46d4c1
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Nov 18, 2024
bec587b
Update components/core/src/clp_s/TimestampDictionaryWriter.hpp
gibber9809 Nov 24, 2024
816526e
Address review comments
gibber9809 Nov 24, 2024
250f5ba
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Nov 25, 2024
d0167d4
Add back option deleted during merge
gibber9809 Nov 25, 2024
ef91747
Write single file archive directly in archives_dir instead of subdire…
gibber9809 Nov 25, 2024
62c43bd
Write timestamp dictionary to a buffered stream instead of directly t…
gibber9809 Nov 25, 2024
4c73e29
Remove duplicated utility code
gibber9809 Nov 26, 2024
1e2be7e
Add docstring for new utility
gibber9809 Nov 26, 2024
4c7a50f
Fix docstring
gibber9809 Nov 26, 2024
c698487
Fix build issues
gibber9809 Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <algorithm>
#include <filesystem>
#include <sstream>

#include <json/single_include/nlohmann/json.hpp>

Expand Down Expand Up @@ -106,7 +107,10 @@ size_t ArchiveWriter::write_timestamp_dict() {
ZstdCompressor timestamp_dict_compressor;
timestamp_dict_file_writer.open(timestamp_dict_path, FileWriter::OpenMode::CreateForWriting);
timestamp_dict_compressor.open(timestamp_dict_file_writer, m_compression_level);
m_timestamp_dict.write(timestamp_dict_compressor);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
timestamp_dict_compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());
timestamp_dict_compressor.close();
auto compressed_size = timestamp_dict_file_writer.get_pos();
timestamp_dict_file_writer.close();
Expand Down Expand Up @@ -161,8 +165,11 @@ void ArchiveWriter::write_archive_metadata(

// Write timestamp dictionary
compressor.write_numeric_value(ArchiveMetadataPacketType::TimestampDictionary);
compressor.write_numeric_value(static_cast<uint32_t>(m_timestamp_dict.size_in_bytes()));
m_timestamp_dict.write(compressor);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
compressor.write_numeric_value(static_cast<uint32_t>(encoded_timestamp_dict.size()));
compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());

compressor.close();
}
Expand Down
41 changes: 18 additions & 23 deletions components/core/src/clp_s/TimestampDictionaryWriter.cpp
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
#include "TimestampDictionaryWriter.hpp"

#include <sstream>

#include "Utils.hpp"

namespace {
/**
 * Appends the object representation of a value to a string stream.
 * Exactly sizeof(T) bytes are written, in the host's in-memory layout.
 * @tparam T Type of the value being serialized.
 * @param stream Stream that receives the bytes.
 * @param value Value to serialize; taken by copy so its address can be read.
 */
template <typename T>
void write_numeric_value(std::stringstream& stream, T value) {
    auto const* raw_bytes = reinterpret_cast<char const*>(&value);
    stream.write(raw_bytes, sizeof(T));
}
gibber9809 marked this conversation as resolved.
Show resolved Hide resolved
} // namespace

namespace clp_s {
void TimestampDictionaryWriter::write_timestamp_entries(
std::map<std::string, TimestampEntry> const& ranges,
ZstdCompressor& compressor
std::stringstream& stream
) {
compressor.write_numeric_value<uint64_t>(ranges.size());
write_numeric_value<uint64_t>(stream, ranges.size());

for (auto const& range : ranges) {
range.second.write_to_file(compressor);
range.second.write_to_stream(stream);
}
}

void TimestampDictionaryWriter::write(ZstdCompressor& compressor) {
void TimestampDictionaryWriter::write(std::stringstream& stream) {
merge_range();
write_timestamp_entries(m_column_key_to_range, compressor);
write_timestamp_entries(m_column_key_to_range, stream);

compressor.write_numeric_value<uint64_t>(m_pattern_to_id.size());
write_numeric_value<uint64_t>(stream, m_pattern_to_id.size());
for (auto& it : m_pattern_to_id) {
// write pattern ID
compressor.write_numeric_value<uint64_t>(it.second);
write_numeric_value<uint64_t>(stream, it.second);

std::string const& pattern = it.first->get_format();
compressor.write_numeric_value<uint64_t>(pattern.length());
compressor.write_string(pattern);
write_numeric_value<uint64_t>(stream, pattern.length());
stream.write(pattern.data(), pattern.size());
}
}

Expand Down Expand Up @@ -156,18 +165,4 @@ void TimestampDictionaryWriter::clear() {
m_column_key_to_range.clear();
m_column_id_to_range.clear();
}

size_t TimestampDictionaryWriter::size_in_bytes() {
merge_range();
size_t size{2 * sizeof(uint64_t)};
for (auto const& range : m_column_key_to_range) {
size += range.second.size_in_bytes();
}

for (auto& pattern : m_pattern_to_id) {
size += 2 * sizeof(uint64_t);
size += pattern.first->get_format().size();
}
return size;
}
} // namespace clp_s
17 changes: 6 additions & 11 deletions components/core/src/clp_s/TimestampDictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
#define CLP_S_TIMESTAMPDICTIONARYWRITER_HPP

#include <map>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

#include "SchemaTree.hpp"
#include "TimestampEntry.hpp"
#include "TimestampPattern.hpp"
#include "ZstdCompressor.hpp"

namespace clp_s {
class TimestampDictionaryWriter {
Expand All @@ -26,10 +26,10 @@ class TimestampDictionaryWriter {
TimestampDictionaryWriter() {}

/**
* Writes the timestamp dictionary to a compression stream.
* @param compressor
* Writes the timestamp dictionary to a buffered stream.
* @param stream
*/
void write(ZstdCompressor& compressor);
void write(std::stringstream& stream);

/**
* Gets the pattern id for a given pattern
Expand Down Expand Up @@ -84,25 +84,20 @@ class TimestampDictionaryWriter {
*/
void clear();

/**
* Merge ranges by key name then return the size of data to be compressed in bytes
*/
size_t size_in_bytes();

private:
/**
* Merges timestamp ranges with the same key name but different node ids.
*/
void merge_range();

/**
* Writes timestamp entries to a compression stream.
* Writes timestamp entries to a buffered stream.
* @param ranges
* @param stream
*/
static void write_timestamp_entries(
std::map<std::string, TimestampEntry> const& ranges,
ZstdCompressor& compressor
std::stringstream& stream
);

using pattern_to_id_t = std::unordered_map<TimestampPattern const*, uint64_t>;
Expand Down
28 changes: 18 additions & 10 deletions components/core/src/clp_s/TimestampEntry.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
#include "TimestampEntry.hpp"

#include <cmath>
#include <sstream>

namespace {
/**
 * Serializes a value into a string stream by copying its raw bytes.
 * The stream grows by sizeof(T) bytes laid out exactly as the value is in memory.
 * @tparam T Type of the value being serialized.
 * @param stream Destination stream.
 * @param value Value whose bytes are written; passed by copy.
 */
template <typename T>
void write_numeric_value(std::stringstream& stream, T value) {
    constexpr auto cNumBytes = sizeof(T);
    char const* const first_byte{reinterpret_cast<char const*>(&value)};
    stream.write(first_byte, cNumBytes);
}
} // namespace
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to get rid of the duplicate code?


namespace clp_s {
void TimestampEntry::ingest_timestamp(epochtime_t timestamp) {
Expand Down Expand Up @@ -54,21 +62,21 @@ void TimestampEntry::merge_range(TimestampEntry const& entry) {
}
}

void TimestampEntry::write_to_file(ZstdCompressor& compressor) const {
compressor.write_numeric_value<uint64_t>(m_key_name.size());
compressor.write_string(m_key_name);
compressor.write_numeric_value<uint64_t>(m_column_ids.size());
void TimestampEntry::write_to_stream(std::stringstream& stream) const {
write_numeric_value<uint64_t>(stream, m_key_name.size());
stream.write(m_key_name.data(), m_key_name.size());
write_numeric_value<uint64_t>(stream, m_column_ids.size());
for (auto const& id : m_column_ids) {
compressor.write_numeric_value<int32_t>(id);
write_numeric_value<int32_t>(stream, id);
}

compressor.write_numeric_value<TimestampEncoding>(m_encoding);
write_numeric_value<TimestampEncoding>(stream, m_encoding);
if (m_encoding == Epoch) {
compressor.write_numeric_value<epochtime_t>(m_epoch_start);
compressor.write_numeric_value<epochtime_t>(m_epoch_end);
write_numeric_value<epochtime_t>(stream, m_epoch_start);
write_numeric_value<epochtime_t>(stream, m_epoch_end);
} else if (m_encoding == DoubleEpoch) {
compressor.write_numeric_value<double>(m_epoch_start_double);
compressor.write_numeric_value<double>(m_epoch_end_double);
write_numeric_value<double>(stream, m_epoch_start_double);
write_numeric_value<double>(stream, m_epoch_end_double);
}
gibber9809 marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
12 changes: 3 additions & 9 deletions components/core/src/clp_s/TimestampEntry.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_TIMESTAMPENTRY_HPP
#define CLP_S_TIMESTAMPENTRY_HPP

#include <sstream>
#include <string>
#include <unordered_set>
#include <variant>
Expand All @@ -9,7 +10,6 @@
#include "ErrorCode.hpp"
#include "search/FilterOperation.hpp"
#include "Utils.hpp"
#include "ZstdCompressor.hpp"
#include "ZstdDecompressor.hpp"

using clp_s::search::FilterOperation;
Expand Down Expand Up @@ -66,10 +66,10 @@ class TimestampEntry {
void merge_range(TimestampEntry const& entry);

/**
* Write the timestamp entry to a file
* Write the timestamp entry to a buffered stream.
* @param stream
*/
void write_to_file(ZstdCompressor& compressor) const;
void write_to_stream(std::stringstream& stream) const;

/**
* Try to read the timestamp entry from a file
Expand Down Expand Up @@ -119,12 +119,6 @@ class TimestampEntry {
*/
epochtime_t get_end_timestamp() const;

size_t size_in_bytes() const {
return sizeof(uint64_t) + m_key_name.size() + sizeof(uint64_t)
+ m_column_ids.size() * sizeof(int32_t) + sizeof(TimestampEncoding)
+ 2 * sizeof(epochtime_t);
}

private:
TimestampEncoding m_encoding;
double m_epoch_start_double, m_epoch_end_double;
Expand Down
Loading