Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Jan 23, 2025
1 parent dc2cda7 commit c3d1558
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 30 deletions.
10 changes: 6 additions & 4 deletions src/fdb5/toc/TocCatalogueWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,22 @@ void TocCatalogueWriter::reconsolidateIndexesAndTocs() {
// Add masking entries for all the indexes and subtocs visited so far

Buffer buf(sizeof(TocRecord) * (subtocs.size() + maskable_indexes));
buf.zero();
size_t combinedSize = 0;

for (size_t i = 0; i < readIndexes.size(); i++) {
// We need to explicitly mask indexes in the master TOC
if (!indexInSubtoc[i]) {
Index& idx(readIndexes[i]);
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildClearRecord(*r, idx));
combinedSize += roundRecord(*r, buildClearRecord(*r, idx)).second;
Log::info() << "Masking index: " << idx.location().uri() << std::endl;
}
}

for (const std::string& subtoc_path : subtocs) {
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r, subtoc_path));
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r, subtoc_path)).second;
Log::info() << "Masking sub-toc: " << subtoc_path << std::endl;
}

Expand Down Expand Up @@ -384,6 +385,7 @@ void TocCatalogueWriter::compactSubTocIndexes() {
// subtoc, written by this process. Then we append a masking entry.

Buffer buf(sizeof(TocRecord) * (fullIndexes_.size() + 1));
buf.zero();
size_t combinedSize = 0;

// n.b. we only need to compact the subtocs if we are actually writing something...
Expand All @@ -399,14 +401,14 @@ void TocCatalogueWriter::compactSubTocIndexes() {

idx.flush();
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_INDEX);
combinedSize += roundRecord(*r, buildIndexRecord(*r, idx));
combinedSize += roundRecord(*r, buildIndexRecord(*r, idx)).second;
}
}

// And add the masking record for the subtoc

TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r));
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r)).second;

// Write all of these records to the toc in one go.

