Skip to content

Commit

Permalink
Merge pull request #4176 from SirTyson/parallel-bl-db
Browse files Browse the repository at this point in the history
Parallel bl db

Reviewed-by: marta-lokhova
  • Loading branch information
latobarita authored Mar 29, 2024
2 parents ededc67 + 916bb73 commit ad7b164
Show file tree
Hide file tree
Showing 19 changed files with 1,014 additions and 543 deletions.
134 changes: 11 additions & 123 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,35 +77,6 @@ Bucket::Bucket()
{
}

std::unique_ptr<XDRInputFileStream>
Bucket::openStream()
{
releaseAssertOrThrow(!mFilename.empty());
auto streamPtr = std::make_unique<XDRInputFileStream>();
streamPtr->open(mFilename.string());
return std::move(streamPtr);
}

XDRInputFileStream&
Bucket::getIndexStream()
{
if (!mIndexStream)
{
mIndexStream = openStream();
}
return *mIndexStream;
}

XDRInputFileStream&
Bucket::getEvictionStream()
{
if (!mEvictionStream)
{
mEvictionStream = openStream();
}
return *mEvictionStream;
}

Hash const&
Bucket::getHash() const
{
Expand Down Expand Up @@ -156,90 +127,6 @@ void
Bucket::freeIndex()
{
mIndex.reset(nullptr);
mIndexStream.reset(nullptr);
}

std::optional<BucketEntry>
Bucket::getEntryAtOffset(LedgerKey const& k, std::streamoff pos,
size_t pageSize)
{
ZoneScoped;
auto& stream = getIndexStream();
stream.seek(pos);

BucketEntry be;
if (pageSize == 0)
{
if (stream.readOne(be))
{
return std::make_optional(be);
}
}
else if (stream.readPage(be, k, pageSize))
{
return std::make_optional(be);
}

// Mark entry miss for metrics
getIndex().markBloomMiss();
return std::nullopt;
}

std::optional<BucketEntry>
Bucket::getBucketEntry(LedgerKey const& k)
{
ZoneScoped;
auto pos = getIndex().lookup(k);
if (pos.has_value())
{
return getEntryAtOffset(k, pos.value(), getIndex().getPageSize());
}

return std::nullopt;
}

// When searching for an entry, BucketList calls this function on every bucket.
// Since the input is sorted, we do a binary search for the first key in keys.
// If we find the entry, we remove the found key from keys so that later buckets
// do not load shadowed entries. If we don't find the entry, we do not remove it
// from keys so that it will be searched for again at a lower level.
void
Bucket::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<LedgerEntry>& result)
{
ZoneScoped;

auto currKeyIt = keys.begin();
auto const& index = getIndex();
auto indexIter = index.begin();
while (currKeyIt != keys.end() && indexIter != index.end())
{
auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt);
indexIter = newIndexIter;
if (offOp)
{
auto entryOp =
getEntryAtOffset(*currKeyIt, *offOp, getIndex().getPageSize());
if (entryOp)
{
if (entryOp->type() != DEADENTRY)
{
result.push_back(entryOp->liveEntry());
}

currKeyIt = keys.erase(currKeyIt);
continue;
}
}

++currKeyIt;
}
}

std::vector<PoolID> const&
Bucket::getPoolIDsByAsset(Asset const& asset) const
{
return getIndex().getPoolIDsByAsset(asset);
}

#ifdef BUILD_TESTS
Expand Down Expand Up @@ -787,12 +674,12 @@ mergeCasesWithEqualKeys(MergeCounters& mc, BucketInputIterator& oi,
}

