Skip to content

Commit

Permalink
Merge pull request #72 from ecmwf/feature/memcheck
Browse files Browse the repository at this point in the history
Feature/memcheck
  • Loading branch information
danovaro authored Jan 24, 2025
2 parents dc2cda7 + 8864fa1 commit 097842e
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 103 deletions.
4 changes: 2 additions & 2 deletions src/fdb5/database/Inspector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ ListIterator Inspector::inspect(const metkit::mars::MarsRequest& request,
const Schema& schema,
const fdb5::Notifier& notifyee) const {

InspectIterator* iterator = new InspectIterator();
auto iterator = std::make_unique<InspectIterator>();
MultiRetrieveVisitor visitor(notifyee, *iterator, databases_, dbConfig_);

LOG_DEBUG_LIB(LibFdb5) << "Using schema: " << schema << std::endl;

schema.expand(request, visitor);

using QueryIterator = APIIterator<ListElement>;
return QueryIterator(iterator);
return QueryIterator(iterator.release());
}

ListIterator Inspector::inspect(const metkit::mars::MarsRequest& request) const {
Expand Down
10 changes: 6 additions & 4 deletions src/fdb5/toc/TocCatalogueWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,22 @@ void TocCatalogueWriter::reconsolidateIndexesAndTocs() {
// Add masking entries for all the indexes and subtocs visited so far

Buffer buf(sizeof(TocRecord) * (subtocs.size() + maskable_indexes));
buf.zero();
size_t combinedSize = 0;

for (size_t i = 0; i < readIndexes.size(); i++) {
// We need to explicitly mask indexes in the master TOC
if (!indexInSubtoc[i]) {
Index& idx(readIndexes[i]);
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildClearRecord(*r, idx));
combinedSize += recordSizes(*r, buildClearRecord(*r, idx)).second;
Log::info() << "Masking index: " << idx.location().uri() << std::endl;
}
}

for (const std::string& subtoc_path : subtocs) {
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r, subtoc_path));
combinedSize += recordSizes(*r, buildSubTocMaskRecord(*r, subtoc_path)).second;
Log::info() << "Masking sub-toc: " << subtoc_path << std::endl;
}

Expand Down Expand Up @@ -384,6 +385,7 @@ void TocCatalogueWriter::compactSubTocIndexes() {
// subtoc, written by this process. Then we append a masking entry.

Buffer buf(sizeof(TocRecord) * (fullIndexes_.size() + 1));
buf.zero();
size_t combinedSize = 0;

// n.b. we only need to compact the subtocs if we are actually writing something...
Expand All @@ -399,14 +401,14 @@ void TocCatalogueWriter::compactSubTocIndexes() {

idx.flush();
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_INDEX);
combinedSize += roundRecord(*r, buildIndexRecord(*r, idx));
combinedSize += recordSizes(*r, buildIndexRecord(*r, idx)).second;
}
}

// And add the masking record for the subtoc

TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r));
combinedSize += recordSizes(*r, buildSubTocMaskRecord(*r)).second;

// Write all of these records to the toc in one go.

Expand Down
106 changes: 56 additions & 50 deletions src/fdb5/toc/TocHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <fcntl.h>
#include <sys/types.h>
#include <pwd.h>
#include <utility>
#include <cstddef>

