diff --git a/examples/polars/notebook.ipynb b/examples/polars/notebook.ipynb index c8cad7e44..c81678590 100644 --- a/examples/polars/notebook.ipynb +++ b/examples/polars/notebook.ipynb @@ -38,8 +38,10 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/stefankrawczyk/.pyenv/versions/knowledge_retrieval-py39/lib/python3.9/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n", - " warnings.warn(\n" + "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n", + " warnings.warn(\n", + "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" ] } ], @@ -70,177 +72,177 @@ "\n", "\n", - "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", - "\n", + "\n", "\n", + "spend_zero_mean\n", + "\n", + "spend_zero_mean\n", + "Series\n", + "\n", + "\n", + "\n", + "spend_zero_mean_unit_variance\n", + "\n", + "spend_zero_mean_unit_variance\n", + "Series\n", + "\n", + "\n", + "\n", + "spend_zero_mean->spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_mean\n", + "\n", + "spend_mean\n", + "float\n", + "\n", + "\n", + "\n", + "spend_mean->spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", "spend_per_signup\n", "\n", "spend_per_signup\n", "Series\n", "\n", "\n", - "\n", + "\n", "avg_3wk_spend\n", - "\n", - "avg_3wk_spend\n", - "Series\n", + "\n", + "avg_3wk_spend\n", + "Series\n", "\n", - "\n", - "\n", - "spend\n", - "\n", - "spend\n", - "Series\n", + "\n", + "\n", + "base_df\n", + "\n", + "base_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "spend->spend_per_signup\n", - "\n", - "\n", + "\n", + "\n", + "signups\n", + "\n", + "signups\n", + "Series\n", "\n", - "\n", - "\n", - "spend->avg_3wk_spend\n", - "\n", - "\n", + "\n", + "\n", + "base_df->signups\n", + "\n", + "\n", "\n", - "\n", - "\n", - "spend_mean\n", - "\n", - "spend_mean\n", - "float\n", + "\n", + "\n", + "spend\n", + "\n", + "spend\n", + "Series\n", "\n", - "\n", - "\n", - "spend->spend_mean\n", - "\n", - "\n", + "\n", + "\n", + "base_df->spend\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "spend_std_dev\n", "\n", "spend_std_dev\n", "float\n", "\n", - "\n", + "\n", "\n", - "spend->spend_std_dev\n", - "\n", - "\n", + "spend_std_dev->spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "spend_zero_mean\n", - "\n", - "spend_zero_mean\n", - "Series\n", + "\n", + "\n", + "signups->spend_per_signup\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "spend->spend_zero_mean\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "spend_mean->spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "base_df\n", - "\n", - "base_df\n", - "DataFrame\n", + 
"\n", + "\n", + "spend->spend_mean\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "base_df->spend\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "signups\n", - "\n", - "signups\n", - "Series\n", + "spend->spend_per_signup\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "base_df->signups\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "spend_zero_mean_unit_variance\n", - "\n", - "spend_zero_mean_unit_variance\n", - "Series\n", - "\n", - "\n", - "\n", - "spend_std_dev->spend_zero_mean_unit_variance\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "signups->spend_per_signup\n", - "\n", - "\n", + "spend->avg_3wk_spend\n", + "\n", + "\n", "\n", - "\n", - "\n", - "spend_zero_mean->spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "spend->spend_std_dev\n", + "\n", + "\n", "\n", "\n", "\n", "_base_df_inputs\n", - "\n", - "base_df_location\n", - "str\n", + "\n", + "base_df_location\n", + "str\n", "\n", "\n", - "\n", + "\n", "_base_df_inputs->base_df\n", - "\n", - "\n", + "\n", + "\n", "\n", "\n", "\n", "input\n", - "\n", - "input\n", + "\n", + "input\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -777,7 +779,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "hamilton", "language": "python", "name": "python3" }, @@ -791,7 +793,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.9" + "version": "3.10.14" } }, "nbformat": 4, diff --git a/examples/polars/with_columns/DAG_DataFrame.png b/examples/polars/with_columns/DAG_DataFrame.png new file mode 100644 index 000000000..cc3d713b7 Binary files /dev/null and b/examples/polars/with_columns/DAG_DataFrame.png differ diff --git a/examples/polars/with_columns/DAG_lazy.png b/examples/polars/with_columns/DAG_lazy.png new file mode 100644 index 000000000..d2566fd7b Binary files /dev/null and b/examples/polars/with_columns/DAG_lazy.png differ diff --git a/examples/polars/with_columns/README b/examples/polars/with_columns/README new file mode 100644 index 000000000..86db77204 --- /dev/null +++ b/examples/polars/with_columns/README @@ -0,0 +1,8 @@ +# Using with_columns with Polars + +We show the ability to use the familiar `with_columns` from `polars`. Supported for both `pl.DataFrame` and `pl.LazyFrame`. + +To see the example look at the notebook. + +![image info](./DAG_DataFrame.png) +![image info](./DAG_lazy.png) diff --git a/examples/polars/with_columns/my_functions.py b/examples/polars/with_columns/my_functions.py new file mode 100644 index 000000000..3b2c401b9 --- /dev/null +++ b/examples/polars/with_columns/my_functions.py @@ -0,0 +1,51 @@ +import polars as pl + +from hamilton.function_modifiers import config + +""" +Notes: + 1. This file is used for all the [ray|dask|spark]/hello_world examples. + 2. It therefore show cases how you can write something once and not only scale it, but port it + to different frameworks with ease! 
+""" + + +@config.when(case="millions") +def avg_3wk_spend__millions(spend: pl.Series) -> pl.Series: + """Rolling 3 week average spend.""" + return ( + spend.to_frame("spend").select(pl.col("spend").rolling_mean(window_size=3) / 1e6) + ).to_series(0) + + +@config.when(case="thousands") +def avg_3wk_spend__thousands(spend: pl.Series) -> pl.Series: + """Rolling 3 week average spend.""" + return ( + spend.to_frame("spend").select(pl.col("spend").rolling_mean(window_size=3) / 1e3) + ).to_series(0) + + +def spend_per_signup(spend: pl.Series, signups: pl.Series) -> pl.Series: + """The cost per signup in relation to spend.""" + return spend / signups + + +def spend_mean(spend: pl.Series) -> float: + """Shows function creating a scalar. In this case it computes the mean of the entire column.""" + return spend.mean() + + +def spend_zero_mean(spend: pl.Series, spend_mean: float) -> pl.Series: + """Shows function that takes a scalar. In this case to zero mean spend.""" + return spend - spend_mean + + +def spend_std_dev(spend: pl.Series) -> float: + """Function that computes the standard deviation of the spend column.""" + return spend.std() + + +def spend_zero_mean_unit_variance(spend_zero_mean: pl.Series, spend_std_dev: float) -> pl.Series: + """Function showing one way to make spend have zero mean and unit variance.""" + return spend_zero_mean / spend_std_dev diff --git a/examples/polars/with_columns/my_functions_lazy.py b/examples/polars/with_columns/my_functions_lazy.py new file mode 100644 index 000000000..4b65b2ac2 --- /dev/null +++ b/examples/polars/with_columns/my_functions_lazy.py @@ -0,0 +1,47 @@ +import polars as pl + +from hamilton.function_modifiers import config + +""" +Notes: + 1. This file is used for all the [ray|dask|spark]/hello_world examples. + 2. It therefore show cases how you can write something once and not only scale it, but port it + to different frameworks with ease! +""" + + +@config.when(case="millions") +def avg_3wk_spend__millions(spend: pl.Expr) -> pl.Expr: + """Rolling 3 week average spend.""" + return spend.rolling_mean(window_size=3) / 1e6 + + +@config.when(case="thousands") +def avg_3wk_spend__thousands(spend: pl.Expr) -> pl.Expr: + """Rolling 3 week average spend.""" + return spend.rolling_mean(window_size=3) / 1e3 + + +def spend_per_signup(spend: pl.Expr, signups: pl.Expr) -> pl.Expr: + """The cost per signup in relation to spend.""" + return spend / signups + + +def spend_mean(spend: pl.Expr) -> float: + """Shows function creating a scalar. In this case it computes the mean of the entire column.""" + return spend.mean() + + +def spend_zero_mean(spend: pl.Expr, spend_mean: float) -> pl.Expr: + """Shows function that takes a scalar. 
In this case to zero mean spend.""" + return spend - spend_mean + + +def spend_std_dev(spend: pl.Expr) -> float: + """Function that computes the standard deviation of the spend column.""" + return spend.std() + + +def spend_zero_mean_unit_variance(spend_zero_mean: pl.Expr, spend_std_dev: float) -> pl.Expr: + """Function showing one way to make spend have zero mean and unit variance.""" + return spend_zero_mean / spend_std_dev diff --git a/examples/polars/with_columns/notebook.ipynb b/examples/polars/with_columns/notebook.ipynb new file mode 100644 index 000000000..39bd66d35 --- /dev/null +++ b/examples/polars/with_columns/notebook.ipynb @@ -0,0 +1,1239 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Execute this cell to install dependencies\n", + "%pip install sf-hamilton[visualization]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Example of using with_columns for Polars DataFrame [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/polars/with_columns/notebook.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/dagworks-inc/hamilton/blob/main/examples/polars/with_columns/notebook.ipynb)\n", + "\n", + "This allows you to efficiently run groups of map operations on a dataframe.\n", + "Here's an example of calling it -- if you've seen `@subdag`, you should be familiar with the concepts." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. 
pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "%reload_ext hamilton.plugins.jupyter_magic\n", + "from hamilton import driver\n", + "import my_functions\n", + "\n", + "my_builder = driver.Builder().with_modules(my_functions).with_config({\"case\":\"thousands\"})\n", + "output_node = [\"final_df\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "case\n", + "\n", + "\n", + "\n", + "case\n", + "thousands\n", + "\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append\n", + "\n", + "final_df.__append\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.avg_3wk_spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Series\n", + "\n", + "\n", + "\n", + "initial_df->final_df.signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + 
"config\n", + "\n", + "\n", + "\n", + "config\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "output\n", + "\n", + "output\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%cell_to_module with_columns_example --builder my_builder --display --execute output_node\n", + "import polars as pl\n", + "from hamilton.plugins.h_polars import with_columns\n", + "import my_functions\n", + "\n", + "output_columns = [\n", + " \"spend\",\n", + " \"signups\",\n", + " \"avg_3wk_spend\",\n", + " \"spend_per_signup\",\n", + " \"spend_zero_mean_unit_variance\",\n", + "]\n", + "\n", + "def initial_df()->pl.DataFrame:\n", + " return pl.DataFrame(\n", + " { \n", + " \"signups\": pl.Series([1, 10, 50, 100, 200, 400]),\n", + " \"spend\": pl.Series([10, 10, 20, 40, 40, 50])*1e6,\n", + " }\n", + " )\n", + "\n", + "# the with_columns call\n", + "@with_columns(\n", + " *[my_functions],\n", + " columns_to_pass=[\"spend\", \"signups\"], # The columns to select from the dataframe\n", + " # select=output_columns, # The columns to append to the dataframe\n", + " # config_required = [\"a\"]\n", + ")\n", + "def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:\n", + " return initial_df" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "shape: (6, 6)\n", + "┌─────────┬───────┬───────────────┬──────────────────┬─────────────────┬───────────────────────────┐\n", + "│ signups ┆ spend ┆ avg_3wk_spend ┆ spend_per_signup ┆ spend_zero_mean ┆ spend_zero_mean_unit_vari │\n", + "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ ance │\n", + "│ i64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ --- │\n", + "│ ┆ ┆ ┆ ┆ ┆ f64 │\n", + "╞═════════╪═══════╪═══════════════╪══════════════════╪═════════════════╪═══════════════════════════╡\n", + "│ 1 ┆ 1e7 ┆ null ┆ 1e7 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 10 ┆ 1e7 ┆ null ┆ 1e6 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 50 ┆ 2e7 ┆ 13.333333 ┆ 400000.0 ┆ -8.3333e6 ┆ -0.483821 │\n", + "│ 100 ┆ 4e7 ┆ 23.333333 ┆ 400000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 200 ┆ 4e7 ┆ 33.333333 ┆ 200000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 400 ┆ 5e7 ┆ 43.333333 ┆ 125000.0 ┆ 2.1667e7 ┆ 1.257934 │\n", + "└─────────┴───────┴───────────────┴──────────────────┴─────────────────┴───────────────────────────┘\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "case\n", + "\n", + "\n", + "\n", + "case\n", + "millions\n", + "\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append\n", + "\n", + "final_df.__append\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + 
"final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.avg_3wk_spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Series\n", + "\n", + "\n", + "\n", + "initial_df->final_df.signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "config\n", + "\n", + "\n", + "\n", + "config\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "output\n", + "\n", + "output\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import with_columns_example\n", + "dr = driver.Builder().with_modules(my_functions, with_columns_example).with_config({\"case\":\"millions\"}).build()\n", + "print(dr.execute(final_vars=[\"final_df\"])[\"final_df\"])\n", + "dr.visualize_execution(final_vars=[\"final_df\"])\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Example of using with_columns for Polars LazyFrame\n", + "\n", + "This allows you to efficiently run groups of map operations on a dataframe.\n", + "Here's an example of calling it -- if you've seen `@subdag`, you should be familiar with the concepts." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "%reload_ext hamilton.plugins.jupyter_magic\n", + "from hamilton import driver\n", + "import my_functions_lazy\n", + "\n", + "my_builder_lazy = driver.Builder().with_modules(my_functions_lazy).with_config({\"case\":\"thousands\"})\n", + "output_node = [\"final_df\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "case\n", + "\n", + "\n", + "\n", + "case\n", + "thousands\n", + "\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append\n", + "\n", + "final_df.__append\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.avg_3wk_spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Expr\n", + "\n", + "\n", + "\n", + "initial_df->final_df.signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "config\n", + "\n", + "\n", + "\n", + 
"config\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "output\n", + "\n", + "output\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%cell_to_module with_columns_lazy_example --builder my_builder_lazy --display --execute output_node\n", + "import polars as pl\n", + "from hamilton.plugins.h_polars_lazyframe import with_columns\n", + "import my_functions_lazy\n", + "\n", + "output_columns = [\n", + " \"spend\",\n", + " \"signups\",\n", + " \"avg_3wk_spend\",\n", + " \"spend_per_signup\",\n", + " \"spend_zero_mean_unit_variance\",\n", + "]\n", + "\n", + "def initial_df()->pl.LazyFrame:\n", + " return pl.DataFrame(\n", + " { \n", + " \"signups\": pl.Series([1, 10, 50, 100, 200, 400]),\n", + " \"spend\": pl.Series([10, 10, 20, 40, 40, 50])*1e6,\n", + " }\n", + " ).lazy()\n", + "\n", + "# the with_columns call\n", + "@with_columns(\n", + " *[my_functions_lazy],\n", + " columns_to_pass=[\"spend\", \"signups\"], # The columns to select from the dataframe\n", + " # select=output_columns, # The columns to append to the dataframe\n", + " # config_required = [\"a\"]\n", + ")\n", + "def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:\n", + " return initial_df" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "shape: (6, 6)\n", + "┌─────────┬───────┬───────────────┬──────────────────┬─────────────────┬───────────────────────────┐\n", + "│ signups ┆ spend ┆ avg_3wk_spend ┆ spend_per_signup ┆ spend_zero_mean ┆ spend_zero_mean_unit_vari │\n", + "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ ance │\n", + "│ i64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ --- │\n", + "│ ┆ ┆ ┆ ┆ ┆ f64 │\n", + "╞═════════╪═══════╪═══════════════╪══════════════════╪═════════════════╪═══════════════════════════╡\n", + "│ 1 ┆ 1e7 ┆ null ┆ 1e7 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 10 ┆ 1e7 ┆ null ┆ 1e6 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 50 ┆ 2e7 ┆ 13.333333 ┆ 400000.0 ┆ -8.3333e6 ┆ -0.483821 │\n", + "│ 100 ┆ 4e7 ┆ 23.333333 ┆ 400000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 200 ┆ 4e7 ┆ 33.333333 ┆ 200000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 400 ┆ 5e7 ┆ 43.333333 ┆ 125000.0 ┆ 2.1667e7 ┆ 1.257934 │\n", + "└─────────┴───────┴───────────────┴──────────────────┴─────────────────┴───────────────────────────┘\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "case\n", + "\n", + "\n", + "\n", + "case\n", + "millions\n", + "\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append\n", + "\n", + "final_df.__append\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_mean\n", + 
"\n", + "final_df.spend_mean\n", + "float\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.avg_3wk_spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Expr\n", + "\n", + "\n", + "\n", + "initial_df->final_df.signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "config\n", + "\n", + "\n", + "\n", + "config\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "output\n", + "\n", + "output\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import with_columns_lazy_example\n", + "from hamilton import base\n", + "from hamilton.plugins import h_polars\n", + "\n", + "dr = (\n", + " driver.Builder()\n", + " .with_adapter(\n", + " adapter=base.SimplePythonGraphAdapter(result_builder=h_polars.PolarsDataFrameResult()))\n", + " .with_modules(my_functions_lazy, with_columns_lazy_example)\n", + " .with_config({\"case\":\"millions\"})\n", + " .build()\n", + " )\n", + "print(dr.execute(final_vars=[\"final_df\"]))\n", + "dr.visualize_execution(final_vars=[\"final_df\"])\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "hamilton", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/hamilton/plugins/h_polars.py b/hamilton/plugins/h_polars.py index 799882a30..4ef8609ab 100644 --- a/hamilton/plugins/h_polars.py +++ b/hamilton/plugins/h_polars.py @@ -1,8 +1,27 @@ -from typing import Any, Dict, Type, Union +import 
sys
+from types import ModuleType
+from typing import Any, Callable, Collection, Dict, List, Tuple, Type, Union, get_type_hints

 import polars as pl

-from hamilton import base
+# Copied this over from function_graph
+# TODO -- determine the best place to put this code
+from hamilton import base, node, registry
+from hamilton.function_modifiers.expanders import extract_columns
+from hamilton.function_modifiers.recursive import (
+    _default_inject_parameter,
+    subdag,
+    with_columns_base,
+)
+from hamilton.plugins.polars_extensions import DATAFRAME_TYPE


 class PolarsDataFrameResult(base.ResultMixin):
@@ -54,3 +73,216 @@ def build_result(

     def output_type(self) -> Type:
         return pl.DataFrame
+
+
+class with_columns(with_columns_base):
+    """Initializes a with_columns decorator for polars.
+
+    This allows you to efficiently run groups of map operations on a dataframe. We support
+    both eager and lazy mode in polars. In eager mode the input type should be
+    pl.DataFrame and the subsequent operations run on columns with type pl.Series.
+
+    Here's an example of calling it in eager mode -- if you've seen ``@subdag``, you should be familiar with
+    the concepts:
+
+    .. code-block:: python
+
+        # my_module.py
+        def a_b_average(a: pl.Series, b: pl.Series) -> pl.Series:
+            return (a + b) / 2
+
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_plus_b(a: pl.Series, b: pl.Series) -> pl.Series:
+            return a + b
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],  # Load from any module
+            *[a_plus_b],  # or list operations directly
+            columns_to_pass=["a", "b"],  # The columns to pass from the dataframe to
+            # the subdag
+            select=["a_plus_b", "a_b_average"],  # The columns to append to the dataframe
+        )
+        def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
+            # process, or just return unprocessed
+            ...
+
+    In this instance the ``initial_df`` would get two columns added: ``a_plus_b`` and ``a_b_average``.
+
+    Note that the operation is "append", meaning that the columns that are selected are appended
+    onto the dataframe.
+
+    If the function takes multiple dataframes, the dataframe input to process will always be
+    the first argument. This will be passed to the subdag, transformed, and passed back to the function.
+    This follows the hamilton rule of reference by parameter name. To demonstrate this, in the code
+    above, the dataframe that is passed to the subdag is `initial_df`. That is transformed
+    by the subdag, and then returned as the final dataframe.
+
+    You can read it as:
+
+    "final_df is a function that transforms the upstream dataframe initial_df, running the transformations
+    from my_module. It starts with the columns a and b, and then appends the columns
+    a_plus_b and a_b_average to the dataframe. It then returns the dataframe, and does some processing on it."
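+
+    As an execution sketch (assuming the decorated function above lives in ``with_columns_module``),
+    it runs like any other Hamilton node:
+
+    .. code-block:: python
+
+        from hamilton import driver
+        import with_columns_module
+
+        dr = driver.Builder().with_modules(with_columns_module).build()
+        # initial_df must be resolvable in the DAG (or passed via inputs=...)
+        df = dr.execute(final_vars=["final_df"])["final_df"]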
+
+    In case you need more flexibility you can alternatively use ``on_input``, for example,
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_from_df() -> pl.Expr:
+            return pl.col("a").alias("a") / 100
+
+        def b_from_df() -> pl.Expr:
+            return pl.col("b").alias("b") / 100
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            on_input="initial_df",
+            select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
+            # process, or just return unprocessed
+            ...
+
+    the above would output a dataframe where the two columns ``a`` and ``b`` get
+    overwritten.
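+
+    If ``select`` is left as None, every sink node of the subdag whose type matches the
+    corresponding column type (here pl.Series) is appended to the dataframe.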
+    """
+
+    def __init__(
+        self,
+        *load_from: Union[Callable, ModuleType],
+        columns_to_pass: List[str] = None,
+        pass_dataframe_as: str = None,
+        on_input: str = None,
+        select: List[str] = None,
+        namespace: str = None,
+        config_required: List[str] = None,
+    ):
+        """Instantiates a ``@with_columns`` decorator.
+
+        :param load_from: The functions or modules that will be used to generate the group of map operations.
+        :param columns_to_pass: The initial schema of the dataframe. This is used to determine which
+            upstream inputs should be taken from the dataframe, and which shouldn't. Note that, if this is
+            left empty (and external_inputs is as well), we will assume that all dependencies come
+            from the dataframe. This cannot be used in conjunction with on_input.
+        :param on_input: The name of the dataframe that we're modifying, as known to the subdag.
+            If you pass this in, you are responsible for extracting columns out. If not provided, you have
+            to pass columns_to_pass in, and we will extract the columns out on the first parameter for you.
+        :param select: The end nodes that represent columns to be appended to the original dataframe
+            via with_columns. Existing columns will be overridden. The selected nodes need to have the
+            corresponding column type, in this case pl.Series, to be appended to the original dataframe.
+        :param namespace: The namespace of the nodes, so they don't clash with the global namespace
+            and so this can be reused. If it's left out, there will be no namespace (in which case you'll want
+            to be careful about repeating it/reusing the nodes in other parts of the DAG.)
+        :param config_required: the list of config keys that are required to resolve any functions. Pass in None\
+            if you want the functions/modules to have access to all possible config.
+        """
+
+        if pass_dataframe_as is not None:
+            raise NotImplementedError(
+                "We currently do not support pass_dataframe_as for polars. Please reach out if you need this "
+                "functionality."
+            )
+
+        super().__init__(
+            *load_from,
+            columns_to_pass=columns_to_pass,
+            on_input=on_input,
+            select=select,
+            namespace=namespace,
+            config_required=config_required,
+            dataframe_type=DATAFRAME_TYPE,
+        )
+
+    def _create_column_nodes(
+        self, fn: Callable, inject_parameter: str, params: Dict[str, Type[Type]]
+    ) -> List[node.Node]:
+        output_type = params[inject_parameter]
+
+        def temp_fn(**kwargs) -> Any:
+            return kwargs[inject_parameter]
+
+        # We recreate the df node to use extract columns
+        temp_node = node.Node(
+            name=inject_parameter,
+            typ=output_type,
+            callabl=temp_fn,
+            input_types={inject_parameter: output_type},
+        )
+
+        extract_columns_decorator = extract_columns(*self.initial_schema)
+
+        out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn)
+        return out_nodes[1:]
+
+    def get_initial_nodes(
+        self, fn: Callable, params: Dict[str, Type[Type]]
+    ) -> Tuple[str, Collection[node.Node]]:
+        """Selects the correct dataframe and optionally extracts out columns."""
+        inject_parameter = _default_inject_parameter(fn=fn, target_dataframe=self.target_dataframe)
+        with_columns_base.validate_dataframe(
+            fn=fn,
+            inject_parameter=inject_parameter,
+            params=params,
+            required_type=self.dataframe_type,
+        )
+
+        initial_nodes = (
+            []
+            if self.target_dataframe is not None
+            else self._create_column_nodes(fn=fn, inject_parameter=inject_parameter, params=params)
+        )
+
+        return inject_parameter, initial_nodes
+
+    def get_subdag_nodes(self, fn: Callable, config: Dict[str, Any]) -> Collection[node.Node]:
+        return subdag.collect_nodes(config, self.subdag_functions)
+
+    def chain_subdag_nodes(
+        self, fn: Callable, inject_parameter: str, generated_nodes: Collection[node.Node]
+    ) -> Tuple[Collection[node.Node], str]:
+        """Creates the node that appends to / overrides columns of the original dataframe based on the selected output."""
+
+        if self.select is None:
+            self.select = [
+                sink_node.name
+                for sink_node in generated_nodes
+                if sink_node.type == registry.get_column_type_from_df_type(self.dataframe_type)
+            ]
+
+        def new_callable(**kwargs) -> Any:
+            df = kwargs[inject_parameter]
+            columns_to_append = {}
+            for column in self.select:
+                columns_to_append[column] = kwargs[column]
+
+            return df.with_columns(**columns_to_append)
+
+        column_type = registry.get_column_type_from_df_type(self.dataframe_type)
+        input_map = {column: column_type for column in self.select}
+        input_map[inject_parameter] = self.dataframe_type
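+        # The merge node takes the original dataframe plus every selected column and
+        # produces the final dataframe with a single df.with_columns(...) call.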
+        merge_node = node.Node(
+            name="__append",
+            typ=self.dataframe_type,
+            callabl=new_callable,
+            input_types=input_map,
+        )
+        output_nodes = generated_nodes + [merge_node]
+        return output_nodes, merge_node.name
+
+    def validate(self, fn: Callable):
+        inject_parameter = _default_inject_parameter(fn=fn, target_dataframe=self.target_dataframe)
+        params = get_type_hints(fn)
+        with_columns_base.validate_dataframe(
+            fn=fn,
+            inject_parameter=inject_parameter,
+            params=params,
+            required_type=self.dataframe_type,
+        )
diff --git a/hamilton/plugins/h_polars_lazyframe.py b/hamilton/plugins/h_polars_lazyframe.py
index a933762a7..00f4326e1 100644
--- a/hamilton/plugins/h_polars_lazyframe.py
+++ b/hamilton/plugins/h_polars_lazyframe.py
@@ -1,8 +1,16 @@
-from typing import Any, Dict, Type, Union
+from types import ModuleType
+from typing import Any, Callable, Collection, Dict, List, Tuple, Type, Union, get_type_hints

 import polars as pl

-from hamilton import base
+from hamilton import base, node, registry
+from hamilton.function_modifiers.expanders import extract_columns
+from hamilton.function_modifiers.recursive import (
+    _default_inject_parameter,
+    subdag,
+    with_columns_base,
+)
+from hamilton.plugins.polars_lazyframe_extensions import DATAFRAME_TYPE


 class PolarsLazyFrameResult(base.ResultMixin):
@@ -45,3 +53,214 @@ def build_result(

     def output_type(self) -> Type:
         return pl.LazyFrame
+
+
+class with_columns(with_columns_base):
+    """Initializes a with_columns decorator for polars.
+
+    This allows you to efficiently run groups of map operations on a dataframe. We support
+    both eager and lazy mode in polars. For lazy execution, use pl.LazyFrame and type the
+    subsequent operations as pl.Expr. See examples/polars/with_columns for a practical
+    implementation in both variations.
+
+    The lazy execution would be:
+
+    .. code-block:: python
+
+        # my_module.py
+        def a_b_average(a: pl.Expr, b: pl.Expr) -> pl.Expr:
+            return (a + b) / 2
+
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_plus_b(a: pl.Expr, b: pl.Expr) -> pl.Expr:
+            return a + b
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],  # Load from any module
+            *[a_plus_b],  # or list operations directly
+            columns_to_pass=["a", "b"],  # The columns to pass from the dataframe to
+            # the subdag
+            select=["a_plus_b", "a_b_average"],  # The columns to append to the dataframe
+        )
+        def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
+            # process, or just return unprocessed
+            ...
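+
+    Because execution stays lazy, ``final_df`` resolves to a pl.LazyFrame. As a sketch
+    (assuming a driver ``dr`` built with this module), you materialize it yourself:
+
+    .. code-block:: python
+
+        lazy_df = dr.execute(final_vars=["final_df"])["final_df"]
+        df = lazy_df.collect()  # expressions are resolved only here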
+
+    Note that the operation is "append", meaning that the columns that are selected are appended
+    onto the dataframe.
+
+    If the function takes multiple dataframes, the dataframe input to process will always be
+    the first argument. This will be passed to the subdag, transformed, and passed back to the function.
+    This follows the hamilton rule of reference by parameter name. To demonstrate this, in the code
+    above, the dataframe that is passed to the subdag is `initial_df`. That is transformed
+    by the subdag, and then returned as the final dataframe.
+
+    You can read it as:
+
+    "final_df is a function that transforms the upstream dataframe initial_df, running the transformations
+    from my_module. It starts with the columns a and b, and then appends the columns
+    a_plus_b and a_b_average to the dataframe. It then returns the dataframe, and does some processing on it."
+
+    In case you need more flexibility you can alternatively use ``on_input``, for example,
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_from_df() -> pl.Expr:
+            return pl.col("a").alias("a") / 100
+
+        def b_from_df() -> pl.Expr:
+            return pl.col("b").alias("b") / 100
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            on_input="initial_df",
+            select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
+            # process, or just return unprocessed
+            ...
+
+    the above would output a dataframe where the two columns ``a`` and ``b`` get
+    overwritten.
+    """
+
+    def __init__(
+        self,
+        *load_from: Union[Callable, ModuleType],
+        columns_to_pass: List[str] = None,
+        pass_dataframe_as: str = None,
+        on_input: str = None,
+        select: List[str] = None,
+        namespace: str = None,
+        config_required: List[str] = None,
+    ):
+        """Instantiates a ``@with_columns`` decorator.
+
+        :param load_from: The functions or modules that will be used to generate the group of map operations.
+        :param columns_to_pass: The initial schema of the dataframe. This is used to determine which
+            upstream inputs should be taken from the dataframe, and which shouldn't. Note that, if this is
+            left empty (and external_inputs is as well), we will assume that all dependencies come
+            from the dataframe. This cannot be used in conjunction with on_input.
+        :param on_input: The name of the dataframe that we're modifying, as known to the subdag.
+            If you pass this in, you are responsible for extracting columns out. If not provided, you have
+            to pass columns_to_pass in, and we will extract the columns out on the first parameter for you.
+        :param select: The end nodes that represent columns to be appended to the original dataframe
+            via with_columns. Existing columns will be overridden. The selected nodes need to have the
+            corresponding column type, in this case pl.Expr, to be appended to the original dataframe.
+        :param namespace: The namespace of the nodes, so they don't clash with the global namespace
+            and so this can be reused. If it's left out, there will be no namespace (in which case you'll want
+            to be careful about repeating it/reusing the nodes in other parts of the DAG.)
+        :param config_required: the list of config keys that are required to resolve any functions. Pass in None\
+            if you want the functions/modules to have access to all possible config.
+        """
+
+        if pass_dataframe_as is not None:
+            raise NotImplementedError(
+                "We currently do not support pass_dataframe_as for polars. Please reach out if you need this "
+                "functionality."
+            )
+
+        super().__init__(
+            *load_from,
+            columns_to_pass=columns_to_pass,
+            on_input=on_input,
+            select=select,
+            namespace=namespace,
+            config_required=config_required,
+            dataframe_type=DATAFRAME_TYPE,
+        )
+
+    def _create_column_nodes(
+        self, fn: Callable, inject_parameter: str, params: Dict[str, Type[Type]]
+    ) -> List[node.Node]:
+        output_type = params[inject_parameter]
+
+        def temp_fn(**kwargs) -> Any:
+            return kwargs[inject_parameter]
+
+        # We recreate the df node to use extract columns
+        temp_node = node.Node(
+            name=inject_parameter,
+            typ=output_type,
+            callabl=temp_fn,
+            input_types={inject_parameter: output_type},
+        )
+
+        extract_columns_decorator = extract_columns(*self.initial_schema)
+
+        out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn)
+        return out_nodes[1:]
+
+    def get_initial_nodes(
+        self, fn: Callable, params: Dict[str, Type[Type]]
+    ) -> Tuple[str, Collection[node.Node]]:
+        """Selects the correct dataframe and optionally extracts out columns."""
+        inject_parameter = _default_inject_parameter(fn=fn, target_dataframe=self.target_dataframe)
+
+        with_columns_base.validate_dataframe(
+            fn=fn,
+            inject_parameter=inject_parameter,
+            params=params,
+            required_type=self.dataframe_type,
+        )
+
+        initial_nodes = (
+            []
+            if self.target_dataframe is not None
+            else self._create_column_nodes(fn=fn, inject_parameter=inject_parameter, params=params)
+        )
+
+        return inject_parameter, initial_nodes
+
+    def get_subdag_nodes(self, fn: Callable, config: Dict[str, Any]) -> Collection[node.Node]:
+        return subdag.collect_nodes(config, self.subdag_functions)
+
+    def chain_subdag_nodes(
+        self, fn: Callable, inject_parameter: str, generated_nodes: Collection[node.Node]
+    ) -> Tuple[Collection[node.Node], str]:
+        """Creates the node that appends to / overrides columns of the original dataframe based on the selected output."""
+
+        if self.select is None:
+            self.select = [
+                sink_node.name
+                for sink_node in generated_nodes
+                if sink_node.type == registry.get_column_type_from_df_type(self.dataframe_type)
+            ]
+
+        def new_callable(**kwargs) -> Any:
+            df = kwargs[inject_parameter]
+            columns_to_append = {}
+            for column in self.select:
+                columns_to_append[column] = kwargs[column]
+
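+            # In lazy mode the selected kwargs are pl.Expr objects; polars only resolves
+            # them when the resulting LazyFrame is collected.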
+            return df.with_columns(**columns_to_append)
+
+        column_type = registry.get_column_type_from_df_type(self.dataframe_type)
+        input_map = {column: column_type for column in self.select}
+        input_map[inject_parameter] = self.dataframe_type
+        merge_node = node.Node(
+            name="__append",
+            typ=self.dataframe_type,
+            callabl=new_callable,
+            input_types=input_map,
+        )
+        output_nodes = generated_nodes + [merge_node]
+        return output_nodes, merge_node.name
+
+    def validate(self, fn: Callable):
+        inject_parameter = _default_inject_parameter(fn=fn, target_dataframe=self.target_dataframe)
+        params = get_type_hints(fn)
+        with_columns_base.validate_dataframe(
+            fn=fn,
+            inject_parameter=inject_parameter,
+            params=params,
+            required_type=self.dataframe_type,
+        )
diff --git a/plugin_tests/h_polars/__init__.py b/plugin_tests/h_polars/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/plugin_tests/h_polars/conftest.py b/plugin_tests/h_polars/conftest.py
new file mode 100644
index 000000000..bc5ef5b5a
--- /dev/null
+++ b/plugin_tests/h_polars/conftest.py
@@ -0,0 +1,4 @@
+from hamilton import telemetry
+
+# disable telemetry for all tests!
+telemetry.disable_telemetry() diff --git a/plugin_tests/h_polars/resources/__init__.py b/plugin_tests/h_polars/resources/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/plugin_tests/h_polars/resources/with_columns_end_to_end.py b/plugin_tests/h_polars/resources/with_columns_end_to_end.py new file mode 100644 index 000000000..a893818fc --- /dev/null +++ b/plugin_tests/h_polars/resources/with_columns_end_to_end.py @@ -0,0 +1,68 @@ +import polars as pl + +from hamilton.function_modifiers import config +from hamilton.plugins.h_polars import with_columns + + +def upstream_factor() -> int: + return 3 + + +def initial_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14], "col_3": [1, 1, 1, 1]}) + + +def subtract_1_from_2(col_1: pl.Series, col_2: pl.Series) -> pl.Series: + return col_2 - col_1 + + +@config.when(factor=5) +def multiply_3__by_5(col_3: pl.Series) -> pl.Series: + return col_3 * 5 + + +@config.when(factor=7) +def multiply_3__by_7(col_3: pl.Series) -> pl.Series: + return col_3 * 7 + + +def add_1_by_user_adjustment_factor(col_1: pl.Series, user_factor: int) -> pl.Series: + return col_1 + user_factor + + +def multiply_2_by_upstream_3(col_2: pl.Series, upstream_factor: int) -> pl.Series: + return col_2 * upstream_factor + + +@with_columns( + subtract_1_from_2, + multiply_3__by_5, + multiply_3__by_7, + add_1_by_user_adjustment_factor, + multiply_2_by_upstream_3, + columns_to_pass=["col_1", "col_2", "col_3"], + select=[ + "subtract_1_from_2", + "multiply_3", + "add_1_by_user_adjustment_factor", + "multiply_2_by_upstream_3", + ], + namespace="some_subdag", +) +def final_df(initial_df: pl.DataFrame) -> pl.DataFrame: + return initial_df + + +def col_3(initial_df: pl.DataFrame) -> pl.Series: + return pl.Series([0, 2, 4, 6]) + + +@with_columns( + col_3, + multiply_3__by_5, + multiply_3__by_7, + on_input="initial_df", + select=["col_3", "multiply_3"], +) +def final_df_2(initial_df: pl.DataFrame) -> pl.DataFrame: + return initial_df diff --git a/plugin_tests/h_polars/resources/with_columns_end_to_end_lazy.py b/plugin_tests/h_polars/resources/with_columns_end_to_end_lazy.py new file mode 100644 index 000000000..367cfacf4 --- /dev/null +++ b/plugin_tests/h_polars/resources/with_columns_end_to_end_lazy.py @@ -0,0 +1,80 @@ +import polars as pl + +from hamilton.function_modifiers import config +from hamilton.plugins.h_polars_lazyframe import with_columns + + +def upstream_factor() -> int: + return 3 + + +def initial_df() -> pl.LazyFrame: + return pl.DataFrame( + {"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14], "col_3": [1, 1, 1, 1]} + ).lazy() + + +def subtract_1_from_2(col_1: pl.Expr, col_2: pl.Expr) -> pl.Expr: + return col_2 - col_1 + + +@config.when(factor=5) +def multiply_3__by_5(col_3: pl.Expr) -> pl.Expr: + return col_3 * 5 + + +@config.when(factor=7) +def multiply_3__by_7(col_3: pl.Expr) -> pl.Expr: + return col_3 * 7 + + +def add_1_by_user_adjustment_factor(col_1: pl.Expr, user_factor: int) -> pl.Expr: + return col_1 + user_factor + + +def multiply_2_by_upstream_3(col_2: pl.Expr, upstream_factor: int) -> pl.Expr: + return col_2 * upstream_factor + + +@with_columns( + subtract_1_from_2, + multiply_3__by_5, + multiply_3__by_7, + add_1_by_user_adjustment_factor, + multiply_2_by_upstream_3, + columns_to_pass=["col_1", "col_2", "col_3"], + select=[ + "subtract_1_from_2", + "multiply_3", + "add_1_by_user_adjustment_factor", + "multiply_2_by_upstream_3", + ], + namespace="some_subdag", +) +def final_df(initial_df: pl.LazyFrame) -> 
pl.LazyFrame: + return initial_df + + +def col_1(initial_df: pl.LazyFrame) -> pl.Expr: + return pl.col("col_1") + + +@config.when(factor=5) +def multiply_1__by_5(col_1: pl.Expr) -> pl.Expr: + return col_1 * 5 + + +@config.when_not(factor=5) +def multiply_1__by_1(col_1: pl.Expr) -> pl.Expr: + return col_1 * 1 + + +@with_columns( + col_1, + multiply_1__by_5, + multiply_1__by_1, + on_input="initial_df", + select=["col_1", "multiply_1"], +) +def final_df_2(initial_df: pl.LazyFrame) -> pl.LazyFrame: + return initial_df diff --git a/plugin_tests/h_polars/test_with_columns.py b/plugin_tests/h_polars/test_with_columns.py new file mode 100644 index 000000000..151347fb7 --- /dev/null +++ b/plugin_tests/h_polars/test_with_columns.py @@ -0,0 +1,265 @@ +import polars as pl +import pytest +from polars.testing import assert_frame_equal + +from hamilton import driver, node +from hamilton.function_modifiers.base import NodeInjector +from hamilton.plugins.h_polars import with_columns + +from .resources import with_columns_end_to_end + + +def test_create_column_nodes_pass_dataframe(): + def dummy_fn_with_columns(col_1: pl.Series) -> pl.Series: + return col_1 + 100 + + def target_fn(some_var: int, upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, on_input="upstream_df", select=["dummy_fn_with_columns"] + ) + + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + inject_parameter, initial_nodes = decorator.get_initial_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 0 + + +def test_create_column_nodes_extract_single_columns(): + def dummy_fn_with_columns(col_1: pl.Series) -> pl.Series: + return col_1 + 100 + + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + inject_parameter, initial_nodes = decorator.get_initial_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 1 + assert initial_nodes[0].name == "col_1" + assert initial_nodes[0].type == pl.Series + pl.testing.assert_series_equal( + initial_nodes[0].callable(upstream_df=dummy_df()), + pl.Series([1, 2, 3, 4]), + check_names=False, + ) + + +def test_create_column_nodes_extract_multiple_columns(): + def dummy_fn_with_columns(col_1: pl.Series) -> pl.Series: + return col_1 + 100 + + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + inject_parameter, initial_nodes = decorator.get_initial_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 2 + assert initial_nodes[0].name == "col_1" + assert initial_nodes[1].name == "col_2" + assert 
initial_nodes[0].type == pl.Series + assert initial_nodes[1].type == pl.Series + pl.testing.assert_series_equal( + initial_nodes[0].callable(upstream_df=dummy_df()), + pl.Series([1, 2, 3, 4]), + check_names=False, + ) + pl.testing.assert_series_equal( + initial_nodes[1].callable(upstream_df=dummy_df()), + pl.Series([11, 12, 13, 14]), + check_names=False, + ) + + +def test_no_matching_select_column_error(): + def dummy_fn_with_columns(col_1: pl.Series) -> pl.Series: + return col_1 + 100 + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + select = "wrong_column" + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=select + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + with pytest.raises(ValueError): + decorator.inject_nodes(injectable_params, {}, fn=target_fn) + + +def test_append_into_original_df(): + def dummy_fn_with_columns(col_1: pl.Series) -> pl.Series: + return col_1 + 100 + + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"] + ) + + output_nodes, _ = decorator.chain_subdag_nodes( + fn=target_fn, inject_parameter="upstream_df", generated_nodes=[] + ) + merge_node = output_nodes[-1] + + output_df = merge_node.callable( + upstream_df=dummy_df(), + dummy_fn_with_columns=dummy_fn_with_columns(col_1=pl.Series([1, 2, 3, 4])), + ) + assert merge_node.name == "__append" + assert merge_node.type == pl.DataFrame + + pl.testing.assert_series_equal(output_df["col_1"], pl.Series([1, 2, 3, 4]), check_names=False) + pl.testing.assert_series_equal( + output_df["col_2"], pl.Series([11, 12, 13, 14]), check_names=False + ) + pl.testing.assert_series_equal( + output_df["dummy_fn_with_columns"], pl.Series([101, 102, 103, 104]), check_names=False + ) + + +def test_override_original_column_in_df(): + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + def col_1() -> pl.Series: + return pl.col("col_1") * 100 + + decorator = with_columns(col_1, on_input="upstream_df", select=["col_1"]) + + output_nodes, _ = decorator.chain_subdag_nodes( + fn=target_fn, inject_parameter="upstream_df", generated_nodes=[] + ) + merge_node = output_nodes[-1] + + output_df = merge_node.callable(upstream_df=dummy_df(), col_1=col_1()) + assert merge_node.name == "__append" + assert merge_node.type == pl.DataFrame + + pl.testing.assert_series_equal( + output_df["col_1"], pl.Series([100, 200, 300, 400]), check_names=False + ) + pl.testing.assert_series_equal( + output_df["col_2"], pl.Series([11, 12, 13, 14]), check_names=False + ) + + +def test_assign_custom_namespace_with_columns(): + def dummy_fn_with_columns(col_1: pl.Series) -> pl.Series: + return col_1 + 100 + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + decorator = with_columns( + dummy_fn_with_columns, + columns_to_pass=["col_1", "col_2"], + select=["dummy_fn_with_columns"], + namespace="dummy_namespace", + ) + nodes_ = decorator.transform_dag([dummy_node], {}, target_fn) + + assert nodes_[0].name == "target_fn" + assert nodes_[1].name == 
"dummy_namespace.dummy_fn_with_columns" + assert nodes_[2].name == "dummy_namespace.col_1" + assert nodes_[3].name == "dummy_namespace.__append" + + +def test_end_to_end_with_columns_automatic_extract(): + config_5 = { + "factor": 5, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_5).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pl.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [5, 5, 5, 5], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + pl.testing.assert_frame_equal(result, expected_df) + + config_7 = { + "factor": 7, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_7).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pl.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [7, 7, 7, 7], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + assert_frame_equal(result, expected_df) + + +def test_end_to_end_with_columns_pass_dataframe(): + config_5 = { + "factor": 5, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_5).build() + + result = dr.execute(final_vars=["final_df_2"])["final_df_2"] + expected_df = pl.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [0, 2, 4, 6], + "multiply_3": [0, 10, 20, 30], + } + ) + assert_frame_equal(result, expected_df) diff --git a/plugin_tests/h_polars/test_with_columns_lazy.py b/plugin_tests/h_polars/test_with_columns_lazy.py new file mode 100644 index 000000000..2cb52c4db --- /dev/null +++ b/plugin_tests/h_polars/test_with_columns_lazy.py @@ -0,0 +1,64 @@ +import polars as pl +from polars.testing import assert_frame_equal + +from hamilton import driver + +from .resources import with_columns_end_to_end_lazy + + +def test_end_to_end_with_columns_automatic_extract_lazy(): + config_5 = { + "factor": 5, + } + dr = driver.Builder().with_modules(with_columns_end_to_end_lazy).with_config(config_5).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pl.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [5, 5, 5, 5], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + pl.testing.assert_frame_equal(result.collect(), expected_df) + + config_7 = { + "factor": 7, + } + dr = driver.Builder().with_modules(with_columns_end_to_end_lazy).with_config(config_7).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pl.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [7, 7, 7, 7], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + assert_frame_equal(result.collect(), expected_df) + + +def test_end_to_end_with_columns_pass_dataframe_lazy(): + config_5 = { + "factor": 5, + } + dr = 
driver.Builder().with_modules(with_columns_end_to_end_lazy).with_config(config_5).build() + + result = dr.execute(final_vars=["final_df_2"])["final_df_2"] + expected_df = pl.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "multiply_1": [5, 10, 15, 20], + } + ) + assert_frame_equal(result.collect(), expected_df)