Cleared TODO regarding segfaults and fixed re-publishing metadata behavior #71

Merged (7 commits) on Aug 15, 2024
mdio/dataset.h: 39 changes (22 additions, 17 deletions)

@@ -18,6 +18,7 @@
 
 #include <fstream>
 #include <map>
+#include <memory>
 #include <string>
 #include <tuple>
 #include <unordered_map>
@@ -844,7 +845,14 @@ class Dataset {
     // Build out list of modified variables
     std::vector<std::string> modifiedVariables;
     for (const auto& key : keys) {
-      modifiedVariables.push_back(key);
+      auto var = variables.at(key).value();
+      if (var.was_updated() || var.should_publish()) {
+        modifiedVariables.push_back(key);
+      }
+      if (var.should_publish()) {
+        // Reset the flag. This should only get set by the trim util.
+        var.set_metadata_publish_flag(false);
+      }
     }
 
     // If nothing changed, we don't want to perform any writes
@@ -857,15 +865,14 @@
 
     // We need to update the entire .zmetadata file
    std::vector<nlohmann::json> json_vars;
-    std::vector<Variable<>>
-        vars;  // Keeps the Variables in memory. Fix for premature reference
-               // decrement in LLVM compiler.
-    for (auto key : keys) {
-      auto var = variables.at(key).value();
+    std::vector<std::shared_ptr<Variable<>>>
+        vars;  // Keeps the Variables in memory using shared_ptr
+    for (const auto& key : keys) {
+      auto var = std::make_shared<Variable<>>(variables.at(key).value());
       vars.push_back(var);
       // Get the JSON, drop transform, and add attributes
       nlohmann::json json =
-          var.get_store().spec().value().ToJson(IncludeDefaults{}).value();
+          var->get_store().spec().value().ToJson(IncludeDefaults{}).value();
       json.erase("transform");
       json.erase("dtype");
       json["metadata"].erase("filters");
@@ -876,7 +883,7 @@
       std::string path = json["kvstore"]["path"].get<std::string>();
       path.pop_back();
       json["kvstore"]["path"] = path;
-      nlohmann::json meta = var.getMetadata();
+      nlohmann::json meta = var->getMetadata();
       if (meta.contains("coordinates")) {
        meta["attributes"]["coordinates"] = meta["coordinates"];
        meta.erase("coordinates");
@@ -912,18 +919,16 @@
         promises;
     std::vector<tensorstore::AnyFuture> futures;
 
-    vars.clear();  // Clear the vector so we can add only the modified Variables
-    for (auto key : modifiedVariables) {
+    for (const auto& key : modifiedVariables) {
       auto pair = tensorstore::PromiseFuturePair<
           tensorstore::TimestampedStorageGeneration>::Make();
-      auto var = variables.at(key).value();
-      vars.push_back(var);
-      auto updateFuture = var.PublishMetadata();
+      auto var = std::make_shared<Variable<>>(variables.at(key).value());
+      auto updateFuture = var->PublishMetadata();
       updateFuture.ExecuteWhenReady(
-          [promise = std::move(pair.promise)](
-              tensorstore::ReadyFuture<
-                  tensorstore::TimestampedStorageGeneration>
-                  readyFut) { promise.SetResult(readyFut.result()); });
+          [promise = std::move(pair.promise),
+           var](tensorstore::ReadyFuture<
                    tensorstore::TimestampedStorageGeneration>
                    readyFut) { promise.SetResult(readyFut.result()); });
       variableFutures.push_back(std::move(updateFuture));
       futures.push_back(std::move(pair.future));
     }
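
A note on the dataset.h hunks above: the previous workaround kept plain Variable copies in a vector to dodge a premature reference decrement, while this PR wraps each Variable in a std::shared_ptr and captures that pointer in the ExecuteWhenReady callback, so the object stays alive until the asynchronous publish completes. Below is a minimal, self-contained sketch of that keep-alive pattern; the Metadata struct, PublishLater function, and g_pending queue are illustrative stand-ins, not the mdio or tensorstore API.

// keep_alive_sketch.cc: capture a shared_ptr copy in a deferred callback so
// the pointee cannot be destroyed before the callback has run.
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Metadata {  // stand-in for the state a Variable needs while publishing
  std::string name;
};

// Stand-in for an async completion hook such as ExecuteWhenReady: the callback
// is stored here and invoked later, after the caller's scope has ended.
std::vector<std::function<void()>> g_pending;

void PublishLater(std::shared_ptr<Metadata> meta) {
  // Capturing `meta` by value bumps the refcount, so the Metadata outlives the
  // scope that created it.
  g_pending.push_back(
      [meta] { std::cout << "published " << meta->name << "\n"; });
}

int main() {
  {
    auto meta = std::make_shared<Metadata>(Metadata{"inline"});
    PublishLater(meta);
  }  // `meta` leaves scope here, but the captured copy keeps it alive
  for (auto& cb : g_pending) cb();  // safe: refcount is still above zero
  return 0;
}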
mdio/utils.h: 1 change (1 addition, 0 deletions)

@@ -62,6 +62,7 @@ Future<void> TrimDataset(std::string dataset_path,
 
   for (auto& varIdentifier : ds.variables.get_iterable_accessor()) {
     MDIO_ASSIGN_OR_RETURN(auto var, ds.variables.at(varIdentifier))
+    var.set_metadata_publish_flag(true);
 
     if (var.dimensions().labels().back() == "") {
       auto spec = var.spec();
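
The one-line addition to TrimDataset above marks every variable for metadata re-publication; the dataset.h loop shown earlier then republishes any variable that was updated or flagged and clears the flag afterwards. The sketch below walks through that round trip under assumed names (Var, Trim, and Commit are illustrative, not the real mdio types or methods).

// flag_roundtrip_sketch.cc: trim marks everything, the next commit republishes
// flagged or updated entries exactly once and then resets the flag.
#include <iostream>
#include <vector>

struct Var {
  bool was_updated = false;
  bool should_publish = false;  // set by the trim step below
};

void Trim(std::vector<Var>& vars) {
  for (auto& v : vars) v.should_publish = true;  // mirrors utils.h above
}

void Commit(std::vector<Var>& vars) {
  for (auto& v : vars) {
    if (v.was_updated || v.should_publish) {
      std::cout << "re-publishing one .zmetadata entry\n";
    }
    v.should_publish = false;  // reset, mirroring the dataset.h loop
  }
}

int main() {
  std::vector<Var> vars(2);
  Commit(vars);  // prints nothing: no variable changed
  Trim(vars);
  Commit(vars);  // prints twice, then the flags are cleared again
  return 0;
}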
mdio/variable.h: 37 changes (34 additions, 3 deletions)

@@ -1133,6 +1133,22 @@ class Variable {
     return attributesAddress != currentAddress;
   }
 
+  /**
+   * @brief Sets the flag whether the metadata should get republished.
+   * Intended for internal use with the trimming utility.
+   *
+   * @param shouldPublish True if the metadata should get republished.
+   */
+  void set_metadata_publish_flag(const bool shouldPublish) {
+    if (!toPublish) {
+      // This should never be the case, but better safe than sorry
+      toPublish = std::make_shared<std::shared_ptr<bool>>(
+          std::make_shared<bool>(shouldPublish));
+    } else {
+      *toPublish = std::make_shared<bool>(shouldPublish);
+    }
+  }
+
   // ===========================Member data getters===========================
   const std::string& get_variable_name() const { return variableName; }
 
@@ -1153,6 +1169,20 @@
     return attributesAddress;
   }
 
+  /**
+   * @brief Gets whether the metadata should get republished.
+   *
+   * @return True if the metadata should get republished.
+   */
+  bool should_publish() const {
+    if (toPublish && *toPublish) {
+      // Deref the shared_ptr so we're not increasing refcount
+      return **toPublish;
+    }
+    // If the flag was a nullptr, err on the side of caution and republish
+    return true;
+  }
+
  private:
   /**
    * This method should NEVER be called by the user.
@@ -1167,9 +1197,7 @@
     if (attributes.get() != nullptr && attributes->get() != nullptr) {
       std::uintptr_t newAddress =
           reinterpret_cast<std::uintptr_t>(&(**attributes));
-      // TODO(BrianMichell): Leaving this as active is causing segfaults.
-      // The features requiring it are low priority.
-      // attributesAddress = newAddress;
+      attributesAddress = newAddress;
     }
     // It is fine that this will only change in the "collection" instance of the
     // Variable, because that is the only one that will be operated on by the
@@ -1185,6 +1213,9 @@
   tensorstore::TensorStore<T, R, M> store;
   // The address of the attributes. This MUST NEVER be touched by the user.
   std::uintptr_t attributesAddress;
+  // The metadata will need to be updated if the trim util was used on it.
+  std::shared_ptr<std::shared_ptr<bool>> toPublish =
+      std::make_shared<std::shared_ptr<bool>>(std::make_shared<bool>(false));
 };
 
 // Tensorstore Array's don't have an IndexDomain and so they can't be slice with
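
The new toPublish member above is a shared_ptr to a shared_ptr<bool> rather than a plain bool, presumably so that every copy of a Variable sharing the outer pointer observes the same flag: set_metadata_publish_flag swaps the inner pointer, and should_publish falls back to republishing when the flag is absent. The sketch below demonstrates that sharing behaviour with a hypothetical Flag type, not the actual mdio class.

// flag_sharing_sketch.cc: copies share the outer shared_ptr, so reassigning
// the inner pointer through one copy is visible through every other copy.
#include <cassert>
#include <memory>

struct Flag {
  std::shared_ptr<std::shared_ptr<bool>> value =
      std::make_shared<std::shared_ptr<bool>>(std::make_shared<bool>(false));

  void Set(bool v) { *value = std::make_shared<bool>(v); }

  bool Get() const {
    if (value && *value) return **value;
    return true;  // mirror should_publish(): err on the side of republishing
  }
};

int main() {
  Flag original;
  Flag copy = original;  // the copy shares the outer shared_ptr
  original.Set(true);    // reassign the inner pointer through one instance...
  assert(copy.Get());    // ...and the other instance observes the change
  return 0;
}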
mdio/variable_test.cc: 45 changes (45 additions, 0 deletions)

@@ -481,6 +481,51 @@ TEST(Variable, mismatchAttrs) {
   std::filesystem::remove_all("test.zarr");
 }
 
+TEST(Variable, updateMetadata) {
+  nlohmann::json json = GetJsonSpecStruct();
+  auto variableRes =
+      mdio::Variable<>::Open(json, mdio::constants::kCreateClean);
+  ASSERT_TRUE(variableRes.status().ok()) << variableRes.status();
+  auto variable = variableRes.value();
+
+  // Update the metadata
+  nlohmann::json attrs = {
+      {"count", 100},
+      {"min", -1000.0},
+      {"max", 1000.0},
+      {"sum", 0.0},
+      {"sumSquares", 0.0},
+      {"histogram", {{"binCenters", {1.0, 2.0, 3.0}}, {"counts", {1, 2, 3}}}}};
+  auto attrsUpdateRes = variable.UpdateAttributes<float>(attrs);
+  ASSERT_TRUE(attrsUpdateRes.status().ok()) << attrsUpdateRes.status();
+
+  auto publishFuture = variable.PublishMetadata();
+  EXPECT_TRUE(publishFuture.status().ok()) << publishFuture.status();
+}
+
+TEST(Variable, publishNoStats) {
+  nlohmann::json json = GetJsonSpecStruct();
+  auto variableRes =
+      mdio::Variable<>::Open(json, mdio::constants::kCreateClean);
+  ASSERT_TRUE(variableRes.status().ok()) << variableRes.status();
+  auto variable = variableRes.value();
+
+  auto publishFuture = variable.PublishMetadata();
+  EXPECT_TRUE(publishFuture.status().ok()) << publishFuture.status();
+}
+
+TEST(Variable, publishNoAttrs) {
+  nlohmann::json json = GetJsonSpecStruct();
+  json["attributes"].erase("long_name");
+  auto variableRes =
+      mdio::Variable<>::Open(json, mdio::constants::kCreateClean);
+  ASSERT_TRUE(variableRes.status().ok()) << variableRes.status();
+  auto variable = variableRes.value();
+
+  auto publishFuture = variable.PublishMetadata();
+  EXPECT_TRUE(publishFuture.status().ok()) << publishFuture.status();
+}
+
 TEST(Variable, slice) {
   auto variable =
       mdio::Variable<>::Open(json_good, mdio::constants::kCreateClean).value();