Skip to content

Commit

Permalink
Fix unexpected ConnectionDirection update (#188)
Browse files Browse the repository at this point in the history
* The handler instructs the service on how to update connection direction.

* cargo fmt

* cargo fmt

* Fix clippy warning

* Add tests for inject_session_established()

* Rename ConnectionDirectionInstruction shortly

* read access

* Update src/service/test.rs

Co-authored-by: Divma <[email protected]>

* Update src/service/test.rs

Co-authored-by: Divma <[email protected]>

* Add a test case

* We never update connection direction if a node already exists in the routing table

* Update src/service.rs

Fix typo

Co-authored-by: Divma <[email protected]>

---------

Co-authored-by: Divma <[email protected]>
  • Loading branch information
ackintosh and divagant-martian authored Jun 13, 2023
1 parent 370b147 commit 47844ca
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
10 changes: 4 additions & 6 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,12 +682,10 @@ impl Handler {
// outgoing session is that we originally sent a RANDOM packet (signifying we did
// not have a session for a request) and the packet is not a PING (we are not
// trying to update an old session that may have expired.
let connection_direction = {
match (request_call.initiating_session(), &request_call.body()) {
(true, RequestBody::Ping { .. }) => ConnectionDirection::Incoming,
(true, _) => ConnectionDirection::Outgoing,
(false, _) => ConnectionDirection::Incoming,
}
let connection_direction = if request_call.initiating_session() {
ConnectionDirection::Outgoing
} else {
ConnectionDirection::Incoming
};

// We already know the ENR. Send the handshake response packet
Expand Down
16 changes: 15 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1369,13 +1369,27 @@ impl Service {

/// The equivalent of libp2p `inject_connected()` for a udp session. We have no stream, but a
/// session key-pair has been negotiated.
fn inject_session_established(&mut self, enr: Enr, direction: ConnectionDirection) {
fn inject_session_established(&mut self, enr: Enr, connection_direction: ConnectionDirection) {
// Ignore sessions with non-contactable ENRs
if self.ip_mode.get_contactable_addr(&enr).is_none() {
return;
}

let node_id = enr.node_id();

// We never update connection direction if a node already exists in the routing table as we
// don't want to promote the direction from incoming to outgoing.
let key = kbucket::Key::from(node_id);
let direction = match self
.kbuckets
.read()
.get_bucket(&key)
.map(|bucket| bucket.get(&key))
{
Some(Some(node)) => node.status.direction,
_ => connection_direction,
};

debug!(
"Session established with Node: {}, direction: {}",
node_id, direction
Expand Down
54 changes: 54 additions & 0 deletions src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,57 @@ async fn test_updating_connection_on_ping() {
let node = buckets.iter_ref().next().unwrap();
assert!(node.status.is_connected())
}

#[tokio::test]
async fn test_connection_direction_on_inject_session_established() {
init();

let enr_key1 = CombinedKey::generate_secp256k1();
let ip = std::net::Ipv4Addr::LOCALHOST;
let enr = EnrBuilder::new("v4")
.ip4(ip)
.udp4(10001)
.build(&enr_key1)
.unwrap();

let enr_key2 = CombinedKey::generate_secp256k1();
let ip2 = std::net::Ipv4Addr::LOCALHOST;
let enr2 = EnrBuilder::new("v4")
.ip4(ip2)
.udp4(10002)
.build(&enr_key2)
.unwrap();

let mut service = build_service::<DefaultProtocolId>(
Arc::new(RwLock::new(enr)),
Arc::new(RwLock::new(enr_key1)),
false,
)
.await;

let key = &kbucket::Key::from(enr2.node_id());

// Test that the existing connection direction is not updated.
// Incoming
service.inject_session_established(enr2.clone(), ConnectionDirection::Incoming);
let status = service.kbuckets.read().iter_ref().next().unwrap().status;
assert!(status.is_connected());
assert_eq!(ConnectionDirection::Incoming, status.direction);

service.inject_session_established(enr2.clone(), ConnectionDirection::Outgoing);
let status = service.kbuckets.read().iter_ref().next().unwrap().status;
assert!(status.is_connected());
assert_eq!(ConnectionDirection::Incoming, status.direction);

// (disconnected) Outgoing
let result = service.kbuckets.write().update_node_status(
key,
ConnectionState::Disconnected,
Some(ConnectionDirection::Outgoing),
);
assert!(matches!(result, UpdateResult::Updated));
service.inject_session_established(enr2.clone(), ConnectionDirection::Incoming);
let status = service.kbuckets.read().iter_ref().next().unwrap().status;
assert!(status.is_connected());
assert_eq!(ConnectionDirection::Outgoing, status.direction);
}

0 comments on commit 47844ca

Please sign in to comment.