diff --git a/log.go b/log.go index bfc52ed..a4afdce 100644 --- a/log.go +++ b/log.go @@ -18,7 +18,7 @@ import ( ) // Open create a log based on a dir and set of options -func Open(dir string, opts Options) (Log, error) { +func Open(dir string, opts Options) (result Log, err error) { if opts.Rollover == 0 { opts.Rollover = 1024 * 1024 } @@ -45,6 +45,13 @@ func Open(dir string, opts Options) (Log, error) { return nil, kleverr.Newf("log already locked") } } + defer func() { + if err != nil { + if lerr := lock.Unlock(); lerr != nil { + err = kleverr.Newf("%w: could not release lock: %w", err, lerr) + } + } + }() params := index.Params{Times: opts.TimeIndex, Keys: opts.KeyIndex} diff --git a/log_test.go b/log_test.go index 60115e2..fa13567 100644 --- a/log_test.go +++ b/log_test.go @@ -15,6 +15,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/segment" "github.com/klev-dev/kleverr" ) @@ -1079,6 +1080,54 @@ func TestReindex(t *testing.T) { require.Equal(t, msgs[1], gmsg) } +func TestCorruptReopen(t *testing.T) { + msgs := message.Gen(4) + logOpts := Options{ + TimeIndex: true, + KeyIndex: true, + Rollover: 2 * message.Size(msgs[0]), + } + + dir := t.TempDir() + + l, err := Open(dir, logOpts) + require.NoError(t, err) + + publishBatched(t, l, msgs, 1) + require.NoError(t, l.Close()) + + segments, err := segment.Find(dir) + require.NoError(t, err) + require.Len(t, segments, 2) + lastSegment := segments[len(segments)-1] + require.NoError(t, os.WriteFile(lastSegment.Index, []byte("random data characters"), 0700)) + + l, err = Open(dir, logOpts) + require.Error(t, err) + require.Nil(t, l) + + for _, seg := range segments { + require.NoError(t, os.Remove(seg.Index)) + } + + l, err = Open(dir, logOpts) + require.NoError(t, err) + + coff, cmsgs, err := l.Consume(message.OffsetOldest, 32) + require.NoError(t, err) + require.Equal(t, int64(2), coff) + require.Equal(t, msgs[0:2], cmsgs) + + coff, cmsgs, err = l.Consume(coff, 32) + require.NoError(t, err) + require.Equal(t, int64(4), coff) + require.Equal(t, msgs[2:], cmsgs) + + gmsg, err := l.GetByKey(msgs[1].Key) + require.NoError(t, err) + require.Equal(t, msgs[1], gmsg) +} + func TestDelete(t *testing.T) { t.Run("ReaderPartial", testDeleteReaderPartial) t.Run("ReaderPartialReload", testDeleteReaderPartialReload)