From 2e7e55395267701a6d36a86f5b168a0cd346381c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Tue, 2 Jul 2024 11:50:03 +0200 Subject: [PATCH] tests: local storage writer: remove stats backup file - source node is stateless --- .../level/local/writer/format/csv/csv_test.go | 16 +++++++---- .../local/writer/test/benchmark/benchmark.go | 12 ++++++-- .../local/writer/test/testcase/testcase.go | 9 ++++-- .../local/writer/volume/writer_linux_test.go | 2 +- .../level/local/writer/volume/writer_test.go | 28 +++++++++++-------- .../storage/level/local/writer/writer_test.go | 8 ++---- 6 files changed, 45 insertions(+), 30 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go b/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go index 2cf2fdf2b8..1190a18aa6 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/format/csv/csv_test.go @@ -117,13 +117,14 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) { syncLock := &sync.Mutex{} // Open volume + volPath := t.TempDir() vol, err := writerVolume.Open( ctx, log.NewNopLogger(), clock.New(), writer.NewEvents(), writer.NewConfig(), - volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "001"}, + volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "001"}, writerVolume.WithFileOpener(func(filePath string) (writerVolume.File, error) { file, err := writerVolume.DefaultFileOpener(filePath) if err != nil { @@ -144,6 +145,7 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) { slice.LocalStorage.DiskSync.IntervalTrigger = duration.From(2 * time.Second) val := validator.New() assert.NoError(t, val.Validate(ctx, slice)) + filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename) // Create writer w, err := vol.OpenWriter(slice) @@ -177,14 +179,16 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) { } // Check file content - content, err := os.ReadFile(w.FilePath()) + content, err := os.ReadFile(filePath) assert.NoError(t, err) assert.Equal(t, "\"value\"\n\"value\"\n", string(content)) - // Check rows count file - content, err = os.ReadFile(filepath.Join(w.DirPath(), writer.CompletedWritesCounter)) - assert.NoError(t, err) - assert.Equal(t, "2,2000-01-01T00:00:00.000Z,2000-01-01T00:00:00.000Z", string(content)) + // Check statistics + + assert.Equal(t, uint64(2), w.AcceptedWrites()) + assert.Equal(t, uint64(2), w.CompletedWrites()) + assert.Equal(t, "2000-01-01T00:00:00.000Z", w.FirstRecordAt().String()) + assert.Equal(t, "2000-01-01T00:00:00.000Z", w.LastRecordAt().String()) // Close volume assert.NoError(t, vol.Close(ctx)) diff --git a/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go b/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go index e2a8815b8b..8929175bf7 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go +++ b/internal/pkg/service/stream/storage/level/local/writer/test/benchmark/benchmark.go @@ -3,6 +3,7 @@ package benchmark import ( "context" "os" + "path/filepath" "sync" "testing" "time" @@ -61,12 +62,17 @@ func (wb *WriterBenchmark) Run(b *testing.B) { // Open volume clk := clock.New() now := clk.Now() - spec := volume.Spec{NodeID: "my-node", Path: b.TempDir(), Type: "hdd", Label: "1"} + volPath := b.TempDir() + spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"} vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec) require.NoError(b, err) + // Create slice + slice := wb.newSlice(b, vol) + filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename) + // Create writer - sliceWriter, err := vol.OpenWriter(wb.newSlice(b, vol)) + sliceWriter, err := vol.OpenWriter(slice) require.NoError(b, err) // Create data channel @@ -125,7 +131,7 @@ func (wb *WriterBenchmark) Run(b *testing.B) { if wb.Compression.Type == compression.TypeNone { assert.Equal(b, sliceWriter.CompressedSize(), sliceWriter.UncompressedSize()) } - stat, err := os.Stat(sliceWriter.FilePath()) + stat, err := os.Stat(filePath) assert.NoError(b, err) assert.Equal(b, sliceWriter.CompressedSize(), datasize.ByteSize(stat.Size())) } diff --git a/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go b/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go index ad528780fc..9a2622fffd 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go +++ b/internal/pkg/service/stream/storage/level/local/writer/test/testcase/testcase.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "path/filepath" "sync" "testing" "time" @@ -60,12 +61,14 @@ func (tc *WriterTestCase) Run(t *testing.T) { opts := []writerVolume.Option{writerVolume.WithWatchDrainFile(false)} clk := clock.New() now := clk.Now() - spec := volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "1"} + volPath := t.TempDir() + spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"} vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec, opts...) require.NoError(t, err) // Create a test slice slice := tc.newSlice(t, vol) + filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename) // Create writer w, err := vol.OpenWriter(slice) @@ -127,12 +130,12 @@ func (tc *WriterTestCase) Run(t *testing.T) { assert.NoError(t, vol.Close(ctx)) // Check compressed size - stat, err := os.Stat(w.FilePath()) + stat, err := os.Stat(filePath) require.NoError(t, err) assert.Equal(t, int64(w.CompressedSize().Bytes()), stat.Size(), "compressed file size doesn't match") // Open file - f, err := os.OpenFile(w.FilePath(), os.O_RDONLY, 0o640) + f, err := os.OpenFile(filePath, os.O_RDONLY, 0o640) require.NoError(t, err) // Create file reader diff --git a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go index 4965689fd7..97754838a5 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_linux_test.go @@ -29,7 +29,7 @@ func TestVolume_Writer_AllocateSpace_Enabled(t *testing.T) { // Check file size after allocation // The size is rounded to whole blocks, so we check: // EXPECTED <= ACTUAL SIZE < 2*EXPECTED - allocated, err := diskalloc.Allocated(w.FilePath()) + allocated, err := diskalloc.Allocated(tc.FilePath()) require.NoError(t, err) assert.GreaterOrEqual(t, allocated, expectedSize) assert.Less(t, allocated, 2*expectedSize) diff --git a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go index e39b7917e1..0454001715 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/volume/writer_test.go @@ -73,10 +73,10 @@ func TestVolume_Writer_OpenFile_Ok(t *testing.T) { w, err := tc.NewWriter() assert.NoError(t, err) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) assert.NoError(t, w.Close(context.Background())) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) } func TestVolume_Writer_OpenFile_MkdirError(t *testing.T) { @@ -180,7 +180,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDisk(t *testing.T) { wg.Wait() // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -282,7 +282,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDiskCache(t *testing.T) { wg.Wait() // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -353,7 +353,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDisk(t *testing.T) { assert.NoError(t, w.Close(ctx)) // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -430,7 +430,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDiskCache(t *testing.T) { assert.NoError(t, w.Close(ctx)) // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -495,7 +495,7 @@ func TestVolume_Writer_Sync_Disabled(t *testing.T) { assert.NoError(t, w.Close(ctx)) // Check file content - AssertFileContent(t, w.FilePath(), ` + AssertFileContent(t, tc.FilePath(), ` foo,bar,123 foo,bar,123 abc,def,456 @@ -529,11 +529,11 @@ func TestVolume_Writer_AllocateSpace_Error(t *testing.T) { w, err := tc.NewWriter() assert.NoError(t, err) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Close writer assert.NoError(t, w.Close(ctx)) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Check logs tc.AssertLogs(` @@ -555,11 +555,11 @@ func TestVolume_Writer_AllocateSpace_NotSupported(t *testing.T) { w, err := tc.NewWriter() assert.NoError(t, err) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Close writer assert.NoError(t, w.Close(ctx)) - assert.FileExists(t, w.FilePath()) + assert.FileExists(t, tc.FilePath()) // Check logs tc.AssertLogs(` @@ -582,7 +582,7 @@ func TestVolume_Writer_AllocateSpace_Disabled(t *testing.T) { assert.NoError(t, err) // Check file - no allocation - allocated, err := diskalloc.Allocated(w.FilePath()) + allocated, err := diskalloc.Allocated(tc.FilePath()) require.NoError(t, err) assert.Less(t, allocated, datasize.KB) @@ -645,6 +645,10 @@ func (tc *writerTestCase) NewWriter(opts ...Option) (writer.Writer, error) { return w, nil } +func (tc *writerTestCase) FilePath() string { + return filepath.Join(tc.VolumePath, tc.Slice.LocalStorage.Dir, tc.Slice.LocalStorage.Filename) +} + type testAllocator struct { Ok bool Error error diff --git a/internal/pkg/service/stream/storage/level/local/writer/writer_test.go b/internal/pkg/service/stream/storage/level/local/writer/writer_test.go index 6da6132e6f..a1d141d13e 100644 --- a/internal/pkg/service/stream/storage/level/local/writer/writer_test.go +++ b/internal/pkg/service/stream/storage/level/local/writer/writer_test.go @@ -33,13 +33,11 @@ func TestWriter(t *testing.T) { file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) assert.NoError(t, err) - w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, test.DummyWriterFactory, writer.NewEvents()) + w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, test.DummyWriterFactory, writer.NewEvents()) require.NoError(t, err) // Test getters assert.Equal(t, slice.SliceKey, w.SliceKey()) - assert.Equal(t, dirPath, w.DirPath()) - assert.Equal(t, filePath, w.FilePath()) // Test write methods assert.NoError(t, w.WriteRecord(clk.Now(), []any{"123", "456", "789"})) @@ -79,7 +77,7 @@ func TestWriter_FlushError(t *testing.T) { return w, nil } - w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, writerFactory, writer.NewEvents()) + w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, writerFactory, writer.NewEvents()) require.NoError(t, err) // Test Close method @@ -108,7 +106,7 @@ func TestWriter_CloseError(t *testing.T) { return w, nil } - w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, writerFactory, writer.NewEvents()) + w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, writerFactory, writer.NewEvents()) require.NoError(t, err) // Test Close method