Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: add catchpoint downloading/parsing to e2e catchup tests #6224

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ledger/acctdeltas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo
last64KSize = chunkSize
last64KAccountCreationTime = time.Duration(0)
}
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
chunk.Balances = make([]encoded.BalanceRecordV6, chunkSize)
for i := uint64(0); i < chunkSize; i++ {
var randomAccount encoded.BalanceRecordV6
Expand Down
25 changes: 14 additions & 11 deletions ledger/catchpointfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type catchpointFileWriter struct {
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunk CatchpointSnapshotChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
Expand All @@ -80,12 +80,15 @@ type catchpointFileWriter struct {
onlineRoundParamsDone bool
}

type catchpointFileBalancesChunkV5 struct {
// CatchpointSnapshotChunkV5 defines the encoding of "balances.X.msgpack" files in the catchpoint snapshot
// used before database schema v6, which split accounts from asset/app resource data.
type CatchpointSnapshotChunkV5 struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`
Balances []encoded.BalanceRecordV5 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
}

type catchpointFileChunkV6 struct {
// CatchpointSnapshotChunkV6 defines the current encoding of "balances.X.msgpack" files in the catchpoint snapshot.
type CatchpointSnapshotChunkV6 struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
Expand All @@ -95,7 +98,7 @@ type catchpointFileChunkV6 struct {
OnlineRoundParams []encoded.OnlineRoundParamsRecordV6 `codec:"orp,allocbound=BalancesPerCatchpointFileChunk"`
}

func (chunk catchpointFileChunkV6) empty() bool {
func (chunk CatchpointSnapshotChunkV6) empty() bool {
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0 && len(chunk.OnlineAccounts) == 0 && len(chunk.OnlineRoundParams) == 0
}

Expand Down Expand Up @@ -216,7 +219,7 @@ func (cw *catchpointFileWriter) FileWriteStep(stepCtx context.Context) (more boo
return
}

writerRequest := make(chan catchpointFileChunkV6, 1)
writerRequest := make(chan CatchpointSnapshotChunkV6, 1)
writerResponse := make(chan error, 2)
go cw.asyncWriter(writerRequest, writerResponse, cw.chunkNum)
defer func() {
Expand Down Expand Up @@ -298,11 +301,11 @@ func (cw *catchpointFileWriter) FileWriteStep(stepCtx context.Context) (more boo
cw.chunkNum++
writerRequest <- cw.chunk
// indicate that we need a readDatabaseStep
cw.chunk = catchpointFileChunkV6{}
cw.chunk = CatchpointSnapshotChunkV6{}
}
}

func (cw *catchpointFileWriter) asyncWriter(chunks chan catchpointFileChunkV6, response chan error, chunkNum uint64) {
func (cw *catchpointFileWriter) asyncWriter(chunks chan CatchpointSnapshotChunkV6, response chan error, chunkNum uint64) {
defer close(response)
for chk := range chunks {
chunkNum++
Expand Down Expand Up @@ -341,7 +344,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
return err
}
if len(balances) > 0 {
cw.chunk = catchpointFileChunkV6{Balances: balances, numAccounts: numAccounts}
cw.chunk = CatchpointSnapshotChunkV6{Balances: balances, numAccounts: numAccounts}
return nil
}
// It might seem reasonable, but do not close accountsIterator here,
Expand Down Expand Up @@ -372,7 +375,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
}
}
if len(kvrs) > 0 {
cw.chunk = catchpointFileChunkV6{KVs: kvrs}
cw.chunk = CatchpointSnapshotChunkV6{KVs: kvrs}
return nil
}
// Do not close kvRows here, or it will start over on the next iteration
Expand Down Expand Up @@ -401,7 +404,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
}
}
if len(onlineAccts) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineAccounts: onlineAccts}
cw.chunk = CatchpointSnapshotChunkV6{OnlineAccounts: onlineAccts}
return nil
}
// Do not close onlineAccountRows here, or it will start over on the next iteration
Expand Down Expand Up @@ -430,7 +433,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
}
}
if len(onlineRndParams) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineRoundParams: onlineRndParams}
cw.chunk = CatchpointSnapshotChunkV6{OnlineRoundParams: onlineRndParams}
return nil
}
// Do not close onlineRndParamsRows here, or it will start over on the next iteration
Expand Down
34 changes: 21 additions & 13 deletions ledger/catchpointfilewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ func TestCatchpointFileBalancesChunkEncoding(t *testing.T) {
kvs[i] = kv
}

chunk1 := catchpointFileChunkV6{}
chunk1 := CatchpointSnapshotChunkV6{}
chunk1.Balances = balances
chunk1.KVs = kvs
encodedChunk := chunk1.MarshalMsg(nil)

var chunk2 catchpointFileChunkV6
var chunk2 CatchpointSnapshotChunkV6
_, err := chunk2.UnmarshalMsg(encodedChunk)
require.NoError(t, err)

Expand Down Expand Up @@ -291,7 +291,7 @@ func TestBasicCatchpointWriter(t *testing.T) {
balanceFileName := fmt.Sprintf(catchpointBalancesFileNameTemplate, 1)
require.Equal(t, balanceFileName, catchpointContent[1].headerName)

var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
err = protocol.Decode(catchpointContent[1].data, &chunk)
require.NoError(t, err)
require.Equal(t, uint64(len(accts)), uint64(len(chunk.Balances)))
Expand Down Expand Up @@ -834,14 +834,16 @@ func TestExactAccountChunk(t *testing.T) {
partitiontest.PartitionTest(t)
// t.Parallel() // probably not good to parallelize catchpoint file save/load

t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39, 40) })
t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 40) })
t.Run("v40_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 100) })
t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 40) })
t.Run("future_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 100) })
t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39, 40, false) })
t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 40, false) })
t.Run("v40_noSPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 63, false) })
t.Run("v40_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 64, true) })
t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 40, false) })
t.Run("future_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 64, true) })
t.Run("future_SPstall300", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 300, true) })
}

func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraBlocks int) {
func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraBlocks int, longHistory bool) {
genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(c *ledgertesting.GenesisCfg) {
c.OnlineCount = 1 // addrs[0] is online
}, ledgertesting.TurnOffRewards)
Expand Down Expand Up @@ -901,27 +903,33 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraB
valLowestRound := dl.validator.trackers.acctsOnline.voters.lowestRound(valDBRound)
require.Equal(t, genLowestRound, valLowestRound)
require.Equal(t, genDBRound, valDBRound)
// genDBRound is MaxAcctLookback (4) rounds behind genR
require.Equal(t, genR, genDBRound+basics.Round(dl.generator.cfg.MaxAcctLookback))
// This assert, plus previous assert on genR guarantees that genDBRound is:
// BalancesPerCatchpointFileChunk-12+extraBlocks-MaxAcctLookback (560 for 64 extraBlocks, 536 for 40 extraBlocks)

var onlineExcludeBefore basics.Round
// we added so many blocks that lowestRound is stuck at first state proof, round 240?
if normalHorizon := (genDBRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)); normalHorizon == genLowestRound {
t.Logf("subtest is exercising case where 320 rounds of history are already in DB")
if normalHorizon := (genDBRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)); normalHorizon <= genLowestRound {
t.Logf("subtest is exercising case where lowestRound from votersTracker is satsified by the existing history")
require.EqualValues(t, genLowestRound, params.StateProofInterval-params.StateProofVotersLookback)
require.False(t, longHistory)
} else if normalHorizon > genLowestRound {
t.Logf("subtest is exercising case where votersTracker causes onlineaccounts & onlineroundparams to extend history to round %d (DBRound %d)", genLowestRound, genDBRound)
onlineExcludeBefore = normalHorizon // fails without this adjustment
require.True(t, longHistory)
}

cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, onlineExcludeBefore)

decodedData := readCatchpointFile(t, catchpointFilePath)

// decode and verify some stats about balances chunk contents
var chunks []catchpointFileChunkV6
var chunks []CatchpointSnapshotChunkV6
for i, d := range decodedData {
t.Logf("section %d: %s", i, d.headerName)
if strings.HasPrefix(d.headerName, "balances.") {
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
err := protocol.Decode(d.data, &chunk)
require.NoError(t, err)
t.Logf("chunk %d balances: %d, kvs: %d, onlineaccounts: %d, onlineroundparams: %d", i, len(chunk.Balances), len(chunk.KVs), len(chunk.OnlineAccounts), len(chunk.OnlineRoundParams))
Expand Down
4 changes: 2 additions & 2 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@
// we won't get to this point, since we've already verified the version in processStagingContent
return errors.New("unsupported version")
case CatchpointFileVersionV5:
var balances catchpointFileBalancesChunkV5
var balances CatchpointSnapshotChunkV5

Check warning on line 524 in ledger/catchupaccessor.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchupaccessor.go#L524

Added line #L524 was not covered by tests
err = protocol.Decode(bytes, &balances)
if err != nil {
return err
Expand All @@ -542,7 +542,7 @@
fallthrough
case CatchpointFileVersionV8:
// V8 added online accounts and online round params data + hashes, and added them to the v6 chunk format
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
err = protocol.Decode(bytes, &chunk)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions ledger/catchupaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func createTestingEncodedChunks(accountsCount uint64) (encodedAccountChunks [][]
if accounts >= accountsCount-64*1024 && last64KIndex == -1 {
last64KIndex = len(encodedAccountChunks)
}
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
chunk.Balances = make([]encoded.BalanceRecordV6, chunkSize)
for i := uint64(0); i < chunkSize; i++ {
var randomAccount encoded.BalanceRecordV6
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestCatchupAccessorResourceCountMismatch(t *testing.T) {
err = catchpointAccessor.ProcessStagingBalances(ctx, CatchpointContentFileName, encodedFileHeader, &progress)
require.NoError(t, err)

var balances catchpointFileChunkV6
var balances CatchpointSnapshotChunkV6
balances.Balances = make([]encoded.BalanceRecordV6, 1)
var randomAccount encoded.BalanceRecordV6
accountData := trackerdb.BaseAccountData{}
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestCatchupAccessorProcessStagingBalances(t *testing.T) {
}

// make chunks
chunks := []catchpointFileChunkV6{
chunks := []CatchpointSnapshotChunkV6{
{
Balances: []encoded.BalanceRecordV6{
encodedBalanceRecordFromBase(ledgertesting.RandomAddress(), acctA, nil, false),
Expand Down
Loading
Loading