Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catchpoints: Add onlineaccounts and onlineroundparamstail tables to snapshot files #6177

Merged
merged 25 commits into from
Dec 20, 2024
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d775659
Add onlineaccounts and onlineroundparamstail to catchpoint files
cce Nov 22, 2024
39f25db
Add test demonstrating in-memory sqlite DB retries causing merkle tri…
cce Dec 4, 2024
9b0f9e9
Add retry clear callback
cce Dec 4, 2024
1e736fe
change names
cce Dec 4, 2024
7cc08cc
fmt
cce Dec 4, 2024
a101d07
add dualdriver implementation of TransactionContextWithRetryClearFn
cce Dec 4, 2024
ae67ab8
Update mockDB in trackerdb testsuite
cce Dec 5, 2024
865d018
Update pebbledriver.go comment
cce Dec 5, 2024
33af474
fix race by removing deadlock detection disable
cce Dec 5, 2024
c2c3e4f
Merge remote-tracking branch 'origin/retry-rollback-ledger' into catc…
cce Dec 5, 2024
abf5853
move ledgerTracker advanced error handling methods to ledgerTrackerEx…
cce Dec 6, 2024
b196586
make TestCatchpointAfterStakeLookupTxns work
cce Dec 11, 2024
c11197b
Merge remote-tracking branch 'upstream/master' into catchpoint-online…
cce Dec 11, 2024
49c3067
Merge remote-tracking branch 'origin/catchpoint-onlineaccts-separateh…
cce Dec 11, 2024
4baad21
Update ledger/catchupaccessor.go
cce Dec 17, 2024
48d76af
add more assertions to unit test
cce Dec 12, 2024
8adddbc
update TestCatchpointFastUpdates to give one more kick if DB commits …
cce Dec 19, 2024
330c344
use future
cce Dec 19, 2024
8c81163
use future in state proof catchpoint tests
cce Dec 19, 2024
544b0b0
add TODO in TestFullCatchpointWriterOverflowAccounts and testNewLedge…
cce Dec 19, 2024
5fb737c
code review
cce Dec 19, 2024
998f2fd
Merge remote-tracking branch 'upstream/master' into catchpoint-online…
cce Dec 19, 2024
1fa610d
update TestCatchpointAfterStakeLookupTxns for TEAL R-320
cce Dec 19, 2024
5c9bc2d
Update ledger/catchpointtracker_test.go
cce Dec 19, 2024
767e369
Merge remote-tracking branch 'upstream/master' into catchpoint-online…
cce Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make TestCatchpointAfterStakeLookupTxns work
cce committed Dec 11, 2024
commit b1965862c305348c72bbf3f47eb6ad67c48064b8
203 changes: 168 additions & 35 deletions ledger/catchpointfilewriter_test.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ import (
"compress/gzip"
"context"
"database/sql"
"encoding/binary"
"fmt"
"io"
"os"
@@ -297,8 +298,7 @@ func TestBasicCatchpointWriter(t *testing.T) {
}

func testWriteCatchpoint(t *testing.T, rdb trackerdb.Store, datapath string, filepath string, maxResourcesPerChunk int) CatchpointFileHeader {
var totalAccounts uint64
var totalChunks uint64
var totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks uint64
var biggestChunkLen uint64
var accountsRnd basics.Round
var totals ledgercore.AccountTotals
@@ -333,6 +333,9 @@ func testWriteCatchpoint(t *testing.T, rdb trackerdb.Store, datapath string, fil
}
}
totalAccounts = writer.totalAccounts
totalKVs = writer.totalKVs
totalOnlineAccounts = writer.totalOnlineAccounts
totalOnlineRoundParams = writer.totalOnlineRoundParams
totalChunks = writer.chunkNum
biggestChunkLen = writer.biggestChunkLen
accountsRnd, err = ar.AccountsRound()
@@ -347,14 +350,17 @@ func testWriteCatchpoint(t *testing.T, rdb trackerdb.Store, datapath string, fil
blockHeaderDigest := crypto.Hash([]byte{1, 2, 3})
catchpointLabel := fmt.Sprintf("%d#%v", blocksRound, blockHeaderDigest) // this is not a correct way to create a label, but it's good enough for this unit test
catchpointFileHeader := CatchpointFileHeader{
Version: CatchpointFileVersionV7,
BalancesRound: accountsRnd,
BlocksRound: blocksRound,
Totals: totals,
TotalAccounts: totalAccounts,
TotalChunks: totalChunks,
Catchpoint: catchpointLabel,
BlockHeaderDigest: blockHeaderDigest,
Version: CatchpointFileVersionV8,
BalancesRound: accountsRnd,
BlocksRound: blocksRound,
Totals: totals,
TotalAccounts: totalAccounts,
TotalKVs: totalKVs,
TotalOnlineAccounts: totalOnlineAccounts,
TotalOnlineRoundParams: totalOnlineRoundParams,
TotalChunks: totalChunks,
Catchpoint: catchpointLabel,
BlockHeaderDigest: blockHeaderDigest,
}
err = repackCatchpoint(
context.Background(), catchpointFileHeader, biggestChunkLen,
@@ -705,16 +711,8 @@ func testNewLedgerFromCatchpoint(t *testing.T, catchpointWriterReadAccess tracke
err = accessor.BuildMerkleTrie(context.Background(), nil)
require.NoError(t, err)

resetAccountDBToV6(t, l)

err = l.trackerDBs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) error {
cw, err := tx.MakeCatchpointWriter()
if err != nil {
return err
}

return cw.ApplyCatchpointStagingBalances(ctx, 0, 0)
})
// Initializes DB, runs migrations, runs ApplyCatchpointStagingBalances
err = accessor.(*catchpointCatchupAccessorImpl).finishBalances(context.Background())
require.NoError(t, err)

balanceTrieStats := func(db trackerdb.Store) merkletrie.Stats {
@@ -790,6 +788,20 @@ func TestFullCatchpointWriter(t *testing.T) {
}
}

// ensure both committed all pending changes before taking a catchpoint
// another approach is to modify the test and craft round numbers,
// and make the ledger to generate catchpoint itself when it is time
func testCatchpointFlushRound(l *Ledger) {
// Clear the timer to ensure a flush
l.trackers.mu.Lock()
l.trackers.lastFlushTime = time.Time{}
l.trackers.mu.Unlock()

r, _ := l.LatestCommitted()
l.trackers.committedUpTo(r)
l.trackers.waitAccountsWriting()
}

func TestExactAccountChunk(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()
@@ -821,21 +833,8 @@ func TestExactAccountChunk(t *testing.T) {
dl.fullBlock(&selfpay)
}

// ensure both committed all pending changes before taking a catchpoint
// another approach is to modify the test and craft round numbers,
// and make the ledger to generate catchpoint itself when it is time
flushRound := func(l *Ledger) {
// Clear the timer to ensure a flush
l.trackers.mu.Lock()
l.trackers.lastFlushTime = time.Time{}
l.trackers.mu.Unlock()

r, _ := l.LatestCommitted()
l.trackers.committedUpTo(r)
l.trackers.waitAccountsWriting()
}
flushRound(dl.generator)
flushRound(dl.validator)
testCatchpointFlushRound(dl.generator)
testCatchpointFlushRound(dl.validator)

require.Eventually(t, func() bool {
dl.generator.accts.accountsMu.RLock()
@@ -959,6 +958,140 @@ func TestCatchpointAfterTxns(t *testing.T) {
}
}

func TestCatchpointAfterStakeLookupTxns(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(cfg *ledgertesting.GenesisCfg) {
cfg.OnlineCount = 1
ledgertesting.TurnOffRewards(cfg)
})
cfg := config.GetDefaultLocal()
dl := NewDoubleLedger(t, genBalances, protocol.ConsensusFuture, cfg, simpleLedgerOnDisk())
defer dl.Close()

initialStake := uint64(833333333333333)
expectedStake := initialStake
stakeAppSource := main(`
// ensure total online stake matches arg 0
txn ApplicationArgs 0
btoi
online_stake
==
assert
// ensure stake for accounts 1 (the only online account) matches arg 0
txn Accounts 1
voter_params_get VoterBalance
pop
txn ApplicationArgs 0
btoi
==
assert
`)
// uses block 1 and 2
stakeApp := dl.fundedApp(addrs[1], 1_000_000, stakeAppSource)

// starting with block 3, make an app call and a pay in each block
callStakeApp := func(assertStake uint64) []*txntest.Txn {
stakebuf := make([]byte, 8)
binary.BigEndian.PutUint64(stakebuf, assertStake)
return []*txntest.Txn{
// assert stake from 320 rounds ago
txntest.Txn{
Type: "appl",
Sender: addrs[2],
ApplicationID: stakeApp,
Note: ledgertesting.RandomNote(),
Accounts: []basics.Address{addrs[0]},
}.Args(string(stakebuf)),
// pay 1 microalgo to the only online account (takes effect in 320 rounds)
{
Type: "pay",
Sender: addrs[1],
Receiver: addrs[0],
Amount: 1,
}}
}

// adds block 3
vb := dl.fullBlock(callStakeApp(expectedStake)...)
require.Equal(t, vb.Block().Round(), basics.Round(3))
require.Empty(t, vb.Block().ExpiredParticipationAccounts)
require.Empty(t, vb.Block().AbsentParticipationAccounts)

// add blocks until round 322, after which stake will go up by 1 each round
for ; vb.Block().Round() < 322; vb = dl.fullBlock(callStakeApp(expectedStake)...) {
require.Empty(t, vb.Block().ExpiredParticipationAccounts)
require.Empty(t, vb.Block().AbsentParticipationAccounts)

nextRnd := vb.Block().Round() + 1
stake, err := dl.generator.OnlineCirculation(nextRnd.SubSaturate(320), nextRnd)
require.NoError(t, err)
require.Equal(t, expectedStake, stake.Raw)
}
require.Equal(t, vb.Block().Round(), basics.Round(322))

for vb.Block().Round() <= 1500 {
// the online_stake opcode in block 323 will look up OnlineCirculation(2, 322).
xRnd := vb.Block().Round()
stake, err := dl.generator.OnlineCirculation(xRnd.SubSaturate(320), xRnd)
require.NoError(t, err)
require.Equal(t, expectedStake, stake.Raw)

// build a new block for xRnd+1, asserting online stake for xRnd-320
vb = dl.fullBlock(callStakeApp(expectedStake)...)
require.Empty(t, vb.Block().ExpiredParticipationAccounts)
require.Empty(t, vb.Block().AbsentParticipationAccounts)

expectedStake++ // add 1 microalgo to the expected stake for the next block
}

// wait for tracker to flush
testCatchpointFlushRound(dl.generator)
testCatchpointFlushRound(dl.validator)

// ensure flush and latest round all were OK
genDBRound := dl.generator.LatestTrackerCommitted()
valDBRound := dl.validator.LatestTrackerCommitted()
require.NotZero(t, genDBRound)
require.NotZero(t, valDBRound)
require.Equal(t, genDBRound, valDBRound)
require.Equal(t, 1497, int(genDBRound))
genLatestRound := dl.generator.Latest()
valLatestRound := dl.validator.Latest()
require.NotZero(t, genLatestRound)
require.NotZero(t, valLatestRound)
require.Equal(t, genLatestRound, valLatestRound)
// latest should be 4 rounds ahead of DB round
require.Equal(t, genDBRound+basics.Round(cfg.MaxAcctLookback), genLatestRound)

t.Log("DB round generator", genDBRound, "validator", valDBRound)
t.Log("Latest round generator", genLatestRound, "validator", valLatestRound)

tempDir := t.TempDir()
catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data")
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

cph := testWriteCatchpoint(t, dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, 7, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
defer l.Close()

oar, err := l.trackerDBs.MakeOnlineAccountsOptimizedReader()
require.NoError(t, err)

for i := genDBRound; i >= (genDBRound - 1000); i-- {
oad, err := oar.LookupOnline(addrs[0], basics.Round(i))
require.NoError(t, err)
t.Log(i, oad.AccountData.MicroAlgos.Raw)
// block 3 started paying 1 microalgo to addrs[0] per round
expected := initialStake + uint64(i) - 2
require.Equal(t, expected, oad.AccountData.MicroAlgos.Raw)
}

}

// Exercises a sequence of box modifications that caused a bug in
// catchpoint writes.
//
45 changes: 30 additions & 15 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
@@ -446,6 +446,8 @@ func (c *catchpointCatchupAccessorImpl) processStagingContent(ctx context.Contex
case CatchpointFileVersionV5:
case CatchpointFileVersionV6:
case CatchpointFileVersionV7:
case CatchpointFileVersionV8:

default:
return fmt.Errorf("CatchpointCatchupAccessorImpl::processStagingContent: unable to process catchpoint - version %d is not supported", fileHeader.Version)
}
@@ -533,8 +535,13 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
expectingMoreEntries = make([]bool, len(balances.Balances))

case CatchpointFileVersionV6:
// V6 split accounts from resources; later, KVs were added to the v6 chunk format
fallthrough
case CatchpointFileVersionV7:
// V7 added state proof verification data + hash, but left v6 chunk format unchanged
fallthrough
case CatchpointFileVersionV8:
// V8 added online accounts and online round params data + hashes, and added them to the v6 chunk format
var chunk catchpointFileChunkV6
err = protocol.Decode(bytes, &chunk)
if err != nil {
@@ -852,7 +859,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro
var trie *merkletrie.Trie
uncommitedHashesCount := 0
keepWriting := true
var accountHashesWritten, kvHashesWritten uint64
accountHashesWritten, kvHashesWritten := uint64(0), uint64(0)
var mc trackerdb.MerkleCommitter

txErr := dbs.Transaction(func(transactionCtx context.Context, tx trackerdb.TransactionScope) (err error) {
@@ -904,8 +911,8 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro
uncommitedHashesCount += len(hashesToWrite)

accounts, kvs := countHashes(hashesToWrite)
accountHashesWritten += accounts
kvHashesWritten += kvs
accountHashesWritten += accounts

return nil
})
@@ -1047,7 +1054,8 @@ func (c *catchpointCatchupAccessorImpl) GetVerifyData(ctx context.Context) (bala
}

// calculateVerificationHash iterates over a TableIterator, hashes each item, and returns a hash of
// all the concatenated item hashes.
// all the concatenated item hashes. It is used to verify onlineaccounts and onlineroundparams tables,
// both at restore time (in catchpointCatchupAccessorImpl) and snapshot time (in catchpointTracker).
func calculateVerificationHash[T crypto.Hashable](
ctx context.Context,
iterFactory func(context.Context) (trackerdb.TableIterator[T], error),
@@ -1259,7 +1267,7 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err
return err
}

var balancesRound, hashRound uint64
var balancesRound, hashRound, catchpointFileVersion uint64
var totals ledgercore.AccountTotals

balancesRound, err = crw.ReadCatchpointStateUint64(ctx, trackerdb.CatchpointStateCatchupBalancesRound)
@@ -1272,6 +1280,11 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err
return err
}

catchpointFileVersion, err = c.catchpointStore.ReadCatchpointStateUint64(ctx, trackerdb.CatchpointStateCatchupVersion)
if err != nil {
return fmt.Errorf("unable to retrieve catchpoint version: %v", err)
}

totals, err = ar.AccountsTotals(ctx, true)
if err != nil {
return err
@@ -1309,22 +1322,24 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err
if err != nil {
return err
}
// Rename staged v6 tables from catchpoint file to official table names

err = crw.ApplyCatchpointStagingBalances(ctx, basics.Round(balancesRound), basics.Round(hashRound))
if err != nil {
return err
}

// Upgrade to v7
_, err = tx.RunMigrations(ctx, tp, c.ledger.log, 7 /*target database version*/)
if err != nil {
return err
}
// Now that we have upgraded, rename staged v7 tables from the catchpoint file to official names.
// If the catchpoint file didn't have v7 tables, the existing migrated tables will not be overwriten.
err = crw.ApplyCatchpointStagingTablesV7(ctx)
if err != nil {
return err
if catchpointFileVersion == CatchpointFileVersionV8 { // This catchpoint contains onlineaccounts and onlineroundparamstail tables.
// Upgrade to v7 (which adds the onlineaccounts & onlineroundparamstail tables, among others)
_, err = tx.RunMigrations(ctx, tp, c.ledger.log, 7)
if err != nil {
return err
}

// Now that we have upgraded to v7, replace the onlineaccounts and onlineroundparamstail with the staged catchpoint tables.
err = crw.ApplyCatchpointStagingTablesV7(ctx)
if err != nil {
return err
}
}

err = aw.AccountsPutTotals(totals, false)
21 changes: 3 additions & 18 deletions ledger/store/trackerdb/sqlitedriver/catchpoint.go
Original file line number Diff line number Diff line change
@@ -593,25 +593,10 @@ func (cw *catchpointWriter) ApplyCatchpointStagingBalances(ctx context.Context,
return
}

// ApplyCatchpointStagingTablesV7 drops the existing onlineaccounts and onlineroundparamstail tables,
// replacing them with data from the catchpoint staging tables. It should only be used for CatchpointFileVersionV8,
// after the ApplyCatchpointStagingBalances function has been run on DB v6, then upgraded to DB v7.
func (cw *catchpointWriter) ApplyCatchpointStagingTablesV7(ctx context.Context) (err error) {
// Check if catchpoint tables have data
var accountCount int
err = cw.e.QueryRow("SELECT COUNT(1) FROM catchpointonlineaccounts").Scan(&accountCount)
if err != nil {
return err
}

var paramsCount int
err = cw.e.QueryRow("SELECT COUNT(1) FROM catchpointonlineroundparamstail").Scan(&paramsCount)
if err != nil {
return err
}

// If there is no data in the catchpoint staging tables, don't overwrite the existing v6 tables
if accountCount == 0 && paramsCount == 0 {
return nil
}

stmts := []string{
"DROP TABLE IF EXISTS onlineaccounts",
"DROP TABLE IF EXISTS onlineroundparamstail",