diff --git a/catalog/catalog.go b/catalog/catalog.go index 65da7e54..2adfb6e8 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -23,6 +23,7 @@ import ( "errors" "net/url" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" @@ -32,6 +33,18 @@ type CatalogType string type AwsProperties map[string]string +// OSSConfig holds the endpoint, static credentials and signature version used to access OSS object storage. +type OSSConfig struct { + // Endpoint is the OSS service endpoint URL (property key client.oss-endpoint). + Endpoint string + // AccessKey is the access key ID for OSS authentication (property key client.oss-access-key). + AccessKey string + // SecretKey is the secret access key for OSS authentication (property key client.oss-secret-key). + SecretKey string + // SignatureVersion is the signature version for OSS authentication; toProps formats it as a decimal integer for client.oss-signature-version. + SignatureVersion oss.SignatureVersionType +} + const ( REST CatalogType = "rest" Hive CatalogType = "hive" @@ -122,12 +135,21 @@ func WithPrefix(prefix string) Option[RestCatalog] { } } +// WithOSSConfig returns a RestCatalog option that stores cfg as the catalog's OSS configuration.
+func WithOSSConfig(cfg OSSConfig) Option[RestCatalog] { + return func(o *options) { + o.ossConfig = cfg + } +} + type Option[T GlueCatalog | RestCatalog] func(*options) type options struct { awsConfig aws.Config awsProperties AwsProperties + ossConfig OSSConfig + tlsConfig *tls.Config credential string oauthToken string diff --git a/catalog/rest.go b/catalog/rest.go index ef9c332f..0b547601 100644 --- a/catalog/rest.go +++ b/catalog/rest.go @@ -29,6 +29,7 @@ import ( "maps" "net/http" "net/url" + "strconv" "strings" "time" @@ -56,6 +57,11 @@ const ( keyRestSigV4Region = "rest.signing-region" keyRestSigV4Service = "rest.signing-name" keyAuthUrl = "rest.authorization-url" + + keyOSSAccessKey = "client.oss-access-key" + keyOSSSecretKey = "client.oss-secret-key" + keyOSSEndpoint = "client.oss-endpoint" + keyOSSSignatureVersion = "client.oss-signature-version" ) var ( @@ -356,6 +362,12 @@ func toProps(o *options) iceberg.Properties { if o.authUri != nil { setIf(keyAuthUrl, o.authUri.String()) } + + setIf(keyOSSAccessKey, o.ossConfig.AccessKey) + setIf(keyOSSSecretKey, o.ossConfig.SecretKey) + setIf(keyOSSEndpoint, o.ossConfig.Endpoint) + // Pass the OSS signature version as the base-10 string of its numeric enum value. NOTE(review): a zero-value OSSConfig formats to the non-empty string "0", so this key is presumably set even when no OSS config was supplied - confirm against setIf. + setIf(keyOSSSignatureVersion, strconv.FormatInt(int64(o.ossConfig.SignatureVersion), 10)) return props } @@ -515,6 +527,7 @@ func (r *RestCatalog) fetchConfig(opts *options) (*options, error) { o := fromProps(cfg) o.awsConfig = opts.awsConfig + o.ossConfig = opts.ossConfig o.tlsConfig = opts.tlsConfig if uri, ok := cfg["uri"]; ok { diff --git a/go.mod b/go.mod index 14a4d21c..5dba701d 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ go 1.23 toolchain go1.23.2 require ( + github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3 github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f github.com/aws/aws-sdk-go-v2 v1.32.5 github.com/aws/aws-sdk-go-v2/config v1.28.5 @@ -97,6 +98,7 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/term v0.25.0 // 
indirect golang.org/x/text v0.19.0 // indirect + golang.org/x/time v0.4.0 // indirect golang.org/x/tools v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/go.sum b/go.sum index ecfdcdeb..b8078f97 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/alecthomas/participle/v2 v2.1.0 h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwld github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk= github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3 h1:grJyLSdRJtfxKKhCTWSeJhnOQsp2WoLNdK8XA5FE9oo= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f h1:k14GhTGJuvq27vRgLxf4iuufzLt7GeN3UOytJmU7W/A= @@ -243,6 +245,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY= +golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod 
h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/io/io.go b/io/io.go index abe5971f..231383d9 100644 --- a/io/io.go +++ b/io/io.go @@ -215,6 +215,8 @@ func inferFileIOFromSchema(path string, props map[string]string) (IO, error) { switch parsed.Scheme { case "s3", "s3a", "s3n": return createS3FileIO(parsed, props) + case "oss": + return createOSSFileIO(parsed, props) case "file", "": return LocalFS{}, nil default: diff --git a/io/oss.go b/io/oss.go new file mode 100644 index 00000000..cd5dc7e4 --- /dev/null +++ b/io/oss.go @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package io + +import ( + "context" + "fmt" + "io" + "io/fs" + "net/url" + "os" + "strings" + "sync" + "time" + + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" +) + +// Constants for OSS configuration options +const ( + OSSAccessKey = "client.oss-access-key" + OSSSecretKey = "client.oss-secret-key" + OSSEndpoint = "client.oss-endpoint" + OSSSignatureVersion = "client.oss-signature-version" +) + +func createOSSFileIO(parsed *url.URL, props map[string]string) (IO, error) { + endpoint, ok := props[OSSEndpoint] + if !ok { + endpoint = os.Getenv("OSS_ENDPOINT") + } + if endpoint == "" { + return nil, fmt.Errorf("oss endpoint must be specified") + } + + accessKey := props[OSSAccessKey] + secretKey := props[OSSSecretKey] + + provider := credentials.NewStaticCredentialsProvider(accessKey, secretKey) + + cfg := oss.LoadDefaultConfig(). + WithRetryMaxAttempts(3). + WithCredentialsProvider(provider). + WithEndpoint(endpoint). + WithSignatureVersion(parseSignatureVersion(props[OSSSignatureVersion])). + WithUsePathStyle(true). + WithConnectTimeout(10 * time.Second). 
+ WithReadWriteTimeout(time.Minute) + + client := oss.NewClient(cfg) + + ossFS := &ossFS{ + client: client, + bucket: parsed.Host, + } + + preprocess := func(n string) string { + _, after, found := strings.Cut(n, "://") + if found { + n = after + } + return strings.TrimPrefix(n, parsed.Host) + } + + return FSPreProcName(ossFS, preprocess), nil +} + +// parseSignatureVersion converts string version to oss.SignatureVersionType +// "0" -> 0 (v1) +// "1" -> 1 (v4) +// defaults to 1 (v4) for unknown values +func parseSignatureVersion(version string) oss.SignatureVersionType { + switch version { + case "0": // v1 + return 0 + case "1": // v4 + return 1 + default: + return 1 + } +} + +type ossFS struct { + client *oss.Client + bucket string +} + +// Open implements fs.FS +func (o *ossFS) Open(name string) (fs.File, error) { + if !fs.ValidPath(name) { + return nil, &os.PathError{Op: "open", Path: name, Err: os.ErrInvalid} + } + if name == "." { + return &ossFile{ + name: name, + }, nil + } + name = strings.TrimPrefix(name, "/") + + file, err := o.client.OpenFile(context.Background(), o.bucket, name) + if err != nil { + return nil, err + } + + return &ossFile{ + file: file, + name: name, + }, nil +} + +type ossFile struct { + mutex sync.Mutex + file *oss.ReadOnlyFile + name string +} + +// Read implements io.Reader +func (f *ossFile) Read(p []byte) (int, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + return f.file.Read(p) +} + +// Seek implements io.Seeker +func (f *ossFile) Seek(offset int64, whence int) (int64, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + return f.file.Seek(offset, whence) +} + +// Close implements io.Closer +func (f *ossFile) Close() error { + f.mutex.Lock() + defer f.mutex.Unlock() + + return f.file.Close() +} + +// ReadAt implements io.ReaderAt +func (f *ossFile) ReadAt(p []byte, off int64) (n int, err error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + if _, err := f.file.Seek(off, io.SeekStart); err != nil { + return 0, err + } + 
return f.file.Read(p) +} + +func (f *ossFile) Stat() (fs.FileInfo, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + return f.file.Stat() +}