Skip to content

Commit

Permalink
Merge pull request #8409 from dolthub/macneale4/fsck
Browse files Browse the repository at this point in the history
Introduction of `dolt fsck` support.
  • Loading branch information
macneale4 authored Oct 8, 2024
2 parents 2c8007b + c31524a commit 46d9f7c
Show file tree
Hide file tree
Showing 33 changed files with 559 additions and 1 deletion.
1 change: 1 addition & 0 deletions go/cmd/dolt/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
PasswordFlag = "password"
PortFlag = "port"
PruneFlag = "prune"
QuietFlag = "quiet"
RemoteParam = "remote"
SetUpstreamFlag = "set-upstream"
ShallowFlag = "shallow"
Expand Down
122 changes: 122 additions & 0 deletions go/cmd/dolt/commands/fsck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commands

import (
"context"

"github.com/fatih/color"

"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
)

type FsckCmd struct{}

var _ cli.Command = FsckCmd{}

func (cmd FsckCmd) Description() string {
return "Verifies the contents of the database are not corrupted."
}

var fsckDocs = cli.CommandDocumentationContent{
ShortDesc: "Verifies the contents of the database are not corrupted.",
LongDesc: "Verifies the contents of the database are not corrupted.",
Synopsis: []string{
"[--quiet]",
},
}

func (cmd FsckCmd) Docs() *cli.CommandDocumentation {
return cli.NewCommandDocumentation(fsckDocs, cmd.ArgParser())
}

func (cmd FsckCmd) ArgParser() *argparser.ArgParser {
ap := argparser.NewArgParserWithMaxArgs(cmd.Name(), 0)
ap.SupportsFlag(cli.QuietFlag, "", "Don't show progress. Just print final report.")

return ap
}

func (cmd FsckCmd) Name() string {
return "fsck"
}

func (cmd FsckCmd) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv, _ cli.CliContext) int {
ap := cmd.ArgParser()
apr, _, terminate, status := ParseArgsOrPrintHelp(ap, commandStr, args, fsckDocs)
if terminate {
return status
}

quiet := apr.Contains(cli.QuietFlag)

progress := make(chan string, 32)
go fsckHandleProgress(ctx, progress, quiet)

var report *doltdb.FSCKReport
terminate = func() bool {
defer close(progress)
var err error
report, err = dEnv.DoltDB.FSCK(ctx, progress)
if err != nil {
cli.PrintErrln(err.Error())
return true
}
// skip printing the report is we were cancelled. Most likely we tripped on the error above first.
select {
case <-ctx.Done():
cli.PrintErrln(ctx.Err().Error())
return true
default:
return false
}
}()
if terminate {
return 1
}

return printFSCKReport(report)
}

func printFSCKReport(report *doltdb.FSCKReport) int {
cli.Printf("Chunks Scanned: %d\n", report.ChunkCount)
if len(report.Problems) == 0 {
cli.Println("No problems found.")
return 0
} else {
for _, e := range report.Problems {
cli.Println(color.RedString("------ Corruption Found ------"))
cli.PrintErrln(e.Error())
}

return 1
}
}

func fsckHandleProgress(ctx context.Context, progress chan string, quiet bool) {
for item := range progress {
if !quiet {
cli.Println(item)
}
select {
case <-ctx.Done():
return
default:
}
}
}
2 changes: 1 addition & 1 deletion go/cmd/dolt/commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ func getFastforward(row sql.Row, index int) bool {
}