Expand Down
60 changes: 36 additions & 24 deletions src/fdb5/toc/TocHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,36 +336,50 @@ void TocHandler::dumpTocCache() const {

void TocHandler::append(TocRecord &r, size_t payloadSize ) {

ASSERT(fd_ != -1);
ASSERT(not cachedToc_);

LOG_DEBUG_LIB(LibFdb5) << "Writing toc entry: " << (int)r.header_.tag_ << std::endl;

appendRound(r, payloadSize);
}

void TocHandler::appendRound(TocRecord &r, size_t payloadSize) {
// Obtain the rounded size, and set it in the record header.
size_t roundedSize = roundRecord(r, payloadSize);
auto [realSize, roundedSize] = roundRecord(r, payloadSize);

eckit::Buffer buf(roundedSize);
buf.zero();
buf.copy(static_cast<const void*>(&r), realSize);

appendRaw(buf, roundedSize);
}

void TocHandler::appendRaw(const void *data, size_t size) {

ASSERT(fd_ != -1);
ASSERT(not cachedToc_);

ASSERT(size % recordRoundSize() == 0);

size_t len;
SYSCALL2( len = ::write(fd_, &r, roundedSize), tocPath_ );
SYSCALL2( len = ::write(fd_, data, size), tocPath_ );
dirty_ = true;
ASSERT( len == roundedSize);
ASSERT( len == size);

}

void TocHandler::appendBlock(const void *data, size_t size) {
void TocHandler::appendBlock(TocRecord &r, size_t payloadSize) {

openForAppend();
TocHandlerCloser close(*this);

ASSERT(fd_ != -1);
ASSERT(not cachedToc_);
appendRound(r, payloadSize);
}

// Ensure that this block is appropriately rounded.
void TocHandler::appendBlock(const void *data, size_t size) {

ASSERT(size % recordRoundSize() == 0);
openForAppend();
TocHandlerCloser close(*this);

size_t len;
SYSCALL2( len = ::write(fd_, data, size), tocPath_ );
dirty_ = true;
ASSERT( len == size );
appendRaw(data, size);
}

const TocSerialisationVersion& TocHandler::serialisationVersion() const {
Expand All @@ -378,11 +392,12 @@ size_t TocHandler::recordRoundSize() {
return fdbRoundTocRecords;
}

size_t TocHandler::roundRecord(TocRecord &r, size_t payloadSize) {
std::pair<size_t, size_t> TocHandler::roundRecord(TocRecord &r, size_t payloadSize) {

r.header_.size_ = eckit::round(sizeof(TocRecord::Header) + payloadSize, recordRoundSize());
size_t dataSize = sizeof(TocRecord::Header) + payloadSize;
r.header_.size_ = eckit::round(dataSize, recordRoundSize());

return r.header_.size_;
return {dataSize, r.header_.size_};
}

// readNext wraps readNextInternal.
Expand Down Expand Up @@ -1003,8 +1018,7 @@ void TocHandler::writeClearRecord(const Index &index) {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_CLEAR)); // allocate (large) TocRecord on heap not stack (MARS-779)

size_t sz = roundRecord(*r, buildClearRecord(*r, index));
appendBlock(r.get(), sz);
appendBlock(*r, buildClearRecord(*r, index));
}

void TocHandler::writeClearAllRecord() {
Expand All @@ -1015,8 +1029,7 @@ void TocHandler::writeClearAllRecord() {
s << std::string {"*"};
s << off_t{0};

size_t sz = roundRecord(*r, s.position());
appendBlock(r.get(), sz);
appendBlock(*r, s.position());
}


Expand Down Expand Up @@ -1111,8 +1124,7 @@ void TocHandler::writeSubTocMaskRecord(const TocHandler &subToc) {
const PathName& absPath = subToc.tocPath();
PathName path = (absPath.dirName().sameAs(directory_)) ? absPath.baseName() : absPath;

size_t sz = roundRecord(*r, buildSubTocMaskRecord(*r, path));
appendBlock(r.get(), sz);
appendBlock(*r, buildSubTocMaskRecord(*r, path));
}

bool TocHandler::useSubToc() const {
Expand Down
6 changes: 5 additions & 1 deletion src/fdb5/toc/TocHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ class TocHandler : public TocCommon, private eckit::NonCopyable {

// Given the payload size, returns the record size

static size_t roundRecord(TocRecord &r, size_t payloadSize);
static std::pair<size_t, size_t> roundRecord(TocRecord &r, size_t payloadSize);

void appendBlock(const void* data, size_t size);
void appendBlock(TocRecord &r, size_t payloadSize);

const TocSerialisationVersion& serialisationVersion() const;

Expand All @@ -209,6 +210,9 @@ class TocHandler : public TocCommon, private eckit::NonCopyable {

void close() const;

void appendRaw(const void* data, size_t size);
void appendRound(TocRecord &r, size_t payloadSize);

/// Populate the masked sub toc list, starting from the _current_position_ in the
/// file (opened for read). It resets back to the same place when done. This is
/// to allow searching only from the first subtoc.
Expand Down
4 changes: 3 additions & 1 deletion tests/fdb/test_fdb5_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ CASE ( "test_fdb_service" ) {
while ((dp = ::readdir(dirp)) != nullptr) {
EXPECT_NOT(strstr( dp->d_name, "toc."));
}
::closedir(dirp);

// consuming the rest of the queue
while (iter.next(el));
Expand Down Expand Up @@ -502,6 +503,7 @@ CASE ( "test_fdb_service_subtoc" ) {
}
}
EXPECT(subtoc);
::closedir(dirp);

// consuming the rest of the queue
while (iter.next(el));
Expand Down Expand Up @@ -581,7 +583,7 @@ CASE( "schemaSerialisation" ) {
eckit::FileStream sin(filepath.c_str(), "r");
auto c = eckit::closer(sin);

fdb5::Schema* clone = eckit::Reanimator<fdb5::Schema>::reanimate(sin);
std::unique_ptr<fdb5::Schema> clone(eckit::Reanimator<fdb5::Schema>::reanimate(sin));

std::stringstream ss;
clone->dump(ss);
Expand Down

0 comments on commit c3d1558

Please sign in to comment.