Skip to content

Commit

Permalink
WIP: Background tryAdd functionality in TransactionQueue
Browse files Browse the repository at this point in the history
This is a *draft* change that will resolve stellar#4316 when it is complete.
The change makes `TransactionQueue` thread safe and runs the `tryAdd`
function in the background when the feature is enabled. The
implementation closely follows the
[design document](https://docs.google.com/document/d/1pU__XfEp-rR-17TNsuj-VhY6JfyendaFSYLTiq6tIj4/edit?usp=sharing)
I wrote.  The implementation still requires the main thread to
re-broadcast the transactions (for now). I've opened this PR for
visibility / early feedback on the implementation.

This change is very much a work in progress, with the following tasks
remaining:

* [ ] Fix catchup. I seem to have broken catchup in rebasing these
      changes on master. I need to figure out what is going on there and fix
      it.
* [ ] Fix failing tests. These are failing because they don't update
      `TransactionQueue`s new snapshots correctly.
* [ ] Rigorous testing, both for correctness and performance.
* [ ] I'd like to take a look at pushing the cut-point out a bit to
      enable flooding in the background as well. If this is a relatively
      simple change, I'd like to roll it into this PR. If it looks hairy,
      then I'll leave it for a separate change later.
  • Loading branch information
bboston7 committed Jan 16, 2025
1 parent e1cf3d1 commit 84444eb
Show file tree
Hide file tree
Showing 38 changed files with 703 additions and 317 deletions.
94 changes: 63 additions & 31 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app)
}

