Skip to content

Commit

Permalink
Ensure we retry BlockRequests which never receive a BlockResponse (#1398)
Browse files Browse the repository at this point in the history

We do this by properly utilising `libp2p::request_response` and
representing a `BlockRequest` and a `BlockResponse` as an actual
request and response pair. Previously, both were requests at the
libp2p level and would be responded to by an empty ACK.
`libp2p::request_response` now sends us a request timeout failure
if a response is not received for a request within 10 seconds. This
triggers the existing retry logic in `BlockStore` and we
successfully make the request again (hopefully from a different
peer).

To achieve this I refactored the interaction between the libp2p
layer and `Node`. Instead of all messages being passed to
`handle_network_message`, I have split this into 4 different
methods:
* `handle_broadcast`
* `handle_request`
* `handle_request_failure`
* `handle_response`

Importantly, `handle_request` takes `ResponseChannel` as an
argument. When the handler is ready to respond to the request it
sends that `ResponseChannel` (and the actual response) to a new
`request_responses` channel. The libp2p layer picks up these
responses and passes them on to the network.

This is a slightly leaky abstraction, since the `ResponseChannel`
is libp2p-specific but the node knows about it. This isn't awful,
but it does mean we need some compile-time feature flag
shenanigans to allow the integration test suite (which doesn't use
libp2p) to construct fake versions of the `ResponseChannel`. I
hope this is reasonably well documented in the code and not too
ugly.
  • Loading branch information
JamesHinshelwood authored Sep 5, 2024
1 parent 73e9431 commit 6c8c343
Show file tree
Hide file tree
Showing 11 changed files with 393 additions and 205 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion z2/src/docgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,11 @@ pub fn get_implemented_jsonrpc_methods() -> Result<HashMap<ApiMethod, PageStatus
let (s1, _) = tokio::sync::mpsc::unbounded_channel();
let (s2, _) = tokio::sync::mpsc::unbounded_channel();
let (s3, _) = tokio::sync::mpsc::unbounded_channel();
let (s4, _) = tokio::sync::mpsc::unbounded_channel();
let peers = Arc::new(AtomicUsize::new(0));

let my_node = Arc::new(Mutex::new(zilliqa::node::Node::new(
config, secret_key, s1, s2, s3, peers,
config, secret_key, s1, s2, s3, s4, peers,
)?));
let module = zilliqa::api::rpc_module(my_node.clone());
for m in module.method_names() {
Expand Down
5 changes: 4 additions & 1 deletion zilliqa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ path = "src/bin/zilliqa.rs"
[features]
# Enable tests which assert contract bytecode is correct and reproducible. Enabled in CI.
test_contract_bytecode = []
# Enable fake implementation of `ResponseChannel`, so tests can construct response channels.
fake_response_channel = []
# Enable fake implementation of `std::time::SystemTime` for deterministic tests.
fake_time = []

Expand Down Expand Up @@ -76,6 +78,7 @@ cbor4ii = "0.3.2"
scilla-parser = "1.0.0"
blsful = "2.5.7"
bech32 = "0.11.0"
cfg-if = "1.0.0"

[dev-dependencies]
alloy = { version = "0.2.1", default-features = false, features = ["rand"] }
Expand All @@ -88,7 +91,7 @@ indicatif = "0.17.8"
primitive-types = { version = "0.12.2" }
semver = "1.0.23"
ureq = "2.10.1"
zilliqa = { path = ".", default-features = false, features = ["fake_time"] }
zilliqa = { path = ".", default-features = false, features = ["fake_response_channel", "fake_time"] }
zilliqa-macros = { path = "../zilliqa-macros" }

[[bench]]
Expand Down
2 changes: 1 addition & 1 deletion zilliqa/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ impl Consensus {
Ok(committee)
}

pub fn new_view(&mut self, _: PeerId, new_view: NewView) -> Result<Option<Block>> {
pub fn new_view(&mut self, new_view: NewView) -> Result<Option<Block>> {
trace!("Received new view for height: {:?}", new_view.view);

// The leader for this view should be chosen according to the parent of the highest QC
Expand Down
6 changes: 4 additions & 2 deletions zilliqa/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ pub enum ExternalMessage {
BlockRequest(BlockRequest),
BlockResponse(BlockResponse),
NewTransaction(SignedTransaction),
RequestResponse,
/// An acknowledgement of the receipt of a message. Note this is only used as a response when the caller doesn't
/// require any data in the response.
Acknowledgement,
}

impl ExternalMessage {
Expand Down Expand Up @@ -254,7 +256,7 @@ impl Display for ExternalMessage {
}
}
ExternalMessage::NewTransaction(_) => write!(f, "NewTransaction"),
ExternalMessage::RequestResponse => write!(f, "RequestResponse"),
ExternalMessage::Acknowledgement => write!(f, "RequestResponse"),
}
}
}
Expand Down
168 changes: 113 additions & 55 deletions zilliqa/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ use crate::{
exec::{PendingState, TransactionApplyResult},
inspector::{self, ScillaInspector},
message::{
Block, BlockHeader, BlockRequest, BlockResponse, ExternalMessage, InternalMessage,
IntershardCall, Proposal,
Block, BlockHeader, BlockResponse, ExternalMessage, InternalMessage, IntershardCall,
Proposal,
},
node_launcher::ResponseChannel,
p2p_node::{LocalMessageTuple, OutboundMessageTuple},
pool::TxPoolContent,
state::State,
Expand Down Expand Up @@ -134,6 +135,9 @@ pub struct Node {
pub db: Arc<Db>,
peer_id: PeerId,
message_sender: MessageSender,
/// Send responses to requests down this channel. The `ResponseChannel` passed must correspond to a
/// `ResponseChannel` received via `handle_request`.
request_responses: UnboundedSender<(ResponseChannel, ExternalMessage)>,
reset_timeout: UnboundedSender<Duration>,
pub consensus: Consensus,
peer_num: Arc<AtomicUsize>,
Expand Down Expand Up @@ -162,6 +166,7 @@ impl Node {
secret_key: SecretKey,
message_sender_channel: UnboundedSender<OutboundMessageTuple>,
local_sender_channel: UnboundedSender<LocalMessageTuple>,
request_responses: UnboundedSender<(ResponseChannel, ExternalMessage)>,
reset_timeout: UnboundedSender<Duration>,
peer_num: Arc<AtomicUsize>,
) -> Result<Node> {
Expand All @@ -178,6 +183,7 @@ impl Node {
config: config.clone(),
peer_id,
message_sender: message_sender.clone(),
request_responses,
reset_timeout: reset_timeout.clone(),
db: db.clone(),
chain_id: ChainId::new(config.eth_chain_id),
Expand All @@ -187,69 +193,129 @@ impl Node {
Ok(node)
}

// TODO: Multithreading - `&mut self` -> `&self`
pub fn handle_network_message(
&mut self,
from: PeerId,
message: Result<ExternalMessage, OutgoingMessageFailure>,
) -> Result<()> {
let to = self.peer_id;
let to_self = from == to;
let message = match message {
Ok(message) => message,
Err(failure) => {
debug!(?failure, "handling message failure");
self.consensus.report_outgoing_message_failure(failure)?;

return Ok(());
}
};
debug!(%from, %to, %message, "handling message");
/// Handles a message that was received via broadcast from `from`.
///
/// Only `Proposal` and `NewTransaction` messages are expected here:
/// * A `Proposal` is passed on to `handle_proposal`.
/// * A `NewTransaction` is verified and offered to consensus. If consensus reports that it was inserted, we try
///   to propose a new block and broadcast the resulting message, if there is one.
///
/// Any other message type is logged with a warning and otherwise ignored.
pub fn handle_broadcast(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> {
debug!(%from, to = %self.peer_id, %message, "handling broadcast");
// We only expect `Proposal`s and `NewTransaction`s to be broadcast.
match message {
ExternalMessage::Proposal(m) => {
if let Some((to, message)) = self.consensus.proposal(from, m, false)? {
self.reset_timeout.send(DEFAULT_SLEEP_TIME_MS)?;
if let Some(to) = to {
self.message_sender.send_external_message(to, message)?;
} else {
self.handle_proposal(from, m)?;
}
ExternalMessage::NewTransaction(t) => {
let inserted = self.consensus.new_transaction(t.verify()?)?;
if inserted {
if let Some((_, message)) = self.consensus.try_to_propose_new_block()? {
self.message_sender.broadcast_external_message(message)?;
}
}
}
_ => {
warn!("unexpected message type");
}
}

Ok(())
}

/// Handles a message that was received from `from` as a direct request.
///
/// Replies are never sent directly. Instead, `response_channel` is pushed onto the `request_responses` channel
/// together with the reply message.
///
/// * `Vote` / `NewView`: passed to consensus. If consensus yields a block, it is broadcast as a `Proposal`. The
///   request is then answered with an `Acknowledgement`.
/// * `BlockRequest`: a request from our own peer ID is ignored and no reply is sent. Otherwise we look up the
///   blocks for the requested view range (inspecting at most `config.block_request_limit` views) and reply with
///   a `BlockResponse` containing the proposals that were found.
/// * `BlockResponse` / `Proposal`: handled by `handle_block_response` / `handle_proposal` respectively, then
///   answered with an `Acknowledgement`.
/// * Any other message type is logged with a warning and no reply is sent.
pub fn handle_request(
&mut self,
from: PeerId,
message: ExternalMessage,
response_channel: ResponseChannel,
) -> Result<()> {
debug!(%from, to = %self.peer_id, %message, "handling request");
match message {
ExternalMessage::Vote(m) => {
if let Some((block, transactions)) = self.consensus.vote(*m)? {
self.message_sender
.broadcast_external_message(ExternalMessage::Proposal(
Proposal::from_parts(block, transactions),
))?;
}
// Acknowledge this vote.
self.request_responses
.send((response_channel, ExternalMessage::Acknowledgement))?;
}
ExternalMessage::NewView(m) => {
if let Some(block) = self.consensus.new_view(from, *m)? {
if let Some(block) = self.consensus.new_view(*m)? {
self.message_sender
.broadcast_external_message(ExternalMessage::Proposal(
Proposal::from_parts(block, vec![]),
))?;
}
// Acknowledge this new view.
self.request_responses
.send((response_channel, ExternalMessage::Acknowledgement))?;
}
ExternalMessage::BlockRequest(m) => {
if !to_self {
self.handle_block_request(from, m)?;
} else {
ExternalMessage::BlockRequest(request) => {
if from == self.peer_id {
debug!("ignoring blocks request to self");
return Ok(());
}

let proposals = (request.from_view..=request.to_view)
.take(self.config.block_request_limit)
.filter_map(|view| {
self.consensus
.get_block_by_view(view)
.transpose()
.map(|block| Ok(self.block_to_proposal(block?)))
})
.collect::<Result<_>>()?;

trace!("responding to new blocks request of {request:?}");

// Send the response to this block request.
self.request_responses.send((
response_channel,
ExternalMessage::BlockResponse(BlockResponse { proposals }),
))?;
}
// We don't usually expect a [BlockResponse] to be received as a request, however this can occur when our
// [BlockStore] has re-sent a previously unusable block because we didn't (yet) have the block's parent.
// Having knowledge of this here breaks our abstraction boundaries slightly, but it also keeps things
// simple.
ExternalMessage::BlockResponse(m) => {
self.handle_block_response(from, m)?;
// Acknowledge this block response. This does nothing because the `BlockResponse` request was sent by
// us, but we keep it here for symmetry with the other handlers.
self.request_responses
.send((response_channel, ExternalMessage::Acknowledgement))?;
}
ExternalMessage::RequestResponse => {}
ExternalMessage::NewTransaction(t) => {
let inserted = self.consensus.new_transaction(t.verify()?)?;
if inserted {
if let Some((_, message)) = self.consensus.try_to_propose_new_block()? {
self.message_sender.broadcast_external_message(message)?;
}
}
// Handle requests which just contain a block proposal. This shouldn't actually happen in production, but
// annoyingly we have some test cases which trigger this (by rewriting broadcasts to direct messages) and
// the easiest fix is just to handle requests with proposals gracefully.
ExternalMessage::Proposal(m) => {
self.handle_proposal(from, m)?;

// Acknowledge the proposal.
self.request_responses
.send((response_channel, ExternalMessage::Acknowledgement))?;
}
_ => {
warn!("unexpected message type");
}
}

Ok(())
}

/// Reports a failed outgoing request to consensus.
///
/// `from` is only used for logging. The `failure` itself is handed to
/// `Consensus::report_outgoing_message_failure`, and any error from that call is propagated to the caller.
pub fn handle_request_failure(
    &mut self,
    from: PeerId,
    failure: OutgoingMessageFailure,
) -> Result<()> {
    // Bind our own peer ID first so the log statement can use the field shorthand.
    let to = self.peer_id;
    debug!(%from, %to, ?failure, "handling message failure");

    self.consensus.report_outgoing_message_failure(failure)?;

    Ok(())
}

pub fn handle_response(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> {
debug!(%from, to = %self.peer_id, %message, "handling response");
match message {
ExternalMessage::BlockResponse(m) => self.handle_block_response(from, m)?,
ExternalMessage::Acknowledgement => {}
_ => {
warn!("unexpected message type");
}
}

Expand Down Expand Up @@ -801,23 +867,15 @@ impl Node {
Proposal::from_parts(block, txs)
}

fn handle_block_request(&mut self, source: PeerId, request: BlockRequest) -> Result<()> {
let proposals = (request.from_view..=request.to_view)
.take(self.config.block_request_limit)
.filter_map(|view| {
self.consensus
.get_block_by_view(view)
.transpose()
.map(|block| Ok(self.block_to_proposal(block?)))
})
.collect::<Result<_>>()?;

trace!("responding to new blocks request of {request:?}");

self.message_sender.send_external_message(
source,
ExternalMessage::BlockResponse(BlockResponse { proposals }),
)?;
/// Feeds a block proposal received from `from` into consensus and forwards whatever consensus produces.
///
/// If consensus returns nothing, this is a no-op. Otherwise `DEFAULT_SLEEP_TIME_MS` is sent down the
/// `reset_timeout` channel and the returned message is either sent directly to the peer named by consensus or,
/// when no peer is named, broadcast.
fn handle_proposal(&mut self, from: PeerId, proposal: Proposal) -> Result<()> {
    // Nothing to forward if consensus had no reaction to this proposal.
    let Some((recipient, reply)) = self.consensus.proposal(from, proposal, false)? else {
        return Ok(());
    };

    self.reset_timeout.send(DEFAULT_SLEEP_TIME_MS)?;

    match recipient {
        // Consensus named a specific destination: send the reply to that peer only.
        Some(peer) => {
            self.message_sender.send_external_message(peer, reply)?;
        }
        // No destination given: send the reply to everyone.
        None => {
            self.message_sender.broadcast_external_message(reply)?;
        }
    }

    Ok(())
}
Expand Down
Loading

0 comments on commit 6c8c343

Please sign in to comment.