Skip to content

Commit

Permalink
support reset and remove TS in naming
Browse files Browse the repository at this point in the history
  • Loading branch information
dovahcrow committed Jan 23, 2024
1 parent 397f675 commit db28961
Show file tree
Hide file tree
Showing 21 changed files with 298 additions and 228 deletions.
59 changes: 27 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<th>Factor Values</th>
</tr>
<tr>
<td>(TSLogReturn 30 :close)</td>
<td>(LogReturn 30 :close)</td>
<td>+</td>
<td>2019-12-27~2020-01-14.pq</td>
<td>=</td>
Expand Down Expand Up @@ -73,7 +73,7 @@ For example, on a daily OHLC dataset, the 30 days log return on the column `clos
```python
from factor_expr import Factor

Factor("(TSLogReturn 30 :close)")
Factor("(LogReturn 30 :close)")
```

Note, in `Factor Expr`, column names are referred by the `:column-name` syntax.
Expand All @@ -87,7 +87,7 @@ from factor_expr import Factor, replay

result = await replay(
["data.pq"],
[Factor("(TSLogReturn 30 :close)")]
[Factor("(LogReturn 30 :close)")]
)
```

Expand All @@ -99,7 +99,7 @@ In case of multiple datasets are passed in, the results will be concatenated wit

For example, the code above will give you a DataFrame looks similar to this:

| index | (TSLogReturn 30 :close) |
| index | (LogReturn 30 :close) |
| ----- | ----------------------- |
| 0 | 0.23 |
| ... | ... |
Expand Down Expand Up @@ -150,24 +150,24 @@ Any `<expr>` larger than 0 are treated as `true`.

All the window functions take a window size as the first argument. The computation will be done on the look-back window with the size given in `<const>`.

* Sum of the window elements: `(TSSum <const> <expr>)`
* Mean of the window elements: `(TSMean <const> <expr>)`
* Min of the window elements: `(TSMin <const> <expr>)`
* Max of the window elements: `(TSMax <const> <expr>)`
* The index of the min of the window elements: `(TSArgMin <const> <expr>)`
* The index of the max of the window elements: `(TSArgMax <const> <expr>)`
* Stdev of the window elements: `(TSStd <const> <expr>)`
* Skew of the window elements: `(TSSkew <const> <expr>)`
* The rank (ascending) of the current element in the window: `(TSRank <const> <expr>)`
* Sum of the window elements: `(Sum <const> <expr>)`
* Mean of the window elements: `(Mean <const> <expr>)`
* Min of the window elements: `(Min <const> <expr>)`
* Max of the window elements: `(Max <const> <expr>)`
* The index of the min of the window elements: `(ArgMin <const> <expr>)`
* The index of the max of the window elements: `(ArgMax <const> <expr>)`
* Stdev of the window elements: `(Std <const> <expr>)`
* Skew of the window elements: `(Skew <const> <expr>)`
* The rank (ascending) of the current element in the window: `(Rank <const> <expr>)`
* The value `<const>` ticks back: `(Delay <const> <expr>)`
* The log return of the value `<const>` ticks back to current value: `(TSLogReturn <const> <expr>)`
* Rolling correlation between two series: `(TSCorrelation <const> <expr> <expr>)`
* Rolling quantile of a series: `(TSQuantile <const> <const> <expr>)`, e.g. `(TSQuantile 100 0.5 <expr>)` computes the median of a window sized 100.
* The log return of the value `<const>` ticks back to current value: `(LogReturn <const> <expr>)`
* Rolling correlation between two series: `(Correlation <const> <expr> <expr>)`
* Rolling quantile of a series: `(Quantile <const> <const> <expr>)`, e.g. `(Quantile 100 0.5 <expr>)` computes the median of a window sized 100.

#### Warm-up Period for Window Functions

