diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 1b58f1935..2c539392f 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -682,12 +682,10 @@ impl Handler { // outgoing session is that we originally sent a RANDOM packet (signifying we did // not have a session for a request) and the packet is not a PING (we are not // trying to update an old session that may have expired. - let connection_direction = { - match (request_call.initiating_session(), &request_call.body()) { - (true, RequestBody::Ping { .. }) => ConnectionDirection::Incoming, - (true, _) => ConnectionDirection::Outgoing, - (false, _) => ConnectionDirection::Incoming, - } + let connection_direction = if request_call.initiating_session() { + ConnectionDirection::Outgoing + } else { + ConnectionDirection::Incoming }; // We already know the ENR. Send the handshake response packet diff --git a/src/service.rs b/src/service.rs index 5e0c0512e..a85c87d44 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1369,13 +1369,27 @@ impl Service { /// The equivalent of libp2p `inject_connected()` for a udp session. We have no stream, but a /// session key-pair has been negotiated. - fn inject_session_established(&mut self, enr: Enr, direction: ConnectionDirection) { + fn inject_session_established(&mut self, enr: Enr, connection_direction: ConnectionDirection) { // Ignore sessions with non-contactable ENRs if self.ip_mode.get_contactable_addr(&enr).is_none() { return; } let node_id = enr.node_id(); + + // We never update connection direction if a node already exists in the routing table as we + // don't want to promote the direction from incoming to outgoing. + let key = kbucket::Key::from(node_id); + let direction = match self + .kbuckets + .read() + .get_bucket(&key) + .map(|bucket| bucket.get(&key)) + { + Some(Some(node)) => node.status.direction, + _ => connection_direction, + }; + debug!( "Session established with Node: {}, direction: {}", node_id, direction diff --git a/src/service/test.rs b/src/service/test.rs index d4d113f32..b2860700e 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -164,3 +164,57 @@ async fn test_updating_connection_on_ping() { let node = buckets.iter_ref().next().unwrap(); assert!(node.status.is_connected()) } + +#[tokio::test] +async fn test_connection_direction_on_inject_session_established() { + init(); + + let enr_key1 = CombinedKey::generate_secp256k1(); + let ip = std::net::Ipv4Addr::LOCALHOST; + let enr = EnrBuilder::new("v4") + .ip4(ip) + .udp4(10001) + .build(&enr_key1) + .unwrap(); + + let enr_key2 = CombinedKey::generate_secp256k1(); + let ip2 = std::net::Ipv4Addr::LOCALHOST; + let enr2 = EnrBuilder::new("v4") + .ip4(ip2) + .udp4(10002) + .build(&enr_key2) + .unwrap(); + + let mut service = build_service::( + Arc::new(RwLock::new(enr)), + Arc::new(RwLock::new(enr_key1)), + false, + ) + .await; + + let key = &kbucket::Key::from(enr2.node_id()); + + // Test that the existing connection direction is not updated. + // Incoming + service.inject_session_established(enr2.clone(), ConnectionDirection::Incoming); + let status = service.kbuckets.read().iter_ref().next().unwrap().status; + assert!(status.is_connected()); + assert_eq!(ConnectionDirection::Incoming, status.direction); + + service.inject_session_established(enr2.clone(), ConnectionDirection::Outgoing); + let status = service.kbuckets.read().iter_ref().next().unwrap().status; + assert!(status.is_connected()); + assert_eq!(ConnectionDirection::Incoming, status.direction); + + // (disconnected) Outgoing + let result = service.kbuckets.write().update_node_status( + key, + ConnectionState::Disconnected, + Some(ConnectionDirection::Outgoing), + ); + assert!(matches!(result, UpdateResult::Updated)); + service.inject_session_established(enr2.clone(), ConnectionDirection::Incoming); + let status = service.kbuckets.read().iter_ref().next().unwrap().status; + assert!(status.is_connected()); + assert_eq!(ConnectionDirection::Outgoing, status.direction); +}