Skip to content

Commit

Permalink
fix: estimateSliceSizeOnSliceOpen
Browse files Browse the repository at this point in the history
  • Loading branch information
michaljurecko committed Jul 3, 2024
1 parent 9f4cd6a commit fdf003f
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
➡️ GET "storage/keboola/secret/token/123/456/my-source/my-sink"
✔️ GET "storage/keboola/secret/token/123/456/my-source/my-sink" | count: 0

// READ: statistics to calculate pre-allocated space for new slices
// READ: sink slices and their statistics to calculate pre-allocated disk space - there is no previous slice
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// WRITE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
001 ➡️ GET ["definition/sink/active/123/456/my-source/", "definition/sink/active/123/456/my-source0")
✔️ TXN | succeeded: true

// READ - get statistics of the sink1 - to calculate pre-allocated disk space
// READ - get previous slices from the sink 1 and their statistics to calculate pre-allocated disk space - there is no slice
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink-1/", "storage/stats/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink-1/", "storage/stats/target/123/456/my-source/my-sink-10")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink-1/", "storage/slice/level/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink-1/", "storage/slice/level/target/123/456/my-source/my-sink-10")
✔️ TXN | succeeded: true

// READ - get statistics of the sink2 - to calculate pre-allocated disk space
// READ - get previous slices from the sink 2 and their statistics to calculate pre-allocated disk space - there is no slice
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink-2/", "storage/stats/staging/123/456/my-source/my-sink-20")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink-2/", "storage/stats/target/123/456/my-source/my-sink-20")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink-2/", "storage/slice/level/staging/123/456/my-source/my-sink-20")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink-2/", "storage/slice/level/target/123/456/my-source/my-sink-20")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
001 ➡️ GET ["storage/slice/level/local/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/", "storage/slice/level/local/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z0")
✔️ TXN | succeeded: true

// READ - statistics for aggregation
// READ - previous slices for statistics for aggregation - there is no previous slice in the sink
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@
001 ➡️ GET ["definition/sink/active/123/456/my-source/", "definition/sink/active/123/456/my-source0")
✔️ TXN | succeeded: true

// READ - sinks from the source
// READ - get previous slices in the sink and their statistics to calculate pre-allocated disk space
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// READ - get statistics of the sink - to calculate pre-allocated disk space
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
002 ➡️ GET ["storage/slice/level/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-1/", "storage/slice/level/local/123/456/my-source/my-sink-1/2000-01-01T01:00:00.000Z/my-volume-10")
✔️ TXN | succeeded: true

// READ - statistics to be moved
// READ - get previous slice and their statistics to calculate size of the new slice - there are no previous slices
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink-1/", "storage/stats/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink-1/", "storage/stats/target/123/456/my-source/my-sink-10")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink-1/", "storage/slice/level/staging/123/456/my-source/my-sink-10")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink-1/", "storage/slice/level/target/123/456/my-source/my-sink-10")
✔️ TXN | succeeded: true

// WRITE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
001 ➡️ GET ["storage/slice/level/local/123/456/my-source/my-sink/2000-01-01T03:00:00.000Z/", "storage/slice/level/local/123/456/my-source/my-sink/2000-01-01T03:00:00.000Z0")
✔️ TXN | succeeded: true

// READ: to calculate pre-allocated space for new slices, statistics of previous slices are loaded.
// READ: to calculate pre-allocated space for new slices, previous slices and their statistics are loaded.
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/", "storage/stats/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/", "storage/stats/target/123/456/my-source/my-sink0")
001 ➡️ GET ["storage/slice/level/staging/123/456/my-source/my-sink/", "storage/slice/level/staging/123/456/my-source/my-sink0")
002 ➡️ GET ["storage/slice/level/target/123/456/my-source/my-sink/", "storage/slice/level/target/123/456/my-source/my-sink0")
✔️ TXN | succeeded: true
➡️ TXN
➡️ THEN:
001 ➡️ GET ["storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z/", "storage/stats/staging/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z/my-volume-1/2000-01-01T01:00:00.000Z0")
002 ➡️ GET ["storage/stats/target/123/456/my-source/my-sink/2000-01-01T02:00:00.000Z/my-volume-1/2000-01-01T02:00:00.000Z/", "storage/stats/target/123/456/my-source/my-sink/2000-01-01T02:00:00.000Z/my-volume-1/2000-01-01T02:00:00.000Z0")
✔️ TXN | succeeded: true

➡️ TXN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package repository
import (
"context"
"fmt"
"slices"
"time"

"github.com/c2h5oh/datasize"
Expand All @@ -27,41 +28,62 @@ func (r *Repository) estimateSliceSizeOnSliceOpen() {
// We need to update the value to the Slice entity before saving,
// before the callback is completed, because later the entity value is already stored
// in the WRITE phase transaction and cannot be modified.
if err := r.estimateSliceSize(file, slice).Do(ctx).Err(); err != nil {
if err := r.estimateSliceSize(ctx, file, slice); err != nil {
// Error is not fatal
r.logger.Errorf(ctx, `cannot calculate slice pre-allocated size: %s`, err)
}
return nil
})
}

