From 535af520cd406589096ee4bff6a717cad63b696b Mon Sep 17 00:00:00 2001 From: ouyangwenyu Date: Mon, 25 Mar 2024 20:15:53 +0800 Subject: [PATCH] update cross_valid_data and split_train_test funcs --- hydromodel/datasets/data_preprocess.py | 195 +++++++++++-------------- test/test_data_preprocess.py | 55 ++++++- 2 files changed, 141 insertions(+), 109 deletions(-) diff --git a/hydromodel/datasets/data_preprocess.py b/hydromodel/datasets/data_preprocess.py index 850a20d..89ef33a 100644 --- a/hydromodel/datasets/data_preprocess.py +++ b/hydromodel/datasets/data_preprocess.py @@ -1,7 +1,7 @@ """ Author: Wenyu Ouyang Date: 2022-10-25 21:16:22 -LastEditTime: 2024-03-25 17:19:38 +LastEditTime: 2024-03-25 19:54:15 LastEditors: Wenyu Ouyang Description: preprocess data for models in hydro-model-xaj FilePath: \hydro-model-xaj\hydromodel\datasets\data_preprocess.py @@ -278,16 +278,14 @@ def process_and_save_data_as_nc( return True -def split_train_test(json_file, npy_file, train_period, test_period): +def split_train_test(ts_file, train_period, test_period): """ Split all data to train and test parts with same format Parameters ---------- - json_file - dict file of all data - npy_file - numpy file of all data + ts_file + nc file of all time series data train_period training period test_period @@ -297,116 +295,97 @@ def split_train_test(json_file, npy_file, train_period, test_period): ------- None """ - data = hydro_file.unserialize_numpy(npy_file) - data_info = hydro_file.unserialize_json(json_file) - date_lst = pd.to_datetime(data_info["time"]).values.astype("datetime64[D]") - t_range_train = hydro_time.t_range_days(train_period) - t_range_test = hydro_time.t_range_days(test_period) - _, ind1, ind2 = np.intersect1d(date_lst, t_range_train, return_indices=True) - _, ind3, ind4 = np.intersect1d(date_lst, t_range_test, return_indices=True) - data_info_train = OrderedDict( - { - "time": [str(t)[:10] for t in hydro_time.t_range_days(train_period)], - # TODO: for time, more detailed time is needed, so we need to change the format of time - # "time": [str(t)[:16] for t in hydro_time.t_range_days(train_period)], - "basin": data_info["basin"], - "variable": data_info["variable"], - "area": data_info["area"], - } + ts_data = xr.open_dataset(ts_file) + # Convert date strings to pandas datetime objects + train_start, train_end = pd.to_datetime(train_period[0]), pd.to_datetime( + train_period[1] ) - data_info_test = OrderedDict( - { - "time": [str(t)[:10] for t in hydro_time.t_range_days(test_period)], - # TODO: for time, more detailed time is needed, so we need to change the format of time - # "time": [str(t)[:16] for t in hydro_time.t_range_days(test_period)], - "basin": data_info["basin"], - "variable": data_info["variable"], - "area": data_info["area"], - } + test_start, test_end = pd.to_datetime(test_period[0]), pd.to_datetime( + test_period[1] ) - # unify it with cross validation case, so we add a 'fold0' - train_json_file = json_file.parent.joinpath(json_file.stem + "_fold0_train.json") - train_npy_file = json_file.parent.joinpath(npy_file.stem + "_fold0_train.npy") - hydro_file.serialize_json(data_info_train, train_json_file) - hydro_file.serialize_numpy(data[ind1, :, :], train_npy_file) - test_json_file = json_file.parent.joinpath(json_file.stem + "_fold0_test.json") - test_npy_file = json_file.parent.joinpath(npy_file.stem + "_fold0_test.npy") - hydro_file.serialize_json(data_info_test, test_json_file) - hydro_file.serialize_numpy(data[ind3, :, :], test_npy_file) - - -def cross_valid_data(json_file, npy_file, period, warmup, cv_fold, time_unit="h"): + + # Select data for training and testing periods + train_data = ts_data.sel(time=slice(train_start, train_end)) + test_data = ts_data.sel(time=slice(test_start, test_end)) + + return train_data, test_data + + +def validate_freq(freq): """ - Split all data to train and test parts with same format + Validate if the freq string is a valid pandas frequency. Parameters ---------- - json_file - dict file of all data - npy_file - numpy file of all data - period - the whole period - warmup - warmup period length - cv_fold - number of folds + freq : str + Frequency string to validate. Returns ------- - None + bool + True if the freq string is valid, False otherwise. """ - data = hydro_file.unserialize_numpy(npy_file) - data_info = hydro_file.unserialize_json(json_file) - date_lst = pd.to_datetime(data_info["time"]).values.astype("datetime64[D]") - date_wo_warmup = date_lst[warmup:] - kf = KFold(n_splits=cv_fold, shuffle=False) - for i, (train, test) in enumerate(kf.split(date_wo_warmup)): - train_period = date_wo_warmup[train] - test_period = date_wo_warmup[test] - train_period_warmup = np.arange( - train_period[0] - np.timedelta64(warmup, time_unit), train_period[0] - ) - test_period_warmup = np.arange( - test_period[0] - np.timedelta64(warmup, time_unit), test_period[0] - ) - t_range_train = np.concatenate((train_period_warmup, train_period)) - t_range_test = np.concatenate((test_period_warmup, test_period)) - _, ind1, ind2 = np.intersect1d(date_lst, t_range_train, return_indices=True) - _, ind3, ind4 = np.intersect1d(date_lst, t_range_test, return_indices=True) - data_info_train = OrderedDict( - { - "time": [ - np.datetime_as_string(d, unit=time_unit) for d in t_range_train - ], - "basin": data_info["basin"], - "variable": data_info["variable"], - "area": data_info["area"], - } - ) - data_info_test = OrderedDict( - { - "time": [ - np.datetime_as_string(d, unit=time_unit) for d in t_range_test - ], - "basin": data_info["basin"], - "variable": data_info["variable"], - "area": data_info["area"], - } - ) - train_json_file = json_file.parent.joinpath( - json_file.stem + "_fold" + str(i) + "_train.json" - ) - train_npy_file = json_file.parent.joinpath( - npy_file.stem + "_fold" + str(i) + "_train.npy" - ) - hydro_file.serialize_json(data_info_train, train_json_file) - hydro_file.serialize_numpy(data[ind1, :, :], train_npy_file) - test_json_file = json_file.parent.joinpath( - json_file.stem + "_fold" + str(i) + "_test.json" - ) - test_npy_file = json_file.parent.joinpath( - npy_file.stem + "_fold" + str(i) + "_test.npy" + try: + pd.to_timedelta("1" + freq) + return True + except ValueError: + return False + + +def cross_valid_data(ts_file, period, warmup, cv_fold, freq="1D"): + """ + Split all data to train and test parts with same format for cross validation. + + Parameters + ---------- + ts_file : str + Path to the NetCDF file of time series data. + period : tuple of str + The whole period in the format ("start_date", "end_date"). + warmup : int + Warmup period length in days. + cv_fold : int + Number of folds for cross-validation. + freq : str + len of one period. + + Returns + ------- + list of tuples + Each tuple contains training and testing datasets for a fold. + """ + if not validate_freq(freq): + raise ValueError( + "Time unit must be number with either 'Y','M','W','D','h','m' or 's', such as 3D." ) - hydro_file.serialize_json(data_info_test, test_json_file) - hydro_file.serialize_numpy(data[ind3, :, :], test_npy_file) + ts_data = xr.open_dataset(ts_file) + + # Convert the whole period to pandas datetime + start_date, end_date = pd.to_datetime(period[0]), pd.to_datetime(period[1]) + date_lst = pd.date_range(start=start_date, end=end_date, freq=freq) + date_rm_warmup = date_lst[warmup:] + + # Initialize lists to store train and test datasets for each fold + train_test_data = [] + + # KFold split + kf = KFold(n_splits=cv_fold, shuffle=False) + for train_index, test_index in kf.split(date_rm_warmup): + train_period = date_rm_warmup[train_index] + test_period = date_rm_warmup[test_index] + # Create warmup periods using the specified frequency + train_period_warmup = pd.date_range( + end=train_period[0], periods=warmup + 1, freq=freq + )[:-1] + test_period_warmup = pd.date_range( + end=test_period[0], periods=warmup + 1, freq=freq + )[:-1] + + # Select data from ts_data based on train and test periods + train_data = ts_data.sel(time=train_period.union(train_period_warmup)) + test_data = ts_data.sel(time=test_period.union(test_period_warmup)) + + # Add the datasets to the list + train_test_data.append((train_data, test_data)) + + return train_test_data diff --git a/test/test_data_preprocess.py b/test/test_data_preprocess.py index bfcf22d..78b5ea9 100644 --- a/test/test_data_preprocess.py +++ b/test/test_data_preprocess.py @@ -1,15 +1,21 @@ from hydrodataset import Camels +import numpy as np import pytest import os import pandas as pd import xarray as xr +from sklearn.model_selection import KFold from hydromodel import SETTING from hydromodel.datasets import * -from hydromodel.datasets.data_preprocess import process_and_save_data_as_nc +from hydromodel.datasets.data_preprocess import ( + process_and_save_data_as_nc, + split_train_test, +) from hydromodel.datasets.data_preprocess import check_tsdata_format from hydromodel.datasets.data_preprocess import check_basin_attr_format from hydromodel.datasets.data_preprocess import check_folder_contents +from hydromodel.datasets.data_preprocess import cross_valid_data @pytest.fixture() @@ -253,3 +259,50 @@ def test_load_dataset(): ["01013500"], ["2010-01-01", "2014-01-01"], ["streamflow"] ) print(data) + + +def create_temp_netCDF(tmp_path, periods=10): + """temp NetCDF file for test""" + ts_file = tmp_path / "time_series.nc" + basins = ["basin1", "basin2", "basin3"] + data = xr.Dataset( + { + "flow": (("time", "basin"), np.random.rand(periods, 3)), + "prcp": (("time", "basin"), np.random.rand(periods, 3)), + }, + coords={ + "time": pd.date_range(start="2022-01-01", periods=periods), + "basin": basins, + }, + ) + data.to_netcdf(ts_file) + return str(ts_file) + + +@pytest.fixture +def ts_file_fixture(tmp_path): + return create_temp_netCDF(tmp_path) + + +def test_cross_valid_data(ts_file_fixture): + period = ("2022-01-01", "2022-01-10") + warmup = 3 + cv_fold = 3 + + train_test_data = cross_valid_data(ts_file_fixture, period, warmup, cv_fold) + + assert len(train_test_data) == cv_fold + + +def test_split_train_test(ts_file_fixture): + # Define the train and test periods + train_period = ("2022-01-01", "2022-01-05") + test_period = ("2022-01-06", "2022-01-10") + + # Call the function to split the data + train_data, test_data = split_train_test(ts_file_fixture, train_period, test_period) + + # Assert that the train and test data have the correct length and shape + basins = ["basin1", "basin2", "basin3"] + assert len(train_data.time) == 5 and train_data.flow.shape == (5, len(basins)) + assert len(test_data.time) == 5 and test_data.flow.shape == (5, len(basins))