Skip to content

Commit

Permalink
feat: multivariate support in Prometheus Fetcher
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Nov 16, 2023
1 parent 6163be4 commit 0423d88
Show file tree
Hide file tree
Showing 2 changed files with 271 additions and 58 deletions.
77 changes: 31 additions & 46 deletions numalogic/connectors/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from datetime import datetime
from functools import reduce
from operator import iconcat
from pprint import pprint
from typing import Optional, Final

import orjson
Expand All @@ -29,6 +28,7 @@
MAX_DATA_POINTS: Final[int] = 11000
_MAX_RECURSION_DEPTH: Final[int] = 10
_API_ENDPOINT: Final[str] = "/api/v1/query_range"
_METRIC_KEY: Final[str] = "metric.__name__"


class PrometheusFetcher(DataFetcher):
Expand Down Expand Up @@ -102,11 +102,14 @@ def fetch(
return df

df = self._consolidate_df(df, metric_name, return_labels)

if aggregate and return_labels:
df = self._agg_df(df, metric_name)
df = self._agg_df(df, [metric_name])

df.sort_values(by=["timestamp"], inplace=True)
try:
df.set_index("timestamp", inplace=True)
except KeyError:
pass
df.sort_values(by="timestamp", inplace=True)

return df

Expand All @@ -127,7 +130,7 @@ def raw_fetch(
start: Start time
end: End time
return_labels: Prometheus label names as columns to return
aggregate: Whether to aggregate the data
aggregate: Whether to aggregate the data over each timestamp
Returns
-------
Expand All @@ -143,31 +146,41 @@ def raw_fetch(
end_ts, start_ts = self._init_startend_ts(end, start)
results = self.query_range(query, start_ts, end_ts)

pprint(results, indent=2)

df = pd.json_normalize(results)
if df.empty:
LOGGER.warning("Query returned no results")
return df

print(df[["values", "metric.__name__"]])
return_labels = [f"metric.{label}" for label in return_labels or []]
extra_labels = [f"metric.{label}" for label in return_labels or []]
metric_names = self._extract_metric_names(df)

df = self._consolidate_df_2(df, metric_names, return_labels)
if metric_names is None:
raise PrometheusInvalidResponseError("No metric names were extracted from the query")

if aggregate and return_labels:
# df = self._agg_df(df, metric_name)
pass
df.set_index(_METRIC_KEY, inplace=True)

dfs = []
for metric_name in metric_names:
_df = self._consolidate_df(df.loc[[metric_name]], metric_name, extra_labels)
_df.set_index(["timestamp", *extra_labels], inplace=True)
dfs.append(_df)

df = dfs[0].join(dfs[1:])
df.reset_index(inplace=True)
df.set_index("timestamp", inplace=True)

if return_labels:
df.rename(columns=dict(zip(extra_labels, return_labels)), inplace=True)

if aggregate:
df = self._agg_df(df, metric_names)

df.sort_values(by=["timestamp"], inplace=True)
return df

@staticmethod
def _agg_df(df, metric_name: str):
df = df.groupby(by=["timestamp"]).apply(lambda x: x[[metric_name]].mean())
df.reset_index(inplace=True)
return df
def _agg_df(df, metric_names: list[str]) -> pd.DataFrame:
return df.groupby(by=["timestamp"]).apply(lambda x: x[metric_names].mean())

@staticmethod
def _consolidate_df(df: pd.DataFrame, metric_name: str, return_labels: list[str]):
Expand All @@ -179,17 +192,6 @@ def _consolidate_df(df: pd.DataFrame, metric_name: str, return_labels: list[str]
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
return df

@staticmethod
def _consolidate_df_2(df: pd.DataFrame, metric_names: list[str], return_labels: list[str]):
    """Flatten the raw ``values`` column into timestamp and metric columns.

    Every entry of ``values`` is exploded into its own row and then split
    positionally into ``timestamp`` followed by one column per metric name.

    Args:
        df: Frame holding a ``values`` column plus the ``return_labels`` columns.
        metric_names: Names given to the value columns; these are cast to float.
        return_labels: Label columns that are carried through unchanged.

    Returns:
        A frame with the label columns, a datetime ``timestamp`` column
        (parsed as epoch seconds) and the float metric columns.
    """
    df = df[["values", *return_labels]].explode("values", ignore_index=True)
    # NOTE(review): the positional split below needs every sample to hold
    # exactly 1 + len(metric_names) items -- confirm against the response shape.
    df[["timestamp", *metric_names]] = df["values"].to_list()
    df = df.drop(columns=["values"])
    df = df.astype({name: float for name in metric_names})
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
    return df

@staticmethod
def _init_startend_ts(end: datetime, start: datetime) -> tuple[int, int]:
if not end:
Expand All @@ -200,23 +202,6 @@ def _init_startend_ts(end: datetime, start: datetime) -> tuple[int, int]:
raise ValueError(f"end_time: {end} must not be before start_time: {start}")
return end_ts, start_ts

@staticmethod
def _extract_metric_name(df: pd.DataFrame) -> Optional[str]:
    """Infer the single metric name carried by a normalized query result.

    Args:
        df: Frame expected to hold a ``metric.__name__`` column.

    Returns:
        The metric name shared by the rows, or ``None`` when the
        ``metric.__name__`` column is missing.

    Raises:
        PrometheusInvalidResponseError: If the rows carry more than one
            distinct metric name.
    """
    # Only the column lookup can raise KeyError, so keep that try minimal.
    try:
        name_col = df["metric.__name__"]
    except KeyError:
        LOGGER.warning("Could not infer metric name from results")
        return None

    try:
        # ``item()`` succeeds only when the column holds exactly one value.
        return name_col.item()
    except ValueError:
        metric_names = name_col.unique()
        if len(metric_names) > 1:
            raise PrometheusInvalidResponseError(
                f"More than 1 metric names were extracted in the query: {metric_names}"
            ) from None
        return metric_names[0]

@staticmethod
def _extract_metric_names(df: pd.DataFrame) -> Optional[list[str]]:
try:
Expand All @@ -226,7 +211,7 @@ def _extract_metric_names(df: pd.DataFrame) -> Optional[list[str]]:
except KeyError:
LOGGER.warning("Could not infer metric name from results")
return None
return metric_name
return [metric_name]

def _api_query_range(self, query: str, start_ts: int, end_ts: int) -> list[dict]:
"""Queries Prometheus API for data."""
Expand Down
Loading

0 comments on commit 0423d88

Please sign in to comment.