Skip to content

Commit

Permalink
tests: Fix writer test, use statistics/collector
Browse files Browse the repository at this point in the history
  • Loading branch information
michaljurecko committed Jul 3, 2024
1 parent 2e7e553 commit 14442ef
Showing 1 changed file with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,20 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/c2h5oh/datasize"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table/column"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/compression"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/disksync"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer"
writerVolume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer/volume"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics/collector"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
"github.com/keboola/keboola-as-code/internal/pkg/utils/testhelper"
"github.com/keboola/keboola-as-code/internal/pkg/validator"
)

Expand Down Expand Up @@ -53,17 +52,19 @@ func (tc *WriterTestCase) Run(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Setup logger
logger := log.NewDebugLogger()
logger.ConnectTo(testhelper.VerboseStdout())
d, mock := dependencies.NewMockedLocalStorageScope(t)
cfg := mock.TestConfig()

// Start statistics collector
events := writer.NewEvents()
collector.Start(d, events, cfg.Storage.Statistics.Collector, cfg.NodeID)

// Open volume
opts := []writerVolume.Option{writerVolume.WithWatchDrainFile(false)}
clk := clock.New()
now := clk.Now()
now := d.Clock().Now()
volPath := t.TempDir()
spec := volume.Spec{NodeID: "my-node", Path: volPath, Type: "hdd", Label: "1"}
vol, err := writerVolume.Open(ctx, logger, clk, writer.NewEvents(), writer.NewConfig(), spec, opts...)
vol, err := writerVolume.Open(ctx, d.Logger(), d.Clock(), events, cfg.Storage.Level.Local.Writer, spec, opts...)
require.NoError(t, err)

// Create a test slice
Expand Down Expand Up @@ -122,17 +123,13 @@ func (tc *WriterTestCase) Run(t *testing.T) {

// Close the writer
require.NoError(t, w.Close(ctx))
assert.Equal(t, uint64(rowsCount), w.CompletedWrites())
assert.NotEmpty(t, w.CompressedSize())
assert.NotEmpty(t, w.UncompressedSize())

// Close volume
assert.NoError(t, vol.Close(ctx))

// Check compressed size
stat, err := os.Stat(filePath)
// Get file size
fileStat, err := os.Stat(filePath)
require.NoError(t, err)
assert.Equal(t, int64(w.CompressedSize().Bytes()), stat.Size(), "compressed file size doesn't match")

// Open file
f, err := os.OpenFile(filePath, os.O_RDONLY, 0o640)
Expand All @@ -147,13 +144,20 @@ func (tc *WriterTestCase) Run(t *testing.T) {
// Read file content
content, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, int(w.UncompressedSize().Bytes()), len(content), "uncompressed file size doesn't match")

// Check written data
tc.Validator(t, string(content))

// Close file
assert.NoError(t, f.Close())

// Check statistics
sliceStats, err := d.StatisticsRepository().SliceStats(ctx, slice.SliceKey)
if assert.NoError(t, err) {
assert.Equal(t, int(sliceStats.Total.RecordsCount), rowsCount, "records count doesn't match")
assert.Equal(t, int64(sliceStats.Total.CompressedSize.Bytes()), fileStat.Size(), "compressed file size doesn't match")
assert.Equal(t, int(sliceStats.Total.UncompressedSize.Bytes()), len(content), "uncompressed file size doesn't match")
}
}

func (tc *WriterTestCase) newSlice(t *testing.T, volume *writerVolume.Volume) *model.Slice {
Expand Down

0 comments on commit 14442ef

Please sign in to comment.