Skip to content

Commit

Permalink
add(scan): Implement Results request (#8224)
Browse files Browse the repository at this point in the history
* implement Results service call

* call `sapling_results_for_key` from a blocking thread

Co-authored-by: Arya <[email protected]>

---------

Co-authored-by: Arya <[email protected]>
  • Loading branch information
oxarbitrage and arya2 authored Feb 2, 2024
1 parent 5feb40e commit 052f235
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 11 deletions.
4 changes: 2 additions & 2 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub enum Request {
/// Deletes viewing keys and their results from the database.
DeleteKeys(Vec<String>),

/// TODO: Accept `KeyHash`es and return `Transaction`s
Results(Vec<()>),
/// Accept keys and return transaction data
Results(Vec<String>),

/// TODO: Accept `KeyHash`es and return a channel receiver
SubscribeResults(Vec<()>),
Expand Down
13 changes: 9 additions & 4 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
//! `zebra_scan::service::ScanService` response types.
use std::sync::{mpsc, Arc};
use std::{
collections::BTreeMap,
sync::{mpsc, Arc},
};

use zebra_chain::{block::Height, transaction::Transaction};
use zebra_chain::{block::Height, transaction::Hash};

#[derive(Debug)]
/// Response types for `zebra_scan::service::ScanService`
Expand All @@ -14,7 +17,9 @@ pub enum Response {
},

/// Response to Results request
Results(Vec<Transaction>),
///
/// We use the nested `BTreeMap` so we don't repeat any piece of response data.
Results(BTreeMap<String, BTreeMap<Height, Vec<Hash>>>),

/// Response to DeleteKeys request
DeletedKeys,
Expand All @@ -23,5 +28,5 @@ pub enum Response {
ClearedResults,

/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
SubscribeResults(mpsc::Receiver<Arc<Hash>>),
}
34 changes: 29 additions & 5 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! [`tower::Service`] for zebra-scan.
use std::{future::Future, pin::Pin, task::Poll, time::Duration};
use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Duration};

use futures::future::FutureExt;
use tower::Service;

use zebra_chain::parameters::Network;
use zebra_chain::{parameters::Network, transaction::Hash};

use zebra_state::ChainTipChange;

use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
Expand Down Expand Up @@ -124,8 +125,31 @@ impl Service<Request> for ScanService {
.boxed();
}

Request::Results(_key_hashes) => {
// TODO: read results from db
Request::Results(keys) => {
let db = self.db.clone();

return async move {
let mut final_result = BTreeMap::new();
for key in keys {
let db = db.clone();
let mut heights_and_transactions = BTreeMap::new();
let txs = {
let key = key.clone();
tokio::task::spawn_blocking(move || db.sapling_results_for_key(&key))
}
.await?;
txs.iter().for_each(|(k, v)| {
heights_and_transactions
.entry(*k)
.or_insert_with(Vec::new)
.extend(v.iter().map(|x| Hash::from(*x)));
});
final_result.entry(key).or_insert(heights_and_transactions);
}

Ok(Response::Results(final_result))
}
.boxed();
}

Request::SubscribeResults(_key_hashes) => {
Expand All @@ -148,6 +172,6 @@ impl Service<Request> for ScanService {
}
}

async move { Ok(Response::Results(vec![])) }.boxed()
async move { Ok(Response::Results(BTreeMap::new())) }.boxed()
}
}
76 changes: 76 additions & 0 deletions zebra-scan/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,79 @@ pub async fn scan_service_clears_results_correctly() -> Result<()> {

Ok(())
}

/// Tests that results for key are returned correctly
#[tokio::test]
pub async fn scan_service_get_results_for_key_correctly() -> Result<()> {
let mut db = new_test_storage(Network::Mainnet);

let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();

for fake_result_height in [Height::MIN, Height(1), Height::MAX] {
db.insert_sapling_results(
&zec_pages_sapling_efvk,
fake_result_height,
fake_sapling_results([
TransactionIndex::MIN,
TransactionIndex::from_index(40),
TransactionIndex::MAX,
]),
);
}

assert!(
db.sapling_results(&zec_pages_sapling_efvk).len() == 3,
"there should be 3 heights for this key in the db"
);

for (_height, transactions) in db.sapling_results(&zec_pages_sapling_efvk) {
assert!(
transactions.len() == 3,
"there should be 3 transactions for each height for this key in the db"
);
}

// We don't need to send any command to the scanner for this call.
let (mut scan_service, _cmd_receiver) = ScanService::new_with_mock_scanner(db);

let response_fut = scan_service
.ready()
.await
.map_err(|err| eyre!(err))?
.call(Request::Results(vec![zec_pages_sapling_efvk.clone()]));

match response_fut.await.map_err(|err| eyre!(err))? {
Response::Results(results) => {
assert!(
results.contains_key(&zec_pages_sapling_efvk),
"results should contain the requested key"
);
assert!(results.len() == 1, "values are only for 1 key");

assert!(
results
.get_key_value(&zec_pages_sapling_efvk)
.unwrap()
.1
.len()
== 3,
"we should have 3 heights for the given key "
);

for transactions in results
.get_key_value(&zec_pages_sapling_efvk)
.unwrap()
.1
.values()
{
assert!(
transactions.len() == 3,
"there should be 3 transactions for each height for this key"
);
}
}
_ => panic!("scan service returned unexpected response variant"),
};

Ok(())
}

0 comments on commit 052f235

Please sign in to comment.