#include "eckit/config/Resource.h"
#include "eckit/io/FileHandle.h"
Expand Down Expand Up @@ -336,36 +338,50 @@ void TocHandler::dumpTocCache() const {

void TocHandler::append(TocRecord &r, size_t payloadSize ) {

ASSERT(fd_ != -1);
ASSERT(not cachedToc_);

LOG_DEBUG_LIB(LibFdb5) << "Writing toc entry: " << (int)r.header_.tag_ << std::endl;

appendRound(r, payloadSize);
}

void TocHandler::appendRound(TocRecord &r, size_t payloadSize) {
// Obtain the rounded size, and set it in the record header.
size_t roundedSize = roundRecord(r, payloadSize);
auto [realSize, roundedSize] = recordSizes(r, payloadSize);

eckit::Buffer buf(roundedSize);
buf.zero();
buf.copy(static_cast<const void*>(&r), realSize);

appendRaw(buf, buf.size());
}

void TocHandler::appendRaw(const void *data, size_t size) {

ASSERT(fd_ != -1);
ASSERT(not cachedToc_);

ASSERT(size % recordRoundSize() == 0);

size_t len;
SYSCALL2( len = ::write(fd_, &r, roundedSize), tocPath_ );
SYSCALL2( len = ::write(fd_, data, size), tocPath_ );
dirty_ = true;
ASSERT( len == roundedSize);
ASSERT( len == size);

}

void TocHandler::appendBlock(const void *data, size_t size) {
void TocHandler::appendBlock(TocRecord &r, size_t payloadSize) {

openForAppend();
TocHandlerCloser close(*this);

ASSERT(fd_ != -1);
ASSERT(not cachedToc_);
appendRound(r, payloadSize);
}

// Ensure that this block is appropriately rounded.
void TocHandler::appendBlock(const void *data, size_t size) {

ASSERT(size % recordRoundSize() == 0);
openForAppend();
TocHandlerCloser close(*this);

size_t len;
SYSCALL2( len = ::write(fd_, data, size), tocPath_ );
dirty_ = true;
ASSERT( len == size );
appendRaw(data, size);
}

const TocSerialisationVersion& TocHandler::serialisationVersion() const {
Expand All @@ -378,11 +394,12 @@ size_t TocHandler::recordRoundSize() {
return fdbRoundTocRecords;
}

size_t TocHandler::roundRecord(TocRecord &r, size_t payloadSize) {
std::pair<size_t, size_t> TocHandler::recordSizes(TocRecord &r, size_t payloadSize) {

r.header_.size_ = eckit::round(sizeof(TocRecord::Header) + payloadSize, recordRoundSize());
size_t dataSize = sizeof(TocRecord::Header) + payloadSize;
r.header_.size_ = eckit::round(dataSize, recordRoundSize());

return r.header_.size_;
return {dataSize, r.header_.size_};
}

// readNext wraps readNextInternal.
Expand Down Expand Up @@ -513,7 +530,7 @@ std::vector<PathName> TocHandler::subTocPaths() const {
openForRead();
TocHandlerCloser close(*this);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

std::vector<eckit::PathName> paths;

Expand Down Expand Up @@ -592,7 +609,7 @@ void TocHandler::allMaskableEntries(Offset startOffset, Offset endOffset,
Offset ret = proxy.seek(startOffset);
ASSERT(ret == startOffset);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

while (proxy.position() < endOffset) {

Expand Down Expand Up @@ -823,8 +840,7 @@ void TocHandler::preloadSubTocs(bool readMasked) const {

eckit::Timer preloadTimer("subtocs.preload", Log::debug<LibFdb5>());
{
std::unique_ptr<TocRecord> r(
new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

// n.b. we call databaseKey() directly, as this preload will normally be called before we have walked
// the toc at all --> TOC_INIT not yet read --> parentKey_ not yet set.
Expand Down Expand Up @@ -870,7 +886,7 @@ void TocHandler::populateMaskedEntriesList() const {

maskedEntries_.clear();

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

size_t countSubTocs = 0;

Expand Down Expand Up @@ -948,7 +964,7 @@ void TocHandler::writeInitRecord(const Key& key) {

TocHandlerCloser closer(*this);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

size_t len = readNext(*r);
if (len == 0) {
Expand Down Expand Up @@ -984,7 +1000,7 @@ void TocHandler::writeInitRecord(const Key& key) {
eckit::LocalPathName::rename(tmp, schemaPath_);
}

std::unique_ptr<TocRecord> r2(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_INIT)); // allocate TocRecord on heap (MARS-779)
auto r2 = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_INIT); // allocate (large) TocRecord on heap not stack (MARS-779)
eckit::MemoryStream s(&r2->payload_[0], r2->maxPayloadSize);
s << key;
s << isSubToc_;
Expand All @@ -1001,22 +1017,20 @@ void TocHandler::writeInitRecord(const Key& key) {

void TocHandler::writeClearRecord(const Index &index) {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_CLEAR)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_CLEAR); // allocate (large) TocRecord on heap not stack (MARS-779)

size_t sz = roundRecord(*r, buildClearRecord(*r, index));
appendBlock(r.get(), sz);
appendBlock(*r, buildClearRecord(*r, index));
}

void TocHandler::writeClearAllRecord() {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_CLEAR)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_CLEAR); // allocate (large) TocRecord on heap not stack (MARS-779)

eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize);
s << std::string {"*"};
s << off_t{0};

size_t sz = roundRecord(*r, s.position());
appendBlock(r.get(), sz);
appendBlock(*r, s.position());
}


Expand All @@ -1025,7 +1039,7 @@ void TocHandler::writeSubTocRecord(const TocHandler& subToc) {
openForAppend();
TocHandlerCloser closer(*this);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_SUB_TOC)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_SUB_TOC); // allocate (large) TocRecord on heap not stack (MARS-779)

eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize);

Expand Down Expand Up @@ -1053,7 +1067,7 @@ void TocHandler::writeIndexRecord(const Index& index) {

const TocIndexLocation& location = reinterpret_cast<const TocIndexLocation&>(l);

std::unique_ptr<TocRecord> r(new TocRecord(handler_.serialisationVersion_.used(), TocRecord::TOC_INDEX)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(handler_.serialisationVersion_.used(), TocRecord::TOC_INDEX); // allocate (large) TocRecord on heap not stack (MARS-779)

eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize);

Expand Down Expand Up @@ -1104,15 +1118,14 @@ void TocHandler::writeIndexRecord(const Index& index) {

void TocHandler::writeSubTocMaskRecord(const TocHandler &subToc) {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_CLEAR)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_CLEAR); // allocate (large) TocRecord on heap not stack (MARS-779)

// We use a relative path to this subtoc if it belongs to the current DB
// but an absolute one otherwise (e.g. for fdb-overlay).
const PathName& absPath = subToc.tocPath();
PathName path = (absPath.dirName().sameAs(directory_)) ? absPath.baseName() : absPath;

size_t sz = roundRecord(*r, buildSubTocMaskRecord(*r, path));
appendBlock(r.get(), sz);
appendBlock(*r, buildSubTocMaskRecord(*r, path));
}

bool TocHandler::useSubToc() const {
Expand Down Expand Up @@ -1153,8 +1166,7 @@ uid_t TocHandler::dbUID() const {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

while ( readNext(*r) ) {
if (r->header_.tag_ == TocRecord::TOC_INIT) {
Expand All @@ -1170,8 +1182,7 @@ Key TocHandler::databaseKey() {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool walkSubTocs = false;
while ( readNext(*r, walkSubTocs) ) {
Expand All @@ -1191,8 +1202,7 @@ size_t TocHandler::numberOfRecords() const {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool walkSubTocs = true;
bool hideSubTocEntries = false;
Expand Down Expand Up @@ -1232,8 +1242,7 @@ std::vector<Index> TocHandler::loadIndexes(const Catalogue& catalogue, bool sort
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)
count_ = 0;

// A record of all the index entries found (to process later)
Expand Down Expand Up @@ -1396,8 +1405,7 @@ void TocHandler::dump(std::ostream& out, bool simple, bool walkSubTocs) const {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool hideSubTocEntries = false;
bool hideClearEntries = false;
Expand Down Expand Up @@ -1466,8 +1474,7 @@ void TocHandler::dumpIndexFile(std::ostream& out, const eckit::PathName& indexFi
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool walkSubTocs = true;
bool hideSubTocEntries = true;
Expand Down Expand Up @@ -1579,8 +1586,7 @@ void TocHandler::enumerateMasked(const Catalogue& catalogue, std::set<std::pair<
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

while ( readNextInternal(*r) ) {
if (r->header_.tag_ == TocRecord::TOC_INDEX) {
Expand Down
7 changes: 6 additions & 1 deletion src/fdb5/toc/TocHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <map>
#include <memory>
#include <utility>

#include "eckit/filesystem/PathName.h"
#include "eckit/filesystem/LocalPathName.h"
Expand Down Expand Up @@ -193,9 +194,10 @@ class TocHandler : public TocCommon, private eckit::NonCopyable {

// Given the payload size, stores the rounded record size in the record header and returns {unrounded size, rounded size}

static size_t roundRecord(TocRecord &r, size_t payloadSize);
static std::pair<size_t, size_t> recordSizes(TocRecord &r, size_t payloadSize);

void appendBlock(const void* data, size_t size);
void appendBlock(TocRecord &r, size_t payloadSize);

const TocSerialisationVersion& serialisationVersion() const;

Expand All @@ -209,6 +211,9 @@ class TocHandler : public TocCommon, private eckit::NonCopyable {

void close() const;

void appendRaw(const void* data, size_t size);
void appendRound(TocRecord &r, size_t payloadSize);

/// Populate the masked sub toc list, starting from the _current_position_ in the
/// file (opened for read). It resets back to the same place when done. This is
/// to allow searching only from the first subtoc.
Expand Down
Loading

0 comments on commit 097842e

Please sign in to comment.