Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: MaxCompute Extractor Plugin #69

Merged
merged 14 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ require (
cloud.google.com/go/storage v1.29.0
github.com/AlecAivazis/survey/v2 v2.3.6
github.com/ClickHouse/clickhouse-go v1.4.5
github.com/IBM/sarama v1.43.2
github.com/MakeNowJust/heredoc v1.0.0
github.com/aliyun/aliyun-odps-go-sdk v0.3.14
github.com/aws/aws-sdk-go v1.44.151
github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba
github.com/cenkalti/backoff/v4 v4.2.1
Expand Down Expand Up @@ -38,6 +40,7 @@ require (
github.com/ory/dockertest/v3 v3.9.1
github.com/pkg/errors v0.9.1
github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c
github.com/rs/zerolog v1.32.0
github.com/schollz/progressbar/v3 v3.13.1
github.com/segmentio/kafka-go v0.4.47
github.com/sijms/go-ora/v2 v2.7.9
Expand All @@ -64,7 +67,7 @@ require (
google.golang.org/api v0.114.0
google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -77,10 +80,12 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/IBM/sarama v1.43.2 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/alecthomas/chroma v0.10.0 // indirect
github.com/alibabacloud-go/debug v1.0.1 // indirect
github.com/alibabacloud-go/tea v1.2.2 // indirect
github.com/aliyun/credentials-go v1.3.10 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/arrow/go/v11 v11.0.0 // indirect
Expand Down Expand Up @@ -136,11 +141,12 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/gorilla/css v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
Expand All @@ -158,6 +164,7 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jeremywohl/flatten v1.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
Expand All @@ -178,6 +185,8 @@ require (
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/muesli/termenv v0.13.0 // indirect
Expand All @@ -198,7 +207,6 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rivo/uniseg v0.4.3 // indirect
github.com/rs/zerolog v1.32.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.4 // indirect
github.com/shoenig/go-m1cpu v0.1.5 // indirect
Expand Down Expand Up @@ -232,7 +240,7 @@ require (
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
Expand Down
69 changes: 59 additions & 10 deletions go.sum

Large diffs are not rendered by default.

90 changes: 90 additions & 0 deletions plugins/extractors/maxcompute/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# maxcompute
## Usage
The `maxcompute` extractor allows you to extract metadata from MaxCompute tables and schemas.
It supports configuration for project name, endpoint, access keys, schema name, exclusions, and concurrency.

```yaml
source:
name: maxcompute
config:
project_name: goto_test
endpoint_project: http://goto_test-maxcompute.com
access_key:
id: access_key_id
secret: access_key_secret
schema_name: DEFAULT
exclude:
schemas:
- schema_a
- schema_b
tables:
- schema_c.table_a
concurrency: 10
```

## Inputs

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :-- |
| `project_name` | `string` | `goto_test` | MaxCompute Project Name | *required* |
| `endpoint_project` | `string` | `http://goto_test-maxcompute.com` | Endpoint Project URL | *required* |
| `access_key.id` | `string` | `access_key_id` | Access Key ID | *required* |
| `access_key.secret` | `string` | `access_key_secret` | Access Key Secret | *required* |
| `schema_name` | `string` | `DEFAULT` | Default schema name | *optional* |
| `exclude.schemas` | `[]string` | `["schema_a", "schema_b"]` | List of schemas to exclude | *optional* |
| `exclude.tables` | `[]string` | `["schema_c.table_a"]` | List of tables to exclude | *optional* |
| `concurrency` | `int` | `10` | Number of concurrent requests to MaxCompute | *optional* |

### *Notes*

- Leaving `access_key` blank will default to [MaxCompute's default authentication][maxcompute-default-auth].

## Outputs

| Field | Sample Value | Description |
|:-------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------|
| `resource.urn` | `project_name.schema_name.table_name` | |
| `resource.name` | `table_name` | |
| `resource.service` | `maxcompute` | |
| `description` | `table description` | |
| `schema` | [][Column](#column) | |
| `properties.partition_data` | `"partition_data": {"partition_field": "data_date", "require_partition_filter": false, "time_partition": {"partition_by": "DAY","partition_expire": 0 } }` | partition related data for time and range partitioning. |
| `properties.partition_field` | `created_at` | returns the field on which table is time partitioned |

### Partition Data

| Field | Sample Value | Description |
|:------------------------------------------|:-------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `partition_field` | `created_at` | field on which the table is partitioned either by TimePartitioning or RangePartitioning. In case field is empty for TimePartitioning _PARTITIONTIME is returned instead of empty. |
| `require_partition_filter` | `true` | boolean value which denotes if every query on the MaxCompute table must include at least one predicate that only references the partitioning column |
| `time_partition.partition_by` | `HOUR` | returns partition type HOUR/DAY/MONTH/YEAR |
| `time_partition.partition_expire_seconds` | `0` | time in which data will expire from this partition. If 0 it will not expire. |
| `range_partition.interval` | `10` | width of a interval range |
| `range_partition.start` | `0` | start value for partition inclusive of this value |
| `range_partition.end` | `100` | end value for partition exclusive of this value |


### Column

| Field | Sample Value |
|:--------------|:---------------------------------------|
| `name` | `total_price` |
| `description` | `item's total price` |
| `data_type` | `decimal` |
| `is_nullable` | `true` |

### Join

| Field | Sample Value |
|:-------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------|
| `urn` | `project_name.schema_name.table_name` |
| `count` | `3` |
| `conditions` | [`"ON target.column_1 = source.column_1 and target.param_name = source.param_name"`,`"ON DATE(target.event_timestamp) = DATE(source.event_timestamp)"`] |

## Contributing

Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on
contributing to this module.

[maxcompute-default-auth]: https://www.alibabacloud.com/help/doc-detail/27800.htm

73 changes: 73 additions & 0 deletions plugins/extractors/maxcompute/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package client

import (
"context"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/maxcompute/config"
"github.com/goto/salt/log"
)

type Client struct {
client *odps.Odps
project *odps.Project
log log.Logger
}

func New(conf config.Config) *Client {
aliAccount := account.NewAliyunAccount(conf.AccessKey.ID, conf.AccessKey.Secret)
client := odps.NewOdps(aliAccount, conf.EndpointProject)
client.SetDefaultProjectName(conf.ProjectName)

project := client.Project(conf.ProjectName)

return &Client{
client: client,
project: project,
log: plugins.GetLog(),
}
}

func (c *Client) ListSchema(context.Context) (schemas []*odps.Schema, err error) {
err = c.project.Schemas().List(func(schema *odps.Schema, err2 error) {
if err2 != nil {
err = err2
c.log.Error("failed to process schema", "with error:", err)
return
}
schemas = append(schemas, schema)
})

return schemas, err
}

func (c *Client) ListTable(_ context.Context, schemaName string) (tables []*odps.Table, err error) {
t := odps.NewTables(c.client, c.project.Name(), schemaName)
t.List(
func(table *odps.Table, err2 error) {
if err2 != nil {
err = err2
c.log.Error("failed to process table", "with error:", err)
return
}
tables = append(tables, table)
},
)
return tables, err
}

func (*Client) GetTableSchema(_ context.Context, table *odps.Table) (string, *tableschema.TableSchema, error) {
err := table.Load()
tableSchema := table.Schema()
if err != nil {
isView := tableSchema.IsVirtualView || tableSchema.IsMaterializedView
isLoaded := table.IsLoaded()
if !isView || (isView && !isLoaded) {
return "", nil, err
}
}
return table.Type().String(), &tableSchema, nil
}
16 changes: 16 additions & 0 deletions plugins/extractors/maxcompute/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package config

type Config struct {
ProjectName string `mapstructure:"project_name"`
EndpointProject string `mapstructure:"endpoint_project"`
AccessKey struct {
ID string `mapstructure:"id"`
Secret string `mapstructure:"secret"`
} `mapstructure:"access_key"`
SchemaName string `mapstructure:"schema_name,omitempty"`
Exclude struct {
Schemas []string `mapstructure:"schemas"`
Tables []string `mapstructure:"tables"`
} `mapstructure:"exclude,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
}
Loading
Loading