Skip to content

Commit

Permalink
PR Feedback: Making processJournalRecords() less aggressive on error'…
Browse files Browse the repository at this point in the history
…ing out for invalid journal records, to preserve graceful crash consistency
  • Loading branch information
fulghum committed Jan 29, 2025
1 parent 35fe785 commit 17f16f6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 37 deletions.
1 change: 0 additions & 1 deletion go/libraries/doltcore/dconfig/envvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
EnvDoltAuthorDate = "DOLT_AUTHOR_DATE"
EnvDoltCommitterDate = "DOLT_COMMITTER_DATE"
EnvDbNameReplace = "DOLT_DBNAME_REPLACE"
EnvSkipInvalidJournalRecords = "DOLT_SKIP_INVALID_JOURNAL_RECORDS"
EnvDoltRootHost = "DOLT_ROOT_HOST"
EnvDoltRootPassword = "DOLT_ROOT_PASSWORD"
)
53 changes: 32 additions & 21 deletions go/store/nbs/journal_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"fmt"
"io"
"math"
"os"
"time"

"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/sirupsen/logrus"

"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
Expand Down Expand Up @@ -227,7 +227,7 @@ func readJournalRecord(buf []byte) (rec journalRec, err error) {

// validateJournalRecord performs some sanity checks on the buffer |buf| containing a journal
// record, such as checking that the length of the record is not too short, and checking the
// checksum. If any problems are detected, an erorr is returned, otherwise nil is returned.
// checksum. If any problems are detected, an error is returned, otherwise nil is returned.
func validateJournalRecord(buf []byte) error {
if len(buf) < (journalRecLenSz + journalRecChecksumSz) {
return fmt.Errorf("invalid journal record: buffer length too small (%d < %d)", len(buf), (journalRecLenSz + journalRecChecksumSz))
Expand All @@ -251,6 +251,12 @@ func validateJournalRecord(buf []byte) error {
// processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at
// offset |off|, and calls the callback function |cb| with each journal record. The offset where reading was stopped
// is returned, or any error encountered along the way.
// If an invalid journal record is found, it is not included and this function stops processing journal
// entries, but does not return an error. Journal records may be incomplete if the system crashes while
// records are being persisted to disk. This isn't likely, but the OS filesystem write is not an atomic
// operation, so it's possible to leave the journal in a corrupted state. We must gracefully recover
// without preventing the server from starting up, so we are careful to only return the journal file
// offset that points to end fo the last valid record.
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error) (int64, error) {
var (
buf []byte
Expand All @@ -266,11 +272,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
for {
// peek to read next record size
if buf, err = rdr.Peek(uint32Size); err != nil {
if err == io.EOF {
break
} else {
return 0, err
}
break
}

// The first 4 bytes in the journal record are the total length of the record (including
Expand All @@ -285,35 +287,44 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
}

if buf, err = rdr.Peek(int(l)); err != nil {
return 0, err
break
}

if err = validateJournalRecord(buf); err != nil {
// If the DOLT_SKIP_INVALID_JOURNAL_RECORDS env var is set, then we stop reading the journal
// as soon as we hit an invalid record. This allows users to opt-in to the previous behavior
// where we process as many journal records we can, but stop once we hit an invalid record.
if os.Getenv(dconfig.EnvSkipInvalidJournalRecords) != "" {
break
} else {
return 0, err
}
// TODO: The NomsBlockStore manifest is updated when the sql engine is shutdown cleanly. In the clean shutdown
// case, we have the root hash value and the number of chunks written to the journal from the manifest
// and we could use that that more aggressively validate journal records. When we are starting up from a
// clean shutdown, we expect all journal records to be valid, and could safely error out during startup
// for invalid records.
if validationErr := validateJournalRecord(buf); validationErr != nil {
// NOTE: We don't assign the validation error to err, because we want to stop processing journal records
// when we see an invalid record and return successfully from processJournalRecords(), so that only
// the preceding records in the journal are used.
logrus.Errorf("Error validating journal record; "+
"skipping remaining journal records past offset %d: %s", validationErr, off)
break
}

var rec journalRec
if rec, err = readJournalRecord(buf); err != nil {
return 0, err
break // failed to read valid record
}
if err = cb(off, rec); err != nil {
return 0, err
break
}

// advance |rdr| state by |l| bytes
if _, err = io.ReadFull(rdr, buf); err != nil {
return 0, err
break
}
off += int64(len(buf))
}

// If a non-EOF error was captured while processing journal records, return a
// journal offset of 0 and the error, which will cause startup to halt.
if err != nil && err != io.EOF {
return 0, err
}

// reset the file pointer to end of the last
// successfully processed journal record
if _, err = r.Seek(off, 0); err != nil {
Expand Down
16 changes: 1 addition & 15 deletions go/store/nbs/journal_record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ import (
"bytes"
"context"
"math/rand"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
Expand Down Expand Up @@ -120,22 +118,10 @@ func TestProcessJournalRecords(t *testing.T) {
assert.Equal(t, int(off), int(n))
require.NoError(t, err)

// write a bogus record to the end and verify that we get an error
// write a bogus record to the end and verify that we don't get an error
i, sum = 0, 0
writeCorruptJournalRecord(journal[off:])
n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
require.Error(t, err)
require.Contains(t, err.Error(), "CRC checksum does not match")
assert.Equal(t, cnt, i)
// Since an error was encountered, the returned offset is 0
assert.Equal(t, 0, int(n))

// Turn on the env setting to stop processing journal records once we hit an invalid record
require.NoError(t, os.Setenv(dconfig.EnvSkipInvalidJournalRecords, "1"))
i, sum = 0, 0
// write a bogus record to the end and process again
writeCorruptJournalRecord(journal[off:])
n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
require.NoError(t, err)
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
Expand Down

0 comments on commit 17f16f6

Please sign in to comment.