From 7295f2af36136c5e573c6055b6927e8dd1253f78 Mon Sep 17 00:00:00 2001
From: Garand Tyson
Date: Mon, 4 Nov 2024 20:33:57 -0800
Subject: [PATCH] Refactored addBatch in BucketList

---
 src/bucket/BucketList.cpp | 158 ++++++++++++--------------------------
 src/bucket/BucketList.h   |  18 +++++
 2 files changed, 66 insertions(+), 110 deletions(-)

diff --git a/src/bucket/BucketList.cpp b/src/bucket/BucketList.cpp
index 27364c34eb..36414cca3c 100644
--- a/src/bucket/BucketList.cpp
+++ b/src/bucket/BucketList.cpp
@@ -196,27 +196,15 @@ BucketLevel<BucketT>::prepare(
                     ? std::make_shared<BucketT>()
                     : mCurr;
 
-    if constexpr (std::is_same_v<BucketT, LiveBucket>)
-    {
-        auto shadowsBasedOnProtocol =
-            protocolVersionStartsFrom(
-                snap->getBucketVersion(),
-                LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
-                ? std::vector<std::shared_ptr<LiveBucket>>()
-                : shadows;
-        mNextCurr =
-            FutureBucket<BucketT>(app, curr, snap, shadowsBasedOnProtocol,
-                                  currLedgerProtocol, countMergeEvents, mLevel);
-    }
-    else
-    {
-        // HotArchive only exists for protocol > 21, should never have shadows
-        mNextCurr =
-            FutureBucket<BucketT>(app, curr, snap, /*shadows=*/{},
-                                  currLedgerProtocol, countMergeEvents, mLevel);
-    }
-
-    releaseAssert(mNextCurr.isMerging());
+    auto shadowsBasedOnProtocol =
+        protocolVersionStartsFrom(snap->getBucketVersion(),
+                                  LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+            ? std::vector<std::shared_ptr<BucketT>>()
+            : shadows;
+    mNextCurr =
+        FutureBucket<BucketT>(app, curr, snap, shadowsBasedOnProtocol,
+                              currLedgerProtocol, countMergeEvents, mLevel);
+    releaseAssert(mNextCurr.isMerging());
 }
 
 template <typename BucketT>
@@ -565,93 +553,17 @@ BucketListBase<BucketT>::getSize() const
     return sum;
 }
 
+template <typename BucketT>
+template <typename... VectorT>
 void
-HotArchiveBucketList::addBatch(Application& app, uint32_t currLedger,
-                               uint32_t currLedgerProtocol,
-                               std::vector<LedgerEntry> const& archiveEntries,
-                               std::vector<LedgerKey> const& restoredEntries,
-                               std::vector<LedgerKey> const& deletedEntries)
-{
-    ZoneScoped;
-    releaseAssert(currLedger > 0);
-
-    for (uint32_t i = static_cast<uint32_t>(mLevels.size()) - 1; i != 0; --i)
-    {
-        if (levelShouldSpill(currLedger, i - 1))
-        {
-            /**
-             * At every ledger, level[0] prepares the new batch and commits
-             * it.
-             *
-             * At ledger multiples of 2, level[0] snaps, level[1] commits
-             * existing (promotes next to curr) and "prepares" by starting a
-             * merge of that new level[1] curr with the new level[0] snap. This
-             * is "level 0 spilling".
-             *
-             * At ledger multiples of 8, level[1] snaps, level[2] commits
-             * existing (promotes next to curr) and "prepares" by starting a
-             * merge of that new level[2] curr with the new level[1] snap. This
-             * is "level 1 spilling".
-             *
-             * At ledger multiples of 32, level[2] snaps, level[3] commits
-             * existing (promotes next to curr) and "prepares" by starting a
-             * merge of that new level[3] curr with the new level[2] snap. This
-             * is "level 2 spilling".
-             *
-             * All these have to be done in _reverse_ order (counting down
-             * levels) because we want a 'curr' to be pulled out of the way into
-             * a 'snap' the moment it's half-a-level full, not have anything
-             * else spilled/added to it.
-             */
-
-            auto snap = mLevels[i - 1].snap();
-            mLevels[i].commit();
-            mLevels[i].prepare(app, currLedger, currLedgerProtocol, snap,
-                               /*shadows=*/{},
-                               /*countMergeEvents=*/true);
-        }
-    }
-
-    // In some testing scenarios, we want to inhibit counting level 0 merges
-    // because they are not repeated when restarting merges on app startup,
-    // and we are checking for an expected number of merge events on restart.
-    bool countMergeEvents =
-        !app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
-    bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
-    mLevels[0].prepare(
-        app, currLedger, currLedgerProtocol,
-        HotArchiveBucket::fresh(app.getBucketManager(), currLedgerProtocol,
-                                archiveEntries, restoredEntries, deletedEntries,
-                                countMergeEvents, app.getClock().getIOContext(),
-                                doFsync),
-        /*shadows=*/{}, countMergeEvents);
-    mLevels[0].commit();
-
-    // We almost always want to try to resolve completed merges to single
-    // buckets, as it makes restarts less fragile: fewer saved/restored shadows,
-    // fewer buckets for the user to accidentally delete from their buckets
-    // dir. Also makes publication less likely to redo a merge that was already
-    // complete (but not resolved) when the snapshot gets taken.
-    //
-    // But we support the option of not-doing so, only for the sake of
-    // testing. Note: this is nonblocking in any case.
-    if (!app.getConfig().ARTIFICIALLY_PESSIMIZE_MERGES_FOR_TESTING)
-    {
-        resolveAnyReadyFutures();
-    }
-}
-
-void
-LiveBucketList::addBatch(Application& app, uint32_t currLedger,
-                         uint32_t currLedgerProtocol,
-                         std::vector<LedgerEntry> const& initEntries,
-                         std::vector<LedgerEntry> const& liveEntries,
-                         std::vector<LedgerKey> const& deadEntries)
+BucketListBase<BucketT>::addBatchInternal(Application& app, uint32_t currLedger,
+                                          uint32_t currLedgerProtocol,
+                                          VectorT const&... inputVectors)
 {
     ZoneScoped;
     releaseAssert(currLedger > 0);
 
-    std::vector<std::shared_ptr<LiveBucket>> shadows;
+    std::vector<std::shared_ptr<BucketT>> shadows;
     for (auto& level : mLevels)
     {
         shadows.push_back(level.getCurr());
@@ -741,13 +653,12 @@ LiveBucketList::addBatch(Application& app, uint32_t currLedger,
         !app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
     bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
     releaseAssert(shadows.size() == 0);
-    mLevels[0].prepare(
-        app, currLedger, currLedgerProtocol,
-        LiveBucket::fresh(app.getBucketManager(), currLedgerProtocol,
-                          initEntries, liveEntries, deadEntries,
-                          countMergeEvents, app.getClock().getIOContext(),
-                          doFsync),
-        shadows, countMergeEvents);
+    mLevels[0].prepare(app, currLedger, currLedgerProtocol,
+                       BucketT::fresh(app.getBucketManager(),
+                                      currLedgerProtocol, inputVectors...,
+                                      countMergeEvents,
+                                      app.getClock().getIOContext(), doFsync),
+                       shadows, countMergeEvents);
     mLevels[0].commit();
 
     // We almost always want to try to resolve completed merges to single
@@ -764,6 +675,33 @@ LiveBucketList::addBatch(Application& app, uint32_t currLedger,
     }
 }
 
+void
+HotArchiveBucketList::addBatch(Application& app, uint32_t currLedger,
+                               uint32_t currLedgerProtocol,
+                               std::vector<LedgerEntry> const& archiveEntries,
+                               std::vector<LedgerKey> const& restoredEntries,
+                               std::vector<LedgerKey> const& deletedEntries)
+{
+    ZoneScoped;
+    releaseAssertOrThrow(protocolVersionStartsFrom(
+        currLedgerProtocol,
+        Bucket::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
+    addBatchInternal(app, currLedger, currLedgerProtocol, archiveEntries,
+                     restoredEntries, deletedEntries);
+}
+
+void
+LiveBucketList::addBatch(Application& app, uint32_t currLedger,
+                         uint32_t currLedgerProtocol,
+                         std::vector<LedgerEntry> const& initEntries,
+                         std::vector<LedgerEntry> const& liveEntries,
+                         std::vector<LedgerKey> const& deadEntries)
+{
+    ZoneScoped;
+    addBatchInternal(app, currLedger, currLedgerProtocol, initEntries,
+                     liveEntries, deadEntries);
+}
+
 BucketEntryCounters
 LiveBucketList::sumBucketEntryCounters() const
 {
diff --git a/src/bucket/BucketList.h b/src/bucket/BucketList.h
index cd53514dc8..932e48b176 100644
--- a/src/bucket/BucketList.h
+++ b/src/bucket/BucketList.h
@@ -414,6 +414,24 @@ template <typename BucketT> class BucketListBase
   protected:
     std::vector<BucketLevel<BucketT>> mLevels;
 
+    // Add a batch of entries to the
+    // bucketlist, representing the entries effected by closing
+    // `currLedger`. The bucketlist will incorporate these into the smallest
+    // (0th) level, as well as commit or prepare merges for any levels that
+    // should have spilled due to passing through `currLedger`. The `currLedger`
+    // and `currProtocolVersion` values should be taken from the ledger at which
+    // this batch is being added. `inputVectors` should contain a vector of
+    // entries to insert for each corresponding BucketEntry type, i.e.
+    // initEntry, liveEntry, and deadEntry for the LiveBucketList. These must be
+    // the same input vector types for the corresponding BucketT::fresh
+    // function.
+    // This is an internal function, derived classes should define a
+    // public addBatch function with explicit input vector types.
+    template <typename... VectorT>
+    void addBatchInternal(Application& app, uint32_t currLedger,
+                          uint32_t currLedgerProtocol,
+                          VectorT const&... inputVectors);
+
   public:
     // Trivial pure virtual destructor to make this an abstract class
     virtual ~BucketListBase() = 0;
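
The heart of this refactor is the variadic member template: addBatchInternal accepts whatever entry vectors the concrete bucket list hands it and forwards them unchanged to BucketT::fresh, so the shared base class never has to name each bucket kind's entry types, while the derived classes keep non-template public addBatch overloads with explicit vector types for call sites. The standalone sketch below illustrates only that forwarding pattern; the types and signatures are simplified stand-ins (std::string/int element types, no Application, ledger, or protocol arguments), not stellar-core's actual classes.

// Minimal model of the addBatchInternal forwarding pattern. All types are
// simplified stand-ins for illustration, not stellar-core code.
#include <iostream>
#include <string>
#include <vector>

struct LiveBucket
{
    // Stand-in for LiveBucket::fresh(initEntries, liveEntries, deadEntries).
    static LiveBucket
    fresh(std::vector<std::string> const& init,
          std::vector<std::string> const& live,
          std::vector<std::string> const& dead)
    {
        std::cout << "LiveBucket::fresh: " << init.size() << "/" << live.size()
                  << "/" << dead.size() << " entries\n";
        return {};
    }
};

struct HotArchiveBucket
{
    // Stand-in for HotArchiveBucket::fresh(archived, restored, deleted).
    static HotArchiveBucket
    fresh(std::vector<std::string> const& archived,
          std::vector<int> const& restored, std::vector<int> const& deleted)
    {
        std::cout << "HotArchiveBucket::fresh: " << archived.size() << "/"
                  << restored.size() << "/" << deleted.size() << " entries\n";
        return {};
    }
};

template <typename BucketT> class BucketListBase
{
  protected:
    // Shared implementation: forward the caller's vectors, whatever their
    // element types, straight through to BucketT::fresh.
    template <typename... VectorT>
    void
    addBatchInternal(VectorT const&... inputVectors)
    {
        BucketT::fresh(inputVectors...);
    }
};

class LiveBucketList : public BucketListBase<LiveBucket>
{
  public:
    // Public wrapper pins down the exact vector types for callers.
    void
    addBatch(std::vector<std::string> const& init,
             std::vector<std::string> const& live,
             std::vector<std::string> const& dead)
    {
        addBatchInternal(init, live, dead);
    }
};

class HotArchiveBucketList : public BucketListBase<HotArchiveBucket>
{
  public:
    void
    addBatch(std::vector<std::string> const& archived,
             std::vector<int> const& restored, std::vector<int> const& deleted)
    {
        addBatchInternal(archived, restored, deleted);
    }
};

int
main()
{
    LiveBucketList live;
    live.addBatch({"init"}, {"live1", "live2"}, {});

    HotArchiveBucketList hot;
    hot.addBatch({"archived"}, {1, 2}, {3});
    return 0;
}

Keeping the public addBatch overloads non-template mirrors the doc comment on addBatchInternal: callers see explicit vector types, and a mismatched argument list fails at the wrapper rather than deep inside the shared implementation.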
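
The block comment deleted from the old HotArchiveBucketList::addBatch spells out the spill cadence: level 0 spills at ledger multiples of 2, level 1 at multiples of 8, level 2 at multiples of 32, and the levels are walked in reverse so a curr is snapped before anything new lands in it. The tiny model below assumes the period for level i is 2 * 4^i, as those numbers imply; it is an illustration of the schedule the comment describes, not the levelShouldSpill(currLedger, i - 1) check the loop actually uses.

// Model of the spill cadence described in the comment: level i spills every
// 2 * 4^i ledgers (2, 8, 32, 128, ...). Illustration only, not stellar-core's
// levelShouldSpill logic.
#include <cstdint>
#include <iostream>

static bool
spillsAt(uint32_t ledger, uint32_t level)
{
    uint32_t period = 2u << (2 * level); // 2 * 4^level
    return ledger % period == 0;
}

int
main()
{
    // Mirror the reverse-order walk in addBatch: higher levels first.
    for (uint32_t ledger = 1; ledger <= 32; ++ledger)
    {
        std::cout << "ledger " << ledger << ":";
        for (int level = 2; level >= 0; --level)
        {
            if (spillsAt(ledger, static_cast<uint32_t>(level)))
            {
                std::cout << " level " << level << " spills";
            }
        }
        std::cout << "\n";
    }
    return 0;
}

At ledger 32, for example, levels 2, 1, and 0 all spill, which is exactly the "level 2 spilling" case the comment describes.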