Added M5 example notebooks #55

Merged (3 commits, Jun 9, 2024)
1 change: 1 addition & 0 deletions .gitignore
@@ -136,3 +136,4 @@ dmypy.json

# Lightning Logs
examples/lightning_logs
+ examples/m5-examples/lightning_logs
2 changes: 1 addition & 1 deletion examples/foundation-model-examples/chronos-example.py
@@ -62,7 +62,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S
horizon_timestamps = []
for i in range(prediction_length):
timestamp = timestamp + one_ts_offset
- horizon_timestamps.append(timestamp)
+ horizon_timestamps.append(timestamp.to_numpy())
barch_horizon_timestamps.append(np.array(horizon_timestamps))
yield pd.Series(barch_horizon_timestamps)

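For context on the one-line change above: appending raw `pd.Timestamp` objects and then calling `np.array()` yields an object-dtype array, whereas converting each timestamp with `.to_numpy()` first yields a native `datetime64[ns]` array, which is generally easier for downstream numpy/Arrow/Spark code to consume. A minimal standalone sketch of the difference (illustrative only, not part of the PR):

import numpy as np
import pandas as pd

one_ts_offset = pd.DateOffset(days=1)
timestamp = pd.Timestamp("2020-01-01")

horizon_timestamps = []
converted = []
for _ in range(3):
    timestamp = timestamp + one_ts_offset
    horizon_timestamps.append(timestamp)    # what the old code appended
    converted.append(timestamp.to_numpy())  # what the new code appends

print(np.array(horizon_timestamps).dtype)  # object: an array of pd.Timestamp objects
print(np.array(converted).dtype)           # datetime64[ns]: a native numpy datetime array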
2 changes: 2 additions & 0 deletions examples/foundation-model-examples/data_preparation.py
@@ -51,6 +51,8 @@ def create_m4_daily():

def transform_group_daily(df):
unique_id = df.unique_id.iloc[0]
+ if len(df) > 1020:
+     df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
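The two lines added to `transform_group_daily` cap each series at its most recent 1020 observations before the synthetic daily calendar starting 2020-01-01 is laid over it, presumably to keep the generated date range bounded. A tiny standalone sketch of the same capping pattern applied per series with a pandas groupby (the helper name and toy data are hypothetical, not the repo's actual M4 loading code):

import pandas as pd

# Toy input: a single series with 1500 observations.
toy = pd.DataFrame({"unique_id": ["D1"] * 1500, "y": range(1500)})

def cap_series(df, max_len=1020):
    # Same idea as the added lines: keep only the most recent max_len rows.
    return df.iloc[-max_len:] if len(df) > max_len else df

capped = toy.groupby("unique_id", group_keys=False).apply(cap_series)
print(len(capped))  # 1020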
2 changes: 1 addition & 1 deletion examples/foundation-model-examples/moirai-example.py
@@ -64,7 +64,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S
horizon_timestamps = []
for i in range(prediction_length):
timestamp = timestamp + one_ts_offset
- horizon_timestamps.append(timestamp)
+ horizon_timestamps.append(timestamp.to_numpy())
barch_horizon_timestamps.append(np.array(horizon_timestamps))
yield pd.Series(barch_horizon_timestamps)

2 changes: 1 addition & 1 deletion examples/foundation-model-examples/moment-example.py
@@ -63,7 +63,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S
horizon_timestamps = []
for i in range(prediction_length):
timestamp = timestamp + one_ts_offset
- horizon_timestamps.append(timestamp)
+ horizon_timestamps.append(timestamp.to_numpy())
barch_horizon_timestamps.append(np.array(horizon_timestamps))
yield pd.Series(barch_horizon_timestamps)

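The same `get_horizon_timestamps` fix is applied in the Chronos, Moirai, and Moment example scripts above. For readers wondering how an `Iterator[pd.Series] -> Iterator[pd.Series]` function like this is typically used, here is a rough, generic sketch of wrapping one as a Spark Pandas UDF; it is illustrative only and does not reproduce the repo's actual pipeline code (the column name "last_date" and the DataFrame `df` are assumptions):

from typing import Iterator

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, TimestampType

prediction_length = 10
one_ts_offset = pd.DateOffset(days=1)

@pandas_udf(ArrayType(TimestampType()))
def horizon_timestamps_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Each incoming batch is a Series of last-observed timestamps;
    # emit one array of future timestamps (the forecast horizon) per row.
    for batch in batch_iterator:
        batch_horizon_timestamps = []
        for timestamp in batch:
            horizon_timestamps = []
            for _ in range(prediction_length):
                timestamp = timestamp + one_ts_offset
                horizon_timestamps.append(timestamp.to_numpy())
            batch_horizon_timestamps.append(np.array(horizon_timestamps))
        yield pd.Series(batch_horizon_timestamps)

# Hypothetical usage on a Spark DataFrame df with a timestamp column "last_date":
# df = df.withColumn("horizon_timestamps", horizon_timestamps_udf("last_date"))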
17 changes: 13 additions & 4 deletions examples/foundation_daily.py
@@ -45,7 +45,7 @@
# COMMAND ----------

# Number of time series
- n = 100
+ n = 1000


def create_m4_daily():
@@ -63,6 +63,8 @@ def create_m4_daily():

def transform_group(df):
unique_id = df.unique_id.iloc[0]
+ if len(df) > 1020:
+     df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
@@ -71,7 +73,6 @@ def transform_group(df):
res_df["y"] = df.y.values
return res_df

-
# COMMAND ----------

# MAGIC %md
@@ -82,6 +83,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

+ # COMMAND ----------
+
# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -133,7 +136,7 @@ def transform_group(df):
# MAGIC
# MAGIC If you are interested in how MMF achieves distributed inference on these foundation models using Pandas UDF, have a look at the model pipeline scripts: e.g. [mmf_sa/models/chronosforecast/ChronosPipeline.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/chronosforecast/ChronosPipeline.py).
# MAGIC
- # MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function.
+ # MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function. Also, set the parameter `data_quality_check=True` and `resample=False`,or provide a complete dataset without missing entries to avoid issues with skipped dates.

# COMMAND ----------

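As a rough sketch of the loop described above (the model keys and notebook arguments below are illustrative assumptions; see the linked run_daily notebook for the actual interface), each child-notebook run wraps a single `run_forecast` call so GPU memory is released when the run finishes:

# Hypothetical model keys; the supported names live in mmf_sa/models/models_conf.yaml.
active_models = [
    "ChronosT5Large",
    "MoiraiLarge",
    "MomentLarge",
]

for model in active_models:
    dbutils.notebook.run(
        "run_daily",        # the separate notebook that calls run_forecast for one model
        timeout_seconds=0,  # no timeout
        arguments={"catalog": catalog, "db": db, "model": model},
    )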
@@ -153,7 +156,9 @@ def transform_group(df):

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date"))
display(
spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")
)

# COMMAND ----------

@@ -189,3 +194,7 @@ def transform_group(df):
# COMMAND ----------

display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output"))
+
+ # COMMAND ----------
+
+
2 changes: 1 addition & 1 deletion examples/foundation_monthly.py
@@ -45,7 +45,7 @@
# COMMAND ----------

# Number of time series
- n = 100
+ n = 1000


def create_m4_monthly():
8 changes: 6 additions & 2 deletions examples/global_daily.py
@@ -46,7 +46,7 @@
# COMMAND ----------

# Number of time series
- n = 100
+ n = 1000


def create_m4_daily():
@@ -64,6 +64,8 @@ def create_m4_daily():

def transform_group(df):
unique_id = df.unique_id.iloc[0]
+ if len(df) > 1020:
+     df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
@@ -83,6 +85,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

+ # COMMAND ----------
+
# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -137,7 +141,7 @@ def transform_group(df):
# MAGIC
# MAGIC If you are interested in how MMF achieves distributed training and inference, have a look at the two methods `evaluate_global_model` and `evaluate_global_model` defined in the source code [`Forecaster.py`](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/Forecaster.py).
# MAGIC
- # MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function.
+ # MAGIC A small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function. Also, set the parameter `data_quality_check=True` or provide a complete dataset without missing entries to avoid issues with skipped dates.

# COMMAND ----------

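On the last point above (avoiding skipped dates), here is one standalone way to complete a daily calendar per series before handing the data to `run_forecast`, sketched with pandas on the unique_id/ds/y layout used in these examples; the input `pdf_all` and the forward-fill strategy are assumptions, so adapt them to your data:

import pandas as pd

def fill_missing_dates(pdf: pd.DataFrame) -> pd.DataFrame:
    # Reindex one series onto a complete daily calendar between its first and last date.
    full_idx = pd.date_range(pdf["ds"].min(), pdf["ds"].max(), freq="D", name="ds")
    out = pdf.set_index("ds").reindex(full_idx)
    out["unique_id"] = pdf["unique_id"].iloc[0]
    out["y"] = out["y"].ffill()  # or interpolate / zero-fill, depending on the use case
    return out.reset_index()

# pdf_all: a pandas DataFrame with columns unique_id, ds, y (assumed to exist here).
complete = pdf_all.groupby("unique_id", group_keys=False).apply(fill_missing_dates)
# Convert back to Spark (spark.createDataFrame(complete)) before calling run_forecast.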
2 changes: 2 additions & 0 deletions examples/global_external_regressors_daily.py
@@ -50,6 +50,8 @@
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)
volume = "rossmann" # Name of the volume where you have your rossmann dataset csv sotred

+ # COMMAND ----------
+
# Make sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
8 changes: 7 additions & 1 deletion examples/global_monthly.py
@@ -45,7 +45,7 @@
# COMMAND ----------

# Number of time series
- n = 100
+ n = 1000


def create_m4_monthly():
@@ -87,6 +87,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

+ # COMMAND ----------
+
# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -178,3 +180,7 @@ def transform_group(df):
# COMMAND ----------

display(spark.sql(f"delete from {catalog}.{db}.monthly_scoring_output"))
+
+ # COMMAND ----------
+
+
27 changes: 21 additions & 6 deletions examples/local_univariate_daily.py
@@ -51,7 +51,7 @@
# COMMAND ----------

# Number of time series
- n = 100
+ n = 1000


def create_m4_daily():
@@ -69,6 +69,8 @@ def create_m4_daily():

def transform_group(df):
unique_id = df.unique_id.iloc[0]
+ if len(df) > 1020:
+     df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
@@ -77,7 +79,6 @@ def transform_group(df):
res_df["y"] = df.y.values
return res_df

-
# COMMAND ----------

# MAGIC %md
@@ -88,6 +89,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

+ # COMMAND ----------
+
# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -110,6 +113,16 @@ def transform_group(df):

# COMMAND ----------

+ # MAGIC %md
+ # MAGIC If the number of time series is larger than the number of total cores, we set `spark.sql.shuffle.partitions` to the number of cores (can also be a multiple) so that we don't under-utilize the resource.
+
+ # COMMAND ----------
+
+ if n > sc.defaultParallelism:
+     sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism)
+
+ # COMMAND ----------
+
# MAGIC %md ### Models
# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentations for the detailed description of each model.
# MAGIC
@@ -139,7 +152,7 @@ def transform_group(df):
"RDynamicHarmonicRegression",
"SKTimeTBats",
"SKTimeLgbmDsDt",
]
]

# COMMAND ----------

@@ -168,8 +181,8 @@ def transform_group(df):
prediction_length=10,
backtest_months=1,
stride=10,
- train_predict_ratio=2,
- data_quality_check=True,
+ train_predict_ratio=1,
+ data_quality_check=False,
resample=False,
active_models=active_models,
experiment_path=f"/Shared/mmf_experiment",
@@ -183,7 +196,9 @@ def transform_group(df):

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date"))
display(
spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")
)

# COMMAND ----------

17 changes: 13 additions & 4 deletions examples/local_univariate_external_regressors_daily.py
@@ -42,6 +42,8 @@
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)
volume = "rossmann" # Name of the volume where you have your rossmann dataset csv sotred

+ # COMMAND ----------
+
# Make sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -55,7 +57,7 @@

# Number of time series to sample
sample = True
- size = 100
+ size = 1000
stores = sorted(random.sample(range(0, 1000), size))

train = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/train.csv", header=True, inferSchema=True)
@@ -91,6 +93,11 @@

# COMMAND ----------

+ if sample and size > sc.defaultParallelism:
+     sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism)
+
+ # COMMAND ----------
+
# MAGIC %md ### Models
# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentations for the description of each model.
# MAGIC
@@ -144,9 +151,9 @@
prediction_length=10,
backtest_months=1,
stride=10,
- train_predict_ratio=2,
+ train_predict_ratio=1,
active_models=active_models,
- data_quality_check=True,
+ data_quality_check=False,
resample=False,
experiment_path=f"/Shared/mmf_rossmann",
use_case_name="rossmann_daily",
@@ -159,7 +166,9 @@

# COMMAND ----------

- display(spark.sql(f"select * from {catalog}.{db}.rossmann_daily_evaluation_output order by Store, model, backtest_window_start_date"))
+ display(
+     spark.sql(f"select * from {catalog}.{db}.rossmann_daily_evaluation_output order by Store, model, backtest_window_start_date")
+ )

# COMMAND ----------

21 changes: 16 additions & 5 deletions examples/local_univariate_monthly.py
@@ -52,7 +52,7 @@
# COMMAND ----------

# Number of time series
- n = 100
+ n = 1000


def create_m4_monthly():
@@ -94,6 +94,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

+ # COMMAND ----------
+
# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -110,7 +112,9 @@ def transform_group(df):

# COMMAND ----------

- display(spark.sql(f"select unique_id, count(date) as count from {catalog}.{db}.m4_monthly_train group by unique_id order by unique_id"))
+ display(
+     spark.sql(f"select unique_id, count(date) as count from {catalog}.{db}.m4_monthly_train group by unique_id order by unique_id")
+ )

# COMMAND ----------

@@ -120,6 +124,11 @@ def transform_group(df):

# COMMAND ----------

+ if n > sc.defaultParallelism:
+     sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism)
+
+ # COMMAND ----------
+
# MAGIC %md ### Models
# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentations for the description of each model.

@@ -170,8 +179,8 @@ def transform_group(df):
prediction_length=3,
backtest_months=12,
stride=1,
- train_predict_ratio=2,
- data_quality_check=True,
+ train_predict_ratio=1,
+ data_quality_check=False,
resample=False,
active_models=active_models,
experiment_path=f"/Shared/mmf_experiment_monthly",
@@ -185,7 +194,9 @@ def transform_group(df):

# COMMAND ----------

- display(spark.sql(f"select * from {catalog}.{db}.monthly_evaluation_output order by unique_id, model, backtest_window_start_date"))
+ display(
+     spark.sql(f"select * from {catalog}.{db}.monthly_evaluation_output order by unique_id, model, backtest_window_start_date")
+ )

# COMMAND ----------
