From f7a02b3df79d5260a2c43adc5627fa335875bc31 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 5 Aug 2024 20:48:01 +0000 Subject: [PATCH 1/5] Added S3 unit tests --- mdio/CMakeLists.txt | 24 ++++ mdio/s3_test.cc | 306 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 330 insertions(+) create mode 100644 mdio/s3_test.cc diff --git a/mdio/CMakeLists.txt b/mdio/CMakeLists.txt index 146316f..d205d22 100644 --- a/mdio/CMakeLists.txt +++ b/mdio/CMakeLists.txt @@ -109,6 +109,30 @@ mdio_cc_test( tensorstore::kvstore_gcs ) +mdio_cc_test( + NAME + s3_test + SRCS + s3_test.cc + COPTS + ${mdio_DEFAULT_COPTS} + LINKOPTS + ${mdio_DEFAULT_LINKOPTS} + DEPS + GTest::gmock_main + tensorstore::driver_array + tensorstore::driver_zarr + tensorstore::driver_json + tensorstore::kvstore_file + tensorstore::kvstore_memory + tensorstore::tensorstore + tensorstore::index_space_dim_expression + tensorstore::index_space_index_transform + tensorstore::util_status_testutil + nlohmann_json_schema_validator + tensorstore::kvstore_s3 +) + mdio_cc_test( NAME variable_collection_test diff --git a/mdio/s3_test.cc b/mdio/s3_test.cc new file mode 100644 index 0000000..2d5e071 --- /dev/null +++ b/mdio/s3_test.cc @@ -0,0 +1,306 @@ +// Copyright 2024 TGS + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "mdio/dataset.h" + +namespace { + +// TODO(End user): User should point to their own S3 bucket here. 
+// You may find the test dataset at: TODO: Upload the test dataset to a public +// object store +/*NOLINT*/ std::string const S3_PATH = "s3://USER_BUCKET"; + +/*NOLINT*/ std::string const fullToyManifest = R"( +{ + "metadata": { + "name": "campos_3d", + "apiVersion": "1.0.0", + "createdOn": "2023-12-12T15:02:06.413469-06:00", + "attributes": { + "textHeader": [ + "C01 .......................... ", + "C02 .......................... ", + "C03 .......................... " + ], + "foo": "bar" + } + }, + "variables": [ + { + "name": "image", + "dataType": "float32", + "dimensions": [ + {"name": "inline", "size": 256}, + {"name": "crossline", "size": 512}, + {"name": "depth", "size": 384} + ], + "metadata": { + "chunkGrid": { + "name": "regular", + "configuration": { "chunkShape": [128, 128, 128] } + }, + "statsV1": { + "count": 100, + "sum": 1215.1, + "sumSquares": 125.12, + "min": 5.61, + "max": 10.84, + "histogram": {"binCenters": [1, 2], "counts": [10, 15]} + }, + "attributes": { + "fizz": "buzz" + } + }, + "coordinates": ["inline", "crossline", "depth", "cdp-x", "cdp-y"], + "compressor": {"name": "blosc", "algorithm": "zstd"} + }, + { + "name": "velocity", + "dataType": "float64", + "dimensions": ["inline", "crossline", "depth"], + "metadata": { + "chunkGrid": { + "name": "regular", + "configuration": { "chunkShape": [128, 128, 128] } + }, + "unitsV1": {"speed": "m/s"} + }, + "coordinates": ["inline", "crossline", "depth", "cdp-x", "cdp-y"] + }, + { + "name": "image_inline", + "dataType": "int16", + "dimensions": ["inline", "crossline", "depth"], + "longName": "inline optimized version of 3d_stack", + "compressor": {"name": "blosc", "algorithm": "zstd"}, + "metadata": { + "chunkGrid": { + "name": "regular", + "configuration": { "chunkShape": [128, 128, 128] } + } + }, + "coordinates": ["inline", "crossline", "depth", "cdp-x", "cdp-y"] + }, + { + "name": "image_headers", + "dataType": { + "fields": [ + {"name": "cdp-x", "format": "int32"}, + {"name": "cdp-y", 
"format": "int32"}, + {"name": "elevation", "format": "float16"}, + {"name": "some_scalar", "format": "float16"} + ] + }, + "dimensions": ["inline", "crossline"], + "metadata": { + "chunkGrid": { + "name": "regular", + "configuration": { "chunkShape": [128, 128] } + } + }, + "coordinates": ["inline", "crossline", "cdp-x", "cdp-y"] + }, + { + "name": "inline", + "dataType": "uint32", + "dimensions": [{"name": "inline", "size": 256}] + }, + { + "name": "crossline", + "dataType": "uint32", + "dimensions": [{"name": "crossline", "size": 512}] + }, + { + "name": "depth", + "dataType": "uint32", + "dimensions": [{"name": "depth", "size": 384}], + "metadata": { + "unitsV1": { "length": "m" } + } + }, + { + "name": "cdp-x", + "dataType": "float32", + "dimensions": [ + {"name": "inline", "size": 256}, + {"name": "crossline", "size": 512} + ], + "metadata": { + "unitsV1": { "length": "m" } + } + }, + { + "name": "cdp-y", + "dataType": "float32", + "dimensions": [ + {"name": "inline", "size": 256}, + {"name": "crossline", "size": 512} + ], + "metadata": { + "unitsV1": { "length": "m" } + } + }] +} +)"; + +TEST(S3, create) { + if (S3_PATH == "s3://USER_BUCKET") { + GTEST_SKIP() << "Please set the S3_PATH to your own bucket in the " + "s3_test.cc file."; + } + nlohmann::json j = nlohmann::json::parse(fullToyManifest); + auto dataset = + mdio::Dataset::from_json(j, S3_PATH, mdio::constants::kCreateClean); + EXPECT_TRUE(dataset.status().ok()) << dataset.status(); +} + +TEST(S3, open) { + if (S3_PATH == "s3://USER_BUCKET") { + GTEST_SKIP() << "Please set the S3_PATH to your own bucket in the " + "s3_test.cc file."; + } + nlohmann::json j = nlohmann::json::parse(fullToyManifest); + auto dataset = mdio::Dataset::Open(S3_PATH, mdio::constants::kOpen); + EXPECT_TRUE(dataset.status().ok()) + << dataset.status(); // TODO(BrianMichell): How will timeouts work with + // this? Can we simulate it or make it excessively + // short to force one? 
+} + +TEST(S3, write) { + if (S3_PATH == "s3://USER_BUCKET") { + GTEST_SKIP() << "Please set the S3_PATH to your own bucket in the " + "s3_test.cc file."; + } + nlohmann::json j = nlohmann::json::parse(fullToyManifest); + auto dataset = mdio::Dataset::Open(S3_PATH, mdio::constants::kOpen); + ASSERT_TRUE(dataset.status().ok()) << dataset.status(); + + auto ds = dataset.value(); + std::vector names = ds.variables.get_keys(); + + // Construct a vector of Variables to work with + std::vector> openVariables; + for (auto& key : names) { + auto var = ds.get_variable(key); + openVariables.emplace_back(var.value()); + } + + // Now we can start opening all the Variables + std::vector>> readVariablesFutures; + for (auto& v : openVariables) { + auto read = v.Read(); + readVariablesFutures.emplace_back(read); + } + + // Now we make sure all the reads were successful + std::vector> readVariables; + for (auto& v : readVariablesFutures) { + ASSERT_TRUE(v.status().ok()) << v.status(); + readVariables.emplace_back(v.value()); + } + + for (auto variable : readVariables) { + std::string name = variable.variableName; + mdio::DataType dtype = variable.dtype(); + if (dtype == mdio::constants::kFloat32 && name == "image") { + auto data = reinterpret_cast( + variable.get_data_accessor().data()); + data[0] = 3.14f; + } else if (dtype == mdio::constants::kFloat64 && name == "velocity") { + auto data = reinterpret_cast( + variable.get_data_accessor().data()); + data[0] = 2.71828; + } else if (dtype == mdio::constants::kInt16 && name == "image_inline") { + auto data = reinterpret_cast( + variable.get_data_accessor().data()); + data[0] = 0xff; + } else if (dtype == mdio::constants::kByte && name == "image_headers") { + auto data = reinterpret_cast( + variable.get_data_accessor().data()); + for (int i = 0; i < 12; i++) { + data[i] = std::byte(0xff); + } + } else if (name == "inline") { + auto data = reinterpret_cast( + variable.get_data_accessor().data()); + for (uint32_t i = 0; i < 256; ++i) {
+ data[i] = i; + } + } else if (name == "crossline") { + auto data = reinterpret_cast( + variable.get_data_accessor().data()); + for (uint32_t i = 0; i < 512; ++i) { + data[i] = i; + } + } else if (name == "depth") { + auto data = reinterpret_cast( + variable.get_data_accessor().data()); + for (uint32_t i = 0; i < 384; ++i) { + data[i] = i; + } + } + } + + // Pair the Variables to the VariableData objects via name matching so we can + // write them out correctly. This makes an assumption that the vectors are 1-1 + std::map variableIdxPair; + for (std::size_t i = 0; i < openVariables.size(); i++) { + for (std::size_t j = 0; j < readVariables.size(); j++) { + if (openVariables[i].get_variable_name() == + readVariables[j].variableName) { + variableIdxPair[i] = j; + break; + } + } + } + + // Now we can write the Variables back to the store + std::vector writeFutures; + for (auto& idxPair : variableIdxPair) { + auto write = + openVariables[idxPair.first].Write(readVariables[idxPair.second]); + writeFutures.emplace_back(write); + } + + // Now we make sure all the writes were successful + for (auto& w : writeFutures) { + ASSERT_TRUE(w.status().ok()) << w.status(); + } +} + +TEST(S3, read) { + if (S3_PATH == "s3://USER_BUCKET") { + GTEST_SKIP() << "Please set the S3_PATH to your own bucket in the " + "s3_test.cc file."; + } + nlohmann::json j = nlohmann::json::parse(fullToyManifest); + auto dataset = mdio::Dataset::Open(S3_PATH, mdio::constants::kOpen); + ASSERT_TRUE(dataset.status().ok()) << dataset.status(); + + auto ds = dataset.value(); + + for (auto& kv : ds.coordinates) { + std::string key = kv.first; + auto var = ds.get_variable(key); + ASSERT_TRUE(var.status().ok()) << var.status(); + } + auto future = ds.SelectField("image_headers", "cdp-x"); + ASSERT_TRUE(future.status().ok()) << future.status(); +} + +} // namespace From 0671b4d65ec033ccf5500531044a99d89e7ad3f0 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 5 Aug 2024 20:48:57 +0000 Subject: [PATCH 2/5]
Split cloud specific drivers into their own variables --- CMakeLists.txt | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0acebae..17881be 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -101,11 +101,20 @@ set(mdio_INTERNAL_DEPS tensorstore::index_space_dim_expression tensorstore::index_space_index_transform tensorstore::util_status_testutil - tensorstore::kvstore_gcs tensorstore::driver_array PARENT_SCOPE ) +# Define internal deps for cloud specific drivers +set(mdio_INTERNAL_GCS_DRIVER_DEPS + tensorstore::kvstore_gcs + PARENT_SCOPE +) + +set(mdio_INTERNAL_S3_DRIVER_DEPS + tensorstore::kvstore_s3 + PARENT_SCOPE +) list(APPEND mdio_COMMON_INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}) From b36cfcf19e0a0b038b4cc9903b8b5ee2a4e0e7f0 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Tue, 6 Aug 2024 13:17:53 +0000 Subject: [PATCH 3/5] Add source directory guard for setting internal deps variable --- CMakeLists.txt | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0acebae..8857989 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,20 +91,23 @@ list(APPEND mdio_TEST_COPTS "-Wno-unknown-warning-option") # Define the internal dependencies that should be linked -set(mdio_INTERNAL_DEPS - tensorstore::driver_array - tensorstore::driver_zarr - tensorstore::driver_json - tensorstore::kvstore_file - tensorstore::kvstore_memory - tensorstore::tensorstore - tensorstore::index_space_dim_expression - tensorstore::index_space_index_transform - tensorstore::util_status_testutil - tensorstore::kvstore_gcs - tensorstore::driver_array - PARENT_SCOPE -) +if(NOT CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR) + # This is not the top-level project + set(mdio_INTERNAL_DEPS + tensorstore::driver_array + tensorstore::driver_zarr + tensorstore::driver_json + tensorstore::kvstore_file + tensorstore::kvstore_memory +
tensorstore::tensorstore + tensorstore::index_space_dim_expression + tensorstore::index_space_index_transform + tensorstore::util_status_testutil + tensorstore::kvstore_gcs + tensorstore::driver_array + PARENT_SCOPE + ) +endif() list(APPEND mdio_COMMON_INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}) From 534bcfa96427bea4f1d8dbeee548700ba4a001b1 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Wed, 7 Aug 2024 14:59:53 +0000 Subject: [PATCH 4/5] Added check for missing driver message, provides more MDIO specific message. --- mdio/variable.h | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/mdio/variable.h b/mdio/variable.h index 364bb19..9b8aca8 100644 --- a/mdio/variable.h +++ b/mdio/variable.h @@ -80,6 +80,41 @@ struct SliceDescriptor { }; namespace internal { + +/** + * @brief Checks a status for a missing driver message and returns an MDIO + * specific error message. + * @param status The status to check + * @return A driver specific message if the status is a missing driver message, + * otherwise the original status. + */ +absl::Status CheckMissingDriverStatus(const absl::Status& status) { + std::string error(status.message()); + if (error.find("Error parsing object member \"driver\"") != + std::string::npos) { + if (error.find("is not registered") != std::string::npos) { + if (error.find("gcs") != std::string::npos) { + return absl::InvalidArgumentError( + "A GCS path was detected but the GCS driver was not " + "registered.\nPlease ensure that your CMake includes the " + "mdio_INTERNAL_GCS_DRIVER_DEPS variable."); + } else if (error.find("s3") != std::string::npos) { + return absl::InvalidArgumentError( + "An S3 path was detected but the S3 driver was not " + "registered.\nPlease ensure that your CMake includes the " + "mdio_INTERNAL_S3_DRIVER_DEPS variable."); + } else { + return absl::InvalidArgumentError( + "An unexpected driver registration error has occured. 
Please file " + "a bug report with the error message to " + "https://github.com/TGSAI/mdio-cpp/issues\n" + + error); + } + } + } + return status; +} + /** * @brief Validates and processes a JSON specification for a tensorstore * variable. @@ -358,7 +393,7 @@ Future> CreateVariable(const nlohmann::json& json_spec, tensorstore::ReadyFuture readyFut) { auto ready_result = readyFut.result(); if (!ready_result.ok()) { - promise.SetResult(ready_result.status()); + promise.SetResult(CheckMissingDriverStatus(ready_result.status())); } else { promise.SetResult(variable_future.result()); } From fdfdbda3f3be67ecb628ff7370a30f497b5abe67 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Wed, 7 Aug 2024 15:00:27 +0000 Subject: [PATCH 5/5] Explicitly check status in Dataset, return driver message if applicable --- mdio/dataset.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mdio/dataset.h b/mdio/dataset.h index 394e0c1..b5154f7 100644 --- a/mdio/dataset.h +++ b/mdio/dataset.h @@ -293,8 +293,14 @@ from_zmetadata(const std::string& dataset_path) { // FIXME - enable async auto kvs_future = mdio::internal::dataset_kvs_store(dataset_path).result(); + if (!kvs_future.ok()) { + return internal::CheckMissingDriverStatus(kvs_future.status()); + } auto kvs_read_result = tensorstore::kvstore::Read(kvs_future.value(), ".zmetadata").result(); + if (!kvs_read_result.ok()) { + return internal::CheckMissingDriverStatus(kvs_read_result.status()); + } ::nlohmann::json zmetadata; try {