From 052f235ba31e2006ef80b371a2d5e7d8b4cfc2b5 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 1 Feb 2024 23:29:36 -0300 Subject: [PATCH] add(scan): Implement `Results` request (#8224) * implement Results service call * call `sapling_results_for_key` from a blocking thread Co-authored-by: Arya --------- Co-authored-by: Arya --- .../src/scan_service/request.rs | 4 +- .../src/scan_service/response.rs | 13 +++- zebra-scan/src/service.rs | 34 +++++++-- zebra-scan/src/service/tests.rs | 76 +++++++++++++++++++ 4 files changed, 116 insertions(+), 11 deletions(-) diff --git a/zebra-node-services/src/scan_service/request.rs b/zebra-node-services/src/scan_service/request.rs index acdd98394df..6069baf25be 100644 --- a/zebra-node-services/src/scan_service/request.rs +++ b/zebra-node-services/src/scan_service/request.rs @@ -20,8 +20,8 @@ pub enum Request { /// Deletes viewing keys and their results from the database. DeleteKeys(Vec), - /// TODO: Accept `KeyHash`es and return `Transaction`s - Results(Vec<()>), + /// Accept keys and return transaction data + Results(Vec), /// TODO: Accept `KeyHash`es and return a channel receiver SubscribeResults(Vec<()>), diff --git a/zebra-node-services/src/scan_service/response.rs b/zebra-node-services/src/scan_service/response.rs index 9cb1bc8fe0a..3a04de94218 100644 --- a/zebra-node-services/src/scan_service/response.rs +++ b/zebra-node-services/src/scan_service/response.rs @@ -1,8 +1,11 @@ //! `zebra_scan::service::ScanService` response types. -use std::sync::{mpsc, Arc}; +use std::{ + collections::BTreeMap, + sync::{mpsc, Arc}, +}; -use zebra_chain::{block::Height, transaction::Transaction}; +use zebra_chain::{block::Height, transaction::Hash}; #[derive(Debug)] /// Response types for `zebra_scan::service::ScanService` @@ -14,7 +17,9 @@ pub enum Response { }, /// Response to Results request - Results(Vec), + /// + /// We use the nested `BTreeMap` so we don't repeat any piece of response data. + Results(BTreeMap>>), /// Response to DeleteKeys request DeletedKeys, @@ -23,5 +28,5 @@ pub enum Response { ClearedResults, /// Response to SubscribeResults request - SubscribeResults(mpsc::Receiver>), + SubscribeResults(mpsc::Receiver>), } diff --git a/zebra-scan/src/service.rs b/zebra-scan/src/service.rs index 3abba3f26fe..970e52f9db5 100644 --- a/zebra-scan/src/service.rs +++ b/zebra-scan/src/service.rs @@ -1,11 +1,12 @@ //! [`tower::Service`] for zebra-scan. -use std::{future::Future, pin::Pin, task::Poll, time::Duration}; +use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Duration}; use futures::future::FutureExt; use tower::Service; -use zebra_chain::parameters::Network; +use zebra_chain::{parameters::Network, transaction::Hash}; + use zebra_state::ChainTipChange; use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response}; @@ -124,8 +125,31 @@ impl Service for ScanService { .boxed(); } - Request::Results(_key_hashes) => { - // TODO: read results from db + Request::Results(keys) => { + let db = self.db.clone(); + + return async move { + let mut final_result = BTreeMap::new(); + for key in keys { + let db = db.clone(); + let mut heights_and_transactions = BTreeMap::new(); + let txs = { + let key = key.clone(); + tokio::task::spawn_blocking(move || db.sapling_results_for_key(&key)) + } + .await?; + txs.iter().for_each(|(k, v)| { + heights_and_transactions + .entry(*k) + .or_insert_with(Vec::new) + .extend(v.iter().map(|x| Hash::from(*x))); + }); + final_result.entry(key).or_insert(heights_and_transactions); + } + + Ok(Response::Results(final_result)) + } + .boxed(); } Request::SubscribeResults(_key_hashes) => { @@ -148,6 +172,6 @@ impl Service for ScanService { } } - async move { Ok(Response::Results(vec![])) }.boxed() + async move { Ok(Response::Results(BTreeMap::new())) }.boxed() } } diff --git a/zebra-scan/src/service/tests.rs b/zebra-scan/src/service/tests.rs index c7d3a2b8377..6d29d00d209 100644 --- a/zebra-scan/src/service/tests.rs +++ b/zebra-scan/src/service/tests.rs @@ -132,3 +132,79 @@ pub async fn scan_service_clears_results_correctly() -> Result<()> { Ok(()) } + +/// Tests that results for key are returned correctly +#[tokio::test] +pub async fn scan_service_get_results_for_key_correctly() -> Result<()> { + let mut db = new_test_storage(Network::Mainnet); + + let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string(); + + for fake_result_height in [Height::MIN, Height(1), Height::MAX] { + db.insert_sapling_results( + &zec_pages_sapling_efvk, + fake_result_height, + fake_sapling_results([ + TransactionIndex::MIN, + TransactionIndex::from_index(40), + TransactionIndex::MAX, + ]), + ); + } + + assert!( + db.sapling_results(&zec_pages_sapling_efvk).len() == 3, + "there should be 3 heights for this key in the db" + ); + + for (_height, transactions) in db.sapling_results(&zec_pages_sapling_efvk) { + assert!( + transactions.len() == 3, + "there should be 3 transactions for each height for this key in the db" + ); + } + + // We don't need to send any command to the scanner for this call. + let (mut scan_service, _cmd_receiver) = ScanService::new_with_mock_scanner(db); + + let response_fut = scan_service + .ready() + .await + .map_err(|err| eyre!(err))? + .call(Request::Results(vec![zec_pages_sapling_efvk.clone()])); + + match response_fut.await.map_err(|err| eyre!(err))? { + Response::Results(results) => { + assert!( + results.contains_key(&zec_pages_sapling_efvk), + "results should contain the requested key" + ); + assert!(results.len() == 1, "values are only for 1 key"); + + assert!( + results + .get_key_value(&zec_pages_sapling_efvk) + .unwrap() + .1 + .len() + == 3, + "we should have 3 heights for the given key " + ); + + for transactions in results + .get_key_value(&zec_pages_sapling_efvk) + .unwrap() + .1 + .values() + { + assert!( + transactions.len() == 3, + "there should be 3 transactions for each height for this key" + ); + } + } + _ => panic!("scan service returned unexpected response variant"), + }; + + Ok(()) +}