From 17f16f6263c87a0c73bb47f39d87bb7037140612 Mon Sep 17 00:00:00 2001 From: Jason Fulghum Date: Tue, 28 Jan 2025 17:04:56 -0800 Subject: [PATCH] PR Feedback: Making processJournalRecords() less aggressive on error'ing out for invalid journal records, to preserve graceful crash consistency --- go/libraries/doltcore/dconfig/envvars.go | 1 - go/store/nbs/journal_record.go | 53 ++++++++++++++---------- go/store/nbs/journal_record_test.go | 16 +------ 3 files changed, 33 insertions(+), 37 deletions(-) diff --git a/go/libraries/doltcore/dconfig/envvars.go b/go/libraries/doltcore/dconfig/envvars.go index 8f46bd1057d..f9df29d7e99 100755 --- a/go/libraries/doltcore/dconfig/envvars.go +++ b/go/libraries/doltcore/dconfig/envvars.go @@ -43,7 +43,6 @@ const ( EnvDoltAuthorDate = "DOLT_AUTHOR_DATE" EnvDoltCommitterDate = "DOLT_COMMITTER_DATE" EnvDbNameReplace = "DOLT_DBNAME_REPLACE" - EnvSkipInvalidJournalRecords = "DOLT_SKIP_INVALID_JOURNAL_RECORDS" EnvDoltRootHost = "DOLT_ROOT_HOST" EnvDoltRootPassword = "DOLT_ROOT_PASSWORD" ) diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go index bab535adcde..b028be67911 100644 --- a/go/store/nbs/journal_record.go +++ b/go/store/nbs/journal_record.go @@ -22,10 +22,10 @@ import ( "fmt" "io" "math" - "os" "time" - "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + "github.com/sirupsen/logrus" + "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" ) @@ -227,7 +227,7 @@ func readJournalRecord(buf []byte) (rec journalRec, err error) { // validateJournalRecord performs some sanity checks on the buffer |buf| containing a journal // record, such as checking that the length of the record is not too short, and checking the -// checksum. If any problems are detected, an erorr is returned, otherwise nil is returned. +// checksum. If any problems are detected, an error is returned, otherwise nil is returned. 
func validateJournalRecord(buf []byte) error { if len(buf) < (journalRecLenSz + journalRecChecksumSz) { return fmt.Errorf("invalid journal record: buffer length too small (%d < %d)", len(buf), (journalRecLenSz + journalRecChecksumSz)) @@ -251,6 +251,12 @@ func validateJournalRecord(buf []byte) error { // processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at // offset |off|, and calls the callback function |cb| with each journal record. The offset where reading was stopped // is returned, or any error encountered along the way. +// If an invalid journal record is found, it is not included and this function stops processing journal +// entries, but does not return an error. Journal records may be incomplete if the system crashes while +// records are being persisted to disk. This isn't likely, but the OS filesystem write is not an atomic +// operation, so it's possible to leave the journal in a corrupted state. We must gracefully recover +// without preventing the server from starting up, so we are careful to only return the journal file +// offset that points to the end of the last valid record. 
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error) (int64, error) { var ( buf []byte @@ -266,11 +272,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f for { // peek to read next record size if buf, err = rdr.Peek(uint32Size); err != nil { - if err == io.EOF { - break - } else { - return 0, err - } + break } // The first 4 bytes in the journal record are the total length of the record (including @@ -285,35 +287,44 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f } if buf, err = rdr.Peek(int(l)); err != nil { - return 0, err + break } - if err = validateJournalRecord(buf); err != nil { - // If the DOLT_SKIP_INVALID_JOURNAL_RECORDS env var is set, then we stop reading the journal - // as soon as we hit an invalid record. This allows users to opt-in to the previous behavior - // where we process as many journal records we can, but stop once we hit an invalid record. - if os.Getenv(dconfig.EnvSkipInvalidJournalRecords) != "" { - break - } else { - return 0, err - } + // TODO: The NomsBlockStore manifest is updated when the sql engine is shut down cleanly. In the clean shutdown + // case, we have the root hash value and the number of chunks written to the journal from the manifest + // and we could use that to more aggressively validate journal records. When we are starting up from a + // clean shutdown, we expect all journal records to be valid, and could safely error out during startup + // for invalid records. + if validationErr := validateJournalRecord(buf); validationErr != nil { + // NOTE: We don't assign the validation error to err, because we want to stop processing journal records + // when we see an invalid record and return successfully from processJournalRecords(), so that only + // the preceding records in the journal are used. 
+ logrus.Errorf("Error validating journal record; "+ + "skipping remaining journal records past offset %d: %s", validationErr, off) + break } var rec journalRec if rec, err = readJournalRecord(buf); err != nil { - return 0, err + break // failed to read valid record } if err = cb(off, rec); err != nil { - return 0, err + break } // advance |rdr| state by |l| bytes if _, err = io.ReadFull(rdr, buf); err != nil { - return 0, err + break } off += int64(len(buf)) } + // If a non-EOF error was captured while processing journal records, return a + // journal offset of 0 and the error, which will cause startup to halt. + if err != nil && err != io.EOF { + return 0, err + } + // reset the file pointer to end of the last // successfully processed journal record if _, err = r.Seek(off, 0); err != nil { diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go index d3ece60633f..607f808b475 100644 --- a/go/store/nbs/journal_record_test.go +++ b/go/store/nbs/journal_record_test.go @@ -18,14 +18,12 @@ import ( "bytes" "context" "math/rand" - "os" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" @@ -120,22 +118,10 @@ func TestProcessJournalRecords(t *testing.T) { assert.Equal(t, int(off), int(n)) require.NoError(t, err) - // write a bogus record to the end and verify that we get an error + // write a bogus record to the end and verify that we don't get an error i, sum = 0, 0 writeCorruptJournalRecord(journal[off:]) n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check) - require.Error(t, err) - require.Contains(t, err.Error(), "CRC checksum does not match") - assert.Equal(t, cnt, i) - // Since an error was encountered, the returned offset is 0 - assert.Equal(t, 0, int(n)) - - // Turn on the env setting to stop 
processing journal records once we hit an invalid record - require.NoError(t, os.Setenv(dconfig.EnvSkipInvalidJournalRecords, "1")) - i, sum = 0, 0 - // write a bogus record to the end and process again - writeCorruptJournalRecord(journal[off:]) - n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check) require.NoError(t, err) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n))