diff --git a/pkarr/src/client.rs b/pkarr/src/client.rs index 3d661d6..5843df8 100644 --- a/pkarr/src/client.rs +++ b/pkarr/src/client.rs @@ -17,15 +17,19 @@ use std::{ use crate::{PublicKey, Result, SignedPacket}; -const DEFAULT_CACHE_SIZE: usize = 1000; +pub const DEFAULT_CACHE_SIZE: usize = 1000; /// Default minimum TTL 30 seconds -const DEFAULT_MINIMUM_TTL: u64 = 30; +pub const DEFAULT_MINIMUM_TTL: u32 = 30; +/// Default maximum TTL 30 seconds +pub const DEFAULT_MAXIMUM_TTL: u32 = 1800; #[derive(Debug, Clone)] -pub struct Settings { +struct Settings { dht: DhtSettings, cache_size: usize, // TODO: add cusotmization of minimum ttl and maximum ttl + minimum_ttl: u32, + maximum_ttl: u32, } impl Default for Settings { @@ -33,6 +37,8 @@ impl Default for Settings { Self { dht: DhtSettings::default(), cache_size: DEFAULT_CACHE_SIZE, + minimum_ttl: DEFAULT_MINIMUM_TTL, + maximum_ttl: DEFAULT_MAXIMUM_TTL, } } } @@ -43,21 +49,52 @@ pub struct PkarrClientBuilder { } impl PkarrClientBuilder { + /// Create a full DHT node that accepts requests, and acts as a routing and storage node. pub fn server(mut self) -> Self { self.settings.dht.read_only = false; self } + /// Set the Dht bootstrapping nodes pub fn bootstrap(mut self, bootstrap: &[String]) -> Self { self.settings.dht.bootstrap = Some(bootstrap.to_owned()); self } + /// Set the port to listen on. + pub fn port(mut self, port: u16) -> Self { + self.settings.dht.port = Some(port); + self + } + + // TODO: allow custom Cache with traits. + /// Set the [SignedPacket] cache size. + /// Defaults to [DEFAULT_CACHE_SIZE]. pub fn cache_size(mut self, cache_size: usize) -> Self { self.settings.cache_size = cache_size; self } + /// Set the minimum ttl for a cached [SignedPacket]. + /// Defaults to [DEFAULT_MINIMUM_TTL]. + /// + /// Internally the cache will expire after the smallest ttl in + /// the resource records, unless it is smaller than this minimum. + pub fn minimum_ttl(mut self, ttl: u32) -> Self { + self.settings.minimum_ttl = ttl; + self + } + + /// Set the maximum ttl for a cached [SignedPacket]. + /// Defaults to [DEFAULT_MAXIMUM_TTL]. + /// + /// Internally the cache will expire after the smallest ttl in + /// the resource records, unless it is bigger than this maximum. + pub fn maximum_ttl(mut self, ttl: u32) -> Self { + self.settings.maximum_ttl = ttl; + self + } + pub fn build(self) -> Result { PkarrClient::new(self.settings) } @@ -70,7 +107,7 @@ pub struct PkarrClient { } impl PkarrClient { - pub fn new(settings: Settings) -> Result { + fn new(settings: Settings) -> Result { let (sender, receiver) = flume::bounded(32); let mut rpc = Rpc::new()?.with_read_only(settings.dht.read_only); @@ -87,6 +124,7 @@ impl PkarrClient { rpc, resolve_senders: HashMap::new(), cache: LruCache::new(NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap()), + settings, }; thread::spawn(move || run(state, receiver)); @@ -138,10 +176,16 @@ struct CachedPacket { } impl CachedPacket { - fn new(signed_packet: &SignedPacket, ttl: u64) -> Self { + fn new(signed_packet: &SignedPacket, minimum_ttl: u32, maximum_ttl: u32) -> Self { + let mut ttl = minimum_ttl; + + for answer in signed_packet.packet().answers.iter() { + ttl = answer.ttl.max(minimum_ttl).min(maximum_ttl); + } + Self { // TODO: set the ttl from signed_packet records' ttls - expires_at: Instant::now() + Duration::from_secs(ttl), + expires_at: Instant::now() + Duration::from_secs(ttl as u64), signed_packet: signed_packet.to_owned(), } } @@ -156,6 +200,7 @@ struct State { rpc: Rpc, resolve_senders: HashMap>>, cache: LruCache, + settings: Settings, } fn run(mut state: State, receiver: Receiver) { @@ -179,7 +224,11 @@ fn run(mut state: State, receiver: Receiver) { state.cache.put( signed_packet.public_key().to_owned(), - CachedPacket::new(&signed_packet, DEFAULT_MINIMUM_TTL), + CachedPacket::new( + &signed_packet, + state.settings.minimum_ttl, + state.settings.maximum_ttl, + ), ); let request = messages::PutRequestSpecific::PutMutable( @@ -261,7 +310,8 @@ fn dht_tick(state: &mut State, server: &mut mainline::server::Server) { let is_most_recent = state .cache .get(&public_key) - .filter(|cached| cached.is_fresh()) + // Do not filter expired cached packets, because we care more that + // they are more recent. .map_or(true, |cached| { signed_packet.more_recent_than(&cached.signed_packet) }); @@ -270,7 +320,11 @@ fn dht_tick(state: &mut State, server: &mut mainline::server::Server) { state.cache.put( public_key.clone(), // TODO: compare the packet TTLs and the DEFAULT_MINIMUM_TTL - CachedPacket::new(&signed_packet, DEFAULT_MINIMUM_TTL), + CachedPacket::new( + &signed_packet, + state.settings.minimum_ttl, + state.settings.maximum_ttl, + ), ); if let Some(set) = state.resolve_senders.get_mut(target) {