Merge pull request #1876 from keboola/michaljurecko-PSGO-591-stats-rm-backup-files

feat: local storage writer: remove stats backup file
michaljurecko authored Jul 3, 2024
2 parents e88ab0c + 14442ef commit 290836e
Showing 8 changed files with 80 additions and 115 deletions.
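
The recurring change across these tests: the writer no longer hands out the slice file path (the w.FilePath()/w.DirPath() calls disappear, and writer.New below loses its dirPath/filePath arguments), so each test derives the path itself from the volume path and the slice's local storage metadata. A minimal, self-contained sketch of that derivation, with a hypothetical sliceLocalStorage struct standing in for the repository's real slice model:

package main

import (
	"fmt"
	"path/filepath"
)

// sliceLocalStorage is a hypothetical stand-in for slice.LocalStorage
// from the storage model package; only the fields used here are shown.
type sliceLocalStorage struct {
	Dir      string
	Filename string
}

// sliceFilePath mirrors the pattern used by the updated tests:
// filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename).
func sliceFilePath(volumePath string, ls sliceLocalStorage) string {
	return filepath.Join(volumePath, ls.Dir, ls.Filename)
}

func main() {
	// The directory and filename values below are made up for illustration.
	ls := sliceLocalStorage{Dir: "my-source/my-sink/2000-01-01T00-00-00", Filename: "slice.csv.gz"}
	fmt.Println(sliceFilePath("/tmp/volume-001", ls))
}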

@@ -117,13 +117,14 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) {
syncLock := &sync.Mutex{}

// Open volume
volPath := t.TempDir()
vol, err := writerVolume.Open(
ctx,
log.NewNopLogger(),
clock.New(),
writer.NewEvents(),
writer.NewConfig(),
volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "001"},
volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "001"},
writerVolume.WithFileOpener(func(filePath string) (writerVolume.File, error) {
file, err := writerVolume.DefaultFileOpener(filePath)
if err != nil {
@@ -144,6 +145,7 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) {
slice.LocalStorage.DiskSync.IntervalTrigger = duration.From(2 * time.Second)
val := validator.New()
assert.NoError(t, val.Validate(ctx, slice))
filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename)

// Create writer
w, err := vol.OpenWriter(slice)
@@ -177,14 +179,16 @@ func TestCSVWriter_Close_WaitForWrites(t *testing.T) {
}

// Check file content
content, err := os.ReadFile(w.FilePath())
content, err := os.ReadFile(filePath)
assert.NoError(t, err)
assert.Equal(t, "\"value\"\n\"value\"\n", string(content))

// Check rows count file
content, err = os.ReadFile(filepath.Join(w.DirPath(), writer.CompletedWritesCounter))
assert.NoError(t, err)
assert.Equal(t, "2,2000-01-01T00:00:00.000Z,2000-01-01T00:00:00.000Z", string(content))
// Check statistics

assert.Equal(t, uint64(2), w.AcceptedWrites())
assert.Equal(t, uint64(2), w.CompletedWrites())
assert.Equal(t, "2000-01-01T00:00:00.000Z", w.FirstRecordAt().String())
assert.Equal(t, "2000-01-01T00:00:00.000Z", w.LastRecordAt().String())

// Close volume
assert.NoError(t, vol.Close(ctx))
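
The first file above captures the point of the PR: the per-slice statistics backup file is gone. The old test re-read the completed-writes counter from a file (writer.CompletedWritesCounter, holding "<count>,<firstRecordAt>,<lastRecordAt>"); the updated test asserts the same values via the writer's in-memory accessors. A rough sketch of that idea, with a hypothetical statsWriter type standing in for the real writer interface:

package main

import (
	"fmt"
	"time"
)

// statsWriter is a hypothetical stand-in for the real slice writer; the
// accessor names mirror those asserted in the test above.
type statsWriter struct {
	accepted, completed uint64
	first, last         time.Time
}

func (w *statsWriter) AcceptedWrites() uint64   { return w.accepted }
func (w *statsWriter) CompletedWrites() uint64  { return w.completed }
func (w *statsWriter) FirstRecordAt() time.Time { return w.first }
func (w *statsWriter) LastRecordAt() time.Time  { return w.last }

func main() {
	at := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	w := &statsWriter{accepted: 2, completed: 2, first: at, last: at}

	// Statistics live in memory (and, per the last test file below, in the
	// statistics repository); no backup file is written next to the slice.
	fmt.Println(w.AcceptedWrites(), w.CompletedWrites())
	fmt.Println(w.FirstRecordAt().Format(time.RFC3339), w.LastRecordAt().Format(time.RFC3339))
}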

@@ -3,6 +3,7 @@ package benchmark
import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"time"
@@ -61,12 +62,17 @@ func (wb *WriterBenchmark) Run(b *testing.B) {
// Open volume
clk := clock.New()
now := clk.Now()
spec := volume.Spec{NodeID: "my-node", Path: b.TempDir(), Type: "hdd", Label: "1"}
volPath := b.TempDir()
spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"}
vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec)
require.NoError(b, err)

// Create slice
slice := wb.newSlice(b, vol)
filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename)

// Create writer
sliceWriter, err := vol.OpenWriter(wb.newSlice(b, vol))
sliceWriter, err := vol.OpenWriter(slice)
require.NoError(b, err)

// Create data channel
@@ -125,7 +131,7 @@ func (wb *WriterBenchmark) Run(b *testing.B) {
if wb.Compression.Type == compression.TypeNone {
assert.Equal(b, sliceWriter.CompressedSize(), sliceWriter.UncompressedSize())
}
stat, err := os.Stat(sliceWriter.FilePath())
stat, err := os.Stat(filePath)
assert.NoError(b, err)
assert.Equal(b, sliceWriter.CompressedSize(), datasize.ByteSize(stat.Size()))
}

@@ -5,25 +5,25 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/c2h5oh/datasize"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table/column"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/compression"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/disksync"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer"
writerVolume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer/volume"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics/collector"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
"github.com/keboola/keboola-as-code/internal/pkg/utils/testhelper"
"github.com/keboola/keboola-as-code/internal/pkg/validator"
)

@@ -52,20 +52,24 @@ func (tc *WriterTestCase) Run(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Setup logger
logger := log.NewDebugLogger()
logger.ConnectTo(testhelper.VerboseStdout())
d, mock := dependencies.NewMockedLocalStorageScope(t)
cfg := mock.TestConfig()

// Start statistics collector
events := writer.NewEvents()
collector.Start(d, events, cfg.Storage.Statistics.Collector, cfg.NodeID)

// Open volume
opts := []writerVolume.Option{writerVolume.WithWatchDrainFile(false)}
clk := clock.New()
now := clk.Now()
spec := volume.Spec{NodeID: "my-node", Path: t.TempDir(), Type: "hdd", Label: "1"}
vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec, opts...)
now := d.Clock().Now()
volPath := t.TempDir()
spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"}
vol, err := writerVolume.Open(ctx, d.Logger(), d.Clock(), events, cfg.Storage.Level.Local.Writer, spec, opts...)
require.NoError(t, err)

// Create a test slice
slice := tc.newSlice(t, vol)
filePath := filepath.Join(volPath, slice.LocalStorage.Dir, slice.LocalStorage.Filename)

// Create writer
w, err := vol.OpenWriter(slice)
@@ -119,20 +123,16 @@ func (tc *WriterTestCase) Run(t *testing.T) {

// Close the writer
require.NoError(t, w.Close(ctx))
assert.Equal(t, uint64(rowsCount), w.CompletedWrites())
assert.NotEmpty(t, w.CompressedSize())
assert.NotEmpty(t, w.UncompressedSize())

// Close volume
assert.NoError(t, vol.Close(ctx))

// Check compressed size
stat, err := os.Stat(w.FilePath())
// Get file size
fileStat, err := os.Stat(filePath)
require.NoError(t, err)
assert.Equal(t, int64(w.CompressedSize().Bytes()), stat.Size(), "compressed file size doesn't match")

// Open file
f, err := os.OpenFile(w.FilePath(), os.O_RDONLY, 0o640)
f, err := os.OpenFile(filePath, os.O_RDONLY, 0o640)
require.NoError(t, err)

// Create file reader
@@ -144,13 +144,20 @@ func (tc *WriterTestCase) Run(t *testing.T) {
// Read file content
content, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, int(w.UncompressedSize().Bytes()), len(content), "uncompressed file size doesn't match")

// Check written data
tc.Validator(t, string(content))

// Close file
assert.NoError(t, f.Close())

// Check statistics
sliceStats, err := d.StatisticsRepository().SliceStats(ctx, slice.SliceKey)
if assert.NoError(t, err) {
assert.Equal(t, int(sliceStats.Total.RecordsCount), rowsCount, "records count doesn't match")
assert.Equal(t, int64(sliceStats.Total.CompressedSize.Bytes()), fileStat.Size(), "compressed file size doesn't match")
assert.Equal(t, int(sliceStats.Total.UncompressedSize.Bytes()), len(content), "uncompressed file size doesn't match")
}
}

func (tc *WriterTestCase) newSlice(t *testing.T, volume *writerVolume.Volume) *model.Slice {

@@ -99,7 +99,7 @@ func (v *Volume) OpenWriter(slice *model.Slice) (w writer.Writer, err error) {
}

// Create writer
w, err = writer.New(v.ctx, logger, v.clock, v.config.writerConfig, slice, file, dirPath, filePath, v.config.syncerFactory, v.config.formatWriterFactory, v.events)
w, err = writer.New(v.ctx, logger, v.clock, v.config.writerConfig, slice, file, v.config.syncerFactory, v.config.formatWriterFactory, v.events)
if err != nil {
return nil, err
}

@@ -29,7 +29,7 @@ func TestVolume_Writer_AllocateSpace_Enabled(t *testing.T) {
// Check file size after allocation
// The size is rounded to whole blocks, so we check:
// EXPECTED <= ACTUAL SIZE < 2*EXPECTED
allocated, err := diskalloc.Allocated(w.FilePath())
allocated, err := diskalloc.Allocated(tc.FilePath())
require.NoError(t, err)
assert.GreaterOrEqual(t, allocated, expectedSize)
assert.Less(t, allocated, 2*expectedSize)

@@ -73,10 +73,10 @@ func TestVolume_Writer_OpenFile_Ok(t *testing.T) {

w, err := tc.NewWriter()
assert.NoError(t, err)
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

assert.NoError(t, w.Close(context.Background()))
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())
}

func TestVolume_Writer_OpenFile_MkdirError(t *testing.T) {
@@ -180,7 +180,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDisk(t *testing.T) {
wg.Wait()

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
@@ -282,7 +282,7 @@ func TestVolume_Writer_Sync_Enabled_Wait_ToDiskCache(t *testing.T) {
wg.Wait()

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
@@ -353,7 +353,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDisk(t *testing.T) {
assert.NoError(t, w.Close(ctx))

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
@@ -430,7 +430,7 @@ func TestVolume_Writer_Sync_Enabled_NoWait_ToDiskCache(t *testing.T) {
assert.NoError(t, w.Close(ctx))

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
@@ -495,7 +495,7 @@ func TestVolume_Writer_Sync_Disabled(t *testing.T) {
assert.NoError(t, w.Close(ctx))

// Check file content
AssertFileContent(t, w.FilePath(), `
AssertFileContent(t, tc.FilePath(), `
foo,bar,123
foo,bar,123
abc,def,456
@@ -529,11 +529,11 @@ func TestVolume_Writer_AllocateSpace_Error(t *testing.T) {

w, err := tc.NewWriter()
assert.NoError(t, err)
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Close writer
assert.NoError(t, w.Close(ctx))
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Check logs
tc.AssertLogs(`
@@ -555,11 +555,11 @@ func TestVolume_Writer_AllocateSpace_NotSupported(t *testing.T) {

w, err := tc.NewWriter()
assert.NoError(t, err)
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Close writer
assert.NoError(t, w.Close(ctx))
assert.FileExists(t, w.FilePath())
assert.FileExists(t, tc.FilePath())

// Check logs
tc.AssertLogs(`
Expand All @@ -582,7 +582,7 @@ func TestVolume_Writer_AllocateSpace_Disabled(t *testing.T) {
assert.NoError(t, err)

// Check file - no allocation
allocated, err := diskalloc.Allocated(w.FilePath())
allocated, err := diskalloc.Allocated(tc.FilePath())
require.NoError(t, err)
assert.Less(t, allocated, datasize.KB)

@@ -645,6 +645,10 @@ func (tc *writerTestCase) NewWriter(opts ...Option) (writer.Writer, error) {
return w, nil
}

func (tc *writerTestCase) FilePath() string {
return filepath.Join(tc.VolumePath, tc.Slice.LocalStorage.Dir, tc.Slice.LocalStorage.Filename)
}

type testAllocator struct {
Ok bool
Error error