Factors containing window functions require a warm-up period. For example, for
`(TSSum 10 :close)`, it will not generate data until the 10th tick is replayed.
`(Sum 10 :close)`, it will not generate data until the 10th tick is replayed.
In this case, `replay` will write `NaN` into the result during the warm-up period, until the factor starts to produce data.
This ensures the length of the factor output will be as same as the length of the input dataset. You can use the `trim`
parameter to let replay trim off the warm-up period before it returns.
Expand All @@ -194,7 +194,7 @@ pd.DataFrame({

result = await replay(
["data.pq"],
[Factor("(TSLogReturn 30 :close)")],
[Factor("(LogReturn 30 :close)")],
index_col="time",
)
```
Expand Down Expand Up @@ -294,13 +294,11 @@ async def replay(
files: Iterable[str],
factors: List[Factor],
*,
predicate: Optional[Factor] = None,
reset: bool = True,
batch_size: int = 40960,
n_data_jobs: int = 1,
n_factor_jobs: int = 1,
pbar: bool = True,
trim: bool = False,
index_col: Optional[str] = None,
verbose: bool = False,
output: Literal["pandas", "pyarrow", "raw"] = "pandas",
) -> Union[pd.DataFrame, pa.Table]:
Expand All @@ -309,12 +307,13 @@ async def replay(
Parameters
----------
files: Iterable[str]
Paths to the datasets. Currently, only parquet format is supported.
files: Iterable[str | pa.Table]
Paths to the datasets. Or already read pyarrow Tables.
factors: List[Factor]
A list of Factors to replay on the given set of files.
predicate: Optional[Factor] = None
Use a predicate to pre-filter the replay result. Any value larger than 0 is treated as True.
A list of Factors to replay.
reset: bool = True
Whether to reset the factors. Factors carries memory about the data they already replayed. If you are calling
replay multiple times and the factors should not starting from fresh, set this to False.
batch_size: int = 40960
How many rows to replay at one time. Default is 40960 rows.
n_data_jobs: int = 1
Expand All @@ -324,14 +323,10 @@ async def replay(
e.g. if `n_data_jobs=3` and `n_factor_jobs=5`, you will have 3 * 5 threads running concurrently.
pbar: bool = True
Whether to show the progress bar using tqdm.
trim: bool = False
Whether to trim the warm up period off from the result.
index_col: Optional[str] = None
Set the index column.
verbose: bool = False
If True, failed factors will be printed out in stderr.
output: Literal["pandas" | "pyarrow" | "raw"] = "pandas"
The return format, can be pandas DataFrame ("pandas") or pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw").
output: Literal["pyarrow" | "raw"] = "pyarrow"
The return format, can be pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw").
"""
```

1 change: 1 addition & 0 deletions native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fn _lib(py: Python, m: &PyModule) -> PyResult<()> {
)?;
m.add_class::<Factor>()?;
m.add_function(wrap_pyfunction!(python::replay, m)?)?;
m.add_function(wrap_pyfunction!(python::replay_file, m)?)?;

Ok(())
}
48 changes: 32 additions & 16 deletions native/src/ops/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ macro_rules! impl_arithmetic_bivariate {
}

impl<T: TickerBatch> Operator<T> for $op<T> {
fn reset(&mut self) {
self.l.reset();
self.r.reset();
self.i = 0;
}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let (l, r) = (&mut self.l, &mut self.r);
Expand Down Expand Up @@ -188,6 +194,11 @@ macro_rules! impl_arithmetic_univariate {
}

impl<T: TickerBatch> Operator<T> for $op<T> {
fn reset(&mut self) {
self.inner.reset();
self.i = 0;
}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
Expand Down Expand Up @@ -301,20 +312,20 @@ macro_rules! impl_arithmetic_univariate_1arg {
($([$name:tt => $op:ident: $($func:tt)+])+) => {
$(
pub struct $op<T> {
s: BoxOp<T>,
inner: BoxOp<T>,
p: f64,
i: usize,
}

impl<T> Clone for $op<T> {
fn clone(&self) -> Self {
Self::new(self.p, self.s.clone())
Self::new(self.p, self.inner.clone())
}
}

impl<T> $op<T> {
pub fn new(p: f64, s: BoxOp<T>) -> Self {
Self { p, s, i: 0 }
pub fn new(p: f64, inner: BoxOp<T>) -> Self {
Self { p, inner, i: 0 }
}
}

Expand All @@ -323,16 +334,21 @@ macro_rules! impl_arithmetic_univariate_1arg {
}

impl<T: TickerBatch> Operator<T> for $op<T> {
fn reset(&mut self) {
self.inner.reset();
self.i = 0;
}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.s.update(tb)?;
let vals = &*self.inner.update(tb)?;
#[cfg(feature = "check")]
assert_eq!(tb.len(), vals.len());

let mut results = Vec::with_capacity(tb.len());

for &val in vals {
if self.i < self.s.ready_offset() {
if self.i < self.inner.ready_offset() {
#[cfg(feature = "check")]
assert!(val.is_nan());
results.push(f64::NAN);
Expand All @@ -348,27 +364,27 @@ macro_rules! impl_arithmetic_univariate_1arg {
}

fn ready_offset(&self) -> usize {
self.s.ready_offset()
self.inner.ready_offset()
}

fn to_string(&self) -> String {
format!("({} {} {})", Self::NAME, self.p, self.s.to_string())
format!("({} {} {})", Self::NAME, self.p, self.inner.to_string())
}

fn depth(&self) -> usize {
1 + self.s.depth()
1 + self.inner.depth()
}

fn len(&self) -> usize {
self.s.len() + 1
self.inner.len() + 1
}

fn child_indices(&self) -> Vec<usize> {
vec![1]
}

fn columns(&self) -> Vec<String> {
self.s.columns()
self.inner.columns()
}

#[throws(as Option)]
Expand All @@ -378,10 +394,10 @@ macro_rules! impl_arithmetic_univariate_1arg {
}
let i = i - 1;

let ns = self.s.len();
let ns = self.inner.len();

if i < ns {
self.s.get(i)?
self.inner.get(i)?
} else {
throw!()
}
Expand All @@ -394,13 +410,13 @@ macro_rules! impl_arithmetic_univariate_1arg {
}
let i = i - 1;

let ns = self.s.len();
let ns = self.inner.len();

if i < ns {
if i == 0 {
return mem::replace(&mut self.s, op) as BoxOp<T>;
return mem::replace(&mut self.inner, op) as BoxOp<T>;
}
self.s.insert(i, op)?
self.inner.insert(i, op)?
} else {
throw!()
}
Expand Down
2 changes: 2 additions & 0 deletions native/src/ops/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use fehler::{throw, throws};
use std::borrow::Cow;

impl<T: TickerBatch> Operator<T> for f64 {
fn reset(&mut self) {}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
vec![*self; tb.len()].into()
Expand Down
2 changes: 2 additions & 0 deletions native/src/ops/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ impl Named for Getter {
}

impl<T: TickerBatch> Operator<T> for Getter {
fn reset(&mut self) {}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
if matches!(self.idx, None) {
Expand Down
18 changes: 18 additions & 0 deletions native/src/ops/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ impl<T> Named for If<T> {
}

impl<T: TickerBatch> Operator<T> for If<T> {
fn reset(&mut self) {
self.cond.reset();
self.btrue.reset();
self.bfalse.reset();
self.i = 0;
}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let cond = &mut self.cond;
Expand Down Expand Up @@ -224,6 +231,12 @@ macro_rules! impl_logic_bivariate {

impl<T: TickerBatch> Operator<T> for $op<T>
{
fn reset(&mut self) {
self.l.reset();
self.r.reset();
self.i = 0;
}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let (l, r) = (&mut self.l, &mut self.r);
Expand Down Expand Up @@ -384,6 +397,11 @@ impl<T> Named for Not<T> {
}

impl<T: TickerBatch> Operator<T> for Not<T> {
fn reset(&mut self) {
self.inner.reset();
self.i = 0;
}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
Expand Down
1 change: 1 addition & 0 deletions native/src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ where
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]>;
fn ready_offset(&self) -> usize; // A.K.A. at offset the output of factor is first time not nan
fn to_string(&self) -> String;
fn reset(&mut self);

fn len(&self) -> usize;
fn depth(&self) -> usize;
Expand Down
17 changes: 12 additions & 5 deletions native/src/ops/overlap_studies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ impl<T> Clone for SMA<T> {
}

impl<T> SMA<T> {
pub fn new(inner: BoxOp<T>, n: usize) -> Self {
pub fn new(inner: BoxOp<T>, win_size: usize) -> Self {
Self {
window: VecDeque::with_capacity(n),
inner,
win_size,

window: VecDeque::with_capacity(win_size),
sum: 0.,
i: 0,

inner,
win_size: n,
}
}
}
Expand All @@ -40,6 +40,13 @@ impl<T> Named for SMA<T> {
}

impl<T: TickerBatch> Operator<T> for SMA<T> {
fn reset(&mut self) {
self.inner.reset();
self.window.clear();
self.sum = 0.;
self.i = 0;
}

#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
Expand Down
Loading

0 comments on commit db28961

Please sign in to comment.