From 14a8f1ab7e03decca5064cd74abaebd53c41ec79 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Mon, 17 Feb 2025 11:22:41 -0300 Subject: [PATCH 1/7] feat: stream State Sync response --- .gitignore | 2 +- CHANGELOG.md | 1 + Makefile | 2 +- crates/proto/src/generated/responses.rs | 15 +-- crates/proto/src/generated/rpc.rs | 45 +++++--- crates/proto/src/generated/store.rs | 27 +++-- crates/rpc-proto/proto/responses.proto | 15 +-- crates/rpc-proto/proto/rpc.proto | 11 +- crates/rpc-proto/proto/store.proto | 2 +- crates/rpc/src/server/api.rs | 7 +- crates/store/src/server/api.rs | 146 +++++++++++++++--------- proto/responses.proto | 15 +-- proto/rpc.proto | 11 +- proto/store.proto | 2 +- 14 files changed, 171 insertions(+), 130 deletions(-) diff --git a/.gitignore b/.gitignore index b3359a675..809536ce0 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,4 @@ env/ *.out node_modules/ *DS_Store -*.iml \ No newline at end of file +*.iml diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a3e87b2a..ed9872ec1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [BREAKING] Updated minimum Rust version to 1.84. - [BREAKING] `Endpoint` configuration simplified to a single string (#654). +- [BREAKING] Changed sync state endpoint to stream the response (#685). ### Enhancements diff --git a/Makefile b/Makefile index 5a6d691b0..a62ae2717 100644 --- a/Makefile +++ b/Makefile @@ -75,7 +75,7 @@ check: ## Check all targets and features for errors without code generation # --- building ------------------------------------------------------------------------------------ .PHONY: build -build: ## Builds all crates and re-builds ptotobuf bindings for proto crates +build: ## Builds all crates and re-builds protobuf bindings for proto crates ${BUILD_PROTO} cargo build --locked # --- installing ---------------------------------------------------------------------------------- diff --git a/crates/proto/src/generated/responses.rs b/crates/proto/src/generated/responses.rs index c3a8f5f20..3f9969446 100644 --- a/crates/proto/src/generated/responses.rs +++ b/crates/proto/src/generated/responses.rs @@ -42,27 +42,24 @@ pub struct NullifierUpdate { /// Represents the result of syncing state request. #[derive(Clone, PartialEq, ::prost::Message)] pub struct SyncStateResponse { - /// Number of the latest block in the chain. - #[prost(fixed32, tag = "1")] - pub chain_tip: u32, /// Block header of the block with the first note matching the specified criteria. - #[prost(message, optional, tag = "2")] + #[prost(message, optional, tag = "1")] pub block_header: ::core::option::Option, /// Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`. - #[prost(message, optional, tag = "3")] + #[prost(message, optional, tag = "2")] pub mmr_delta: ::core::option::Option, /// List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`. - #[prost(message, repeated, tag = "5")] + #[prost(message, repeated, tag = "3")] pub accounts: ::prost::alloc::vec::Vec, /// List of transactions executed against requested accounts between `request.block_num + 1` and /// `response.block_header.block_num`. - #[prost(message, repeated, tag = "6")] + #[prost(message, repeated, tag = "4")] pub transactions: ::prost::alloc::vec::Vec, /// List of all notes together with the Merkle paths from `response.block_header.note_root`. - #[prost(message, repeated, tag = "7")] + #[prost(message, repeated, tag = "5")] pub notes: ::prost::alloc::vec::Vec, /// List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`. - #[prost(message, repeated, tag = "8")] + #[prost(message, repeated, tag = "6")] pub nullifiers: ::prost::alloc::vec::Vec, } /// Represents the result of syncing notes request. diff --git a/crates/proto/src/generated/rpc.rs b/crates/proto/src/generated/rpc.rs index cab7a3998..de205e75b 100644 --- a/crates/proto/src/generated/rpc.rs +++ b/crates/proto/src/generated/rpc.rs @@ -355,12 +355,11 @@ pub mod api_client { /// Returns info which can be used by the client to sync up to the latest state of the chain /// for the objects (accounts, notes, nullifiers) the client is interested in. /// - /// This request returns the next block containing requested data. It also returns `chain_tip` - /// which is the latest block number in the chain. Client is expected to repeat these requests - /// in a loop until `response.block_header.block_num == response.chain_tip`, at which point - /// the client is fully synchronized with the chain. + /// This request returns a stream where multiple update responses will be pushed in order. + /// Client is expected to read the updates from the stream and apply them, and then it will be + /// fully synchronized with the chain. /// - /// Each request also returns info about new notes, nullifiers etc. created. It also returns + /// Each update response also contains info about new notes, nullifiers etc. created. It also returns /// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain /// MMR peaks and chain MMR nodes. /// @@ -371,7 +370,9 @@ pub mod api_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response< + tonic::codec::Streaming, + >, tonic::Status, > { self.inner @@ -386,7 +387,7 @@ pub mod api_client { let path = http::uri::PathAndQuery::from_static("/rpc.Api/SyncState"); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new("rpc.Api", "SyncState")); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } } } @@ -501,15 +502,23 @@ pub mod api_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the SyncState method. + type SyncStateStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result< + super::super::responses::SyncStateResponse, + tonic::Status, + >, + > + + std::marker::Send + + 'static; /// Returns info which can be used by the client to sync up to the latest state of the chain /// for the objects (accounts, notes, nullifiers) the client is interested in. /// - /// This request returns the next block containing requested data. It also returns `chain_tip` - /// which is the latest block number in the chain. Client is expected to repeat these requests - /// in a loop until `response.block_header.block_num == response.chain_tip`, at which point - /// the client is fully synchronized with the chain. + /// This request returns a stream where multiple update responses will be pushed in order. + /// Client is expected to read the updates from the stream and apply them, and then it will be + /// fully synchronized with the chain. /// - /// Each request also returns info about new notes, nullifiers etc. created. It also returns + /// Each update response also contains info about new notes, nullifiers etc. created. It also returns /// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain /// MMR peaks and chain MMR nodes. /// @@ -519,10 +528,7 @@ pub mod api_server { async fn sync_state( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct ApiServer { @@ -1087,12 +1093,13 @@ pub mod api_server { struct SyncStateSvc(pub Arc); impl< T: Api, - > tonic::server::UnaryService< + > tonic::server::ServerStreamingService< super::super::requests::SyncStateRequest, > for SyncStateSvc { type Response = super::super::responses::SyncStateResponse; + type ResponseStream = T::SyncStateStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( @@ -1125,7 +1132,7 @@ pub mod api_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) diff --git a/crates/proto/src/generated/store.rs b/crates/proto/src/generated/store.rs index 0d9b52f78..ee146f68b 100644 --- a/crates/proto/src/generated/store.rs +++ b/crates/proto/src/generated/store.rs @@ -450,7 +450,9 @@ pub mod api_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response< + tonic::codec::Streaming, + >, tonic::Status, > { self.inner @@ -465,7 +467,7 @@ pub mod api_client { let path = http::uri::PathAndQuery::from_static("/store.Api/SyncState"); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new("store.Api", "SyncState")); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } } } @@ -602,6 +604,15 @@ pub mod api_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the SyncState method. + type SyncStateStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result< + super::super::responses::SyncStateResponse, + tonic::Status, + >, + > + + std::marker::Send + + 'static; /// Returns info which can be used by the client to sync up to the latest state of the chain /// for the objects (accounts, notes, nullifiers) the client is interested in. /// @@ -620,10 +631,7 @@ pub mod api_server { async fn sync_state( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct ApiServer { @@ -1332,12 +1340,13 @@ pub mod api_server { struct SyncStateSvc(pub Arc); impl< T: Api, - > tonic::server::UnaryService< + > tonic::server::ServerStreamingService< super::super::requests::SyncStateRequest, > for SyncStateSvc { type Response = super::super::responses::SyncStateResponse; + type ResponseStream = T::SyncStateStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( @@ -1370,7 +1379,7 @@ pub mod api_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) diff --git a/crates/rpc-proto/proto/responses.proto b/crates/rpc-proto/proto/responses.proto index f1dfe5f90..e2829a832 100644 --- a/crates/rpc-proto/proto/responses.proto +++ b/crates/rpc-proto/proto/responses.proto @@ -48,27 +48,24 @@ message NullifierUpdate { // Represents the result of syncing state request. message SyncStateResponse { - // Number of the latest block in the chain. - fixed32 chain_tip = 1; - // Block header of the block with the first note matching the specified criteria. - block.BlockHeader block_header = 2; + block.BlockHeader block_header = 1; // Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`. - mmr.MmrDelta mmr_delta = 3; + mmr.MmrDelta mmr_delta = 2; // List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`. - repeated account.AccountSummary accounts = 5; + repeated account.AccountSummary accounts = 3; // List of transactions executed against requested accounts between `request.block_num + 1` and // `response.block_header.block_num`. - repeated transaction.TransactionSummary transactions = 6; + repeated transaction.TransactionSummary transactions = 4; // List of all notes together with the Merkle paths from `response.block_header.note_root`. - repeated note.NoteSyncRecord notes = 7; + repeated note.NoteSyncRecord notes = 5; // List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`. - repeated NullifierUpdate nullifiers = 8; + repeated NullifierUpdate nullifiers = 6; } // Represents the result of syncing notes request. diff --git a/crates/rpc-proto/proto/rpc.proto b/crates/rpc-proto/proto/rpc.proto index 82da3e20c..ccce57d23 100644 --- a/crates/rpc-proto/proto/rpc.proto +++ b/crates/rpc-proto/proto/rpc.proto @@ -51,17 +51,16 @@ service Api { // Returns info which can be used by the client to sync up to the latest state of the chain // for the objects (accounts, notes, nullifiers) the client is interested in. // - // This request returns the next block containing requested data. It also returns `chain_tip` - // which is the latest block number in the chain. Client is expected to repeat these requests - // in a loop until `response.block_header.block_num == response.chain_tip`, at which point - // the client is fully synchronized with the chain. + // This request returns a stream where multiple update responses will be pushed in order. + // Client is expected to read the updates from the stream and apply them, and then it will be + // fully synchronized with the chain. // - // Each request also returns info about new notes, nullifiers etc. created. It also returns + // Each update response also contains info about new notes, nullifiers etc. created. It also returns // Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain // MMR peaks and chain MMR nodes. // // For preserving some degree of privacy, note tags and nullifiers filters contain only high // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } diff --git a/crates/rpc-proto/proto/store.proto b/crates/rpc-proto/proto/store.proto index 7137121dc..1dbb78f7b 100644 --- a/crates/rpc-proto/proto/store.proto +++ b/crates/rpc-proto/proto/store.proto @@ -74,5 +74,5 @@ service Api { // For preserving some degree of privacy, note tags and nullifiers filters contain only high // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index fbb233eb4..ab3b06889 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -27,7 +27,7 @@ use miden_tx::TransactionVerifier; use tonic::{ service::interceptor::InterceptedService, transport::{Channel, Error}, - Request, Response, Status, + Request, Response, Status, Streaming, }; use tracing::{debug, info, instrument}; @@ -71,6 +71,8 @@ impl RpcApi { #[tonic::async_trait] impl api_server::Api for RpcApi { + type SyncStateStream = Streaming; + #[instrument( target = COMPONENT, name = "rpc:check_nullifiers", @@ -130,13 +132,12 @@ impl api_server::Api for RpcApi { target = COMPONENT, name = "rpc:sync_state", skip_all, - ret(level = "debug"), err )] async fn sync_state( &self, request: Request, - ) -> Result, Status> { + ) -> Result, Status> { debug!(target: COMPONENT, request = ?request.get_ref()); self.store.clone().sync_state(request).await diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index 24cde86a5..b973fbeba 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeSet, convert::Infallible, sync::Arc}; +use std::{collections::BTreeSet, convert::Infallible, pin::Pin, sync::Arc}; use miden_node_proto::{ convert, @@ -34,6 +34,8 @@ use miden_objects::{ note::{NoteId, Nullifier}, utils::{Deserializable, Serializable}, }; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{Request, Response, Status}; use tracing::{debug, info, instrument}; @@ -46,8 +48,11 @@ pub struct StoreApi { pub(super) state: Arc, } +type ResponseStream = Pin> + Send>>; + #[tonic::async_trait] impl api_server::Api for StoreApi { + type SyncStateStream = ResponseStream; // CLIENT ENDPOINTS // -------------------------------------------------------------------------------------------- @@ -147,68 +152,97 @@ impl api_server::Api for StoreApi { target = COMPONENT, name = "store.server.sync_state", skip_all, - ret(level = "debug"), err )] async fn sync_state( &self, request: Request, - ) -> Result, Status> { + ) -> Result, Status> { let request = request.into_inner(); - let account_ids: Vec = read_account_ids(&request.account_ids)?; - - let (state, delta) = self - .state - .sync_state( - request.block_num.into(), - account_ids, - request.note_tags, - request.nullifiers, - ) - .await - .map_err(internal_error)?; - - let accounts = state - .account_updates - .into_iter() - .map(|account_info| AccountSummary { - account_id: Some(account_info.account_id.into()), - account_hash: Some(account_info.account_hash.into()), - block_num: account_info.block_num.as_u32(), - }) - .collect(); - - let transactions = state - .transactions - .into_iter() - .map(|transaction_summary| TransactionSummary { - account_id: Some(transaction_summary.account_id.into()), - block_num: transaction_summary.block_num.as_u32(), - transaction_id: Some(transaction_summary.transaction_id.into()), - }) - .collect(); - - let notes = state.notes.into_iter().map(Into::into).collect(); - - let nullifiers = state - .nullifiers - .into_iter() - .map(|nullifier_info| NullifierUpdate { - nullifier: Some(nullifier_info.nullifier.into()), - block_num: nullifier_info.block_num.as_u32(), - }) - .collect(); - - Ok(Response::new(SyncStateResponse { - chain_tip: self.state.latest_block_num().await.as_u32(), - block_header: Some(state.block_header.into()), - mmr_delta: Some(delta.into()), - accounts, - transactions, - notes, - nullifiers, - })) + let (tx, rx) = mpsc::channel(128); // TODO: check bound of the channel + + let state = self.state.clone(); + let mut last_block_num = request.block_num; + let chain_tip = state.latest_block_num().await.as_u32(); + tokio::spawn(async move { + loop { + if last_block_num == chain_tip { + // The state is up to date, no need to sync + break; + } + + let result = async { + let account_ids: Vec = read_account_ids(&request.account_ids)?; + + let (state, delta) = state + .sync_state( + last_block_num.into(), + account_ids, + request.note_tags.clone(), + request.nullifiers.clone(), + ) + .await + .map_err(internal_error)?; + + let accounts = state + .account_updates + .into_iter() + .map(|account_info| AccountSummary { + account_id: Some(account_info.account_id.into()), + account_hash: Some(account_info.account_hash.into()), + block_num: account_info.block_num.as_u32(), + }) + .collect(); + + let transactions = state + .transactions + .into_iter() + .map(|transaction_summary| TransactionSummary { + account_id: Some(transaction_summary.account_id.into()), + block_num: transaction_summary.block_num.as_u32(), + transaction_id: Some(transaction_summary.transaction_id.into()), + }) + .collect(); + + let notes = state.notes.into_iter().map(Into::into).collect(); + + let nullifiers = state + .nullifiers + .into_iter() + .map(|nullifier_info| NullifierUpdate { + nullifier: Some(nullifier_info.nullifier.into()), + block_num: nullifier_info.block_num.as_u32(), + }) + .collect(); + + let response = SyncStateResponse { + block_header: Some(state.block_header.into()), + mmr_delta: Some(delta.into()), + accounts, + transactions, + notes, + nullifiers, + }; + Ok(response) + } + .await; + + if let Ok(response) = &result { + last_block_num = response.block_header.unwrap().block_num; + } + let is_error = result.is_err(); + + tx.send(result).await.map_err(internal_error).unwrap(); + + if is_error { + break; + } + } + }); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::SyncStateStream)) } /// Returns info which can be used by the client to sync note state. diff --git a/proto/responses.proto b/proto/responses.proto index f1dfe5f90..e2829a832 100644 --- a/proto/responses.proto +++ b/proto/responses.proto @@ -48,27 +48,24 @@ message NullifierUpdate { // Represents the result of syncing state request. message SyncStateResponse { - // Number of the latest block in the chain. - fixed32 chain_tip = 1; - // Block header of the block with the first note matching the specified criteria. - block.BlockHeader block_header = 2; + block.BlockHeader block_header = 1; // Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`. - mmr.MmrDelta mmr_delta = 3; + mmr.MmrDelta mmr_delta = 2; // List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`. - repeated account.AccountSummary accounts = 5; + repeated account.AccountSummary accounts = 3; // List of transactions executed against requested accounts between `request.block_num + 1` and // `response.block_header.block_num`. - repeated transaction.TransactionSummary transactions = 6; + repeated transaction.TransactionSummary transactions = 4; // List of all notes together with the Merkle paths from `response.block_header.note_root`. - repeated note.NoteSyncRecord notes = 7; + repeated note.NoteSyncRecord notes = 5; // List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`. - repeated NullifierUpdate nullifiers = 8; + repeated NullifierUpdate nullifiers = 6; } // Represents the result of syncing notes request. diff --git a/proto/rpc.proto b/proto/rpc.proto index 82da3e20c..ccce57d23 100644 --- a/proto/rpc.proto +++ b/proto/rpc.proto @@ -51,17 +51,16 @@ service Api { // Returns info which can be used by the client to sync up to the latest state of the chain // for the objects (accounts, notes, nullifiers) the client is interested in. // - // This request returns the next block containing requested data. It also returns `chain_tip` - // which is the latest block number in the chain. Client is expected to repeat these requests - // in a loop until `response.block_header.block_num == response.chain_tip`, at which point - // the client is fully synchronized with the chain. + // This request returns a stream where multiple update responses will be pushed in order. + // Client is expected to read the updates from the stream and apply them, and then it will be + // fully synchronized with the chain. // - // Each request also returns info about new notes, nullifiers etc. created. It also returns + // Each update response also contains info about new notes, nullifiers etc. created. It also returns // Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain // MMR peaks and chain MMR nodes. // // For preserving some degree of privacy, note tags and nullifiers filters contain only high // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } diff --git a/proto/store.proto b/proto/store.proto index 7137121dc..1dbb78f7b 100644 --- a/proto/store.proto +++ b/proto/store.proto @@ -74,5 +74,5 @@ service Api { // For preserving some degree of privacy, note tags and nullifiers filters contain only high // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } From aa2dc07613ff8ccb735b98237d77c6767f26ffef Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Mon, 17 Feb 2025 11:49:21 -0300 Subject: [PATCH 2/7] fix: add safety comment and replace unwrap with expect --- crates/store/src/server/api.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index b973fbeba..83760939b 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -229,11 +229,12 @@ impl api_server::Api for StoreApi { .await; if let Ok(response) = &result { + // SAFETY: SyncStateResponse always has a block header last_block_num = response.block_header.unwrap().block_num; } let is_error = result.is_err(); - tx.send(result).await.map_err(internal_error).unwrap(); + tx.send(result).await.expect("error sending sync state response to the client"); if is_error { break; From eb2de00d00005206551b672093506b7ce24e386e Mon Sep 17 00:00:00 2001 From: Tomas Date: Thu, 20 Feb 2025 15:53:05 -0300 Subject: [PATCH 3/7] feat: add block_num parameter to check_nullifiers_by_prefix (#707) * feat: add block_num parameter to check_nullifiers_by_prefix * chore: update CHANGELOG * review: add `block_num` parameter as required * review: update CHANGELOG * review: update doc comment * chore: update rpc README --- CHANGELOG.md | 1 + crates/proto/src/generated/requests.rs | 3 +++ crates/rpc-proto/proto/requests.proto | 2 ++ crates/rpc/README.md | 2 +- crates/store/src/db/mod.rs | 3 ++- crates/store/src/db/sql/mod.rs | 10 ++++----- crates/store/src/db/tests.rs | 29 +++++++++++++++++++++++++- crates/store/src/server/api.rs | 6 +++++- crates/store/src/state.rs | 5 ++++- proto/requests.proto | 2 ++ 10 files changed, 53 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed9872ec1..8e53515b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [BREAKING] Updated minimum Rust version to 1.84. - [BREAKING] `Endpoint` configuration simplified to a single string (#654). +- [BREAKING] `CheckNullifiersByPrefix` now takes a starting block number (#707). - [BREAKING] Changed sync state endpoint to stream the response (#685). ### Enhancements diff --git a/crates/proto/src/generated/requests.rs b/crates/proto/src/generated/requests.rs index c8e19bb29..dc1bd3e7b 100644 --- a/crates/proto/src/generated/requests.rs +++ b/crates/proto/src/generated/requests.rs @@ -17,6 +17,9 @@ pub struct CheckNullifiersByPrefixRequest { /// to `prefix_len`. #[prost(uint32, repeated, tag = "2")] pub nullifiers: ::prost::alloc::vec::Vec, + /// Block number from which the nullifiers are requested (inclusive). + #[prost(fixed32, tag = "3")] + pub block_num: u32, } /// Returns a nullifier proof for each of the requested nullifiers. #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/crates/rpc-proto/proto/requests.proto b/crates/rpc-proto/proto/requests.proto index f2323c56c..cd0bb960d 100644 --- a/crates/rpc-proto/proto/requests.proto +++ b/crates/rpc-proto/proto/requests.proto @@ -18,6 +18,8 @@ message CheckNullifiersByPrefixRequest { // List of nullifiers to check. Each nullifier is specified by its prefix with length equal // to `prefix_len`. repeated uint32 nullifiers = 2; + // Block number from which the nullifiers are requested (inclusive). + fixed32 block_num = 3; } // Returns a nullifier proof for each of the requested nullifiers. diff --git a/crates/rpc/README.md b/crates/rpc/README.md index e8e7e6492..9dd4375cd 100644 --- a/crates/rpc/README.md +++ b/crates/rpc/README.md @@ -37,7 +37,7 @@ Returns a nullifier proof for each of the requested nullifiers. ### CheckNullifiersByPrefix -Returns a list of nullifiers that match the specified prefixes and are recorded in the node. +Returns a list of nullifiers recorded in the node that match the specified prefixes and block creation height. Only 16-bit prefixes are supported at this time. diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 51a678e22..fc6bbc0ec 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -215,12 +215,13 @@ impl Db { &self, prefix_len: u32, nullifier_prefixes: Vec, + block_num: BlockNumber, ) -> Result> { self.pool .get() .await? .interact(move |conn| { - sql::select_nullifiers_by_prefix(conn, prefix_len, &nullifier_prefixes) + sql::select_nullifiers_by_prefix(conn, prefix_len, &nullifier_prefixes, block_num) }) .await .map_err(|err| { diff --git a/crates/store/src/db/sql/mod.rs b/crates/store/src/db/sql/mod.rs index 7ade42670..1a6742f04 100644 --- a/crates/store/src/db/sql/mod.rs +++ b/crates/store/src/db/sql/mod.rs @@ -665,8 +665,7 @@ pub fn select_nullifiers_by_block_range( Ok(result) } -/// Select nullifiers created that match the `nullifier_prefixes` filter using the given -/// [Connection]. +/// Returns nullifiers filtered by prefix and block creation height. /// /// Each value of the `nullifier_prefixes` is only the `prefix_len` most significant bits /// of the nullifier of interest to the client. This hides the details of the specific @@ -680,6 +679,7 @@ pub fn select_nullifiers_by_prefix( conn: &mut Connection, prefix_len: u32, nullifier_prefixes: &[u32], + block_num: BlockNumber, ) -> Result> { assert_eq!(prefix_len, 16, "Only 16-bit prefixes are supported"); @@ -694,13 +694,13 @@ pub fn select_nullifiers_by_prefix( FROM nullifiers WHERE - nullifier_prefix IN rarray(?1) + nullifier_prefix IN rarray(?1) AND + block_num >= ?2 ORDER BY block_num ASC ", )?; - - let mut rows = stmt.query(params![Rc::new(nullifier_prefixes)])?; + let mut rows = stmt.query(params![Rc::new(nullifier_prefixes), block_num.as_u32()])?; let mut result = Vec::new(); while let Some(row) = rows.next()? { diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index fe8d1684c..4d89e1cf9 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -619,7 +619,9 @@ fn select_nullifiers_by_prefix() { const PREFIX_LEN: u32 = 16; let mut conn = create_db(); // test empty table - let nullifiers = sql::select_nullifiers_by_prefix(&mut conn, PREFIX_LEN, &[]).unwrap(); + let block_number0 = 0.into(); + let nullifiers = + sql::select_nullifiers_by_prefix(&mut conn, PREFIX_LEN, &[], block_number0).unwrap(); assert!(nullifiers.is_empty()); // test single item @@ -635,6 +637,7 @@ fn select_nullifiers_by_prefix() { &mut conn, PREFIX_LEN, &[sql::utils::get_nullifier_prefix(&nullifier1)], + block_number0, ) .unwrap(); assert_eq!( @@ -662,6 +665,7 @@ fn select_nullifiers_by_prefix() { &mut conn, PREFIX_LEN, &[sql::utils::get_nullifier_prefix(&nullifier1)], + block_number0, ) .unwrap(); assert_eq!( @@ -675,6 +679,7 @@ fn select_nullifiers_by_prefix() { &mut conn, PREFIX_LEN, &[sql::utils::get_nullifier_prefix(&nullifier2)], + block_number0, ) .unwrap(); assert_eq!( @@ -693,6 +698,7 @@ fn select_nullifiers_by_prefix() { sql::utils::get_nullifier_prefix(&nullifier1), sql::utils::get_nullifier_prefix(&nullifier2), ], + block_number0, ) .unwrap(); assert_eq!( @@ -714,9 +720,30 @@ fn select_nullifiers_by_prefix() { &mut conn, PREFIX_LEN, &[sql::utils::get_nullifier_prefix(&num_to_nullifier(3 << 48))], + block_number0, ) .unwrap(); assert!(nullifiers.is_empty()); + + // If a block number is provided, only matching nullifiers created at or after that block are + // returned + let nullifiers = sql::select_nullifiers_by_prefix( + &mut conn, + PREFIX_LEN, + &[ + sql::utils::get_nullifier_prefix(&nullifier1), + sql::utils::get_nullifier_prefix(&nullifier2), + ], + block_number2, + ) + .unwrap(); + assert_eq!( + nullifiers, + vec![NullifierInfo { + nullifier: nullifier2, + block_num: block_number2 + }] + ); } #[test] diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index 83760939b..a3a000fbb 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -134,7 +134,11 @@ impl api_server::Api for StoreApi { let nullifiers = self .state - .check_nullifiers_by_prefix(request.prefix_len, request.nullifiers) + .check_nullifiers_by_prefix( + request.prefix_len, + request.nullifiers, + BlockNumber::from(request.block_num), + ) .await? .into_iter() .map(|nullifier_info| NullifierUpdate { diff --git a/crates/store/src/state.rs b/crates/store/src/state.rs index 8b8bcf695..5df4e94a3 100644 --- a/crates/store/src/state.rs +++ b/crates/store/src/state.rs @@ -411,8 +411,11 @@ impl State { &self, prefix_len: u32, nullifier_prefixes: Vec, + block_num: BlockNumber, ) -> Result, DatabaseError> { - self.db.select_nullifiers_by_prefix(prefix_len, nullifier_prefixes).await + self.db + .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_num) + .await } /// Generates membership proofs for each one of the `nullifiers` against the latest nullifier diff --git a/proto/requests.proto b/proto/requests.proto index f2323c56c..cd0bb960d 100644 --- a/proto/requests.proto +++ b/proto/requests.proto @@ -18,6 +18,8 @@ message CheckNullifiersByPrefixRequest { // List of nullifiers to check. Each nullifier is specified by its prefix with length equal // to `prefix_len`. repeated uint32 nullifiers = 2; + // Block number from which the nullifiers are requested (inclusive). + fixed32 block_num = 3; } // Returns a nullifier proof for each of the requested nullifiers. From bac6a7351c4a716c41558bcddcb0bb5c65c57e96 Mon Sep 17 00:00:00 2001 From: Tomas Date: Thu, 20 Feb 2025 17:59:32 -0300 Subject: [PATCH 4/7] feat: remove nullifiers from SyncState endpoint (#708) * feat: add block_num parameter to check_nullifiers_by_prefix * chore: update CHANGELOG * review: add `block_num` parameter as required * review: update CHANGELOG * review: update doc comment * feat: remove nullifiers from `SyncState` endpoint * chore: update doc comments * chore: update CHANGELOG * chore: update rpc README --- CHANGELOG.md | 1 + crates/proto/src/generated/requests.rs | 6 +- crates/proto/src/generated/responses.rs | 3 - crates/proto/src/generated/rpc.rs | 16 +-- crates/rpc-proto/proto/requests.proto | 6 +- crates/rpc-proto/proto/responses.proto | 3 - crates/rpc-proto/proto/rpc.proto | 8 +- crates/rpc/README.md | 17 ++-- crates/store/src/db/mod.rs | 6 +- crates/store/src/db/sql/mod.rs | 57 ----------- crates/store/src/db/tests.rs | 128 ------------------------ crates/store/src/server/api.rs | 17 +--- crates/store/src/state.rs | 8 +- proto/requests.proto | 6 +- proto/responses.proto | 3 - proto/rpc.proto | 8 +- 16 files changed, 31 insertions(+), 262 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e53515b8..8c6045682 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - [BREAKING] Updated minimum Rust version to 1.84. - [BREAKING] `Endpoint` configuration simplified to a single string (#654). - [BREAKING] `CheckNullifiersByPrefix` now takes a starting block number (#707). +- [BREAKING] Removed nullifiers from `SyncState` endpoint (#708). - [BREAKING] Changed sync state endpoint to stream the response (#685). ### Enhancements diff --git a/crates/proto/src/generated/requests.rs b/crates/proto/src/generated/requests.rs index dc1bd3e7b..9194e084f 100644 --- a/crates/proto/src/generated/requests.rs +++ b/crates/proto/src/generated/requests.rs @@ -45,7 +45,7 @@ pub struct GetBlockHeaderByNumberRequest { /// /// Specifies state updates the client is interested in. The server will return the first block which /// contains a note matching `note_tags` or the chain tip. And the corresponding updates to -/// `nullifiers` and `account_ids` for that block range. +/// `account_ids` for that block range. #[derive(Clone, PartialEq, ::prost::Message)] pub struct SyncStateRequest { /// Last block known by the client. The response will contain data starting from the next block, @@ -63,10 +63,6 @@ pub struct SyncStateRequest { /// Specifies the tags which the client is interested in. #[prost(fixed32, repeated, tag = "3")] pub note_tags: ::prost::alloc::vec::Vec, - /// Determines the nullifiers the client is interested in by specifying the 16high bits of the - /// target nullifier. - #[prost(uint32, repeated, tag = "4")] - pub nullifiers: ::prost::alloc::vec::Vec, } /// Note synchronization request. /// diff --git a/crates/proto/src/generated/responses.rs b/crates/proto/src/generated/responses.rs index 3f9969446..b5de6e749 100644 --- a/crates/proto/src/generated/responses.rs +++ b/crates/proto/src/generated/responses.rs @@ -58,9 +58,6 @@ pub struct SyncStateResponse { /// List of all notes together with the Merkle paths from `response.block_header.note_root`. #[prost(message, repeated, tag = "5")] pub notes: ::prost::alloc::vec::Vec, - /// List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`. - #[prost(message, repeated, tag = "6")] - pub nullifiers: ::prost::alloc::vec::Vec, } /// Represents the result of syncing notes request. #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/crates/proto/src/generated/rpc.rs b/crates/proto/src/generated/rpc.rs index de205e75b..74767ee1f 100644 --- a/crates/proto/src/generated/rpc.rs +++ b/crates/proto/src/generated/rpc.rs @@ -353,18 +353,18 @@ pub mod api_client { self.inner.unary(req, path, codec).await } /// Returns info which can be used by the client to sync up to the latest state of the chain - /// for the objects (accounts, notes, nullifiers) the client is interested in. + /// for the objects (accounts and notes) the client is interested in. /// /// This request returns a stream where multiple update responses will be pushed in order. /// Client is expected to read the updates from the stream and apply them, and then it will be /// fully synchronized with the chain. /// - /// Each update response also contains info about new notes, nullifiers etc. created. It also returns + /// Each update response also contains info about new notes, accounts etc. created. It also returns /// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain /// MMR peaks and chain MMR nodes. /// - /// For preserving some degree of privacy, note tags and nullifiers filters contain only high - /// part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make + /// For preserving some degree of privacy, note tags contain only high + /// part of hashes. Thus, returned data contains excessive notes, client can make /// additional filtering of that data on its side. pub async fn sync_state( &mut self, @@ -512,18 +512,18 @@ pub mod api_server { + std::marker::Send + 'static; /// Returns info which can be used by the client to sync up to the latest state of the chain - /// for the objects (accounts, notes, nullifiers) the client is interested in. + /// for the objects (accounts and notes) the client is interested in. /// /// This request returns a stream where multiple update responses will be pushed in order. /// Client is expected to read the updates from the stream and apply them, and then it will be /// fully synchronized with the chain. /// - /// Each update response also contains info about new notes, nullifiers etc. created. It also returns + /// Each update response also contains info about new notes, accounts etc. created. It also returns /// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain /// MMR peaks and chain MMR nodes. /// - /// For preserving some degree of privacy, note tags and nullifiers filters contain only high - /// part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make + /// For preserving some degree of privacy, note tags contain only high + /// part of hashes. Thus, returned data contains excessive notes, client can make /// additional filtering of that data on its side. async fn sync_state( &self, diff --git a/crates/rpc-proto/proto/requests.proto b/crates/rpc-proto/proto/requests.proto index cd0bb960d..bf9fd557a 100644 --- a/crates/rpc-proto/proto/requests.proto +++ b/crates/rpc-proto/proto/requests.proto @@ -43,7 +43,7 @@ message GetBlockHeaderByNumberRequest { // // Specifies state updates the client is interested in. The server will return the first block which // contains a note matching `note_tags` or the chain tip. And the corresponding updates to -// `nullifiers` and `account_ids` for that block range. +// `account_ids` for that block range. message SyncStateRequest { // Last block known by the client. The response will contain data starting from the next block, // until the first block which contains a note of matching the requested tag, or the chain tip @@ -59,10 +59,6 @@ message SyncStateRequest { // Specifies the tags which the client is interested in. repeated fixed32 note_tags = 3; - - // Determines the nullifiers the client is interested in by specifying the 16high bits of the - // target nullifier. - repeated uint32 nullifiers = 4; } // Note synchronization request. diff --git a/crates/rpc-proto/proto/responses.proto b/crates/rpc-proto/proto/responses.proto index e2829a832..e651d9d35 100644 --- a/crates/rpc-proto/proto/responses.proto +++ b/crates/rpc-proto/proto/responses.proto @@ -63,9 +63,6 @@ message SyncStateResponse { // List of all notes together with the Merkle paths from `response.block_header.note_root`. repeated note.NoteSyncRecord notes = 5; - - // List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`. - repeated NullifierUpdate nullifiers = 6; } // Represents the result of syncing notes request. diff --git a/crates/rpc-proto/proto/rpc.proto b/crates/rpc-proto/proto/rpc.proto index ccce57d23..247c51ede 100644 --- a/crates/rpc-proto/proto/rpc.proto +++ b/crates/rpc-proto/proto/rpc.proto @@ -49,18 +49,18 @@ service Api { rpc SyncNotes(requests.SyncNoteRequest) returns (responses.SyncNoteResponse) {} // Returns info which can be used by the client to sync up to the latest state of the chain - // for the objects (accounts, notes, nullifiers) the client is interested in. + // for the objects (accounts and notes) the client is interested in. // // This request returns a stream where multiple update responses will be pushed in order. // Client is expected to read the updates from the stream and apply them, and then it will be // fully synchronized with the chain. // - // Each update response also contains info about new notes, nullifiers etc. created. It also returns + // Each update response also contains info about new notes, accounts etc. created. It also returns // Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain // MMR peaks and chain MMR nodes. // - // For preserving some degree of privacy, note tags and nullifiers filters contain only high - // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make + // For preserving some degree of privacy, note tags contain only high + // part of hashes. Thus, returned data contains excessive notes, client can make // additional filtering of that data on its side. rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } diff --git a/crates/rpc/README.md b/crates/rpc/README.md index 9dd4375cd..6f0f40048 100644 --- a/crates/rpc/README.md +++ b/crates/rpc/README.md @@ -102,18 +102,17 @@ the chain. ### SyncState -Returns info which can be used by the client to sync up to the latest state of the chain for the objects (accounts, -notes, nullifiers) the client is interested in. +Returns info which can be used by the client to sync up to the latest state of the chain for the objects (accounts +and notes) the client is interested in. -This request returns the next block containing requested data. It also returns `chain_tip` which is the latest block -number in the chain. Client is expected to repeat these requests in a loop until -`response.block_header.block_num == response.chain_tip`, at which point the client is fully synchronized with the chain. +This request returns a stream where multiple update responses will be pushed in order. Client is expected to read +the updates from the stream and apply them, and then it will be fully synchronized with the chain. -Each request also returns info about new notes, nullifiers etc. created. It also returns Chain MMR delta that can be -used to update the state of Chain MMR. This includes both chain MMR peaks and chain MMR nodes. +Each update response also contains info about new notes, accounts etc. created. It also returns Chain MMR delta +that can be used to update the state of Chain MMR. This includes both chain MMR peaks and chain MMR nodes. -For preserving some degree of privacy, note tags and nullifiers filters contain only high part of hashes. Thus, returned -data contains excessive notes and nullifiers, client can make additional filtering of that data on its side. +For preserving some degree of privacy, note tags contain only high part of hashes. Thus, returned data contains +excessive notes, client can make additional filtering of that data on its side. --- diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index fc6bbc0ec..a00094f54 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -85,7 +85,6 @@ pub struct StateSyncUpdate { pub block_header: BlockHeader, pub account_updates: Vec, pub transactions: Vec, - pub nullifiers: Vec, } #[derive(Debug, PartialEq)] @@ -328,15 +327,12 @@ impl Db { block_num: BlockNumber, account_ids: Vec, note_tags: Vec, - nullifier_prefixes: Vec, ) -> Result { self.pool .get() .await .map_err(DatabaseError::MissingDbConnection)? - .interact(move |conn| { - sql::get_state_sync(conn, block_num, &account_ids, ¬e_tags, &nullifier_prefixes) - }) + .interact(move |conn| sql::get_state_sync(conn, block_num, &account_ids, ¬e_tags)) .await .map_err(|err| { DatabaseError::InteractError(format!("Get state sync task failed: {err}")) diff --git a/crates/store/src/db/sql/mod.rs b/crates/store/src/db/sql/mod.rs index 1a6742f04..87ff16c3d 100644 --- a/crates/store/src/db/sql/mod.rs +++ b/crates/store/src/db/sql/mod.rs @@ -617,54 +617,6 @@ pub fn select_all_nullifiers(conn: &mut Connection) -> Result Result> { - let nullifier_prefixes: Vec = - nullifier_prefixes.iter().copied().map(Into::into).collect(); - - let mut stmt = conn.prepare_cached( - " - SELECT - nullifier, - block_num - FROM - nullifiers - WHERE - block_num > ?1 AND - block_num <= ?2 AND - nullifier_prefix IN rarray(?3) - ORDER BY - block_num ASC - ", - )?; - - let mut rows = - stmt.query(params![block_start.as_u32(), block_end.as_u32(), Rc::new(nullifier_prefixes)])?; - - let mut result = Vec::new(); - while let Some(row) = rows.next()? { - let nullifier_data = row.get_ref(0)?.as_blob()?; - let nullifier = Nullifier::read_from_bytes(nullifier_data)?; - let block_num: u32 = row.get(1)?; - result.push(NullifierInfo { nullifier, block_num: block_num.into() }); - } - Ok(result) -} - /// Returns nullifiers filtered by prefix and block creation height. /// /// Each value of the `nullifier_prefixes` is only the `prefix_len` most significant bits @@ -1195,7 +1147,6 @@ pub fn get_state_sync( block_num: BlockNumber, account_ids: &[AccountId], note_tag_prefixes: &[u32], - nullifier_prefixes: &[u32], ) -> Result { let notes = select_notes_since_block_by_tag_and_sender( conn, @@ -1218,19 +1169,11 @@ pub fn get_state_sync( account_ids, )?; - let nullifiers = select_nullifiers_by_block_range( - conn, - block_num, - block_header.block_num(), - nullifier_prefixes, - )?; - Ok(StateSyncUpdate { notes, block_header, account_updates, transactions, - nullifiers, }) } diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index 4d89e1cf9..768d6b2ff 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -486,134 +486,6 @@ fn sql_public_account_details() { assert_eq!(read_delta, Some(delta2)); } -#[test] -fn sql_select_nullifiers_by_block_range() { - let mut conn = create_db(); - - // test empty table - let nullifiers = - sql::select_nullifiers_by_block_range(&mut conn, 0.into(), u32::MAX.into(), &[]).unwrap(); - assert!(nullifiers.is_empty()); - - // test single item - let nullifier1 = num_to_nullifier(1 << 48); - let block_number1 = 1.into(); - create_block(&mut conn, block_number1); - - let transaction = conn.transaction().unwrap(); - sql::insert_nullifiers_for_block(&transaction, &[nullifier1], block_number1).unwrap(); - transaction.commit().unwrap(); - - let nullifiers = sql::select_nullifiers_by_block_range( - &mut conn, - 0.into(), - u32::MAX.into(), - &[sql::utils::get_nullifier_prefix(&nullifier1)], - ) - .unwrap(); - assert_eq!( - nullifiers, - vec![NullifierInfo { - nullifier: nullifier1, - block_num: block_number1 - }] - ); - - // test two elements - let nullifier2 = num_to_nullifier(2 << 48); - let block_number2 = 2.into(); - create_block(&mut conn, block_number2); - - let transaction = conn.transaction().unwrap(); - sql::insert_nullifiers_for_block(&transaction, &[nullifier2], block_number2).unwrap(); - transaction.commit().unwrap(); - - let nullifiers = sql::select_all_nullifiers(&mut conn).unwrap(); - assert_eq!(nullifiers, vec![(nullifier1, block_number1), (nullifier2, block_number2)]); - - // only the nullifiers matching the prefix are included - let nullifiers = sql::select_nullifiers_by_block_range( - &mut conn, - 0.into(), - u32::MAX.into(), - &[sql::utils::get_nullifier_prefix(&nullifier1)], - ) - .unwrap(); - assert_eq!( - nullifiers, - vec![NullifierInfo { - nullifier: nullifier1, - block_num: block_number1 - }] - ); - let nullifiers = sql::select_nullifiers_by_block_range( - &mut conn, - 0.into(), - u32::MAX.into(), - &[sql::utils::get_nullifier_prefix(&nullifier2)], - ) - .unwrap(); - assert_eq!( - nullifiers, - vec![NullifierInfo { - nullifier: nullifier2, - block_num: block_number2 - }] - ); - - // Nullifiers created at block_end are included - let nullifiers = sql::select_nullifiers_by_block_range( - &mut conn, - 0.into(), - 1.into(), - &[ - sql::utils::get_nullifier_prefix(&nullifier1), - sql::utils::get_nullifier_prefix(&nullifier2), - ], - ) - .unwrap(); - assert_eq!( - nullifiers, - vec![NullifierInfo { - nullifier: nullifier1, - block_num: block_number1 - }] - ); - - // Nullifiers created at block_start are not included - let nullifiers = sql::select_nullifiers_by_block_range( - &mut conn, - 1.into(), - u32::MAX.into(), - &[ - sql::utils::get_nullifier_prefix(&nullifier1), - sql::utils::get_nullifier_prefix(&nullifier2), - ], - ) - .unwrap(); - assert_eq!( - nullifiers, - vec![NullifierInfo { - nullifier: nullifier2, - block_num: block_number2 - }] - ); - - // When block start and end are the same, no nullifiers should be returned. This case happens - // when the client requests a sync update, and it is already tracking the chain tip. - let nullifiers = sql::select_nullifiers_by_block_range( - &mut conn, - 2.into(), - 2.into(), - &[ - sql::utils::get_nullifier_prefix(&nullifier1), - sql::utils::get_nullifier_prefix(&nullifier2), - ], - ) - .unwrap(); - assert!(nullifiers.is_empty()); -} - #[test] fn select_nullifiers_by_prefix() { const PREFIX_LEN: u32 = 16; diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index a3a000fbb..3a10af9de 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -180,12 +180,7 @@ impl api_server::Api for StoreApi { let account_ids: Vec = read_account_ids(&request.account_ids)?; let (state, delta) = state - .sync_state( - last_block_num.into(), - account_ids, - request.note_tags.clone(), - request.nullifiers.clone(), - ) + .sync_state(last_block_num.into(), account_ids, request.note_tags.clone()) .await .map_err(internal_error)?; @@ -211,22 +206,12 @@ impl api_server::Api for StoreApi { let notes = state.notes.into_iter().map(Into::into).collect(); - let nullifiers = state - .nullifiers - .into_iter() - .map(|nullifier_info| NullifierUpdate { - nullifier: Some(nullifier_info.nullifier.into()), - block_num: nullifier_info.block_num.as_u32(), - }) - .collect(); - let response = SyncStateResponse { block_header: Some(state.block_header.into()), mmr_delta: Some(delta.into()), accounts, transactions, notes, - nullifiers, }; Ok(response) } diff --git a/crates/store/src/state.rs b/crates/store/src/state.rs index 5df4e94a3..e3f95d34a 100644 --- a/crates/store/src/state.rs +++ b/crates/store/src/state.rs @@ -646,22 +646,16 @@ impl State { /// range. /// - `note_tags`: The tags the client is interested in, result is restricted to the first block /// with any matches tags. - /// - `nullifier_prefixes`: Only the 16 high bits of the nullifiers the client is interested in, - /// results will include nullifiers matching prefixes produced in the given block range. #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn sync_state( &self, block_num: BlockNumber, account_ids: Vec, note_tags: Vec, - nullifier_prefixes: Vec, ) -> Result<(StateSyncUpdate, MmrDelta), StateSyncError> { let inner = self.inner.read().await; - let state_sync = self - .db - .get_state_sync(block_num, account_ids, note_tags, nullifier_prefixes) - .await?; + let state_sync = self.db.get_state_sync(block_num, account_ids, note_tags).await?; let delta = if block_num == state_sync.block_header.block_num() { // The client is in sync with the chain tip. diff --git a/proto/requests.proto b/proto/requests.proto index cd0bb960d..bf9fd557a 100644 --- a/proto/requests.proto +++ b/proto/requests.proto @@ -43,7 +43,7 @@ message GetBlockHeaderByNumberRequest { // // Specifies state updates the client is interested in. The server will return the first block which // contains a note matching `note_tags` or the chain tip. And the corresponding updates to -// `nullifiers` and `account_ids` for that block range. +// `account_ids` for that block range. message SyncStateRequest { // Last block known by the client. The response will contain data starting from the next block, // until the first block which contains a note of matching the requested tag, or the chain tip @@ -59,10 +59,6 @@ message SyncStateRequest { // Specifies the tags which the client is interested in. repeated fixed32 note_tags = 3; - - // Determines the nullifiers the client is interested in by specifying the 16high bits of the - // target nullifier. - repeated uint32 nullifiers = 4; } // Note synchronization request. diff --git a/proto/responses.proto b/proto/responses.proto index e2829a832..e651d9d35 100644 --- a/proto/responses.proto +++ b/proto/responses.proto @@ -63,9 +63,6 @@ message SyncStateResponse { // List of all notes together with the Merkle paths from `response.block_header.note_root`. repeated note.NoteSyncRecord notes = 5; - - // List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`. - repeated NullifierUpdate nullifiers = 6; } // Represents the result of syncing notes request. diff --git a/proto/rpc.proto b/proto/rpc.proto index ccce57d23..247c51ede 100644 --- a/proto/rpc.proto +++ b/proto/rpc.proto @@ -49,18 +49,18 @@ service Api { rpc SyncNotes(requests.SyncNoteRequest) returns (responses.SyncNoteResponse) {} // Returns info which can be used by the client to sync up to the latest state of the chain - // for the objects (accounts, notes, nullifiers) the client is interested in. + // for the objects (accounts and notes) the client is interested in. // // This request returns a stream where multiple update responses will be pushed in order. // Client is expected to read the updates from the stream and apply them, and then it will be // fully synchronized with the chain. // - // Each update response also contains info about new notes, nullifiers etc. created. It also returns + // Each update response also contains info about new notes, accounts etc. created. It also returns // Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain // MMR peaks and chain MMR nodes. // - // For preserving some degree of privacy, note tags and nullifiers filters contain only high - // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make + // For preserving some degree of privacy, note tags contain only high + // part of hashes. Thus, returned data contains excessive notes, client can make // additional filtering of that data on its side. rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } From 77b6d986e3734bb93fcb8c5b5047b000173d647e Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Thu, 20 Feb 2025 18:09:07 -0300 Subject: [PATCH 5/7] review: update rpc README --- crates/rpc/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/rpc/README.md b/crates/rpc/README.md index 6f0f40048..9b7c6ca9e 100644 --- a/crates/rpc/README.md +++ b/crates/rpc/README.md @@ -37,7 +37,8 @@ Returns a nullifier proof for each of the requested nullifiers. ### CheckNullifiersByPrefix -Returns a list of nullifiers recorded in the node that match the specified prefixes and block creation height. +Returns a list of nullifiers recorded in the node that match the specified prefixes and were created at or after +the given block height. Only 16-bit prefixes are supported at this time. From c3b242b9302f9107abbe49ba51cdf93285ed392e Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 26 Feb 2025 20:37:16 -0300 Subject: [PATCH 6/7] review: sync until empty delta and add timeout --- crates/store/src/server/api.rs | 65 ++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index 3a10af9de..85e05fc1b 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeSet, convert::Infallible, pin::Pin, sync::Arc}; +use std::{collections::BTreeSet, convert::Infallible, pin::Pin, sync::Arc, time::Duration}; use miden_node_proto::{ convert, @@ -34,12 +34,12 @@ use miden_objects::{ note::{NoteId, Nullifier}, utils::{Deserializable, Serializable}, }; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time::timeout}; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{Request, Response, Status}; use tracing::{debug, info, instrument}; -use crate::{state::State, COMPONENT}; +use crate::{errors::StateSyncError, state::State, COMPONENT}; // STORE API // ================================================================================================ @@ -164,25 +164,20 @@ impl api_server::Api for StoreApi { ) -> Result, Status> { let request = request.into_inner(); - let (tx, rx) = mpsc::channel(128); // TODO: check bound of the channel + let (sender, receiver) = mpsc::channel(128); // TODO: check bound of the channel let state = self.state.clone(); - let mut last_block_num = request.block_num; - let chain_tip = state.latest_block_num().await.as_u32(); + let account_ids: Vec = read_account_ids(&request.account_ids)?; tokio::spawn(async move { loop { - if last_block_num == chain_tip { - // The state is up to date, no need to sync - break; - } - - let result = async { - let account_ids: Vec = read_account_ids(&request.account_ids)?; - + let sync_result: Result = async { let (state, delta) = state - .sync_state(last_block_num.into(), account_ids, request.note_tags.clone()) - .await - .map_err(internal_error)?; + .sync_state( + request.block_num.into(), + account_ids.clone(), + request.note_tags.clone(), + ) + .await?; let accounts = state .account_updates @@ -217,21 +212,39 @@ impl api_server::Api for StoreApi { } .await; - if let Ok(response) = &result { - // SAFETY: SyncStateResponse always has a block header - last_block_num = response.block_header.unwrap().block_num; + // If the mmr_delta is empty, we have already reached the latest state + if let Ok(response) = &sync_result { + if response.mmr_delta.as_ref().map_or(true, |d| d.data.is_empty()) { + break; + } } - let is_error = result.is_err(); - - tx.send(result).await.expect("error sending sync state response to the client"); - - if is_error { + let sync_failed = sync_result.is_err(); + let response = sync_result + .inspect_err(|err| info!(target: COMPONENT, "Sync state failed: {}", err)) + .map_err(internal_error); + + match timeout(Duration::from_secs(10), sender.send(response)).await { + Ok(Ok(())) => {}, + Ok(Err(e)) => { + info!(target: COMPONENT, "Failed to send sync state response: {}", e); + break; + }, + Err(e) => { + info!( + target: COMPONENT, + "Failed to send sync state response: timeout after {}", e + ); + break; + }, + }; + + if sync_failed { break; } } }); - let output_stream = ReceiverStream::new(rx); + let output_stream = ReceiverStream::new(receiver); Ok(Response::new(Box::pin(output_stream) as Self::SyncStateStream)) } From 49f8325835f7cb2aed85b34bbf024d46d868bbb2 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Thu, 27 Feb 2025 15:21:27 -0300 Subject: [PATCH 7/7] review: fix sync breaking condition --- crates/store/src/server/api.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index 85e05fc1b..b2aabd6c5 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -169,11 +169,12 @@ impl api_server::Api for StoreApi { let state = self.state.clone(); let account_ids: Vec = read_account_ids(&request.account_ids)?; tokio::spawn(async move { + let mut last_block_num = request.block_num; loop { let sync_result: Result = async { let (state, delta) = state .sync_state( - request.block_num.into(), + last_block_num.into(), account_ids.clone(), request.note_tags.clone(), ) @@ -212,11 +213,12 @@ impl api_server::Api for StoreApi { } .await; - // If the mmr_delta is empty, we have already reached the latest state if let Ok(response) = &sync_result { - if response.mmr_delta.as_ref().map_or(true, |d| d.data.is_empty()) { + // If the response is empty, we have already reached the latest state + if sync_response_is_empty(response) { break; } + last_block_num = response.block_header.unwrap().block_num; } let sync_failed = sync_result.is_err(); let response = sync_result @@ -587,6 +589,13 @@ fn read_account_id(id: Option) -> Result bool { + response.mmr_delta.as_ref().map_or(true, |d| d.data.is_empty()) + && response.accounts.is_empty() + && response.transactions.is_empty() + && response.notes.is_empty() +} + #[instrument(target = COMPONENT, skip_all, err)] fn read_account_ids( account_ids: &[generated::account::AccountId],