From 2d5b09eeee41515378976d3e3526c3037bc3839b Mon Sep 17 00:00:00 2001
From: Mayur Jagtap
Date: Thu, 18 Jan 2024 13:15:54 +0530
Subject: [PATCH] feat: Use of configurable concurrency for extracting assets
 from BigQuery

---
 plugins/extractors/bigquery/bigquery.go      | 110 +++++++++++++------
 plugins/extractors/bigquery/bigquery_test.go |   2 +
 2 files changed, 77 insertions(+), 35 deletions(-)

diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go
index fd3e269c4..9d9fc894a 100644
--- a/plugins/extractors/bigquery/bigquery.go
+++ b/plugins/extractors/bigquery/bigquery.go
@@ -56,6 +56,7 @@ type Config struct {
 	UsagePeriodInDay int64    `mapstructure:"usage_period_in_day" default:"7"`
 	UsageProjectIDs  []string `mapstructure:"usage_project_ids"`
 	BuildViewLineage bool     `mapstructure:"build_view_lineage" default:"false"`
+	Concurrency      int      `mapstructure:"concurrency"`
 }
 
 type Exclude struct {
@@ -219,17 +220,27 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
 			return err
 		}
 
+		ch := make(chan *bigquery.Dataset, len(datasets))
+
+		numWorkers := e.config.Concurrency
+		var wg sync.WaitGroup
+		wg.Add(numWorkers)
+
+		for i := 0; i < numWorkers; i++ {
+			go func() {
+				defer wg.Done()
+				e.processDatasetWorker(ctx, emit, ch)
+			}()
+		}
+
 		for _, ds := range datasets {
-			if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
-				e.excludedDatasetCtr.Add(
-					ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
-				)
-				e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
-				continue
-			}
-			e.extractTable(ctx, ds, emit)
+			ch <- ds
 		}
+		close(ch)
+
+		wg.Wait()
+
 		if !hasNext {
 			break
 		}
@@ -238,6 +249,23 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
 	return nil
 }
 
+func (e *Extractor) processDatasetWorker(ctx context.Context, emit plugins.Emit, ch <-chan *bigquery.Dataset) {
+	for ds := range ch {
+		if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
+			e.excludedDatasetCtr.Add(
+				ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
+			)
+			e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
+			continue
+		}
+
+		if err := e.extractTable(ctx, ds, emit); err != nil {
+			e.logger.Error("failed to extract table", "err", err)
+			return
+		}
+	}
+}
+
 func (e *Extractor) fetchDatasetsNextPage(ctx context.Context, pager *iterator.Pager) (datasets []*bigquery.Dataset, hasNext bool, err error) {
 	defer func(start time.Time) {
 		attrs := []attribute.KeyValue{attribute.String("bq.project_id", e.config.ProjectID)}
@@ -286,10 +314,10 @@ func (e *Extractor) createPolicyTagClient(ctx context.Context) (*datacatalog.Pol
 }
 
 // Create big query client
-func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) {
+func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) error {
 	pageSize := pickFirstNonZero(e.config.TablePageSize, e.config.MaxPageSize, 50)
-
 	pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
+
 	for {
 		tables, hasNext, err := e.fetchTablesNextPage(ctx, ds.DatasetID, pager)
 		if err != nil {
@@ -301,38 +329,50 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
 			continue
 		}
 
+		var wg sync.WaitGroup
+		wg.Add(len(tables))
 		for _, table := range tables {
-			if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
-				e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
-					attribute.String("bq.project_id", e.config.ProjectID),
-					attribute.String("bq.dataset_id", ds.DatasetID),
-				))
-				e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
-				continue
-			}
-
-			tableFQN := table.FullyQualifiedName()
-
-			e.logger.Debug("extracting table", "table", tableFQN)
-			tmd, err := e.fetchTableMetadata(ctx, table)
-			if err != nil {
-				e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
-				continue
-			}
-
-			asset, err := e.buildAsset(ctx, table, tmd)
-			if err != nil {
-				e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
-				continue
-			}
-
-			emit(models.NewRecord(asset))
+			go e.processTable(ctx, ds, table, emit, &wg)
 		}
+
+		wg.Wait()
+
 		if !hasNext {
 			break
 		}
 	}
+
+	return nil
+}
+
+func (e *Extractor) processTable(ctx context.Context, ds *bigquery.Dataset, table *bigquery.Table, emit plugins.Emit, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
+		e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
+			attribute.String("bq.project_id", e.config.ProjectID),
+			attribute.String("bq.dataset_id", ds.DatasetID),
+		))
+		e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
+		return
+	}
+
+	tableFQN := table.FullyQualifiedName()
+	e.logger.Debug("extracting table", "table", tableFQN)
+
+	tmd, err := e.fetchTableMetadata(ctx, table)
+	if err != nil {
+		e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
+		return
+	}
+
+	asset, err := e.buildAsset(ctx, table, tmd)
+	if err != nil {
+		e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
+		return
+	}
+
+	emit(models.NewRecord(asset))
 }
 
 func (e *Extractor) fetchTablesNextPage(
diff --git a/plugins/extractors/bigquery/bigquery_test.go b/plugins/extractors/bigquery/bigquery_test.go
index 1bb5d4677..a286a02b5 100644
--- a/plugins/extractors/bigquery/bigquery_test.go
+++ b/plugins/extractors/bigquery/bigquery_test.go
@@ -180,6 +180,7 @@ func TestExtract(t *testing.T) {
 				"datasets": []string{"exclude_this_dataset"},
 				"tables":   []string{"dataset1.exclude_this_table"},
 			},
+			"concurrency": 5,
 		},
 	}, nil)
@@ -198,6 +199,7 @@
 				"datasets": []string{"exclude_this_dataset"},
 				"tables":   []string{"dataset1.exclude_this_table"},
 			},
+			"concurrency": 5,
 		},
 	}