Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename mc::range to mc::block, better bucket scaling #33

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 108 additions & 49 deletions src/gribjump/ExtractionData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,117 @@ namespace gribjump {

namespace {

// Serialises a vector to the stream as two items: the element count, then a
// single eckit::Buffer constructed from the vector's storage and its size in
// bytes (count * sizeof(element)).
void encodeVector(eckit::Stream& s, const std::vector<double>& v) {
template <typename T>
void encodeVector(eckit::Stream& s, const std::vector<T>& v) {
// The count is written first so the reader can size its buffer before reading.
size_t size = v.size();
s << size;
eckit::Buffer buffer(v.data(), size * sizeof(double));
// Byte length is derived from the element type rather than a fixed type.
eckit::Buffer buffer(v.data(), size * sizeof(T));
s << buffer;
}

// Reads back a vector written as an element count followed by one buffer of
// count * sizeof(element) bytes, and returns the elements copied into a vector.
std::vector<double> decodeVector(eckit::Stream& s) {
template <typename T>
std::vector<T> decodeVector(eckit::Stream& s) {
// Element count comes first; it determines the byte size of the buffer below.
size_t size;
s >> size;
eckit::Buffer buffer(size * sizeof(double));
eckit::Buffer buffer(size * sizeof(T));
s >> buffer;
double* data = (double*) buffer.data();
// Reinterpret the buffer's bytes as an array of elements.
T* data = (T*) buffer.data();

return std::vector<double>(data, data + size);
// Copy the elements out so the result does not depend on the local buffer.
return std::vector<T>(data, data + size);
}

/// Serialises a vector of vectors as two flat arrays: first the length of
/// every inner vector, then all elements concatenated in order.
///
/// @param s   stream to write to
/// @param vv  nested vector to serialise
template <typename T>
void encodeVectorVector(eckit::Stream& s, const std::vector<std::vector<T>>& vv) {
    // First pass: collect each inner length and the overall element count.
    std::vector<size_t> lengths;
    lengths.reserve(vv.size());
    size_t elementCount = 0;
    for (const auto& inner : vv) {
        const size_t n = inner.size();
        lengths.push_back(n);
        elementCount += n;
    }
    encodeVector(s, lengths);

    // Second pass: concatenate every inner vector into one contiguous array.
    std::vector<T> joined;
    joined.reserve(elementCount);
    for (const auto& inner : vv) {
        joined.insert(joined.end(), inner.begin(), inner.end());
    }
    encodeVector(s, joined);
}

/// Reads a vector of vectors written as two flat arrays: the inner lengths,
/// followed by all elements concatenated.
///
/// @param s  stream to read from
/// @return   one inner vector per decoded length, sliced from the flat element
///           array in order
///
/// NOTE(review): the decoded lengths are trusted. If their sum exceeds the
/// number of decoded elements the slicing below reads past the end of `flat`;
/// confirm whether the stream can carry malformed data.
template <typename T>
std::vector<std::vector<T>> decodeVectorVector(eckit::Stream& s) {
    // Order matters: lengths are decoded before the flat element array.
    const std::vector<size_t> sizes = decodeVector<size_t>(s);
    const std::vector<T> flat = decodeVector<T>(s);

    std::vector<std::vector<T>> vv;
    vv.reserve(sizes.size());  // final length is known: one entry per size

    auto first = flat.begin();
    for (const size_t size : sizes) {
        // Construct the slice directly inside vv instead of via a temporary.
        vv.emplace_back(first, first + size);
        first += size;
    }
    return vv;
}

/// Serialises a list of ranges as the range count followed by one buffer
/// holding the ranges' storage interpreted as 2 * count size_t values.
///
/// @param s       stream to write to
/// @param ranges  ranges to serialise
void encodeRanges(eckit::Stream& s, const std::vector<Range>& ranges) {
    // The vector's storage is sent as raw bytes, which is only valid when a
    // Range occupies exactly two size_t with no padding. Enforce that at
    // compile time instead of relying on it silently.
    static_assert(sizeof(Range) == 2 * sizeof(size_t),
                  "Range must be exactly two packed size_t values to be streamed as raw bytes");

    size_t size = ranges.size();
    s << size;
    eckit::Buffer buffer(ranges.data(), size * sizeof(size_t) * 2);
    s << buffer;
}

/// Reads a list of ranges written as the range count followed by one buffer of
/// 2 * count size_t values.
///
/// @param s  stream to read from
/// @return   the decoded ranges; element i is built from values 2*i and 2*i+1
///           of the buffer
std::vector<Range> decodeRanges(eckit::Stream& s) {
    size_t size = 0;
    s >> size;
    eckit::Buffer buffer(size * sizeof(size_t) * 2);
    s >> buffer;
    const size_t* data = reinterpret_cast<const size_t*>(buffer.data());

    std::vector<Range> ranges;
    ranges.reserve(size);  // count is known up front; avoid repeated regrowth
    for (size_t i = 0; i < size; ++i) {
        ranges.push_back(std::make_pair(data[2 * i], data[2 * i + 1]));
    }

    return ranges;
}

/// Serialises a nested bit mask as two flat arrays: the number of 64-bit words
/// in each inner vector, followed by every word converted to an integer.
///
/// @param s     stream to write to
/// @param mask  nested vector of 64-bit bitsets to serialise
void encodeMask(eckit::Stream& s, const std::vector<std::vector<std::bitset<64>>>& mask) {

    // Collect the inner lengths and the total word count in one pass.
    std::vector<size_t> sizes;
    sizes.reserve(mask.size());
    size_t totalSize = 0;
    for (const auto& v : mask) {
        sizes.push_back(v.size());
        totalSize += v.size();
    }

    // Flatten every bitset into its integer value, preserving order.
    std::vector<uint64_t> flat;
    flat.reserve(totalSize);
    for (const auto& v : mask) {
        for (const auto& b : v) {
            flat.push_back(b.to_ullong());
        }
    }

    // Lengths are written before the words.
    encodeVector(s, sizes);
    encodeVector(s, flat);
}

/// Reads a nested bit mask written as two flat arrays: the number of 64-bit
/// words per inner vector, followed by all words.
///
/// @param s  stream to read from
/// @return   one inner vector of bitsets per decoded length, filled from the
///           flat word array in order
///
/// NOTE(review): the decoded lengths are trusted. If their sum exceeds the
/// number of decoded words, `flat[pos + i]` indexes out of bounds; confirm
/// whether the stream can carry malformed data.
std::vector<std::vector<std::bitset<64>>> decodeMask(eckit::Stream& s) {

    // Order matters: lengths are decoded before the flat word array.
    const std::vector<size_t> sizes = decodeVector<size_t>(s);
    const std::vector<uint64_t> flat = decodeVector<uint64_t>(s);

    std::vector<std::vector<std::bitset<64>>> mask;
    mask.reserve(sizes.size());
    size_t pos = 0;
    for (const size_t size : sizes) {
        // Build each inner vector in place so it is never copied afterwards.
        mask.emplace_back();
        std::vector<std::bitset<64>>& maskBitset = mask.back();
        maskBitset.reserve(size);
        for (size_t i = 0; i < size; ++i) {
            maskBitset.push_back(std::bitset<64>(flat[pos + i]));
        }
        pos += size;
    }
    return mask;
}
} // namespace

ExtractionResult::ExtractionResult() {}
Expand All @@ -45,21 +139,13 @@ ExtractionResult::ExtractionResult(std::vector<std::vector<double>> values, std:
{}

ExtractionResult::ExtractionResult(eckit::Stream& s) {
size_t numRanges;
s >> numRanges;
for (size_t i = 0; i < numRanges; i++) {
values_.push_back(decodeVector(s));
}
values_ = decodeVectorVector<double>(s);
mask_ = decodeMask(s);
}

std::vector<std::vector<std::string>> bitsetStrings;
s >> bitsetStrings;
for (auto& v : bitsetStrings) {
std::vector<std::bitset<64>> bitset;
for (auto& b : v) {
bitset.push_back(std::bitset<64>(b));
}
mask_.push_back(bitset);
}
// Serialises this result to the stream: the values first, then the mask.
void ExtractionResult::encode(eckit::Stream& s) const {
// Nested values go out via the length-array + flat-array helper.
encodeVectorVector(s, values_);
// The mask is written second.
encodeMask(s, mask_);
}

void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsigned long** nvalues) {
Expand All @@ -72,24 +158,6 @@ void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsig
}
}

void ExtractionResult::encode(eckit::Stream& s) const {

s << values_.size(); // vector of vectors
for (auto& v : values_) {
encodeVector(s, v);
}

std::vector<std::vector<std::string>> bitsetStrings;
for (auto& v : mask_) {
std::vector<std::string> bitsetString;
for (auto& b : v) {
bitsetString.push_back(b.to_string());
}
bitsetStrings.push_back(bitsetString);
}
s << bitsetStrings;
}

void ExtractionResult::print(std::ostream& s) const {
s << "ExtractionResult[Values:[";
for (auto& v : values_) {
Expand Down Expand Up @@ -129,13 +197,7 @@ ExtractionRequest::ExtractionRequest() {}
ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
s >> request_;
s >> gridHash_;
size_t numRanges;
s >> numRanges;
for (size_t j = 0; j < numRanges; j++) {
size_t start, end;
s >> start >> end;
ranges_.push_back(std::make_pair(start, end));
}
ranges_ = decodeRanges(s);
}

eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
Expand All @@ -146,10 +208,7 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
void ExtractionRequest::encode(eckit::Stream& s) const {
s << request_;
s << gridHash_;
s << ranges_.size();
for (auto& [start, end] : ranges_) {
s << start << end;
}
encodeRanges(s, ranges_);
}

void ExtractionRequest::print(std::ostream& s) const {
Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/GribJumpDataAccessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ namespace gribjump {
class GribJumpDataAccessor : public mc::DataAccessor {

public:
GribJumpDataAccessor(eckit::DataHandle& dh, const mc::Range range) : dh_{dh}, data_section_range_{range} {}
GribJumpDataAccessor(eckit::DataHandle& dh, const mc::Block range) : dh_{dh}, data_section_range_{range} {}

void write(const eckit::Buffer& buffer, const size_t offset) const override {
NOTIMP;
}

eckit::Buffer read(const mc::Range& range) const override {
eckit::Buffer read(const mc::Block& range) const override {
eckit::Offset offset = range.first;
eckit::Length size = range.second;

Expand All @@ -51,7 +51,7 @@ class GribJumpDataAccessor : public mc::DataAccessor {

private:
eckit::DataHandle& dh_;
mc::Range data_section_range_;
mc::Block data_section_range_;
};

} // namespace gribjump
6 changes: 3 additions & 3 deletions src/gribjump/compression/DataAccessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DataAccessor {
public:
virtual ~DataAccessor() = default;
virtual void write(const eckit::Buffer& buffer, const size_t offset) const = 0;
virtual eckit::Buffer read(const Range& range) const = 0;
virtual eckit::Buffer read(const Block& range) const = 0;
virtual eckit::Buffer read() const = 0;
virtual size_t eof() const = 0;
};
Expand All @@ -43,7 +43,7 @@ class PosixAccessor : public DataAccessor {
NOTIMP;
}

eckit::Buffer read(const Range& range) const override {
eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
eckit::Buffer buf(size);
ifs_.seekg(offset, std::ios::beg);
Expand Down Expand Up @@ -89,7 +89,7 @@ class MemoryAccessor : public DataAccessor {
NOTIMP;
}

eckit::Buffer read(const Range& range) const override {
eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
if (offset + size > buf_.size()) {
std::stringstream ss;
Expand Down
10 changes: 5 additions & 5 deletions src/gribjump/compression/NumericCompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ class NumericDecompressor {
using CompressedData = eckit::Buffer;
using Values = std::vector<ValueType>;
virtual Values decode(const CompressedData&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Range&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Block&) = 0;


virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Range>& ranges) {
virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges) {
using Values = typename NumericDecompressor<ValueType>::Values;
std::vector<Values> result;
decode(accessor, ranges, result);
return result;
}

virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Range>& ranges, std::vector<Values>& result) {
virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges, std::vector<Values>& result) {
using Values = typename NumericDecompressor<ValueType>::Values;

std::unordered_map<Range, std::pair<Range, std::shared_ptr<Values>>> ranges_map;
std::unordered_map<Block, std::pair<Block, std::shared_ptr<Values>>> ranges_map;

// find which sub_ranges are in which buckets
RangeBuckets buckets;
BlockBuckets buckets;
for (const auto& range : ranges) {
buckets << range;
}
Expand Down
Loading
Loading