bool
Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionMetrics>& metrics)
Bucket::scanForEvictionLegacySQL(
AbstractLedgerTxn& ltx, EvictionIterator& iter, uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionStatistics>& stats) const
{
ZoneScoped;
if (isEmpty() ||
Expand All @@ -809,7 +696,8 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
return true;
}

auto& stream = getEvictionStream();
XDRInputFileStream stream{};
stream.open(mFilename);
stream.seek(iter.bucketFileOffset);

BucketEntry be;
Expand Down Expand Up @@ -844,10 +732,10 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
if (shouldEvict())
{
ZoneNamedN(evict, "evict entry", true);
if (metrics.has_value())
if (stats.has_value())
{
++metrics->numEntriesEvicted;
metrics->evictedEntriesAgeSum +=
++stats->numEntriesEvicted;
stats->evictedEntriesAgeSum +=
ledgerSeq - liveUntilLedger;
}

Expand Down
65 changes: 15 additions & 50 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace stellar
class AbstractLedgerTxn;
class Application;
class BucketManager;
struct EvictionMetrics;
struct EvictionStatistics;

class Bucket : public std::enable_shared_from_this<Bucket>,
public NonMovableOrCopyable
Expand All @@ -49,33 +49,9 @@ class Bucket : public std::enable_shared_from_this<Bucket>,

std::unique_ptr<BucketIndex const> mIndex{};

// Lazily-constructed and retained for read path, one for BucketListDB reads
// and one for eviction scans
std::unique_ptr<XDRInputFileStream> mIndexStream;
std::unique_ptr<XDRInputFileStream> mEvictionStream;

// Returns index, throws if index not yet initialized
BucketIndex const& getIndex() const;

// Returns (lazily-constructed) file stream for bucketDB search. Note
// this might be in some random position left over from a previous read --
// must be seek()'ed before use.
XDRInputFileStream& getIndexStream();

// Returns (lazily-constructed) file stream for eviction scans. Unlike the
// indexStream, this should retain its position in-between calls. However, a
// node performing catchup or joining the network may need to begin evicting
// mid-bucket, so this stream should still be seeked to the proper position
// before reading.
XDRInputFileStream& getEvictionStream();

// Loads the bucket entry for LedgerKey k. Starts at file offset pos and
// reads until key is found or the end of the page.
std::optional<BucketEntry>
getEntryAtOffset(LedgerKey const& k, std::streamoff pos, size_t pageSize);

std::unique_ptr<XDRInputFileStream> openStream();

static std::string randomFileName(std::string const& tmpDir,
std::string ext);

Expand Down Expand Up @@ -109,18 +85,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);

// Loads bucket entry for LedgerKey k.
std::optional<BucketEntry> getBucketEntry(LedgerKey const& k);

// Loads LedgerEntry's for given keys. When a key is found, the
// entry is added to result and the key is removed from keys.
void loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<LedgerEntry>& result);

// Return all PoolIDs that contain the given asset on either side of the
// pool
std::vector<PoolID> const& getPoolIDsByAsset(Asset const& asset) const;

// At version 11, we added support for INITENTRY and METAENTRY. Before this
// we were only supporting LIVEENTRY and DEADENTRY.
static constexpr ProtocolVersion
Expand All @@ -141,19 +105,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);

// Returns false if eof reached or if Bucket protocol version < 20, true
// otherwise. Modifies iter as the bucket is scanned. Also modifies
// bytesToScan and remainingEntriesToEvict such that after this function
// returns:
// bytesToScan -= amount_bytes_scanned
// remainingEntriesToEvict -= entries_evicted
bool scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionMetrics>& metrics);

#ifdef BUILD_TESTS
// "Applies" the bucket to the database. For each entry in the bucket,
// if the entry is init or live, creates or updates the corresponding
Expand All @@ -169,6 +120,18 @@ class Bucket : public std::enable_shared_from_this<Bucket>,

#endif // BUILD_TESTS

// Returns false if eof reached, true otherwise. Modifies iter as the bucket
// is scanned. Also modifies bytesToScan and maxEntriesToEvict such that
// after this function returns:
// bytesToScan -= amount_bytes_scanned
// maxEntriesToEvict -= entries_evicted
bool scanForEvictionLegacySQL(
AbstractLedgerTxn& ltx, EvictionIterator& iter, uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionStatistics>& stats) const;

// Create a fresh bucket from given vectors of init (created) and live
// (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will
// be sorted, hashed, and adopted in the provided BucketManager.
Expand Down Expand Up @@ -201,5 +164,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
static uint32_t
getBucketVersion(std::shared_ptr<Bucket const> const& bucket);

friend class BucketSnapshot;
};
}
Loading

0 comments on commit ad7b164

Please sign in to comment.