Skip to content

Commit

Permalink
mcr-settlement-client: pause and resume for mock
Browse files Browse the repository at this point in the history
To test dynamic behavior of the McrSettlementManager, add methods to
pause and resume streaming of commitments.
  • Loading branch information
mzabaluev committed May 27, 2024
1 parent 63726a0 commit a76e9ed
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions protocol-units/settlement/mcr/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ movement-types = { workspace = true }
serde_json = { workspace = true }
async-stream = { workspace = true }

[dev-dependencies]
futures = { workspace = true }

[features]
mock = []

Expand Down
89 changes: 82 additions & 7 deletions protocol-units/settlement/mcr/client/src/mock.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
use crate::{CommitmentStream, McrSettlementClientOperations};
use movement_types::BlockCommitment;
use std::collections::HashMap;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;

#[derive(Clone)]
pub struct MockMcrSettlementClient {
commitments: Arc<RwLock<HashMap<u64, BlockCommitment>>>,
commitments: Arc<RwLock<BTreeMap<u64, BlockCommitment>>>,
stream_sender: mpsc::Sender<Result<BlockCommitment, anyhow::Error>>,
stream_receiver: Arc<Mutex<Option<mpsc::Receiver<Result<BlockCommitment, anyhow::Error>>>>>,
pub current_height: Arc<RwLock<u64>>,
pub block_lead_tolerance: u64
pub block_lead_tolerance: u64,
paused_at_height: Option<u64>,
}

impl MockMcrSettlementClient {
pub fn new() -> Self {
let (stream_sender, receiver) = mpsc::channel(10);
MockMcrSettlementClient {
commitments: Arc::new(RwLock::new(HashMap::new())),
commitments: Arc::new(RwLock::new(BTreeMap::new())),
stream_sender,
stream_receiver: Arc::new(Mutex::new(Some(receiver))),
current_height: Arc::new(RwLock::new(0)),
block_lead_tolerance: 16,
paused_at_height: None,
}
}

Expand All @@ -34,6 +36,25 @@ impl MockMcrSettlementClient {
let mut commitments = self.commitments.write().await;
commitments.insert(commitment.height, commitment);
}

/// Stop streaming commitments.
///
/// Any posted commitments will be accumulated.
pub async fn pause(&mut self) {
let current_height = self.current_height.read().await;
self.paused_at_height = Some(*current_height);
}

/// Stream any commitments that have been posted following the height
/// at which `pause` was called, and resume streaming any newly posted
/// commitments
pub async fn resume(&mut self) {
let resume_height = self.paused_at_height.take().expect("was not paused");
let commitments = self.commitments.read().await;
for (_, commitment) in commitments.range(resume_height + 1..) {
self.stream_sender.send(Ok(commitment.clone())).await.unwrap();
}
}
}

#[async_trait::async_trait]
Expand All @@ -48,8 +69,10 @@ impl McrSettlementClientOperations for MockMcrSettlementClient {
{
let mut commitments = self.commitments.write().await;
let settled = commitments.entry(block_commitment.height).or_insert(block_commitment);
// Simulate sending to the stream
self.stream_sender.send(Ok(settled.clone())).await?;
if self.paused_at_height.is_none() {
// Simulate sending to the stream
self.stream_sender.send(Ok(settled.clone())).await?;
}
}

{
Expand Down Expand Up @@ -97,7 +120,10 @@ pub mod test {

use super::*;
use movement_types::Commitment;

use futures::future;
use tokio_stream::StreamExt;
use tokio::select;

#[tokio::test]
async fn test_post_block_commitment() -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -170,7 +196,56 @@ pub mod test {
commitment: Commitment([1; 32]),
}).await.unwrap();
let mut stream = client.stream_block_commitments().await?;
assert_eq!(stream.next().await.unwrap().unwrap(), commitment);
assert_eq!(stream.next().await.expect("stream has ended")?, commitment);
Ok(())
}

#[tokio::test]
async fn test_pause() -> Result<(), anyhow::Error> {
let mut client = MockMcrSettlementClient::new();
let commitment = BlockCommitment {
height: 1,
block_id: Default::default(),
commitment: Commitment([1; 32]),
};
client.post_block_commitment(commitment.clone()).await?;
client.pause().await;
let commitment2 = BlockCommitment {
height: 2,
block_id: Default::default(),
commitment: Commitment([1; 32]),
};
client.post_block_commitment(commitment2).await?;
let mut stream = client.stream_block_commitments().await?;
assert_eq!(stream.next().await.expect("stream has ended")?, commitment);
select! {
biased;
_ = stream.next() => panic!("stream should be paused"),
_ = future::ready(()) => {}
}
Ok(())
}

#[tokio::test]
async fn test_resume() -> Result<(), anyhow::Error> {
let mut client = MockMcrSettlementClient::new();
let commitment = BlockCommitment {
height: 1,
block_id: Default::default(),
commitment: Commitment([1; 32]),
};
client.post_block_commitment(commitment.clone()).await?;
client.pause().await;
let commitment2 = BlockCommitment {
height: 2,
block_id: Default::default(),
commitment: Commitment([1; 32]),
};
client.post_block_commitment(commitment2.clone()).await?;
let mut stream = client.stream_block_commitments().await?;
assert_eq!(stream.next().await.expect("stream has ended")?, commitment);
client.resume().await;
assert_eq!(stream.next().await.expect("stream has ended")?, commitment2);
Ok(())
}
}

0 comments on commit a76e9ed

Please sign in to comment.