From 1b0315cbbfeecce95c5fc62cd458db30a1c2d6d4 Mon Sep 17 00:00:00 2001 From: meoww-bot Date: Thu, 2 May 2024 00:56:57 +0800 Subject: [PATCH 1/5] feat: add quota usage --- quota_usage.go | 71 +++++++++++++++++++++++++++++++++++++++++++++ quota_usage_test.go | 58 ++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 quota_usage.go create mode 100644 quota_usage_test.go diff --git a/quota_usage.go b/quota_usage.go new file mode 100644 index 00000000..c8f3440d --- /dev/null +++ b/quota_usage.go @@ -0,0 +1,71 @@ +package hdfs + +import ( + "os" + + hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs" + "google.golang.org/protobuf/proto" +) + +// TODO: getTypesQuotaUsage +// https://github.com/apache/hadoop/blob/daafc8a0b849ffdf851c6a618684656925f1df76/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/QuotaUsage.java#L348C20-L348C38 + +// ContentSummary represents a set of information about a file or directory in +// HDFS. It's provided directly by the namenode, and has no unix filesystem +// analogue. + +type QuotaUsage struct { + name string + quotaUsage *hdfs.QuotaUsageProto +} + +// GetQuotaUsage returns a QuotaUsage representing the named file or +// directory. The quota usage contains information about the entire tree rooted +// in the named file +func (c *Client) GetQuotaUsage(name string) (*QuotaUsage, error) { + qu, err := c.getQuotaUsage(name) + if err != nil { + err = &os.PathError{ + Op: "quota usage", + Path: name, + Err: interpretException(err)} + } + + return qu, err +} + +func (c *Client) getQuotaUsage(name string) (*QuotaUsage, error) { + req := &hdfs.GetQuotaUsageRequestProto{Path: proto.String(name)} + resp := &hdfs.GetQuotaUsageResponseProto{} + + err := c.namenode.Execute("getQuotaUsage", req, resp) + if err != nil { + return nil, err + } + + return &QuotaUsage{name, resp.GetUsage()}, nil +} + +// FileAndDirectoryCount returns the total file count of the named path, including any subdirectories. +func (qu *QuotaUsage) FileAndDirectoryCount() int64 { + return int64(qu.quotaUsage.GetFileAndDirectoryCount()) +} + +// NameQuota returns the HDFS configured "name quota" for the named path. The +// name quota is a hard limit on the number of directories and files inside a +// directory; see http://goo.gl/sOSJmJ for more information. +func (qu *QuotaUsage) Quota() int64 { + return int64(qu.quotaUsage.GetQuota()) +} + +// SpaceQuota returns the HDFS configured "space quota" for the named path. +// The space quota is a hard limit on the number of bytes used by files in the tree rooted at that directory. +// see http://goo.gl/sOSJmJ for more information. +func (qu *QuotaUsage) SpaceQuota() int64 { + return int64(qu.quotaUsage.GetSpaceQuota()) +} + +// SpaceConsumed returns the actual space consumed for the named path in HDFS. +func (qu *QuotaUsage) SpaceConsumed() int64 { + return int64(qu.quotaUsage.GetSpaceConsumed()) +} diff --git a/quota_usage_test.go b/quota_usage_test.go new file mode 100644 index 00000000..4b696dff --- /dev/null +++ b/quota_usage_test.go @@ -0,0 +1,58 @@ +package hdfs + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestQuotaUsageDir(t *testing.T) { + client := getClient(t) + + baleet(t, "/_test/dirforcs") + mkdirp(t, "/_test/dirforcs/1") + mkdirp(t, "/_test/dirforcs/2") + touch(t, "/_test/dirforcs/foo") + touch(t, "/_test/dirforcs/1/bar") + + resp, err := client.GetQuotaUsage("/_test/dirforcs") + require.NoError(t, err) + + assert.EqualValues(t, 4, resp.FileAndDirectoryCount()) + assert.True(t, resp.Quota() < 0) + assert.True(t, resp.SpaceQuota() < 0) + assert.True(t, resp.SpaceConsumed() > 0) + +} + +func TestQuotaUsageFile(t *testing.T) { + client := getClient(t) + + resp, err := client.GetQuotaUsage("/_test/foo.txt") + require.NoError(t, err) + + assert.EqualValues(t, 4, resp.FileAndDirectoryCount()) + assert.True(t, resp.Quota() < 0) + assert.True(t, resp.SpaceQuota() < 0) + assert.True(t, resp.SpaceConsumed() > 0) +} + +func TestQuotaUsageNonExistent(t *testing.T) { + client := getClient(t) + + resp, err := client.GetQuotaUsage("/_test/nonexistent") + assertPathError(t, err, "quota usage", "/_test/nonexistent", os.ErrNotExist) + assert.Nil(t, resp) +} + +func TestQuotaUsageDirWithoutPermission(t *testing.T) { + client2 := getClientForUser(t, "gohdfs2") + + mkdirpMask(t, "/_test/accessdenied", 0700) + touchMask(t, "/_test/accessdenied/foo", 0600) + + _, err := client2.GetQuotaUsage("/_test/accessdenied/foo") + assertPathError(t, err, "quota usage", "/_test/accessdenied/foo", os.ErrPermission) +} From d4eb32ffe9fe49a111fd62eeb2c3504a0fe72ba9 Mon Sep 17 00:00:00 2001 From: meoww-bot Date: Thu, 2 May 2024 00:59:12 +0800 Subject: [PATCH 2/5] feat: add count cmd --- README.md | 34 +++++++------ cmd/hdfs/count.go | 121 ++++++++++++++++++++++++++++++++++++++++++++++ cmd/hdfs/main.go | 8 +++ cmd/hdfs/util.go | 25 ++++++++-- 4 files changed, 169 insertions(+), 19 deletions(-) create mode 100644 cmd/hdfs/count.go diff --git a/README.md b/README.md index 154e511d..4fcabec5 100644 --- a/README.md +++ b/README.md @@ -40,21 +40,25 @@ verbs: The flags available are a subset of the POSIX ones, but should behave similarly. Valid commands: - ls [-lah] [FILE]... - rm [-rf] FILE... - mv [-fT] SOURCE... DEST - mkdir [-p] FILE... - touch [-amc] FILE... - chmod [-R] OCTAL-MODE FILE... - chown [-R] OWNER[:GROUP] FILE... - cat SOURCE... - head [-n LINES | -c BYTES] SOURCE... - tail [-n LINES | -c BYTES] SOURCE... - du [-sh] FILE... - checksum FILE... - get SOURCE [DEST] - getmerge SOURCE DEST - put SOURCE DEST + ls [-lahR] [FILE]... + rm [-rf] FILE... + mv [-nT] SOURCE... DEST + mkdir [-p] FILE... + touch [-c] FILE... + chmod [-R] OCTAL-MODE FILE... + chown [-R] OWNER[:GROUP] FILE... + cat SOURCE... + head [-n LINES | -c BYTES] SOURCE... + tail [-n LINES | -c BYTES] SOURCE... + test [-defsz] FILE... + du [-sh] FILE... + checksum FILE... + get SOURCE [DEST] + getmerge SOURCE DEST + put SOURCE DEST + df [-h] + truncate SIZE FILE + count [-q] [-h] ... Since it doesn't have to wait for the JVM to start up, it's also a lot faster `hadoop -fs`: diff --git a/cmd/hdfs/count.go b/cmd/hdfs/count.go new file mode 100644 index 00000000..54232a81 --- /dev/null +++ b/cmd/hdfs/count.go @@ -0,0 +1,121 @@ +package main + +import ( + "fmt" + "os" + "text/tabwriter" +) + +var ( + quotaHeaderFields = []string{"QUOTA", "REM_QUOTA", "SPACE_QUOTA", "REM_SPACE_QUOTA"} + summaryHeaderFields = []string{"DIR_COUNT", "FILE_COUNT", "CONTENT_SIZE", "PATHNAME"} + + allHeaderFields = append(quotaHeaderFields, summaryHeaderFields...) + + showQuotasFormat = "%v \t%v \t%v \t%v \t%v \t%v \t%v \t%s\n" + summaryFormat = "%v \t%v \t%v \t%s\n" + + quotaNone = "none" + quotaInf = "inf" +) + +func count(args []string, showQuotas, humanReadable bool) { + if len(args) == 0 { + fatalWithUsage() + } + + paths, client, err := getClientAndExpandedPaths(args) + + if err != nil { + fatal(err) + } + + if len(paths) == 0 { + fatalWithUsage() + } + + tw := tabwriter.NewWriter(os.Stdout, 8, 8, 0, ' ', 0) + var headerStr string + if showQuotas { + headerStr = joinHeaders(allHeaderFields) + + } else { + headerStr = joinHeaders(summaryHeaderFields) + } + defer tw.Flush() + + fmt.Fprintf(tw, headerStr) + + for _, p := range paths { + + var ( + size, spaceQuota, remSpaceQuota int64 + dirCount, fileCount, nameQuota, remNameQuota int + quotaStr = quotaNone + quotaRemStr = quotaInf + spaceQuotaStr = quotaNone + spaceQuotaRemStr = quotaInf + ) + + cs, err := client.GetContentSummary(p) + if err != nil { + fmt.Fprintln(tw, err) + status = 1 + continue + } + + qu, err := client.GetQuotaUsage(p) + if err != nil { + fmt.Fprintln(tw, err) + status = 1 + continue + } + + size = cs.Size() + + dirCount = cs.DirectoryCount() + fileCount = cs.FileCount() + nameQuota = cs.NameQuota() + spaceQuota = cs.SpaceQuota() + + remNameQuota = nameQuota - int(qu.FileAndDirectoryCount()) + + remSpaceQuota = spaceQuota - qu.SpaceConsumed() + + if nameQuota > 0 { + quotaStr = formatSize(uint64(nameQuota), humanReadable) + quotaRemStr = formatSize(uint64(remNameQuota), humanReadable) + } + + if spaceQuota >= 0 { + spaceQuotaStr = formatSize(uint64(spaceQuota), humanReadable) + spaceQuotaRemStr = formatSize(uint64(remSpaceQuota), humanReadable) + } + + sizeStr := formatSize(uint64(size), humanReadable) + + if showQuotas { + + fmt.Fprintf(tw, showQuotasFormat, + quotaStr, + quotaRemStr, + spaceQuotaStr, + spaceQuotaRemStr, + dirCount, + fileCount, + sizeStr, + p, + ) + + } else { + + fmt.Fprintf(tw, summaryFormat, + dirCount, + fileCount, + sizeStr, + p, + ) + + } + } +} diff --git a/cmd/hdfs/main.go b/cmd/hdfs/main.go index e8479360..f2659028 100644 --- a/cmd/hdfs/main.go +++ b/cmd/hdfs/main.go @@ -40,6 +40,7 @@ Valid commands: put SOURCE DEST df [-h] truncate SIZE FILE + count [-q] [-h] ... `, os.Args[0]) lsOpts = getopt.New() @@ -83,6 +84,10 @@ Valid commands: dus = duOpts.Bool('s') duh = duOpts.Bool('h') + countOpts = getopt.New() + countq = countOpts.Bool('q') + counth = countOpts.Bool('h') + getmergeOpts = getopt.New() getmergen = getmergeOpts.Bool('n') @@ -146,6 +151,9 @@ func main() { case "du": duOpts.Parse(argv) du(duOpts.Args(), *dus, *duh) + case "count": + countOpts.Parse(argv) + count(countOpts.Args(), *countq, *counth) case "checksum": checksum(argv[1:]) case "get": diff --git a/cmd/hdfs/util.go b/cmd/hdfs/util.go index 0e97e041..c74c2650 100644 --- a/cmd/hdfs/util.go +++ b/cmd/hdfs/util.go @@ -2,19 +2,36 @@ package main import ( "fmt" + "strconv" ) func formatBytes(i uint64) string { switch { - case i > (1024 * 1024 * 1024 * 1024): + case i >= (1024 * 1024 * 1024 * 1024): return fmt.Sprintf("%#.1fT", float64(i)/1024/1024/1024/1024) - case i > (1024 * 1024 * 1024): + case i >= (1024 * 1024 * 1024): return fmt.Sprintf("%#.1fG", float64(i)/1024/1024/1024) - case i > (1024 * 1024): + case i >= (1024 * 1024): return fmt.Sprintf("%#.1fM", float64(i)/1024/1024) - case i > 1024: + case i >= 1024: return fmt.Sprintf("%#.1fK", float64(i)/1024) default: return fmt.Sprintf("%dB", i) } } + +func formatSize(size uint64, humanReadable bool) string { + if humanReadable { + return formatBytes(size) + } else { + return strconv.FormatUint(size, 10) + } +} + +func joinHeaders(headers []string) string { + result := "" + for _, header := range headers { + result += fmt.Sprintf("%s \t", header) + } + return result + "\n" +} From fb86470efaf56d82e7e29688a3f210dd7b79fd8c Mon Sep 17 00:00:00 2001 From: meoww-bot Date: Thu, 2 May 2024 13:49:53 +0800 Subject: [PATCH 3/5] Fix install-hdfs to download archive version --- .github/scripts/install-hdfs.sh | 2 +- quota_usage_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/scripts/install-hdfs.sh b/.github/scripts/install-hdfs.sh index 77d8803e..f1ddb33f 100755 --- a/.github/scripts/install-hdfs.sh +++ b/.github/scripts/install-hdfs.sh @@ -70,7 +70,7 @@ EOF done fi -URL="https://dlcdn.apache.org/hadoop/core/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz" +URL="https://archive.apache.org/dist/hadoop/core/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz" echo "Downloading $URL" curl -o hadoop.tar.gz $URL tar zxf hadoop.tar.gz diff --git a/quota_usage_test.go b/quota_usage_test.go index 4b696dff..76bc9a54 100644 --- a/quota_usage_test.go +++ b/quota_usage_test.go @@ -20,10 +20,10 @@ func TestQuotaUsageDir(t *testing.T) { resp, err := client.GetQuotaUsage("/_test/dirforcs") require.NoError(t, err) - assert.EqualValues(t, 4, resp.FileAndDirectoryCount()) + assert.EqualValues(t, 5, resp.FileAndDirectoryCount()) assert.True(t, resp.Quota() < 0) assert.True(t, resp.SpaceQuota() < 0) - assert.True(t, resp.SpaceConsumed() > 0) + assert.True(t, resp.SpaceConsumed() == 0) } @@ -33,7 +33,7 @@ func TestQuotaUsageFile(t *testing.T) { resp, err := client.GetQuotaUsage("/_test/foo.txt") require.NoError(t, err) - assert.EqualValues(t, 4, resp.FileAndDirectoryCount()) + assert.EqualValues(t, 1, resp.FileAndDirectoryCount()) assert.True(t, resp.Quota() < 0) assert.True(t, resp.SpaceQuota() < 0) assert.True(t, resp.SpaceConsumed() > 0) From 089a59de8f0e073a809f0b3c256fe9bb4f6c9c16 Mon Sep 17 00:00:00 2001 From: meoww-bot Date: Thu, 2 May 2024 13:54:23 +0800 Subject: [PATCH 4/5] Fix interpretException --- error.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/error.go b/error.go index 4bdf3648..c226b79b 100644 --- a/error.go +++ b/error.go @@ -12,6 +12,7 @@ const ( fileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException" alreadyBeingCreatedException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException" illegalArgumentException = "org.apache.hadoop.HadoopIllegalArgumentException" + nullPointerException = "java.lang.NullPointerException" ) // Error represents a remote java exception from an HDFS namenode or datanode. @@ -53,6 +54,8 @@ func interpretException(err error) error { return os.ErrExist case illegalArgumentException: return os.ErrInvalid + case nullPointerException: + return os.ErrNotExist default: return err } From e24bda48e33e3eb83776d34b8845ce3b601ea065 Mon Sep 17 00:00:00 2001 From: meoww-bot Date: Thu, 2 May 2024 21:55:26 +0800 Subject: [PATCH 5/5] Fix doc --- quota_usage.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/quota_usage.go b/quota_usage.go index c8f3440d..56eea68d 100644 --- a/quota_usage.go +++ b/quota_usage.go @@ -10,10 +10,9 @@ import ( // TODO: getTypesQuotaUsage // https://github.com/apache/hadoop/blob/daafc8a0b849ffdf851c6a618684656925f1df76/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/QuotaUsage.java#L348C20-L348C38 -// ContentSummary represents a set of information about a file or directory in +// QuotaUsage represents quota usage about a file or directory in // HDFS. It's provided directly by the namenode, and has no unix filesystem // analogue. - type QuotaUsage struct { name string quotaUsage *hdfs.QuotaUsageProto