HerderImpl::HerderImpl(Application& app)
: mTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
TRANSACTION_QUEUE_SIZE_MULTIPLIER)
, mPendingEnvelopes(app, *this)
: mPendingEnvelopes(app, *this)
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes)
, mLastSlotSaved(0)
, mTrackingTimer(app)
Expand Down Expand Up @@ -275,7 +272,10 @@ HerderImpl::shutdown()
"Shutdown interrupting quorum transitive closure analysis.");
mLastQuorumMapIntersectionState.mInterruptFlag = true;
}
mTransactionQueue.shutdown();
if (mTransactionQueue)
{
mTransactionQueue->shutdown();
}
if (mSorobanTransactionQueue)
{
mSorobanTransactionQueue->shutdown();
Expand Down Expand Up @@ -603,7 +603,7 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
!tx->isSoroban();
bool hasClassic =
mTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
mTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
tx->isSoroban();
if (hasSoroban || hasClassic)
{
Expand All @@ -617,11 +617,31 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
}
else if (!tx->isSoroban())
{
result = mTransactionQueue.tryAdd(tx, submittedFromSelf);
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
{
mApp.postOnOverlayThread(
[this, tx]() { mTransactionQueue->tryAdd(tx, false); },
"try add tx");
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
}
else
{
result = mTransactionQueue->tryAdd(tx, submittedFromSelf);
}
}
else if (mSorobanTransactionQueue)
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
{
mApp.postOnOverlayThread(
[this, tx]() { mSorobanTransactionQueue->tryAdd(tx, false); },
"try add tx");
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
}
else
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
}
}
else
{
Expand Down Expand Up @@ -923,7 +943,7 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq,
bool
HerderImpl::sourceAccountPending(AccountID const& accountID) const
{
bool accPending = mTransactionQueue.sourceAccountPending(accountID);
bool accPending = mTransactionQueue->sourceAccountPending(accountID);
if (mSorobanTransactionQueue)
{
accPending = accPending ||
Expand Down Expand Up @@ -1092,7 +1112,7 @@ HerderImpl::getPendingEnvelopes()
ClassicTransactionQueue&
HerderImpl::getTransactionQueue()
{
return mTransactionQueue;
return *mTransactionQueue;
}
SorobanTransactionQueue&
HerderImpl::getSorobanTransactionQueue()
Expand Down Expand Up @@ -1391,7 +1411,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
// it's guaranteed to be up-to-date
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
PerPhaseTransactionList txPhases;
txPhases.emplace_back(mTransactionQueue.getTransactions(lcl.header));
txPhases.emplace_back(mTransactionQueue->getTransactions(lcl.header));

if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
Expand Down Expand Up @@ -1470,7 +1490,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
invalidTxPhases[static_cast<size_t>(TxSetPhase::SOROBAN)]);
}

mTransactionQueue.ban(
mTransactionQueue->ban(
invalidTxPhases[static_cast<size_t>(TxSetPhase::CLASSIC)]);

auto txSetHash = proposedSet->getContentsHash();
Expand Down Expand Up @@ -2172,9 +2192,11 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
{
if (!mSorobanTransactionQueue)
{
releaseAssert(mTxQueueBucketSnapshot);
mSorobanTransactionQueue =
std::make_unique<SorobanTransactionQueue>(
mApp, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
mApp, mTxQueueBucketSnapshot,
TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER);
}
Expand All @@ -2189,6 +2211,15 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
void
HerderImpl::start()
{
releaseAssert(!mTxQueueBucketSnapshot);
mTxQueueBucketSnapshot = mApp.getBucketManager()
.getBucketSnapshotManager()
.copySearchableLiveBucketListSnapshot();
releaseAssert(!mTransactionQueue);
mTransactionQueue = std::make_unique<ClassicTransactionQueue>(
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER);

mMaxTxSize = mApp.getHerder().getMaxClassicTxSize();
{
uint32_t version = mApp.getLedgerManager()
Expand Down Expand Up @@ -2333,23 +2364,23 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)

auto lhhe = mLedgerManager.getLastClosedLedgerHeader();

auto updateQueue = [&](auto& queue, auto const& applied) {
queue.removeApplied(applied);
queue.shift();

auto txs = queue.getTransactions(lhhe.header);

auto invalidTxs = TxSetUtils::getInvalidTxList(
auto filterInvalidTxs = [&](TxFrameList const& txs) {
return TxSetUtils::getInvalidTxList(
txs, mApp, 0,
getUpperBoundCloseTimeOffset(mApp, lhhe.header.scpValue.closeTime));
queue.ban(invalidTxs);

queue.rebroadcast();
getUpperBoundCloseTimeOffset(mApp.getAppConnector(),
lhhe.header.scpValue.closeTime));
};
// Update bucket list snapshot, if needed. Note that this modifies the
// pointer itself on update, so we need to pass the potentially new pointer
// to the tx queues.
mApp.getBucketManager()
.getBucketSnapshotManager()
.maybeCopySearchableBucketListSnapshot(mTxQueueBucketSnapshot);
if (txsPerPhase.size() > static_cast<size_t>(TxSetPhase::CLASSIC))
{
updateQueue(mTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)]);
mTransactionQueue->update(
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}

// Even if we're in protocol 20, still check for number of phases, in case
Expand All @@ -2358,8 +2389,9 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)
if (mSorobanTransactionQueue != nullptr &&
txsPerPhase.size() > static_cast<size_t>(TxSetPhase::SOROBAN))
{
updateQueue(*mSorobanTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)]);
mSorobanTransactionQueue->update(
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}
}

Expand Down Expand Up @@ -2476,7 +2508,7 @@ HerderImpl::isNewerNominationOrBallotSt(SCPStatement const& oldSt,
size_t
HerderImpl::getMaxQueueSizeOps() const
{
return mTransactionQueue.getMaxQueueSizeOps();
return mTransactionQueue->getMaxQueueSizeOps();
}

size_t
Expand All @@ -2490,7 +2522,7 @@ HerderImpl::getMaxQueueSizeSorobanOps() const
bool
HerderImpl::isBannedTx(Hash const& hash) const
{
auto banned = mTransactionQueue.isBanned(hash);
auto banned = mTransactionQueue->isBanned(hash);
if (mSorobanTransactionQueue)
{
banned = banned || mSorobanTransactionQueue->isBanned(hash);
Expand All @@ -2501,7 +2533,7 @@ HerderImpl::isBannedTx(Hash const& hash) const
TransactionFrameBaseConstPtr
HerderImpl::getTx(Hash const& hash) const
{
auto classic = mTransactionQueue.getTx(hash);
auto classic = mTransactionQueue->getTx(hash);
if (!classic && mSorobanTransactionQueue)
{
return mSorobanTransactionQueue->getTx(hash);
Expand Down
5 changes: 4 additions & 1 deletion src/herder/HerderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class HerderImpl : public Herder
void purgeOldPersistedTxSets();
void writeDebugTxSet(LedgerCloseData const& lcd);

ClassicTransactionQueue mTransactionQueue;
std::unique_ptr<ClassicTransactionQueue> mTransactionQueue;
std::unique_ptr<SorobanTransactionQueue> mSorobanTransactionQueue;

void updateTransactionQueue(TxSetXDRFrameConstPtr txSet);
Expand Down Expand Up @@ -301,6 +301,9 @@ class HerderImpl : public Herder
Application& mApp;
LedgerManager& mLedgerManager;

// Bucket list snapshot to use for transaction queues
SearchableSnapshotConstPtr mTxQueueBucketSnapshot;

struct SCPMetrics
{
medida::Meter& mLostSync;
Expand Down
Loading

0 comments on commit 84444eb

Please sign in to comment.