Skip to content

Commit

Permalink
feat(remoteFDB): added overlay support
Browse files Browse the repository at this point in the history
blob
  • Loading branch information
mcakircali committed Jan 20, 2025
1 parent 464040e commit abe7027
Show file tree
Hide file tree
Showing 22 changed files with 353 additions and 234 deletions.
2 changes: 1 addition & 1 deletion src/fdb5/daos/DaosCatalogueWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter {
/// Mount an existing TocCatalogue, which has a different metadata key (within
/// constraints) to allow on-line rebadging of data
/// variableKeys: The keys that are allowed to differ between the two DBs
void overlayDB(const Catalogue& otherCatalogue, const std::set<std::string>& variableKeys, bool unmount) override { NOTIMP; };
void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) override { NOTIMP; };

// // Hide the contents of the DB!!!
// void hideContents() override;
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/database/Catalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class CatalogueReader : virtual public Catalogue {
public:

CatalogueReader() {}

virtual ~CatalogueReader() {}

virtual DbStats stats() const = 0;
Expand All @@ -156,7 +156,7 @@ class CatalogueWriter : virtual public Catalogue {
virtual const Index& currentIndex() = 0;
virtual const Key currentIndexKey();
virtual void archive(const Key& idxKey, const Key& datumKey, std::shared_ptr<const FieldLocation> fieldLocation) = 0;
virtual void overlayDB(const Catalogue& otherCatalogue, const std::set<std::string>& variableKeys, bool unmount) = 0;
virtual void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) = 0;
virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0;
virtual void reconsolidate() = 0;
};
Expand Down
2 changes: 0 additions & 2 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

Connection::Connection() : single_(false) { }

