From f5ea101df7a7340d0672af3d9a0a887a624c8431 Mon Sep 17 00:00:00 2001 From: Weiyuan Wu Date: Mon, 22 Jan 2024 19:26:35 -0800 Subject: [PATCH] support reset and remove TS in naming --- README.md | 59 +++++----- native/src/lib.rs | 1 + native/src/ops/arithmetic.rs | 48 +++++--- native/src/ops/constant.rs | 2 + native/src/ops/getter.rs | 2 + native/src/ops/logic.rs | 18 +++ native/src/ops/mod.rs | 1 + native/src/ops/overlap_studies.rs | 17 ++- native/src/ops/window/correlation.rs | 66 +++++------ native/src/ops/window/delay.rs | 6 + native/src/ops/window/mean.rs | 7 ++ native/src/ops/window/minmax.rs | 54 ++++----- native/src/ops/window/quantile.rs | 8 ++ native/src/ops/window/rank.rs | 7 ++ native/src/ops/window/returns.rs | 7 ++ native/src/ops/window/skew.rs | 7 ++ native/src/ops/window/stdev.rs | 7 ++ native/src/ops/window/sum.rs | 7 ++ native/src/python.rs | 43 ++++++++ native/src/replay.rs | 4 +- python/factor_expr/replay.py | 157 +++++++++------------------ 21 files changed, 299 insertions(+), 229 deletions(-) diff --git a/README.md b/README.md index f492c57..efb665a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Factor Values -(TSLogReturn 30 :close) +(LogReturn 30 :close) + 2019-12-27~2020-01-14.pq = @@ -73,7 +73,7 @@ For example, on a daily OHLC dataset, the 30 days log return on the column `clos ```python from factor_expr import Factor -Factor("(TSLogReturn 30 :close)") +Factor("(LogReturn 30 :close)") ``` Note, in `Factor Expr`, column names are referred by the `:column-name` syntax. 
@@ -87,7 +87,7 @@ from factor_expr import Factor, replay result = await replay( ["data.pq"], - [Factor("(TSLogReturn 30 :close)")] + [Factor("(LogReturn 30 :close)")] ) ``` @@ -99,7 +99,7 @@ In case of multiple datasets are passed in, the results will be concatenated wit For example, the code above will give you a DataFrame looks similar to this: -| index | (TSLogReturn 30 :close) | +| index | (LogReturn 30 :close) | | ----- | ----------------------- | | 0 | 0.23 | | ... | ... | @@ -150,24 +150,24 @@ Any `` larger than 0 are treated as `true`. All the window functions take a window size as the first argument. The computation will be done on the look-back window with the size given in ``. -* Sum of the window elements: `(TSSum )` -* Mean of the window elements: `(TSMean )` -* Min of the window elements: `(TSMin )` -* Max of the window elements: `(TSMax )` -* The index of the min of the window elements: `(TSArgMin )` -* The index of the max of the window elements: `(TSArgMax )` -* Stdev of the window elements: `(TSStd )` -* Skew of the window elements: `(TSSkew )` -* The rank (ascending) of the current element in the window: `(TSRank )` +* Sum of the window elements: `(Sum )` +* Mean of the window elements: `(Mean )` +* Min of the window elements: `(Min )` +* Max of the window elements: `(Max )` +* The index of the min of the window elements: `(ArgMin )` +* The index of the max of the window elements: `(ArgMax )` +* Stdev of the window elements: `(Std )` +* Skew of the window elements: `(Skew )` +* The rank (ascending) of the current element in the window: `(Rank )` * The value `` ticks back: `(Delay )` -* The log return of the value `` ticks back to current value: `(TSLogReturn )` -* Rolling correlation between two series: `(TSCorrelation )` -* Rolling quantile of a series: `(TSQuantile )`, e.g. `(TSQuantile 100 0.5 )` computes the median of a window sized 100. 
+* The log return of the value `` ticks back to current value: `(LogReturn )` +* Rolling correlation between two series: `(Correlation )` +* Rolling quantile of a series: `(Quantile )`, e.g. `(Quantile 100 0.5 )` computes the median of a window sized 100. #### Warm-up Period for Window Functions Factors containing window functions require a warm-up period. For example, for -`(TSSum 10 :close)`, it will not generate data until the 10th tick is replayed. +`(Sum 10 :close)`, it will not generate data until the 10th tick is replayed. In this case, `replay` will write `NaN` into the result during the warm-up period, until the factor starts to produce data. This ensures the length of the factor output will be as same as the length of the input dataset. You can use the `trim` parameter to let replay trim off the warm-up period before it returns. @@ -194,7 +194,7 @@ pd.DataFrame({ result = await replay( ["data.pq"], - [Factor("(TSLogReturn 30 :close)")], + [Factor("(LogReturn 30 :close)")], index_col="time", ) ``` @@ -294,13 +294,11 @@ async def replay( files: Iterable[str], factors: List[Factor], *, - predicate: Optional[Factor] = None, + reset: bool = True, batch_size: int = 40960, n_data_jobs: int = 1, n_factor_jobs: int = 1, pbar: bool = True, - trim: bool = False, - index_col: Optional[str] = None, verbose: bool = False, output: Literal["pandas", "pyarrow", "raw"] = "pandas", ) -> Union[pd.DataFrame, pa.Table]: @@ -309,12 +307,13 @@ async def replay( Parameters ---------- - files: Iterable[str] - Paths to the datasets. Currently, only parquet format is supported. + files: Iterable[str | pa.Table] + Paths to the datasets. Or already read pyarrow Tables. factors: List[Factor] - A list of Factors to replay on the given set of files. - predicate: Optional[Factor] = None - Use a predicate to pre-filter the replay result. Any value larger than 0 is treated as True. + A list of Factors to replay. + reset: bool = True + Whether to reset the factors. 
Factors carries memory about the data they already replayed. If you are calling + replay multiple times and the factors should not starting from fresh, set this to False. batch_size: int = 40960 How many rows to replay at one time. Default is 40960 rows. n_data_jobs: int = 1 @@ -324,14 +323,10 @@ async def replay( e.g. if `n_data_jobs=3` and `n_factor_jobs=5`, you will have 3 * 5 threads running concurrently. pbar: bool = True Whether to show the progress bar using tqdm. - trim: bool = False - Whether to trim the warm up period off from the result. - index_col: Optional[str] = None - Set the index column. verbose: bool = False If True, failed factors will be printed out in stderr. - output: Literal["pandas" | "pyarrow" | "raw"] = "pandas" - The return format, can be pandas DataFrame ("pandas") or pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw"). + output: Literal["pyarrow" | "raw"] = "pyarrow" + The return format, can be pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw"). """ ``` diff --git a/native/src/lib.rs b/native/src/lib.rs index 73945f9..ef30da7 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -21,6 +21,7 @@ fn _lib(py: Python, m: &PyModule) -> PyResult<()> { )?; m.add_class::()?; m.add_function(wrap_pyfunction!(python::replay, m)?)?; + m.add_function(wrap_pyfunction!(python::replay_file, m)?)?; Ok(()) } diff --git a/native/src/ops/arithmetic.rs b/native/src/ops/arithmetic.rs index 2e73d70..44b0d2e 100644 --- a/native/src/ops/arithmetic.rs +++ b/native/src/ops/arithmetic.rs @@ -30,6 +30,12 @@ macro_rules! impl_arithmetic_bivariate { } impl Operator for $op { + fn reset(&mut self) { + self.l.reset(); + self.r.reset(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let (l, r) = (&mut self.l, &mut self.r); @@ -188,6 +194,11 @@ macro_rules! 
impl_arithmetic_univariate { } impl Operator for $op { + fn reset(&mut self) { + self.inner.reset(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; @@ -301,20 +312,20 @@ macro_rules! impl_arithmetic_univariate_1arg { ($([$name:tt => $op:ident: $($func:tt)+])+) => { $( pub struct $op { - s: BoxOp, + inner: BoxOp, p: f64, i: usize, } impl Clone for $op { fn clone(&self) -> Self { - Self::new(self.p, self.s.clone()) + Self::new(self.p, self.inner.clone()) } } impl $op { - pub fn new(p: f64, s: BoxOp) -> Self { - Self { p, s, i: 0 } + pub fn new(p: f64, inner: BoxOp) -> Self { + Self { p, inner, i: 0 } } } @@ -323,16 +334,21 @@ macro_rules! impl_arithmetic_univariate_1arg { } impl Operator for $op { + fn reset(&mut self) { + self.inner.reset(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { - let vals = &*self.s.update(tb)?; + let vals = &*self.inner.update(tb)?; #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { - if self.i < self.s.ready_offset() { + if self.i < self.inner.ready_offset() { #[cfg(feature = "check")] assert!(val.is_nan()); results.push(f64::NAN); @@ -348,19 +364,19 @@ macro_rules! impl_arithmetic_univariate_1arg { } fn ready_offset(&self) -> usize { - self.s.ready_offset() + self.inner.ready_offset() } fn to_string(&self) -> String { - format!("({} {} {})", Self::NAME, self.p, self.s.to_string()) + format!("({} {} {})", Self::NAME, self.p, self.inner.to_string()) } fn depth(&self) -> usize { - 1 + self.s.depth() + 1 + self.inner.depth() } fn len(&self) -> usize { - self.s.len() + 1 + self.inner.len() + 1 } fn child_indices(&self) -> Vec { @@ -368,7 +384,7 @@ macro_rules! impl_arithmetic_univariate_1arg { } fn columns(&self) -> Vec { - self.s.columns() + self.inner.columns() } #[throws(as Option)] @@ -378,10 +394,10 @@ macro_rules! 
impl_arithmetic_univariate_1arg { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { - self.s.get(i)? + self.inner.get(i)? } else { throw!() } @@ -394,13 +410,13 @@ macro_rules! impl_arithmetic_univariate_1arg { } let i = i - 1; - let ns = self.s.len(); + let ns = self.inner.len(); if i < ns { if i == 0 { - return mem::replace(&mut self.s, op) as BoxOp; + return mem::replace(&mut self.inner, op) as BoxOp; } - self.s.insert(i, op)? + self.inner.insert(i, op)? } else { throw!() } diff --git a/native/src/ops/constant.rs b/native/src/ops/constant.rs index 2e8ccdb..05c425a 100644 --- a/native/src/ops/constant.rs +++ b/native/src/ops/constant.rs @@ -5,6 +5,8 @@ use fehler::{throw, throws}; use std::borrow::Cow; impl Operator for f64 { + fn reset(&mut self) {} + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { vec![*self; tb.len()].into() diff --git a/native/src/ops/getter.rs b/native/src/ops/getter.rs index 9e51393..f5710c3 100644 --- a/native/src/ops/getter.rs +++ b/native/src/ops/getter.rs @@ -24,6 +24,8 @@ impl Named for Getter { } impl Operator for Getter { + fn reset(&mut self) {} + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { if matches!(self.idx, None) { diff --git a/native/src/ops/logic.rs b/native/src/ops/logic.rs index e9715f8..c3e093b 100644 --- a/native/src/ops/logic.rs +++ b/native/src/ops/logic.rs @@ -34,6 +34,13 @@ impl Named for If { } impl Operator for If { + fn reset(&mut self) { + self.cond.reset(); + self.btrue.reset(); + self.bfalse.reset(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let cond = &mut self.cond; @@ -224,6 +231,12 @@ macro_rules! 
impl_logic_bivariate { impl Operator for $op { + fn reset(&mut self) { + self.l.reset(); + self.r.reset(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let (l, r) = (&mut self.l, &mut self.r); @@ -384,6 +397,11 @@ impl Named for Not { } impl Operator for Not { + fn reset(&mut self) { + self.inner.reset(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/mod.rs b/native/src/ops/mod.rs index b68e318..8e5c6e8 100644 --- a/native/src/ops/mod.rs +++ b/native/src/ops/mod.rs @@ -33,6 +33,7 @@ where fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]>; fn ready_offset(&self) -> usize; // A.K.A. at offset the output of factor is first time not nan fn to_string(&self) -> String; + fn reset(&mut self); fn len(&self) -> usize; fn depth(&self) -> usize; diff --git a/native/src/ops/overlap_studies.rs b/native/src/ops/overlap_studies.rs index 5986826..b3d8f94 100644 --- a/native/src/ops/overlap_studies.rs +++ b/native/src/ops/overlap_studies.rs @@ -23,14 +23,14 @@ impl Clone for SMA { } impl SMA { - pub fn new(inner: BoxOp, n: usize) -> Self { + pub fn new(inner: BoxOp, win_size: usize) -> Self { Self { - window: VecDeque::with_capacity(n), + inner, + win_size, + + window: VecDeque::with_capacity(win_size), sum: 0., i: 0, - - inner, - win_size: n, } } } @@ -40,6 +40,13 @@ impl Named for SMA { } impl Operator for SMA { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.sum = 0.; + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/window/correlation.rs b/native/src/ops/window/correlation.rs index 3220c09..7ba58f2 100644 --- a/native/src/ops/window/correlation.rs +++ b/native/src/ops/window/correlation.rs @@ -4,31 +4,15 @@ use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; use 
std::{borrow::Cow, cmp::max, collections::VecDeque, iter::FromIterator, mem}; -#[derive(Clone)] -struct Cache { - history: VecDeque<(f64, f64)>, - - x: f64, - y: f64, -} - -impl Cache { - fn new() -> Cache { - Cache { - history: VecDeque::new(), - - x: 0., - y: 0., - } - } -} - pub struct Correlation { win_size: usize, x: BoxOp, y: BoxOp, - cache: Cache, + window: VecDeque<(f64, f64)>, + + xsum: f64, + ysum: f64, i: usize, } @@ -45,7 +29,9 @@ impl Correlation { x, y, - cache: Cache::new(), + window: VecDeque::new(), + xsum: 0., + ysum: 0., i: 0, } } @@ -56,6 +42,15 @@ impl Named for Correlation { } impl Operator for Correlation { + fn reset(&mut self) { + self.x.reset(); + self.y.reset(); + self.window.clear(); + self.xsum = 0.; + self.ysum = 0.; + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let (x, y) = (&mut self.x, &mut self.y); @@ -77,30 +72,27 @@ impl Operator for Correlation { continue; } - self.cache.history.push_back((xval, yval)); - self.cache.x += xval; - self.cache.y += yval; + self.window.push_back((xval, yval)); + self.xsum += xval; + self.ysum += yval; - let val = if self.cache.history.len() == self.win_size { - let n = self.cache.history.len() as f64; // this should be equal to self.win_size - let xbar = self.cache.x / n; - let ybar = self.cache.y / n; + let val = if self.window.len() == self.win_size { + let n = self.window.len() as f64; // this should be equal to self.win_size + let xbar = self.xsum / n; + let ybar = self.ysum / n; let nom = self - .cache - .history + .window .iter() .map(|(x, y)| (x - xbar) * (y - ybar)) .sum::(); let denomx = self - .cache - .history + .window .iter() .map(|(x, _)| (x - xbar).powf(2.)) .sum::() .sqrt(); let denomy = self - .cache - .history + .window .iter() .map(|(_, y)| (y - ybar).powf(2.)) .sum::() @@ -113,9 +105,9 @@ impl Operator for Correlation { } else { self.fchecked(nom / denom)? 
}; - let (xval, yval) = self.cache.history.pop_front().unwrap(); - self.cache.x -= xval; - self.cache.y -= yval; + let (xval, yval) = self.window.pop_front().unwrap(); + self.xsum -= xval; + self.ysum -= yval; val } else { f64::NAN diff --git a/native/src/ops/window/delay.rs b/native/src/ops/window/delay.rs index 0b9643c..ee034fa 100644 --- a/native/src/ops/window/delay.rs +++ b/native/src/ops/window/delay.rs @@ -34,6 +34,12 @@ impl Named for Delay { } impl Operator for Delay { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/window/mean.rs b/native/src/ops/window/mean.rs index a3a3330..c410e25 100644 --- a/native/src/ops/window/mean.rs +++ b/native/src/ops/window/mean.rs @@ -37,6 +37,13 @@ impl Named for Mean { } impl Operator for Mean { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.sum = 0.; + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/window/minmax.rs b/native/src/ops/window/minmax.rs index 7f051d1..b27f470 100644 --- a/native/src/ops/window/minmax.rs +++ b/native/src/ops/window/minmax.rs @@ -4,21 +4,6 @@ use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -#[derive(Clone)] -struct Cache { - history: VecDeque<(usize, f64)>, - seq: usize, -} - -impl Cache { - fn new() -> Cache { - Cache { - history: VecDeque::new(), - seq: 0, - } - } -} - macro_rules! impl_minmax { ($($op:ident $cmp:tt {$($vfunc:tt)+})+) => { $( @@ -26,7 +11,8 @@ macro_rules! impl_minmax { win_size: usize, inner: BoxOp, - cache: Cache, + window: VecDeque<(usize, f64)>, + seq: usize, i: usize, } @@ -42,7 +28,8 @@ macro_rules! 
impl_minmax { win_size, inner, - cache: Cache::new(), + window: VecDeque::new(), + seq: 0, i: 0, } } @@ -53,6 +40,13 @@ macro_rules! impl_minmax { } impl Operator for $op { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.seq = 0; + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; @@ -70,28 +64,28 @@ macro_rules! impl_minmax { continue; } - self.cache.seq += 1; + self.seq += 1; - while let Some((seq_old, _)) = self.cache.history.front() { - if seq_old + self.win_size <= self.cache.seq { - self.cache.history.pop_front(); + while let Some((seq_old, _)) = self.window.front() { + if seq_old + self.win_size <= self.seq { + self.window.pop_front(); } else { break; } } - while let Some((_, last_val)) = self.cache.history.back() { + while let Some((_, last_val)) = self.window.back() { if val $cmp *last_val { - self.cache.history.pop_back(); + self.window.pop_back(); } else { break; } } - self.cache.history.push_back((self.cache.seq, val)); + self.window.push_back((self.seq, val)); - let val = if self.cache.history.len() == self.win_size { - let val = ($($vfunc)+) (&self.cache, self.win_size); + let val = if self.window.len() == self.win_size { + let val = ($($vfunc)+) (&self.window, self.seq, self.win_size); val } else { f64::NAN @@ -182,8 +176,8 @@ macro_rules! impl_minmax { } impl_minmax! 
{ - Min < { |cache: &Cache, _: usize| cache.history.front().unwrap().1 } - Max > { |cache: &Cache, _: usize| cache.history.front().unwrap().1 } - ArgMin < { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 } - ArgMax > { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 } + Min < { |window: &VecDeque<(usize, f64)>, _: usize, _: usize| window.front().unwrap().1 } + Max > { |window: &VecDeque<(usize, f64)>, _: usize, _: usize| window.front().unwrap().1 } + ArgMin < { |window: &VecDeque<(usize, f64)>, seq: usize, win_size: usize| (window.front().unwrap().0 + win_size - seq - 1) as f64 } + ArgMax > { |window: &VecDeque<(usize, f64)>, seq: usize, win_size: usize| (window.front().unwrap().0 + win_size - seq - 1) as f64 } } diff --git a/native/src/ops/window/quantile.rs b/native/src/ops/window/quantile.rs index bd694fa..b2a687e 100644 --- a/native/src/ops/window/quantile.rs +++ b/native/src/ops/window/quantile.rs @@ -33,6 +33,7 @@ impl Quantile { inner, quantile, r: ((win_size - 1) as f64 * quantile).floor() as usize, + window: VecDeque::with_capacity(win_size), ostree: OSTree::new(), i: 0, @@ -45,6 +46,13 @@ impl Named for Quantile { } impl Operator for Quantile { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.ostree.clear(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/window/rank.rs b/native/src/ops/window/rank.rs index cdb4ee7..00471a9 100644 --- a/native/src/ops/window/rank.rs +++ b/native/src/ops/window/rank.rs @@ -41,6 +41,13 @@ impl Named for Rank { } impl Operator for Rank { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.ostree.clear(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git 
a/native/src/ops/window/returns.rs b/native/src/ops/window/returns.rs index b06ab74..c081be5 100644 --- a/native/src/ops/window/returns.rs +++ b/native/src/ops/window/returns.rs @@ -23,6 +23,7 @@ impl LogReturn { Self { win_size, inner, + window: VecDeque::with_capacity(win_size + 1), i: 0, } @@ -34,6 +35,12 @@ impl Named for LogReturn { } impl Operator for LogReturn { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/window/skew.rs b/native/src/ops/window/skew.rs index e72c87f..7266a6e 100644 --- a/native/src/ops/window/skew.rs +++ b/native/src/ops/window/skew.rs @@ -37,6 +37,13 @@ impl Named for Skew { } impl Operator for Skew { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.sum = 0.; + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/window/stdev.rs b/native/src/ops/window/stdev.rs index 0ea5826..726aef4 100644 --- a/native/src/ops/window/stdev.rs +++ b/native/src/ops/window/stdev.rs @@ -37,6 +37,13 @@ impl Named for Stdev { } impl Operator for Stdev { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.sum = 0.; + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/ops/window/sum.rs b/native/src/ops/window/sum.rs index 4cde8d2..30db1cc 100644 --- a/native/src/ops/window/sum.rs +++ b/native/src/ops/window/sum.rs @@ -37,6 +37,13 @@ impl Named for Sum { } impl Operator for Sum { + fn reset(&mut self) { + self.inner.reset(); + self.window.clear(); + self.sum = 0.; + self.i = 0; + } + #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; diff --git a/native/src/python.rs 
b/native/src/python.rs index 05df01f..7859213 100644 --- a/native/src/python.rs +++ b/native/src/python.rs @@ -44,6 +44,10 @@ impl Factor { self.op.ready_offset() } + pub fn reset(&mut self) { + self.op.reset() + } + pub fn replace<'p>(&self, i: usize, other: PyRef<'p, Factor>) -> PyResult { if i == 0 { return Ok(Factor { @@ -193,3 +197,42 @@ pub fn replay<'py>( .collect(), }) } + +#[pyfunction] +pub fn replay_file<'py>( + py: Python<'py>, + file: &str, + mut ops: Vec>, + njobs: usize, +) -> PyResult { + let mut ops: Vec<_> = ops.iter_mut().map(|f| f.borrow_mut(py)).collect(); + let ops = ops + .iter_mut() + .map(|f| (&mut *f.op) as &mut dyn Operator) + .collect(); + + let (succeeded, failed) = py + .allow_threads(|| -> Result<_> { + let pool = rayon::ThreadPoolBuilder::new().num_threads(njobs).build()?; + Ok(pool.install(|| crate::replay::replay_file(file, ops, None))?) + }) + .map_err(|e| PyValueError::new_err(format!("{}", e)))?; + + Ok(ReplayResult { + succeeded: succeeded + .into_iter() + .map(|(k, v)| { + let data = v.into_data(); + let (array, schema) = ffi::to_ffi(&data).unwrap(); + let array = Box::into_raw(Box::new(array)); + let schema = Box::into_raw(Box::new(schema)); + + (k, (array as usize, schema as usize)) + }) + .collect(), + failed: failed + .into_iter() + .map(|(k, v)| (k, format!("{}", v))) + .collect(), + }) +} diff --git a/native/src/replay.rs b/native/src/replay.rs index b6ced18..2d7dbd7 100644 --- a/native/src/replay.rs +++ b/native/src/replay.rs @@ -75,7 +75,7 @@ pub fn replay_file( path: &str, ops: Vec<&mut (dyn Operator)>, batch_size: O, -) -> (usize, HashMap, HashMap) +) -> (HashMap, HashMap) where O: Into>, { @@ -116,5 +116,5 @@ where Some(nrows), )?; - (nrows, succeeded, failed) + (succeeded, failed) } diff --git a/python/factor_expr/replay.py b/python/factor_expr/replay.py index 0755ba8..0f7047a 100644 --- a/python/factor_expr/replay.py +++ b/python/factor_expr/replay.py @@ -4,6 +4,7 @@ from sys import stderr from typing import 
Iterable, List, Literal, Optional, Set, Tuple, Union, AsyncGenerator, cast from functools import partial +from tqdm.auto import tqdm import numpy as np import pandas as pd @@ -12,44 +13,33 @@ import pyarrow.compute as pc from ._lib import Factor -from ._lib import replay as _native_replay - -try: - from IPython import get_ipython - - if get_ipython() is not None: - from tqdm.notebook import tqdm - else: - from tqdm import tqdm -except Exception: - from tqdm import tqdm +from ._lib import replay as _native_replay, replay_file as _native_replay_file async def replay( files: Iterable[str | pa.Table], factors: List[Factor], *, - predicate: Optional[Factor] = None, + reset: bool = True, batch_size: int = 40960, n_data_jobs: int = 1, n_factor_jobs: int = 1, pbar: bool = True, - trim: bool = False, - index_col: Optional[str] = None, verbose: bool = False, - output: Literal["pandas", "pyarrow", "raw"] = "pandas", -) -> pd.DataFrame | pa.Table: + output: Literal["pyarrow", "raw"] = "pyarrow", +) -> pa.Table: """ Replay a list of factors on a bunch of data. Parameters ---------- - files: Iterable[str] - Paths to the datasets. Currently only parquet format is supported. + files: Iterable[str | pa.Table] + Paths to the datasets, or already-loaded pyarrow Tables. factors: List[Factor] - A list of Factors to replay on the given set of files. - predicate: Optional[Factor] = None - Use a predicate to pre-filter the replay result. Any value larger than 0 is treated as True. + A list of Factors to replay. + reset: bool = True + Whether to reset the factors. Factors carry memory about the data they have already replayed. If you are calling + replay multiple times and the factors should not start from fresh, set this to False. batch_size: int = 40960 How many rows to replay at one time. Default is 40960 rows. n_data_jobs: int = 1 @@ -59,14 +49,10 @@ async def replay( e.g. if `n_data_jobs=3` and `n_factor_jobs=5`, you will have 3 * 5 threads running concurrently.
pbar: bool = True Whether to show the progress bar using tqdm. - trim: bool = False - Whether to trim the warm up period off from the result. - index_col: Optional[str] = None - Set the index column. verbose: bool = False If True, failed factors will be printed out in stderr. - output: Literal["pandas" | "pyarrow" | "raw"] = "pandas" - The return format, can be pandas DataFrame ("pandas") or pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw"). + output: Literal["pyarrow" | "raw"] = "pyarrow" + The return format, can be pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw"). Examples -------- @@ -77,8 +63,21 @@ async def replay( "2020-11-03T17:09:39.072000~2020-11-04T15:23:36.pq" ], factors = [ - Factor("(> (TSStd 60 (TSLogReturn 120 (+ :price_bid_l1_close :price_bid_l1_close))) 0.0005)"), - Factor("(Abs (TSLogReturn 120 (+ :price_bid_l1_close :price_ask_l1_close)))"), + Factor("(> (Std 60 (LogReturn 120 (+ :price_bid_l1_close :price_bid_l1_close))) 0.0005)"), + Factor("(Abs (LogReturn 120 (+ :price_bid_l1_close :price_ask_l1_close)))"), + ] + ) + ``` + ```python + tbs = [ + pq.read_table("2020-11-02T12:00:07.860000~2020-11-03T17:09:01.pq"), + pq.read_table("2020-11-03T17:09:39.072000~2020-11-04T15:23:36.pq"), + ] + replay( + files = tbs, + factors = [ + Factor("(> (Std 60 (LogReturn 120 (+ :price_bid_l1_close :price_bid_l1_close))) 0.0005)"), + Factor("(Abs (LogReturn 120 (+ :price_bid_l1_close :price_ask_l1_close)))"), ] ) ``` @@ -86,16 +85,17 @@ async def replay( factor_tables: List[pa.Table] = [] files = list(files) + if reset: + for factor in factors: + factor.reset() + with tqdm(total=len(files), leave=False, disable=not pbar) as progress: async for _, fvals in replay_iter( files, factors, - predicate=predicate, batch_size=batch_size, n_data_jobs=n_data_jobs, n_factor_jobs=n_factor_jobs, - trim=trim, - index_col=index_col, verbose=verbose, ): factor_tables.append(fvals) @@ -103,9 +103,6 @@ async def replay( if output ==
"pyarrow": factor_table = pa.concat_tables(factor_tables) - elif output == "pandas": - factor_table = pa.concat_tables(factor_tables) - factor_table = factor_table.to_pandas(self_destruct=True) elif output == "raw": factor_table = factor_tables else: @@ -118,7 +115,6 @@ async def replay_iter( files: Iterable[str | pa.Table], factors: List[Factor], *, - predicate: Optional[Factor] = None, batch_size: int = 40960, n_data_jobs: int = 1, n_factor_jobs: int = 1, @@ -141,12 +137,9 @@ async def replay_iter( _replay_single, dname, [f.clone() for f in factors], - predicate=predicate.clone() if predicate is not None else None, batch_size=batch_size, - trim=trim, - index_col=index_col, verbose=verbose, - n_factor_jobs=n_factor_jobs, + n_jobs=n_factor_jobs, ), ) @@ -199,98 +192,48 @@ def _replay_single( file: str | pa.Table, factors: List[Factor], *, - predicate: Optional[Factor] = None, batch_size: int = 40960, - trim: bool = False, - index_col: Optional[str] = None, - n_factor_jobs: int = 1, + n_jobs: int = 1, verbose: bool = False, ) -> Tuple[pa.Table, Set[str]]: - if not isinstance(file, pa.Table): - raise NotImplementedError - - tb = cast(pa.Table, file) - schema = tb.schema - ffi_schema, ffi_arrays, keepalive = table_to_pointers(file) - - if predicate is not None: - # put the predicate as the last - # replay_result = _native_replay(dname, [*factors, predicate], batch_size=batch_size, njobs=n_factor_jobs) - replay_result = _native_replay(ffi_schema, ffi_arrays, [*factors, predicate], njobs=n_factor_jobs) + if isinstance(file, str): + replay_result = _native_replay_file(file, factors, njobs=n_jobs) else: - replay_result = _native_replay(ffi_schema, ffi_arrays, factors, njobs=n_factor_jobs) + schema = file.schema + ffi_schema, ffi_arrays, keepalive = table_to_pointers(file) - table_datas, table_names = [], [] + replay_result = _native_replay(ffi_schema, ffi_arrays, factors, njobs=n_jobs) - # if index_col is not None: - # index = pq.read_table(dname, 
columns=[index_col]).column(index_col) - # table_datas.append(index) - # table_names.append(index_col) + table_datas, table_names = [], [] - predicate_values = None for i, (data_ptr, schema_ptr) in replay_result["succeeded"].items(): arr = pa.Array._import_from_c(data_ptr, schema_ptr) - if predicate is not None and i == len(factors): # is the predicate col - predicate_values = arr - else: - table_datas.append(arr) - table_names.append(str(factors[i])) + table_datas.append(arr) + table_names.append(str(factors[i])) # Fill in the failed columns if isinstance(file, pa.Table): N = len(file) + elif table_datas: + N = len(table_datas[0]) else: - raise NotImplementedError + tb = pq.read_metadata(file) + N = tb.num_rows nanarr = pa.array(np.empty(N, "f8"), mask=np.ones(N, "b1")) for i, reason in replay_result["failed"].items(): - if predicate is not None and i == len(factors): - raise ValueError("predicate failed to compute: {}", reason) - else: - table_datas.append(nanarr) - table_names.append(str(factors[i])) + table_datas.append(nanarr) + table_names.append(str(factors[i])) if verbose: print(f"{factors[i]} failed: {reason}", file=stderr) tb = pa.Table.from_arrays(table_datas, names=table_names) - if trim: - if index_col is not None: - # the first column is the index - data_starts = 1 - else: - data_starts = 0 - - ready_offset = np.max([Factor(col).ready_offset() for col in tb.column_names[data_starts:]]) - - tb = tb.slice(ready_offset) - - # trim predicate as well - if predicate_values is not None: - predicate_values = predicate_values.slice(ready_offset) - - if predicate is not None: - assert predicate_values is not None, "predicate_values is none, this is not possible" - - # filter the table using the predicate - tb = pc.filter(tb, pc.greater(predicate_values, 0.0)) - - if index_col is not None: - # sort the columns based on the order passed in - tb = tb.select([index_col] + [str(f) for f in factors]) - - # set the metadata for the index col, so that when `.to_pandas` 
is called, - # the index col automatically becomes the index. - header = tb.slice(0).to_pandas() - header = header.set_index(index_col) - _, _, metadata = pa.pandas_compat.dataframe_to_types(header, True) - tb = tb.replace_schema_metadata(metadata) - else: - # sort the columns based on the order passed in - tb = tb.select([str(f) for f in factors]) + # sort the columns based on the order passed in + tb = tb.select([str(f) for f in factors]) return ( tb,