Skip to content

Commit

Permalink
feat: multi column pivot for druid connector (#374)
Browse files Browse the repository at this point in the history
Implemented pivot with multiple columns in the Druid connector.

---------

Signed-off-by: Nandita Koppisetty <[email protected]>
  • Loading branch information
nkoppisetty authored May 3, 2024
1 parent 98c5766 commit e1ae3ee
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 13 deletions.
37 changes: 24 additions & 13 deletions numalogic/connectors/druid/_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,18 @@ def fetch(
if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
# TODO: performance review
if pivot:
pivoted_frames = []
for column in pivot.columns:
_df = df.pivot(
index=pivot.index,
columns=[column],
values=pivot.value,
)
pivoted_frames.append(_df)

df = pd.concat(pivoted_frames, axis=1, join="outer")
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)

Expand Down Expand Up @@ -273,7 +279,7 @@ def chunked_fetch(
datasource=datasource,
dimensions=dimensions,
filter_pairs=filter_pairs,
static_filter=static_filter,
static_filters=static_filter,
granularity=granularity,
hours=min(chunked_hours, hours - hours_elapsed),
delay=delay,
Expand All @@ -295,12 +301,17 @@ def chunked_fetch(
if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
if pivot:
pivoted_frames = []
for column in pivot.columns:
_df = df.pivot(
index=pivot.index,
columns=[column],
values=pivot.value,
)
pivoted_frames.append(_df)

df = pd.concat(pivoted_frames, axis=1, join="outer")
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)

Expand Down
58 changes: 58 additions & 0 deletions tests/connectors/test_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,44 @@ def group_by(*_, **__):
mocker.patch.object(PyDruid, "groupby", side_effect=group_by)


@pytest.fixture
def mock_group_by_multi_column(mocker):
    """Patch PyDruid.groupby to return a canned two-row groupBy result.

    The canned events carry two pivotable dimensions (``http_status`` and
    ``status``) plus a ``count`` metric, which is what the multi-column
    pivot tests need.
    """

    def _fake_group_by(*_, **__):
        """Build a pydruid groupBy Query preloaded with two fixed events."""
        events = [
            {
                "event": {
                    "service_alias": "identity.authn.signin",
                    "env": "prod",
                    "status": 200,
                    "http_status": "2xx",
                    "count": 20,
                },
                "timestamp": "2023-09-06T07:50:00.000Z",
                "version": "v1",
            },
            {
                "event": {
                    "service_alias": "identity.authn.signin",
                    "env": "prod",
                    "status": 500,
                    "http_status": "5xx",
                    "count": 10,
                },
                "timestamp": "2023-09-06T07:53:00.000Z",
                "version": "v1",
            },
        ]
        canned = pydruid.query.Query(query_dict={}, query_type="groupBy")
        canned.parse(json.dumps(events))
        return canned

    mocker.patch.object(PyDruid, "groupby", side_effect=_fake_group_by)


def test_fetch(setup, mock_group_by):
start, end, fetcher = setup
_out = fetcher.fetch(
Expand Down Expand Up @@ -251,3 +289,23 @@ def test_chunked_fetch_err(get_args):
**get_args,
chunked_hours=0,
)


def test_multi_column_pivot(setup, mock_group_by_multi_column):
    """Pivoting on two columns yields one column per (dimension, value) pair.

    With the mocked two-event groupBy response, pivoting on both
    ``http_status`` and ``status`` should produce a frame of shape
    (2 rows, 5 columns): the timestamp index plus the pivoted
    ``2xx``/``5xx`` and ``200``/``500`` count columns.
    """
    start, end, fetcher = setup
    _out = fetcher.fetch(
        filter_keys=["service_alias"],
        filter_values=["identity.authn.signin"],
        dimensions=["http_status", "status"],
        datasource="ip-apigw-telegraf-druid",
        aggregations={"count": aggregators.doublesum("count")},
        group_by=["timestamp", "http_status", "status"],
        hours=2,
        pivot=Pivot(
            index="timestamp",
            columns=["http_status", "status"],
            value=["count"],
        ),
    )
    # Removed leftover debug print; assert the expected frame shape directly.
    assert _out.shape == (2, 5)

0 comments on commit e1ae3ee

Please sign in to comment.