From 428304ff47af1da09596e5420ac3b473353a4fe0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Tue, 2 Jul 2024 11:49:49 +0200 Subject: [PATCH 1/3] feat: local storage writer: remove stats backup file - source node is stateless --- .../level/local/writer/volume/writer.go | 2 +- .../storage/level/local/writer/writer.go | 84 ++++--------------- 2 files changed, 16 insertions(+), 70 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/writer/volume/writer.go b/internal/pkg/service/stream/storage/level/local/writer/volume/writer.go index f217dd597b..03d17b15f8 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/volume/writer.go +++ b/internal/pkg/service/stream/storage/level/local/writer/volume/writer.go @@ -99,7 +99,7 @@ func (v *Volume) OpenWriter(slice *model.Slice) (w writer.Writer, err error) { } // Create writer - w, err = writer.New(v.ctx, logger, v.clock, v.config.writerConfig, slice, file, dirPath, filePath, v.config.syncerFactory, v.config.formatWriterFactory, v.events) + w, err = writer.New(v.ctx, logger, v.clock, v.config.writerConfig, slice, file, v.config.syncerFactory, v.config.formatWriterFactory, v.events) if err != nil { return nil, err } diff --git a/internal/pkg/service/stream/storage/level/local/writer/writer.go b/internal/pkg/service/stream/storage/level/local/writer/writer.go index 93f9d8bfea..e99a769b3e 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/writer.go +++ b/internal/pkg/service/stream/storage/level/local/writer/writer.go @@ -3,7 +3,6 @@ package writer import ( "context" "io" - "path/filepath" "sync" "time" @@ -23,18 +22,7 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" ) -const ( - CompletedWritesCounter = "completed_count" - CompressedSizeFile = "compressed_size" - UncompressedSizeFile = "uncompressed_size" -) - // Writer writes values as bytes to the slice file on the disk, in the configured format and compression. -// -// The FormatWriter is used for values to bytes conversion. -// The disksync.Syncer is used for syncing to the cache/disk, -// The writechain.Chain is used to glue together all buffers and intermediate writers. -// The Events are used to dispatch writer open and close events. type Writer interface { StatisticsProvider @@ -45,10 +33,6 @@ type Writer interface { Close(context.Context) error // Events provides listening to the writer lifecycle. Events() *Events - // DirPath is absolute path to the slice directory. It contains slice file and optionally an auxiliary files. - DirPath() string - // FilePath is absolute path to the slice file. - FilePath() string } type StatisticsProvider interface { @@ -69,9 +53,6 @@ type StatisticsProvider interface { // writer implements Writer interface, it wraps common logic for all file types. // For conversion between record values and bytes, the Writer is used. type writer struct { - dirPath string - filePath string - logger log.Logger slice *model.Slice events *Events @@ -86,9 +67,9 @@ type writer struct { writeWg *sync.WaitGroup acceptedWrites *count.Counter - completedWrites *count.CounterWithBackup - compressedMeter *size.MeterWithBackup - uncompressedMeter *size.MeterWithBackup + completedWrites *count.Counter + compressedMeter *size.Meter + uncompressedMeter *size.Meter } func New( @@ -98,20 +79,16 @@ func New( cfg Config, slice *model.Slice, file writechain.File, - dirPath string, - filePath string, syncerFactory disksync.SyncerFactory, formatWriterFactory FormatWriterFactory, volumeEvents *Events, ) (out Writer, err error) { w := &writer{ - dirPath: dirPath, - filePath: filePath, - logger: logger.WithComponent("slice-writer"), - slice: slice, - events: volumeEvents.Clone(), // clone volume events, to attach additional writer specific events - closed: make(chan struct{}), - writeWg: &sync.WaitGroup{}, + logger: logger.WithComponent("slice-writer"), + slice: slice, + events: volumeEvents.Clone(), // clone volume events, to attach additional writer specific events + closed: make(chan struct{}), + writeWg: &sync.WaitGroup{}, } // Close resources on error @@ -123,9 +100,6 @@ func New( if w.chain != nil { _ = w.chain.Close(ctx) } - if w.completedWrites != nil { - _ = w.completedWrites.Close() - } } }() @@ -137,10 +111,7 @@ func New( // Successful writes counter, the value backup is periodically saved to disk. // In the case of non-graceful node shutdown, the initial state is loaded from the disk after the node is restarted. // In that case, the statistics may not be accurate, but this should not happen, and we prefer throughput over the atomicity of statistics. - w.completedWrites, err = count.NewCounterWithBackupFile(ctx, clk, logger, filepath.Join(dirPath, CompletedWritesCounter), cfg.Statistics.DiskSyncInterval) - if err != nil { - return nil, err - } + w.completedWrites = count.NewCounter() } // Create empty chain of writers to the file @@ -160,14 +131,10 @@ func New( // Measure size of compressed data { - backupPath := filepath.Join(dirPath, CompressedSizeFile) - _, err = w.chain.PrependWriterOrErr(func(writer io.Writer) (out io.Writer, err error) { - w.compressedMeter, err = size.NewMeterWithBackupFile(ctx, clk, logger, writer, backupPath, cfg.Statistics.DiskSyncInterval) - return w.compressedMeter, err + w.chain.PrependWriter(func(writer io.Writer) io.Writer { + w.compressedMeter = size.NewMeter(writer) + return w.compressedMeter }) - if err != nil { - return nil, err - } } // Add compression if enabled @@ -189,14 +156,10 @@ func New( } // Measure size of uncompressed CSV data - backupPath := filepath.Join(dirPath, UncompressedSizeFile) - _, err = w.chain.PrependWriterOrErr(func(writer io.Writer) (_ io.Writer, err error) { - w.uncompressedMeter, err = size.NewMeterWithBackupFile(ctx, clk, logger, writer, backupPath, cfg.Statistics.DiskSyncInterval) - return w.uncompressedMeter, err + w.chain.PrependWriter(func(writer io.Writer) io.Writer { + w.uncompressedMeter = size.NewMeter(writer) + return w.uncompressedMeter }) - if err != nil { - return nil, err - } } else { // Size of the compressed and uncompressed data is same w.uncompressedMeter = w.compressedMeter @@ -271,18 +234,6 @@ func (w *writer) Events() *Events { return w.events } -// DirPath to the directory with slice files. -// It is an absolute path. -func (w *writer) DirPath() string { - return w.dirPath -} - -// FilePath to the slice data. -// It is an absolute path. -func (w *writer) FilePath() string { - return w.filePath -} - // AcceptedWrites returns count of write operations waiting for the sync. func (w *writer) AcceptedWrites() uint64 { return w.acceptedWrites.Count() @@ -338,11 +289,6 @@ func (w *writer) Close(ctx context.Context) error { // Wait for running writes w.writeWg.Wait() - // Close, backup counter value - if err := w.completedWrites.Close(); err != nil { - errs.Append(err) - } - if err := w.events.dispatchOnWriterClose(w, errs.ErrorOrNil()); err != nil { errs.Append(err) } From 2e7e55395267701a6d36a86f5b168a0cd346381c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Tue, 2 Jul 2024 11:50:03 +0200 Subject: [PATCH 2/3] tests: local storage writer: remove stats backup file - source node is stateless --- .../level/local/writer/format/csv/csv_test.go | 16 +++++++---- .../local/writer/test/benchmark/benchmark.go | 12 ++++++-- .../local/writer/test/testcase/testcase.go | 9 ++++-- .../local/writer/volume/writer_linux_test.go | 2 +- .../level/local/writer/volume/writer_test.go | 28 +++++++++++-------- .../storage/level/local/writer/writer_test.go | 8 ++---- 6 files changed, 45 insertions(+), 30 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go b/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go index 2cf2fdf2b8..1190a18aa6 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go @@ -117,13 +117,14 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) { syncLock := &sync.Mutex{} // Open volume + volPath := t.TempDir() vol, err := writerVolume.Open( ctx, log.NewNopLogger(), clock.New(), writer.NewEvents(), writer.NewConfig(), - volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "001"}, + volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "001"}, writerVolume.WithFileOpener(func(filePath string) (writerVolume.File, error) { file, err := writerVolume.DefaultFileOpener(filePath) if err != nil { @@ -144,6 +145,7 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) { slice.LocalStorage.DiskSync.IntervalTrigger = duration.From(2 * time.Second) val := validator.New() assert.NoError(t, val.Validate(ctx, slice)) + filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename) // Create writer w, err := vol.OpenWriter(slice) @@ -177,14 +179,16 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) { } // Check file content - content, err := os.ReadFile(w.FilePath()) + content, err := os.ReadFile(filePath) assert.NoError(t, err) assert.Equal(t, "\"value\"\n\"value\"\n", string(content)) - // Check rows count file - content, err = os.ReadFile(filepath.Join(w.DirPath(), writer.CompletedWritesCounter)) - assert.NoError(t, err) - assert.Equal(t, "2,2000-01-01T00:00:00.000Z,2000-01-01T00:00:00.000Z", string(content)) + // Check statistics + + assert.Equal(t, uint64(2), w.AcceptedWrites()) + assert.Equal(t, uint64(2), w.CompletedWrites()) + assert.Equal(t, "2000-01-01T00:00:00.000Z", w.FirstRecordAt().String()) + assert.Equal(t, "2000-01-01T00:00:00.000Z", w.LastRecordAt().String()) // Close volume assert.NoError(t, vol.Close(ctx)) diff --git a/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go b/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go index e2a8815b8b..8929175bf7 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go +++ b/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go @@ -3,6 +3,7 @@ package benchmark import ( "context" "os" + "path/filepath" "sync" "testing" "time" @@ -61,12 +62,17 @@ func (wb *WriterBenchmark) Run(b *testing.B) { // Open volume clk := clock.New() now := clk.Now() - spec := volume.Spec{NodeID: "my-node", Path: b.TempDir(), Type: "hdd", Label: "1"} + volPath := b.TempDir() + spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"} vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec) require.NoError(b, err) + // Create slice + slice := wb.newSlice(b, vol) + filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename) + // Create writer - sliceWriter, err := vol.OpenWriter(wb.newSlice(b, vol)) + sliceWriter, err := vol.OpenWriter(slice) require.NoError(b, err) // Create data channel @@ -125,7 +131,7 @@ func (wb *WriterBenchmark) Run(b *testing.B) { if wb.Compression.Type == compression.TypeNone { assert.Equal(b, sliceWriter.CompressedSize(), sliceWriter.UncompressedSize()) } - stat, err := os.Stat(sliceWriter.FilePath()) + stat, err := os.Stat(filePath) assert.NoError(b, err) assert.Equal(b, sliceWriter.CompressedSize(), datasize.ByteSize(stat.Size())) } diff --git a/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go b/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go index ad528780fc..9a2622fffd 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go +++ b/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "path/filepath" "sync" "testing" "time" @@ -60,12 +61,14 @@ func (tc *WriterTestCase) Run(t *testing.T) { opts := []writerVolume.Option{writerVolume.WithWatchDrainFile(false)} clk := clock.New() now := clk.Now() - spec := volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "1"} + volPath := t.TempDir() + spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"} vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec, opts...) require.NoError(t, err) // Create a test slice slice := tc.newSlice(t, vol) + filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename) // Create writer w, err := vol.OpenWriter(slice) @@ -127,12 +130,12 @@ func (tc *WriterTestCase) Run(t *testing.T) { assert.NoError(t, vol.Close(ctx)) // Check compressed size - stat, err := os.Stat(w.FilePath()) + stat, err := os.Stat(filePath) require.NoError(t, err) assert.Equal(t, int64(w.CompressedSize().Bytes()), stat.Size(), "compressed file size doesn't match") // Open file - f, err := os.OpenFile(w.FilePath(), os.O_RDONLY, 0o640) + f, err := os.OpenFile(filePath, os.O_RDONLY, 0o640) require.NoError(t, err) // Create file reader diff --git a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go index 4965689fd7..97754838a5 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go @@ -29,7 +29,7 @@ func TestVolume_Writer_AllocateSpace_Enabled(t *testing.T) { // Check file size after allocation // The size is rounded to whole blocks, so we check: // EXPECTED <= ACTUAL SIZE < 2*EXPECTED - allocated, err := diskalloc.Allocated(w.FilePath()) + allocated, err := diskalloc.Allocated(tc.FilePath()) require.NoError(t, err) assert.GreaterOrEqual(t, allocated, expectedSize) assert.Less(t, allocated, 2*expectedSize) diff --git a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go index e39b7917e1..0454001715 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go @@ -73,10 +73,10 @@ func TestVolume_Writer_OpenFile_Ok(t *testing.T) { w, err := tc.NewWriter() assert.NoError(t, err) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) assert.NoError(t, w.Close(context.Background())) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) } func TestVolume_Writer_OpenFile_MkdirError(t *testing.T) { @@ -180,7 +180,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDisk(t *testing.T) { wg.Wait() // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -282,7 +282,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDiskCache(t *testing.T) { wg.Wait() // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -353,7 +353,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDisk(t *testing.T) { assert.NoError(t, w.Close(ctx)) // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -430,7 +430,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDiskCache(t *testing.T) { assert.NoError(t, w.Close(ctx)) // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -495,7 +495,7 @@ func TestVolume_Writer_Sync_Disabled(t *testing.T) { assert.NoError(t, w.Close(ctx)) // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -529,11 +529,11 @@ func TestVolume_Writer_AllocateSpace_Error(t *testing.T) { w, err := tc.NewWriter() assert.NoError(t, err) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Close writer assert.NoError(t, w.Close(ctx)) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Check logs tc.AssertLogs(` @@ -555,11 +555,11 @@ func TestVolume_Writer_AllocateSpace_NotSupported(t *testing.T) { w, err := tc.NewWriter() assert.NoError(t, err) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Close writer assert.NoError(t, w.Close(ctx)) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Check logs tc.AssertLogs(` @@ -582,7 +582,7 @@ func TestVolume_Writer_AllocateSpace_Disabled(t *testing.T) { assert.NoError(t, err) // Check file - no allocation - allocated, err := diskalloc.Allocated(w.FilePath()) + allocated, err := diskalloc.Allocated(tc.FilePath()) require.NoError(t, err) assert.Less(t, allocated, datasize.KB) @@ -645,6 +645,10 @@ func (tc *writerTestCase) NewWriter(opts ...Option) (writer.Writer, error) { return w, nil } +func (tc *writerTestCase) FilePath() string { + return filepath.Join(tc.VolumePath, tc.Slice.LocalStorage.Dir, tc.Slice.LocalStorage.Filename) +} + type testAllocator struct { Ok bool Error error diff --git a/internal/pkg/service/stream/storage/level/local/writer/writer_test.go b/internal/pkg/service/stream/storage/level/local/writer/writer_test.go index 6da6132e6f..a1d141d13e 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/writer_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/writer_test.go @@ -33,13 +33,11 @@ func TestWriter(t *testing.T) { file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) assert.NoError(t, err) - w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, test.DummyWriterFactory, writer.NewEvents()) + w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, test.DummyWriterFactory, writer.NewEvents()) require.NoError(t, err) // Test getters assert.Equal(t, slice.SliceKey, w.SliceKey()) - assert.Equal(t, dirPath, w.DirPath()) - assert.Equal(t, filePath, w.FilePath()) // Test write methods assert.NoError(t, w.WriteRecord(clk.Now(), []any{"123", "456", "789"})) @@ -79,7 +77,7 @@ func TestWriter_FlushError(t *testing.T) { return w, nil } - w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, writerFactory, writer.NewEvents()) + w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, writerFactory, writer.NewEvents()) require.NoError(t, err) // Test Close method @@ -108,7 +106,7 @@ func TestWriter_CloseError(t *testing.T) { return w, nil } - w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, writerFactory, writer.NewEvents()) + w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, writerFactory, writer.NewEvents()) require.NoError(t, err) // Test Close method From 14442efd282e611de93ef327d9e6fee24da8e6e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Wed, 3 Jul 2024 09:11:39 +0200 Subject: [PATCH 3/3] tests: Fix writer test, use statistics/collector --- .../local/writer/test/testcase/testcase.go | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go b/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go index 9a2622fffd..c32c768fbb 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go +++ b/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go @@ -10,12 +10,11 @@ import ( "testing" "time" - "github.com/benbjohnson/clock" "github.com/c2h5oh/datasize" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table/column" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/compression" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/disksync" @@ -23,8 +22,8 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer" writerVolume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer/volume" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics/collector" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" - "github.com/keboola/keboola-as-code/internal/pkg/utils/testhelper" "github.com/keboola/keboola-as-code/internal/pkg/validator" ) @@ -53,17 +52,19 @@ func (tc *WriterTestCase) Run(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // Setup logger - logger := log.NewDebugLogger() - logger.ConnectTo(testhelper.VerboseStdout()) + d, mock := dependencies.NewMockedLocalStorageScope(t) + cfg := mock.TestConfig() + + // Start statistics collector + events := writer.NewEvents() + collector.Start(d, events, cfg.Storage.Statistics.Collector, cfg.NodeID) // Open volume opts := []writerVolume.Option{writerVolume.WithWatchDrainFile(false)} - clk := clock.New() - now := clk.Now() + now := d.Clock().Now() volPath := t.TempDir() spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"} - vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec, opts...) + vol, err := writerVolume.Open(ctx, d.Logger(), d.Clock(), events, cfg.Storage.Level.Local.Writer, spec, opts...) require.NoError(t, err) // Create a test slice @@ -122,17 +123,13 @@ func (tc *WriterTestCase) Run(t *testing.T) { // Close the writer require.NoError(t, w.Close(ctx)) - assert.Equal(t, uint64(rowsCount), w.CompletedWrites()) - assert.NotEmpty(t, w.CompressedSize()) - assert.NotEmpty(t, w.UncompressedSize()) // Close volume assert.NoError(t, vol.Close(ctx)) - // Check compressed size - stat, err := os.Stat(filePath) + // Get file size + fileStat, err := os.Stat(filePath) require.NoError(t, err) - assert.Equal(t, int64(w.CompressedSize().Bytes()), stat.Size(), "compressed file size doesn't match") // Open file f, err := os.OpenFile(filePath, os.O_RDONLY, 0o640) @@ -147,13 +144,20 @@ func (tc *WriterTestCase) Run(t *testing.T) { // Read file content content, err := io.ReadAll(reader) require.NoError(t, err) - assert.Equal(t, int(w.UncompressedSize().Bytes()), len(content), "uncompressed file size doesn't match") // Check written data tc.Validator(t, string(content)) // Close file assert.NoError(t, f.Close()) + + // Check statistics + sliceStats, err := d.StatisticsRepository().SliceStats(ctx, slice.SliceKey) + if assert.NoError(t, err) { + assert.Equal(t, int(sliceStats.Total.RecordsCount), rowsCount, "records count doesn't match") + assert.Equal(t, int64(sliceStats.Total.CompressedSize.Bytes()), fileStat.Size(), "compressed file size doesn't match") + assert.Equal(t, int(sliceStats.Total.UncompressedSize.Bytes()), len(content), "uncompressed file size doesn't match") + } } func (tc *WriterTestCase) newSlice(t *testing.T, volume *writerVolume.Volume) *model.Slice {