test: Add statistics clear test near ops limit
jachym-tousek-keboola committed Jan 23, 2025
1 parent 55a6b5f · commit 1c9d951
Showing 1 changed file with 141 additions and 2 deletions.
@@ -249,7 +249,7 @@ func TestRepository_RollupStatisticsOnFileDelete_LevelTarget(t *testing.T) {

// Create records
nodeID := "test-node"
-assert.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
+require.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
{
SliceKey: sliceKey1,
FirstRecordAt: utctime.MustParse("2000-01-01T01:00:00.000Z"),
@@ -351,7 +351,7 @@ func TestRepository_RollupStatisticsOnFileDelete_LevelTarget(t *testing.T) {
}

// Create record
-assert.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
+require.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
{
SliceKey: sliceKey4,
FirstRecordAt: utctime.MustParse("2000-01-01T09:00:00.000Z"),
@@ -405,3 +405,142 @@ func TestRepository_RollupStatisticsOnFileDelete_LevelTarget(t *testing.T) {
assert.False(t, stats.Total.LastRecordAt.IsZero())
}
}

// TestRepository_RollupStatisticsOnFileDelete_LevelTarget_TxnLimit verifies that sink statistics are reset correctly when the number of affected slices is near the etcd transaction operation limit.
func TestRepository_RollupStatisticsOnFileDelete_LevelTarget_TxnLimit(t *testing.T) {
t.Parallel()

// How many files and slices to create
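// 500 is well above etcd's default max-txn-ops (128), so, assuming the default limit, the statistics operations below cannot all fit into a single etcd transaction.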
const limit = 500

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

clk := clockwork.NewFakeClockAt(utctime.MustParse("2000-01-01T01:00:00.000Z").Time())
by := test.ByUser()

// Fixtures
projectID := keboola.ProjectID(123)
branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456}
sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"}

// Get services
d, mocked := dependencies.NewMockedStorageScope(t, ctx, commonDeps.WithClock(clk))
client := mocked.TestEtcdClient()
defRepo := d.DefinitionRepository()
storageRepo := d.StorageRepository()
fileRepo := storageRepo.File()
sliceRepo := storageRepo.Slice()
volumeRepo := storageRepo.Volume()
statsRepo := d.StatisticsRepository()

// Log etcd operations
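// The wrapper writes a minimal log of every KV operation to etcdLogs, which can be inspected when debugging how batched operations are split across transactions.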
var etcdLogs bytes.Buffer
rawClient := d.EtcdClient()
rawClient.KV = etcdlogger.KVLogWrapper(rawClient.KV, &etcdLogs, etcdlogger.WithMinimal())

// Register active volumes
// -----------------------------------------------------------------------------------------------------------------
{
session, err := concurrency.NewSession(client)
require.NoError(t, err)
defer func() { require.NoError(t, session.Close()) }()
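// Register one writer volume up front; the file rotation below presumably needs at least one active volume to assign slices to.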
test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1)
}

// Create branch, source, sink, file, slice
// -----------------------------------------------------------------------------------------------------------------
{
branch := test.NewBranch(branchKey)
require.NoError(t, defRepo.Branch().Create(&branch, clk.Now(), by).Do(ctx).Err())
source := test.NewSource(sourceKey)
require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err())
sink := dummy.NewSinkWithLocalStorage(sinkKey)
require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err())
}

// Create limit-1 more files/slices (limit in total)
// -----------------------------------------------------------------------------------------------------------------
var fileKeys []model.FileKey
var sliceKeys []model.SliceKey
{
for i := 1; i < limit; i++ {
clk.Advance(time.Hour)
require.NoError(t, fileRepo.Rotate(sinkKey, clk.Now()).Do(ctx).Err())
}

files, err := fileRepo.ListIn(sinkKey).Do(ctx).All()
require.NoError(t, err)
require.Len(t, files, limit)
for i := 1; i < limit; i++ {
fileKeys = append(fileKeys, files[i].FileKey)
}

slices, err := sliceRepo.ListIn(sinkKey).Do(ctx).All()
require.NoError(t, err)
require.Len(t, slices, limit)
for i := 1; i < limit; i++ {
sliceKeys = append(sliceKeys, slices[i].SliceKey)
}
}

// Create records
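// A single Put call stores per-slice statistics for all limit-1 slices created above.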
nodeID := "test-node"
var stats []statistics.PerSlice
for _, sliceKey := range sliceKeys {
stats = append(stats, statistics.PerSlice{
SliceKey: sliceKey,
FirstRecordAt: utctime.MustParse("2000-01-01T01:00:00.000Z"),
LastRecordAt: utctime.MustParse("2000-01-01T02:00:00.000Z"),
RecordsCount: 1,
UncompressedSize: 1,
CompressedSize: 1,
})
}
require.NoError(t, statsRepo.Put(ctx, nodeID, stats))

// Move statistics to the target level
// -----------------------------------------------------------------------------------------------------------------
{
// Disable the sink, which triggers closing of the active file
require.NoError(t, defRepo.Sink().Disable(sinkKey, clk.Now(), by, "some reason").Do(ctx).Err())

clk.Advance(time.Hour)
for _, sliceKey := range sliceKeys {
require.NoError(t, sliceRepo.SwitchToUploading(sliceKey, clk.Now(), false).Do(ctx).Err())
}

clk.Advance(time.Hour)
for _, sliceKey := range sliceKeys {
require.NoError(t, sliceRepo.SwitchToUploaded(sliceKey, clk.Now()).Do(ctx).Err())
}

clk.Advance(time.Hour)
for _, fileKey := range fileKeys {
require.NoError(t, fileRepo.SwitchToImporting(fileKey, clk.Now(), false).Do(ctx).Err())
}

clk.Advance(time.Hour)
for _, fileKey := range fileKeys {
require.NoError(t, fileRepo.SwitchToImported(fileKey, clk.Now()).Do(ctx).Err())
}
}

// Reset statistics
// -----------------------------------------------------------------------------------------------------------------
{
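// Resetting the sink statistics has to touch the record of every slice; with this many slices the work presumably must be split across multiple etcd transactions, which is the scenario under test.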
clk.Advance(time.Hour)
require.NoError(t, statsRepo.ResetAllSinksStats([]key.SinkKey{sinkKey}).Do(ctx).Err())

stats, err := statsRepo.SinkStats(ctx, sinkKey)
require.NoError(t, err)
assert.Equal(t, uint64(1), stats.Total.SlicesCount)
assert.Equal(t, uint64(0), stats.Total.RecordsCount)
assert.Equal(t, datasize.ByteSize(0), stats.Total.CompressedSize)
assert.Equal(t, datasize.ByteSize(0), stats.Total.UncompressedSize)
assert.Equal(t, datasize.ByteSize(0), stats.Total.StagingSize)
assert.False(t, stats.Total.FirstRecordAt.IsZero())
assert.False(t, stats.Total.LastRecordAt.IsZero())
}
}
