Skip to content

Commit

Permalink
fix: update asynchronous process in deletion assets
Browse files Browse the repository at this point in the history
  • Loading branch information
Muhammad Luthfi Fahlevi committed Aug 9, 2024
1 parent 41b32cb commit 3ef8559
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"log"

Check failure on line 10 in core/asset/service.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gci`-ed with --skip-generated -s standard -s default (gci)

Check failure on line 10 in core/asset/service.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)
"sync"
"time"
)

Expand Down Expand Up @@ -127,6 +129,10 @@ func (s *Service) DeleteAsset(ctx context.Context, id string) (err error) {
}

func (s *Service) DeleteAssets(ctx context.Context, queryExpr string, dryRun bool) (affectedRows uint32, err error) {
var wg sync.WaitGroup
var urns []string
dbErrChan := make(chan error, 1)

total, err := s.assetRepository.GetCountByQueryExpr(ctx, queryExpr, true)
if err != nil {
return 0, err
Expand All @@ -136,20 +142,37 @@ func (s *Service) DeleteAssets(ctx context.Context, queryExpr string, dryRun boo
return uint32(total), nil
}

// Perform the Assets deletion asynchronously.
wg.Add(1)
go func() {
urns, err := s.assetRepository.DeleteByQueryExpr(ctx, queryExpr)
if err != nil {
return
}

if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, queryExpr); err != nil {
return
}
defer wg.Done()
urnsDeleted, err := s.assetRepository.DeleteByQueryExpr(context.Background(), queryExpr)
urns = urnsDeleted
dbErrChan <- err
close(dbErrChan)
}()

for _, urn := range urns {
if err := s.lineageRepository.DeleteByURN(ctx, urn); err != nil {
return
}
// Perform Elasticsearch and Lineage asynchronously if the Assets deletion is successful.
wg.Add(1)
go func() {
defer wg.Done()
dbErr := <-dbErrChan
if dbErr == nil {
go func() {
if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(context.Background(), queryExpr); err != nil {
log.Fatalf("Error occurred during Elasticsearch deletion: %s", err)

Check failure on line 163 in core/asset/service.go

View workflow job for this annotation

GitHub Actions / golangci

deep-exit: calls to log.Fatalf only in main() or init() functions (revive)
}
}()

go func() {
for _, urn := range urns {
if err := s.lineageRepository.DeleteByURN(context.Background(), urn); err != nil {
log.Fatalf("Error occurred during Lineage deletion: %s", err)

Check failure on line 170 in core/asset/service.go

View workflow job for this annotation

GitHub Actions / golangci

deep-exit: calls to log.Fatalf only in main() or init() functions (revive)
}
}
}()
} else {
log.Printf("Database deletion failed, skipping Elasticsearch and Lineage deletions: %s", dbErr)
}
}()

Expand Down

0 comments on commit 3ef8559

Please sign in to comment.