Skip to content

Commit

Permalink
feat: add schema check and bug fixes (#74)
Browse files Browse the repository at this point in the history
* fix: Fixed bug of sending partition fields as interface

* feat: added schema check

---------

Co-authored-by: Mayur Jagtap <[email protected]>
  • Loading branch information
Mayurjag and solsticemj25 authored Dec 5, 2024
1 parent 24f52f1 commit a8749af
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 31 deletions.
22 changes: 18 additions & 4 deletions plugins/extractors/maxcompute/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Client struct {
client *odps.Odps
project *odps.Project
tunnel *tunnel.Tunnel

isSchemaEnabled bool
}

func New(conf config.Config) (*Client, error) {
Expand All @@ -28,14 +30,26 @@ func New(conf config.Config) (*Client, error) {
return nil, err
}

properties, err := project.GetAllProperties()
if err != nil {
return nil, err
}
isSchemaEnabled := properties.Get("odps.namespace.schema") == "true"

return &Client{
client: client,
project: project,
tunnel: tunnelInstance,
client: client,
project: project,
tunnel: tunnelInstance,
isSchemaEnabled: isSchemaEnabled,
}, nil
}

func (c *Client) ListSchema(context.Context) (schemas []*odps.Schema, err error) {
func (c *Client) ListSchema(_ context.Context) (schemas []*odps.Schema, err error) {
if !c.isSchemaEnabled {
schema := odps.NewSchema(nil, "", "default")
return []*odps.Schema{schema}, nil
}

err = c.project.Schemas().List(func(schema *odps.Schema, err2 error) {
if err2 != nil {
err = err2
Expand Down
33 changes: 24 additions & 9 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
_ "embed" // used to print the embedded assets
"fmt"
"math/rand"
"time"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
Expand All @@ -25,6 +26,10 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
maxcomputeService = "maxcompute"
)

type Extractor struct {
plugins.BaseExtractor
logger log.Logger
Expand Down Expand Up @@ -77,10 +82,11 @@ type Client interface {
GetTablePreview(ctx context.Context, partitionValue string, table *odps.Table, maxRows int) ([]string, *structpb.ListValue, error)
}

func New(logger log.Logger, clientFunc NewClientFunc) *Extractor {
func New(logger log.Logger, clientFunc NewClientFunc, randFn randFn) *Extractor {
e := &Extractor{
logger: logger,
newClient: clientFunc,
randFn: randFn,
}
e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
e.ScopeNotRequired = true
Expand Down Expand Up @@ -199,14 +205,14 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
asset := &v1beta2.Asset{
Urn: tableURN,
Name: tableSchema.TableName,
Type: tableType,
Type: "table",
Description: tableSchema.Comment,
CreateTime: timestamppb.New(time.Time(tableSchema.CreateTime)),
UpdateTime: timestamppb.New(time.Time(tableSchema.LastModifiedTime)),
Service: "maxcompute",
Service: maxcomputeService,
}

tableAttributesData := e.buildTableAttributesData(schemaName, tableSchema)
tableAttributesData := e.buildTableAttributesData(schemaName, tableType, tableSchema)

if tableType == config.TableTypeView {
query := tableSchema.ViewText
Expand Down Expand Up @@ -264,7 +270,7 @@ func getUpstreamResources(query string) []*v1beta2.Resource {
Urn: urn,
Name: dependency.Name,
Type: "table",
Service: "maxcompute",
Service: maxcomputeService,
})
}
return upstreams
Expand Down Expand Up @@ -292,11 +298,12 @@ func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
return columns
}

func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *tableschema.TableSchema) map[string]interface{} {
func (e *Extractor) buildTableAttributesData(schemaName, tableType string, tableInfo *tableschema.TableSchema) map[string]interface{} {
attributesData := map[string]interface{}{}

attributesData["project_name"] = e.config.ProjectName
attributesData["schema"] = schemaName
attributesData["type"] = tableType

rb := common.ResourceBuilder{ProjectName: e.config.ProjectName}
attributesData["resource_url"] = rb.Table(tableInfo.TableName)
Expand All @@ -305,8 +312,9 @@ func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *table
attributesData["sql"] = tableInfo.ViewText
}

partitionNames := make([]string, len(tableInfo.PartitionColumns))
var partitionNames []interface{}
if tableInfo.PartitionColumns != nil && len(tableInfo.PartitionColumns) > 0 {
partitionNames = make([]interface{}, len(tableInfo.PartitionColumns))
for i, column := range tableInfo.PartitionColumns {
partitionNames[i] = column.Name
}
Expand Down Expand Up @@ -415,13 +423,20 @@ func contains(slice []string, item string) bool {
}

func init() {
if err := registry.Extractors.Register("maxcompute", func() plugins.Extractor {
return New(plugins.GetLog(), CreateClient)
if err := registry.Extractors.Register(maxcomputeService, func() plugins.Extractor {
return New(plugins.GetLog(), CreateClient, seededRandom)
}); err != nil {
panic(err)
}
}

func seededRandom(seed int64) func(max int64) int64 {
rnd := rand.New(rand.NewSource(seed)) //nolint:gosec
return func(max int64) int64 {
return rnd.Int63n(max)
}
}

func CreateClient(_ context.Context, _ log.Logger, conf config.Config) (Client, error) {
return client.New(conf)
}
16 changes: 8 additions & 8 deletions plugins/extractors/maxcompute/maxcompute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestInit(t *testing.T) {

mockClient := mocks.NewMaxComputeClient(t)
t.Run("should return error if config is invalid", func(t *testing.T) {
extr := maxcompute.New(utils.Logger, createClient(mockClient))
extr := maxcompute.New(utils.Logger, createClient(mockClient), nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, plugins.Config{
Expand All @@ -48,7 +48,7 @@ func TestInit(t *testing.T) {
})

t.Run("should return no error", func(t *testing.T) {
extr := maxcompute.New(utils.Logger, createClient(mockClient))
extr := maxcompute.New(utils.Logger, createClient(mockClient), nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, plugins.Config{
Expand Down Expand Up @@ -153,12 +153,12 @@ func TestExtract(t *testing.T) {
"new_table": &newTableSchema,
}

runTest := func(t *testing.T, cfg plugins.Config, mockSetup func(mockClient *mocks.MaxComputeClient)) ([]*v1beta2.Asset, error) {
runTest := func(t *testing.T, cfg plugins.Config, mockSetup func(mockClient *mocks.MaxComputeClient), randomizer func(seed int64) func(int64) int64) ([]*v1beta2.Asset, error) {
mockClient := mocks.NewMaxComputeClient(t)
if mockSetup != nil {
mockSetup(mockClient)
}
extr := maxcompute.New(utils.Logger, createClient(mockClient))
extr := maxcompute.New(utils.Logger, createClient(mockClient), randomizer)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := extr.Init(ctx, cfg)
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestExtract(t *testing.T) {
},
nil,
)
})
}, nil)

assert.Nil(t, err)
assert.NotEmpty(t, actual)
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestExtract(t *testing.T) {
},
nil,
)
})
}, nil)

assert.Nil(t, err)
assert.NotEmpty(t, actual)
Expand All @@ -291,7 +291,7 @@ func TestExtract(t *testing.T) {
mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil)
mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1[1:], nil)
mockClient.EXPECT().GetTableSchema(mock.Anything, table1[1]).Return("MANAGED_TABLE", schemaMapping[table1[1].Name()], nil)
})
}, nil)

assert.Nil(t, err)
assert.NotEmpty(t, actual)
Expand All @@ -311,7 +311,7 @@ func TestExtract(t *testing.T) {
},
}, func(mockClient *mocks.MaxComputeClient) {
mockClient.EXPECT().ListSchema(mock.Anything).Return(nil, fmt.Errorf("ListSchema fails"))
})
}, nil)
assert.ErrorContains(t, err, "ListSchema fails")
assert.Nil(t, actual)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "MANAGED_TABLE",
"type": "table",
"url": "",
"description": "",
"data": {
Expand Down Expand Up @@ -37,7 +37,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table"
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.dummy_table",
"name": "dummy_table",
"service": "maxcompute",
"type": "VIRTUAL_VIEW",
"type": "table",
"url": "",
"description": "dummy table description",
"data": {
Expand Down Expand Up @@ -58,7 +58,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/dummy_table",
"schema": "my_schema",
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table"
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table",
"type": "VIRTUAL_VIEW"
},
"create_time": "2024-11-14T06:41:35Z",
"update_time": "2024-11-14T06:41:35Z"
Expand All @@ -84,7 +85,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "MANAGED_TABLE",
"type": "table",
"url": "",
"description": "",
"data": {
Expand Down Expand Up @@ -130,7 +131,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table"
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
Expand Down
10 changes: 6 additions & 4 deletions plugins/extractors/maxcompute/testdata/expected-assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.dummy_table",
"name": "dummy_table",
"service": "maxcompute",
"type": "VIRTUAL_VIEW",
"type": "table",
"url": "",
"description": "dummy table description",
"data": {
Expand Down Expand Up @@ -58,7 +58,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/dummy_table",
"schema": "my_schema",
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table"
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table",
"type": "VIRTUAL_VIEW"
},
"create_time": "2024-11-14T06:41:35Z",
"update_time": "2024-11-14T06:41:35Z"
Expand All @@ -74,7 +75,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "MANAGED_TABLE",
"type": "table",
"url": "",
"description": "",
"data": {
Expand Down Expand Up @@ -120,7 +121,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table"
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
Expand Down

0 comments on commit a8749af

Please sign in to comment.