Skip to content

Commit

Permalink
Node grouping API (#4427)
Browse files Browse the repository at this point in the history
* Add deployment related attributes

Signed-off-by: Ankita Katiyar <[email protected]>

* Update with feedback

Signed-off-by: Ankita Katiyar <[email protected]>

* Minor formatting

Signed-off-by: Ankita Katiyar <[email protected]>

* Add test

Signed-off-by: Ankita Katiyar <[email protected]>

* Update test

Signed-off-by: Ankita Katiyar <[email protected]>

* Update with feedback

Signed-off-by: Ankita Katiyar <[email protected]>

* break up test

Signed-off-by: Ankita Katiyar <[email protected]>

* Update docstrings + release notes

Signed-off-by: Ankita Katiyar <[email protected]>

* please sphinx

Signed-off-by: Ankita Katiyar <[email protected]>

* linkcheck

Signed-off-by: Ankita Katiyar <[email protected]>

---------

Signed-off-by: Ankita Katiyar <[email protected]>
  • Loading branch information
ankatiyar authored Feb 12, 2025
1 parent c680722 commit 1093a3e
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 1 deletion.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Major features and improvements
* Added `KedroDataCatalog.filter()` to filter datasets by name and type.
* Added `Pipeline.grouped_nodes_by_namespace` property which returns a dictionary of nodes grouped by namespace, intended to be used by plugins to facilitate deployment of namespaced nodes together.

## Bug fixes and other changes
* Updated `_LazyDataset` representation when printing `KedroDataCatalog`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ When writing exploratory code, it’s tempting to hard code values to save time,
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=3)
```

[Good software engineering practice](https://towardsdatascience.com/five-software-engineering-principles-for-collaborative-data-science-ab26667a311) suggests that we extract *‘magic numbers’* into named constants. These could be defined at the top of a file or in a utility file, in a format such as yaml.
[Good software engineering practice](https://medium.com/towards-data-science/five-software-engineering-principles-for-collaborative-data-science-ab26667a311) suggests that we extract *‘magic numbers’* into named constants. These could be defined at the top of a file or in a utility file, in a format such as yaml.


```yaml
Expand Down
29 changes: 29 additions & 0 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,35 @@ def grouped_nodes(self) -> list[list[Node]]:

return [list(group) for group in self._toposorted_groups]

@property
def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]:
    """Return the pipeline nodes grouped by namespace.

    Nodes that share a namespace are collected into a single group keyed by
    that namespace. A node without a namespace forms its own group keyed by
    the node name. This property is intended to be used by deployment
    plugins to group nodes by namespace.

    Returns:
        A plain dictionary (missing keys raise ``KeyError``) with one entry
        per group, in the order the groups are first met in ``self.nodes``::

            {
                "<namespace or node name>": {
                    "name": "<namespace or node name>",
                    "type": "namespace" or "node",
                    "nodes": [nodes belonging to the group],
                    "dependencies": {keys of the groups this one depends on},
                }
            }

        ``dependencies`` is a set and never contains the group's own key.
    """
    grouped_nodes: dict[str, dict[str, Any]] = {}
    for node in self.nodes:
        key = node.namespace or node.name
        if key not in grouped_nodes:
            grouped_nodes[key] = {
                "name": key,
                "type": "namespace" if node.namespace else "node",
                "nodes": [],
                "dependencies": set(),
            }
        group = grouped_nodes[key]
        group["nodes"].append(node)
        for parent in self.node_dependencies[node]:
            # A parent is identified by the group it belongs to; parents
            # living in the same group are internal edges, not dependencies.
            parent_key = parent.namespace or parent.name
            if parent_key != key:
                group["dependencies"].add(parent_key)
    return grouped_nodes