func (r *Repository) estimateSliceSize(file model.File, slice *model.Slice) *op.TxnOp[datasize.ByteSize] {
return r.
maxUsedDiskSizeBySliceIn(slice.SinkKey, recordsForSliceDiskSizeCalc).
OnResult(func(r *op.TxnResult[datasize.ByteSize]) {
slice.LocalStorage.AllocatedDiskSpace = file.LocalStorage.DiskAllocation.ForNextSlice(r.Result())
})
func (r *Repository) estimateSliceSize(ctx context.Context, file model.File, slice *model.Slice) error {
// Get maximum slice size
size, err := r.maxUsedDiskSizeBySliceIn(ctx, slice.SinkKey, recordsForSliceDiskSizeCalc)
if err != nil {
return err
}

// Calculate allocated disk space for the new slice
slice.LocalStorage.AllocatedDiskSpace = file.LocalStorage.DiskAllocation.ForNextSlice(size)
return nil
}

// maxUsedDiskSizeBySliceIn scans the statistics in the parentKey, scanned are:
// - The last <limit> slices in level.LevelStaging (uploaded slices).
// - The last <limit> slices in level.LevelTarget (imported slices).
func (r *Repository) maxUsedDiskSizeBySliceIn(parentKey fmt.Stringer, limit int) *op.TxnOp[datasize.ByteSize] {
func (r *Repository) maxUsedDiskSizeBySliceIn(ctx context.Context, parentKey fmt.Stringer, limit int) (datasize.ByteSize, error) {
// Get last <limit> slices from staging and target levels
var lastStagingSlices, lastTargetSlices []model.Slice
listSlicesOpts := []iterator.Option{iterator.WithSort(etcd.SortDescend), iterator.WithLimit(limit)}
err := op.Txn(r.client).
Then(r.storage.Slice().ListInLevel(parentKey, model.LevelStaging, listSlicesOpts...).WithAllTo(&lastStagingSlices)).
Then(r.storage.Slice().ListInLevel(parentKey, model.LevelTarget, listSlicesOpts...).WithAllTo(&lastTargetSlices)).
Do(ctx).
Err()
if err != nil {
return 0, err
}

// Load and process statistics for each slice from the previous step
var maxSize datasize.ByteSize
txn := op.TxnWithResult(r.client, &maxSize)
for _, l := range []model.Level{model.LevelStaging, model.LevelTarget} {
// Get maximum
txn.Then(
r.schema.
InLevel(l).InObject(parentKey).
GetAll(r.client, iterator.WithLimit(limit), iterator.WithSort(etcd.SortDescend)).
ForEach(func(v statistics.Value, header *iterator.Header) error {
// Ignore sums
if v.SlicesCount == 1 && v.CompressedSize > maxSize {
maxSize = v.CompressedSize
}
return nil
}))
txn := op.Txn(r.client)
for _, slice := range slices.Concat(lastStagingSlices, lastTargetSlices) {
txn.Merge(
r.AggregateInLevel(slice.SliceKey, slice.State.Level()).OnSucceeded(func(r *op.TxnResult[statistics.Aggregated]) {
if s := r.Result().Total.CompressedSize; s > maxSize {
maxSize = s
}
}),
)
}
return txn

if txn.Empty() {
return 0, nil
}

if err := txn.Do(ctx).Err(); err != nil {
return 0, err
}

return maxSize, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ func (r *Repository) AggregateIn(objectKey fmt.Stringer) *op.TxnOp[statistics.Ag
return txn
}

func (r *Repository) AggregateInLevel(objectKey fmt.Stringer, level model.Level) *op.TxnOp[statistics.Aggregated] {
var result statistics.Aggregated
return op.TxnWithResult(r.client, &result).
Then(r.schema.
InLevel(level).
InObject(objectKey).
GetAll(r.client).
ForEach(func(v statistics.Value, header *iterator.Header) error {
aggregate.Aggregate(level, v, &result)
return nil
}),
)
}

// aggregate statistics from the database.
func (r *Repository) aggregate(ctx context.Context, objectKey fmt.Stringer) (out statistics.Aggregated, err error) {
txn := r.AggregateIn(objectKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
storageRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/repository"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
)

Expand All @@ -55,6 +56,7 @@ type Repository struct {
telemetry telemetry.Telemetry
client *etcd.Client
plugins *plugin.Plugins
storage *storageRepo.Repository
schema schema
}

Expand All @@ -64,6 +66,7 @@ type dependencies interface {
EtcdClient() *etcd.Client
EtcdSerde() *serde.Serde
Plugins() *plugin.Plugins
StorageRepository() *storageRepo.Repository
}

func New(d dependencies) *Repository {
Expand All @@ -72,6 +75,7 @@ func New(d dependencies) *Repository {
telemetry: d.Telemetry(),
client: d.EtcdClient(),
plugins: d.Plugins(),
storage: d.StorageRepository(),
schema: newSchema(d.EtcdSerde()),
}

Expand Down

0 comments on commit fdf003f

Please sign in to comment.