Skip to content

Commit

Permalink
[gcp][feat] Add metrics collection to GcpPubSub and GcpBucket resourc…
Browse files Browse the repository at this point in the history
…es (#2296)
  • Loading branch information
1101-1 authored Dec 11, 2024
1 parent 68832c0 commit 44fd924
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 51 deletions.
1 change: 1 addition & 0 deletions plugins/gcp/fix_plugin_gcp/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
rm_leaf_nodes(compute.GcpAcceleratorType)
rm_leaf_nodes(billing.GcpSku)
rm_leaf_nodes(billing.GcpService)
rm_leaf_nodes(compute.GcpInterconnectLocation)
# remove regions that are not in use
self.graph.remove_recursively(builder.nodes(GcpRegion, lambda r: r.compute_region_in_use(builder) is False))

Expand Down
1 change: 1 addition & 0 deletions plugins/gcp/fix_plugin_gcp/gcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
InternalZoneProp = "_zone"
ZoneProp = "zone"
RegionProp = "region"
LocationProp = "location"

# Store the discovery function as separate variable.
# This is used in tests to change the builder function.
Expand Down
89 changes: 45 additions & 44 deletions plugins/gcp/fix_plugin_gcp/resources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from googleapiclient.errors import HttpError

from fix_plugin_gcp.config import GcpConfig
from fix_plugin_gcp.gcp_client import GcpClient, GcpApiSpec, InternalZoneProp, ZoneProp, RegionProp
from fix_plugin_gcp.gcp_client import GcpClient, GcpApiSpec, LocationProp, InternalZoneProp, ZoneProp, RegionProp
from fix_plugin_gcp.utils import Credentials
from fixlib.baseresources import (
BaseResource,
Expand Down Expand Up @@ -210,60 +210,61 @@ def add_region_to_node(self, node: GcpResourceType, source: Optional[Json] = Non
self.add_edge(node, node=node._region, reverse=True)
return

parts = node.id.split("/", maxsplit=4)
if len(parts) > 3 and parts[0] == "projects":
if parts[2] in ["locations", "zones", "regions"]:
location_name = parts[3]
# Check for zone first
if zone := self.zone_by_name.get(location_name):
node._zone = zone
node._region = self.region_by_zone_name.get(zone.id)
self.add_edge(zone, node=node)
return
def set_zone_or_region(location_name: str) -> bool:
return set_zone(location_name) or set_region(location_name)

# Then check for region
if region := self.region_by_name.get(location_name):
node._region = region
self.add_edge(region, node=node)
return
def set_zone(zone_name: str) -> bool:
if zone := self.zone_by_name.get(zone_name):
node._zone = zone
node._region = self.region_by_zone_name.get(zone.id)
self.add_edge(zone, node=node)
return True
else:
log.debug(
"Zone property '%s' found in the source but no corresponding region object is available to associate with the node.",
zone_name,
)
return False

def set_region(region_name: str) -> bool:
if region := self.region_by_name.get(region_name):
node._region = region
self.add_edge(node, node=region, reverse=True)
return True
else:
log.debug(
"Region property '%s' found in the source but no corresponding region object is available to associate with the node.",
region_name,
)
return False

if source is not None:
if ZoneProp in source:
zone_name = source[ZoneProp].rsplit("/", 1)[-1]
if zone := self.zone_by_name.get(zone_name):
node._zone = zone
node._region = self.region_by_zone_name[zone_name]
self.add_edge(node, node=zone, reverse=True)
zone_name = source[ZoneProp].lower().rsplit("/", 1)[-1]
if set_zone(zone_name):
return
else:
log.debug(
"Zone property '%s' found in the source but no corresponding zone object is available to associate with the node.",
zone_name,
)

if InternalZoneProp in source:
if zone := self.zone_by_name.get(source[InternalZoneProp]):
node._zone = zone
node._region = self.region_by_zone_name[source[InternalZoneProp]]
self.add_edge(node, node=zone, reverse=True)
zone_name = source[InternalZoneProp].lower().rsplit("/", 1)[-1]
if set_zone(zone_name):
return
else:
log.debug(
"Internal zone property '%s' exists in the source but no corresponding zone object is available to associate with the node.",
source[InternalZoneProp],
)

if RegionProp in source:
region_name = source[RegionProp].rsplit("/", 1)[-1]
if region := self.region_by_name.get(region_name):
node._region = region
self.add_edge(node, node=region, reverse=True)
region_name = source[RegionProp].lower().rsplit("/", 1)[-1]
if set_region(region_name):
return
# location property can be a zone or region
if LocationProp in source:
location_name = source[LocationProp].lower().rsplit("/", 1)[-1]
if set_zone_or_region(location_name):
return

parts = node.id.split("/", maxsplit=4)
if len(parts) > 3 and parts[0] == "projects":
if parts[2] in ["locations", "zones", "regions"]:
location_name = parts[3].lower()
if set_zone_or_region(location_name):
return
else:
log.debug(
"Region property '%s' found in the source but no corresponding region object is available to associate with the node.",
region_name,
)

# Fallback to GraphBuilder region, i.e. regional collection
if self.region is not None:
Expand Down
4 changes: 2 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4429,7 +4429,7 @@ class GcpMachineType(GcpResource, BaseInstanceType):
"maximum_persistent_disks_size_gb": S("maximumPersistentDisksSizeGb"),
"scratch_disks": S("scratchDisks", default=[]) >> ForallBend(S("diskGb")),
"instance_type": S("name"),
"instance_cores": S("guestCpus") >> F(lambda x: float(x)),
"instance_cores": S("guestCpus") >> F(float),
"instance_memory": S("memoryMb") >> F(lambda x: float(x) / 1024),
}
accelerators: Optional[List[GcpAccelerators]] = field(default=None)
Expand Down Expand Up @@ -5419,7 +5419,7 @@ class GcpNotificationEndpointGrpcSettings:


@define(eq=False, slots=False)
class GcpNotificationEndpoint(GcpResource, PhantomBaseResource):
class GcpNotificationEndpoint(GcpResource):
kind: ClassVar[str] = "gcp_notification_endpoint"
_kind_display: ClassVar[str] = "GCP Notification Endpoint"
_kind_description: ClassVar[str] = "GCP Notification Endpoint is a Google Cloud Platform service that receives and processes notifications from various GCP resources. It acts as a central point for collecting and routing alerts, updates, and event data. Users can configure endpoints to direct notifications to specific destinations like email, SMS, or third-party applications for monitoring and response purposes." # fmt: skip
Expand Down
2 changes: 1 addition & 1 deletion plugins/gcp/fix_plugin_gcp/resources/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def _query_for_chunk(
# Base filter
filters = [
f'metric.type = "{query.query_name}"',
f'resource.labels.project_id="{query.project_id}"',
f'resource.labels.project_id = "{query.project_id}"',
]

# Add additional filters
Expand Down
64 changes: 62 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder
from fixlib.baseresources import BaseQueue, ModelReference, QueueType
from fix_plugin_gcp.resources.base import GcpMonitoringQuery, GcpResource, GcpDeprecationStatus, GraphBuilder
from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, normalizer_factory
from fixlib.baseresources import BaseQueue, MetricName, ModelReference, QueueType
from fixlib.json_bender import Bender, S, Bend, K, F
from fixlib.types import Json

Expand Down Expand Up @@ -268,6 +269,65 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
if topic := self.subscription_topic:
builder.add_edge(self, clazz=GcpPubSubTopic, reverse=True, name=topic)

def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
delta = builder.metrics_delta

queries.extend(
[
GcpMonitoringQuery.create(
query_name="pubsub.googleapis.com/subscription/push_request_count",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.NumberOfMessagesReceived,
normalization=normalizer_factory.count,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.subscription_id": self.resource_raw_name,
},
)
for stat in STANDART_STAT_MAP
]
)
if self.subscription_topic:
topic_id = self.subscription_topic.rsplit("/", maxsplit=1)[-1]
queries.extend(
[
GcpMonitoringQuery.create(
query_name="pubsub.googleapis.com/topic/send_message_operation_count",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.NumberOfMessagesSent,
normalization=normalizer_factory.count,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.topic_id": topic_id,
},
)
for stat in STANDART_STAT_MAP
]
)
queries.extend(
[
GcpMonitoringQuery.create(
query_name="pubsub.googleapis.com/subscription/oldest_unacked_message_age",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.ApproximateAgeOfOldestMessage,
normalization=normalizer_factory.seconds,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.subscription_id": self.resource_raw_name,
},
)
for stat in STANDART_STAT_MAP
]
)
return queries


