Skip to content

Commit

Permalink
fix(mirror): wait 15 seconds before sending transactions (near#10990)
Browse files Browse the repository at this point in the history
The test pytest/tools/mirror/offline_test.py often fails with many fewer
transactions observed than wanted, and the logs reveal that many
transactions are invalid because the access keys used do not exist in
the target chain. This happens because some early transactions that
should have added those keys never make it on chain. These transactions
are sent successfully from the perspective of the ClientActor, but the
logs show that they're dropped by the peer manager:

```
DEBUG handle{handler="PeerManagerMessageRequest" actor="PeerManagerActor" msg_type="NetworkRequests"}: network: Failed sending message: peer not connected to=ed25519:Fz7d1xkkt3XsvTPiwk4JRhMuPru4Ss7cLS8fdhshDRj3 num_connected_peers=1 msg=Routed(RoutedMessageV2 { msg: RoutedMessage { ... body: tx GFW8HgTndXVKdcLHdsCXxURjHxDnnEqHadrbxvsLKVQb ...
```

So, the peer manager is dropping the transaction instead of routing it,
and the test fails because many subsequent transactions depended on that
one. A git bisect shows that this behavior starts after
near#9651. It seems that this failure
to route messages happens for a bit longer after startup after that PR.
The proper way to handle this might be to implement a mechanism whereby
these messages won't just silently be dropped, and the ClientActor can
receive a notification that it wasn't successful so that we can retry it
later. But for now a workaround is to just wait a little bit before
sending transactions. So we'll set a 15 second timer for the first batch
of transactions, and then proceed normally with the others
  • Loading branch information
marcelo-gonzalez authored Apr 10, 2024
1 parent 1141947 commit 20658bb
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions tools/mirror/src/chain_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ pub(crate) enum SentBatch {

// TODO: the separation between what's in here and what's in the main file with struct TxMirror is not
// that clear and doesn't make that much sense. Should refactor
#[derive(Default)]
pub(crate) struct TxTracker {
sent_txs: HashMap<CryptoHash, TxSendInfo>,
txs_by_signer: HashMap<(AccountId, PublicKey), BTreeSet<TxId>>,
Expand All @@ -147,7 +146,7 @@ pub(crate) struct TxTracker {
nonempty_height_queued: Option<BlockHeight>,
height_popped: Option<BlockHeight>,
height_seen: Option<BlockHeight>,
send_time: Option<Pin<Box<tokio::time::Sleep>>>,
send_time: Pin<Box<tokio::time::Sleep>>,
// Config value in the target chain, used to judge how long to wait before sending a new batch of txs
min_block_production_delay: Duration,
// timestamps in the target chain, used to judge how long to wait before sending a new batch of txs
Expand All @@ -169,7 +168,25 @@ impl TxTracker {
I: IntoIterator<Item = &'a BlockHeight>,
{
let next_heights = next_heights.into_iter().map(Clone::clone).collect();
Self { min_block_production_delay, next_heights, stop_height, ..Default::default() }
Self {
min_block_production_delay,
next_heights,
stop_height,
// Wait at least 15 seconds before sending any transactions because for
// a few seconds after the node starts, transaction routing requests
// will be silently dropped by the peer manager.
send_time: Box::pin(tokio::time::sleep(std::time::Duration::from_secs(15))),
sent_txs: HashMap::new(),
txs_by_signer: HashMap::new(),
queued_blocks: VecDeque::new(),
updater_to_keys: HashMap::new(),
nonces: HashMap::new(),
height_queued: None,
nonempty_height_queued: None,
height_popped: None,
height_seen: None,
recent_block_timestamps: VecDeque::new(),
}
}

pub(crate) async fn next_heights<T: ChainAccess>(
Expand Down Expand Up @@ -441,10 +458,7 @@ impl TxTracker {
}

pub(crate) fn next_batch_time(&self) -> Instant {
match &self.send_time {
Some(t) => t.as_ref().deadline().into_std(),
None => Instant::now(),
}
self.send_time.as_ref().deadline().into_std()
}

pub(crate) async fn next_batch(
Expand All @@ -455,10 +469,10 @@ impl TxTracker {
// sleep until 20 milliseconds before we want to send transactions before we check for nonces
// in the target chain. In the second or so between now and then, we might process another block
// that will set the nonces.
if let Some(s) = &self.send_time {
tokio::time::sleep_until(s.as_ref().deadline() - std::time::Duration::from_millis(20))
.await;
}
tokio::time::sleep_until(
self.send_time.as_ref().deadline() - std::time::Duration::from_millis(20),
)
.await;
let mut needed_access_keys = HashSet::new();
for c in self.queued_blocks[0].chunks.iter_mut() {
for tx in c.txs.iter_mut() {
Expand Down Expand Up @@ -518,9 +532,7 @@ impl TxTracker {
};
}
}
if let Some(sleep) = &mut self.send_time {
sleep.await;
}
(&mut self.send_time).await;
Ok(self.queued_blocks.pop_front().unwrap())
}

Expand Down Expand Up @@ -1134,12 +1146,7 @@ impl TxTracker {
let block_delay = self
.second_longest_recent_block_delay()
.unwrap_or(self.min_block_production_delay + Duration::from_millis(100));
match &mut self.send_time {
Some(t) => t.as_mut().reset(tokio::time::Instant::now() + block_delay),
None => {
self.send_time = Some(Box::pin(tokio::time::sleep(block_delay)));
}
}
self.send_time.as_mut().reset(tokio::time::Instant::now() + block_delay);
crate::set_last_source_height(db, b.source_height)?;
let txs = b
.chunks
Expand Down

0 comments on commit 20658bb

Please sign in to comment.