diff --git a/go.mod b/go.mod index 43749d20e3d66..1c30716f61c8f 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 github.com/cespare/xxhash/v2 v2.3.0 github.com/cockroachdb/errors v1.9.1 + github.com/colinmarc/hdfs/v2 v2.4.0 github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 github.com/containerd/cgroups/v3 v3.0.1 github.com/cpegeric/pdftotext-go v0.0.0-20241112123704-49cb86a3790e @@ -49,6 +50,7 @@ require ( github.com/hashicorp/memberlist v0.3.1 github.com/hayageek/threadsafe v1.0.1 github.com/itchyny/gojq v0.12.16 + github.com/jcmturner/gokrb5/v8 v8.4.4 github.com/jhump/protoreflect v1.15.2 github.com/json-iterator/go v1.1.12 github.com/lni/dragonboat/v4 v4.0.0-20220815145555-6f622e8bcbef @@ -112,8 +114,14 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/gosimple/slug v1.13.1 // indirect github.com/gosimple/unidecode v1.0.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/ianlancetaylor/demangle v0.0.0-20240805132620-81f5be970eca // indirect github.com/itchyny/timefmt-go v0.1.6 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/goidentity/v6 v6.0.1 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/native v1.1.0 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect diff --git a/go.sum b/go.sum index 5c3e9a3c484de..e6f421c3568bd 100644 --- a/go.sum +++ b/go.sum @@ -160,6 +160,8 @@ github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZ github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= +github.com/colinmarc/hdfs/v2 v2.4.0 h1:v6R8oBx/Wu9fHpdPoJJjpGSUxo8NhHIwrwsfhFvU9W0= +github.com/colinmarc/hdfs/v2 v2.4.0/go.mod h1:0NAO+/3knbMx6+5pCv+Hcbaz4xn/Zzbn9+WIib2rKVI= github.com/compose-spec/compose-go/v2 v2.0.0-rc.2 h1:eJ01FpliL/02KvsaPyH1bSLbM1S70yWQUojHVRbyvy4= github.com/compose-spec/compose-go/v2 v2.0.0-rc.2/go.mod h1:IVsvFyGVhw4FASzUtlWNVaAOhYmakXAFY9IlZ7LAuD8= github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 h1:NbOku86JJlsRJPJKE0snNsz6D1Qr4j5VR/lticrLZrY= @@ -372,6 +374,10 @@ github.com/gopherjs/gopherjs v1.12.80 h1:aC68NT6VK715WeUapxcPSFq/a3gZdS32HdtghdO github.com/gopherjs/gopherjs v1.12.80/go.mod h1:d55Q4EjGQHeJVms+9LGtXul6ykz5Xzx1E1gaXQXdimY= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gosimple/slug v1.13.1 h1:bQ+kpX9Qa6tHRaK+fZR0A0M2Kd7Pa5eHPPsb1JpHD+Q= @@ -396,8 +402,10 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= -github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= @@ -433,6 +441,18 @@ github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g= github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM= github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jhump/protoreflect v1.15.2 h1:7YppbATX94jEt9KLAc5hICx4h6Yt3SaavhQRsIUEHP0= github.com/jhump/protoreflect v1.15.2/go.mod h1:4ORHmSBmlCW8fh3xHmJMGyul1zNqZK4Elxc8qKP+p1k= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -899,6 +919,7 @@ golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= @@ -944,6 +965,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -952,6 +974,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20211008194852-3b03d305991f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= diff --git a/pkg/fileservice/hdfs.go b/pkg/fileservice/hdfs.go new file mode 100644 index 0000000000000..ec16249eb545e --- /dev/null +++ b/pkg/fileservice/hdfs.go @@ -0,0 +1,379 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileservice + +import ( + "context" + "crypto/rand" + "errors" + "io" + "iter" + "net/url" + "os" + "path" + "strings" + "time" + + "github.com/colinmarc/hdfs/v2" + krb "github.com/jcmturner/gokrb5/v8/client" + krbconfig "github.com/jcmturner/gokrb5/v8/config" + krbkeytab "github.com/jcmturner/gokrb5/v8/keytab" + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/perfcounter" + "go.uber.org/zap" +) + +type HDFS struct { + name string + client *hdfs.Client + perfCounterSets []*perfcounter.CounterSet + dir string +} + +func NewHDFS( + ctx context.Context, + args ObjectStorageArguments, + perfCounterSets []*perfcounter.CounterSet, +) (*HDFS, error) { + + if err := args.validate(); err != nil { + return nil, err + } + + u, err := url.Parse(args.Endpoint) + if err != nil { + return nil, err + } + + kerberosClient, err := args.getKerberosClient() + if err != nil { + return nil, err + } + + client, err := hdfs.NewClient(hdfs.ClientOptions{ + Addresses: []string{ + u.Host, + }, + User: firstNonZero(args.User, "hadoop"), + NamenodeDialFunc: httpDialer.DialContext, + DatanodeDialFunc: httpDialer.DialContext, + KerberosServicePrincipleName: args.KerberosServicePrincipleName, + KerberosClient: kerberosClient, + }) + if err != nil { + return nil, err + } + + ret := &HDFS{ + name: args.Name, + client: client, + perfCounterSets: perfCounterSets, + dir: path.Join( + "/user", + args.User, + args.KeyPrefix, + ), + } + + if err := ret.client.MkdirAll(ret.dir, 0755); err != nil { + return nil, err + } + + logutil.Info("new object storage", + zap.Any("sdk", "hdfs"), + zap.Any("arguments", args), + ) + + return ret, nil +} + +var _ ObjectStorage = new(HDFS) + +func (h *HDFS) Delete(ctx context.Context, keys ...string) (err error) { + if err := ctx.Err(); err != nil { + return err + } + + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Delete.Add(1) + }, h.perfCounterSets...) + + for _, key := range keys { + _, err := DoWithRetry("HDFS: delete", func() (bool, error) { + return true, h.client.Remove(h.keyToPath(key)) + }, maxRetryAttemps, IsRetryableError) + if err != nil { + logutil.Warn("HDFS: delete file", zap.Error(err)) + } + } + + return nil +} + +func (h *HDFS) keyToPath(key string) string { + return path.Join(h.dir, key) +} + +func (h *HDFS) createTemp() (*hdfs.FileWriter, string, error) { + for { + name := h.keyToPath(rand.Text()) + _, err := h.client.Stat(name) + if err == nil { + // existed + continue + } + file, err := h.client.Create(name) + if err != nil { + return nil, "", err + } + return file, name, nil + } +} + +func (h *HDFS) Exists(ctx context.Context, key string) (bool, error) { + if err := ctx.Err(); err != nil { + return false, err + } + + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Head.Add(1) + }, h.perfCounterSets...) + + _, err := h.client.Stat(h.keyToPath(key)) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + return true, nil +} + +func (h *HDFS) List(ctx context.Context, prefix string) iter.Seq2[*DirEntry, error] { + return func(yield func(*DirEntry, error) bool) { + if err := ctx.Err(); err != nil { + yield(nil, err) + return + } + + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.List.Add(1) + }, h.perfCounterSets...) + + dir, prefix := path.Split(prefix) + + f, err := h.client.Open(h.keyToPath(dir)) + if err != nil { + if os.IsNotExist(err) { + return + } + yield(nil, err) + return + } + defer f.Close() + + read: + for { + infos, err := f.Readdir(256) + + for _, info := range infos { + name := info.Name() + if strings.HasSuffix(name, ".mofstemp") { + continue + } + if !strings.HasPrefix(name, prefix) { + continue + } + if !yield(&DirEntry{ + IsDir: info.IsDir(), + Name: path.Join(dir, name), + Size: info.Size(), + }, nil) { + break read + } + } + + if err != nil { + if err == io.EOF { + break read + } + yield(nil, err) + return + + } else if len(infos) == 0 { + // no error, and no infos + break read + } + + } + + } +} + +func (h *HDFS) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Get.Add(1) + }, h.perfCounterSets...) + + f, err := h.client.Open(h.keyToPath(key)) + if err != nil { + if os.IsNotExist(err) { + return nil, moerr.NewFileNotFoundNoCtx(key) + } + return nil, err + } + defer func() { + if err != nil { + err = errors.Join(err, f.Close()) + } + }() + + var reader io.Reader + reader = f + + if min != nil { + pos, err := f.Seek(*min, io.SeekStart) + if err != nil { + return nil, err + } + if pos != *min { + return nil, moerr.NewEmptyRangeNoCtx(key) + } + } + + if max != nil { + limit := *max + if min != nil { + limit -= *min + } + if limit <= 0 { + return nil, moerr.NewEmptyRangeNoCtx(key) + } + reader = io.LimitReader(reader, limit) + } + + return &readCloser{ + r: reader, + closeFunc: f.Close, + }, nil +} + +func (h *HDFS) Stat(ctx context.Context, key string) (size int64, err error) { + if err := ctx.Err(); err != nil { + return 0, err + } + + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Head.Add(1) + }, h.perfCounterSets...) + + stat, err := h.client.Stat(h.keyToPath(key)) + if err != nil { + if os.IsNotExist(err) { + return 0, moerr.NewFileNotFoundNoCtx(key) + } + return 0, err + } + + size = stat.Size() + + return +} + +func (h *HDFS) Write(ctx context.Context, key string, r io.Reader, sizeHint *int64, expire *time.Time) (err error) { + if err := ctx.Err(); err != nil { + return err + } + + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Put.Add(1) + }, h.perfCounterSets...) + + tempFile, tempFilePath, err := h.createTemp() + if err != nil { + return err + } + defer func() { + if err != nil { + _ = tempFile.Close() + _ = h.client.Remove(tempFilePath) + } + }() + + n, err := io.Copy( + tempFile, + r, + ) + if err != nil { + return err + } + + if sizeHint != nil && n != *sizeHint { + return moerr.NewSizeNotMatchNoCtx(key) + } + + if err := tempFile.Close(); err != nil { + return err + } + + filePath := h.keyToPath(key) + err = h.client.MkdirAll(path.Dir(filePath), 0755) + if err != nil { + if !os.IsExist(err) { + return err + } + } + + if err := h.client.Rename(tempFilePath, filePath); err != nil { + return err + } + + return nil +} + +func (o *ObjectStorageArguments) getKerberosClient() (*krb.Client, error) { + + // password + if o.KerberosPassword != "" { + return krb.NewWithPassword( + o.KerberosUsername, + o.KerberosRealm, + o.KerberosPassword, + &krbconfig.Config{}, + ), nil + } + + // keytab + if o.KerberosKeytabPath != "" { + keytab, err := krbkeytab.Load(o.KerberosKeytabPath) + if err != nil { + return nil, err + } + return krb.NewWithKeytab( + o.KerberosUsername, + o.KerberosRealm, + keytab, + &krbconfig.Config{}, + ), nil + } + + return nil, nil +} diff --git a/pkg/fileservice/object_storage_arguments.go b/pkg/fileservice/object_storage_arguments.go index bc592fa4a0c54..f6d011d883cbb 100644 --- a/pkg/fileservice/object_storage_arguments.go +++ b/pkg/fileservice/object_storage_arguments.go @@ -53,6 +53,14 @@ type ObjectStorageArguments struct { RoleSessionName string `json:"-" toml:"role-session-name"` SecurityToken string `json:"-" toml:"security-token"` SessionToken string `json:"-" toml:"session-token"` + + // HDFS + User string `toml:"user"` + KerberosServicePrincipleName string `toml:"kerberos-service-principle-name"` + KerberosUsername string `toml:"kerberos-username"` + KerberosRealm string `toml:"kerberos-realm"` + KerberosPassword string `json:"-" toml:"kerberos-password"` + KerberosKeytabPath string `toml:"kerberos-keytab-path"` } func (o ObjectStorageArguments) String() string { @@ -124,6 +132,20 @@ func (o *ObjectStorageArguments) SetFromString(arguments []string) error { case "token", "session-token": o.SessionToken = value + case "user": + o.User = value + + case "kerberos-service-principle-name": + o.KerberosServicePrincipleName = value + case "kerberos-username": + o.KerberosUsername = value + case "kerberos-realm": + o.KerberosRealm = value + case "kerberos-password": + o.KerberosPassword = value + case "kerberos-keytab-path": + o.KerberosKeytabPath = value + default: return moerr.NewInvalidInputNoCtxf("invalid S3 argument: %s", pair) } diff --git a/pkg/fileservice/object_storage_test.go b/pkg/fileservice/object_storage_test.go index ff721709b80eb..824dabdbacb0e 100644 --- a/pkg/fileservice/object_storage_test.go +++ b/pkg/fileservice/object_storage_test.go @@ -153,6 +153,30 @@ func testObjectStorage[T ObjectStorage]( t.Fatal() } + // list root + n = 0 + for _, err := range storage.List(ctx, "") { + if err != nil { + t.Fatal(err) + } + n++ + } + if n != 2 { + t.Fatal() + } + + // list root + n = 0 + for _, err := range storage.List(ctx, "/") { + if err != nil { + t.Fatal(err) + } + n++ + } + if n != 2 { + t.Fatal() + } + }) t.Run("invalid write length", func(t *testing.T) { @@ -207,6 +231,16 @@ func TestObjectStorages(t *testing.T) { switch { + case strings.HasPrefix(strings.ToLower(args.Endpoint), "hdfs"): + // HDFS + testObjectStorage(t, "hdfs", func(t *testing.T) *HDFS { + storage, err := NewHDFS(context.Background(), args, nil) + if err != nil { + t.Fatal(err) + } + return storage + }) + case args.Endpoint == "disk": // disk testObjectStorage(t, "disk", func(t *testing.T) *diskObjectStorage { diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go index 374c3d7ae991d..764b354b4464d 100644 --- a/pkg/fileservice/s3_fs.go +++ b/pkg/fileservice/s3_fs.go @@ -81,6 +81,13 @@ func NewS3FS( var err error switch { + case strings.HasPrefix(strings.ToLower(args.Endpoint), "hdfs"): + // HDFS + fs.storage, err = NewHDFS(ctx, args, perfCounterSets) + if err != nil { + return nil, err + } + case args.IsMinio || // 天翼云,使用SignatureV2验证,其他SDK不再支持 strings.Contains(args.Endpoint, "ctyunapi.cn"):