Skip to content

Commit

Permalink
Refactored addBatch in BucketList
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Nov 5, 2024
1 parent 0570e30 commit 7295f2a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 110 deletions.
158 changes: 48 additions & 110 deletions src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,27 +196,15 @@ BucketLevel<BucketT>::prepare(
? std::make_shared<BucketT>()
: mCurr;

if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
auto shadowsBasedOnProtocol =
protocolVersionStartsFrom(
snap->getBucketVersion(),
LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
? std::vector<std::shared_ptr<LiveBucket>>()
: shadows;
mNextCurr =
FutureBucket<BucketT>(app, curr, snap, shadowsBasedOnProtocol,
currLedgerProtocol, countMergeEvents, mLevel);
}
else
{
// HotArchive only exists for protocol > 21, should never have shadows
mNextCurr =
FutureBucket<BucketT>(app, curr, snap, /*shadows=*/{},
currLedgerProtocol, countMergeEvents, mLevel);
}

releaseAssert(mNextCurr.isMerging());
auto shadowsBasedOnProtocol =
protocolVersionStartsFrom(snap->getBucketVersion(),
LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
? std::vector<std::shared_ptr<BucketT>>()
: shadows;
mNextCurr =
FutureBucket<BucketT>(app, curr, snap, shadowsBasedOnProtocol,
currLedgerProtocol, countMergeEvents, mLevel);
releaseAssert(mNextCurr.isMerging());
}

template <typename BucketT>
Expand Down Expand Up @@ -565,93 +553,17 @@ BucketListBase<BucketT>::getSize() const
return sum;
}

template <typename BucketT>
template <typename... VectorT>
void
HotArchiveBucketList::addBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& archiveEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries)
{
ZoneScoped;
releaseAssert(currLedger > 0);

for (uint32_t i = static_cast<uint32>(mLevels.size()) - 1; i != 0; --i)
{
if (levelShouldSpill(currLedger, i - 1))
{
/**
* At every ledger, level[0] prepares the new batch and commits
* it.
*
* At ledger multiples of 2, level[0] snaps, level[1] commits
* existing (promotes next to curr) and "prepares" by starting a
* merge of that new level[1] curr with the new level[0] snap. This
* is "level 0 spilling".
*
* At ledger multiples of 8, level[1] snaps, level[2] commits
* existing (promotes next to curr) and "prepares" by starting a
* merge of that new level[2] curr with the new level[1] snap. This
* is "level 1 spilling".
*
* At ledger multiples of 32, level[2] snaps, level[3] commits
* existing (promotes next to curr) and "prepares" by starting a
* merge of that new level[3] curr with the new level[2] snap. This
* is "level 2 spilling".
*
* All these have to be done in _reverse_ order (counting down
* levels) because we want a 'curr' to be pulled out of the way into
* a 'snap' the moment it's half-a-level full, not have anything
* else spilled/added to it.
*/

auto snap = mLevels[i - 1].snap();
mLevels[i].commit();
mLevels[i].prepare(app, currLedger, currLedgerProtocol, snap,
/*shadows=*/{},
/*countMergeEvents=*/true);
}
}

// In some testing scenarios, we want to inhibit counting level 0 merges
// because they are not repeated when restarting merges on app startup,
// and we are checking for an expected number of merge events on restart.
bool countMergeEvents =
!app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
mLevels[0].prepare(
app, currLedger, currLedgerProtocol,
HotArchiveBucket::fresh(app.getBucketManager(), currLedgerProtocol,
archiveEntries, restoredEntries, deletedEntries,
countMergeEvents, app.getClock().getIOContext(),
doFsync),
/*shadows=*/{}, countMergeEvents);
mLevels[0].commit();

// We almost always want to try to resolve completed merges to single
// buckets, as it makes restarts less fragile: fewer saved/restored shadows,
// fewer buckets for the user to accidentally delete from their buckets
// dir. Also makes publication less likely to redo a merge that was already
// complete (but not resolved) when the snapshot gets taken.
//
// But we support the option of not-doing so, only for the sake of
// testing. Note: this is nonblocking in any case.
if (!app.getConfig().ARTIFICIALLY_PESSIMIZE_MERGES_FOR_TESTING)
{
resolveAnyReadyFutures();
}
}

