Skip to content

Commit

Permalink
fileservice: add HDFS
Browse files Browse the repository at this point in the history
  • Loading branch information
reusee committed Dec 26, 2024
1 parent f7b6950 commit 65f66fa
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 1 deletion.
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
github.com/cespare/xxhash/v2 v2.3.0
github.com/cockroachdb/errors v1.9.1
github.com/colinmarc/hdfs/v2 v2.4.0
github.com/confluentinc/confluent-kafka-go/v2 v2.4.0
github.com/containerd/cgroups/v3 v3.0.1
github.com/cpegeric/pdftotext-go v0.0.0-20241112123704-49cb86a3790e
Expand Down Expand Up @@ -110,7 +111,14 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/gosimple/slug v1.13.1 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
Expand Down
25 changes: 24 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZ
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
github.com/colinmarc/hdfs/v2 v2.4.0 h1:v6R8oBx/Wu9fHpdPoJJjpGSUxo8NhHIwrwsfhFvU9W0=
github.com/colinmarc/hdfs/v2 v2.4.0/go.mod h1:0NAO+/3knbMx6+5pCv+Hcbaz4xn/Zzbn9+WIib2rKVI=
github.com/compose-spec/compose-go/v2 v2.0.0-rc.2 h1:eJ01FpliL/02KvsaPyH1bSLbM1S70yWQUojHVRbyvy4=
github.com/compose-spec/compose-go/v2 v2.0.0-rc.2/go.mod h1:IVsvFyGVhw4FASzUtlWNVaAOhYmakXAFY9IlZ7LAuD8=
github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 h1:NbOku86JJlsRJPJKE0snNsz6D1Qr4j5VR/lticrLZrY=
Expand Down Expand Up @@ -368,6 +370,10 @@ github.com/gopherjs/gopherjs v1.12.80 h1:aC68NT6VK715WeUapxcPSFq/a3gZdS32HdtghdO
github.com/gopherjs/gopherjs v1.12.80/go.mod h1:d55Q4EjGQHeJVms+9LGtXul6ykz5Xzx1E1gaXQXdimY=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosimple/slug v1.13.1 h1:bQ+kpX9Qa6tHRaK+fZR0A0M2Kd7Pa5eHPPsb1JpHD+Q=
Expand All @@ -392,8 +398,10 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
Expand Down Expand Up @@ -427,6 +435,18 @@ github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g=
github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM=
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jhump/protoreflect v1.15.2 h1:7YppbATX94jEt9KLAc5hICx4h6Yt3SaavhQRsIUEHP0=
github.com/jhump/protoreflect v1.15.2/go.mod h1:4ORHmSBmlCW8fh3xHmJMGyul1zNqZK4Elxc8qKP+p1k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
Expand Down Expand Up @@ -891,6 +911,7 @@ golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
Expand Down Expand Up @@ -936,6 +957,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
Expand All @@ -944,6 +966,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20211008194852-3b03d305991f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
Expand Down
298 changes: 298 additions & 0 deletions pkg/fileservice/hdfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileservice

import (
"context"
"io"
"iter"
"net/url"
"os"
"path"
"strings"
"time"

"github.com/colinmarc/hdfs/v2"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"go.uber.org/zap"
)

type HDFS struct {
name string
client *hdfs.Client
perfCounterSets []*perfcounter.CounterSet
keyPrefix string
}

func NewHDFS(
ctx context.Context,
args ObjectStorageArguments,
perfCounterSets []*perfcounter.CounterSet,
) (*HDFS, error) {

if err := args.validate(); err != nil {
return nil, err
}

u, err := url.Parse(args.Endpoint)
if err != nil {
return nil, err
}

client, err := hdfs.NewClient(hdfs.ClientOptions{
Addresses: []string{
u.Host,
},
User: firstNonZero(args.User, "hadoop"),
NamenodeDialFunc: httpDialer.DialContext,
DatanodeDialFunc: httpDialer.DialContext,
})
if err != nil {
return nil, err
}

logutil.Info("new object storage",
zap.Any("sdk", "hdfs"),
zap.Any("arguments", args),
)

return &HDFS{
name: args.Name,
client: client,
perfCounterSets: perfCounterSets,
keyPrefix: path.Clean(args.KeyPrefix),
}, nil
}

