Skip to content

Commit

Permalink
Merge pull request #71 from TGSAI/46_publish_bug
Browse files Browse the repository at this point in the history
Cleared TODO regarding segfaults and fixed re-publishing metadata behavior
  • Loading branch information
blasscoc authored Aug 15, 2024
2 parents e480085 + 8d662f3 commit 2d14001
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 20 deletions.
39 changes: 22 additions & 17 deletions mdio/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <fstream>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>
Expand Down Expand Up @@ -844,7 +845,14 @@ class Dataset {
// Build out list of modified variables
std::vector<std::string> modifiedVariables;
for (const auto& key : keys) {
modifiedVariables.push_back(key);
auto var = variables.at(key).value();
if (var.was_updated() || var.should_publish()) {
modifiedVariables.push_back(key);
}
if (var.should_publish()) {
// Reset the flag. This should only get set by the trim util.
var.set_metadata_publish_flag(false);
}
}

// If nothing changed, we don't want to perform any writes
Expand All @@ -857,15 +865,14 @@ class Dataset {

// We need to update the entire .zmetadata file
std::vector<nlohmann::json> json_vars;
std::vector<Variable<>>
vars; // Keeps the Variables in memory. Fix for premature reference
// decrement in LLVM compiler.
for (auto key : keys) {
auto var = variables.at(key).value();
std::vector<std::shared_ptr<Variable<>>>
vars; // Keeps the Variables in memory using shared_ptr
for (const auto& key : keys) {
auto var = std::make_shared<Variable<>>(variables.at(key).value());
vars.push_back(var);
// Get the JSON, drop transform, and add attributes
nlohmann::json json =
var.get_store().spec().value().ToJson(IncludeDefaults{}).value();
var->get_store().spec().value().ToJson(IncludeDefaults{}).value();
json.erase("transform");
json.erase("dtype");
json["metadata"].erase("filters");
Expand All @@ -876,7 +883,7 @@ class Dataset {
std::string path = json["kvstore"]["path"].get<std::string>();
path.pop_back();
json["kvstore"]["path"] = path;
nlohmann::json meta = var.getMetadata();
nlohmann::json meta = var->getMetadata();
if (meta.contains("coordinates")) {
meta["attributes"]["coordinates"] = meta["coordinates"];
meta.erase("coordinates");
Expand Down Expand Up @@ -912,18 +919,16 @@ class Dataset {
promises;
std::vector<tensorstore::AnyFuture> futures;

vars.clear(); // Clear the vector so we can add only the modified Variables
for (auto key : modifiedVariables) {
for (const auto& key : modifiedVariables) {
auto pair = tensorstore::PromiseFuturePair<
tensorstore::TimestampedStorageGeneration>::Make();
auto var = variables.at(key).value();
vars.push_back(var);
auto updateFuture = var.PublishMetadata();
auto var = std::make_shared<Variable<>>(variables.at(key).value());
auto updateFuture = var->PublishMetadata();
updateFuture.ExecuteWhenReady(
[promise = std::move(pair.promise)](
tensorstore::ReadyFuture<
tensorstore::TimestampedStorageGeneration>
readyFut) { promise.SetResult(readyFut.result()); });
[promise = std::move(pair.promise),
var](tensorstore::ReadyFuture<
tensorstore::TimestampedStorageGeneration>
readyFut) { promise.SetResult(readyFut.result()); });
variableFutures.push_back(std::move(updateFuture));
futures.push_back(std::move(pair.future));
}
Expand Down
1 change: 1 addition & 0 deletions mdio/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Future<void> TrimDataset(std::string dataset_path,

for (auto& varIdentifier : ds.variables.get_iterable_accessor()) {
MDIO_ASSIGN_OR_RETURN(auto var, ds.variables.at(varIdentifier))
var.set_metadata_publish_flag(true);

if (var.dimensions().labels().back() == "") {
auto spec = var.spec();
Expand Down
37 changes: 34 additions & 3 deletions mdio/variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,22 @@ class Variable {
return attributesAddress != currentAddress;
}

/**
 * @brief Marks whether this Variable's metadata needs to be republished.
 * Intended for internal use by the trimming utility only.
 *
 * @param shouldPublish True if the metadata should get republished.
 */
void set_metadata_publish_flag(const bool shouldPublish) {
  auto flag = std::make_shared<bool>(shouldPublish);
  if (toPublish) {
    // Swap in a new inner pointer so every copy of this Variable that
    // shares the outer handle observes the updated value.
    *toPublish = std::move(flag);
  } else {
    // Defensive: the member is initialized in-class, so this branch should
    // never be taken, but rebuild the whole handle just in case.
    toPublish =
        std::make_shared<std::shared_ptr<bool>>(std::move(flag));
  }
}

// ===========================Member data getters===========================
const std::string& get_variable_name() const { return variableName; }

Expand All @@ -1153,6 +1169,20 @@ class Variable {
return attributesAddress;
}

/**
 * @brief Reports whether the metadata should get republished.
 *
 * @return True if the metadata should get republished.
 */
bool should_publish() const {
  // Dereference through both handles without copying the shared_ptrs,
  // leaving the reference counts untouched.
  const bool hasFlag = toPublish != nullptr && *toPublish != nullptr;
  // A missing flag should never occur; err on the side of caution and
  // request a republish rather than risk stale metadata.
  return hasFlag ? **toPublish : true;
}

private:
/**
* This method should NEVER be called by the user.
Expand All @@ -1167,9 +1197,7 @@ class Variable {
if (attributes.get() != nullptr && attributes->get() != nullptr) {
std::uintptr_t newAddress =
reinterpret_cast<std::uintptr_t>(&(**attributes));
// TODO(BrianMichell): Leaving this as active is causing segfaults.
// The features requiring it are low priority.
// attributesAddress = newAddress;
attributesAddress = newAddress;
}
// It is fine that this will only change in the "collection" instance of the
// Variable, because that is the only one that will be operated on by the
Expand All @@ -1185,6 +1213,9 @@ class Variable {
tensorstore::TensorStore<T, R, M> store;
// The address of the attributes. This MUST NEVER be touched by the user.
std::uintptr_t attributesAddress;
// The metadata will need to be updated if the trim util was used on it.
std::shared_ptr<std::shared_ptr<bool>> toPublish =
std::make_shared<std::shared_ptr<bool>>(std::make_shared<bool>(false));
};

// Tensorstore Array's don't have an IndexDomain and so they can't be slice with
Expand Down
45 changes: 45 additions & 0 deletions mdio/variable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,51 @@ TEST(Variable, mismatchAttrs) {
std::filesystem::remove_all("test.zarr");
}

// Verifies that attribute statistics can be attached to a Variable and
// that the updated metadata publishes successfully.
TEST(Variable, updateMetadata) {
  nlohmann::json spec = GetJsonSpecStruct();
  auto openRes = mdio::Variable<>::Open(spec, mdio::constants::kCreateClean);
  ASSERT_TRUE(openRes.status().ok()) << openRes.status();
  auto variable = openRes.value();

  // Attach a fresh set of summary statistics to the metadata.
  nlohmann::json attrs = {
      {"count", 100},
      {"min", -1000.0},
      {"max", 1000.0},
      {"sum", 0.0},
      {"sumSquares", 0.0},
      {"histogram", {{"binCenters", {1.0, 2.0, 3.0}}, {"counts", {1, 2, 3}}}}};
  auto updateRes = variable.UpdateAttributes<float>(attrs);
  ASSERT_TRUE(updateRes.status().ok()) << updateRes.status();

  auto publishFuture = variable.PublishMetadata();
  EXPECT_TRUE(publishFuture.status().ok()) << publishFuture.status();
}

// Publishing metadata without ever updating attributes must still succeed.
TEST(Variable, publishNoStats) {
  nlohmann::json spec = GetJsonSpecStruct();
  auto openRes = mdio::Variable<>::Open(spec, mdio::constants::kCreateClean);
  ASSERT_TRUE(openRes.status().ok()) << openRes.status();
  auto variable = openRes.value();

  auto publishFuture = variable.PublishMetadata();
  EXPECT_TRUE(publishFuture.status().ok()) << publishFuture.status();
}

// Publishing must succeed even when the spec carries no "long_name"
// attribute at all.
TEST(Variable, publishNoAttrs) {
  nlohmann::json spec = GetJsonSpecStruct();
  spec["attributes"].erase("long_name");
  auto openRes = mdio::Variable<>::Open(spec, mdio::constants::kCreateClean);
  ASSERT_TRUE(openRes.status().ok()) << openRes.status();
  auto variable = openRes.value();

  auto publishFuture = variable.PublishMetadata();
  EXPECT_TRUE(publishFuture.status().ok()) << publishFuture.status();
}

TEST(Variable, slice) {
auto variable =
mdio::Variable<>::Open(json_good, mdio::constants::kCreateClean).value();
Expand Down

0 comments on commit 2d14001

Please sign in to comment.