Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream SyncState response #685

Open
wants to merge 8 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ env/
*.out
node_modules/
*DS_Store
*.iml
*.iml
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

- [BREAKING] Updated minimum Rust version to 1.84.
- [BREAKING] `Endpoint` configuration simplified to a single string (#654).
- [BREAKING] `CheckNullifiersByPrefix` now takes a starting block number (#707).
- [BREAKING] Changed sync state endpoint to stream the response (#685).

### Enhancements

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ check: ## Check all targets and features for errors without code generation
# --- building ------------------------------------------------------------------------------------

.PHONY: build
build: ## Builds all crates and re-builds ptotobuf bindings for proto crates
build: ## Builds all crates and re-builds protobuf bindings for proto crates
${BUILD_PROTO} cargo build --locked

# --- installing ----------------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions crates/proto/src/generated/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub struct CheckNullifiersByPrefixRequest {
/// to `prefix_len`.
#[prost(uint32, repeated, tag = "2")]
pub nullifiers: ::prost::alloc::vec::Vec<u32>,
/// Block number from which the nullifiers are requested (inclusive).
#[prost(fixed32, tag = "3")]
pub block_num: u32,
}
/// Returns a nullifier proof for each of the requested nullifiers.
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
15 changes: 6 additions & 9 deletions crates/proto/src/generated/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,24 @@ pub struct NullifierUpdate {
/// Represents the result of syncing state request.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SyncStateResponse {
/// Number of the latest block in the chain.
#[prost(fixed32, tag = "1")]
pub chain_tip: u32,
/// Block header of the block with the first note matching the specified criteria.
#[prost(message, optional, tag = "2")]
#[prost(message, optional, tag = "1")]
pub block_header: ::core::option::Option<super::block::BlockHeader>,
/// Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`.
#[prost(message, optional, tag = "3")]
#[prost(message, optional, tag = "2")]
pub mmr_delta: ::core::option::Option<super::mmr::MmrDelta>,
/// List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`.
#[prost(message, repeated, tag = "5")]
#[prost(message, repeated, tag = "3")]
pub accounts: ::prost::alloc::vec::Vec<super::account::AccountSummary>,
/// List of transactions executed against requested accounts between `request.block_num + 1` and
/// `response.block_header.block_num`.
#[prost(message, repeated, tag = "6")]
#[prost(message, repeated, tag = "4")]
pub transactions: ::prost::alloc::vec::Vec<super::transaction::TransactionSummary>,
/// List of all notes together with the Merkle paths from `response.block_header.note_root`.
#[prost(message, repeated, tag = "7")]
#[prost(message, repeated, tag = "5")]
pub notes: ::prost::alloc::vec::Vec<super::note::NoteSyncRecord>,
/// List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`.
#[prost(message, repeated, tag = "8")]
#[prost(message, repeated, tag = "6")]
pub nullifiers: ::prost::alloc::vec::Vec<NullifierUpdate>,
}
/// Represents the result of syncing notes request.
Expand Down
45 changes: 26 additions & 19 deletions crates/proto/src/generated/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,11 @@ pub mod api_client {
/// Returns info which can be used by the client to sync up to the latest state of the chain
/// for the objects (accounts, notes, nullifiers) the client is interested in.
///
/// This request returns the next block containing requested data. It also returns `chain_tip`
/// which is the latest block number in the chain. Client is expected to repeat these requests
/// in a loop until `response.block_header.block_num == response.chain_tip`, at which point
/// the client is fully synchronized with the chain.
/// This request returns a stream where multiple update responses will be pushed in order.
/// Client is expected to read the updates from the stream and apply them, and then it will be
/// fully synchronized with the chain.
///
/// Each request also returns info about new notes, nullifiers etc. created. It also returns
/// Each update response also contains info about new notes, nullifiers etc. created. It also returns
/// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain
/// MMR peaks and chain MMR nodes.
///
Expand All @@ -371,7 +370,9 @@ pub mod api_client {
&mut self,
request: impl tonic::IntoRequest<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Response<
tonic::codec::Streaming<super::super::responses::SyncStateResponse>,
>,
tonic::Status,
> {
self.inner
Expand All @@ -386,7 +387,7 @@ pub mod api_client {
let path = http::uri::PathAndQuery::from_static("/rpc.Api/SyncState");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("rpc.Api", "SyncState"));
self.inner.unary(req, path, codec).await
self.inner.server_streaming(req, path, codec).await
}
}
}
Expand Down Expand Up @@ -501,15 +502,23 @@ pub mod api_server {
tonic::Response<super::super::responses::SyncNoteResponse>,
tonic::Status,
>;
/// Server streaming response type for the SyncState method.
type SyncStateStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<
super::super::responses::SyncStateResponse,
tonic::Status,
>,
>
+ std::marker::Send
+ 'static;
/// Returns info which can be used by the client to sync up to the latest state of the chain
/// for the objects (accounts, notes, nullifiers) the client is interested in.
///
/// This request returns the next block containing requested data. It also returns `chain_tip`
/// which is the latest block number in the chain. Client is expected to repeat these requests
/// in a loop until `response.block_header.block_num == response.chain_tip`, at which point
/// the client is fully synchronized with the chain.
/// This request returns a stream where multiple update responses will be pushed in order.
/// Client is expected to read the updates from the stream and apply them, and then it will be
/// fully synchronized with the chain.
///
/// Each request also returns info about new notes, nullifiers etc. created. It also returns
/// Each update response also contains info about new notes, nullifiers etc. created. It also returns
/// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain
/// MMR peaks and chain MMR nodes.
///
Expand All @@ -519,10 +528,7 @@ pub mod api_server {
async fn sync_state(
&self,
request: tonic::Request<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Status,
>;
) -> std::result::Result<tonic::Response<Self::SyncStateStream>, tonic::Status>;
}
#[derive(Debug)]
pub struct ApiServer<T> {
Expand Down Expand Up @@ -1087,12 +1093,13 @@ pub mod api_server {
struct SyncStateSvc<T: Api>(pub Arc<T>);
impl<
T: Api,
> tonic::server::UnaryService<
> tonic::server::ServerStreamingService<
super::super::requests::SyncStateRequest,
> for SyncStateSvc<T> {
type Response = super::super::responses::SyncStateResponse;
type ResponseStream = T::SyncStateStream;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
Expand Down Expand Up @@ -1125,7 +1132,7 @@ pub mod api_server {
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
Expand Down
27 changes: 18 additions & 9 deletions crates/proto/src/generated/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ pub mod api_client {
&mut self,
request: impl tonic::IntoRequest<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Response<
tonic::codec::Streaming<super::super::responses::SyncStateResponse>,
>,
tonic::Status,
> {
self.inner
Expand All @@ -465,7 +467,7 @@ pub mod api_client {
let path = http::uri::PathAndQuery::from_static("/store.Api/SyncState");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("store.Api", "SyncState"));
self.inner.unary(req, path, codec).await
self.inner.server_streaming(req, path, codec).await
}
}
}
Expand Down Expand Up @@ -602,6 +604,15 @@ pub mod api_server {
tonic::Response<super::super::responses::SyncNoteResponse>,
tonic::Status,
>;
/// Server streaming response type for the SyncState method.
type SyncStateStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<
super::super::responses::SyncStateResponse,
tonic::Status,
>,
>
+ std::marker::Send
+ 'static;
/// Returns info which can be used by the client to sync up to the latest state of the chain
/// for the objects (accounts, notes, nullifiers) the client is interested in.
///
Expand All @@ -620,10 +631,7 @@ pub mod api_server {
async fn sync_state(
&self,
request: tonic::Request<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Status,
>;
) -> std::result::Result<tonic::Response<Self::SyncStateStream>, tonic::Status>;
}
#[derive(Debug)]
pub struct ApiServer<T> {
Expand Down Expand Up @@ -1332,12 +1340,13 @@ pub mod api_server {
struct SyncStateSvc<T: Api>(pub Arc<T>);
impl<
T: Api,
> tonic::server::UnaryService<
> tonic::server::ServerStreamingService<
super::super::requests::SyncStateRequest,
> for SyncStateSvc<T> {
type Response = super::super::responses::SyncStateResponse;
type ResponseStream = T::SyncStateStream;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
Expand Down Expand Up @@ -1370,7 +1379,7 @@ pub mod api_server {
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc-proto/proto/requests.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ message CheckNullifiersByPrefixRequest {
// List of nullifiers to check. Each nullifier is specified by its prefix with length equal
// to `prefix_len`.
repeated uint32 nullifiers = 2;
// Block number from which the nullifiers are requested (inclusive).
fixed32 block_num = 3;
}

// Returns a nullifier proof for each of the requested nullifiers.
Expand Down
15 changes: 6 additions & 9 deletions crates/rpc-proto/proto/responses.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,24 @@ message NullifierUpdate {

// Represents the result of syncing state request.
message SyncStateResponse {
// Number of the latest block in the chain.
fixed32 chain_tip = 1;

// Block header of the block with the first note matching the specified criteria.
block.BlockHeader block_header = 2;
block.BlockHeader block_header = 1;

// Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`.
mmr.MmrDelta mmr_delta = 3;
mmr.MmrDelta mmr_delta = 2;

// List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`.
repeated account.AccountSummary accounts = 5;
repeated account.AccountSummary accounts = 3;

// List of transactions executed against requested accounts between `request.block_num + 1` and
// `response.block_header.block_num`.
repeated transaction.TransactionSummary transactions = 6;
repeated transaction.TransactionSummary transactions = 4;

// List of all notes together with the Merkle paths from `response.block_header.note_root`.
repeated note.NoteSyncRecord notes = 7;
repeated note.NoteSyncRecord notes = 5;

// List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`.
repeated NullifierUpdate nullifiers = 8;
repeated NullifierUpdate nullifiers = 6;
}

// Represents the result of syncing notes request.
Expand Down
11 changes: 5 additions & 6 deletions crates/rpc-proto/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,16 @@ service Api {
// Returns info which can be used by the client to sync up to the latest state of the chain
// for the objects (accounts, notes, nullifiers) the client is interested in.
//
// This request returns the next block containing requested data. It also returns `chain_tip`
// which is the latest block number in the chain. Client is expected to repeat these requests
// in a loop until `response.block_header.block_num == response.chain_tip`, at which point
// the client is fully synchronized with the chain.
// This request returns a stream where multiple update responses will be pushed in order.
// Client is expected to read the updates from the stream and apply them, and then it will be
// fully synchronized with the chain.
//
// Each request also returns info about new notes, nullifiers etc. created. It also returns
// Each update response also contains info about new notes, nullifiers etc. created. It also returns
// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain
// MMR peaks and chain MMR nodes.
//
// For preserving some degree of privacy, note tags and nullifiers filters contain only high
// part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make
// additional filtering of that data on its side.
rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {}
rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {}
}
2 changes: 1 addition & 1 deletion crates/rpc-proto/proto/store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ service Api {
// For preserving some degree of privacy, note tags and nullifiers filters contain only high
// part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make
// additional filtering of that data on its side.
rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {}
rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {}
}
2 changes: 1 addition & 1 deletion crates/rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Returns a nullifier proof for each of the requested nullifiers.

### CheckNullifiersByPrefix

Returns a list of nullifiers that match the specified prefixes and are recorded in the node.
Returns a list of nullifiers recorded in the node that match the specified prefixes and block creation height.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to match block creation height, right? If so we should change this to reflect the response will contain nullifiers created at any block larger or equal than the passed one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agreed - let's make it more precise - i.e., the response will contain nullifiers created in or after the specified block.


Only 16-bit prefixes are supported at this time.

Expand Down
7 changes: 4 additions & 3 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use miden_tx::TransactionVerifier;
use tonic::{
service::interceptor::InterceptedService,
transport::{Channel, Error},
Request, Response, Status,
Request, Response, Status, Streaming,
};
use tracing::{debug, info, instrument};

Expand Down Expand Up @@ -71,6 +71,8 @@ impl RpcApi {

#[tonic::async_trait]
impl api_server::Api for RpcApi {
type SyncStateStream = Streaming<SyncStateResponse>;

#[instrument(
target = COMPONENT,
name = "rpc:check_nullifiers",
Expand Down Expand Up @@ -130,13 +132,12 @@ impl api_server::Api for RpcApi {
target = COMPONENT,
name = "rpc:sync_state",
skip_all,
ret(level = "debug"),
err
)]
async fn sync_state(
&self,
request: Request<SyncStateRequest>,
) -> Result<Response<SyncStateResponse>, Status> {
) -> Result<Response<Self::SyncStateStream>, Status> {
debug!(target: COMPONENT, request = ?request.get_ref());

self.store.clone().sync_state(request).await
Expand Down
3 changes: 2 additions & 1 deletion crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,13 @@ impl Db {
&self,
prefix_len: u32,
nullifier_prefixes: Vec<u32>,
block_num: BlockNumber,
) -> Result<Vec<NullifierInfo>> {
self.pool
.get()
.await?
.interact(move |conn| {
sql::select_nullifiers_by_prefix(conn, prefix_len, &nullifier_prefixes)
sql::select_nullifiers_by_prefix(conn, prefix_len, &nullifier_prefixes, block_num)
})
.await
.map_err(|err| {
Expand Down
Loading
Loading