Skip to content

Commit

Permalink
feat: Use of configurable concurrency for extracting assets from BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Jan 19, 2024
1 parent d0af130 commit 9df98b8
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 35 deletions.
110 changes: 75 additions & 35 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Config struct {
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
UsageProjectIDs []string `mapstructure:"usage_project_ids"`
BuildViewLineage bool `mapstructure:"build_view_lineage" default:"false"`
Concurrency int `mapstructure:"concurrency"`
}

type Exclude struct {
Expand Down Expand Up @@ -219,17 +220,27 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
return err
}

ch := make(chan *bigquery.Dataset, len(datasets))

numWorkers := e.config.Concurrency
var wg sync.WaitGroup
wg.Add(numWorkers)

for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
e.processDatasetWorker(ctx, emit, ch)
}()
}

for _, ds := range datasets {
if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
e.excludedDatasetCtr.Add(
ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
)
e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
continue
}
e.extractTable(ctx, ds, emit)
ch <- ds
}

close(ch)

wg.Wait()

if !hasNext {
break
}
Expand All @@ -238,6 +249,23 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
return nil
}

// processDatasetWorker is one of several concurrent workers started by
// Extract. It consumes datasets from ch until the channel is closed and
// drained, skipping excluded datasets and extracting table assets from the
// rest. Errors from a single dataset are logged and do not stop the worker:
// returning early here would strand the datasets still buffered on ch and
// silently drop them once every worker had exited.
//
// NOTE(review): Extract sizes the worker pool from e.config.Concurrency,
// which appears to have no default — confirm a zero value cannot reach here,
// since zero workers would leave ch unconsumed.
func (e *Extractor) processDatasetWorker(ctx context.Context, emit plugins.Emit, ch <-chan *bigquery.Dataset) {
	for ds := range ch {
		// Stop promptly if the extraction run has been cancelled.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
			e.excludedDatasetCtr.Add(
				ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
			)
			e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
			continue
		}

		if err := e.extractTable(ctx, ds, emit); err != nil {
			// Log and move on: one failing dataset must not kill this
			// worker and skip the datasets it has yet to receive.
			e.logger.Error("failed to extract table", "err", err)
			continue
		}
	}
}

func (e *Extractor) fetchDatasetsNextPage(ctx context.Context, pager *iterator.Pager) (datasets []*bigquery.Dataset, hasNext bool, err error) {
defer func(start time.Time) {
attrs := []attribute.KeyValue{attribute.String("bq.project_id", e.config.ProjectID)}
Expand Down Expand Up @@ -286,10 +314,10 @@ func (e *Extractor) createPolicyTagClient(ctx context.Context) (*datacatalog.Pol
}

// Create big query client
func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) {
func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) error {
pageSize := pickFirstNonZero(e.config.TablePageSize, e.config.MaxPageSize, 50)

pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")

for {
tables, hasNext, err := e.fetchTablesNextPage(ctx, ds.DatasetID, pager)
if err != nil {
Expand All @@ -301,38 +329,50 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
continue
}

var wg sync.WaitGroup
wg.Add(len(tables))
for _, table := range tables {
if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
attribute.String("bq.project_id", e.config.ProjectID),
attribute.String("bq.dataset_id", ds.DatasetID),
))
e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
continue
}

tableFQN := table.FullyQualifiedName()

e.logger.Debug("extracting table", "table", tableFQN)
tmd, err := e.fetchTableMetadata(ctx, table)
if err != nil {
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
continue
}

asset, err := e.buildAsset(ctx, table, tmd)
if err != nil {
e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
continue
}

emit(models.NewRecord(asset))
go e.processTable(ctx, ds, table, emit, &wg)
}

wg.Wait()

if !hasNext {
break
}
}

return nil
}

// processTable builds and emits a metadata asset for a single BigQuery
// table. It is launched as a goroutine per table by extractTable; every exit
// path signals wg.Done via defer. Tables matching the exclusion config are
// counted and skipped, and metadata/build failures are logged and skipped.
func (e *Extractor) processTable(ctx context.Context, ds *bigquery.Dataset, table *bigquery.Table, emit plugins.Emit, wg *sync.WaitGroup) {
	defer wg.Done()

	if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
		// Record the exclusion in the metric before skipping.
		attrs := metric.WithAttributes(
			attribute.String("bq.project_id", e.config.ProjectID),
			attribute.String("bq.dataset_id", ds.DatasetID),
		)
		e.excludedTableCtr.Add(ctx, 1, attrs)
		e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
		return
	}

	fqn := table.FullyQualifiedName()
	e.logger.Debug("extracting table", "table", fqn)

	md, err := e.fetchTableMetadata(ctx, table)
	if err != nil {
		e.logger.Error("failed to fetch table metadata", "err", err, "table", fqn)
		return
	}

	built, buildErr := e.buildAsset(ctx, table, md)
	if buildErr != nil {
		e.logger.Error("failed to build asset", "err", buildErr, "table", fqn)
		return
	}

	emit(models.NewRecord(built))
}

func (e *Extractor) fetchTablesNextPage(
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func TestExtract(t *testing.T) {
"datasets": []string{"exclude_this_dataset"},
"tables": []string{"dataset1.exclude_this_table"},
},
"concurrency": 5,
},
}, nil)

Expand Down

0 comments on commit 9df98b8

Please sign in to comment.