diff --git a/.gitignore b/.gitignore index b3359a675..809536ce0 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,4 @@ env/ *.out node_modules/ *DS_Store -*.iml \ No newline at end of file +*.iml diff --git a/CHANGELOG.md b/CHANGELOG.md index e51e27f17..e19800f32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - [BREAKING] `Endpoint` configuration simplified to a single string (#654). - [BREAKING] `CheckNullifiersByPrefix` now takes a starting block number (#707). - [BREAKING] Removed nullifiers from `SyncState` endpoint (#708). +- [BREAKING] Changed sync state endpoint to stream the response (#685). ### Enhancements diff --git a/Makefile b/Makefile index 8514edf2f..6a8c84b4d 100644 --- a/Makefile +++ b/Makefile @@ -70,7 +70,7 @@ check: ## Check all targets and features for errors without code generation # --- building ------------------------------------------------------------------------------------ .PHONY: build -build: ## Builds all crates and re-builds ptotobuf bindings for proto crates +build: ## Builds all crates and re-builds protobuf bindings for proto crates ${BUILD_PROTO} cargo build --locked # --- installing ---------------------------------------------------------------------------------- diff --git a/crates/proto/src/generated/responses.rs b/crates/proto/src/generated/responses.rs index ac764735d..b5de6e749 100644 --- a/crates/proto/src/generated/responses.rs +++ b/crates/proto/src/generated/responses.rs @@ -42,24 +42,21 @@ pub struct NullifierUpdate { /// Represents the result of syncing state request. #[derive(Clone, PartialEq, ::prost::Message)] pub struct SyncStateResponse { - /// Number of the latest block in the chain. - #[prost(fixed32, tag = "1")] - pub chain_tip: u32, /// Block header of the block with the first note matching the specified criteria. - #[prost(message, optional, tag = "2")] + #[prost(message, optional, tag = "1")] pub block_header: ::core::option::Option, /// Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`. - #[prost(message, optional, tag = "3")] + #[prost(message, optional, tag = "2")] pub mmr_delta: ::core::option::Option, /// List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`. - #[prost(message, repeated, tag = "5")] + #[prost(message, repeated, tag = "3")] pub accounts: ::prost::alloc::vec::Vec, /// List of transactions executed against requested accounts between `request.block_num + 1` and /// `response.block_header.block_num`. - #[prost(message, repeated, tag = "6")] + #[prost(message, repeated, tag = "4")] pub transactions: ::prost::alloc::vec::Vec, /// List of all notes together with the Merkle paths from `response.block_header.note_root`. - #[prost(message, repeated, tag = "7")] + #[prost(message, repeated, tag = "5")] pub notes: ::prost::alloc::vec::Vec, } /// Represents the result of syncing notes request. diff --git a/crates/proto/src/generated/rpc.rs b/crates/proto/src/generated/rpc.rs index 4f25dfdbb..74767ee1f 100644 --- a/crates/proto/src/generated/rpc.rs +++ b/crates/proto/src/generated/rpc.rs @@ -355,10 +355,9 @@ pub mod api_client { /// Returns info which can be used by the client to sync up to the latest state of the chain /// for the objects (accounts and notes) the client is interested in. /// - /// This request returns the next block containing requested data. It also returns `chain_tip` - /// which is the latest block number in the chain. Client is expected to repeat these requests - /// in a loop until `response.block_header.block_num == response.chain_tip`, at which point - /// the client is fully synchronized with the chain. + /// This request returns a stream where multiple update responses will be pushed in order. + /// Client is expected to read the updates from the stream and apply them, and then it will be + /// fully synchronized with the chain. /// /// Each update response also contains info about new notes, accounts etc. created. It also returns /// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain @@ -371,7 +370,9 @@ pub mod api_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response< + tonic::codec::Streaming, + >, tonic::Status, > { self.inner @@ -386,7 +387,7 @@ pub mod api_client { let path = http::uri::PathAndQuery::from_static("/rpc.Api/SyncState"); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new("rpc.Api", "SyncState")); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } } } @@ -501,13 +502,21 @@ pub mod api_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the SyncState method. + type SyncStateStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result< + super::super::responses::SyncStateResponse, + tonic::Status, + >, + > + + std::marker::Send + + 'static; /// Returns info which can be used by the client to sync up to the latest state of the chain /// for the objects (accounts and notes) the client is interested in. /// - /// This request returns the next block containing requested data. It also returns `chain_tip` - /// which is the latest block number in the chain. Client is expected to repeat these requests - /// in a loop until `response.block_header.block_num == response.chain_tip`, at which point - /// the client is fully synchronized with the chain. + /// This request returns a stream where multiple update responses will be pushed in order. + /// Client is expected to read the updates from the stream and apply them, and then it will be + /// fully synchronized with the chain. /// /// Each update response also contains info about new notes, accounts etc. created. It also returns /// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain @@ -519,10 +528,7 @@ pub mod api_server { async fn sync_state( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct ApiServer { @@ -1087,12 +1093,13 @@ pub mod api_server { struct SyncStateSvc(pub Arc); impl< T: Api, - > tonic::server::UnaryService< + > tonic::server::ServerStreamingService< super::super::requests::SyncStateRequest, > for SyncStateSvc { type Response = super::super::responses::SyncStateResponse; + type ResponseStream = T::SyncStateStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( @@ -1125,7 +1132,7 @@ pub mod api_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) diff --git a/crates/proto/src/generated/store.rs b/crates/proto/src/generated/store.rs index 0d9b52f78..ee146f68b 100644 --- a/crates/proto/src/generated/store.rs +++ b/crates/proto/src/generated/store.rs @@ -450,7 +450,9 @@ pub mod api_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response< + tonic::codec::Streaming, + >, tonic::Status, > { self.inner @@ -465,7 +467,7 @@ pub mod api_client { let path = http::uri::PathAndQuery::from_static("/store.Api/SyncState"); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new("store.Api", "SyncState")); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } } } @@ -602,6 +604,15 @@ pub mod api_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the SyncState method. + type SyncStateStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result< + super::super::responses::SyncStateResponse, + tonic::Status, + >, + > + + std::marker::Send + + 'static; /// Returns info which can be used by the client to sync up to the latest state of the chain /// for the objects (accounts, notes, nullifiers) the client is interested in. /// @@ -620,10 +631,7 @@ pub mod api_server { async fn sync_state( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct ApiServer { @@ -1332,12 +1340,13 @@ pub mod api_server { struct SyncStateSvc(pub Arc); impl< T: Api, - > tonic::server::UnaryService< + > tonic::server::ServerStreamingService< super::super::requests::SyncStateRequest, > for SyncStateSvc { type Response = super::super::responses::SyncStateResponse; + type ResponseStream = T::SyncStateStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( @@ -1370,7 +1379,7 @@ pub mod api_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) diff --git a/crates/rpc-proto/proto/responses.proto b/crates/rpc-proto/proto/responses.proto index ad1f353a5..e651d9d35 100644 --- a/crates/rpc-proto/proto/responses.proto +++ b/crates/rpc-proto/proto/responses.proto @@ -48,24 +48,21 @@ message NullifierUpdate { // Represents the result of syncing state request. message SyncStateResponse { - // Number of the latest block in the chain. - fixed32 chain_tip = 1; - // Block header of the block with the first note matching the specified criteria. - block.BlockHeader block_header = 2; + block.BlockHeader block_header = 1; // Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`. - mmr.MmrDelta mmr_delta = 3; + mmr.MmrDelta mmr_delta = 2; // List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`. - repeated account.AccountSummary accounts = 5; + repeated account.AccountSummary accounts = 3; // List of transactions executed against requested accounts between `request.block_num + 1` and // `response.block_header.block_num`. - repeated transaction.TransactionSummary transactions = 6; + repeated transaction.TransactionSummary transactions = 4; // List of all notes together with the Merkle paths from `response.block_header.note_root`. - repeated note.NoteSyncRecord notes = 7; + repeated note.NoteSyncRecord notes = 5; } // Represents the result of syncing notes request. diff --git a/crates/rpc-proto/proto/rpc.proto b/crates/rpc-proto/proto/rpc.proto index a7ad531db..247c51ede 100644 --- a/crates/rpc-proto/proto/rpc.proto +++ b/crates/rpc-proto/proto/rpc.proto @@ -51,10 +51,9 @@ service Api { // Returns info which can be used by the client to sync up to the latest state of the chain // for the objects (accounts and notes) the client is interested in. // - // This request returns the next block containing requested data. It also returns `chain_tip` - // which is the latest block number in the chain. Client is expected to repeat these requests - // in a loop until `response.block_header.block_num == response.chain_tip`, at which point - // the client is fully synchronized with the chain. + // This request returns a stream where multiple update responses will be pushed in order. + // Client is expected to read the updates from the stream and apply them, and then it will be + // fully synchronized with the chain. // // Each update response also contains info about new notes, accounts etc. created. It also returns // Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain @@ -63,5 +62,5 @@ service Api { // For preserving some degree of privacy, note tags contain only high // part of hashes. Thus, returned data contains excessive notes, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } diff --git a/crates/rpc-proto/proto/store.proto b/crates/rpc-proto/proto/store.proto index 7137121dc..1dbb78f7b 100644 --- a/crates/rpc-proto/proto/store.proto +++ b/crates/rpc-proto/proto/store.proto @@ -74,5 +74,5 @@ service Api { // For preserving some degree of privacy, note tags and nullifiers filters contain only high // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } diff --git a/crates/rpc/README.md b/crates/rpc/README.md index 520b52014..ffec1e804 100644 --- a/crates/rpc/README.md +++ b/crates/rpc/README.md @@ -106,9 +106,8 @@ the chain. Returns info which can be used by the client to sync up to the latest state of the chain for the objects (accounts and notes) the client is interested in. -This request returns the next block containing requested data. It also returns `chain_tip` which is the latest block -number in the chain. Client is expected to repeat these requests in a loop until -`response.block_header.block_num == response.chain_tip`, at which point the client is fully synchronized with the chain. +This request returns a stream where multiple update responses will be pushed in order. Client is expected to read +the updates from the stream and apply them, and then it will be fully synchronized with the chain. Each request also returns info about new notes, accounts, etc. created. It also returns Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain MMR peaks and chain MMR nodes. diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index fbb233eb4..ab3b06889 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -27,7 +27,7 @@ use miden_tx::TransactionVerifier; use tonic::{ service::interceptor::InterceptedService, transport::{Channel, Error}, - Request, Response, Status, + Request, Response, Status, Streaming, }; use tracing::{debug, info, instrument}; @@ -71,6 +71,8 @@ impl RpcApi { #[tonic::async_trait] impl api_server::Api for RpcApi { + type SyncStateStream = Streaming; + #[instrument( target = COMPONENT, name = "rpc:check_nullifiers", @@ -130,13 +132,12 @@ impl api_server::Api for RpcApi { target = COMPONENT, name = "rpc:sync_state", skip_all, - ret(level = "debug"), err )] async fn sync_state( &self, request: Request, - ) -> Result, Status> { + ) -> Result, Status> { debug!(target: COMPONENT, request = ?request.get_ref()); self.store.clone().sync_state(request).await diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index be0ea560d..b2aabd6c5 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeSet, convert::Infallible, sync::Arc}; +use std::{collections::BTreeSet, convert::Infallible, pin::Pin, sync::Arc, time::Duration}; use miden_node_proto::{ convert, @@ -34,10 +34,12 @@ use miden_objects::{ note::{NoteId, Nullifier}, utils::{Deserializable, Serializable}, }; +use tokio::{sync::mpsc, time::timeout}; +use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{Request, Response, Status}; use tracing::{debug, info, instrument}; -use crate::{state::State, COMPONENT}; +use crate::{errors::StateSyncError, state::State, COMPONENT}; // STORE API // ================================================================================================ @@ -46,8 +48,11 @@ pub struct StoreApi { pub(super) state: Arc, } +type ResponseStream = Pin> + Send>>; + #[tonic::async_trait] impl api_server::Api for StoreApi { + type SyncStateStream = ResponseStream; // CLIENT ENDPOINTS // -------------------------------------------------------------------------------------------- @@ -151,53 +156,98 @@ impl api_server::Api for StoreApi { target = COMPONENT, name = "store.server.sync_state", skip_all, - ret(level = "debug"), err )] async fn sync_state( &self, request: Request, - ) -> Result, Status> { + ) -> Result, Status> { let request = request.into_inner(); - let account_ids: Vec = read_account_ids(&request.account_ids)?; - - let (state, delta) = self - .state - .sync_state(request.block_num.into(), account_ids, request.note_tags) - .await - .map_err(internal_error)?; - - let accounts = state - .account_updates - .into_iter() - .map(|account_info| AccountSummary { - account_id: Some(account_info.account_id.into()), - account_hash: Some(account_info.account_hash.into()), - block_num: account_info.block_num.as_u32(), - }) - .collect(); - - let transactions = state - .transactions - .into_iter() - .map(|transaction_summary| TransactionSummary { - account_id: Some(transaction_summary.account_id.into()), - block_num: transaction_summary.block_num.as_u32(), - transaction_id: Some(transaction_summary.transaction_id.into()), - }) - .collect(); + let (sender, receiver) = mpsc::channel(128); // TODO: check bound of the channel - let notes = state.notes.into_iter().map(Into::into).collect(); - - Ok(Response::new(SyncStateResponse { - chain_tip: self.state.latest_block_num().await.as_u32(), - block_header: Some(state.block_header.into()), - mmr_delta: Some(delta.into()), - accounts, - transactions, - notes, - })) + let state = self.state.clone(); + let account_ids: Vec = read_account_ids(&request.account_ids)?; + tokio::spawn(async move { + let mut last_block_num = request.block_num; + loop { + let sync_result: Result = async { + let (state, delta) = state + .sync_state( + last_block_num.into(), + account_ids.clone(), + request.note_tags.clone(), + ) + .await?; + + let accounts = state + .account_updates + .into_iter() + .map(|account_info| AccountSummary { + account_id: Some(account_info.account_id.into()), + account_hash: Some(account_info.account_hash.into()), + block_num: account_info.block_num.as_u32(), + }) + .collect(); + + let transactions = state + .transactions + .into_iter() + .map(|transaction_summary| TransactionSummary { + account_id: Some(transaction_summary.account_id.into()), + block_num: transaction_summary.block_num.as_u32(), + transaction_id: Some(transaction_summary.transaction_id.into()), + }) + .collect(); + + let notes = state.notes.into_iter().map(Into::into).collect(); + + let response = SyncStateResponse { + block_header: Some(state.block_header.into()), + mmr_delta: Some(delta.into()), + accounts, + transactions, + notes, + }; + Ok(response) + } + .await; + + if let Ok(response) = &sync_result { + // If the response is empty, we have already reached the latest state + if sync_response_is_empty(response) { + break; + } + last_block_num = response.block_header.unwrap().block_num; + } + let sync_failed = sync_result.is_err(); + let response = sync_result + .inspect_err(|err| info!(target: COMPONENT, "Sync state failed: {}", err)) + .map_err(internal_error); + + match timeout(Duration::from_secs(10), sender.send(response)).await { + Ok(Ok(())) => {}, + Ok(Err(e)) => { + info!(target: COMPONENT, "Failed to send sync state response: {}", e); + break; + }, + Err(e) => { + info!( + target: COMPONENT, + "Failed to send sync state response: timeout after {}", e + ); + break; + }, + }; + + if sync_failed { + break; + } + } + }); + + let output_stream = ReceiverStream::new(receiver); + Ok(Response::new(Box::pin(output_stream) as Self::SyncStateStream)) } /// Returns info which can be used by the client to sync note state. @@ -539,6 +589,13 @@ fn read_account_id(id: Option) -> Result bool { + response.mmr_delta.as_ref().map_or(true, |d| d.data.is_empty()) + && response.accounts.is_empty() + && response.transactions.is_empty() + && response.notes.is_empty() +} + #[instrument(target = COMPONENT, skip_all, err)] fn read_account_ids( account_ids: &[generated::account::AccountId], diff --git a/proto/responses.proto b/proto/responses.proto index ad1f353a5..e651d9d35 100644 --- a/proto/responses.proto +++ b/proto/responses.proto @@ -48,24 +48,21 @@ message NullifierUpdate { // Represents the result of syncing state request. message SyncStateResponse { - // Number of the latest block in the chain. - fixed32 chain_tip = 1; - // Block header of the block with the first note matching the specified criteria. - block.BlockHeader block_header = 2; + block.BlockHeader block_header = 1; // Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`. - mmr.MmrDelta mmr_delta = 3; + mmr.MmrDelta mmr_delta = 2; // List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`. - repeated account.AccountSummary accounts = 5; + repeated account.AccountSummary accounts = 3; // List of transactions executed against requested accounts between `request.block_num + 1` and // `response.block_header.block_num`. - repeated transaction.TransactionSummary transactions = 6; + repeated transaction.TransactionSummary transactions = 4; // List of all notes together with the Merkle paths from `response.block_header.note_root`. - repeated note.NoteSyncRecord notes = 7; + repeated note.NoteSyncRecord notes = 5; } // Represents the result of syncing notes request. diff --git a/proto/rpc.proto b/proto/rpc.proto index a7ad531db..247c51ede 100644 --- a/proto/rpc.proto +++ b/proto/rpc.proto @@ -51,10 +51,9 @@ service Api { // Returns info which can be used by the client to sync up to the latest state of the chain // for the objects (accounts and notes) the client is interested in. // - // This request returns the next block containing requested data. It also returns `chain_tip` - // which is the latest block number in the chain. Client is expected to repeat these requests - // in a loop until `response.block_header.block_num == response.chain_tip`, at which point - // the client is fully synchronized with the chain. + // This request returns a stream where multiple update responses will be pushed in order. + // Client is expected to read the updates from the stream and apply them, and then it will be + // fully synchronized with the chain. // // Each update response also contains info about new notes, accounts etc. created. It also returns // Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain @@ -63,5 +62,5 @@ service Api { // For preserving some degree of privacy, note tags contain only high // part of hashes. Thus, returned data contains excessive notes, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} } diff --git a/proto/store.proto b/proto/store.proto index 7137121dc..1dbb78f7b 100644 --- a/proto/store.proto +++ b/proto/store.proto @@ -74,5 +74,5 @@ service Api { // For preserving some degree of privacy, note tags and nullifiers filters contain only high // part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make // additional filtering of that data on its side. - rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {} + rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {} }