Skip to content

Commit

Permalink
io: support aliyun oss backend
Browse files Browse the repository at this point in the history
Signed-off-by: divinerapier <[email protected]>
  • Loading branch information
divinerapier committed Dec 13, 2024
1 parent c1ffe04 commit 0576e0f
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 0 deletions.
22 changes: 22 additions & 0 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"net/url"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -32,6 +33,18 @@ type CatalogType string

type AwsProperties map[string]string

// OSSConfig contains configuration for accessing OSS object storage
type OSSConfig struct {
// Endpoint specifies the OSS service endpoint URL
Endpoint string
// AccessKey is the access key ID for OSS authentication
AccessKey string
// SecretKey is the secret access key for OSS authentication
SecretKey string
// SignatureVersion is the signature version for OSS authentication
SignatureVersion oss.SignatureVersionType
}

const (
REST CatalogType = "rest"
Hive CatalogType = "hive"
Expand Down Expand Up @@ -122,12 +135,21 @@ func WithPrefix(prefix string) Option[RestCatalog] {
}
}

// WithOSSConfig sets the OSS configuration for the catalog.
func WithOSSConfig(cfg OSSConfig) Option[RestCatalog] {
return func(o *options) {
o.ossConfig = cfg
}
}

type Option[T GlueCatalog | RestCatalog] func(*options)

type options struct {
awsConfig aws.Config
awsProperties AwsProperties

ossConfig OSSConfig

tlsConfig *tls.Config
credential string
oauthToken string
Expand Down
13 changes: 13 additions & 0 deletions catalog/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"maps"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -56,6 +57,11 @@ const (
keyRestSigV4Region = "rest.signing-region"
keyRestSigV4Service = "rest.signing-name"
keyAuthUrl = "rest.authorization-url"

keyOSSAccessKey = "client.oss-access-key"
keyOSSSecretKey = "client.oss-secret-key"
keyOSSEndpoint = "client.oss-endpoint"
keyOSSSignatureVersion = "client.oss-signature-version"
)

var (
Expand Down Expand Up @@ -356,6 +362,12 @@ func toProps(o *options) iceberg.Properties {
if o.authUri != nil {
setIf(keyAuthUrl, o.authUri.String())
}

setIf(keyOSSAccessKey, o.ossConfig.AccessKey)
setIf(keyOSSSecretKey, o.ossConfig.SecretKey)
setIf(keyOSSEndpoint, o.ossConfig.Endpoint)
// Convert OSS signature version from enum to string representation
setIf(keyOSSSignatureVersion, strconv.FormatInt(int64(o.ossConfig.SignatureVersion), 10))
return props
}

Expand Down Expand Up @@ -515,6 +527,7 @@ func (r *RestCatalog) fetchConfig(opts *options) (*options, error) {

o := fromProps(cfg)
o.awsConfig = opts.awsConfig
o.ossConfig = opts.ossConfig
o.tlsConfig = opts.tlsConfig

if uri, ok := cfg["uri"]; ok {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go 1.23
toolchain go1.23.2

require (
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3
github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f
github.com/aws/aws-sdk-go-v2 v1.32.5
github.com/aws/aws-sdk-go-v2/config v1.28.5
Expand Down Expand Up @@ -97,6 +98,7 @@ require (
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.4.0 // indirect
golang.org/x/tools v0.26.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/alecthomas/participle/v2 v2.1.0 h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwld
github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=
github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3 h1:grJyLSdRJtfxKKhCTWSeJhnOQsp2WoLNdK8XA5FE9oo=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f h1:k14GhTGJuvq27vRgLxf4iuufzLt7GeN3UOytJmU7W/A=
Expand Down Expand Up @@ -243,6 +245,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY=
golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
2 changes: 2 additions & 0 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func inferFileIOFromSchema(path string, props map[string]string) (IO, error) {
switch parsed.Scheme {
case "s3", "s3a", "s3n":
return createS3FileIO(parsed, props)
case "oss":
return createOSSFileIO(parsed, props)
case "file", "":
return LocalFS{}, nil
default:
Expand Down
173 changes: 173 additions & 0 deletions io/oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package io

import (
"context"
"fmt"
"io"
"io/fs"
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Constants for OSS configuration options
const (
OSSAccessKey = "client.oss-access-key"
OSSSecretKey = "client.oss-secret-key"
OSSEndpoint = "client.oss-endpoint"
OSSSignatureVersion = "client.oss-signature-version"
)

func createOSSFileIO(parsed *url.URL, props map[string]string) (IO, error) {
endpoint, ok := props[OSSEndpoint]
if !ok {
endpoint = os.Getenv("OSS_ENDPOINT")
}
if endpoint == "" {
return nil, fmt.Errorf("oss endpoint must be specified")
}

accessKey := props[OSSAccessKey]
secretKey := props[OSSSecretKey]

provider := credentials.NewStaticCredentialsProvider(accessKey, secretKey)

cfg := oss.LoadDefaultConfig().
WithRetryMaxAttempts(3).
WithCredentialsProvider(provider).
WithEndpoint(endpoint).
WithSignatureVersion(parseSignatureVersion(props[OSSSignatureVersion])).
WithUsePathStyle(true).
WithConnectTimeout(10 * time.Second).
WithReadWriteTimeout(time.Minute)

client := oss.NewClient(cfg)

ossFS := &ossFS{
client: client,
bucket: parsed.Host,
}

preprocess := func(n string) string {
_, after, found := strings.Cut(n, "://")
if found {
n = after
}
return strings.TrimPrefix(n, parsed.Host)
}

return FSPreProcName(ossFS, preprocess), nil
}

// parseSignatureVersion converts string version to oss.SignatureVersionType
// "0" -> 0 (v1)
// "1" -> 1 (v4)
// defaults to 1 (v4) for unknown values
func parseSignatureVersion(version string) oss.SignatureVersionType {
switch version {
case "0": // v1
return 0
case "1": // v4
return 1
default:
return 1
}
}

type ossFS struct {
client *oss.Client
bucket string
}

// Open implements fs.FS
func (o *ossFS) Open(name string) (fs.File, error) {
if !fs.ValidPath(name) {
return nil, &os.PathError{Op: "open", Path: name, Err: os.ErrInvalid}
}
if name == "." {
return &ossFile{
name: name,
}, nil
}
name = strings.TrimPrefix(name, "/")

file, err := o.client.OpenFile(context.Background(), o.bucket, name)
if err != nil {
return nil, err
}

return &ossFile{
file: file,
name: name,
}, nil
}

type ossFile struct {
mutex sync.Mutex
file *oss.ReadOnlyFile
name string
}

// Read implements io.Reader
func (f *ossFile) Read(p []byte) (int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Read(p)
}

// Seek implements io.Seeker
func (f *ossFile) Seek(offset int64, whence int) (int64, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Seek(offset, whence)
}

// Close implements io.Closer
func (f *ossFile) Close() error {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Close()
}

// ReadAt implements io.ReaderAt
func (f *ossFile) ReadAt(p []byte, off int64) (n int, err error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if _, err := f.file.Seek(off, io.SeekStart); err != nil {
return 0, err
}
return f.file.Read(p)
}

func (f *ossFile) Stat() (fs.FileInfo, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Stat()
}

0 comments on commit 0576e0f

Please sign in to comment.