func getHashOf(queryist cli.Queryist, sqlCtx *sql.Context, ref string) (string, error) {
q, err := dbr.InterpolateForDialect("select hashof(?)", []interface{}{ref}, dialect.MySQL)
q, err := dbr.InterpolateForDialect("select dolt_hashof(?)", []interface{}{ref}, dialect.MySQL)
if err != nil {
return "", fmt.Errorf("error interpolating hashof query: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/dolt/dolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ var doltSubCommands = []cli.Command{
indexcmds.Commands,
commands.ReadTablesCmd{},
commands.GarbageCollectionCmd{},
commands.FsckCmd{},
commands.FilterBranchCmd{},
commands.MergeBaseCmd{},
commands.RootsCmd{},
Expand Down Expand Up @@ -151,6 +152,7 @@ var commandsWithoutCliCtx = []cli.Command{
&commands.Assist{},
commands.ProfileCmd{},
commands.ArchiveCmd{},
commands.FsckCmd{},
}

var commandsWithoutGlobalArgSupport = []cli.Command{
Expand Down
114 changes: 114 additions & 0 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package doltdb

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/dolthub/go-mysql-server/sql"
Expand Down Expand Up @@ -2045,3 +2048,114 @@ func (ddb *DoltDB) GetStashRootAndHeadCommitAtIdx(ctx context.Context, idx int)
func (ddb *DoltDB) PersistGhostCommits(ctx context.Context, ghostCommits hash.HashSet) error {
return ddb.db.Database.PersistGhostCommitIDs(ctx, ghostCommits)
}

type FSCKReport struct {
ChunkCount uint32
Problems []error
}

// FSCK performs a full file system check on the database. This is currently exposed with the CLI as `dolt fsck`
// The success of failure of the scan are returned in the report as a list of errors. The error returned by this function
// indicates a deeper issue such as having database in an old format.
func (ddb *DoltDB) FSCK(ctx context.Context, progress chan string) (*FSCKReport, error) {
cs := datas.ChunkStoreFromDatabase(ddb.db)

vs := types.NewValueStore(cs)

gs, ok := cs.(*nbs.GenerationalNBS)
if !ok {
return nil, errors.New("FSCK requires a local database")
}

chunkCount, err := gs.OldGen().Count()
if err != nil {
return nil, err
}
chunkCount2, err := gs.NewGen().Count()
if err != nil {
return nil, err
}
chunkCount += chunkCount2
proccessedCnt := int64(0)

var errs []error

decodeMsg := func(chk chunks.Chunk) string {
hrs := ""
val, err := types.DecodeValue(chk, vs)
if err == nil {
hrs = val.HumanReadableString()
} else {
hrs = fmt.Sprintf("Unable to decode value: %s", err.Error())
}
return hrs
}

// Append safely to the slice of errors with a mutex.
errsLock := &sync.Mutex{}
appendErr := func(err error) {
errsLock.Lock()
defer errsLock.Unlock()
errs = append(errs, err)
}

// Callback for validating chunks. This code could be called concurrently, though that is not currently the case.
validationCallback := func(chunk chunks.Chunk) {
chunkOk := true
pCnt := atomic.AddInt64(&proccessedCnt, 1)
h := chunk.Hash()
raw := chunk.Data()
calcChkSum := hash.Of(raw)

if h != calcChkSum {
fuzzyMatch := false
// Special case for the journal chunk source. We may have an address which has 4 null bytes at the end.
if h[hash.ByteLen-1] == 0 && h[hash.ByteLen-2] == 0 && h[hash.ByteLen-3] == 0 && h[hash.ByteLen-4] == 0 {
// Now we'll just verify that the first 16 bytes match.
ln := hash.ByteLen - 4
fuzzyMatch = bytes.Compare(h[:ln], calcChkSum[:ln]) == 0
}
if !fuzzyMatch {
hrs := decodeMsg(chunk)
appendErr(errors.New(fmt.Sprintf("Chunk: %s content hash mismatch: %s\n%s", h.String(), calcChkSum.String(), hrs)))
chunkOk = false
}
}

if chunkOk {
// Round trip validation. Ensure that the top level store returns the same data.
c, err := cs.Get(ctx, h)
if err != nil {
appendErr(errors.New(fmt.Sprintf("Chunk: %s load failed with error: %s", h.String(), err.Error())))
chunkOk = false
} else if bytes.Compare(raw, c.Data()) != 0 {
hrs := decodeMsg(chunk)
appendErr(errors.New(fmt.Sprintf("Chunk: %s read with incorrect ID: %s\n%s", h.String(), c.Hash().String(), hrs)))
chunkOk = false
}
}

percentage := (float64(pCnt) * 100) / float64(chunkCount)
result := fmt.Sprintf("(%4.1f%% done)", percentage)

progStr := "OK: " + h.String()
if !chunkOk {
progStr = "FAIL: " + h.String()
}
progStr = result + " " + progStr
progress <- progStr
}

err = gs.OldGen().IterateAllChunks(ctx, validationCallback)
if err != nil {
return nil, err
}
err = gs.NewGen().IterateAllChunks(ctx, validationCallback)
if err != nil {
return nil, err
}

FSCKReport := FSCKReport{Problems: errs, ChunkCount: chunkCount}

return &FSCKReport, nil
}
7 changes: 7 additions & 0 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ type ChunkStoreGarbageCollector interface {
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error

// Count returns the number of chunks in the store.
Count() (uint32, error)

// IterateAllChunks iterates over all chunks in the store, calling the provided callback for each chunk. This is
// a wrapper over the internal chunkSource.iterateAllChunks() method.
IterateAllChunks(context.Context, func(chunk Chunk)) error
}

type PrefixChunkStore interface {
Expand Down
8 changes: 8 additions & 0 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@ LOOP:
return nil
}

func (ms *MemoryStoreView) Count() (uint32, error) {
return uint32(len(ms.pending)), nil
}

func (ms *MemoryStoreView) IterateAllChunks(_ context.Context, _ func(Chunk)) error {
panic("runtime error: GetChunkHashes should never be called on the MemoryStoreView")
}

func (ms *MemoryStoreView) Stats() interface{} {
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []
return collector.MarkAndSweepChunks(ctx, hashes, collector)
}

func (s *TestStoreView) Count() (uint32, error) {
panic("currently unused")
}

func (s *TestStoreView) IterateAllChunks(_ context.Context, _ func(Chunk)) error {
panic("currently unused")
}

func (s *TestStoreView) Reads() int {
reads := atomic.LoadInt32(&s.reads)
return int(reads)
Expand Down
25 changes: 25 additions & 0 deletions go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package nbs

import (
"context"
"encoding/binary"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -152,3 +153,27 @@ func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord)
func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
return false, errors.New("Archive chunk source does not support getManyCompressed")
}

func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
addrCount := uint32(len(acs.aRdr.prefixes))
for i := uint32(0); i < addrCount; i++ {
var h hash.Hash
suffix := acs.aRdr.getSuffixByID(i)

// Reconstruct the hash from the prefix and suffix.
binary.BigEndian.PutUint64(h[:uint64Size], acs.aRdr.prefixes[i])
copy(h[uint64Size:], suffix[:])

if ctx.Err() != nil {
return ctx.Err()
}

data, err := acs.aRdr.get(h)
if err != nil {
return err
}

cb(chunks.NewChunkWithHash(h, data))
}
return nil
}
4 changes: 4 additions & 0 deletions go/store/nbs/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,7 @@ func (tcs *testChunkSource) clone() (chunkSource, error) {
func (tcs *testChunkSource) currentSize() uint64 {
panic("never used")
}

func (tcs *testChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error {
panic("never used")
}
5 changes: 5 additions & 0 deletions go/store/nbs/chunk_source_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package nbs
import (
"context"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)

Expand Down Expand Up @@ -53,3 +54,7 @@ func (csa chunkSourceAdapter) clone() (chunkSource, error) {
}
return &chunkSourceAdapter{tr, csa.h}, nil
}

func (csa chunkSourceAdapter) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
panic("unimplemented")
}
4 changes: 4 additions & 0 deletions go/store/nbs/empty_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,7 @@ func (ecs emptyChunkSource) close() error {
func (ecs emptyChunkSource) clone() (chunkSource, error) {
return ecs, nil
}

func (ecs emptyChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error {
return nil
}
Loading

0 comments on commit 46d9f7c

Please sign in to comment.