Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: iterator support #629

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions kinode/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use lib::types::core::{
PackageId, PrintSender, Printout, ProcessId, Request, Response, FD_MANAGER_PROCESS_ID,
KV_PROCESS_ID,
};
use rand::random;
use rocksdb::OptimisticTransactionDB;
use std::{
collections::{HashMap, VecDeque},
Expand All @@ -24,6 +25,13 @@ struct KvState {
/// access order of dbs, used to cull if we hit the fds limit
access_order: Arc<Mutex<UniqueQueue<(PackageId, String)>>>,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
/// track active iterators: (package_id, db_name) -> (iterator_id -> current position)
iterators: Arc<
DashMap<
(PackageId, String),
DashMap<u64, Vec<u8>>, // Store last seen key instead of iterator
>,
>,
fds_limit: u64,
}

Expand All @@ -42,6 +50,7 @@ impl KvState {
open_kvs: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())),
txs: Arc::new(DashMap::new()),
iterators: Arc::new(DashMap::new()),
fds_limit: 10,
}
}
Expand Down Expand Up @@ -98,6 +107,108 @@ impl KvState {
self.remove_db(key.0, key.1).await;
}
}

async fn handle_iter_start(
&mut self,
package_id: PackageId,
db: String,
prefix: Option<Vec<u8>>,
) -> Result<u64, KvError> {
let db_key = (package_id.clone(), db.clone());
let _db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?;

// Generate a random iterator ID and ensure it's unique
let iterators = self
.iterators
.entry(db_key.clone())
.or_insert_with(|| DashMap::new());

let mut iterator_id = random::<u64>();
while iterators.contains_key(&iterator_id) {
iterator_id = random::<u64>();
}

// Store the starting position (prefix or empty vec for start)
iterators.insert(iterator_id, prefix.unwrap_or_default());

Ok(iterator_id)
}

async fn handle_iter_next(
&mut self,
package_id: PackageId,
db: String,
iterator_id: u64,
count: u64,
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), KvError> {
let db_key = (package_id.clone(), db.clone());
let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?;

let db_iters = self.iterators.get(&db_key).ok_or(KvError::NoDb)?;
let last_key = db_iters
.get(&iterator_id)
.ok_or(KvError::NoIterator)?
.clone();

let mut entries = Vec::new();
let mut done = true;

// Create a fresh iterator starting from our last position
let mode = if last_key.is_empty() {
rocksdb::IteratorMode::Start
} else {
rocksdb::IteratorMode::From(&last_key, rocksdb::Direction::Forward)
};

let mut iter = db.iterator(mode);
let mut count_remaining = count;

while let Some(item) = iter.next() {
if count_remaining == 0 {
done = false;
break;
}

match item {
Ok((key, value)) => {
let key_vec = key.to_vec();
if !key_vec.starts_with(&last_key) && !last_key.is_empty() {
// We've moved past our prefix
break;
}
entries.push((key_vec.clone(), value.to_vec()));
if let Some(mut last_key_entry) = db_iters.get_mut(&iterator_id) {
*last_key_entry = key_vec;
}
count_remaining -= 1;
}
Err(e) => {
return Err(KvError::RocksDBError {
action: "iter_next".to_string(),
error: e.to_string(),
});
}
}
}

Ok((entries, done))
}

async fn handle_iter_close(
&mut self,
package_id: PackageId,
db: String,
iterator_id: u64,
) -> Result<(), KvError> {
let db_key = (package_id, db);
if let Some(db_iters) = self.iterators.get_mut(&db_key) {
db_iters.remove(&iterator_id);
if db_iters.is_empty() {
self.iterators.remove(&db_key);
}
}
Ok(())
}
}

pub async fn kv(
Expand Down Expand Up @@ -379,6 +490,39 @@ async fn handle_request(
}
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::IterStart { prefix } => {
let iterator_id = state
.handle_iter_start(
request.package_id.clone(),
request.db.clone(),
prefix.clone(),
)
.await?;
(
serde_json::to_vec(&KvResponse::IterStart { iterator_id }).unwrap(),
None,
)
}
KvAction::IterNext { iterator_id, count } => {
let (entries, done) = state
.handle_iter_next(
request.package_id.clone(),
request.db.clone(),
*iterator_id,
*count,
)
.await?;
(
serde_json::to_vec(&KvResponse::IterNext { done }).unwrap(),
Some(serde_json::to_vec(&entries).unwrap()),
)
}
KvAction::IterClose { iterator_id } => {
state
.handle_iter_close(request.package_id.clone(), request.db.clone(), *iterator_id)
.await?;
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
};

if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) {
Expand Down Expand Up @@ -534,6 +678,9 @@ async fn check_caps(
Ok(())
}
KvAction::Backup { .. } => Ok(()),
KvAction::IterStart { .. } => Ok(()),
KvAction::IterNext { .. } => Ok(()),
KvAction::IterClose { .. } => Ok(()),
}
}

Expand Down
10 changes: 10 additions & 0 deletions lib/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ pub enum KvAction {
BeginTx,
Commit { tx_id: u64 },
Backup,
// Iterator operations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments inside enums should be /// docstrings -- please label each iter operation with an explanation of how to use it. It's not immediately obvious to me how it works.

IterStart { prefix: Option<Vec<u8>> },
IterNext { iterator_id: u64, count: u64 },
IterClose { iterator_id: u64 },
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -28,6 +32,10 @@ pub enum KvResponse {
BeginTx { tx_id: u64 },
Get { key: Vec<u8> },
Err { error: KvError },
// Iterator responses
IterStart { iterator_id: u64 },
IterNext { done: bool },
IterClose { iterator_id: u64 },
}

#[derive(Debug, Serialize, Deserialize, Error)]
Expand All @@ -38,6 +46,8 @@ pub enum KvError {
KeyNotFound,
#[error("no Tx found")]
NoTx,
#[error("Iterator not found")]
NoIterator,
#[error("No capability: {error}")]
NoCap { error: String },
#[error("rocksdb internal error: {error}")]
Expand Down
Loading