Skip to content

Commit

Permalink
Merge pull request #1875 from keboola/michaljurecko-PSGO-591-stats-no…
Browse files Browse the repository at this point in the history
…de-id

feat: Difference slice statistics by the source node ID
  • Loading branch information
michaljurecko authored Jul 3, 2024
2 parents d405975 + fdf003f commit e88ab0c
Show file tree
Hide file tree
Showing 45 changed files with 1,024 additions and 486 deletions.
7 changes: 7 additions & 0 deletions internal/pkg/service/common/utctime/utctime.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (v UTCTime) After(target UTCTime) bool {
}

// MarshalJSON implements json.Marshaler.
// A zero UTCTime is encoded as the empty JSON string "" so that unset
// times round-trip symmetrically through UnmarshalJSON; any other value
// is encoded via its canonical String() representation.
func (v UTCTime) MarshalJSON() ([]byte, error) {
	if !v.IsZero() {
		return jsonLib.Marshal(v.String())
	}
	return jsonLib.Marshal("")
}

Expand All @@ -48,6 +51,10 @@ func (v *UTCTime) UnmarshalJSON(b []byte) error {
if err := jsonLib.Unmarshal(b, &str); err != nil {
return err
}
if str == "" {
*v = UTCTime{}
return nil
}
out, err := time.Parse(TimeFormat, str)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
➡️ GET "storage/keboola/secret/token/123/456/my-source/my-sink"
✔️ GET "storage/keboola/secret/token/123/456/my-source/my-sink" | count: 0

// READ: statistics to calculate pre-allocated space for new slices
// READ: sink slices and their statistics to calculate pre-allocated disk space - there is no previous slice
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// WRITE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Start(ctx context.Context, d dependencies, cfg config.Config) error {

// Setup statistics collector
syncCfg := cfg.Storage.Statistics.Collector
collector.Start(d, volumes.Events(), syncCfg)
collector.Start(d, volumes.Events(), syncCfg, cfg.NodeID)

return nil
}
11 changes: 4 additions & 7 deletions internal/pkg/service/stream/storage/quota/quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,11 @@ func TestQuota_Check(t *testing.T) {
quoteChecker := quota.New(d)
updateStats := func(sliceKey model.SliceKey, size datasize.ByteSize) {
header := etcdhelper.ExpectModificationInPrefix(t, client, "storage/stats/", func() {
require.NoError(t, repo.Put(ctx, []statistics.PerSlice{
require.NoError(t, repo.Put(ctx, "test-node", []statistics.PerSlice{
{
SliceKey: sliceKey,
Value: statistics.Value{
SlicesCount: 1,
RecordsCount: 123,
CompressedSize: size,
},
SliceKey: sliceKey,
RecordsCount: 123,
CompressedSize: size,
},
}))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
001 ➡️ GET ["definition/sink/active/123/456/my-source/", "definition/sink/active/123/456/my-source0")
✔️ TXN | succeeded: true

// READ - get statistics of the sink1 - to calculate pre-allocated disk space
// READ - get previous slices from the sink 1 and their statistics to calculate pre-allocated disk space - there is no slice
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink-1/", "storage/stats/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink-1/", "storage/stats/target/123/456/my-source/my-sink-10")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink-1/", "storage/slice/level/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink-1/", "storage/slice/level/target/123/456/my-source/my-sink-10")
✔️ TXN | succeeded: true

// READ - get statistics of the sink2 - to calculate pre-allocated disk space
// READ - get previous slices from the sink 2 and their statistics to calculate pre-allocated disk space - there is no slice
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink-2/", "storage/stats/staging/123/456/my-source/my-sink-20")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink-2/", "storage/stats/target/123/456/my-source/my-sink-20")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink-2/", "storage/slice/level/staging/123/456/my-source/my-sink-20")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink-2/", "storage/slice/level/target/123/456/my-source/my-sink-20")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
001 ➡️ GET ["storage/slice/level/local/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/", "storage/slice/level/local/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z0")
✔️ TXN | succeeded: true

// READ - statistics for aggregation
// READ - previous slices for statistics for aggregation - there is no previous slice in the sink
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// READ - get statistics to be moved
➡️ TXN
➡️ THEN:
001 ➡️ GET "storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/value"
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/", "storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z0")

✔️ TXN | succeeded: true

Expand All @@ -29,7 +29,7 @@
005 ["storage/slice/all/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/", "storage/slice/all/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z0") MOD GREATER 0
006 ["storage/slice/all/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/", "storage/slice/all/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z0") MOD LESS %d
007 "storage/slice/all/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z" MOD GREATER 0
008 "storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/value" MOD EQUAL 0
008 ["storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/", "storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z0") MOD EQUAL 0
➡️ THEN:
// Mark file and slice as imported, move stats
001 ➡️ PUT "storage/file/all/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@
001 ➡️ GET ["definition/sink/active/123/456/my-source/", "definition/sink/active/123/456/my-source0")
✔️ TXN | succeeded: true

// READ - sinks from the source
// READ - get previous slices in the sink and their statistics to calculate pre-allocated disk space
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// READ - get statistics of the sink - to calculate pre-allocated disk space
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
002 ➡️ GET ["storage/slice/level/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/", "storage/slice/level/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-10")
✔️ TXN | succeeded: true

// READ - statistics to be moved
// READ - get previous slice and their statistics to calculate size of the new slice - there are no previous slices
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink-1/", "storage/stats/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink-1/", "storage/stats/target/123/456/my-source/my-sink-10")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink-1/", "storage/slice/level/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink-1/", "storage/slice/level/target/123/456/my-source/my-sink-10")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// READ - statistics - to be moved
➡️ TXN
➡️ THEN:
001 ➡️ GET "storage/stats/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/value"
001 ➡️ GET ["storage/stats/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/", "storage/stats/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z0")
✔️ TXN | succeeded: true

// WRITE
Expand All @@ -20,7 +20,7 @@
003 "storage/file/all/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z" MOD LESS %d
004 "storage/slice/all/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z" MOD GREATER 0
005 "storage/slice/all/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z" MOD LESS %d
006 "storage/stats/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/value" MOD EQUAL 0
006 ["storage/stats/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/", "storage/stats/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z0") MOD EQUAL 0
➡️ THEN:
// Mark the slices as uploaded, move it to the staging level
001 ➡️ PUT "storage/slice/all/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
)

// ListIn lists slices in the parent, in all storage levels.
func (r *Repository) ListIn(parentKey fmt.Stringer) iterator.DefinitionT[model.Slice] {
return r.schema.AllLevels().InObject(parentKey).GetAll(r.client)
func (r *Repository) ListIn(parentKey fmt.Stringer, opts ...iterator.Option) iterator.DefinitionT[model.Slice] {
return r.schema.AllLevels().InObject(parentKey).GetAll(r.client, opts...)
}

// ListInLevel lists slices in the specified storage level.
func (r *Repository) ListInLevel(parentKey fmt.Stringer, level model.Level) iterator.DefinitionT[model.Slice] {
return r.schema.InLevel(level).InObject(parentKey).GetAll(r.client)
func (r *Repository) ListInLevel(parentKey fmt.Stringer, level model.Level, opts ...iterator.Option) iterator.DefinitionT[model.Slice] {
return r.schema.InLevel(level).InObject(parentKey).GetAll(r.client, opts...)
}

// ListInState lists slices in the specified state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Collector struct {
logger log.Logger
repository *repository.Repository
config statistics.SyncConfig
nodeID string
wg *sync.WaitGroup

syncLock *sync.Mutex
Expand All @@ -38,8 +39,10 @@ type WriterEvents interface {
// writerSnapshot contains collected statistics from a writer.Writer.
// It is used to determine whether the statistics have changed and should be saved to the database or not.
type writerSnapshot struct {
stats statistics.PerSlice
writer writer.Writer
writer writer.Writer
sliceKey model.SliceKey
initialValue statistics.Value
value statistics.Value
}

type dependencies interface {
Expand All @@ -49,11 +52,12 @@ type dependencies interface {
StatisticsRepository() *repository.Repository
}

func Start(d dependencies, events WriterEvents, config statistics.SyncConfig) {
func Start(d dependencies, events WriterEvents, config statistics.SyncConfig, nodeID string) {
c := &Collector{
logger: d.Logger().WithComponent("statistics.collector"),
repository: d.StatisticsRepository(),
config: config,
nodeID: nodeID,
wg: &sync.WaitGroup{},
syncLock: &sync.Mutex{},
writersLock: &sync.Mutex{},
Expand All @@ -74,13 +78,17 @@ func Start(d dependencies, events WriterEvents, config statistics.SyncConfig) {
// Register the writer for the periodic sync, see below
k := w.SliceKey()

initialValue, err := c.repository.OpenSlice(k, c.nodeID).Do(ctx).ResultOrErr()
if err != nil {
return err
}

c.writersLock.Lock()
c.writers[k] = &writerSnapshot{
writer: w,
stats: statistics.PerSlice{
SliceKey: k,
Value: statistics.Value{SlicesCount: 1},
},
writer: w,
sliceKey: k,
initialValue: initialValue,
value: statistics.Value{},
}
c.writersLock.Unlock()

Expand Down Expand Up @@ -140,16 +148,30 @@ func (c *Collector) sync(filter *model.SliceKey) error {
if filter == nil {
// Collect all writers
for _, s := range c.writers {
if changed := c.collect(s.writer, &s.stats); changed {
forSync = append(forSync, s.stats)
if changed := c.collect(s.writer, &s.value); changed {
value := s.initialValue.Add(s.value)
forSync = append(forSync, statistics.PerSlice{
SliceKey: s.sliceKey,
FirstRecordAt: value.FirstRecordAt,
LastRecordAt: value.LastRecordAt,
RecordsCount: value.RecordsCount,
UncompressedSize: value.UncompressedSize,
CompressedSize: value.CompressedSize,
})
}
}
} else {
} else if s, found := c.writers[*filter]; found {
// Collect one writer
if s, found := c.writers[*filter]; found {
if changed := c.collect(s.writer, &s.stats); changed {
forSync = append(forSync, s.stats)
}
if changed := c.collect(s.writer, &s.value); changed {
value := s.initialValue.Add(s.value)
forSync = append(forSync, statistics.PerSlice{
SliceKey: s.sliceKey,
FirstRecordAt: value.FirstRecordAt,
LastRecordAt: value.LastRecordAt,
RecordsCount: value.RecordsCount,
UncompressedSize: value.UncompressedSize,
CompressedSize: value.CompressedSize,
})
}
}
c.writersLock.Unlock()
Expand All @@ -161,7 +183,7 @@ func (c *Collector) sync(filter *model.SliceKey) error {

// Update values in the database
if len(forSync) > 0 {
if err := c.repository.Put(ctx, forSync); err != nil {
if err := c.repository.Put(ctx, c.nodeID, forSync); err != nil {
err = errors.Errorf("cannot save the storage statistics to the database: %w", err)
c.logger.Error(ctx, err.Error())
return err
Expand All @@ -173,7 +195,7 @@ func (c *Collector) sync(filter *model.SliceKey) error {
}

// collect statistics from the writer to the PerSlice struct.
func (c *Collector) collect(w writer.Writer, out *statistics.PerSlice) (changed bool) {
func (c *Collector) collect(w writer.Writer, out *statistics.Value) (changed bool) {
// Get values
firstRowAt := w.FirstRecordAt()
lastRowAt := w.LastRecordAt()
Expand All @@ -182,18 +204,18 @@ func (c *Collector) collect(w writer.Writer, out *statistics.PerSlice) (changed
uncompressedSize := w.UncompressedSize()

// Are statistics changed?
changed = out.Value.FirstRecordAt != firstRowAt ||
out.Value.LastRecordAt != lastRowAt ||
out.Value.RecordsCount != rowsCount ||
out.Value.CompressedSize != compressedSize ||
out.Value.UncompressedSize != uncompressedSize
changed = out.FirstRecordAt != firstRowAt ||
out.LastRecordAt != lastRowAt ||
out.RecordsCount != rowsCount ||
out.CompressedSize != compressedSize ||
out.UncompressedSize != uncompressedSize

// Update values
out.Value.FirstRecordAt = firstRowAt
out.Value.LastRecordAt = lastRowAt
out.Value.RecordsCount = rowsCount
out.Value.CompressedSize = compressedSize
out.Value.UncompressedSize = uncompressedSize
out.FirstRecordAt = firstRowAt
out.LastRecordAt = lastRowAt
out.RecordsCount = rowsCount
out.CompressedSize = compressedSize
out.UncompressedSize = uncompressedSize

return changed
}
Loading

0 comments on commit e88ab0c

Please sign in to comment.