def only_nodes(self, *node_names: str) -> Pipeline:
"""Create a new ``Pipeline`` which will contain only the specified
nodes by name.
Expand Down
140 changes: 140 additions & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,118 @@ def test_node_dependencies(self, complex_pipeline):
}
assert actual == expected

@pytest.mark.parametrize(
    "pipeline_name, expected",
    [
        ("pipeline_with_namespace_simple", ["namespace_1", "namespace_2"]),
        (
            "pipeline_with_namespace_partial",
            ["namespace_1", "node_3", "namespace_2", "node_6"],
        ),
    ],
)
def test_node_grouping_by_namespace_name_type(
    self, request, pipeline_name, expected
):
    """Check the 'name' and 'type' entries of ``grouped_nodes_by_namespace``.

    Every group returned by the property carries the keys 'name', 'type',
    'nodes' and 'dependencies'. Here only the first two are verified: the
    set of group keys must match ``expected``, each group's 'name' must
    equal its key, and its 'type' must be 'namespace' or 'node' as
    appropriate.
    """
    groups = request.getfixturevalue(pipeline_name).grouped_nodes_by_namespace
    assert set(groups.keys()) == set(expected)
    for group_key in expected:
        entry = groups[group_key]
        assert entry["name"] == group_key
        # The fixture keys are named after their kind ("namespace_*" or
        # "node_*"), so the expected type is the prefix of the key.
        assert group_key.startswith(entry["type"])

@pytest.mark.parametrize(
    "pipeline_name, expected",
    [
        (
            "pipeline_with_namespace_simple",
            {
                "namespace_1": [
                    "namespace_1.node_1",
                    "namespace_1.node_2",
                    "namespace_1.node_3",
                ],
                "namespace_2": [
                    "namespace_2.node_4",
                    "namespace_2.node_5",
                    "namespace_2.node_6",
                ],
            },
        ),
        (
            "pipeline_with_namespace_partial",
            {
                "namespace_1": ["namespace_1.node_1", "namespace_1.node_2"],
                "node_3": ["node_3"],
                "namespace_2": ["namespace_2.node_4", "namespace_2.node_5"],
                "node_6": ["node_6"],
            },
        ),
    ],
)
def test_node_grouping_by_namespace_nodes(self, request, pipeline_name, expected):
    """Check the 'nodes' entry of ``grouped_nodes_by_namespace``.

    Every group returned by the property carries the keys 'name', 'type',
    'nodes' and 'dependencies'. This test verifies that each group's
    'nodes' list holds exactly the nodes named in ``expected``.
    """
    p = request.getfixturevalue(pipeline_name)
    grouped = p.grouped_nodes_by_namespace
    # Without this check the loop below would pass vacuously if the
    # property returned no groups or silently dropped some of them.
    assert set(grouped) == set(expected)
    for key, value in grouped.items():
        names = [node.name for node in value["nodes"]]
        assert set(names) == set(expected[key])

@pytest.mark.parametrize(
    "pipeline_name, expected",
    [
        (
            "pipeline_with_namespace_simple",
            {"namespace_1": set(), "namespace_2": {"namespace_1"}},
        ),
        (
            "pipeline_with_namespace_partial",
            {
                "namespace_1": set(),
                "node_3": {"namespace_1"},
                "namespace_2": {"node_3"},
                "node_6": {"namespace_2"},
            },
        ),
    ],
)
def test_node_grouping_by_namespace_dependencies(
    self, request, pipeline_name, expected
):
    """Check the 'dependencies' entry of ``grouped_nodes_by_namespace``.

    Every group returned by the property carries the keys 'name', 'type',
    'nodes' and 'dependencies'. This test verifies that each group's
    'dependencies' collection names exactly the nodes/namespaces the group
    depends on, as listed in ``expected``.
    """
    p = request.getfixturevalue(pipeline_name)
    grouped = p.grouped_nodes_by_namespace
    # Without this check the loop below would pass vacuously if the
    # property returned no groups or silently dropped some of them.
    assert set(grouped) == set(expected)
    for key, value in grouped.items():
        assert set(value["dependencies"]) == set(expected[key])


@pytest.fixture
def pipeline_with_circle():
Expand Down Expand Up @@ -758,6 +870,34 @@ def pipeline_with_namespaces():
)


@pytest.fixture
def pipeline_with_namespace_simple():
    """Pipeline of six nodes chained A -> B -> ... -> G.

    node_1..node_3 are declared in ``namespace_1`` and node_4..node_6 in
    ``namespace_2``, so every node belongs to a namespace.
    """
    first_namespace = [
        node(identity, "A", "B", name="node_1", namespace="namespace_1"),
        node(identity, "B", "C", name="node_2", namespace="namespace_1"),
        node(identity, "C", "D", name="node_3", namespace="namespace_1"),
    ]
    second_namespace = [
        node(identity, "D", "E", name="node_4", namespace="namespace_2"),
        node(identity, "E", "F", name="node_5", namespace="namespace_2"),
        node(identity, "F", "G", name="node_6", namespace="namespace_2"),
    ]
    return modular_pipeline(first_namespace + second_namespace)


@pytest.fixture
def pipeline_with_namespace_partial():
    """Pipeline of six nodes chained A -> B -> ... -> G, partly namespaced.

    node_1/node_2 are declared in ``namespace_1`` and node_4/node_5 in
    ``namespace_2``; node_3 and node_6 are declared without a namespace.
    """
    chained_nodes = [
        node(identity, "A", "B", name="node_1", namespace="namespace_1"),
        node(identity, "B", "C", name="node_2", namespace="namespace_1"),
        # No namespace: sits between the two namespaced runs.
        node(identity, "C", "D", name="node_3"),
        node(identity, "D", "E", name="node_4", namespace="namespace_2"),
        node(identity, "E", "F", name="node_5", namespace="namespace_2"),
        # No namespace: final node of the chain.
        node(identity, "F", "G", name="node_6"),
    ]
    return modular_pipeline(chained_nodes)


class TestPipelineFilter:
def test_no_filters(self, complex_pipeline):
filtered_pipeline = complex_pipeline.filter()
Expand Down

0 comments on commit 1093a3e

Please sign in to comment.