diff --git a/collect_single/collect_and_sync.py b/collect_single/collect_and_sync.py
index 1b253c5..3eb23e2 100644
--- a/collect_single/collect_and_sync.py
+++ b/collect_single/collect_and_sync.py
@@ -30,6 +30,17 @@
 log = logging.getLogger("resoto.coordinator")
 
+SNAPSHOT_METRICS = {
+    # count the number of infected resources by severity, cloud and account
+    "infected_resources": "aggregate(/security.severity as severity, "
+    "/ancestors.account.reported.id as account_id, "
+    "/ancestors.cloud.reported.id as cloud_id: sum(1) as count): /security.has_issues==true",
+    # count the number of resources by kind, cloud and account
+    "resources": "aggregate(/reported.kind as kind, "
+    "/ancestors.account.reported.id as account_id,"
+    "/ancestors.cloud.reported.id as cloud_id: sum(1) as count): all",
+}
+
 
 class CollectAndSync(Service):
     def __init__(
@@ -111,11 +122,17 @@ async def post_process(self) -> Tuple[Json, List[str]]:
         account_info = await self.core_client.account_info(self.account_id)
         # check if there were errors
         messages = await self.core_client.workflow_log(self.task_id) if self.task_id else []
-        # Synchronize the security section if account data was collected
-        if account_info:
+        # post process the data, if something has been collected
+        if account_info and (account_id := self.account_id):
+            # synchronize the security section
             benchmarks = await self.core_client.list_benchmarks()
             if benchmarks:
-                await self.core_client.create_benchmark_reports(list(account_info.keys()), benchmarks, self.task_id)
+                await self.core_client.create_benchmark_reports(account_id, benchmarks, self.task_id)
+            # create metrics
+            for name, query in SNAPSHOT_METRICS.items():
+                res = await self.core_client.time_series_snapshot(name, query, account_id)
+                log.info(f"Created timeseries snapshot: {name} created {res} entries")
+
         return account_info, messages
 
     async def send_result_events(self, read_from_process: bool, error_messages: Optional[List[str]] = None) -> None:
diff --git a/collect_single/core_client.py b/collect_single/core_client.py
index 04802cf..fdbce28 100644
--- a/collect_single/core_client.py
+++ b/collect_single/core_client.py
@@ -8,6 +8,8 @@
 from fixcloudutils.types import Json
 from resotoclient import Subscriber
 from resotoclient.async_client import ResotoClient
+from resotocore.query import query_parser, Query
+from resotocore.query.model import P
 
 log = logging.getLogger("resoto.coordinator")
 
@@ -58,15 +60,11 @@ async def workflow_log(self, task_id: str, limit: int = 100) -> List[str]:
 
     async def list_benchmarks(self) -> List[str]:
         return [cfg async for cfg in self.client.cli_execute("report benchmark list")]
 
-    async def create_benchmark_reports(
-        self, account_ids: List[str], benchmarks: List[str], task_id: Optional[str]
-    ) -> None:
-        assert isinstance(account_ids, List), "account_ids must be a collection"
+    async def create_benchmark_reports(self, account_id: str, benchmarks: List[str], task_id: Optional[str]) -> None:
         bn = " ".join(benchmarks)
-        ac = " ".join(account_ids)
         run_id = task_id or str(uuid.uuid4())
-        command = f"report benchmark run {bn} --accounts {ac} --sync-security-section --run-id {run_id} | count"
-        log.info(f"Create reports for following benchmarks: {bn} for accounts: {ac}. Command: {command}")
+        command = f"report benchmark run {bn} --accounts {account_id} --sync-security-section --run-id {run_id} | count"
+        log.info(f"Create reports for following benchmarks: {bn} for accounts: {account_id}. Command: {command}")
         async for _ in self.client.cli_execute(command, headers={"Accept": "application/json"}):
             pass  # ignore the result
@@ -91,3 +89,15 @@ async def wait_for_worker_subscribed(self) -> List[Subscriber]:
                 return res  # type: ignore
             log.info("Wait for worker to connect.")
             await asyncio.sleep(1)
+
+    async def time_series_snapshot(self, time_series: str, aggregation_query: str, account_id: str) -> int:
+        query = query_parser.parse_query(aggregation_query).combine(
+            Query.by(P("/ancestors.account.reported.id").eq(account_id))
+        )
+        async for single in self.client.cli_execute(f"timeseries snapshot --name {time_series} {query}"):
+            try:
+                first, rest = single.split(" ", maxsplit=1)
+                return int(first)
+            except Exception:
+                log.error(f"Failed to parse timeseries snapshot result: {single}")
+        return 0
diff --git a/tests/core_client_test.py b/tests/core_client_test.py
index ecdd69a..054bdd2 100644
--- a/tests/core_client_test.py
+++ b/tests/core_client_test.py
@@ -4,6 +4,7 @@
 
 import pytest
 
+from collect_single.collect_and_sync import SNAPSHOT_METRICS
 from collect_single.core_client import CoreClient
 
 
@@ -50,7 +51,7 @@ async def test_create_benchmark_report(core_client: CoreClient) -> None:
     accounts = [a async for a in core_client.client.search_list("is(aws_account) limit 1")]
     single = accounts[0]["reported"]["id"]
     task_id = str(uuid.uuid4())
-    await core_client.create_benchmark_reports([single], ["aws_cis_1_5"], task_id)
+    await core_client.create_benchmark_reports(single, ["aws_cis_1_5"], task_id)
     res = [
         a
         async for a in core_client.client.cli_execute(
@@ -70,3 +71,12 @@
 async def test_wait_for_collect_task_to_finish(core_client: CoreClient) -> None:
     await core_client.start_workflow("collect")
     await asyncio.wait_for(core_client.wait_for_collect_tasks_to_finish(), timeout=600)
+
+
+@pytest.mark.skipif(os.environ.get("CORE_RUNNING") is None, reason="No core running")
+async def test_time_series_snapshot(core_client: CoreClient) -> None:
+    accounts = [a async for a in core_client.client.search_list("is(aws_account) limit 1")]
+    single = accounts[0]["reported"]["id"]
+    for name, query in SNAPSHOT_METRICS.items():
+        res = await core_client.time_series_snapshot(name, query, single)
+        assert res > 0