From 36d7b03c2bca42897a3d7cf0d244b32e019a3912 Mon Sep 17 00:00:00 2001 From: "ryuta.yoshimatsu@databricks.com" Date: Sun, 9 Jun 2024 10:58:32 +0000 Subject: [PATCH 1/3] Added M5 example notebooks --- .../chronos-example.py | 2 +- .../data_preparation.py | 2 + .../moirai-example.py | 2 +- .../moment-example.py | 2 +- examples/foundation_daily.py | 17 ++- examples/foundation_monthly.py | 2 +- examples/global_daily.py | 8 +- examples/global_external_regressors_daily.py | 2 + examples/global_monthly.py | 8 +- examples/local_univariate_daily.py | 27 ++++- ...al_univariate_external_regressors_daily.py | 17 ++- examples/local_univariate_monthly.py | 21 +++- examples/m5-examples/data_preparation_m5.py | 85 ++++++++++++++ examples/m5-examples/foundation_daily_m5.py | 63 +++++++++++ examples/m5-examples/global_daily_m5.py | 67 +++++++++++ .../m5-examples/local_univariate_daily_m5.py | 106 ++++++++++++++++++ examples/m5-examples/run_daily_m5.py | 51 +++++++++ examples/run_daily.py | 2 +- examples/run_external_regressors_daily.py | 2 +- examples/run_monthly.py | 2 +- mmf_sa/Forecaster.py | 3 +- mmf_sa/data_quality_checks.py | 21 ++-- .../models/chronosforecast/ChronosPipeline.py | 5 +- .../models/moiraiforecast/MoiraiPipeline.py | 2 +- .../models/momentforecast/MomentPipeline.py | 2 +- 25 files changed, 480 insertions(+), 41 deletions(-) create mode 100644 examples/m5-examples/data_preparation_m5.py create mode 100644 examples/m5-examples/foundation_daily_m5.py create mode 100644 examples/m5-examples/global_daily_m5.py create mode 100644 examples/m5-examples/local_univariate_daily_m5.py create mode 100644 examples/m5-examples/run_daily_m5.py diff --git a/examples/foundation-model-examples/chronos-example.py b/examples/foundation-model-examples/chronos-example.py index e570a16..1be2d0e 100644 --- a/examples/foundation-model-examples/chronos-example.py +++ b/examples/foundation-model-examples/chronos-example.py @@ -62,7 +62,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S horizon_timestamps = [] for i in range(prediction_length): timestamp = timestamp + one_ts_offset - horizon_timestamps.append(timestamp) + horizon_timestamps.append(timestamp.to_numpy()) barch_horizon_timestamps.append(np.array(horizon_timestamps)) yield pd.Series(barch_horizon_timestamps) diff --git a/examples/foundation-model-examples/data_preparation.py b/examples/foundation-model-examples/data_preparation.py index 0aa501a..c6a302a 100644 --- a/examples/foundation-model-examples/data_preparation.py +++ b/examples/foundation-model-examples/data_preparation.py @@ -51,6 +51,8 @@ def create_m4_daily(): def transform_group_daily(df): unique_id = df.unique_id.iloc[0] + if len(df) > 1020: + df = df.iloc[-1020:] _start = pd.Timestamp("2020-01-01") _end = _start + pd.DateOffset(days=int(df.count()[0]) - 1) date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds") diff --git a/examples/foundation-model-examples/moirai-example.py b/examples/foundation-model-examples/moirai-example.py index 007f8e7..aa1e450 100644 --- a/examples/foundation-model-examples/moirai-example.py +++ b/examples/foundation-model-examples/moirai-example.py @@ -64,7 +64,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S horizon_timestamps = [] for i in range(prediction_length): timestamp = timestamp + one_ts_offset - horizon_timestamps.append(timestamp) + horizon_timestamps.append(timestamp.to_numpy()) barch_horizon_timestamps.append(np.array(horizon_timestamps)) yield 
pd.Series(barch_horizon_timestamps) diff --git a/examples/foundation-model-examples/moment-example.py b/examples/foundation-model-examples/moment-example.py index e4b0d4f..d57d771 100644 --- a/examples/foundation-model-examples/moment-example.py +++ b/examples/foundation-model-examples/moment-example.py @@ -63,7 +63,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S horizon_timestamps = [] for i in range(prediction_length): timestamp = timestamp + one_ts_offset - horizon_timestamps.append(timestamp) + horizon_timestamps.append(timestamp.to_numpy()) barch_horizon_timestamps.append(np.array(horizon_timestamps)) yield pd.Series(barch_horizon_timestamps) diff --git a/examples/foundation_daily.py b/examples/foundation_daily.py index 1e05fcc..2502033 100644 --- a/examples/foundation_daily.py +++ b/examples/foundation_daily.py @@ -45,7 +45,7 @@ # COMMAND ---------- # Number of time series -n = 100 +n = 1000 def create_m4_daily(): @@ -63,6 +63,8 @@ def create_m4_daily(): def transform_group(df): unique_id = df.unique_id.iloc[0] + if len(df) > 1020: + df = df.iloc[-1020:] _start = pd.Timestamp("2020-01-01") _end = _start + pd.DateOffset(days=int(df.count()[0]) - 1) date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds") @@ -71,7 +73,6 @@ def transform_group(df): res_df["y"] = df.y.values return res_df - # COMMAND ---------- # MAGIC %md @@ -82,6 +83,8 @@ def transform_group(df): catalog = "solacc_uc" # Name of the catalog we use to manage our assets db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) +# COMMAND ---------- + # Making sure that the catalog and the schema exist _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") @@ -133,7 +136,7 @@ def transform_group(df): # MAGIC # MAGIC If you are interested in how MMF achieves distributed inference on these foundation models using Pandas UDF, have a look at the model pipeline scripts: e.g. [mmf_sa/models/chronosforecast/ChronosPipeline.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/chronosforecast/ChronosPipeline.py). # MAGIC -# MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function. +# MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function. Also, set the parameter `data_quality_check=True` and `resample=False`,or provide a complete dataset without missing entries to avoid issues with skipped dates. 
# COMMAND ---------- @@ -153,7 +156,9 @@ def transform_group(df): # COMMAND ---------- -display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")) +display( + spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date") + ) # COMMAND ---------- @@ -189,3 +194,7 @@ def transform_group(df): # COMMAND ---------- display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) + +# COMMAND ---------- + + diff --git a/examples/foundation_monthly.py b/examples/foundation_monthly.py index c5f07ce..19ea93c 100644 --- a/examples/foundation_monthly.py +++ b/examples/foundation_monthly.py @@ -45,7 +45,7 @@ # COMMAND ---------- # Number of time series -n = 100 +n = 1000 def create_m4_monthly(): diff --git a/examples/global_daily.py b/examples/global_daily.py index 99c03f3..08842d0 100644 --- a/examples/global_daily.py +++ b/examples/global_daily.py @@ -46,7 +46,7 @@ # COMMAND ---------- # Number of time series -n = 100 +n = 1000 def create_m4_daily(): @@ -64,6 +64,8 @@ def create_m4_daily(): def transform_group(df): unique_id = df.unique_id.iloc[0] + if len(df) > 1020: + df = df.iloc[-1020:] _start = pd.Timestamp("2020-01-01") _end = _start + pd.DateOffset(days=int(df.count()[0]) - 1) date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds") @@ -83,6 +85,8 @@ def transform_group(df): catalog = "solacc_uc" # Name of the catalog we use to manage our assets db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) +# COMMAND ---------- + # Making sure that the catalog and the schema exist _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") @@ -137,7 +141,7 @@ def transform_group(df): # MAGIC # MAGIC If you are interested in how MMF achieves distributed training and inference, have a look at the two methods `evaluate_global_model` and `evaluate_global_model` defined in the source code [`Forecaster.py`](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/Forecaster.py). # MAGIC -# MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function. +# MAGIC A small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function. Also, set the parameter `data_quality_check=True` or provide a complete dataset without missing entries to avoid issues with skipped dates. 
# COMMAND ---------- diff --git a/examples/global_external_regressors_daily.py b/examples/global_external_regressors_daily.py index 3487869..d4bf76a 100644 --- a/examples/global_external_regressors_daily.py +++ b/examples/global_external_regressors_daily.py @@ -50,6 +50,8 @@ db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) volume = "rossmann" # Name of the volume where you have your rossmann dataset csv sotred +# COMMAND ---------- + # Make sure that the catalog and the schema exist _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") diff --git a/examples/global_monthly.py b/examples/global_monthly.py index bf18389..275c4a6 100644 --- a/examples/global_monthly.py +++ b/examples/global_monthly.py @@ -45,7 +45,7 @@ # COMMAND ---------- # Number of time series -n = 100 +n = 1000 def create_m4_monthly(): @@ -87,6 +87,8 @@ def transform_group(df): catalog = "solacc_uc" # Name of the catalog we use to manage our assets db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) +# COMMAND ---------- + # Making sure that the catalog and the schema exist _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") @@ -178,3 +180,7 @@ def transform_group(df): # COMMAND ---------- display(spark.sql(f"delete from {catalog}.{db}.monthly_scoring_output")) + +# COMMAND ---------- + + diff --git a/examples/local_univariate_daily.py b/examples/local_univariate_daily.py index 320cac1..c4c1688 100644 --- a/examples/local_univariate_daily.py +++ b/examples/local_univariate_daily.py @@ -51,7 +51,7 @@ # COMMAND ---------- # Number of time series -n = 100 +n = 1000 def create_m4_daily(): @@ -69,6 +69,8 @@ def create_m4_daily(): def transform_group(df): unique_id = df.unique_id.iloc[0] + if len(df) > 1020: + df = df.iloc[-1020:] _start = pd.Timestamp("2020-01-01") _end = _start + pd.DateOffset(days=int(df.count()[0]) - 1) date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds") @@ -77,7 +79,6 @@ def transform_group(df): res_df["y"] = df.y.values return res_df - # COMMAND ---------- # MAGIC %md @@ -88,6 +89,8 @@ def transform_group(df): catalog = "solacc_uc" # Name of the catalog we use to manage our assets db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) +# COMMAND ---------- + # Making sure that the catalog and the schema exist _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") @@ -110,6 +113,16 @@ def transform_group(df): # COMMAND ---------- +# MAGIC %md +# MAGIC If the number of time series is larger than the number of total cores, we set `spark.sql.shuffle.partitions` to the number of cores (can also be a multiple) so that we don't under-utilize the resource. + +# COMMAND ---------- + +if n > sc.defaultParallelism: + sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism) + +# COMMAND ---------- + # MAGIC %md ### Models # MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). 
Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentations for the detailed description of each model. # MAGIC @@ -139,7 +152,7 @@ def transform_group(df): "RDynamicHarmonicRegression", "SKTimeTBats", "SKTimeLgbmDsDt", -] + ] # COMMAND ---------- @@ -168,8 +181,8 @@ def transform_group(df): prediction_length=10, backtest_months=1, stride=10, - train_predict_ratio=2, - data_quality_check=True, + train_predict_ratio=1, + data_quality_check=False, resample=False, active_models=active_models, experiment_path=f"/Shared/mmf_experiment", @@ -183,7 +196,9 @@ def transform_group(df): # COMMAND ---------- -display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")) +display( + spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date") + ) # COMMAND ---------- diff --git a/examples/local_univariate_external_regressors_daily.py b/examples/local_univariate_external_regressors_daily.py index e83a0da..a3907a3 100644 --- a/examples/local_univariate_external_regressors_daily.py +++ b/examples/local_univariate_external_regressors_daily.py @@ -42,6 +42,8 @@ db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) volume = "rossmann" # Name of the volume where you have your rossmann dataset csv sotred +# COMMAND ---------- + # Make sure that the catalog and the schema exist _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") @@ -55,7 +57,7 @@ # Number of time series to sample sample = True -size = 100 +size = 1000 stores = sorted(random.sample(range(0, 1000), size)) train = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/train.csv", header=True, inferSchema=True) @@ -91,6 +93,11 @@ # COMMAND ---------- +if sample and size > sc.defaultParallelism: + sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism) + +# COMMAND ---------- + # MAGIC %md ### Models # MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentations for the description of each model. 
# MAGIC @@ -144,9 +151,9 @@ prediction_length=10, backtest_months=1, stride=10, - train_predict_ratio=2, + train_predict_ratio=1, active_models=active_models, - data_quality_check=True, + data_quality_check=False, resample=False, experiment_path=f"/Shared/mmf_rossmann", use_case_name="rossmann_daily", @@ -159,7 +166,9 @@ # COMMAND ---------- -display(spark.sql(f"select * from {catalog}.{db}.rossmann_daily_evaluation_output order by Store, model, backtest_window_start_date")) +display( + spark.sql(f"select * from {catalog}.{db}.rossmann_daily_evaluation_output order by Store, model, backtest_window_start_date") + ) # COMMAND ---------- diff --git a/examples/local_univariate_monthly.py b/examples/local_univariate_monthly.py index 44053fe..dead4fb 100644 --- a/examples/local_univariate_monthly.py +++ b/examples/local_univariate_monthly.py @@ -52,7 +52,7 @@ # COMMAND ---------- # Number of time series -n = 100 +n = 1000 def create_m4_monthly(): @@ -94,6 +94,8 @@ def transform_group(df): catalog = "solacc_uc" # Name of the catalog we use to manage our assets db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) +# COMMAND ---------- + # Making sure that the catalog and the schema exist _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") @@ -110,7 +112,9 @@ def transform_group(df): # COMMAND ---------- -display(spark.sql(f"select unique_id, count(date) as count from {catalog}.{db}.m4_monthly_train group by unique_id order by unique_id")) +display( + spark.sql(f"select unique_id, count(date) as count from {catalog}.{db}.m4_monthly_train group by unique_id order by unique_id") + ) # COMMAND ---------- @@ -120,6 +124,11 @@ def transform_group(df): # COMMAND ---------- +if n > sc.defaultParallelism: + sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism) + +# COMMAND ---------- + # MAGIC %md ### Models # MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentations for the description of each model. 
@@ -170,8 +179,8 @@ def transform_group(df): prediction_length=3, backtest_months=12, stride=1, - train_predict_ratio=2, - data_quality_check=True, + train_predict_ratio=1, + data_quality_check=False, resample=False, active_models=active_models, experiment_path=f"/Shared/mmf_experiment_monthly", @@ -185,7 +194,9 @@ def transform_group(df): # COMMAND ---------- -display(spark.sql(f"select * from {catalog}.{db}.monthly_evaluation_output order by unique_id, model, backtest_window_start_date")) +display( + spark.sql(f"select * from {catalog}.{db}.monthly_evaluation_output order by unique_id, model, backtest_window_start_date") + ) # COMMAND ---------- diff --git a/examples/m5-examples/data_preparation_m5.py b/examples/m5-examples/data_preparation_m5.py new file mode 100644 index 0000000..50509f9 --- /dev/null +++ b/examples/m5-examples/data_preparation_m5.py @@ -0,0 +1,85 @@ +# Databricks notebook source +# MAGIC %pip install datasetsforecast==0.0.8 --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +import pathlib +import pandas as pd +from datasetsforecast.m5 import M5 +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +catalog = "mmf" +db = "m5" + +# COMMAND ---------- + +_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") + +# COMMAND ---------- + +df_target, df_exogenous, static_features = M5.load(directory=str(pathlib.Path.home())) +daily_train = pd.merge(df_target, df_exogenous, on=['unique_id','ds'], how='inner') + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Write out the entire dataset to a delta table + +# COMMAND ---------- + +( + spark.createDataFrame(daily_train) + .write.format("delta").mode("overwrite") + .saveAsTable(f"{catalog}.{db}.daily_train") +) +print(f"Saved data to {catalog}.{db}.daily_train_full") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Write out the sampled dataset to a delta table + +# COMMAND ---------- + +import random +random.seed(7) + +unique_ids = list(daily_train["unique_id"].unique()) +unique_id_1000 = sorted(random.sample(unique_ids, 1000)) +unique_id_10000 = sorted(random.sample(unique_ids, 10000)) + +daily_train_1000 = daily_train[daily_train["unique_id"].isin(unique_id_1000)] +daily_train_10000 = daily_train[daily_train["unique_id"].isin(unique_id_10000)] + +# COMMAND ---------- + +( + spark.createDataFrame(daily_train_1000) + .write.format("delta").mode("overwrite") + .saveAsTable(f"{catalog}.{db}.daily_train_1000") +) +print(f"Saved data to {catalog}.{db}.daily_train_1000") + +# COMMAND ---------- + +( + spark.createDataFrame(daily_train_10000) + .write.format("delta").mode("overwrite") + .saveAsTable(f"{catalog}.{db}.daily_train_10000") +) +print(f"Saved data to {catalog}.{db}.daily_train_10000") + +# COMMAND ---------- + +display(spark.sql(f"select * from {catalog}.{db}.daily_train_1000")) + +# COMMAND ---------- + + diff --git a/examples/m5-examples/foundation_daily_m5.py b/examples/m5-examples/foundation_daily_m5.py new file mode 100644 index 0000000..c97c535 --- /dev/null +++ b/examples/m5-examples/foundation_daily_m5.py @@ -0,0 +1,63 @@ +# Databricks notebook source +# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +import logging +logger = spark._jvm.org.apache.log4j 
+logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import uuid +import pathlib +import pandas as pd +from mmf_sa import run_forecast + +# COMMAND ---------- + +catalog = "mmf" # Name of the catalog we use to manage our assets +db = "m5" # Name of the schema we use to manage our assets (e.g. datasets) + +# COMMAND ---------- + +active_models = [ + "ChronosT5Tiny", + "ChronosT5Mini", + "ChronosT5Small", + "ChronosT5Base", + "ChronosT5Large", + "MoiraiSmall", + "MoiraiBase", + "MoiraiLarge", + "Moment1Large", +] + +# COMMAND ---------- + +# The same run_id will be assigned to all the models. This makes it easier to run the post evaluation analysis later. +run_id = str(uuid.uuid4()) + +for model in active_models: + dbutils.notebook.run( + "run_daily", + timeout_seconds=0, + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) + +# COMMAND ---------- + +display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")) + +# COMMAND ---------- + +display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) diff --git a/examples/m5-examples/global_daily_m5.py b/examples/m5-examples/global_daily_m5.py new file mode 100644 index 0000000..ab335e4 --- /dev/null +++ b/examples/m5-examples/global_daily_m5.py @@ -0,0 +1,67 @@ +# Databricks notebook source +# DBTITLE 1,Install the necessary libraries +# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import uuid +import pathlib +import pandas as pd +from mmf_sa import run_forecast + +# COMMAND ---------- + +catalog = "mmf" # Name of the catalog we use to manage our assets +db = "m5" # Name of the schema we use to manage our assets (e.g. datasets) + +# COMMAND ---------- + +active_models = [ + "NeuralForecastRNN", + "NeuralForecastLSTM", + "NeuralForecastNBEATSx", + "NeuralForecastNHITS", + "NeuralForecastAutoRNN", + "NeuralForecastAutoLSTM", + "NeuralForecastAutoNBEATSx", + "NeuralForecastAutoNHITS", + "NeuralForecastAutoTiDE", + "NeuralForecastAutoPatchTST", +] + +# COMMAND ---------- + +# The same run_id will be assigned to all the models. This makes it easier to run the post evaluation analysis later. 
+run_id = str(uuid.uuid4()) + +for model in active_models: + dbutils.notebook.run( + "run_daily", + timeout_seconds=0, + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) + +# COMMAND ---------- + +display( + spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date") + ) + +# COMMAND ---------- + +display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) diff --git a/examples/m5-examples/local_univariate_daily_m5.py b/examples/m5-examples/local_univariate_daily_m5.py new file mode 100644 index 0000000..b56b02c --- /dev/null +++ b/examples/m5-examples/local_univariate_daily_m5.py @@ -0,0 +1,106 @@ +# Databricks notebook source +# DBTITLE 1,Install the necessary libraries +# MAGIC %pip install -r ../../requirements.txt --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import pathlib +import pandas as pd +from mmf_sa import run_forecast + +# COMMAND ---------- + +catalog = "mmf" # Name of the catalog we use to manage our assets +db = "m5" # Name of the schema we use to manage our assets (e.g. datasets) +user_email = spark.sql('select current_user() as user').collect()[0]['user'] + +n = 1000 # Number of items: choose from [1000, 10000, 'full']. full is 35k +taining_table = f"daily_train_{n}" + +# COMMAND ---------- + +display( + spark.sql(f"select * from {catalog}.{db}.{taining_table} where unique_id in ('FOODS_1_001_WI_1', 'FOODS_1_004_TX_2', 'FOODS_1_006_WI_1', 'FOODS_1_008_CA_3', 'FOODS_1_012_WI_1') order by unique_id, ds") + ) + +# COMMAND ---------- + +active_models = [ + "StatsForecastBaselineWindowAverage", + "StatsForecastBaselineSeasonalWindowAverage", + "StatsForecastBaselineNaive", + "StatsForecastBaselineSeasonalNaive", + "StatsForecastAutoArima", + "StatsForecastAutoETS", + "StatsForecastAutoCES", + "StatsForecastAutoTheta", + "StatsForecastTSB", + "StatsForecastADIDA", + "StatsForecastIMAPA", + "StatsForecastCrostonClassic", + "StatsForecastCrostonOptimized", + "StatsForecastCrostonSBA", + "RFableArima", + "RFableETS", + "RFableNNETAR", + "RFableEnsemble", + "RDynamicHarmonicRegression", + #"SKTimeTBats", + #"SKTimeLgbmDsDt", +] + +# COMMAND ---------- + +if n > sc.defaultParallelism: + sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism) + +# COMMAND ---------- + +run_forecast( + spark=spark, + train_data=f"{catalog}.{db}.{taining_table}", + scoring_data=f"{catalog}.{db}.{taining_table}", + scoring_output=f"{catalog}.{db}.daily_scoring_output", + evaluation_output=f"{catalog}.{db}.daily_evaluation_output", + group_id="unique_id", + date_col="ds", + target="y", + freq="D", + prediction_length=28, + backtest_months=3, + stride=7, + train_predict_ratio=1, + data_quality_check=False, + resample=False, + active_models=active_models, + experiment_path=f"/Users/{user_email}/mmf/m5", + use_case_name="m5_daily", +) + +# COMMAND ---------- + +display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")) + +# COMMAND ---------- + 
+display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) + +# COMMAND ---------- + + diff --git a/examples/m5-examples/run_daily_m5.py b/examples/m5-examples/run_daily_m5.py new file mode 100644 index 0000000..969006d --- /dev/null +++ b/examples/m5-examples/run_daily_m5.py @@ -0,0 +1,51 @@ +# Databricks notebook source +# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +dbutils.widgets.text("catalog", "") +dbutils.widgets.text("db", "") +dbutils.widgets.text("model", "") +dbutils.widgets.text("run_id", "") + +catalog = dbutils.widgets.get("catalog") +db = dbutils.widgets.get("db") +model = dbutils.widgets.get("model") +run_id = dbutils.widgets.get("run_id") + +# COMMAND ---------- + +from mmf_sa import run_forecast +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +n = 1000 # Number of items: choose from [1000, 10000, 'full']. full is 35k +taining_table = f"daily_train_{n}" +user_email = spark.sql('select current_user() as user').collect()[0]['user'] + +run_forecast( + spark=spark, + train_data=f"{catalog}.{db}.{taining_table}", + scoring_data=f"{catalog}.{db}.{taining_table}", + scoring_output=f"{catalog}.{db}.daily_scoring_output", + evaluation_output=f"{catalog}.{db}.daily_evaluation_output", + model_output=f"{catalog}.{db}", + group_id="unique_id", + date_col="ds", + target="y", + freq="D", + prediction_length=28, + backtest_months=3, + stride=7, + train_predict_ratio=1, + data_quality_check=True, + resample=False, + active_models=[model], + experiment_path=f"/Users/{user_email}/mmf/m5", + use_case_name="m5_daily", + run_id=run_id, + accelerator="gpu", +) diff --git a/examples/run_daily.py b/examples/run_daily.py index 314b59a..5581ad7 100644 --- a/examples/run_daily.py +++ b/examples/run_daily.py @@ -37,7 +37,7 @@ prediction_length=10, backtest_months=1, stride=10, - train_predict_ratio=2, + train_predict_ratio=1, data_quality_check=True, resample=False, active_models=[model], diff --git a/examples/run_external_regressors_daily.py b/examples/run_external_regressors_daily.py index 844cbeb..ff4b3d6 100644 --- a/examples/run_external_regressors_daily.py +++ b/examples/run_external_regressors_daily.py @@ -38,7 +38,7 @@ prediction_length=10, backtest_months=1, stride=10, - train_predict_ratio=2, + train_predict_ratio=1, active_models=[model], data_quality_check=True, resample=False, diff --git a/examples/run_monthly.py b/examples/run_monthly.py index 73ae9d3..2119ab8 100644 --- a/examples/run_monthly.py +++ b/examples/run_monthly.py @@ -37,7 +37,7 @@ prediction_length=3, backtest_months=12, stride=1, - train_predict_ratio=2, + train_predict_ratio=1, data_quality_check=True, resample=False, active_models=[model], diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index b268f70..0dc8c75 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -27,7 +27,7 @@ ArrayType, IntegerType, ) -from pyspark.sql.functions import lit, avg, min, max, col, posexplode, collect_list, to_date +from pyspark.sql.functions import lit, avg, min, max, col, posexplode, collect_list, to_date, countDistinct from mmf_sa.models.abstract_model import 
ForecastingRegressor from mmf_sa.models import ModelRegistry from mmf_sa.data_quality_checks import DataQualityChecks @@ -214,6 +214,7 @@ def evaluate_local_model(self, model_conf): StructField("model_pickle", BinaryType()), ] ) + model = self.model_registry.get_model(model_conf["name"]) # Use Pandas UDF to forecast individual group diff --git a/mmf_sa/data_quality_checks.py b/mmf_sa/data_quality_checks.py index 1c73b56..a66e6f2 100644 --- a/mmf_sa/data_quality_checks.py +++ b/mmf_sa/data_quality_checks.py @@ -77,14 +77,19 @@ def _multiple_checks( # 1. Checking for nulls in external regressors static_features = conf.get("static_features", None) - dynamic_reals = conf.get("dynamic_reals", None) + dynamic_future = conf.get("dynamic_future", None) + dynamic_historical = conf.get("dynamic_historical", None) if static_features: if _df[static_features].isnull().values.any(): # Removing: null in static categoricals return pd.DataFrame() - if dynamic_reals: - if _df[dynamic_reals].isnull().values.any(): - # Removing: null in dynamic reals + if dynamic_future: + if _df[dynamic_future].isnull().values.any(): + # Removing: null in dynamic future + return pd.DataFrame() + if dynamic_historical: + if _df[dynamic_historical].isnull().values.any(): + # Removing: null in dynamic historical return pd.DataFrame() # 2. Checking for training period length @@ -102,6 +107,7 @@ def _multiple_checks( # 3. Checking for missing entries if max_date is None: max_date = _df[conf["date_col"]].max() + _resampled = _df.set_index(conf["date_col"]) date_idx = pd.date_range( start=_df[conf["date_col"]].min(), @@ -114,7 +120,8 @@ def _multiple_checks( .reset_index() .fillna(value=0) ) - if len(_df) != len(_resampled): + + if len(_resampled) > len(_df): if conf.get("resample"): if (len(_resampled) - len(_df)) / len(_resampled) > 0.2: # Removing: missing rate over 0.2 @@ -126,9 +133,9 @@ def _multiple_checks( return pd.DataFrame() # 4. 
Checking for negative entries - _positive = _resampled[_resampled[conf["target"]] > 0] + _positive = _resampled[_resampled[conf["target"]] >= 0] if (len(_resampled) - len(_positive)) / len(_resampled) > 0.2: - # Removing: zero or negative entries over 0.2 + # Removing: negative entries over 0.2 return pd.DataFrame() else: _df = _resampled diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py index 2777db1..832f3f2 100644 --- a/mmf_sa/models/chronosforecast/ChronosPipeline.py +++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py @@ -52,6 +52,7 @@ def register(self, registered_model_name: str): def create_horizon_timestamps_udf(self): @pandas_udf('array') def horizon_timestamps_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: + import numpy as np batch_horizon_timestamps = [] for batch in batch_iterator: for series in batch: @@ -59,8 +60,8 @@ def horizon_timestamps_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S horizon_timestamps = [] for i in range(self.params["prediction_length"]): last = last + self.one_ts_offset - horizon_timestamps.append(last) - batch_horizon_timestamps.append(np.array(horizon_timestamps)) + horizon_timestamps.append(last.to_numpy()) + batch_horizon_timestamps.append(np.array(horizon_timestamps)) yield pd.Series(batch_horizon_timestamps) return horizon_timestamps_udf diff --git a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py index 54b20b4..861444c 100644 --- a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py +++ b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py @@ -60,7 +60,7 @@ def horizon_timestamps_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S horizon_timestamps = [] for i in range(self.params["prediction_length"]): last = last + self.one_ts_offset - horizon_timestamps.append(last) + horizon_timestamps.append(last.to_numpy()) batch_horizon_timestamps.append(np.array(horizon_timestamps)) yield pd.Series(batch_horizon_timestamps) return horizon_timestamps_udf diff --git a/mmf_sa/models/momentforecast/MomentPipeline.py b/mmf_sa/models/momentforecast/MomentPipeline.py index 1768a4b..b72f975 100644 --- a/mmf_sa/models/momentforecast/MomentPipeline.py +++ b/mmf_sa/models/momentforecast/MomentPipeline.py @@ -58,7 +58,7 @@ def horizon_timestamps_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.S horizon_timestamps = [] for i in range(self.params["prediction_length"]): last = last + self.one_ts_offset - horizon_timestamps.append(last) + horizon_timestamps.append(last.to_numpy()) batch_horizon_timestamps.append(np.array(horizon_timestamps)) yield pd.Series(batch_horizon_timestamps) return horizon_timestamps_udf From 980deed7f380ac1fc9f4cbc599e37fb57da151fc Mon Sep 17 00:00:00 2001 From: "ryuta.yoshimatsu@databricks.com" Date: Sun, 9 Jun 2024 11:22:07 +0000 Subject: [PATCH 2/3] Added M5 example notebooks --- examples/m5-examples/data_preparation_m5.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/m5-examples/data_preparation_m5.py b/examples/m5-examples/data_preparation_m5.py index 50509f9..34f8c8a 100644 --- a/examples/m5-examples/data_preparation_m5.py +++ b/examples/m5-examples/data_preparation_m5.py @@ -31,13 +31,14 @@ # MAGIC %md # MAGIC #### Write out the entire dataset to a delta table +# MAGIC Writing the full dataset takes about 45 minutes. 
# COMMAND ---------- ( spark.createDataFrame(daily_train) .write.format("delta").mode("overwrite") - .saveAsTable(f"{catalog}.{db}.daily_train") + .saveAsTable(f"{catalog}.{db}.daily_train_full") ) print(f"Saved data to {catalog}.{db}.daily_train_full") From 9f6eed357ce791d36d21f251f11feccec0670364 Mon Sep 17 00:00:00 2001 From: "ryuta.yoshimatsu@databricks.com" Date: Sun, 9 Jun 2024 14:49:37 +0000 Subject: [PATCH 3/3] Added M5 example notebooks --- .gitignore | 1 + examples/m5-examples/foundation_daily_m5.py | 25 +++++++++++----- examples/m5-examples/global_daily_m5.py | 29 ++++++++++++++----- .../m5-examples/local_univariate_daily_m5.py | 10 +++---- examples/m5-examples/run_daily_m5.py | 15 +++++----- 5 files changed, 54 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index c160071..5d5bf09 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,4 @@ dmypy.json # Lightning Logs examples/lightning_logs +examples/m5-examples/lightning_logs diff --git a/examples/m5-examples/foundation_daily_m5.py b/examples/m5-examples/foundation_daily_m5.py index c97c535..b081a44 100644 --- a/examples/m5-examples/foundation_daily_m5.py +++ b/examples/m5-examples/foundation_daily_m5.py @@ -1,5 +1,5 @@ # Databricks notebook source -# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC %pip install -r ../../requirements.txt --quiet # MAGIC dbutils.library.restartPython() # COMMAND ---------- @@ -20,6 +20,9 @@ catalog = "mmf" # Name of the catalog we use to manage our assets db = "m5" # Name of the schema we use to manage our assets (e.g. datasets) +n = 1000 # Number of items: choose from [1000, 10000, 'full']. full is 35k +table = f"daily_train_{n}" # Training table name +user_email = spark.sql('select current_user() as user').collect()[0]['user'] # User email # COMMAND ---------- @@ -42,22 +45,30 @@ for model in active_models: dbutils.notebook.run( - "run_daily", + "run_daily_m5", timeout_seconds=0, - arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) + arguments={ + "catalog": catalog, + "db": db, + "model": model, + "run_id": run_id, + "table": table , + "user_email": user_email, + } + ) # COMMAND ---------- -display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")) +display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date limit 1000")) # COMMAND ---------- -display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds")) +display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds limit 1000")) # COMMAND ---------- -#display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) +display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) # COMMAND ---------- -#display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) +display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) diff --git a/examples/m5-examples/global_daily_m5.py b/examples/m5-examples/global_daily_m5.py index ab335e4..197518f 100644 --- a/examples/m5-examples/global_daily_m5.py +++ b/examples/m5-examples/global_daily_m5.py @@ -1,6 +1,6 @@ # Databricks notebook source # DBTITLE 1,Install the necessary libraries -# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC %pip install -r ../../requirements.txt --quiet # MAGIC dbutils.library.restartPython() # COMMAND ---------- @@ -21,6 +21,9 @@ catalog = "mmf" # Name of 
the catalog we use to manage our assets db = "m5" # Name of the schema we use to manage our assets (e.g. datasets) +n = 1000 # Number of items: choose from [1000, 10000, 'full']. full is 35k +table = f"daily_train_{n}" # Training table name +user_email = spark.sql('select current_user() as user').collect()[0]['user'] # User email # COMMAND ---------- @@ -44,24 +47,36 @@ for model in active_models: dbutils.notebook.run( - "run_daily", + "run_daily_m5", timeout_seconds=0, - arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) + arguments={ + "catalog": catalog, + "db": db, + "model": model, + "run_id": run_id, + "table": table , + "user_email": user_email, + } + ) # COMMAND ---------- display( - spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date") + spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date limit 1000") ) # COMMAND ---------- -display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds")) +display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds limit 1000")) # COMMAND ---------- -#display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) +display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) # COMMAND ---------- -#display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) +display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) + +# COMMAND ---------- + + diff --git a/examples/m5-examples/local_univariate_daily_m5.py b/examples/m5-examples/local_univariate_daily_m5.py index b56b02c..d27c345 100644 --- a/examples/m5-examples/local_univariate_daily_m5.py +++ b/examples/m5-examples/local_univariate_daily_m5.py @@ -53,8 +53,8 @@ "RFableNNETAR", "RFableEnsemble", "RDynamicHarmonicRegression", - #"SKTimeTBats", - #"SKTimeLgbmDsDt", + "SKTimeTBats", + "SKTimeLgbmDsDt", ] # COMMAND ---------- @@ -78,7 +78,7 @@ backtest_months=3, stride=7, train_predict_ratio=1, - data_quality_check=False, + data_quality_check=True, resample=False, active_models=active_models, experiment_path=f"/Users/{user_email}/mmf/m5", @@ -95,11 +95,11 @@ # COMMAND ---------- -#display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) +display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output")) # COMMAND ---------- -#display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) +display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output")) # COMMAND ---------- diff --git a/examples/m5-examples/run_daily_m5.py b/examples/m5-examples/run_daily_m5.py index 969006d..5018903 100644 --- a/examples/m5-examples/run_daily_m5.py +++ b/examples/m5-examples/run_daily_m5.py @@ -1,5 +1,5 @@ # Databricks notebook source -# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC %pip install -r ../../requirements.txt --quiet # MAGIC dbutils.library.restartPython() # COMMAND ---------- @@ -8,11 +8,16 @@ dbutils.widgets.text("db", "") dbutils.widgets.text("model", "") dbutils.widgets.text("run_id", "") +dbutils.widgets.text("table", "") +dbutils.widgets.text("user_email", "") + catalog = dbutils.widgets.get("catalog") db = dbutils.widgets.get("db") model = dbutils.widgets.get("model") run_id = dbutils.widgets.get("run_id") +table = dbutils.widgets.get("table") +user_email = dbutils.widgets.get("user_email") # COMMAND ---------- @@ -22,14 +27,10 @@ 
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) -n = 1000 # Number of items: choose from [1000, 10000, 'full']. full is 35k -taining_table = f"daily_train_{n}" -user_email = spark.sql('select current_user() as user').collect()[0]['user'] - run_forecast( spark=spark, - train_data=f"{catalog}.{db}.{taining_table}", - scoring_data=f"{catalog}.{db}.{taining_table}", + train_data=f"{catalog}.{db}.{table}", + scoring_data=f"{catalog}.{db}.{table}", scoring_output=f"{catalog}.{db}.daily_scoring_output", evaluation_output=f"{catalog}.{db}.daily_evaluation_output", model_output=f"{catalog}.{db}",
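
Editor's note (not part of the patches above): the change repeated across the Chronos, Moirai, and Moment pipelines in PATCH 1/3 is to append `timestamp.to_numpy()` (a numpy `datetime64`) instead of a raw `pd.Timestamp` when building forecast-horizon timestamps inside a Pandas UDF, so each series' horizon array serializes cleanly back to Spark. A minimal standalone sketch of that pattern, under stated assumptions, follows; the names `make_horizon_timestamps_udf`, `prediction_length`, `freq`, and the `array<timestamp>` return type are illustrative choices for this sketch, not taken from the repository's API.

import numpy as np
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf

def make_horizon_timestamps_udf(prediction_length: int, freq: str = "D"):
    # One offset step per forecast period (e.g. one day for freq="D").
    one_ts_offset = pd.tseries.frequencies.to_offset(freq)

    @pandas_udf("array<timestamp>")  # DDL return type chosen for this sketch
    def horizon_timestamps_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for batch in batch_iterator:
            batch_horizon_timestamps = []
            for last in batch:  # last observed timestamp of one series
                last = pd.Timestamp(last)
                horizon = []
                for _ in range(prediction_length):
                    last = last + one_ts_offset
                    # Convert to numpy datetime64 rather than keeping a pd.Timestamp,
                    # mirroring the .to_numpy() fix applied in the patch above.
                    horizon.append(last.to_numpy())
                batch_horizon_timestamps.append(np.array(horizon))
            yield pd.Series(batch_horizon_timestamps)

    return horizon_timestamps_udf

# Usage sketch, assuming a DataFrame with each series' last observed timestamp in a
# hypothetical "last_ts" column:
# df = df.withColumn("horizon_ts", make_horizon_timestamps_udf(10, "D")("last_ts"))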