From a8749af671069b53e49851b35aa09aa0dafa4a0c Mon Sep 17 00:00:00 2001 From: Mayurjag <63900197+Mayurjag@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:37:09 +0530 Subject: [PATCH] feat: add schema check and bug fixes (#74) * fix: Fixed bug of sending partition fields as interface * feat: added schema check --------- Co-authored-by: Mayur Jagtap --- .../extractors/maxcompute/client/client.go | 22 ++++++++++--- plugins/extractors/maxcompute/maxcompute.go | 33 ++++++++++++++----- .../extractors/maxcompute/maxcompute_test.go | 16 ++++----- .../expected-assets-with-table-exclusion.json | 5 +-- .../expected-assets-with-view-lineage.json | 10 +++--- .../maxcompute/testdata/expected-assets.json | 10 +++--- 6 files changed, 65 insertions(+), 31 deletions(-) diff --git a/plugins/extractors/maxcompute/client/client.go b/plugins/extractors/maxcompute/client/client.go index 3aba0b9d..000d1db6 100644 --- a/plugins/extractors/maxcompute/client/client.go +++ b/plugins/extractors/maxcompute/client/client.go @@ -15,6 +15,8 @@ type Client struct { client *odps.Odps project *odps.Project tunnel *tunnel.Tunnel + + isSchemaEnabled bool } func New(conf config.Config) (*Client, error) { @@ -28,14 +30,26 @@ func New(conf config.Config) (*Client, error) { return nil, err } + properties, err := project.GetAllProperties() + if err != nil { + return nil, err + } + isSchemaEnabled := properties.Get("odps.namespace.schema") == "true" + return &Client{ - client: client, - project: project, - tunnel: tunnelInstance, + client: client, + project: project, + tunnel: tunnelInstance, + isSchemaEnabled: isSchemaEnabled, }, nil } -func (c *Client) ListSchema(context.Context) (schemas []*odps.Schema, err error) { +func (c *Client) ListSchema(_ context.Context) (schemas []*odps.Schema, err error) { + if !c.isSchemaEnabled { + schema := odps.NewSchema(nil, "", "default") + return []*odps.Schema{schema}, nil + } + err = c.project.Schemas().List(func(schema *odps.Schema, err2 error) { if err2 != nil { err = err2 diff --git a/plugins/extractors/maxcompute/maxcompute.go b/plugins/extractors/maxcompute/maxcompute.go index 8a8b9ed9..f7acc415 100644 --- a/plugins/extractors/maxcompute/maxcompute.go +++ b/plugins/extractors/maxcompute/maxcompute.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" // used to print the embedded assets "fmt" + "math/rand" "time" "github.com/aliyun/aliyun-odps-go-sdk/odps" @@ -25,6 +26,10 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +const ( + maxcomputeService = "maxcompute" +) + type Extractor struct { plugins.BaseExtractor logger log.Logger @@ -77,10 +82,11 @@ type Client interface { GetTablePreview(ctx context.Context, partitionValue string, table *odps.Table, maxRows int) ([]string, *structpb.ListValue, error) } -func New(logger log.Logger, clientFunc NewClientFunc) *Extractor { +func New(logger log.Logger, clientFunc NewClientFunc, randFn randFn) *Extractor { e := &Extractor{ logger: logger, newClient: clientFunc, + randFn: randFn, } e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config) e.ScopeNotRequired = true @@ -199,14 +205,14 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema, asset := &v1beta2.Asset{ Urn: tableURN, Name: tableSchema.TableName, - Type: tableType, + Type: "table", Description: tableSchema.Comment, CreateTime: timestamppb.New(time.Time(tableSchema.CreateTime)), UpdateTime: timestamppb.New(time.Time(tableSchema.LastModifiedTime)), - Service: "maxcompute", + Service: maxcomputeService, } - tableAttributesData := e.buildTableAttributesData(schemaName, tableSchema) + tableAttributesData := e.buildTableAttributesData(schemaName, tableType, tableSchema) if tableType == config.TableTypeView { query := tableSchema.ViewText @@ -264,7 +270,7 @@ func getUpstreamResources(query string) []*v1beta2.Resource { Urn: urn, Name: dependency.Name, Type: "table", - Service: "maxcompute", + Service: maxcomputeService, }) } return upstreams @@ -292,11 +298,12 @@ func buildColumns(dataType datatype.DataType) []*v1beta2.Column { return columns } -func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *tableschema.TableSchema) map[string]interface{} { +func (e *Extractor) buildTableAttributesData(schemaName, tableType string, tableInfo *tableschema.TableSchema) map[string]interface{} { attributesData := map[string]interface{}{} attributesData["project_name"] = e.config.ProjectName attributesData["schema"] = schemaName + attributesData["type"] = tableType rb := common.ResourceBuilder{ProjectName: e.config.ProjectName} attributesData["resource_url"] = rb.Table(tableInfo.TableName) @@ -305,8 +312,9 @@ func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *table attributesData["sql"] = tableInfo.ViewText } - partitionNames := make([]string, len(tableInfo.PartitionColumns)) + var partitionNames []interface{} if tableInfo.PartitionColumns != nil && len(tableInfo.PartitionColumns) > 0 { + partitionNames = make([]interface{}, len(tableInfo.PartitionColumns)) for i, column := range tableInfo.PartitionColumns { partitionNames[i] = column.Name } @@ -415,13 +423,20 @@ func contains(slice []string, item string) bool { } func init() { - if err := registry.Extractors.Register("maxcompute", func() plugins.Extractor { - return New(plugins.GetLog(), CreateClient) + if err := registry.Extractors.Register(maxcomputeService, func() plugins.Extractor { + return New(plugins.GetLog(), CreateClient, seededRandom) }); err != nil { panic(err) } } +func seededRandom(seed int64) func(max int64) int64 { + rnd := rand.New(rand.NewSource(seed)) //nolint:gosec + return func(max int64) int64 { + return rnd.Int63n(max) + } +} + func CreateClient(_ context.Context, _ log.Logger, conf config.Config) (Client, error) { return client.New(conf) } diff --git a/plugins/extractors/maxcompute/maxcompute_test.go b/plugins/extractors/maxcompute/maxcompute_test.go index c18013a4..de7104f1 100644 --- a/plugins/extractors/maxcompute/maxcompute_test.go +++ b/plugins/extractors/maxcompute/maxcompute_test.go @@ -34,7 +34,7 @@ func TestInit(t *testing.T) { mockClient := mocks.NewMaxComputeClient(t) t.Run("should return error if config is invalid", func(t *testing.T) { - extr := maxcompute.New(utils.Logger, createClient(mockClient)) + extr := maxcompute.New(utils.Logger, createClient(mockClient), nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := extr.Init(ctx, plugins.Config{ @@ -48,7 +48,7 @@ func TestInit(t *testing.T) { }) t.Run("should return no error", func(t *testing.T) { - extr := maxcompute.New(utils.Logger, createClient(mockClient)) + extr := maxcompute.New(utils.Logger, createClient(mockClient), nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := extr.Init(ctx, plugins.Config{ @@ -153,12 +153,12 @@ func TestExtract(t *testing.T) { "new_table": &newTableSchema, } - runTest := func(t *testing.T, cfg plugins.Config, mockSetup func(mockClient *mocks.MaxComputeClient)) ([]*v1beta2.Asset, error) { + runTest := func(t *testing.T, cfg plugins.Config, mockSetup func(mockClient *mocks.MaxComputeClient), randomizer func(seed int64) func(int64) int64) ([]*v1beta2.Asset, error) { mockClient := mocks.NewMaxComputeClient(t) if mockSetup != nil { mockSetup(mockClient) } - extr := maxcompute.New(utils.Logger, createClient(mockClient)) + extr := maxcompute.New(utils.Logger, createClient(mockClient), randomizer) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() err := extr.Init(ctx, cfg) @@ -216,7 +216,7 @@ func TestExtract(t *testing.T) { }, nil, ) - }) + }, nil) assert.Nil(t, err) assert.NotEmpty(t, actual) @@ -266,7 +266,7 @@ func TestExtract(t *testing.T) { }, nil, ) - }) + }, nil) assert.Nil(t, err) assert.NotEmpty(t, actual) @@ -291,7 +291,7 @@ func TestExtract(t *testing.T) { mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil) mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1[1:], nil) mockClient.EXPECT().GetTableSchema(mock.Anything, table1[1]).Return("MANAGED_TABLE", schemaMapping[table1[1].Name()], nil) - }) + }, nil) assert.Nil(t, err) assert.NotEmpty(t, actual) @@ -311,7 +311,7 @@ func TestExtract(t *testing.T) { }, }, func(mockClient *mocks.MaxComputeClient) { mockClient.EXPECT().ListSchema(mock.Anything).Return(nil, fmt.Errorf("ListSchema fails")) - }) + }, nil) assert.ErrorContains(t, err, "ListSchema fails") assert.Nil(t, actual) }) diff --git a/plugins/extractors/maxcompute/testdata/expected-assets-with-table-exclusion.json b/plugins/extractors/maxcompute/testdata/expected-assets-with-table-exclusion.json index 9d28f8d1..3d6e26c8 100644 --- a/plugins/extractors/maxcompute/testdata/expected-assets-with-table-exclusion.json +++ b/plugins/extractors/maxcompute/testdata/expected-assets-with-table-exclusion.json @@ -3,7 +3,7 @@ "urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table", "name": "new_table", "service": "maxcompute", - "type": "MANAGED_TABLE", + "type": "table", "url": "", "description": "", "data": { @@ -37,7 +37,8 @@ "project_name": "test-project-id", "resource_url": "/projects/test-project-id/tables/new_table", "schema": "my_schema", - "sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table" + "sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table", + "type": "MANAGED_TABLE" }, "create_time": "2024-11-18T08:00:00Z", "update_time": "2024-11-18T08:00:00Z" diff --git a/plugins/extractors/maxcompute/testdata/expected-assets-with-view-lineage.json b/plugins/extractors/maxcompute/testdata/expected-assets-with-view-lineage.json index 4c18140c..25193856 100644 --- a/plugins/extractors/maxcompute/testdata/expected-assets-with-view-lineage.json +++ b/plugins/extractors/maxcompute/testdata/expected-assets-with-view-lineage.json @@ -3,7 +3,7 @@ "urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.dummy_table", "name": "dummy_table", "service": "maxcompute", - "type": "VIRTUAL_VIEW", + "type": "table", "url": "", "description": "dummy table description", "data": { @@ -58,7 +58,8 @@ "project_name": "test-project-id", "resource_url": "/projects/test-project-id/tables/dummy_table", "schema": "my_schema", - "sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table" + "sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table", + "type": "VIRTUAL_VIEW" }, "create_time": "2024-11-14T06:41:35Z", "update_time": "2024-11-14T06:41:35Z" @@ -84,7 +85,7 @@ "urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table", "name": "new_table", "service": "maxcompute", - "type": "MANAGED_TABLE", + "type": "table", "url": "", "description": "", "data": { @@ -130,7 +131,8 @@ "project_name": "test-project-id", "resource_url": "/projects/test-project-id/tables/new_table", "schema": "my_schema", - "sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table" + "sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table", + "type": "MANAGED_TABLE" }, "create_time": "2024-11-18T08:00:00Z", "update_time": "2024-11-18T08:00:00Z" diff --git a/plugins/extractors/maxcompute/testdata/expected-assets.json b/plugins/extractors/maxcompute/testdata/expected-assets.json index 8e7539c8..d1a75dc2 100644 --- a/plugins/extractors/maxcompute/testdata/expected-assets.json +++ b/plugins/extractors/maxcompute/testdata/expected-assets.json @@ -3,7 +3,7 @@ "urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.dummy_table", "name": "dummy_table", "service": "maxcompute", - "type": "VIRTUAL_VIEW", + "type": "table", "url": "", "description": "dummy table description", "data": { @@ -58,7 +58,8 @@ "project_name": "test-project-id", "resource_url": "/projects/test-project-id/tables/dummy_table", "schema": "my_schema", - "sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table" + "sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table", + "type": "VIRTUAL_VIEW" }, "create_time": "2024-11-14T06:41:35Z", "update_time": "2024-11-14T06:41:35Z" @@ -74,7 +75,7 @@ "urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table", "name": "new_table", "service": "maxcompute", - "type": "MANAGED_TABLE", + "type": "table", "url": "", "description": "", "data": { @@ -120,7 +121,8 @@ "project_name": "test-project-id", "resource_url": "/projects/test-project-id/tables/new_table", "schema": "my_schema", - "sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table" + "sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table", + "type": "MANAGED_TABLE" }, "create_time": "2024-11-18T08:00:00Z", "update_time": "2024-11-18T08:00:00Z"