Skip to content

Commit

Permalink
Fix flaky tests for BufferedStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Oct 23, 2023
1 parent d537760 commit a9f3557
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 4 deletions.
1 change: 1 addition & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/libs/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ roles = { path = "../roles" }
schema = { path = "../schema" }

[dev-dependencies]
assert_matches.workspace = true
tempfile.workspace = true
test-casing.workspace = true
tokio.workspace = true
30 changes: 28 additions & 2 deletions node/libs/storage/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{
StorageError, StorageResult,
};
use async_trait::async_trait;
#[cfg(test)]
use concurrency::ctx::channel;
use concurrency::{
ctx,
sync::{self, watch, Mutex},
Expand Down Expand Up @@ -117,13 +119,23 @@ impl BlockBuffer {
}
}

/// Events emitted by [`BufferedStorage`].
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum BufferedStorageEvent {
/// Update was received from
UpdateReceived(BlockNumber),
}

/// [`BlockStore`] with an in-memory buffer for pending blocks.
#[derive(Debug)]
pub struct BufferedStorage<T> {
inner: T,
inner_subscriber: watch::Receiver<BlockNumber>,
block_writes_sender: watch::Sender<BlockNumber>,
buffer: Mutex<BlockBuffer>,
#[cfg(test)]
events_sender: channel::UnboundedSender<BufferedStorageEvent>,
}

impl<T: ContiguousBlockStore> BufferedStorage<T> {
Expand All @@ -137,9 +149,19 @@ impl<T: ContiguousBlockStore> BufferedStorage<T> {
inner_subscriber,
block_writes_sender: watch::channel(store_block_number).0,
buffer: Mutex::new(BlockBuffer::new(store_block_number)),
#[cfg(test)]
events_sender: channel::unbounded().0,
}
}

#[cfg(test)]
pub(crate) fn set_events_sender(
&mut self,
sender: channel::UnboundedSender<BufferedStorageEvent>,
) {
self.events_sender = sender;
}

#[cfg(test)]
pub(crate) fn as_ref(&self) -> &T {
&self.inner
Expand Down Expand Up @@ -167,8 +189,12 @@ impl<T: ContiguousBlockStore> BufferedStorage<T> {
if let Some(block) = next_block_for_store {
self.inner.schedule_next_block(ctx, &block).await?;
let block_number = block.block.number;
tracing::trace!(%block_number, "Block put in underlying storage");
tracing::trace!(%block_number, "Block scheduled in underlying storage");
}

#[cfg(test)]
self.events_sender
.send(BufferedStorageEvent::UpdateReceived(store_block_number));
}
}
}
Expand Down Expand Up @@ -242,7 +268,7 @@ impl<T: ContiguousBlockStore> WriteBlockStore for BufferedStorage<T> {

if let Some(block) = next_block_for_store {
self.inner.schedule_next_block(ctx, &block).await?;
tracing::trace!(block_number = %block.block.number, "Block put in underlying storage");
tracing::trace!(block_number = %block.block.number, "Block scheduled in underlying storage");
}
self.block_writes_sender.send_replace(block.block.number);
Ok(())
Expand Down
17 changes: 15 additions & 2 deletions node/libs/storage/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;
use crate::types::ReplicaState;
use crate::{buffered::BufferedStorageEvent, types::ReplicaState};
use assert_matches::assert_matches;
use async_trait::async_trait;
use concurrency::{
ctx::{self, channel},
Expand Down Expand Up @@ -267,7 +268,9 @@ async fn test_buffered_storage(
initial_blocks.insert(0, genesis_block.clone());

let (block_store, block_receiver) = MockContiguousStore::new(block_store);
let buffered_store = BufferedStorage::new(block_store);
let mut buffered_store = BufferedStorage::new(block_store);
let (events_sender, mut events_receiver) = channel::unbounded();
buffered_store.set_events_sender(events_sender);

// Check initial values returned by the store.
let last_initial_block = initial_blocks.last().unwrap().clone();
Expand Down Expand Up @@ -343,6 +346,16 @@ async fn test_buffered_storage(
sync::changed(ctx, &mut inner_subscriber).await?;
}

// Check events emitted by the buffered storage. This also ensures that all underlying storage
// updates are processed before proceeding to the following checks.
let expected_numbers = (initial_block_count as u64 + 1)..=last_block_number.0;
for expected_number in expected_numbers.map(BlockNumber) {
assert_matches!(
events_receiver.recv(ctx).await?,
BufferedStorageEvent::UpdateReceived(number) if number == expected_number
);
}

assert_eq!(buffered_store.buffer_len().await, 0);
Ok(())
})
Expand Down

0 comments on commit a9f3557

Please sign in to comment.