Skip to content

Commit

Permalink
some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dovahcrow committed Jan 21, 2024
1 parent 529f8fd commit 52472fd
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 548 deletions.
167 changes: 78 additions & 89 deletions native/src/ops/window/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,28 @@ impl Cache {

pub struct TSCorrelation<T> {
win_size: usize,
sx: BoxOp<T>,
sy: BoxOp<T>,
x: BoxOp<T>,
y: BoxOp<T>,

cache: Cache,
warmup: usize,
i: usize,
}

impl<T> Clone for TSCorrelation<T> {
fn clone(&self) -> Self {
Self::new(self.win_size, self.sx.clone(), self.sy.clone())
Self::new(self.win_size, self.x.clone(), self.y.clone())
}
}

impl<T> TSCorrelation<T> {
pub fn new(win_size: usize, x: BoxOp<T>, y: BoxOp<T>) -> Self {
Self {
win_size,
sx: x,
sy: y,
x,
y,

cache: Cache::new(),
warmup: 0,
i: 0,
}
}
}
Expand All @@ -62,113 +62,102 @@ impl<T> Named for TSCorrelation<T> {
impl<T: TickerBatch> Operator<T> for TSCorrelation<T> {
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let (sx, sy) = (&mut self.sx, &mut self.sy);
let (xs, ys) = rayon::join(|| sx.update(tb), || sy.update(tb));
let (xs, ys) = (xs?, ys?);

let mut results = Vec::with_capacity(xs.len());

let mut i = 0;
while i + self.warmup < max(self.sx.ready_offset(), self.sy.ready_offset()) && i < xs.len()
{
results.push(f64::NAN);
i += 1;
}

while i + self.warmup < self.ready_offset() && i < xs.len() {
// maintain
let (xval, yval) = (xs[i], ys[i]);

self.cache.history.push_back((xval, yval));
self.cache.x += xval;
self.cache.y += yval;

results.push(f64::NAN);
i += 1;
}
self.warmup += i;

for i in i..xs.len() {
let (xval, yval) = (xs[i], ys[i]);
let (x, y) = (&mut self.x, &mut self.y);
let (xs, ys) = rayon::join(|| x.update(tb), || y.update(tb));
let (xs, ys) = (&*xs?, &*ys?);
assert_eq!(tb.len(), xs.len());
assert_eq!(tb.len(), ys.len());

let mut results = Vec::with_capacity(tb.len());

for (&xval, &yval) in xs.into_iter().zip(ys) {
if self.i < self.x.ready_offset() || self.i < self.y.ready_offset() {
results.push(f64::NAN);
self.i += 1;
continue;
}

// maintain
self.cache.history.push_back((xval, yval));
self.cache.x += xval;
self.cache.y += yval;

// compute
let n = self.cache.history.len() as f64; // this should be equal to self.win_size
let xbar = self.cache.x / n;
let ybar = self.cache.y / n;
let nom = self
.cache
.history
.iter()
.map(|(x, y)| (x - xbar) * (y - ybar))
.sum::<f64>();
let denomx = self
.cache
.history
.iter()
.map(|(x, _)| (x - xbar).powf(2.))
.sum::<f64>()
.sqrt();
let denomy = self
.cache
.history
.iter()
.map(|(_, y)| (y - ybar).powf(2.))
.sum::<f64>()
.sqrt();

let denom = denomx * denomy;

if denom == 0. {
results.push(0.);
let val = if self.cache.history.len() == self.win_size {
let n = self.cache.history.len() as f64; // this should be equal to self.win_size
let xbar = self.cache.x / n;
let ybar = self.cache.y / n;
let nom = self
.cache
.history
.iter()
.map(|(x, y)| (x - xbar) * (y - ybar))
.sum::<f64>();
let denomx = self
.cache
.history
.iter()
.map(|(x, _)| (x - xbar).powf(2.))
.sum::<f64>()
.sqrt();
let denomy = self
.cache
.history
.iter()
.map(|(_, y)| (y - ybar).powf(2.))
.sum::<f64>()
.sqrt();

let denom = denomx * denomy;

let val = if denom == 0. {
0.
} else {
self.fchecked(nom / denom)?
};
let (xval, yval) = self.cache.history.pop_front().unwrap();
self.cache.x -= xval;
self.cache.y -= yval;
val
} else {
results.push(self.fchecked(nom / denom)?);
}
f64::NAN
};

// maintain
let (xval, yval) = self.cache.history.pop_front().unwrap();
self.cache.x -= xval;
self.cache.y -= yval;
results.push(val);
}

results.into()
}

fn ready_offset(&self) -> usize {
max(self.sx.ready_offset(), self.sy.ready_offset()) + self.win_size - 1
max(self.x.ready_offset(), self.y.ready_offset()) + self.win_size - 1
}

fn to_string(&self) -> String {
format!(
"({} {} {} {})",
Self::NAME,
self.win_size,
self.sx.to_string(),
self.sy.to_string()
self.x.to_string(),
self.y.to_string()
)
}

fn depth(&self) -> usize {
1 + max(self.sx.depth(), self.sy.depth())
1 + max(self.x.depth(), self.y.depth())
}

fn len(&self) -> usize {
self.sx.len() + self.sy.len() + 1
self.x.len() + self.y.len() + 1
}

fn child_indices(&self) -> Vec<usize> {
vec![1, self.sx.len() + 1]
vec![1, self.x.len() + 1]
}

fn columns(&self) -> Vec<String> {
self.sx
self.x
.columns()
.into_iter()
.chain(self.sy.columns())
.chain(self.y.columns())
.collect()
}

Expand All @@ -179,13 +168,13 @@ impl<T: TickerBatch> Operator<T> for TSCorrelation<T> {
}
let i = i - 1;

let nx = self.sx.len();
let ny = self.sy.len();
let nx = self.x.len();
let ny = self.y.len();

if i < nx {
self.sx.get(i)?
self.x.get(i)?
} else if i >= nx && i < nx + ny {
self.sy.get(i - nx)?
self.y.get(i - nx)?
} else {
throw!()
}
Expand All @@ -198,19 +187,19 @@ impl<T: TickerBatch> Operator<T> for TSCorrelation<T> {
}
let i = i - 1;

let nx = self.sx.len();
let ny = self.sy.len();
let nx = self.x.len();
let ny = self.y.len();

if i < nx {
if i == 0 {
return mem::replace(&mut self.sx, op) as BoxOp<T>;
return mem::replace(&mut self.x, op) as BoxOp<T>;
}
self.sx.insert(i, op)?
self.x.insert(i, op)?
} else if i >= nx && i < nx + ny {
if i - nx == 0 {
return mem::replace(&mut self.sy, op) as BoxOp<T>;
return mem::replace(&mut self.y, op) as BoxOp<T>;
}
self.sy.insert(i - nx, op)?
self.y.insert(i - nx, op)?
} else {
throw!()
}
Expand Down
4 changes: 2 additions & 2 deletions native/src/ops/window/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ impl<T> Clone for Delay<T> {
}

impl<T> Delay<T> {
pub fn new(win_size: usize, s: BoxOp<T>) -> Self {
pub fn new(win_size: usize, inner: BoxOp<T>) -> Self {
Self {
win_size,
inner: s,
inner,
window: VecDeque::with_capacity(win_size + 1),
i: 0,
}
Expand Down
Loading

0 comments on commit 52472fd

Please sign in to comment.