void Connection::teardown() {

if (!single_) {
Expand Down
5 changes: 2 additions & 3 deletions src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

#pragma once

#include "eckit/serialisation/MemoryStream.h"
#include "fdb5/remote/Messages.h"

#include "eckit/exception/Exceptions.h"
Expand Down Expand Up @@ -52,7 +51,7 @@ class Connection : eckit::NonCopyable {
using PayloadList = std::vector<Payload>;

public: // methods
Connection();
Connection() = default;

virtual ~Connection() = default;

Expand Down Expand Up @@ -82,7 +81,7 @@ class Connection : eckit::NonCopyable {
virtual const eckit::net::TCPSocket& dataSocket() const = 0;

protected: // members
bool single_;
bool single_ {false};

private: // members
mutable std::mutex controlMutex_;
Expand Down
18 changes: 18 additions & 0 deletions src/fdb5/remote/Messages.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,27 @@

#include "fdb5/remote/Messages.h"

#include <cstddef>
#include <ostream>

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

Payload::Payload(const BufferStream& buffer) : length {buffer.length()}, data {buffer.data()} { }

Payload::Payload(const std::size_t length, const void* data) : length {length}, data {data} { }

bool Payload::empty() const {
return data == nullptr && length == 0;
}

bool Payload::consistent() const {
return ((length == 0) ^ (data == nullptr)) == 0;
}

//----------------------------------------------------------------------------------------------------------------------

std::ostream& operator<<(std::ostream& s, const Message& m) {
switch (m) {
case Message::None: s << "None"; break;
Expand Down Expand Up @@ -46,6 +63,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) {
case Message::Store: s << "Store"; break;
case Message::Axes: s << "Axes"; break;
case Message::Exists: s << "Exists"; break;
case Message::Overlay: s << "Overlay"; break;

// Responses
case Message::Received: s << "Received"; break;
Expand Down
44 changes: 34 additions & 10 deletions src/fdb5/remote/Messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,50 @@

#pragma once

#include "eckit/io/Buffer.h"
#include "eckit/serialisation/MemoryStream.h"
#include "eckit/types/FixedString.h"

#include <cmath>
#include <cstddef>
#include <cstdint>

#include "eckit/types/FixedString.h"

namespace eckit {
class Stream;
}
#include <iosfwd>

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

struct BufferStream;

struct Payload {
Payload(std::size_t length, const void* data) : length {length}, data {data} { }
Payload() = default;

explicit Payload(const BufferStream& buffer);

Payload(std::size_t length, const void* data);

bool empty() const;

/// @brief Checks if this object is in a consistent state.
/// @returns True if (length & data) is (zero & null) or (non-zero & non-null).
bool consistent() const;

std::size_t length {0};
const void* data {nullptr};
};

struct BufferStream : private eckit::Buffer, public eckit::MemoryStream {
explicit BufferStream(const size_t size) : eckit::Buffer(size), eckit::MemoryStream(data(), size) { }

size_t length() const { return eckit::MemoryStream::position(); }

const void* data() const { return eckit::Buffer::data(); }

Payload payload() const { return {length(), data()}; }
};

//----------------------------------------------------------------------------------------------------------------------

enum class Message : uint16_t {

// Server instructions
Expand Down Expand Up @@ -67,6 +90,7 @@ enum class Message : uint16_t {
Store,
Axes,
Exists,
Overlay,

// Responses
Received = 200,
Expand All @@ -86,11 +110,11 @@ std::ostream& operator<<(std::ostream& s, const Message& m);
class MessageHeader {

public: // types
constexpr static uint16_t currentVersion = 12;
static constexpr uint16_t currentVersion = 12;

constexpr static const auto hashBytes = 16;
static constexpr uint16_t hashBytes = 16;

constexpr static const auto markerBytes = 4;
static constexpr uint16_t markerBytes = 4;

using MarkerType = eckit::FixedString<markerBytes>;

Expand Down
30 changes: 15 additions & 15 deletions src/fdb5/remote/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,33 @@ Client::~Client() {
connection_.remove(id_);
}

void Client::controlWriteCheckResponse(const Message msg,
const uint32_t requestID,
const bool dataListener,
const void* const payload,
const uint32_t payloadLength) const {
//----------------------------------------------------------------------------------------------------------------------

void Client::controlWriteCheckResponse(const Message msg,
const uint32_t requestID,
const bool dataListener,
const Payload payload) const {

ASSERT(requestID);
ASSERT(!(!payloadLength ^ !payload));
std::lock_guard<std::mutex> lock(blockingRequestMutex_);
ASSERT(payload.consistent());
std::lock_guard lock(blockingRequestMutex_);

PayloadList payloads;
if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); }
if (!payload.empty()) { payloads.emplace_back(payload); }

auto f = connection_.controlWrite(*this, msg, requestID, dataListener, payloads);
f.wait();
ASSERT(f.get().size() == 0);
}

eckit::Buffer Client::controlWriteReadResponse(const Message msg,
const uint32_t requestID,
const void* const payload,
const uint32_t payloadLength) const {
eckit::Buffer Client::controlWriteReadResponse(const Message msg, const uint32_t requestID, const Payload payload) const {

ASSERT(requestID);
ASSERT(!(!payloadLength ^ !payload));
std::lock_guard<std::mutex> lock(blockingRequestMutex_);
ASSERT(payload.consistent());
std::lock_guard lock(blockingRequestMutex_);

PayloadList payloads;
if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); }
if (!payload.empty()) { payloads.emplace_back(payload); }

auto f = connection_.controlWrite(*this, msg, requestID, false, payloads);
f.wait();
Expand All @@ -94,4 +92,6 @@ void Client::dataWrite(Message msg, uint32_t requestID, PayloadList payloads) {
connection_.dataWrite(*this, msg, requestID, std::move(payloads));
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5::remote
47 changes: 37 additions & 10 deletions src/fdb5/remote/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@

#pragma once

#include "eckit/memory/NonCopyable.h"
#include "eckit/net/Endpoint.h"

#include "fdb5/remote/Connection.h"
#include "fdb5/remote/Messages.h"
#include "fdb5/remote/client/ClientConnection.h"

#include "eckit/memory/NonCopyable.h"
#include "eckit/net/Endpoint.h"
#include "eckit/serialisation/MemoryStream.h"

#include <cstddef> // std::size_t
#include <cstdint> // std::uint32_t
#include <mutex>
#include <string>
#include <utility> // std::pair
#include <vector>

Expand Down Expand Up @@ -45,7 +49,7 @@ class Client : eckit::NonCopyable {
public: // methods
Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint);

Client(const EndpointList& endpoints);
explicit Client(const EndpointList& endpoints);

virtual ~Client();

Expand All @@ -60,21 +64,42 @@ class Client : eckit::NonCopyable {
uint32_t generateRequestID() const { return connection_.generateRequestID(); }

// blocking requests

void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, Payload payload = {}) const;

void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, const BufferStream& buffer) const {
controlWriteCheckResponse(msg, requestID, dataListener, buffer.payload());
}

void controlWriteCheckResponse(Message msg,
uint32_t requestID,
bool dataListener,
const void* payload = nullptr,
uint32_t payloadLength = 0) const;
const void* payload,
uint32_t payloadLength) const {
controlWriteCheckResponse(msg, requestID, dataListener, {payloadLength, payload});
}

[[nodiscard]]
eckit::Buffer controlWriteReadResponse(Message msg, uint32_t requestID, Payload payload = {}) const;

[[nodiscard]]
eckit::Buffer controlWriteReadResponse(Message msg, uint32_t requestID, const BufferStream& buffer) const {
return controlWriteReadResponse(msg, requestID, buffer.payload());
}

[[nodiscard]]
eckit::Buffer controlWriteReadResponse(Message msg,
uint32_t requestID,
const void* payload = nullptr,
uint32_t payloadLength = 0) const;

const void* payload,
uint32_t payloadLength) const {
return controlWriteReadResponse(msg, requestID, {payloadLength, payload});
}
void dataWrite(Message msg, uint32_t requestID, PayloadList payloads = {});

// handlers for incoming messages - to be defined in the client class
virtual bool handle(Message message, uint32_t requestID) = 0;

virtual bool handle(Message message, uint32_t requestID) = 0;

virtual bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) = 0;

protected:
Expand All @@ -89,4 +114,6 @@ class Client : eckit::NonCopyable {
mutable std::mutex blockingRequestMutex_;
};

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5::remote
3 changes: 2 additions & 1 deletion src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ void ClientConnection::dataWrite(DataWriteRequest& request) const {

void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, PayloadList payloads) {

static size_t maxQueueLength = eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320);
static const size_t maxQueueLength =
eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", defaultDataWriteQueueLength);

{
// retrieve or add client to the list
Expand Down
2 changes: 2 additions & 0 deletions src/fdb5/remote/client/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class DataWriteRequest;

class ClientConnection : protected Connection {

static constexpr size_t defaultDataWriteQueueLength = 320;

public: // methods
~ClientConnection() override;

Expand Down
Loading

0 comments on commit abe7027

Please sign in to comment.