[feat] Capture timeseries after collect (#10)
aquamatthias authored Dec 5, 2023
1 parent 7aa0375 commit 8168bfe
Showing 3 changed files with 48 additions and 11 deletions.
23 changes: 20 additions & 3 deletions collect_single/collect_and_sync.py
@@ -30,6 +30,17 @@
 
 log = logging.getLogger("resoto.coordinator")
 
+SNAPSHOT_METRICS = {
+    # count the number of infected resources by severity, cloud and account
+    "infected_resources": "aggregate(/security.severity as severity, "
+    "/ancestors.account.reported.id as account_id, "
+    "/ancestors.cloud.reported.id as cloud_id: sum(1) as count): /security.has_issues==true",
+    # count the number of resources by kind, cloud and account
+    "resources": "aggregate(/reported.kind as kind, "
+    "/ancestors.account.reported.id as account_id,"
+    "/ancestors.cloud.reported.id as cloud_id: sum(1) as count): all",
+}
+
 
 class CollectAndSync(Service):
     def __init__(
@@ -111,11 +122,17 @@ async def post_process(self) -> Tuple[Json, List[str]]:
         account_info = await self.core_client.account_info(self.account_id)
         # check if there were errors
         messages = await self.core_client.workflow_log(self.task_id) if self.task_id else []
-        # Synchronize the security section if account data was collected
-        if account_info:
+        # post process the data, if something has been collected
+        if account_info and (account_id := self.account_id):
+            # synchronize the security section
             benchmarks = await self.core_client.list_benchmarks()
             if benchmarks:
-                await self.core_client.create_benchmark_reports(list(account_info.keys()), benchmarks, self.task_id)
+                await self.core_client.create_benchmark_reports(account_id, benchmarks, self.task_id)
+            # create metrics
+            for name, query in SNAPSHOT_METRICS.items():
+                res = await self.core_client.time_series_snapshot(name, query, account_id)
+                log.info(f"Created timeseries snapshot: {name} created {res} entries")
+
         return account_info, messages
 
     async def send_result_events(self, read_from_process: bool, error_messages: Optional[List[str]] = None) -> None:
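The collect step now ends with one timeseries snapshot per entry in SNAPSHOT_METRICS, scoped to the account that was just collected. Below is a minimal sketch of driving that same loop outside of post_process, assuming an already constructed CoreClient (for example the test fixture); the helper name snapshot_all_metrics is illustrative and not part of this commit:

from typing import Dict

from collect_single.collect_and_sync import SNAPSHOT_METRICS
from collect_single.core_client import CoreClient


async def snapshot_all_metrics(core_client: CoreClient, account_id: str) -> Dict[str, int]:
    # One snapshot per metric, restricted to the collected account, mirroring post_process above.
    counts: Dict[str, int] = {}
    for name, query in SNAPSHOT_METRICS.items():
        counts[name] = await core_client.time_series_snapshot(name, query, account_id)
    return counts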
24 changes: 17 additions & 7 deletions collect_single/core_client.py
@@ -8,6 +8,8 @@
 from fixcloudutils.types import Json
 from resotoclient import Subscriber
 from resotoclient.async_client import ResotoClient
+from resotocore.query import query_parser, Query
+from resotocore.query.model import P
 
 log = logging.getLogger("resoto.coordinator")
 
@@ -58,15 +60,11 @@ async def workflow_log(self, task_id: str, limit: int = 100) -> List[str]:
     async def list_benchmarks(self) -> List[str]:
         return [cfg async for cfg in self.client.cli_execute("report benchmark list")]
 
-    async def create_benchmark_reports(
-        self, account_ids: List[str], benchmarks: List[str], task_id: Optional[str]
-    ) -> None:
-        assert isinstance(account_ids, List), "account_ids must be a collection"
+    async def create_benchmark_reports(self, account_id: str, benchmarks: List[str], task_id: Optional[str]) -> None:
         bn = " ".join(benchmarks)
-        ac = " ".join(account_ids)
         run_id = task_id or str(uuid.uuid4())
-        command = f"report benchmark run {bn} --accounts {ac} --sync-security-section --run-id {run_id} | count"
-        log.info(f"Create reports for following benchmarks: {bn} for accounts: {ac}. Command: {command}")
+        command = f"report benchmark run {bn} --accounts {account_id} --sync-security-section --run-id {run_id} | count"
+        log.info(f"Create reports for following benchmarks: {bn} for accounts: {account_id}. Command: {command}")
         async for _ in self.client.cli_execute(command, headers={"Accept": "application/json"}):
             pass # ignore the result
 
@@ -91,3 +89,15 @@ async def wait_for_worker_subscribed(self) -> List[Subscriber]:
                 return res # type: ignore
             log.info("Wait for worker to connect.")
             await asyncio.sleep(1)
+
+    async def time_series_snapshot(self, time_series: str, aggregation_query: str, account_id: str) -> int:
+        query = query_parser.parse_query(aggregation_query).combine(
+            Query.by(P("/ancestors.account.reported.id").eq(account_id))
+        )
+        async for single in self.client.cli_execute(f"timeseries snapshot --name {time_series} {query}"):
+            try:
+                first, rest = single.split(" ", maxsplit=1)
+                return int(first)
+            except Exception:
+                log.error(f"Failed to parse timeseries snapshot result: {single}")
+        return 0
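The new time_series_snapshot helper does not run the aggregation query as-is: it parses it and narrows it to the collected account before handing it to the timeseries snapshot CLI command. A short sketch isolating that query rewrite, using the same imports the commit adds; the account id is a placeholder and the final print is only for illustration:

from resotocore.query import query_parser, Query
from resotocore.query.model import P

from collect_single.collect_and_sync import SNAPSHOT_METRICS

account_id = "123456789012"  # placeholder
# Parse the aggregation query and restrict it to one account, exactly as the helper does
# before executing `timeseries snapshot --name <name> <query>`.
query = query_parser.parse_query(SNAPSHOT_METRICS["resources"]).combine(
    Query.by(P("/ancestors.account.reported.id").eq(account_id))
)
print(query)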
12 changes: 11 additions & 1 deletion tests/core_client_test.py
@@ -4,6 +4,7 @@
 
 import pytest
 
+from collect_single.collect_and_sync import SNAPSHOT_METRICS
 from collect_single.core_client import CoreClient
 
 
@@ -50,7 +51,7 @@ async def test_create_benchmark_report(core_client: CoreClient) -> None:
     accounts = [a async for a in core_client.client.search_list("is(aws_account) limit 1")]
     single = accounts[0]["reported"]["id"]
     task_id = str(uuid.uuid4())
-    await core_client.create_benchmark_reports([single], ["aws_cis_1_5"], task_id)
+    await core_client.create_benchmark_reports(single, ["aws_cis_1_5"], task_id)
     res = [
         a
         async for a in core_client.client.cli_execute(
@@ -70,3 +71,12 @@ async def test_wait_for_worker_subscribed(core_client: CoreClient) -> None:
 async def test_wait_for_collect_task_to_finish(core_client: CoreClient) -> None:
     await core_client.start_workflow("collect")
     await asyncio.wait_for(core_client.wait_for_collect_tasks_to_finish(), timeout=600)
+
+
+@pytest.mark.skipif(os.environ.get("CORE_RUNNING") is None, reason="No core running")
+async def test_time_series_snapshot(core_client: CoreClient) -> None:
+    accounts = [a async for a in core_client.client.search_list("is(aws_account) limit 1")]
+    single = accounts[0]["reported"]["id"]
+    for name, query in SNAPSHOT_METRICS.items():
+        res = await core_client.time_series_snapshot(name, query, single)
+        assert res > 0
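The added integration test is skipped unless the CORE_RUNNING environment variable is set, i.e. it expects a reachable resotocore instance. A small sketch of running just this test programmatically under that assumption; the pytest.main invocation and the chosen value for CORE_RUNNING are illustrative:

import os

import pytest

os.environ.setdefault("CORE_RUNNING", "true")  # the skipif marker only checks that the variable is set
raise SystemExit(pytest.main(["-k", "test_time_series_snapshot", "tests/core_client_test.py"]))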
