diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go
index fd3e269c..1bad002c 100644
--- a/plugins/extractors/bigquery/bigquery.go
+++ b/plugins/extractors/bigquery/bigquery.go
@@ -56,6 +56,7 @@ type Config struct {
 	UsagePeriodInDay int64    `mapstructure:"usage_period_in_day" default:"7"`
 	UsageProjectIDs  []string `mapstructure:"usage_project_ids"`
 	BuildViewLineage bool     `mapstructure:"build_view_lineage" default:"false"`
+	Concurrency      int      `mapstructure:"concurrency" default:"5"`
 }
 
 type Exclude struct {
@@ -122,6 +123,7 @@ type Extractor struct {
 	policyTagClient *datacatalog.PolicyTagManagerClient
 	newClient       NewClientFunc
 	randFn          randFn
+	concurrency     int
 
 	datasetsDurn metric.Int64Histogram
 	tablesDurn   metric.Int64Histogram
@@ -204,6 +206,8 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
 		e.logger.Error("failed to create policy tag manager client", "err", err)
 	}
 
+	e.concurrency = e.config.Concurrency
+
 	return nil
 }
 
@@ -219,17 +223,31 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
 			return err
 		}
 
+		ch := make(chan *bigquery.Dataset, len(datasets))
+
+		// Use the configured worker count; guard against a non-positive value
+		// so datasets are never silently dropped when no workers would start.
+		numWorkers := e.concurrency
+		if numWorkers <= 0 {
+			numWorkers = 5
+		}
+		var wg sync.WaitGroup
+		wg.Add(numWorkers)
+
+		for i := 0; i < numWorkers; i++ {
+			go func() {
+				defer wg.Done()
+				e.processDatasetWorker(ctx, emit, ch)
+			}()
+		}
+
 		for _, ds := range datasets {
-			if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
-				e.excludedDatasetCtr.Add(
-					ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
-				)
-				e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
-				continue
-			}
-			e.extractTable(ctx, ds, emit)
+			ch <- ds
 		}
+		close(ch)
+
+		wg.Wait()
 
 		if !hasNext {
 			break
 		}
@@ -238,6 +256,25 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
 	return nil
 }
 
+func (e *Extractor) processDatasetWorker(ctx context.Context, emit plugins.Emit, ch <-chan *bigquery.Dataset) {
+	for ds := range ch {
+		if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
+			e.excludedDatasetCtr.Add(
+				ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
+			)
+			e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
+			continue
+		}
+
+		if err := e.extractTable(ctx, ds, emit); err != nil {
+			// Log and keep draining the channel so one failing dataset does
+			// not stop this worker from processing the remaining datasets.
+			e.logger.Error("failed to extract table", "err", err)
+			continue
+		}
+	}
+}
+
 func (e *Extractor) fetchDatasetsNextPage(ctx context.Context, pager *iterator.Pager) (datasets []*bigquery.Dataset, hasNext bool, err error) {
 	defer func(start time.Time) {
 		attrs := []attribute.KeyValue{attribute.String("bq.project_id", e.config.ProjectID)}
@@ -286,7 +323,7 @@ func (e *Extractor) createPolicyTagClient(ctx context.Context) (*datacatalog.PolicyTagManagerClient, error) {
 }
 
 // Create big query client
-func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) {
+func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) error {
 	pageSize := pickFirstNonZero(e.config.TablePageSize, e.config.MaxPageSize, 50)
 
 	pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
@@ -301,38 +338,48 @@
 			continue
 		}
 
+		var wg sync.WaitGroup
 		for _, table := range tables {
-			if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
-				e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
-					attribute.String("bq.project_id", e.config.ProjectID),
-					attribute.String("bq.dataset_id", ds.DatasetID),
-				))
-				e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
-				continue
-			}
+			wg.Add(1)
+			go func(table *bigquery.Table) {
+				defer wg.Done()
 
-			tableFQN := table.FullyQualifiedName()
+				if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
+					e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
+						attribute.String("bq.project_id", e.config.ProjectID),
+						attribute.String("bq.dataset_id", ds.DatasetID),
+					))
+					e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
+					return
+				}
 
-			e.logger.Debug("extracting table", "table", tableFQN)
-			tmd, err := e.fetchTableMetadata(ctx, table)
-			if err != nil {
-				e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
-				continue
-			}
+				tableFQN := table.FullyQualifiedName()
+				e.logger.Debug("extracting table", "table", tableFQN)
 
-			asset, err := e.buildAsset(ctx, table, tmd)
-			if err != nil {
-				e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
-				continue
-			}
+				tmd, err := e.fetchTableMetadata(ctx, table)
+				if err != nil {
+					e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
+					return
+				}
+
+				asset, err := e.buildAsset(ctx, table, tmd)
+				if err != nil {
+					e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
+					return
+				}
 
-			emit(models.NewRecord(asset))
+				emit(models.NewRecord(asset))
+			}(table)
 		}
 
+		wg.Wait()
+
 		if !hasNext {
 			break
 		}
 	}
+
+	return nil
 }
 
 func (e *Extractor) fetchTablesNextPage(