From e1ae3ee2d736993d659b2c7b7112effddc340b2c Mon Sep 17 00:00:00 2001
From: Nandita Koppisetty
Date: Thu, 2 May 2024 17:20:40 -0700
Subject: [PATCH] feat: multi column pivot for druid connector (#374)

Implemented pivot with multiple columns in the druid connector: the
fetched frame is pivoted once per column in Pivot.columns, and the
pivoted frames are outer-joined on the pivot index.

---------

Signed-off-by: Nandita Koppisetty
---
 numalogic/connectors/druid/_druid.py | 37 +++++++++++-------
 tests/connectors/test_druid.py       | 58 ++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 13 deletions(-)

diff --git a/numalogic/connectors/druid/_druid.py b/numalogic/connectors/druid/_druid.py
index 99b9abe3..20753abc 100644
--- a/numalogic/connectors/druid/_druid.py
+++ b/numalogic/connectors/druid/_druid.py
@@ -195,12 +195,18 @@ def fetch(
         if group_by:
             df = df.groupby(by=group_by).sum().reset_index()
 
-        if pivot and pivot.columns:
-            df = df.pivot(
-                index=pivot.index,
-                columns=pivot.columns,
-                values=pivot.value,
-            )
+        # TODO: performance review
+        if pivot and pivot.columns:
+            pivoted_frames = []
+            for column in pivot.columns:
+                _df = df.pivot(
+                    index=pivot.index,
+                    columns=[column],
+                    values=pivot.value,
+                )
+                pivoted_frames.append(_df)
+
+            df = pd.concat(pivoted_frames, axis=1, join="outer")
             df.columns = df.columns.map("{0[1]}".format)
             df.reset_index(inplace=True)
 
@@ -273,7 +279,7 @@ def chunked_fetch(
                 datasource=datasource,
                 dimensions=dimensions,
                 filter_pairs=filter_pairs,
-                static_filter=static_filter,
+                static_filters=static_filter,
                 granularity=granularity,
                 hours=min(chunked_hours, hours - hours_elapsed),
                 delay=delay,
@@ -295,12 +301,17 @@ def chunked_fetch(
         if group_by:
             df = df.groupby(by=group_by).sum().reset_index()
 
-        if pivot and pivot.columns:
-            df = df.pivot(
-                index=pivot.index,
-                columns=pivot.columns,
-                values=pivot.value,
-            )
+        if pivot and pivot.columns:
+            pivoted_frames = []
+            for column in pivot.columns:
+                _df = df.pivot(
+                    index=pivot.index,
+                    columns=[column],
+                    values=pivot.value,
+                )
+                pivoted_frames.append(_df)
+
+            df = pd.concat(pivoted_frames, axis=1, join="outer")
             df.columns = df.columns.map("{0[1]}".format)
             df.reset_index(inplace=True)
 
diff --git a/tests/connectors/test_druid.py b/tests/connectors/test_druid.py
index ffeb4c2f..74311f0d 100644
--- a/tests/connectors/test_druid.py
+++ b/tests/connectors/test_druid.py
@@ -96,6 +96,44 @@ def group_by(*_, **__):
     mocker.patch.object(PyDruid, "groupby", side_effect=group_by)
 
 
+@pytest.fixture
+def mock_group_by_multi_column(mocker):
+    """Creates a Mock for PyDruid's groupby method with multiple dimension columns."""
+
+    def group_by(*_, **__):
+        """Mock multi-column group by response from druid."""
+        result = [
+            {
+                "event": {
+                    "service_alias": "identity.authn.signin",
+                    "env": "prod",
+                    "status": 200,
+                    "http_status": "2xx",
+                    "count": 20,
+                },
+                "timestamp": "2023-09-06T07:50:00.000Z",
+                "version": "v1",
+            },
+            {
+                "event": {
+                    "service_alias": "identity.authn.signin",
+                    "env": "prod",
+                    "status": 500,
+                    "http_status": "5xx",
+                    "count": 10,
+
+                },
+                "timestamp": "2023-09-06T07:53:00.000Z",
+                "version": "v1",
+            },
+        ]
+        query = pydruid.query.Query(query_dict={}, query_type="groupBy")
+        query.parse(json.dumps(result))
+        return query
+
+    mocker.patch.object(PyDruid, "groupby", side_effect=group_by)
+
+
 def test_fetch(setup, mock_group_by):
     start, end, fetcher = setup
     _out = fetcher.fetch(
@@ -251,3 +289,23 @@ def test_chunked_fetch_err(get_args):
         **get_args,
         chunked_hours=0,
     )
+
+
+def test_multi_column_pivot(setup, mock_group_by_multi_column):
+    start, end, fetcher = setup
+    _out = fetcher.fetch(
+        filter_keys=["service_alias"],
+        filter_values=["identity.authn.signin"],
+        dimensions=["http_status", "status"],
+        datasource="ip-apigw-telegraf-druid",
+        aggregations={"count": aggregators.doublesum("count")},
+        group_by=["timestamp", "http_status", "status"],
+        hours=2,
+        pivot=Pivot(
+            index="timestamp",
+            columns=["http_status", "status"],
+            value=["count"],
+        ),
+    )
+    print(_out)
+    assert (2, 5) == _out.shape
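
Reviewer note, not part of the patch: the sketch below is a minimal,
self-contained illustration of what the new multi-column pivot path does,
mirroring the logic added to fetch(): one pivot per entry in Pivot.columns,
an outer join of the pivoted frames on the pivot index, and a flattening of
the resulting column MultiIndex. The input frame is shaped like the mocked
druid groupby response in the test fixture, and the snippet assumes a pandas
version (>= 1.1) whose DataFrame.pivot accepts list arguments.

import pandas as pd

# Input shaped like the flattened druid groupby response in the test fixture.
df = pd.DataFrame(
    {
        "timestamp": ["2023-09-06T07:50:00.000Z", "2023-09-06T07:53:00.000Z"],
        "http_status": ["2xx", "5xx"],
        "status": [200, 500],
        "count": [20.0, 10.0],
    }
)

# One pivot per pivot column, as in the patched fetch()/chunked_fetch().
pivoted_frames = [
    df.pivot(index="timestamp", columns=[column], values=["count"])
    for column in ("http_status", "status")
]

# Outer-join on the shared timestamp index; combinations missing at a given
# timestamp come out as NaN.
out = pd.concat(pivoted_frames, axis=1, join="outer")

# Each pivoted label is a (value, column) tuple; "{0[1]}".format keeps only
# element 1 of the tuple, exactly what the connector does.
out.columns = out.columns.map("{0[1]}".format)
out.reset_index(inplace=True)

print(out.columns.tolist())  # ['timestamp', '2xx', '5xx', '200', '500']
assert out.shape == (2, 5)  # same shape the new test asserts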