Skip to content

Commit

Permalink
Initial ArchiveCallback work
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed May 28, 2024
1 parent 29420df commit 3eb793b
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/fdb5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ list( APPEND fdb5_srcs
api/helpers/PurgeIterator.h
api/helpers/StatsIterator.cc
api/helpers/StatsIterator.h
api/helpers/ArchiveCallback.h

api/local/QueryVisitor.h
api/local/QueueStringLogTarget.h
Expand Down
9 changes: 5 additions & 4 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ FDB::~FDB() {
}
}

void FDB::archive(eckit::message::Message msg) {
Key FDB::archive(eckit::message::Message msg, ArchiveCallback callback) {
fdb5::Key key = MessageDecoder::messageToKey(msg);
archive(key, msg.data(), msg.length());
return archive(key, msg.data(), msg.length(), callback);
}
void FDB::archive(eckit::DataHandle& handle) {
eckit::message::Message msg;
Expand Down Expand Up @@ -97,7 +97,7 @@ void FDB::archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& h
}
}

void FDB::archive(const Key& key, const void* data, size_t length) {
Key FDB::archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) {
eckit::Timer timer;
timer.start();

Expand All @@ -118,11 +118,12 @@ void FDB::archive(const Key& key, const void* data, size_t length) {
keyInternal.unset("stepunits");
}

internal_->archive(keyInternal, data, length);
internal_->archive(keyInternal, data, length, callback);
dirty_ = true;

timer.stop();
stats_.addArchive(length, timer);
return keyInternal; /* TODO: Not convinced this key is useful to return.*/
}

bool FDB::sorted(const metkit::mars::MarsRequest &request) {
Expand Down
7 changes: 5 additions & 2 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "fdb5/api/helpers/WipeIterator.h"
#include "fdb5/api/helpers/MoveIterator.h"
#include "fdb5/config/Config.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -69,13 +70,15 @@ class FDB {

// -------------- Primary API functions ----------------------------

void archive(eckit::message::Message msg);
Key archive(eckit::message::Message msg, ArchiveCallback callback = nullptr);
void archive(eckit::DataHandle& handle);
void archive(const void* data, size_t length);
// warning: not high-perf API - makes sure that all the requested fields are archived and there are no data exceeding the request
void archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& handle);

// disclaimer: this is a low-level API. The provided key and the corresponding data are not checked for consistency
void archive(const Key& key, const void* data, size_t length);
// Optional callback function is called upon receiving field location from the store.
Key archive(const Key& key, const void* data, size_t length, ArchiveCallback callback = nullptr);

/// Flushes all buffers and closes all data handles into a consistent DB state
/// @note always safe to call
Expand Down
4 changes: 4 additions & 0 deletions src/fdb5/api/FDBFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "fdb5/api/helpers/PurgeIterator.h"
#include "fdb5/api/helpers/StatsIterator.h"
#include "fdb5/api/helpers/StatusIterator.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -65,6 +66,9 @@ class FDBBase : private eckit::NonCopyable {
// -------------- Primary API functions ----------------------------

virtual void archive(const Key& key, const void* data, size_t length) = 0;

/* TODO Using a seperate ArchiveCallback method for developing/experimentation convenience */
virtual void archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) {NOTIMP;};

virtual void flush() = 0;

Expand Down
7 changes: 6 additions & 1 deletion src/fdb5/api/LocalFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,19 @@ using namespace eckit;


namespace fdb5 {

void LocalFDB::archive(const Key& key, const void* data, size_t length) {
archive(key, data, length, nullptr);
}

void LocalFDB::archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) {

if (!archiver_) {
LOG_DEBUG_LIB(LibFdb5) << *this << ": Constructing new archiver" << std::endl;
archiver_.reset(new Archiver(config_));
}

archiver_->archive(key, data, length);
archiver_->archive(key, data, length, callback);
}