void
LiveBucketList::addBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerEntry> const& liveEntries,
std::vector<LedgerKey> const& deadEntries)
BucketListBase<BucketT>::addBatchInternal(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
VectorT const&... inputVectors)
{
ZoneScoped;
releaseAssert(currLedger > 0);

std::vector<std::shared_ptr<LiveBucket>> shadows;
std::vector<std::shared_ptr<BucketT>> shadows;
for (auto& level : mLevels)
{
shadows.push_back(level.getCurr());
Expand Down Expand Up @@ -741,13 +653,12 @@ LiveBucketList::addBatch(Application& app, uint32_t currLedger,
!app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
releaseAssert(shadows.size() == 0);
mLevels[0].prepare(
app, currLedger, currLedgerProtocol,
LiveBucket::fresh(app.getBucketManager(), currLedgerProtocol,
initEntries, liveEntries, deadEntries,
countMergeEvents, app.getClock().getIOContext(),
doFsync),
shadows, countMergeEvents);
mLevels[0].prepare(app, currLedger, currLedgerProtocol,
BucketT::fresh(app.getBucketManager(),
currLedgerProtocol, inputVectors...,
countMergeEvents,
app.getClock().getIOContext(), doFsync),
shadows, countMergeEvents);
mLevels[0].commit();

// We almost always want to try to resolve completed merges to single
Expand All @@ -764,6 +675,33 @@ LiveBucketList::addBatch(Application& app, uint32_t currLedger,
}
}

// Insert one ledger's worth of hot-archive changes (archived, restored, and
// deleted entries for `currLedger`) into the bucket list, delegating the
// actual level-spill and merge logic to the shared addBatchInternal
// implementation.
void
HotArchiveBucketList::addBatch(Application& app, uint32_t currLedger,
                               uint32_t currLedgerProtocol,
                               std::vector<LedgerEntry> const& archiveEntries,
                               std::vector<LedgerKey> const& restoredEntries,
                               std::vector<LedgerKey> const& deletedEntries)
{
    ZoneScoped;
    // The hot archive only exists once persistent eviction is supported, so
    // hard-fail on any earlier protocol before touching the levels.
    bool const protocolSupportsHotArchive = protocolVersionStartsFrom(
        currLedgerProtocol,
        Bucket::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION);
    releaseAssertOrThrow(protocolSupportsHotArchive);

    // Vector order must line up with HotArchiveBucket::fresh's parameters.
    addBatchInternal(app, currLedger, currLedgerProtocol, archiveEntries,
                     restoredEntries, deletedEntries);
}

// Add a batch of live-BucketList entries (initEntries/liveEntries/
// deadEntries) produced by closing `currLedger`. Thin wrapper that forwards
// to the shared addBatchInternal template, which incorporates the batch into
// level 0 and commits/prepares merges for any levels that spill at this
// ledger.
void
LiveBucketList::addBatch(Application& app, uint32_t currLedger,
                         uint32_t currLedgerProtocol,
                         std::vector<LedgerEntry> const& initEntries,
                         std::vector<LedgerEntry> const& liveEntries,
                         std::vector<LedgerKey> const& deadEntries)
{
    ZoneScoped;
    // Vector order must match LiveBucket::fresh's parameter order
    // (init, live, dead), since addBatchInternal forwards them verbatim.
    addBatchInternal(app, currLedger, currLedgerProtocol, initEntries,
                     liveEntries, deadEntries);
}

BucketEntryCounters
LiveBucketList::sumBucketEntryCounters() const
{
Expand Down
18 changes: 18 additions & 0 deletions src/bucket/BucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,24 @@ template <class BucketT> class BucketListBase
protected:
std::vector<BucketLevel<BucketT>> mLevels;

// Add a batch of entries to the bucketlist, representing the entries
// affected by closing `currLedger`. The bucketlist will incorporate these
// into the smallest (0th) level, as well as commit or prepare merges for
// any levels that should have spilled due to passing through `currLedger`.
// The `currLedger` and `currLedgerProtocol` values should be taken from the
// ledger at which this batch is being added. `inputVectors` should contain
// one vector of entries to insert per corresponding BucketEntry type, i.e.
// initEntry, liveEntry, and deadEntry for the LiveBucketList. These must
// match the input vector types of the corresponding BucketT::fresh
// function.
// This is an internal function; derived classes should define a public
// addBatch function with explicit input vector types.
template <typename... VectorT>
void addBatchInternal(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
VectorT const&... inputVectors);

public:
// Trivial pure virtual destructor to make this an abstract class
virtual ~BucketListBase() = 0;
Expand Down

0 comments on commit 7295f2a

Please sign in to comment.