Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): references support in Pipeline (remove PipelineWithRefs type) TCTC-7763 #2007

Merged
merged 13 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions docs/_docs/tech/python-package.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ steps -> 1 -> name

### Pipeline combinations: references

Pipelines can reference other pipelines in certain steps.
These references are accepted only in the model `PipelineWithRefs`.
This model provides a method to find and replace recursively all references.
Pipelines can reference other pipelines in certain steps: domain, append, and join.
A method `resolve_references` is provided to find and replace recursively all references.
It must be called before trying to execute or translate a pipeline.


### Pipeline : variables

Some fields can contain variables instead of the actual value.
They are accepted only in the model `PipelineWithVariable`.
They are accepted only in the model `PipelineWithVariables`.
This model provides a method to replace all variables by their value.
It must be called before trying to execute or translate a pipeline.

Expand Down Expand Up @@ -138,15 +136,7 @@ As of today, no translator backend exists for python. We plan to implement one f
### Summary

```
┌────────────────────────────┐
│ │
│ pipeline with references │
│ │
└──────────────┬─────────────┘
│ PipelineWithRefs.resolve_references

┌────────────────────────────┐
│ │
│ pipeline with variables │
Expand All @@ -162,7 +152,7 @@ As of today, no translator backend exists for python. We plan to implement one f
│ │
└──────────────┬─────────────┘
iinput dataframes
input dataframes
│ │
│ │
OR────────────────────────────────────────┐ │
Expand Down
6 changes: 6 additions & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog (weaverbird python package)

## Unreleased

### Changed

- Remove `PipelineWithRefs`. Instead, support method `resolve_references` on types `Pipeline` and `PipelineWithVariables`.

## [0.41.3] - 2024-01-26

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions server/playground.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from weaverbird.backends.pypika_translator.translate import (
translate_pipeline as pypika_translate_pipeline,
)
from weaverbird.pipeline.pipeline import Pipeline, PipelineWithRefs
from weaverbird.pipeline.pipeline import Pipeline, PipelineWithVariables
from weaverbird.pipeline.steps import DomainStep
from weaverbird.pipeline.steps.utils.combination import Reference

Expand Down Expand Up @@ -148,7 +148,7 @@ async def prepare_pipeline(req: Request) -> Pipeline:
Validate the pipeline sent in the body of the request, and prepare it for translation (resolve references and
interpolate variables).
"""
pipeline_with_refs = PipelineWithRefs(steps=await req.get_json()) # Validation
pipeline_with_refs = PipelineWithVariables(steps=await req.get_json()) # Validation
pipeline_with_vars = await pipeline_with_refs.resolve_references(dummy_reference_resolver)
pipeline = pipeline_with_vars.render(VARIABLES, nosql_apply_parameters_to_query)
return pipeline
Expand Down Expand Up @@ -347,7 +347,7 @@ async def handle_mongo_backend_request():
elif request.method == "POST":
try:
req_params = await parse_request_json(request)
pipeline_with_refs = PipelineWithRefs(steps=req_params["pipeline"]) # Validation
pipeline_with_refs = PipelineWithVariables(steps=req_params["pipeline"]) # Validation
pipeline = await pipeline_with_refs.resolve_references(dummy_reference_resolver)
mongo_query = mongo_translate_pipeline(pipeline)

Expand Down
106 changes: 52 additions & 54 deletions server/src/weaverbird/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from collections.abc import Iterable
from typing import Annotated, Any
from sys import version_info
from typing import Annotated, Any, TypeVar

if version_info < (3, 11): # noqa: UP036
from typing_extensions import Self # noqa: UP035
else:
from typing import Self

from pydantic import BaseModel, Field

Expand All @@ -12,11 +18,8 @@
InclusionCondition,
MatchCondition,
)
from weaverbird.pipeline.steps.append import AppendStepWithRefs
from weaverbird.pipeline.steps.domain import DomainStepWithRef
from weaverbird.pipeline.steps.hierarchy import HierarchyStep
from weaverbird.pipeline.steps.join import JoinStepWithRef
from weaverbird.pipeline.steps.utils.combination import PipelineOrDomainName, ReferenceResolver
from weaverbird.pipeline.steps.utils.combination import PipelineOrDomainName, Reference, ReferenceResolver

from .steps import (
AbsoluteValueStep,
Expand Down Expand Up @@ -165,6 +168,23 @@ def model_dump(self, *, exclude_none: bool = True, **kwargs) -> dict:
def dict(self, *, exclude_none: bool = True, **kwargs) -> dict:
return self.model_dump(exclude_none=exclude_none, **kwargs)

async def resolve_references(self, reference_resolver: ReferenceResolver) -> Self | None:
"""
Walk the pipeline steps and replace any reference by its corresponding pipeline.
The sub-pipelines added should also be handled, so that they will be no references anymore in the result.
"""
resolved_steps: list[PipelineStep | PipelineStepWithVariables] = []
for step in self.steps:
resolved_step = (
await step.resolve_references(reference_resolver, self) if hasattr(step, "resolve_references") else step
)
if isinstance(resolved_step, self.__class__):
resolved_steps.extend(resolved_step.steps)
elif resolved_step is not None: # None means the step should be skipped
resolved_steps.append(resolved_step)

return self.__class__(steps=resolved_steps)


PipelineStepWithVariables = Annotated[
AbsoluteValueStepWithVariable
Expand Down Expand Up @@ -245,42 +265,55 @@ def _remove_void_from_condition(condition: Condition) -> Condition | None:
return condition


JoinStepMaybeWithVariables = TypeVar("JoinStepMaybeWithVariables", bound=JoinStep | JoinStepWithVariable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice effort on typing ✨



def _remove_void_condition_from_join_step(
step: JoinStepWithVariable | JoinStep,
) -> JoinStep | JoinStepWithVariable | None:
if isinstance(step.right_pipeline, str):
step: JoinStepMaybeWithVariables,
) -> JoinStepMaybeWithVariables | None:
if isinstance(step.right_pipeline, str | Reference):
return step
cleaned_steps = remove_void_conditions_from_filter_steps(step.right_pipeline)
return step.__class__(**{**step.model_dump(), "right_pipeline": cleaned_steps}) if cleaned_steps else None
elif isinstance(step.right_pipeline, list):
cleaned_steps = remove_void_conditions_from_filter_steps(step.right_pipeline)
return step.__class__(**{**step.model_dump(), "right_pipeline": cleaned_steps}) if cleaned_steps else None
return None


AppendStepMaybeWithVariables = TypeVar("AppendStepMaybeWithVariables", bound=AppendStep | AppendStepWithVariable)


def _remove_void_condition_from_append_step(
step: AppendStep | AppendStepWithVariable,
) -> AppendStep | AppendStepWithVariable | None:
step: AppendStepMaybeWithVariables,
) -> AppendStepMaybeWithVariables | None:
cleaned_pipelines: list[PipelineOrDomainName] = []
for pipeline in step.pipelines:
if isinstance(pipeline, str):
if isinstance(pipeline, str | Reference):
cleaned_pipelines.append(pipeline)
else:
elif isinstance(pipeline, list):
if cleaned_pipeline := remove_void_conditions_from_filter_steps(pipeline):
cleaned_pipelines.append(cleaned_pipeline)

return step.__class__(pipelines=cleaned_pipelines) if cleaned_pipelines else None


PipelineStepMaybeWithVariables = TypeVar(
"PipelineStepMaybeWithVariables", bound=PipelineStep | PipelineStepWithVariables
)


def remove_void_conditions_from_filter_steps(
steps: list[PipelineStepWithVariables | PipelineStep],
) -> list[PipelineStepWithVariables | PipelineStep]:
steps: list[PipelineStepMaybeWithVariables],
) -> list[PipelineStepMaybeWithVariables]:
"""
This method will remove all FilterStep with conditions having "__VOID__"
in them. either the "value" key or the "column" key.
"""

final_steps = []
for step in steps:
if isinstance(step, FilterStep):
if isinstance(step, FilterStep | FilterStepWithVariables):
if (condition := _remove_void_from_condition(step.condition)) is not None:
final_steps.append(FilterStep(condition=condition))
final_steps.append(step.__class__(condition=condition))
elif isinstance(step, JoinStep | JoinStepWithVariable):
if (clean_step := _remove_void_condition_from_join_step(step)) is not None:
final_steps.append(clean_step)
Expand Down Expand Up @@ -417,7 +450,7 @@ def remove_void_conditions_from_mongo_steps(


# TODO move to a dedicated variables module
class PipelineWithVariables(BaseModel):
class PipelineWithVariables(Pipeline):
steps: list[PipelineStepWithVariables | PipelineStep]

def render(self, variables: dict[str, Any], renderer) -> Pipeline:
Expand All @@ -429,41 +462,6 @@ def render(self, variables: dict[str, Any], renderer) -> Pipeline:
return Pipeline(steps=steps_rendered)


PipelineStepWithRefs = Annotated[
AppendStepWithRefs | DomainStepWithRef | JoinStepWithRef,
Field(discriminator="name"),
]


class PipelineWithRefs(BaseModel):
"""
Represents a pipeline in which some steps can reference some other pipelines using the syntax
`{"type": "ref", "uid": "..."}`
"""

steps: list[PipelineStepWithRefs | PipelineStep | PipelineStepWithVariables]

async def resolve_references(self, reference_resolver: ReferenceResolver) -> PipelineWithVariables | None:
"""
Walk the pipeline steps and replace any reference by its corresponding pipeline.
The sub-pipelines added should also be handled, so that they will be no references anymore in the result.
"""
resolved_steps: list[PipelineStepWithRefs | PipelineStepWithVariables | PipelineStep] = []
for step in self.steps:
resolved_step = (
await step.resolve_references(reference_resolver) if hasattr(step, "resolve_references") else step
)
if isinstance(resolved_step, PipelineWithVariables):
resolved_steps.extend(resolved_step.steps)
elif resolved_step is not None: # None means the step should be skipped
resolved_steps.append(resolved_step)

return PipelineWithVariables(steps=resolved_steps)


PipelineWithVariables.model_rebuild()


class ReferenceUnresolved(Exception):
"""
Raised when a mandatory reference is not resolved
Expand Down
6 changes: 3 additions & 3 deletions server/src/weaverbird/pipeline/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .absolutevalue import AbsoluteValueStep, AbsoluteValueStepWithVariable
from .addmissingdates import AddMissingDatesStep, AddMissingDatesStepWithVariables
from .aggregate import AggregateStep, AggregateStepWithVariables, Aggregation
from .append import AppendStep, AppendStepWithRefs, AppendStepWithVariable
from .append import AppendStep, AppendStepWithVariable
from .argmax import ArgmaxStep, ArgmaxStepWithVariable
from .argmin import ArgminStep, ArgminStepWithVariable
from .comparetext import CompareTextStep, CompareTextStepWithVariables
Expand All @@ -16,7 +16,7 @@
from .date_extract import DateExtractStep, DateExtractStepWithVariable
from .delete import DeleteStep
from .dissolve import DissolveStep
from .domain import DomainStep, DomainStepWithRef
from .domain import DomainStep
from .duplicate import DuplicateStep
from .duration import DurationStep, DurationStepWithVariable
from .evolution import EvolutionStep, EvolutionStepWithVariable
Expand All @@ -26,7 +26,7 @@
from .fromdate import FromdateStep
from .hierarchy import HierarchyStep
from .ifthenelse import IfthenelseStep, IfThenElseStepWithVariables
from .join import JoinStep, JoinStepWithRef, JoinStepWithVariable
from .join import JoinStep, JoinStepWithVariable
from .lowercase import LowercaseStep
from .moving_average import MovingAverageStep
from .percentage import PercentageStep
Expand Down
31 changes: 17 additions & 14 deletions server/src/weaverbird/pipeline/steps/append.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from typing import Literal
from typing import TYPE_CHECKING, Literal, Self, TypeVar

from weaverbird.pipeline.steps.utils.base import BaseStep
from weaverbird.pipeline.steps.utils.render_variables import StepWithVariablesMixin

if TYPE_CHECKING:
from weaverbird.pipeline.pipeline import Pipeline, PipelineWithVariables

PipelineType = TypeVar("PipelineType", bound=Pipeline | PipelineWithVariables)

from .utils.combination import (
PipelineOrDomainName,
PipelineWithRefsOrDomainNameOrReference,
PipelineOrDomainNameOrReference,
PipelineWithVariablesOrDomainNameOrReference,
ReferenceResolver,
resolve_if_reference,
)
Expand All @@ -16,22 +21,20 @@ class BaseAppendStep(BaseStep):


class AppendStep(BaseAppendStep):
pipelines: list[PipelineOrDomainName]


class AppendStepWithVariable(AppendStep, StepWithVariablesMixin):
...
pipelines: list[PipelineOrDomainNameOrReference]


class AppendStepWithRefs(BaseAppendStep):
pipelines: list[PipelineWithRefsOrDomainNameOrReference]

async def resolve_references(self, reference_resolver: ReferenceResolver) -> AppendStepWithVariable | None:
async def resolve_references(
self, reference_resolver: ReferenceResolver, parent_pipeline: "PipelineType"
) -> Self | None:
resolved_pipelines = [await resolve_if_reference(reference_resolver, p) for p in self.pipelines]
resolved_pipelines_without_nones = [p for p in resolved_pipelines if p is not None]
if len(resolved_pipelines_without_nones) == 0:
return None # skip the step
return AppendStepWithVariable(
return self.__class__(
name=self.name,
pipelines=resolved_pipelines_without_nones,
)


class AppendStepWithVariable(AppendStep, StepWithVariablesMixin):
pipelines: list[PipelineWithVariablesOrDomainNameOrReference]
23 changes: 9 additions & 14 deletions server/src/weaverbird/pipeline/steps/domain.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Literal, Union
from typing import TYPE_CHECKING, Literal, TypeVar

from weaverbird.pipeline.steps.utils.base import BaseStep
from weaverbird.pipeline.steps.utils.combination import (
Expand All @@ -8,33 +8,28 @@
)

if TYPE_CHECKING:
from weaverbird.pipeline.pipeline import PipelineWithVariables
from weaverbird.pipeline.pipeline import Pipeline, PipelineWithVariables

PipelineType = TypeVar("PipelineType", bound=Pipeline | PipelineWithVariables)

class BaseDomainStep(BaseStep):
name: Literal["domain"] = "domain"


class DomainStep(BaseDomainStep):
domain: str


class DomainStepWithRef(BaseDomainStep):
class DomainStep(BaseStep):
name: Literal["domain"] = "domain"
domain: str | Reference

async def resolve_references(
self, reference_resolver: ReferenceResolver
) -> Union[DomainStep, "PipelineWithVariables"]:
self, reference_resolver: ReferenceResolver, parent_pipeline: "PipelineType"
) -> "DomainStep | PipelineType":
"""
This resolution can return a whole pipeline, which needs to replace the step.
Not that the resulting array must be flattened:
it should look like [step 1, step 2, step 3], not [[step 1, step 2], step 3]
"""
from weaverbird.pipeline.pipeline import PipelineWithRefs, ReferenceUnresolved
from weaverbird.pipeline.pipeline import ReferenceUnresolved

resolved = await resolve_if_reference(reference_resolver, self.domain)
if isinstance(resolved, list):
return await PipelineWithRefs(steps=resolved).resolve_references(reference_resolver)
return await parent_pipeline.__class__(steps=resolved).resolve_references(reference_resolver)
elif resolved is None:
raise ReferenceUnresolved()
else:
Expand Down
Loading
Loading