Skip to content

Commit

Permalink
Optimize memory usage for TSDataset._merge_exog (#596)
Browse files Browse the repository at this point in the history
* added optimizations

* updated tests

* fixed tests

* minor updates

* updated changelog

* fixed tests

* typo
  • Loading branch information
brsnw250 authored Feb 20, 2025
1 parent 7508561 commit 797cfe3
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 52 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Breaking:** Bump minimum `scipy` version to 1.12 ([#599](https://github.com/etna-team/etna/pull/599))
- **Breaking:** Bump minimum `optuna` version to 4.0 ([#599](https://github.com/etna-team/etna/pull/599))
- **Breaking:** Bump minimum `statsforecast` version to 2.0 ([#599](https://github.com/etna-team/etna/pull/599))
-
- Optimize performance of exogenous variables addition to the dataset ([#596](https://github.com/etna-team/etna/pull/596))
-

### Fixed
Expand Down
121 changes: 80 additions & 41 deletions etna/datasets/tsdataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union

Expand Down Expand Up @@ -141,15 +142,15 @@ def __init__(
"""
self.freq = freq
self.df_exog = None
self.raw_df = self._prepare_df(df=df.copy(deep=True), freq=freq)
self.df = self.raw_df.copy(deep=True)
self.raw_df = self._prepare_df(df=df, freq=freq)
self.df = self.raw_df.copy(deep=None)

self.hierarchical_structure = hierarchical_structure
self.current_df_level: Optional[str] = self._get_dataframe_level(df=self.df)
self.current_df_exog_level: Optional[str] = None

if df_exog is not None:
self.df_exog = self._prepare_df_exog(df_exog=df_exog.copy(deep=True), freq=freq)
self.df_exog = self._prepare_df_exog(df_exog=df_exog, freq=freq)

self.known_future = self._check_known_future(known_future, self.df_exog)
self._regressors = copy(self.known_future)
Expand Down Expand Up @@ -244,15 +245,15 @@ def create_from_misaligned(
freq=freq,
timestamp_name=original_timestamp_name,
)
timestamp_df = TSDataset.to_dataset(timestamp_df)

if df_exog is not None:
df_exog_realigned = apply_alignment(df=df_exog, alignment=alignment)

df_exog_realigned = pd.merge(df_exog_realigned, timestamp_df, how="outer", on=["timestamp", "segment"])
df_exog_realigned = TSDataset.to_dataset(df_exog_realigned)

df_exog_realigned = df_exog_realigned.join(timestamp_df, how="outer")
else:
df_exog_realigned = timestamp_df
df_exog_realigned = TSDataset.to_dataset(timestamp_df)

known_future_realigned: Union[Literal["all"], Sequence]
if known_future != "all":
Expand Down Expand Up @@ -328,6 +329,9 @@ def _prepare_df(cls, df: pd.DataFrame, freq: Optional[str]) -> pd.DataFrame:
if df_format is DataFrameFormat.long:
df = cls.to_dataset(df)

else:
df = df.copy(deep=True)

# cast segment to str type
cls._cast_segment_to_str(df)

Expand All @@ -338,7 +342,7 @@ def _prepare_df(cls, df: pd.DataFrame, freq: Optional[str]) -> pd.DataFrame:

new_index = np.arange(df.index.min(), df.index.max() + 1)
index_name = df.index.name
df = df.reindex(new_index)
df = df.reindex(new_index, copy=False)
df.index.name = index_name

else:
Expand All @@ -354,7 +358,9 @@ def _prepare_df(cls, df: pd.DataFrame, freq: Optional[str]) -> pd.DataFrame:
f"You probably set wrong freq. Discovered freq in you data is {inferred_freq}, you set {freq}"
)

df = df.asfreq(freq)
new_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq=freq)
new_index.name = df.index.name # type: ignore
df = df.reindex(new_index, copy=False)

return df

Expand All @@ -364,6 +370,9 @@ def _prepare_df_exog(cls, df_exog: pd.DataFrame, freq: Optional[str]) -> pd.Data
if df_format is DataFrameFormat.long:
df_exog = cls.to_dataset(df_exog)

else:
df_exog = df_exog.copy(deep=True)

df_exog = cls._cast_segment_to_str(df=df_exog)
if freq is not None:
cls._cast_index_to_datetime(df_exog, freq)
Expand Down Expand Up @@ -548,44 +557,73 @@ def _check_known_future(
return sorted(known_future_unique)

@staticmethod
def _check_regressors(df: pd.DataFrame, df_regressors: pd.DataFrame):
def _get_min_max_valid_timestamp(
df: pd.DataFrame, segments: Set[str], regressors: Optional[Sequence[str]] = None
) -> Tuple[Sequence[pd.Timestamp], Sequence[pd.Timestamp]]:
"""Estimate first and last valid indices for the dataframe."""
# shape: (num_samples, num_segments, num_features)
df_values = df.values.reshape((len(df), len(segments), -1))

if regressors is not None:
# expected equal features for all segments and sorted column index
features = df.columns.get_level_values("feature")
segment_features = features[: len(features) // len(segments)]
regressors_mask = segment_features.isin(set(regressors))
# shape: (num_samples, num_segments, num_regressors)
df_values = df_values[..., regressors_mask]

# shape: (num_samples, num_segments)
df_mask = ~np.any(pd.isna(df_values), axis=-1)

# shape: (num_segments,)
min_ids = np.argmax(df_mask, axis=0)
max_ids = len(df_mask) - np.argmax(df_mask[::-1], axis=0) - 1

min_index = df.index.values[min_ids]
max_index = df.index.values[max_ids]

none_segments = ~np.any(df_mask, axis=0)
min_index[none_segments] = np.datetime64("NaT")
max_index[none_segments] = np.datetime64("NaT")

return min_index, max_index

def _check_regressors(self, df: pd.DataFrame):
"""Check that regressors begin not later than in ``df`` and end later than in ``df``."""
if df_regressors.shape[1] == 0:
if len(self.known_future) == 0:
return
# TODO: check performance
df_segments = df.columns.get_level_values("segment")
for segment in df_segments:
target_min = df[segment]["target"].first_valid_index()
target_min = pd.NaT if target_min is None else target_min
target_max = df[segment]["target"].last_valid_index()
target_max = pd.NaT if target_max is None else target_max

exog_series_min = df_regressors[segment].first_valid_index()
exog_series_min = pd.NaT if exog_series_min is None else exog_series_min
exog_series_max = df_regressors[segment].last_valid_index()
exog_series_max = pd.NaT if exog_series_max is None else exog_series_max
if target_min < exog_series_min:

segments = set(df.columns.get_level_values("segment"))

target_min, target_max = self._get_min_max_valid_timestamp(df=df, segments=segments)
exog_series_min, exog_series_max = self._get_min_max_valid_timestamp(
df=self.df_exog, segments=segments, regressors=self.known_future
)

for i, segment in enumerate(segments):
if target_min[i] < exog_series_min[i]:
raise ValueError(
f"All the regressor series should start not later than corresponding 'target'."
f"Series of segment {segment} have not enough history: "
f"{target_min} < {exog_series_min}."
f"{target_min[i]} < {exog_series_min[i]}."
)
if target_max >= exog_series_max:
if target_max[i] >= exog_series_max[i]:
raise ValueError(
f"All the regressor series should finish later than corresponding 'target'."
f"Series of segment {segment} have not enough history: "
f"{target_max} >= {exog_series_max}."
f"{target_max[i]} >= {exog_series_max[i]}."
)

def _merge_exog(self, df: pd.DataFrame) -> pd.DataFrame:
if self.df_exog is None:
raise ValueError("Something went wrong, Trying to merge df_exog which is None!")

# TODO: this check could probably be skipped at make_future
df_regressors = self.df_exog.loc[:, pd.IndexSlice[:, self.known_future]]
self._check_regressors(df=df, df_regressors=df_regressors)
self._check_regressors(df=df)

df = df.merge(self.df_exog, how="left", left_index=True, right_index=True)

df = pd.concat((df, self.df_exog), axis=1).loc[df.index].sort_index(axis=1, level=(0, 1))
df.sort_index(axis=1, level=(0, 1), inplace=True)

_check_features_in_segments(columns=df.columns)

Expand Down Expand Up @@ -964,22 +1002,23 @@ def to_dataset(df: pd.DataFrame) -> pd.DataFrame:
2021-01-04 3 8
2021-01-05 4 9
"""
df_copy = df.copy(deep=True)
df = df.set_index(["timestamp", "segment"])

if not pd.api.types.is_integer_dtype(df_copy["timestamp"]):
df_copy["timestamp"] = pd.to_datetime(df_copy["timestamp"])
df = df.unstack(level=-1)
if not pd.api.types.is_integer_dtype(df.index):
df.index = pd.to_datetime(df.index)

df_copy["segment"] = df_copy["segment"].astype(str)
if not pd.api.types.is_string_dtype(df.columns.levels[1]):
df.columns = df.columns.set_levels(df.columns.levels[1].astype(str), level=1)

feature_columns = df_copy.columns.tolist()
feature_columns.remove("timestamp")
feature_columns.remove("segment")
df.columns = df.columns.reorder_levels([1, 0])
df.columns.names = ["segment", "feature"]
df.sort_index(axis=1, level=(0, 1), inplace=True)

df_copy = df_copy.pivot(index="timestamp", columns="segment")
df_copy = df_copy.reorder_levels([1, 0], axis=1)
df_copy.columns.names = ["segment", "feature"]
df_copy = df_copy.sort_index(axis=1, level=(0, 1))
return df_copy
if df._is_view or df._is_copy is None:
df = df.copy(deep=None)

return df

@staticmethod
def _hierarchical_structure_from_level_columns(
Expand Down
20 changes: 13 additions & 7 deletions tests/test_datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,19 +1213,20 @@ def test_check_regressors_error(exog_starts_later: bool, exog_ends_earlier: bool
df_regressors = TSDataset.to_dataset(df_regressors)

with pytest.raises(ValueError):
TSDataset._check_regressors(df=df, df_regressors=df_regressors)
_ = TSDataset(df=df, df_exog=df_regressors, known_future="all", freq="D")


def test_check_regressors_pass(df_and_regressors):
"""Check that regressors check on creation passes with correct regressors."""
df, df_exog, _ = df_and_regressors
_ = TSDataset._check_regressors(df=df, df_regressors=df_exog)
_ = TSDataset(df=df, df_exog=df_exog, known_future="all", freq="D")


def test_check_regressors_pass_empty(df_and_regressors):
"""Check that regressors check on creation passes with no regressors."""
df, _, _ = df_and_regressors
_ = TSDataset._check_regressors(df=df, df_regressors=pd.DataFrame())
df_exog = pd.DataFrame(columns=["timestamp", "segment", "exog"])
_ = TSDataset(df=df, df_exog=df_exog, known_future="all", freq="D")


def test_getitem_only_date(tsdf_with_exog):
Expand Down Expand Up @@ -1968,15 +1969,17 @@ def test_create_from_misaligned_with_exog(
expected_raw_df = TSDataset.to_dataset(apply_alignment(df=df, alignment=alignment))
pd.testing.assert_frame_equal(ts.raw_df, expected_raw_df)

expected_df_exog = TSDataset.to_dataset(apply_alignment(df=df_exog, alignment=alignment))
timestamp_df = make_timestamp_df_from_alignment(
alignment=alignment,
start=expected_raw_df.index[0],
periods=len(expected_raw_df) + future_steps,
freq=freq,
timestamp_name=original_timestamp_name,
)
expected_df_exog = expected_df_exog.join(TSDataset.to_dataset(timestamp_df), how="outer")
expected_df_exog = apply_alignment(df=df_exog, alignment=alignment)
expected_df_exog = pd.merge(expected_df_exog, timestamp_df, how="outer", on=["timestamp", "segment"])
expected_df_exog = TSDataset.to_dataset(expected_df_exog)

pd.testing.assert_frame_equal(ts.df_exog, expected_df_exog)

expected_known_future = sorted(set(known_future).union([original_timestamp_name]))
Expand Down Expand Up @@ -2016,15 +2019,18 @@ def test_create_from_misaligned_with_exog_all(
expected_raw_df = TSDataset.to_dataset(apply_alignment(df=df, alignment=alignment))
pd.testing.assert_frame_equal(ts.raw_df, expected_raw_df)

expected_df_exog = TSDataset.to_dataset(apply_alignment(df=df_exog, alignment=alignment))
timestamp_df = make_timestamp_df_from_alignment(
alignment=alignment,
start=expected_raw_df.index[0],
periods=len(expected_raw_df) + future_steps,
freq=freq,
timestamp_name=original_timestamp_name,
)
expected_df_exog = expected_df_exog.join(TSDataset.to_dataset(timestamp_df), how="outer")

expected_df_exog = apply_alignment(df=df_exog, alignment=alignment)
expected_df_exog = pd.merge(expected_df_exog, timestamp_df, how="outer", on=["timestamp", "segment"])

expected_df_exog = TSDataset.to_dataset(expected_df_exog)
pd.testing.assert_frame_equal(ts.df_exog, expected_df_exog)

assert ts.known_future == expected_known_future
Expand Down
6 changes: 3 additions & 3 deletions tests/test_transforms/test_math/test_exog_shift_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ def test_transformed_names(ts_name, lag, horizon, expected, request):
@pytest.mark.parametrize(
"lag,horizon,expected_types",
(
(1, None, {"feat1_shift_1": "float", "feat2_shift_1": "float", "feat3_shift_1": "float", "target": "float"}),
("auto", 1, {"feat1_shift_1": "float", "feat2_shift_2": "float", "feat3": "int", "target": "float"}),
("auto", 2, {"feat1_shift_2": "float", "feat2_shift_3": "float", "feat3_shift_1": "float", "target": "float"}),
(1, None, {"feat1_shift_1": "float", "feat2_shift_1": "float", "feat3_shift_1": "float", "target": "int"}),
("auto", 1, {"feat1_shift_1": "float", "feat2_shift_2": "float", "feat3": "int", "target": "int"}),
("auto", 2, {"feat1_shift_2": "float", "feat2_shift_3": "float", "feat3_shift_1": "float", "target": "int"}),
),
)
@pytest.mark.parametrize(
Expand Down

0 comments on commit 797cfe3

Please sign in to comment.