diff --git a/examples/pandas/with_columns/notebook.ipynb b/examples/pandas/with_columns/notebook.ipynb
index 97e355edd..af3371c70 100644
--- a/examples/pandas/with_columns/notebook.ipynb
+++ b/examples/pandas/with_columns/notebook.ipynb
@@ -598,16 +598,25 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 4,
+   "execution_count": 1,
    "metadata": {},
-   "outputs": [],
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n",
+      "  warnings.warn(\n"
+     ]
+    }
+   ],
    "source": [
     "%reload_ext hamilton.plugins.jupyter_magic"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 7,
+   "execution_count": 4,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -632,11 +641,11 @@
    "\n",
    "\n",
    "async def mean_b(b: pd.Series) -> pd.Series:\n",
-   "    await asyncio.sleep(0.0001)\n",
+   "    await asyncio.sleep(5)\n",
    "    return b.mean()\n",
    "\n",
    "async def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:\n",
-   "    await asyncio.sleep(0.0001)\n",
+   "    await asyncio.sleep(1)\n",
    "    return a + b\n",
    "\n",
    "async def multiply_a_plus_mean_b(multiply_a: pd.Series, mean_b: pd.Series) -> pd.Series:\n",
@@ -654,7 +663,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 8,
+   "execution_count": 5,
    "metadata": {},
    "outputs": [
     {
diff --git a/hamilton/function_modifiers/README b/hamilton/function_modifiers/README
new file mode 100644
index 000000000..18c326a09
--- /dev/null
+++ b/hamilton/function_modifiers/README
@@ -0,0 +1,56 @@
+# with_columns_base
+
+Documenting the current design flow for the `with_columns` decorator. It belongs to the `NodeInjector` lifecycle.
+
+The `with_columns` decorator consists of three parts, represented by the corresponding three abstract methods in `with_columns_base`:
+
+1. `get_initial_nodes` -- Input node(s): either the dataframe itself if `pass_dataframe_as` is used, or columns extracted into individual nodes if `columns_to_pass` is used. This step is library specific.
+2. `get_subdag_nodes` -- Subdag nodes: creating the `subdag` is outsourced to `recursive.subdag`, with flexibility left for pre- and post-processing, since some libraries need it (see h_spark).
+3. `create_merge_node` -- Merge node: appending the selected columns to the original dataframe is library specific.
+
+Each plugin library that implements `with_columns` should subclass this base class and implement the three abstract methods (four, since `validate()` is also abstract); a rough sketch is given in the Example section below. The child
+classes need to override `__init__`, call out to the parent `__init__`, and pass in `dataframe_type`, which is registered in the corresponding `extensions` module and records which
+column types are permitted for the given dataframe type.
+
+For now this is kept loosely coupled to the `registry` and detached from `ResultBuilder`. The API is private, so should we want to switch to the `registry`, the refactoring is straightforward and shouldn't get us into trouble down the road.
+
+## NOTE
+The handling of scalars and dataframe types varies from library to library. We decided that this should not be permissible: all selected columns that are
+appended to the original dataframe need to have the matching column type, which is registered in the `registry` and set in the library extension modules.
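+
+## Example
+A rough, self-contained sketch of the three-phase flow on pandas. This is illustrative only, not the actual Hamilton API -- the helper names (`extract_initial_columns`, `run_subdag`, `merge`) are made up for this README; see `h_pandas` for the real implementation (whose merge node also uses `df.assign`):
+
+```python
+import inspect
+
+import pandas as pd
+
+
+def extract_initial_columns(df: pd.DataFrame, columns_to_pass: list) -> dict:
+    # Phase 1 (get_initial_nodes): pull the requested columns out of the dataframe.
+    return {col: df[col] for col in columns_to_pass}
+
+
+def run_subdag(env: dict, transforms: list) -> dict:
+    # Phase 2 (get_subdag_nodes): run the column-level transforms in topological
+    # order, each one reading its inputs by parameter name.
+    for fn in transforms:
+        kwargs = {p: env[p] for p in inspect.signature(fn).parameters}
+        env[fn.__name__] = fn(**kwargs)
+    return env
+
+
+def merge(df: pd.DataFrame, env: dict, select: list) -> pd.DataFrame:
+    # Phase 3 (create_merge_node): append / override the selected columns.
+    return df.assign(**{col: env[col] for col in select})
+
+
+def a_plus_b(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
+    return a_from_df + b_from_df
+
+
+df = pd.DataFrame({"a_from_df": [1, 2], "b_from_df": [3, 4]})
+env = run_subdag(extract_initial_columns(df, ["a_from_df", "b_from_df"]), [a_plus_b])
+final_df = merge(df, env, select=["a_plus_b"])  # final_df gains the column "a_plus_b"
+```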
diff --git a/hamilton/function_modifiers/recursive.py b/hamilton/function_modifiers/recursive.py
index 20adc3250..72851802e 100644
--- a/hamilton/function_modifiers/recursive.py
+++ b/hamilton/function_modifiers/recursive.py
@@ -630,96 +630,23 @@ def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> L
     return output
 
 
-class with_columns_factory(base.NodeInjector, abc.ABC):
+class with_columns_base(base.NodeInjector, abc.ABC):
     """Factory for with_columns operation on a dataframe. This is used when you want to extract some
     columns out of the dataframe, perform operations on them and then append to the original dataframe.
 
-    Here's an example of calling it on a pandas dataframe -- if you've seen ``@subdag``, you should be familiar with
-    the concepts:
+    This is an internal class that is meant to be extended by each individual dataframe library implementing
+    the following abstract methods:
 
-    .. code-block:: python
-
-        # my_module.py
-        def a(a_from_df: pd.Series) -> pd.Series:
-            return _process(a)
-
-        def b(b_from_df: pd.Series) -> pd.Series:
-            return _process(b)
-
-        def a_b_average(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
-            return (a_from_df + b_from_df) / 2
-
-
-    .. code-block:: python
-
-        # with_columns_module.py
-        def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:
-            return a + b
-
-
-        # the with_columns call
-        @with_columns(
-            *[my_module], # Load from any module
-            *[a_plus_b], # or list operations directly
-            columns_to_pass=["a_from_df", "b_from_df"], # The columns to pass from the dataframe to
-            # the subdag
-            select=["a", "b", "a_plus_b", "a_b_average"], # The columns to select from the dataframe
-        )
-        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
-            # process, or just return unprocessed
-            ...
-
-    In this instance the ``initial_df`` would get two columns added: ``a_plus_b`` and ``a_b_average``.
-
-    The operations are applied in topological order. This allows you to
-    express the operations individually, making it easy to unit-test and reuse.
-
-    Note that the operation is "append", meaning that the columns that are selected are appended
-    onto the dataframe.
-
-    If the function takes multiple dataframes, the dataframe input to process will always be
-    the first argument. This will be passed to the subdag, transformed, and passed back to the function.
-    This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code
-    above, the dataframe that is passed to the subdag is `initial_df`. That is transformed
-    by the subdag, and then returned as the final dataframe.
-
-    You can read it as:
-
-    "final_df is a function that transforms the upstream dataframe initial_df, running the transformations
-    from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns
-    a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it."
-
-    In case you need more flexibility you can alternatively use ``pass_dataframe_as``, for example,
-
-    .. code-block:: python
-
-        # with_columns_module.py
-        def a_from_df(initial_df: pd.Series) -> pd.Series:
-            return initial_df["a_from_df"] / 100
-
-        def b_from_df(initial_df: pd.Series) -> pd.Series:
-            return initial_df["b_from_df"] / 100
-
-
-        # the with_columns call
-        @with_columns(
-            *[my_module],
-            *[a_from_df],
-            columns_to_pass=["a_from_df", "b_from_df"],
-            select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
-        )
-        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
-            # process, or just return unprocessed
-            ...
-
-    the above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get
-    overwritten.
+    - get_initial_nodes
+    - get_subdag_nodes
+    - create_merge_node
+    - validate
     """
 
     # TODO: if we rename the column nodes into something smarter this can be avoided and
     # can also modify columns in place
     @staticmethod
-    def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
+    def contains_duplicates(nodes_: List[node.Node]) -> bool:
         """Ensures that we don't run into name clashing of columns and group operations.
 
         In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want
@@ -748,7 +675,7 @@ def validate_dataframe(
                 f"It might not be compatible with some other decorators."
             )
 
-        if input_types[inject_parameter] != required_type:
+        if not issubclass(input_types[inject_parameter], required_type):
             raise InvalidDecoratorException(
                 "The selected dataframe parameter is not the correct dataframe type. "
                 f"You selected a parameter of type {input_types[inject_parameter]}, but we expect to get {required_type}"
@@ -865,7 +792,7 @@ def inject_nodes(
         # pass the dataframe and extract them himself. If we add namespace to initial nodes and rewire the
         # initial node names with the ongoing ones that have a column argument, we can also allow in place
         # changes when using columns_to_pass
-        if with_columns_factory._check_for_duplicates(initial_nodes + subdag_nodes):
+        if with_columns_base.contains_duplicates(initial_nodes + subdag_nodes):
             raise ValueError(
                 "You can only specify columns once. You used `columns_to_pass` and we "
                 "extract the columns for you. In this case they cannot be overwritten -- only new columns get "
diff --git a/hamilton/plugins/h_pandas.py b/hamilton/plugins/h_pandas.py
index 047aa3067..dff4fee0f 100644
--- a/hamilton/plugins/h_pandas.py
+++ b/hamilton/plugins/h_pandas.py
@@ -13,11 +13,11 @@
 
 from hamilton import node, registry
 from hamilton.function_modifiers.expanders import extract_columns
-from hamilton.function_modifiers.recursive import subdag, with_columns_factory
+from hamilton.function_modifiers.recursive import subdag, with_columns_base
 from hamilton.plugins.pandas_extensions import DATAFRAME_TYPE
 
 
-class with_columns(with_columns_factory):
+class with_columns(with_columns_base):
     """Initializes a with_columns decorator for pandas. This allows you to efficiently run
     groups of map operations on a dataframe.
 
     Here's an example of calling it -- if you've seen ``@subdag``, you should be familiar with
@@ -79,24 +79,24 @@ def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
 
     .. code-block:: python
 
-    # with_columns_module.py
-    def a_from_df(initial_df: pd.Series) -> pd.Series:
-        return initial_df["a_from_df"] / 100
+        # with_columns_module.py
+        def a_from_df(initial_df: pd.Series) -> pd.Series:
+            return initial_df["a_from_df"] / 100
 
         def b_from_df(initial_df: pd.Series) -> pd.Series:
-        return initial_df["b_from_df"] / 100
+            return initial_df["b_from_df"] / 100
 
-
-    # the with_columns call
-    @with_columns(
-        *[my_module],
-        *[a_from_df],
-        columns_to_pass=["a_from_df", "b_from_df"],
-        select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
-    )
-    def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
-        # process, or just return unprocessed
-        ...
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            *[a_from_df],
+            columns_to_pass=["a_from_df", "b_from_df"],
+            select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
+            # process, or just return unprocessed
+            ...
 
     the above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get
     overwritten.
@@ -122,7 +122,8 @@ def __init__(
         If you pass this in, you are responsible for extracting columns out. If not provided, you have
         to pass columns_to_pass in, and we will extract the columns out for you.
     :param select: The end nodes that represent columns to be appended to the original dataframe
-        via with_columns. Existing columns will be overridden.
+        via with_columns. Existing columns will be overridden. The selected nodes must return the
+        corresponding column type (in this case pd.Series) so that they can be appended to the original dataframe.
     :param namespace: The namespace of the nodes, so they don't clash with the global namespace
         and so this can be reused. If its left out, there will be no namespace (in which case you'll
         want to be careful about repeating it/reusing the nodes in other parts of the DAG.)
@@ -145,14 +146,8 @@ def _create_column_nodes(
     ) -> List[node.Node]:
         output_type = params[inject_parameter]
 
-        if inspect.iscoroutinefunction(fn):
-
-            async def temp_fn(**kwargs) -> Any:
-                return kwargs[inject_parameter]
-        else:
-
-            def temp_fn(**kwargs) -> Any:
-                return kwargs[inject_parameter]
+        def temp_fn(**kwargs) -> Any:
+            return kwargs[inject_parameter]
 
         # We recreate the df node to use extract columns
         temp_node = node.Node(
@@ -180,7 +175,7 @@ def get_initial_nodes(
         # If we don't have a specified dataframe we assume it's the first argument
         inject_parameter = list(sig.parameters.values())[0].name
 
-        with_columns_factory.validate_dataframe(
+        with_columns_base.validate_dataframe(
             fn=fn,
             inject_parameter=inject_parameter,
             params=params,
@@ -200,23 +195,16 @@ def get_subdag_nodes(self, config: Dict[str, Any]) -> Collection[node.Node]:
 
     def create_merge_node(self, fn: Callable, inject_parameter: str) -> node.Node:
         "Node that adds to / overrides columns for the original dataframe based on selected output."
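+        # Merge: start from the injected dataframe and append each selected column,
+        # overriding any existing column of the same name (pandas .assign semantics).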
-        if inspect.iscoroutinefunction(fn):
-
-            async def new_callable(**kwargs) -> Any:
-                df = kwargs[inject_parameter]
-                columns_to_append = {}
-                for column in self.select:
-                    columns_to_append[column] = kwargs[column]
-                return df.assign(**columns_to_append)
-        else:
-
-            def new_callable(**kwargs) -> Any:
-                df = kwargs[inject_parameter]
-                columns_to_append = {}
-                for column in self.select:
-                    columns_to_append[column] = kwargs[column]
+        def new_callable(**kwargs) -> Any:
+            df = kwargs[inject_parameter]
+            columns_to_append = {}
+            for column in self.select:
+                columns_to_append[column] = kwargs[column]
 
-                return df.assign(**columns_to_append)
+            return df.assign(**columns_to_append)
 
         column_type = registry.get_column_type_from_df_type(self.dataframe_type)
         input_map = {column: column_type for column in self.select}
diff --git a/hamilton/plugins/h_polars.py b/hamilton/plugins/h_polars.py
index bb67eaaac..8a9f3457d 100644
--- a/hamilton/plugins/h_polars.py
+++ b/hamilton/plugins/h_polars.py
@@ -17,7 +17,7 @@
 # TODO -- determine the best place to put this code
 from hamilton import base, node, registry
 from hamilton.function_modifiers.expanders import extract_columns
-from hamilton.function_modifiers.recursive import subdag, with_columns_factory
+from hamilton.function_modifiers.recursive import subdag, with_columns_base
 from hamilton.plugins.polars_extensions import DATAFRAME_TYPE
 
 
@@ -73,7 +73,7 @@ def output_type(self) -> Type:
 
 
 # Do we need this here?
-class with_columns(with_columns_factory):
+class with_columns(with_columns_base):
     """Initializes a with_columns decorator for polars. This allows you to efficiently run
     groups of map operations on a dataframe. We support
@@ -130,23 +130,23 @@ def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
 
     .. code-block:: python
 
-    # with_columns_module.py
-    def a_from_df() -> pl.Expr:
-        return pl.col(a).alias("a") / 100
+        # with_columns_module.py
+        def a_from_df() -> pl.Expr:
+            return pl.col("a").alias("a") / 100
 
-    def b_from_df() -> pl.Expr:
-        return pl.col(b).alias("b") / 100
+        def b_from_df() -> pl.Expr:
+            return pl.col("b").alias("b") / 100
 
-    # the with_columns call
-    @with_columns(
-        *[my_module],
-        pass_dataframe_as="initial_df",
-        select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
-    )
-    def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
-        # process, or just return unprocessed
-        ...
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            pass_dataframe_as="initial_df",
+            select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
+            # process, or just return unprocessed
+            ...
 
     the above would output a dataframe where the two columns ``a`` and ``b`` get
     overwritten.
@@ -172,8 +172,8 @@ def __init__(
         If you pass this in, you are responsible for extracting columns out. If not provided, you have
         to pass columns_to_pass in, and we will extract the columns out for you.
     :param select: The end nodes that represent columns to be appended to the original dataframe
-        via with_columns. The length of each column has to match the original dataframe length.
-        Existing columns will be overridden.
+        via with_columns. Existing columns will be overridden. The selected nodes must return the
+        corresponding column type (in this case pl.Series) so that they can be appended to the original dataframe.
     :param namespace: The namespace of the nodes, so they don't clash with the global namespace
        and so this can be reused. If its left out, there will be no namespace (in which case you'll
         want to be careful about repeating it/reusing the nodes in other parts of the DAG.)
@@ -225,7 +225,7 @@ def get_initial_nodes(
         # If we don't have a specified dataframe we assume it's the first argument
         inject_parameter = list(sig.parameters.values())[0].name
 
-        with_columns_factory.validate_dataframe(
+        with_columns_base.validate_dataframe(
             fn=fn,
             inject_parameter=inject_parameter,
             params=params,
diff --git a/hamilton/plugins/h_polars_lazyframe.py b/hamilton/plugins/h_polars_lazyframe.py
index 9ac0b99ab..585ea4a0a 100644
--- a/hamilton/plugins/h_polars_lazyframe.py
+++ b/hamilton/plugins/h_polars_lazyframe.py
@@ -6,7 +6,7 @@
 
 from hamilton import base, node, registry
 from hamilton.function_modifiers.expanders import extract_columns
-from hamilton.function_modifiers.recursive import subdag, with_columns_factory
+from hamilton.function_modifiers.recursive import subdag, with_columns_base
 from hamilton.plugins.polars_lazyframe_extensions import DATAFRAME_TYPE
 
 
@@ -52,7 +52,7 @@ def output_type(self) -> Type:
         return pl.LazyFrame
 
 
-class with_columns(with_columns_factory):
+class with_columns(with_columns_base):
     """Initializes a with_columns decorator for polars.
 
     This allows you to efficiently run groups of map operations on a dataframe. We support
@@ -107,23 +107,23 @@ def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
 
     .. code-block:: python
 
-    # with_columns_module.py
-    def a_from_df() -> pl.Expr:
-        return pl.col(a).alias("a") / 100
+        # with_columns_module.py
+        def a_from_df() -> pl.Expr:
+            return pl.col("a").alias("a") / 100
 
-    def b_from_df() -> pd.Expr:
-        return pl.col(a).alias("b") / 100
+        def b_from_df() -> pl.Expr:
+            return pl.col("b").alias("b") / 100
 
-    # the with_columns call
-    @with_columns(
-        *[my_module],
-        pass_dataframe_as="initial_df",
-        select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
-    )
-    def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
-        # process, or just return unprocessed
-        ...
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            pass_dataframe_as="initial_df",
+            select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
+            # process, or just return unprocessed
+            ...
 
     the above would output a dataframe where the two columns ``a`` and ``b`` get
     overwritten.
@@ -149,8 +149,8 @@ def __init__(
         If you pass this in, you are responsible for extracting columns out. If not provided, you have
         to pass columns_to_pass in, and we will extract the columns out for you.
     :param select: The end nodes that represent columns to be appended to the original dataframe
-        via with_columns. The length of each column has to match the original dataframe length.
-        Existing columns will be overridden.
+        via with_columns. Existing columns will be overridden. The selected nodes must return the
+        corresponding column type (in this case pl.Expr) so that they can be appended to the original dataframe.
     :param namespace: The namespace of the nodes, so they don't clash with the global namespace
         and so this can be reused. If its left out, there will be no namespace (in which case you'll
         want to be careful about repeating it/reusing the nodes in other parts of the DAG.)
@@ -202,7 +202,7 @@ def get_initial_nodes(
         # If we don't have a specified dataframe we assume it's the first argument
         inject_parameter = list(sig.parameters.values())[0].name
 
-        with_columns_factory.validate_dataframe(
+        with_columns_base.validate_dataframe(
             fn=fn,
             inject_parameter=inject_parameter,
             params=params,
diff --git a/tests/function_modifiers/test_recursive.py b/tests/function_modifiers/test_recursive.py
index c6f25be3a..e9b76686c 100644
--- a/tests/function_modifiers/test_recursive.py
+++ b/tests/function_modifiers/test_recursive.py
@@ -17,7 +17,7 @@
 )
 from hamilton.function_modifiers.base import NodeTransformer
 from hamilton.function_modifiers.dependencies import source
-from hamilton.function_modifiers.recursive import _validate_config_inputs, with_columns_factory
+from hamilton.function_modifiers.recursive import _validate_config_inputs, with_columns_base
 import tests.resources.reuse_subdag
 
 
@@ -551,5 +551,5 @@ def test_columns_and_subdag_nodes_do_not_clash():
     node_b = hamilton.node.Node.from_fn(dummy_fn_with_columns, name="a")
     node_c = hamilton.node.Node.from_fn(dummy_fn_with_columns, name="c")
 
-    assert not with_columns_factory._check_for_duplicates([node_a, node_c])
-    assert with_columns_factory._check_for_duplicates([node_a, node_b, node_c])
+    assert not with_columns_base.contains_duplicates([node_a, node_c])
+    assert with_columns_base.contains_duplicates([node_a, node_b, node_c])