diff --git a/ipa-core/src/protocol/context/dzkp_validator.rs b/ipa-core/src/protocol/context/dzkp_validator.rs index 881a3e505..9261977ad 100644 --- a/ipa-core/src/protocol/context/dzkp_validator.rs +++ b/ipa-core/src/protocol/context/dzkp_validator.rs @@ -1,17 +1,19 @@ use std::{ collections::HashMap, fmt::Debug, + iter::repeat, sync::{Arc, Mutex, Weak}, }; use async_trait::async_trait; use bitvec::{array::BitArray, prelude::Lsb0, slice::BitSlice}; -use futures::{future, Future, Stream}; -use futures_util::{stream::iter, StreamExt}; +use futures::{Future, Stream}; +use futures_util::{StreamExt, TryFutureExt}; use crate::{ error::Error, ff::Field, + helpers::stream::TryFlattenItersExt, protocol::{ context::{ dzkp_malicious::DZKPUpgraded as MaliciousDZKPUpgraded, @@ -23,7 +25,11 @@ use crate::{ }, seq_join::{seq_join, SeqJoin}, sharding::ShardBinding, + telemetry::metrics::{DZKP_BATCH_REALLOCATION_BACK, DZKP_BATCH_REALLOCATION_FRONT}, }; +// constants for metrics::increment_counter! +const RECORD: &str = "record"; +const OFFSET: &str = "offset"; /// `UnverifiedValues` are intermediate values that occur during a multiplication. /// These values need to be verified since there might have been malicious behavior. @@ -43,7 +49,6 @@ struct UnverifiedValues { z_right: BitArray<[u8; 32], Lsb0>, } -#[allow(dead_code)] impl UnverifiedValues { fn new() -> Self { Self { @@ -82,6 +87,7 @@ impl UnverifiedValues { /// `Convert` allows to convert `UnverifiedValues` into a format compatible with DZKPs /// Converted values will take more space in memory. + #[allow(dead_code)] fn convert(&self) -> DF::UnverifiedFieldValues { DF::convert( &self.x_left, @@ -199,7 +205,6 @@ struct UnverifiedValuesStore { vec: Vec>, } -#[allow(dead_code)] impl UnverifiedValuesStore { /// Creates a new store for given `chunk_size`. `offset` is initialized to `0`. /// Lazy allocation of the vector. It is allocated once the first segment is added. @@ -265,6 +270,11 @@ impl UnverifiedValuesStore { /// when `new_offset` is larger that the current `offset`, the function does nothing fn reset_offset(&mut self, new_offset: RecordId, segment_length: usize) { if new_offset < self.offset { + // metrics to count reallocations, disable-metrics flag will disable it globally + // right now, I don't pass the gate/step information which would be useful + // it would require to change the function signature and potentially clone gate even when feature is not enabled + metrics::increment_counter!(DZKP_BATCH_REALLOCATION_FRONT, RECORD => new_offset.to_string(), OFFSET => self.offset.to_string()); + let mut new_offset = usize::from(new_offset); // if segment_length is less than 256, redefine new offset such that we don't need to rearrange the existing segments if 256 % segment_length == 0 { @@ -273,24 +283,11 @@ impl UnverifiedValuesStore { } let extension_length = ((usize::from(self.offset) - new_offset) >> 8) * segment_length; // use existing vec and add enough space in front - let vec_previous = &mut self.vec; - let mut vec: Vec> = - Vec::with_capacity(vec_previous.len() + extension_length); - vec.resize(extension_length, None); - vec.append(vec_previous); + self.vec.splice(0..0, repeat(None).take(extension_length)); self.offset = RecordId::from(new_offset); } } - /// `extend_vec` allows to extend a store to be able to hold a larger `RecordId` - fn extend_vec(&mut self, extension_length: usize) { - let vec_previous = &mut self.vec; - let new_length = vec_previous.len() + extension_length; - let mut vec: Vec> = Vec::with_capacity(new_length); - vec.append(vec_previous); - vec.resize(new_length, None); - } - /// `insert_segment` allows to include a new segment in `UnverifiedValuesStore` /// /// ## Panics @@ -319,18 +316,21 @@ impl UnverifiedValuesStore { // recover from wrong size // increase size of store to twice the size + position of end of the segment to be included // expensive, ideally use initialize with correct length - self.extend_vec(2 * self.vec.len() - (position_vec + (length >> 8))); + self.vec + .resize(self.vec.len() + position_vec + (length >> 8), None); + // metrics to count reallocations, disable-metrics flag will disable it globally + // right now, I don't pass the gate/step information which would be useful + // it would require to change the function signature and potentially clone gate even when feature is not enabled + metrics::increment_counter!(DZKP_BATCH_REALLOCATION_BACK, RECORD => record_id.to_string(), OFFSET => self.offset.to_string()); } if 256 % length == 0 { // segments are small, pack one or more in each entry of `vec` let position_bit_array = (length * position_raw) % 256; - if self.vec[position_vec].is_none() { - let entry = UnverifiedValues::new(); - self.vec[position_vec] = Some(entry); - } + // get entry - let entry = self.vec[position_vec].as_ref().unwrap(); + let entry = self.vec[position_vec].get_or_insert_with(UnverifiedValues::new); + // copy segment value into entry for (segment_value, mut array_value) in [ (segment.x_left, entry.x_left), @@ -367,6 +367,7 @@ impl UnverifiedValuesStore { /// `get_unverified_field_value` converts a `UnverifiedValuesStore` into an iterator over `UnverifiedFieldValues` /// compatible with DZKPs + #[allow(dead_code)] fn get_unverified_field_values( &self, ) -> impl Iterator + '_ { @@ -387,7 +388,6 @@ struct Batch { inner: HashMap, } -#[allow(dead_code)] impl Batch { fn new(chunk_size: usize) -> Self { Self { @@ -397,11 +397,7 @@ impl Batch { } fn is_empty(&self) -> bool { - self.inner.is_empty() - || self - .inner - .iter() - .fold(true, |acc, (_, value)| acc && value.is_empty()) + self.inner.is_empty() || self.inner.values().all(UnverifiedValuesStore::is_empty) } fn push(&mut self, gate: Gate, record_id: RecordId, segment: Segment) { @@ -428,6 +424,7 @@ impl Batch { /// `get_unverified_field_value` converts a `Batch` into an iterator over `UnverifiedFieldValues` /// compatible with DZKPs + #[allow(dead_code)] fn get_unverified_field_values( &self, ) -> impl Iterator + '_ { @@ -535,18 +532,8 @@ pub trait DZKPValidator { let chunk_size = self.get_chunk_size().unwrap_or(10usize); seq_join::<'st, S, F, O>(self.context().active_work(), source) .chunks(chunk_size) - .then(move |x| async move { - let valid = self.validate::().await.is_ok(); - iter(x).map(move |x| { - if valid { - Ok(x) - } else { - Err(Error::DZKPValidationFailed) - } - }) - }) - .flatten() - .take_while(|x| future::ready(x.is_ok())) + .then(move |chunk| self.validate::().map_ok(|()| chunk)) + .try_flatten_iters() } } @@ -588,12 +575,15 @@ impl<'a, B: ShardBinding> DZKPValidator> /// `MaliciousDZKPValidator` corresponds to pub struct `Malicious` and implements the trait `DZKPValidator` /// The implementation of `validate` of the `DZKPValidator` trait depends on generic `DF` #[allow(dead_code)] +#[cfg(feature = "descriptive-gate")] +// dead code: validate_ctx is not used yet pub struct MaliciousDZKPValidator<'a> { batch_ref: Arc>, protocol_ctx: MaliciousDZKPUpgraded<'a>, validate_ctx: Base<'a>, } +#[cfg(feature = "descriptive-gate")] #[async_trait] impl<'a> DZKPValidator> for MaliciousDZKPValidator<'a> { fn context(&self) -> MaliciousDZKPUpgraded<'a> { @@ -634,6 +624,7 @@ impl<'a> DZKPValidator> for MaliciousDZKPValidator<'a> { } } +#[cfg(feature = "descriptive-gate")] impl<'a> MaliciousDZKPValidator<'a> { #[must_use] pub fn new(ctx: MaliciousContext<'a>, chunk_size: usize) -> Self { @@ -652,6 +643,7 @@ impl<'a> MaliciousDZKPValidator<'a> { } } +#[cfg(feature = "descriptive-gate")] impl<'a> Drop for MaliciousDZKPValidator<'a> { fn drop(&mut self) { self.is_safe().unwrap(); diff --git a/ipa-core/src/telemetry/mod.rs b/ipa-core/src/telemetry/mod.rs index 8f90befbb..109ae6518 100644 --- a/ipa-core/src/telemetry/mod.rs +++ b/ipa-core/src/telemetry/mod.rs @@ -17,6 +17,8 @@ pub mod metrics { pub const INDEXED_PRSS_GENERATED: &str = "i.prss.gen"; pub const SEQUENTIAL_PRSS_GENERATED: &str = "s.prss.gen"; pub const STEP_NARROWED: &str = "step.narrowed"; + pub const DZKP_BATCH_REALLOCATION_FRONT: &str = "batch.realloc.front"; + pub const DZKP_BATCH_REALLOCATION_BACK: &str = "batch.realloc.back"; #[cfg(feature = "web-app")] pub mod web { @@ -105,5 +107,17 @@ pub mod metrics { Unit::Count, "Number of times the step is narrowed" ); + + describe_counter!( + DZKP_BATCH_REALLOCATION_FRONT, + Unit::Count, + "Number of DZKP reallocations due to records smaller than the offset" + ); + + describe_counter!( + DZKP_BATCH_REALLOCATION_BACK, + Unit::Count, + "Number of DZKP reallocations due to insufficient length" + ); } }