Skip to content

Commit

Permalink
feat: consider resource records' ttl with min and max settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Apr 21, 2024
1 parent a40585b commit cc5d8fd
Showing 1 changed file with 63 additions and 9 deletions.
72 changes: 63 additions & 9 deletions pkarr/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,28 @@ use std::{

use crate::{PublicKey, Result, SignedPacket};

const DEFAULT_CACHE_SIZE: usize = 1000;
pub const DEFAULT_CACHE_SIZE: usize = 1000;
/// Default minimum TTL 30 seconds
const DEFAULT_MINIMUM_TTL: u64 = 30;
pub const DEFAULT_MINIMUM_TTL: u32 = 30;
/// Default maximum TTL 30 seconds
pub const DEFAULT_MAXIMUM_TTL: u32 = 1800;

#[derive(Debug, Clone)]
pub struct Settings {
struct Settings {
dht: DhtSettings,
cache_size: usize,
// TODO: add cusotmization of minimum ttl and maximum ttl
minimum_ttl: u32,
maximum_ttl: u32,
}

impl Default for Settings {
fn default() -> Self {
Self {
dht: DhtSettings::default(),
cache_size: DEFAULT_CACHE_SIZE,
minimum_ttl: DEFAULT_MINIMUM_TTL,
maximum_ttl: DEFAULT_MAXIMUM_TTL,
}
}
}
Expand All @@ -43,21 +49,52 @@ pub struct PkarrClientBuilder {
}

impl PkarrClientBuilder {
/// Create a full DHT node that accepts requests, and acts as a routing and storage node.
pub fn server(mut self) -> Self {
self.settings.dht.read_only = false;
self
}

/// Set the Dht bootstrapping nodes
pub fn bootstrap(mut self, bootstrap: &[String]) -> Self {
self.settings.dht.bootstrap = Some(bootstrap.to_owned());
self
}

/// Set the port to listen on.
pub fn port(mut self, port: u16) -> Self {
self.settings.dht.port = Some(port);
self
}

// TODO: allow custom Cache with traits.
/// Set the [SignedPacket] cache size.
/// Defaults to [DEFAULT_CACHE_SIZE].
pub fn cache_size(mut self, cache_size: usize) -> Self {
self.settings.cache_size = cache_size;
self
}

/// Set the minimum ttl for a cached [SignedPacket].
/// Defaults to [DEFAULT_MINIMUM_TTL].
///
/// Internally the cache will expire after the smallest ttl in
/// the resource records, unless it is smaller than this minimum.
pub fn minimum_ttl(mut self, ttl: u32) -> Self {
self.settings.minimum_ttl = ttl;
self
}

/// Set the maximum ttl for a cached [SignedPacket].
/// Defaults to [DEFAULT_MAXIMUM_TTL].
///
/// Internally the cache will expire after the smallest ttl in
/// the resource records, unless it is bigger than this maximum.
pub fn maximum_ttl(mut self, ttl: u32) -> Self {
self.settings.maximum_ttl = ttl;
self
}

pub fn build(self) -> Result<PkarrClient> {
PkarrClient::new(self.settings)
}
Expand All @@ -70,7 +107,7 @@ pub struct PkarrClient {
}

impl PkarrClient {
pub fn new(settings: Settings) -> Result<PkarrClient> {
fn new(settings: Settings) -> Result<PkarrClient> {
let (sender, receiver) = flume::bounded(32);

let mut rpc = Rpc::new()?.with_read_only(settings.dht.read_only);
Expand All @@ -87,6 +124,7 @@ impl PkarrClient {
rpc,
resolve_senders: HashMap::new(),
cache: LruCache::new(NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap()),
settings,
};

thread::spawn(move || run(state, receiver));
Expand Down Expand Up @@ -138,10 +176,16 @@ struct CachedPacket {
}

impl CachedPacket {
fn new(signed_packet: &SignedPacket, ttl: u64) -> Self {
fn new(signed_packet: &SignedPacket, minimum_ttl: u32, maximum_ttl: u32) -> Self {
let mut ttl = minimum_ttl;

for answer in signed_packet.packet().answers.iter() {
ttl = answer.ttl.max(minimum_ttl).min(maximum_ttl);
}

Self {
// TODO: set the ttl from signed_packet records' ttls
expires_at: Instant::now() + Duration::from_secs(ttl),
expires_at: Instant::now() + Duration::from_secs(ttl as u64),
signed_packet: signed_packet.to_owned(),
}
}
Expand All @@ -156,6 +200,7 @@ struct State {
rpc: Rpc,
resolve_senders: HashMap<Id, Vec<Sender<SignedPacket>>>,
cache: LruCache<PublicKey, CachedPacket>,
settings: Settings,
}

fn run(mut state: State, receiver: Receiver<ActorMessage>) {
Expand All @@ -179,7 +224,11 @@ fn run(mut state: State, receiver: Receiver<ActorMessage>) {

state.cache.put(
signed_packet.public_key().to_owned(),
CachedPacket::new(&signed_packet, DEFAULT_MINIMUM_TTL),
CachedPacket::new(
&signed_packet,
state.settings.minimum_ttl,
state.settings.maximum_ttl,
),
);

let request = messages::PutRequestSpecific::PutMutable(
Expand Down Expand Up @@ -261,7 +310,8 @@ fn dht_tick(state: &mut State, server: &mut mainline::server::Server) {
let is_most_recent = state
.cache
.get(&public_key)
.filter(|cached| cached.is_fresh())
// Do not filter expired cached packets, because we care more that
// they are more recent.
.map_or(true, |cached| {
signed_packet.more_recent_than(&cached.signed_packet)
});
Expand All @@ -270,7 +320,11 @@ fn dht_tick(state: &mut State, server: &mut mainline::server::Server) {
state.cache.put(
public_key.clone(),
// TODO: compare the packet TTLs and the DEFAULT_MINIMUM_TTL
CachedPacket::new(&signed_packet, DEFAULT_MINIMUM_TTL),
CachedPacket::new(
&signed_packet,
state.settings.minimum_ttl,
state.settings.maximum_ttl,
),
);

if let Some(set) = state.resolve_senders.get_mut(target) {
Expand Down

0 comments on commit cc5d8fd

Please sign in to comment.