Skip to content

Commit

Permalink
feat: Use of configurable concurrency for extracting assets from BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Jan 18, 2024
1 parent d0af130 commit e73edb7
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 34 deletions.
111 changes: 78 additions & 33 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Config struct {
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
UsageProjectIDs []string `mapstructure:"usage_project_ids"`
BuildViewLineage bool `mapstructure:"build_view_lineage" default:"false"`
Concurrency int `mapstructure:"concurrency" default:"5"`
}

type Exclude struct {
Expand Down Expand Up @@ -122,6 +123,7 @@ type Extractor struct {
policyTagClient *datacatalog.PolicyTagManagerClient
newClient NewClientFunc
randFn randFn
concurrency int

datasetsDurn metric.Int64Histogram
tablesDurn metric.Int64Histogram
Expand Down Expand Up @@ -204,6 +206,8 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
e.logger.Error("failed to create policy tag manager client", "err", err)
}

e.concurrency = e.config.Concurrency

return nil
}

Expand All @@ -219,15 +223,37 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
return err
}

var wg sync.WaitGroup
ch := make(chan models.Record)

for _, ds := range datasets {
if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
e.excludedDatasetCtr.Add(
ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
)
e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
continue
}
e.extractTable(ctx, ds, emit)
ds := ds
wg.Add(1)
go func() {
defer wg.Done()

if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
e.excludedDatasetCtr.Add(
ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
)
e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
return
}

if err := e.extractTable(ctx, ds, ch); err != nil {
e.logger.Error("failed to extract table", "err", err)
return
}
}()
}

go func() {
wg.Wait()
close(ch)
}()

for asset := range ch {
emit(models.NewRecord(asset.Data()))
}

if !hasNext {
Expand Down Expand Up @@ -286,10 +312,12 @@ func (e *Extractor) createPolicyTagClient(ctx context.Context) (*datacatalog.Pol
}

// Create big query client
func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) {
func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, ch chan<- models.Record) error {
pageSize := pickFirstNonZero(e.config.TablePageSize, e.config.MaxPageSize, 50)

pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
var wg sync.WaitGroup

for {
tables, hasNext, err := e.fetchTablesNextPage(ctx, ds.DatasetID, pager)
if err != nil {
Expand All @@ -302,37 +330,54 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
}

for _, table := range tables {
if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
attribute.String("bq.project_id", e.config.ProjectID),
attribute.String("bq.dataset_id", ds.DatasetID),
))
e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
continue
}

tableFQN := table.FullyQualifiedName()

e.logger.Debug("extracting table", "table", tableFQN)
tmd, err := e.fetchTableMetadata(ctx, table)
if err != nil {
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
continue
}

asset, err := e.buildAsset(ctx, table, tmd)
if err != nil {
e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
continue
}
table := table
wg.Add(1)
go func() {
defer wg.Done()

emit(models.NewRecord(asset))
processTable(ctx, e, ds, table, ch)
}()
}

if !hasNext {
break
}
}

go func() {
wg.Wait()
close(ch)
}()

return nil
}

// processTable emits a single BigQuery table as a record on ch.
// Tables matching the configured exclusion list are counted, logged, and
// skipped before any remote call is made. Metadata-fetch or asset-build
// failures are logged and cause the table to be skipped; they never abort
// the overall extraction.
func processTable(ctx context.Context, e *Extractor, ds *bigquery.Dataset, table *bigquery.Table, ch chan<- models.Record) {
	datasetID, tableID := ds.DatasetID, table.TableID

	// Guard: honour the exclusion config before doing any network work.
	if IsExcludedTable(datasetID, tableID, e.config.Exclude.Tables) {
		attrs := metric.WithAttributes(
			attribute.String("bq.project_id", e.config.ProjectID),
			attribute.String("bq.dataset_id", datasetID),
		)
		e.excludedTableCtr.Add(ctx, 1, attrs)
		e.logger.Debug("excluding table from bigquery extract", "dataset_id", datasetID, "table_id", tableID)
		return
	}

	fqn := table.FullyQualifiedName()
	e.logger.Debug("extracting table", "table", fqn)

	// Fetch the table's metadata from BigQuery; skip the table on failure.
	meta, err := e.fetchTableMetadata(ctx, table)
	if err != nil {
		e.logger.Error("failed to fetch table metadata", "err", err, "table", fqn)
		return
	}

	// Convert the raw metadata into an asset; skip the table on failure.
	asset, err := e.buildAsset(ctx, table, meta)
	if err != nil {
		e.logger.Error("failed to build asset", "err", err, "table", fqn)
		return
	}

	ch <- models.NewRecord(asset)
}

func (e *Extractor) fetchTablesNextPage(
Expand Down
4 changes: 3 additions & 1 deletion plugins/extractors/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ func getAllData(emitter *mocks.Emitter, t *testing.T) []*v1beta2.Asset {

// the emulator appending 1 random dataset
// we can't assert it, so we remove it from the list
actual = actual[:len(actual)-1]
if len(actual) > 0 {
actual = actual[:len(actual)-1]
}

// the emulator returning dynamic timestamps
// replace them with static ones
Expand Down

0 comments on commit e73edb7

Please sign in to comment.