Skip to content

Commit

Permalink
Merge pull request #63 from ecmwf/feature/serverside-callback
Browse files Browse the repository at this point in the history
Add callback interface class for serverside callback registration
  • Loading branch information
danovaro authored Jan 14, 2025
2 parents 6883e58 + 2acd0cf commit 22e9a44
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 21 deletions.
9 changes: 4 additions & 5 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ FDB::FDB(const Config &config) :
internal_(FDBFactory::instance().build(config)),
dirty_(false),
reportStats_(config.getBool("statistics", false)) {
LibFdb5::instance().constructorCallback()(*this);
LibFdb5::instance().constructorCallback()(*internal_);
}


Expand Down Expand Up @@ -290,7 +290,6 @@ void FDB::flush() {
timer.start();

internal_->flush();
flushCallback_();
dirty_ = false;

timer.stop();
Expand Down Expand Up @@ -328,12 +327,12 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const {
return internal_->enabled(controlIdentifier);
}

void FDB::registerArchiveCallback(ArchiveCallback callback) { // todo rename
void FDB::registerArchiveCallback(ArchiveCallback callback) {
internal_->registerArchiveCallback(callback);
}

void FDB::registerFlushCallback(FlushCallback callback) { // todo rename
flushCallback_ = callback;
void FDB::registerFlushCallback(FlushCallback callback) {
internal_->registerFlushCallback(callback);
}

//----------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 0 additions & 2 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ class FDB {
bool reportStats_;

FDBStats stats_;

FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
5 changes: 1 addition & 4 deletions src/fdb5/api/FDBFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class FDBToolRequest;

/// The base class that FDB implementations are derived from

class FDBBase : private eckit::NonCopyable {
class FDBBase : private eckit::NonCopyable, public CallbackRegistry {

public: // methods

Expand Down Expand Up @@ -95,8 +95,6 @@ class FDBBase : private eckit::NonCopyable {

virtual AxesIterator axesIterator(const FDBToolRequest& request, int axes) = 0;

void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;}

// -------------- API management ----------------------------

/// ID used for hashing in the Rendezvous hash. Should be unique amongst those used
Expand Down Expand Up @@ -133,7 +131,6 @@ class FDBBase : private eckit::NonCopyable {

bool disabled_;

ArchiveCallback callback_ = CALLBACK_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/api/LocalFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void LocalFDB::archive(const Key& key, const void* data, size_t length) {

if (!archiver_) {
LOG_DEBUG_LIB(LibFdb5) << *this << ": Constructing new archiver" << std::endl;
archiver_.reset(new Archiver(config_, callback_));
archiver_.reset(new Archiver(config_, archiveCallback_));
}

archiver_->archive(key, data, length);
Expand Down Expand Up @@ -133,6 +133,7 @@ AxesIterator LocalFDB::axesIterator(const FDBToolRequest& request, int level) {
void LocalFDB::flush() {
if (archiver_) {
archiver_->flush();
flushCallback_();
}
}

Expand Down
23 changes: 20 additions & 3 deletions src/fdb5/api/helpers/Callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,30 @@
namespace fdb5 {

class FDB;
class CallbackRegistry;

using ArchiveCallback = std::function<void(const Key& key, const void* data, size_t length, std::future<std::shared_ptr<const FieldLocation>>)>;
using FlushCallback = std::function<void()>;
using ConstructorCallback = std::function<void(FDB&)>;
using ConstructorCallback = std::function<void(CallbackRegistry&)>;

static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future<std::shared_ptr<const FieldLocation>>) {};
static const ArchiveCallback CALLBACK_ARCHIVE_NOOP = [](auto&&...) {};
static const FlushCallback CALLBACK_FLUSH_NOOP = []() {};
static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {};
static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](auto&&...) {};

// -------------------------------------------------------------------------------------------------

// This class provides a common interface for registering callbacks with an FDB object or a Store/Catalogue Handler.
class CallbackRegistry {
public:

void registerFlushCallback(FlushCallback callback) {flushCallback_ = callback;}
void registerArchiveCallback(ArchiveCallback callback) {archiveCallback_ = callback;}

protected:

FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP;
ArchiveCallback archiveCallback_ = CALLBACK_ARCHIVE_NOOP;

};

} // namespace fdb5
2 changes: 1 addition & 1 deletion src/fdb5/database/ArchiveVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ArchiveVisitor : public BaseArchiveVisitor {

public: // methods

ArchiveVisitor(Archiver& owner, const Key& dataKey, const void* data, size_t size, const ArchiveCallback& callback = CALLBACK_NOOP);
ArchiveVisitor(Archiver& owner, const Key& dataKey, const void* data, size_t size, const ArchiveCallback& callback = CALLBACK_ARCHIVE_NOOP);

protected: // methods

Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Archiver : public eckit::NonCopyable {

public: // methods

Archiver(const Config& dbConfig = Config().expandConfig(), const ArchiveCallback& callback = CALLBACK_NOOP);
Archiver(const Config& dbConfig = Config().expandConfig(), const ArchiveCallback& callback = CALLBACK_ARCHIVE_NOOP);

virtual ~Archiver();

Expand Down
20 changes: 17 additions & 3 deletions src/fdb5/remote/server/StoreHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ using metkit::mars::MarsRequest;
namespace fdb5::remote {

StoreHandler::StoreHandler(eckit::net::TCPSocket& socket, const Config& config):
ServerConnection(socket, config) {}
ServerConnection(socket, config) {
LibFdb5::instance().constructorCallback()(*this);
}

StoreHandler::~StoreHandler() {}

Expand Down Expand Up @@ -169,9 +171,19 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID

Store& ss = store(clientID, dbKey);

std::unique_ptr<const FieldLocation> location = ss.archive(idxKey, charData + s.position(), length - s.position());
std::shared_ptr<const FieldLocation> location = ss.archive(idxKey, charData + s.position(), length - s.position());

std::promise<std::shared_ptr<const FieldLocation>> promise;
promise.set_value(location);

eckit::StringDict dict = dbKey.keyDict();
dict.insert(idxKey.keyDict().begin(), idxKey.keyDict().end());
const Key fullkey(dict); /// @note: we do not have the third level of the key.

archiveCallback_(fullkey, charData + s.position(), length - s.position(), promise.get_future());

Log::status() << "Archiving done: " << ss_key.str() << std::endl;

eckit::Buffer buffer(16 * 1024);
MemoryStream stream(buffer);
stream << (*location);
Expand All @@ -196,6 +208,8 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf
auto it = stores_.find(clientID);
ASSERT(it != stores_.end());
it->second.store->flush();

flushCallback_();
}

Log::info() << "Flush complete" << std::endl;
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/remote/server/StoreHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include "fdb5/api/helpers/Callback.h"
#include "fdb5/database/Store.h"
#include "fdb5/remote/server/ServerConnection.h"

Expand All @@ -29,7 +30,7 @@ struct StoreHelper {
};

//----------------------------------------------------------------------------------------------------------------------
class StoreHandler : public ServerConnection {
class StoreHandler : public ServerConnection, public CallbackRegistry {
public: // methods

StoreHandler(eckit::net::TCPSocket& socket, const Config& config);
Expand Down

0 comments on commit 22e9a44

Please sign in to comment.