@define(eq=False, slots=False)
class GcpAwsKinesis:
Expand Down
53 changes: 51 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@
from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, get_client
from fixlib.baseresources import BaseBucket
from fix_plugin_gcp.resources.base import (
GcpMonitoringQuery,
GcpResource,
GcpDeprecationStatus,
GraphBuilder,
get_client,
)
from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, normalizer_factory
from fixlib.baseresources import BaseBucket, MetricName
from fixlib.graph import Graph
from fixlib.json_bender import Bender, S, Bend, ForallBend, AsBool

Expand Down Expand Up @@ -418,6 +425,48 @@ class GcpBucket(GcpResource, BaseBucket):
requester_pays: Optional[bool] = field(default=None)
lifecycle_rule: List[GcpRule] = field(factory=list)

def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
delta = builder.metrics_delta

queries.extend(
[
GcpMonitoringQuery.create(
query_name="storage.googleapis.com/storage/total_bytes",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.BucketSizeBytes,
normalization=normalizer_factory.bytes,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.bucket_name": self.id,
"resource.labels.location": self.region().id,
},
)
for stat in STANDART_STAT_MAP
]
)
queries.extend(
[
GcpMonitoringQuery.create(
query_name="storage.googleapis.com/storage/object_count",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.NumberOfObjects,
normalization=normalizer_factory.count,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.bucket_name": self.id,
"resource.labels.location": self.region().id,
},
)
for stat in STANDART_STAT_MAP
]
)
return queries

def pre_delete(self, graph: Graph) -> bool:
client = get_client(self)
objects = client.list(GcpObject.api_spec, bucket=self.name)
Expand Down

0 comments on commit 44fd924

Please sign in to comment.