fix recovery when writing the wal header fails
jauhararifin committed Dec 25, 2024
1 parent d2e6f79 commit c54391f
Showing 1 changed file with 30 additions and 12 deletions.
src/wal.rs: 30 additions & 12 deletions
@@ -86,7 +86,26 @@ struct WalInternal {
 }
 
 impl<R: Runtime> Wal<R> {
-    fn new(f1: WalFile<R>, f2: WalFile<R>, use_wal_1: bool, next_lsn: Lsn) -> Self {
+    fn new(
+        mut f1: WalFile<R>,
+        mut f2: WalFile<R>,
+        use_wal_1: bool,
+        next_lsn: Lsn,
+    ) -> anyhow::Result<Self> {
+        let f = if use_wal_1 { &mut f1 } else { &mut f2 };
+        assert!(next_lsn.get() >= f.relative_lsn);
+        let size = next_lsn.get() - f.relative_lsn;
+        // WARNING: it is important to perform an fsync here to make sure that this file is
+        // successfully truncated to the last log entry. Otherwise, we might end up with a log
+        // that still contains old, invalid entries. The process may have crashed in the past
+        // while some WAL entries were only partially written, so some of them might be valid
+        // and some might not. If we don't truncate the WAL here, we might keep appending log
+        // entries until we reach an old entry left over from the last crash. If we crash at
+        // that point, that old entry would be considered valid because it directly follows
+        // our valid entries.
+        f.f.truncate(2u64 * WAL_HEADER_SIZE as u64 + size)?;
+        f.f.sync()?;
+
         let internal = Arc::new(R::RwMutex::new(WalInternal {
             temp_buffer: vec![0u8; MAXIMUM_PAGE_SIZE],
             use_wal_1,
@@ -129,15 +148,15 @@ impl<R: Runtime> Wal<R> {
         }
 
         let backward_buffer = vec![0u8; buffer.read().size()];
-        Wal {
+        Ok(Wal {
             f1,
             f2,
             buffer,
             internal,
             flush_trigger: timer_handle,
             iter_backward_lock: Mutex::new(backward_buffer),
             stat,
-        }
+        })
     }
 
     pub(crate) fn complete_checkpoint(&self, checkpoint_lsn: Lsn) -> anyhow::Result<()> {
@@ -326,12 +345,6 @@ impl<R: Runtime> Wal<R> {
             };
             header.encode(&mut buff[..WAL_HEADER_SIZE]);
             header.encode(&mut buff[WAL_HEADER_SIZE..]);
-            f.f.truncate(0)?;
-            f.f.seek(SeekFrom::Start(0))?;
-            f.f.write_all(&buff)?;
-            stat.bytes_written
-                .fetch_add(buff.len() as u64, Ordering::SeqCst);
-            f.is_empty = false;
 
             // WARNING: it is important to perform an fsync here to make sure that this file is
             // successfully truncated to zero. Otherwise, we might end up with partially written
@@ -340,7 +353,14 @@
             // make things worse, if it happens that there is an old log entry starting at the
             // position that should be used for the new log entry, we could silently fall into
             // a bug whose root cause is hard to track down.
+            f.f.truncate(0)?;
             f.f.sync()?;
+
+            f.f.seek(SeekFrom::Start(0))?;
+            f.f.write_all(&buff)?;
+            stat.bytes_written
+                .fetch_add(buff.len() as u64, Ordering::SeqCst);
+            f.is_empty = false;
         }
 
         let offset = internal.first_unflushed.get() - f.relative_lsn + WAL_HEADER_SIZE as u64 * 2;
@@ -349,11 +369,9 @@
         let written = if buffer.end_offset < buffer.start_offset {
             f.f.write_all(&buffer.buff[buffer.start_offset..])?;
             f.f.write_all(&buffer.buff[..buffer.end_offset])?;
-
             buffer.buff.len() - buffer.start_offset + buffer.end_offset
         } else {
             f.f.write_all(&buffer.buff[buffer.start_offset..buffer.end_offset])?;
-
             buffer.end_offset - buffer.start_offset
         };
         stat.bytes_written
@@ -659,7 +677,7 @@ pub(crate) fn recover<R: Runtime>(
         }
     }
 
-    Ok(Wal::new(f1.into(), f2.into(), use_wal_1, next_lsn))
+    Wal::new(f1.into(), f2.into(), use_wal_1, next_lsn)
 }
 
 fn recover_wal_file<R: Runtime>(mut f: R::File) -> anyhow::Result<RecoveringWalFile<R>> {
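
The change revolves around one ordering rule stated in both WARNING comments: truncate, fsync the truncation, and only then write fresh data, syncing again afterwards. Below is a minimal sketch of that rule in isolation, using plain std::fs rather than the Runtime file abstraction the repository uses; the file name, the reset_and_write_header helper, and the 32-byte header size are illustrative assumptions, not part of this codebase.

use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};

// Illustrative header size; the real value lives in src/wal.rs.
const WAL_HEADER_SIZE: usize = 32;

// Hypothetical helper showing the truncate -> fsync -> write -> fsync
// ordering this commit enforces when (re)initializing a WAL file.
fn reset_and_write_header(path: &str, header: &[u8; WAL_HEADER_SIZE]) -> std::io::Result<()> {
    let mut f = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open(path)?;

    // 1. Drop every stale byte first. If we crash right after this point,
    //    recovery sees an empty file, not a mix of new and leftover entries.
    f.set_len(0)?;
    // 2. Persist the truncation itself. Without this fsync, the old length
    //    (and the old bytes) may still be on disk after a crash.
    f.sync_all()?;

    // 3. Only now write the fresh header (twice, mirroring the doubled
    //    header in wal.rs) and persist it as well.
    f.seek(SeekFrom::Start(0))?;
    f.write_all(header)?;
    f.write_all(header)?;
    f.sync_all()?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let header = [0u8; WAL_HEADER_SIZE];
    reset_and_write_header("example.wal", &header)
}

Skipping the first sync_all is exactly the failure mode the comments describe: after a crash, leftover bytes past the new end of the log can be parsed as valid entries because they directly follow entries that really were written.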