diff --git a/internal/pkg/service/stream/aggregation/repository/repository.go b/internal/pkg/service/stream/aggregation/repository/repository.go new file mode 100644 index 0000000000..4aa60e12e1 --- /dev/null +++ b/internal/pkg/service/stream/aggregation/repository/repository.go @@ -0,0 +1,198 @@ +package repository + +import ( + "context" + "slices" + "strings" + + etcd "go.etcd.io/etcd/client/v3" + "golang.org/x/exp/maps" + + "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/iterator" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/ptr" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" + storageRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/repository" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics" + statsRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics/repository" +) + +type dependencies interface { + Logger() log.Logger + EtcdClient() *etcd.Client + DefinitionRepository() *definitionRepo.Repository + StatisticsRepository() *statsRepo.Repository + StorageRepository() *storageRepo.Repository +} + +type Repository struct { + logger log.Logger + client etcd.KV + definition *definitionRepo.Repository + statistics *statsRepo.Repository + storage *storageRepo.Repository +} + +type SourceWithSinks struct { + SourceKey key.SourceKey + Sinks []*SinkWithStatistics +} + +type SinkWithStatistics struct { + *definition.Sink + Statistics *SinkStatistics +} + +type SinkStatistics struct { + Total *statistics.Aggregated + Files []*FileWithStatistics +} + +type FileWithStatistics struct { + *model.File + Statistics *statistics.Aggregated +} + +func New(d dependencies) *Repository { + return &Repository{ + logger: d.Logger().WithComponent("aggregation.repository"), + client: d.EtcdClient(), + definition: d.DefinitionRepository(), + statistics: d.StatisticsRepository(), + storage: d.StorageRepository(), + } +} + +func (r *Repository) GetSourcesWithSinksAndStatistics(ctx context.Context, sourceKeys []key.SourceKey) ([]*SourceWithSinks, error) { + res, err := r.findSinksForSources(ctx, sourceKeys) + if err != nil { + return nil, err + } + + err = r.addStatisticsToAggregationResponse(ctx, res) + if err != nil { + return nil, err + } + + err = r.addFilesToAggregationResponse(ctx, res) + if err != nil { + return nil, err + } + + err = r.addFileStatisticsToAggregationResponse(ctx, res) + if err != nil { + return nil, err + } + + return maps.Values(res), nil +} + +func (r *Repository) findSinksForSources(ctx context.Context, sourceKeys []key.SourceKey) (map[key.SourceKey]*SourceWithSinks, error) { + res := make(map[key.SourceKey]*SourceWithSinks) + + txn := op.Txn(r.client) + for _, sourceKey := range sourceKeys { + txn.Merge( + r.definition.Sink().List(sourceKey).ForEach(func(value definition.Sink, header *iterator.Header) error { + source, ok := res[value.SourceKey] + if !ok { + source = &SourceWithSinks{ + SourceKey: value.SourceKey, + } + res[value.SourceKey] = source + } + source.Sinks = append(source.Sinks, &SinkWithStatistics{Sink: &value}) + return nil + }), + ) + } + + err := txn.Do(ctx).Err() + + return res, err +} + +func (r *Repository) addStatisticsToAggregationResponse(ctx context.Context, res map[key.SourceKey]*SourceWithSinks) error { + txn := op.Txn(r.client) + for sourceKey, source := range res { + for _, sink := range source.Sinks { + sinkKey := key.SinkKey{ + SourceKey: sourceKey, + SinkID: sink.SinkID, + } + + txn.Merge(r.statistics.AggregateIn(sinkKey).OnResult(func(result *op.TxnResult[statistics.Aggregated]) { + sink.Statistics = &SinkStatistics{ + Total: ptr.Ptr(result.Result()), + } + })) + } + } + + return txn.Do(ctx).Err() +} + +func (r *Repository) addFilesToAggregationResponse(ctx context.Context, res map[key.SourceKey]*SourceWithSinks) error { + txn := op.Txn(r.client) + for sourceKey, source := range res { + for _, sink := range source.Sinks { + sinkKey := key.SinkKey{ + SourceKey: sourceKey, + SinkID: sink.SinkID, + } + + txn.Merge(r.storage.File().ListRecentIn(sinkKey).ForEach(func(value model.File, header *iterator.Header) error { + sink.Statistics.Files = append(sink.Statistics.Files, &FileWithStatistics{ + File: ptr.Ptr(value), + }) + return nil + })) + } + } + + return txn.Do(ctx).Err() +} + +func (r *Repository) addFileStatisticsToAggregationResponse(ctx context.Context, res map[key.SourceKey]*SourceWithSinks) error { + txn := op.Txn(r.client) + for sourceKey, source := range res { + for _, sink := range source.Sinks { + if len(sink.Statistics.Files) == 0 { + continue + } + + sinkKey := key.SinkKey{ + SourceKey: sourceKey, + SinkID: sink.SinkID, + } + + filesMap := make(map[string]*FileWithStatistics) + for _, file := range sink.Statistics.Files { + filesMap[file.FileID.String()] = file + } + + // Sort keys lexicographically + keys := maps.Keys(filesMap) + slices.SortStableFunc(keys, strings.Compare) + + txn.Merge( + r.statistics.FilesStats( + sinkKey, + model.FileID{OpenedAt: utctime.MustParse(keys[0])}, + model.FileID{OpenedAt: utctime.MustParse(keys[len(keys)-1])}, + ).OnResult(func(result *op.TxnResult[map[model.FileID]*statistics.Aggregated]) { + for fileID, aggregated := range result.Result() { + filesMap[fileID.String()].Statistics = aggregated + } + }), + ) + } + } + + return txn.Do(ctx).Err() +} diff --git a/internal/pkg/service/stream/api/mapper/aggregation.go b/internal/pkg/service/stream/api/mapper/aggregation.go new file mode 100644 index 0000000000..38d4c59009 --- /dev/null +++ b/internal/pkg/service/stream/api/mapper/aggregation.go @@ -0,0 +1,109 @@ +package mapper + +import ( + "context" + + etcd "go.etcd.io/etcd/client/v3" + + svcerrors "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/iterator" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/aggregation/repository" + api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" +) + +func (m *Mapper) NewAggregationSourcesResponse( + ctx context.Context, + k key.BranchKey, + sinceId string, + limit int, + list func(...iterator.Option) iterator.DefinitionT[definition.Source], +) (*api.AggregationSourcesResult, error) { + sources, page, err := loadPage(ctx, sinceId, limit, etcd.SortAscend, list, m.NewAggregationSource) + if err != nil { + return nil, err + } + + return &api.AggregationSourcesResult{ + ProjectID: k.ProjectID, + BranchID: k.BranchID, + Page: page, + Sources: sources, + }, nil +} + +func (m *Mapper) NewAggregationSource(entity definition.Source) (*api.AggregationSource, error) { + out := &api.AggregationSource{ + ProjectID: entity.ProjectID, + BranchID: entity.BranchID, + SourceID: entity.SourceID, + Type: entity.Type, + Name: entity.Name, + Description: entity.Description, + Created: m.NewCreatedResponse(entity.Created), + Version: m.NewVersionResponse(entity.Version), + Deleted: m.NewDeletedResponse(entity.SoftDeletable), + Disabled: m.NewDisabledResponse(entity.Switchable), + } + + // Type + switch out.Type { + case definition.SourceTypeHTTP: + out.HTTP = &api.HTTPSource{ + URL: m.formatHTTPSourceURL(entity), + } + default: + return nil, svcerrors.NewBadRequestError(errors.Errorf(`unexpected "type" "%s"`, out.Type.String())) + } + + return out, nil +} + +func (m *Mapper) NewAggregationSinkResponse(entity repository.SinkWithStatistics) (*api.AggregationSink, error) { + out := &api.AggregationSink{ + ProjectID: entity.ProjectID, + BranchID: entity.BranchID, + SourceID: entity.SourceID, + SinkID: entity.SinkID, + Name: entity.Name, + Description: entity.Description, + Created: m.NewCreatedResponse(entity.Created), + Version: m.NewVersionResponse(entity.Version), + Deleted: m.NewDeletedResponse(entity.SoftDeletable), + Disabled: m.NewDisabledResponse(entity.Switchable), + } + + if entity.Statistics.Total != nil { + totals := m.NewSinkStatisticsTotalResponse(*entity.Statistics.Total) + files := api.SinkFiles{} + for _, file := range entity.Statistics.Files { + sinkFile := m.NewSinkFile(*file.File) + if file.Statistics != nil { + sinkFile.Statistics = m.NewSinkFileStatistics(file.Statistics) + } + files = append(files, sinkFile) + } + out.Statistics = &api.AggregationStatistics{ + Total: totals.Total, + Levels: totals.Levels, + Files: files, + } + } + + // Type + out.Type = entity.Type + switch out.Type { + case definition.SinkTypeTable: + tableResponse, err := m.newTableSinkResponse(entity.Table) + if err != nil { + return nil, err + } + out.Table = &tableResponse + default: + return nil, svcerrors.NewBadRequestError(errors.Errorf(`unexpected "type" "%s"`, out.Type.String())) + } + + return out, nil +} diff --git a/internal/pkg/service/stream/api/service/aggregation.go b/internal/pkg/service/stream/api/service/aggregation.go index 214c74356a..b8530bcbd0 100644 --- a/internal/pkg/service/stream/api/service/aggregation.go +++ b/internal/pkg/service/stream/api/service/aggregation.go @@ -3,11 +3,59 @@ package service import ( "context" - "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors" + "golang.org/x/exp/maps" + + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/iterator" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" ) func (s *service) AggregateSources(ctx context.Context, d dependencies.BranchRequestScope, payload *stream.AggregateSourcesPayload) (res *stream.AggregationSourcesResult, err error) { - return nil, errors.NewNotImplementedError() + list := func(opts ...iterator.Option) iterator.DefinitionT[definition.Source] { + return s.definition.Source().List(d.BranchKey(), opts...) + } + + response, err := s.mapper.NewAggregationSourcesResponse(ctx, d.BranchKey(), payload.SinceID, payload.Limit, list) + if err != nil { + return nil, err + } + + err = s.addSinksToAggregationResponse(ctx, d, response) + if err != nil { + return nil, err + } + + return response, err +} + +func (s *service) addSinksToAggregationResponse(ctx context.Context, d dependencies.BranchRequestScope, response *stream.AggregationSourcesResult) error { + sourceKeys := make(map[key.SourceKey]int) + for i, source := range response.Sources { + sourceKey := key.SourceKey{ + BranchKey: d.BranchKey(), + SourceID: source.SourceID, + } + sourceKeys[sourceKey] = i + } + + sourcesWithSinks, err := d.AggregationRepository().GetSourcesWithSinksAndStatistics(ctx, maps.Keys(sourceKeys)) + if err != nil { + return err + } + + for _, sourceWithSinks := range sourcesWithSinks { + for _, sink := range sourceWithSinks.Sinks { + sink, err := s.mapper.NewAggregationSinkResponse(*sink) + if err != nil { + return err + } + + source := response.Sources[sourceKeys[sourceWithSinks.SourceKey]] + source.Sinks = append(source.Sinks, sink) + } + } + + return nil } diff --git a/internal/pkg/service/stream/dependencies/dependencies.go b/internal/pkg/service/stream/dependencies/dependencies.go index b0407cbb19..ddc2c58593 100644 --- a/internal/pkg/service/stream/dependencies/dependencies.go +++ b/internal/pkg/service/stream/dependencies/dependencies.go @@ -36,6 +36,7 @@ import ( "net/url" "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies" + aggregationRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/aggregation/repository" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/config" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" @@ -67,6 +68,7 @@ type ServiceScope interface { DefinitionRepository() *definitionRepo.Repository StorageRepository() *storageRepo.Repository StatisticsRepository() *statsRepo.Repository + AggregationRepository() *aggregationRepo.Repository KeboolaSinkBridge() *keboolaSinkBridge.Bridge } diff --git a/internal/pkg/service/stream/dependencies/service.go b/internal/pkg/service/stream/dependencies/service.go index 359bc74c6d..62c84cf1ae 100644 --- a/internal/pkg/service/stream/dependencies/service.go +++ b/internal/pkg/service/stream/dependencies/service.go @@ -11,6 +11,7 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/service/common/httpclient" "github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx" + aggregationRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/aggregation/repository" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/config" definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository" keboolaSinkBridge "github.com/keboola/keboola-as-code/internal/pkg/service/stream/keboolasink/bridge" @@ -32,6 +33,7 @@ type serviceScope struct { definitionRepository *definitionRepo.Repository storageRepository *storageRepo.Repository storageStatisticsRepository *statsRepo.Repository + aggregationRepository *aggregationRepo.Repository keboolaBridge *keboolaSinkBridge.Bridge } @@ -139,6 +141,8 @@ func newServiceScope(parentScp parentScopes, cfg config.Config, storageBackoff m d.storageStatisticsRepository = statsRepo.New(d) + d.aggregationRepository = aggregationRepo.New(d) + return d, nil } @@ -161,3 +165,7 @@ func (v *serviceScope) StorageRepository() *storageRepo.Repository { func (v *serviceScope) StatisticsRepository() *statsRepo.Repository { return v.storageStatisticsRepository } + +func (v *serviceScope) AggregationRepository() *aggregationRepo.Repository { + return v.aggregationRepository +}