var _ ObjectStorage = new(HDFS)

func (h *HDFS) Delete(ctx context.Context, keys ...string) (err error) {
if err := ctx.Err(); err != nil {
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Delete.Add(1)
}, h.perfCounterSets...)

for _, key := range keys {
_, err := DoWithRetry("HDFS: delete", func() (bool, error) {
return true, h.client.Remove(h.keyToPath(key))
}, maxRetryAttemps, IsRetryableError)
if err != nil {
logutil.Warn("HDFS: delete file", zap.Error(err))
}
}

return nil
}

func (h *HDFS) keyToPath(key string) string {
return path.Join(h.keyPrefix, key)
}

func (h *HDFS) Exists(ctx context.Context, key string) (bool, error) {
if err := ctx.Err(); err != nil {
return false, err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Head.Add(1)
}, h.perfCounterSets...)

_, err := h.client.Stat(h.keyToPath(key))
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}

return true, nil
}

func (h *HDFS) List(ctx context.Context, prefix string) iter.Seq2[*DirEntry, error] {
return func(yield func(*DirEntry, error) bool) {
if err := ctx.Err(); err != nil {
yield(nil, err)
return
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.List.Add(1)
}, h.perfCounterSets...)

dir, prefix := path.Split(prefix)

f, err := h.client.Open(h.keyToPath(dir))
if err != nil {
if os.IsNotExist(err) {
return
}
yield(nil, err)
return
}
defer f.Close()

read:
for {
infos, err := f.Readdir(256)

for _, info := range infos {
name := info.Name()
if strings.HasSuffix(name, ".mofstemp") {
continue
}
if !strings.HasPrefix(name, prefix) {
continue
}
if !yield(&DirEntry{
IsDir: info.IsDir(),
Name: path.Join(dir, name),
Size: info.Size(),
}, nil) {
break read
}
}

if err != nil {
if err == io.EOF {
break read
}
yield(nil, err)
return
}

}

}
}

func (h *HDFS) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error) {
if err := ctx.Err(); err != nil {
return nil, err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Get.Add(1)
}, h.perfCounterSets...)

f, err := h.client.Open(h.keyToPath(key))
if err != nil {
if os.IsNotExist(err) {
return nil, moerr.NewFileNotFoundNoCtx(key)
}
return nil, err
}

var reader io.Reader
reader = f

if min != nil {
pos, err := f.Seek(*min, io.SeekStart)
if err != nil {
return nil, err
}
if pos != *min {
return nil, moerr.NewEmptyRangeNoCtx(key)
}
}

if max != nil {
limit := *max
if min != nil {
limit -= *min
}
if limit <= 0 {
return nil, moerr.NewEmptyRangeNoCtx(key)
}
reader = io.LimitReader(reader, limit)
}

return &readCloser{
r: reader,
closeFunc: f.Close,
}, nil
}

func (h *HDFS) Stat(ctx context.Context, key string) (size int64, err error) {
if err := ctx.Err(); err != nil {
return 0, err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Head.Add(1)
}, h.perfCounterSets...)

stat, err := h.client.Stat(h.keyToPath(key))
if err != nil {
if os.IsNotExist(err) {
return 0, moerr.NewFileNotFoundNoCtx(key)
}
return 0, err
}

size = stat.Size()

return
}

func (h *HDFS) Write(ctx context.Context, key string, r io.Reader, sizeHint *int64, expire *time.Time) (err error) {
if err := ctx.Err(); err != nil {
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Put.Add(1)
}, h.perfCounterSets...)

tempFilePath := h.keyToPath(key) + ".mofstemp"
tempFile, err := h.client.Create(tempFilePath)
if err != nil {
return err
}

n, err := io.Copy(
tempFile,
r,
)
if err != nil {
return err
}

if sizeHint != nil && n != *sizeHint {
return moerr.NewSizeNotMatchNoCtx(key)
}

if err := tempFile.Close(); err != nil {
return err
}

filePath := h.keyToPath(key)
err = h.client.MkdirAll(path.Dir(filePath), 0755)
if err != nil {
if !os.IsExist(err) {
return err
}
}

if err := os.Rename(tempFilePath, filePath); err != nil {
return err
}

return nil
}
Loading

0 comments on commit 65f66fa

Please sign in to comment.