From 52472fd4505d35d3c2a7215eecaff364dd03ac63 Mon Sep 17 00:00:00 2001 From: Weiyuan Wu Date: Sun, 21 Jan 2024 12:18:17 -0800 Subject: [PATCH] some fixes --- native/src/ops/window/correlation.rs | 167 +++++++++++++-------------- native/src/ops/window/delay.rs | 4 +- native/src/ops/window/mean.rs | 106 +++++++---------- native/src/ops/window/minmax.rs | 84 +++++--------- native/src/ops/window/quantile.rs | 105 +++++++---------- native/src/ops/window/rank.rs | 110 ++++++++---------- native/src/ops/window/skew.rs | 149 ++++++++++-------------- native/src/ops/window/stdev.rs | 123 +++++++++----------- native/src/ops/window/sum.rs | 100 +++++++--------- native/src/python.rs | 6 + 10 files changed, 406 insertions(+), 548 deletions(-) diff --git a/native/src/ops/window/correlation.rs b/native/src/ops/window/correlation.rs index e2d0dff..b345ea6 100644 --- a/native/src/ops/window/correlation.rs +++ b/native/src/ops/window/correlation.rs @@ -29,16 +29,16 @@ impl Cache { pub struct TSCorrelation { win_size: usize, - sx: BoxOp, - sy: BoxOp, + x: BoxOp, + y: BoxOp, cache: Cache, - warmup: usize, + i: usize, } impl Clone for TSCorrelation { fn clone(&self) -> Self { - Self::new(self.win_size, self.sx.clone(), self.sy.clone()) + Self::new(self.win_size, self.x.clone(), self.y.clone()) } } @@ -46,11 +46,11 @@ impl TSCorrelation { pub fn new(win_size: usize, x: BoxOp, y: BoxOp) -> Self { Self { win_size, - sx: x, - sy: y, + x, + y, cache: Cache::new(), - warmup: 0, + i: 0, } } } @@ -62,84 +62,73 @@ impl Named for TSCorrelation { impl Operator for TSCorrelation { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let (sx, sy) = (&mut self.sx, &mut self.sy); - let (xs, ys) = rayon::join(|| sx.update(tb), || sy.update(tb)); - let (xs, ys) = (xs?, ys?); - - let mut results = Vec::with_capacity(xs.len()); - - let mut i = 0; - while i + self.warmup < max(self.sx.ready_offset(), self.sy.ready_offset()) && i < xs.len() - { - results.push(f64::NAN); - i += 1; - } - - while i + self.warmup < self.ready_offset() && i < xs.len() { - // maintain - let (xval, yval) = (xs[i], ys[i]); - - self.cache.history.push_back((xval, yval)); - self.cache.x += xval; - self.cache.y += yval; - - results.push(f64::NAN); - i += 1; - } - self.warmup += i; - - for i in i..xs.len() { - let (xval, yval) = (xs[i], ys[i]); + let (x, y) = (&mut self.x, &mut self.y); + let (xs, ys) = rayon::join(|| x.update(tb), || y.update(tb)); + let (xs, ys) = (&*xs?, &*ys?); + assert_eq!(tb.len(), xs.len()); + assert_eq!(tb.len(), ys.len()); + + let mut results = Vec::with_capacity(tb.len()); + + for (&xval, &yval) in xs.into_iter().zip(ys) { + if self.i < self.x.ready_offset() || self.i < self.y.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; + } - // maintain self.cache.history.push_back((xval, yval)); self.cache.x += xval; self.cache.y += yval; - // compute - let n = self.cache.history.len() as f64; // this should be equal to self.win_size - let xbar = self.cache.x / n; - let ybar = self.cache.y / n; - let nom = self - .cache - .history - .iter() - .map(|(x, y)| (x - xbar) * (y - ybar)) - .sum::(); - let denomx = self - .cache - .history - .iter() - .map(|(x, _)| (x - xbar).powf(2.)) - .sum::() - .sqrt(); - let denomy = self - .cache - .history - .iter() - .map(|(_, y)| (y - ybar).powf(2.)) - .sum::() - .sqrt(); - - let denom = denomx * denomy; - - if denom == 0. { - results.push(0.); + let val = if self.cache.history.len() == self.win_size { + let n = self.cache.history.len() as f64; // this should be equal to self.win_size + let xbar = self.cache.x / n; + let ybar = self.cache.y / n; + let nom = self + .cache + .history + .iter() + .map(|(x, y)| (x - xbar) * (y - ybar)) + .sum::(); + let denomx = self + .cache + .history + .iter() + .map(|(x, _)| (x - xbar).powf(2.)) + .sum::() + .sqrt(); + let denomy = self + .cache + .history + .iter() + .map(|(_, y)| (y - ybar).powf(2.)) + .sum::() + .sqrt(); + + let denom = denomx * denomy; + + let val = if denom == 0. { + 0. + } else { + self.fchecked(nom / denom)? + }; + let (xval, yval) = self.cache.history.pop_front().unwrap(); + self.cache.x -= xval; + self.cache.y -= yval; + val } else { - results.push(self.fchecked(nom / denom)?); - } + f64::NAN + }; - // maintain - let (xval, yval) = self.cache.history.pop_front().unwrap(); - self.cache.x -= xval; - self.cache.y -= yval; + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - max(self.sx.ready_offset(), self.sy.ready_offset()) + self.win_size - 1 + max(self.x.ready_offset(), self.y.ready_offset()) + self.win_size - 1 } fn to_string(&self) -> String { @@ -147,28 +136,28 @@ impl Operator for TSCorrelation { "({} {} {} {})", Self::NAME, self.win_size, - self.sx.to_string(), - self.sy.to_string() + self.x.to_string(), + self.y.to_string() ) } fn depth(&self) -> usize { - 1 + max(self.sx.depth(), self.sy.depth()) + 1 + max(self.x.depth(), self.y.depth()) } fn len(&self) -> usize { - self.sx.len() + self.sy.len() + 1 + self.x.len() + self.y.len() + 1 } fn child_indices(&self) -> Vec { - vec![1, self.sx.len() + 1] + vec![1, self.x.len() + 1] } fn columns(&self) -> Vec { - self.sx + self.x .columns() .into_iter() - .chain(self.sy.columns()) + .chain(self.y.columns()) .collect() } @@ -179,13 +168,13 @@ impl Operator for TSCorrelation { } let i = i - 1; - let nx = self.sx.len(); - let ny = self.sy.len(); + let nx = self.x.len(); + let ny = self.y.len(); if i < nx { - self.sx.get(i)? + self.x.get(i)? } else if i >= nx && i < nx + ny { - self.sy.get(i - nx)? + self.y.get(i - nx)? } else { throw!() } @@ -198,19 +187,19 @@ impl Operator for TSCorrelation { } let i = i - 1; - let nx = self.sx.len(); - let ny = self.sy.len(); + let nx = self.x.len(); + let ny = self.y.len(); if i < nx { if i == 0 { - return mem::replace(&mut self.sx, op) as BoxOp; + return mem::replace(&mut self.x, op) as BoxOp; } - self.sx.insert(i, op)? + self.x.insert(i, op)? } else if i >= nx && i < nx + ny { if i - nx == 0 { - return mem::replace(&mut self.sy, op) as BoxOp; + return mem::replace(&mut self.y, op) as BoxOp; } - self.sy.insert(i - nx, op)? + self.y.insert(i - nx, op)? } else { throw!() } diff --git a/native/src/ops/window/delay.rs b/native/src/ops/window/delay.rs index 00b4346..75ce3bb 100644 --- a/native/src/ops/window/delay.rs +++ b/native/src/ops/window/delay.rs @@ -22,10 +22,10 @@ impl Clone for Delay { } impl Delay { - pub fn new(win_size: usize, s: BoxOp) -> Self { + pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, - inner: s, + inner, window: VecDeque::with_capacity(win_size + 1), i: 0, } diff --git a/native/src/ops/window/mean.rs b/native/src/ops/window/mean.rs index ca66ed0..d904cbb 100644 --- a/native/src/ops/window/mean.rs +++ b/native/src/ops/window/mean.rs @@ -7,43 +7,30 @@ use std::collections::VecDeque; use std::iter::FromIterator; use std::mem; -#[derive(Clone)] -struct Cache { - history: VecDeque, - x: f64, -} - -impl Cache { - pub fn new() -> Cache { - Cache { - history: VecDeque::new(), - x: 0., - } - } -} - pub struct TSMean { win_size: usize, - s: BoxOp, + inner: BoxOp, - cache: Cache, - warmup: usize, + window: VecDeque, + sum: f64, + i: usize, } impl Clone for TSMean { fn clone(&self) -> Self { - Self::new(self.win_size, self.s.clone()) + Self::new(self.win_size, self.inner.clone()) } } impl TSMean { - pub fn new(win_size: usize, sub: BoxOp) -> Self { + pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, - s: sub, + inner, - cache: Cache::new(), - warmup: 0, + window: VecDeque::with_capacity(win_size), + sum: 0., + i: 0, } } } @@ -55,57 +42,52 @@ impl Named for TSMean { impl Operator for TSMean { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let ss = &*self.s.update(tb)?; - - let mut results = Vec::with_capacity(ss.len()); + let vals = &*self.inner.update(tb)?; + assert_eq!(tb.len(), vals.len()); - let mut i = 0; - while i + self.warmup < self.s.ready_offset() && i < ss.len() { - results.push(f64::NAN); - i += 1; - } - - while i + self.warmup < self.ready_offset() && i < ss.len() { - // maintain - self.cache.history.push_back(ss[i]); - self.cache.x += ss[i]; - - results.push(f64::NAN); - i += 1; - } - self.warmup += i; + let mut results = Vec::with_capacity(tb.len()); - for i in i..ss.len() { - let val = ss[i]; - - // maintain - self.cache.history.push_back(val); - self.cache.x += val; - - // compute - results.push(self.fchecked(self.cache.x / self.cache.history.len() as f64)?); + for &val in vals { + if self.i < self.inner.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; + } - // maintain - self.cache.x -= self.cache.history.pop_front().unwrap(); + self.window.push_back(val); + self.sum += val; + let val = if self.window.len() == self.win_size { + let val = self.sum / self.win_size as f64; + self.sum -= self.window.pop_front().unwrap(); + val + } else { + f64::NAN + }; + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.win_size - 1 + self.inner.ready_offset() + self.win_size - 1 } fn to_string(&self) -> String { - format!("({} {} {})", Self::NAME, self.win_size, self.s.to_string()) + format!( + "({} {} {})", + Self::NAME, + self.win_size, + self.inner.to_string() + ) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -113,7 +95,7 @@ impl Operator for TSMean { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -123,10 +105,10 @@ impl Operator for TSMean { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -139,13 +121,13 @@ impl Operator for TSMean { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/ops/window/minmax.rs b/native/src/ops/window/minmax.rs index 7ea2e5f..5b771c2 100644 --- a/native/src/ops/window/minmax.rs +++ b/native/src/ops/window/minmax.rs @@ -27,26 +27,26 @@ macro_rules! impl_minmax { $( pub struct $op { win_size: usize, - s: BoxOp, + inner: BoxOp, cache: Cache, - warmup: usize, + i: usize, } impl Clone for $op { fn clone(&self) -> Self { - Self::new(self.win_size, self.s.clone()) + Self::new(self.win_size, self.inner.clone()) } } impl $op { - pub fn new(win_size: usize, s: BoxOp) -> Self { + pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, - s, + inner, cache: Cache::new(), - warmup: 0, + i: 0, } } } @@ -58,45 +58,18 @@ macro_rules! impl_minmax { impl Operator for $op { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let ss = &*self.s.update(tb)?; + let vals = &*self.inner.update(tb)?; + assert_eq!(tb.len(), vals.len()); - let mut results = Vec::with_capacity(ss.len()); + let mut results = Vec::with_capacity(tb.len()); - let mut i = 0; - while i + self.warmup < self.s.ready_offset() && i < ss.len() { - results.push(f64::NAN); - i += 1; - } - - while i + self.warmup < self.ready_offset() && i < ss.len() { - let val = ss[i]; - self.cache.seq += 1; - - while let Some((seq_old, _)) = self.cache.history.front() { - if seq_old + self.win_size <= self.cache.seq { - self.cache.history.pop_front(); - } else { - break; - } - } - - while let Some((_, last_val)) = self.cache.history.back() { - if val $cmp *last_val { - self.cache.history.pop_back(); - } else { - break; - } + for &val in vals { + if self.i < self.inner.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; } - self.cache.history.push_back((self.cache.seq, val)); - - results.push(f64::NAN); - i += 1; - } - self.warmup += i; - - for i in i..ss.len() { - let val = ss[i]; self.cache.seq += 1; while let Some((seq_old, _)) = self.cache.history.front() { @@ -117,27 +90,32 @@ macro_rules! impl_minmax { self.cache.history.push_back((self.cache.seq, val)); - let result = ($($vfunc)+) (&self.cache, self.win_size); - results.push(self.fchecked(result)?); + let val = if self.cache.history.len() == self.win_size { + let val = ($($vfunc)+) (&self.cache, self.win_size); + val + } else { + f64::NAN + }; + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.win_size - 1 + self.inner.ready_offset() + self.win_size - 1 } fn to_string(&self) -> String { - format!("({} {} {})", Self::NAME, self.win_size, self.s.to_string()) + format!("({} {} {})", Self::NAME, self.win_size, self.inner.to_string()) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -145,7 +123,7 @@ macro_rules! impl_minmax { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -155,10 +133,10 @@ macro_rules! impl_minmax { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -171,13 +149,13 @@ macro_rules! impl_minmax { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/ops/window/quantile.rs b/native/src/ops/window/quantile.rs index c1248b9..6f60b9c 100644 --- a/native/src/ops/window/quantile.rs +++ b/native/src/ops/window/quantile.rs @@ -9,47 +9,34 @@ use std::collections::VecDeque; use std::iter::FromIterator; use std::mem; -#[derive(Clone)] -struct Cache { - history: VecDeque, - ostree: OSTree>, // sorted window -} - -impl Cache { - pub fn new() -> Cache { - Cache { - history: VecDeque::new(), - ostree: OSTree::new(), - } - } -} - pub struct TSQuantile { win_size: usize, quantile: f64, r: usize, // win_size * quantile - s: BoxOp, + inner: BoxOp, - cache: Cache, - warmup: usize, + window: VecDeque, + ostree: OSTree>, // sorted window + i: usize, } impl Clone for TSQuantile { fn clone(&self) -> Self { - Self::new(self.win_size, self.quantile, self.s.clone()) + Self::new(self.win_size, self.quantile, self.inner.clone()) } } impl TSQuantile { - pub fn new(win_size: usize, quantile: f64, s: BoxOp) -> Self { + pub fn new(win_size: usize, quantile: f64, inner: BoxOp) -> Self { assert!(0. <= quantile && quantile <= 1.); Self { win_size, - s, + inner, quantile, r: ((win_size - 1) as f64 * quantile).floor() as usize, - cache: Cache::new(), - warmup: 0, + window: VecDeque::with_capacity(win_size), + ostree: OSTree::new(), + i: 0, } } } @@ -61,49 +48,39 @@ impl Named for TSQuantile { impl Operator for TSQuantile { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let ss = &*self.s.update(tb)?; - - let mut results = Vec::with_capacity(ss.len()); - - let mut i = 0; - while i + self.warmup < self.s.ready_offset() && i < ss.len() { - results.push(f64::NAN); - i += 1; - } + let vals = &*self.inner.update(tb)?; + assert_eq!(tb.len(), vals.len()); - while i + self.warmup < self.ready_offset() && i < ss.len() { - // maintain - let val = ss[i]; + let mut results = Vec::with_capacity(tb.len()); - self.cache.history.push_back(val); - self.cache.ostree.increase(val.asc(), 1); - - results.push(f64::NAN); - i += 1; - } - self.warmup += i; - - for i in i..ss.len() { - let val = ss[i]; + for &val in vals { + if self.i < self.inner.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; + } - // maintain - self.cache.history.push_back(val); - self.cache.ostree.increase(val.asc(), 1); + self.window.push_back(val); + self.ostree.increase(val.asc(), 1); + let val = if self.window.len() == self.win_size { + let (v, _) = self.ostree.select(self.r).unwrap(); + let val = self.fchecked(v.0)?; - // compute - let (v, _) = self.cache.ostree.select(self.r).unwrap(); - results.push(self.fchecked(v.0)?); + let to_remove = self.window.pop_front().unwrap().asc(); + self.ostree.decrease(&to_remove, 1); - // maintain - let to_remove = self.cache.history.pop_front().unwrap().asc(); - self.cache.ostree.decrease(&to_remove, 1); + val + } else { + f64::NAN + }; + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.win_size - 1 + self.inner.ready_offset() + self.win_size - 1 } fn to_string(&self) -> String { @@ -112,16 +89,16 @@ impl Operator for TSQuantile { Self::NAME, self.win_size, self.quantile, - self.s.to_string(), + self.inner.to_string(), ) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -129,7 +106,7 @@ impl Operator for TSQuantile { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -139,10 +116,10 @@ impl Operator for TSQuantile { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -155,13 +132,13 @@ impl Operator for TSQuantile { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/ops/window/rank.rs b/native/src/ops/window/rank.rs index 152e72d..b7adc06 100644 --- a/native/src/ops/window/rank.rs +++ b/native/src/ops/window/rank.rs @@ -9,43 +9,30 @@ use std::collections::VecDeque; use std::iter::FromIterator; use std::mem; -#[derive(Clone)] -struct Cache { - history: VecDeque, - ostree: OSTree>, // sorted window -} - -impl Cache { - pub fn new() -> Cache { - Cache { - history: VecDeque::new(), - ostree: OSTree::new(), - } - } -} - pub struct TSRank { win_size: usize, - s: BoxOp, + inner: BoxOp, - cache: Cache, - warmup: usize, + window: VecDeque, + ostree: OSTree>, // sorted window + i: usize, } impl Clone for TSRank { fn clone(&self) -> Self { - Self::new(self.win_size, self.s.clone()) + Self::new(self.win_size, self.inner.clone()) } } impl TSRank { - pub fn new(win_size: usize, s: BoxOp) -> Self { + pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, - s, + inner, - cache: Cache::new(), - warmup: 0, + window: VecDeque::with_capacity(win_size), + ostree: OSTree::new(), + i: 0, } } } @@ -57,61 +44,56 @@ impl Named for TSRank { impl Operator for TSRank { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let ss = &*self.s.update(tb)?; - - let mut results = Vec::with_capacity(ss.len()); - - let mut i = 0; - while i + self.warmup < self.s.ready_offset() && i < ss.len() { - results.push(f64::NAN); - i += 1; - } + let vals = &*self.inner.update(tb)?; + assert_eq!(tb.len(), vals.len()); - while i + self.warmup < self.ready_offset() && i < ss.len() { - // maintain - let val = ss[i]; + let mut results = Vec::with_capacity(tb.len()); - self.cache.history.push_back(val); - self.cache.ostree.increase(val.asc(), 1); - - results.push(f64::NAN); - i += 1; - } - self.warmup += i; - - for i in i..ss.len() { - let val = ss[i]; + for &val in vals { + if self.i < self.inner.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; + } - // maintain - self.cache.history.push_back(val); - self.cache.ostree.increase(val.asc(), 1); + self.window.push_back(val); + self.ostree.increase(val.asc(), 1); + let val = if self.window.len() == self.win_size { + let idx = self.ostree.rank(&val.asc()).unwrap(); + let val = self.fchecked(idx as f64)?; - // compute - let idx = self.cache.ostree.rank(&val.asc()).unwrap(); - results.push(self.fchecked(idx as f64)?); + let to_remove = self.window.pop_front().unwrap().asc(); + self.ostree.decrease(&to_remove, 1); - // maintain - let to_remove = self.cache.history.pop_front().unwrap().asc(); - self.cache.ostree.decrease(&to_remove, 1); + val + } else { + f64::NAN + }; + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.win_size - 1 + self.inner.ready_offset() + self.win_size - 1 } fn to_string(&self) -> String { - format!("({} {} {})", Self::NAME, self.win_size, self.s.to_string(),) + format!( + "({} {} {})", + Self::NAME, + self.win_size, + self.inner.to_string(), + ) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -119,7 +101,7 @@ impl Operator for TSRank { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -129,10 +111,10 @@ impl Operator for TSRank { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -145,13 +127,13 @@ impl Operator for TSRank { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/ops/window/skew.rs b/native/src/ops/window/skew.rs index 98fa55a..7ba7e26 100644 --- a/native/src/ops/window/skew.rs +++ b/native/src/ops/window/skew.rs @@ -6,43 +6,30 @@ use std::borrow::Cow; use std::mem; use std::{collections::VecDeque, iter::FromIterator}; -#[derive(Clone)] -struct Cache { - history: VecDeque, - x: f64, -} - -impl Cache { - pub fn new() -> Cache { - Cache { - history: VecDeque::new(), - x: 0., - } - } -} - pub struct TSSkew { win_size: usize, - s: BoxOp, + inner: BoxOp, - cache: Cache, - warmup: usize, + window: VecDeque, + sum: f64, + i: usize, } impl Clone for TSSkew { fn clone(&self) -> Self { - Self::new(self.win_size, self.s.clone()) + Self::new(self.win_size, self.inner.clone()) } } impl TSSkew { - pub fn new(win_size: usize, s: BoxOp) -> Self { + pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, - s, + inner, - cache: Cache::new(), - warmup: 0, + window: VecDeque::with_capacity(win_size), + sum: 0., + i: 0, } } } @@ -54,85 +41,71 @@ impl Named for TSSkew { impl Operator for TSSkew { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let ss = &*self.s.update(tb)?; + let vals = &*self.inner.update(tb)?; + assert_eq!(tb.len(), vals.len()); - let mut results = Vec::with_capacity(ss.len()); + let mut results = Vec::with_capacity(tb.len()); - let mut i = 0; - - while i + self.warmup < self.s.ready_offset() && i < ss.len() { - results.push(f64::NAN); - i += 1; - } - - while i + self.warmup < self.ready_offset() && i < ss.len() { - // maintain - self.cache.history.push_back(ss[i]); - self.cache.x += ss[i]; + for &val in vals { + if self.i < self.inner.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; + } - results.push(f64::NAN); - i += 1; - } - self.warmup += i; - - for i in i..ss.len() { - let val = ss[i]; - // maintain - self.cache.history.push_back(val); - self.cache.x += val; - - // compute - let n = self.cache.history.len() as f64; - let mu = self.cache.x / n; - let m3 = self - .cache - .history - .iter() - .map(|x| (x - mu).powf(3.0)) - .sum::() - / n; - let m2 = self - .cache - .history - .iter() - .map(|x| (x - mu).powf(2.0)) - .sum::() - / n; - - if m2 == 0. { - results.push(0.); + self.window.push_back(val); + self.sum += val; + let val = if self.window.len() == self.win_size { + let n = self.window.len() as f64; + let mu = self.sum / n; + let m3 = self.window.iter().map(|x| (x - mu).powf(3.0)).sum::() / n; + let m2 = self.window.iter().map(|x| (x - mu).powf(2.0)).sum::() / n; + + let val = if m2 == 0. { + 0. + } else { + // do not use window function because this will overflow + // let m3 = + // cache.xxx / n - 3. / n / n * cache.xx * cache.x + 2. / n.powf(3.) * cache.x.powf(3.); + // let m2 = cache.xx / n - cache.x * cache.x / n / n; + let correction = (n * (n - 1.)).sqrt() / (n - 2.); + let result = correction * m3 / m2.powf(1.5); + + self.fchecked(result)? + }; + + self.sum -= self.window.pop_front().unwrap(); + + val } else { - // do not use window function because this will overflow - // let m3 = - // cache.xxx / n - 3. / n / n * cache.xx * cache.x + 2. / n.powf(3.) * cache.x.powf(3.); - // let m2 = cache.xx / n - cache.x * cache.x / n / n; - let correction = (n * (n - 1.)).sqrt() / (n - 2.); - let result = correction * m3 / m2.powf(1.5); - - results.push(self.fchecked(result)?); - } + f64::NAN + }; - // maintain - self.cache.x -= self.cache.history.pop_front().unwrap(); + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.win_size - 1 + self.inner.ready_offset() + self.win_size - 1 } fn to_string(&self) -> String { - format!("({} {} {})", Self::NAME, self.win_size, self.s.to_string()) + format!( + "({} {} {})", + Self::NAME, + self.win_size, + self.inner.to_string() + ) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -140,7 +113,7 @@ impl Operator for TSSkew { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -150,10 +123,10 @@ impl Operator for TSSkew { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -166,13 +139,13 @@ impl Operator for TSSkew { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/ops/window/stdev.rs b/native/src/ops/window/stdev.rs index 2b45a09..508955c 100644 --- a/native/src/ops/window/stdev.rs +++ b/native/src/ops/window/stdev.rs @@ -6,43 +6,30 @@ use std::borrow::Cow; use std::mem; use std::{collections::VecDeque, iter::FromIterator}; -#[derive(Clone)] -struct Cache { - history: VecDeque, - x: f64, -} - -impl Cache { - pub fn new() -> Cache { - Cache { - history: VecDeque::new(), - x: 0., - } - } -} - pub struct TSStdev { win_size: usize, - s: BoxOp, + inner: BoxOp, - cache: Cache, - warmup: usize, + window: VecDeque, + sum: f64, + i: usize, } impl Clone for TSStdev { fn clone(&self) -> Self { - Self::new(self.win_size, self.s.clone()) + Self::new(self.win_size, self.inner.clone()) } } impl TSStdev { - pub fn new(win_size: usize, s: BoxOp) -> Self { + pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, - s, + inner, - cache: Cache::new(), - warmup: 0, + window: VecDeque::with_capacity(win_size), + sum: 0., + i: 0, } } } @@ -54,66 +41,62 @@ impl Named for TSStdev { impl Operator for TSStdev { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let ss = &*self.s.update(tb)?; + let vals = &*self.inner.update(tb)?; - let mut results = Vec::with_capacity(ss.len()); + assert_eq!(tb.len(), vals.len()); - let mut i = 0; - while i + self.warmup < self.s.ready_offset() && i < ss.len() { - results.push(f64::NAN); - i += 1; - } + let mut results = Vec::with_capacity(tb.len()); - while i + self.warmup < self.ready_offset() && i < ss.len() { - self.cache.history.push_back(ss[i]); - self.cache.x += ss[i]; + for &val in vals { + if self.i < self.inner.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; + } - results.push(f64::NAN); - i += 1; - } - self.warmup += i; - - for i in i..ss.len() { - let val = ss[i]; - // maintain - self.cache.history.push_back(val); - self.cache.x += val; - - // compute - let n = self.cache.history.len() as f64; - let mu = self.cache.x / n; - let sum = self - .cache - .history - .iter() - .map(|v| (v - mu).powf(2.)) - .sum::(); - - let result = (sum / (n - 1.)).sqrt(); - - results.push(self.fchecked(result)?); - - // maintain - self.cache.x -= self.cache.history.pop_front().unwrap(); + self.window.push_back(val); + self.sum += val; + let val = if self.window.len() == self.win_size { + let n = self.window.len() as f64; + let mu = self.sum / n; + let sum = self.window.iter().map(|v| (v - mu).powf(2.)).sum::(); + + let result = (sum / (n - 1.)).sqrt(); + + let val = self.fchecked(result)?; + + self.sum -= self.window.pop_front().unwrap(); + + val + } else { + f64::NAN + }; + + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.win_size - 1 + self.inner.ready_offset() + self.win_size - 1 } fn to_string(&self) -> String { - format!("({} {} {})", Self::NAME, self.win_size, self.s.to_string()) + format!( + "({} {} {})", + Self::NAME, + self.win_size, + self.inner.to_string() + ) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -121,7 +104,7 @@ impl Operator for TSStdev { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -131,10 +114,10 @@ impl Operator for TSStdev { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -147,13 +130,13 @@ impl Operator for TSStdev { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/ops/window/sum.rs b/native/src/ops/window/sum.rs index 288cde6..5c981a9 100644 --- a/native/src/ops/window/sum.rs +++ b/native/src/ops/window/sum.rs @@ -7,43 +7,30 @@ use std::collections::VecDeque; use std::iter::FromIterator; use std::mem; -#[derive(Clone)] -struct Cache { - history: VecDeque, - x: f64, -} - -impl Cache { - pub fn new() -> Cache { - Cache { - history: VecDeque::new(), - x: 0., - } - } -} - pub struct TSSum { win_size: usize, - s: BoxOp, + inner: BoxOp, - cache: Cache, - warmup: usize, + window: VecDeque, + sum: f64, + i: usize, } impl Clone for TSSum { fn clone(&self) -> Self { - Self::new(self.win_size, self.s.clone()) + Self::new(self.win_size, self.inner.clone()) } } impl TSSum { - pub fn new(win_size: usize, s: BoxOp) -> Self { + pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, - s, + inner, - cache: Cache::new(), - warmup: 0, + window: VecDeque::with_capacity(win_size), + sum: 0., + i: 0, } } } @@ -55,55 +42,56 @@ impl Named for TSSum { impl Operator for TSSum { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let ss = &*self.s.update(tb)?; + let vals = &*self.inner.update(tb)?; - let mut results = Vec::with_capacity(ss.len()); + assert_eq!(tb.len(), vals.len()); - let mut i = 0; - while i + self.warmup < self.s.ready_offset() && i < ss.len() { - results.push(f64::NAN); - i += 1; - } + let mut results = Vec::with_capacity(tb.len()); - while i + self.warmup < self.ready_offset() && i < ss.len() { - self.cache.history.push_back(ss[i]); - self.cache.x += ss[i]; + for &val in vals { + if self.i < self.inner.ready_offset() { + results.push(f64::NAN); + self.i += 1; + continue; + } - results.push(f64::NAN); - i += 1; - } - self.warmup += i; + self.window.push_back(val); + self.sum += val; + let val = if self.window.len() == self.win_size { + let val = self.fchecked(self.sum)?; - for i in i..ss.len() { - let val = ss[i]; - // maintain - self.cache.history.push_back(val); - self.cache.x += val; + self.sum -= self.window.pop_front().unwrap(); - // compute - results.push(self.fchecked(self.cache.x)?); + val + } else { + f64::NAN + }; - // maintain - self.cache.x -= self.cache.history.pop_front().unwrap(); + results.push(val); } results.into() } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.win_size - 1 + self.inner.ready_offset() + self.win_size - 1 } fn to_string(&self) -> String { - format!("({} {} {})", Self::NAME, self.win_size, self.s.to_string()) + format!( + "({} {} {})", + Self::NAME, + self.win_size, + self.inner.to_string() + ) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -111,7 +99,7 @@ impl Operator for TSSum { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -121,10 +109,10 @@ impl Operator for TSSum { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -137,13 +125,13 @@ impl Operator for TSSum { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/python.rs b/native/src/python.rs index 9ebcfbc..a6ee0e0 100644 --- a/native/src/python.rs +++ b/native/src/python.rs @@ -129,6 +129,12 @@ pub fn replay<'py>( mut ops: Vec>, njobs: usize, ) -> PyResult { + if array.len() % schema.len() != 0 { + throw!(PyValueError::new_err( + "Number of arrays is not divisible by schema length" + )) + } + let mut ops: Vec<_> = ops.iter_mut().map(|f| f.borrow_mut(py)).collect(); let ops = ops .iter_mut()