diff --git a/Dockerfile b/Dockerfile index 6c559028..9f2a3d03 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,7 @@ RUN apt-get update \ WORKDIR /app COPY poetry.lock pyproject.toml ./ -RUN poetry install --without dev --no-root --extras "${INSTALL_EXTRAS}" \ +RUN poetry install --without dev --no-root --extras "redis druid rds" \ && poetry run pip install --no-cache-dir "torch>=2.0,<3.0" --index-url https://download.pytorch.org/whl/cpu \ && poetry run pip install --no-cache-dir "lightning[pytorch]" \ && rm -rf $POETRY_CACHE_DIR \ diff --git a/numalogic/connectors/_config.py b/numalogic/connectors/_config.py index 13638fca..2adb67ca 100644 --- a/numalogic/connectors/_config.py +++ b/numalogic/connectors/_config.py @@ -43,15 +43,27 @@ class DruidFetcherConf: datasource: str dimensions: list[str] = field(default_factory=list) aggregations: dict = field(default_factory=dict) + post_aggregations: dict = field(default_factory=dict) group_by: list[str] = field(default_factory=list) pivot: Pivot = field(default_factory=lambda: Pivot()) granularity: str = "minute" def __post_init__(self): from pydruid.utils.aggregators import doublesum + from pydruid.utils import postaggregator, aggregators + from numalogic.connectors.druid import postaggregator as _post_agg + from numalogic.connectors.druid import aggregators as _agg if not self.aggregations: - self.aggregations = {"count": doublesum("count")} + self.aggregations = { + "agg_out": _agg.quantiles_doubles_sketch("valuesDoublesSketch", "agg0", 64) + } + if not self.post_aggregations: + self.post_aggregations = { + "p90": _post_agg.QuantilesDoublesSketchToQuantile( + output_name="agg_out", field=postaggregator.Field("agg_out"), fraction=0.90 + ) + } @dataclass diff --git a/numalogic/udfs/trainer/_druid.py b/numalogic/udfs/trainer/_druid.py index f21f9d77..41634fbf 100644 --- a/numalogic/udfs/trainer/_druid.py +++ b/numalogic/udfs/trainer/_druid.py @@ -124,6 +124,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]: delay=self.dataconn_conf.delay_hrs, granularity=_fetcher_conf.granularity, aggregations=dict(_fetcher_conf.aggregations), + post_aggregations=dict(_fetcher_conf.post_aggregations), group_by=list(_fetcher_conf.group_by), pivot=_fetcher_conf.pivot, hours=_conf.numalogic_conf.trainer.train_hours,