Skip to content

Commit

Permalink
refactor: Simplify SinkStatisticsFiles
Browse files Browse the repository at this point in the history
  • Loading branch information
jachym-tousek-keboola committed Jun 6, 2024
1 parent 5a591f0 commit 17bc458
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 73 deletions.
39 changes: 30 additions & 9 deletions internal/pkg/service/stream/api/mapper/sink.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package mapper

import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/ptr"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics"
)

func (m *Mapper) NewSinkStatisticsTotalResponse(result statistics.Aggregated) (res *stream.SinkStatisticsTotalResult) {
func (m *Mapper) NewSinkStatisticsTotalResponse(result statistics.Aggregated) *stream.SinkStatisticsTotalResult {
return &stream.SinkStatisticsTotalResult{
Total: mapValueToLevel(result.Total),
Levels: &stream.Levels{
Expand All @@ -17,25 +19,44 @@ func (m *Mapper) NewSinkStatisticsTotalResponse(result statistics.Aggregated) (r
}
}

func (m *Mapper) NewSinkFile(file model.File) *stream.SinkFile {
return &stream.SinkFile{
State: file.State,
OpenedAt: file.OpenedAt().String(),
ClosingAt: timeToString(file.ClosingAt),
ImportingAt: timeToString(file.ImportingAt),
ImportedAt: timeToString(file.ImportedAt),
}
}

func (m *Mapper) NewSinkFileStatistics(result *statistics.Aggregated) *stream.SinkFileStatistics {
return &stream.SinkFileStatistics{
Total: mapValueToLevel(result.Total),
Levels: &stream.Levels{
Local: mapValueToLevel(result.Local),
Staging: mapValueToLevel(result.Staging),
Target: mapValueToLevel(result.Target),
},
}
}

func mapValueToLevel(value statistics.Value) *stream.Level {
if value.RecordsCount == 0 {
return nil
}

return &stream.Level{
FirstRecordAt: timeValueToPointer(value.FirstRecordAt),
LastRecordAt: timeValueToPointer(value.LastRecordAt),
FirstRecordAt: timeToString(&value.FirstRecordAt),
LastRecordAt: timeToString(&value.LastRecordAt),
RecordsCount: value.RecordsCount,
UncompressedSize: uint64(value.UncompressedSize),
}
}

func timeValueToPointer(time utctime.UTCTime) *string {
var result *string
if !time.IsZero() {
value := time.String()
result = &value
func timeToString(time *utctime.UTCTime) *string {
if time == nil || time.IsZero() {
return nil
}

return result
return ptr.Ptr(time.String())
}
73 changes: 9 additions & 64 deletions internal/pkg/service/stream/api/service/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ import (

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/iterator"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/ptr"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/task"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics"
)

//nolint:dupl // CreateSource method is similar
Expand Down Expand Up @@ -235,64 +232,27 @@ func (s *service) SinkStatisticsFiles(ctx context.Context, d dependencies.SinkRe
filesMap := make(map[model.FileID]*stream.SinkFile)

err = d.StorageRepository().File().ListRecentIn(d.SinkKey()).Do(ctx).ForEachValue(func(value model.File, header *iterator.Header) error {
out := &stream.SinkFile{
State: value.State,
OpenedAt: value.OpenedAt().String(),
ClosingAt: timeToString(value.ClosingAt),
ImportingAt: timeToString(value.ImportingAt),
ImportedAt: timeToString(value.ImportedAt),
Statistics: &stream.SinkFileStatistics{
Levels: &stream.Levels{
Local: &stream.Level{},
Staging: &stream.Level{},
Target: &stream.Level{},
},
Total: &stream.Level{},
},
}

if value.RetryAttempt > 0 {
out.RetryAttempt = ptr.Ptr(value.RetryAttempt)
out.RetryReason = ptr.Ptr(value.RetryReason)
out.RetryAfter = ptr.Ptr(value.RetryAfter.String())
}

filesMap[value.FileID] = out
filesMap[value.FileID] = s.mapper.NewSinkFile(value)
return nil
})
if err != nil {
return nil, err
}

// Sort keys lexicographically
keys := maps.Keys(filesMap)
slices.SortStableFunc(keys, func(a, b model.FileID) int {
return strings.Compare(a.String(), b.String())
})
if len(filesMap) > 0 {
// Sort keys lexicographically
keys := maps.Keys(filesMap)
slices.SortStableFunc(keys, func(a, b model.FileID) int {
return strings.Compare(a.String(), b.String())
})

if len(keys) > 0 {
statisticsMap, err := d.StatisticsRepository().FilesStats(d.SinkKey(), keys[0], keys[len(keys)-1]).Do(ctx).ResultOrErr()
if err != nil {
return nil, err
}

for k, aggregated := range statisticsMap {
assignStatistics(filesMap[k].Statistics.Total, aggregated.Total)
assignStatistics(filesMap[k].Statistics.Levels.Local, aggregated.Local)
assignStatistics(filesMap[k].Statistics.Levels.Staging, aggregated.Staging)
assignStatistics(filesMap[k].Statistics.Levels.Target, aggregated.Target)
}

for _, file := range filesMap {
if file.Statistics.Levels.Local.RecordsCount == 0 {
file.Statistics.Levels.Local = nil
}
if file.Statistics.Levels.Staging.RecordsCount == 0 {
file.Statistics.Levels.Staging = nil
}
if file.Statistics.Levels.Target.RecordsCount == 0 {
file.Statistics.Levels.Target = nil
}
for key, aggregated := range statisticsMap {
filesMap[key].Statistics = s.mapper.NewSinkFileStatistics(aggregated)
}
}

Expand All @@ -312,18 +272,3 @@ func (s *service) sinkMustNotExist(ctx context.Context, k key.SinkKey) error {
func (s *service) sinkMustExists(ctx context.Context, k key.SinkKey) error {
return s.definition.Sink().ExistsOrErr(k).Do(ctx).Err()
}

func assignStatistics(levelStatistics *stream.Level, levelValue statistics.Value) {
levelStatistics.FirstRecordAt = timeToString(&levelValue.FirstRecordAt)
levelStatistics.LastRecordAt = timeToString(&levelValue.LastRecordAt)
levelStatistics.RecordsCount = levelValue.RecordsCount
levelStatistics.UncompressedSize = uint64(levelValue.UncompressedSize)
}

func timeToString(time *utctime.UTCTime) *string {
if time == nil || time.IsZero() {
return nil
}

return ptr.Ptr(time.String())
}

0 comments on commit 17bc458

Please sign in to comment.