Skip to content

Commit

Permalink
fix(S3): store issues
Browse files Browse the repository at this point in the history
  • Loading branch information
mcakircali committed Dec 19, 2024
1 parent 5ffc60b commit 2cd18fe
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 231 deletions.
67 changes: 4 additions & 63 deletions src/fdb5/s3/S3Common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#include "eckit/config/LocalConfiguration.h"
#include "eckit/exception/Exceptions.h"
#include "eckit/filesystem/URI.h"
#include "eckit/io/s3/S3BucketName.h"
#include "eckit/io/s3/S3Config.h"
#include "eckit/io/s3/S3Session.h"
#include "eckit/utils/Tokenizer.h"
#include "fdb5/config/Config.h"
#include "fdb5/database/Key.h"

Expand All @@ -26,80 +26,22 @@ namespace fdb5 {

//----------------------------------------------------------------------------------------------------------------------

/// @brief Construct S3 location info for a database identified by a key.
///
/// Derives one S3 bucket per DB: the key's values are joined, ':' separators
/// are replaced by '-' (':' is not a valid bucket-name character), and the
/// configured bucket prefix is prepended.
///
/// @param config  FDB configuration; the "s3" section is consumed by parseConfig().
/// @param key     Database key; its values form the bucket name.
/// @note The component argument is unused in the bucket-per-DB scheme.
S3Common::S3Common(const fdb5::Config& config, const std::string& /*component*/, const fdb5::Key& key) {

    parseConfig(config);

    std::string keyStr = key.valuesToString();
    // ':' is not permitted in S3 bucket names; substitute a safe separator
    std::replace(keyStr.begin(), keyStr.end(), ':', '-');
    db_bucket_ = prefix_ + keyStr;

    /// @todo: check that the bucket name complies with name restrictions
}

/// @brief Construct S3 location info from an existing URI.
///
/// The endpoint is taken from the URI's host/port and the bucket name is
/// parsed out of the URI path.
///
/// @param config  FDB configuration; the "s3" section is consumed by parseConfig().
/// @param uri     S3 URI of an existing store location.
/// @note The component argument is unused in the bucket-per-DB scheme.
/// @note Validity of the input URI is not checked here because this constructor
///       is only triggered where validity of URIs is ensured beforehand.
S3Common::S3Common(const fdb5::Config& config, const std::string& /*component*/, const eckit::URI& uri) {

    parseConfig(config);

    endpoint_ = eckit::net::Endpoint {uri.host(), uri.port()};

    db_bucket_ = eckit::S3BucketName::parse(uri.name());
}

void S3Common::parseConfig(const fdb5::Config& config) {
Expand All @@ -126,7 +68,6 @@ void S3Common::parseConfig(const fdb5::Config& config) {

eckit::S3Session::instance().addClient(s3Config);

/// @note: code for bucket per DB only
prefix_ = s3.getString("bucketPrefix", prefix_);
}

Expand Down
24 changes: 7 additions & 17 deletions src/fdb5/s3/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,26 @@
#pragma once

#include "eckit/filesystem/URI.h"

#include "fdb5/database/Key.h"
#include "fdb5/config/Config.h"
#include "fdb5/database/Key.h"

namespace fdb5 {

/// @brief Common S3 location state shared by S3 catalogue/store components.
///
/// Resolves the endpoint and the per-DB bucket name either from a database
/// key or from an existing URI. Intended as a base class: derived components
/// access endpoint_ and db_bucket_ directly.
class S3Common {

public:  // methods

    /// Derive bucket name from a database key (bucket-per-DB scheme).
    S3Common(const fdb5::Config&, const std::string& component, const fdb5::Key&);

    /// Derive endpoint and bucket name from an existing S3 URI.
    S3Common(const fdb5::Config&, const std::string& component, const eckit::URI&);

private:  // methods

    /// Read the "s3" section of the FDB config (endpoint, credentials, bucketPrefix).
    void parseConfig(const fdb5::Config& config);

protected:  // members

    eckit::net::Endpoint endpoint_;

    /// Name of the bucket holding this DB's objects.
    std::string db_bucket_;

private:  // members

    /// Prefix prepended to key-derived bucket names (config "bucketPrefix").
    std::string prefix_;
};

}  // namespace fdb5
160 changes: 9 additions & 151 deletions src/fdb5/s3/S3Store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "eckit/thread/AutoLock.h"
#include "eckit/thread/StaticMutex.h"
#include "eckit/utils/MD5.h"
#include "eckit/utils/Tokenizer.h"
#include "fdb5/LibFdb5.h"
#include "fdb5/database/Field.h"
#include "fdb5/database/FieldLocation.h"
Expand Down Expand Up @@ -63,15 +62,7 @@ bool S3Store::uriBelongs(const eckit::URI& uri) const {
}

/// @brief Check whether the bucket or object referenced by the URI exists on the store's endpoint.
/// @param uri  S3 URI naming either a bucket or a bucket/object pair.
/// @return true if the named entity exists.
bool S3Store::uriExists(const eckit::URI& uri) const {
    // S3Name::make resolves the URI path to a bucket or object name as appropriate.
    return eckit::S3Name::make(endpoint_, uri.name())->exists();
}

bool S3Store::auxiliaryURIExists(const eckit::URI& uri) const {
Expand All @@ -90,23 +81,6 @@ std::vector<eckit::URI> S3Store::collocatedDataURIs() const {
for (const auto& key : bucket.listObjects()) { store_unit_uris.push_back(bucket.makeObject(key)->uri()); }

return store_unit_uris;

/// @note: code for single bucket for all DBs
// std::vector<eckit::URI> store_unit_uris;

// eckit::S3Bucket bucket{endpoint_, bucket_};

// if (!bucket.exists()) return store_unit_uris;

// /// @note if an S3Catalogue is implemented, more filtering will need to
// /// be done here to discriminate store keys from catalogue keys
// for (const auto& key : bucket.listObjects(filter = "^" + db_prefix_ + "_.*")) {

// store_unit_uris.push_back(key.uri());

// }

// return store_unit_uris;
}

std::set<eckit::URI> S3Store::asCollocatedDataURIs(const std::vector<eckit::URI>& uris) const {
Expand All @@ -133,17 +107,11 @@ std::vector<eckit::URI> S3Store::getAuxiliaryURIs(const eckit::URI& uri) const {

/// @todo: never used in actual fdb-read?
/// @brief Obtain a data handle for reading a field back from the store.
/// @note Delegates entirely to the field's own location; presumably no S3
///       traffic happens until the returned handle is opened — confirm.
eckit::DataHandle* S3Store::retrieve(Field& field) const {

    return field.dataHandle();
}

std::unique_ptr<const FieldLocation> S3Store::archive(const Key& key, const void* data, eckit::Length length) {

/// @note: code for S3 object (key) per field:

/// @note: generate unique key name
/// if single bucket, starting by dbkey_indexkey_
/// if bucket per db, starting by indexkey_
eckit::S3ObjectName n = generateDataKey(key);

/// @todo: ensure bucket if not yet seen by this process
Expand All @@ -161,21 +129,6 @@ std::unique_ptr<const FieldLocation> S3Store::archive(const Key& key, const void
h->write(data, length);

return std::unique_ptr<const S3FieldLocation>(new S3FieldLocation(n.uri(), 0, length, fdb5::Key()));

/// @note: code for S3 object (key) per index store:

// /// @note: get or generate unique key name
// /// if single bucket, starting by dbkey_indexkey_
// /// if bucket per db, starting by indexkey_
// eckit::S3Name n = getDataKey(key);

// eckit::DataHandle &dh = getDataHandle(key, n);

// eckit::Offset offset{dh.position()};

// h.write(data, length);

// return std::unique_ptr<S3FieldLocation>(new S3FieldLocation(n.URI(), offset, length, fdb5::Key()));
}

void S3Store::flush() {
Expand All @@ -198,65 +151,21 @@ void S3Store::close() {

/// @brief Remove the S3 object or bucket referenced by the URI.
///
/// The URI path is resolved into either an object name (bucket/key) or a
/// bucket name; the matching entity is destroyed when doit is true.
///
/// @param uri         S3 URI naming a bucket or a bucket/object pair.
/// @param logAlways   Stream for unconditional logging (currently unused here).
/// @param logVerbose  Stream for verbose logging of what would be/is removed.
/// @param doit        When false, only log; when true, actually delete.
/// @throws eckit::SeriousBug if the URI resolves to neither an object nor a bucket.
void S3Store::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const {

    auto item = eckit::S3Name::make(endpoint_, uri.name());

    if (auto* object = dynamic_cast<eckit::S3ObjectName*>(item.get())) {
        logVerbose << "Removing S3 object: " << object->asString() << '\n';
        if (doit) { object->remove(); }
    } else if (auto* bucket = dynamic_cast<eckit::S3BucketName*>(item.get())) {
        logVerbose << "Removing S3 bucket: " << bucket->asString() << '\n';
        // ensureDestroyed also removes any remaining objects in the bucket
        if (doit) { bucket->ensureDestroyed(); }
    } else {
        throw eckit::SeriousBug("S3Store::remove: unknown URI type: " + uri.asString(), Here());
    }
}

/// @brief Print a short human-readable description of this store (endpoint and bucket).
void S3Store::print(std::ostream& out) const {

    out << "S3Store(" << endpoint_ << "/" << db_bucket_ << ")";
}

/// @note: unique name generation copied from LocalPathName::unique.
Expand Down Expand Up @@ -287,59 +196,8 @@ eckit::S3ObjectName S3Store::generateDataKey(const Key& key) const {
std::replace(keyStr.begin(), keyStr.end(), ':', '-');

return eckit::S3ObjectName {endpoint_, {db_bucket_, keyStr + "." + md5.digest() + ".data"}};

/// @note: code for single bucket for all DBs
// return eckit::S3Name{endpoint_, bucket_, db_prefix_ + "_" + key.valuesToString() + "_" + md5.digest() + ".data"};
}

/// @note: code for S3 object (key) per index store:
// eckit::S3Name S3Store::getDataKey(const Key& key) const {

// KeyStore::const_iterator j = dataKeys_.find(key);

// if ( j != dataKeys_.end() )
// return j->second;

// eckit::S3Name dataKey = generateDataKey(key);

// dataKeys_[ key ] = dataKey;

// return dataKey;

// }

/// @note: code for S3 object (key) per index store:
// eckit::DataHandle& S3Store::getDataHandle(const Key& key, const eckit::S3Name& name) {

// HandleStore::const_iterator j = handles_.find(key);
// if ( j != handles_.end() )
// return j->second;

// eckit::DataHandle *dh = name.dataHandle(multipart = true);

// ASSERT(dh);

// handles_[ key ] = dh;

// dh->openForAppend(0);

// return *dh;

// }

/// @note: code for S3 object (key) per index store:
// void S3Store::closeDataHandles() {

// for ( HandleStore::iterator j = handles_.begin(); j != handles_.end(); ++j ) {
// eckit::DataHandle *dh = j->second;
// dh->close();
// delete dh;
// }

// handles_.clear();

// }

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5

0 comments on commit 2cd18fe

Please sign in to comment.