From 1d6857d3bbf805c2e56ab44ddf1cbda1e4b38317 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Mon, 6 Jan 2025 15:47:46 +0000 Subject: [PATCH 1/3] Add callback interface class for serverside callback registration --- src/fdb5/api/FDB.cc | 9 ++++----- src/fdb5/api/FDB.h | 2 -- src/fdb5/api/FDBFactory.h | 5 +---- src/fdb5/api/LocalFDB.cc | 3 ++- src/fdb5/api/helpers/Callback.h | 23 ++++++++++++++++++++--- src/fdb5/database/ArchiveVisitor.h | 2 +- src/fdb5/database/Archiver.h | 2 +- src/fdb5/remote/server/StoreHandler.cc | 11 +++++++++-- src/fdb5/remote/server/StoreHandler.h | 3 ++- 9 files changed, 40 insertions(+), 20 deletions(-) diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc index 2720f46fa..8e06c18b8 100644 --- a/src/fdb5/api/FDB.cc +++ b/src/fdb5/api/FDB.cc @@ -41,7 +41,7 @@ FDB::FDB(const Config &config) : internal_(FDBFactory::instance().build(config)), dirty_(false), reportStats_(config.getBool("statistics", false)) { - LibFdb5::instance().constructorCallback()(*this); + LibFdb5::instance().constructorCallback()(*internal_); } @@ -290,7 +290,6 @@ void FDB::flush() { timer.start(); internal_->flush(); - flushCallback_(); dirty_ = false; timer.stop(); @@ -328,12 +327,12 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const { return internal_->enabled(controlIdentifier); } -void FDB::registerArchiveCallback(ArchiveCallback callback) { // todo rename +void FDB::registerArchiveCallback(ArchiveCallback callback) { internal_->registerArchiveCallback(callback); } -void FDB::registerFlushCallback(FlushCallback callback) { // todo rename - flushCallback_ = callback; +void FDB::registerFlushCallback(FlushCallback callback) { + internal_->registerFlushCallback(callback); } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDB.h b/src/fdb5/api/FDB.h index 28529795c..75c3b5de4 100644 --- a/src/fdb5/api/FDB.h +++ b/src/fdb5/api/FDB.h @@ -158,8 +158,6 @@ class FDB { bool reportStats_; FDBStats stats_; - - FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/FDBFactory.h b/src/fdb5/api/FDBFactory.h index 6661a475e..f1057b2a9 100644 --- a/src/fdb5/api/FDBFactory.h +++ b/src/fdb5/api/FDBFactory.h @@ -60,7 +60,7 @@ class FDBToolRequest; /// The base class that FDB implementations are derived from -class FDBBase : private eckit::NonCopyable { +class FDBBase : private eckit::NonCopyable, public CallbackInterface { public: // methods @@ -95,8 +95,6 @@ class FDBBase : private eckit::NonCopyable { virtual AxesIterator axesIterator(const FDBToolRequest& request, int axes) = 0; - void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;} - // -------------- API management ---------------------------- /// ID used for hashing in the Rendezvous hash. Should be unique amongst those used @@ -133,7 +131,6 @@ class FDBBase : private eckit::NonCopyable { bool disabled_; - ArchiveCallback callback_ = CALLBACK_NOOP; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/api/LocalFDB.cc b/src/fdb5/api/LocalFDB.cc index 8c2d680a7..215532105 100644 --- a/src/fdb5/api/LocalFDB.cc +++ b/src/fdb5/api/LocalFDB.cc @@ -51,7 +51,7 @@ void LocalFDB::archive(const Key& key, const void* data, size_t length) { if (!archiver_) { LOG_DEBUG_LIB(LibFdb5) << *this << ": Constructing new archiver" << std::endl; - archiver_.reset(new Archiver(config_, callback_)); + archiver_.reset(new Archiver(config_, archiveCallback_)); } archiver_->archive(key, data, length); @@ -133,6 +133,7 @@ AxesIterator LocalFDB::axesIterator(const FDBToolRequest& request, int level) { void LocalFDB::flush() { if (archiver_) { archiver_->flush(); + flushCallback_(); } } diff --git a/src/fdb5/api/helpers/Callback.h b/src/fdb5/api/helpers/Callback.h index 1ea128c6e..52545bb8a 100644 --- a/src/fdb5/api/helpers/Callback.h +++ b/src/fdb5/api/helpers/Callback.h @@ -21,13 +21,30 @@ namespace fdb5 { class FDB; +class CallbackInterface; using ArchiveCallback = std::function>)>; using FlushCallback = std::function; -using ConstructorCallback = std::function; +using ConstructorCallback = std::function; -static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future>) {}; +static const ArchiveCallback CALLBACK_ARCHIVE_NOOP = [](const Key& key, const void* data, size_t length, std::future>) {}; static const FlushCallback CALLBACK_FLUSH_NOOP = []() {}; -static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {}; +static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](CallbackInterface&) {}; + +// ------------------------------------------------------------------------------------------------- + +// This class provides a common interface for registering callbacks with an FDB object or a Store/Catalogue Handler. +class CallbackInterface { +public: + + void registerFlushCallback(FlushCallback callback) {flushCallback_ = callback;} + void registerArchiveCallback(ArchiveCallback callback) {archiveCallback_ = callback;} + +protected: + + FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP; + ArchiveCallback archiveCallback_ = CALLBACK_ARCHIVE_NOOP; + +}; } // namespace fdb5 diff --git a/src/fdb5/database/ArchiveVisitor.h b/src/fdb5/database/ArchiveVisitor.h index 3d1c08f04..7f2d9c142 100644 --- a/src/fdb5/database/ArchiveVisitor.h +++ b/src/fdb5/database/ArchiveVisitor.h @@ -31,7 +31,7 @@ class ArchiveVisitor : public BaseArchiveVisitor { public: // methods - ArchiveVisitor(Archiver& owner, const Key& dataKey, const void* data, size_t size, const ArchiveCallback& callback = CALLBACK_NOOP); + ArchiveVisitor(Archiver& owner, const Key& dataKey, const void* data, size_t size, const ArchiveCallback& callback = CALLBACK_ARCHIVE_NOOP); protected: // methods diff --git a/src/fdb5/database/Archiver.h b/src/fdb5/database/Archiver.h index 8d4b623b2..ac6826b8e 100644 --- a/src/fdb5/database/Archiver.h +++ b/src/fdb5/database/Archiver.h @@ -47,7 +47,7 @@ class Archiver : public eckit::NonCopyable { public: // methods - Archiver(const Config& dbConfig = Config().expandConfig(), const ArchiveCallback& callback = CALLBACK_NOOP); + Archiver(const Config& dbConfig = Config().expandConfig(), const ArchiveCallback& callback = CALLBACK_ARCHIVE_NOOP); virtual ~Archiver(); diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index a80603b7a..821cc1fed 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -169,9 +169,16 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID Store& ss = store(clientID, dbKey); - std::unique_ptr location = ss.archive(idxKey, charData + s.position(), length - s.position()); + std::shared_ptr location = ss.archive(idxKey, charData + s.position(), length - s.position()); + + std::promise> promise; + promise.set_value(location); + + Key fullkey = Key::parseString(ss_key.str()); + archiveCallback_(fullkey, data, length, promise.get_future()); + Log::status() << "Archiving done: " << ss_key.str() << std::endl; - + eckit::Buffer buffer(16 * 1024); MemoryStream stream(buffer); stream << (*location); diff --git a/src/fdb5/remote/server/StoreHandler.h b/src/fdb5/remote/server/StoreHandler.h index 92abb9a36..df4b3305d 100644 --- a/src/fdb5/remote/server/StoreHandler.h +++ b/src/fdb5/remote/server/StoreHandler.h @@ -10,6 +10,7 @@ #pragma once +#include "fdb5/api/helpers/Callback.h" #include "fdb5/database/Store.h" #include "fdb5/remote/server/ServerConnection.h" @@ -29,7 +30,7 @@ struct StoreHelper { }; //---------------------------------------------------------------------------------------------------------------------- -class StoreHandler : public ServerConnection { +class StoreHandler : public ServerConnection, public CallbackInterface { public: // methods StoreHandler(eckit::net::TCPSocket& socket, const Config& config); From b10d2056b0ce4d2172b85e52edae02bc4313bc6c Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Mon, 6 Jan 2025 17:54:19 +0000 Subject: [PATCH 2/3] Constructor/Flush callbacks. Fix merging of key parts --- src/fdb5/remote/server/StoreHandler.cc | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 821cc1fed..39a9cbbec 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -21,7 +21,9 @@ using metkit::mars::MarsRequest; namespace fdb5::remote { StoreHandler::StoreHandler(eckit::net::TCPSocket& socket, const Config& config): - ServerConnection(socket, config) {} + ServerConnection(socket, config) { + LibFdb5::instance().constructorCallback()(*this); + } StoreHandler::~StoreHandler() {} @@ -174,8 +176,11 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID std::promise> promise; promise.set_value(location); - Key fullkey = Key::parseString(ss_key.str()); - archiveCallback_(fullkey, data, length, promise.get_future()); + eckit::StringDict dict = dbKey.keyDict(); + dict.insert(idxKey.keyDict().begin(), idxKey.keyDict().end()); + const Key fullkey(dict); /// @note: we do not have the third level of the key. + + archiveCallback_(fullkey, charData + s.position(), length - s.position(), promise.get_future()); Log::status() << "Archiving done: " << ss_key.str() << std::endl; @@ -203,6 +208,8 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf auto it = stores_.find(clientID); ASSERT(it != stores_.end()); it->second.store->flush(); + + flushCallback_(); } Log::info() << "Flush complete" << std::endl; From 2acd0cff8ffe50cab70a6eb896457cc9605070ed Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 14 Jan 2025 17:15:12 +0000 Subject: [PATCH 3/3] Rename to CallbackRegistry --- src/fdb5/api/FDBFactory.h | 2 +- src/fdb5/api/helpers/Callback.h | 10 +++++----- src/fdb5/remote/server/StoreHandler.h | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/fdb5/api/FDBFactory.h b/src/fdb5/api/FDBFactory.h index f1057b2a9..6ec6e7b99 100644 --- a/src/fdb5/api/FDBFactory.h +++ b/src/fdb5/api/FDBFactory.h @@ -60,7 +60,7 @@ class FDBToolRequest; /// The base class that FDB implementations are derived from -class FDBBase : private eckit::NonCopyable, public CallbackInterface { +class FDBBase : private eckit::NonCopyable, public CallbackRegistry { public: // methods diff --git a/src/fdb5/api/helpers/Callback.h b/src/fdb5/api/helpers/Callback.h index 52545bb8a..a628eb8b7 100644 --- a/src/fdb5/api/helpers/Callback.h +++ b/src/fdb5/api/helpers/Callback.h @@ -21,20 +21,20 @@ namespace fdb5 { class FDB; -class CallbackInterface; +class CallbackRegistry; using ArchiveCallback = std::function>)>; using FlushCallback = std::function; -using ConstructorCallback = std::function; +using ConstructorCallback = std::function; -static const ArchiveCallback CALLBACK_ARCHIVE_NOOP = [](const Key& key, const void* data, size_t length, std::future>) {}; +static const ArchiveCallback CALLBACK_ARCHIVE_NOOP = [](auto&&...) {}; static const FlushCallback CALLBACK_FLUSH_NOOP = []() {}; -static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](CallbackInterface&) {}; +static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](auto&&...) {}; // ------------------------------------------------------------------------------------------------- // This class provides a common interface for registering callbacks with an FDB object or a Store/Catalogue Handler. -class CallbackInterface { +class CallbackRegistry { public: void registerFlushCallback(FlushCallback callback) {flushCallback_ = callback;} diff --git a/src/fdb5/remote/server/StoreHandler.h b/src/fdb5/remote/server/StoreHandler.h index df4b3305d..40362b09a 100644 --- a/src/fdb5/remote/server/StoreHandler.h +++ b/src/fdb5/remote/server/StoreHandler.h @@ -30,7 +30,7 @@ struct StoreHelper { }; //---------------------------------------------------------------------------------------------------------------------- -class StoreHandler : public ServerConnection, public CallbackInterface { +class StoreHandler : public ServerConnection, public CallbackRegistry { public: // methods StoreHandler(eckit::net::TCPSocket& socket, const Config& config);