ListIterator LocalFDB::inspect(const metkit::mars::MarsRequest &request) {
Expand Down
1 change: 1 addition & 0 deletions src/fdb5/api/LocalFDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class LocalFDB : public FDBBase {
using FDBBase::stats;

void archive(const Key& key, const void* data, size_t length) override;
void archive(const Key& key, const void* data, size_t length, ArchiveCallback callback) override;

ListIterator inspect(const metkit::mars::MarsRequest& request) override;

Expand Down
25 changes: 25 additions & 0 deletions src/fdb5/api/helpers/ArchiveCallback.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

/*
* This software was developed as part of the EC H2020 funded project NextGenIO
* (Project ID: 671951) www.nextgenio.eu
*/

#pragma once

#include "fdb5/database/Key.h"
#include "fdb5/database/FieldLocation.h"

namespace fdb5 {

using ArchiveCallback = std::function<void(const Key&, const FieldLocation&)>;

} // namespace fdb5
8 changes: 4 additions & 4 deletions src/fdb5/database/ArchiveVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

namespace fdb5 {

ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size) :
ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size, ArchiveCallback callback) :
BaseArchiveVisitor(owner, field),
data_(data),
size_(size) {
size_(size),
callback_(callback){
}

bool ArchiveVisitor::selectDatum(const Key &key, const Key &full) {
Expand All @@ -29,8 +30,7 @@ bool ArchiveVisitor::selectDatum(const Key &key, const Key &full) {

ASSERT(current());

current()->archive(key, data_, size_);

current()->archive(key, data_, size_, full, callback_);

return true;
}
Expand Down
4 changes: 3 additions & 1 deletion src/fdb5/database/ArchiveVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ArchiveVisitor : public BaseArchiveVisitor {

public: // methods

ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size);
ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size, ArchiveCallback callback = nullptr);

protected: // methods

Expand All @@ -43,6 +43,8 @@ class ArchiveVisitor : public BaseArchiveVisitor {
const void *data_;
size_t size_;

ArchiveCallback callback_;

};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/database/Archiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ Archiver::~Archiver() {
databases_.clear(); //< explicitly delete the DBs before schemas are destroyed
}

void Archiver::archive(const Key &key, const void* data, size_t len) {
ArchiveVisitor visitor(*this, key, data, len);
void Archiver::archive(const Key &key, const void* data, size_t len, ArchiveCallback callback) {
ArchiveVisitor visitor(*this, key, data, len, callback);
archive(key, visitor);
}

Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Archiver : public eckit::NonCopyable {
virtual ~Archiver();

void archive(const Key &key, BaseArchiveVisitor& visitor);
void archive(const Key &key, const void* data, size_t len);
void archive(const Key &key, const void* data, size_t len, ArchiveCallback callback = nullptr);

/// Flushes all buffers and closes all data handles into a consistent DB state
/// @note always safe to call
Expand Down
11 changes: 9 additions & 2 deletions src/fdb5/database/DB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "fdb5/database/DB.h"
#include "fdb5/database/Field.h"
#include "fdb5/toc/TocEngine.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

using eckit::Log;

Expand Down Expand Up @@ -103,13 +104,19 @@ eckit::DataHandle *DB::retrieve(const Key& key) {
return nullptr;
}

void DB::archive(const Key& key, const void* data, eckit::Length length) {
void DB::archive(const Key& key, const void* data, eckit::Length length, const Key& fullKey, ArchiveCallback callback) {

CatalogueWriter* cat = dynamic_cast<CatalogueWriter*>(catalogue_.get());
ASSERT(cat);

const Index& idx = cat->currentIndex();
cat->archive(key, store().archive(idx.key(), data, length));
std::unique_ptr<FieldLocation> location(store().archive(idx.key(), data, length));

if (callback) {
callback(fullKey, *location);
}

cat->archive(key, std::move(location));
}

bool DB::open() {
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/database/DB.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "fdb5/database/EntryVisitMechanism.h"
#include "fdb5/database/Key.h"
#include "fdb5/database/Store.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

namespace eckit {
class DataHandle;
Expand Down Expand Up @@ -56,7 +57,7 @@ class DB final : public eckit::OwnedLock {
bool axis(const std::string &keyword, eckit::StringSet &s) const;
bool inspect(const Key& key, Field& field);
eckit::DataHandle *retrieve(const Key &key);
void archive(const Key &key, const void *data, eckit::Length length);
void archive(const Key &key, const void *data, eckit::Length length, const Key &full, ArchiveCallback callback = nullptr);

bool open();
void flush();
Expand Down
1 change: 1 addition & 0 deletions tests/fdb/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ list( APPEND api_tests
select
dist
fdb_c
archive_callback
)

foreach( _test ${api_tests} )
Expand Down
101 changes: 101 additions & 0 deletions tests/fdb/api/test_archive_callback.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@


#include "eckit/testing/Test.h"
#include "fdb5/api/FDB.h"

namespace {
bool testEqual(const fdb5::Key& key1, const fdb5::Key& key2) {
if (key1.size() != key2.size()) {
std::cout << "key1.size() != key2.size()" << std::endl;
return false;
}

// Then check that all items in key1 are in key2
for (const auto& item : key1) {
if (key2.find(item.first) == key2.end()) {
return false;
}

if (key2.value(item.first) != item.second) {
return false;
}
}

return true;
}

} // namespace anonymous


namespace fdb5::test {

//----------------------------------------------------------------------------------------------------------------------
CASE("Archive callback 2") {
FDB fdb;

std::string data_str = "Raining cats and dogs";
const void* data = static_cast<const void *>(data_str.c_str());
size_t length = data_str.size();

Key key;
key.set("class","od");
key.set("expver","xxxx");
key.set("type","fc");
key.set("stream","oper");
key.set("date","20101010");
key.set("time","0000");
key.set("domain","g");
key.set("levtype","sfc");
key.set("param","130");

std::map<fdb5::Key, eckit::URI> map;
std::vector<Key> internalKeys;

key.set("step","1");
internalKeys.push_back(fdb.archive(key, data, length, [&map, key] (const fdb5::Key& fullKey, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
}));

key.set("step","2");
internalKeys.push_back(fdb.archive(key, data, length, [&map, key] (const fdb5::Key& fullKey, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
}));

key.set("step","3");
internalKeys.push_back(fdb.archive(key, data, length, [&map, key] (const fdb5::Key& fullKey, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
}));

fdb.flush();

EXPECT(map.size() == 3);

// Print out the map
for (const auto& [key, uri] : map) {
std::cout << key << " -> " << uri << std::endl;
}

// Note that the keys are not the same, but they are equivalent
// so iterate over the map keys and check that there is a corresponding key in the internalKeys vector
for (const auto& [key, uri] : map) {
bool found = false;
for (const auto& internalKey : internalKeys) {
if (testEqual(key, internalKey)) {
found = true;
break;
}
}
EXPECT(found);
}

}
//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5::test

int main(int argc, char** argv) {

eckit::Log::info() << ::getenv("FDB_HOME") << std::endl;

return ::eckit::testing::run_tests(argc, argv);
}

0 comments on commit 3eb793b

Please sign in to comment.