Skip to content

Commit

Permalink
Register callback
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed May 30, 2024
1 parent 3eb793b commit 1a6b7d7
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 58 deletions.
14 changes: 9 additions & 5 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ FDB::~FDB() {
}
}

Key FDB::archive(eckit::message::Message msg, ArchiveCallback callback) {
Key FDB::archive(eckit::message::Message msg) {
fdb5::Key key = MessageDecoder::messageToKey(msg);
return archive(key, msg.data(), msg.length(), callback);
return archive(key, msg.data(), msg.length());
}
void FDB::archive(eckit::DataHandle& handle) {
eckit::message::Message msg;
Expand Down Expand Up @@ -97,7 +97,7 @@ void FDB::archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& h
}
}

Key FDB::archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) {
Key FDB::archive(const Key& key, const void* data, size_t length) {
eckit::Timer timer;
timer.start();

Expand All @@ -118,12 +118,12 @@ Key FDB::archive(const Key& key, const void* data, size_t length, ArchiveCallbac
keyInternal.unset("stepunits");
}

internal_->archive(keyInternal, data, length, callback);
internal_->archive(keyInternal, data, length);
dirty_ = true;

timer.stop();
stats_.addArchive(length, timer);
return keyInternal; /* TODO: Not convinced this key is useful to return.*/
return keyInternal;
}

bool FDB::sorted(const metkit::mars::MarsRequest &request) {
Expand Down Expand Up @@ -322,6 +322,10 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const {
return internal_->enabled(controlIdentifier);
}

void FDB::registerCallback(ArchiveCallback callback) {
internal_->registerCallback(callback);
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5
8 changes: 6 additions & 2 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ class FDB {

// -------------- Primary API functions ----------------------------

Key archive(eckit::message::Message msg, ArchiveCallback callback = nullptr);
Key archive(eckit::message::Message msg);
void archive(eckit::DataHandle& handle);
void archive(const void* data, size_t length);
// warning: not high-perf API - makes sure that all the requested fields are archived and there are no data exceeding the request
void archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& handle);

// disclaimer: this is a low-level API. The provided key and the corresponding data are not checked for consistency
// Optional callback function is called upon receiving field location from the store.
Key archive(const Key& key, const void* data, size_t length, ArchiveCallback callback = nullptr);
Key archive(const Key& key, const void* data, size_t length);

/// Flushes all buffers and closes all data handles into a consistent DB state
/// @note always safe to call
Expand Down Expand Up @@ -119,6 +119,8 @@ class FDB {

bool dirty() const;

void registerCallback(ArchiveCallback callback);

// -------------- API management ----------------------------

/// ID used for hashing in the Rendezvous hash. Should be unique.
Expand Down Expand Up @@ -152,6 +154,8 @@ class FDB {
bool reportStats_;

FDBStats stats_;

ArchiveCallback callback_ = nullptr;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
7 changes: 4 additions & 3 deletions src/fdb5/api/FDBFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ class FDBBase : private eckit::NonCopyable {

virtual void archive(const Key& key, const void* data, size_t length) = 0;

/* TODO Using a seperate ArchiveCallback method for developing/experimentation convenience */
virtual void archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) {NOTIMP;};

virtual void flush() = 0;

virtual ListIterator inspect(const metkit::mars::MarsRequest& request) = 0;
Expand All @@ -94,6 +91,8 @@ class FDBBase : private eckit::NonCopyable {

virtual AxesIterator axes(const FDBToolRequest& request, int axes) { NOTIMP; }

void registerCallback(ArchiveCallback callback) {callback_ = callback;}

// -------------- API management ----------------------------

/// ID used for hashing in the Rendezvous hash. Should be unique amongst those used
Expand Down Expand Up @@ -129,6 +128,8 @@ class FDBBase : private eckit::NonCopyable {
ControlIdentifiers controlIdentifiers_;

bool disabled_;

ArchiveCallback callback_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 1 addition & 5 deletions src/fdb5/api/LocalFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,13 @@ using namespace eckit;
namespace fdb5 {

void LocalFDB::archive(const Key& key, const void* data, size_t length) {
archive(key, data, length, nullptr);
}

void LocalFDB::archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) {

if (!archiver_) {
LOG_DEBUG_LIB(LibFdb5) << *this << ": Constructing new archiver" << std::endl;
archiver_.reset(new Archiver(config_));
}

archiver_->archive(key, data, length, callback);
archiver_->archive(key, data, length, callback_);
}

ListIterator LocalFDB::inspect(const metkit::mars::MarsRequest &request) {
Expand Down
1 change: 0 additions & 1 deletion src/fdb5/api/LocalFDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class LocalFDB : public FDBBase {
using FDBBase::stats;

void archive(const Key& key, const void* data, size_t length) override;
void archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) override;

ListIterator inspect(const metkit::mars::MarsRequest& request) override;

Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/ArchiveVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ bool ArchiveVisitor::selectDatum(const Key &key, const Key &full) {

ASSERT(current());

current()->archive(key, data_, size_, full, callback_);
current()->archive(key, data_, size_, field_, callback_); //field_ is the user key! (or rather, "internal key")

return true;
}
Expand Down
6 changes: 4 additions & 2 deletions src/fdb5/database/BaseArchiveVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class BaseArchiveVisitor : public WriteVisitor {

fdb5::DB* current() const;

protected: // members

const Key &field_;

private: // members

Archiver &owner_;

const Key &field_;

bool checkMissingKeysOnWrite_;
};

Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/database/DB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ eckit::DataHandle *DB::retrieve(const Key& key) {
return nullptr;
}

void DB::archive(const Key& key, const void* data, eckit::Length length, const Key& fullKey, ArchiveCallback callback) {
void DB::archive(const Key& key, const void* data, eckit::Length length, const Key& internalKey, ArchiveCallback callback) {

CatalogueWriter* cat = dynamic_cast<CatalogueWriter*>(catalogue_.get());
ASSERT(cat);
Expand All @@ -113,7 +113,7 @@ void DB::archive(const Key& key, const void* data, eckit::Length length, const K
std::unique_ptr<FieldLocation> location(store().archive(idx.key(), data, length));

if (callback) {
callback(fullKey, *location);
callback(internalKey, *location);
}

cat->archive(key, std::move(location));
Expand Down
46 changes: 9 additions & 37 deletions tests/fdb/api/test_archive_callback.cc
Original file line number Diff line number Diff line change
@@ -1,36 +1,10 @@


#include "eckit/testing/Test.h"
#include "fdb5/api/FDB.h"

namespace {
bool testEqual(const fdb5::Key& key1, const fdb5::Key& key2) {
if (key1.size() != key2.size()) {
std::cout << "key1.size() != key2.size()" << std::endl;
return false;
}

// Then check that all items in key1 are in key2
for (const auto& item : key1) {
if (key2.find(item.first) == key2.end()) {
return false;
}

if (key2.value(item.first) != item.second) {
return false;
}
}

return true;
}

} // namespace anonymous


namespace fdb5::test {

//----------------------------------------------------------------------------------------------------------------------
CASE("Archive callback 2") {
CASE("Archive callback") {
FDB fdb;

std::string data_str = "Raining cats and dogs";
Expand All @@ -51,20 +25,18 @@ CASE("Archive callback 2") {
std::map<fdb5::Key, eckit::URI> map;
std::vector<Key> internalKeys;

fdb.registerCallback([&map] (const fdb5::Key& internalKey, const fdb5::FieldLocation& location) {
map[internalKey] = location.fullUri();
});

key.set("step","1");
internalKeys.push_back(fdb.archive(key, data, length, [&map, key] (const fdb5::Key& fullKey, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
}));
internalKeys.push_back(fdb.archive(key, data, length));

key.set("step","2");
internalKeys.push_back(fdb.archive(key, data, length, [&map, key] (const fdb5::Key& fullKey, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
}));
internalKeys.push_back(fdb.archive(key, data, length));

key.set("step","3");
internalKeys.push_back(fdb.archive(key, data, length, [&map, key] (const fdb5::Key& fullKey, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
}));
internalKeys.push_back(fdb.archive(key, data, length));

fdb.flush();

Expand All @@ -80,7 +52,7 @@ CASE("Archive callback 2") {
for (const auto& [key, uri] : map) {
bool found = false;
for (const auto& internalKey : internalKeys) {
if (testEqual(key, internalKey)) {
if (key == internalKey){
found = true;
break;
}
Expand Down

0 comments on commit 1a6b7d7

Please sign in to comment.