Optimize memory usage for TSDataset._merge_exog #596

Merged: 9 commits merged on Feb 20, 2025
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -45,7 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Breaking:** Bump minimum `scipy` version to 1.12 ([#599](https://github.com/etna-team/etna/pull/599))
- **Breaking:** Bump minimum `optuna` version to 4.0 ([#599](https://github.com/etna-team/etna/pull/599))
- **Breaking:** Bump minimum `statsforecast` version to 2.0 ([#599](https://github.com/etna-team/etna/pull/599))
-
- Optimize performance of exogenous variables addition to the dataset ([#596](https://github.com/etna-team/etna/pull/596))
-

### Fixed
etna/datasets/tsdataset.py (121 changes: 80 additions & 41 deletions)
@@ -11,6 +11,7 @@
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union

@@ -141,15 +142,15 @@ def __init__(
"""
self.freq = freq
self.df_exog = None
self.raw_df = self._prepare_df(df=df.copy(deep=True), freq=freq)
self.df = self.raw_df.copy(deep=True)
self.raw_df = self._prepare_df(df=df, freq=freq)
self.df = self.raw_df.copy(deep=None)

self.hierarchical_structure = hierarchical_structure
self.current_df_level: Optional[str] = self._get_dataframe_level(df=self.df)
self.current_df_exog_level: Optional[str] = None

if df_exog is not None:
self.df_exog = self._prepare_df_exog(df_exog=df_exog.copy(deep=True), freq=freq)
self.df_exog = self._prepare_df_exog(df_exog=df_exog, freq=freq)

self.known_future = self._check_known_future(known_future, self.df_exog)
self._regressors = copy(self.known_future)
@@ -244,15 +245,15 @@ def create_from_misaligned(
freq=freq,
timestamp_name=original_timestamp_name,
)
timestamp_df = TSDataset.to_dataset(timestamp_df)

if df_exog is not None:
df_exog_realigned = apply_alignment(df=df_exog, alignment=alignment)

df_exog_realigned = pd.merge(df_exog_realigned, timestamp_df, how="outer", on=["timestamp", "segment"])
df_exog_realigned = TSDataset.to_dataset(df_exog_realigned)

df_exog_realigned = df_exog_realigned.join(timestamp_df, how="outer")
else:
df_exog_realigned = timestamp_df
df_exog_realigned = TSDataset.to_dataset(timestamp_df)

known_future_realigned: Union[Literal["all"], Sequence]
if known_future != "all":
@@ -328,6 +329,9 @@ def _prepare_df(cls, df: pd.DataFrame, freq: Optional[str]) -> pd.DataFrame:
if df_format is DataFrameFormat.long:
df = cls.to_dataset(df)

else:
df = df.copy(deep=True)

# cast segment to str type
cls._cast_segment_to_str(df)

@@ -338,7 +342,7 @@ def _prepare_df(cls, df: pd.DataFrame, freq: Optional[str]) -> pd.DataFrame:

new_index = np.arange(df.index.min(), df.index.max() + 1)
index_name = df.index.name
df = df.reindex(new_index)
df = df.reindex(new_index, copy=False)
df.index.name = index_name

else:
@@ -354,7 +358,9 @@ def _prepare_df(cls, df: pd.DataFrame, freq: Optional[str]) -> pd.DataFrame:
f"You probably set wrong freq. Discovered freq in you data is {inferred_freq}, you set {freq}"
)

df = df.asfreq(freq)
new_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq=freq)
new_index.name = df.index.name # type: ignore
df = df.reindex(new_index, copy=False)

return df
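Note on the `_prepare_df` change above: reindexing onto an explicitly built `pd.date_range` with `copy=False` is meant to avoid the intermediate frame that `asfreq` creates. A minimal sketch of the equivalence, using hypothetical toy data rather than anything from this PR:

```python
import pandas as pd

# Hypothetical daily series with a missing date in its index.
idx = pd.to_datetime(["2021-01-01", "2021-01-03", "2021-01-04"])
df = pd.DataFrame({"target": [1.0, 2.0, 3.0]}, index=idx)

# Previous approach: asfreq materializes a new frame.
old = df.asfreq("D")

# New approach: build the full index once and reindex with copy=False,
# which skips the extra copy when the data is already aligned.
new_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq="D")
new_index.name = df.index.name
new = df.reindex(new_index, copy=False)

assert old.equals(new)  # same values, with NaN filled at 2021-01-02
```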

@@ -364,6 +370,9 @@ def _prepare_df_exog(cls, df_exog: pd.DataFrame, freq: Optional[str]) -> pd.DataFrame:
if df_format is DataFrameFormat.long:
df_exog = cls.to_dataset(df_exog)

else:
df_exog = df_exog.copy(deep=True)

df_exog = cls._cast_segment_to_str(df=df_exog)
if freq is not None:
cls._cast_index_to_datetime(df_exog, freq)
@@ -548,44 +557,73 @@ def _check_known_future(
return sorted(known_future_unique)

@staticmethod
def _check_regressors(df: pd.DataFrame, df_regressors: pd.DataFrame):
def _get_min_max_valid_timestamp(
df: pd.DataFrame, segments: Set[str], regressors: Optional[Sequence[str]] = None
) -> Tuple[Sequence[pd.Timestamp], Sequence[pd.Timestamp]]:
"""Estimate first and last valid indices for the dataframe."""
# shape: (num_samples, num_segments, num_features)
df_values = df.values.reshape((len(df), len(segments), -1))

if regressors is not None:
# expected equal features for all segments and sorted column index
features = df.columns.get_level_values("feature")
segment_features = features[: len(features) // len(segments)]
regressors_mask = segment_features.isin(set(regressors))
# shape: (num_samples, num_segments, num_regressors)
df_values = df_values[..., regressors_mask]

# shape: (num_samples, num_segments)
df_mask = ~np.any(pd.isna(df_values), axis=-1)

# shape: (num_segments,)
min_ids = np.argmax(df_mask, axis=0)
max_ids = len(df_mask) - np.argmax(df_mask[::-1], axis=0) - 1

min_index = df.index.values[min_ids]
max_index = df.index.values[max_ids]

none_segments = ~np.any(df_mask, axis=0)
min_index[none_segments] = np.datetime64("NaT")
max_index[none_segments] = np.datetime64("NaT")

return min_index, max_index
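For readers of `_get_min_max_valid_timestamp` above: the per-segment loop over `first_valid_index`/`last_valid_index` is replaced by one vectorized pass over a boolean mask. A small illustration of the `argmax` trick on a hypothetical mask (rows are timestamps, columns are segments; `True` means all regressor values are present):

```python
import numpy as np
import pandas as pd

# Hypothetical validity mask, shape (num_samples, num_segments).
df_mask = np.array(
    [
        [False, True],
        [True, True],
        [True, False],
        [False, False],
    ]
)

# argmax over a boolean array returns the position of the first True per column ...
min_ids = np.argmax(df_mask, axis=0)
# ... and on the reversed array it yields the position of the last True per column.
max_ids = len(df_mask) - np.argmax(df_mask[::-1], axis=0) - 1

index = pd.date_range("2021-01-01", periods=len(df_mask), freq="D")
print(index.values[min_ids])  # first valid timestamp per segment
print(index.values[max_ids])  # last valid timestamp per segment
```

Columns with no `True` at all would give misleading positions here, which is why the method above additionally masks such segments with `NaT`.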

def _check_regressors(self, df: pd.DataFrame):
"""Check that regressors begin not later than in ``df`` and end later than in ``df``."""
if df_regressors.shape[1] == 0:
if len(self.known_future) == 0:
return
# TODO: check performance
df_segments = df.columns.get_level_values("segment")
for segment in df_segments:
target_min = df[segment]["target"].first_valid_index()
target_min = pd.NaT if target_min is None else target_min
target_max = df[segment]["target"].last_valid_index()
target_max = pd.NaT if target_max is None else target_max

exog_series_min = df_regressors[segment].first_valid_index()
exog_series_min = pd.NaT if exog_series_min is None else exog_series_min
exog_series_max = df_regressors[segment].last_valid_index()
exog_series_max = pd.NaT if exog_series_max is None else exog_series_max
if target_min < exog_series_min:

segments = set(df.columns.get_level_values("segment"))

target_min, target_max = self._get_min_max_valid_timestamp(df=df, segments=segments)
exog_series_min, exog_series_max = self._get_min_max_valid_timestamp(
df=self.df_exog, segments=segments, regressors=self.known_future
)

for i, segment in enumerate(segments):
if target_min[i] < exog_series_min[i]:
raise ValueError(
f"All the regressor series should start not later than corresponding 'target'."
f"Series of segment {segment} have not enough history: "
f"{target_min} < {exog_series_min}."
f"{target_min[i]} < {exog_series_min[i]}."
)
if target_max >= exog_series_max:
if target_max[i] >= exog_series_max[i]:
raise ValueError(
f"All the regressor series should finish later than corresponding 'target'."
f"Series of segment {segment} have not enough history: "
f"{target_max} >= {exog_series_max}."
f"{target_max[i]} >= {exog_series_max[i]}."
)

def _merge_exog(self, df: pd.DataFrame) -> pd.DataFrame:
if self.df_exog is None:
raise ValueError("Something went wrong, Trying to merge df_exog which is None!")

# TODO: this check could probably be skipped at make_future
df_regressors = self.df_exog.loc[:, pd.IndexSlice[:, self.known_future]]
self._check_regressors(df=df, df_regressors=df_regressors)
self._check_regressors(df=df)

df = df.merge(self.df_exog, how="left", left_index=True, right_index=True)

df = pd.concat((df, self.df_exog), axis=1).loc[df.index].sort_index(axis=1, level=(0, 1))
df.sort_index(axis=1, level=(0, 1), inplace=True)

_check_features_in_segments(columns=df.columns)
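To illustrate the `_merge_exog` rewrite: a single `pd.concat` along the column axis, restricted back to `df`'s rows and sorted in place, is intended to produce the same wide frame as the previous index-based `merge` while avoiding extra intermediate copies. A rough sketch with hypothetical toy frames (not the library's fixtures):

```python
import pandas as pd

# Hypothetical wide frames with (segment, feature) columns.
columns_df = pd.MultiIndex.from_product([["A", "B"], ["target"]], names=["segment", "feature"])
columns_exog = pd.MultiIndex.from_product([["A", "B"], ["exog"]], names=["segment", "feature"])

index_df = pd.date_range("2021-01-01", periods=3, freq="D")
index_exog = pd.date_range("2021-01-01", periods=5, freq="D")

df = pd.DataFrame([[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]], index=index_df, columns=columns_df)
df_exog = pd.DataFrame(10.0, index=index_exog, columns=columns_exog)

# Previous approach: left merge on the index, then a copying sort.
old = df.merge(df_exog, how="left", left_index=True, right_index=True)
old = old.sort_index(axis=1, level=(0, 1))

# New approach: one concat along columns, restrict to df's rows, sort in place.
new = pd.concat((df, df_exog), axis=1).loc[df.index]
new.sort_index(axis=1, level=(0, 1), inplace=True)

assert old.equals(new)  # identical values and layout
```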

@@ -964,22 +1002,23 @@ def to_dataset(df: pd.DataFrame) -> pd.DataFrame:
2021-01-04 3 8
2021-01-05 4 9
"""
df_copy = df.copy(deep=True)
df = df.set_index(["timestamp", "segment"])

if not pd.api.types.is_integer_dtype(df_copy["timestamp"]):
df_copy["timestamp"] = pd.to_datetime(df_copy["timestamp"])
df = df.unstack(level=-1)
if not pd.api.types.is_integer_dtype(df.index):
df.index = pd.to_datetime(df.index)

df_copy["segment"] = df_copy["segment"].astype(str)
if not pd.api.types.is_string_dtype(df.columns.levels[1]):
df.columns = df.columns.set_levels(df.columns.levels[1].astype(str), level=1)

feature_columns = df_copy.columns.tolist()
feature_columns.remove("timestamp")
feature_columns.remove("segment")
df.columns = df.columns.reorder_levels([1, 0])
df.columns.names = ["segment", "feature"]
df.sort_index(axis=1, level=(0, 1), inplace=True)

df_copy = df_copy.pivot(index="timestamp", columns="segment")
df_copy = df_copy.reorder_levels([1, 0], axis=1)
df_copy.columns.names = ["segment", "feature"]
df_copy = df_copy.sort_index(axis=1, level=(0, 1))
return df_copy
if df._is_view or df._is_copy is None:
df = df.copy(deep=None)

return df
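The rewritten `to_dataset` above goes through `set_index` + `unstack` instead of an upfront deep copy followed by `pivot`. A minimal sketch of that long-to-wide conversion on a hypothetical long-format frame:

```python
import pandas as pd

# Hypothetical long-format frame: one row per (timestamp, segment).
df_long = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(["2021-01-01", "2021-01-02"] * 2),
        "segment": ["segment_0", "segment_0", "segment_1", "segment_1"],
        "target": [1.0, 2.0, 3.0, 4.0],
    }
)

# Build the wide (segment, feature) layout without copying the input first.
df_wide = df_long.set_index(["timestamp", "segment"]).unstack(level=-1)
df_wide.columns = df_wide.columns.reorder_levels([1, 0])
df_wide.columns.names = ["segment", "feature"]
df_wide.sort_index(axis=1, level=(0, 1), inplace=True)

print(df_wide)
```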

@staticmethod
def _hierarchical_structure_from_level_columns(
tests/test_datasets/test_dataset.py (20 changes: 13 additions & 7 deletions)
@@ -1213,19 +1213,20 @@ def test_check_regressors_error(exog_starts_later: bool, exog_ends_earlier: bool
df_regressors = TSDataset.to_dataset(df_regressors)

with pytest.raises(ValueError):
TSDataset._check_regressors(df=df, df_regressors=df_regressors)
_ = TSDataset(df=df, df_exog=df_regressors, known_future="all", freq="D")


def test_check_regressors_pass(df_and_regressors):
"""Check that regressors check on creation passes with correct regressors."""
df, df_exog, _ = df_and_regressors
_ = TSDataset._check_regressors(df=df, df_regressors=df_exog)
_ = TSDataset(df=df, df_exog=df_exog, known_future="all", freq="D")


def test_check_regressors_pass_empty(df_and_regressors):
"""Check that regressors check on creation passes with no regressors."""
df, _, _ = df_and_regressors
_ = TSDataset._check_regressors(df=df, df_regressors=pd.DataFrame())
df_exog = pd.DataFrame(columns=["timestamp", "segment", "exog"])
_ = TSDataset(df=df, df_exog=df_exog, known_future="all", freq="D")


def test_getitem_only_date(tsdf_with_exog):
@@ -1968,15 +1969,17 @@ def test_create_from_misaligned_with_exog(
expected_raw_df = TSDataset.to_dataset(apply_alignment(df=df, alignment=alignment))
pd.testing.assert_frame_equal(ts.raw_df, expected_raw_df)

expected_df_exog = TSDataset.to_dataset(apply_alignment(df=df_exog, alignment=alignment))
timestamp_df = make_timestamp_df_from_alignment(
alignment=alignment,
start=expected_raw_df.index[0],
periods=len(expected_raw_df) + future_steps,
freq=freq,
timestamp_name=original_timestamp_name,
)
expected_df_exog = expected_df_exog.join(TSDataset.to_dataset(timestamp_df), how="outer")
expected_df_exog = apply_alignment(df=df_exog, alignment=alignment)
expected_df_exog = pd.merge(expected_df_exog, timestamp_df, how="outer", on=["timestamp", "segment"])
expected_df_exog = TSDataset.to_dataset(expected_df_exog)

pd.testing.assert_frame_equal(ts.df_exog, expected_df_exog)

expected_known_future = sorted(set(known_future).union([original_timestamp_name]))
@@ -2016,15 +2019,18 @@ def test_create_from_misaligned_with_exog_all(
expected_raw_df = TSDataset.to_dataset(apply_alignment(df=df, alignment=alignment))
pd.testing.assert_frame_equal(ts.raw_df, expected_raw_df)

expected_df_exog = TSDataset.to_dataset(apply_alignment(df=df_exog, alignment=alignment))
timestamp_df = make_timestamp_df_from_alignment(
alignment=alignment,
start=expected_raw_df.index[0],
periods=len(expected_raw_df) + future_steps,
freq=freq,
timestamp_name=original_timestamp_name,
)
expected_df_exog = expected_df_exog.join(TSDataset.to_dataset(timestamp_df), how="outer")

expected_df_exog = apply_alignment(df=df_exog, alignment=alignment)
expected_df_exog = pd.merge(expected_df_exog, timestamp_df, how="outer", on=["timestamp", "segment"])

expected_df_exog = TSDataset.to_dataset(expected_df_exog)
pd.testing.assert_frame_equal(ts.df_exog, expected_df_exog)

assert ts.known_future == expected_known_future
tests/test_transforms/test_math/test_exog_shift_transform.py (6 changes: 3 additions & 3 deletions)
@@ -183,9 +183,9 @@ def test_transformed_names(ts_name, lag, horizon, expected, request):
@pytest.mark.parametrize(
"lag,horizon,expected_types",
(
(1, None, {"feat1_shift_1": "float", "feat2_shift_1": "float", "feat3_shift_1": "float", "target": "float"}),
("auto", 1, {"feat1_shift_1": "float", "feat2_shift_2": "float", "feat3": "int", "target": "float"}),
("auto", 2, {"feat1_shift_2": "float", "feat2_shift_3": "float", "feat3_shift_1": "float", "target": "float"}),
(1, None, {"feat1_shift_1": "float", "feat2_shift_1": "float", "feat3_shift_1": "float", "target": "int"}),
("auto", 1, {"feat1_shift_1": "float", "feat2_shift_2": "float", "feat3": "int", "target": "int"}),
("auto", 2, {"feat1_shift_2": "float", "feat2_shift_3": "float", "feat3_shift_1": "float", "target": "int"}),
),
)
@pytest.mark.parametrize(