Skip to content

Commit

Permalink
tests: local storage writer: remove stats backup file - source node i…
Browse files Browse the repository at this point in the history
…s stateless
  • Loading branch information
michaljurecko committed Jul 3, 2024
1 parent 428304f commit 2e7e553
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,14 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) {
syncLock := &sync.Mutex{}

// Open volume
volPath := t.TempDir()
vol, err := writerVolume.Open(
ctx,
log.NewNopLogger(),
clock.New(),
writer.NewEvents(),
writer.NewConfig(),
volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "001"},
volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "001"},
writerVolume.WithFileOpener(func(filePath string) (writerVolume.File, error) {
file, err := writerVolume.DefaultFileOpener(filePath)
if err != nil {
Expand All @@ -144,6 +145,7 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) {
slice.LocalStorage.DiskSync.IntervalTrigger = duration.From(2 * time.Second)
val := validator.New()
assert.NoError(t, val.Validate(ctx, slice))
filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename)

// Create writer
w, err := vol.OpenWriter(slice)
Expand Down Expand Up @@ -177,14 +179,16 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) {
}

// Check file content
content, err := os.ReadFile(w.FilePath())
content, err := os.ReadFile(filePath)
assert.NoError(t, err)
assert.Equal(t, "\"value\"\n\"value\"\n", string(content))

// Check rows count file
content, err = os.ReadFile(filepath.Join(w.DirPath(), writer.CompletedWritesCounter))
assert.NoError(t, err)
assert.Equal(t, "2,2000-01-01T00:00:00.000Z,2000-01-01T00:00:00.000Z", string(content))
// Check statistics

assert.Equal(t, uint64(2), w.AcceptedWrites())
assert.Equal(t, uint64(2), w.CompletedWrites())
assert.Equal(t, "2000-01-01T00:00:00.000Z", w.FirstRecordAt().String())
assert.Equal(t, "2000-01-01T00:00:00.000Z", w.LastRecordAt().String())

// Close volume
assert.NoError(t, vol.Close(ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package benchmark
import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -61,12 +62,17 @@ func (wb *WriterBenchmark) Run(b *testing.B) {
// Open volume
clk := clock.New()
now := clk.Now()
spec := volume.Spec{NodeID: "my-node", Path: b.TempDir(), Type: "hdd", Label: "1"}
volPath := b.TempDir()
spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"}
vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec)
require.NoError(b, err)

// Create slice
slice := wb.newSlice(b, vol)
filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename)

// Create writer
sliceWriter, err := vol.OpenWriter(wb.newSlice(b, vol))
sliceWriter, err := vol.OpenWriter(slice)
require.NoError(b, err)

// Create data channel
Expand Down Expand Up @@ -125,7 +131,7 @@ func (wb *WriterBenchmark) Run(b *testing.B) {
if wb.Compression.Type == compression.TypeNone {
assert.Equal(b, sliceWriter.CompressedSize(), sliceWriter.UncompressedSize())
}
stat, err := os.Stat(sliceWriter.FilePath())
stat, err := os.Stat(filePath)
assert.NoError(b, err)
assert.Equal(b, sliceWriter.CompressedSize(), datasize.ByteSize(stat.Size()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -60,12 +61,14 @@ func (tc *WriterTestCase) Run(t *testing.T) {
opts := []writerVolume.Option{writerVolume.WithWatchDrainFile(false)}
clk := clock.New()
now := clk.Now()
spec := volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "1"}
volPath := t.TempDir()
spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"}
vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec, opts...)
require.NoError(t, err)

// Create a test slice
slice := tc.newSlice(t, vol)
filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename)

// Create writer
w, err := vol.OpenWriter(slice)
Expand Down Expand Up @@ -127,12 +130,12 @@ func (tc *WriterTestCase) Run(t *testing.T) {
assert.NoError(t, vol.Close(ctx))

// Check compressed size
stat, err := os.Stat(w.FilePath())
stat, err := os.Stat(filePath)
require.NoError(t, err)
assert.Equal(t, int64(w.CompressedSize().Bytes()), stat.Size(), "compressed file size doesn't match")

// Open file
f, err := os.OpenFile(w.FilePath(), os.O_RDONLY, 0o640)
f, err := os.OpenFile(filePath, os.O_RDONLY, 0o640)
require.NoError(t, err)

// Create file reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestVolume_Writer_AllocateSpace_Enabled(t *testing.T) {
// Check file size after allocation
// The size is rounded to whole blocks, so we check:
// EXPECTED <= ACTUAL SIZE < 2*EXPECTED
allocated, err := diskalloc.Allocated(w.FilePath())
allocated, err := diskalloc.Allocated(tc.FilePath())
require.NoError(t, err)
assert.GreaterOrEqual(t, allocated, expectedSize)
assert.Less(t, allocated, 2*expectedSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func TestVolume_Writer_OpenFile_Ok(t *testing.T) {

w, err := tc.NewWriter()
assert.NoError(t, err)
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

assert.NoError(t, w.Close(context.Background()))
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())
}

func TestVolume_Writer_OpenFile_MkdirError(t *testing.T) {
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDisk(t *testing.T) {
wg.Wait()

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDiskCache(t *testing.T) {
wg.Wait()

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDisk(t *testing.T) {
assert.NoError(t, w.Close(ctx))

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
Expand Down Expand Up @@ -430,7 +430,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDiskCache(t *testing.T) {
assert.NoError(t, w.Close(ctx))

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestVolume_Writer_Sync_Disabled(t *testing.T) {
assert.NoError(t, w.Close(ctx))

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
Expand Down Expand Up @@ -529,11 +529,11 @@ func TestVolume_Writer_AllocateSpace_Error(t *testing.T) {

w, err := tc.NewWriter()
assert.NoError(t, err)
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Close writer
assert.NoError(t, w.Close(ctx))
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Check logs
tc.AssertLogs(`
Expand All @@ -555,11 +555,11 @@ func TestVolume_Writer_AllocateSpace_NotSupported(t *testing.T) {

w, err := tc.NewWriter()
assert.NoError(t, err)
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Close writer
assert.NoError(t, w.Close(ctx))
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Check logs
tc.AssertLogs(`
Expand All @@ -582,7 +582,7 @@ func TestVolume_Writer_AllocateSpace_Disabled(t *testing.T) {
assert.NoError(t, err)

// Check file - no allocation
allocated, err := diskalloc.Allocated(w.FilePath())
allocated, err := diskalloc.Allocated(tc.FilePath())
require.NoError(t, err)
assert.Less(t, allocated, datasize.KB)

Expand Down Expand Up @@ -645,6 +645,10 @@ func (tc *writerTestCase) NewWriter(opts ...Option) (writer.Writer, error) {
return w, nil
}

func (tc *writerTestCase) FilePath() string {
return filepath.Join(tc.VolumePath, tc.Slice.LocalStorage.Dir, tc.Slice.LocalStorage.Filename)
}

type testAllocator struct {
Ok bool
Error error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ func TestWriter(t *testing.T) {
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644)
assert.NoError(t, err)

w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, test.DummyWriterFactory, writer.NewEvents())
w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, test.DummyWriterFactory, writer.NewEvents())
require.NoError(t, err)

// Test getters
assert.Equal(t, slice.SliceKey, w.SliceKey())
assert.Equal(t, dirPath, w.DirPath())
assert.Equal(t, filePath, w.FilePath())

// Test write methods
assert.NoError(t, w.WriteRecord(clk.Now(), []any{"123", "456", "789"}))
Expand Down Expand Up @@ -79,7 +77,7 @@ func TestWriter_FlushError(t *testing.T) {
return w, nil
}

w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, writerFactory, writer.NewEvents())
w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, writerFactory, writer.NewEvents())
require.NoError(t, err)

// Test Close method
Expand Down Expand Up @@ -108,7 +106,7 @@ func TestWriter_CloseError(t *testing.T) {
return w, nil
}

w, err := writer.New(ctx, logger, clk, cfg, slice, file, dirPath, filePath, disksync.NewSyncer, writerFactory, writer.NewEvents())
w, err := writer.New(ctx, logger, clk, cfg, slice, file, disksync.NewSyncer, writerFactory, writer.NewEvents())
require.NoError(t, err)

// Test Close method
Expand Down

0 comments on commit 2e7e553

Please sign in to comment.