Skip to content

Commit

Permalink
kv: add iterator helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
barraguda committed Dec 17, 2024
1 parent 0877f85 commit 1248855
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum KvAction {
BeginTx,
Commit { tx_id: u64 },
Backup,
IterStart { prefix: Option<Vec<u8>> },
IterNext { iterator_id: u64, count: u64 },
IterClose { iterator_id: u64 },
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -31,6 +34,15 @@ pub enum KvResponse {
BeginTx { tx_id: u64 },
Get { key: Vec<u8> },
Err { error: KvError },
IterStart {
iterator_id: u64,
},
IterNext {
done: bool,
},
IterClose {
iterator_id: u64,
},
}

#[derive(Debug, Serialize, Deserialize, Error)]
Expand Down Expand Up @@ -181,6 +193,117 @@ where
}
}

/// Get all key-value pairs with an optional prefix
///
/// # Example
/// ```
/// let entries = kv.iter_all(Some(&"user_"), 100)?;
/// for (key, value) in entries {
/// println!("key: {}, value: {:?}", key, value);
/// }
/// ```
pub fn iter_all(&self, prefix: Option<&K>, batch_size: u64) -> anyhow::Result<Vec<(K, V)>> {
// Start the iterator
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::IterStart {
prefix: prefix.map(|p| serde_json::to_vec(p)).transpose()?
},
})?)
.send_and_await_response(self.timeout)?;

let iterator_id = match res {
Ok(Message::Response { body, .. }) => {
match serde_json::from_slice::<KvResponse>(&body)? {
KvResponse::IterStart { iterator_id } => iterator_id,
KvResponse::Err { error } => return Err(error.into()),
_ => return Err(anyhow::anyhow!("kv: unexpected response")),
}
}
_ => return Err(anyhow::anyhow!("kv: unexpected message")),
};

let mut all_entries = Vec::new();

// Collect all entries
loop {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::IterNext {
iterator_id,
count: batch_size,
},
})?)
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
match serde_json::from_slice::<KvResponse>(&body)? {
KvResponse::IterNext { done } => {
let entries_bytes = get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?;
let entries: Vec<(Vec<u8>, Vec<u8>)> = serde_json::from_slice(&entries_bytes)?;
for (key_bytes, value_bytes) in entries {
let key = serde_json::from_slice(&key_bytes)?;
let value = serde_json::from_slice(&value_bytes)?;
all_entries.push((key, value));
}
if done {
break;
}
}
KvResponse::Err { error } => return Err(error.into()),
_ => return Err(anyhow::anyhow!("kv: unexpected response")),
}
}
_ => return Err(anyhow::anyhow!("kv: unexpected message")),
}
}

// Clean up
let _ = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::IterClose { iterator_id },
})?)
.send_and_await_response(self.timeout)?;

Ok(all_entries)
}

/// Get all keys with an optional prefix
///
/// # Example
/// ```
/// let keys = kv.collect_keys(Some(&"user_"))?;
/// for key in keys {
/// println!("key: {}", key);
/// }
/// ```
pub fn collect_keys(&self, prefix: Option<&K>) -> anyhow::Result<Vec<K>> {
Ok(self.iter_all(prefix, 100)?.into_iter().map(|(k, _)| k).collect())
}

/// Get all values with an optional key prefix
///
/// # Example
/// ```
/// let values = kv.collect_values(Some(&"user_"))?;
/// for value in values {
/// println!("value: {:?}", value);
/// }
/// ```
pub fn collect_values(&self, prefix: Option<&K>) -> anyhow::Result<Vec<V>> {
Ok(self.iter_all(prefix, 100)?.into_iter().map(|(_, v)| v).collect())
}

/// Commit a transaction.
pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> {
let res = Request::new()
Expand Down

0 comments on commit 1248855

Please sign in to comment.