ensure peer_connected is called before peer_disconnected #3110

Closed
195 changes: 180 additions & 15 deletions lightning/src/ln/peer_handler.rs
@@ -1682,25 +1682,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
}

if let Err(()) = self.message_handler.route_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
log_debug!(logger, "Route Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
return Err(PeerHandleError { }.into());
}
if let Err(()) = self.message_handler.chan_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
log_debug!(logger, "Channel Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
return Err(PeerHandleError { }.into());
}
if let Err(()) = self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
log_debug!(logger, "Onion Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
return Err(PeerHandleError { }.into());
let results = [
("Route Handler", self.message_handler.route_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection)),
("Channel Handler", self.message_handler.chan_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection)),
("Onion Handler", self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection)),
("Custom Message Handler", self.message_handler.custom_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection))
];
Contributor Author:

If you don't like this attempt to dry up the handling, then I'm fine with just having separate results and checking them one by one with their own log messages.


for (handler_name, result) in &results {
if result.is_err() {
log_debug!(logger, "{} decided we couldn't communicate with peer {}", handler_name, log_pubkey!(their_node_id));
}
}
if let Err(()) = self.message_handler.custom_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
log_debug!(logger, "Custom Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));

peer_lock.their_features = Some(msg.features);
Contributor:

I'm a bit confused by this: wouldn't that lead to us falsely assuming the handshake succeeded even though one of our handlers rejected it? And there is a window between us dropping the lock and handling the disconnect event where we would deal with it in a 'normal' manner, e.g., accepting further messages and potentially rebroadcasting, etc.?

(cc @TheBlueMatt as he requested this change)

Contributor Author:

Hm, if that's true then it seems like we'll need to separate "handshake_completed" from "triggered peer_connected" with a new flag on the peer that we can use to decide whether or not to trigger peer_disconnected in do_disconnect and disconnect_event_internal?

Collaborator:

Yea, kinda. We'll end up forwarding broadcasts, as you point out, which is maybe not ideal, but we shouldn't process any further messages: we're currently in a read-processing call, and we require read-processing calls for any given peer to be serial, so presumably when we return an error the read-processing pipeline for this peer will stall and we won't get any more reads. We could make that explicit in the docs, however.

Contributor:

Mhh, rather than introducing this racy behavior in the first place, couldn't we just introduce a new handshake_aborted flag and check that as an alternative to !peer.handshake_complete in disconnect_event_internal?

Collaborator:

That's fine too.


if results.iter().any(|(_, result)| result.is_err()) {
return Err(PeerHandleError { }.into());
}

peer_lock.awaiting_pong_timer_tick_intervals = 0;
peer_lock.their_features = Some(msg.features);
return Ok(None);
} else if peer_lock.their_features.is_none() {
log_debug!(logger, "Peer {} sent non-Init first message", log_pubkey!(their_node_id));
@@ -2698,11 +2699,12 @@ mod tests {
use crate::ln::types::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
use crate::ln::peer_handler::{CustomMessageHandler, OnionMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
use crate::ln::{msgs, wire};
use crate::ln::msgs::{Init, LightningError, SocketAddress};
use crate::util::test_utils;

Contributor:

nit: Drop superfluous whitespace.
use bitcoin::Network;
use bitcoin::blockdata::constants::ChainHash;
use bitcoin::secp256k1::{PublicKey, SecretKey};
@@ -2779,6 +2781,76 @@ mod tests {
}
}

struct TestPeerTrackingMessageHandler {
Contributor:

Hmm, I believe the alternative would be to add TestCustomMessageHandler and TestOnionMessageHandler to test_utils and use them as part of the default test setup?

features: InitFeatures,
pub peer_connected_called: AtomicBool,
pub peer_disconnected_called: AtomicBool,
}

impl TestPeerTrackingMessageHandler {
pub fn new(features: InitFeatures) -> Self {
Self {
features,
peer_connected_called: AtomicBool::new(false),
peer_disconnected_called: AtomicBool::new(false),
}
}
}

impl wire::CustomMessageReader for TestPeerTrackingMessageHandler {
type CustomMessage = Infallible;
fn read<R: io::Read>(&self, _: u16, _: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
Ok(None)
}
}

impl CustomMessageHandler for TestPeerTrackingMessageHandler {
fn handle_custom_message(&self, _: Infallible, _: &PublicKey) -> Result<(), LightningError> {
unreachable!();
}

fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() }

fn peer_disconnected(&self, _their_node_id: &PublicKey) {
assert!(self.peer_connected_called.load(Ordering::Acquire));
assert!(!self.peer_disconnected_called.load(Ordering::Acquire));
self.peer_disconnected_called.store(true, Ordering::Release);
}

fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &Init, _inbound: bool) -> Result<(), ()> {
assert!(!self.peer_connected_called.load(Ordering::Acquire));
assert!(!self.peer_disconnected_called.load(Ordering::Acquire));
self.peer_connected_called.store(true, Ordering::Release);
Err(())
}

fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }

