diff --git a/internal/pkg/service/stream/storage/statistics/repository/stats_delete_test.go b/internal/pkg/service/stream/storage/statistics/repository/stats_delete_test.go
index 7de9f6296d..2e147e3fee 100644
--- a/internal/pkg/service/stream/storage/statistics/repository/stats_delete_test.go
+++ b/internal/pkg/service/stream/storage/statistics/repository/stats_delete_test.go
@@ -249,7 +249,7 @@ func TestRepository_RollupStatisticsOnFileDelete_LevelTarget(t *testing.T) {
 
 	// Create records
 	nodeID := "test-node"
-	assert.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
+	require.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
 		{
 			SliceKey:         sliceKey1,
 			FirstRecordAt:    utctime.MustParse("2000-01-01T01:00:00.000Z"),
@@ -351,7 +351,7 @@ func TestRepository_RollupStatisticsOnFileDelete_LevelTarget(t *testing.T) {
 	}
 
 	// Create record
-	assert.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
+	require.NoError(t, statsRepo.Put(ctx, nodeID, []statistics.PerSlice{
 		{
 			SliceKey:         sliceKey4,
 			FirstRecordAt:    utctime.MustParse("2000-01-01T09:00:00.000Z"),
@@ -405,3 +405,142 @@ func TestRepository_RollupStatisticsOnFileDelete_LevelTarget(t *testing.T) {
 		assert.False(t, stats.Total.LastRecordAt.IsZero())
 	}
 }
+
+// TestRepository_RollupStatisticsOnFileDelete_LevelTarget_TxnLimit tests the statistics reset near the etcd transaction limit.
+func TestRepository_RollupStatisticsOnFileDelete_LevelTarget_TxnLimit(t *testing.T) {
+	t.Parallel()
+
+	// How many files and slices to create, so the reset operates near the etcd transaction limit
+	const limit = 500
+
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+	defer cancel()
+
+	clk := clockwork.NewFakeClockAt(utctime.MustParse("2000-01-01T01:00:00.000Z").Time())
+	by := test.ByUser()
+
+	// Fixtures
+	projectID := keboola.ProjectID(123)
+	branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456}
+	sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
+	sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"}
+
+	// Get services
+	d, mocked := dependencies.NewMockedStorageScope(t, ctx, commonDeps.WithClock(clk))
+	client := mocked.TestEtcdClient()
+	defRepo := d.DefinitionRepository()
+	storageRepo := d.StorageRepository()
+	fileRepo := storageRepo.File()
+	sliceRepo := storageRepo.Slice()
+	volumeRepo := storageRepo.Volume()
+	statsRepo := d.StatisticsRepository()
+
+	// Log etcd operations
+	var etcdLogs bytes.Buffer
+	rawClient := d.EtcdClient()
+	rawClient.KV = etcdlogger.KVLogWrapper(rawClient.KV, &etcdLogs, etcdlogger.WithMinimal())
+
+	// Register active volumes
+	// -----------------------------------------------------------------------------------------------------------------
+	{
+		session, err := concurrency.NewSession(client)
+		require.NoError(t, err)
+		defer func() { require.NoError(t, session.Close()) }()
+		test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1)
+	}
+
+	// Create branch, source, sink, file, slice
+	// -----------------------------------------------------------------------------------------------------------------
+	{
+		branch := test.NewBranch(branchKey)
+		require.NoError(t, defRepo.Branch().Create(&branch, clk.Now(), by).Do(ctx).Err())
+		source := test.NewSource(sourceKey)
+		require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err())
+		sink := dummy.NewSinkWithLocalStorage(sinkKey)
+		require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err())
+	}
+
+	// Create limit-1 more files/slices (limit in total)
+	// -----------------------------------------------------------------------------------------------------------------
+	var fileKeys []model.FileKey
+	var sliceKeys []model.SliceKey
+	{
+		for i := 1; i < limit; i++ {
+			clk.Advance(time.Hour)
+			require.NoError(t, fileRepo.Rotate(sinkKey, clk.Now()).Do(ctx).Err())
+		}
+
+		files, err := fileRepo.ListIn(sinkKey).Do(ctx).All()
+		require.NoError(t, err)
+		require.Len(t, files, limit)
+		for i := 1; i < limit; i++ {
+			fileKeys = append(fileKeys, files[i].FileKey)
+		}
+
+		slices, err := sliceRepo.ListIn(sinkKey).Do(ctx).All()
+		require.NoError(t, err)
+		require.Len(t, slices, limit)
+		for i := 1; i < limit; i++ {
+			sliceKeys = append(sliceKeys, slices[i].SliceKey)
+		}
+	}
+
+	// Create records
+	nodeID := "test-node"
+	stats := []statistics.PerSlice{}
+	for _, sliceKey := range sliceKeys {
+		stats = append(stats, statistics.PerSlice{
+			SliceKey:         sliceKey,
+			FirstRecordAt:    utctime.MustParse("2000-01-01T01:00:00.000Z"),
+			LastRecordAt:     utctime.MustParse("2000-01-01T02:00:00.000Z"),
+			RecordsCount:     1,
+			UncompressedSize: 1,
+			CompressedSize:   1,
+		})
+	}
+	require.NoError(t, statsRepo.Put(ctx, nodeID, stats))
+
+	// Move statistics to the target level
+	// -----------------------------------------------------------------------------------------------------------------
+	{
+		// Disable the sink, which triggers closing of the active file
+		require.NoError(t, defRepo.Sink().Disable(sinkKey, clk.Now(), by, "some reason").Do(ctx).Err())
+
+		clk.Advance(time.Hour)
+		for _, sliceKey := range sliceKeys {
+			require.NoError(t, sliceRepo.SwitchToUploading(sliceKey, clk.Now(), false).Do(ctx).Err())
+		}
+
+		clk.Advance(time.Hour)
+		for _, sliceKey := range sliceKeys {
+			require.NoError(t, sliceRepo.SwitchToUploaded(sliceKey, clk.Now()).Do(ctx).Err())
+		}
+
+		clk.Advance(time.Hour)
+		for _, fileKey := range fileKeys {
+			require.NoError(t, fileRepo.SwitchToImporting(fileKey, clk.Now(), false).Do(ctx).Err())
+		}
+
+		clk.Advance(time.Hour)
+		for _, fileKey := range fileKeys {
+			require.NoError(t, fileRepo.SwitchToImported(fileKey, clk.Now()).Do(ctx).Err())
+		}
+	}
+
+	// Reset statistics
+	// -----------------------------------------------------------------------------------------------------------------
+	{
+		clk.Advance(time.Hour)
+		require.NoError(t, statsRepo.ResetAllSinksStats([]key.SinkKey{sinkKey}).Do(ctx).Err())
+
+		stats, err := statsRepo.SinkStats(ctx, sinkKey)
+		require.NoError(t, err)
+		assert.Equal(t, uint64(1), stats.Total.SlicesCount)
+		assert.Equal(t, uint64(0), stats.Total.RecordsCount)
+		assert.Equal(t, datasize.ByteSize(0), stats.Total.CompressedSize)
+		assert.Equal(t, datasize.ByteSize(0), stats.Total.UncompressedSize)
+		assert.Equal(t, datasize.ByteSize(0), stats.Total.StagingSize)
+		assert.False(t, stats.Total.FirstRecordAt.IsZero())
+		assert.False(t, stats.Total.LastRecordAt.IsZero())
+	}
+}
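
Note: the new test drives statsRepo.ResetAllSinksStats over roughly 500 slices, i.e. more per-slice keys than a single etcd transaction is allowed to touch by default (the server-side --max-txn-ops limit is 128 unless raised). The sketch below is not code from this repository; it only illustrates, under that assumption, the general technique such a reset has to rely on: splitting the affected keys into batches small enough to fit into one transaction each. The maxOpsPerTxn constant and the deleteInBatches helper are hypothetical names used for illustration.

// Illustrative sketch only; not part of the patch above.
package example

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// maxOpsPerTxn keeps a safety margin below etcd's default --max-txn-ops (128).
const maxOpsPerTxn = 100

// deleteInBatches deletes the given keys using several transactions, so that
// no single transaction exceeds the server's operation limit.
func deleteInBatches(ctx context.Context, client *clientv3.Client, keys []string) error {
	for start := 0; start < len(keys); start += maxOpsPerTxn {
		end := start + maxOpsPerTxn
		if end > len(keys) {
			end = len(keys)
		}

		// Build one delete operation per key in the current batch.
		ops := make([]clientv3.Op, 0, end-start)
		for _, k := range keys[start:end] {
			ops = append(ops, clientv3.OpDelete(k))
		}

		// Each batch commits independently; the reset as a whole is not atomic.
		if _, err := client.Txn(ctx).Then(ops...).Commit(); err != nil {
			return err
		}
	}
	return nil
}

A consequence of batching is that only each individual transaction is atomic, which is why a test like the one above checks the aggregated sink totals only after the whole reset call has returned.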