Keep with_columns in abstract factory pattern
Removed the registry dependency and single dispatch for now.
jernejfrank committed Nov 22, 2024
1 parent 343b5af commit 0515c7e
Showing 23 changed files with 1,557 additions and 1,345 deletions.
24 changes: 21 additions & 3 deletions docs/reference/decorators/with_columns.rst
@@ -2,14 +2,32 @@
with_columns
=======================

Pandas and Polars
We support the ``with_columns`` operation, which appends results as new columns to the original dataframe, for several libraries:

Pandas
-----------------------

**Reference Documentation**

.. autoclass:: hamilton.plugins.h_pandas.with_columns
:special-members: __init__


Polars (Eager)
-----------------------

We have a ``with_columns`` option to run operations on columns of a Pandas / Polars dataframe and append the results as new columns.
**Reference Documentation**

.. autoclass:: hamilton.plugins.h_polars.with_columns
:special-members: __init__


Polars (Lazy)
-----------------------

**Reference Documentation**

.. autoclass:: hamilton.function_modifiers.with_columns
.. autoclass:: hamilton.plugins.h_polars_lazyframe.with_columns
:special-members: __init__


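For orientation, here is a minimal usage sketch of the pandas variant documented above. The column function, column names, and dataframe parameter are hypothetical; only the import path comes from the docs above:

```python
# Hypothetical usage sketch -- function and column names are made up.
import pandas as pd

from hamilton.plugins.h_pandas import with_columns


def a_times_two(a: pd.Series) -> pd.Series:
    # New column computed from the extracted column "a".
    return a * 2


@with_columns(
    a_times_two,             # function(s) forming the column subdag
    columns_to_pass=["a"],   # columns extracted from the incoming dataframe
    select=["a_times_two"],  # results appended back onto the dataframe
)
def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
    # Receives initial_df with the "a_times_two" column appended.
    return initial_df
```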
620 changes: 309 additions & 311 deletions examples/pandas/with_columns/notebook.ipynb

Large diffs are not rendered by default.

1,185 changes: 598 additions & 587 deletions examples/polars/with_columns/notebook.ipynb

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion hamilton/function_modifiers/__init__.py
@@ -88,7 +88,6 @@

subdag = recursive.subdag
parameterized_subdag = recursive.parameterized_subdag
with_columns = recursive.with_columns

# resolve/meta stuff -- power user features

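The practical consequence of removing this re-export: `with_columns` can no longer be imported from `hamilton.function_modifiers`. Per the docs change above, each dataframe library now ships its own implementation. A sketch of the migration (import paths taken from the updated docs):

```python
# Before this commit:
# from hamilton.function_modifiers import with_columns

# After this commit, import the library-specific implementation:
from hamilton.plugins.h_pandas import with_columns              # Pandas
# from hamilton.plugins.h_polars import with_columns            # Polars (eager)
# from hamilton.plugins.h_polars_lazyframe import with_columns  # Polars (lazy)
```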
171 changes: 62 additions & 109 deletions hamilton/function_modifiers/recursive.py
@@ -24,7 +24,6 @@
ParametrizedDependency,
UpstreamDependency,
)
from hamilton.function_modifiers.expanders import extract_columns


def assign_namespace(node_name: str, namespace: str) -> str:
@@ -631,15 +630,9 @@ def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> L
return output


class with_columns(base.NodeInjector, abc.ABC):
"""Performs with_columns operation on a dataframe. This is used when you want to extract some
class with_columns_factory(base.NodeInjector, abc.ABC):
"""Factory for with_columns operation on a dataframe. This is used when you want to extract some
columns out of the dataframe, perform operations on them, and then append the results to the original dataframe.
For now can be used with:
- Pandas
- Polars
Here's an example of calling it on a pandas dataframe -- if you've seen ``@subdag``, you should be familiar with
the concepts:
@@ -742,6 +735,25 @@ def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
return True
return False

@staticmethod
def validate_dataframe(
fn: Callable, inject_parameter: str, params: Dict[str, Type[Type]], required_type: Type
) -> None:
input_types = typing.get_type_hints(fn)
if inject_parameter not in params:
raise InvalidDecoratorException(
f"Function: {fn.__name__} does not have the parameter {inject_parameter} as a dependency. "
f"@with_columns requires the parameter names to match the function parameters. "
f"If you wish do not wish to use the first argument, please use `pass_dataframe_as` option. "
f"It might not be compatible with some other decorators."
)

if input_types[inject_parameter] != required_type:
raise InvalidDecoratorException(
"The selected dataframe parameter is not the correct dataframe type. "
f"You selected a parameter of type {input_types[inject_parameter]}, but we expect to get {required_type}"
)
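A small sketch of what this shared guard enforces; the function and parameter names below are hypothetical:

```python
# Hypothetical sketch of the validate_dataframe guard.
import pandas as pd


def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
    return initial_df


# Passes: "initial_df" is in params and final_df annotates it as pd.DataFrame.
with_columns_factory.validate_dataframe(
    fn=final_df,
    inject_parameter="initial_df",
    params={"initial_df": pd.DataFrame},
    required_type=pd.DataFrame,
)
# It raises InvalidDecoratorException if "initial_df" were absent from params,
# or if final_df annotated it with anything other than required_type.
```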

def __init__(
self,
*load_from: Union[Callable, ModuleType],
@@ -750,6 +762,7 @@
select: List[str] = None,
namespace: str = None,
config_required: List[str] = None,
dataframe_type: Type = None,
):
"""Instantiates a ``@with_columns`` decorator.
@@ -795,119 +808,64 @@ def __init__(
self.namespace = namespace
self.config_required = config_required

def required_config(self) -> List[str]:
return self.config_required

def _create_column_nodes(
self, inject_parameter: str, params: Dict[str, Type[Type]]
) -> List[node.Node]:
output_type = params[inject_parameter]

if self.is_async:

async def temp_fn(**kwargs) -> Any:
return kwargs[inject_parameter]
else:

def temp_fn(**kwargs) -> Any:
return kwargs[inject_parameter]

# We recreate the df node to use extract columns
temp_node = node.Node(
name=inject_parameter,
typ=output_type,
callabl=temp_fn,
input_types={inject_parameter: output_type},
)
if dataframe_type is None:
raise InvalidDecoratorException(
"Please provide the dataframe type for this specific library."
)

extract_columns_decorator = extract_columns(*self.initial_schema)
self.dataframe_type = dataframe_type

out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn)
return out_nodes[1:]
def required_config(self) -> List[str]:
return self.config_required

def _get_inital_nodes(
@abc.abstractmethod
def get_initial_nodes(
self, fn: Callable, params: Dict[str, Type[Type]]
) -> Tuple[str, Collection[node.Node]]:
"""Selects the correct dataframe and optionally extracts out columns."""
initial_nodes = []
sig = inspect.signature(fn)
input_types = typing.get_type_hints(fn)
"""Preparation stage where columns get extracted into nodes. In case `pass_dataframe_as` is
used, this should return an empty list (no column nodes) since the users will extract it
themselves.
:param fn: the function we are decorating. By using the inspect library you can get information.
about what arguments it has / find out the dataframe argument.
:param params: Dictionary of all the type names one wants to inject.
:return: name of the dataframe parameter and list of nodes representing the extracted columns (can be empty).
"""
pass

if self.dataframe_subdag_param is not None:
inject_parameter = self.dataframe_subdag_param
else:
# If we don't have a specified dataframe we assume it's the first argument
inject_parameter = list(sig.parameters.values())[0].name
@abc.abstractmethod
def get_subdag_nodes(self, config: Dict[str, Any]) -> Collection[node.Node]:
"""Creates subdag from the passed in module / functions.
if inject_parameter not in params:
raise base.InvalidDecoratorException(
f"Function: {fn.__name__} does not have the parameter {inject_parameter} as a dependency. "
f"@with_columns requires the parameter names to match the function parameters. "
f"If you wish do not wish to use the first argument, please use `pass_dataframe_as` option. "
f"It might not be compatible with some other decorators."
)
:param config: Configuration with which the DAG was constructed.
:return: the subdag as a list of nodes.
"""
pass

dataframe_type = input_types[inject_parameter]
initial_nodes = (
[]
if self.dataframe_subdag_param is not None
else self._create_column_nodes(inject_parameter=inject_parameter, params=params)
)
@abc.abstractmethod
def create_merge_node(self, fn: Callable, inject_parameter: str) -> node.Node:
"""Combines the origanl dataframe with selected columns. This should produce a
dataframe output that is injected into the decorated function with new columns
appended and existing columns overriden.
return inject_parameter, initial_nodes, dataframe_type

def create_merge_node(
self, upstream_node: str, node_name: str, dataframe_type: Type
) -> node.Node:
"Node that adds to / overrides columns for the original dataframe based on selected output."
if self.is_async:

async def new_callable(**kwargs) -> Any:
df = kwargs[upstream_node]
columns_to_append = {}
for column in self.select:
columns_to_append[column] = kwargs[column]
new_df = registry.with_columns(df, columns_to_append)
return new_df
else:

def new_callable(**kwargs) -> Any:
df = kwargs[upstream_node]
columns_to_append = {}
for column in self.select:
columns_to_append[column] = kwargs[column]

new_df = registry.with_columns(df, columns_to_append)
return new_df

column_type = registry.get_column_type_from_df_type(dataframe_type)
input_map = {column: column_type for column in self.select}
input_map[upstream_node] = dataframe_type

return node.Node(
name=node_name,
typ=dataframe_type,
callabl=new_callable,
input_types=input_map,
)
:param inject_parameter: the name of the original dataframe parameter.
:return: the new dataframe with the columns appended / overwritten.
"""
pass

def inject_nodes(
self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable
) -> Tuple[List[node.Node], Dict[str, str]]:
self.is_async = inspect.iscoroutinefunction(fn)
namespace = fn.__name__ if self.namespace is None else self.namespace

inject_parameter, initial_nodes, dataframe_type = self._get_inital_nodes(
fn=fn, params=params
)

subdag_nodes = subdag.collect_nodes(config, self.subdag_functions)
inject_parameter, initial_nodes = self.get_initial_nodes(fn=fn, params=params)
subdag_nodes = self.get_subdag_nodes(config=config)

# TODO: for now we restrict that if user wants to change columns that already exist, he needs to
# pass the dataframe and extract them himself. If we add namespace to initial nodes and rewire the
# initial node names with the ongoing ones that have a column argument, we can also allow in place
# changes when using columns_to_pass
if with_columns._check_for_duplicates(initial_nodes + subdag_nodes):
if with_columns_factory._check_for_duplicates(initial_nodes + subdag_nodes):
raise ValueError(
"You can only specify columns once. You used `columns_to_pass` and we "
"extract the columns for you. In this case they cannot be overwritten -- only new columns get "
@@ -927,16 +885,11 @@ def inject_nodes(
self.select = [
sink_node.name
for sink_node in pruned_nodes
if sink_node.type == registry.get_column_type_from_df_type(dataframe_type)
if sink_node.type == registry.get_column_type_from_df_type(self.dataframe_type)
]

merge_node = self.create_merge_node(
inject_parameter, node_name="__append", dataframe_type=dataframe_type
)
merge_node = self.create_merge_node(fn=fn, inject_parameter=inject_parameter)

output_nodes = initial_nodes + pruned_nodes + [merge_node]
output_nodes = subdag.add_namespace(output_nodes, namespace)
return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)}

def validate(self, fn: Callable):
pass
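To make the factory pattern concrete, here is a heavily simplified sketch of what a library-specific subclass now supplies, using pandas as the example. This is illustrative only -- the real subclasses live in `hamilton.plugins.h_pandas`, `h_polars`, and `h_polars_lazyframe` -- but it shows the key consequence of this commit: the merge step calls the dataframe library directly instead of going through the removed `registry.with_columns` single dispatch:

```python
# Illustrative sketch only -- not the actual plugin code.
from typing import Any, Callable

import pandas as pd

from hamilton import node
from hamilton.function_modifiers.recursive import with_columns_factory


class PandasWithColumnsSketch(with_columns_factory):
    """Hypothetical pandas subclass; get_initial_nodes / get_subdag_nodes
    are omitted for brevity."""

    def create_merge_node(self, fn: Callable, inject_parameter: str) -> node.Node:
        def new_callable(**kwargs: Any) -> pd.DataFrame:
            df = kwargs[inject_parameter]
            columns_to_append = {col: kwargs[col] for col in self.select}
            # Direct pandas call replaces the registry.with_columns dispatch:
            return df.assign(**columns_to_append)

        input_types = {col: pd.Series for col in self.select}
        input_types[inject_parameter] = pd.DataFrame
        return node.Node(
            name="__append",
            typ=pd.DataFrame,
            callabl=new_callable,
            input_types=input_types,
        )
```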
7 changes: 0 additions & 7 deletions hamilton/plugins/dask_extensions.py
@@ -22,13 +22,6 @@ def fill_with_scalar_dask(df: dd.DataFrame, column_name: str, value: Any) -> dd.
return df


@registry.with_columns.register(dd.DataFrame)
def with_columns_dask(df: dd.DataFrame, columns: dd.Series) -> dd.DataFrame:
raise NotImplementedError(
"As of Hamilton version 1.83.1, with_columns for Dask isn't supported."
)


def register_types():
"""Function to register the types for this extension."""
registry.register_types("dask", DATAFRAME_TYPE, COLUMN_TYPE)
7 changes: 0 additions & 7 deletions hamilton/plugins/geopandas_extensions.py
@@ -24,13 +24,6 @@ def fill_with_scalar_geopandas(
return df


@registry.with_columns.register(gpd.GeoDataFrame)
def with_columns_geopandas(df: gpd.GeoDataFrame, columns: gpd.GeoSeries) -> gpd.GeoDataFrame:
raise NotImplementedError(
"As of Hamilton version 1.83.1, with_columns for geopandas isn't supported."
)


def register_types():
"""Function to register the types for this extension."""
registry.register_types("geopandas", DATAFRAME_TYPE, COLUMN_TYPE)