fn provided_init_features(&self, _: &PublicKey) -> InitFeatures {
self.features.clone()
}
}

impl OnionMessageHandler for TestPeerTrackingMessageHandler {
fn handle_onion_message(&self, _peer_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
fn peer_disconnected(&self, _their_node_id: &PublicKey) {
assert!(self.peer_connected_called.load(Ordering::Acquire));
assert!(!self.peer_disconnected_called.load(Ordering::Acquire));
self.peer_disconnected_called.store(true, Ordering::Release);
}

fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &Init, _inbound: bool) -> Result<(), ()> {
assert!(!self.peer_connected_called.load(Ordering::Acquire));
assert!(!self.peer_disconnected_called.load(Ordering::Acquire));
self.peer_connected_called.store(true, Ordering::Release);
Err(())
}
fn timer_tick_occurred(&self) {}
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { self.features.clone() }
}

fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
let mut cfgs = Vec::new();
for i in 0..peer_count {
@@ -3163,6 +3235,99 @@
assert_eq!(peers[0].peers.read().unwrap().len(), 0);
}

#[test]
fn test_peer_connected_error_disconnects() {

struct PeerTrackingPeerManagerConfig {
logger: test_utils::TestLogger,
node_signer: test_utils::TestNodeSigner,
chan_handler: test_utils::TestChannelMessageHandler,
route_handler: test_utils::TestRoutingMessageHandler,
onion_message_handler: TestPeerTrackingMessageHandler,
custom_message_handler: TestPeerTrackingMessageHandler,
}

fn create_cfgs(peers: u8) -> Vec<PeerTrackingPeerManagerConfig> {
let mut cfgs = vec![];
for i in 0..peers {
let features = {
let mut feature_bits = vec![0u8; 33];
feature_bits[32] = 0b00000001;
InitFeatures::from_le_bytes(feature_bits)
};
let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
cfgs.push(PeerTrackingPeerManagerConfig {
logger: test_utils::TestLogger::new(),
node_signer: test_utils::TestNodeSigner::new(node_secret),
chan_handler: test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)),
route_handler: test_utils::TestRoutingMessageHandler::new(),
onion_message_handler: TestPeerTrackingMessageHandler::new(features.clone()),
custom_message_handler: TestPeerTrackingMessageHandler::new(features.clone()),
});
}
cfgs
}

type PeerTrackingPeerManager<'a> = PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a TestPeerTrackingMessageHandler, &'a test_utils::TestLogger, &'a TestPeerTrackingMessageHandler, &'a test_utils::TestNodeSigner>;

fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerTrackingPeerManagerConfig>) -> Vec<PeerTrackingPeerManager<'a>> {
let mut peers = Vec::new();
for i in 0..peer_count {
let ephemeral_bytes = [i as u8; 32];
let msg_handler = MessageHandler {
chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].route_handler,
onion_message_handler: &cfgs[i].onion_message_handler, custom_message_handler: &cfgs[i].custom_message_handler
};
let peer = PeerManager::new(msg_handler, 0, &ephemeral_bytes, &cfgs[i].logger, &cfgs[i].node_signer);
peers.push(peer);
}

peers
}

fn try_establish_connection<'a>(peer_a: &PeerTrackingPeerManager<'a>, peer_b: &PeerTrackingPeerManager<'a>) -> (FileDescriptor, FileDescriptor) {
let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
let mut fd_a = FileDescriptor {
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
let mut fd_b = FileDescriptor {
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();

let _res = peer_a.read_event(&mut fd_a, &initial_data);
peer_a.process_events();

let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);

let _res = peer_b.read_event(&mut fd_b, &a_data);

peer_b.process_events();
let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
let _res = peer_a.read_event(&mut fd_a, &b_data);

peer_a.process_events();
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);

let _res = peer_b.read_event(&mut fd_b, &a_data);
(fd_a.clone(), fd_b.clone())
}

let cfgs = create_cfgs(2);
let peers = create_network(2, &cfgs);
let (_sd1, _sd2) = try_establish_connection(&peers[0], &peers[1]);

assert!(cfgs[0].custom_message_handler.peer_connected_called.load(Ordering::Acquire));
assert!(cfgs[0].custom_message_handler.peer_disconnected_called.load(Ordering::Acquire));
assert!(cfgs[0].onion_message_handler.peer_connected_called.load(Ordering::Acquire));
assert!(cfgs[0].onion_message_handler.peer_disconnected_called.load(Ordering::Acquire));
}

#[test]
fn test_do_attempt_write_data() {
// Create 2 peers with custom TestRoutingMessageHandlers and connect them.