Skip to content

Commit

Permalink
feat: added support for lineage through sql field
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Dec 2, 2024
1 parent 1c1ea69 commit 9e3c331
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 19 deletions.
2 changes: 1 addition & 1 deletion plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/bigquery/auditlog"
"github.com/goto/meteor/plugins/extractors/bigquery/upstream"
"github.com/goto/meteor/plugins/internal/upstream"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
Expand Down
2 changes: 1 addition & 1 deletion plugins/extractors/maxcompute/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (*Client) GetTableSchema(_ context.Context, table *odps.Table) (string, *ta
return table.Type().String(), &tableSchema, nil
}

func (c *Client) GetTablePreview(ctx context.Context, partitionValue string, table *odps.Table, maxRows int) ([]string, *structpb.ListValue, error) {
func (c *Client) GetTablePreview(_ context.Context, partitionValue string, table *odps.Table, maxRows int) ([]string, *structpb.ListValue, error) {
records, err := c.tunnel.Preview(table, partitionValue, int64(maxRows))
if err != nil {
c.log.Error("failed to preview table", "table", table.Name(), "error", err)
Expand Down
7 changes: 4 additions & 3 deletions plugins/extractors/maxcompute/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ type Config struct {
Schemas []string `mapstructure:"schemas"`
Tables []string `mapstructure:"tables"`
} `mapstructure:"exclude,omitempty"`
MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"`
MixValues bool `mapstructure:"mix_values,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"`
MixValues bool `mapstructure:"mix_values,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
BuildViewLineage bool `mapstructure:"build_view_lineage,omitempty"`
}
66 changes: 53 additions & 13 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/maxcompute/client"
"github.com/goto/meteor/plugins/extractors/maxcompute/config"
"github.com/goto/meteor/plugins/internal/upstream"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
Expand All @@ -24,6 +25,10 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

// tableTypeView is the table type string that buildAsset compares against to
// recognise a view: for this type the preview is skipped and the view's SQL
// text is attached to the asset attributes.
const tableTypeView = "VIRTUAL_VIEW"

type Extractor struct {
plugins.BaseExtractor
logger log.Logger
Expand Down Expand Up @@ -175,7 +180,9 @@ func (e *Extractor) processTable(ctx context.Context, schema *odps.Schema, table
return nil
}

func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema, table *odps.Table, tableType string, tableSchema *tableschema.TableSchema) (*v1beta2.Asset, error) {
func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
table *odps.Table, tableType string, tableSchema *tableschema.TableSchema,
) (*v1beta2.Asset, error) {
defaultSchema := "default"
schemaName := schema.Name()
if schemaName == "" {
Expand All @@ -186,12 +193,14 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema, table *

var previewFields []string
var previewRows *structpb.ListValue
if tableType == "MANAGED_TABLE" {
if tableType != tableTypeView {
var err error
previewFields, previewRows, err = e.buildPreview(ctx, table, tableSchema)
if err != nil {
e.logger.Warn("error building preview", "err", err, "table", tableSchema.TableName)
}
} else {
e.logger.Debug("skipping preview for view", "table", tableSchema.TableName)
}

asset := &v1beta2.Asset{
Expand All @@ -206,6 +215,17 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema, table *

tableAttributesData := e.buildTableAttributesData(schemaName, table, tableSchema)

if tableType == tableTypeView {
query := tableSchema.ViewText
tableAttributesData["sql"] = query
if e.config.BuildViewLineage {
upstreamResources := getUpstreamResources(query)
asset.Lineage = &v1beta2.Lineage{
Upstreams: upstreamResources,
}
}
}

var columns []*v1beta2.Column
for i, col := range tableSchema.Columns {
columnData := &v1beta2.Column{
Expand Down Expand Up @@ -241,6 +261,22 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema, table *
return asset, nil
}

// getUpstreamResources converts the upstream dependencies found in query into
// lineage resources.
//
// The query is passed through upstream.ParseTopLevelUpstreamsFromQuery and the
// result through upstream.UniqueFilterResources; every remaining dependency
// becomes one Resource with Type "table" and Service "maxcompute". The result
// is nil when no dependency is returned.
//
// NOTE(review): the URN is produced by plugins.BigQueryURN even though the
// resource is labelled as a "maxcompute" service — confirm that this yields
// URNs matching the ones this extractor emits for MaxCompute tables.
func getUpstreamResources(query string) []*v1beta2.Resource {
	deps := upstream.UniqueFilterResources(
		upstream.ParseTopLevelUpstreamsFromQuery(query),
	)

	// Left as a nil slice on purpose so an empty dependency set returns nil.
	var resources []*v1beta2.Resource
	for _, dep := range deps {
		resource := &v1beta2.Resource{
			Urn:     plugins.BigQueryURN(dep.Project, dep.Dataset, dep.Name),
			Name:    dep.Name,
			Type:    "table",
			Service: "maxcompute",
		}
		resources = append(resources, resource)
	}
	return resources
}

func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
if dataType.ID() != datatype.STRUCT {
return nil
Expand All @@ -263,7 +299,7 @@ func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
return columns
}

func (e *Extractor) buildTableAttributesData(schemaName string, table *odps.Table, tableInfo *tableschema.TableSchema) map[string]interface{} {
func (e *Extractor) buildTableAttributesData(schemaName string, _ *odps.Table, tableInfo *tableschema.TableSchema) map[string]interface{} {
attributesData := map[string]interface{}{}

attributesData["project_name"] = e.config.ProjectName
Expand Down Expand Up @@ -307,26 +343,30 @@ func (e *Extractor) buildPreview(ctx context.Context, t *odps.Table, tSchema *ta
return nil, nil, nil
}

var tempRows []interface{}

previewFields, previewRows, err := e.client.GetTablePreview(ctx, "", t, maxPreviewRows)
if err != nil {
e.logger.Error("failed to preview table", "table", t.Name(), "error", err)
return nil, nil, err
}

tempRows, err = e.mixValuesIfNeeded(tempRows, time.Time(tSchema.LastModifiedTime).Unix())
if err != nil {
return nil, nil, fmt.Errorf("mix values: %w", err)
}
if e.config.MixValues {
tempRows := make([]interface{}, len(previewRows.GetValues()))
for i, val := range previewRows.GetValues() {
tempRows[i] = val.AsInterface()
}

previewRows, err = structpb.NewList(tempRows)
if err != nil {
return nil, nil, fmt.Errorf("create preview list: %w", err)
tempRows, err = e.mixValuesIfNeeded(tempRows, time.Time(tSchema.LastModifiedTime).Unix())
if err != nil {
return nil, nil, fmt.Errorf("mix values: %w", err)
}

previewRows, err = structpb.NewList(tempRows)
if err != nil {
return nil, nil, fmt.Errorf("create preview list: %w", err)
}
}

return previewFields, previewRows, nil

}

func (e *Extractor) mixValuesIfNeeded(rows []interface{}, rndSeed int64) ([]interface{}, error) {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package upstream_test
import (
"testing"

"github.com/goto/meteor/plugins/extractors/bigquery/upstream"
"github.com/goto/meteor/plugins/internal/upstream"
"github.com/stretchr/testify/assert"
)

Expand Down
File renamed without changes.

0 comments on commit 9e3c331

Please sign in to comment.