Skip to content

Commit

Permalink
fix(server): Support combination steps in remove_void_conditions_from…
Browse files Browse the repository at this point in the history
…_filter_steps (#1970)

Signed-off-by: Luka Peschke <[email protected]>
  • Loading branch information
lukapeschke authored Dec 19, 2023
1 parent 4dc35e4 commit 56a6350
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 2 deletions.
4 changes: 4 additions & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- `remove_void_conditions_from_filter_steps` now properly recurses into `append` and `join` steps.

## [0.40.0] - 2023-12-15

### Breaking
Expand Down
31 changes: 30 additions & 1 deletion server/src/weaverbird/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from weaverbird.pipeline.steps.domain import DomainStepWithRef
from weaverbird.pipeline.steps.hierarchy import HierarchyStep
from weaverbird.pipeline.steps.join import JoinStepWithRef
from weaverbird.pipeline.steps.utils.combination import ReferenceResolver
from weaverbird.pipeline.steps.utils.combination import PipelineOrDomainName, ReferenceResolver

from .steps import (
AbsoluteValueStep,
Expand Down Expand Up @@ -245,6 +245,29 @@ def _remove_void_from_condition(condition: Condition) -> Condition | None:
return condition


def _remove_void_condition_from_join_step(
    step: JoinStepWithVariable | JoinStep,
) -> JoinStep | JoinStepWithVariable | None:
    """Clean the right-hand pipeline of a join step.

    When ``right_pipeline`` is a string it is not a list of steps, so the step is
    returned unchanged. Otherwise the nested steps are passed through
    ``remove_void_conditions_from_filter_steps``.

    Returns:
        The original step (string ``right_pipeline``), a new step of the same class
        carrying the cleaned ``right_pipeline``, or ``None`` when the cleaned
        pipeline is empty.
    """
    right_pipeline = step.right_pipeline
    if isinstance(right_pipeline, str):
        return step

    remaining_steps = remove_void_conditions_from_filter_steps(right_pipeline)
    if not remaining_steps:
        # Nothing left on the right-hand side: signal that the join should be dropped.
        return None

    # Rebuild from the dumped fields so every other attribute of the step is kept.
    fields = step.model_dump()
    fields["right_pipeline"] = remaining_steps
    return type(step)(**fields)


def _remove_void_condition_from_append_step(
    step: AppendStep | AppendStepWithVariable,
) -> AppendStep | AppendStepWithVariable | None:
    """Clean every nested pipeline of an append step.

    String entries of ``step.pipelines`` are kept as they are. List entries are
    passed through ``remove_void_conditions_from_filter_steps`` and kept only when
    the cleaned result is non-empty.

    Returns:
        A new step of the same class built with the surviving pipelines, or ``None``
        when no pipeline survives.
    """
    kept: list[PipelineOrDomainName] = []
    for candidate in step.pipelines:
        if isinstance(candidate, str):
            # Strings are forwarded untouched, whatever their value.
            kept.append(candidate)
            continue
        cleaned = remove_void_conditions_from_filter_steps(candidate)
        if cleaned:
            kept.append(cleaned)

    if not kept:
        return None
    # NOTE(review): only `pipelines` is forwarded to the new instance; any other
    # field set on `step` is not copied over. Confirm this is intended.
    return type(step)(pipelines=kept)


def remove_void_conditions_from_filter_steps(
steps: list[PipelineStepWithVariables | PipelineStep],
) -> list[PipelineStepWithVariables | PipelineStep]:
Expand All @@ -258,6 +281,12 @@ def remove_void_conditions_from_filter_steps(
if isinstance(step, FilterStep):
if (condition := _remove_void_from_condition(step.condition)) is not None:
final_steps.append(FilterStep(condition=condition))
elif isinstance(step, JoinStep | JoinStepWithVariable):
if (clean_step := _remove_void_condition_from_join_step(step)) is not None:
final_steps.append(clean_step)
elif isinstance(step, AppendStep | AppendStepWithVariable):
if (clean_step := _remove_void_condition_from_append_step(step)) is not None:
final_steps.append(clean_step)
else:
final_steps.append(step)

Expand Down
110 changes: 109 additions & 1 deletion server/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
from jinja2.nativetypes import NativeEnvironment
from pydantic import BaseModel
from toucan_connectors.common import nosql_apply_parameters_to_query
from weaverbird.pipeline.conditions import ComparisonCondition
from weaverbird.pipeline.pipeline import (
Pipeline,
PipelineStep,
PipelineStepWithVariables,
PipelineWithVariables,
remove_void_conditions_from_filter_steps,
remove_void_conditions_from_mongo_steps,
)
from weaverbird.pipeline.steps import DomainStep, RollupStep
from weaverbird.pipeline.steps import DomainStep, FilterStep, JoinStep, RollupStep, TrimStep
from weaverbird.pipeline.steps.aggregate import Aggregation
from weaverbird.pipeline.steps.append import AppendStep
from weaverbird.pipeline.steps.text import TextStep


class Case(BaseModel):
Expand Down Expand Up @@ -490,3 +495,106 @@ def test_remove_void_conditions_from_mongo_step_should_only_apply_to_match_opera
assert remove_void_conditions_from_mongo_steps(step) == step
# A list of steps should start with a match-all operation
assert remove_void_conditions_from_mongo_steps([step]) == [{"$match": {}}, step]


def _void_beer_kind_filter() -> FilterStep:
    """Build a filter step whose single condition compares ``beer_kind`` to ``"__VOID__"``."""
    return FilterStep(condition=ComparisonCondition(column="beer_kind", operator="eq", value="__VOID__"))


def _left_join_on_name(right_pipeline) -> JoinStep:
    """Build a left join on the ``name`` column with the given right-hand pipeline."""
    return JoinStep(type="left", on=[("name", "name")], right_pipeline=right_pipeline)


@pytest.mark.parametrize(
    "steps,expected_steps",
    [
        # Top-level filter, join and append steps all containing void filters
        (
            [
                DomainStep(domain="domain__beers"),
                _void_beer_kind_filter(),
                _left_join_on_name(
                    [
                        DomainStep(domain="domain__beers"),
                        TrimStep(columns=["name"]),
                        _void_beer_kind_filter(),
                        TextStep(name="text", text="var_two", new_column="var_two"),
                    ]
                ),
                AppendStep(
                    pipelines=[
                        [_void_beer_kind_filter()],
                        "some-domain",
                        [
                            DomainStep(domain="domain__beers"),
                            TrimStep(columns=["name"]),
                            _void_beer_kind_filter(),
                        ],
                        "__VOID__",
                    ]
                ),
            ],
            [
                # The top-level filter step is expected to be gone
                DomainStep(domain="domain__beers"),
                _left_join_on_name(
                    [
                        # The nested filter step is expected to be gone as well
                        DomainStep(domain="domain__beers"),
                        TrimStep(columns=["name"]),
                        TextStep(name="text", text="var_two", new_column="var_two"),
                    ]
                ),
                AppendStep(
                    pipelines=[
                        # The first pipeline only held a void filter, so it is dropped entirely.
                        # Plain domain names are kept untouched.
                        "some-domain",
                        [
                            DomainStep(domain="domain__beers"),
                            TrimStep(columns=["name"]),
                            # The trailing filter step is expected to be gone here too
                        ],
                        # Plain domain names are kept untouched, even this one
                        "__VOID__",
                    ]
                ),
            ],
        ),
        # A join whose right pipeline only holds a void filter is removed completely
        (
            [_left_join_on_name([_void_beer_kind_filter()])],
            [],
        ),
        # A join whose right pipeline is a plain string is left untouched
        (
            [_left_join_on_name("__VOID__")],
            [_left_join_on_name("__VOID__")],
        ),
        # An append whose only pipeline holds a void filter is removed completely
        (
            [AppendStep(pipelines=[[_void_beer_kind_filter()]])],
            [],
        ),
    ],
)
def test_remove_void_conditions_from_filter_steps_with_combinations(
    steps: list[PipelineStep | PipelineStepWithVariables],
    expected_steps: list[PipelineStep | PipelineStepWithVariables],
) -> None:
    """Void filters nested inside join and append steps are removed recursively."""
    cleaned = remove_void_conditions_from_filter_steps(steps)
    assert cleaned == expected_steps

0 comments on commit 56a6350

Please sign in to comment.